raftchunking

Version: v0.7.0
Published: Jan 9, 2023 | License: MPL-2.0 | Imports: 10 | Imported by: 8

Documentation

Constants

This section is empty.

Variables

var (
	// ChunkSize is the threshold used for breaking a large value into chunks.
	// Defaults to the suggested max data size for the raft library.
	ChunkSize = raft.SuggestedMaxDataSize
)

Functions

func ChunkingApply

func ChunkingApply(cmd, extensions []byte, timeout time.Duration, applyFunc ApplyFunc) raft.ApplyFuture

ChunkingApply takes a byte slice and splits it into chunks of at most ChunkSize bytes (the final chunk may be smaller), calling applyFunc on each chunk. It requires a corresponding wrapper around the FSM to reconstruct the value on the other end. The timeout applies to each individual apply operation, not to the operation as a whole. The return value is a future whose Error() returns only once Error() has returned for every underlying Apply future. Note that any error indicates that the entire operation will not be applied, assuming the correct FSM wrapper is used. If extensions is passed in, it is set as the Extensions value on the Apply once all chunks are received.
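The splitting step described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: splitChunks is a hypothetical helper, and the chunk size of 4 is an arbitrary stand-in for ChunkSize.

```go
package main

import "fmt"

// splitChunks is a hypothetical helper, not part of raftchunking. It breaks
// data into pieces of at most size bytes; only the last piece may be shorter.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for start := 0; start < len(data); start += size {
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		// Each chunk is a sub-slice of data; no bytes are copied.
		chunks = append(chunks, data[start:end])
	}
	return chunks
}

func main() {
	chunks := splitChunks([]byte("abcdefghij"), 4)
	for i, c := range chunks {
		fmt.Printf("chunk %d of %d: %q\n", i, len(chunks), c)
	}
}
```

With ten bytes of input and a size of 4, the sketch yields three chunks, the last holding the two remaining bytes.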

Types

type ApplyFunc

type ApplyFunc func(raft.Log, time.Duration) raft.ApplyFuture

type ChunkInfo added in v0.6.0

type ChunkInfo struct {
	OpNum       uint64
	SequenceNum uint32
	NumChunks   uint32
	Term        uint64
	Data        []byte
}

ChunkInfo holds the metadata and data for a single chunk.

type ChunkMap added in v0.6.0

type ChunkMap map[uint64][]*ChunkInfo

ChunkMap represents a set of data chunks. We use ChunkInfo with Data instead of bare []byte in case there is a need to extend this info later.
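As a rough illustration of how an entry in such a map could be turned back into the original value, the sketch below concatenates the Data of each chunk in slice order. The chunkInfo and chunkMap types are local stand-ins that mirror the documented fields, reassemble is a hypothetical helper, and the layout (one slot per sequence number, nil for a chunk not yet received) is an assumption rather than something this page states.

```go
package main

import (
	"bytes"
	"fmt"
)

// chunkInfo is a local stand-in mirroring the documented ChunkInfo fields.
type chunkInfo struct {
	OpNum       uint64
	SequenceNum uint32
	NumChunks   uint32
	Term        uint64
	Data        []byte
}

// chunkMap mirrors the documented ChunkMap: op number to chunks.
type chunkMap map[uint64][]*chunkInfo

// reassemble is a hypothetical helper. It assumes the slice for an op is
// ordered by sequence number and that a nil entry means a missing chunk.
func reassemble(m chunkMap, opNum uint64) ([]byte, error) {
	chunks, ok := m[opNum]
	if !ok {
		return nil, fmt.Errorf("no chunks tracked for op %d", opNum)
	}
	var buf bytes.Buffer
	for i, c := range chunks {
		if c == nil {
			return nil, fmt.Errorf("op %d is missing chunk %d", opNum, i)
		}
		buf.Write(c.Data)
	}
	return buf.Bytes(), nil
}

func main() {
	m := chunkMap{
		1: {
			{OpNum: 1, SequenceNum: 0, NumChunks: 2, Data: []byte("hello ")},
			{OpNum: 1, SequenceNum: 1, NumChunks: 2, Data: []byte("world")},
		},
	}
	data, err := reassemble(m, 1)
	fmt.Println(string(data), err)
}
```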

type ChunkStorage added in v0.6.0

type ChunkStorage interface {
	// StoreChunk stores Data from ChunkInfo according to the other metadata
	// (OpNum, SequenceNum). The returned bool reports whether all chunks have
	// been received, that is, whether the number of non-nil chunks equals
	// NumChunks.
	StoreChunk(*ChunkInfo) (bool, error)

	// FinalizeOp gets all chunks for an op number and then removes the chunk
	// info for that op from the store. It should only be called when
	// StoreChunk for a given op number returns true but should be safe to call
	// at any time; clearing an op can be accomplished by calling this function
	// and ignoring the non-error result.
	FinalizeOp(uint64) ([]*ChunkInfo, error)

	// GetChunks gets all currently tracked ops, for snapshotting
	GetChunks() (ChunkMap, error)

	// RestoreChunks restores the current FSM state from a map
	RestoreChunks(ChunkMap) error
}
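To make the StoreChunk and FinalizeOp contract concrete, here is a minimal, non-thread-safe sketch of an in-memory store. It is not InmemChunkStorage: memStore and chunkInfo are local stand-ins, the validation rules are assumptions, and GetChunks and RestoreChunks are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// chunkInfo is a local stand-in mirroring the documented ChunkInfo fields.
type chunkInfo struct {
	OpNum       uint64
	SequenceNum uint32
	NumChunks   uint32
	Term        uint64
	Data        []byte
}

// memStore is a hypothetical store keyed by op number. It does no locking.
type memStore struct {
	ops map[uint64][]*chunkInfo
}

func newMemStore() *memStore {
	return &memStore{ops: make(map[uint64][]*chunkInfo)}
}

// StoreChunk places the chunk in the slot given by SequenceNum and reports
// whether every slot for that op is now non-nil.
func (s *memStore) StoreChunk(c *chunkInfo) (bool, error) {
	if c == nil || c.NumChunks == 0 || c.SequenceNum >= c.NumChunks {
		return false, errors.New("invalid chunk")
	}
	chunks, ok := s.ops[c.OpNum]
	if !ok {
		chunks = make([]*chunkInfo, c.NumChunks)
		s.ops[c.OpNum] = chunks
	}
	if uint32(len(chunks)) != c.NumChunks {
		return false, errors.New("chunk count mismatch for op")
	}
	chunks[c.SequenceNum] = c
	for _, existing := range chunks {
		if existing == nil {
			return false, nil
		}
	}
	return true, nil
}

// FinalizeOp returns the chunks for an op and forgets the op. Calling it for
// an unknown op returns a nil slice, so it is safe to call at any time.
func (s *memStore) FinalizeOp(opNum uint64) ([]*chunkInfo, error) {
	chunks := s.ops[opNum]
	delete(s.ops, opNum)
	return chunks, nil
}

func main() {
	s := newMemStore()
	// Chunks may arrive in any order in this sketch.
	done, _ := s.StoreChunk(&chunkInfo{OpNum: 7, SequenceNum: 1, NumChunks: 2, Data: []byte("b")})
	fmt.Println("complete after first chunk:", done)
	done, _ = s.StoreChunk(&chunkInfo{OpNum: 7, SequenceNum: 0, NumChunks: 2, Data: []byte("a")})
	fmt.Println("complete after second chunk:", done)
	chunks, _ := s.FinalizeOp(7)
	fmt.Println("chunks returned:", len(chunks))
}
```

Discarding an in-progress op in this sketch is just a call to FinalizeOp whose result is ignored, matching the interface comment.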

type ChunkingBatchingFSM added in v0.7.0

type ChunkingBatchingFSM struct {
	*ChunkingFSM
	// contains filtered or unexported fields
}

func NewChunkingBatchingFSM added in v0.7.0

func NewChunkingBatchingFSM(underlying raft.BatchingFSM, store ChunkStorage) *ChunkingBatchingFSM

func (*ChunkingBatchingFSM) ApplyBatch added in v0.7.0

func (c *ChunkingBatchingFSM) ApplyBatch(logs []*raft.Log) []interface{}

ApplyBatch applies the logs, handling chunking as needed. The return value is a slice containing, for each log, either an error or whatever is returned from the underlying Apply.

type ChunkingConfigurationStore added in v0.5.1

type ChunkingConfigurationStore struct {
	*ChunkingFSM
	// contains filtered or unexported fields
}

func NewChunkingConfigurationStore added in v0.5.1

func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore

func (*ChunkingConfigurationStore) StoreConfiguration added in v0.5.1

func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration)

type ChunkingFSM

type ChunkingFSM struct {
	// contains filtered or unexported fields
}

ChunkingFSM is an FSM that implements chunking; it's the sister of ChunkingApply.

N.B.: If a term change happens, the final apply from the client will have a nil result and will not be passed through to the underlying FSM. To make this detectable, the result of the final apply to the underlying FSM is wrapped in ChunkingSuccess.
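A caller inspecting the response of the final apply might distinguish these cases with a type switch. The sketch below is only an illustration: chunkingSuccess is a local stand-in for ChunkingSuccess, unwrapResponse is a hypothetical helper, and it assumes the wrapper is returned by value.

```go
package main

import "fmt"

// chunkingSuccess is a local stand-in for the documented ChunkingSuccess.
type chunkingSuccess struct {
	Response interface{}
}

// unwrapResponse is a hypothetical helper. It reports whether resp arrived
// wrapped in chunkingSuccess, and returns the inner response if so.
func unwrapResponse(resp interface{}) (inner interface{}, wrapped bool, err error) {
	switch v := resp.(type) {
	case nil:
		// A nil result: per the note above, possibly a term change, so the
		// value never reached the underlying FSM.
		return nil, false, nil
	case error:
		return nil, false, v
	case chunkingSuccess:
		return v.Response, true, nil
	default:
		// Not wrapped; the caller decides how to interpret it.
		return v, false, nil
	}
}

func main() {
	responses := []interface{}{
		nil,
		fmt.Errorf("apply failed"),
		chunkingSuccess{Response: "stored"},
	}
	for _, r := range responses {
		inner, wrapped, err := unwrapResponse(r)
		fmt.Printf("inner=%v wrapped=%v err=%v\n", inner, wrapped, err)
	}
}
```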

func NewChunkingFSM

func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM

func (*ChunkingFSM) Apply

func (c *ChunkingFSM) Apply(l *raft.Log) interface{}

Apply applies the log, handling chunking as needed. The return value will either be an error or whatever is returned from the underlying Apply.

func (*ChunkingFSM) CurrentState added in v0.6.0

func (c *ChunkingFSM) CurrentState() (*State, error)

func (*ChunkingFSM) Restore

func (c *ChunkingFSM) Restore(rc io.ReadCloser) error

func (*ChunkingFSM) RestoreState added in v0.6.0

func (c *ChunkingFSM) RestoreState(state *State) error

func (*ChunkingFSM) Snapshot

func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error)

func (*ChunkingFSM) Underlying

func (c *ChunkingFSM) Underlying() raft.FSM

Note: this is used in tests via the Raft package test helper functions, even if it is not used in client code.

type ChunkingSuccess added in v0.6.0

type ChunkingSuccess struct {
	Response interface{}
}

type InmemChunkStorage added in v0.6.0

type InmemChunkStorage struct {
	// contains filtered or unexported fields
}

InmemChunkStorage satisfies ChunkStorage using an in-memory-only tracking method.

func NewInmemChunkStorage added in v0.6.0

func NewInmemChunkStorage() *InmemChunkStorage

func (*InmemChunkStorage) FinalizeOp added in v0.6.0

func (i *InmemChunkStorage) FinalizeOp(opNum uint64) ([]*ChunkInfo, error)

func (*InmemChunkStorage) GetChunks added in v0.6.0

func (i *InmemChunkStorage) GetChunks() (ChunkMap, error)

func (*InmemChunkStorage) RestoreChunks added in v0.6.0

func (i *InmemChunkStorage) RestoreChunks(chunks ChunkMap) error

func (*InmemChunkStorage) StoreChunk added in v0.6.0

func (i *InmemChunkStorage) StoreChunk(chunk *ChunkInfo) (bool, error)

type State added in v0.6.0

type State struct {
	ChunkMap ChunkMap
}
