server

package
v0.0.0-...-cff8cb6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2022 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CMRole

type CMRole int

节点角色

const (
	Follower CMRole = iota
	Candidate
	Leader
	Learner
	Dead
)

learener 不参与投票, 也不算在quorum. 只接收 leader 的 append log 请求, 并生成 snapshot learner 将本地 snapshot 通过网络 rpc 传送给 leader 和 follower

func (CMRole) String

func (s CMRole) String() string

type CommandEtnry

type CommandEtnry struct {
	Op   string
	Size int
}

type Config

type Config struct {
	LocalID      string   `toml:"localID"`
	IsLearner    bool     `toml:"isLearner"`
	Peers        []string `toml:"peers"`
	Learner      string   `toml:"learner"`
	WalDir       string   `toml:"walDir"`
	SnapDir      string   `toml:"snapDir"`
	MaxIndexSpan uint64   `toml:"maxIndexSpan"`
}

type LogEntry

type LogEntry struct {
	Command CommandEtnry
	Term    uint64
	Index   uint64
}

type MessageType

type MessageType int
const (
	MsgVote MessageType = iota
	MsgVoteResp
	MsgHeartbeat
	MsgHeartbeatResp
	MsgAppendLog
	MsgAppendLogResp
)

func (MessageType) String

func (mt MessageType) String() string

type Peer

type Peer struct {
	Addr string
	Role CMRole
}

func (Peer) Empty

func (p Peer) Empty() bool

func (Peer) Equal

func (p Peer) Equal(x Peer) bool

type Persistence

type Persistence struct {
	Term     uint64
	Index    uint64
	WorkPath string
	FilePath string
}

func NewPersistence

func NewPersistence(term, index uint64, workPath string) *Persistence

func (*Persistence) Append

func (p *Persistence) Append(logEntry *LogEntry) error

Append 在文件末尾追加写一条raft操作日志

func (*Persistence) Load

func (p *Persistence) Load(filePath string, startIndex uint64) ([]LogEntry, error)

Load 加载磁盘文件到内存

func (*Persistence) SetPath

func (p *Persistence) SetPath()

type RequestAppend

type RequestAppend struct {
	Type            MessageType
	Term            uint64
	LeaderID        Peer
	PreLogIndex     uint64
	PreLogTerm      uint64
	Entries         []LogEntry
	LeaderCommitted uint64
}

复制日志rpc请求

type RequestVote

type RequestVote struct {
	Type MessageType
	// 发起投票请求节点的当前任期号
	Term        uint64
	CandidateID Peer
	// 发起投票节点在日志中的最后任期号
	LastTerm uint64
	// 发起投票节点在日志中的最后编号
	LastIndex uint64
}

投票请求

type ResponseAppend

type ResponseAppend struct {
	Term    uint64
	Success bool
}

复制日志rpc响应

type ResponseVote

type ResponseVote struct {
	// 接收节点所在的任期
	Term uint64
	// true: 赞成; false: 反对
	VoteGranted bool
}

投票响应

type Server

type Server struct {
	// 本节点
	LocalID Peer
	// 集群中其他节点
	Peers []Peer

	// 每个Server既是服务器端, 也是客户端.
	// server接受其他Server的请求
	RpcServer *rpc.Server

	// 状态
	Role CMRole
	// 节点所处的任期
	Term uint64
	// 节点上内存态的命令日志, 待刷入持久化存储
	Logs []LogEntry
	// 投票支持的节点. 空:未投票
	VotedFor Peer
	// 已提交的最新日志编号
	CommittedIndex uint64
	// 已应用到业务逻辑状态机中的最新日志索引
	AppliedIndex uint64
	// 选举超时时间, 开始时间. 每次超时检测完成后, 重置
	ElectionTimeStart time.Time
	// 超时时间间隔, now - ElectionTimeStart > TimeOut, 节点开始发起选举投票
	TimeOut time.Duration
	// 超时时间随机因子
	TimeOutRandomFactor float64
	// 节点状态锁
	MuLock sync.Mutex

	// 持久化存储
	Persist *Persistence
	// 快照处理器
	Snap *Snapshotter

	// 日志中最新apply index 和 日志开头apply index的差值达到
	// MaxIndexSpan, 则可以做快照
	MaxIndexSpan uint64
	// 是否在做快照中
	IsSnaping bool

	// ************* leader 仅有的字段 *******
	// leader记录其他每个Server应该接受的下个日志编号
	NextIndex map[string]uint64
	// contains filtered or unexported fields
}

func InitServer

func InitServer(conf Config) (*Server, error)

func (*Server) Addr

func (nd *Server) Addr() string

func (*Server) AppendEntryHandler

func (s *Server) AppendEntryHandler(req *RequestAppend, resp *ResponseAppend) error

func (*Server) Do

func (s *Server) Do(command CommandEtnry) error

func (*Server) Elect

func (nd *Server) Elect()

func (*Server) MaybeStartSnap

func (s *Server) MaybeStartSnap() bool

func (*Server) Run

func (s *Server) Run()

阻塞运行, 直到节点退出

func (*Server) RunElectionTimer

func (nd *Server) RunElectionTimer()

周期性检查是否发起选举投票

func (*Server) RunHeartbeatTimer

func (nd *Server) RunHeartbeatTimer()

周期性发送心跳

func (*Server) SendHeartbeat

func (s *Server) SendHeartbeat()

发送一次心跳

func (*Server) VoteHandler

func (nd *Server) VoteHandler(req RequestVote, resp *ResponseVote) error

func (*Server) WriteLog

func (s *Server) WriteLog(command CommandEtnry) (LogEntry, error)

leader 向 follower 复制日志

type Snapshot

type Snapshot struct {
	Data     []byte
	Metadata SnapshotMetadata
}

type SnapshotMetadata

type SnapshotMetadata struct {
	Index uint64
	Term  uint64
}

type Snapshotter

type Snapshotter struct {
	StartTerm  uint64
	StartIndex uint64
	EndTerm    uint64
	EndIndex   uint64
	WorkPath   string
	FilePath   string
}

func NewSnap

func NewSnap(term, index uint64, workPath string) *Snapshotter

func (*Snapshotter) GetPath

func (sp *Snapshotter) GetPath() string

func (*Snapshotter) Load

func (sp *Snapshotter) Load(filePath string) (*Snapshot, error)

Load 加载快照文件到快照的内存结构

func (*Snapshotter) Save

func (sp *Snapshotter) Save(snapshot *Snapshot) error

Save 保存业务层的快照数据到磁盘文件

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL