raft

package module
v0.0.0-...-abbc735 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2022 License: Apache-2.0 Imports: 43 Imported by: 0

README

Raft

workflow-test codecov

An implementation of the Raft distributed consensus algorithm in Go.

This implementation aims to support the following Raft features:

  • Leader election
  • Log replication
  • Persistence
  • Membership changes (joint consensus)
  • Log compaction (snapshotting)

Roadmap

  • API server
  • Persistence (with bbolt)
  • gRPC transport
  • KV store (as an example)
  • Snapshotting
  • Full tests
  • Replication optimization
  • Improve API server
  • Internal metrics
  • Logger optimization

License

This implementation is under the Apache 2.0 license. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	MetricGoroutines = "goroutines"
)

Variables

View Source
var (
	ErrDeadlineExceeded = errors.New("deadline exceeded")

	// ErrServerShutdown indicates that the server has already been shut
	// down or that a shutdown is in progress.
	ErrServerShutdown = errors.New("server shutdown")

	// ErrNonLeader indicates that the server received an RPC that cannot
	// be processed on a non-leader server.
	ErrNonLeader = errors.New("not a leader")

	// ErrNonFollower indicates that the server received an RPC that cannot
	// be processed on a non-follower server.
	ErrNonFollower = errors.New("not a follower")

	// ErrUnrecognizedRPC indicates that the server has received an
	// unrecognized RPC request.
	ErrUnrecognizedRPC = errors.New("unrecognized RPC request")

	// ErrInJointConsensus indicates that the server is already in a joint
	// consensus.
	ErrInJointConsensus = errors.New("already in a joint consensus")

	// ErrNotInJointConsensus indicates that the server is not in a joint consensus.
	ErrNotInJointConsensus = errors.New("not in a joint consensus")

	ErrUnknownTransporClient = errors.New("unknown transport client")

	ErrUnknownRPC = errors.New("unknown RPC")
)
View Source
var ErrInvalidHex = errors.New("the provided hex string is not a valid ObjectID")

ErrInvalidHex indicates that a hex string cannot be converted to an ObjectID.

Functions

func Context

func Context(timeout ...time.Duration) (context.Context, context.CancelFunc)

func DecodeUint64

func DecodeUint64(b []byte) uint64

func EncodeUint64

func EncodeUint64(v uint64) []byte

func IsValidObjectID

func IsValidObjectID(s string) bool

IsValidObjectID returns true if the provided hex string represents a valid ObjectID and false if not.

func Must1

func Must1(err error)

func Must2

func Must2[T any](result T, err error) T

func PathJoin

func PathJoin(prefix, suffix string) string

func Ptr

func Ptr[T any](v T) *T

func Zero

func Zero(v interface{})

Types

type APIExtension

type APIExtension interface {
	Setup(s *Server, r *mux.Router) error
}

type BoltLogStore

type BoltLogStore struct {
	// contains filtered or unexported fields
}

BoltLogStore is a LogStore that uses bbolt as a backend.

func NewBoltLogStore

func NewBoltLogStore(db *bbolt.DB) *BoltLogStore

func (*BoltLogStore) AppendLogs

func (s *BoltLogStore) AppendLogs(logs []*pb.Log) error

func (*BoltLogStore) Close

func (p *BoltLogStore) Close() error

func (*BoltLogStore) DebugPrint

func (s *BoltLogStore) DebugPrint()

func (*BoltLogStore) Entry

func (s *BoltLogStore) Entry(index uint64) (*pb.Log, error)

func (*BoltLogStore) FirstIndex

func (s *BoltLogStore) FirstIndex() (uint64, error)

func (*BoltLogStore) LastEntry

func (s *BoltLogStore) LastEntry(t pb.LogType) (*pb.Log, error)

func (*BoltLogStore) LastIndex

func (s *BoltLogStore) LastIndex() (uint64, error)

func (*BoltLogStore) TrimPrefix

func (s *BoltLogStore) TrimPrefix(index uint64) error

func (*BoltLogStore) TrimSuffix

func (s *BoltLogStore) TrimSuffix(index uint64) error

type BoltStateStore

type BoltStateStore struct {
	// contains filtered or unexported fields
}

func NewBoltStateStore

func NewBoltStateStore(db *bbolt.DB) *BoltStateStore

func (*BoltStateStore) CurrentTerm

func (s *BoltStateStore) CurrentTerm() (uint64, error)

func (*BoltStateStore) LastVote

func (s *BoltStateStore) LastVote() (voteSummary, error)

func (*BoltStateStore) SetCurrentTerm

func (s *BoltStateStore) SetCurrentTerm(currentTerm uint64) error

func (*BoltStateStore) SetLastVote

func (s *BoltStateStore) SetLastVote(summary voteSummary) error

type BoltStore

type BoltStore struct {
	LogStore
	StateStore
}

func NewBoltStore

func NewBoltStore(path string) (*BoltStore, error)

type BufferedReadCloser

type BufferedReadCloser struct {
	// contains filtered or unexported fields
}

func NewBufferedReadCloser

func NewBufferedReadCloser(r io.ReadCloser) *BufferedReadCloser

func (*BufferedReadCloser) Close

func (r *BufferedReadCloser) Close() error

func (*BufferedReadCloser) Read

func (r *BufferedReadCloser) Read(p []byte) (n int, err error)

type BufferedWriteCloser

type BufferedWriteCloser struct {
	// contains filtered or unexported fields
}

func NewBufferedWriteCloser

func NewBufferedWriteCloser(w io.WriteCloser) *BufferedWriteCloser

func (*BufferedWriteCloser) Close

func (w *BufferedWriteCloser) Close() error

func (*BufferedWriteCloser) Flush

func (w *BufferedWriteCloser) Flush() error

func (*BufferedWriteCloser) Write

func (w *BufferedWriteCloser) Write(p []byte) (n int, err error)

type CappedSlice

type CappedSlice struct {
	// contains filtered or unexported fields
}

CappedSlice is a ring-buffer-like slice that holds a maximum of `cap` items. When the slice overflows, older items are evicted first.

func NewCappedSlice

func NewCappedSlice(cap int) *CappedSlice

func (*CappedSlice) Push

func (c *CappedSlice) Push(v interface{})

func (*CappedSlice) Range

func (c *CappedSlice) Range(fn func(i int, v interface{}) (cont bool))

type Command

type Command []byte

type CounterTimer

type CounterTimer struct {
	// contains filtered or unexported fields
}

func NewCounterTimer

func NewCounterTimer(counts int, interval time.Duration) *CounterTimer

func (*CounterTimer) C

func (t *CounterTimer) C() <-chan struct{}

func (*CounterTimer) Count

func (t *CounterTimer) Count()

func (*CounterTimer) Stop

func (t *CounterTimer) Stop()

func (*CounterTimer) StopC

func (t *CounterTimer) StopC() <-chan struct{}

type Future

type Future[T any] interface {
	Result() (T, error)
	// contains filtered or unexported methods
}

Future represents an async task with an undetermined result.

type FutureTask

type FutureTask[FUTURE any, TASK any] interface {
	Future[FUTURE]
	Task() TASK
}

type GRPCTransport

type GRPCTransport struct {
	// contains filtered or unexported fields
}

func NewGRPCTransport

func NewGRPCTransport(listenAddr string) (*GRPCTransport, error)

func (*GRPCTransport) AppendEntries

func (t *GRPCTransport) AppendEntries(
	ctx context.Context, peer *pb.Peer, request *pb.AppendEntriesRequest,
) (*pb.AppendEntriesResponse, error)

func (*GRPCTransport) ApplyLog

func (t *GRPCTransport) ApplyLog(
	ctx context.Context, peer *pb.Peer, request *pb.ApplyLogRequest,
) (*pb.ApplyLogResponse, error)

func (*GRPCTransport) Close

func (t *GRPCTransport) Close() error

func (*GRPCTransport) Connect

func (t *GRPCTransport) Connect(peer *pb.Peer) error

func (*GRPCTransport) Disconnect

func (t *GRPCTransport) Disconnect(peer *pb.Peer)

func (*GRPCTransport) DisconnectAll

func (t *GRPCTransport) DisconnectAll()

func (*GRPCTransport) Endpoint

func (t *GRPCTransport) Endpoint() string

func (*GRPCTransport) InstallSnapshot

func (t *GRPCTransport) InstallSnapshot(
	ctx context.Context, peer *pb.Peer, requestMeta *pb.InstallSnapshotRequestMeta, reader io.Reader,
) (*pb.InstallSnapshotResponse, error)

func (*GRPCTransport) RPC

func (t *GRPCTransport) RPC() <-chan *RPC

func (*GRPCTransport) RequestVote

func (t *GRPCTransport) RequestVote(
	ctx context.Context, peer *pb.Peer, request *pb.RequestVoteRequest,
) (*pb.RequestVoteResponse, error)

func (*GRPCTransport) Serve

func (t *GRPCTransport) Serve() error

type HandyEncoding

type HandyEncoding string
const (
	HandyEncodingJSON   HandyEncoding = "json"
	HandyEncodingBase64 HandyEncoding = "base64"
	HandyEncodingRaw    HandyEncoding = "raw"
)

type HandyRespWriter

type HandyRespWriter struct {
	http.ResponseWriter
	// contains filtered or unexported fields
}

func NewHandyRespWriter

func NewHandyRespWriter(w http.ResponseWriter, logger *zap.Logger) (h HandyRespWriter)

func (*HandyRespWriter) Encoded

func (rw *HandyRespWriter) Encoded(v interface{}, e HandyEncoding, statusCode int)

func (*HandyRespWriter) Error

func (rw *HandyRespWriter) Error(err error)

func (*HandyRespWriter) JSON

func (rw *HandyRespWriter) JSON(v interface{})

func (*HandyRespWriter) JSONFunc

func (rw *HandyRespWriter) JSONFunc(fn func() (v interface{}, statusCode int, err error))

func (*HandyRespWriter) JSONStatus

func (rw *HandyRespWriter) JSONStatus(v interface{}, statusCode int)

type InstallSnapshotRequest

type InstallSnapshotRequest struct {
	Metadata *pb.InstallSnapshotRequestMeta
	Reader   io.ReadCloser
}

type LogStore

type LogStore interface {
	// AppendLogs is used to append logs to the LogStore.
	// It's recommended to use techniques like transaction processing to
	// avoid data inconsistency due to an error or interruption.
	AppendLogs(logs []*pb.Log) error

	// TrimPrefix is used to trim the logs by evicting UNPACKED logs forwards from
	// the first log until the log with the index is reached. Index is exclusive.
	TrimPrefix(index uint64) error

	// TrimSuffix is used to trim the logs by evicting UNPACKED logs backwards from
	// the last log until the log with the index is reached. Index is exclusive.
	TrimSuffix(index uint64) error

	FirstIndex() (uint64, error)
	LastIndex() (uint64, error)

	Entry(index uint64) (*pb.Log, error)

	// LastEntry is used to find the last log entry.
	// If t is not zero, a log type filter should be applied.
	LastEntry(t pb.LogType) (*pb.Log, error)
}

LogStore defines the interface for appending, trimming, and retrieving logs in a stable store. A LogStore implementation can also implement the optional io.Closer interface to allow releasing the underlying resources it has acquired.

type MetricsExporter

type MetricsExporter interface {
	Record(time time.Time, name string, value interface{})
}

type ObjectID

type ObjectID [12]byte

ObjectID is the BSON ObjectID type.

var NilObjectID ObjectID

NilObjectID is the zero value for ObjectID.

func NewObjectID

func NewObjectID() ObjectID

NewObjectID generates a new ObjectID.

func NewObjectIDFromTimestamp

func NewObjectIDFromTimestamp(timestamp time.Time) ObjectID

NewObjectIDFromTimestamp generates a new ObjectID based on the given time.

func ObjectIDFromHex

func ObjectIDFromHex(s string) (ObjectID, error)

ObjectIDFromHex creates a new ObjectID from a hex string. It returns an error if the hex string is not a valid ObjectID.

func (ObjectID) Hex

func (id ObjectID) Hex() string

Hex returns the hex encoding of the ObjectID as a string.

func (ObjectID) IsZero

func (id ObjectID) IsZero() bool

IsZero returns true if id is the empty ObjectID.

func (ObjectID) MarshalJSON

func (id ObjectID) MarshalJSON() ([]byte, error)

MarshalJSON returns the ObjectID as a string

func (ObjectID) MarshalText

func (id ObjectID) MarshalText() ([]byte, error)

MarshalText returns the ObjectID as UTF-8-encoded text. Implementing this allows us to use ObjectID as a map key when marshalling JSON. See https://pkg.go.dev/encoding#TextMarshaler

func (ObjectID) String

func (id ObjectID) String() string

func (ObjectID) Timestamp

func (id ObjectID) Timestamp() time.Time

Timestamp extracts the time part of the ObjectID.

func (*ObjectID) UnmarshalJSON

func (id *ObjectID) UnmarshalJSON(b []byte) error

UnmarshalJSON populates the byte slice with the ObjectID. If the byte slice is 24 bytes long, it will be populated with the hex representation of the ObjectID. If the byte slice is twelve bytes long, it will be populated with the BSON representation of the ObjectID. This method also accepts empty strings and decodes them as NilObjectID. For any other inputs, an error will be returned.

func (*ObjectID) UnmarshalText

func (id *ObjectID) UnmarshalText(b []byte) error

UnmarshalText populates the byte slice with the ObjectID. Implementing this allows us to use ObjectID as a map key when unmarshalling JSON. See https://pkg.go.dev/encoding#TextUnmarshaler

type RPC

type RPC struct {
	// contains filtered or unexported fields
}

func NewRPC

func NewRPC(ctx context.Context, request interface{}) *RPC

func (*RPC) Context

func (r *RPC) Context() context.Context

func (*RPC) Request

func (r *RPC) Request() interface{}

func (*RPC) Respond

func (r *RPC) Respond(response interface{}, err error)

func (*RPC) Response

func (r *RPC) Response() (interface{}, error)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(coreOpts ServerCoreOptions, opts ...ServerOption) (*Server, error)

func (*Server) Apply

func (s *Server) Apply(ctx context.Context, body *pb.LogBody) FutureTask[*pb.LogMeta, *pb.LogBody]

Apply. Future(LogMeta, error)

func (*Server) ApplyCommand

func (s *Server) ApplyCommand(ctx context.Context, command Command) FutureTask[*pb.LogMeta, *pb.LogBody]

ApplyCommand. Future(LogMeta, error)

func (*Server) Endpoint

func (s *Server) Endpoint() string

func (*Server) Id

func (s *Server) Id() string

func (*Server) Info

func (s *Server) Info() ServerInfo

func (*Server) Leader

func (s *Server) Leader() *pb.Peer

func (*Server) Register

func (s *Server) Register(peer *pb.Peer) error

Register is used to register a server with the current cluster. ErrInJointConsensus is returned when the server is already in a joint consensus.

func (*Server) Serve

func (s *Server) Serve() error

func (*Server) Shutdown

func (s *Server) Shutdown(err error)

func (*Server) StateMachine

func (s *Server) StateMachine() StateMachine

func (*Server) States

func (s *Server) States() ServerStates

type ServerCoreOptions

type ServerCoreOptions struct {
	Id             string
	InitialCluster []*pb.Peer
	StableStore    StableStore
	StateMachine   StateMachine
	SnapshotStore  SnapshatStore
	Transport      Transport
}

type ServerInfo

type ServerInfo struct {
	ID       string `json:"id"`
	Endpoint string `json:"endpoint"`
}

type ServerOption

type ServerOption func(options *serverOptions)

func APIExtensionOption

func APIExtensionOption(extension APIExtension) ServerOption

func APIServerListenAddressOption

func APIServerListenAddressOption(address string) ServerOption

func ElectionTimeoutOption

func ElectionTimeoutOption(timeout time.Duration) ServerOption

func FollowerTimeoutOption

func FollowerTimeoutOption(timeout time.Duration) ServerOption

func LogLevelOption

func LogLevelOption(level zapcore.Level) ServerOption

func MetricsKeeperOption

func MetricsKeeperOption(exporter MetricsExporter) ServerOption

func SnapshotPolicyOption

func SnapshotPolicyOption(policy SnapshotPolicy) ServerOption

type ServerRole

type ServerRole uint32
const (
	Leader ServerRole = 1 + iota
	Candidate
	Follower
)

func (ServerRole) String

func (r ServerRole) String() string

type ServerStates

type ServerStates struct {
	ID                string   `json:"id"`
	Endpoint          string   `json:"endpoint"`
	Leader            *pb.Peer `json:"leader"`
	Role              string   `json:"role"`
	CurrentTerm       uint64   `json:"current_term"`
	LastLogIndex      uint64   `json:"last_log_index"`
	LastVoteTerm      uint64   `json:"last_vote_term"`
	LastVoteCandidate string   `json:"last_vote_candidate"`
	CommitIndex       uint64   `json:"commit_index"`
}

type SingleFlight

type SingleFlight[T any] struct {
	// contains filtered or unexported fields
}

SingleFlight executes the function only once, on the first Do() call, returns the function's return value, and saves it for future Do() calls.

func (*SingleFlight[T]) Do

func (s *SingleFlight[T]) Do(f func() T) T

type SnapshatStore

type SnapshatStore interface {
	Create(index, term uint64, c *pb.Configuration, cIndex uint64) (SnapshotSink, error)
	List() ([]SnapshotMeta, error)
	Open(id string) (Snapshot, error)
	DecodeMeta(b []byte) (SnapshotMeta, error)
	Trim() error
}

type Snapshot

type Snapshot interface {
	Meta() (SnapshotMeta, error)
	Reader() (io.Reader, error)

	// Close is used to close the snapshot's underlying file descriptors or handles.
	Close() error
}

Snapshot is a descriptor that holds the snapshot file.

type SnapshotMeta

type SnapshotMeta interface {
	Id() string
	Index() uint64
	Term() uint64
	Configuration() *pb.Configuration
	ConfigurationIndex() uint64
	Encode() ([]byte, error)
}

type SnapshotPolicy

type SnapshotPolicy struct {
	Applies  int
	Interval time.Duration
}

type SnapshotSink

type SnapshotSink interface {
	io.WriteCloser
	Meta() SnapshotMeta
	Cancel() error
}

type StableStore

type StableStore interface {
	LogStore
	StateStore
}

type StateMachine

type StateMachine interface {
	Apply(command Command)
	Snapshot() (StateMachineSnapshot, error)
	Restore(snapshot Snapshot) error
}

type StateMachineSnapshot

type StateMachineSnapshot interface {
	Write(sink SnapshotSink) error
}

type StateStore

type StateStore interface {
	CurrentTerm() (uint64, error)
	SetCurrentTerm(term uint64) error
	LastVote() (voteSummary, error)
	SetLastVote(summary voteSummary) error
}

StateStore defines the interface to save and restore the persistent server states from a stable store.

type StreamAverage

type StreamAverage struct {
	// contains filtered or unexported fields
}

func (*StreamAverage) Avg

func (a *StreamAverage) Avg() float64

func (*StreamAverage) N

func (a *StreamAverage) N() int

func (*StreamAverage) Push

func (a *StreamAverage) Push(v float64) float64

type StreamMinMaxFloat64

type StreamMinMaxFloat64 struct {
	// contains filtered or unexported fields
}

func (*StreamMinMaxFloat64) Max

func (a *StreamMinMaxFloat64) Max() float64

func (*StreamMinMaxFloat64) Min

func (a *StreamMinMaxFloat64) Min() float64

func (*StreamMinMaxFloat64) N

func (a *StreamMinMaxFloat64) N() int

func (*StreamMinMaxFloat64) Push

func (a *StreamMinMaxFloat64) Push(v float64) (min, max float64)

type StreamMinMaxInt64

type StreamMinMaxInt64 struct {
	// contains filtered or unexported fields
}

func (*StreamMinMaxInt64) Cap

func (a *StreamMinMaxInt64) Cap(cap int) int64

func (*StreamMinMaxInt64) Max

func (a *StreamMinMaxInt64) Max() int64

func (*StreamMinMaxInt64) Min

func (a *StreamMinMaxInt64) Min() int64

func (*StreamMinMaxInt64) N

func (a *StreamMinMaxInt64) N() int

func (*StreamMinMaxInt64) Push

func (a *StreamMinMaxInt64) Push(v int64) (min, max int64)

type TickTrigger

type TickTrigger struct {
	// contains filtered or unexported fields
}

func NewTickTrigger

func NewTickTrigger(ticks int64, timeout time.Duration, fn func()) *TickTrigger

func (*TickTrigger) Tick

func (t *TickTrigger) Tick()

type Transport

type Transport interface {
	// Endpoint returns the endpoint used by the current Transport instance.
	Endpoint() string

	AppendEntries(ctx context.Context, peer *pb.Peer, request *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error)
	RequestVote(ctx context.Context, peer *pb.Peer, request *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error)
	InstallSnapshot(ctx context.Context, peer *pb.Peer, requestMeta *pb.InstallSnapshotRequestMeta, reader io.Reader) (*pb.InstallSnapshotResponse, error)
	ApplyLog(ctx context.Context, peer *pb.Peer, request *pb.ApplyLogRequest) (*pb.ApplyLogResponse, error)

	RPC() <-chan *RPC
}

type TransportCloser

type TransportCloser interface {
	Close() error
}

TransportCloser is an optional interface for implementations that allow an explicit close operation on their underlying connections.

type TransportConnecter

type TransportConnecter interface {
	Connect(peer *pb.Peer) error
	Disconnect(peer *pb.Peer)
	DisconnectAll()
}

TransportConnecter is an optional interface for implementations that allow explicit connect and disconnect operations on a per-peer basis.

type TransportServer

type TransportServer interface {
	Serve() error
}

Directories

Path Synopsis
cmd
kv

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL