raftstore

package
v0.0.0-...-01d9b8a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2021 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Creating identifies the creating step.
	// NOTE(review): presumably the snapshot creating step, by analogy with
	// Sending — confirm.
	Creating = 1
	// Sending identifies the snapshot sending step.
	Sending = 2
)
View Source
var (

	// DataPrefixSize is the size of the data key prefix: dataPrefixKeySize plus 8.
	// NOTE(review): the extra 8 bytes presumably hold the uint64 group — confirm.
	DataPrefixSize = dataPrefixKeySize + 8
)

data is in (z, z+1)

Functions

func DecodeDataKey

func DecodeDataKey(key []byte) []byte

DecodeDataKey decode data key

func EncodeDataKey

func EncodeDataKey(group uint64, key []byte) []byte

EncodeDataKey encode data key

func GetBuf

func GetBuf(attrs map[string]interface{}) *goetty.ByteBuf

GetBuf returns byte buffer from attr

func GetMaxKey

func GetMaxKey() []byte

GetMaxKey return max key

func GetMinKey

func GetMinKey() []byte

GetMinKey return min key

func GetStoreIdentKey

func GetStoreIdentKey() []byte

GetStoreIdentKey return key of StoreIdent

func IsFirstApplyRequest

func IsFirstApplyRequest(attrs map[string]interface{}) bool

IsFirstApplyRequest returns true if the current request is first in this apply batch

func IsLastApplyRequest

func IsLastApplyRequest(attrs map[string]interface{}) bool

IsLastApplyRequest returns true if the current request is last in this apply batch

func WriteGroupPrefix

func WriteGroupPrefix(group uint64, key []byte)

WriteGroupPrefix write group prefix

Types

type Cfg

type Cfg struct {
	// Name is the node name in the cluster.
	Name string
	// RaftAddr is the raft address used to exchange raft messages.
	RaftAddr string
	// RPCAddr is the RPC address on which requests are served.
	RPCAddr string
	// MetadataStorage is the storage used to store the local raft state, the
	// raft log, and the sharding metadata.
	MetadataStorage storage.MetadataStorage
	// DataStorages are the storages used to store the application's data.
	// Beehive will select the corresponding storage according to the shard id.
	DataStorages []storage.DataStorage
}

Cfg the configuration of the raftstore

type CommandReadBatch

type CommandReadBatch interface {
	// Add adds a request to this batch. It returns true if the request can be
	// executed in this batch, otherwise false.
	Add(uint64, *raftcmdpb.Request, map[string]interface{}) (bool, error)
	// Execute executes the batch and returns the responses.
	Execute(metapb.Shard) ([]*raftcmdpb.Response, error)
	// Reset resets the current batch for reuse.
	Reset()
}

CommandReadBatch command read batch

type CommandWriteBatch

type CommandWriteBatch interface {
	// Add adds a request to this batch. It returns true if the request can be
	// executed in this batch, otherwise false.
	Add(uint64, *raftcmdpb.Request, map[string]interface{}) (bool, *raftcmdpb.Response, error)
	// Execute executes the batch and returns the written bytes and the diff
	// bytes that are used to modify the size of the current shard.
	Execute(metapb.Shard) (uint64, int64, error)
	// Reset resets the current batch for reuse.
	Reset()
}

CommandWriteBatch command write batch

type LocalCommandFunc

type LocalCommandFunc func(metapb.Shard, *raftcmdpb.Request) (*raftcmdpb.Response, error)

LocalCommandFunc directly exec on local func

type Option

type Option func(*options)

Option options

func WithApplyWorkerCount

func WithApplyWorkerCount(value uint64) Option

WithApplyWorkerCount sets the number of goroutines used to apply raft logs. For performance reasons, the raft log apply is executed asynchronously, so the system uses a fixed-size goroutine pool to apply the raft logs of all the shards.

func WithCustomCanReadLocalFunc

func WithCustomCanReadLocalFunc(value func(metapb.Shard) bool) Option

WithCustomCanReadLocalFunc set customCanReadLocalFunc

func WithCustomInitShardCreateFunc

func WithCustomInitShardCreateFunc(value func() []metapb.Shard) Option

WithCustomInitShardCreateFunc set custom initShardCreateFunc

func WithCustomSnapshotDataFunc

func WithCustomSnapshotDataFunc(createFunc, applyFunc func(string, metapb.Shard) error) Option

WithCustomSnapshotDataFunc set custom snapshot func

func WithCustomSplitCheckFunc

func WithCustomSplitCheckFunc(value func(metapb.Shard) ([]byte, bool)) Option

WithCustomSplitCheckFunc set customSplitCheckFunc

func WithCustomSplitCompletedFunc

func WithCustomSplitCompletedFunc(value func(*metapb.Shard, *metapb.Shard)) Option

WithCustomSplitCompletedFunc set customSplitCompletedFunc

func WithDataPath

func WithDataPath(value string) Option

WithDataPath set the path to store the raft log, snapshots and other metadata

func WithDisableRaftLogCompactProtect

func WithDisableRaftLogCompactProtect(groups ...uint64) Option

WithDisableRaftLogCompactProtect set disable compact protect

func WithDisableRefreshRoute

func WithDisableRefreshRoute(value bool) Option

WithDisableRefreshRoute set disableRefreshRoute

func WithDisableShardSplit

func WithDisableShardSplit() Option

WithDisableShardSplit disable split the shard, by default, the system checks all shards, when these shards are found to exceed the maximum storage threshold, they will perform split.

func WithDisableSyncRaftLog

func WithDisableSyncRaftLog() Option

WithDisableSyncRaftLog disable sync operations every time you write raft log to disk.

func WithEnsureNewShardInterval

func WithEnsureNewShardInterval(value time.Duration) Option

WithEnsureNewShardInterval set ensureNewShardInterval

func WithGroups

func WithGroups(value uint64) Option

WithGroups set group count

func WithInitShards

func WithInitShards(value uint64) Option

WithInitShards how many shards will be created on bootstrap the cluster

func WithLabels

func WithLabels(locationLabel []string, labels []metapb.Label) Option

WithLabels gives the current node a set of labels, and specifies which label names are used to identify the location. The scheduler will create replicas of the shard in different locations based on these location labels to achieve high availability.

func WithMaxAllowTransferLogLag

func WithMaxAllowTransferLogLag(value uint64) Option

WithMaxAllowTransferLogLag If the number of logs by which a follower node is behind the leader node exceeds this value, a leader transfer to this node will not be accepted.

func WithMaxConcurrencyWritesPerShard

func WithMaxConcurrencyWritesPerShard(maxConcurrencyWritesPerShard uint64) Option

WithMaxConcurrencyWritesPerShard limit the write speed per shard

func WithMaxPeerDownTime

func WithMaxPeerDownTime(value time.Duration) Option

WithMaxPeerDownTime Among all replicas of a shard, when no message has been received from a replica for longer than this value, the system will remove that replica and the scheduler will choose a new store to create a new replica.

func WithMaxProposalBytes

func WithMaxProposalBytes(value int) Option

WithMaxProposalBytes the maximum bytes per proposal; if a proposal exceeds this value, the application will receive a `RaftEntryTooLarge` error

func WithMaxRaftLogBytesToForceCompact

func WithMaxRaftLogBytesToForceCompact(value uint64) Option

WithMaxRaftLogBytesToForceCompact the maximum bytes of raft logs that the leader node has applied; if this value is exceeded, the leader node will force compact the log to the last applied raft log index, so the follower node may receive a snapshot.

func WithMaxRaftLogCompactProtectLag

func WithMaxRaftLogCompactProtectLag(value uint64) Option

WithMaxRaftLogCompactProtectLag If the leader node triggers a force compact of the raft log, the compact index is the last applied raft log index of the leader node; to avoid later sending snapshots to followers that lag only slightly behind, set a protected lag value.

func WithMaxRaftLogCountToForceCompact

func WithMaxRaftLogCountToForceCompact(value uint64) Option

WithMaxRaftLogCountToForceCompact the maximum number of raft logs that the leader node has applied; if this value is exceeded, the leader node will force compact the log to the last applied raft log index, so the follower node may receive a snapshot.

func WithMemoryAsStorage

func WithMemoryAsStorage() Option

WithMemoryAsStorage use memory to store the Application's KV data, the scheduler collects the memory usage of the store node and balances the shards in the cluster, otherwise use disk.

func WithProphetOptions

func WithProphetOptions(value ...prophet.Option) Option

WithProphetOptions set prophet options

func WithRPC

func WithRPC(value RPC) Option

WithRPC set the rpc implementation to serve requests and send responses

func WithRaftElectionTick

func WithRaftElectionTick(value int) Option

WithRaftElectionTick how many ticks to perform timeout elections.

func WithRaftHeartbeatTick

func WithRaftHeartbeatTick(value int) Option

WithRaftHeartbeatTick how many ticks to perform raft heartbeat.

func WithRaftLogCompactDuration

func WithRaftLogCompactDuration(value time.Duration) Option

WithRaftLogCompactDuration compact raft log time interval.

func WithRaftMaxBytesPerMsg

func WithRaftMaxBytesPerMsg(value uint64) Option

WithRaftMaxBytesPerMsg the maximum bytes per raft message.

func WithRaftMaxInflightMsgCount

func WithRaftMaxInflightMsgCount(value int) Option

WithRaftMaxInflightMsgCount the maximum inflight messages in raft append RPC.

func WithRaftMaxWorkers

func WithRaftMaxWorkers(value uint64) Option

WithRaftMaxWorkers set raft workers

func WithRaftPreVote

func WithRaftPreVote(value bool) Option

WithRaftPreVote enable raft pre-vote

func WithRaftThresholdCompactLog

func WithRaftThresholdCompactLog(value uint64) Option

WithRaftThresholdCompactLog The leader node periodically compacts the logs that have been replicated to the follower nodes; to prevent this operation from being too frequent, limit the number of raft logs that will be compacted.

func WithRaftTickDuration

func WithRaftTickDuration(value time.Duration) Option

WithRaftTickDuration raft tick time interval.

func WithReadBatchFunc

func WithReadBatchFunc(value func(uint64) CommandReadBatch) Option

WithReadBatchFunc the factory function to create the application's commands batch processor. By default the raftstore will process commands one by one.

func WithSchedulerFunc

func WithSchedulerFunc(shardAllowRebalanceFunc, shardAllowTransferLeaderFunc func(metapb.Shard) bool) Option

WithSchedulerFunc set the scheduler control func

func WithSendRaftBatchSize

func WithSendRaftBatchSize(value uint64) Option

WithSendRaftBatchSize how many raft messages in a batch to send to other node

func WithSendRaftMsgWorkerCount

func WithSendRaftMsgWorkerCount(value uint64) Option

WithSendRaftMsgWorkerCount goroutine number of send raft message, the system sends the raft message to the corresponding goroutine according to the shard ID, each goroutine is responsible for a group of stores, only one tcp connection between two stores.

func WithShardAddHandleFun

func WithShardAddHandleFun(value func(metapb.Shard) error) Option

WithShardAddHandleFun set shard added handle func

func WithShardCapacityBytes

func WithShardCapacityBytes(value uint64) Option

WithShardCapacityBytes the maximum bytes per shard

func WithShardHeartbeatDuration

func WithShardHeartbeatDuration(value time.Duration) Option

WithShardHeartbeatDuration the heartbeat interval at which the shard information is reported to the scheduler.

func WithShardSplitCheckBytes

func WithShardSplitCheckBytes(value uint64) Option

WithShardSplitCheckBytes if the stored size of the shard exceeds this value, the split check will be started

func WithShardSplitCheckDuration

func WithShardSplitCheckDuration(value time.Duration) Option

WithShardSplitCheckDuration the interval between shard split checks

func WithShardStateAware

func WithShardStateAware(value ShardStateAware) Option

WithShardStateAware set shard state aware, the application will receive shard events such as: create, destroy, become leader, become follower and so on.

func WithSnapshotLimit

func WithSnapshotLimit(maxConcurrencySnapChunks uint64, snapChunkSize int) Option

WithSnapshotLimit limit the speed and size of snapshots to transfer, to avoid taking up too much bandwidth, the snapshot is split into a number of chunks, the `maxConcurrencySnapChunks` controls the number of chunks sent concurrently, the `snapChunkSize` controls the bytes of each chunk

func WithSnapshotManager

func WithSnapshotManager(value SnapshotManager) Option

WithSnapshotManager set the snapshot manager interface, which will be used to create, apply, write, receive and clean snapshots

func WithStoreHeartbeatDuration

func WithStoreHeartbeatDuration(value time.Duration) Option

WithStoreHeartbeatDuration the heartbeat interval at which the store information is reported to the scheduler.

func WithTransport

func WithTransport(value Transport) Option

WithTransport set the transport to send, receive raft message and snapshot message.

func WithWriteBatchFunc

func WithWriteBatchFunc(value func(uint64) CommandWriteBatch) Option

WithWriteBatchFunc the factory function to create the application's commands batch processor. By default the raftstore will process commands one by one.

type RPC

type RPC interface {
	// Start starts the RPC.
	Start() error
	// Stop stops the RPC.
	Stop()
}

RPC requests RPC

type ReadCommandFunc

type ReadCommandFunc func(metapb.Shard, *raftcmdpb.Request, map[string]interface{}) *raftcmdpb.Response

ReadCommandFunc the read command handler func

type Router

type Router interface {
	// Start starts the router.
	Start() error
	// SelectShard returns a shard and its leader store such that the key is in
	// the range [shard.Start, shard.End).
	// If the returned leader address is "", the current shard has no leader.
	SelectShard(group uint64, key []byte) (uint64, string)
	// Every applies the given func to all shards.
	// NOTE(review): the meaning of the uint64 and bool arguments is not
	// documented here — confirm against the implementation.
	Every(uint64, bool, func(uint64, string))

	// LeaderAddress returns the leader peer's store address.
	LeaderAddress(uint64) string
	// RandomPeerAddress returns a random peer's store address.
	RandomPeerAddress(uint64) string
}

Router route the request to the corresponding shard

type ShardResource

type ShardResource interface {
	// Meta returns the shard metadata.
	Meta() metapb.Shard
}

ShardResource shard resource

type ShardStateAware

type ShardStateAware interface {
	// Created indicates that the shard was created on the current store.
	Created(metapb.Shard)
	// Splited indicates that the shard was split on the current store.
	Splited(metapb.Shard)
	// Destory indicates that the shard was destroyed on the current store.
	Destory(metapb.Shard)
	// BecomeLeader indicates that the shard became leader on the current store.
	BecomeLeader(metapb.Shard)
	// BecomeFollower indicates that the shard became follower on the current store.
	BecomeFollower(metapb.Shard)
	// SnapshotApplied indicates that a snapshot was applied.
	SnapshotApplied(metapb.Shard)
}

ShardStateAware shard state aware

type SnapshotManager

type SnapshotManager interface {
	// Register registers the snapshot message for the given step.
	// NOTE(review): step is presumably Creating or Sending, and the bool result
	// presumably reports whether the registration succeeded — confirm.
	Register(msg *raftpb.SnapshotMessage, step int) bool
	// Deregister removes the registration of the snapshot message for the given step.
	Deregister(msg *raftpb.SnapshotMessage, step int)
	// Create creates the snapshot described by msg.
	Create(msg *raftpb.SnapshotMessage) error
	// Exists reports whether the snapshot described by msg exists.
	Exists(msg *raftpb.SnapshotMessage) bool
	// WriteTo writes the snapshot described by msg to conn.
	// NOTE(review): the uint64 result is presumably the number of bytes
	// written — confirm.
	WriteTo(msg *raftpb.SnapshotMessage, conn goetty.IOSession) (uint64, error)
	// CleanSnap cleans the snapshot described by msg.
	CleanSnap(msg *raftpb.SnapshotMessage) error
	// ReceiveSnapData handles received snapshot data carried by msg.
	ReceiveSnapData(msg *raftpb.SnapshotMessage) error
	// Apply applies the snapshot described by msg.
	Apply(msg *raftpb.SnapshotMessage) error
}

SnapshotManager manages snapshots

type Store

type Store interface {
	// Start starts the raft store.
	Start()
	// Stop stops the raft store.
	Stop()
	// Meta returns the store meta.
	Meta() metapb.Store
	// NewRouter returns a new router.
	NewRouter() Router
	// RegisterReadFunc registers a read command handler.
	RegisterReadFunc(uint64, ReadCommandFunc)
	// RegisterWriteFunc registers a write command handler.
	RegisterWriteFunc(uint64, WriteCommandFunc)
	// RegisterLocalFunc registers a local command handler.
	RegisterLocalFunc(uint64, LocalCommandFunc)
	// RegisterLocalRequestCB registers the local request callback used to
	// process responses.
	RegisterLocalRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
	// RegisterRPCRequestCB registers the rpc request callback used to process
	// responses.
	RegisterRPCRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
	// OnRequest receives a request, and calls the callback when the request is
	// completed.
	OnRequest(*raftcmdpb.Request) error
	// MetadataStorage returns the MetadataStorage of the shard group.
	MetadataStorage() storage.MetadataStorage
	// DataStorageByGroup returns the DataStorage of the shard group.
	DataStorageByGroup(uint64) storage.DataStorage
	// MaybeLeader reports whether the shard replica may be the leader.
	MaybeLeader(uint64) bool
	// AddShards adds shards meta on the current store, and then prophet will
	// schedule the replicas of these shards to other nodes.
	AddShards(...metapb.Shard) error
	// MustAllocID returns a uint64 id, and panics if an error occurs.
	MustAllocID() uint64
	// Prophet returns the current prophet instance.
	Prophet() prophet.Prophet
	// CreateRPCCliendSideCodec returns the rpc codec used at the client side.
	CreateRPCCliendSideCodec() (goetty.Decoder, goetty.Encoder)
}

Store manages a set of raft groups

func NewStore

func NewStore(cfg Cfg, opts ...Option) Store

NewStore returns a raft store

type Transport

type Transport interface {
	// Start starts the transport.
	Start()
	// Stop stops the transport.
	Stop()
	// Send sends a raft message.
	// NOTE(review): the relationship between the two message arguments is not
	// visible here — confirm against the implementation.
	Send(*raftpb.RaftMessage, *etcdraftpb.Message)
	// SendingSnapshotCount returns the number of snapshots being sent
	// (presumably those currently in flight — confirm).
	SendingSnapshotCount() uint64
}

Transport raft transport

type WriteCommandFunc

type WriteCommandFunc func(metapb.Shard, *raftcmdpb.Request, map[string]interface{}) (uint64, int64, *raftcmdpb.Response)

WriteCommandFunc the write command handler func, returns write bytes and the diff bytes that used to modify the size of the current shard

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL