kv

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// RollupContext is the string constant "RollupContext".
// NOTE(review): presumably used as a lookup key for rollup state
// (e.g. in merger init params) — confirm against callers.
const (
	RollupContext = "RollupContext"
)

Variables

View Source
// Options is a package-level atomic.Value.
// NOTE(review): presumably holds the current StoreOptions — confirm the
// stored type before type-asserting the loaded value.
var (
	Options atomic.Value
)

Functions

func InitStoreManager

func InitStoreManager(storeMgr StoreManager)

InitStoreManager initializes StoreManager.

func RegisterMerger

func RegisterMerger(name MergerType, merger NewMerger)

RegisterMerger registers a family merger. NOTICE: a merger must be registered before the family is created.

Types

type CompactJob

// CompactJob represents a compaction job that merges sst files.
type CompactJob interface {
	// Run runs the compaction logic and returns an error on failure.
	Run() error
}

CompactJob represents a compaction job that merges sst files.

type Family

// Family represents a column family; data is isolated per family.
type Family interface {
	// ID returns the family's id.
	ID() version.FamilyID
	// Name returns the family's name.
	Name() string
	// NewFlusher creates a Flusher for saving data to the family.
	NewFlusher() Flusher
	// GetSnapshot returns the current version's snapshot.
	GetSnapshot() version.Snapshot
	// Compact compacts all files of level 0.
	Compact()
	// contains filtered or unexported methods
}

Family implements a column family; data is isolated per family.

type FamilyOption

// FamilyOption defines the config items at family level.
// Fields are mapped to TOML keys via their struct tags.
type FamilyOption struct {
	// ID is the family id.
	ID               int    `toml:"id"`
	// Name is the family name.
	Name             string `toml:"name"`
	CompactThreshold int    `toml:"compactThreshold"` // level 0 compact threshold
	RollupThreshold  int    `toml:"rollupThreshold"`  // level 0 rollup threshold
	Merger           string `toml:"merger"`           // merger to use; it needs to implement the Merger interface
	MaxFileSize      uint32 `toml:"maxFileSize"`      // max file size (NOTE(review): unit not visible here — confirm)
}

FamilyOption defines config items for family level

type Flusher

// Flusher flushes data into the kv store; large data will be split into
// many sst files.
type Flusher interface {
	// StreamWriter creates a stream writer for flushing data as a stream.
	StreamWriter() (table.StreamWriter, error)
	// Add puts a k/v pair.
	Add(key uint32, value []byte) error
	// Sequence sets the write sequence number for the given leader.
	Sequence(leader int32, seq int64)
	// Commit flushes data and commits metadata.
	Commit() error
	// Release releases the resources held by the flusher.
	// NOTICE: Release() MUST be invoked for every newly created flusher instance.
	Release()
}

Flusher flushes data into the kv store; large data will be split into many sst files.

type JobScheduler

// JobScheduler represents a background compaction job scheduler.
type JobScheduler interface {
	// Startup starts the job scheduler.
	Startup()
	// Shutdown stops the job scheduler.
	Shutdown()
	// IsRunning reports whether the scheduler is running.
	IsRunning() bool
}

JobScheduler represents a background compaction job scheduler.

func NewJobScheduler

func NewJobScheduler(ctx context.Context, option StoreOptions) JobScheduler

NewJobScheduler creates a JobScheduler instance.

type Merger

// Merger merges the values of the same key during a compaction job
// (compact, rollup, etc.).
type Merger interface {
	// Init initializes the merger's params or context; it is called before
	// the merge operation.
	Init(params map[string]interface{})
	// Merge merges the values of the same key.
	// The merged data is written into the Flusher directly.
	// It returns an error on failure.
	Merge(key uint32, values [][]byte) error
}

Merger merges the values of the same key during a compaction job (compact, rollup, etc.).

type MergerType

// MergerType represents the merger type; RegisterMerger takes it as the
// name under which a merger is registered.
type MergerType string

MergerType represents the merger type

type NewMerger

// NewMerger is a function that creates a Merger instance for the given
// Flusher, returning an error if the merger cannot be created.
type NewMerger func(flusher Flusher) (Merger, error)

NewMerger represents a function that creates a Merger instance.

type NopFlusher

// NopFlusher implements Flusher, but does nothing.
// Create one with NewNopFlusher.
type NopFlusher struct {
	// contains filtered or unexported fields
}

NopFlusher implements Flusher, but does nothing.

func NewNopFlusher

func NewNopFlusher() *NopFlusher

NewNopFlusher returns a new no-op Flusher.

func (*NopFlusher) Add

func (nf *NopFlusher) Add(_ uint32, value []byte) error

Add puts value to the buffer.

func (*NopFlusher) Bytes

func (nf *NopFlusher) Bytes() []byte

Bytes returns the last flushed value.

func (*NopFlusher) Commit

func (nf *NopFlusher) Commit() error

Commit always returns nil.

func (*NopFlusher) Release

func (nf *NopFlusher) Release()

func (*NopFlusher) Sequence

func (nf *NopFlusher) Sequence(_ int32, _ int64)

func (*NopFlusher) StreamWriter

func (nf *NopFlusher) StreamWriter() (table.StreamWriter, error)

type Rollup

// Rollup represents a rollup relation
// (source store/family => target store/family).
type Rollup interface {
	// GetTimestamp returns the timestamp based on the source family and the source slot.
	GetTimestamp(slot uint16) int64
	// IntervalRatio returns the interval ratio = target interval / source interval.
	IntervalRatio() uint16
	// CalcSlot calculates the target slot based on the source timestamp.
	CalcSlot(timestamp int64) uint16
	// BaseSlot returns the base slot by source family time / target interval.
	BaseSlot() uint16
}

Rollup represents rollup relation(source store/family => target store/family)

type Store

// Store is a kv store supporting column families. It differs from other
// LSM implementations: the current implementation doesn't contain memory
// table write logic.
type Store interface {
	// Name returns the store's name.
	Name() string
	// Path returns the store's root path.
	Path() string
	// CreateFamily creates/loads a column family.
	CreateFamily(familyName string, option FamilyOption) (Family, error)
	// GetFamily gets the family by name; it returns nil if the family does not exist.
	GetFamily(familyName string) Family
	// ListFamilyNames returns the names of all families.
	ListFamilyNames() []string
	// Option returns the store's configuration options.
	Option() StoreOption
	// ForceRollup runs the rollup job manually.
	ForceRollup()
	// contains filtered or unexported methods
}

Store is a kv store supporting column families, but it differs from other LSM implementations. The current implementation doesn't contain memory table write logic.

type StoreManager

// StoreManager represents a global store manager.
type StoreManager interface {
	// CreateStore creates the store by name/option.
	// NOTE: name needs to include the path relative to the root path.
	CreateStore(name string, option StoreOption) (Store, error)
	// GetStoreByName returns the Store with the given name; the bool result
	// presumably reports whether it was found (confirm against the implementation).
	GetStoreByName(name string) (Store, bool)
	// GetStores returns all Stores held in the manager's cache.
	GetStores() []Store
	// CloseStore closes the Store, then removes it from the manager's cache.
	CloseStore(name string) error
}

StoreManager represents a global store manager.

func GetStoreManager

func GetStoreManager() StoreManager

GetStoreManager returns the kv store manager singleton instance.

type StoreOption

// StoreOption defines the config items at store level.
// Fields are mapped to TOML keys via their struct tags.
type StoreOption struct {
	Levels int            `toml:"levels"` // num. of levels
	// NOTE(review): TTL is presumably the time-to-live of stored data — confirm.
	TTL    ltoml.Duration `toml:"ttl"`

	Source timeutil.Interval   `toml:"source"` // optional (source interval)
	Rollup []timeutil.Interval `toml:"rollup"` // optional (target intervals)
}

StoreOption defines the config items at store level.

func DefaultStoreOption

func DefaultStoreOption() StoreOption

DefaultStoreOption builds default store option

type StoreOptions

// StoreOptions holds the store root path and the compact/rollup job check
// interval; NewJobScheduler takes it as its option argument.
type StoreOptions struct {
	Dir                  string // store root path
	CompactCheckInterval int    // compact/rollup job check interval (in seconds)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL