Documentation ¶
Index ¶
- Constants
- Variables
- func BackOff(base time.Duration, power, limit uint64) time.Duration
- func GenerateUUID() string
- func KeyFunc(address ServerAddress, id ServerId) string
- func NewNetTransport()
- func RandomTimeout(timeout time.Duration) (<-chan time.Time, time.Duration)
- func ResolveDataDir(dir string) (string, error)
- func SnapshotName(index, term uint64) string
- type ApplyCommand
- type ChannelTransport
- func (c *ChannelTransport) Channel() chan *RPC
- func (c *ChannelTransport) Connect(address ServerAddress, ct *ChannelTransport)
- func (c *ChannelTransport) Consumer() <-chan *RPC
- func (c *ChannelTransport) Disconnect(address ServerAddress)
- func (c *ChannelTransport) LocalAddr() ServerAddress
- func (c *ChannelTransport) SendAppendEntries(server Server, request *raftpb.AppendEntriesRequest) (*raftpb.AppendEntriesReply, error)
- func (c *ChannelTransport) SendRequestVote(server Server, request *raftpb.RequestVoteRequest) (*raftpb.RequestVoteReply, error)
- type Client
- func (c *Client) AddNonVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface
- func (c *Client) AddVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface
- func (c *Client) Apply(cmd []byte, timeout time.Duration) FutureInterface
- type ClientInterface
- type Config
- type Configuration
- type ConfigurationChangeFuture
- type Future
- type FutureInterface
- type Leader
- type LogInterface
- type MemorySnapshotStore
- type MemoryStateMachine
- type MemoryStorage
- func (m *MemoryStorage) AppendEntries(entries []*raftpb.Entry) error
- func (m *MemoryStorage) DeleteRange(from uint64, to uint64) error
- func (m *MemoryStorage) FirstIndex() (uint64, error)
- func (m *MemoryStorage) Get(key []byte) ([]byte, error)
- func (m *MemoryStorage) GetEntry(index uint64) (*raftpb.Entry, error)
- func (m *MemoryStorage) GetUint64(key []byte) (uint64, error)
- func (m *MemoryStorage) LastIndex() (uint64, error)
- func (m *MemoryStorage) Set(key []byte, val []byte) error
- func (m *MemoryStorage) SetUint64(key []byte, val uint64) error
- type NetTransport
- type Node
- type RPC
- type RPCResponse
- type Raft
- func (r *Raft) ApplyChannel() chan *ApplyCommand
- func (r *Raft) ConfigurationChangeChannel() chan *ConfigurationChangeFuture
- func (r *Raft) Done() chan struct{}
- func (r *Raft) SetState(state State)
- func (r *Raft) Start() error
- func (r *Raft) State() State
- func (r *Raft) Stop()
- func (r *Raft) String() string
- type Replication
- type Server
- type ServerAddress
- type ServerId
- type ServerInterface
- type Snapshot
- type SnapshotStoreInterface
- type State
- type StateMachineInterface
- type StateMachineSnapshot
- type Storage
- func (s *Storage) AppendEntries(entry []*raftpb.Entry) error
- func (s *Storage) DeleteRange(from uint64, to uint64) error
- func (s *Storage) FirstIndex() (uint64, error)
- func (s *Storage) Get(key []byte) ([]byte, error)
- func (s *Storage) GetEntry(index uint64) (*raftpb.Entry, error)
- func (s *Storage) GetUint64(key []byte) (uint64, error)
- func (s *Storage) LastIndex() (uint64, error)
- func (s *Storage) Set(key, value []byte) error
- func (s *Storage) SetUint64(key []byte, value uint64) error
- type StorageInterface
- type Suffrage
- type TransportInterface
Constants ¶
View Source
const ( StateFollower = "Follower" StateCandidate = "Candidate" StateLeader = "Leader" StateUnknown = "Unknown" )
View Source
const ( FailureRetryInterval = 10 * time.Millisecond MaxFailures = 8 )
Variables ¶
View Source
var ( ErrorTimeout = errors.New("timeout") ErrorSopped = errors.New("raft stopped") )
View Source
var ( KeyCurrentTerm = []byte("CurrentTerm") KeyVoteFor = []byte("VoteFor") )
View Source
var ( BucketConfName = []byte("conf") ErrorKeyNotFound = errors.New("key not found") )
Functions ¶
func GenerateUUID ¶
func GenerateUUID() string
func KeyFunc ¶
func KeyFunc(address ServerAddress, id ServerId) string
func NewNetTransport ¶
func NewNetTransport()
func ResolveDataDir ¶
func ResolveDataDir(dir string) (string, error)
func SnapshotName ¶
func SnapshotName(index, term uint64) string
Types ¶
type ApplyCommand ¶
type ApplyCommand struct { *Future // contains filtered or unexported fields }
func NewApplyCommand ¶
func NewApplyCommand(cmd []byte) *ApplyCommand
type ChannelTransport ¶
func NewChannelTransport ¶
func NewChannelTransport(localAddr string, timeout time.Duration) *ChannelTransport
func (*ChannelTransport) Channel ¶
func (c *ChannelTransport) Channel() chan *RPC
func (*ChannelTransport) Connect ¶
func (c *ChannelTransport) Connect(address ServerAddress, ct *ChannelTransport)
func (*ChannelTransport) Consumer ¶
func (c *ChannelTransport) Consumer() <-chan *RPC
func (*ChannelTransport) Disconnect ¶
func (c *ChannelTransport) Disconnect(address ServerAddress)
func (*ChannelTransport) LocalAddr ¶
func (c *ChannelTransport) LocalAddr() ServerAddress
func (*ChannelTransport) SendAppendEntries ¶
func (c *ChannelTransport) SendAppendEntries(server Server, request *raftpb.AppendEntriesRequest) (*raftpb.AppendEntriesReply, error)
func (*ChannelTransport) SendRequestVote ¶
func (c *ChannelTransport) SendRequestVote(server Server, request *raftpb.RequestVoteRequest) (*raftpb.RequestVoteReply, error)
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) AddNonVoter ¶
func (c *Client) AddNonVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface
func (*Client) AddVoter ¶
func (c *Client) AddVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface
type ClientInterface ¶
type ClientInterface interface { Apply(cmd []byte, timeout time.Duration) FutureInterface AddNonVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface AddVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface }
func NewClient ¶
func NewClient(raft ServerInterface) ClientInterface
type Config ¶
type Config struct {
	// A follower receives no communication over a period of time.
	ElectionTimeout time.Duration
	HeartbeatInterval time.Duration
	LocalId ServerId
	SnapshotThreshold uint64
	SnapshotInterval time.Duration
	MaxLogEntriesAfterSnapshot uint64
}
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
type Configuration ¶
type Configuration struct {
Servers []Server
}
type ConfigurationChangeFuture ¶
type ConfigurationChangeFuture struct { *Future // contains filtered or unexported fields }
func NewConfigurationChangeFuture ¶
func NewConfigurationChangeFuture(req *raftpb.ConfigurationChangeRequest) *ConfigurationChangeFuture
type Future ¶
type Future struct {
// contains filtered or unexported fields
}
func NewFailedFuture ¶
func NewSucceedFuture ¶
func NewSucceedFuture(result interface{}) *Future
type FutureInterface ¶
type LogInterface ¶
type MemorySnapshotStore ¶
type MemorySnapshotStore struct {
// contains filtered or unexported fields
}
func NewMemorySnapshotStore ¶
func NewMemorySnapshotStore() *MemorySnapshotStore
func (*MemorySnapshotStore) List ¶
func (m *MemorySnapshotStore) List() ([]*Snapshot, error)
func (*MemorySnapshotStore) Save ¶
func (m *MemorySnapshotStore) Save(lastIndex, lastTerm uint64, configuration Configuration) (*Snapshot, error)
type MemoryStateMachine ¶
type MemoryStateMachine struct { StateMachineSnapshot // contains filtered or unexported fields }
func NewMemoryStateMachine ¶
func NewMemoryStateMachine() *MemoryStateMachine
func (*MemoryStateMachine) Apply ¶
func (s *MemoryStateMachine) Apply(entry *raftpb.Entry)
func (*MemoryStateMachine) Recover ¶
func (s *MemoryStateMachine) Recover(snapshot *Snapshot) error
func (*MemoryStateMachine) Snapshot ¶
func (s *MemoryStateMachine) Snapshot() (*StateMachineSnapshot, error)
type MemoryStorage ¶
func (*MemoryStorage) AppendEntries ¶
func (m *MemoryStorage) AppendEntries(entries []*raftpb.Entry) error
func (*MemoryStorage) DeleteRange ¶
func (m *MemoryStorage) DeleteRange(from uint64, to uint64) error
func (*MemoryStorage) FirstIndex ¶
func (m *MemoryStorage) FirstIndex() (uint64, error)
func (*MemoryStorage) GetEntry ¶
func (m *MemoryStorage) GetEntry(index uint64) (*raftpb.Entry, error)
func (*MemoryStorage) LastIndex ¶
func (m *MemoryStorage) LastIndex() (uint64, error)
type NetTransport ¶
type NetTransport struct { }
type RPC ¶
type RPC struct { Request interface{} ResponseCh chan RPCResponse }
type RPCResponse ¶
type RPCResponse struct { Err error Data interface{} }
type Raft ¶
func NewRaft ¶
func NewRaft(storage StorageInterface, conf Config, transport TransportInterface, sm StateMachineInterface, snapshotStore SnapshotStoreInterface) (*Raft, error)
func (*Raft) ApplyChannel ¶
func (r *Raft) ApplyChannel() chan *ApplyCommand
func (*Raft) ConfigurationChangeChannel ¶
func (r *Raft) ConfigurationChangeChannel() chan *ConfigurationChangeFuture
type Replication ¶
func NewReplication ¶
func NewReplication(server Server, raft *Raft, stepDownCh chan struct{}) *Replication
func (*Replication) Do ¶
func (r *Replication) Do()
func (*Replication) FlushAndStop ¶
func (r *Replication) FlushAndStop(lastIndex uint64)
func (*Replication) SetPrevIndex ¶
func (r *Replication) SetPrevIndex(index uint64)
func (*Replication) String ¶
func (r *Replication) String() string
type Server ¶
type Server struct {
	// Network address
	Address ServerAddress
	// Identifier
	ID ServerId
	Suffrage Suffrage
}
type ServerAddress ¶
type ServerAddress string
type ServerInterface ¶
type ServerInterface interface { ApplyChannel() chan *ApplyCommand ConfigurationChangeChannel() chan *ConfigurationChangeFuture Done() chan struct{} }
type Snapshot ¶
type Snapshot struct { ID string `json:"id"` LastIndex uint64 `json:"lastIndex"` LastTerm uint64 `json:"lastTerm"` Configuration Configuration `json:"configuration"` }
type SnapshotStoreInterface ¶
type StateMachineInterface ¶
type StateMachineInterface interface { Apply(entry *raftpb.Entry) Snapshot() (*StateMachineSnapshot, error) Recover(snapshot *Snapshot) error }
type StateMachineSnapshot ¶
type StateMachineSnapshot struct {
// contains filtered or unexported fields
}
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
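func (*Storage) AppendEntries ¶
func (s *Storage) AppendEntries(entry []*raftpb.Entry) error
func (*Storage) FirstIndex ¶
func (s *Storage) FirstIndex() (uint64, error)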
type StorageInterface ¶
type StorageInterface interface { LogInterface SetUint64(key []byte, val uint64) error GetUint64(key []byte) (uint64, error) Set(key []byte, val []byte) error Get(key []byte) ([]byte, error) }
func NewMemoryStorage ¶
func NewMemoryStorage() StorageInterface
func NewStorage ¶
func NewStorage(path string) (StorageInterface, error)
type TransportInterface ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.