raft

package
v0.0.0-...-fd5963e
Published: Mar 13, 2019 License: MIT Imports: 20 Imported by: 0

Documentation

Constants

const (
	EntryNormal uint8 = iota // Normal command proposed by applications.
	EntryConf                // Membership change command proposed internally by Raft.
	EntryNOP                 // NOP command
)

Type of Entry.

Variables

var (
	// DefaultConfig defines the default configuration for Raft.
	DefaultConfig = Config{
		ClusterID:             "default",
		CandidateTimeout:      100,
		FollowerTimeout:       100,
		LeaderStepdownTimeout: 100,
		RandomElectionRange:   50,
		HeartbeatTimeout:      20,
		SnapshotTimeout:       1000,
		DurationPerTick:       100 * time.Millisecond,
		MaxNumEntsPerAppEnts:  1000,

		LogEntriesAfterSnapshot: 10000,

		SnapshotThreshold:       10000,
		MaximumPendingProposals: 1000,
		MaximumProposalBatch:    100,
	}

	// DefaultTransportConfig includes default values for Raft transport.
	DefaultTransportConfig = TransportConfig{
		MsgChanCap: 1000,
	}
)

Default configuration values. Applications should override the values below when appropriate.
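Most of the timeouts above are expressed in ticks, so their wall-clock meaning depends on DurationPerTick. The sketch below works through that arithmetic for the default values. The tickConfig type and its helpers are hypothetical stand-ins written for this illustration; they are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// tickConfig is a hypothetical stand-in mirroring a few Config fields.
type tickConfig struct {
	FollowerTimeout     uint32
	RandomElectionRange uint32
	HeartbeatTimeout    uint32
	DurationPerTick     time.Duration
}

// defaults copies the corresponding values from DefaultConfig.
func defaults() tickConfig {
	return tickConfig{
		FollowerTimeout:     100,
		RandomElectionRange: 50,
		HeartbeatTimeout:    20,
		DurationPerTick:     100 * time.Millisecond,
	}
}

// ticks converts a number of logical ticks to wall-clock time.
func (c tickConfig) ticks(n uint32) time.Duration {
	return time.Duration(n) * c.DurationPerTick
}

// electionWindow returns the shortest and longest time a follower waits
// without hearing from its leader before starting an election.
func (c tickConfig) electionWindow() (lo, hi time.Duration) {
	return c.ticks(c.FollowerTimeout), c.ticks(c.FollowerTimeout + c.RandomElectionRange)
}

func main() {
	c := defaults()
	lo, hi := c.electionWindow()
	fmt.Println("heartbeat every", c.ticks(c.HeartbeatTimeout))
	fmt.Println("follower election window", lo, "to", hi)
}
```

With the defaults, a heartbeat is due every 2s and a follower starts an election after 10s to 15s of silence, which is why applications with tighter failover requirements would likely want a smaller DurationPerTick or smaller tick counts.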

var (
	// ErrNodeNotLeader will be returned if a command is proposed to a non-leader node.
	ErrNodeNotLeader = errors.New("The node is not leader.")

	// ErrNotLeaderAnymore will be returned once the leader who proposed the
	// command has stepped down before committing the command.
	ErrNotLeaderAnymore = errors.New("Not leader anymore")

	// ErrTooManyPendingReqs will be returned if number of pending requests exceeds
	// a certain threshold.
	ErrTooManyPendingReqs = errors.New("Too many pending requests.")

	// ErrTermMismatch will be returned if a command can't have the term number
	// specified by users.
	ErrTermMismatch = errors.New("Term numbers mismatch")

	// ErrNodeExists will be returned if users request to add a node that is
	// already in the configuration.
	ErrNodeExists = errors.New("The node required to be added already exists")

	// ErrNodeNotExists will be returned if users request to remove a node
	// that doesn't exist in the current configuration.
	ErrNodeNotExists = errors.New("The node required to be removed doesn't exist")

	// ErrAlreadyConfigured will be returned if raft is given an initial
	// configuration when it already has a configuration.
	ErrAlreadyConfigured = errors.New("Raft can't process an initial configuration after it's already initialized")
)
var NilSnapshotMetadata = SnapshotMetadata{0, 0, nil}

NilSnapshotMetadata is a special value that should be returned when there's no snapshot file found.

Functions

This section is empty.

Types

type AppEnts

type AppEnts struct {
	// 'term' is provided by BaseMsg.Term
	// 'leaderId' is provided by BaseMsg.From
	BaseMsg

	// PrevLogIndex and PrevLogTerm will be used as consistency check.
	// Follower will only accept the AppEnts request if an entry with term
	// 'PrevLogTerm' and index 'PrevLogIndex' exists in its log.
	PrevLogIndex uint64
	PrevLogTerm  uint64

	// The entries that need to be appended to follower's log.
	// It can be nil if it's just a pure heartbeat message or a probing message.
	Entries []Entry

	// The index that can be safely committed by follower. Leader is the only one
	// who decides which index of entry can be committed, and leader will use this
	// field to piggyback the commit index to followers so followers can apply
	// committed commands to their state machines.
	LeaderCommit uint64
}

AppEnts will be used by leader to:

  1. Update followers' logs.
  2. Probe for the last point at which a follower's log agrees with its own log.
  3. Send as a heartbeat message to maintain its leadership.
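The consistency check that PrevLogIndex and PrevLogTerm enable can be sketched as follows. This is only an illustration of the rule described in the field comments: the entry type is a reduced stand-in for Entry, matches is a hypothetical helper, and treating index 0 as "before the first entry" is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// entry is a hypothetical, reduced stand-in for Entry.
type entry struct {
	Term, Index uint64
}

// matches reports whether log holds an entry at prevIndex whose term is
// prevTerm. Index 0 is treated as "before the first entry" and always
// matches; that convention is an assumption of this sketch.
func matches(log []entry, prevIndex, prevTerm uint64) bool {
	if prevIndex == 0 {
		return true
	}
	for _, e := range log {
		if e.Index == prevIndex {
			return e.Term == prevTerm
		}
	}
	return false // the follower is missing the entry entirely
}

func main() {
	log := []entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 2, Index: 3}}
	fmt.Println(matches(log, 3, 2)) // same index and term: accept
	fmt.Println(matches(log, 3, 1)) // same index, different term: reject
	fmt.Println(matches(log, 4, 2)) // index not in the log: reject
}
```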

func (*AppEnts) String

func (a *AppEnts) String() string

String converts AppEnts to a human-readable string.

type AppEntsResp

type AppEntsResp struct {
	BaseMsg
	// Whether its corresponding AppEnts request succeeded or not.
	Success bool

	// If its corresponding AppEnts succeeded, 'Index' will be the index of the last
	// matched entry in follower's log identified by that request, so leader can
	// update 'matchIndex' of follower.
	//
	// If its corresponding AppEnts failed, 'Index' will be the index used for
	// the consistency check (AppEnts.PrevLogIndex) so leader can update the
	// follower's 'nextIndex' for the next probe.
	//
	// The 'Index' field is not specified in the paper because the paper phrases
	// the protocol in terms of RPC: a response is paired with its request, so
	// the receiver can always derive 'Index' from the corresponding request.
	// However, we decoupled our transport layer from the Raft implementation,
	// so we need to keep track of this ourselves.
	Index uint64

	// Hint from follower. If 'Hint' is not 0 leader should decrement the
	// 'nextIndex' to 'Hint'. We use 'Hint' to bypass all log entries that
	// a follower is missing, instead of sending one probing message per
	// log entry.
	// Stale response with a stale 'Hint' is fine. In that case a leader might
	// just probe from a stale position. We implement the follower and leader
	// in a way that processing a stale message is still correct.
	Hint uint64
}

AppEntsResp is the response of its corresponding AppEnts request.
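One way a leader could fold such a response into its per-follower bookkeeping is sketched below. The resp and peer types and the handle method are hypothetical and simplified; the package's actual leader logic is not shown in this documentation and may differ. The guards are there so that a stale response can never move 'nextIndex' at or below an already acknowledged 'matchIndex'.

```go
package main

import "fmt"

// resp is a hypothetical, reduced stand-in for AppEntsResp.
type resp struct {
	Success     bool
	Index, Hint uint64
}

// peer holds the leader's view of one follower.
type peer struct {
	matchIndex, nextIndex uint64
}

// handle updates the leader's view of a follower from one response.
func (p *peer) handle(r resp) {
	if r.Success {
		// Index is the last matched entry; never move matchIndex backwards.
		if r.Index > p.matchIndex {
			p.matchIndex = r.Index
		}
	} else if r.Hint != 0 && r.Hint < p.nextIndex {
		// Skip everything the follower says it is missing.
		p.nextIndex = r.Hint
	} else if r.Index != 0 && r.Index < p.nextIndex {
		// No usable hint: probe again from the index that failed the check.
		p.nextIndex = r.Index
	}
	// A stale response must not pull nextIndex below acknowledged entries.
	if p.nextIndex <= p.matchIndex {
		p.nextIndex = p.matchIndex + 1
	}
}

func main() {
	p := &peer{nextIndex: 10}
	p.handle(resp{Success: false, Index: 9, Hint: 4})
	fmt.Println(p.matchIndex, p.nextIndex) // 0 4
	p.handle(resp{Success: true, Index: 6})
	fmt.Println(p.matchIndex, p.nextIndex) // 6 7
	p.handle(resp{Success: false, Index: 9, Hint: 3}) // stale response
	fmt.Println(p.matchIndex, p.nextIndex) // 6 7
}
```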

func (*AppEntsResp) String

func (a *AppEntsResp) String() string

String converts AppEntsResp to a human-readable string.

type BaseMsg

type BaseMsg struct {
	Term     uint64 // The current term number of sender.
	To       string // The ID of receiver.
	From     string // The ID of sender.
	ToGUID   uint64 // The GUID of receiver.
	FromGUID uint64 // The GUID of sender.
	Epoch    uint64 // The epoch ID.
}

BaseMsg contains all common fields of different message types.

func (*BaseMsg) GetEpoch

func (b *BaseMsg) GetEpoch() uint64

GetEpoch gets the epoch id.

func (*BaseMsg) GetFrom

func (b *BaseMsg) GetFrom() string

GetFrom gets the ID of sender.

func (*BaseMsg) GetFromGUID

func (b *BaseMsg) GetFromGUID() uint64

GetFromGUID gets the GUID of sender.

func (*BaseMsg) GetTerm

func (b *BaseMsg) GetTerm() uint64

GetTerm returns the current term of sender.

func (*BaseMsg) GetTo

func (b *BaseMsg) GetTo() string

GetTo returns the ID of receiver.

func (*BaseMsg) GetToGUID

func (b *BaseMsg) GetToGUID() uint64

GetToGUID returns the GUID of receiver.

func (*BaseMsg) SetEpoch

func (b *BaseMsg) SetEpoch(epoch uint64)

SetEpoch sets the epoch id.

func (*BaseMsg) SetFrom

func (b *BaseMsg) SetFrom(from string)

SetFrom sets the ID of sender.

func (*BaseMsg) SetFromGUID

func (b *BaseMsg) SetFromGUID(from uint64)

SetFromGUID sets the GUID of sender.

func (*BaseMsg) SetTerm

func (b *BaseMsg) SetTerm(term uint64)

SetTerm sets the current term of sender.

func (*BaseMsg) SetTo

func (b *BaseMsg) SetTo(to string)

SetTo sets the ID of receiver.

func (*BaseMsg) SetToGUID

func (b *BaseMsg) SetToGUID(to uint64)

SetToGUID sets the GUID of receiver.

func (*BaseMsg) String

func (b *BaseMsg) String() string

String converts BaseMsg to a human-readable string.

type Config

type Config struct {
	// ID of the node. Users must assign a unique ID to each Raft node in a group.
	ID string

	// The ID of the cluster. For now we only use it as a metric label to make it
	// easier for our monitoring system to aggregate and analyze metrics from raft
	// nodes of the same cluster.
	// TODO: use it to guard against misconfiguration of a cluster?
	ClusterID string

	// CandidateTimeout is used with 'RandomElectionRange' by a candidate to start
	// an election of next term, in units of 'tick'. If a candidate can't be elected
	// during time "CandidateTimeout + [0, RandomElectionRange]" it will start a new
	// election for next term.
	CandidateTimeout uint32

	// FollowerTimeout is used with 'RandomElectionRange' by a follower to start an
	// election of next term, in units of 'tick'. If a follower can't hear anything
	// from its leader during time "FollowerTimeout + [0, RandomElectionRange]" it
	// will start a new election of next term.
	FollowerTimeout uint32

	// LeaderStepdownTimeout controls how long a leader can keep its leadership
	// without being able to contact a quorum of nodes, in units of 'tick'. Once
	// a leader has been out of contact with a quorum for this long it will step
	// down. If it's not 0, 'LeaderStepdownTimeout' must be larger than
	// 'HeartbeatTimeout', otherwise the leader might falsely conclude that it
	// has lost its leadership and step down, which makes the whole system
	// unavailable until a new leader is elected. Stepping down is a voluntary
	// decision: do not depend on it to guarantee that only one leader exists in
	// the cluster. It only removes a leader that might already have lost its
	// leadership at some point, so multiple leaders (though only one is
	// effective) might still exist at the same time.
	//
	// If 'LeaderStepdownTimeout' is 0 then the leader will keep its leadership
	// until a higher term is detected.
	//
	// The motivation for 'LeaderStepdownTimeout' is that when a network partition
	// happens, a stale leader may stay in leader state indefinitely even if a
	// newer leader gets elected on the other side. Without realizing that it
	// might already have lost its leadership, the stale leader will keep
	// returning timeout errors to the clients connected to it. If it steps down
	// at some point, clients get more useful feedback (errors about not being
	// leader anymore) and can fail over to the real leader node sooner.
	LeaderStepdownTimeout uint32

	// RandomElectionRange is a range of the randomness we introduced to leader
	// election to avoid the problem of split votes. Before electing itself as
	// leader, nodes need to wait for an extra time in range [0, RandomElectionRange]
	// to avoid split votes.
	RandomElectionRange uint32

	// HeartbeatTimeout is the interval of AppEnts requests, in units of 'tick'.
	// Leader must send at least one AppEnts request per interval to maintain
	// its leadership. HeartbeatTimeout must be smaller than FollowerTimeout,
	// otherwise an elected leader can not maintain its leadership for long.
	HeartbeatTimeout uint32

	// SnapshotTimeout specifies how long it's expected to take to transfer a
	// snapshot AND recover state from it. Leader will try to send messages to
	// followers at some intervals to maintain its leadership or retransmit
	// messages that might have been lost. Because snapshot messages might take
	// a long time to transfer and apply, we use a different timeout to avoid
	// retransmitting snapshot messages unnecessarily. The timeout should be
	// large enough to cover the time spent in transferring and recovery.
	//
	// TODO: It might be hard to guess a good value beforehand, we could adjust the
	// timeout value dynamically using:
	//  - based on size of snapshot
	//  - use exponential back-off
	// Some relevant discussions in ZooKeeper group(https://issues.apache.org/jira/browse/ZOOKEEPER-1977)
	// (I don't think this has been addressed by them)
	SnapshotTimeout uint32

	// Duration per tick. We separated logical time ("tick") from real time
	// (duration per tick) to make our implementation more testable.
	DurationPerTick time.Duration

	// GenSeed is the function that takes a Raft ID as parameter and generates a
	// seed number for random number generator of that node. If users don't
	// specify one, a default seed generator will be used.
	GenSeed func(string) int64

	// MaxNumEntsPerAppEnts specifies the maximum number of entries per AppEnts
	// message. We don't want to saturate disk/network IO when synchronizing a
	// straggler.
	MaxNumEntsPerAppEnts uint32

	// LogEntriesAfterSnapshot specifies the number of entries to leave in the
	// log after a snapshot is taken. Once a snapshot is taken we are OK to
	// trim all log entries which are included in snapshot, but doing that might
	// force a leader to ship its entire snapshot to a follower when a follower
	// is missing anything in the snapshot. By keeping some entries in the log
	// after a snapshot, we can send those entries to followers to synchronize
	// them, instead of having to send an entire snapshot.
	// LogEntriesAfterSnapshot is the maximum number of entries that could be
	// trimmed (because they are included in the snapshot) but are kept in the log.
	LogEntriesAfterSnapshot uint64

	// SnapshotThreshold specifies how many commands must have been applied since
	// the last snapshot before we perform a new snapshot. If it's 0 then no
	// snapshot will be performed. For example, if you set "SnapshotThreshold" to
	// 1000, then after every 1000 commands applied to the state machine a new
	// snapshot will be taken. There's a trade-off in this number: the larger it
	// is, the less frequently we'll take snapshots and the less IO it costs, but
	// recovery might take longer because all log entries that are not included
	// in the snapshot have to be applied.
	SnapshotThreshold uint64

	// MaximumPendingProposals specifies the maximum number of pending proposals
	// allowed. Further requests will be blocked once the size of pending proposals
	// reaches this limit. If it is set to 0, no limit will be enforced.
	MaximumPendingProposals uint32

	// MaximumProposalBatch is the maximum number of proposals the leader will
	// try to process at once from the proposal channel. The larger it is, the
	// higher the throughput you might get, but also the higher the latency per
	// request you might experience.
	MaximumProposalBatch uint32
}

Config stores the configurations for core layer.
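The field comments state two ordering constraints between timeouts: HeartbeatTimeout must be smaller than FollowerTimeout, and LeaderStepdownTimeout, when not 0, must be larger than HeartbeatTimeout. An application could check these before constructing a node, roughly as sketched below. The timeouts type and validate method are hypothetical; whether the package performs such checks itself is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// timeouts is a hypothetical stand-in holding the tick-based Config fields
// that are constrained relative to each other.
type timeouts struct {
	FollowerTimeout       uint32
	HeartbeatTimeout      uint32
	LeaderStepdownTimeout uint32
}

// validate checks the ordering constraints described in the Config comments.
func (t timeouts) validate() error {
	if t.HeartbeatTimeout >= t.FollowerTimeout {
		return errors.New("HeartbeatTimeout must be smaller than FollowerTimeout")
	}
	if t.LeaderStepdownTimeout != 0 && t.LeaderStepdownTimeout <= t.HeartbeatTimeout {
		return errors.New("LeaderStepdownTimeout must be 0 or larger than HeartbeatTimeout")
	}
	return nil
}

func main() {
	ok := timeouts{FollowerTimeout: 100, HeartbeatTimeout: 20, LeaderStepdownTimeout: 100}
	bad := timeouts{FollowerTimeout: 100, HeartbeatTimeout: 20, LeaderStepdownTimeout: 10}
	fmt.Println(ok.validate())
	fmt.Println(bad.validate())
}
```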

type Entry

type Entry struct {
	Term  uint64
	Index uint64
	Cmd   []byte
	Type  uint8
}

Entry represents one command in log.

func (Entry) String

func (e Entry) String() string

String returns human-readable representation of the entry.

type FSM

type FSM interface {
	// Apply will be called once a command is committed in a Raft group. This callback
	// is called from a single goroutine to ensure that the state updates are applied
	// in the same order as they arrived. Applications must apply the command in the
	// callback and can optionally return a result so that whoever proposed the
	// command can get the result from Pending.Res.
	//
	// Note that the behavior of the command should be deterministic, otherwise the
	// applications might end up in an inconsistent state.
	Apply(Entry) (result interface{})

	// OnLeadershipChange will be called to notify applications of a change in
	// the leader status of Raft. 'isLeader' tells the application whether it is
	// in leader state or in non-leader state. 'term' tells the application the
	// current term of the Raft node. 'leader' tells the application the Raft
	// address of the leader node; it will be an empty string if the leader
	// is not known yet, but once the leader is known Raft will call this
	// callback with the address of the leader.
	//
	// The term number can be used in an application-specific way. For example,
	// multiple Raft leaders may exist at the same time, but only one of them
	// can be effective, because internally Raft uses the term number to detect
	// stale leaders. Applications that also want one elected primary to
	// interact with outsiders can attach the term number to their messages so
	// outsiders can use it to detect stale primaries. Another use case is
	// read-then-modify logic, where a modification is based on state that was
	// read earlier: applications can attach the term number at which the state
	// was read to the modify command, and when the command is applied compare
	// it to the term number of the command itself. If they don't match there
	// might be a conflicting modification from another leader. By guaranteeing
	// that the term in which the application read the state and the term of the
	// command that modifies the state machine are the same, there can't be a
	// conflicting change from another leader in between.
	OnLeadershipChange(isLeader bool, term uint64, leader string)

	// OnMembershipChange will be called to notify applications of membership
	// changes over time.
	OnMembershipChange(membership Membership)

	// Snapshot is called when a snapshot needs to be taken to compact the log.
	// Applications should return an implementation of "Snapshoter" whose method
	// "Save" will be called to dump the actual state in a different goroutine.
	// The state that will be dumped in "Snapshoter.Save" MUST be the state at
	// the point the callback "Snapshot" is called.
	//
	// Once the "Snapshot" is returned further mutations might be applied in Apply
	// callback. However, these mutations should not affect the state that is
	// being dumped in "Snapshoter.Save".
	//
	// If any error is returned no snapshot will be taken.
	Snapshot() (Snapshoter, error)

	// SnapshotRestore is called when an application needs to restore its state
	// from a snapshot. Once it's called the application needs to discard its
	// current state and restore its state from the given reader. The index and
	// term number of the last command applied to the snapshot are also passed in.
	SnapshotRestore(reader io.Reader, lastIndex, lastTerm uint64)
}

FSM is the interface that should be implemented by applications. It contains a list of callbacks which will be called by Raft to notify that certain events happen. NOTE all callbacks of FSM will be called from the same goroutine so that you don't need to worry about race conditions in different callbacks.
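The read-then-modify use of term numbers described for OnLeadershipChange can be sketched as an Apply callback that rejects commands whose attached term differs from the term of the entry carrying them. Everything below is illustrative: entry is a reduced stand-in for Entry, and the JSON command format, the kv type, and errStale are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// entry is a hypothetical, reduced stand-in for Entry.
type entry struct {
	Term uint64
	Cmd  []byte
}

// command is an application-defined payload. ReadTerm is the term the
// application observed when it read the state the write is based on.
type command struct {
	ReadTerm uint64
	Key      string
	Value    string
}

var errStale = errors.New("state was read in a different term")

// kv is a tiny state machine. Apply is deterministic: the outcome depends
// only on the entry and the current state.
type kv struct{ data map[string]string }

func (s *kv) Apply(e entry) interface{} {
	var c command
	if err := json.Unmarshal(e.Cmd, &c); err != nil {
		return err
	}
	if c.ReadTerm != e.Term {
		return errStale // another leader may have changed the state in between
	}
	s.data[c.Key] = c.Value
	return nil
}

// encode serializes a command for use as Entry.Cmd.
func encode(readTerm uint64, key, value string) []byte {
	b, _ := json.Marshal(command{ReadTerm: readTerm, Key: key, Value: value})
	return b
}

func main() {
	s := &kv{data: map[string]string{}}
	fmt.Println(s.Apply(entry{Term: 3, Cmd: encode(3, "a", "1")}))
	fmt.Println(s.Apply(entry{Term: 4, Cmd: encode(3, "a", "2")}))
	fmt.Println(s.data["a"])
}
```

Because the rejection is decided from the entry alone, every replica reaches the same verdict, which keeps the state machines consistent.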

type InstallSnapshot

type InstallSnapshot struct {
	// 'term' is provided by BaseMsg.Term
	// 'leaderId' is provided by BaseMsg.From
	BaseMsg

	// The index of last applied command included in this snapshot.
	LastIndex uint64
	// The term of last applied command included in this snapshot.
	LastTerm uint64

	// The latest applied membership change in snapshot.
	Membership Membership

	// The payload of a snapshot.
	//
	// This message must be created with Body pointing to actual payload.
	// Transport is responsible for sending it out on sender side and pointing
	// it to actual payload on receiver side.
	//
	// Body MUST be closed after the message is sent out on sender side or
	// processed on receiver side.
	Body io.ReadCloser
}

InstallSnapshot will be sent by leader to ship its snapshot to followers.

func (*InstallSnapshot) String

func (i *InstallSnapshot) String() string

String converts InstallSnapshot to a human-readable string.

type Log

type Log interface {
	// The index of first entry in log. If there's no entry in log,
	// empty will be returned as true.
	FirstIndex() (index uint64, empty bool)

	// The index of last entry in log. If there's no entry in log,
	// empty will be returned as true.
	LastIndex() (index uint64, empty bool)

	// GetBound returns the first index and last index of a log if it's not empty,
	// otherwise the third return value "empty" will be returned as true.
	GetBound() (firstIndex, lastIndex uint64, empty bool)

	// Append appends multiple entries to log. It's user's responsibility to
	// guarantee the appended entries have contiguous indices with existing
	// entries in log. If any errors occur, process will log an error and die.
	Append(...Entry)

	// Truncate truncates all log entries after (not including) the entry with
	// the given index. If any errors occur, process will log an error and die.
	Truncate(index uint64)

	// Trim deletes all entries up to (and including) the given index.
	// This is used for log compaction. If any errors occur, process will log an
	// error and die.
	Trim(index uint64)

	// Term returns the term number of an entry at the given index. It's caller's
	// responsibility to guarantee an entry with the given index is in the log,
	// otherwise we'll log an error message and die.
	Term(index uint64) uint64

	// Entries returns a slice of entries in range [beg, end). If 'end' exceeds
	// the index of last entry in log then the log should return as many entries
	// as it can. But users need to guarantee the index 'beg' exists in log,
	// otherwise we will panic.
	Entries(beg, end uint64) []Entry

	// GetIterator returns an iterator that can be used to iterate the log,
	// starting at the entry with ID 'first' or the first entry after where
	// it would be if there is no entry with that ID.
	GetIterator(first uint64) LogIterator

	// Close releases any resources used by the log. The log must not
	// be used after calling Close.
	Close()
}

Log defines the interface of the Raft log, and this is the only log interface that the Raft implementation will interact with. It stores a sequence of entries, and the indices of these entries must be contiguous in the log.

For now there isn't much Raft can do when an IO error occurs. IO error handling is currently done inside the log: once an IO error occurs, it simply panics.

TODO(PL-1130): Log should be an actual implementation instead of an interface, and it should be built on top of an on-disk log implementation which is passed to it. Here we define it as an interface so we can have a mock implementation to test Raft first without worrying about on-disk interface definition and implementation.
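The difference between Truncate (drop the tail after an index), Trim (drop the head up to and including an index), and the half-open range of Entries is easy to get wrong, so here is a small slice-backed sketch of those semantics. The sliceLog type is hypothetical and is not the package's NewMemLog implementation. Unlike the interface contract, it returns an empty result instead of panicking when 'beg' is missing.

```go
package main

import "fmt"

// entry is a hypothetical, reduced stand-in for Entry.
type entry struct{ Index uint64 }

// sliceLog keeps entries with contiguous indices in a slice.
type sliceLog struct{ ents []entry }

// countUpTo returns how many leading entries have Index <= index.
func (l *sliceLog) countUpTo(index uint64) int {
	n := 0
	for n < len(l.ents) && l.ents[n].Index <= index {
		n++
	}
	return n
}

// Truncate drops every entry after (not including) index.
func (l *sliceLog) Truncate(index uint64) { l.ents = l.ents[:l.countUpTo(index)] }

// Trim drops every entry up to (and including) index.
func (l *sliceLog) Trim(index uint64) { l.ents = l.ents[l.countUpTo(index):] }

// Entries returns the entries in [beg, end), as many as the log holds.
func (l *sliceLog) Entries(beg, end uint64) []entry {
	var out []entry
	for _, e := range l.ents {
		if e.Index >= beg && e.Index < end {
			out = append(out, e)
		}
	}
	return out
}

// GetBound returns the first and last index, or empty == true.
func (l *sliceLog) GetBound() (first, last uint64, empty bool) {
	if len(l.ents) == 0 {
		return 0, 0, true
	}
	return l.ents[0].Index, l.ents[len(l.ents)-1].Index, false
}

func main() {
	l := &sliceLog{ents: []entry{{1}, {2}, {3}, {4}, {5}}}
	l.Truncate(4) // keeps 1..4
	l.Trim(2)     // keeps 3..4
	fmt.Println(l.GetBound())
	fmt.Println(len(l.Entries(3, 10)))
}
```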

func NewFSLog

func NewFSLog(homeDir string, cacheCapacity uint) Log

NewFSLog creates a durable file-based implementation of Log.

func NewMemLog

func NewMemLog() Log

NewMemLog creates an in-memory mock log.

type LogIterator

type LogIterator interface {
	// Next advances the iterator. It returns true if it was able to
	// advance to the next entry or false if there are no more entries
	// or an error occurred. Use Err() to tell the difference.
	Next() bool

	// Entry returns the current entry. Next must be called before every
	// call to Entry.
	Entry() Entry

	// Err returns any error that occurred during iteration or nil if no
	// error occurred.
	Err() error

	// Close releases any resources associated with the iterator and
	// returns an error, if any occurred during iteration. The iterator
	// must not be used after Close is called.
	Close() error
}

LogIterator is used to iterate over entries in a log.
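The contract above implies the usual loop shape: call Next before each Entry, check Err once the loop ends, and always Close. A sketch of that idiom follows, using a hypothetical slice-backed iterator (sliceIter) and a local copy of the interface so the example is self-contained.

```go
package main

import "fmt"

// entry is a hypothetical, reduced stand-in for Entry.
type entry struct{ Index uint64 }

// iterator mirrors the method set of LogIterator with the local entry type.
type iterator interface {
	Next() bool
	Entry() entry
	Err() error
	Close() error
}

// sliceIter is a hypothetical iterator over an in-memory slice.
type sliceIter struct {
	ents []entry
	pos  int // number of entries consumed so far
}

func (it *sliceIter) Next() bool {
	if it.pos >= len(it.ents) {
		return false
	}
	it.pos++
	return true
}
func (it *sliceIter) Entry() entry { return it.ents[it.pos-1] }
func (it *sliceIter) Err() error   { return nil }
func (it *sliceIter) Close() error { return nil }

// collect drains an iterator and returns the indices it saw. The iterator
// is closed on every path; a Close error is reported only if iteration
// itself succeeded.
func collect(it iterator) (idx []uint64, err error) {
	defer func() {
		if cerr := it.Close(); err == nil {
			err = cerr
		}
	}()
	for it.Next() {
		idx = append(idx, it.Entry().Index)
	}
	return idx, it.Err() // tells "no more entries" apart from a failure
}

func main() {
	idx, err := collect(&sliceIter{ents: []entry{{1}, {2}, {3}}})
	fmt.Println(idx, err)
}
```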

type Membership

type Membership struct {
	// Members is the cluster members.
	Members []string

	// Epoch is an id for this instance of raft. It will never change after the
	// initial configuration. It is used to detect misconfigurations and prevent
	// corruption.
	Epoch uint64

	// Index is the index of the change in log.
	// (When serialized in logs/snapshots, Index is unused.)
	Index uint64

	// Term is the term of the change in log.
	// (When serialized in logs/snapshots, Term is unused.)
	Term uint64
}

Membership represents one proposed cluster membership. It is used in-memory, as well as encoded in the log and snapshots for configuration messages.

func (*Membership) Quorum

func (m *Membership) Quorum() int

Quorum returns the quorum size of the configuration.
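The documentation does not spell out the formula. In Raft a quorum is normally a strict majority of the members, so a plausible reading is the following sketch. The quorum function is hypothetical and the majority rule is an assumption about this package, not something the text above confirms.

```go
package main

import "fmt"

// quorum returns the size of a strict majority of n members.
// Assumption: Membership.Quorum follows the usual Raft majority rule.
func quorum(n int) int {
	return n/2 + 1
}

func main() {
	for _, n := range []int{1, 3, 4, 5} {
		fmt.Printf("%d members -> quorum of %d\n", n, quorum(n))
	}
}
```

Under this rule a 4-node cluster needs 3 votes, so it tolerates one failure just like a 3-node cluster does.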

type Msg

type Msg interface {
	GetTerm() uint64
	SetTerm(uint64)
	GetTo() string
	SetTo(string)
	GetFrom() string
	SetFrom(string)
	GetToGUID() uint64
	SetToGUID(uint64)
	GetFromGUID() uint64
	SetFromGUID(uint64)
	GetEpoch() uint64
	SetEpoch(uint64)
}

Msg defines the interface that all message types should implement.

type Pending

type Pending struct {
	// Res is the result of executing a command. When a command is applied to
	// the state machine, FSM.Apply is called. The return value of that function
	// is stored here.
	Res interface{}

	// Err will be set if any error occurred.
	Err error

	// Done is the channel that will be signaled when the command concludes.
	Done chan struct{}
	// contains filtered or unexported fields
}

Pending represents a pending request of Raft.

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

Raft implements a Raft node in a consensus group.

func NewRaft

func NewRaft(config Config, storage *Storage, transport Transport) *Raft

NewRaft creates a Raft node. You have to call "Raft.Start/Restart/Join" before you can use it.

func NewTestRaftNode

func NewTestRaftNode(ID, clusterID string) *Raft

NewTestRaftNode creates a Raft node that can be used for unit testing the applications that are built on top of Raft, not Raft itself.

  • It's the only node in a cluster thus no other replicas are needed.
  • It simulates time at a much faster pace so everything happens much faster.
  • All of its states are volatile.

func (*Raft) AddNode

func (r *Raft) AddNode(node string) *Pending

AddNode adds a new node to the cluster. A Pending object will be returned so applications can block until the request concludes. Errors:

ErrNodeNotLeader:
  - If the node is not a leader.
ErrNotLeaderAnymore:
  - If the node was the leader when it received the request, but stepped down
    before the reconfiguration gets committed. Please note when this error
    is returned it's still possible that the reconfiguration will be
    committed or has been committed.
ErrNodeExists:
  - If the node to be added already exists in the current configuration.
ErrTooManyPendingReqs:
  - If the latest configuration has not been committed yet. We only allow
    one pending reconfiguration at a time. You have to wait for the commit
    of previous reconfiguration before issuing a new one.

func (*Raft) Propose

func (r *Raft) Propose(cmd []byte) *Pending

Propose proposes a new command to the consensus group. The command will be proposed only if this node is the leader of the group. A Pending object will be returned so applications can block on it until the command concludes. Here "conclude" means that either an error happens or the command is applied to the state machine successfully.

There's no timeout associated with a Pending object. In the case of a network partition or node failures where the leader can not reach a quorum, the Pending object might block indefinitely; it's the application's responsibility to implement a timeout. A timeout on the application side doesn't necessarily mean the command will be discarded by Raft, though: the command might still be applied to the state machine eventually. Applications should therefore be careful about retrying requests that ended with "ErrNotLeaderAnymore" or a timeout if the requests are not idempotent, since a retry might cause a command to be applied to the state machine multiple times. Errors:

ErrNodeNotLeader
  - If the node is not a leader.
ErrNotLeaderAnymore
  - If the node was the leader when it received the command, but stepped down
    before the command gets committed to state. Please note when this error
    is returned it's still possible that the command will be applied or has
    been applied to applications.
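Since a Pending object carries no timeout of its own, an application-side wait could look like the sketch below. The pending type is a stand-in exposing only the exported fields of Pending, and wait and errTimeout are hypothetical names. The example closes Done to signal completion; receiving from the channel works the same way if the real implementation sends a value instead.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pending is a hypothetical stand-in with the exported fields of Pending.
type pending struct {
	Res  interface{}
	Err  error
	Done chan struct{}
}

var errTimeout = errors.New("timed out waiting for command")

// wait blocks until p concludes or d elapses. A timeout does NOT mean the
// command was discarded; it may still be applied later.
func wait(p *pending, d time.Duration) (interface{}, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-p.Done:
		return p.Res, p.Err
	case <-timer.C:
		return nil, errTimeout
	}
}

func main() {
	p := &pending{Done: make(chan struct{})}
	_, err := wait(p, 10*time.Millisecond)
	fmt.Println(err) // nothing concluded yet

	// Simulate Raft concluding the command.
	p.Res = "applied"
	close(p.Done)
	res, err := wait(p, time.Second)
	fmt.Println(res, err)
}
```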

func (*Raft) ProposeIfTerm

func (r *Raft) ProposeIfTerm(cmd []byte, term uint64) *Pending

ProposeIfTerm is similar to 'Propose'. The only difference is that the command will not be proposed if it can't be given a term number that matches the term number specified by the user. If term is 0, then terms are not required to match. Errors:

ErrNodeNotLeader
  - If the node is not a leader.
ErrNotLeaderAnymore
  - If the node was the leader when it received the command, but stepped down
    before the command gets committed to state. Please note when this error
    is returned it's still possible that the command will be applied or has
    been applied to applications.
ErrTermMismatch
  - If the command can't get the term that's specified by users.

func (*Raft) ProposeInitialMembership

func (r *Raft) ProposeInitialMembership(members []string) *Pending

ProposeInitialMembership sets an initial membership for the cluster. Raft must be in a clean state to do this. The intended use is to call this on one node in the cluster, and that one will initialize the others. However, it is safe to propose the same initial membership on all the members at once.

func (*Raft) RemoveNode

func (r *Raft) RemoveNode(node string) *Pending

RemoveNode removes an existing node from the cluster. A Pending object will be returned so applications can block until the request concludes. Errors:

ErrNodeNotLeader:
  - If the node is not a leader.
ErrNotLeaderAnymore:
  - If the node was the leader when it received the request, but stepped down
    before the reconfiguration gets committed. Please note when this error
    is returned it's still possible that the reconfiguration will be
    committed or has been committed.
ErrNodeNotExists:
  - If the node to be removed doesn't exist in the current configuration.
ErrTooManyPendingReqs:
  - If the latest configuration has not been committed yet. We only allow
    one pending reconfiguration at a time. You have to wait for the commit
    of previous reconfiguration before issuing a new one.

func (*Raft) Start

func (r *Raft) Start(fsm FSM)

Start starts a new Raft node. If there is existing state on disk, that will be used to rejoin an existing cluster. Otherwise it will wait for an initial configuration.

func (*Raft) VerifyRead

func (r *Raft) VerifyRead() *Pending

VerifyRead verifies, at the time this method is called, that (1) the node is leader and no leader with a higher term exists, and (2) the node's state is at least as up-to-date as that of any previous leader.

Once the two requirements are met, the node can serve read requests from its local state in a linearizable way, because it can ensure that its state is up-to-date as of the time VerifyRead was called. This guarantees linearizability because, by definition, linearizability means each operation appears to take effect atomically at some point between its invocation and completion, and we can guarantee that the read response reflects a state at some point between the call to VerifyRead and the moment the returned Pending object concludes.

A Pending object will be returned and applications can block on it until the command concludes. If the command concludes with no error then applications can serve read requests from local state in a linearizable way. Otherwise linearizability of reading from local state is not guaranteed. Two types of errors might be returned. Errors:

ErrNodeNotLeader:
  - If the node is not a leader.
ErrNotLeaderAnymore
  - If the node was the leader when it received the request, but stepped down
    before the request has been verified.

type SnapshotFileReader

type SnapshotFileReader interface {
	io.ReadCloser

	// GetMetadata returns the metadata of this snapshot file.
	GetMetadata() SnapshotMetadata
}

SnapshotFileReader represents a reader to an existing snapshot file.

type SnapshotFileWriter

type SnapshotFileWriter interface {
	io.Writer

	// Commit should be called once we have written all state to the file.
	// If everything goes smoothly and "Commit" succeeds, the snapshot file will
	// become "effective"(durable and visible).
	Commit() error

	// Abort aborts a pending snapshot file.
	Abort() error

	// GetMetadata returns the metadata of the snapshot file.
	GetMetadata() SnapshotMetadata
}

SnapshotFileWriter represents a writer to a newly created snapshot file.
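The intended lifecycle appears to be: write all state, then Commit on success or Abort on failure. A sketch of that pattern is shown below with a hypothetical in-memory writer. The snapshotWriter interface repeats only the methods used here, and memWriter, writeSnapshot, saveString, and failingSave are names invented for this example.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// snapshotWriter lists the SnapshotFileWriter methods this sketch relies on.
type snapshotWriter interface {
	io.Writer
	Commit() error
	Abort() error
}

// memWriter is a hypothetical in-memory writer that records its outcome.
type memWriter struct {
	buf       bytes.Buffer
	committed bool
	aborted   bool
}

func (w *memWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }
func (w *memWriter) Commit() error               { w.committed = true; return nil }
func (w *memWriter) Abort() error                { w.aborted = true; return nil }

// writeSnapshot dumps state through save, committing only if every write
// succeeded and aborting otherwise.
func writeSnapshot(w snapshotWriter, save func(io.Writer) error) error {
	if err := save(w); err != nil {
		_ = w.Abort() // best effort; the original error is what matters
		return err
	}
	return w.Commit()
}

// saveString returns a save function that writes s.
func saveString(s string) func(io.Writer) error {
	return func(w io.Writer) error {
		_, err := io.WriteString(w, s)
		return err
	}
}

// failingSave simulates a serialization failure.
func failingSave(io.Writer) error { return errors.New("serialization failed") }

func main() {
	good, bad := &memWriter{}, &memWriter{}
	fmt.Println(writeSnapshot(good, saveString("state")), good.committed)
	fmt.Println(writeSnapshot(bad, failingSave), bad.aborted)
}
```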

type SnapshotManager

type SnapshotManager interface {
	// BeginSnapshot returns a "SnapshotFileWriter" to which applications can dump
	// their state.
	BeginSnapshot(SnapshotMetadata) (SnapshotFileWriter, error)

	// GetSnapshot returns a "SnapshotFileReader" of the latest snapshot file from
	// which applications can restore their state. If reader is returned as nil it
	// means there's no snapshot file found.
	GetSnapshot() SnapshotFileReader

	// GetSnapshotMetadata returns the metadata of the latest file, and
	// (optionally) a path to a file on disk where the snapshot is stored. If
	// there's no snapshot file found 'NilSnapshotMetadata', "" will be
	// returned.
	// The file path is only intended for backups. Other packages shouldn't
	// assume anything about the snapshot file format, and not all storage
	// implementations may provide it.
	GetSnapshotMetadata() (SnapshotMetadata, string)
}

SnapshotManager is the interface for managing snapshot data. SnapshotManager must be implemented in a thread-safe way so that one writer and multiple readers can access 'SnapshotManager' concurrently.

func NewMemSnapshotMgr

func NewMemSnapshotMgr() SnapshotManager

NewMemSnapshotMgr creates an in-memory implementation of the SnapshotManager interface.

type SnapshotMetadata

type SnapshotMetadata struct {
	// The index of the last command applied to the snapshot.
	LastIndex uint64
	// The term of the last command applied to the snapshot.
	LastTerm uint64
	// Membership information in snapshot.
	Membership *Membership
}

SnapshotMetadata represents the metadata of a snapshot.

func (SnapshotMetadata) String

func (m SnapshotMetadata) String() string

String returns a human-readable string of SnapshotMetadata.

type Snapshoter

type Snapshoter interface {
	// Save is the callback that will be called by Raft. When this callback is
	// called, applications should write the serialized state to the given
	// "writer". This callback will be called concurrently with "FSM.Apply", so
	// it should be implemented in a way that concurrent mutations made in
	// "FSM.Apply" have no effect on the state that is being dumped.
	//
	// If any error is returned the snapshot file will be discarded.
	Save(writer io.Writer) error

	// Release will be called by Raft once Raft is done (whether it succeeded
	// or failed) with the snapshot. Applications should release any resources
	// that are associated with the snapshot when this callback is called.
	Release()
}

Snapshoter encapsulates a point-in-time state of applications and should be implemented by applications so that Raft can dump the state to an underlying snapshot file.
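
One way to meet the "no effects from concurrent FSM.Apply" requirement is to copy the state when the snapshot is taken. The sketch below assumes a hypothetical key-value application ('kvStore', 'kvSnapshot', 'set', 'snapshot' and 'savedAfterMutation' are illustrative names, and JSON is an arbitrary serialization choice); only the Save and Release method signatures come from the interface above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"sync"
)

// kvStore is a hypothetical application state machine.
type kvStore struct {
	mu   sync.Mutex
	data map[string]string
}

// set stands in for a mutation made by "FSM.Apply".
func (s *kvStore) set(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[k] = v
}

// kvSnapshot holds a private copy of the state, so later mutations of the
// store cannot change what Save writes.
type kvSnapshot struct {
	data map[string]string
}

// snapshot copies the current state under the lock.
func (s *kvStore) snapshot() *kvSnapshot {
	s.mu.Lock()
	defer s.mu.Unlock()
	cp := make(map[string]string, len(s.data))
	for k, v := range s.data {
		cp[k] = v
	}
	return &kvSnapshot{data: cp}
}

// Save and Release have the same signatures as the Snapshoter methods.
func (p *kvSnapshot) Save(w io.Writer) error { return json.NewEncoder(w).Encode(p.data) }
func (p *kvSnapshot) Release()               { p.data = nil }

// savedAfterMutation takes a snapshot, mutates the store, and returns what
// Save writes for the earlier snapshot.
func savedAfterMutation() string {
	store := &kvStore{data: map[string]string{"a": "1"}}
	snap := store.snapshot()
	store.set("a", "2") // must not leak into the snapshot
	var buf bytes.Buffer
	if err := snap.Save(&buf); err != nil {
		return "error: " + err.Error()
	}
	snap.Release()
	return buf.String()
}

func main() {
	fmt.Print(savedAfterMutation()) // prints {"a":"1"}
}
```

A full copy is the simplest approach but costs memory proportional to the state size; applications with large state may prefer a copy-on-write or persistent data structure.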

type State

type State interface {
	// SaveState persists states 'voteFor' and 'currentTerm' to disk atomically.
	SaveState(voteFor string, term uint64)

	// GetState returns the persisted states "voteFor" and "currentTerm"
	GetState() (voteFor string, term uint64)

	// GetCurrentTerm returns state "currentTerm".
	GetCurrentTerm() uint64

	// SetCurrentTerm modifies state "currentTerm"; state "voteFor" will not be
	// changed.
	SetCurrentTerm(uint64)

	// GetVoteFor returns state "voteFor".
	GetVoteFor() string

	// SetVoteFor modifies state "voteFor"; state "currentTerm" will not be changed.
	SetVoteFor(string)

	// GetMyGUID gets this node's GUID.
	GetMyGUID() uint64

	// GetGUIDFor gets the known GUID for the given node, or an empty string if
	// no messages from that node have been received.
	GetGUIDFor(id string) uint64

	// SetGUIDFor sets the known GUID for the given node.
	SetGUIDFor(id string, guid uint64)

	// FilterGUIDs forgets about GUIDs of nodes that are not in the given set of ids.
	FilterGUIDs(ids []string)
}

State is the interface for storing and retrieving the state of Raft.

func NewMemState

func NewMemState(term uint64) State

NewMemState creates an in-memory implementation of the State interface.

type Storage

type Storage struct {
	State
	SnapshotManager
	// contains filtered or unexported fields
}

Storage manages all persistent state (log, raft state, snapshot).

func NewStorage

func NewStorage(snapshot SnapshotManager, log Log, state State) *Storage

NewStorage creates a storage object.

func (*Storage) IsInCleanState

func (s *Storage) IsInCleanState() bool

IsInCleanState returns true if the state on disk is empty.

type Transport

type Transport interface {
	// Addr returns the local address of the transport.
	Addr() string

	// Receive returns a channel for receiving incoming messages
	// from the network.
	Receive() <-chan Msg

	// Send asynchronously sends message 'm' to the node referred
	// by 'm.GetTo()' through the transport.
	Send(m Msg)

	// Close closes all connections.
	Close() error
}

Transport defines the network transport layer used by raft to communicate between peers. For most messages, the transport only needs to encode each message, send it over the wire, and decode the messages it receives. The only exception is a message of type 'InstallSnapshot': the sending transport needs to send not only the message itself but also the actual payload stored in 'InstallSnapshot.Body', and when a transport receives a message of type 'InstallSnapshot' it needs to set up 'InstallSnapshot.Body' so that the receiver can access the snapshot payload by reading the body.
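
The special handling of 'InstallSnapshot' can be sketched as a header followed by a streamed body. This is only an illustration under stated assumptions: the 'installSnapshot' struct, its 'LastIndex' field, the length-prefixed wire format, and the 'send', 'receive' and 'roundTrip' helpers are all hypothetical and are not this package's wire protocol. Only the idea of a readable 'Body' comes from the description above.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"strings"
)

// installSnapshot is a hypothetical stand-in for the real message type.
type installSnapshot struct {
	LastIndex uint64
	Body      io.Reader // snapshot payload, streamed rather than buffered
}

// send writes a fixed-size header (index and body length) and then streams
// exactly bodyLen bytes of the payload.
func send(w io.Writer, m installSnapshot, bodyLen int64) error {
	if err := binary.Write(w, binary.BigEndian, m.LastIndex); err != nil {
		return err
	}
	if err := binary.Write(w, binary.BigEndian, bodyLen); err != nil {
		return err
	}
	_, err := io.CopyN(w, m.Body, bodyLen)
	return err
}

// receive decodes the header and sets Body to a reader limited to the
// payload, so the receiver can consume the snapshot by reading the body.
func receive(r io.Reader) (installSnapshot, error) {
	var m installSnapshot
	var bodyLen int64
	if err := binary.Read(r, binary.BigEndian, &m.LastIndex); err != nil {
		return m, err
	}
	if err := binary.Read(r, binary.BigEndian, &bodyLen); err != nil {
		return m, err
	}
	m.Body = io.LimitReader(r, bodyLen)
	return m, nil
}

// roundTrip sends a message through an in-memory "wire" and reads it back.
func roundTrip(index uint64, payload string) (uint64, string, error) {
	var wire bytes.Buffer
	out := installSnapshot{LastIndex: index, Body: strings.NewReader(payload)}
	if err := send(&wire, out, int64(len(payload))); err != nil {
		return 0, "", err
	}
	in, err := receive(&wire)
	if err != nil {
		return 0, "", err
	}
	body, err := io.ReadAll(in.Body)
	return in.LastIndex, string(body), err
}

func main() {
	fmt.Println(roundTrip(7, "snapshot-bytes")) // prints 7 snapshot-bytes <nil>
}
```

Limiting the body reader to the announced length keeps the receiver from reading into the next message on the same connection.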

func NewMemTransport

func NewMemTransport(config TransportConfig) Transport

NewMemTransport creates a new memTransport configured by 'config': the transport address is taken from 'config.Addr' and the number of allowable pending messages from 'config.MsgChanCap'.

func NewMsgDropper

func NewMsgDropper(lower Transport, seed int64, defaultProb float32) Transport

NewMsgDropper creates a new msgDropper. No message is dropped by default.
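
The wrapping pattern used by the fault-injecting transports can be sketched as below. This is a simplified illustration, not the package's msgDropper: the 'sender' interface reduces Transport to a single string-based Send, and 'recorder', 'dropper', 'newDropper' and 'delivered' are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sender is a reduced, hypothetical stand-in for the Transport interface.
type sender interface {
	Send(m string)
}

// recorder is a lower transport that just remembers what it was asked to send.
type recorder struct {
	got []string
}

func (r *recorder) Send(m string) { r.got = append(r.got, m) }

// dropper forwards messages to 'lower' and drops each one with probability
// 'prob', using a seeded generator so that runs are reproducible.
type dropper struct {
	lower sender
	rng   *rand.Rand
	prob  float32
}

func newDropper(lower sender, seed int64, prob float32) *dropper {
	return &dropper{lower: lower, rng: rand.New(rand.NewSource(seed)), prob: prob}
}

func (d *dropper) Send(m string) {
	// Float32 returns a value in [0.0, 1.0), so prob 0 never drops and
	// prob 1 always drops.
	if d.rng.Float32() < d.prob {
		return
	}
	d.lower.Send(m)
}

// delivered sends n messages through a dropper and reports how many reached
// the lower transport.
func delivered(prob float32, n int) int {
	rec := &recorder{}
	d := newDropper(rec, 1, prob)
	for i := 0; i < n; i++ {
		d.Send("msg")
	}
	return len(rec.got)
}

func main() {
	fmt.Println(delivered(0, 10), delivered(1, 10)) // prints 10 0
}
```

Because the wrapper satisfies the same interface as the transport it wraps, several such wrappers can be stacked for testing.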

func NewMsgDuplicator

func NewMsgDuplicator(lower Transport, limit int, p float32, seed int64) Transport

NewMsgDuplicator creates a msgDuplicator. 'lower' is the wrapped transport, 'limit' is the maximum number of cached messages, 'p' is the probability to send a duplicated message when Send is called.

func NewMsgReorder

func NewMsgReorder(lower Transport, p float32, limit time.Duration, seed int64) Transport

NewMsgReorder creates a msgReorder. 'lower' is the wrapped transport, 'p' is the probability to delay a message, 'limit' is the maximum amount of time to delay a message.

type TransportConfig

type TransportConfig struct {
	// Transport address that the raft instance listens on.
	Addr string

	// Capacity of the incoming message channel that is returned in
	// 'Transport.Receive()'.
	MsgChanCap int
}

TransportConfig stores the parameters for raft transport layer.

type VoteReq

type VoteReq struct {
	// 'term' is provided by BaseMsg.Term
	// 'candidateId' is provided by BaseMsg.From
	BaseMsg

	// The index and term number of last entry in candidate's log. Candidate can
	// be elected as leader only if it has "sufficient" up-to-date log history.
	LastLogIndex uint64
	LastLogTerm  uint64
}

VoteReq will be sent by a candidate to campaign for leadership.
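
The "sufficiently up-to-date" condition on LastLogIndex and LastLogTerm is commonly checked with the comparison rule from the Raft algorithm: the later term wins, and equal terms fall back to comparing indexes. The sketch below shows that rule in isolation; 'logUpToDate' is a hypothetical helper, and this page does not confirm that the package implements the check in exactly this way.

```go
package main

import "fmt"

// logUpToDate reports whether a candidate's log (candTerm, candIndex) is at
// least as up-to-date as the voter's log (myTerm, myIndex).
func logUpToDate(candTerm, candIndex, myTerm, myIndex uint64) bool {
	if candTerm != myTerm {
		// Logs ending in different terms: the later term is more up-to-date.
		return candTerm > myTerm
	}
	// Same last term: the longer log is more up-to-date.
	return candIndex >= myIndex
}

func main() {
	fmt.Println(logUpToDate(3, 5, 2, 9)) // later term wins: true
	fmt.Println(logUpToDate(2, 4, 2, 9)) // same term, shorter log: false
	fmt.Println(logUpToDate(2, 9, 2, 9)) // identical logs: true
}
```

A voter would typically combine this check with its term and 'voteFor' state before setting 'Granted' in the response.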

func (*VoteReq) String

func (v *VoteReq) String() string

String converts VoteReq to a human-readable string.

type VoteResp

type VoteResp struct {
	BaseMsg
	// Whether its corresponding VoteReq is granted or not.
	Granted bool
}

VoteResp is the response of VoteReq.

func (*VoteResp) String

func (v *VoteResp) String() string

String converts VoteResp to a human-readable string.
