raft

package
v0.42.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package raft exports MessagePack codec functionality to serialize log entries, LogStore items, and FSM snaphosts.

Package raft exports a Raft finite state machine (FSM) for DDA state members to make use of replicated state.

Package raft provides a state synchronization binding implementation using the Raft consensus algorithm.

Package raft exports the RaftStore which is an implementation of a LogStore, a StableStore, and a FileSnapshotStore for the HashiCorp Raft library. RaftStore uses the internal DDA implementation of the Pebble storage engine.

Package raft exports a Raft transport based on DDA pub-sub communication.

Index

Constants

View Source
const (
	// DefaultStartupTimeout is the default timeout when starting up a new Raft
	// node, either as a leader or as a follower.
	DefaultStartupTimeout = 10000 * time.Millisecond

	// DefaultLfwTimeout is the default timeout for leaderforwarded Propose and
	// GetState remote operation responses. It only applies in situations where
	// there is no leader. It must not be set too low as Propose operations may
	// take some time.
	DefaultLfwTimeout = 20000 * time.Millisecond
)
View Source
const (
	// DefaultRpcTimeout is the default remote operation timeout in the Raft
	// transport.
	DefaultRpcTimeout = 1000 * time.Millisecond

	// DefaultInstallSnapshotTimeoutScale is the default TimeoutScale for
	// InstallSnapshot operations in the Raft transport.
	DefaultInstallSnapshotTimeoutScale = 256 * 1024 // 256KB
)

Variables

View Source
var (
	// ErrTransportShutdown is returned when operations on a transport are
	// invoked after it's been terminated.
	ErrTransportShutdown = fmt.Errorf("transport shutdown")

	// ErrPipelineShutdown is returned when the pipeline is closed.
	ErrPipelineShutdown = fmt.Errorf("append pipeline closed")
)
View Source
var (
	ErrKeyNotFound = errors.New("not found") // corresponds with Hashicorp raft key not found error string
)

Functions

func DecodeMsgPack

func DecodeMsgPack(b []byte, out any) error

DecodeMsgPack decodes from a MessagePack encoded byte slice.

func DecodeMsgPackFromReader

func DecodeMsgPackFromReader(r io.ReadCloser, out any) error

DecodeMsgPackFromReader decodes from a MessagePack encoded reader.

func EncodeMsgPack

func EncodeMsgPack(in any) ([]byte, error)

EncodeMsgPack returns an encoded MessagePack object as a byte slice.

Types

type AddVoterRequest

type AddVoterRequest struct {
	ServerId      hraft.ServerID
	ServerAddress hraft.ServerAddress
	Timeout       time.Duration // initial time to wait for AddVoter command to be started on leader
}

AddVoterRequest represents a leader forwarded request to add this node as a voting follower. The request responds with an error if the transport's configured rpcTimeout elapses before the corresponding command completes on the leader.

type AddVoterResponse

type AddVoterResponse struct {
	Index uint64 // holds the index of the newly applied log entry
}

AddVoterResponse represents a response to a leader forwarded AddVoterRequest.

type ApplyRequest

type ApplyRequest struct {
	Command []byte
	Timeout time.Duration // initial time to wait for Apply command to be started on leader
}

ApplyRequest represents a leader forwarded request to apply a given log command. The request responds with an error if the transport's configured rpcTimeout elapses before the corresponding command completes on the leader.

type ApplyResponse

type ApplyResponse struct {
	Index    uint64 // holds the index of the newly applied log entry on the leader
	Response error  // nil if operation is successful; an error otherwise
}

ApplyResponse represents a response to a leader forwarded ApplyRequest.

type RaftBinding

type RaftBinding struct {
	// contains filtered or unexported fields
}

RaftBinding realizes a state synchronization binding for the Raft consensus protocol by implementing the state synchronization API interface api.Api using the HashiCorp Raft library.

func (*RaftBinding) Close

func (b *RaftBinding) Close()

Close implements the api.Api interface.

func (*RaftBinding) Node

func (b *RaftBinding) Node() *hraft.Raft

Node returns the Raft node (exposed for testing purposes).

func (*RaftBinding) NodeId

func (b *RaftBinding) NodeId() string

NodeId returns the Raft node ID (exposed for testing purposes).

func (*RaftBinding) ObserveMembershipChange

func (b *RaftBinding) ObserveMembershipChange(ctx context.Context) (<-chan api.MembershipChange, error)

ObserveMembershipChange implements the api.Api interface.

func (*RaftBinding) ObserveStateChange

func (b *RaftBinding) ObserveStateChange(ctx context.Context) (<-chan api.Input, error)

ObserveStateChange implements the api.Api interface.

func (*RaftBinding) Open

func (b *RaftBinding) Open(cfg *config.Config, com comapi.Api) error

Open implements the api.Api interface.

func (*RaftBinding) ProposeInput

func (b *RaftBinding) ProposeInput(ctx context.Context, in *api.Input) error

ProposeInput implements the api.Api interface.

type RaftFsm

type RaftFsm struct {
	// contains filtered or unexported fields
}

RaftFsm implements the hraft.FSM interface to model replicated state in the form of a key-value dictionary.

func NewRaftFsm

func NewRaftFsm() *RaftFsm

NewRaftFsm creates a new Raft FSM that models replicated state in the form of a key-value dictionary.

func (*RaftFsm) AddStateChangeObserver

func (f *RaftFsm) AddStateChangeObserver(ch chan api.Input) uint64

AddStateChangeObserver registers the given channel to listen to state changes and returns a channel ID that can be used to deregister the channel later.

The channel should continuously receive data on the channel in a non-blocking manner to prevent blocking send operations.

func (*RaftFsm) Apply

func (f *RaftFsm) Apply(entry *hraft.Log) any

Apply is called once a log entry is committed by a majority of the cluster.

Apply should apply the log to the FSM. Apply must be deterministic and produce the same result on all peers in the cluster.

The returned value is returned to the client as the ApplyFuture.Response. Note that if Apply returns an error, it will be returned by Response, and not by the Error method of ApplyFuture, so it is always important to check Response for errors from the FSM. If the given input operation is applied successfully, ApplyFuture.Response returns nil.

Apply implements the hraft.FSM interface.

func (*RaftFsm) RemoveStateChangeObserver

func (f *RaftFsm) RemoveStateChangeObserver(chanId uint64)

RemoveStateChangeObserver deregisters the channel with the given channel id.

Note that the channel is not closed, it must be closed by the caller.

func (*RaftFsm) Restore

func (f *RaftFsm) Restore(snapshot io.ReadCloser) error

Restore is used to restore an FSM from a snapshot. It is not called concurrently with any other command. The FSM must discard all previous state before restoring the snapshot.

Restore implements the hraft.FSM interface.

func (*RaftFsm) Snapshot

func (f *RaftFsm) Snapshot() (hraft.FSMSnapshot, error)

Snapshot returns an FSMSnapshot used to: support log compaction, to restore the FSM to a previous state, or to bring out-of-date followers up to a recent log index.

The Snapshot implementation should return quickly, because Apply can not be called while Snapshot is running. Generally this means Snapshot should only capture a pointer to the state, and any expensive IO should happen as part of FSMSnapshot.Persist.

Apply and Snapshot are always called from the same thread, but Apply will be called concurrently with FSMSnapshot.Persist. This means the FSM should be implemented to allow for concurrent updates while a snapshot is happening.

Snapshot implements the hraft.FSM interface.

func (*RaftFsm) State

func (f *RaftFsm) State() api.State

State gets a deep copy of the current key-value pairs of a RaftFsm. The returned state can be safely mutated. To be used for testing purposes only.

type RaftStore

type RaftStore struct {
	SnapStore hraft.SnapshotStore // Raft snapshot store
	// contains filtered or unexported fields
}

RaftStore implements a LogStore, a StableStore, and a FileSnapshotStore for the [HashiCorp Raft] library.

func NewRaftStore

func NewRaftStore(location string) (*RaftStore, error)

NewRaftStore creates local storage for persisting Raft specific durable data including log entries, stable store, and file snapshot store. Returns an error along with a nil *RaftStore if any of the stores couldn't be created.

The given storage location should specify a directory given by an absolute pathname or a pathname relative to the working directory of the DDA sidecar or instance, or an empty string to indicate that storage is non-persistent and completely memory-backed as long as the DDA instance is running.

func (*RaftStore) AddMembershipChangeObserver

func (s *RaftStore) AddMembershipChangeObserver(ch chan api.MembershipChange, raft *hraft.Raft) uint64

AddMembershipChangeObserver registers the given channel to listen to membership changes and returns a channel ID that can be used to deregister the channel later.

The channel should continuously receive data on the channel in a non-blocking manner to prevent blocking send operations.

func (*RaftStore) Close

func (s *RaftStore) Close(removeStorage bool) error

Close gracefully closes the Raft store, optionally removing all associated persistent storage files and folders.

You should not remove storage if you want to restart your DDA state member at a later point in time.

func (*RaftStore) DeleteRange

func (s *RaftStore) DeleteRange(min, max uint64) error

DeleteRange deletes a range of log entries. The range is inclusive.

DeleteRange implements the hraft.LogStore interface.

func (*RaftStore) FirstIndex

func (s *RaftStore) FirstIndex() (uint64, error)

FirstIndex returns the first index written. 0 for no entries.

FirstIndex implements the hraft.LogStore interface.

func (*RaftStore) Get

func (s *RaftStore) Get(key []byte) ([]byte, error)

Get returns the value for key, or an empty byte slice if key was not found.

Get implements the hraft.StableStore interface.

func (*RaftStore) GetLog

func (s *RaftStore) GetLog(index uint64, log *hraft.Log) error

GetLog gets a log entry at a given index.

GetLog implements the hraft.LogStore interface.

func (*RaftStore) GetUint64

func (s *RaftStore) GetUint64(key []byte) (uint64, error)

GetUint64 returns the uint64 value for key, or 0 if key was not found.

GetUint64 implements the hraft.StableStore interface.

func (*RaftStore) LastIndex

func (s *RaftStore) LastIndex() (uint64, error)

LastIndex returns the last index written. 0 for no entries.

LastIndex implements the hraft.LogStore interface.

func (*RaftStore) RemoveMembershipChangeObserver

func (s *RaftStore) RemoveMembershipChangeObserver(chanId uint64)

RemoveMembershipChangeObserver deregisters the channel with the given channel id.

Note that the channel is not closed, it must be closed by the caller.

func (*RaftStore) Set

func (s *RaftStore) Set(key []byte, val []byte) error

Set sets the given key-value pair.

Set implements the hraft.StableStore interface.

func (*RaftStore) SetUint64

func (s *RaftStore) SetUint64(key []byte, val uint64) error

SetUint64 sets the given key-value pair.

SetUint64 implements the hraft.StableStore interface.

func (*RaftStore) StoreLog

func (s *RaftStore) StoreLog(log *hraft.Log) error

StoreLog stores a log entry.

StoreLog implements the hraft.LogStore interface.

func (*RaftStore) StoreLogs

func (s *RaftStore) StoreLogs(logs []*hraft.Log) error

StoreLogs stores multiple log entries.

By default the logs stored may not be contiguous with previous logs (i.e. may have a gap in Index since the last log written). If an implementation can't tolerate this it may optionally implement `MonotonicLogStore` to indicate that this is not allowed. This changes Raft's behaviour after restoring a user snapshot to remove all previous logs instead of relying on a "gap" to signal the discontinuity between logs before the snapshot and logs after.

StoreLogs implements the hraft.LogStore interface.

type RaftTransport

type RaftTransport struct {
	// contains filtered or unexported fields
}

RaftTransport implements the hraft.Transport interface to allow Raft to communicate with other Raft nodes over the configured DDA pub-sub communication protocol. In addition, it supports leader forwarding to allow non-leader Raft nodes to accept Apply, Barrier, and AddVoter commands.

func NewRaftTransport

func NewRaftTransport(config *RaftTransportConfig, addr hraft.ServerAddress, com comapi.Api) *RaftTransport

NewRaftTransport creates a new Raft pub-sub transport with the given transport configuration, a local address, and a ready-to-use DDA pub-sub communication API.

func (*RaftTransport) AppendEntries

AppendEntries sends the appropriate RPC to the target node.

AppendEntries implements the hraft.Transport interface.

func (*RaftTransport) AppendEntriesPipeline

func (t *RaftTransport) AppendEntriesPipeline(id hraft.ServerID, target hraft.ServerAddress) (hraft.AppendPipeline, error)

AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests to the target node.

AppendEntriesPipeline implements the hraft.Transport interface.

func (*RaftTransport) Close

func (t *RaftTransport) Close() error

Close permanently closes a transport, stopping any associated goroutines and freeing other resources.

Close implements the hraft.WithClose interface. WithClose is an interface that a transport may provide which allows a transport to be shut down cleanly when a Raft instance shuts down.

func (*RaftTransport) Consumer

func (t *RaftTransport) Consumer() <-chan hraft.RPC

Consumer returns a channel that can be used to consume and respond to RPC requests. This channel is not used for leader forwarding operations (see [LfwConsumer]).

Consumer implements the hraft.Transport interface.

func (*RaftTransport) DecodePeer

func (t *RaftTransport) DecodePeer(buf []byte) hraft.ServerAddress

DecodePeer is used to deserialize a peer's address.

DecodePeer implements the hraft.Transport interface.

func (*RaftTransport) EncodePeer

func (t *RaftTransport) EncodePeer(id hraft.ServerID, addr hraft.ServerAddress) []byte

EncodePeer is used to serialize a peer's address

EncodePeer implements the hraft.Transport interface.

func (*RaftTransport) InstallSnapshot

InstallSnapshot is used to push a snapshot down to a follower. The data is read from the ReadCloser and streamed to the client.

InstallSnapshot implements the hraft.Transport interface.

func (*RaftTransport) LfwAddVoter

func (t *RaftTransport) LfwAddVoter(ctx context.Context, args *AddVoterRequest, resp *AddVoterResponse) error

LfwAddVoter implements transparent leader forwarding for hraft.AddVoter command. It will forward the request to the leader which will add the given server to the cluster as a staging server, promoting it to a voter once that server is ready.

LfwAddVoter is a blocking operation that will time out with an error in case no response is received within a time interval given by the context.

func (*RaftTransport) LfwApply

func (t *RaftTransport) LfwApply(ctx context.Context, args *ApplyRequest, resp *ApplyResponse) error

LfwApply implements transparent leader forwarding for hraft.Apply command. It will forward a command to the leader which applies it to the FSM in a highly consistent manner.

LfwApply is a blocking operation that will time out with an error in case no response is received within a time interval given by the context.

func (*RaftTransport) LfwConsumer

func (t *RaftTransport) LfwConsumer() <-chan hraft.RPC

LfwConsumer returns a channel that can be used to consume and respond to leader forwarded RPC requests. This channel is not used for node-targeted RPC operations (see [Consumer]).

func (*RaftTransport) LfwRemoveServer

func (t *RaftTransport) LfwRemoveServer(ctx context.Context, args *RemoveServerRequest, resp *RemoveServerResponse) error

LfwRemoveServer implements transparent leader forwarding for hraft.RemoveServer command. It will forward the request to the leader which will remove the given server from the cluster. If the current leader is being removed, it will cause a new election to occur.

LfwRemoveServer is a blocking operation that will time out with an error in case no response is received within a time interval given by the context.

func (*RaftTransport) LocalAddr

func (t *RaftTransport) LocalAddr() hraft.ServerAddress

LocalAddr is used to return our local address to distinguish from our peers.

LocalAddr implements the hraft.Transport interface.

func (*RaftTransport) RequestVote

RequestVote sends the appropriate RPC to the target node.

RequestVote implements the hraft.Transport interface.

func (*RaftTransport) SetHeartbeatHandler

func (t *RaftTransport) SetHeartbeatHandler(cb func(rpc hraft.RPC))

SetHeartbeatHandler is used to setup a heartbeat handler as a fast-path. This is to avoid head-of-line blocking from RPC invocations. If a Transport does not support this, it can simply ignore the callback, and push the heartbeat onto the Consumer channel. Otherwise, it MUST be safe for this callback to be invoked concurrently with a blocking RPC.

SetHeartbeatHandler implements the hraft.Transport interface.

func (*RaftTransport) Timeout

func (t *RaftTransport) Timeout() time.Duration

Timeout gets the configured timeout duration of the transport.

func (*RaftTransport) TimeoutNow

TimeoutNow is used to start a leadership transfer to the target node.

TimeoutNow implements the hraft.Transport interface.

type RaftTransportConfig

type RaftTransportConfig struct {
	// Timeout used to apply I/O deadlines to remote operations on the Raft
	// transport. For InstallSnapshot, we multiply the timeout by (SnapshotSize
	// / TimeoutScale).
	//
	// If not present or zero, the default timeout is applied.
	Timeout time.Duration

	// For InstallSnapshot, timeout is proportional to the snapshot size. The
	// timeout is multiplied by (SnapshotSize / TimeoutScale).
	//
	// If not present or zero, a default value of 256KB is used.
	TimeoutScale int
}

RaftTransportConfig encapsulates configuration options for the Raft pub-sub transport layer.

type RemoveServerRequest

type RemoveServerRequest struct {
	ServerId hraft.ServerID
	Timeout  time.Duration // initial time to wait for RemoveServer command to be started on leader
}

AddVoterRequest represents a leader forwarded request to remove this node as a server from the cluster. The request responds with an error if the transport's configured rpcTimeout elapses before the corresponding command completes on the leader.

type RemoveServerResponse

type RemoveServerResponse struct {
	Index uint64 // holds the index of the newly applied log entry
}

RemoveServerResponse represents a response to a leader forwarded RemoveServerRequest.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL