raftv2

package
v1.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2019 License: MIT Imports: 42 Imported by: 3

Documentation

Index

Constants

View Source
const (
	MembersNameInit    = "init"
	MembersNameApplied = "applied"
	MembersNameRemoved = "removed"
)
View Source
const (
	RaftServerStateRestart = iota
	RaftServerStateNewCluster
	RaftServerStateJoinCluster
)
View Source
const (
	DefaultCommitQueueLen = 10
)
View Source
const (
	DefaultTickMS = time.Millisecond * 30
)
View Source
const (
	HasNoLeader uint64 = 0
)

Variables

View Source
var (

	// blockIntervalMs is the block genration interval in milli-seconds.
	RaftTick           = DefaultTickMS
	RaftSkipEmptyBlock = false
	MaxCommitQueueLen  = DefaultCommitQueueLen

	BlockIntervalMs time.Duration
	BlockTimeoutMs  time.Duration
)
View Source
var (
	ErrClusterNotReady = errors.New("cluster is not ready")
	ErrNotRaftLeader   = errors.New("this node is not leader")
)
View Source
var (
	MaxConfChangeTimeOut = time.Second * 10

	ErrClusterHasNoMember   = errors.New("cluster has no member")
	ErrNotExistRaftMember   = errors.New("not exist member of raft cluster")
	ErrNoEnableSyncPeer     = errors.New("no peer to sync chain")
	ErrNotExistMembers      = errors.New("not exist members of cluster")
	ErrMemberAlreadyApplied = errors.New("member is already added")

	ErrInvalidMembershipReqType = errors.New("invalid type of membership change request")
	ErrPendingConfChange        = errors.New("pending membership change request is in progree. try again when it is finished")
	ErrConChangeTimeOut         = errors.New("timeouted membership change request")
	ErrConfChangeChannelBusy    = errors.New("channel of conf change propose is busy")
	ErrCCMemberIsNil            = errors.New("memeber is nil")
	ErrNotMatchedRaftName       = errors.New("mismatched name of raft identity")
)
View Source
var (
	ErrNotIncludedRaftMember = errors.New("this node isn't included in initial raft members")
	ErrRaftEmptyTLSFile      = errors.New("cert or key file name is empty")
	ErrNotHttpsURL           = errors.New("url scheme is not https")
	ErrDupBP                 = errors.New("raft bp description is duplicated")
	ErrInvalidRaftPeerID     = errors.New("peerID of current raft bp is not equals to p2p configure")
)
View Source
var (
	MaxTimeOutCluter = time.Second * 10
	MaxTryGetCluster = 3

	ErrGetClusterReplyC  = errors.New("reply channel of getcluster request is closed")
	ErrGetClusterTimeout = errors.New("timeout for getcluster")
	ErrGetClusterEmpty   = errors.New("getcluster reply is empty")
	ErrGetClusterFail    = errors.New("failed to get cluster info")
)
View Source
var (
	ConfSnapFrequency           uint64 = 10
	ConfSnapshotCatchUpEntriesN uint64 = ConfSnapFrequency
)

noinspection ALL

View Source
var (
	ErrNoSnapshot          = errors.New("no snapshot")
	ErrCCAlreadyApplied    = errors.New("conf change entry is already applied")
	ErrInvalidMember       = errors.New("member of conf change is invalid")
	ErrCCAlreadyAdded      = errors.New("member has already added")
	ErrCCAlreadyRemoved    = errors.New("member has already removed")
	ErrCCNoMemberToRemove  = errors.New("there is no member to remove")
	ErrEmptySnapshot       = errors.New("received empty snapshot")
	ErrInvalidRaftIdentity = errors.New("raft identity is not set")
)
View Source
var (
	DfltTimeWaitPeerLive        = time.Second * 5
	ErrNotMsgSnap               = errors.New("not pb.MsgSnap")
	ErrClusterMismatchConfState = errors.New("members of cluster doesn't match with raft confstate")
)
View Source
var (
	ErrInvalidEntry       = errors.New("Invalid raftpb.entry")
	ErrWalEntryTooLowTerm = errors.New("term of wal entry is too low")
)
View Source
var (
	ErrWalGetHardState = errors.New("failed to read hard state")
	ErrWalGetLastIdx   = errors.New("failed to read last Idx")
)
View Source
var (
	ErrInvCCType = errors.New("change type of ")
)
View Source
var ErrInvalidWalEntry = errors.New("invalid wal entry")
View Source
var (
	ErrUnmarshal = errors.New("failed to unmarshalEntryData log entry")
)
View Source
var ErrWalConvBlock = errors.New("failed to convert bytes of block from wal entry")

Functions

func GetConstructor

GetConstructor build and returns consensus.Constructor from New function.

func GetName

func GetName() string

GetName returns the name of the consensus.

func Init

func Init(raftCfg *config.RaftConfig)

func MaxUint64

func MaxUint64(x, y uint64) uint64

func MemberIDToString

func MemberIDToString(id uint64) string

func RecoverExit

func RecoverExit()

Types

type BlockFactory

type BlockFactory struct {
	*component.ComponentHub
	consensus.ChainWAL

	ID string
	// contains filtered or unexported fields
}

BlockFactory implments a raft block factory which generate block each cfg.Consensus.BlockInterval if this node is leader of raft

This can be used for testing purpose.

func New

New returns a BlockFactory.

func (*BlockFactory) BlockFactory

func (bf *BlockFactory) BlockFactory() consensus.BlockFactory

BlockFactory returns r itself.

func (*BlockFactory) ClusterInfo

func (bf *BlockFactory) ClusterInfo() ([]*types.MemberAttr, []byte, error)

func (*BlockFactory) ConfChange

func (bf *BlockFactory) ConfChange(req *types.MembershipChange) (*consensus.Member, error)

ConfChange change membership of raft cluster and returns new membership

func (*BlockFactory) ConsensusInfo

func (bf *BlockFactory) ConsensusInfo() *types.ConsensusInfo

func (*BlockFactory) GetType

func (bf *BlockFactory) GetType() consensus.ConsensusType

func (*BlockFactory) HasWAL

func (bf *BlockFactory) HasWAL() bool

func (*BlockFactory) Info

func (bf *BlockFactory) Info() string

Info retuns an empty string.

func (*BlockFactory) InitCluster

func (bf *BlockFactory) InitCluster(cfg *config.Config) error

func (*BlockFactory) IsBlockValid

func (bf *BlockFactory) IsBlockValid(block *types.Block, bestBlock *types.Block) error

IsBlockValid checks the consensus level validity of a block.

func (*BlockFactory) IsTransactionValid

func (bf *BlockFactory) IsTransactionValid(tx *types.Tx) bool

IsTransactionValid checks the onsensus level validity of a transaction

func (*BlockFactory) JobQueue

func (bf *BlockFactory) JobQueue() chan<- interface{}

// waitUntilStartable wait until this chain synchronizes with more than half of all peers

func (bf *BlockFactory) waitSyncWithMajority() error {
	ticker := time.NewTicker(peerCheckInterval)

	for {
		select {
		case <-ticker.C:
			if synced, err := bf.bpc.hasSynced(); err != nil {
				logger.Error().Err(err).Msg("failed to check sync with a majority of peers")
				return err
			} else if synced {
				return nil
			}

		case <-bf.QuitChan():
			logger.Info().Msg("quit while wait sync")
			return ErrBFQuit
		default:
		}
	}
}

JobQueue returns the queue for block production triggering.

func (*BlockFactory) NeedNotify

func (bf *BlockFactory) NeedNotify() bool

func (*BlockFactory) NeedReorganization

func (bf *BlockFactory) NeedReorganization(rootNo types.BlockNo) bool

NeedReorganization has nothing to do.

func (*BlockFactory) QueueJob

func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{})

QueueJob send a block triggering information to jq.

func (*BlockFactory) QuitChan

func (bf *BlockFactory) QuitChan() chan interface{}

QuitChan returns the channel from which consensus-related goroutines check when shutdown is initiated.

func (*BlockFactory) Save

func (bf *BlockFactory) Save(tx consensus.TxWriter) error

Save has nothging to do.

func (*BlockFactory) Start

func (bf *BlockFactory) Start()

Start run a raft block factory service.

func (*BlockFactory) Ticker

func (bf *BlockFactory) Ticker() *time.Ticker

Ticker returns a time.Ticker for the main consensus loop.

func (*BlockFactory) Update

func (bf *BlockFactory) Update(block *types.Block)

Update has nothging to do.

func (*BlockFactory) VerifySign

func (bf *BlockFactory) VerifySign(block *types.Block) error

VerifySign checks the consensus level validity of a block.

func (*BlockFactory) VerifyTimestamp

func (bf *BlockFactory) VerifyTimestamp(*types.Block) bool

VerifyTimestamp checks the validity of the block timestamp.

type ChainSnapshotter

type ChainSnapshotter struct {
	sync.Mutex

	*component.ComponentHub
	// contains filtered or unexported fields
}

func (*ChainSnapshotter) SaveFromRemote

func (chainsnap *ChainSnapshotter) SaveFromRemote(r io.Reader, id uint64, msg raftpb.Message) (int64, error)

chainSnapshotter rece ives snapshot from http request TODO replace rafthttp with p2p

type Cluster

type Cluster struct {
	component.ICompSyncRequester
	sync.Mutex

	Size uint32
	// contains filtered or unexported fields
}

raft cluster membership copy from dpos/bp TODO refactoring Cluster represents a cluster of block producers.

func GetClusterInfo

func GetClusterInfo(hs *component.ComponentHub) (*Cluster, error)

GetBestBlock returns the current best block from chainservice

func NewCluster

func NewCluster(chainID []byte, bf *BlockFactory, raftName string, chainTimestamp int64) *Cluster

func NewClusterFromMemberAttrs

func NewClusterFromMemberAttrs(chainID []byte, memberAttrs []*types.MemberAttr) (*Cluster, error)

func (*Cluster) AddInitialMembers

func (cl *Cluster) AddInitialMembers(raftCfg *config.RaftConfig, useTls bool) error

func (*Cluster) AppliedMembers

func (cl *Cluster) AppliedMembers() *Members

func (*Cluster) ChangeMembership

func (cl *Cluster) ChangeMembership(req *types.MembershipChange) (*consensus.Member, error)

func (*Cluster) IsIDRemoved

func (cl *Cluster) IsIDRemoved(id uint64) bool

IsIDRemoved return true if given raft id is not exist in cluster

func (*Cluster) Members

func (cl *Cluster) Members() *Members

func (*Cluster) NewMemberFromAddReq

func (cl *Cluster) NewMemberFromAddReq(req *types.MembershipChange) (*consensus.Member, error)

func (*Cluster) NewMemberFromRemoveReq

func (cl *Cluster) NewMemberFromRemoveReq(req *types.MembershipChange) (*consensus.Member, error)

func (*Cluster) NodeID

func (cl *Cluster) NodeID() uint64

func (*Cluster) NodeName

func (cl *Cluster) NodeName() string

func (*Cluster) Quorum

func (cl *Cluster) Quorum() uint32

func (*Cluster) Recover

func (cl *Cluster) Recover(snapshot *raftpb.Snapshot) error

func (*Cluster) RecoverIdentity

func (cl *Cluster) RecoverIdentity(id *consensus.RaftIdentity) error

RecoverIdentity reset node id and name of cluster. raft identity is saved in WAL and reset when server is restarted

func (*Cluster) RemovedMembers

func (cl *Cluster) RemovedMembers() *Members

func (*Cluster) ResetMembers

func (cl *Cluster) ResetMembers()

func (*Cluster) SetNodeID

func (cl *Cluster) SetNodeID(nodeid uint64)

func (*Cluster) SetThisNodeID

func (cl *Cluster) SetThisNodeID() error

func (*Cluster) ValidateAndMergeExistingCluster

func (cl *Cluster) ValidateAndMergeExistingCluster(existingCl *Cluster) bool

ValidateAndMergeExistingCluster tests if members of existing cluster are matched with this cluster

type CommitProgress

type CommitProgress struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*CommitProgress) GetConnect

func (cp *CommitProgress) GetConnect() *commitEntry

func (*CommitProgress) GetRequest

func (cp *CommitProgress) GetRequest() *commitEntry

func (*CommitProgress) IsReadyToPropose

func (cp *CommitProgress) IsReadyToPropose() bool

func (*CommitProgress) UpdateConnect

func (cp *CommitProgress) UpdateConnect(ce *commitEntry)

func (*CommitProgress) UpdateRequest

func (cp *CommitProgress) UpdateRequest(ce *commitEntry)

type ErrorMembershipChange

type ErrorMembershipChange struct {
	Err error
}

func (ErrorMembershipChange) Error

func (e ErrorMembershipChange) Error() string

type LeaderStatus

type LeaderStatus struct {
	// contains filtered or unexported fields
}

type Members

type Members struct {
	MapByID   map[uint64]*consensus.Member // restore from DB or snapshot
	MapByName map[string]*consensus.Member

	Index map[types.PeerID]uint64 // peer ID to raft ID mapping

	BPUrls []string //for raft server TODO remove
	// contains filtered or unexported fields
}

func (*Members) ToArray

func (mbrs *Members) ToArray() []*consensus.Member

type Proposed

type Proposed struct {
	// contains filtered or unexported fields
}

type RaftInfo

type RaftInfo struct {
	Leader string
	Total  string
	Name   string
	RaftId string
	Status *json.RawMessage
}

type RaftLogger

type RaftLogger struct {
	// contains filtered or unexported fields
}

Logger is a logging unit. It controls the flow of messages to a given (swappable) backend.

func NewRaftLogger

func NewRaftLogger(logger *log.Logger) *RaftLogger

func (*RaftLogger) Debug

func (l *RaftLogger) Debug(args ...interface{})

func (*RaftLogger) Debugf

func (l *RaftLogger) Debugf(format string, args ...interface{})

func (*RaftLogger) Error

func (l *RaftLogger) Error(args ...interface{})

func (*RaftLogger) Errorf

func (l *RaftLogger) Errorf(format string, args ...interface{})

func (RaftLogger) Fatal

func (l RaftLogger) Fatal(args ...interface{})

func (*RaftLogger) Fatalf

func (l *RaftLogger) Fatalf(format string, args ...interface{})

func (*RaftLogger) Info

func (l *RaftLogger) Info(args ...interface{})

func (*RaftLogger) Infof

func (l *RaftLogger) Infof(format string, args ...interface{})

func (*RaftLogger) Panic

func (l *RaftLogger) Panic(args ...interface{})

func (*RaftLogger) Panicf

func (l *RaftLogger) Panicf(format string, args ...interface{})

func (*RaftLogger) Warning

func (l *RaftLogger) Warning(args ...interface{})

func (*RaftLogger) Warningf

func (l *RaftLogger) Warningf(format string, args ...interface{})

type RaftOperator

type RaftOperator struct {
	// contains filtered or unexported fields
}

type RaftServerState

type RaftServerState int

type WalDB

type WalDB struct {
	consensus.ChainWAL
}

func NewWalDB

func NewWalDB(chainWal consensus.ChainWAL) *WalDB

func (*WalDB) ReadAll

func (wal *WalDB) ReadAll(snapshot *raftpb.Snapshot) (id *consensus.RaftIdentity, state *raftpb.HardState, ents []raftpb.Entry, err error)

ReadAll returns hard state, all uncommitted entries - read last hard state - read all uncommited entries after snapshot index

func (*WalDB) SaveEntry

func (wal *WalDB) SaveEntry(state raftpb.HardState, entries []raftpb.Entry) error

type Work

type Work struct {
	*types.Block
}

func (*Work) GetTimeout

func (work *Work) GetTimeout() time.Duration

func (*Work) ToString

func (work *Work) ToString() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL