raftstore

package
v0.3.0
Published: Mar 9, 2022 License: Apache-2.0 Imports: 66 Imported by: 2

Documentation

Overview

Copyright 2020 MatrixOrigin.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidConfigChangeRequest = errors.New("invalid config change request")
	ErrRemoveVoter                = errors.New("removing voter")
	ErrRemoveLeader               = errors.New("removing leader")
	ErrPendingConfigChange        = errors.New("pending config change")
	ErrDuplicatedRequest          = errors.New("duplicated config change request")
	ErrLearnerOnlyChange          = errors.New("learner only change")
)
var (
	ErrNotLearnerReplica = errors.New("not learner")
	ErrReplicaNotFound   = errors.New("replica not found")
	ErrReplicaDuplicated = errors.New("replica duplicated")
)
var (
	// SingleTestCluster is a single-node test raft cluster
	SingleTestCluster = WithTestClusterNodeCount(1)
	// DiskTestCluster uses pebble storage for the test raft store
	DiskTestCluster = WithTestClusterUseDisk()
	// DisableScheduleTestCluster disables prophet schedulers for the test raft store
	DisableScheduleTestCluster = WithTestClusterDisableSchedule()
	// NewTestCluster cleans data before the test cluster starts
	NewTestCluster = WithTestClusterRecreate(true)
	// OldTestCluster uses existing data before the test cluster starts
	OldTestCluster = WithTestClusterRecreate(false)
)
var (
	ErrRemoveShardKeyRange = errors.New("failed to delete shard key range")
)
var (
	// ErrTimeout timeout error
	ErrTimeout = errors.New("exec timeout")
)
var (
	// ErrUnknownReplica indicates that the replica is unknown.
	ErrUnknownReplica = errors.New("unknown replica")
)

Functions

func IsShardUnavailableErr added in v0.3.0

func IsShardUnavailableErr(err error) bool

IsShardUnavailableErr returns true if err is a ShardUnavailableErr

func NewNewShardUnavailableErr added in v0.3.0

func NewNewShardUnavailableErr(id uint64) error

NewShardUnavailableErr returns a wrapped error indicating that the shard is unavailable

func NewResourceAdapterWithShard

func NewResourceAdapterWithShard(meta Shard) metadata.Resource

NewResourceAdapterWithShard creates a prophet resource from a shard

Types

type Epoch added in v0.2.0

type Epoch = metapb.ResourceEpoch

type ErrTryAgain added in v0.3.0

type ErrTryAgain struct {
	// caller should wait for this period before retry
	Wait time.Duration
}

ErrTryAgain indicates that an operation should be retried later

func (*ErrTryAgain) Error added in v0.3.0

func (e *ErrTryAgain) Error() string
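
Callers can distinguish these retryable conditions in application code. Below is a minimal retry-loop sketch, assuming the package is imported as `raftstore` from this module along with the standard `errors` and `time` packages; `doRequest` is a hypothetical caller-supplied operation.

func callWithRetry(doRequest func() error) error {
	for {
		err := doRequest()
		if err == nil {
			return nil
		}
		// The shard is currently unavailable; surface the error to the caller.
		if raftstore.IsShardUnavailableErr(err) {
			return err
		}
		// ErrTryAgain carries the period the caller should wait before retrying.
		var tryAgain *raftstore.ErrTryAgain
		if errors.As(err, &tryAgain) {
			time.Sleep(tryAgain.Wait)
			continue
		}
		return err
	}
}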

type FailureCallback added in v0.2.0

type FailureCallback func(requestID []byte, err error)

FailureCallback is the callback invoked when a request fails

type LogReader added in v0.2.0

type LogReader struct {
	sync.Mutex
	// contains filtered or unexported fields
}

LogReader is the struct used to manage logs that have already been persisted into LogDB. LogReader implements the raft.Storage interface.

func NewLogReader added in v0.2.0

func NewLogReader(logger *zap.Logger, shardID uint64, replicaID uint64,
	db logdb.LogDB) *LogReader

NewLogReader creates and returns a new LogReader instance.
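
A hedged usage sketch: it assumes an already opened logdb.LogDB, that entries have previously been persisted for this shard and replica, and that the package is imported as `raftstore` together with `zap` and `logdb`; the helper name and the 1MB size limit are illustrative.

func dumpEntries(logger *zap.Logger, db logdb.LogDB, shardID, replicaID uint64) error {
	lr := raftstore.NewLogReader(logger, shardID, replicaID, db)

	first, err := lr.FirstIndex()
	if err != nil {
		return err
	}
	last, err := lr.LastIndex()
	if err != nil {
		return err
	}
	// Entries returns persisted entries in [low, high), limited here to ~1MB.
	entries, err := lr.Entries(first, last+1, 1024*1024)
	if err != nil {
		return err
	}
	logger.Info("loaded entries", zap.Int("count", len(entries)))
	return nil
}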

func (*LogReader) Append added in v0.2.0

func (lr *LogReader) Append(entries []pb.Entry) error

Append marks the specified entries as persisted and makes them available from the LogReader.

func (*LogReader) ApplySnapshot added in v0.2.0

func (lr *LogReader) ApplySnapshot(snapshot pb.Snapshot) error

ApplySnapshot applies the specified snapshot.

func (*LogReader) Compact added in v0.2.0

func (lr *LogReader) Compact(index uint64) error

Compact compacts raft log entries up to index.

func (*LogReader) CreateSnapshot added in v0.2.0

func (lr *LogReader) CreateSnapshot(snapshot pb.Snapshot) error

CreateSnapshot keeps the metadata of the specified snapshot.

func (*LogReader) Entries added in v0.2.0

func (lr *LogReader) Entries(low uint64,
	high uint64, maxSize uint64) ([]pb.Entry, error)

Entries returns persisted entries between [low, high) with a total limit of up to maxSize bytes.

func (*LogReader) FirstIndex added in v0.2.0

func (lr *LogReader) FirstIndex() (uint64, error)

func (*LogReader) GetSnapshotRequested added in v0.2.0

func (lr *LogReader) GetSnapshotRequested() bool

GetSnapshotRequested returns a boolean value indicating whether creating a new snapshot has been requested.

func (*LogReader) InitialState added in v0.2.0

func (lr *LogReader) InitialState() (pb.HardState, pb.ConfState, error)

InitialState returns the saved HardState and ConfState information.

func (*LogReader) LastIndex added in v0.2.0

func (lr *LogReader) LastIndex() (uint64, error)

func (*LogReader) SetConfState added in v0.2.0

func (lr *LogReader) SetConfState(cs pb.ConfState)

func (*LogReader) SetRange added in v0.2.0

func (lr *LogReader) SetRange(firstIndex uint64, length uint64)

SetRange updates the LogReader to reflect what is available in it.

func (*LogReader) SetState added in v0.2.0

func (lr *LogReader) SetState(s pb.HardState)

SetState sets the persistent state.

func (*LogReader) Snapshot added in v0.2.0

func (lr *LogReader) Snapshot() (pb.Snapshot, error)

Snapshot returns the metadata of the latest snapshot.

func (*LogReader) Term added in v0.2.0

func (lr *LogReader) Term(index uint64) (uint64, error)

Term returns the term of the entry specified by the entry index.

type Replica added in v0.2.0

type Replica = metapb.Replica

type RetryController added in v0.2.0

type RetryController interface {
	// Retry controls whether to retry when a retryable error is encountered. Returning false stops the retry.
	Retry(requestID []byte) (rpc.Request, bool)
}

RetryController controls whether failed requests are retried
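
A minimal sketch of a custom RetryController that bounds the number of retries per request. The bookkeeping types (`boundedRetryController`, `pendingRequest`, the `maxRetries` field) are illustrative and not part of the package; `rpc.Request` is assumed to be the request type from this module's rpc package, and `sync` is assumed imported.

type pendingRequest struct {
	req     rpc.Request
	retried int
}

type boundedRetryController struct {
	sync.Mutex
	maxRetries int
	pending    map[string]*pendingRequest // keyed by request id
}

// Retry re-submits the original request until maxRetries is reached.
// Returning false tells the proxy to stop retrying this request.
func (c *boundedRetryController) Retry(requestID []byte) (rpc.Request, bool) {
	c.Lock()
	defer c.Unlock()
	p, ok := c.pending[string(requestID)]
	if !ok || p.retried >= c.maxRetries {
		return rpc.Request{}, false
	}
	p.retried++
	return p.req, true
}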

type Router

type Router interface {
	// Start starts the router
	Start() error
	// Stop stops the router
	Stop()
	// SelectShard returns the shard and the leader store address for the shard whose range [shard.Start, shard.End) contains the key.
	// If the returned leader address is "", the current shard has no leader.
	SelectShard(group uint64, key []byte) (Shard, string)
	// Every iterates over all shards
	Every(group uint64, mustLeader bool, fn func(shard Shard, store meta.Store) bool)
	// ForeachShards iterates over the shards of the group
	ForeachShards(group uint64, fn func(shard Shard) bool)
	// GetShard returns the shard by shard id
	GetShard(id uint64) Shard
	// UpdateLeader updates the shard leader
	UpdateLeader(shardID uint64, leaderReplciaID uint64)

	// LeaderReplicaStore returns the store of the leader replica
	LeaderReplicaStore(shardID uint64) meta.Store
	// RandomReplicaStore returns the store of a random replica
	RandomReplicaStore(shardID uint64) meta.Store

	// GetShardStats returns the runtime stats info of the shard
	GetShardStats(id uint64) metapb.ResourceStats
	// GetStoreStats returns the runtime stats info of the store
	GetStoreStats(id uint64) metapb.ContainerStats
}

Router routes requests to the corresponding shard
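
A hedged sketch of looking up shards through a Router obtained from a running Store. Group 0 is illustrative, the `fmt` import is assumed, and the `ID`, `Start`, and `End` fields are assumed to be exposed by the aliased meta.Shard type.

func lookupKey(router raftstore.Router, key []byte) {
	// SelectShard finds the shard whose range [Start, End) contains the key.
	shard, leaderAddr := router.SelectShard(0, key)
	if leaderAddr == "" {
		// The shard currently has no leader.
		return
	}
	fmt.Printf("key %q -> shard %d, leader store at %s\n", key, shard.ID, leaderAddr)

	// Visit every shard in group 0.
	router.ForeachShards(0, func(s raftstore.Shard) bool {
		fmt.Printf("shard %d covers [%q, %q)\n", s.ID, s.Start, s.End)
		return true // keep iterating
	})
}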

type Shard added in v0.2.0

type Shard = meta.Shard

Aliases are provided for the two most commonly used types

type ShardUnavailableErr added in v0.3.0

type ShardUnavailableErr struct {
	// contains filtered or unexported fields
}

ShardUnavailableErr is an error indicating that the shard is unavailable

func (ShardUnavailableErr) Error added in v0.3.0

func (err ShardUnavailableErr) Error() string

Error implements the error interface

type ShardsPool

type ShardsPool interface {
	// Alloc allocates a shard from the shards pool, returning an error if no idle shards are left. The `purpose` is used to avoid
	// duplicate allocation.
	Alloc(group uint64, purpose []byte) (meta.AllocatedShard, error)
}

ShardsPool is a pool of shards. It keeps creating shards until the number of available shards reaches the value specified by `capacity`; these are called `Idle Shards`.

The pool creates a Job in the prophet. The shards pool job starts once a node becomes the prophet leader and stops when that node becomes a follower, so the job can be executed on any node. After the job starts, it uses the prophet client to create shards.
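
A minimal sketch of allocating an idle shard from a pool previously created via Store.CreateResourcePool. The group id and purpose bytes are illustrative, `errors` and `fmt` are assumed imported, and the `ShardID` field is assumed to be exposed by meta.AllocatedShard.

func allocIdleShard(store raftstore.Store) error {
	pool := store.GetResourcePool()
	if pool == nil {
		return errors.New("CreateResourcePool has not completed")
	}
	// Calling Alloc again with the same purpose is expected to return the same
	// allocation instead of consuming another idle shard.
	allocated, err := pool.Alloc(0, []byte("table/1"))
	if err != nil {
		return err
	}
	fmt.Println("allocated shard:", allocated.ShardID)
	return nil
}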

type ShardsProxy

type ShardsProxy interface {
	Start() error
	Stop() error
	Dispatch(req rpc.Request) error
	DispatchTo(req rpc.Request, shard Shard, store string) error
	SetCallback(SuccessCallback, FailureCallback)
	SetRetryController(retryController RetryController)
	OnResponse(rpc.ResponseBatch)
	Router() Router
}

ShardsProxy is the shards proxy; it dispatches each request to the corresponding backend and retries requests that fail with retryable errors.
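
A hedged sketch of wiring callbacks into a ShardsProxy and dispatching a request; the zero-valued rpc.Request is only a placeholder, and the rpc package and `fmt` are assumed imported.

func startProxy(proxy raftstore.ShardsProxy) error {
	proxy.SetCallback(
		// SuccessCallback
		func(resp rpc.Response) {
			fmt.Println("request succeeded")
		},
		// FailureCallback
		func(requestID []byte, err error) {
			fmt.Printf("request %x failed: %v\n", requestID, err)
		},
	)
	if err := proxy.Start(); err != nil {
		return err
	}
	// A real request needs its key, group, and custom fields populated first.
	var req rpc.Request
	return proxy.Dispatch(req)
}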

type Store

type Store interface {
	// Start starts the raft store
	Start()
	// Stop stops the raft store
	Stop()
	// GetConfig returns the config of the store
	GetConfig() *config.Config
	// Meta returns the store meta
	Meta() meta.Store
	// GetRouter returns a router
	GetRouter() Router
	// GetShardsProxy returns the shards proxy used to dispatch requests
	GetShardsProxy() ShardsProxy
	// OnRequest receives a request; the registered callbacks are invoked when the request completes
	OnRequest(rpc.Request) error
	// OnRequestWithCB receives a request and calls cb when the request completes
	OnRequestWithCB(req rpc.Request, cb func(resp rpc.ResponseBatch)) error
	// DataStorageByGroup returns the DataStorage of the shard group
	DataStorageByGroup(uint64) storage.DataStorage
	// MaybeLeader returns whether the shard replica may be the leader
	MaybeLeader(uint64) bool
	// MustAllocID returns a uint64 id, panicking on error
	MustAllocID() uint64
	// Prophet returns the current prophet instance
	Prophet() prophet.Prophet

	// CreateResourcePool creates resource pools. The resource pool will create shards
	// and try to keep the number of shards in the pool no less than the `capacity`
	// parameter. This is an idempotent operation.
	CreateResourcePool(...metapb.ResourcePool) (ShardsPool, error)
	// GetResourcePool returns the `ShardsPool`, or nil if `CreateResourcePool` has not completed
	GetResourcePool() ShardsPool
}

Store manages a set of raft groups

func NewStore

func NewStore(cfg *config.Config) Store

NewStore returns a raft store
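
A minimal sketch of bringing up a store, assuming a fully populated *config.Config (addresses, storage, prophet settings); the helper name is illustrative.

func startNode(cfg *config.Config) (raftstore.Store, raftstore.ShardsProxy) {
	store := raftstore.NewStore(cfg)
	store.Start()
	// The shards proxy is the usual entry point for dispatching requests.
	return store, store.GetShardsProxy()
}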

type SuccessCallback added in v0.2.0

type SuccessCallback func(resp rpc.Response)

SuccessCallback is the callback invoked when a request succeeds

type TestClusterOption

type TestClusterOption func(*testClusterOptions)

TestClusterOption is an option for creating a TestCluster

func WithAppendTestClusterAdjustConfigFunc

func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption

WithAppendTestClusterAdjustConfigFunc appends a func used to adjust each node's config

func WithDataStorageOption

func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption

WithDataStorageOption sets the options used to create the data storage

func WithEnableTestParallel added in v0.2.0

func WithEnableTestParallel() TestClusterOption

WithEnableTestParallel enables parallel testing

func WithTestClusterDataPath

func WithTestClusterDataPath(path string) TestClusterOption

WithTestClusterDataPath sets the data storage directory

func WithTestClusterDisableSchedule

func WithTestClusterDisableSchedule() TestClusterOption

WithTestClusterDisableSchedule disables prophet scheduling

func WithTestClusterEnableAdvertiseAddr added in v0.2.0

func WithTestClusterEnableAdvertiseAddr() TestClusterOption

WithTestClusterEnableAdvertiseAddr enables the advertise address for the test cluster nodes

func WithTestClusterLogLevel

func WithTestClusterLogLevel(level zapcore.Level) TestClusterOption

WithTestClusterLogLevel sets the raftstore log level

func WithTestClusterNodeCount

func WithTestClusterNodeCount(n int) TestClusterOption

WithTestClusterNodeCount sets the node count of the test cluster

func WithTestClusterNodeStartFunc

func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption

WithTestClusterNodeStartFunc sets a custom node start func

func WithTestClusterRecreate

func WithTestClusterRecreate(value bool) TestClusterOption

WithTestClusterRecreate if true, the test cluster will clean and recreate the data dir

func WithTestClusterStoreFactory

func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption

WithTestClusterStoreFactory sets a custom factory for creating the raftstore

func WithTestClusterUseDisk

func WithTestClusterUseDisk() TestClusterOption

WithTestClusterUseDisk uses disk storage for testing

func WithTestClusterUseInitProphetCluster added in v0.3.0

func WithTestClusterUseInitProphetCluster() TestClusterOption

WithTestClusterUseInitProphetCluster uses the initial prophet cluster config

type TestDataBuilder added in v0.2.0

type TestDataBuilder struct {
}

TestDataBuilder builds test data

func NewTestDataBuilder added in v0.2.0

func NewTestDataBuilder() *TestDataBuilder

NewTestDataBuilder creates and returns a TestDataBuilder

func (*TestDataBuilder) CreateShard added in v0.2.0

func (b *TestDataBuilder) CreateShard(id uint64, replicasFormater string) Shard

CreateShard creates a shard for testing. Format:

	id:    id
	range: [id, id+1)
	replicasFormat:
		Voter format:          pid/cid
		Learner format:        pid/cid/[l|v], default v
		Initial member format: pid/cid/l/[t|f], default f
	Use ',' to separate multiple replicas.
	The first replica is the current replica.
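
A hedged reading of the format string, assuming `pid` is the replica (peer) id and `cid` the container (store) id; the ids below are illustrative.

func buildTestShard() raftstore.Shard {
	b := raftstore.NewTestDataBuilder()
	// Shard 1 with range [1, 2) and three replicas:
	//   1/100   -> replica 1 on container 100, listed first, so it is the current replica (voter)
	//   2/200   -> replica 2 on container 200 (voter)
	//   3/300/l -> replica 3 on container 300 (learner)
	return b.CreateShard(1, "1/100,2/200,3/300/l")
}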

type TestKVClient

type TestKVClient interface {
	// Set sets a key-value pair in the backend kv storage
	Set(key, value string, timeout time.Duration) error
	// Get returns the value of the specified key from the backend kv storage
	Get(key string, timeout time.Duration) (string, error)
	// SetWithShard sets a key-value pair in the specified shard of the backend kv storage
	SetWithShard(key, value string, id uint64, timeout time.Duration) error
	// GetWithShard returns the value of the specified key from the specified shard of the backend kv storage
	GetWithShard(key string, id uint64, timeout time.Duration) (string, error)
	// UpdateLabel updates the shard label
	UpdateLabel(shard, group uint64, key, value string, timeout time.Duration) error
	// Close closes the test client
	Close()
}

TestKVClient is a kv client that uses a `TestRaftCluster` as its backend KV storage engine
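
A hedged test sketch of a key-value round trip against a single-node test cluster, assuming the usual `testing` and `time` imports; timeouts and keys are illustrative.

func TestKVRoundTrip(t *testing.T) {
	c := raftstore.NewSingleTestClusterStore(t)
	c.Start()
	defer c.Stop()
	c.WaitLeadersByCount(1, time.Second*30)

	kv := c.CreateTestKVClient(0)
	defer kv.Close()

	if err := kv.Set("key1", "value1", time.Second*10); err != nil {
		t.Fatal(err)
	}
	v, err := kv.Get("key1", time.Second*10)
	if err != nil {
		t.Fatal(err)
	}
	if v != "value1" {
		t.Fatalf("expected value1, got %q", v)
	}
}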

type TestRaftCluster

type TestRaftCluster interface {
	// EveryStore runs fn on every store; it can be used to initialize store registrations
	EveryStore(fn func(i int, store Store))
	// GetStore returns the store of the node
	GetStore(node int) Store
	// GetStoreByID returns the store by id
	GetStoreByID(id uint64) Store
	// Start starts each node sequentially
	Start()
	// Stop stops each node sequentially
	Stop()
	// StartWithConcurrent starts the first node, then starts the remaining nodes concurrently
	StartWithConcurrent(bool)
	// Restart restarts the cluster
	Restart()
	// RestartWithFunc restarts the cluster, calling `beforeStartFunc` before starting
	RestartWithFunc(beforeStartFunc func())
	// StartNode starts the node
	StartNode(node int)
	// StopNode stops the node
	StopNode(node int)
	// RestartNode restarts the node
	RestartNode(node int)
	// StartNetworkPartition puts nodes into a network partition; it must be called after the nodes have started
	StartNetworkPartition(partitions [][]int)
	// StopNetworkPartition stops the network partition
	StopNetworkPartition()
	// GetPRCount returns the number of replicas on the node
	GetPRCount(node int) int
	// GetShardByIndex returns the shard by `shardIndex`, the order in which
	// the shard was created on the node
	GetShardByIndex(node int, shardIndex int) Shard
	// GetShardByID returns the shard from the node by shard id
	GetShardByID(node int, shardID uint64) Shard
	// CheckShardCount checks whether the number of shards on each node is correct
	CheckShardCount(countPerNode int)
	// CheckShardRange checks whether the range field of the shard on each node is correct;
	// `shardIndex` is the order in which the shard was created on the node
	CheckShardRange(shardIndex int, start, end []byte)
	// WaitRemovedByShardID waits until the specific shard is removed from every node, or the timeout expires
	WaitRemovedByShardID(shardID uint64, timeout time.Duration)
	// WaitRemovedByShardIDAt waits until the specific shard is removed from the specified nodes, or the timeout expires
	WaitRemovedByShardIDAt(shardID uint64, nodes []int, timeout time.Duration)
	// WaitLeadersByCount waits until the number of leaders in the cluster reaches at least the specified value,
	// or the timeout expires
	WaitLeadersByCount(count int, timeout time.Duration)
	// WaitLeadersByCountsAndShardGroupAndLabel waits until the number of leaders in the cluster reaches at least the specified values
	// for the given shard group and label, or the timeout expires
	WaitLeadersByCountsAndShardGroupAndLabel(counts []int, group uint64, key, value string, timeout time.Duration)
	// WaitShardByCount waits until the number of shards in the cluster reaches at least the specified value,
	// or the timeout expires
	WaitShardByCount(count int, timeout time.Duration)
	// WaitShardByLabel waits until the shard has the specified label, or the timeout expires
	WaitShardByLabel(id uint64, label, value string, timeout time.Duration)
	// WaitVoterReplicaByCountPerNode waits until the number of voter replicas on each node reaches at least the specified value,
	// or the timeout expires
	WaitVoterReplicaByCountPerNode(count int, timeout time.Duration)
	// WaitVoterReplicaByCounts waits until the number of voter replicas on each node reaches at least the specified values,
	// or the timeout expires
	WaitVoterReplicaByCounts(counts []int, timeout time.Duration)
	// WaitVoterReplicaByCountsAndShardGroup waits until the number of voter replicas of the shard group on each node reaches
	// at least the specified values, or the timeout expires
	WaitVoterReplicaByCountsAndShardGroup(counts []int, shardGroup uint64, timeout time.Duration)
	// WaitVoterReplicaByCountsAndShardGroupAndLabel waits until the number of voter replicas of the shard group with the given label
	// on each node reaches at least the specified values, or the timeout expires
	WaitVoterReplicaByCountsAndShardGroupAndLabel(counts []int, shardGroup uint64, label, value string, timeout time.Duration)
	// WaitShardByCountPerNode waits until the number of shards on each node reaches at least the specified value,
	// or the timeout expires
	WaitShardByCountPerNode(count int, timeout time.Duration)
	// WaitAllReplicasChangeToVoter waits until the replicas of the shard on each node change to voters, or the timeout expires
	WaitAllReplicasChangeToVoter(shard uint64, timeout time.Duration)
	// WaitShardByCountOnNode waits until the number of shards on the specified node reaches at least the specified value,
	// or the timeout expires
	WaitShardByCountOnNode(node, count int, timeout time.Duration)
	// WaitShardSplitByCount waits until the number of shard splits reaches the specific value, or the timeout expires
	WaitShardSplitByCount(id uint64, count int, timeout time.Duration)
	// WaitShardByCounts waits until the number of shards on each node reaches the specific values, or the timeout expires
	WaitShardByCounts(counts []int, timeout time.Duration)
	// WaitShardStateChangedTo waits until the state of the shard changes to the specific value, or the timeout expires
	WaitShardStateChangedTo(shardID uint64, to metapb.ResourceState, timeout time.Duration)
	// GetShardLeaderStore returns the store of the shard's leader node
	GetShardLeaderStore(shardID uint64) Store
	// GetProphet returns the prophet instance
	GetProphet() prophet.Prophet
	// CreateTestKVClient creates and returns a kv client
	CreateTestKVClient(node int) TestKVClient
	// CreateTestKVClientWithAdjust creates and returns a kv client with an adjust func to modify requests
	CreateTestKVClientWithAdjust(node int, adjust func(req *rpc.Request)) TestKVClient
}

TestRaftCluster is a test cluster used to start N nodes in a single process and to start and stop individual nodes, making it easier to test `raftstore`.

func NewSingleTestClusterStore

func NewSingleTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster

NewSingleTestClusterStore creates a test cluster with 1 node

func NewTestClusterStore

func NewTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster

NewTestClusterStore creates a test cluster using options
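
A hedged sketch of a multi-node test cluster built from the options above, assuming the usual `testing`, `time`, and `zapcore` imports; the node count, log level, and timeouts are illustrative.

func TestThreeNodeCluster(t *testing.T) {
	c := raftstore.NewTestClusterStore(t,
		raftstore.WithTestClusterNodeCount(3),
		raftstore.WithTestClusterRecreate(true),
		raftstore.WithTestClusterLogLevel(zapcore.ErrorLevel),
	)
	c.Start()
	defer c.Stop()

	// Wait until every node holds at least one shard and a leader has been elected.
	c.WaitShardByCountPerNode(1, time.Second*30)
	c.WaitLeadersByCount(1, time.Second*30)
}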
