Documentation ¶
Index ¶
- Constants
- Variables
- type AppEnts
- type AppEntsResp
- type BaseMsg
- func (b *BaseMsg) GetEpoch() uint64
- func (b *BaseMsg) GetFrom() string
- func (b *BaseMsg) GetFromGUID() uint64
- func (b *BaseMsg) GetTerm() uint64
- func (b *BaseMsg) GetTo() string
- func (b *BaseMsg) GetToGUID() uint64
- func (b *BaseMsg) SetEpoch(epoch uint64)
- func (b *BaseMsg) SetFrom(from string)
- func (b *BaseMsg) SetFromGUID(from uint64)
- func (b *BaseMsg) SetTerm(term uint64)
- func (b *BaseMsg) SetTo(to string)
- func (b *BaseMsg) SetToGUID(to uint64)
- func (b *BaseMsg) String() string
- type Config
- type Entry
- type FSM
- type InstallSnapshot
- type Log
- type LogIterator
- type Membership
- type Msg
- type Pending
- type Raft
- func (r *Raft) AddNode(node string) *Pending
- func (r *Raft) Propose(cmd []byte) *Pending
- func (r *Raft) ProposeIfTerm(cmd []byte, term uint64) *Pending
- func (r *Raft) ProposeInitialMembership(members []string) *Pending
- func (r *Raft) RemoveNode(node string) *Pending
- func (r *Raft) Start(fsm FSM)
- func (r *Raft) VerifyRead() *Pending
- type SnapshotFileReader
- type SnapshotFileWriter
- type SnapshotManager
- type SnapshotMetadata
- type Snapshoter
- type State
- type Storage
- type Transport
- func NewMemTransport(config TransportConfig) Transport
- func NewMsgDropper(lower Transport, seed int64, defaultProb float32) Transport
- func NewMsgDuplicator(lower Transport, limit int, p float32, seed int64) Transport
- func NewMsgReorder(lower Transport, p float32, limit time.Duration, seed int64) Transport
- type TransportConfig
- type VoteReq
- type VoteResp
Constants ¶
const (
	EntryNormal uint8 = iota // Normal command proposed by applications.
	EntryConf                // Membership change command proposed internally by Raft.
	EntryNOP                 // NOP command
)
Type of Entry.
Variables ¶
var (
	// DefaultConfig defines the default configuration for Raft.
	DefaultConfig = Config{
		ClusterID:               "default",
		CandidateTimeout:        100,
		FollowerTimeout:         100,
		LeaderStepdownTimeout:   100,
		RandomElectionRange:     50,
		HeartbeatTimeout:        20,
		SnapshotTimeout:         1000,
		DurationPerTick:         100 * time.Millisecond,
		MaxNumEntsPerAppEnts:    1000,
		LogEntriesAfterSnapshot: 10000,
		SnapshotThreshold:       10000,
		MaximumPendingProposals: 1000,
		MaximumProposalBatch:    100,
	}

	// DefaultTransportConfig includes default values for Raft transport.
	DefaultTransportConfig = TransportConfig{
		MsgChanCap: 1000,
	}
)
Default configuration values. Applications should override the values below when appropriate.
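Because the timeouts in DefaultConfig are expressed in ticks, their wall-clock values depend on DurationPerTick. The following is a minimal sketch of that conversion. It mirrors only the relevant fields in a local struct; timeoutConfig, defaultTimeouts, electionWindow and heartbeatInterval are hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// timeoutConfig is a local stand-in that mirrors a few Config fields.
// It is not the package's own type.
type timeoutConfig struct {
	FollowerTimeout     uint32
	RandomElectionRange uint32
	HeartbeatTimeout    uint32
	DurationPerTick     time.Duration
}

// defaultTimeouts copies the relevant values from DefaultConfig.
func defaultTimeouts() timeoutConfig {
	return timeoutConfig{
		FollowerTimeout:     100,
		RandomElectionRange: 50,
		HeartbeatTimeout:    20,
		DurationPerTick:     100 * time.Millisecond,
	}
}

// electionWindow converts "FollowerTimeout + [0, RandomElectionRange]" ticks
// into a wall-clock range.
func electionWindow(c timeoutConfig) (lo, hi time.Duration) {
	lo = time.Duration(c.FollowerTimeout) * c.DurationPerTick
	hi = time.Duration(c.FollowerTimeout+c.RandomElectionRange) * c.DurationPerTick
	return lo, hi
}

// heartbeatInterval converts HeartbeatTimeout ticks into wall-clock time.
func heartbeatInterval(c timeoutConfig) time.Duration {
	return time.Duration(c.HeartbeatTimeout) * c.DurationPerTick
}

func main() {
	c := defaultTimeouts()
	lo, hi := electionWindow(c)
	fmt.Println("follower election window:", lo, "to", hi)
	fmt.Println("heartbeat interval:", heartbeatInterval(c))
}
```

With the default values this works out to a follower election window of 10s to 15s and a heartbeat interval of 2s, which may be slower than some applications want.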
var (
	// ErrNodeNotLeader will be returned if a command is proposed to a
	// non-leader node.
	ErrNodeNotLeader = errors.New("The node is not leader.")

	// ErrNotLeaderAnymore will be returned if the leader that proposed the
	// command steps down before committing the command.
	ErrNotLeaderAnymore = errors.New("Not leader anymore")

	// ErrTooManyPendingReqs will be returned if the number of pending requests
	// exceeds a certain threshold.
	ErrTooManyPendingReqs = errors.New("Too many pending requests.")

	// ErrTermMismatch will be returned if a command can't have the term number
	// specified by users.
	ErrTermMismatch = errors.New("Term numbers mismatch")

	// ErrNodeExists will be returned if users request to add a node that is
	// already in the configuration.
	ErrNodeExists = errors.New("The node required to be added already exists")

	// ErrNodeNotExists will be returned if users request to remove a node
	// that doesn't exist in the current configuration.
	ErrNodeNotExists = errors.New("The node required to be removed doesn't exist")

	// ErrAlreadyConfigured will be returned if raft is given an initial
	// configuration when it already has a configuration.
	ErrAlreadyConfigured = errors.New("Raft can't process an initial configuration after it's already initialized")
)
var NilSnapshotMetadata = SnapshotMetadata{0, 0, nil}
NilSnapshotMetadata is a special value that should be returned when there's no snapshot file found.
Functions ¶
This section is empty.
Types ¶
type AppEnts ¶
type AppEnts struct {
	// 'term' is provided by BaseMsg.Term
	// 'leaderId' is provided by BaseMsg.From
	BaseMsg

	// PrevLogIndex and PrevLogTerm are used as a consistency check.
	// A follower will only accept the AppEnts request if an entry with term
	// 'PrevLogTerm' and index 'PrevLogIndex' exists in its log.
	PrevLogIndex uint64
	PrevLogTerm  uint64

	// The entries that need to be appended to the follower's log.
	// It can be nil if it's just a pure heartbeat message or a probing message.
	Entries []Entry

	// The index that can be safely committed by the follower. The leader is the
	// only one who decides which entry index can be committed, and the leader
	// uses this field to piggyback the commit index to followers so followers
	// can apply committed commands to their state machines.
	LeaderCommit uint64
}
AppEnts will be used by leader to:
- Update followers' logs.
- Probe the last agreed point between a follower's log and its own log.
- Send as a heartbeat message to maintain its leadership.
type AppEntsResp ¶
type AppEntsResp struct {
	BaseMsg

	// Whether its corresponding AppEnts request succeeded or not.
	Success bool

	// If its corresponding AppEnts succeeded, 'Index' will be the index of the
	// last matched entry in the follower's log identified by that request, so
	// the leader can update the 'matchIndex' of the follower.
	//
	// If its corresponding AppEnts failed, 'Index' will be the index used for
	// the consistency check (AppEnts.PrevLogIndex) so the leader can update the
	// follower's 'nextIndex' for the next probing.
	//
	// The 'Index' field is not specified in the paper because the paper phrases
	// the protocol in terms of RPC: a response is paired with its request, so
	// it can always derive this 'Index' field from the corresponding request.
	// However, we decoupled our transport layer from the Raft implementation,
	// so we need to keep track of this ourselves.
	Index uint64

	// Hint from the follower. If 'Hint' is not 0 the leader should decrement
	// 'nextIndex' to 'Hint'. We use 'Hint' to bypass all log entries that
	// a follower is missing, instead of sending one probing message per
	// log entry.
	// A stale response with a stale 'Hint' is fine. In that case a leader might
	// just probe from a stale position. We implement the follower and leader
	// in a way that processing a stale message is still correct.
	Hint uint64
}
AppEntsResp is the response of its corresponding AppEnts request.
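The way a leader might consume Success, Index and Hint can be sketched as follows. This is only an illustration under stated assumptions, not the package's implementation: appEntsResp and peerProgress are local, hypothetical types; setting nextIndex to Index + 1 after a success follows the usual Raft bookkeeping; and the guards that ignore stale responses are one possible way to keep stale messages harmless.

```go
package main

import "fmt"

// appEntsResp is a local stand-in carrying only the fields used below.
type appEntsResp struct {
	Success bool
	Index   uint64
	Hint    uint64
}

// peerProgress is hypothetical leader-side bookkeeping for one follower.
type peerProgress struct {
	matchIndex uint64
	nextIndex  uint64
}

// handle updates the bookkeeping from one response.
func (p *peerProgress) handle(r appEntsResp) {
	if r.Success {
		// Only move matchIndex forward, so a stale success is a no-op.
		if r.Index > p.matchIndex {
			p.matchIndex = r.Index
		}
		if p.nextIndex <= p.matchIndex {
			p.nextIndex = p.matchIndex + 1
		}
		return
	}
	// On failure, probe from Index (the PrevLogIndex that was rejected),
	// or jump straight to Hint when the follower provided one.
	next := r.Index
	if r.Hint != 0 {
		next = r.Hint
	}
	// Assumed guard: only move backwards, and never below what is known
	// to be matched, so a stale failure cannot undo progress.
	if next < p.nextIndex && next > p.matchIndex {
		p.nextIndex = next
	}
}

func main() {
	p := &peerProgress{matchIndex: 0, nextIndex: 10}
	p.handle(appEntsResp{Success: false, Index: 9, Hint: 4})
	fmt.Println("after failed probe:", p.matchIndex, p.nextIndex)
	p.handle(appEntsResp{Success: true, Index: 6})
	fmt.Println("after success:", p.matchIndex, p.nextIndex)
	p.handle(appEntsResp{Success: false, Index: 3}) // stale, ignored
	fmt.Println("after stale failure:", p.matchIndex, p.nextIndex)
}
```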
func (*AppEntsResp) String ¶
func (a *AppEntsResp) String() string
String converts AppEntsResp to a human-readable string.
type BaseMsg ¶
type BaseMsg struct {
	Term     uint64 // The current term number of sender.
	To       string // The ID of receiver.
	From     string // The ID of sender.
	ToGUID   uint64 // The GUID of receiver.
	FromGUID uint64 // The GUID of sender.
	Epoch    uint64 // The epoch ID.
}
BaseMsg contains all common fields of different message types.
func (*BaseMsg) GetFromGUID ¶
GetFromGUID gets the GUID of sender.
func (*BaseMsg) SetFromGUID ¶
SetFromGUID sets the GUID of sender.
type Config ¶
type Config struct {
	// ID of the node. Users must assign a unique ID to each Raft node in a group.
	ID string

	// The ID of the cluster. For now we only use it as a metric label to make it
	// easier for our monitoring system to aggregate and analyze metrics from
	// Raft nodes of the same cluster.
	// TODO: use it to guard against misconfiguration of a cluster?
	ClusterID string

	// CandidateTimeout is used with 'RandomElectionRange' by a candidate to start
	// an election for the next term, in units of 'tick'. If a candidate can't be
	// elected within "CandidateTimeout + [0, RandomElectionRange]" it will start
	// a new election for the next term.
	CandidateTimeout uint32

	// FollowerTimeout is used with 'RandomElectionRange' by a follower to start
	// an election for the next term, in units of 'tick'. If a follower doesn't
	// hear anything from its leader within
	// "FollowerTimeout + [0, RandomElectionRange]" it will start a new election
	// for the next term.
	FollowerTimeout uint32

	// LeaderStepdownTimeout controls how long the leader can maintain its
	// leadership without being able to contact a quorum of nodes, in units of
	// 'tick'. Once a leader has lost contact with a quorum for this amount of
	// time it will step down as leader. 'LeaderStepdownTimeout' must be larger
	// than 'HeartbeatTimeout' if it's not 0, otherwise the leader might falsely
	// think it has lost its leadership and step down, which will make the whole
	// system unavailable until a new leader is elected. Stepping down is a
	// voluntary decision; do not depend on it to guarantee that only one leader
	// exists in the cluster. It is only used to step down a leader node that
	// might already have lost its leadership at some point, so multiple leaders
	// (though only one is effective) might exist at the same time.
	//
	// If 'LeaderStepdownTimeout' is 0 then the leader will keep its leadership
	// until a higher term is detected.
	//
	// The motivation for 'LeaderStepdownTimeout' is that when a network
	// partition happens, a stale leader may stay in leader state indefinitely
	// even if a newer leader gets elected on the other side. Without realizing
	// that it might already have lost its leadership, the stale leader will keep
	// returning timeout errors to clients that are connected to it. But if it
	// steps down from leader state at some point, clients can get more useful
	// feedback (errors about not being leader anymore) and fail over to the
	// real leader node sooner.
	LeaderStepdownTimeout uint32

	// RandomElectionRange is the range of randomness we introduce to leader
	// election to avoid the problem of split votes. Before electing itself as
	// leader, a node needs to wait for an extra time in the range
	// [0, RandomElectionRange].
	RandomElectionRange uint32

	// HeartbeatTimeout is the interval of AppEnts requests, in units of 'tick'.
	// The leader must send at least one AppEnts request per interval to maintain
	// its leadership. HeartbeatTimeout must be smaller than FollowerTimeout,
	// otherwise an elected leader cannot maintain its leadership for long.
	HeartbeatTimeout uint32

	// SnapshotTimeout specifies how long it's expected to take to transfer a
	// snapshot AND recover state from the snapshot. The leader will try to send
	// messages to followers at some intervals to maintain its leadership or
	// retransmit a message that might have been lost. Because snapshot messages
	// might take a long time to transfer and apply, we use a different timeout
	// to avoid retransmitting unnecessary snapshot messages. The timeout should
	// be large enough to cover the time spent in transfer and recovery.
	//
	// TODO: It might be hard to guess a good value beforehand, we could adjust the
	// timeout value dynamically using:
	//   - based on size of snapshot
	//   - use exponential back-off
	// Some relevant discussions in ZooKeeper group(https://issues.apache.org/jira/browse/ZOOKEEPER-1977)
	// (I don't think this has been addressed by them)
	SnapshotTimeout uint32

	// Duration per tick. We separated logical time ("tick") from real time
	// (duration per tick) to make our implementation more testable.
	DurationPerTick time.Duration

	// GenSeed is the function that takes a Raft ID as parameter and generates a
	// seed number for the random number generator of that node. If users don't
	// specify one, a default seed generator will be used.
	GenSeed func(string) int64

	// MaxNumEntsPerAppEnts specifies the maximum number of entries per AppEnts
	// message. We don't want to saturate disk/network IO when synchronizing a
	// straggler.
	MaxNumEntsPerAppEnts uint32

	// LogEntriesAfterSnapshot specifies the number of entries to leave in the
	// log after a snapshot is taken. Once a snapshot is taken we are OK to
	// trim all log entries which are included in the snapshot, but doing that
	// might force a leader to ship its entire snapshot to a follower when the
	// follower is missing anything in the snapshot. By keeping some entries in
	// the log after a snapshot, we can send those entries to followers to
	// synchronize them, instead of having to send an entire snapshot.
	// LogEntriesAfterSnapshot is the maximum number of entries that could be
	// trimmed (because they are included in the snapshot) but are kept in the log.
	LogEntriesAfterSnapshot uint64

	// SnapshotThreshold specifies how many commands must have been applied since
	// the last snapshot before we perform a new snapshot. If it's 0 then no
	// snapshot will be performed. For example, if you set "SnapshotThreshold" to
	// 1000, then a new snapshot will be taken after every 1000 commands applied
	// to the state machine.
	// There's a trade-off on this number: the larger it is, the less frequently
	// we'll take snapshots and the less IO it costs, but recovery might take
	// longer because more log entries that are not included in the snapshot
	// have to be applied.
	SnapshotThreshold uint64

	// MaximumPendingProposals specifies the maximum number of pending proposals
	// allowed. Further requests will be blocked once the number of pending
	// proposals reaches this limit. If it is set to 0, no limit will be enforced.
	MaximumPendingProposals uint32

	// MaximumProposalBatch is the maximum number of proposals the leader will
	// try to process at once from the proposal channel. The larger it is, the
	// higher the throughput you might get, but also the higher the latency per
	// request you might experience.
	MaximumProposalBatch uint32
}
Config stores the configurations for core layer.
type FSM ¶
type FSM interface {
	// Apply will be called once a command is committed in a Raft group. This
	// callback is called from a single goroutine to ensure that the state
	// updates are applied in the same order as they arrived. Applications must
	// apply the command in the callback and can optionally return a result, so
	// that whoever proposed the command can get the result from Pending.Res.
	//
	// Note that the behavior of the command should be deterministic, otherwise
	// the applications might end up in an inconsistent state.
	Apply(Entry) (result interface{})

	// OnLeadershipChange will be called to notify applications of a change in
	// the leader status of Raft. 'isLeader' tells the application whether it is
	// in leader state or in non-leader state. 'term' tells the application the
	// current term of the Raft node. 'leader' tells the application the Raft
	// address of the leader node; it will be an empty string if the leader
	// is not known yet, but once the leader is known Raft will call this
	// callback with the address of the leader.
	//
	// The term number can be used in an application-specific way. For example,
	// multiple Raft leaders may exist at the same time, but only one of them
	// can be effective, because internally Raft uses the term number to detect
	// stale leaders. Applications that also want a single elected primary to
	// interact with outsiders can attach the term number to their messages so
	// outsiders can use it to detect stale primaries. Another use case is
	// read-then-modify logic: applications can attach the term number observed
	// at read time to the modify command, and when the modify command is
	// applied they can compare the attached term number with the term number
	// of the command itself. If the two don't match, there might be a
	// conflicting modification from another leader. By guaranteeing that the
	// term at which the application read the state and the term of the command
	// that modifies the state machine are the same, there can't be a
	// conflicting change from another leader in between.
	OnLeadershipChange(isLeader bool, term uint64, leader string)

	// OnMembershipChange will be called to notify applications of membership
	// changes over time.
	OnMembershipChange(membership Membership)

	// Snapshot is called when a snapshot needs to be taken to compact the log.
	// Applications should return an implementation of "Snapshoter" whose method
	// "Save" will be called to dump the actual state in a different goroutine.
	// The state that will be dumped in "Snapshoter.Save" MUST be the state at
	// the point the callback "Snapshot" is called.
	//
	// Once "Snapshot" has returned, further mutations might be applied in the
	// Apply callback. However, these mutations should not affect the state that
	// is being dumped in "Snapshoter.Save".
	//
	// If any error is returned no snapshot will be taken.
	Snapshot() (Snapshoter, error)

	// SnapshotRestore is called when the application needs to restore its state
	// from a snapshot. Once it's called, the application needs to discard its
	// current state and restore its state from the given reader. The index and
	// term number of the last command applied to the snapshot are also passed
	// to the application.
	SnapshotRestore(reader io.Reader, lastIndex, lastTerm uint64)
}
FSM is the interface that should be implemented by applications. It contains a list of callbacks which will be called by Raft to notify that certain events happen. NOTE all callbacks of FSM will be called from the same goroutine so that you don't need to worry about race conditions in different callbacks.
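A deterministic Apply callback that returns a result for Pending.Res could look roughly like the sketch below. It is self-contained and does not use the package's types: entry is a local stand-in, and its Cmd field is an assumption because the definition of Entry is not shown in this section. kvFSM, newKVFSM and the "key=value" command encoding are likewise hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a local stand-in for Entry. The Cmd field is an assumption.
type entry struct {
	Cmd []byte
}

// kvFSM is a hypothetical application state machine.
type kvFSM struct {
	data     map[string]string
	isLeader bool
	term     uint64
}

func newKVFSM() *kvFSM {
	return &kvFSM{data: make(map[string]string)}
}

// Apply parses a "key=value" command, stores it and returns the previous
// value. The outcome depends only on the command and the current state,
// so every replica that applies the same sequence ends up identical.
func (f *kvFSM) Apply(e entry) interface{} {
	key, value, ok := strings.Cut(string(e.Cmd), "=")
	if !ok {
		return fmt.Errorf("malformed command %q", e.Cmd)
	}
	prev := f.data[key]
	f.data[key] = value
	return prev
}

// OnLeadershipChange records the latest leadership status and term so the
// application can attach the term to later read-then-modify commands.
func (f *kvFSM) OnLeadershipChange(isLeader bool, term uint64, leader string) {
	f.isLeader, f.term = isLeader, term
}

func main() {
	f := newKVFSM()
	f.OnLeadershipChange(true, 3, "node-1")
	fmt.Printf("%q\n", f.Apply(entry{Cmd: []byte("color=red")}))
	fmt.Printf("%q\n", f.Apply(entry{Cmd: []byte("color=blue")}))
	fmt.Println(f.data["color"], f.isLeader, f.term)
}
```

Returning the error as the result, rather than panicking, is a design choice of this sketch; it keeps a malformed command from crashing every replica that applies it.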
type InstallSnapshot ¶
type InstallSnapshot struct {
	// 'term' is provided by BaseMsg.Term
	// 'leaderId' is provided by BaseMsg.From
	BaseMsg

	// The index of the last applied command included in this snapshot.
	LastIndex uint64

	// The term of the last applied command included in this snapshot.
	LastTerm uint64

	// The latest applied membership change in the snapshot.
	Membership Membership

	// The payload of a snapshot.
	//
	// This message must be created with Body pointing to the actual payload.
	// Transport is responsible for sending it out on the sender side and
	// pointing it to the actual payload on the receiver side.
	//
	// Body MUST be closed after the message is sent out on the sender side or
	// processed on the receiver side.
	Body io.ReadCloser
}
InstallSnapshot will be sent by leader to ship its snapshot to followers.
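The requirement that Body is closed after the message is processed can be met with a deferred Close on the receiving side. The sketch below shows that pattern with local stand-in types; installSnapshot, trackedBody, newBody and restoreFrom are hypothetical names, and reading the whole payload into memory is only done here to keep the example short.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// installSnapshot is a local stand-in with only the fields used here.
type installSnapshot struct {
	LastIndex uint64
	LastTerm  uint64
	Body      io.ReadCloser
}

// trackedBody is a hypothetical payload that records whether it was closed.
type trackedBody struct {
	io.Reader
	closed bool
}

func (b *trackedBody) Close() error {
	b.closed = true
	return nil
}

func newBody(payload string) *trackedBody {
	return &trackedBody{Reader: strings.NewReader(payload)}
}

// restoreFrom reads the whole payload and always closes Body, even when
// reading fails.
func restoreFrom(m *installSnapshot) ([]byte, error) {
	defer m.Body.Close()
	return io.ReadAll(m.Body)
}

func main() {
	body := newBody("serialized state")
	msg := &installSnapshot{LastIndex: 7, LastTerm: 2, Body: body}
	data, err := restoreFrom(msg)
	fmt.Println(string(data), err, body.closed)
}
```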
func (*InstallSnapshot) String ¶
func (i *InstallSnapshot) String() string
String converts InstallSnapshot to a human-readable string.
type Log ¶
type Log interface {
	// The index of the first entry in the log. If there's no entry in the log,
	// empty will be returned as true.
	FirstIndex() (index uint64, empty bool)

	// The index of the last entry in the log. If there's no entry in the log,
	// empty will be returned as true.
	LastIndex() (index uint64, empty bool)

	// GetBound returns the first index and last index of a log if it's not empty,
	// otherwise the third return value "empty" will be returned as true.
	GetBound() (firstIndex, lastIndex uint64, empty bool)

	// Append appends multiple entries to the log. It's the user's responsibility
	// to guarantee the appended entries have contiguous indices with existing
	// entries in the log. If any errors occur, the process will log an error
	// and die.
	Append(...Entry)

	// Truncate removes all log entries after (not including) the entry with the
	// given index. If any errors occur, the process will log an error and die.
	Truncate(index uint64)

	// Trim deletes all entries up to (and including) the given index.
	// This is used for log compaction. If any errors occur, the process will
	// log an error and die.
	Trim(index uint64)

	// Term returns the term number of the entry at the given index. It's the
	// caller's responsibility to guarantee an entry with the given index is in
	// the log, otherwise we'll log an error message and die.
	Term(index uint64) uint64

	// Entries returns a slice of entries in range [beg, end). If 'end' exceeds
	// the index of the last entry in the log then the log should return as many
	// entries as it can. But users need to guarantee the index 'beg' exists in
	// the log, otherwise we will panic.
	Entries(beg, end uint64) []Entry

	// GetIterator returns an iterator that can be used to iterate the log,
	// starting at the entry with ID 'first' or the first entry after where
	// it would be if there is no entry with that ID.
	GetIterator(first uint64) LogIterator

	// Close releases any resources used by the log. The log must not
	// be used after calling Close.
	Close()
}
Log defines the interface of the Raft log, and this is the only log interface that the Raft implementation will interact with. It stores a sequence of entries, and the indices of these entries must be contiguous in the log.
For now Raft doesn't have much to do when IO error occurs. Now the IO error handling is done inside log, once an IO error occurs, it'll simply panic.
TODO(PL-1130): Log should be an actual implementation instead of an interface, and it should be built on top of an on-disk log implementation which is passed to it. Here we define it as an interface so we can have a mock implementation to test Raft first without worrying about on-disk interface definition and implementation.
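The difference between Truncate (drop the tail after an index) and Trim (drop the head up to an index) is easy to mix up, so here is a small in-memory sketch of both. It is not the package's log: memLog, newMemLog and bound are hypothetical, and the Index and Term fields of the local entry type are assumptions, since the definition of Entry is not shown in this section.

```go
package main

import "fmt"

// entry is a local stand-in; the Index and Term fields are assumptions.
type entry struct {
	Index uint64
	Term  uint64
}

// memLog is a hypothetical slice-backed log with contiguous indices.
type memLog struct {
	ents []entry
}

// newMemLog builds a log holding entries first..last, all in term 1.
func newMemLog(first, last uint64) *memLog {
	l := &memLog{}
	for i := first; i <= last; i++ {
		l.ents = append(l.ents, entry{Index: i, Term: 1})
	}
	return l
}

// bound mirrors GetBound: first and last index, or empty == true.
func (l *memLog) bound() (first, last uint64, empty bool) {
	if len(l.ents) == 0 {
		return 0, 0, true
	}
	return l.ents[0].Index, l.ents[len(l.ents)-1].Index, false
}

// Truncate drops every entry after (not including) index.
func (l *memLog) Truncate(index uint64) {
	first, last, empty := l.bound()
	if empty || index >= last {
		return
	}
	if index < first {
		l.ents = nil
		return
	}
	l.ents = l.ents[:index-first+1]
}

// Trim drops every entry up to (and including) index.
func (l *memLog) Trim(index uint64) {
	first, last, empty := l.bound()
	if empty || index < first {
		return
	}
	if index >= last {
		l.ents = nil
		return
	}
	l.ents = l.ents[index-first+1:]
}

func main() {
	l := newMemLog(1, 5)
	l.Truncate(4) // keeps 1..4
	l.Trim(2)     // keeps 3..4
	fmt.Println(l.bound())
}
```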
type LogIterator ¶
type LogIterator interface {
	// Next advances the iterator. It returns true if it was able to
	// advance to the next entry or false if there are no more entries
	// or an error occurred. Use Err() to tell the difference.
	Next() bool

	// Entry returns the current entry. Next must be called before every
	// call to Entry.
	Entry() Entry

	// Err returns any error that occurred during iteration or nil if no
	// error occurred.
	Err() error

	// Close releases any resources associated with the iterator and
	// returns an error, if any occurred during iteration. The iterator
	// must not be used after Close is called.
	Close() error
}
LogIterator is used to iterate over entries in a log.
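The Next/Entry/Err/Close contract suggests the usual Go iteration idiom: loop on Next, then check Err to tell "no more entries" apart from a failure. The sketch below shows this with a local copy of the interface and a hypothetical slice-backed iterator; sliceIterator, collect and the Index field of the local entry type are assumptions made for illustration.

```go
package main

import "fmt"

// entry is a local stand-in; the Index field is an assumption.
type entry struct {
	Index uint64
}

// logIterator mirrors the method set of LogIterator with the local entry type.
type logIterator interface {
	Next() bool
	Entry() entry
	Err() error
	Close() error
}

// sliceIterator is a hypothetical iterator over an in-memory slice.
type sliceIterator struct {
	ents []entry
	pos  int // number of entries already returned by Next
}

func (it *sliceIterator) Next() bool {
	if it.pos >= len(it.ents) {
		return false
	}
	it.pos++
	return true
}

func (it *sliceIterator) Entry() entry { return it.ents[it.pos-1] }
func (it *sliceIterator) Err() error   { return nil }
func (it *sliceIterator) Close() error { return nil }

// collect drains the iterator and returns the indices it saw.
func collect(it logIterator) ([]uint64, error) {
	defer it.Close()
	var indices []uint64
	for it.Next() {
		indices = append(indices, it.Entry().Index)
	}
	// Next returning false can mean "done" or "failed"; Err tells which.
	if err := it.Err(); err != nil {
		return nil, err
	}
	return indices, nil
}

func main() {
	it := &sliceIterator{ents: []entry{{Index: 3}, {Index: 4}, {Index: 5}}}
	fmt.Println(collect(it))
}
```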
type Membership ¶
type Membership struct {
	// Members is the cluster members.
	Members []string

	// Epoch is an id for this instance of raft. It will never change after the
	// initial configuration. It is used to detect misconfigurations and prevent
	// corruption.
	Epoch uint64

	// Index is the index of the change in log.
	// (When serialized in logs/snapshots, Index is unused.)
	Index uint64

	// Term is the term of the change in log.
	// (When serialized in logs/snapshots, Term is unused.)
	Term uint64
}
Membership represents one proposed cluster membership. It is used in-memory, as well as encoded in the log and snapshots for configuration messages.
func (*Membership) Quorum ¶
func (m *Membership) Quorum() int
Quorum returns the quorum size of the configuration.
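This section does not say how the quorum size is computed. Assuming it is the usual strict majority of Members, the arithmetic is as in the sketch below; quorumSize is a hypothetical helper, not the package's method.

```go
package main

import "fmt"

// quorumSize returns the size of a strict majority of the given members.
// Assumption: this matches what Membership.Quorum computes.
func quorumSize(members []string) int {
	return len(members)/2 + 1
}

func main() {
	three := []string{"a", "b", "c"}
	four := []string{"a", "b", "c", "d"}
	five := []string{"a", "b", "c", "d", "e"}
	fmt.Println(quorumSize(three), quorumSize(four), quorumSize(five))
}
```

Under that assumption, growing a cluster from three to four nodes raises the quorum from 2 to 3 without tolerating any additional failure, which is why odd cluster sizes are the common choice.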
type Msg ¶
type Msg interface {
	GetTerm() uint64
	SetTerm(uint64)
	GetTo() string
	SetTo(string)
	GetFrom() string
	SetFrom(string)
	GetToGUID() uint64
	SetToGUID(uint64)
	GetFromGUID() uint64
	SetFromGUID(uint64)
	GetEpoch() uint64
	SetEpoch(uint64)
}
Msg defines the interface that all message types should implement.
type Pending ¶
type Pending struct {
	// Res is the result of executing a command. When a command is applied to
	// the state machine, FSM.Apply is called. The return value of that function
	// is stored here.
	Res interface{}

	// Err will be set if any error occurred.
	Err error

	// Done is the channel that will be signaled when the command concludes.
	Done chan struct{}
	// contains filtered or unexported fields
}
Pending represents a pending request of Raft.
type Raft ¶
type Raft struct {
// contains filtered or unexported fields
}
Raft implements a Raft node in a consensus group.
func NewRaft ¶
NewRaft creates a Raft node. You have to call "Raft.Start/Restart/Join" before you can use it.
func NewTestRaftNode ¶
NewTestRaftNode creates a Raft node that can be used for unit testing the applications that are built on top of Raft, not Raft itself.
- It's the only node in a cluster thus no other replicas are needed.
- It simulates time at a much faster pace so everything happens much faster.
- All of its states are volatile.
func (*Raft) AddNode ¶
AddNode adds a new node to the cluster. A Pending object will be returned so applications can block until the request concludes. Errors:
- ErrNodeNotLeader: If the node is not a leader.
- ErrNotLeaderAnymore: If the node was the leader when it received the request, but stepped down before the reconfiguration was committed. Note that when this error is returned, the reconfiguration may still be committed later or may already have been committed.
- ErrNodeExists: If the node to be added already exists in the current configuration.
- ErrTooManyPendingReqs: If the latest configuration has not been committed yet. Only one pending reconfiguration is allowed at a time; wait for the previous reconfiguration to commit before issuing a new one.
func (*Raft) Propose ¶
Propose proposes a new command to the consensus group. The command will be proposed only if this node is the leader of the group. A Pending object will be returned so applications can block on it until the command concludes. Here "conclude" means that either an error happens or the command is applied to the state machine successfully. There is no timeout associated with a pending object. In the case of a network partition or node failures that prevent the leader from reaching a quorum, the pending object might block indefinitely; it is the application's responsibility to implement a timeout. A timeout on the application side doesn't necessarily mean the command will be discarded by Raft: the command might still be applied to the state machine eventually. Applications whose requests are not idempotent should therefore be careful about retrying requests that failed with "ErrNotLeaderAnymore" or timed out, because retrying might cause a command to be applied to their state machine multiple times. Errors:
- ErrNodeNotLeader: If the node is not a leader.
- ErrNotLeaderAnymore: If the node was the leader when it received the command, but stepped down before the command was committed. Note that when this error is returned, the command may still be applied later or may already have been applied.
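Since a Pending object has no timeout of its own, an application-side timeout can be layered on top of the Done channel with a select. The sketch below uses a local pending struct that mirrors the exported fields of Pending; waitFor, concluded, errWaitTimeout and shortWait are hypothetical names. It only assumes that Done becomes readable when the request concludes, whether by a send or by closing the channel.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pending is a local stand-in mirroring the exported fields of Pending.
type pending struct {
	Res  interface{}
	Err  error
	Done chan struct{}
}

// errWaitTimeout is a hypothetical application-level error.
var errWaitTimeout = errors.New("timed out waiting for pending request")

// shortWait is an arbitrary timeout used by the demo below.
const shortWait = 20 * time.Millisecond

// waitFor blocks until the request concludes or the timeout elapses.
// A timeout does NOT mean the command was discarded: it may still be
// applied later, so non-idempotent requests must not be blindly retried.
func waitFor(p *pending, d time.Duration) (interface{}, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-p.Done:
		return p.Res, p.Err
	case <-timer.C:
		return nil, errWaitTimeout
	}
}

// concluded builds a request that has already finished, for the demo.
func concluded(res interface{}, err error) *pending {
	p := &pending{Res: res, Err: err, Done: make(chan struct{})}
	close(p.Done)
	return p
}

func main() {
	res, err := waitFor(concluded("applied", nil), shortWait)
	fmt.Println(res, err)

	stuck := &pending{Done: make(chan struct{})} // never concludes
	_, err = waitFor(stuck, shortWait)
	fmt.Println(err)
}
```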
func (*Raft) ProposeIfTerm ¶
ProposeIfTerm is similar to 'Propose', the only difference is the command will not get proposed if it can't have a term number which matches the term number specified by users. If term is 0, then terms are not required to match. Errors:
- ErrNodeNotLeader: If the node is not a leader.
- ErrNotLeaderAnymore: If the node was the leader when it received the command, but stepped down before the command was committed. Note that when this error is returned, the command may still be applied later or may already have been applied.
- ErrTermMismatch: If the command can't get the term that's specified by users.
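The term rule described for ProposeIfTerm can be summarized in a few lines. This is a sketch of the rule as documented, not the package's code: checkProposalTerm is a hypothetical helper, errTermMismatch is a local stand-in for ErrTermMismatch, and it assumes a proposed command would take the leader's current term.

```go
package main

import (
	"errors"
	"fmt"
)

// errTermMismatch is a local stand-in for ErrTermMismatch.
var errTermMismatch = errors.New("Term numbers mismatch")

// checkProposalTerm reports whether a command asking for wantTerm may be
// proposed while the leader is in currentTerm. A wantTerm of 0 disables
// the check.
func checkProposalTerm(currentTerm, wantTerm uint64) error {
	if wantTerm != 0 && wantTerm != currentTerm {
		return errTermMismatch
	}
	return nil
}

func main() {
	fmt.Println(checkProposalTerm(5, 5)) // terms match
	fmt.Println(checkProposalTerm(5, 0)) // check disabled
	fmt.Println(checkProposalTerm(6, 5)) // state was read in an older term
}
```

An application doing read-then-modify would pass the term it saw in OnLeadershipChange when it read the state, so a leadership change between the read and the write surfaces as a term mismatch instead of a silent conflict.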
func (*Raft) ProposeInitialMembership ¶
ProposeInitialMembership sets an initial membership for the cluster. Raft must be in a clean state to do this. The intended use is to call this on one node in the cluster, and that one will initialize the others. However, it is safe to propose the same initial membership on all the members at once.
func (*Raft) RemoveNode ¶
RemoveNode removes an existing node from the cluster. A Pending object will be returned so applications can block until the request concludes. Errors:
- ErrNodeNotLeader: If the node is not a leader.
- ErrNotLeaderAnymore: If the node was the leader when it received the request, but stepped down before the reconfiguration was committed. Note that when this error is returned, the reconfiguration may still be committed later or may already have been committed.
- ErrNodeNotExists: If the node to be removed doesn't exist in the current configuration.
- ErrTooManyPendingReqs: If the latest configuration has not been committed yet. Only one pending reconfiguration is allowed at a time; wait for the previous reconfiguration to commit before issuing a new one.
func (*Raft) Start ¶
Start starts a new Raft node. If there is existing state on disk, that will be used to rejoin an existing cluster. Otherwise it will wait for an initial configuration.
func (*Raft) VerifyRead ¶
VerifyRead verifies, at the time this method is called, that (1) the node is the leader and no leader with a higher term exists, and (2) the node's state is at least as up-to-date as that of any previous leader.
Once the two requirements are met, the node can serve read requests from its local state in a linearizable way, because its state is guaranteed to be up-to-date as of the time VerifyRead was called. This guarantees linearizability because, by definition, linearizability means each operation appears to take effect atomically at some point between its invocation and completion, and the read response reflects a state at some point between the call to VerifyRead and the conclusion of the returned Pending object.
A Pending object will be returned and applications can block on it until the command concludes. If the command concludes with no error, applications can serve read requests from local state in a linearizable way. Otherwise linearizability of reading from local state is not guaranteed. Errors:
- ErrNodeNotLeader: If the node is not a leader.
- ErrNotLeaderAnymore: If the node was the leader when it received the request, but stepped down before the request was verified.
type SnapshotFileReader ¶
type SnapshotFileReader interface {
	io.ReadCloser

	// GetMetadata returns the metadata of this snapshot file.
	GetMetadata() SnapshotMetadata
}
SnapshotFileReader represents a reader to an existing snapshot file.
type SnapshotFileWriter ¶
type SnapshotFileWriter interface {
	io.Writer

	// Commit should be called once we have written all state to the file.
	// If everything goes smoothly and "Commit" succeeds, the snapshot file will
	// become "effective" (durable and visible).
	Commit() error

	// Abort aborts a pending snapshot file.
	Abort() error

	// GetMetadata returns the metadata of the snapshot file.
	GetMetadata() SnapshotMetadata
}
SnapshotFileWriter represents a writer to a newly created snapshot file.
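A typical way to drive a writer with Commit and Abort is to commit only after the state was written successfully and to abort otherwise. The sketch below shows that flow against a local interface with the same Write/Commit/Abort methods; snapshotWriter, memWriter, dumpState, writeHello, failingSave and errSaveFailed are hypothetical names, and GetMetadata is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// snapshotWriter mirrors the Write/Commit/Abort part of SnapshotFileWriter.
type snapshotWriter interface {
	io.Writer
	Commit() error
	Abort() error
}

// memWriter is a hypothetical in-memory writer that records its outcome.
type memWriter struct {
	buf       []byte
	committed bool
	aborted   bool
}

func (w *memWriter) Write(p []byte) (int, error) {
	w.buf = append(w.buf, p...)
	return len(p), nil
}

func (w *memWriter) Commit() error { w.committed = true; return nil }
func (w *memWriter) Abort() error  { w.aborted = true; return nil }

// dumpState writes the state through save and then commits. If saving
// fails, the pending snapshot is aborted and the save error is returned.
func dumpState(w snapshotWriter, save func(io.Writer) error) error {
	if err := save(w); err != nil {
		w.Abort() // best effort; the save error is the one reported
		return err
	}
	return w.Commit()
}

var errSaveFailed = errors.New("save failed")

// writeHello and failingSave are demo save callbacks.
func writeHello(out io.Writer) error {
	_, err := io.WriteString(out, "hello")
	return err
}

func failingSave(io.Writer) error { return errSaveFailed }

func main() {
	good := &memWriter{}
	fmt.Println(dumpState(good, writeHello), good.committed, good.aborted)

	bad := &memWriter{}
	fmt.Println(dumpState(bad, failingSave), bad.committed, bad.aborted)
}
```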
type SnapshotManager ¶
type SnapshotManager interface {
	// BeginSnapshot returns a "SnapshotFileWriter" to which applications can dump
	// their state.
	BeginSnapshot(SnapshotMetadata) (SnapshotFileWriter, error)

	// GetSnapshot returns a "SnapshotFileReader" of the latest snapshot file from
	// which applications can restore their state. If the reader is returned as
	// nil it means there's no snapshot file found.
	GetSnapshot() SnapshotFileReader

	// GetSnapshotMetadata returns the metadata of the latest file, and
	// (optionally) a path to a file on disk where the snapshot is stored. If
	// there's no snapshot file found, 'NilSnapshotMetadata' and "" will be
	// returned.
	// The file path is only intended for backups. Other packages shouldn't
	// assume anything about the snapshot file format, and not all storage
	// implementations may provide it.
	GetSnapshotMetadata() (SnapshotMetadata, string)
}
SnapshotManager is the interface of managing snapshot data. SnapshotManager must be implemented in a thread-safe way so that one writer and multiple readers can access 'SnapshotManager' concurrently.
func NewMemSnapshotMgr ¶
func NewMemSnapshotMgr() SnapshotManager
NewMemSnapshotMgr creates an in-memory implementation of the SnapshotManager interface.
type SnapshotMetadata ¶
type SnapshotMetadata struct {
	// The last index of applied command to the snapshot.
	LastIndex uint64

	// The term of last index of applied command to the snapshot.
	LastTerm uint64

	// Membership information in snapshot.
	Membership *Membership
}
SnapshotMetadata represents the metadata of a snapshot.
func (SnapshotMetadata) String ¶
func (m SnapshotMetadata) String() string
String returns a human-readable string of SnapshotMetadata.
type Snapshoter ¶
type Snapshoter interface {
	// Save is the callback that will be called by Raft. When this callback is
	// called applications should write the serialized state to the given
	// "writer". This callback will be called concurrently with "FSM.Apply", so
	// it should be implemented in a way that concurrent mutations made in
	// "FSM.Apply" have no effect on the state that is being dumped.
	//
	// If any error is returned the snapshot file will be discarded.
	Save(writer io.Writer) error

	// Release will be called by Raft once Raft is done (whether it succeeded or
	// failed) with the snapshot. Applications should release any resources that
	// are associated with the snapshot when this callback is called.
	Release()
}
Snapshoter encapsulates a point-in-time state of applications and should be implemented by applications so that Raft can dump the state to an underlying snapshot file.
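One simple way to satisfy the point-in-time requirement is to copy the state when the snapshot is created, so that later mutations cannot leak into Save. The sketch below does this for a map-based state. mapSnapshot, newMapSnapshot and saveToString are hypothetical names, JSON is an arbitrary encoding chosen for the example, and copying the whole map is only reasonable for small states; larger applications may prefer a copy-on-write or persistent data structure.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// mapSnapshot is a hypothetical Snapshoter-style type holding a private
// copy of the application state.
type mapSnapshot struct {
	state map[string]string
}

// newMapSnapshot copies the live state, so mutations made after this call
// do not affect what Save writes.
func newMapSnapshot(live map[string]string) *mapSnapshot {
	cp := make(map[string]string, len(live))
	for k, v := range live {
		cp[k] = v
	}
	return &mapSnapshot{state: cp}
}

// Save serializes the copied state to the writer.
func (s *mapSnapshot) Save(w io.Writer) error {
	return json.NewEncoder(w).Encode(s.state)
}

// Release drops the copy so it can be garbage collected.
func (s *mapSnapshot) Release() {
	s.state = nil
}

// saveToString is a demo helper that runs Save against a buffer.
func saveToString(s *mapSnapshot) (string, error) {
	var buf bytes.Buffer
	err := s.Save(&buf)
	return buf.String(), err
}

func main() {
	live := map[string]string{"a": "1"}
	snap := newMapSnapshot(live)
	live["a"] = "2" // applied after the snapshot was taken

	out, err := saveToString(snap)
	snap.Release()
	fmt.Print(out) // the snapshot still holds a = "1"
	fmt.Println(err, live["a"])
}
```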
type State ¶
type State interface {
	// SaveState persists states "voteFor" and "currentTerm" to disk atomically.
	SaveState(voteFor string, term uint64)

	// GetState returns the persisted states "voteFor" and "currentTerm".
	GetState() (voteFor string, term uint64)

	// GetCurrentTerm returns state "currentTerm".
	GetCurrentTerm() uint64

	// SetCurrentTerm modifies state "currentTerm"; state "voteFor" will not be
	// changed.
	SetCurrentTerm(uint64)

	// GetVoteFor returns state "voteFor".
	GetVoteFor() string

	// SetVoteFor modifies state "voteFor"; state "currentTerm" will not be
	// changed.
	SetVoteFor(string)

	// GetMyGUID gets this node's GUID.
	GetMyGUID() uint64

	// GetGUIDFor gets the known GUID for the given node, or 0 if no messages
	// from that node have been received.
	GetGUIDFor(id string) uint64

	// SetGUIDFor sets the known GUID for the given node.
	SetGUIDFor(id string, guid uint64)

	// FilterGUIDs forgets the GUIDs of nodes that are not in the given set of
	// ids.
	FilterGUIDs(ids []string)
}
State is the interface for storing and retrieving the state of Raft.
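The relationship between the combined and the single-field setters can be shown with a minimal in-memory sketch. It covers only the term and vote methods and is an assumption-laden illustration: 'memState' is a hypothetical type written for this example and is not the value returned by NewMemState.

```go
package main

import (
	"fmt"
	"sync"
)

// memState is a hypothetical in-memory holder for "voteFor" and "currentTerm".
type memState struct {
	mu      sync.Mutex
	voteFor string
	term    uint64
}

// SaveState replaces both fields in one critical section, so readers never
// observe a new term paired with an old vote.
func (s *memState) SaveState(voteFor string, term uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.voteFor, s.term = voteFor, term
}

// GetState returns both fields as one consistent pair.
func (s *memState) GetState() (voteFor string, term uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.voteFor, s.term
}

// SetCurrentTerm changes only the term; the vote is left untouched.
func (s *memState) SetCurrentTerm(term uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.term = term
}

// SetVoteFor changes only the vote; the term is left untouched.
func (s *memState) SetVoteFor(voteFor string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.voteFor = voteFor
}

func main() {
	s := &memState{}
	s.SaveState("node-1", 3)
	s.SetCurrentTerm(4)
	vote, term := s.GetState()
	fmt.Println(vote, term) // prints: node-1 4
}
```

A disk-backed implementation would additionally need to make SaveState durable and atomic, for example by writing both fields in a single record.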
func NewMemState ¶
NewMemState creates an in-memory implementation of the State interface.
type Storage ¶
type Storage struct {
	State
	SnapshotManager
	// contains filtered or unexported fields
}
Storage manages all persistent state (log, Raft state, snapshot).
func NewStorage ¶
func NewStorage(snapshot SnapshotManager, log Log, state State) *Storage
NewStorage creates a storage object.
func (*Storage) IsInCleanState ¶
IsInCleanState returns true if the state on disk is empty.
type Transport ¶
type Transport interface {
	// Addr returns the local address of the transport.
	Addr() string

	// Receive returns a channel for receiving incoming messages
	// from the network.
	Receive() <-chan Msg

	// Send asynchronously sends message 'm' to the node referred
	// to by 'm.GetTo()' through the transport.
	Send(m Msg)

	// Close closes all connections.
	Close() error
}
Transport defines the network transport layer that Raft uses to communicate between peers. For most messages, the transport only needs to encode them, send them over the wire, and decode the messages it receives. The one exception is 'InstallSnapshot': the transport must send not only the message itself but also the snapshot payload stored in 'InstallSnapshot.Body'. When a transport receives an 'InstallSnapshot' message, it must set up 'InstallSnapshot.Body' so that the receiver can access the snapshot payload by reading the body.
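The shape of the interface can be exercised with a minimal in-process sketch. This is not the package's memTransport: 'network', 'loopTransport', 'textMsg' and 'newTransport' are hypothetical names, the Msg interface is reduced to the single GetTo method, and dropping a message when the receiver's queue is full is an assumption made for this example, not documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// Msg is reduced to the one method this sketch needs.
type Msg interface{ GetTo() string }

// textMsg is a hypothetical message type.
type textMsg struct{ to, body string }

func (m textMsg) GetTo() string { return m.to }

// network is a hypothetical in-process registry of transports by address.
type network struct {
	mu    sync.Mutex
	peers map[string]*loopTransport
}

// loopTransport delivers messages through buffered channels.
type loopTransport struct {
	addr string
	in   chan Msg
	net  *network
}

func newNetwork() *network {
	return &network{peers: make(map[string]*loopTransport)}
}

// newTransport registers a transport whose incoming queue holds 'capacity'
// messages.
func (n *network) newTransport(addr string, capacity int) *loopTransport {
	t := &loopTransport{addr: addr, in: make(chan Msg, capacity), net: n}
	n.mu.Lock()
	n.peers[addr] = t
	n.mu.Unlock()
	return t
}

func (t *loopTransport) Addr() string        { return t.addr }
func (t *loopTransport) Receive() <-chan Msg { return t.in }

// Send never blocks: unknown destinations and full queues drop the message.
func (t *loopTransport) Send(m Msg) {
	t.net.mu.Lock()
	peer, ok := t.net.peers[m.GetTo()]
	t.net.mu.Unlock()
	if !ok {
		return
	}
	select {
	case peer.in <- m:
	default: // queue full: drop
	}
}

// Close unregisters the transport so that peers can no longer reach it.
func (t *loopTransport) Close() error {
	t.net.mu.Lock()
	delete(t.net.peers, t.addr)
	t.net.mu.Unlock()
	return nil
}

func main() {
	n := newNetwork()
	a := n.newTransport("a", 8)
	b := n.newTransport("b", 8)
	defer a.Close()
	defer b.Close()

	a.Send(textMsg{to: "b", body: "hello"})
	m := (<-b.Receive()).(textMsg)
	fmt.Println(b.Addr(), "received", m.body)
}
```

Because Raft tolerates lost messages, a non-blocking Send that drops under pressure keeps the sender's event loop responsive; a real transport would also have to stream 'InstallSnapshot.Body' as described above.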
func NewMemTransport ¶
func NewMemTransport(config TransportConfig) Transport
NewMemTransport creates a new memTransport for the address 'config.Addr' whose incoming message channel allows up to 'config.MsgChanCap' pending messages.
func NewMsgDropper ¶
NewMsgDropper creates a new msgDropper. No message is dropped by default.
func NewMsgDuplicator ¶
NewMsgDuplicator creates a msgDuplicator. 'lower' is the wrapped transport, 'limit' is the maximum number of cached messages, and 'p' is the probability of sending a duplicate message when Send is called.
type TransportConfig ¶
type TransportConfig struct {
	// Transport address that the raft instance listens on.
	Addr string
	// Capacity of the incoming message channel that is returned by
	// 'Transport.Receive()'.
	MsgChanCap int
}
TransportConfig stores the parameters for raft transport layer.
type VoteReq ¶
type VoteReq struct {
	// 'term' is provided by BaseMsg.Term
	// 'candidateId' is provided by BaseMsg.From
	BaseMsg

	// The index and term number of the last entry in the candidate's log. A
	// candidate can be elected leader only if its log is sufficiently up to
	// date.
	LastLogIndex uint64
	LastLogTerm  uint64
}
VoteReq is sent by a candidate to campaign for leadership.
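The "sufficiently up to date" condition is usually decided by comparing the last log entries of the candidate and the voter. The sketch below shows the standard Raft rule (the higher last term wins, and on equal terms the longer log wins); 'logIsUpToDate' is a hypothetical helper, and whether this package applies exactly this comparison is an assumption.

```go
package main

import "fmt"

// logIsUpToDate reports whether a candidate whose last entry is
// (candTerm, candIndex) has a log at least as up to date as a voter whose
// last entry is (myTerm, myIndex).
func logIsUpToDate(candTerm, candIndex, myTerm, myIndex uint64) bool {
	if candTerm != myTerm {
		// Different last terms: the later term is more up to date.
		return candTerm > myTerm
	}
	// Same last term: the log that is at least as long wins.
	return candIndex >= myIndex
}

func main() {
	fmt.Println(logIsUpToDate(3, 5, 2, 9)) // true: later term beats longer log
	fmt.Println(logIsUpToDate(2, 4, 2, 9)) // false: same term, shorter log
	fmt.Println(logIsUpToDate(2, 9, 2, 9)) // true: identical last entry
}
```

In terms of the fields above, 'candTerm' and 'candIndex' would correspond to 'LastLogTerm' and 'LastLogIndex' of the received VoteReq.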
Source Files ¶
- commands.go
- config.go
- core.go
- core_candidate.go
- core_follower.go
- core_leader.go
- default_config.go
- entry.go
- errors.go
- fsm.go
- fsm_loop.go
- log.go
- mem_storage.go
- mem_transport.go
- msg_dropper.go
- msg_duplicator.go
- msg_reorder.go
- peer.go
- pending.go
- raft.go
- single.go
- snapshot_loop.go
- storage.go
- transport.go
- util.go
- wal_cache.go