Documentation ¶
Index ¶
- Variables
- func DecodeDataKey(key []byte) []byte
- func EncodeDataKey(group uint64, key []byte) []byte
- func GetBuf(attrs map[string]interface{}) *goetty.ByteBuf
- func GetMaxKey() []byte
- func GetMinKey() []byte
- func GetStoreIdentKey() []byte
- func IsFirstApplyRequest(attrs map[string]interface{}) bool
- func IsLastApplyRequest(attrs map[string]interface{}) bool
- func WriteGroupPrefix(group uint64, key []byte)
- type Cfg
- type CommandReadBatch
- type CommandWriteBatch
- type LocalCommandFunc
- type Option
- func WithApplyWorkerCount(value uint64) Option
- func WithCustomCanReadLocalFunc(value func(metapb.Shard) bool) Option
- func WithCustomInitShardCreateFunc(value func() []metapb.Shard) Option
- func WithCustomSnapshotDataFunc(createFunc, applyFunc func(string, metapb.Shard) error) Option
- func WithCustomSplitCheckFunc(value func(metapb.Shard) ([]byte, bool)) Option
- func WithCustomSplitCompletedFunc(value func(*metapb.Shard, *metapb.Shard)) Option
- func WithDataPath(value string) Option
- func WithDisableRaftLogCompactProtect(groups ...uint64) Option
- func WithDisableRefreshRoute(value bool) Option
- func WithDisableShardSplit() Option
- func WithDisableSyncRaftLog() Option
- func WithEnsureNewShardInterval(value time.Duration) Option
- func WithGroups(value uint64) Option
- func WithInitShards(value uint64) Option
- func WithLabels(locationLabel []string, labels []metapb.Label) Option
- func WithMaxAllowTransferLogLag(value uint64) Option
- func WithMaxConcurrencyWritesPerShard(maxConcurrencyWritesPerShard uint64) Option
- func WithMaxPeerDownTime(value time.Duration) Option
- func WithMaxProposalBytes(value int) Option
- func WithMaxRaftLogBytesToForceCompact(value uint64) Option
- func WithMaxRaftLogCompactProtectLag(value uint64) Option
- func WithMaxRaftLogCountToForceCompact(value uint64) Option
- func WithMemoryAsStorage() Option
- func WithProphetOptions(value ...prophet.Option) Option
- func WithRPC(value RPC) Option
- func WithRaftElectionTick(value int) Option
- func WithRaftHeartbeatTick(value int) Option
- func WithRaftLogCompactDuration(value time.Duration) Option
- func WithRaftMaxBytesPerMsg(value uint64) Option
- func WithRaftMaxInflightMsgCount(value int) Option
- func WithRaftMaxWorkers(value uint64) Option
- func WithRaftPreVote(value bool) Option
- func WithRaftThresholdCompactLog(value uint64) Option
- func WithRaftTickDuration(value time.Duration) Option
- func WithReadBatchFunc(value func(uint64) CommandReadBatch) Option
- func WithSchedulerFunc(shardAllowRebalanceFunc, shardAllowTransferLeaderFunc func(metapb.Shard) bool) Option
- func WithSendRaftBatchSize(value uint64) Option
- func WithSendRaftMsgWorkerCount(value uint64) Option
- func WithShardAddHandleFun(value func(metapb.Shard) error) Option
- func WithShardCapacityBytes(value uint64) Option
- func WithShardHeartbeatDuration(value time.Duration) Option
- func WithShardSplitCheckBytes(value uint64) Option
- func WithShardSplitCheckDuration(value time.Duration) Option
- func WithShardStateAware(value ShardStateAware) Option
- func WithSnapshotLimit(maxConcurrencySnapChunks uint64, snapChunkSize int) Option
- func WithSnapshotManager(value SnapshotManager) Option
- func WithStoreHeartbeatDuration(value time.Duration) Option
- func WithTransport(value Transport) Option
- func WithWriteBatchFunc(value func(uint64) CommandWriteBatch) Option
- type RPC
- type ReadCommandFunc
- type Router
- type ShardResource
- type ShardStateAware
- type SnapshotManager
- type Store
- type Transport
- type WriteCommandFunc
Constants ¶
This section is empty.
Variables ¶
var ( // Creating creating step Creating = 1 // Sending snapshot sending step Sending = 2 )
var (
// DataPrefixSize data prefix size
DataPrefixSize = dataPrefixKeySize + 8
)
data is in (z, z+1)
Functions ¶
func EncodeDataKey ¶
EncodeDataKey encode data key
func IsFirstApplyRequest ¶
IsFirstApplyRequest returns true if the current request is first in this apply batch
func IsLastApplyRequest ¶
IsLastApplyRequest returns true if the last request is first in this apply batch
func WriteGroupPrefix ¶
WriteGroupPrefix write group prefix
Types ¶
type Cfg ¶
type Cfg struct { // Name the node name in the cluster Name string // RaftAddr raft addr for exchange raft message RaftAddr string // RPCAddr the RPC address to serve requests RPCAddr string // MetadataStorage storage that to store local raft state, log, and sharding metadata MetadataStorage storage.MetadataStorage // DataStorages storages that to store application's data. Beehive will // select the corresponding storage according to shard id DataStorages []storage.DataStorage }
Cfg the configuration of the raftstore
type CommandReadBatch ¶
type CommandReadBatch interface { // Add add a request to this batch, returns true if it can be executed in this batch, // otherwrise false Add(uint64, *raftcmdpb.Request, map[string]interface{}) (bool, error) // Execute excute the batch, and return the responses Execute(metapb.Shard) ([]*raftcmdpb.Response, error) // Reset reset the current batch for reuse Reset() }
CommandReadBatch command read batch
type CommandWriteBatch ¶
type CommandWriteBatch interface { // Add add a request to this batch, returns true if it can be executed in this batch, // otherwrise false Add(uint64, *raftcmdpb.Request, map[string]interface{}) (bool, *raftcmdpb.Response, error) // Execute excute the batch, and return the write bytes, and diff bytes that used to // modify the size of the current shard Execute(metapb.Shard) (uint64, int64, error) // Reset reset the current batch for reuse Reset() }
CommandWriteBatch command write batch
type LocalCommandFunc ¶
LocalCommandFunc directly exec on local func
type Option ¶
type Option func(*options)
Option options
func WithApplyWorkerCount ¶
WithApplyWorkerCount goroutine number of apply raft log, for performance reasons, the raft log apply is executed asynchronously, so the system uses a fixed-size coroutine pool to apply the raft log of all the shards.
func WithCustomCanReadLocalFunc ¶
WithCustomCanReadLocalFunc set customCanReadLocalFunc
func WithCustomInitShardCreateFunc ¶
WithCustomInitShardCreateFunc set custom initShardCreateFunc
func WithCustomSnapshotDataFunc ¶
WithCustomSnapshotDataFunc set custom snapshot func
func WithCustomSplitCheckFunc ¶
WithCustomSplitCheckFunc set customSplitCheckFunc
func WithCustomSplitCompletedFunc ¶
WithCustomSplitCompletedFunc set customSplitCompletedFunc
func WithDataPath ¶
WithDataPath set the path to store the raft log, snapshots and other metadata
func WithDisableRaftLogCompactProtect ¶
WithDisableRaftLogCompactProtect set disable compact protect
func WithDisableRefreshRoute ¶
WithDisableRefreshRoute set disableRefreshRoute
func WithDisableShardSplit ¶
func WithDisableShardSplit() Option
WithDisableShardSplit disable split the shard, by default, the system checks all shards, when these shards are found to exceed the maximum storage threshold, they will perform split.
func WithDisableSyncRaftLog ¶
func WithDisableSyncRaftLog() Option
WithDisableSyncRaftLog disable sync operations every time you write raft log to disk.
func WithEnsureNewShardInterval ¶
WithEnsureNewShardInterval set ensureNewShardInterval
func WithInitShards ¶
WithInitShards how many shards will be created on bootstrap the cluster
func WithLabels ¶
WithLabels Give the current node a set of labels, and specify which label names are used to identify the location. The scheduler will create a replicas of the shard in a different location based on these location labels to achieve high availability.
func WithMaxAllowTransferLogLag ¶
WithMaxAllowTransferLogLag If the number of logs of the follower node behind the leader node exceeds this value, the transfer leader will not be accepted to this node.
func WithMaxConcurrencyWritesPerShard ¶
WithMaxConcurrencyWritesPerShard limit the write speed per shard
func WithMaxPeerDownTime ¶
WithMaxPeerDownTime In all replicas of a shard, when any message that does not receive a replica is exceeded this value, the system will remove the replica and the scheduler will choose a new store to create a new replica.
func WithMaxProposalBytes ¶
WithMaxProposalBytes the maximum bytes per proposal, if exceeded this value, application will receive `RaftEntryTooLarge` error
func WithMaxRaftLogBytesToForceCompact ¶
WithMaxRaftLogBytesToForceCompact the maximum bytes of raft logs that the leader node has been applied, if exceeded this value, the leader node will force compact log to last applied raft log index, so the follower node may receive a snapshot.
func WithMaxRaftLogCompactProtectLag ¶
WithMaxRaftLogCompactProtectLag If the leader node triggers the force compact raft log, the compact index is the last applied raft log index of leader node, to avoid sending snapshots to a smaller delayed follower in the future, set a protected value.
func WithMaxRaftLogCountToForceCompact ¶
WithMaxRaftLogCountToForceCompact the maximum number of raft logs that the leader node has been applied, if exceeded this value, the leader node will force compact log to last applied raft log index, so the follower node may receive a snapshot.
func WithMemoryAsStorage ¶
func WithMemoryAsStorage() Option
WithMemoryAsStorage use memory to store the Application's KV data, the scheduler collects the memory usage of the store node and balances the shards in the cluster, otherwise use disk.
func WithProphetOptions ¶
WithProphetOptions set prophet options
func WithRaftElectionTick ¶
WithRaftElectionTick how many ticks to perform timeout elections.
func WithRaftHeartbeatTick ¶
WithRaftHeartbeatTick how many ticks to perform raft headrtbeat.
func WithRaftLogCompactDuration ¶
WithRaftLogCompactDuration compact raft log time interval.
func WithRaftMaxBytesPerMsg ¶
WithRaftMaxBytesPerMsg the maximum bytes per raft message.
func WithRaftMaxInflightMsgCount ¶
WithRaftMaxInflightMsgCount the maximum inflight messages in raft append RPC.
func WithRaftMaxWorkers ¶
WithRaftMaxWorkers set raft workers
func WithRaftThresholdCompactLog ¶
WithRaftThresholdCompactLog The leader node periodically compact the replicates the logs that have been copied to the follower node, to prevent this operation from being too frequent, limit the number of raft logs that will be compacted.
func WithRaftTickDuration ¶
WithRaftTickDuration raft tick time interval.
func WithReadBatchFunc ¶
func WithReadBatchFunc(value func(uint64) CommandReadBatch) Option
WithReadBatchFunc the factory function to create applciation commands batch processor. By default the raftstore will process command one by one.
func WithSchedulerFunc ¶
func WithSchedulerFunc(shardAllowRebalanceFunc, shardAllowTransferLeaderFunc func(metapb.Shard) bool) Option
WithSchedulerFunc set the scheduler contorl func
func WithSendRaftBatchSize ¶
WithSendRaftBatchSize how many raft messages in a batch to send to other node
func WithSendRaftMsgWorkerCount ¶
WithSendRaftMsgWorkerCount goroutine number of send raft message, the system sends the raft message to the corresponding goroutine according to the shard ID, each goroutine is responsible for a group of stores, only one tcp connection between two stores.
func WithShardAddHandleFun ¶
WithShardAddHandleFun set shard added handle func
func WithShardCapacityBytes ¶
WithShardCapacityBytes the maximum bytes per shard
func WithShardHeartbeatDuration ¶
WithShardHeartbeatDuration reporting the shard information to the scheduler's heartbeat time.
func WithShardSplitCheckBytes ¶
WithShardSplitCheckBytes if the shard storaged size exceeded this value, the split check will started
func WithShardSplitCheckDuration ¶
WithShardSplitCheckDuration check the interval of shard split
func WithShardStateAware ¶
func WithShardStateAware(value ShardStateAware) Option
WithShardStateAware set shard state aware, the application will received shard event like: create, destory, become leader, become follower and so on.
func WithSnapshotLimit ¶
WithSnapshotLimit limit the speed and size of snapshots to transfer, to avoid taking up too much bandwidth, the snapshot is split into a number of chunks, the `maxConcurrencySnapChunks` controls the number of chunks sent concurrently, the `snapChunkSize` controls the bytes of each chunk
func WithSnapshotManager ¶
func WithSnapshotManager(value SnapshotManager) Option
WithSnapshotManager set the snapshot manager interface, this will used for create, apply, write, received and clean snapshots
func WithStoreHeartbeatDuration ¶
WithStoreHeartbeatDuration reporting the store information to the scheduler's heartbeat time.
func WithTransport ¶
WithTransport set the transport to send, receive raft message and snapshot message.
func WithWriteBatchFunc ¶
func WithWriteBatchFunc(value func(uint64) CommandWriteBatch) Option
WithWriteBatchFunc the factory function to create applciation commands batch processor. By default the raftstore will process command one by one.
type RPC ¶
type RPC interface { // Start start the RPC Start() error // Stop stop the RPC Stop() }
RPC requests RPC
type ReadCommandFunc ¶
type ReadCommandFunc func(metapb.Shard, *raftcmdpb.Request, map[string]interface{}) *raftcmdpb.Response
ReadCommandFunc the read command handler func
type Router ¶
type Router interface { // Start the router Start() error // SelectShard returns a shard and leader store that the key is in the range [shard.Start, shard.End). // If returns leader address is "", means the current shard has no leader SelectShard(group uint64, key []byte) (uint64, string) // Every do with all shards Every(uint64, bool, func(uint64, string)) // LeaderAddress return leader peer store address LeaderAddress(uint64) string // RandomPeerAddress return random peer store address RandomPeerAddress(uint64) string }
Router route the request to the corresponding shard
type ShardResource ¶
ShardResource shard resource
type ShardStateAware ¶
type ShardStateAware interface { // Created the shard was created on the current store Created(metapb.Shard) // Splited the shard was splited on the current store Splited(metapb.Shard) // Destory the shard was destoryed on the current store Destory(metapb.Shard) // BecomeLeader the shard was become leader on the current store BecomeLeader(metapb.Shard) // BecomeLeader the shard was become follower on the current store BecomeFollower(metapb.Shard) // SnapshotApplied snapshot applied SnapshotApplied(metapb.Shard) }
ShardStateAware shard state aware
type SnapshotManager ¶
type SnapshotManager interface { Register(msg *raftpb.SnapshotMessage, step int) bool Deregister(msg *raftpb.SnapshotMessage, step int) Create(msg *raftpb.SnapshotMessage) error Exists(msg *raftpb.SnapshotMessage) bool WriteTo(msg *raftpb.SnapshotMessage, conn goetty.IOSession) (uint64, error) CleanSnap(msg *raftpb.SnapshotMessage) error ReceiveSnapData(msg *raftpb.SnapshotMessage) error Apply(msg *raftpb.SnapshotMessage) error }
SnapshotManager manager snapshot
type Store ¶
type Store interface { // Start the raft store Start() // Stop the raft store Stop() // Meta returns store meta Meta() metapb.Store // NewRouter returns a new router NewRouter() Router // RegisterReadFunc register read command handler RegisterReadFunc(uint64, ReadCommandFunc) // RegisterWriteFunc register write command handler RegisterWriteFunc(uint64, WriteCommandFunc) // RegisterLocalFunc register local command handler RegisterLocalFunc(uint64, LocalCommandFunc) // RegisterLocalRequestCB register local request cb to process response RegisterLocalRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response)) // RegisterRPCRequestCB register rpc request cb to process response RegisterRPCRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response)) // OnRequest receive a request, and call cb while the request is completed OnRequest(*raftcmdpb.Request) error // MetadataStorage returns a MetadataStorage of the shard group MetadataStorage() storage.MetadataStorage // DataStorage returns a DataStorage of the shard group DataStorageByGroup(uint64) storage.DataStorage // MaybeLeader returns the shard replica maybe leader MaybeLeader(uint64) bool // AddShards add shards meta on the current store, and than prophet will // schedule this shard replicas to other nodes. AddShards(...metapb.Shard) error // AllocID returns a uint64 id, panic if has a error MustAllocID() uint64 // Prophet return current prophet instance Prophet() prophet.Prophet // CreateRPCCliendSideCodec returns the rpc codec at client side CreateRPCCliendSideCodec() (goetty.Decoder, goetty.Encoder) }
Store manage a set of raft group
type Transport ¶
type Transport interface { Start() Stop() Send(*raftpb.RaftMessage, *etcdraftpb.Message) SendingSnapshotCount() uint64 }
Transport raft transport
Source Files ¶
- attrs.go
- batch.go
- cfg.go
- cmd.go
- codec.go
- codec_rpc.go
- errors.go
- format.go
- keys.go
- metric.go
- options.go
- peer_apply.go
- peer_apply_exec.go
- peer_event_loop.go
- peer_event_post_apply.go
- peer_event_proposal.go
- peer_event_raft_ready.go
- peer_job.go
- peer_replica.go
- peer_storage.go
- pool.go
- prophet_adapter.go
- router.go
- rpc.go
- snap.go
- store.go
- store_handler.go
- store_shard_aware.go
- transport.go
- util.go