nimbusdb

package module
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2024 License: MIT Imports: 20 Imported by: 0

README

nimbusdb

WARNING: nimbusdb is in early stages of development; do not use this in production.

Persistent key-value store based on Bitcask paper.

nimbusdb is a fast, lightweight, and scalable key-value store based on Bitcask.

nimbusdb maintains an active datafile to which data is written. When it crosses a threshold, the datafile is made inactive, and a new datafile is created. As time passes, expired or deleted keys take up space that is not useful. Hence, a process called merge is done to remove all expired or deleted keys and free up space.

Features

Thread-Safe All operations are thread-safe. Read and Write operations can handle multiple operations from multiple goroutines at the same time with consistency.
Portable Data is extremely portable since it is only a bunch of files. All you have to do is move the folder and open a DB connection at that path.
Custom Expiry Supports custom expiry for keys. Default expiry is 1 week.
Supports Merge Supports `Sync` which can be called periodically to remove expired/deleted keys from disk and free-up more space.
Supports Batch operations Batch operations can be performed and committed to save to disk, or rolled back to discard the batch. Operations cannot be performed once the batch is closed.
Single disk-seek write Writes are just one disk seek since we're appending to the file.
Block cache for faster reads. Blocks are cached for faster reads. Default size of a Block is 32KB.

Usage

Initialize db connection
d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory"})
if err != nil {
  // handle error
}
defer d.Close()
Set
kvPair := &nimbusdb.KeyValuePair{
  Key:   []byte("key"),
  Value: []byte("value"),
  Ttl: 5 * time.Minute, // Optional, default is 1 week
}
setValue, err := d.Set(kvPair)
if err != nil {
  // handle error
}
Get
value, err := d.Get([]byte("key"))
if err != nil {
  // handle error
}
Delete
err := d.Delete([]byte("key"))
if err != nil {
  // handle error
}
Sync

This does the merge process. This can be an expensive operation, hence it is better to run this periodically and whenever the traffic is low.

err := d.Sync()
if err != nil {
  // handle error
}
Batch Operations
d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory"})
if err != nil {
  // handle error
}
defer d.Close()
b, err := d.NewBatch()
if err != nil {
  // handle error
}
defer b.Close()

_, err = b.Set([]byte("key"), []byte("value")) // not written to disk yet.
if err != nil {
  // handle error
}

key, err := b.Get([]byte("key"))
if err != nil {
  // handle error
}

err = b.Delete([]byte("key"))
if err != nil {
  // handle error
}

exists, err := b.Exists([]byte("key"))
if err != nil {
  // handle error
}

b.Commit() // write all pending writes to disk
b.Rollback() // discard all pending writes
Watch keys
func watchKeyChange(ch chan nimbusdb.WatcherEvent) {
  for event := range ch {
    switch event.EventType {
      case "CREATE":
        // Handle create key event
        break

      case "UPDATE":
        // Handle update key event
        break

      case "DELETE":
        // Handle delete key event
        break
    }
  }
}

func main() {
  d, err := nimbusdb.Open(&nimbusdb.Options{Path: "/path/to/data/directory", ShouldWatch: true})
  if err != nil {
    // handle error
  }
  defer d.Close()
  defer d.CloseWatch() // optional
  
  watchChannel, err := d.Watch()
  if err != nil {
    // handle error
  }
  
  go watchKeyChange(watchChannel)

  kvPair := &nimbusdb.KeyValuePair{
    Key:   []byte("key"),
    Value: []byte("value"),
  }
  setValue, err := d.Set(kvPair) // will trigger a CREATE event
  if err != nil {
    // handle error
  }

  setValue, err = d.Set(kvPair) // will trigger an UPDATE event
  if err != nil {
    // handle error
  }

  err = d.Delete(kvPair.Key) // will trigger a DELETE event
  if err != nil {
    // handle error
  }
}

Progress Board | Streams | godoc

Go CodeFactor

Documentation

Index

Constants

View Source
const (
	KB int64 = 1 << (10 * iota)
	MB
	GB
	TB
	PB
	EB
)
View Source
const (
	ActiveKeyValueEntryDatafileSuffix   = ".dfile"
	KeyValueEntryHintfileSuffix         = ".hfile"
	InactiveKeyValueEntryDataFileSuffix = ".idfile"
	TempDataFilePattern                 = "*.dfile"
	TempInactiveDataFilePattern         = "*.idfile"
	DefaultDataDir                      = "nimbusdb"

	DatafileThreshold = 1 * MB
	BlockSize         = 32 * KB
)
View Source
const (
	CrcSize         int64 = 5
	DeleteFlagSize        = 1
	TstampSize            = 10
	KeySizeSize           = 10
	ValueSizeSize         = 10
	StaticChunkSize       = CrcSize + DeleteFlagSize + TstampSize + KeySizeSize + ValueSizeSize

	CrcOffset        int64 = 5
	DeleteFlagOffset       = 6
	TstampOffset           = 16
	KeySizeOffset          = 26
	ValueSizeOffset        = 36

	BTreeDegree int = 10
)
View Source
const (
	KEY_EXPIRES_IN_DEFAULT = 168 * time.Hour // 1 week

	DELETED_FLAG_BYTE_VALUE  = byte(0x31)
	DELETED_FLAG_SET_VALUE   = byte(0x01)
	DELETED_FLAG_UNSET_VALUE = byte(0x00)

	LRU_SIZE = 50
	LRU_TTL  = 24 * time.Hour

	EXIT_NOT_OK = 0
	EXIT_OK     = 1

	INITIAL_SEGMENT_OFFSET         = 0
	INITIAL_KEY_VALUE_ENTRY_OFFSET = 0
)

Variables

View Source
var (
	ERROR_BATCH_CLOSED              = errors.New("batch is closed")
	ERROR_CANNOT_CLOSE_CLOSED_BATCH = errors.New("cannot close closed batch")
)
View Source
var (
	ERROR_KEY_NOT_FOUND             = errors.New("key expired or does not exist")
	ERROR_NO_ACTIVE_FILE_OPENED     = errors.New("no file opened for writing")
	ERROR_OFFSET_EXCEEDED_FILE_SIZE = errors.New("offset exceeded file size")
	ERROR_CANNOT_READ_FILE          = errors.New("error reading file")
	ERROR_KEY_VALUE_SIZE_EXCEEDED   = errors.New(fmt.Sprintf("exceeded limit of %d bytes", BlockSize))
	ERROR_CRC_DOES_NOT_MATCH        = errors.New("crc does not match. corrupted datafile")
	ERROR_DB_CLOSED                 = errors.New("database is closed")
)

Functions

This section is empty.

Types

type BTree

type BTree struct {
	// contains filtered or unexported fields
}

func (*BTree) Delete

func (b *BTree) Delete(key []byte) *KeyValuePair

func (*BTree) Get

func (b *BTree) Get(key []byte) *KeyDirValue

func (*BTree) List

func (b *BTree) List() []*KeyValuePair

func (*BTree) Set

func (b *BTree) Set(key []byte, value KeyDirValue) *KeyDirValue

type Batch added in v0.1.1

type Batch struct {
	// contains filtered or unexported fields
}

func (*Batch) Close added in v0.1.1

func (b *Batch) Close() error

func (*Batch) Commit added in v0.1.1

func (b *Batch) Commit() error

func (*Batch) Delete added in v0.1.1

func (b *Batch) Delete(k []byte) error

func (*Batch) Exists added in v0.1.1

func (b *Batch) Exists(k []byte) (bool, error)

func (*Batch) Get added in v0.1.1

func (b *Batch) Get(k []byte) ([]byte, error)

func (*Batch) Rollback added in v0.1.1

func (b *Batch) Rollback() error

func (*Batch) Set added in v0.1.1

func (b *Batch) Set(k []byte, v []byte) ([]byte, error)

func (*Batch) SetWithTTL added in v0.1.1

func (b *Batch) SetWithTTL(k []byte, v []byte, ttl time.Duration) ([]byte, error)

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block represents a single block of disk memory. Default size is 32KB. Each Segment is a collection of blocks; each block is a collection of KeyValueEntries.

type BlockOffsetPair

type BlockOffsetPair struct {
	// contains filtered or unexported fields
}

BlockOffsetPair contains metadata about the Block. The start and ending offsets of the Block, and the path.

type Db

type Db struct {
	// contains filtered or unexported fields
}

func NewDb

func NewDb(dirPath string, opts ...*Options) *Db

func Open

func Open(opts *Options) (*Db, error)

func (*Db) All

func (db *Db) All() []*KeyValuePair

func (*Db) Close

func (db *Db) Close() error

Closes the database. Closes the file pointer used to read/write the activeDataFile. Closes all file inactiveDataFile pointers and marks them as closed.

func (*Db) CloseWatch added in v0.1.2

func (db *Db) CloseWatch() error

func (*Db) Delete

func (db *Db) Delete(key []byte) error

Deletes a key-value pair. Returns error if any.

func (*Db) Get

func (db *Db) Get(key []byte) ([]byte, error)

Gets a key-value pair. Returns the value if the key exists and error if any.

func (*Db) KeyReader added in v0.1.1

func (db *Db) KeyReader(prefix string, handler func(k []byte))

KeyReader iterates through each key matching given prefix. If prefix is an empty string, all keys are matched. The second argument is a callback function which contains the key.

func (*Db) KeyValueReader added in v0.1.1

func (db *Db) KeyValueReader(keyPrefix string, handler func(k []byte, v []byte)) (bool, error)

KeyValueReader iterates through each key-value pair matching given key's prefix. If prefix is an empty string, all key-value pairs are matched. The second argument is a callback function which contains key and the value.

func (*Db) NewBatch added in v0.1.1

func (db *Db) NewBatch() (*Batch, error)

func (*Db) NewWatch added in v0.1.2

func (db *Db) NewWatch() (chan WatcherEvent, error)

func (*Db) SendWatchEvent added in v0.1.2

func (db *Db) SendWatchEvent(w WatcherEvent) error

func (*Db) Set

func (db *Db) Set(k []byte, v []byte) ([]byte, error)

Sets a key-value pair. Returns the value if set succeeds, else returns an error.

func (*Db) SetWithTTL added in v0.1.1

func (db *Db) SetWithTTL(k []byte, v []byte, ttl time.Duration) (interface{}, error)

func (*Db) Sync

func (db *Db) Sync() error

Syncs the database. Will remove all expired/deleted keys from disk. Since items are removed, disk usage will reduce.

type EventType added in v0.1.2

type EventType string
const (
	Create EventType = "CREATE"
	Update EventType = "UPDATE"
	Delete EventType = "DELETE"
)

type KeyDirValue

type KeyDirValue struct {
	// contains filtered or unexported fields
}

func NewKeyDirValue

func NewKeyDirValue(offset, size, tstamp int64, path string) *KeyDirValue

type KeyValueEntry

type KeyValueEntry struct {
	// contains filtered or unexported fields
}

KeyValueEntry is the raw and complete uncompressed data existing on the disk. KeyValueEntry is stored in Blocks in cache for faster reads.

func (*KeyValueEntry) Key

func (kv *KeyValueEntry) Key() []byte

func (*KeyValueEntry) PayloadToByte added in v0.1.1

func (kv *KeyValueEntry) PayloadToByte() []byte

func (*KeyValueEntry) StaticChunkSize

func (kv *KeyValueEntry) StaticChunkSize() int64

func (*KeyValueEntry) ToByte

func (kv *KeyValueEntry) ToByte() []byte

func (*KeyValueEntry) Value

func (kv *KeyValueEntry) Value() []byte

type KeyValuePair

type KeyValuePair struct {
	Key   []byte
	Value interface{}
	Ttl   time.Duration
}

type Options

type Options struct {
	IsMerge        bool
	MergeFilePath  string
	Path           string
	ShouldWatch    bool
	WatchQueueSize int
}

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

Segment represents an entire file. It is divided into Blocks. Each Segment is a collection of Blocks of size 32KB. A file pointer is kept opened for reading purposes. closed represents the state of the Segment's file pointer.

type WatcherEvent added in v0.1.2

type WatcherEvent struct {
	EventType      EventType
	Key            []byte
	OldValue       []byte
	NewValue       []byte
	EventTimestamp time.Time
	BatchId        *ksuid.KSUID
}

func NewCreateWatcherEvent added in v0.1.2

func NewCreateWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent

func NewDeleteWatcherEvent added in v0.1.2

func NewDeleteWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent

func NewUpdateWatcherEvent added in v0.1.2

func NewUpdateWatcherEvent(key, oldValue, newValue []byte, batchId *ksuid.KSUID) WatcherEvent

Directories

Path Synopsis
db

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL