Documentation ¶
Index ¶
- Constants
- Variables
- func Context(timeout ...time.Duration) (context.Context, context.CancelFunc)
- func DecodeUint64(b []byte) uint64
- func EncodeUint64(v uint64) []byte
- func IsValidObjectID(s string) bool
- func Must1(err error)
- func Must2[T any](result T, err error) T
- func PathJoin(prefix, suffix string) string
- func Ptr[T any](v T) *T
- func Zero(v interface{})
- type APIExtension
- type BoltLogStore
- func (s *BoltLogStore) AppendLogs(logs []*pb.Log) error
- func (p *BoltLogStore) Close() error
- func (s *BoltLogStore) DebugPrint()
- func (s *BoltLogStore) Entry(index uint64) (*pb.Log, error)
- func (s *BoltLogStore) FirstIndex() (uint64, error)
- func (s *BoltLogStore) LastEntry(t pb.LogType) (*pb.Log, error)
- func (s *BoltLogStore) LastIndex() (uint64, error)
- func (s *BoltLogStore) TrimPrefix(index uint64) error
- func (s *BoltLogStore) TrimSuffix(index uint64) error
- type BoltStateStore
- type BoltStore
- type BufferedReadCloser
- type BufferedWriteCloser
- type CappedSlice
- type Command
- type CounterTimer
- type Future
- type FutureTask
- type GRPCTransport
- func (t *GRPCTransport) AppendEntries(ctx context.Context, peer *pb.Peer, request *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error)
- func (t *GRPCTransport) ApplyLog(ctx context.Context, peer *pb.Peer, request *pb.ApplyLogRequest) (*pb.ApplyLogResponse, error)
- func (t *GRPCTransport) Close() error
- func (t *GRPCTransport) Connect(peer *pb.Peer) error
- func (t *GRPCTransport) Disconnect(peer *pb.Peer)
- func (t *GRPCTransport) DisconnectAll()
- func (t *GRPCTransport) Endpoint() string
- func (t *GRPCTransport) InstallSnapshot(ctx context.Context, peer *pb.Peer, requestMeta *pb.InstallSnapshotRequestMeta, ...) (*pb.InstallSnapshotResponse, error)
- func (t *GRPCTransport) RPC() <-chan *RPC
- func (t *GRPCTransport) RequestVote(ctx context.Context, peer *pb.Peer, request *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error)
- func (t *GRPCTransport) Serve() error
- type HandyEncoding
- type HandyRespWriter
- func (rw *HandyRespWriter) Encoded(v interface{}, e HandyEncoding, statusCode int)
- func (rw *HandyRespWriter) Error(err error)
- func (rw *HandyRespWriter) JSON(v interface{})
- func (rw *HandyRespWriter) JSONFunc(fn func() (v interface{}, statusCode int, err error))
- func (rw *HandyRespWriter) JSONStatus(v interface{}, statusCode int)
- type InstallSnapshotRequest
- type LogStore
- type MetricsExporter
- type ObjectID
- func (id ObjectID) Hex() string
- func (id ObjectID) IsZero() bool
- func (id ObjectID) MarshalJSON() ([]byte, error)
- func (id ObjectID) MarshalText() ([]byte, error)
- func (id ObjectID) String() string
- func (id ObjectID) Timestamp() time.Time
- func (id *ObjectID) UnmarshalJSON(b []byte) error
- func (id *ObjectID) UnmarshalText(b []byte) error
- type RPC
- type Server
- func (s *Server) Apply(ctx context.Context, body *pb.LogBody) FutureTask[*pb.LogMeta, *pb.LogBody]
- func (s *Server) ApplyCommand(ctx context.Context, command Command) FutureTask[*pb.LogMeta, *pb.LogBody]
- func (s *Server) Endpoint() string
- func (s *Server) Id() string
- func (s *Server) Info() ServerInfo
- func (s *Server) Leader() *pb.Peer
- func (s *Server) Register(peer *pb.Peer) error
- func (s *Server) Serve() error
- func (s *Server) Shutdown(err error)
- func (s *Server) StateMachine() StateMachine
- func (s *Server) States() ServerStates
- type ServerCoreOptions
- type ServerInfo
- type ServerOption
- func APIExtensionOption(extension APIExtension) ServerOption
- func APIServerListenAddressOption(address string) ServerOption
- func ElectionTimeoutOption(timeout time.Duration) ServerOption
- func FollowerTimeoutOption(timeout time.Duration) ServerOption
- func LogLevelOption(level zapcore.Level) ServerOption
- func MetricsKeeperOption(exporter MetricsExporter) ServerOption
- func SnapshotPolicyOption(policy SnapshotPolicy) ServerOption
- type ServerRole
- type ServerStates
- type SingleFlight
- type SnapshatStore
- type Snapshot
- type SnapshotMeta
- type SnapshotPolicy
- type SnapshotSink
- type StableStore
- type StateMachine
- type StateMachineSnapshot
- type StateStore
- type StreamAverage
- type StreamMinMaxFloat64
- type StreamMinMaxInt64
- type TickTrigger
- type Transport
- type TransportCloser
- type TransportConnecter
- type TransportServer
Constants ¶
const (
MetricGoroutines = "goroutines"
)
Variables ¶
var ( ErrDeadlineExceeded = errors.New("deadline exceeded") // ErrServerShutdown indicates that the server was already shutted // down or the server shutting down is in progress. ErrServerShutdown = errors.New("server shutdown") // ErrNonLeader indicates that the server received an RPC that cannot // be processed on non-leader server. ErrNonLeader = errors.New("not a leader") // ErrNonFollower indicates that the server received an RPC that cannot // be processed on non-follower server. ErrNonFollower = errors.New("not a follower") // ErrUnrecognizedRPC indicates that the server has received an // unrecongized RPC request. ErrUnrecognizedRPC = errors.New("unrecognized RPC request") // ErrInJointConsensus indicates that the server is already in a joint // consensus. ErrInJointConsensus = errors.New("already in a joint consensus") // ErrInJointConsensus indicates that the server is not in a joint consensus. ErrNotInJointConsensus = errors.New("not in a joint consensus") ErrUnknownTransporClient = errors.New("unknown transport client") ErrUnknownRPC = errors.New("unknown RPC") )
var ErrInvalidHex = errors.New("the provided hex string is not a valid ObjectID")
ErrInvalidHex indicates that a hex string cannot be converted to an ObjectID.
Functions ¶
func DecodeUint64 ¶
func EncodeUint64 ¶
func IsValidObjectID ¶
IsValidObjectID returns true if the provided hex string represents a valid ObjectID and false if not.
Types ¶
type BoltLogStore ¶
type BoltLogStore struct {
// contains filtered or unexported fields
}
BoltLogStore is a LogStore that uses bbolt as a backend.
func NewBoltLogStore ¶
func NewBoltLogStore(db *bbolt.DB) *BoltLogStore
func (*BoltLogStore) AppendLogs ¶
func (s *BoltLogStore) AppendLogs(logs []*pb.Log) error
func (*BoltLogStore) Close ¶
func (p *BoltLogStore) Close() error
func (*BoltLogStore) DebugPrint ¶
func (s *BoltLogStore) DebugPrint()
func (*BoltLogStore) FirstIndex ¶
func (s *BoltLogStore) FirstIndex() (uint64, error)
func (*BoltLogStore) LastIndex ¶
func (s *BoltLogStore) LastIndex() (uint64, error)
func (*BoltLogStore) TrimPrefix ¶
func (s *BoltLogStore) TrimPrefix(index uint64) error
func (*BoltLogStore) TrimSuffix ¶
func (s *BoltLogStore) TrimSuffix(index uint64) error
type BoltStateStore ¶
type BoltStateStore struct {
// contains filtered or unexported fields
}
func NewBoltStateStore ¶
func NewBoltStateStore(db *bbolt.DB) *BoltStateStore
func (*BoltStateStore) CurrentTerm ¶
func (s *BoltStateStore) CurrentTerm() (uint64, error)
func (*BoltStateStore) LastVote ¶
func (s *BoltStateStore) LastVote() (voteSummary, error)
func (*BoltStateStore) SetCurrentTerm ¶
func (s *BoltStateStore) SetCurrentTerm(currentTerm uint64) error
func (*BoltStateStore) SetLastVote ¶
func (s *BoltStateStore) SetLastVote(summary voteSummary) error
type BufferedReadCloser ¶
type BufferedReadCloser struct {
// contains filtered or unexported fields
}
func NewBufferedReadCloser ¶
func NewBufferedReadCloser(r io.ReadCloser) *BufferedReadCloser
func (*BufferedReadCloser) Close ¶
func (r *BufferedReadCloser) Close() error
type BufferedWriteCloser ¶
type BufferedWriteCloser struct {
// contains filtered or unexported fields
}
func NewBufferedWriteCloser ¶
func NewBufferedWriteCloser(w io.WriteCloser) *BufferedWriteCloser
func (*BufferedWriteCloser) Close ¶
func (w *BufferedWriteCloser) Close() error
func (*BufferedWriteCloser) Flush ¶
func (w *BufferedWriteCloser) Flush() error
type CappedSlice ¶
type CappedSlice struct {
// contains filtered or unexported fields
}
CappedSlice is a ring buffer like slice which holds a maximum of `cap` items. When the slice overflows, older items will be evicted first.
func NewCappedSlice ¶
func NewCappedSlice(cap int) *CappedSlice
func (*CappedSlice) Push ¶
func (c *CappedSlice) Push(v interface{})
func (*CappedSlice) Range ¶
func (c *CappedSlice) Range(fn func(i int, v interface{}) (cont bool))
type CounterTimer ¶
type CounterTimer struct {
// contains filtered or unexported fields
}
func NewCounterTimer ¶
func NewCounterTimer(counts int, interval time.Duration) *CounterTimer
func (*CounterTimer) C ¶
func (t *CounterTimer) C() <-chan struct{}
func (*CounterTimer) Count ¶
func (t *CounterTimer) Count()
func (*CounterTimer) Stop ¶
func (t *CounterTimer) Stop()
func (*CounterTimer) StopC ¶
func (t *CounterTimer) StopC() <-chan struct{}
type FutureTask ¶
type GRPCTransport ¶
type GRPCTransport struct {
// contains filtered or unexported fields
}
func NewGRPCTransport ¶
func NewGRPCTransport(listenAddr string) (*GRPCTransport, error)
func (*GRPCTransport) AppendEntries ¶
func (t *GRPCTransport) AppendEntries( ctx context.Context, peer *pb.Peer, request *pb.AppendEntriesRequest, ) (*pb.AppendEntriesResponse, error)
func (*GRPCTransport) ApplyLog ¶
func (t *GRPCTransport) ApplyLog( ctx context.Context, peer *pb.Peer, request *pb.ApplyLogRequest, ) (*pb.ApplyLogResponse, error)
func (*GRPCTransport) Close ¶
func (t *GRPCTransport) Close() error
func (*GRPCTransport) Disconnect ¶
func (t *GRPCTransport) Disconnect(peer *pb.Peer)
func (*GRPCTransport) DisconnectAll ¶
func (t *GRPCTransport) DisconnectAll()
func (*GRPCTransport) Endpoint ¶
func (t *GRPCTransport) Endpoint() string
func (*GRPCTransport) InstallSnapshot ¶
func (t *GRPCTransport) InstallSnapshot( ctx context.Context, peer *pb.Peer, requestMeta *pb.InstallSnapshotRequestMeta, reader io.Reader, ) (*pb.InstallSnapshotResponse, error)
func (*GRPCTransport) RPC ¶
func (t *GRPCTransport) RPC() <-chan *RPC
func (*GRPCTransport) RequestVote ¶
func (t *GRPCTransport) RequestVote( ctx context.Context, peer *pb.Peer, request *pb.RequestVoteRequest, ) (*pb.RequestVoteResponse, error)
func (*GRPCTransport) Serve ¶
func (t *GRPCTransport) Serve() error
type HandyEncoding ¶
type HandyEncoding string
const ( HandyEncodingJSON HandyEncoding = "json" HandyEncodingBase64 HandyEncoding = "base64" HandyEncodingRaw HandyEncoding = "raw" )
type HandyRespWriter ¶
type HandyRespWriter struct { http.ResponseWriter // contains filtered or unexported fields }
func NewHandyRespWriter ¶
func NewHandyRespWriter(w http.ResponseWriter, logger *zap.Logger) (h HandyRespWriter)
func (*HandyRespWriter) Encoded ¶
func (rw *HandyRespWriter) Encoded(v interface{}, e HandyEncoding, statusCode int)
func (*HandyRespWriter) Error ¶
func (rw *HandyRespWriter) Error(err error)
func (*HandyRespWriter) JSON ¶
func (rw *HandyRespWriter) JSON(v interface{})
func (*HandyRespWriter) JSONFunc ¶
func (rw *HandyRespWriter) JSONFunc(fn func() (v interface{}, statusCode int, err error))
func (*HandyRespWriter) JSONStatus ¶
func (rw *HandyRespWriter) JSONStatus(v interface{}, statusCode int)
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct { Metadata *pb.InstallSnapshotRequestMeta Reader io.ReadCloser }
type LogStore ¶
type LogStore interface { // AppendLogs is used to append logs to the LogStore. // It's recommended to use techniques like transaction processing to // avoid data inconsistency due to an error or interruption. AppendLogs(logs []*pb.Log) error // TrimPrefix is used to trim the logs by evicting UNPACKED logs forwards from // the first log until the log with the index is reached. Index is exclusive. TrimPrefix(index uint64) error // TrimSuffix is used to trim the logs by evicting UNPACKED logs backwards from // the last log until the log with the index is reached. Index is exclusive. TrimSuffix(index uint64) error FirstIndex() (uint64, error) LastIndex() (uint64, error) Entry(index uint64) (*pb.Log, error) // LastEntry is used to find the last log entry. // If t is not zero, a log type filter should be applied. LastEntry(t pb.LogType) (*pb.Log, error) }
LogStore defines the interface for appending, trimming, and retrieving logs in a stable store. A LogStore implementation can also implement the optional io.Closer interface to allow releasing the underlying resources it has acquired.
type MetricsExporter ¶
type ObjectID ¶
type ObjectID [12]byte
ObjectID is the BSON ObjectID type.
var NilObjectID ObjectID
NilObjectID is the zero value for ObjectID.
func NewObjectIDFromTimestamp ¶
NewObjectIDFromTimestamp generates a new ObjectID based on the given time.
func ObjectIDFromHex ¶
ObjectIDFromHex creates a new ObjectID from a hex string. It returns an error if the hex string is not a valid ObjectID.
func (ObjectID) MarshalJSON ¶
MarshalJSON returns the ObjectID as a string
func (ObjectID) MarshalText ¶
MarshalText returns the ObjectID as UTF-8-encoded text. Implementing this allows us to use ObjectID as a map key when marshalling JSON. See https://pkg.go.dev/encoding#TextMarshaler
func (*ObjectID) UnmarshalJSON ¶
UnmarshalJSON populates the byte slice with the ObjectID. If the byte slice is 24 bytes long, it will be populated with the hex representation of the ObjectID. If the byte slice is twelve bytes long, it will be populated with the BSON representation of the ObjectID. This method also accepts empty strings and decodes them as NilObjectID. For any other inputs, an error will be returned.
func (*ObjectID) UnmarshalText ¶
UnmarshalText populates the byte slice with the ObjectID. Implementing this allows us to use ObjectID as a map key when unmarshalling JSON. See https://pkg.go.dev/encoding#TextUnmarshaler
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(coreOpts ServerCoreOptions, opts ...ServerOption) (*Server, error)
func (*Server) ApplyCommand ¶
func (s *Server) ApplyCommand(ctx context.Context, command Command) FutureTask[*pb.LogMeta, *pb.LogBody]
ApplyCommand. Future(LogMeta, error)
func (*Server) Info ¶
func (s *Server) Info() ServerInfo
func (*Server) Register ¶
Register is used to register a server to current cluster. ErrInJointConsensus is returned when the server is already in a joint consensus.
func (*Server) StateMachine ¶
func (s *Server) StateMachine() StateMachine
func (*Server) States ¶
func (s *Server) States() ServerStates
type ServerCoreOptions ¶
type ServerCoreOptions struct { Id string InitialCluster []*pb.Peer StableStore StableStore StateMachine StateMachine SnapshotStore SnapshatStore Transport Transport }
type ServerInfo ¶
type ServerOption ¶
type ServerOption func(options *serverOptions)
func APIExtensionOption ¶
func APIExtensionOption(extension APIExtension) ServerOption
func APIServerListenAddressOption ¶
func APIServerListenAddressOption(address string) ServerOption
func ElectionTimeoutOption ¶
func ElectionTimeoutOption(timeout time.Duration) ServerOption
func FollowerTimeoutOption ¶
func FollowerTimeoutOption(timeout time.Duration) ServerOption
func LogLevelOption ¶
func LogLevelOption(level zapcore.Level) ServerOption
func MetricsKeeperOption ¶
func MetricsKeeperOption(exporter MetricsExporter) ServerOption
func SnapshotPolicyOption ¶
func SnapshotPolicyOption(policy SnapshotPolicy) ServerOption
type ServerRole ¶
type ServerRole uint32
const ( Leader ServerRole = 1 + iota Candidate Follower )
func (ServerRole) String ¶
func (r ServerRole) String() string
type ServerStates ¶
type ServerStates struct { ID string `json:"id"` Endpoint string `json:"endpoint"` Leader *pb.Peer `json:"leader"` Role string `json:"role"` CurrentTerm uint64 `json:"current_term"` LastLogIndex uint64 `json:"last_log_index"` LastVoteTerm uint64 `json:"last_vote_term"` LastVoteCandidate string `json:"last_vote_candidate"` CommitIndex uint64 `json:"commit_index"` }
type SingleFlight ¶
type SingleFlight[T any] struct { // contains filtered or unexported fields }
SingleFlight executes the function only once on the first Do() call, returns the function return value and save it for future Do() calls.
func (*SingleFlight[T]) Do ¶
func (s *SingleFlight[T]) Do(f func() T) T
type SnapshatStore ¶
type SnapshatStore interface { Create(index, term uint64, c *pb.Configuration, cIndex uint64) (SnapshotSink, error) List() ([]SnapshotMeta, error) Open(id string) (Snapshot, error) DecodeMeta(b []byte) (SnapshotMeta, error) Trim() error }
type Snapshot ¶
type Snapshot interface { Meta() (SnapshotMeta, error) Reader() (io.Reader, error) // Close is used to close the snapshot's underlying file descriptors or handles. Close() error }
Snapshot is a descriptor that holds the snapshot file.
type SnapshotMeta ¶
type SnapshotPolicy ¶
type SnapshotSink ¶
type SnapshotSink interface { io.WriteCloser Meta() SnapshotMeta Cancel() error }
type StableStore ¶
type StableStore interface { LogStore StateStore }
type StateMachine ¶
type StateMachine interface { Apply(command Command) Snapshot() (StateMachineSnapshot, error) Restore(snapshot Snapshot) error }
type StateMachineSnapshot ¶
type StateMachineSnapshot interface {
Write(sink SnapshotSink) error
}
type StateStore ¶
type StateStore interface { CurrentTerm() (uint64, error) SetCurrentTerm(term uint64) error LastVote() (voteSummary, error) SetLastVote(summary voteSummary) error }
StateStore defines the interface to save and restore the persistent server states from a stable store.
type StreamAverage ¶
type StreamAverage struct {
// contains filtered or unexported fields
}
func (*StreamAverage) Avg ¶
func (a *StreamAverage) Avg() float64
func (*StreamAverage) N ¶
func (a *StreamAverage) N() int
func (*StreamAverage) Push ¶
func (a *StreamAverage) Push(v float64) float64
type StreamMinMaxFloat64 ¶
type StreamMinMaxFloat64 struct {
// contains filtered or unexported fields
}
func (*StreamMinMaxFloat64) Max ¶
func (a *StreamMinMaxFloat64) Max() float64
func (*StreamMinMaxFloat64) Min ¶
func (a *StreamMinMaxFloat64) Min() float64
func (*StreamMinMaxFloat64) N ¶
func (a *StreamMinMaxFloat64) N() int
func (*StreamMinMaxFloat64) Push ¶
func (a *StreamMinMaxFloat64) Push(v float64) (min, max float64)
type StreamMinMaxInt64 ¶
type StreamMinMaxInt64 struct {
// contains filtered or unexported fields
}
func (*StreamMinMaxInt64) Cap ¶
func (a *StreamMinMaxInt64) Cap(cap int) int64
func (*StreamMinMaxInt64) Max ¶
func (a *StreamMinMaxInt64) Max() int64
func (*StreamMinMaxInt64) Min ¶
func (a *StreamMinMaxInt64) Min() int64
func (*StreamMinMaxInt64) N ¶
func (a *StreamMinMaxInt64) N() int
func (*StreamMinMaxInt64) Push ¶
func (a *StreamMinMaxInt64) Push(v int64) (min, max int64)
type TickTrigger ¶
type TickTrigger struct {
// contains filtered or unexported fields
}
func NewTickTrigger ¶
func NewTickTrigger(ticks int64, timeout time.Duration, fn func()) *TickTrigger
func (*TickTrigger) Tick ¶
func (t *TickTrigger) Tick()
type Transport ¶
type Transport interface { // Endpoint returns the endpoint used by current Transport instance Endpoint() string AppendEntries(ctx context.Context, peer *pb.Peer, request *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error) RequestVote(ctx context.Context, peer *pb.Peer, request *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error) InstallSnapshot(ctx context.Context, peer *pb.Peer, requestMeta *pb.InstallSnapshotRequestMeta, reader io.Reader) (*pb.InstallSnapshotResponse, error) ApplyLog(ctx context.Context, peer *pb.Peer, request *pb.ApplyLogRequest) (*pb.ApplyLogResponse, error) RPC() <-chan *RPC }
type TransportCloser ¶
type TransportCloser interface {
Close() error
}
TransportCloser is an optional interface for those implementations that allow explicit close operation on its underlying connections.
type TransportConnecter ¶
type TransportConnecter interface { Connect(peer *pb.Peer) error Disconnect(peer *pb.Peer) DisconnectAll() }
TransportConnecter is an optional interface for those implementations that allow explicit connect and disconnect operations on a per peer basis.
type TransportServer ¶
type TransportServer interface {
Serve() error
}
Source Files ¶
- apiserver.go
- assert.go
- command.go
- configuration.go
- context.go
- ctl.go
- errors.go
- future.go
- http.go
- log.go
- log_bolt.go
- log_internal.go
- logger.go
- metric.go
- nocopy.go
- objectid.go
- option.go
- replication.go
- rpc.go
- server.go
- signal.go
- singleflight.go
- snapshot.go
- stable.go
- state.go
- state_bolt.go
- state_internal.go
- statemachine.go
- store_bolt.go
- store_internal.go
- transport.go
- transport_grpc.go
- transport_internal.go
- transport_testing.go
- util.go