papillon

package module
v0.0.0-...-356cffa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2023 License: MIT Imports: 34 Imported by: 1

README

papillon

raft 协议实现

线程模型

  • 主线程:
    1. 跟随者、候选人、领导者三个状态的切换以及命令的接收
    2. 需要和快照线程、状态机线程交互
    • 复制线程:
      1. 由领导者创建,每个跟随者对应一个线程,执行具体的复制逻辑
  • 状态机线程:
    1. 日志提交后由主线程触发提交给状态机线程并应用到状态机中
    2. 由主线程引导应用快照到状态机中
    3. 由快照线程触发生成快照
  • 快照线程:
    1. 负责定时生成快照
    2. 由外部触发获取一个当前快照

Documentation

Index

Constants

View Source
const (
	RpcVoteRequest rpcType = iota + 1
	RpcAppendEntry
	RpcAppendEntryPipeline
	RpcInstallSnapshot
	RpcFastTimeout
)
View Source
const (

	// DefaultTimeoutScale is the default TimeoutScale in a NetTransport.
	DefaultTimeoutScale = 256 * 1024 // 256KB
)

Variables

View Source
var (
	ErrNotExist                        = errors.New("not exist")
	ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
	ErrNotFound                        = customError{"not found"}
	ErrNotLeader                       = errors.New("not leader")
	ErrCantBootstrap                   = errors.New("bootstrap only works on new clusters")
	ErrIllegalConfiguration            = errors.New("illegal clusterInfo")
	ErrShutDown                        = errors.New("shut down")
	ErrNotStarted                      = errors.New("not started")
	ErrLeadershipTransferInProgress    = errors.New("leader ship transfer in progress")
	// ErrAbortedByRestore is returned when a leader fails to commit a log
	// entry because it's been superseded by a user snapshot restore.
	ErrAbortedByRestore       = errors.New("snapshot restored while committing log")
	ErrEnqueueTimeout         = errors.New("timed out enqueuing operation")
	ErrTimeout                = errors.New("time out")
	ErrPipelineShutdown       = errors.New("append pipeline closed")
	ErrNotVoter               = errors.New("not voter")
	ErrLeadershipTransferFail = errors.New("not found transfer peer")
	ErrLeadershipLost         = errors.New("leadership lost")
	ErrNothingNewToSnapshot   = errors.New("nothing new to snapshot")
	ErrEmptyCommit            = errors.New("empty commit")
	ErrPrevLogNotMatch        = errors.New("prev log term not match")
)
View Source
var (
	FutureErrTimeout   = errors.New("time out")
	FutureErrNotLeader = errors.New("not leader")
)
View Source
var (
	ErrKeyIsNil   = customError{"key is nil"}
	ErrValueIsNil = customError{"value is nil"}
	ErrRange      = customError{"from must no bigger than to"}
)
View Source
var (
	KeyCurrentTerm  = []byte("CurrentTerm")
	KeyLastVoteFor  = []byte("LastVoteFor")
	KeyLastVoteTerm = []byte("LastVoteTerm")
)

Functions

func EncodeCluster

func EncodeCluster(c ClusterInfo) (data []byte)

func NewMemFSM

func NewMemFSM() *memFSM

func NewMemRpc

func NewMemRpc(localAddr string) *memRPC

func NewMemSnapShot

func NewMemSnapShot() *memSnapshot

func ValidateConfig

func ValidateConfig(c *Config) (bool, string)

Types

type AppendEntriesFuture

type AppendEntriesFuture interface {
	Future[*AppendEntryResponse]
	StartAt() time.Time
	Request() *AppendEntryRequest
}

type AppendEntryPipeline

type AppendEntryPipeline interface {
	AppendEntries(*AppendEntryRequest) (AppendEntriesFuture, error)
	Consumer() <-chan AppendEntriesFuture
	Close() error
}

type AppendEntryRequest

type AppendEntryRequest struct {
	*RPCHeader
	Term         uint64
	PrevLogIndex uint64
	PrevLogTerm  uint64
	Entries      []*LogEntry
	LeaderCommit uint64
}

AppendEntryRequest 追加日志

type AppendEntryResponse

type AppendEntryResponse struct {
	*RPCHeader
	Term        uint64
	Success     bool
	LatestIndex uint64 // peer 当前保存最新的日志 index ,用于新节点快速定位 nextIndex
}

type ApplyFuture

type ApplyFuture interface {
	IndexFuture
	Future[nilRespFuture]
}

type BatchFSM

type BatchFSM interface {
	FSM
	BatchApply([]*LogEntry) []interface{}
}

type CacheLog

type CacheLog struct {
	// contains filtered or unexported fields
}

CacheLog 带缓存的 LogStore 用于减少磁盘 IO,只在执行 SetLogs, DeleteRange 时更新 cache 以保证局部性

func (*CacheLog) DeleteRange

func (c *CacheLog) DeleteRange(from, to uint64) error

func (*CacheLog) FirstIndex

func (c *CacheLog) FirstIndex() (uint64, error)

func (*CacheLog) GetLog

func (c *CacheLog) GetLog(index uint64) (log *LogEntry, err error)

func (*CacheLog) GetLogRange

func (c *CacheLog) GetLogRange(from, to uint64) (logs []*LogEntry, err error)

func (*CacheLog) LastIndex

func (c *CacheLog) LastIndex() (uint64, error)

func (*CacheLog) SetLogs

func (c *CacheLog) SetLogs(logs []*LogEntry) error

type ClusterInfo

type ClusterInfo struct {
	Servers []ServerInfo
}

func DecodeCluster

func DecodeCluster(data []byte) (c ClusterInfo)

func (*ClusterInfo) Clone

func (c *ClusterInfo) Clone() (copy ClusterInfo)

type Config

type Config struct {
	ElectionTimeout         time.Duration
	HeartbeatTimeout        time.Duration
	LeaderLeaseTimeout      time.Duration
	ApplyBatch              bool
	MaxAppendEntries        uint64
	CommitTimeout           time.Duration
	SnapshotInterval        time.Duration
	SnapshotThreshold       uint64
	TrailingLogs            uint64
	Logger                  Logger
	LocalID                 string
	LeadershipCatchUpRounds uint
	LeadershipLostShutDown  bool
	Debug                   bool // 开启后使用 expvar 包导出 raft 状态信息,可以使用 /debug/vars 路由进行访问
}

func DefaultConfig

func DefaultConfig() *Config

type ConfigurationStorage

type ConfigurationStorage interface {
	KVStorage
	SetConfiguration(index uint64, configuration ClusterInfo) error
}

type DataBus

type DataBus struct {
	// contains filtered or unexported fields
}

DataBus 提供发布、订阅功能

func (*DataBus) AddObserver

func (d *DataBus) AddObserver(obs observer)

AddObserver 添加订阅者

func (*DataBus) Publish

func (d *DataBus) Publish(event int, param interface{})

Publish 触发事件

type DefaultPackageParser

type DefaultPackageParser struct{}

func (*DefaultPackageParser) Decode

func (d *DefaultPackageParser) Decode(reader *bufio.Reader) (rpcType, []byte, error)

func (*DefaultPackageParser) Encode

func (d *DefaultPackageParser) Encode(writer *bufio.Writer, rpcType rpcType, data []byte) (err error)

type FSM

type FSM interface {
	Apply(*LogEntry) interface{}
	ReStore(reader io.ReadCloser) error // 从快照恢复,需要自行实现幂等
	Snapshot() (FsmSnapshot, error)
}

type FastTimeoutRequest

type FastTimeoutRequest struct {
	*RPCHeader
	Term               uint64
	LeaderShipTransfer bool
}

FastTimeoutRequest 引导 leader 直接超时

type FastTimeoutResponse

type FastTimeoutResponse struct {
	*RPCHeader
	Success bool
}

type FileSnapshot

type FileSnapshot struct {
	// contains filtered or unexported fields
}

func NewFileSnapshot

func NewFileSnapshot(dirPath string, noSync bool, retainCount int) (*FileSnapshot, error)

func (*FileSnapshot) Create

func (f *FileSnapshot) Create(version SnapshotVersion, index, term uint64, configuration ClusterInfo, configurationIndex uint64, rpc RpcInterface) (SnapshotSink, error)

func (*FileSnapshot) List

func (f *FileSnapshot) List() (list []*SnapshotMeta, err error)

func (*FileSnapshot) Open

type FileSnapshotSink

type FileSnapshotSink struct {
	// contains filtered or unexported fields
}

func (*FileSnapshotSink) Cancel

func (f *FileSnapshotSink) Cancel() error

func (*FileSnapshotSink) Close

func (f *FileSnapshotSink) Close() error

func (*FileSnapshotSink) ID

func (f *FileSnapshotSink) ID() string

func (*FileSnapshotSink) Write

func (f *FileSnapshotSink) Write(p []byte) (n int, err error)

type FileWithSync

type FileWithSync interface {
	fs.File
	Sync() error
}

type FsmSnapshot

type FsmSnapshot interface {
	Persist(sink SnapshotSink) error
	Release()
}

type Future

type Future[T any] interface {
	Response() (T, error)
}

Future 用于异步提交,Response 会同步返回,可以重复调用

type IndexFuture

type IndexFuture interface {
	Index() uint64
	// contains filtered or unexported methods
}

type InstallSnapshotRequest

type InstallSnapshotRequest struct {
	*RPCHeader
	SnapshotMeta *SnapshotMeta
	Term         uint64
}

InstallSnapshotRequest 安装快照

type InstallSnapshotResponse

type InstallSnapshotResponse struct {
	*RPCHeader
	Term    uint64
	Success bool
}

type JsonRpcHandler

type JsonRpcHandler struct{}

JsonRpcHandler 提供 json 的序列化能力

func (*JsonRpcHandler) Deserialization

func (j *JsonRpcHandler) Deserialization(data []byte, i interface{}) error

func (*JsonRpcHandler) Serialization

func (j *JsonRpcHandler) Serialization(i interface{}) (bytes []byte, err error)

type KVStorage

type KVStorage interface {
	// Get 用于读取已存储的日志
	Get(key []byte) (val []byte, err error)
	// Set 用于存储日志
	Set(key, val []byte) error

	// SetUint64 用于存储任期
	SetUint64(key []byte, val uint64) error
	// GetUint64 用于返回任期
	GetUint64(key []byte) (uint64, error)
}

KVStorage 提供稳定存储的抽象

type KvSchema

type KvSchema [2]string

func (KvSchema) Encode

func (s KvSchema) Encode(k, v string) []byte

type LogEntry

type LogEntry struct {
	Index     uint64    // 日志的日志索引
	Term      uint64    // 创建日志时的任期
	Data      []byte    // 日志内容
	Type      LogType   // 日志类型
	CreatedAt time.Time // 创建时间
}

LogEntry 日志条目,必传 Data、Type 字段,Index、Term、CreatedAt 字段会在 applyLog 方法中默认设置

type LogFuture

type LogFuture struct {
	// contains filtered or unexported fields
}

func (*LogFuture) Index

func (l *LogFuture) Index() uint64

func (*LogFuture) Response

func (d *LogFuture) Response() (T, error)

type LogStore

type LogStore interface {
	// FirstIndex 返回第一个写入的索引,-1 代表没有
	FirstIndex() (uint64, error)
	// LastIndex 返回最后一个写入的索引,-1 代表没有
	LastIndex() (uint64, error)
	// GetLog 返回指定索引位置的日志
	GetLog(index uint64) (log *LogEntry, err error)
	// GetLogRange 按指定范围遍历索引,闭区间
	GetLogRange(from, to uint64) (log []*LogEntry, err error)
	// SetLogs 追加日志
	SetLogs(logs []*LogEntry) error
	// DeleteRange 批量删除指定范围的索引内容,用于快照生成
	DeleteRange(from, to uint64) error
}

LogStore 提供日志操作的抽象

func NewCacheLog

func NewCacheLog(store LogStore, capacity uint64) LogStore

NewCacheLog capacity 必须大于 0

type LogType

type LogType uint8 // 日志类型
const (
	LogCommand LogType = iota + 1 // 用户命令
	LogBarrier                    // 用于确认 index 已经被应用到状态机,[LogFuture] 会被状态机线程确认
	LogNoop                       // 空操作,用于提交一个空命令帮助 raft 确认之前的索引都已经提交,[LogFuture] 不会被状态机线程确认
	LogCluster                    // 用于存储集群配置
)

type Logger

type Logger interface {
	Infof(format string, v ...any)
	Info(v ...any)
	Errorf(format string, v ...any)
	Error(v ...any)
	Warnf(format string, v ...any)
	Warn(v ...any)
	Debugf(format string, v ...any)
	Debug(v ...any)
}

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStore

func NewMemoryStore() *MemoryStore

func (*MemoryStore) DeleteRange

func (m *MemoryStore) DeleteRange(min, max uint64) error

func (*MemoryStore) FirstIndex

func (m *MemoryStore) FirstIndex() (uint64, error)

func (*MemoryStore) Get

func (m *MemoryStore) Get(key []byte) (val []byte, err error)

func (*MemoryStore) GetLog

func (m *MemoryStore) GetLog(index uint64) (log *LogEntry, err error)

func (*MemoryStore) GetLogRange

func (m *MemoryStore) GetLogRange(from, to uint64) (logs []*LogEntry, err error)

func (*MemoryStore) GetUint64

func (m *MemoryStore) GetUint64(key []byte) (uint64, error)

func (*MemoryStore) LastIndex

func (m *MemoryStore) LastIndex() (uint64, error)

func (*MemoryStore) Set

func (m *MemoryStore) Set(key []byte, val []byte) (err error)

func (*MemoryStore) SetLogs

func (m *MemoryStore) SetLogs(logs []*LogEntry) (err error)

func (*MemoryStore) SetUint64

func (m *MemoryStore) SetUint64(key []byte, val uint64) (err error)

type NetLayer

type NetLayer interface {
	net.Listener
	// Dial is used to create a new outgoing connection
	Dial(peer ServerAddr, timeout time.Duration) (net.Conn, error)
}

NetLayer 网络层抽象

func NewTcpLayer

func NewTcpLayer(l net.Listener, advertise net.Addr) NetLayer

type NetTransport

type NetTransport struct {
	TimeoutScale int64
	// contains filtered or unexported fields
}

func NewNetTransport

func NewNetTransport(conf *NetWorkTransportConfig) *NetTransport

func NewTcpTransport

func NewTcpTransport(bindAddr string, maxPool int, timeout time.Duration) (*NetTransport, error)

func (*NetTransport) AppendEntries

func (n *NetTransport) AppendEntries(info *ServerInfo, request *AppendEntryRequest) (*AppendEntryResponse, error)

func (*NetTransport) AppendEntryPipeline

func (n *NetTransport) AppendEntryPipeline(info *ServerInfo) (AppendEntryPipeline, error)

func (*NetTransport) Close

func (n *NetTransport) Close() error

func (*NetTransport) CloseConnections

func (n *NetTransport) CloseConnections()

func (*NetTransport) Consumer

func (n *NetTransport) Consumer() <-chan *RPC

func (*NetTransport) DecodeAddr

func (n *NetTransport) DecodeAddr(bytes []byte) ServerAddr

func (*NetTransport) EncodeAddr

func (n *NetTransport) EncodeAddr(info *ServerInfo) []byte

func (*NetTransport) FastTimeout

func (n *NetTransport) FastTimeout(info *ServerInfo, req *FastTimeoutRequest) (*FastTimeoutResponse, error)

func (*NetTransport) InstallSnapShot

func (n *NetTransport) InstallSnapShot(info *ServerInfo, request *InstallSnapshotRequest, r io.Reader) (*InstallSnapshotResponse, error)

func (*NetTransport) LocalAddr

func (n *NetTransport) LocalAddr() ServerAddr

func (*NetTransport) SetHeartbeatFastPath

func (n *NetTransport) SetHeartbeatFastPath(cb fastPath)

func (*NetTransport) Start

func (n *NetTransport) Start()

func (*NetTransport) VoteRequest

func (n *NetTransport) VoteRequest(info *ServerInfo, request *VoteRequest) (*VoteResponse, error)

type NetWorkTransportConfig

type NetWorkTransportConfig struct {
	ServerAddressProvider ServerAddrProvider

	Logger Logger

	NetLayer NetLayer

	MaxPool int

	Timeout time.Duration
}

type OpenSnapshot

type OpenSnapshot = func() (*SnapshotMeta, io.ReadCloser, error)

OpenSnapshot 用于 API 请求执行完快照后,在需要的时候延迟打开快照

type PackageParser

type PackageParser interface {
	Encode(writer *bufio.Writer, rpcType rpcType, data []byte) (err error)
	Decode(reader *bufio.Reader) (rpcType, []byte, error)
}

type Processor

type Processor interface {
	Do(rpcType, interface{}, io.Reader) (interface{}, error)
	SetFastPath(cb fastPath)
}

type ProcessorProxy

type ProcessorProxy struct {
	Processor
}

ProcessorProxy 服务器接口 handler 代理,提供将序列化数据,解析成接口 struct 指针的功能

func (*ProcessorProxy) Do

func (p *ProcessorProxy) Do(rpcType rpcType, reqBytes interface{}, reader io.Reader) (respBytes interface{}, err error)

func (*ProcessorProxy) SetFastPath

func (d *ProcessorProxy) SetFastPath(cb fastPath)

type RPC

type RPC struct {
	RpcType  rpcType
	Request  any
	Response chan any
	Reader   io.Reader // 连接的读接口,安装快照的时候用
}

RPC rpc 请求的封装

type RPCHeader

type RPCHeader struct {
	ID   ServerID
	Addr ServerAddr
}

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

func NewRaft

func NewRaft(conf *Config,
	fsm FSM,
	rpc RpcInterface,
	logStore LogStore,
	kvStore KVStorage,
	snapshotStore SnapshotStore) (*Raft, error)

func (*Raft) AddServer

func (r *Raft) AddServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture

func (*Raft) Apply

func (r *Raft) Apply(data []byte, timeout time.Duration) ApplyFuture

Apply 向 raft 提交日志

func (*Raft) Barrier

func (r *Raft) Barrier(readIndex uint64, timeout time.Duration) Future[uint64]

func (*Raft) BootstrapCluster

func (r *Raft) BootstrapCluster(configuration ClusterInfo) defaultFuture

func (*Raft) Conf

func (r *Raft) Conf() *Config

func (*Raft) GetConfiguration

func (r *Raft) GetConfiguration() ClusterInfo

GetConfiguration 获取集群配置

func (*Raft) GetState

func (s *Raft) GetState() State

func (*Raft) LastApplied

func (r *Raft) LastApplied() uint64

func (*Raft) LastContact

func (r *Raft) LastContact() time.Time

func (*Raft) LatestIndex

func (r *Raft) LatestIndex() uint64

func (*Raft) LeaderInfo

func (r *Raft) LeaderInfo() (ServerID, ServerAddr)

func (*Raft) LeaderTransfer

func (r *Raft) LeaderTransfer(id ServerID, address ServerAddr, timeout time.Duration) defaultFuture

func (*Raft) RaftStats

func (r *Raft) RaftStats() Future[map[string]interface{}]

func (*Raft) ReStoreSnapshot

func (r *Raft) ReStoreSnapshot(meta *SnapshotMeta, reader io.ReadCloser) error

func (*Raft) ReadIndex

func (r *Raft) ReadIndex(timeout time.Duration) Future[uint64]

func (*Raft) ReloadConfig

func (r *Raft) ReloadConfig(rc ReloadableConfig) error

func (*Raft) RemoveServer

func (r *Raft) RemoveServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture

func (*Raft) ShutDown

func (r *Raft) ShutDown() defaultFuture

func (*Raft) SnapShot

func (r *Raft) SnapShot() Future[OpenSnapshot]

func (*Raft) StateCh

func (r *Raft) StateCh() <-chan *StateChange

StateCh 状态变化的通知

func (*Raft) UpdateServer

func (r *Raft) UpdateServer(peer ServerInfo, prevIndex uint64, timeout time.Duration) IndexFuture

func (*Raft) VerifyLeader

func (r *Raft) VerifyLeader() Future[bool]

VerifyLeader 验证当前节点是否是领导人

type ReloadableConfig

type ReloadableConfig struct {
	TrailingLogs      uint64
	SnapshotInterval  time.Duration
	SnapshotThreshold uint64
	HeartbeatTimeout  time.Duration
	ElectionTimeout   time.Duration
}

type RpcConvert

type RpcConvert interface {
	Deserialization(data []byte, i interface{}) error
	Serialization(i interface{}) (bytes []byte, err error)
}

type RpcInterface

type RpcInterface interface {
	// Consumer 返回一个可消费的 Chan
	Consumer() <-chan *RPC
	// VoteRequest 发起投票请求
	VoteRequest(*ServerInfo, *VoteRequest) (*VoteResponse, error)
	// AppendEntries 追加日志
	AppendEntries(*ServerInfo, *AppendEntryRequest) (*AppendEntryResponse, error)
	// AppendEntryPipeline 以 pipe 形式追加日志
	AppendEntryPipeline(*ServerInfo) (AppendEntryPipeline, error)
	// InstallSnapShot 安装快照
	InstallSnapShot(*ServerInfo, *InstallSnapshotRequest, io.Reader) (*InstallSnapshotResponse, error)
	// SetHeartbeatFastPath 用于快速处理,不用经过主流程,不支持也没关系
	SetHeartbeatFastPath(cb fastPath)
	// FastTimeout 快速超时转换为候选人
	FastTimeout(*ServerInfo, *FastTimeoutRequest) (*FastTimeoutResponse, error)

	LocalAddr() ServerAddr
	EncodeAddr(info *ServerInfo) []byte
	DecodeAddr([]byte) ServerAddr
}

type ServerAddr

type ServerAddr string

type ServerAddrProvider

type ServerAddrProvider interface {
	GetAddr(id ServerID) (ServerAddr, error)
}

type ServerID

type ServerID string

type ServerInfo

type ServerInfo struct {
	Suffrage Suffrage
	Addr     ServerAddr
	ID       ServerID
}

ServerInfo 节点的地址信息

type ServerProcessor

type ServerProcessor struct {
	// contains filtered or unexported fields
}

ServerProcessor 服务器接口 handler ,提供具体的接口处理逻辑

func (*ServerProcessor) Do

func (d *ServerProcessor) Do(typ rpcType, req interface{}, reader io.Reader) (resp interface{}, err error)

Do ServerProcessor 不关心上层协议,所以不用处理第一个参数(rpcType)

func (*ServerProcessor) SetFastPath

func (d *ServerProcessor) SetFastPath(cb fastPath)

type SnapShotFutureResp

type SnapShotFutureResp struct {
	// contains filtered or unexported fields
}

type SnapshotMeta

type SnapshotMeta struct {
	Version            SnapshotVersion
	ID                 string
	Index              uint64
	Term               uint64
	Configuration      ClusterInfo
	ConfigurationIndex uint64
	Size               int64
}

SnapshotMeta 快照元信息

type SnapshotSink

type SnapshotSink interface {
	io.WriteCloser
	ID() string
	Cancel() error
}

SnapshotSink 快照的抽象,提供写入、取消写入、返回快照 ID 的能力

type SnapshotStore

type SnapshotStore interface {
	Open(id string) (*SnapshotMeta, io.ReadCloser, error)
	List() ([]*SnapshotMeta, error)
	Create(version SnapshotVersion, index, term uint64, configuration ClusterInfo, configurationIndex uint64, rpc RpcInterface) (SnapshotSink, error)
}

SnapshotStore 快照存储的抽象,提供打开快照,创建快照,查询快照列表的能力

type SnapshotVersion

type SnapshotVersion uint64

SnapshotVersion 表示快照的版本,会在以后的快照结构变更的时候使用

const (
	SnapShotVersionDefault SnapshotVersion = iota + 1
)

type State

type State uint64
const (
	Follower State = iota + 1
	Candidate
	Leader
	ShutDown
)

func (State) String

func (s State) String() string

type StateChange

type StateChange struct {
	Before, After State
}

type Suffrage

type Suffrage int // 是否有选举权,枚举: Voter NonVoter
const (
	Voter Suffrage = iota
	NonVoter
)

func (*Suffrage) MarshalText

func (s *Suffrage) MarshalText() (text []byte, err error)

func (*Suffrage) String

func (s *Suffrage) String() string

func (*Suffrage) UnmarshalText

func (s *Suffrage) UnmarshalText(text []byte) error

type TcpLayer

type TcpLayer struct {
	// contains filtered or unexported fields
}

func (*TcpLayer) Accept

func (t *TcpLayer) Accept() (net.Conn, error)

func (*TcpLayer) Addr

func (t *TcpLayer) Addr() net.Addr

func (*TcpLayer) Close

func (t *TcpLayer) Close() error

func (*TcpLayer) Dial

func (t *TcpLayer) Dial(peer ServerAddr, timeout time.Duration) (net.Conn, error)

type VoteRequest

type VoteRequest struct {
	*RPCHeader
	Term           uint64
	CandidateID    ServerID
	LastLogIndex   uint64
	LastLogTerm    uint64
	LeaderTransfer bool
}

VoteRequest 投票

type VoteResponse

type VoteResponse struct {
	*RPCHeader
	Term    uint64
	Granted bool
}

type WithPeers

type WithPeers interface {
	Connect(addr ServerAddr, rpc RpcInterface)
	Disconnect(addr ServerAddr)
	DisconnectAll()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL