replication

package
v0.2.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2023 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSnapshotCatchUpEntries is the default number of entries
	// to preserve in memory when a snapshot is taken. This is for
	// slow followers to catch up.
	DefaultSnapshotCatchUpEntries = uint64(4)
)

Variables

View Source
var MaxSnapshotFiles = 4

MaxSnapshotFiles defines max number of etcd/raft snapshot files to retain on filesystem. Snapshot files are read from newest to oldest, until first intact file is found. The more snapshot files we keep around, the more we mitigate the impact of a corrupted snapshots. This is exported for testing purpose. This MUST be greater equal than 1.

Functions

func ClassifyClusterReConfig

func ClassifyClusterReConfig(currentConfig, updatedConfig *types.ClusterConfig) (nodes, consensus, ca, admins, ledger bool)

ClassifyClusterReConfig detects the kind of changes that happened in the ClusterConfig. We assume that both the current and updated config are internally consistent (valid), but not necessarily with respect to each other.

func ListSnapshots

func ListSnapshots(logger *logger.SugarLogger, snapDir string) []uint64

ListSnapshots returns a list of RaftIndex of snapshots stored on disk. If a file is corrupted, rename the file.

func VerifyConsensusReConfig

func VerifyConsensusReConfig(currentConfig, updatedConfig *types.ConsensusConfig, lg *logger.SugarLogger) error

VerifyConsensusReConfig checks the configuration changes in types.ConsensusConfig.

This method checks that the changes between one ConsensusConfig to the next are safe, because some mutations might cause a permanent loss of quorum in the cluster, something that is very difficult to recover from. - Members can be added or removed (membership change) one member at a time - Members' endpoints cannot be changed together with a membership change - Members' endpoints can be updated one at a time - An existing member cannot change its Raft ID (it must be removed from the cluster and added again as a new member) - The Raft ID of a new member must be unique - therefore it must be larger than MaxRaftId

We assume that both the current and updated ClusterConfig are internally consistent, specifically, that the Nodes and the ConsensusConfig.Members arrays match by NodeId in each.

Types

type BlockLedgerReader

type BlockLedgerReader interface {
	Height() (uint64, error)
	Get(blockNumber uint64) (*types.Block, error)
}

type BlockReplicator

type BlockReplicator struct {
	// contains filtered or unexported fields
}

func NewBlockReplicator

func NewBlockReplicator(conf *Config) (*BlockReplicator, error)

NewBlockReplicator creates a new BlockReplicator.

func (*BlockReplicator) Close

func (br *BlockReplicator) Close() (err error)

Close signals the internal go-routine to stop and waits for it to exit. If the component is already closed, and error is returned.

func (*BlockReplicator) GetClusterStatus

func (br *BlockReplicator) GetClusterStatus() (leaderID uint64, activePeers map[string]*types.PeerConfig)

func (*BlockReplicator) GetLeaderID

func (br *BlockReplicator) GetLeaderID() uint64

func (*BlockReplicator) IsIDRemoved

func (br *BlockReplicator) IsIDRemoved(id uint64) bool

func (*BlockReplicator) IsLeader

func (br *BlockReplicator) IsLeader() *ierrors.NotLeaderError

func (*BlockReplicator) Process

func (br *BlockReplicator) Process(ctx context.Context, m raftpb.Message) error

Process incoming raft messages

func (*BlockReplicator) RaftID

func (br *BlockReplicator) RaftID() uint64

func (*BlockReplicator) ReportSnapshot

func (br *BlockReplicator) ReportSnapshot(id uint64, status raft.SnapshotStatus)

func (*BlockReplicator) ReportUnreachable

func (br *BlockReplicator) ReportUnreachable(id uint64)

func (*BlockReplicator) Start

func (br *BlockReplicator) Start()

Start internal go-routines to serve the main replication loops.

If the `joinExistingCluster` flag is true, the on-boarding process starts first, in its own go-routine. When on-boarding is complete, replication will start.

func (*BlockReplicator) Submit

func (br *BlockReplicator) Submit(block *types.Block) error

Submit a block for replication.

This call may block if the replication input queue is full. Returns an error if the current node is not a leader. Returns an error if the component is already closed.

type Config

type Config struct {
	LocalConf            *config.LocalConfiguration
	ClusterConfig        *types.ClusterConfig
	JoinBlock            *types.Block
	LedgerReader         BlockLedgerReader
	Transport            *comm.HTTPTransport
	BlockOneQueueBarrier *queue.OneQueueBarrier
	PendingTxs           PendingTxsReleaser
	ConfigValidator      ConfigTxValidator
	Logger               *logger.SugarLogger
}

Config holds the configuration information required to initialize the block replicator.

type ConfigTxValidator added in v0.2.2

type ConfigTxValidator interface {
	Validate(txEnv *types.ConfigTxEnvelope) (*types.ValidationInfo, error)
}

type PendingTxsReleaser

type PendingTxsReleaser interface {
	ReleaseWithError(txIDs []string, err error)
}

type RaftHTTPError added in v0.2.2

type RaftHTTPError struct {
	Message string
	Code    int
}

RaftHTTPError is used when replication needs to send an error response to a raft request

func (*RaftHTTPError) Error added in v0.2.2

func (e *RaftHTTPError) Error() string

func (*RaftHTTPError) WriteTo added in v0.2.2

func (e *RaftHTTPError) WriteTo(w http.ResponseWriter)

type RaftStorage

type RaftStorage struct {
	SnapshotCatchUpEntries uint64

	MemoryStorage *raft.MemoryStorage
	// contains filtered or unexported fields
}

RaftStorage encapsulates storages needed for etcd/raft data, i.e. memory, wal

func CreateStorage

func CreateStorage(lg *logger.SugarLogger, walDir string, snapDir string) (*RaftStorage, error)

CreateStorage attempts to create a storage to persist etcd/raft data. If data presents in specified disk, they are loaded to reconstruct storage state.

func (*RaftStorage) ApplySnapshot

func (rs *RaftStorage) ApplySnapshot(snap raftpb.Snapshot)

ApplySnapshot applies snapshot to local memory storage

func (*RaftStorage) Close

func (rs *RaftStorage) Close() error

Close closes storage

func (*RaftStorage) Snapshot

func (rs *RaftStorage) Snapshot() raftpb.Snapshot

Snapshot returns the latest snapshot stored in memory

func (*RaftStorage) Store

func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error

Store persists etcd/raft data

func (*RaftStorage) TakeSnapshot

func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte) error

TakeSnapshot takes a snapshot at index i from MemoryStorage, and persists it to wal and disk.

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL