package store

v1.1.3 · Published: May 26, 2020 · License: BSD-3-Clause · Imports: 31 · Imported by: 2

Documentation

Index

Constants

const (
	HTREE_SUFFIX       = "hash"
	HINT_SUFFIX        = "s"
	MERGED_HINT_SUFFIX = "m"
)
const (
	BUCKET_STAT_EMPTY = iota
	BUCKET_STAT_NOT_EMPTY
	BUCKET_STAT_READY
)
const (
	MAX_NUM_CHUNK = 998
	MAX_NUM_SPLIT = 998
)
const (
	SecsBeforeDumpDefault = int64(5)
	HintStateIdle         = 0
)
const (
	HintStateDump = 1 << iota
	HintStateMerge
	HintStateGC
)
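Since these are bit flags (HintStateIdle above is 0, and each state here is a distinct power of two), a bucket's HintState can hold several states at once and be tested with bitwise AND. A minimal illustration:

state := HintStateDump | HintStateGC // dump and GC running concurrently

if state&HintStateMerge == 0 {
	// no merge in progress
}
if state&HintStateGC != 0 {
	// GC currently running
}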
const (
	HINTFILE_HEAD_SIZE = 16
	HINTITEM_HEAD_SIZE = 23
	HINTINDEX_ROW_SIZE = 4096
)
const (
	BUCKET_SIZE = 16
	MAX_DEPTH   = 8

	ThresholdListKeyDefault = uint32(64 * 4)
	ThresholdBigHash        = 64 * 4
)
const (
	FLAG_INCR            = 0x00000204
	FLAG_COMPRESS        = 0x00010000
	FLAG_CLIENT_COMPRESS = 0x00000010
	COMPRESS_RATIO_LIMIT = 0.7
	TRY_COMPRESS_SIZE    = 1024 * 10
	PADDING              = 256
	HEADER_SIZE          = 512
)
const LEN_USE_C_FIND = 100
const (
	MAX_KEY_LEN = 250
)
const TREE_ITEM_HEAD_SIZE = 11

Variables

var (
	DefaultHintConfig = HintConfig{
		NoMerged:             false,
		SplitCapStr:          "1M",
		IndexIntervalSizeStr: "4K",
		MergeInterval:        1,
	}

	DefaultHTreeConfig HTreeConfig = HTreeConfig{
		TreeHeight: 3,
		TreeDump:   3,
	}

	DefaultDataConfig = DataConfig{
		DataFileMaxStr: "4000M",
		CheckVHash:     false,
		FlushInterval:  0,
		FlushWakeStr:   "0",
		BufIOCapStr:    "1M",

		NoGCDays: 0,
		NotCompress: map[string]bool{
			"audio/wave": true,
			"audio/mpeg": true,
		},
	}

	DefaultDBLocalConfig = DBLocalConfig{
		Home: "./testdb",
	}
)
var (
	SecsBeforeDump = SecsBeforeDumpDefault // may be changed in tests
)
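A typical way to assemble a configuration is to start from these defaults and call Init, which presumably resolves the human-readable size strings ("4000M", "1M", "4K") into their numeric counterparts. A minimal sketch; the route part (config.DBRouteConfig) is assumed to be filled in from the route table:

conf := &HStoreConfig{
	DBLocalConfig: DefaultDBLocalConfig,
	DataConfig:    DefaultDataConfig,
	HintConfig:    DefaultHintConfig,
	HTreeConfig:   DefaultHTreeConfig,
}
if err := conf.Init(); err != nil {
	log.Fatal(err)
}
Conf = conf // the package-level config consulted by NewHStore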

Functions

func DataToHint

func DataToHint(path string) (err error)

func DataToHintDir

func DataToHintDir(path string, start, end int) (err error)

func DataToHintFile

func DataToHintFile(path string) (err error)
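The three variants differ only in scope: DataToHintFile rebuilds the hint for a single .data file, DataToHintDir covers a chunk range inside a bucket directory, and DataToHint presumably dispatches on the kind of path it receives. A hedged sketch (the paths are made up, and the end chunk is assumed exclusive):

// Rebuild the hint of one chunk file.
if err := DataToHintFile("/var/lib/beansdb/0/000.data"); err != nil {
	log.Printf("rebuild hint: %v", err)
}

// Rebuild hints for chunks 0 through 9 of a bucket directory.
if err := DataToHintDir("/var/lib/beansdb/0", 0, 10); err != nil {
	log.Printf("rebuild hints: %v", err)
}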

func FreeMem

func FreeMem()

func GetBucketDir

func GetBucketDir(numBucket, bucketID int) string

func GetBucketPath

func GetBucketPath(bucketID int) string

func Getvhash

func Getvhash(value []byte) uint16

func IsValidKeyString

func IsValidKeyString(key string) bool

func NeedCompress

func NeedCompress(header []byte) bool

func NewdataStore

func NewdataStore(bucketID int, home string) *dataStore

func ParsePathString

func ParsePathString(pathStr string, buf []int) ([]int, error)

func ParsePathUint64

func ParsePathUint64(khash uint64, buf []int) []int
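Both parsers yield the per-level path used to walk the HTree: ParsePathString from a hex path string, ParsePathUint64 from a key's 64-bit hash (no error return, since every hash is a valid path). The caller supplies the backing buffer, presumably to avoid an allocation per lookup; a [16]int matches KeyPos.KeyPathBuf. A sketch with an arbitrary hash:

var pathBuf [16]int
path := ParsePathUint64(0x3f20000000000000, pathBuf[:])
// path holds the tree positions derived from the high bits of the hash.

path2, err := ParsePathString("3f2", pathBuf[:])
if err != nil {
	log.Fatal(err) // the string must be a valid hex key path
}
_, _ = path, path2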

func StartCpuProfile

func StartCpuProfile(name string) *os.File

func StopCpuProfile

func StopCpuProfile(f *os.File)
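StartCpuProfile, StopCpuProfile and WriteHeapProfile presumably wrap runtime/pprof together with the file handling. Typical paired use (file names arbitrary):

f := StartCpuProfile("gc.prof")
runWorkload() // hypothetical: whatever should be measured
StopCpuProfile(f)

WriteHeapProfile("gc.heap") // one-shot heap snapshot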

func WakeupFlush

func WakeupFlush()

func WriteHeapProfile

func WriteHeapProfile(name string)

Types

type Bucket

type Bucket struct {
	BucketInfo

	GCHistory []GCState
	// contains filtered or unexported fields
}

func (*Bucket) GetRecordByKeyHash

func (bkt *Bucket) GetRecordByKeyHash(ki *KeyInfo) (rec *Record, inbuffer bool, err error)

type BucketInfo

type BucketInfo struct {
	BucketStat

	// tmp
	Pos             Position
	LastGC          *GCState
	HintState       int
	MaxDumpedHintID HintID
	DU              int64
	NumSameVhash    int64
	SizeSameVhash   int64
	SizeVhashKey    string
	NumSet          int64
	NumGet          int64
}

type BucketStat

type BucketStat struct {
	// initialized before open
	State int

	// initialized during open
	ID   int
	Home string

	TreeID      HintID
	NextGCChunk int
}

type CollisionTable

type CollisionTable struct {
	sync.Mutex `yaml:"-"`
	HintID
	Items map[uint64]map[string]HintItem
}

type DBLocalConfig

type DBLocalConfig struct {
	Home string `yaml:",omitempty"`
}

type DU

type DU struct {
	Disks      map[string]utils.DiskStatus
	BucketsHex map[string]int64
	Buckets    map[int]int64 `json:"-"`
	Errs       []string
}

func NewDU

func NewDU() (du *DU)

type DataConfig

type DataConfig struct {
	FlushWake     int64 `yaml:"-"`                        // wake the flush goroutine early once the write buffer exceeds this size
	DataFileMax   int64 `yaml:"-"`                        // rotate to a new data file when the current one reaches this size
	CheckVHash    bool  `yaml:"check_vhash,omitempty"`    // skip the write when the value hash is unchanged
	FlushInterval int   `yaml:"flush_interval,omitempty"` // the flush goroutine runs at this interval
	NoGCDays      int   `yaml:"no_gc_days,omitempty"`     // do not GC data files whose mtime falls within the last NoGCDays days

	FlushWakeStr   string          `yaml:"flush_wake_str"`
	DataFileMaxStr string          `yaml:"datafile_max_str,omitempty"`
	BufIOCap       int             `yaml:"-"` // capacity of the bufio readers/writers; enlarge for big values, default: 1MB
	BufIOCapStr    string          `yaml:"bufio_cap_str,omitempty"`
	NotCompress    map[string]bool `yaml:"not_compress,omitempty"` // content types that are never compressed
}

type DataStreamReader

type DataStreamReader struct {
	// contains filtered or unexported fields
}

func (*DataStreamReader) Close

func (stream *DataStreamReader) Close() error

func (*DataStreamReader) Next

func (stream *DataStreamReader) Next() (res *Record, offset uint32, sizeBroken uint32, err error)

func (*DataStreamReader) Offset

func (stream *DataStreamReader) Offset() uint32

type DataStreamWriter

type DataStreamWriter struct {
	// contains filtered or unexported fields
}

func GetStreamWriter

func GetStreamWriter(path string, isappend bool) (*DataStreamWriter, error)

func (*DataStreamWriter) Append

func (stream *DataStreamWriter) Append(rec *Record) (offset uint32, err error)

func (*DataStreamWriter) Close

func (stream *DataStreamWriter) Close() error

func (*DataStreamWriter) Offset

func (stream *DataStreamWriter) Offset() uint32
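GetStreamWriter opens a chunk file for sequential writes, and Append returns the offset at which each record landed; together with the chunk ID, that offset is exactly what a Position records. A minimal sketch, assuming rec is a *Record obtained elsewhere (building a Payload by hand involves cmem.CArray and is skipped here) and that isappend=true appends rather than truncates:

w, err := GetStreamWriter("/var/lib/beansdb/0/001.data", true)
if err != nil {
	log.Fatal(err)
}
defer w.Close()

offset, err := w.Append(rec)
if err != nil {
	log.Fatal(err)
}
pos := Position{ChunkID: 1, Offset: offset} // where the record can be read back
_ = pos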

type GCFileState

type GCFileState struct {
	NumBefore          int64
	NumReleased        int64
	NumReleasedDeleted int64
	SizeBefore         int64
	SizeReleased       int64
	SizeDeleted        int64
	SizeBroken         int64
	NumNotInHtree      int64
}

func (*GCFileState) String

func (s *GCFileState) String() string

type GCMgr

type GCMgr struct {
	// contains filtered or unexported fields
}

func (*GCMgr) AfterBucket

func (mgr *GCMgr) AfterBucket(bkt *Bucket)

func (*GCMgr) BeforeBucket

func (mgr *GCMgr) BeforeBucket(bkt *Bucket, startChunkID, endChunkID int, merge bool)

func (*GCMgr) UpdateCollision

func (mgr *GCMgr) UpdateCollision(bkt *Bucket, ki *KeyInfo, oldPos, newPos Position, rec *Record)

func (*GCMgr) UpdateHtreePos

func (mgr *GCMgr) UpdateHtreePos(bkt *Bucket, ki *KeyInfo, oldPos, newPos Position)

type GCState

type GCState struct {
	BeginTS time.Time
	EndTS   time.Time

	// Begin and End are chunkIDs; they determine the range of the GC.
	Begin int
	End   int

	// Src and Dst are chunkIDs; they are temporary variables used during the GC process.
	Src int
	Dst int

	// For beansdbadmin to check status.
	Running bool

	Err        error
	CancelFlag bool
	// sum
	GCFileState
}

type HStore

type HStore struct {
	// contains filtered or unexported fields
}

func NewHStore

func NewHStore() (store *HStore, err error)

func (*HStore) CancelGC added in v1.1.0

func (store *HStore) CancelGC(bucketID int) (src, dst int)

func (*HStore) ChangeRoute

func (store *HStore) ChangeRoute(newConf config.DBRouteConfig) (loaded, unloaded []int, err error)

func (*HStore) Close

func (store *HStore) Close()

func (*HStore) Flusher

func (store *HStore) Flusher()

func (*HStore) GC

func (store *HStore) GC(bucketID, beginChunkID, endChunkID, noGCDays int, merge, pretend bool) (begin, end int, err error)
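GC compacts a chunk range of one bucket; merge presumably also merges hint files, pretend does a dry run, and a non-zero noGCDays overrides DataConfig.NoGCDays. A hedged invocation:

// Bucket 3, chunks 0 through 5, with hint merging, as a dry run.
begin, end, err := store.GC(3, 0, 5, 0, true, true)
if err != nil {
	log.Printf("gc: %v", err)
} else {
	log.Printf("gc would cover chunks [%d, %d]", begin, end)
}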

func (*HStore) GCBuckets added in v1.1.0

func (store *HStore) GCBuckets() map[string][]string

func (*HStore) Get

func (store *HStore) Get(ki *KeyInfo, memOnly bool) (payload *Payload, pos Position, err error)

func (*HStore) GetBucketInfo

func (store *HStore) GetBucketInfo(bucketID int) *BucketInfo

func (*HStore) GetCollisionsByBucket

func (store *HStore) GetCollisionsByBucket(bucketID int) (content []byte)

func (*HStore) GetDU

func (store *HStore) GetDU() (du *DU)

func (*HStore) GetNumCmdByBuckets

func (store *HStore) GetNumCmdByBuckets() (counts [][]int64)

func (*HStore) GetRecordByKeyHash

func (store *HStore) GetRecordByKeyHash(ki *KeyInfo) (*Record, bool, error)

func (*HStore) HintDumper

func (store *HStore) HintDumper(interval time.Duration)

func (*HStore) Incr

func (store *HStore) Incr(ki *KeyInfo, value int) int

func (*HStore) IsGCRunning added in v1.1.0

func (store *HStore) IsGCRunning() bool

func (*HStore) ListDir

func (store *HStore) ListDir(ki *KeyInfo) ([]byte, error)

func (*HStore) ListUpper

func (store *HStore) ListUpper(ki *KeyInfo) ([]byte, error)

func (*HStore) NumKey

func (store *HStore) NumKey() (n int)

func (*HStore) Set

func (store *HStore) Set(ki *KeyInfo, p *Payload) error
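Set and Get both take a prepared KeyInfo. A minimal end-to-end sketch, assuming the package-level Conf has already been initialized (see HStoreConfig) and using a placeholder key hash where the real server would apply its configured HashFuncType:

store, err := NewHStore()
if err != nil {
	log.Fatal(err)
}
defer store.Close()

khash := uint64(0x3f21) // placeholder; normally the configured hash of the key
ki := NewKeyInfoFromBytes([]byte("photo:42"), khash, false) // false: a plain key, not a path
if err := ki.Prepare(); err != nil {
	log.Fatal(err)
}

payload, pos, err := store.Get(ki, false) // memOnly=false: fall through to disk
if err == nil && payload != nil {
	log.Printf("hit at chunk %d, offset %d", pos.ChunkID, pos.Offset)
}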

type HStoreConfig

type HStoreConfig struct {
	config.DBRouteConfig `yaml:"-"` // from route table
	DBLocalConfig        `yaml:"local,omitempty"`

	DataConfig  `yaml:"data,omitempty"`
	HintConfig  `yaml:"hint,omitempty"`
	HTreeConfig `yaml:"htree,omitempty"`
}
var (
	KHASH_LENS = [8]int{8, 8, 7, 7, 6, 6, 5, 5}
	Conf       *HStoreConfig
)

func (*HStoreConfig) Init

func (c *HStoreConfig) Init() error

For test use.

func (*HStoreConfig) InitDefault

func (c *HStoreConfig) InitDefault()

func (*HStoreConfig) InitTree

func (c *HStoreConfig) InitTree() error

Must be called before use. Derives TreeDepth from NumBucket, then TreeKeyHashLen and TreeKeyHashMask from TreeDepth.
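In other words, the route table supplies the bucket count and the tree parameters are derived from it. A sketch, assuming config.DBRouteConfig carries a NumBucket field:

c := &HStoreConfig{}
c.InitDefault()
c.NumBucket = 16 // from the route table (assumed field of config.DBRouteConfig)
if err := c.InitTree(); err != nil {
	log.Fatal(err)
}
// c.TreeDepth, c.TreeKeyHashLen and c.TreeKeyHashMask are now populated.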

type HTree

type HTree struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*HTree) ListDir

func (tree *HTree) ListDir(ki *KeyInfo) (ret []byte, err error)

func (*HTree) ListTop

func (tree *HTree) ListTop()

func (*HTree) Update

func (tree *HTree) Update() (node *Node)

type HTreeConfig

type HTreeConfig struct {
	TreeHeight int `yaml:"tree_height,omitempty"`
	TreeDump   int `yaml:"tree_dump,omitempty"`

	HtreeDerivedConfig `yaml:"-"`
}

type HTreeItem

type HTreeItem HintItemMeta

type HTreeReq

type HTreeReq struct {
	Meta
	Position
	// contains filtered or unexported fields
}

type HashFuncType

type HashFuncType func(key []byte) uint64

type HintBuffer

type HintBuffer struct {
	// contains filtered or unexported fields
}

func NewHintBuffer

func NewHintBuffer() *HintBuffer

func (*HintBuffer) Dump

func (h *HintBuffer) Dump(path string) (index *hintFileIndex, err error)

func (*HintBuffer) Get

func (h *HintBuffer) Get(keyhash uint64, key string) (it *HintItem, iscollision bool)

func (*HintBuffer) Set

func (h *HintBuffer) Set(it *HintItem, recSize uint32) bool

func (*HintBuffer) SetMaxOffset

func (h *HintBuffer) SetMaxOffset(offset uint32)

type HintConfig

type HintConfig struct {
	NoMerged          bool  `yaml:"hint_no_merged,omitempty"`      // when set, merge only to find collisions and do not dump idx.m, saving disk space
	MergeInterval     int   `yaml:"hint_merge_interval,omitempty"` // merge after rotating every MergeInterval chunks
	IndexIntervalSize int64 `yaml:"-"`                             // max offset gap between two adjacent hint index items
	SplitCap          int64 `yaml:"-"`                             // pre-allocate SplitCap slots per split; a split is dumped once all its slots are filled

	SplitCapStr          string `yaml:"hint_split_cap_str,omitempty"`
	IndexIntervalSizeStr string `yaml:"hint_index_interval_str,omitempty"`
}

type HintID

type HintID struct {
	Chunk int
	Split int
}

type HintItem

type HintItem struct {
	HintItemMeta
	Key string
}

type HintItemMeta

type HintItemMeta struct {
	Keyhash uint64
	Pos     Position
	Ver     int32
	Vhash   uint16
}

type HintStatus

type HintStatus struct {
	NumRead int
	MaxRead int
	MaxTime time.Duration
}

type HtreeDerivedConfig

type HtreeDerivedConfig struct {
	TreeDepth       int // from NumBucket
	TreeKeyHashMask uint64
	TreeKeyHashLen  int
}

type ItemFunc

type ItemFunc func(uint64, *HTreeItem)

type KeyInfo

type KeyInfo struct {
	KeyHash   uint64
	KeyIsPath bool
	Key       []byte
	StringKey string
	KeyPos
}

func NewKeyInfoFromBytes

func NewKeyInfoFromBytes(key []byte, keyhash uint64, keyIsPath bool) (ki *KeyInfo)

func (*KeyInfo) Prepare

func (ki *KeyInfo) Prepare() (err error)

type KeyPos

type KeyPos struct {
	KeyPathBuf [16]int
	KeyPath    []int

	// need depth
	BucketID        int
	KeyPathInBucket []int
}

Computed once, before the key is routed to a bucket.

type Meta

type Meta struct {
	TS   uint32
	Flag uint32
	Ver  int32
	// computed once
	ValueHash uint16
	RecSize   uint32
}

type Node

type Node struct {
	// contains filtered or unexported fields
}

type NodeInfo

type NodeInfo struct {
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Meta
	cmem.CArray
}

func GetPayloadForDelete

func GetPayloadForDelete() *Payload

func (*Payload) CalcValueHash

func (p *Payload) CalcValueHash()

func (*Payload) Copy

func (p *Payload) Copy() *Payload

func (*Payload) Decompress

func (p *Payload) Decompress() (err error)

func (*Payload) DiffSizeAfterDecompressed

func (p *Payload) DiffSizeAfterDecompressed() int

func (*Payload) Getvhash

func (p *Payload) Getvhash() uint16

func (*Payload) IsCompressed

func (p *Payload) IsCompressed() bool

func (*Payload) RawValueSize

func (p *Payload) RawValueSize() int
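Large values may be stored compressed (FLAG_COMPRESS set in Meta.Flag, presumably attempted once a value exceeds TRY_COMPRESS_SIZE), so readers check before using the bytes. A sketch:

if payload.IsCompressed() {
	if err := payload.Decompress(); err != nil {
		log.Printf("corrupt compressed value: %v", err)
	}
}
vhash := payload.Getvhash() // value hash, compared when DataConfig.CheckVHash is on
_ = vhash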

type Position

type Position struct {
	ChunkID int
	Offset  uint32
}

func (*Position) CmpKey

func (pos *Position) CmpKey() int64

type Record

type Record struct {
	Key     []byte
	Payload *Payload
}

func (*Record) Copy

func (rec *Record) Copy() *Record

func (*Record) Dumps

func (rec *Record) Dumps() []byte

func (*Record) LogString

func (rec *Record) LogString() string

func (*Record) Size

func (rec *Record) Size() uint32

func (*Record) Sizes

func (rec *Record) Sizes() (uint32, uint32)

The record must already be compressed (see TryCompress below).

func (*Record) TryCompress

func (rec *Record) TryCompress()
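TryCompress is best-effort: presumably the compressed form is kept only when it shrinks the value below COMPRESS_RATIO_LIMIT and the content type is not in DataConfig.NotCompress. A sketch, with payload obtained elsewhere:

rec := &Record{Key: []byte("photo:42"), Payload: payload}
rec.TryCompress()   // may or may not compress; callers need not care
disk := rec.Dumps() // serialized on-disk form
log.Printf("record occupies %d bytes", rec.Size())
_ = disk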

type SliceHeader

type SliceHeader struct {
	Data uintptr
	Len  int
}

func (*SliceHeader) Get

func (sh *SliceHeader) Get(req *HTreeReq) (exist bool)

func (*SliceHeader) Iter

func (sh *SliceHeader) Iter(f ItemFunc, ni *NodeInfo)

func (*SliceHeader) Remove

func (sh *SliceHeader) Remove(ki *KeyInfo, oldPos Position) (oldm HTreeItem, removed bool)

func (*SliceHeader) Set

func (sh *SliceHeader) Set(req *HTreeReq) (oldm HTreeItem, exist bool)

func (*SliceHeader) ToBytes

func (sh *SliceHeader) ToBytes() (b []byte)

type WriteRecord

type WriteRecord struct {
	// contains filtered or unexported fields
}

func (*WriteRecord) String

func (wrec *WriteRecord) String() string
