transport

package
v0.0.0-...-a582c34
Warning

This package is not in the latest version of its module.

Published: May 16, 2021 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Overview

Package transport implements the transport component used for exchanging Raft messages between NodeHosts.

This package is used internally by Dragonboat; applications are not expected to import it.

Constants

const (
	// TCPTransportName is the name of the tcp transport module.
	TCPTransportName = "go-tcp-transport"
)

Variables

var (
	// ErrStopped is the error returned to indicate that the connection has
	// already been stopped.
	ErrStopped = errors.New("connection stopped")
	// ErrStreamSnapshot is the error returned to indicate that snapshot
	// streaming failed.
	ErrStreamSnapshot = errors.New("stream snapshot failed")
)
var (
	// NOOPRaftName is the module name for the NOOP transport module.
	NOOPRaftName = "noop-test-transport"
	// ErrRequestedToFail is the error used to indicate that the error is
	// requested.
	ErrRequestedToFail = errors.New("requested to returned error")
)
var (
	// ErrBadMessage is the error returned to indicate the incoming message is
	// corrupted.
	ErrBadMessage = errors.New("invalid message")
)
var (
	// ErrSnapshotOutOfDate is returned when the snapshot being received is
	// considered as out of date.
	ErrSnapshotOutOfDate = errors.New("snapshot is out of date")
)
var (
	// ErrUnknownTarget is the error returned when the target address of the node
	// is unknown.
	ErrUnknownTarget = errors.New("target address unknown")
)

Functions

func NewNOOPTransport

func NewNOOPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.MessageHandler,
	chunkHandler raftio.ChunkHandler) raftio.ITransport

NewNOOPTransport creates a new NOOPTransport instance.

func NewTCPTransport

func NewTCPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.MessageHandler,
	chunkHandler raftio.ChunkHandler) raftio.ITransport

NewTCPTransport creates and returns a new TCP transport module.

Types

type Chunk

type Chunk struct {
	// contains filtered or unexported fields
}

Chunk manages snapshot chunks on the receiving side.

func NewChunk

func NewChunk(onReceive func(pb.MessageBatch),
	confirm func(uint64, uint64, uint64), dir server.SnapshotDirFunc,
	did uint64, fs vfs.IFS) *Chunk

NewChunk creates and returns a new snapshot chunks instance.

func (*Chunk) Add

func (c *Chunk) Add(chunk pb.Chunk) bool

Add adds a received chunk to chunks.

func (*Chunk) Close

func (c *Chunk) Close()

Close closes the chunks instance.

func (*Chunk) Tick

func (c *Chunk) Tick()

Tick moves the internal logical clock forward.
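
A logical clock driven by Tick is often used to expire state that has been idle for too long. The documentation here does not say what Chunk does with its clock, so the sketch below only illustrates the general technique; the tracker type, its timeout field, and the Touch method are all hypothetical.

```go
package main

import "fmt"

// tracker is a hypothetical structure that expires idle entries by counting
// ticks rather than reading the wall clock.
type tracker struct {
	tick     uint64            // current logical time
	timeout  uint64            // ticks of inactivity before an entry expires
	lastSeen map[string]uint64 // key -> tick at which it was last touched
}

func newTracker(timeout uint64) *tracker {
	return &tracker{timeout: timeout, lastSeen: make(map[string]uint64)}
}

// Touch records activity for key at the current logical time.
func (t *tracker) Touch(key string) {
	t.lastSeen[key] = t.tick
}

// Tick advances the logical clock by one and removes every entry that has
// been idle for at least timeout ticks, returning the removed keys.
func (t *tracker) Tick() []string {
	t.tick++
	var expired []string
	for key, seen := range t.lastSeen {
		if t.tick-seen >= t.timeout {
			expired = append(expired, key)
			delete(t.lastSeen, key)
		}
	}
	return expired
}

func main() {
	tr := newTracker(2)
	tr.Touch("snapshot-a")
	fmt.Println(len(tr.Tick())) // still within the timeout
	fmt.Println(len(tr.Tick())) // idle for two ticks, expired
}
```

Counting ticks instead of reading wall-clock time keeps the expiry logic deterministic, which makes it easy to drive from tests.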

type DefaultTransportFactory

type DefaultTransportFactory struct{}

DefaultTransportFactory is the default transport module used.

func (*DefaultTransportFactory) Create

Create creates a default transport instance.

func (*DefaultTransportFactory) Validate

func (dtm *DefaultTransportFactory) Validate(addr string) bool

Validate returns a boolean value indicating whether the specified address is valid.
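
The exact rule applied by Validate is not described here. As an illustration only, a host:port check for a TCP transport address could look like the sketch below; validHostPort is a hypothetical helper built on the standard library and is not this package's implementation.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// validHostPort reports whether addr has the form host:port with a non-empty
// host and a port in the range 1-65535. This is an assumed rule chosen for
// illustration.
func validHostPort(addr string) bool {
	host, port, err := net.SplitHostPort(addr)
	if err != nil || host == "" {
		return false
	}
	// bitSize 16 makes ParseUint reject values above 65535.
	p, err := strconv.ParseUint(port, 10, 16)
	return err == nil && p > 0
}

func main() {
	fmt.Println(validHostPort("localhost:63000"))
	fmt.Println(validHostPort("localhost"))
}
```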

type IMessageHandler

type IMessageHandler interface {
	HandleMessageBatch(batch pb.MessageBatch) (uint64, uint64)
	HandleUnreachable(clusterID uint64, nodeID uint64)
	HandleSnapshotStatus(clusterID uint64, nodeID uint64, rejected bool)
	HandleSnapshot(clusterID uint64, nodeID uint64, from uint64)
}

IMessageHandler is the interface required to handle incoming raft requests.

type INodeRegistry

type INodeRegistry interface {
	Close() error
	Add(clusterID uint64, nodeID uint64, url string)
	Remove(clusterID uint64, nodeID uint64)
	RemoveCluster(clusterID uint64)
	Resolve(clusterID uint64, nodeID uint64) (string, string, error)
}

INodeRegistry is the local registry interface used to keep all known nodes in the system.

func NewNodeHostIDRegistry

func NewNodeHostIDRegistry(nhid string,
	nhConfig config.NodeHostConfig, streamConnections uint64,
	v config.TargetValidator) (INodeRegistry, error)

NewNodeHostIDRegistry creates a new NodeHostIDRegistry instance.

type IResolver

type IResolver interface {
	Resolve(uint64, uint64) (string, string, error)
	Add(uint64, uint64, string)
}

IResolver converts the (cluster ID, node ID) tuple to a network address.

type ITransport

type ITransport interface {
	Name() string
	Send(pb.Message) bool
	SendSnapshot(pb.Message) bool
	GetStreamSink(clusterID uint64, nodeID uint64) *Sink
	Close() error
}

ITransport is the interface of the transport layer used for exchanging Raft messages.

type ITransportEvent

type ITransportEvent interface {
	ConnectionEstablished(string, bool)
	ConnectionFailed(string, bool)
}

ITransportEvent is the interface for notifying connection status changes.
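
A listener with this method set can be as small as the sketch below, which records each notification for later inspection. The eventRecorder type is hypothetical. The interface does not name its parameters, so treating the string as the remote address and the bool as a flag for snapshot connections is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// eventRecorder is a hypothetical listener with the same method set as
// ITransportEvent. It stores one formatted line per notification.
type eventRecorder struct {
	mu     sync.Mutex
	events []string
}

// ConnectionEstablished records a successful connection. The meaning of the
// parameters (address, snapshot flag) is assumed.
func (r *eventRecorder) ConnectionEstablished(addr string, snapshot bool) {
	r.record("established", addr, snapshot)
}

// ConnectionFailed records a failed connection attempt.
func (r *eventRecorder) ConnectionFailed(addr string, snapshot bool) {
	r.record("failed", addr, snapshot)
}

// record appends a line under the mutex so that the recorder can be called
// from multiple goroutines.
func (r *eventRecorder) record(kind string, addr string, snapshot bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, fmt.Sprintf("%s %s snapshot=%t", kind, addr, snapshot))
}

func main() {
	rec := &eventRecorder{}
	rec.ConnectionEstablished("10.0.0.2:63000", false)
	rec.ConnectionFailed("10.0.0.3:63000", true)
	for _, e := range rec.events {
		fmt.Println(e)
	}
}
```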

type NOOPConnection

type NOOPConnection struct {
	// contains filtered or unexported fields
}

NOOPConnection is the connection used to exchange messages between node hosts.

func (*NOOPConnection) Close

func (c *NOOPConnection) Close()

Close closes the NOOPConnection instance.

func (*NOOPConnection) SendMessageBatch

func (c *NOOPConnection) SendMessageBatch(batch raftpb.MessageBatch) error

SendMessageBatch returns ErrRequestedToFail when requested.

type NOOPSnapshotConnection

type NOOPSnapshotConnection struct {
	// contains filtered or unexported fields
}

NOOPSnapshotConnection is the connection used to send snapshots.

func (*NOOPSnapshotConnection) Close

func (c *NOOPSnapshotConnection) Close()

Close closes the NOOPSnapshotConnection.

func (*NOOPSnapshotConnection) SendChunk

func (c *NOOPSnapshotConnection) SendChunk(chunk raftpb.Chunk) error

SendChunk returns ErrRequestedToFail when requested.

type NOOPTransport

type NOOPTransport struct {
	// contains filtered or unexported fields
}

NOOPTransport is a transport module for testing purposes. It does not actually have the ability to exchange messages or snapshots between NodeHosts.

func (*NOOPTransport) Close

func (g *NOOPTransport) Close() error

Close closes the NOOPTransport instance.

func (*NOOPTransport) GetConnection

func (g *NOOPTransport) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a connection.

func (*NOOPTransport) GetSnapshotConnection

func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a snapshot connection.

func (*NOOPTransport) Name

func (g *NOOPTransport) Name() string

Name returns the module name.

func (*NOOPTransport) Start

func (g *NOOPTransport) Start() error

Start starts the NOOPTransport instance.

type NOOPTransportFactory

type NOOPTransportFactory struct{}

NOOPTransportFactory is a NOOP transport module used in testing.

func (*NOOPTransportFactory) Create

Create creates a noop transport instance.

func (*NOOPTransportFactory) Validate

func (n *NOOPTransportFactory) Validate(addr string) bool

Validate returns a boolean value indicating whether the input address is valid.

type NodeHostIDRegistry

type NodeHostIDRegistry struct {
	// contains filtered or unexported fields
}

NodeHostIDRegistry is a node registry backed by gossip. It is capable of supporting NodeHosts with dynamic RaftAddress values.

func (*NodeHostIDRegistry) Add

func (n *NodeHostIDRegistry) Add(clusterID uint64,
	nodeID uint64, target string)

Add adds a new node with its known NodeHostID to the registry.

func (*NodeHostIDRegistry) AdvertiseAddress

func (n *NodeHostIDRegistry) AdvertiseAddress() string

AdvertiseAddress returns the advertise address of the gossip service.

func (*NodeHostIDRegistry) Close

func (n *NodeHostIDRegistry) Close() error

Close closes the NodeHostIDRegistry instance.

func (*NodeHostIDRegistry) NumMembers

func (n *NodeHostIDRegistry) NumMembers() int

NumMembers returns the number of live nodes known by the gossip service.

func (*NodeHostIDRegistry) Remove

func (n *NodeHostIDRegistry) Remove(clusterID uint64, nodeID uint64)

Remove removes the specified node from the registry.

func (*NodeHostIDRegistry) RemoveCluster

func (n *NodeHostIDRegistry) RemoveCluster(clusterID uint64)

RemoveCluster removes all nodes associated with the specified cluster from the registry.

func (*NodeHostIDRegistry) Resolve

func (n *NodeHostIDRegistry) Resolve(clusterID uint64,
	nodeID uint64) (string, string, error)

Resolve returns the current RaftAddress and connection key of the specified node. It returns ErrUnknownTarget when the RaftAddress is unknown.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is used to manage all known node addresses in the multi raft system. The transport layer uses this address registry to locate nodes.

func NewNodeRegistry

func NewNodeRegistry(streamConnections uint64, v config.TargetValidator) *Registry

NewNodeRegistry returns a new Registry object.

func (*Registry) Add

func (n *Registry) Add(clusterID uint64, nodeID uint64, target string)

Add adds the specified node and its target info to the registry.

func (*Registry) Close

func (n *Registry) Close() error

Close closes the node registry.

func (*Registry) Remove

func (n *Registry) Remove(clusterID uint64, nodeID uint64)

Remove removes a remote from the node registry.

func (*Registry) RemoveCluster

func (n *Registry) RemoveCluster(clusterID uint64)

RemoveCluster removes all node info associated with the specified cluster.

func (*Registry) Resolve

func (n *Registry) Resolve(clusterID uint64, nodeID uint64) (string, string, error)

Resolve looks up the Addr of the specified node.

type SendMessageBatchFunc

type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)

SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink is the chunk sink for receiving generated snapshot chunks.

func (*Sink) Close

func (s *Sink) Close() error

Close closes the sink processing.

func (*Sink) ClusterID

func (s *Sink) ClusterID() uint64

ClusterID returns the cluster ID of the source node.

func (*Sink) Receive

func (s *Sink) Receive(chunk pb.Chunk) (bool, bool)

Receive receives a snapshot chunk.

func (*Sink) ToNodeID

func (s *Sink) ToNodeID() uint64

ToNodeID returns the node ID of the node intended to get and handle the received snapshot chunk.

type StreamChunkSendFunc

type StreamChunkSendFunc func(pb.Chunk) (pb.Chunk, bool)

StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.

type TCP

type TCP struct {
	// contains filtered or unexported fields
}

TCP is a TCP based transport module for exchanging raft messages and snapshots between NodeHost instances.

func (*TCP) Close

func (t *TCP) Close() error

Close closes the TCP transport module.

func (*TCP) GetConnection

func (t *TCP) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a new raftio.IConnection for sending raft messages.

func (*TCP) GetSnapshotConnection

func (t *TCP) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a new raftio.ISnapshotConnection for sending raft snapshots.

func (*TCP) Name

func (t *TCP) Name() string

Name returns a human readable name of the TCP transport module.

func (*TCP) Start

func (t *TCP) Start() error

Start starts the TCP transport module.

type TCPConnection

type TCPConnection struct {
	// contains filtered or unexported fields
}

TCPConnection is the connection used for sending raft messages to remote nodes.

func NewTCPConnection

func NewTCPConnection(conn net.Conn,
	rb *ratelimit.Bucket, wb *ratelimit.Bucket, encrypted bool) *TCPConnection

NewTCPConnection creates and returns a new TCPConnection instance.

func (*TCPConnection) Close

func (c *TCPConnection) Close()

Close closes the TCPConnection instance.

func (*TCPConnection) SendMessageBatch

func (c *TCPConnection) SendMessageBatch(batch pb.MessageBatch) error

SendMessageBatch sends a raft message batch to the remote node.

type TCPSnapshotConnection

type TCPSnapshotConnection struct {
	// contains filtered or unexported fields
}

TCPSnapshotConnection is the connection for sending raft snapshot chunks to remote nodes.

func NewTCPSnapshotConnection

func NewTCPSnapshotConnection(conn net.Conn,
	rb *ratelimit.Bucket, wb *ratelimit.Bucket,
	encrypted bool) *TCPSnapshotConnection

NewTCPSnapshotConnection creates and returns a new snapshot connection.

func (*TCPSnapshotConnection) Close

func (c *TCPSnapshotConnection) Close()

Close closes the snapshot connection.

func (*TCPSnapshotConnection) SendChunk

func (c *TCPSnapshotConnection) SendChunk(chunk pb.Chunk) error

SendChunk sends the specified snapshot chunk to the remote node.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the transport layer for delivering raft messages and snapshots.

func NewTransport

func NewTransport(nhConfig config.NodeHostConfig,
	handler IMessageHandler, env *server.Env, resolver IResolver,
	dir server.SnapshotDirFunc, sysEvents ITransportEvent,
	fs vfs.IFS) (*Transport, error)

NewTransport creates a new Transport object.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the Transport object.

func (*Transport) GetCircuitBreaker

func (t *Transport) GetCircuitBreaker(key string) *circuit.Breaker

GetCircuitBreaker returns the circuit breaker used for the specified target node.

func (*Transport) GetStreamSink

func (t *Transport) GetStreamSink(clusterID uint64, nodeID uint64) *Sink

GetStreamSink returns a sink used for streaming snapshots.

func (*Transport) GetTrans

func (t *Transport) GetTrans() raftio.ITransport

GetTrans returns the transport instance.

func (*Transport) Name

func (t *Transport) Name() string

Name returns the type name of the transport module.

func (*Transport) Send

func (t *Transport) Send(req pb.Message) bool

Send asynchronously sends raft messages to their target nodes.

The generic async send Go pattern used in Send() is found in CockroachDB's codebase.
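
One common way to implement an asynchronous send that returns a bool is a non-blocking enqueue onto a buffered channel, with a separate goroutine draining the channel. The sketch below shows only that general technique under stated assumptions; it is not taken from Dragonboat or CockroachDB, and the message and sendQueue types are hypothetical.

```go
package main

import "fmt"

// message is a hypothetical stand-in for pb.Message.
type message struct {
	to uint64
}

// sendQueue is a hypothetical bounded queue. In a real transport a worker
// goroutine would drain ch and write each message to the network.
type sendQueue struct {
	ch chan message
}

func newSendQueue(size int) *sendQueue {
	return &sendQueue{ch: make(chan message, size)}
}

// Send enqueues m without blocking. It returns false when the queue is full,
// so the caller is never stalled by a slow or unreachable peer.
func (q *sendQueue) Send(m message) bool {
	select {
	case q.ch <- m:
		return true
	default:
		return false
	}
}

func main() {
	q := newSendQueue(1)
	fmt.Println(q.Send(message{to: 1})) // true: the queue has room
	fmt.Println(q.Send(message{to: 2})) // false: the queue is full
}
```

Dropping on a full queue is acceptable for Raft traffic in general because the protocol retransmits messages that are not acknowledged.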

func (*Transport) SendSnapshot

func (t *Transport) SendSnapshot(m pb.Message) bool

SendSnapshot asynchronously sends a raft snapshot message to its target.

func (*Transport) SetPreSendBatchHook

func (t *Transport) SetPreSendBatchHook(h SendMessageBatchFunc)

SetPreSendBatchHook sets the SendMessageBatch hook. This function is only expected to be used in monkey testing.

func (*Transport) SetPreStreamChunkSendHook

func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)

SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.
