Documentation ¶
Index ¶
- Constants
- Variables
- type Addresses
- type AppendEntriesRequest
- type AppendEntriesResponse
- type ClusterRequest
- type ClusterResponse
- type Configuration
- type FSM
- type FSMSnapshot
- type FileSnapshotSink
- type FsmReadRequest
- type FsmReadResponse
- type Future
- type Handler
- type Handlers
- type HeartbeatPayload
- type HeartbeatRequest
- type HeartbeatResponse
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type LeaderOptions
- type Log
- type LogMeta
- type LogType
- type Message
- type MessageReader
- type MessageRequestType
- type MessageWriter
- type Node
- type Options
- type OptionsBuilder
- func (builder *OptionsBuilder) Addresses(addresses Addresses)
- func (builder *OptionsBuilder) Fsm(fsm FSM)
- func (builder *OptionsBuilder) Id(id string)
- func (builder *OptionsBuilder) LeaderCommitTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) LeaderElectionTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) LeaderHeartbeatTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) LeaderLeaseTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) Nonvoter(yes bool)
- func (builder *OptionsBuilder) SnapshotInterval(interval time.Duration)
- func (builder *OptionsBuilder) SnapshotNotRestoreOnStart(yes bool)
- func (builder *OptionsBuilder) SnapshotStore(store SnapshotStore)
- func (builder *OptionsBuilder) SnapshotThreshold(threshold uint64)
- func (builder *OptionsBuilder) SnapshotTrailingLogs(trailingLogs uint64)
- func (builder *OptionsBuilder) Store(store Store)
- func (builder *OptionsBuilder) TLS(v TLS)
- func (builder *OptionsBuilder) Transport(transport Transport)
- type Promise
- type RPC
- type RPCHeader
- type Raft
- type Server
- type ServerSuffrage
- type SnapshotMeta
- type SnapshotMetas
- type SnapshotOptions
- type SnapshotSink
- type SnapshotStore
- type State
- type Store
- type TLS
- type TimeoutNowRequest
- type TimeoutNowResponse
- type Transport
- type Trunk
- type VoteRequest
- type VoteResponse
Constants ¶
View Source
const ( AppendEntriesRequestType = MessageRequestType(iota + 1) AppendEntriesResponseType VoteRequestType VoteResponseType InstallSnapshotRequestType InstallSnapshotResponseType TimeoutNowRequestType TimeoutNowResponseType HeartbeatRequestType HeartbeatResponseType ClusterRequestType ClusterResponseType FsmReadRequestType FsmReadResponseType )
Variables ¶
View Source
var ( ErrLogNotFound = fmt.Errorf("log not found") ErrNotFound = fmt.Errorf("data not found") )
View Source
var (
FutureWaitTimeoutErr = fmt.Errorf("future wait timeout")
)
Functions ¶
This section is empty.
Types ¶
type Addresses ¶
type Addresses interface { Local() (local string, err error) Members() (addresses []string, err error) }
func DesignatedAddresses ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct { RPCHeader Term uint64 PrevLogEntry uint64 PrevLogTerm uint64 LeaderCommitIndex uint64 Key []byte Entries []*Log }
func (*AppendEntriesRequest) Decode ¶
func (request *AppendEntriesRequest) Decode(msg MessageReader) (err error)
func (*AppendEntriesRequest) Encode ¶
func (request *AppendEntriesRequest) Encode() (writer MessageWriter, err error)
type AppendEntriesResponse ¶
type AppendEntriesResponse struct { RPCHeader Term uint64 LastLog uint64 Succeed bool NoRetryBackoff bool }
func (*AppendEntriesResponse) Decode ¶
func (response *AppendEntriesResponse) Decode(msg MessageReader) (err error)
func (*AppendEntriesResponse) Encode ¶
func (response *AppendEntriesResponse) Encode() (writer MessageWriter, err error)
type ClusterRequest ¶
func (*ClusterRequest) Decode ¶
func (request *ClusterRequest) Decode(msg MessageReader) (err error)
func (*ClusterRequest) Encode ¶
func (request *ClusterRequest) Encode() (writer MessageWriter, err error)
type ClusterResponse ¶
func (*ClusterResponse) Decode ¶
func (response *ClusterResponse) Decode(msg MessageReader) (err error)
func (*ClusterResponse) Encode ¶
func (response *ClusterResponse) Encode() (writer MessageWriter, err error)
type Configuration ¶
type Configuration struct {
Servers []Server
}
type FSMSnapshot ¶
type FSMSnapshot interface { Persist(sink SnapshotSink) error Release() }
type FileSnapshotSink ¶
type FileSnapshotSink struct {
// contains filtered or unexported fields
}
func (*FileSnapshotSink) Cancel ¶
func (sink *FileSnapshotSink) Cancel() (err error)
func (*FileSnapshotSink) Close ¶
func (sink *FileSnapshotSink) Close() (err error)
func (*FileSnapshotSink) Id ¶
func (sink *FileSnapshotSink) Id() (id string)
type FsmReadRequest ¶
func (*FsmReadRequest) Decode ¶
func (request *FsmReadRequest) Decode(msg MessageReader) (err error)
func (*FsmReadRequest) Encode ¶
func (request *FsmReadRequest) Encode() (writer MessageWriter, err error)
type FsmReadResponse ¶
func (*FsmReadResponse) Decode ¶
func (response *FsmReadResponse) Decode(msg MessageReader) (err error)
func (*FsmReadResponse) Encode ¶
func (response *FsmReadResponse) Encode() (writer MessageWriter, err error)
type Handler ¶
type Handler struct { }
func (*Handler) Handle ¶
func (handler *Handler) Handle(request *AppendEntriesRequest) (response *AppendEntriesResponse)
type Handlers ¶
type Handlers struct {
// contains filtered or unexported fields
}
func (*Handlers) Dispatch ¶
func (handlers *Handlers) Dispatch(request *AppendEntriesRequest) (response *AppendEntriesResponse)
type HeartbeatPayload ¶
type HeartbeatRequest ¶
type HeartbeatRequest struct { RPCHeader Term uint64 Key []byte // contains filtered or unexported fields }
func (*HeartbeatRequest) Decode ¶
func (request *HeartbeatRequest) Decode(msg MessageReader) (err error)
func (*HeartbeatRequest) Encode ¶
func (request *HeartbeatRequest) Encode() (writer MessageWriter, err error)
type HeartbeatResponse ¶
type HeartbeatResponse struct {
RPCHeader
}
func (*HeartbeatResponse) Decode ¶
func (response *HeartbeatResponse) Decode(msg MessageReader) (err error)
func (*HeartbeatResponse) Encode ¶
func (response *HeartbeatResponse) Encode() (writer MessageWriter, err error)
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct { RPCHeader Term uint64 LastLogIndex uint64 LastLogTerm uint64 ConfigurationIndex uint64 Size uint64 Leader []byte Configuration []byte Snapshot io.Reader }
func (*InstallSnapshotRequest) Decode ¶
func (request *InstallSnapshotRequest) Decode(msg MessageReader) (err error)
func (*InstallSnapshotRequest) Encode ¶
func (request *InstallSnapshotRequest) Encode() (writer MessageWriter, err error)
type InstallSnapshotResponse ¶
func (*InstallSnapshotResponse) Decode ¶
func (response *InstallSnapshotResponse) Decode(msg MessageReader) (err error)
func (*InstallSnapshotResponse) Encode ¶
func (response *InstallSnapshotResponse) Encode() (writer MessageWriter, err error)
type LeaderOptions ¶
type LeaderOptions struct { HeartbeatTimeout time.Duration ElectionTimeout time.Duration CommitTimeout time.Duration // LeaseTimeout // 用于控制“租约”的持续时间 // 作为领导者而无法联系法定人数 // 个节点。如果我们在没有联系的情况下达到这个间隔,我们将 // 辞去领导职务。 LeaseTimeout time.Duration }
func (LeaderOptions) Verify ¶
func (options LeaderOptions) Verify() (err error)
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message +---------------------------------------------------------+-----------+-----------+ | Header | Body | Trunk | +---------------------+-----------------+-----------------+-----------+-----------+ | 4(BigEndian) | 2(BigEndian) | 2(BigEndian) | n | reader | +---------------------+-----------------+-----------------+-----------+-----------+ | Len(data) | request type | kind | data | snappy | +---------------------+-----------------+-----------------+-----------+-----------+
func (*Message) RequestType ¶
func (msg *Message) RequestType() (typ MessageRequestType)
type MessageReader ¶
type MessageReader interface { ReadFrom(r io.Reader) (n int64, err error) RequestType() (typ MessageRequestType) Bytes() (p []byte) Trunk() (trunk *Trunk, has bool) }
func NewMessageReader ¶
func NewMessageReader() (msg MessageReader)
type MessageRequestType ¶
type MessageRequestType uint16
type MessageWriter ¶
func NewMessageWriter ¶
func NewMessageWriter(requestType MessageRequestType, data []byte) (msg MessageWriter)
func NewMessageWriterWithTrunk ¶
func NewMessageWriterWithTrunk(requestType MessageRequestType, data []byte, sink io.Reader) (msg MessageWriter)
type Options ¶
type Options struct { Id string Addresses Addresses Nonvoter bool TLS TLS Leader LeaderOptions Snapshot SnapshotOptions Store Store FSM FSM Transport Transport }
type OptionsBuilder ¶
type OptionsBuilder struct {
// contains filtered or unexported fields
}
func NewOptionsBuilder ¶
func NewOptionsBuilder() (builder *OptionsBuilder)
func (*OptionsBuilder) Addresses ¶
func (builder *OptionsBuilder) Addresses(addresses Addresses)
func (*OptionsBuilder) Fsm ¶
func (builder *OptionsBuilder) Fsm(fsm FSM)
func (*OptionsBuilder) Id ¶
func (builder *OptionsBuilder) Id(id string)
func (*OptionsBuilder) LeaderCommitTimeout ¶
func (builder *OptionsBuilder) LeaderCommitTimeout(timeout time.Duration)
func (*OptionsBuilder) LeaderElectionTimeout ¶
func (builder *OptionsBuilder) LeaderElectionTimeout(timeout time.Duration)
func (*OptionsBuilder) LeaderHeartbeatTimeout ¶
func (builder *OptionsBuilder) LeaderHeartbeatTimeout(timeout time.Duration)
func (*OptionsBuilder) LeaderLeaseTimeout ¶
func (builder *OptionsBuilder) LeaderLeaseTimeout(timeout time.Duration)
func (*OptionsBuilder) Nonvoter ¶
func (builder *OptionsBuilder) Nonvoter(yes bool)
func (*OptionsBuilder) SnapshotInterval ¶
func (builder *OptionsBuilder) SnapshotInterval(interval time.Duration)
func (*OptionsBuilder) SnapshotNotRestoreOnStart ¶
func (builder *OptionsBuilder) SnapshotNotRestoreOnStart(yes bool)
func (*OptionsBuilder) SnapshotStore ¶
func (builder *OptionsBuilder) SnapshotStore(store SnapshotStore)
func (*OptionsBuilder) SnapshotThreshold ¶
func (builder *OptionsBuilder) SnapshotThreshold(threshold uint64)
func (*OptionsBuilder) SnapshotTrailingLogs ¶
func (builder *OptionsBuilder) SnapshotTrailingLogs(trailingLogs uint64)
func (*OptionsBuilder) Store ¶
func (builder *OptionsBuilder) Store(store Store)
func (*OptionsBuilder) TLS ¶
func (builder *OptionsBuilder) TLS(v TLS)
func (*OptionsBuilder) Transport ¶
func (builder *OptionsBuilder) Transport(transport Transport)
type RPC ¶
type RPC interface { Encode() (writer MessageWriter, err error) Decode(msg MessageReader) (err error) }
type Raft ¶
type Raft interface { // Run start node // // check cluster was serving, when serving, then call leader to add this node, // when not serving, then boot a cluster. Run(ctx context.Context) (err error) // Close shutdown node // // check leader, when this node is leader, then just shutdown, // when this nod is not leader, then call leader to remove this node and shutdown. Close(ctx context.Context) (err error) // Apply make fsm to apply a log Apply(ctx context.Context, key []byte, body []byte, timeout time.Duration) (result []byte, err error) }
type Server ¶
type Server struct { Suffrage ServerSuffrage Id string Address string }
type ServerSuffrage ¶
type ServerSuffrage int
const ( Voter ServerSuffrage = iota + 1 Nonvoter )
func (ServerSuffrage) String ¶
func (s ServerSuffrage) String() string
type SnapshotMeta ¶
type SnapshotMeta struct { Id string Index uint64 Term uint64 Configuration Configuration ConfigurationIndex uint64 Size uint64 CRC []byte CreateAT time.Time }
func (*SnapshotMeta) Encode ¶
func (meta *SnapshotMeta) Encode() (p []byte)
type SnapshotMetas ¶
type SnapshotMetas []*SnapshotMeta
func (SnapshotMetas) Len ¶
func (metas SnapshotMetas) Len() int
func (SnapshotMetas) Less ¶
func (metas SnapshotMetas) Less(i, j int) bool
func (SnapshotMetas) Swap ¶
func (metas SnapshotMetas) Swap(i, j int)
type SnapshotOptions ¶
type SnapshotOptions struct { Store SnapshotStore // TrailingLogs控制快照后留下的日志数量。这是为了让我们可以快速回放跟踪者的日志,而不是被迫发送整个快照。此处传递的值是使用的初始设置。这可以在运行期间使用ReloadConfig进行调整。 TrailingLogs uint64 // SnapshotThreshold控制在执行快照之前必须有多少未完成的日志。这是为了通过重放一小组日志来防止过度快照。此处传递的值是使用的初始设置。这可以在运行期间使用ReloadConfig进行调整。 Threshold uint64 Interval time.Duration NoSnapshotRestoreOnStart bool }
func (SnapshotOptions) Verify ¶
func (options SnapshotOptions) Verify() (err error)
type SnapshotSink ¶
type SnapshotSink interface { io.WriteCloser Id() string Cancel() error }
type SnapshotStore ¶
type SnapshotStore interface { Create(index uint64, term uint64, configuration Configuration, configurationIndex uint64) (sink SnapshotSink, err error) List() (metas []*SnapshotMeta, err error) Open(id string) (meta *SnapshotMeta, reader io.ReadCloser, err error) }
func FileSnapshotStore ¶
func FileSnapshotStore(dir string) (store SnapshotStore, err error)
type Store ¶
type Store interface { AcquireIndex() (term uint64, index uint64, err error) Meta() (meta LogMeta) FirstIndex() (index uint64, err error) LastIndex() (index uint64, err error) Read(index uint64) (log *Log, err error) Write(logs ...*Log) (err error) Commit(logs ...*Log) (err error) Remove(logs ...*Log) (err error) Close() }
type TimeoutNowRequest ¶
type TimeoutNowRequest struct {
RPCHeader
}
func (*TimeoutNowRequest) Decode ¶
func (request *TimeoutNowRequest) Decode(msg MessageReader) (err error)
func (*TimeoutNowRequest) Encode ¶
func (request *TimeoutNowRequest) Encode() (writer MessageWriter, err error)
type TimeoutNowResponse ¶
type TimeoutNowResponse struct {
RPCHeader
}
func (*TimeoutNowResponse) Decode ¶
func (response *TimeoutNowResponse) Decode(msg MessageReader) (err error)
func (*TimeoutNowResponse) Encode ¶
func (response *TimeoutNowResponse) Encode() (writer MessageWriter, err error)
type Transport ¶
type Transport interface { Dial(address string) (conn net.Conn, err error) Listen(address string) (ln net.Listener, err error) }
func TcpTransport ¶
func TcpTransport() Transport
type Trunk ¶
type Trunk struct {
// contains filtered or unexported fields
}
func (*Trunk) SetLimiter ¶
type VoteRequest ¶
type VoteRequest struct { RPCHeader Term uint64 LastLogIndex uint64 LastLogTerm uint64 LeadershipTransfer bool }
func (*VoteRequest) Decode ¶
func (request *VoteRequest) Decode(msg MessageReader) (err error)
func (*VoteRequest) Encode ¶
func (request *VoteRequest) Encode() (writer MessageWriter, err error)
type VoteResponse ¶
func (*VoteResponse) Decode ¶
func (response *VoteResponse) Decode(msg MessageReader) (err error)
func (*VoteResponse) Encode ¶
func (response *VoteResponse) Encode() (writer MessageWriter, err error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.