promisedb

package module
v0.0.0-...-722fe32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

README

PromiseDB

PromiseDB is a high-performance key-value storage engine built upon the BitCask model.

Key features

  • Fast read and write: Efficiently read operations are achieved by directly accessing the data's position in memory using an index, enabling fast retrieval. Write operations are performed in an append-only manner, ensuring speed.
  • Fast startup: By batching the reading of log entries instead of processing them individually, the number of IO operations is minimized, leading to a considerably faster startup. Moreover, smaller log entries contribute to even greater speed.
  • Fast compaction: The garbage manager asynchronously counts the number of invalid records in each file. Only files that exceed the threshold of invalid records proportion are compacted during each compaction cycle, ensuring faster compaction process.

warn: Due to the bitcask model, all keys must fit in memory.

Simple example

func main() {
    option := promisedb.DefaultOption()
    db, err := promisedb.OpenDB(option)
    defer db.Close()
    if err != nil {
        panic(err.Error())
    }

    err = db.Put([]byte("hello"), []byte("world"), 0)
    if err != nil {
        panic(err.Error())
    }

    et, err := db.Get([]byte("hello"))
    if err != nil {
        panic(err.Error())
    }

    if bytes.Compare(et.Value, []byte("world")) != 0 {
        panic(et.Value)
    }

    err = db.Del([]byte("hello"))
    if err != nil {
        panic(err.Error())
    }

    et, err = db.Get([]byte("hello"))
    if err != promisedb.ErrKeyNotExist {
        panic(err.Error())
    }

    err = db.Put([]byte("bigboss"), []byte("2063"), 10*time.Second)
    if err != nil {
        panic(err.Error())
    }

    os.RemoveAll(option.Path)
}

Todo

  • Support transaction(Serializable Isolation) to ensure ACID properties.(Since bitcask needs to put all indexes in memory, the cost of implementing MVCC is too high, so the SSI isolation level is not considered).
  • Support TTL(Implement expired deletion based on time heap).
  • Support Watch(There are some pref problems).
  • Support Redis protocol and commands.
  • Support some more complex data structures, such as:
    • String
    • List
    • Hash
    • Set
    • ZSet
  • Support distributed cluster based on Raft algorithm.

If you have any good ideas, just open a new issue.

Contribute

PRs always welcome.

I warmly invite developers who are interested in this project to actively engage in its development and join the discussions.

Contact me

You can contact me from this email: bigboss2063@outlook.com.

License

Apache License v2 @bigboss2063

Documentation

Index

Constants

View Source
const (
	GarbageManagerFreeListSize = 1024
	GarbageManagerFileName     = "gm"
	GarbageManagerRecordSize   = 9
)
View Source
const (
	DBDirectory           = "/apexdb"
	MaxDataFileSize       = 64 * 1024 * 1024
	CompactionInternal    = 8 * time.Hour
	GarbageRation         = 0.3
	GarbageManagerBufSize = 8 * 1024 * 1024
)
View Source
const (
	DataFileSuffix = ".apex"
)
View Source
const EntryMetaSize = 2 + 4 + 8 + 8 + 4 + 4

Variables

View Source
var (
	ErrKeyIsEmpty       = errors.New("the key is empty")
	ErrKeyNotExist      = errors.New("the key dose not exist")
	ErrDataFileNotExist = errors.New("the data file dose not exist")
	ErrCompacting       = errors.New("compacting in progress")
)
View Source
var (
	ErrGarbageMetaDataNotExist = errors.New("the garbage meta data not exist")
	ErrGarbageFreeListEmpty    = errors.New("the garbage free list is empty")
)
View Source
var (
	ErrDBClosed     = errors.New("db already closed")
	ErrTxnCommitted = errors.New("txn already committed")
	ErrTxnReadOnly  = errors.New("txn is read only")
)
View Source
var (
	ErrEntryWrong = errors.New("the data of entry is wrong")
)

Functions

This section is empty.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

func OpenDB

func OpenDB(option *Options) (*DB, error)

func (*DB) Begin

func (db *DB) Begin(readOnly bool) (*Txn, error)

func (*DB) Close

func (db *DB) Close() error

func (*DB) Compaction

func (db *DB) Compaction() error

func (*DB) Del

func (db *DB) Del(key []byte) error

func (*DB) Get

func (db *DB) Get(key []byte) (*Entry, error)

func (*DB) Notify

func (db *DB) Notify(key string, value []byte, entryType watch.EventType)

func (*DB) Put

func (db *DB) Put(key []byte, value []byte) error

func (*DB) PutWithExpiration

func (db *DB) PutWithExpiration(key []byte, value []byte, duration time.Duration) error

func (*DB) Watch

func (db *DB) Watch(ctx context.Context, key string)

type DataFile

type DataFile struct {
	// contains filtered or unexported fields
}

func CreateDataFile

func CreateDataFile(path string, fileId uint32) (*DataFile, error)

func OpenDataFile

func OpenDataFile(path string, fileId uint32) (*DataFile, error)

func (*DataFile) Close

func (df *DataFile) Close() error

func (*DataFile) ReadAt

func (df *DataFile) ReadAt(off int64) (*Entry, error)

func (*DataFile) ReadBatch

func (df *DataFile) ReadBatch(off int64) ([]*Entry, []uint32, int64, error)

func (*DataFile) ReadEntryAt

func (df *DataFile) ReadEntryAt(off int64, entrySize int) (*Entry, error)

func (*DataFile) ReadGarbageSize

func (df *DataFile) ReadGarbageSize(data []byte) error

func (*DataFile) Sync

func (df *DataFile) Sync() error

func (*DataFile) WriteAt

func (df *DataFile) WriteAt(data []byte, off int64) (int, error)

func (*DataFile) WriteGarbageSize

func (df *DataFile) WriteGarbageSize(length uint32) error

type DataPos

type DataPos struct {
	FileId uint32
	Vsz    uint32
	Vpos   uint32
}

type Entry

type Entry struct {
	Key      []byte
	Value    []byte
	MetaData *MetaData
}

func NewEntry

func NewEntry(key []byte, value []byte, entryType EntryType) *Entry

func (*Entry) DecodeLogEntry

func (et *Entry) DecodeLogEntry(data []byte) error

func (*Entry) DecodeLogEntryMeta

func (et *Entry) DecodeLogEntryMeta(data []byte)

func (*Entry) EncodeLogEntry

func (et *Entry) EncodeLogEntry() []byte

func (*Entry) Size

func (et *Entry) Size() int

type EntryType

type EntryType = uint16
const (
	NormalEntry EntryType = iota
	Tombstone
	Begin
	Commit
)

type Fd

type Fd struct {
	// contains filtered or unexported fields
}

func NewFd

func NewFd(path string) (*Fd, error)

func (*Fd) Close

func (fd *Fd) Close() (err error)

func (*Fd) ReadAt

func (fd *Fd) ReadAt(data []byte, off int64) (n int, err error)

func (*Fd) Sync

func (fd *Fd) Sync() (err error)

func (*Fd) WriteAt

func (fd *Fd) WriteAt(data []byte, off int64) (n int, err error)

type GarbageManager

type GarbageManager struct {
	// contains filtered or unexported fields
}

func NewGarbageManager

func NewGarbageManager(path string, bufSize int) (*GarbageManager, error)

type GarbageMetaData

type GarbageMetaData struct {
	// contains filtered or unexported fields
}

type HashTable

type HashTable struct {
	// contains filtered or unexported fields
}

func (*HashTable) Del

func (h *HashTable) Del(key string)

func (*HashTable) Get

func (h *HashTable) Get(key string) *DataPos

func (*HashTable) Put

func (h *HashTable) Put(key string, dataPost *DataPos)

func (*HashTable) Size

func (h *HashTable) Size() int

type Index

type Index interface {
	Put(key string, dataPost *DataPos)
	Get(key string) *DataPos
	Del(key string)
	Size() int
}

func NewIndex

func NewIndex() Index

type MetaData

type MetaData struct {
	Crc        uint32
	EntryType  EntryType
	TxnId      uint64
	Expiration uint64
	Ksz        uint32
	Vsz        uint32
}

type Options

type Options struct {
	Sync                  bool
	Path                  string
	MaxDataFileSize       uint32
	CompactionInternal    time.Duration
	GarbageRate           float64
	GarbageManagerBufSize int
}

func DefaultOptions

func DefaultOptions() *Options

type RWManager

type RWManager interface {
	io.ReaderAt
	io.WriterAt
	Sync() (err error)
	Close() (err error)
}

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

func (*Txn) Commit

func (txn *Txn) Commit() error

func (*Txn) Del

func (txn *Txn) Del(key []byte) error

func (*Txn) Get

func (txn *Txn) Get(key []byte) (*Entry, error)

func (*Txn) Put

func (txn *Txn) Put(key []byte, value []byte) error

func (*Txn) PutWithExpiration

func (txn *Txn) PutWithExpiration(key []byte, value []byte, duration time.Duration) error

Directories

Path Synopsis
pkg
ttl

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL