raft

package module
v0.0.0-...-aa481f8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2023 License: MIT Imports: 15 Imported by: 1

README

Raft

Godoc Go Report Card

Implements the Raft consensus protocol.

Features

  • Leader election
  • Log replication
  • Membership changes (use joint consensus instead of single-server changes)
  • Log compaction

References

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Lifecycle sentinel errors of the raft consensus module.
// Compare against them with errors.Is rather than by message text.
var (
	// ErrStopped signals that the raft consensus module has been stopped.
	ErrStopped = errors.New("err: raft consensus module has been stopped")
	// ErrRanRepeatedly signals that the raft consensus module was asked to
	// run more than once.
	ErrRanRepeatedly = errors.New("err: raft consensus module can not be run repeatedly")
)
View Source
var (
	// ErrIsNotLeader signals that the raft consensus module is not in the
	// Leader state. Compare against it with errors.Is rather than by
	// message text.
	ErrIsNotLeader = errors.New("err: raft consensus module isn't at Leader state")
)

Functions

This section is empty.

Types

type AppendEntriesArgs

// AppendEntriesArgs carries the arguments of an AppendEntries RPC
// (see RPC.CallAppendEntries and RPCService.AppendEntries).
type AppendEntriesArgs struct {
	// Term is the leader's term.
	Term uint64
	// LeaderId identifies the leader so a follower can redirect clients.
	LeaderId RaftId

	// PrevLogIndex is the index of the log entry immediately preceding the new ones.
	PrevLogIndex uint64
	// PrevLogTerm is the term of the entry at PrevLogIndex.
	PrevLogTerm uint64

	// Entries holds the log entries to store (empty for a heartbeat;
	// more than one may be sent for efficiency).
	Entries []LogEntry

	// LeaderCommit is the leader's commitIndex.
	LeaderCommit uint64
}

AppendEntriesArgs

type AppendEntriesResults

// AppendEntriesResults carries the reply to an AppendEntries RPC
// (see RPC.CallAppendEntries and RPCService.AppendEntries).
type AppendEntriesResults struct {
	// Term is the currentTerm, for the leader to update itself.
	Term uint64
	// Success is true if the follower contained a matching entry.
	// NOTE(review): presumably "matching" means matching the request's
	// PrevLogIndex and PrevLogTerm, as in the Raft paper — confirm.
	Success bool
}

AppendEntriesResults

type Apply

// Apply applies commands to the state machine in order and returns
// appliedCount, the number of Command values that were applied.
type Apply func(commands Commands) (appliedCount int, err error)

Apply 依序应用 commands 到状态机中 返回 应用的 Command 数量 appliedCount

type Command

// Command is a command that the consensus module has to commit and the
// state machine has to process.
type Command []byte

Command 一致性模型需要提交, 状态机需要处理的命令

type Commands

// Commands is a sequence of commands that the state machine has to process
// in order.
type Commands interface {
	// Data returns the command sequence.
	Data() []Command
}

Commands 状态机需依序处理的命令

type Log

// Log is the raft log.
type Log interface {
	// Get returns the term of the log entry at index in the raft log.
	// If there is no such entry, it returns 0, nil.
	Get(index uint64) (term uint64, err error)
	// Match reports whether there is a log entry matching both index and term.
	Match(index, term uint64) (bool, error)
	// Last returns the index and term of the last log entry.
	// If there is none, it returns 0, 0.
	Last() (index, term uint64, err error)
	// RangeGet returns the log entries in the index range (i, j].
	// If there are none, it returns nil, nil.
	RangeGet(i, j uint64) ([]LogEntry, error)
	// AppendAfter appends log entries after afterIndex.
	AppendAfter(afterIndex uint64, entries ...LogEntry) error
	// Append appends log entries.
	Append(entries ...LogEntry) error
	// AppendEntry appends a single log entry and returns its index.
	AppendEntry(entry LogEntry) (index uint64, err error)
}

Log raft log

type LogEntry

// LogEntry is a raft log entry.
//
// Each entry contains a command for the state machine and the term in which
// the entry was received by the leader (the first index is 1).
// NOTE(review): AppendTime is presumably the time the entry was appended;
// who sets it is not visible here — confirm.
type LogEntry struct {
	Index      uint64
	Term       uint64
	Type       LogEntryType
	Command    Command
	AppendTime time.Time
}

LogEntry raft log entry

each entry contains command for state machine,
and term when entry was received by leader (first index is 1)

type LogEntryType

// LogEntryType is the type of the LogEntry.Type field.
// NOTE(review): the defined values are not visible here — confirm against
// the package source.
type LogEntryType uint8

type Logger

// Logger is the logging interface accepted by WithLogger.
type Logger interface {
	// Debug logs a debug message described by format and args
	// (presumably printf-style formatting — confirm with implementations).
	Debug(format string, args ...interface{})
}

type OptFn

// OptFn is an optional raft configuration setting; it receives the *opts
// value to configure and is passed to New through its optFns parameter.
type OptFn func(*opts)

OptFn raft 配置可选项

func WithBootstrapAsLeader

func WithBootstrapAsLeader() OptFn

WithBootstrapAsLeader bootstrap raft consensus module as leader

func WithElection

func WithElection(min, max time.Duration) OptFn

WithElection 提供选举超时范围

func WithLogger

func WithLogger(logger Logger) OptFn

WithLogger

func WithRPC

func WithRPC(rpc RPC) OptFn

WithRPC 提供 rpc 可选项

type RPC

// RPC is the raft rpc client and register: it accepts an RPCService through
// Register and issues the two raft RPCs to peers addressed by RaftAddr.
type RPC interface {
	// Listen, Serve, Register and Close make up the serving side.
	// NOTE(review): required call order and blocking behavior are not
	// visible here — confirm with the implementation.
	Listen(addr string) error
	Serve() error
	Register(RPCService) error
	Close() error

	// CallAppendEntries issues an AppendEntries call with args to addr and
	// returns the results.
	CallAppendEntries(addr RaftAddr, args AppendEntriesArgs) (AppendEntriesResults, error)
	// CallRequestVote issues a RequestVote call with args to addr and
	// returns the results.
	CallRequestVote(addr RaftAddr, args RequestVoteArgs) (RequestVoteResults, error)
}

RPC raft rpc client and register

type RPCService

// RPCService is the raft rpc service; it is the type accepted by RPC.Register.
type RPCService interface {
	// AppendEntries handles an AppendEntries request, writing the reply
	// into results.
	AppendEntries(args AppendEntriesArgs, results *AppendEntriesResults) error
	// RequestVote handles a RequestVote request, writing the reply into
	// results.
	RequestVote(args RequestVoteArgs, results *RequestVoteResults) error
}

RPCService raft rpc service

type Raft

// Raft is the raft consensus module.
type Raft interface {
	// Id returns the id of the raft consensus module.
	Id() RaftId
	// Addr returns the rpc address of the raft consensus module.
	Addr() RaftAddr

	// Run starts the raft consensus module.
	Run() error
	// Stop stops the raft consensus module.
	Stop()
	// Done returns a channel that tells whether the module has stopped
	// (presumably closed on stop — confirm with the implementation).
	Done() <-chan struct{}

	// Handle processes cmd.
	//
	// append log entry --> log replication --> apply to state machine
	Handle(ctx context.Context, cmd ...Command) error
	// IsLeader reports whether this module is the Leader.
	IsLeader() bool

	// ChangeConfig adds the peers in added and removes the peers in removed.
	ChangeConfig(ctx context.Context, added []RaftPeer, removed []RaftId) error
}

Raft raft 一致性模型

func New

func New(id RaftId, addr RaftAddr, apply Apply, store Store, log Log, optFns ...OptFn) (Raft, error)

New 实例化一个 raft 一致性模型

type RaftAddr

// RaftAddr is the rpc communication address of a raft consensus module.
type RaftAddr string

RaftAddr raft 一致性模型 rpc 通信地址

type RaftId

// RaftId is the id of a raft consensus module.
type RaftId string

RaftId raft 一致性模型 id

type RaftPeer

// RaftPeer is a raft peer, described by its id and its rpc address.
type RaftPeer struct {
	Id   RaftId
	Addr RaftAddr
}

RaftPeer raft peer

func (RaftPeer) String

func (p RaftPeer) String() string

type RequestVoteArgs

// RequestVoteArgs carries the arguments of a RequestVote RPC
// (see RPC.CallRequestVote and RPCService.RequestVote).
type RequestVoteArgs struct {
	// Term is the candidate's term.
	Term uint64
	// CandidateId identifies the candidate requesting the vote.
	CandidateId RaftId

	// LastLogIndex is the index of the candidate's last log entry (§5.4).
	LastLogIndex uint64
	// LastLogTerm is the term of the candidate's last log entry (§5.4).
	LastLogTerm uint64
}

RequestVoteArgs

type RequestVoteResults

// RequestVoteResults carries the reply to a RequestVote RPC
// (see RPC.CallRequestVote and RPCService.RequestVote).
type RequestVoteResults struct {
	// Term is the currentTerm, for the candidate to update itself.
	Term uint64
	// VoteGranted is true when the candidate received the vote.
	VoteGranted bool
}

RequestVoteResults

type Store

// Store is used to provide stable storage of key configurations to ensure
// safety.
type Store interface {
	// Set stores val under key.
	Set(key []byte, val []byte) error
	// Get returns the value for key, or an empty byte slice if key was not found.
	Get(key []byte) ([]byte, error)

	// SetUint64 stores the uint64 val under key.
	SetUint64(key []byte, val uint64) error
	// GetUint64 returns the uint64 value for key, or 0 if key was not found.
	GetUint64(key []byte) (uint64, error)
}

Store is used to provide stable storage of key configurations to ensure safety.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL