riverdb

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2023 License: MIT Imports: 31 Imported by: 0

README

RiverDB

Static Badge GitHub License

 ███████   ██                         ███████   ██████
░██░░░░██ ░░             To the moon ░██░░░░██ ░█░░░░██
░██   ░██  ██ ██    ██  █████  ██████░██    ░██░█   ░██
░███████  ░██░██   ░██ ██░░░██░░██░░█░██    ░██░██████
░██░░░██  ░██░░██ ░██ ░███████ ░██ ░ ░██    ░██░█░░░░ ██
░██  ░░██ ░██ ░░████  ░██░░░░  ░██   ░██    ██ ░█    ░██
░██   ░░██░██  ░░██   ░░██████░███   ░███████  ░███████
░░     ░░ ░░    ░░     ░░░░░░ ░░░    ░░░░░░░   ░░░░░░░

RiverDB is a lightweight, embeddable key-value NoSQL database based on the Bitcask model and a write-ahead log (WAL). Features:

  • ACID transactions
  • record ttl
  • custom key sorting rules
  • range matching and iteration
  • event watcher
  • batch write and delete
  • tar.gz backup and recovery from backup
  • gRPC support (see river-server)

RiverDB can be used as a standalone database or as an underlying storage engine.

The project is still under testing, so its stability cannot be guaranteed yet.

install

It is an embeddable database, so you can use it directly in your code without any network transport.

go get -u github.com/246859/river

how to use

quick start

This is a simple example of the put and get operations.

import (
	"fmt"
	riverdb "github.com/246859/river"
)

func main() {
	// Open (or create) a river db stored in the "riverdb" directory.
	db, openErr := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if openErr != nil {
		panic(openErr)
	}
	// Release the db when main returns.
	defer db.Close()

	// Store the pair; a ttl of 0 means the key is persisted without expiry.
	if putErr := db.Put([]byte("key"), []byte("value"), 0); putErr != nil {
		panic(putErr)
	}

	// Read the value back and print it as text.
	got, getErr := db.Get([]byte("key"))
	if getErr != nil {
		panic(getErr)
	}
	fmt.Println(string(got))
}

Remember to close the db when you are done with it.

iteration

RiverDB iteration is key-only: the range handler receives each key, not its value.

import (
    "fmt"
    riverdb "github.com/246859/river"
)

func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// put a key-value pair so there is something to iterate over
	if err := db.Put([]byte("key"), []byte("value"), 0); err != nil {
		panic(err)
	}

	// get value from key
	value, err := db.Get([]byte("key"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value))

	// iterate over the keys; the handler only receives keys (riverdb.Key is
	// a []byte, so convert it to a string to print readable text)
	err = db.Range(riverdb.RangeOptions{
		Min:     nil,
		Max:     nil,
		Pattern: nil,
		Descend: false,
	}, func(key riverdb.Key) bool {
		fmt.Println(string(key))
		return false
	})
	// Range can fail; report the error instead of silently dropping it
	if err != nil {
		panic(err)
	}
}
transaction

Use db.Begin for a read-write transaction and db.View for a read-only one; both call your function with the *Txn to operate on.

import (
	"fmt"
	riverdb "github.com/246859/river"
	"strings"
)

func main() {
	// open the river db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir("riverdb"))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// read-write transaction: operate on the txn passed to the callback;
	// calling db.Put here would bypass the transaction entirely
	err = db.Begin(func(txn *riverdb.Txn) error {
		for i := 1; i <= 10; i++ {
			key := []byte(strings.Repeat("a", i))
			value := []byte(strings.Repeat("a", i))
			if err := txn.Put(key, value, 0); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		panic(err)
	}

	// read-only transaction: read back exactly the keys written above
	// ("a" .. "aaaaaaaaaa"); starting at length 0 would look up an empty
	// key that was never written and fail immediately
	err = db.View(func(txn *riverdb.Txn) error {
		for i := 1; i <= 10; i++ {
			value, err := txn.Get([]byte(strings.Repeat("a", i)))
			if err != nil {
				return err
			}
			fmt.Println(string(value))
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
}
batch operation

Batch operations perform better than calling db.Put or db.Del directly when handling a large amount of data.

import (
	"fmt"
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// close
	defer db.Close()

	// open batch; the returned batch must not be used if err != nil
	batch, err := db.Batch(riverdb.BatchOption{
		Size:        500,
		SyncOnFlush: true,
	})
	if err != nil {
		panic(err)
	}

	// build the records to write and the keys to delete afterwards
	const n = 1000
	rs := make([]riverdb.Record, 0, n)
	ks := make([]riverdb.Key, 0, n)
	for i := 0; i < n; i++ {
		key := []byte(strconv.Itoa(i))
		rs = append(rs, riverdb.Record{
			K:   key,
			V:   []byte(strings.Repeat("a", i)),
			TTL: 0,
		})
		ks = append(ks, key)
	}

	// write all
	if err := batch.WriteAll(rs); err != nil {
		panic(err)
	}

	// delete all
	if err := batch.DeleteAll(ks); err != nil {
		panic(err)
	}

	// wait for the batch to finish; Flush closes the batch and syncs the db
	if err := batch.Flush(); err != nil {
		panic(err)
	}
	fmt.Println(batch.Effected())
}
backup & recover

Backup only archives the data in the data directory.

import (
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
)

func main() {
	// the data directory and the archive both live under the temp dir
	dataDir := filepath.Join(os.TempDir(), "example")
	archive := filepath.Join(os.TempDir(), "example.tar.gz")

	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(dataDir))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// compress the wal data files into a tar.gz archive
	if err := db.Backup(archive); err != nil {
		panic(err)
	}

	// purge the current data and restore it from the archive
	if err := db.Recover(archive); err != nil {
		panic(err)
	}
}
statistic
func main() {
	// open db
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	// close
	defer db.Close()
	
    // statistic
	stats := db.Stats()
	fmt.Println(stats.DataSize)
	fmt.Println(stats.HintSize)
	fmt.Println(stats.KeyNums)
	fmt.Println(stats.RecordNums)
}
watcher

You can change which events are watched through the db options (Options.WatchEvents, or the WithWatchEvent option).

import (
	"fmt"
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
	"sync"
	"time"
)

func main() {
	// open db; check the error before deferring Close, since db is not
	// usable when Open fails
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// DB.Watcher takes a watcher name followed by the events to watch
	watcher, err := db.Watcher("example", riverdb.PutEvent)
	if err != nil {
		panic(err)
	}

	// obtain the event channel up front so a Listen error is handled here
	// rather than panicking inside the goroutine
	listen, err := watcher.Listen()
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// the loop ends once the listen channel is closed; this example
		// relies on watcher.Close below doing that
		for event := range listen {
			fmt.Println(event)
		}
	}()

	if err := db.Put([]byte("hello world"), []byte("world"), 0); err != nil {
		panic(err)
	}

	// give the listener a moment to receive the event, then stop watching
	time.Sleep(time.Second)
	if err := watcher.Close(); err != nil {
		panic(err)
	}

	wg.Wait()
}
merge

You can use db.Merge to proactively clean up redundant data in the database.

import (
	riverdb "github.com/246859/river"
	"os"
	"path/filepath"
)

func main() {
	// open db; check the error before deferring Close, since db is not
	// usable when Open fails
	db, err := riverdb.Open(riverdb.DefaultOptions, riverdb.WithDir(filepath.Join(os.TempDir(), "example")))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// merge redundant data; passing true also replaces the files in the
	// data dir (false would only record the merged data)
	if err := db.Merge(true); err != nil {
		panic(err)
	}
}

Automatic merging is controlled by Options.MergeCheckpoint (or the WithMergeCheckPoint option). It is disabled when the value is 0, which is the value DefaultOptions uses.

benchmark

All benchmark tests use the default options.

goos: windows
goarch: amd64
pkg: github.com/246859/river
cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
BenchmarkDB_Get_1k
BenchmarkDB_Get_1k-16             375000              3194 ns/op            1598 B/op         10 allocs/op
BenchmarkDB_Get_1w
BenchmarkDB_Get_1w-16             362989              3211 ns/op            2153 B/op         10 allocs/op
BenchmarkDB_Get_10w
BenchmarkDB_Get_10w-16            151460              7520 ns/op           12284 B/op         10 allocs/op
BenchmarkDB_Get_100w
BenchmarkDB_Get_100w-16            60800             19827 ns/op           32790 B/op         12 allocs/op
BenchmarkDb_Put_1
BenchmarkDb_Put_1-16              106227             16909 ns/op            6236 B/op         34 allocs/op
BenchmarkDb_Put_100
BenchmarkDb_Put_100-16               871           1449504 ns/op          601988 B/op       3405 allocs/op
BenchmarkDb_Put_1K
BenchmarkDb_Put_1K-16                 81          14566722 ns/op         5995953 B/op      34065 allocs/op
BenchmarkDb_Put_1W
BenchmarkDb_Put_1W-16                  8         166566838 ns/op        59862081 B/op     340381 allocs/op
BenchmarkDb_Put_10W
BenchmarkDb_Put_10W-16                 1        1431037500 ns/op        618953960 B/op   3407025 allocs/op
BenchmarkDb_Put_256B
BenchmarkDb_Put_256B-16           107794             10583 ns/op            5658 B/op         34 allocs/op
BenchmarkDb_Put_64KB
BenchmarkDb_Put_64KB-16             4440            473349 ns/op           55333 B/op         35 allocs/op
BenchmarkDb_Put_256KB
BenchmarkDb_Put_256KB-16           10000            840960 ns/op          106526 B/op         37 allocs/op
BenchmarkDb_Put_1MB
BenchmarkDb_Put_1MB-16               720           4849446 ns/op          653983 B/op         53 allocs/op
BenchmarkDb_Put_4MB
BenchmarkDb_Put_4MB-16               411           6401892 ns/op         1462057 B/op         69 allocs/op
BenchmarkDb_Put_8MB
BenchmarkDb_Put_8MB-16               462           4738826 ns/op          613061 B/op         52 allocs/op
PASS
ok      github.com/246859/river 71.723s

BenchmarkDB_Get_1k uses db.Get() to perform random lookups against a database that already holds 1000 entries.

BenchmarkDb_Put_1K uses db.Put() to write 1000 entries into the database in each benchmark operation.

BenchmarkDb_Put_256B uses db.Put() to write a single entry whose size is 256 B.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrKeyNotFound    = errors.New("key not found")
	ErrNilKey         = entry.ErrNilKey
	ErrDBClosed       = errors.New("db is already closed")
	ErrDirAlreadyUsed = errors.New("dir already used by another process")
)
View Source
var (
	ErrTxnClosed   = errors.New("transaction is closed")
	ErrTxnReadonly = errors.New("transaction is read-only")
	ErrTxnConflict = errors.New("transaction is conflict")
)
View Source
var (
	ErrWatcherClosed   = errors.New("watcher is closed")
	ErrInvalidEvent    = errors.New("invalid event")
	ErrWatcherDisabled = errors.New("event watcher disabled")
)
View Source
var DefaultOptions = Options{
	MaxSize:         defaultMaxFileSize,
	BlockCache:      defaultMaxFileSize / types.MB,
	Fsync:           false,
	FsyncThreshold:  blockSize * (defaultMaxFileSize / types.MB) / 4,
	Compare:         index.DefaultCompare,
	WatchSize:       2000,
	WatchEvents:     []EventType{PutEvent, DelEvent},
	Level:           ReadCommitted,
	MergeCheckpoint: 0,
}
View Source
var ErrBatchClosed = errors.New("batch is closed")
View Source
var (
	ErrMergedNotFinished = errors.New("merged not finished")
)

Functions

func FastRand

func FastRand() uint32

FastRand is a fast thread local random function.

Types

type Batch added in v0.2.0

type Batch struct {
	// contains filtered or unexported fields
}

Batch operator

func (*Batch) DeleteAll added in v0.2.0

func (ba *Batch) DeleteAll(keys []Key) error

DeleteAll deletes the records matching the given keys from the db in batch transactions.

func (*Batch) Effected added in v0.2.0

func (ba *Batch) Effected() int64

func (*Batch) Flush added in v0.2.0

func (ba *Batch) Flush() error

Flush closes the Batch, waits for the remaining batch transactions to complete, and finally calls db.Sync to flush the db.

func (*Batch) WriteAll added in v0.2.0

func (ba *Batch) WriteAll(records []Record) error

WriteAll writes all given records to db in batchTxn

type BatchOption added in v0.2.0

type BatchOption struct {
	// size of per batch
	Size int64
	// call Fsync after per batch has been written
	SyncPerBatch bool
	// call Fsync after all batches have finished; enabling both SyncPerBatch and SyncOnFlush simultaneously is not recommended
	// if both SyncPerBatch and SyncOnFlush are false, the batch applies the sync rules in db.options
	SyncOnFlush bool
}

type BitFlag added in v0.2.0

type BitFlag uint64

BitFlag is a 64-bit mask used to store different status flags.

func (*BitFlag) Check added in v0.2.0

func (bf *BitFlag) Check(flags ...uint64) bool

func (*BitFlag) Revoke added in v0.2.0

func (bf *BitFlag) Revoke(flags ...uint64)

func (*BitFlag) Store added in v0.2.0

func (bf *BitFlag) Store(flags ...uint64)

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB represents a db instance, which stores wal file in a specific data directory

func Open

func Open(options Options, opts ...Option) (*DB, error)

Open returns a river db instance

func OpenWithCtx added in v0.1.0

func OpenWithCtx(ctx context.Context, options Options, opts ...Option) (*DB, error)

OpenWithCtx returns a river db instance with context

func (*DB) Backup added in v0.1.0

func (db *DB) Backup(destpath string) error

Backup use tar gzip to compress data wal files to dest path

func (*DB) Batch added in v0.2.0

func (db *DB) Batch(opt BatchOption) (*Batch, error)

Batch provides the ability to write or delete entries in batches, and is update-only. It splits the given records into several batches and runs a new goroutine to handle each batch in a batch transaction. Each batch's records are held in memory temporarily and then written to the database together; the db index is updated if the commit succeeds.

func (*DB) Begin

func (db *DB) Begin(fn func(txn *Txn) error) error

Begin begins a read-write transaction

func (*DB) Close

func (db *DB) Close() error

Close closes the db. Once the db is closed, it can no longer be used.

func (*DB) Del

func (db *DB) Del(key Key) error

Del removes the key-value pair matching the given key from the db. It returns nil if the key does not exist.

func (*DB) Expire

func (db *DB) Expire(key Key, ttl time.Duration) error

Expire updates the ttl of the specified key. If ttl <= 0, the key never expires.

func (*DB) Get

func (db *DB) Get(key Key) (Value, error)

Get returns the value matching the given key. If the key has expired or is not found, it returns ErrKeyNotFound. A nil key is not allowed.

func (*DB) Merge

func (db *DB) Merge(domerge bool) error

Merge cleans up redundant data entries in the db, shrinking its size. If domerge is false, it only records the merged data and does not replace the files in the data dir.

func (*DB) Purge

func (db *DB) Purge() error

Purge removes all entries from the data wal.

func (*DB) Put

func (db *DB) Put(key Key, value Value, ttl time.Duration) error

Put puts a key-value pair into the db, overwriting the value if the key already exists. A nil key is invalid, but a nil value is allowed and is stored as an empty []byte. If ttl == 0, the key is persisted; if ttl < 0, the key keeps its previous ttl.

func (*DB) Range

func (db *DB) Range(option RangeOptions, handler RangeHandler) error

Range iterates over all the keys that match the given RangeOptions and calls handler for each key.

func (*DB) Recover added in v0.1.0

func (db *DB) Recover(srcpath string) error

Recover recovers wal files from the specified tar.gz archive. It purges the current data and overwrites it with the backup.

func (*DB) Stats added in v0.2.0

func (db *DB) Stats() Stats

func (*DB) Sync

func (db *DB) Sync() error

Sync syncs written buffer to disk

func (*DB) TTL

func (db *DB) TTL(key Key) (time.Duration, error)

TTL returns the remaining time to live of the specified key.

func (*DB) View added in v0.3.0

func (db *DB) View(fn func(txn *Txn) error) error

View begins a read-only transaction.

func (*DB) Watcher added in v0.2.2

func (db *DB) Watcher(name string, events ...EventType) (*Watcher, error)

Watcher returns a new event watcher with the given name and event types. If events is empty, the events configured in the db options are used.

type Event added in v0.1.0

type Event struct {
	Type  EventType
	Value any
}

Event represents a push event

type EventType added in v0.1.0

type EventType uint
const (
	PutEvent EventType = 1 + iota
	DelEvent
	RollbackEvent
	MergeEvent
	BackupEvent
	RecoverEvent
)

func (EventType) String added in v0.2.2

func (e EventType) String() string

type Key

type Key = []byte

Key db key type

type Option

type Option func(option *Options)

Option applying changes to the given option

func WithBlockCache

func WithBlockCache(block uint32) Option

func WithClosedGc

func WithClosedGc(gc bool) Option

func WithCompare

func WithCompare(compare index.Compare) Option

func WithDir

func WithDir(dir string) Option

func WithFsync

func WithFsync(sync bool) Option

func WithFsyncThreshold

func WithFsyncThreshold(threshold int64) Option

func WithMaxSize

func WithMaxSize(size int64) Option

func WithMergeCheckPoint added in v0.3.0

func WithMergeCheckPoint(checkPoint float64) Option

func WithTxnLevel added in v0.3.0

func WithTxnLevel(level TxnLevel) Option

func WithWatchEvent added in v0.1.0

func WithWatchEvent(events ...EventType) Option

func WithWatchSize added in v0.1.0

func WithWatchSize(size int) Option

type Options

type Options struct {
	// data dir that stores data files
	Dir string
	// max bytes size of the single data file can hold
	MaxSize int64
	// wal block cache size
	BlockCache uint32
	// call sync per write
	Fsync bool
	// call sync when reach the threshold
	FsyncThreshold int64
	// size of the watch queue for kv events, disabled if it is 0
	WatchSize int
	// specified events to watch
	WatchEvents []EventType
	// decide how to sort keys
	Compare index.Compare
	// check point of auto merge, disabled if is 0
	MergeCheckpoint float64
	// transaction isolation level
	Level TxnLevel
	// manually gc after closed db to release memory used by index
	ClosedGc bool
	// contains filtered or unexported fields
}

Options represent db configuration

type RangeHandler

type RangeHandler = func(key Key) bool

RangeHandler is called for each key visited during iteration; it receives only the key, not the value.

type RangeOptions

type RangeOptions = index.RangeOption

RangeOptions is alias of index.RangeOption

type Record added in v0.2.0

type Record struct {
	K   Key
	V   Value
	TTL time.Duration
}

Record is a user-oriented struct representing an entry data in db

type Stats added in v0.2.0

type Stats struct {
	// number of key in db
	KeyNums int64
	// number of records in db; because the bitcask model is append-only, it is usually greater than KeyNums.
	// normally, the ratio RecordNums / KeyNums can be used to determine whether the wal files need to be merged
	RecordNums int64
	// real data size
	DataSize int64
	// hint file size
	HintSize int64
}

Stats represents a simple statistics information of db at a moment

type Txn

type Txn struct {
	// contains filtered or unexported fields
}

Txn represents a transaction

func (*Txn) Del

func (txn *Txn) Del(key Key) error

func (*Txn) Expire

func (txn *Txn) Expire(key Key, ttl time.Duration) error

func (*Txn) Get

func (txn *Txn) Get(key Key) (Value, error)

func (*Txn) Put

func (txn *Txn) Put(key Key, value Value, ttl time.Duration) error

func (*Txn) Range

func (txn *Txn) Range(opt RangeOptions, handler RangeHandler) error

func (*Txn) TTL

func (txn *Txn) TTL(key Key) (time.Duration, error)

type TxnLevel added in v0.3.0

type TxnLevel uint8
const (
	ReadCommitted TxnLevel = 1 + iota
	Serializable
)

func (TxnLevel) String added in v0.3.0

func (t TxnLevel) String() string

type Value

type Value = []byte

Value db value type

type Watcher added in v0.2.2

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher is an event watcher. It should be closed once you are done with it; otherwise it is closed when the db is closed.

func (*Watcher) Close added in v0.2.2

func (w *Watcher) Close() error

Close closes the watcher.

func (*Watcher) Listen added in v0.2.2

func (w *Watcher) Listen() (<-chan *Event, error)

func (*Watcher) Name added in v0.2.2

func (w *Watcher) Name() string

Directories

Path Synopsis
cmd
pkg
crc
str

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL