raft

package
v0.0.0-...-668a133 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2019 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StateFollower  = "Follower"
	StateCandidate = "Candidate"
	StateLeader    = "Leader"
	StateUnknown   = "Unknown"
)
View Source
const (
	FailureRetryInterval = 10 * time.Millisecond
	MaxFailures          = 8
)

Variables

View Source
var (
	ErrorTimeout = errors.New("timeout")
	ErrorSopped  = errors.New("raft stopped")
)
View Source
var (
	KeyCurrentTerm = []byte("CurrentTerm")
	KeyVoteFor     = []byte("VoteFor")
)
View Source
var (
	BucketConfName   = []byte("conf")
	ErrorKeyNotFound = errors.New("key not found")
)

Functions

func BackOff

func BackOff(base time.Duration, power, limit uint64) time.Duration

func GenerateUUID

func GenerateUUID() string

func KeyFunc

func KeyFunc(address ServerAddress, id ServerId) string

func NewNetTransport

func NewNetTransport()

func RandomTimeout

func RandomTimeout(timeout time.Duration) (<-chan time.Time, time.Duration)

func ResolveDataDir

func ResolveDataDir(dir string) (string, error)

func SnapshotName

func SnapshotName(index, term uint64) string

Types

type ApplyCommand

type ApplyCommand struct {
	*Future
	// contains filtered or unexported fields
}

func NewApplyCommand

func NewApplyCommand(cmd []byte) *ApplyCommand

type ChannelTransport

type ChannelTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewChannelTransport

func NewChannelTransport(localAddr string, timeout time.Duration) *ChannelTransport

func (*ChannelTransport) Channel

func (c *ChannelTransport) Channel() chan *RPC

func (*ChannelTransport) Connect

func (c *ChannelTransport) Connect(address ServerAddress, ct *ChannelTransport)

func (*ChannelTransport) Consumer

func (c *ChannelTransport) Consumer() <-chan *RPC

func (*ChannelTransport) Disconnect

func (c *ChannelTransport) Disconnect(address ServerAddress)

func (*ChannelTransport) LocalAddr

func (c *ChannelTransport) LocalAddr() ServerAddress

func (*ChannelTransport) SendAppendEntries

func (c *ChannelTransport) SendAppendEntries(server Server, request *raftpb.AppendEntriesRequest) (*raftpb.AppendEntriesReply, error)

func (*ChannelTransport) SendRequestVote

func (c *ChannelTransport) SendRequestVote(server Server, request *raftpb.RequestVoteRequest) (*raftpb.RequestVoteReply, error)

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) AddNonVoter

func (c *Client) AddNonVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface

func (*Client) AddVoter

func (c *Client) AddVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface

func (*Client) Apply

func (c *Client) Apply(cmd []byte, timeout time.Duration) FutureInterface

type ClientInterface

type ClientInterface interface {
	Apply(cmd []byte, timeout time.Duration) FutureInterface
	AddNonVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface
	AddVoter(id ServerId, addr ServerAddress, prevIndex uint64, timeout time.Duration) FutureInterface
}

func NewClient

func NewClient(raft ServerInterface) ClientInterface

type Config

type Config struct {

	// The period of time a follower may go without receiving any communication.
	ElectionTimeout time.Duration

	HeartbeatInterval time.Duration

	LocalId ServerId

	SnapshotThreshold uint64
	SnapshotInterval  time.Duration

	MaxLogEntriesAfterSnapshot uint64
}

func NewDefaultConfig

func NewDefaultConfig() *Config

type Configuration

type Configuration struct {
	Servers []Server
}

type ConfigurationChangeFuture

type ConfigurationChangeFuture struct {
	*Future
	// contains filtered or unexported fields
}

func NewConfigurationChangeFuture

func NewConfigurationChangeFuture(req *raftpb.ConfigurationChangeRequest) *ConfigurationChangeFuture

type Future

type Future struct {
	// contains filtered or unexported fields
}

func NewFailedFuture

func NewFailedFuture(err error) *Future

func NewFuture

func NewFuture() *Future

func NewSucceedFuture

func NewSucceedFuture(result interface{}) *Future

func (*Future) Complete

func (f *Future) Complete(result interface{})

func (*Future) Error

func (f *Future) Error() error

func (*Future) Fail

func (f *Future) Fail(err error)

func (*Future) Failed

func (f *Future) Failed() bool

func (*Future) Result

func (f *Future) Result() interface{}

func (*Future) Succeeded

func (f *Future) Succeeded() bool

type FutureInterface

type FutureInterface interface {
	Error() error
	Result() interface{}
	Fail(err error)
	Complete(result interface{})
	Failed() bool
	Succeeded() bool
}

type Leader

type Leader struct {
	// contains filtered or unexported fields
}

type LogInterface

type LogInterface interface {
	GetEntry(index uint64) (*raftpb.Entry, error)
	DeleteRange(from uint64, to uint64) error
	AppendEntries(entry []*raftpb.Entry) error
	LastIndex() (uint64, error)
	FirstIndex() (uint64, error)
}

type MemorySnapshotStore

type MemorySnapshotStore struct {
	// contains filtered or unexported fields
}

func NewMemorySnapshotStore

func NewMemorySnapshotStore() *MemorySnapshotStore

func (*MemorySnapshotStore) List

func (m *MemorySnapshotStore) List() ([]*Snapshot, error)

func (*MemorySnapshotStore) Open

func (m *MemorySnapshotStore) Open(id string) (*Snapshot, error)

func (*MemorySnapshotStore) Save

func (m *MemorySnapshotStore) Save(lastIndex, lastTerm uint64, configuration Configuration) (*Snapshot, error)

type MemoryStateMachine

type MemoryStateMachine struct {
	StateMachineSnapshot
	// contains filtered or unexported fields
}

func NewMemoryStateMachine

func NewMemoryStateMachine() *MemoryStateMachine

func (*MemoryStateMachine) Apply

func (s *MemoryStateMachine) Apply(entry *raftpb.Entry)

func (*MemoryStateMachine) Recover

func (s *MemoryStateMachine) Recover(snapshot *Snapshot) error

func (*MemoryStateMachine) Snapshot

func (s *MemoryStateMachine) Snapshot() (*StateMachineSnapshot, error)

type MemoryStorage

type MemoryStorage struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*MemoryStorage) AppendEntries

func (m *MemoryStorage) AppendEntries(entries []*raftpb.Entry) error

func (*MemoryStorage) DeleteRange

func (m *MemoryStorage) DeleteRange(from uint64, to uint64) error

func (*MemoryStorage) FirstIndex

func (m *MemoryStorage) FirstIndex() (uint64, error)

func (*MemoryStorage) Get

func (m *MemoryStorage) Get(key []byte) ([]byte, error)

func (*MemoryStorage) GetEntry

func (m *MemoryStorage) GetEntry(index uint64) (*raftpb.Entry, error)

func (*MemoryStorage) GetUint64

func (m *MemoryStorage) GetUint64(key []byte) (uint64, error)

func (*MemoryStorage) LastIndex

func (m *MemoryStorage) LastIndex() (uint64, error)

func (*MemoryStorage) Set

func (m *MemoryStorage) Set(key []byte, val []byte) error

func (*MemoryStorage) SetUint64

func (m *MemoryStorage) SetUint64(key []byte, val uint64) error

type NetTransport

type NetTransport struct {
}

type Node

type Node struct {
	// contains filtered or unexported fields
}

type RPC

type RPC struct {
	Request    interface{}
	ResponseCh chan RPCResponse
}

func (*RPC) Reply

func (r *RPC) Reply(resp interface{}, err error)

type RPCResponse

type RPCResponse struct {
	Err  error
	Data interface{}
}

type Raft

type Raft struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewRaft

func NewRaft(storage StorageInterface, conf Config, transport TransportInterface, sm StateMachineInterface, snapshotStore SnapshotStoreInterface) (*Raft, error)

func (*Raft) ApplyChannel

func (r *Raft) ApplyChannel() chan *ApplyCommand

func (*Raft) ConfigurationChangeChannel

func (r *Raft) ConfigurationChangeChannel() chan *ConfigurationChangeFuture

func (*Raft) Done

func (r *Raft) Done() chan struct{}

func (*Raft) SetState

func (r *Raft) SetState(state State)

func (*Raft) Start

func (r *Raft) Start() error

func (*Raft) State

func (r *Raft) State() State

func (*Raft) Stop

func (r *Raft) Stop()

func (*Raft) String

func (r *Raft) String() string

type Replication

type Replication struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewReplication

func NewReplication(server Server, raft *Raft, stepDownCh chan struct{}) *Replication

func (*Replication) Do

func (r *Replication) Do()

func (*Replication) FlushAndStop

func (r *Replication) FlushAndStop(lastIndex uint64)

func (*Replication) SetPrevIndex

func (r *Replication) SetPrevIndex(index uint64)

func (*Replication) String

func (r *Replication) String() string

type Server

type Server struct {

	// Network address
	Address ServerAddress

	// Identifier
	ID ServerId

	Suffrage Suffrage
}

func (*Server) String

func (s *Server) String() string

type ServerAddress

type ServerAddress string

type ServerId

type ServerId string

type ServerInterface

type ServerInterface interface {
	ApplyChannel() chan *ApplyCommand
	ConfigurationChangeChannel() chan *ConfigurationChangeFuture
	Done() chan struct{}
}

type Snapshot

type Snapshot struct {
	ID            string        `json:"id"`
	LastIndex     uint64        `json:"lastIndex"`
	LastTerm      uint64        `json:"lastTerm"`
	Configuration Configuration `json:"configuration"`
}

type SnapshotStoreInterface

type SnapshotStoreInterface interface {
	Save(lastIndex, lastTerm uint64, configuration Configuration) (*Snapshot, error)

	List() ([]*Snapshot, error)

	Open(id string) (*Snapshot, error)
}

type State

type State string

type StateMachineInterface

type StateMachineInterface interface {
	Apply(entry *raftpb.Entry)
	Snapshot() (*StateMachineSnapshot, error)
	Recover(snapshot *Snapshot) error
}

type StateMachineSnapshot

type StateMachineSnapshot struct {
	// contains filtered or unexported fields
}

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func (*Storage) AppendEntries

func (s *Storage) AppendEntries(entry []*raftpb.Entry) error

func (*Storage) DeleteRange

func (s *Storage) DeleteRange(from uint64, to uint64) error

func (*Storage) FirstIndex

func (s *Storage) FirstIndex() (uint64, error)

func (*Storage) Get

func (s *Storage) Get(key []byte) ([]byte, error)

func (*Storage) GetEntry

func (s *Storage) GetEntry(index uint64) (*raftpb.Entry, error)

func (*Storage) GetUint64

func (s *Storage) GetUint64(key []byte) (uint64, error)

func (*Storage) LastIndex

func (s *Storage) LastIndex() (uint64, error)

func (*Storage) Set

func (s *Storage) Set(key, value []byte) error

func (*Storage) SetUint64

func (s *Storage) SetUint64(key []byte, value uint64) error

type StorageInterface

type StorageInterface interface {
	LogInterface
	SetUint64(key []byte, val uint64) error

	GetUint64(key []byte) (uint64, error)

	Set(key []byte, val []byte) error

	Get(key []byte) ([]byte, error)
}

func NewMemoryStorage

func NewMemoryStorage() StorageInterface

func NewStorage

func NewStorage(path string) (StorageInterface, error)

type Suffrage

type Suffrage string
const (
	SuffrageVoter Suffrage = "voter"
)

type TransportInterface

type TransportInterface interface {
	SendAppendEntries(server Server, request *raftpb.AppendEntriesRequest) (*raftpb.AppendEntriesReply, error)
	SendRequestVote(server Server, request *raftpb.RequestVoteRequest) (*raftpb.RequestVoteReply, error)
	LocalAddr() ServerAddress
	Consumer() <-chan *RPC
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL