klevdb

package module
v0.6.0
Published: Apr 22, 2024 License: MIT Imports: 18 Imported by: 0

README

klevdb


klevdb is a fast message store, written in Go. Think of a single partition on Kafka, but stored locally.

In addition to basic consuming by offset, you can also configure klevdb to index times and keys. The time index lets you quickly find a message by its time (or the first message after a certain time), as sketched after the usage example below. The key index lets you quickly find the last message with a given key.

Usage

To add klevdb to your package use:

go get github.com/klev-dev/klevdb

To use klevdb:

package main

import (
	"fmt"

	"github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.Open("/tmp/kdb", klevdb.Options{
		CreateDirs: true,
		KeyIndex:   true,
	})
	defer db.Close()

	publishNext, _ := db.Publish([]klevdb.Message{
		{
			Key:   []byte("key1"),
			Value: []byte("val1"),
		},
		{
			Key:   []byte("key1"),
			Value: []byte("val2"),
		},
	})
	fmt.Println("published, next offset:", publishNext)

	consumeNext, msgs, _ := db.Consume(klevdb.OffsetOldest, 1)
	fmt.Println("consumed:", msgs, "value:", string(msgs[0].Value))
	fmt.Println("next consume offset:", consumeNext)

	msg, _ := db.GetByKey([]byte("key1"))
	fmt.Println("got:", msg, "value:", string(msg.Value))
}

Running the above program outputs the following:

published, next offset: 2
consumed: [{0 2009-11-10 23:00:00 +0000 UTC [107 101 121 49] [118 97 108 49]}] value: val1
next consume offset: 1
got: {1 2009-11-10 23:00:00 +0000 UTC [107 101 121 49] [118 97 108 50]} value: val2
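
The time index is used in a similar way. Below is a minimal sketch (the path is just an example, error handling is omitted for brevity):

package main

import (
	"fmt"
	"time"

	"github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.Open("/tmp/kdb-times", klevdb.Options{
		CreateDirs: true,
		TimeIndex:  true,
	})
	defer db.Close()

	// the message time defaults to the current UTC time when left unset
	db.Publish([]klevdb.Message{
		{Value: []byte("hello")},
	})

	// find the first message published after the given time
	msg, _ := db.GetByTime(time.Now().Add(-time.Minute))
	fmt.Println("got:", string(msg.Value))
}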

Further documentation is available at GoDoc

Performance

Benchmarks on framework gen1 i5:

goos: linux
goarch: amd64
pkg: github.com/klev-dev/klevdb
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
Publish
≻ make bench-publish 
go test -bench=BenchmarkSingle/Publish -benchmem

BenchmarkSingle/Publish/1/No-8    	  347466	      3337 ns/op	  54.54 MB/s	     156 B/op	       1 allocs/op
BenchmarkSingle/Publish/1/Times-8 	  391308	      3585 ns/op	  53.00 MB/s	     162 B/op	       1 allocs/op
BenchmarkSingle/Publish/1/Keys-8  	  314779	      3960 ns/op	  47.98 MB/s	     305 B/op	       7 allocs/op
BenchmarkSingle/Publish/1/All-8   	  319302	      3907 ns/op	  50.68 MB/s	     310 B/op	       7 allocs/op
BenchmarkSingle/Publish/8/No-8    	  397518	      3266 ns/op	 445.81 MB/s	     156 B/op	       0 allocs/op
BenchmarkSingle/Publish/8/Times-8 	  451623	      3402 ns/op	 446.73 MB/s	     161 B/op	       0 allocs/op
BenchmarkSingle/Publish/8/Keys-8  	  309150	      3821 ns/op	 397.78 MB/s	     298 B/op	       5 allocs/op
BenchmarkSingle/Publish/8/All-8   	  382129	      3797 ns/op	 417.17 MB/s	     303 B/op	       5 allocs/op

PASS
ok  	github.com/klev-dev/klevdb	12.433s

With the default rollover of 1MB, for messages with 10B keys and 128B values:

  • ~300,000 writes/sec, no indexes
  • ~250,000 writes/sec, with all indexes enabled
  • scales linearly with the batch size
Consume
≻ make bench-consume 

BenchmarkSingle/Consume/W/1-8     	 4372142	       279.5 ns/op	 651.05 MB/s	     224 B/op	       2 allocs/op
BenchmarkSingle/Consume/RW/1-8    	 4377028	       287.1 ns/op	 633.94 MB/s	     274 B/op	       2 allocs/op
BenchmarkSingle/Consume/R/1-8     	 4356441	       299.0 ns/op	 608.71 MB/s	     274 B/op	       2 allocs/op
BenchmarkSingle/Consume/W/8-8     	 6508213	       178.4 ns/op	8163.31 MB/s	     294 B/op	       1 allocs/op
BenchmarkSingle/Consume/RW/8-8    	 6069168	       194.8 ns/op	7475.85 MB/s	     344 B/op	       1 allocs/op
BenchmarkSingle/Consume/R/8-8     	 6271984	       196.4 ns/op	7413.22 MB/s	     344 B/op	       1 allocs/op

PASS
ok  	github.com/klev-dev/klevdb	147.152s

With the default rollover of 1MB, for messages with 10B keys and 128B values:

  • ~3,500,000 reads/sec, single message consume
  • ~5,500,000 reads/sec, 8 message batches
Get
≻ make bench-get
go test -bench=BenchmarkSingle/Get -benchmem

BenchmarkSingle/Get/ByOffset-8         	 5355378	       225.2 ns/op	 808.24 MB/s	     144 B/op	       1 allocs/op
BenchmarkSingle/Get/ByKey-8            	 1000000	      3583 ns/op	  53.04 MB/s	     152 B/op	       2 allocs/op
BenchmarkSingle/Get/ByKey/R-8          	 1000000	      3794 ns/op	  50.08 MB/s	     345 B/op	       7 allocs/op
BenchmarkSingle/Get/ByTime-8           	 1000000	      2197 ns/op	  86.48 MB/s	     144 B/op	       1 allocs/op
BenchmarkSingle/Get/ByTime/R-8         	 1000000	      2178 ns/op	  87.25 MB/s	     202 B/op	       1 allocs/op

PASS
ok  	github.com/klev-dev/klevdb	52.528s

With the default rollover of 1MB, for messages with 10B keys and 128B values:

  • ~4,400,000 gets/sec, across all offsets
  • ~270,000 key reads/sec, across all keys
  • ~450,000 time reads/sec, across all times
Multi
≻ make bench-multi
go test -bench=BenchmarkMulti -benchmem

BenchmarkMulti/Base-8         	  282462	      4433 ns/op	     673 B/op	       7 allocs/op
BenchmarkMulti/Publish-8      	   30628	     40717 ns/op	  19.45 MB/s	    2974 B/op	      56 allocs/op
BenchmarkMulti/Consume-8      	 1289114	       909.9 ns/op	 835.24 MB/s	    2842 B/op	      17 allocs/op
BenchmarkMulti/GetKey-8       	  459753	      5729 ns/op	  34.56 MB/s	    1520 B/op	      20 allocs/op

PASS
ok  	github.com/klev-dev/klevdb	22.973s

Documentation

Index

Constants

View Source
const (
	// OffsetOldest represents the smallest offset still available
	// Use it to consume all messages, starting at the first available
	OffsetOldest = message.OffsetOldest
	// OffsetNewest represents the offset that will be used for the next produce
	// Use it to consume only new messages
	OffsetNewest = message.OffsetNewest
	// OffsetInvalid is the offset returned when error is detected
	OffsetInvalid = message.OffsetInvalid
)

Variables

View Source
var ErrInvalidOffset = message.ErrInvalidOffset

ErrInvalidOffset error is returned when the offset attribute is invalid or out of bounds

View Source
var ErrNoIndex = errors.New("no index")

ErrNoIndex error is returned when trying to look up by key or timestamp, but the log does not have the corresponding index enabled

View Source
var ErrNotFound = message.ErrNotFound

ErrNotFound error is returned when the offset, key or timestamp is not found

View Source
var ErrReadonly = errors.New("log opened in readonly mode")

ErrReadonly error is returned when attempting to modify (e.g. publish or delete) a log that is opened in readonly mode

View Source
var InvalidMessage = message.Invalid

InvalidMessage is returned when an error has occurred

View Source
var StringCodec = stringCodec{}
View Source
var StringOptCodec = stringOptCodec{}
View Source
var VarintCodec = varintCodec{}

Functions

func Backup

func Backup(src, dst string) error

Backup backs up a store directory to another location, without opening the store

func Check

func Check(dir string, opts Options) error

Check runs an integrity check, without opening the store

func DeleteMulti added in v0.3.0

func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error)

DeleteMulti tries to delete all messages with the given offsets from the log and returns the amount of storage deleted. If an error is encountered, it returns the offsets and size deleted so far, together with the error.

DeleteMultiBackoff is called on each iteration to give others a chance to work with the log while it is being deleted from.
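
A minimal sketch of pruning messages with DeleteMulti, using DeleteMultiWithWait as the backoff (the path and offsets are just examples, error handling is condensed):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.Open("/tmp/kdb", klevdb.Options{CreateDirs: true})
	defer db.Close()

	db.Publish([]klevdb.Message{
		{Value: []byte("val1")},
		{Value: []byte("val2")},
	})

	// try to delete offsets 0 and 1, waiting 10ms between iterations
	// so other users of the log get a chance to make progress
	offsets := map[int64]struct{}{0: {}, 1: {}}
	deleted, size, err := klevdb.DeleteMulti(context.Background(), db, offsets,
		klevdb.DeleteMultiWithWait(10*time.Millisecond))
	fmt.Println("deleted:", len(deleted), "freed bytes:", size, "err:", err)
}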

func Recover

func Recover(dir string, opts Options) error

Recover rewrites the storage to include all messages prior to the first one that fails an integrity check
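
A minimal sketch of an offline maintenance flow combining Check, Recover and Backup, run while the store is not open (the paths and options are just examples):

package main

import (
	"log"

	"github.com/klev-dev/klevdb"
)

func main() {
	opts := klevdb.Options{KeyIndex: true}

	// verify the store on disk; if the check fails, rewrite the storage
	// to keep only the messages prior to the first integrity failure
	if err := klevdb.Check("/tmp/kdb", opts); err != nil {
		if err := klevdb.Recover("/tmp/kdb", opts); err != nil {
			log.Fatal(err)
		}
	}

	// copy the store to another location, without opening it
	if err := klevdb.Backup("/tmp/kdb", "/tmp/kdb-backup"); err != nil {
		log.Fatal(err)
	}
}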

Types

type BlockingLog added in v0.6.0

type BlockingLog interface {
	Log

	// ConsumeBlocking is similar to Consume, but if offset is equal to the next offset, it will block until the next message is published
	ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

	// ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key
	ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
}

func OpenBlocking added in v0.6.0

func OpenBlocking(dir string, opts Options) (BlockingLog, error)

func WrapBlocking added in v0.6.0

func WrapBlocking(l Log) (BlockingLog, error)
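
A minimal sketch of a blocking consumer built on OpenBlocking; when it is caught up to the head of the log, ConsumeBlocking waits until a new message is published or the context is canceled (the path is just an example, error handling is omitted):

package main

import (
	"context"
	"fmt"

	"github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.OpenBlocking("/tmp/kdb", klevdb.Options{CreateDirs: true})
	defer db.Close()

	ctx := context.Background()
	var offset int64 = klevdb.OffsetOldest
	for {
		// blocks at the head of the log until the next publish
		next, msgs, err := db.ConsumeBlocking(ctx, offset, 32)
		if err != nil {
			return
		}
		for _, msg := range msgs {
			fmt.Println("consumed:", msg.Offset, string(msg.Value))
		}
		offset = next
	}
}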

type Codec added in v0.5.0

type Codec[T any] interface {
	Encode(t T, empty bool) (b []byte, err error)
	Decode(b []byte) (t T, empty bool, err error)
}
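
A Codec converts between typed keys or values and the raw bytes stored in the log. Below is a minimal sketch of a hypothetical custom codec for int64 values; treating a nil slice as the empty case is an assumption based on the interface shape:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/klev-dev/klevdb"
)

// Int64Codec stores values as 8 byte big-endian integers.
type Int64Codec struct{}

// compile-time check that Int64Codec satisfies Codec[int64]
var _ klevdb.Codec[int64] = Int64Codec{}

func (Int64Codec) Encode(t int64, empty bool) ([]byte, error) {
	if empty {
		return nil, nil
	}
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(t))
	return b, nil
}

func (Int64Codec) Decode(b []byte) (int64, bool, error) {
	if len(b) == 0 {
		return 0, true, nil
	}
	if len(b) != 8 {
		return 0, false, fmt.Errorf("int64 codec: unexpected length %d", len(b))
	}
	return int64(binary.BigEndian.Uint64(b)), false, nil
}

func main() {
	var c Int64Codec
	b, _ := c.Encode(42, false)
	v, empty, _ := c.Decode(b)
	fmt.Println(v, empty) // 42 false
}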

type DeleteMultiBackoff added in v0.3.0

type DeleteMultiBackoff func(context.Context) error

DeleteMultiBackoff is called on each iteration of DeleteMulti to give applications an opportunity to avoid overloading the target log with deletes

func DeleteMultiWithWait added in v0.3.0

func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff

DeleteMultiWithWait returns a backoff func that waits for a certain duration. If the context is canceled while waiting, it returns the associated error

type JsonCodec added in v0.5.0

type JsonCodec[T any] struct{}

func (JsonCodec[T]) Decode added in v0.5.0

func (c JsonCodec[T]) Decode(b []byte) (T, bool, error)

func (JsonCodec[T]) Encode added in v0.5.0

func (c JsonCodec[T]) Encode(t T, empty bool) ([]byte, error)

type Log

type Log interface {
	// Publish appends messages to the log.
	// It returns the offset of the next message to be appended.
	// The offset of the message is ignored, set to the actual offset.
	// If the time of the message is 0, it is set to the current UTC time.
	Publish(messages []Message) (nextOffset int64, err error)

	// NextOffset returns the offset of the next message to be published.
	NextOffset() (nextOffset int64, err error)

	// Consume retrieves messages from the log, starting at the offset.
	// It returns the offset that can be used for the next Consume call.
	// If offset == OffsetOldest, the first message will be the oldest
	//   message still available on the log. If the log is empty,
	//   it will return no error and nextOffset will be 0
	// If offset == OffsetNewest, no actual messages will be returned,
	//   but nextOffset will be set to the offset that will be used
	//   for the next Publish call
	// If offset is before the first available on the log, or is after
	//   NextOffset, it returns ErrInvalidOffset
	// If the exact offset is already deleted, it will start consuming
	//   from the next available offset.
	// Consume is allowed to return no messages, but with increasing nextOffset
	//   in case messages between offset and nextOffset have been deleted.
	// NextOffset is always bigger than offset, unless we are caught up
	//   to the head of the log in which case they are equal.
	Consume(offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

	// ConsumeByKey is similar to Consume, but only returns messages matching the key
	ConsumeByKey(key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

	// Get retrieves a single message, by its offset
	// If offset == OffsetOldest, it returns the first message on the log
	// If offset == OffsetNewest, it returns the last message on the log
	// If offset is before the first available on the log, or is after
	//   NextOffset, it returns ErrInvalidOffset
	// If log is empty, it returns ErrInvalidOffset
	// If the exact offset has been deleted, it returns ErrNotFound
	Get(offset int64) (message Message, err error)

	// GetByKey retrieves the last message in the log for this key
	// If no such message is found, it returns ErrNotFound
	GetByKey(key []byte) (message Message, err error)
	// OffsetByKey retrieves the last message offset in the log for this key
	// If no such message is found, it returns ErrNotFound
	OffsetByKey(key []byte) (offset int64, err error)

	// GetByTime retrieves the first message after start time
	// If start time is after all messages in the log, it returns ErrNotFound
	GetByTime(start time.Time) (message Message, err error)
	// OffsetByTime retrieves the first message offset and its time after start time
	// If start time is after all messages in the log, it returns ErrNotFound
	OffsetByTime(start time.Time) (offset int64, messageTime time.Time, err error)

	// Delete tries to delete a set of messages by their offset
	//   from the log and returns the amount of storage deleted
	// It does not guarantee that it will delete all messages,
	//   it returns the set of actually deleted offsets.
	Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error)

	// Size returns the amount of storage associated with a message
	Size(m Message) int64

	// Stat returns log stats like disk space, number of messages
	Stat() (Stats, error)

	// Backup takes a backup snapshot of this log to another location
	Backup(dir string) error

	// Sync forces persisting data to the disk. It returns the nextOffset
	// at the time of the Sync, so clients can determine what portion
	// of the log is now durable.
	Sync() (nextOffset int64, err error)

	// GC releases any unused resources associated with this log
	GC(unusedFor time.Duration) error

	// Close closes the log
	Close() error
}

func Open

func Open(dir string, opts Options) (Log, error)

Open creates a log in the given directory with the given set of options
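
A minimal sketch of a polling consume loop over the Log interface, feeding nextOffset back into the next Consume call and stopping once caught up to the head of the log (the path is just an example, error handling is omitted):

package main

import (
	"fmt"

	"github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.Open("/tmp/kdb", klevdb.Options{CreateDirs: true})
	defer db.Close()

	var offset int64 = klevdb.OffsetOldest
	for {
		next, msgs, err := db.Consume(offset, 32)
		if err != nil {
			return
		}
		for _, msg := range msgs {
			fmt.Println(msg.Offset, string(msg.Value))
		}
		if next == offset {
			// caught up to the head of the log
			break
		}
		offset = next
	}
}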

type Message

type Message = message.Message

type OffsetNotify added in v0.6.0

type OffsetNotify struct {
	// contains filtered or unexported fields
}

func NewOffsetNotify added in v0.6.0

func NewOffsetNotify(nextOffset int64) *OffsetNotify

func (*OffsetNotify) Set added in v0.6.0

func (w *OffsetNotify) Set(nextOffset int64)

func (*OffsetNotify) Wait added in v0.6.0

func (w *OffsetNotify) Wait(ctx context.Context, offset int64) error
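
A minimal sketch of coordinating a publisher and a waiter with OffsetNotify. It assumes Wait unblocks once Set has been called with a nextOffset greater than the waited-for offset (the path is just an example, error handling is omitted):

package main

import (
	"context"
	"fmt"

	"github.com/klev-dev/klevdb"
)

func main() {
	db, _ := klevdb.Open("/tmp/kdb", klevdb.Options{CreateDirs: true})
	defer db.Close()

	next, _ := db.NextOffset()
	notify := klevdb.NewOffsetNotify(next)

	done := make(chan struct{})
	go func() {
		defer close(done)
		// block until a message at offset >= next is available
		if err := notify.Wait(context.Background(), next); err != nil {
			return
		}
		msg, _ := db.Get(next)
		fmt.Println("notified, got:", string(msg.Value))
	}()

	publishNext, _ := db.Publish([]klevdb.Message{{Value: []byte("hello")}})
	notify.Set(publishNext) // wake up waiters

	<-done
}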

type Options

type Options struct {
	// When set will try to create all directories
	CreateDirs bool
	// Open the store in readonly mode
	Readonly bool
	// Index message keys, enabling GetByKey and OffsetByKey
	KeyIndex bool
	// Index message times, enabling GetByTime and OffsetByTime
	TimeIndex bool
	// Force filesystem sync after each Publish
	AutoSync bool
	// The segment size at which the log rolls over to a new segment. Defaults to 1MB.
	Rollover int64
	// Check the head segment for integrity, before opening it for reading/writing.
	Check bool
}

type Stats

type Stats = segment.Stats

func Stat

func Stat(dir string, opts Options) (Stats, error)

Stat stats a store directory, without opening the store

type TBlockingLog added in v0.6.0

type TBlockingLog[K any, V any] interface {
	TLog[K, V]

	ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

	ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)
}

func OpenTBlocking added in v0.6.0

func OpenTBlocking[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TBlockingLog[K, V], error)

func WrapTBlocking added in v0.6.0

func WrapTBlocking[K any, V any](l TLog[K, V]) (TBlockingLog[K, V], error)

type TLog added in v0.6.0

type TLog[K any, V any] interface {
	Publish(messages []TMessage[K, V]) (nextOffset int64, err error)

	NextOffset() (nextOffset int64, err error)

	Consume(offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

	ConsumeByKey(key K, empty bool, offset int64, maxCount int64) (nextOffset int64, messages []TMessage[K, V], err error)

	Get(offset int64) (message TMessage[K, V], err error)

	GetByKey(key K, empty bool) (message TMessage[K, V], err error)

	GetByTime(start time.Time) (message TMessage[K, V], err error)

	Delete(offsets map[int64]struct{}) (deletedOffsets map[int64]struct{}, deletedSize int64, err error)

	Size(m Message) int64

	Stat() (Stats, error)

	Backup(dir string) error

	Sync() (nextOffset int64, err error)

	GC(unusedFor time.Duration) error

	Close() error
}

func OpenT added in v0.6.0

func OpenT[K any, V any](dir string, opts Options, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error)

func WrapT added in v0.6.0

func WrapT[K any, V any](l Log, keyCodec Codec[K], valueCodec Codec[V]) (TLog[K, V], error)
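
A minimal sketch of a typed log opened with OpenT, using JsonCodec for values; it assumes StringCodec satisfies Codec[string] (the path is just an example, error handling is omitted):

package main

import (
	"fmt"

	"github.com/klev-dev/klevdb"
)

type Event struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

func main() {
	db, _ := klevdb.OpenT[string, Event]("/tmp/kdb-typed", klevdb.Options{
		CreateDirs: true,
		KeyIndex:   true,
	}, klevdb.StringCodec, klevdb.JsonCodec[Event]{})
	defer db.Close()

	db.Publish([]klevdb.TMessage[string, Event]{
		{Key: "key1", Value: Event{Name: "created", Count: 1}},
	})

	// empty=false, since we are looking up a non-empty key
	msg, _ := db.GetByKey("key1", false)
	fmt.Println("got:", msg.Value.Name, msg.Value.Count)
}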

type TMessage added in v0.6.0

type TMessage[K any, V any] struct {
	Offset     int64
	Time       time.Time
	Key        K
	KeyEmpty   bool
	Value      V
	ValueEmpty bool
}
