transport

package
v3.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2020 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package transport implements the transport component used for exchanging Raft messages between NodeHosts.

This package is internally used by Dragonboat, applications are not expected to import this package.

Index

Constants

View Source
const (
	// TCPRaftRPCName is the name of the tcp RPC module.
	TCPRaftRPCName = "go-tcp-transport"
)
View Source
const (

	// UnmanagedDeploymentID is the special DeploymentID used when the system is
	// not managed by master servers.
	UnmanagedDeploymentID = uint64(1)
)

Variables

View Source
var (
	// ErrStopped is the error returned to indicate that the connection has
	// already been stopped.
	ErrStopped = errors.New("connection stopped")
	// ErrStreamSnapshot is the error returned to indicate that snapshot
	// streaming failed.
	ErrStreamSnapshot = errors.New("stream snapshot failed")
)
View Source
var (
	// NOOPRaftName is the module name for the NOOP transport module.
	NOOPRaftName = "noop-test-transport"
	// ErrRequestedToFail is the error used to indicate that the error is
	// requested.
	ErrRequestedToFail = errors.New("requested to returned error")
)
View Source
var (
	// ErrBadMessage is the error returned to indicate the incoming message is
	// corrupted.
	ErrBadMessage = errors.New("invalid message")
)
View Source
var (
	// ErrSnapshotOutOfDate is returned when the snapshot being received is
	// considered as out of date.
	ErrSnapshotOutOfDate = errors.New("snapshot is out of date")
)

Functions

func NewNOOPTransport

func NewNOOPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.RequestHandler,
	chunkHandler raftio.IChunkHandler) raftio.IRaftRPC

NewNOOPTransport creates a new NOOPTransport instance.

func NewTCPTransport

func NewTCPTransport(nhConfig config.NodeHostConfig,
	requestHandler raftio.RequestHandler,
	chunkHandler raftio.IChunkHandler) raftio.IRaftRPC

NewTCPTransport creates and returns a new TCP transport module.

Types

type ChunkFile

type ChunkFile struct {
	// contains filtered or unexported fields
}

ChunkFile is the snapshot chunk file being transferred.

func CreateChunkFile

func CreateChunkFile(fp string, fs vfs.IFS) (*ChunkFile, error)

CreateChunkFile creates a new chunk file.

func OpenChunkFileForAppend

func OpenChunkFileForAppend(fp string, fs vfs.IFS) (*ChunkFile, error)

OpenChunkFileForAppend opens the chunk file at fp for appending.

func OpenChunkFileForRead

func OpenChunkFileForRead(fp string, fs vfs.IFS) (*ChunkFile, error)

OpenChunkFileForRead opens the chunk file for read-only operation.

func (*ChunkFile) Close

func (cf *ChunkFile) Close() error

Close closes the chunk file.

func (*ChunkFile) Read

func (cf *ChunkFile) Read(data []byte) (int, error)

Read reads from the file.

func (*ChunkFile) ReadAt

func (cf *ChunkFile) ReadAt(data []byte, offset int64) (int, error)

ReadAt reads from the file at the specified offset.

func (*ChunkFile) Sync

func (cf *ChunkFile) Sync() error

Sync syncs the chunk file.

func (*ChunkFile) Write

func (cf *ChunkFile) Write(data []byte) (int, error)

Write writes the specified data to the chunk file.

type Chunks

type Chunks struct {
	// contains filtered or unexported fields
}

Chunks manages snapshot chunks on the receiving side.

func NewChunks

func NewChunks(onReceive func(pb.MessageBatch),
	confirm func(uint64, uint64, uint64), getDeploymentID func() uint64,
	folder server.GetSnapshotDirFunc, fs vfs.IFS) *Chunks

NewChunks creates and returns a new snapshot chunks instance.

func (*Chunks) AddChunk

func (c *Chunks) AddChunk(chunk pb.Chunk) bool

AddChunk adds a received chunk to chunks.

func (*Chunks) Close

func (c *Chunks) Close()

Close closes the chunks instance.

func (*Chunks) Tick

func (c *Chunks) Tick()

Tick moves the internal logical clock forward.

type DeploymentID

type DeploymentID struct {
	// contains filtered or unexported fields
}

DeploymentID struct is the manager type used to manage the deployment id value.

func (*DeploymentID) SetDeploymentID

func (d *DeploymentID) SetDeploymentID(x uint64)

SetDeploymentID sets the deployment id to the specified value.

func (*DeploymentID) SetUnmanagedDeploymentID

func (d *DeploymentID) SetUnmanagedDeploymentID()

SetUnmanagedDeploymentID sets the deployment id to indicate that the user is not managed.

type INodeAddressResolver

type INodeAddressResolver interface {
	Resolve(uint64, uint64) (string, string, error)
	ReverseResolve(string) []raftio.NodeInfo
	AddRemoteAddress(uint64, uint64, string)
}

INodeAddressResolver converts the (cluster id, node id) tuple to network address.

type INodeRegistry

type INodeRegistry interface {
	AddNode(clusterID uint64, nodeID uint64, url string)
	RemoveNode(clusterID uint64, nodeID uint64)
	RemoveCluster(clusterID uint64)
	Resolve(clusterID uint64, nodeID uint64) (string, string, error)
}

INodeRegistry is the local registry interface used to keep all known nodes in the system.

type IRaftMessageHandler

type IRaftMessageHandler interface {
	HandleMessageBatch(batch pb.MessageBatch) (uint64, uint64)
	HandleUnreachable(clusterID uint64, nodeID uint64)
	HandleSnapshotStatus(clusterID uint64, nodeID uint64, rejected bool)
	HandleSnapshot(clusterID uint64, nodeID uint64, from uint64)
}

IRaftMessageHandler is the interface required to handle incoming raft requests.

type ITransport

type ITransport interface {
	Name() string
	SetUnmanagedDeploymentID()
	SetDeploymentID(uint64)
	SetMessageHandler(IRaftMessageHandler)
	ASyncSend(pb.Message) bool
	ASyncSendSnapshot(pb.Message) bool
	GetStreamConnection(clusterID uint64, nodeID uint64) *Sink
	Stop()
}

ITransport is the interface of the transport layer used for exchanging Raft messages.

type ITransportEvent

type ITransportEvent interface {
	ConnectionEstablished(string, bool)
	ConnectionFailed(string, bool)
}

ITransportEvent is the interface for notifying connection status changes.

type Marshaler

type Marshaler interface {
	MarshalTo([]byte) (int, error)
	Size() int
}

Marshaler is the interface for types that can be Marshaled.

type NOOPConnection

type NOOPConnection struct {
	// contains filtered or unexported fields
}

NOOPConnection is the connection used to exchange messages between node hosts.

func (*NOOPConnection) Close

func (c *NOOPConnection) Close()

Close closes the NOOPConnection instance.

func (*NOOPConnection) SendMessageBatch

func (c *NOOPConnection) SendMessageBatch(batch raftpb.MessageBatch) error

SendMessageBatch returns ErrRequestedToFail when requested.

type NOOPSnapshotConnection

type NOOPSnapshotConnection struct {
	// contains filtered or unexported fields
}

NOOPSnapshotConnection is the connection used to send snapshots.

func (*NOOPSnapshotConnection) Close

func (c *NOOPSnapshotConnection) Close()

Close closes the NOOPSnapshotConnection.

func (*NOOPSnapshotConnection) SendChunk

func (c *NOOPSnapshotConnection) SendChunk(chunk raftpb.Chunk) error

SendChunk returns ErrRequestedToFail when requested.

type NOOPTransport

type NOOPTransport struct {
	// contains filtered or unexported fields
}

NOOPTransport is a transport module for testing purposes. It does not actually have the ability to exchange messages or snapshots between nodehosts.

func (*NOOPTransport) GetConnection

func (g *NOOPTransport) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a connection.

func (*NOOPTransport) GetSnapshotConnection

func (g *NOOPTransport) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a snapshot connection.

func (*NOOPTransport) Name

func (g *NOOPTransport) Name() string

Name returns the module name.

func (*NOOPTransport) Start

func (g *NOOPTransport) Start() error

Start starts the NOOPTransport instance.

func (*NOOPTransport) Stop

func (g *NOOPTransport) Stop()

Stop stops the NOOPTransport instance.

type Nodes

type Nodes struct {
	// contains filtered or unexported fields
}

Nodes is used to manage all known node addresses in the multi raft system. The transport layer uses this address registry to locate nodes.

func NewNodes

func NewNodes(streamConnections uint64) *Nodes

NewNodes returns a new Nodes object.

func (*Nodes) AddNode

func (n *Nodes) AddNode(clusterID uint64, nodeID uint64, url string)

AddNode adds a new node.

func (*Nodes) AddRemoteAddress

func (n *Nodes) AddRemoteAddress(clusterID uint64,
	nodeID uint64, address string)

AddRemoteAddress remembers the specified address obtained from the source of the incoming message.

func (*Nodes) RemoveAllPeers

func (n *Nodes) RemoveAllPeers()

RemoveAllPeers removes all remotes.

func (*Nodes) RemoveCluster

func (n *Nodes) RemoveCluster(clusterID uint64)

RemoveCluster removes all node info associated with the specified cluster.

func (*Nodes) RemoveNode

func (n *Nodes) RemoveNode(clusterID uint64, nodeID uint64)

RemoveNode removes a remote from the node registry.

func (*Nodes) Resolve

func (n *Nodes) Resolve(clusterID uint64, nodeID uint64) (string, string, error)

Resolve looks up the Addr of the specified node.

func (*Nodes) ReverseResolve

func (n *Nodes) ReverseResolve(addr string) []raftio.NodeInfo

ReverseResolve does the reverse lookup for the specified address. A list of raftio.NodeInfo values is returned for nodes that match the specified address.

type SendMessageBatchFunc

type SendMessageBatchFunc func(pb.MessageBatch) (pb.MessageBatch, bool)

SendMessageBatchFunc is a func type that is used to determine whether the specified message batch should be sent. This func is used in test only.

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink is the chunk sink for receiving generated snapshot chunk.

func (*Sink) ClusterID

func (s *Sink) ClusterID() uint64

ClusterID returns the cluster ID of the source node.

func (*Sink) Receive

func (s *Sink) Receive(chunk pb.Chunk) (bool, bool)

Receive receives a snapshot chunk.

func (*Sink) Stop

func (s *Sink) Stop()

Stop stops the sink processing.

func (*Sink) ToNodeID

func (s *Sink) ToNodeID() uint64

ToNodeID returns the node ID of the node intended to get and handle the received snapshot chunk.

type StreamChunkSendFunc

type StreamChunkSendFunc func(pb.Chunk) (pb.Chunk, bool)

StreamChunkSendFunc is a func type that is used to determine whether a snapshot chunk should indeed be sent. This func is used in test only.

type TCPConnection

type TCPConnection struct {
	// contains filtered or unexported fields
}

TCPConnection is the connection used for sending raft messages to remote nodes.

func NewTCPConnection

func NewTCPConnection(conn net.Conn,
	rb *ratelimit.Bucket, wb *ratelimit.Bucket, encrypted bool) *TCPConnection

NewTCPConnection creates and returns a new TCPConnection instance.

func (*TCPConnection) Close

func (c *TCPConnection) Close()

Close closes the TCPConnection instance.

func (*TCPConnection) SendMessageBatch

func (c *TCPConnection) SendMessageBatch(batch pb.MessageBatch) error

SendMessageBatch sends a raft message batch to remote node.

type TCPSnapshotConnection

type TCPSnapshotConnection struct {
	// contains filtered or unexported fields
}

TCPSnapshotConnection is the connection for sending raft snapshot chunks to remote nodes.

func NewTCPSnapshotConnection

func NewTCPSnapshotConnection(conn net.Conn,
	rb *ratelimit.Bucket, wb *ratelimit.Bucket,
	encrypted bool) *TCPSnapshotConnection

NewTCPSnapshotConnection creates and returns a new snapshot connection.

func (*TCPSnapshotConnection) Close

func (c *TCPSnapshotConnection) Close()

Close closes the snapshot connection.

func (*TCPSnapshotConnection) SendChunk

func (c *TCPSnapshotConnection) SendChunk(chunk pb.Chunk) error

SendChunk sends the specified snapshot chunk to remote node.

type TCPTransport

type TCPTransport struct {
	// contains filtered or unexported fields
}

TCPTransport is a TCP based RPC module for exchanging raft messages and snapshots between NodeHost instances.

func (*TCPTransport) GetConnection

func (g *TCPTransport) GetConnection(ctx context.Context,
	target string) (raftio.IConnection, error)

GetConnection returns a new raftio.IConnection for sending raft messages.

func (*TCPTransport) GetSnapshotConnection

func (g *TCPTransport) GetSnapshotConnection(ctx context.Context,
	target string) (raftio.ISnapshotConnection, error)

GetSnapshotConnection returns a new raftio.ISnapshotConnection for sending raft snapshots.

func (*TCPTransport) Name

func (g *TCPTransport) Name() string

Name returns a human readable name of the TCP transport module.

func (*TCPTransport) Start

func (g *TCPTransport) Start() error

Start starts the TCP transport module.

func (*TCPTransport) Stop

func (g *TCPTransport) Stop()

Stop stops the TCP transport module.

type Transport

type Transport struct {
	DeploymentID
	// contains filtered or unexported fields
}

Transport is the transport layer for delivering raft messages and snapshots.

func NewTransport

func NewTransport(nhConfig config.NodeHostConfig,
	ctx *server.Context, resolver INodeAddressResolver,
	folder server.GetSnapshotDirFunc, sysEvents ITransportEvent,
	fs vfs.IFS) (*Transport, error)

NewTransport creates a new Transport object.

func (*Transport) ASyncSend

func (t *Transport) ASyncSend(req pb.Message) bool

ASyncSend sends raft messages using RPC.

The generic async send Go pattern used in ASyncSend is found in CockroachDB's codebase.

func (*Transport) ASyncSendSnapshot

func (t *Transport) ASyncSendSnapshot(m pb.Message) bool

ASyncSendSnapshot sends raft snapshot message to its target.

func (*Transport) GetCircuitBreaker

func (t *Transport) GetCircuitBreaker(key string) *circuit.Breaker

GetCircuitBreaker returns the circuit breaker used for the specified target node.

func (*Transport) GetRaftRPC

func (t *Transport) GetRaftRPC() raftio.IRaftRPC

GetRaftRPC returns the raft RPC instance.

func (*Transport) GetStreamConnection

func (t *Transport) GetStreamConnection(clusterID uint64, nodeID uint64) *Sink

GetStreamConnection returns a connection used for streaming snapshot.

func (*Transport) Name

func (t *Transport) Name() string

Name returns the type name of the transport module.

func (*Transport) SetMessageHandler

func (t *Transport) SetMessageHandler(handler IRaftMessageHandler)

SetMessageHandler sets the raft message handler.

func (*Transport) SetPreSendMessageBatchHook

func (t *Transport) SetPreSendMessageBatchHook(h SendMessageBatchFunc)

SetPreSendMessageBatchHook sets the SendMessageBatch hook. This function is only expected to be used in monkey testing.

func (*Transport) SetPreStreamChunkSendHook

func (t *Transport) SetPreStreamChunkSendHook(h StreamChunkSendFunc)

SetPreStreamChunkSendHook sets the StreamChunkSend hook function that will be called before each snapshot chunk is sent.

func (*Transport) Stop

func (t *Transport) Stop()

Stop stops the Transport object.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL