raft

package module
v0.0.0-...-b25a44a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2020 License: Apache-2.0 Imports: 18 Imported by: 60

README

Raft

A multi-raft implementation built on top of the CoreOS etcd raft library.

Installation

Download and install to GOPATH:

go get -u github.com/tiglabs/raft

Features

The CoreOS etcd/raft implementation has been modified to add the following features.

  • multi-raft support
  • snapshot manager
  • merged and compressed heartbeat message
  • check down replica
  • single raft's panic is allowed, detectable
  • new wal implementation
  • export more run status
  • implementation batch commit

License

Licensed under the Apache License, Version 2.0. For detail see LICENSE and NOTICE.

Documentation

Index

Constants

View Source
const (

	// KB killobytes
	KB = 1 << (10 * iota)
	// MB megabytes
	MB
)
View Source
const NoLeader uint64 = 0

NoLeader is a placeholder nodeID used when there is no leader.

Variables

View Source
var (
	ErrCompacted     = errors.New("requested index is unavailable due to compaction.")
	ErrRaftExists    = errors.New("raft already exists.")
	ErrRaftNotExists = errors.New("raft not exists.")
	ErrNotLeader     = errors.New("raft is not the leader.")
	ErrStopped       = errors.New("raft is already shutdown.")
	ErrSnapping      = errors.New("raft is doing snapshot.")
	ErrRetryLater    = errors.New("retry later")
)

Functions

This section is empty.

Types

type AppPanicError

type AppPanicError string

AppPanicError is panic error when repl occurred fatal error. The server will recover this panic and stop the shard repl.

func (*AppPanicError) Error

func (pe *AppPanicError) Error() string

type Config

type Config struct {
	TransportConfig
	// NodeID is the identity of the local node. NodeID cannot be 0.
	// This parameter is required.
	NodeID uint64
	// TickInterval is the interval of timer which check heartbeat and election timeout.
	// The default value is 2s.
	TickInterval time.Duration
	// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
	// message to maintain the leadership every heartbeat interval.
	// The default value is 2s.
	HeartbeatTick int
	// ElectionTick is the election timeout. If a follower does not receive any message
	// from the leader of current term during ElectionTick, it will become candidate and start an election.
	// ElectionTick must be greater than HeartbeatTick.
	// We suggest to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary leader switching.
	// The default value is 10s.
	ElectionTick int
	// MaxSizePerMsg limits the max size of each append message.
	// The default value is 1M.
	MaxSizePerMsg uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
	// The application transportation layer usually has its own sending buffer over TCP/UDP.
	// Setting MaxInflightMsgs to avoid overflowing that sending buffer.
	// The default value is 128.
	MaxInflightMsgs int
	// ReqBufferSize limits the max number of recive request chan buffer.
	// The default value is 1024.
	ReqBufferSize int
	// AppBufferSize limits the max number of apply chan buffer.
	// The default value is 2048.
	AppBufferSize int
	// RetainLogs controls how many logs we leave after truncate.
	// This is used so that we can quickly replay logs on a follower instead of being forced to send an entire snapshot.
	// The default value is 20000.
	RetainLogs uint64
	// LeaseCheck whether to use the lease mechanism.
	// The default value is false.
	LeaseCheck bool
	// ReadOnlyOption specifies how the read only request is processed.
	//
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	//
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	// LeaseCheck MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
	ReadOnlyOption ReadOnlyOption
	// contains filtered or unexported fields
}

Config contains the parameters to start a raft server. Default: Do not use lease mechanism. NOTE: NodeID and Resolver must be required.Other parameter has default value.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with usable defaults.

type DownReplica

type DownReplica struct {
	NodeID      uint64
	DownSeconds int
}

DownReplica down replica

type FatalError

type FatalError struct {
	ID  uint64
	Err error
}

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future the future

func (*Future) AsyncResponse

func (f *Future) AsyncResponse() (respCh <-chan interface{}, errCh <-chan error)

AsyncResponse export channels

func (*Future) Response

func (f *Future) Response() (resp interface{}, err error)

Response wait response

type MultiTransport

type MultiTransport struct {
	// contains filtered or unexported fields
}

func (*MultiTransport) Send

func (t *MultiTransport) Send(m *proto.Message)

func (*MultiTransport) SendSnapshot

func (t *MultiTransport) SendSnapshot(m *proto.Message, rs *snapshotStatus)

func (*MultiTransport) Stop

func (t *MultiTransport) Stop()

type RaftConfig

type RaftConfig struct {
	ID           uint64
	Term         uint64
	Leader       uint64
	Applied      uint64
	Peers        []proto.Peer
	Storage      storage.Storage
	StateMachine StateMachine
}

RaftConfig contains the parameters to create a raft.

type RaftServer

type RaftServer struct {
	// contains filtered or unexported fields
}

func NewRaftServer

func NewRaftServer(config *Config) (*RaftServer, error)

func (*RaftServer) AppliedIndex

func (rs *RaftServer) AppliedIndex(id uint64) uint64

func (*RaftServer) ChangeMember

func (rs *RaftServer) ChangeMember(id uint64, changeType proto.ConfChangeType, peer proto.Peer, context []byte) (future *Future)

func (*RaftServer) CommittedIndex

func (rs *RaftServer) CommittedIndex(id uint64) uint64

func (*RaftServer) CreateRaft

func (rs *RaftServer) CreateRaft(raftConfig *RaftConfig) error

func (*RaftServer) FirstCommittedIndex

func (rs *RaftServer) FirstCommittedIndex(id uint64) uint64

func (*RaftServer) GetDownReplicas

func (rs *RaftServer) GetDownReplicas(id uint64) (downReplicas []DownReplica)

GetDownReplicas 获取down的副本

func (*RaftServer) GetEntries

func (rs *RaftServer) GetEntries(id uint64, startIndex uint64, maxSize uint64) (future *Future)

GetEntries get raft log entries

func (*RaftServer) GetPendingReplica

func (rs *RaftServer) GetPendingReplica(id uint64) (peers []uint64)

GetPendingReplica get snapshot pending followers

func (*RaftServer) GetUnreachable

func (rs *RaftServer) GetUnreachable(id uint64) (nodes []uint64)

func (*RaftServer) IsLeader

func (rs *RaftServer) IsLeader(id uint64) bool

func (*RaftServer) LeaderTerm

func (rs *RaftServer) LeaderTerm(id uint64) (leader, term uint64)

func (*RaftServer) ReadIndex

func (rs *RaftServer) ReadIndex(id uint64) (future *Future)

ReadIndex read index

func (*RaftServer) RemoveRaft

func (rs *RaftServer) RemoveRaft(id uint64) error

func (*RaftServer) Status

func (rs *RaftServer) Status(id uint64) (status *Status)

func (*RaftServer) Stop

func (rs *RaftServer) Stop()

func (*RaftServer) Submit

func (rs *RaftServer) Submit(id uint64, cmd []byte) (future *Future)

func (*RaftServer) Truncate

func (rs *RaftServer) Truncate(id uint64, index uint64)

func (*RaftServer) TryToLeader

func (rs *RaftServer) TryToLeader(id uint64) (future *Future)

type ReadOnlyOption

type ReadOnlyOption int

ReadOnlyOption read only option

const (
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	ReadOnlySafe ReadOnlyOption = iota
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	ReadOnlyLeaseBased
)

type ReplicaStatus

type ReplicaStatus struct {
	Match       uint64 // 复制进度
	Commit      uint64 // commmit位置
	Next        uint64
	State       string
	Snapshoting bool
	Paused      bool
	Active      bool
	LastActive  time.Time
	Inflight    int
}

ReplicaStatus replica status

type SocketResolver

type SocketResolver interface {
	NodeAddress(nodeID uint64, stype SocketType) (addr string, err error)
}

The SocketResolver interface is supplied by the application to resolve NodeID to net.Addr addresses.

type SocketType

type SocketType byte
const (
	HeartBeat SocketType = 0
	Replicate SocketType = 1
)

func (SocketType) String

func (t SocketType) String() string

type StateMachine

type StateMachine interface {
	Apply(command []byte, index uint64) (interface{}, error)
	ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
	Snapshot() (proto.Snapshot, error)
	ApplySnapshot(peers []proto.Peer, iter proto.SnapIterator) error
	HandleFatalEvent(err *FatalError)
	HandleLeaderChange(leader uint64)
}

The StateMachine interface is supplied by the application to persist/snapshot data of application.

type Status

type Status struct {
	ID                uint64
	NodeID            uint64
	Leader            uint64
	Term              uint64
	Index             uint64
	Commit            uint64
	Applied           uint64
	Vote              uint64
	PendQueue         int
	RecvQueue         int
	AppQueue          int
	Stopped           bool
	RestoringSnapshot bool
	State             string // leader、follower、candidate
	Replicas          map[uint64]*ReplicaStatus
}

Status raft status

func (*Status) String

func (s *Status) String() string

type Transport

type Transport interface {
	Send(m *proto.Message)
	SendSnapshot(m *proto.Message, rs *snapshotStatus)
	Stop()
}

Transport raft server transport

func NewMultiTransport

func NewMultiTransport(raft *RaftServer, config *TransportConfig) (Transport, error)

type TransportConfig

type TransportConfig struct {
	// HeartbeatAddr is the Heartbeat port.
	// The default value is 3016.
	HeartbeatAddr string
	// ReplicateAddr is the Replation port.
	// The default value is 2015.
	ReplicateAddr string
	// 发送队列大小
	SendBufferSize int
	//复制并发数(node->node)
	MaxReplConcurrency int
	// MaxSnapConcurrency limits the max number of snapshot concurrency.
	// The default value is 10.
	MaxSnapConcurrency int
	// This parameter is required.
	Resolver SocketResolver
}

TransportConfig raft server transport config

Directories

Path Synopsis
wal
kvs
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL