cellar

package module
v0.0.1
Published: Oct 25, 2018 License: BSD-3-Clause Imports: 21 Imported by: 0

README

Cellar


Cellar is an append-only storage backend in Go, based on Abdullin's cellar. This fork is currently being redesigned, so the API should be considered unstable.

Core features:

  • events are automatically split into chunks;
  • chunks may be encrypted via the Cipher interface;
  • designed for batched operations (high throughput);
  • supports a single writer and multiple concurrent readers;
  • secondary indexes and lookups are stored in the metadata DB.

Contributors

In alphabetical order:

Don't hesitate to send a PR to include your profile.

Design

Cellar stores data in a very simple manner:

  • the MetaDB database keeps metadata (including user-defined entries); see metadb.go;
  • a single pre-allocated file buffers all writes;
  • when the buffer fills, it is compressed, encrypted, and appended to the chunk list.

License

3-clause BSD license.

Documentation

Overview

Package cellar implements a low-level, append-only data storage engine.

cellar was originally forked from abdullin/cellar. This version focuses on a more standard API, fewer built-in features, and more configurability. It is used as the storage engine in carapace/core/pkg/append-db (a versioned object DB).

It is completely embedded, internally buffering writes until chunks fill up, then automatically flushing them to file. See the example for full usage of cellar.
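A minimal end-to-end sketch of the API documented below. The import path is a placeholder (adjust it to this fork's actual module path), and error handling is trimmed to the essentials:

```go
package main

import (
	"fmt"
	"log"

	cellar "github.com/example/cellar" // placeholder; use this fork's module path
)

func main() {
	// New enforces a single writer per folder via a file lock.
	db, err := cellar.New("/tmp/cellar-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Append buffers the record and returns its position.
	pos, err := db.Append([]byte("event-1"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("appended at", pos)

	// Flush seals the current buffer into a chunk explicitly.
	if err := db.Flush(); err != nil {
		log.Fatal(err)
	}

	// Readers remain usable even after the DB is closed.
	r := db.Reader()
	err = r.Scan(func(info *cellar.ReaderInfo, data []byte) error {
		fmt.Printf("%d: %s\n", info.NextPos, data)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```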

Index

Constants

This section is empty.

Variables

var (
	CheckPointBucketKey = []byte("a")
	ChunkTableKey       = []byte("b")
	BufferBucketKey     = []byte("c")
	BufferKey           = []byte("d")
	CellarBucketKey     = []byte("e")
	CellarKey           = []byte("f")
)

var (
	ErrBucketNotExists = errors.New("boltdb: bucket not present")
)

var (
	ErrIsFile = errors.New("provided folder is actually a path")
)

Functions

func WithNoFileLock

func WithNoFileLock(db *DB) error

WithNoFileLock is recommended only for unit tests, as it allows concurrent writers (a serious risk to data integrity).

func WithReadOnly

func WithReadOnly(db *DB) error

Types

type AES

type AES struct {
	// contains filtered or unexported fields
}

func NewAES

func NewAES(key []byte) AES

NewAES returns a Cipher implementation based on AES.

NOTE: the AES implementation was authored by Abdullin; this code has been minimally changed.

func (AES) Decrypt

func (a AES) Decrypt(src io.Reader) (io.Reader, error)

func (AES) Encrypt

func (a AES) Encrypt(w io.Writer) (*cipher.StreamWriter, error)

type BoltMetaDB

type BoltMetaDB struct {
	*bolt.DB
}

func (*BoltMetaDB) AddChunk

func (b *BoltMetaDB) AddChunk(pos int64, dto *pb.ChunkDto) error

func (*BoltMetaDB) CellarMeta

func (b *BoltMetaDB) CellarMeta() (dto *pb.MetaDto, err error)

func (*BoltMetaDB) GetBuffer

func (b *BoltMetaDB) GetBuffer() (buf *pb.BufferDto, err error)

func (*BoltMetaDB) GetCheckpoint

func (b *BoltMetaDB) GetCheckpoint(name string) (pos int64, err error)

func (*BoltMetaDB) Init

func (b *BoltMetaDB) Init() error

Init creates all needed buckets

func (*BoltMetaDB) ListChunks

func (b *BoltMetaDB) ListChunks() (dto []*pb.ChunkDto, err error)

func (*BoltMetaDB) PutBuffer

func (b *BoltMetaDB) PutBuffer(dto *pb.BufferDto) (err error)

func (*BoltMetaDB) PutCheckpoint

func (b *BoltMetaDB) PutCheckpoint(name string, pos int64) error

func (*BoltMetaDB) SetCellarMeta

func (b *BoltMetaDB) SetCellarMeta(dto *pb.MetaDto) (err error)

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

type ChainCompressor

type ChainCompressor struct {
	CompressionLevel int
}

func (ChainCompressor) Compress

func (c ChainCompressor) Compress(w io.Writer) (CompressionWriter, error)

type ChainDecompressor

type ChainDecompressor struct{}

func (ChainDecompressor) Decompress

func (c ChainDecompressor) Decompress(r io.Reader) (io.Reader, error)

type Cipher

type Cipher interface {
	Decrypt(src io.Reader) (io.Reader, error)
	Encrypt(w io.Writer) (*cipher.StreamWriter, error)
}

Cipher defines the interface needed to support encryption of the DB

type CompressionWriter

type CompressionWriter interface {
	Close() error
	io.Writer
}

CompressionWriter is based on the functions needed from the lz4 compressor

type Compressor

type Compressor interface {
	Compress(io.Writer) (CompressionWriter, error)
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is a top-level convenience wrapper around Writer and Reader, ensuring only one writer exists per folder and storing the cipher for reuse.

func New

func New(folder string, options ...Option) (*DB, error)

New is the constructor for DB

func (*DB) Append

func (db *DB) Append(data []byte) (pos int64, err error)

Append lazily creates the writer via sync.Once and then reuses it across calls.
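The once-only lazy initialization described above can be sketched with sync.Once alone (a hypothetical lazyWriter, not the package's type):

```go
package main

import (
	"fmt"
	"sync"
)

// lazyWriter initializes its underlying state exactly once, on the
// first Append, no matter how many times Append is called.
type lazyWriter struct {
	once   sync.Once
	inited int // counts initializations; must end up at 1
	pos    int64
}

func (l *lazyWriter) Append(data []byte) int64 {
	l.once.Do(func() {
		l.inited++ // expensive writer setup would go here
	})
	l.pos += int64(len(data))
	return l.pos
}

func main() {
	w := &lazyWriter{}
	w.Append([]byte("a"))
	pos := w.Append([]byte("bc"))
	fmt.Println(w.inited, pos)
}
```

sync.Once also makes the initialization safe if Append were ever called from multiple goroutines, though cellar itself permits only one writer.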

func (*DB) Buffer

func (db *DB) Buffer() int64

Buffer returns the maximum buffer size of the DB.

func (*DB) Checkpoint

func (db *DB) Checkpoint() (pos int64, err error)

Checkpoint creates an anonymous checkpoint at the current cursor's location.

func (*DB) Close

func (db *DB) Close() (err error)

Close ensures filelocks are cleared and resources closed. Readers derived from this DB instance will remain functional.

func (*DB) Flush

func (db *DB) Flush() (err error)

Flush explicitly seals the current buffer and creates a new one.

func (*DB) Folder

func (db *DB) Folder() string

Folder returns the DB folder

func (*DB) GetUserCheckpoint

func (db *DB) GetUserCheckpoint(name string) (pos int64, err error)

GetUserCheckpoint returns the position of a named checkpoint

func (*DB) PutUserCheckpoint

func (db *DB) PutUserCheckpoint(name string, pos int64) (err error)

PutUserCheckpoint creates a named checkpoint at a given position.

func (*DB) Reader

func (db *DB) Reader() *Reader

Reader returns a new DB reader. The reader remains usable even after the DB is closed.

func (*DB) VolatilePos

func (db *DB) VolatilePos() int64

VolatilePos returns the current cursor's location.

type Decompressor

type Decompressor interface {
	Decompress(io.Reader) (io.Reader, error)
}

type FileLock

type FileLock interface {
	TryLock() (bool, error)
	Lock() error
	Unlock() error
}

type MetaDB

type MetaDB interface {
	GetBuffer() (*pb.BufferDto, error)
	PutBuffer(*pb.BufferDto) error
	ListChunks() ([]*pb.ChunkDto, error)
	AddChunk(int64, *pb.ChunkDto) error
	CellarMeta() (*pb.MetaDto, error)
	SetCellarMeta(*pb.MetaDto) error
	PutCheckpoint(name string, pos int64) error
	GetCheckpoint(name string) (int64, error)
	Close() error
	Init() error
}

MetaDB defines an interface for databases storing metadata about the cellar DB. The default implementation is based on either LMDB or BoltDB (K/V stores suit this purpose well).

type MockLock

type MockLock struct{}

MockLock mocks a flock (file lock).

func (MockLock) Lock

func (m MockLock) Lock() error

func (MockLock) TryLock

func (m MockLock) TryLock() (bool, error)

func (MockLock) Unlock

func (m MockLock) Unlock() error

type Option

type Option func(db *DB) error

func WithCipher

func WithCipher(cipher Cipher) Option

WithCipher allows for customizing the read/write encryption.

func WithLogger

func WithLogger(logger *zap.Logger) Option

func WithMetaDB

func WithMetaDB(mdb MetaDB) Option

type ReadFlag

type ReadFlag int
const (
	RF_None        ReadFlag = 0
	RF_LoadBuffer  ReadFlag = 1 << 1
	RF_PrintChunks ReadFlag = 1 << 2
)

type ReadOp

type ReadOp func(pos *ReaderInfo, data []byte) error

type Reader

type Reader struct {
	Folder      string
	Flags       ReadFlag
	StartPos    int64
	EndPos      int64
	LimitChunks int
	// contains filtered or unexported fields
}

func NewReader

func NewReader(folder string, cipher Cipher, decompressor Decompressor, meta MetaDB, logger *zap.Logger) *Reader

func (*Reader) Scan

func (r *Reader) Scan(op ReadOp) error

func (*Reader) ScanAsync

func (reader *Reader) ScanAsync(ctx context.Context, buffer int) (chan Rec, chan error)

ScanAsync runs Reader.Scan in a goroutine, returning the values obtained.

ScanAsync honors context cancellation. If an error is sent on the error channel, no more values are scanned and the goroutine exits.

type ReaderInfo

type ReaderInfo struct {
	// can be used to convert to file name
	ChunkPos int64
	// global start pos
	StartPos int64
	// global read pos
	NextPos int64
}

type Rec

type Rec struct {
	Data     []byte
	ChunkPos int64
	StartPos int64
	NextPos  int64
}

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(folder string, maxBufferSize int64, cipher Cipher, compressor Compressor, db MetaDB, logger *zap.Logger) (*Writer, error)

func (*Writer) Append

func (w *Writer) Append(data []byte) (pos int64, err error)

func (*Writer) Checkpoint

func (w *Writer) Checkpoint() (int64, error)

func (*Writer) Close

func (w *Writer) Close() error

Close disposes all resources

func (*Writer) Flush

func (w *Writer) Flush() error

func (*Writer) GetUserCheckpoint

func (w *Writer) GetUserCheckpoint(name string) (int64, error)

func (*Writer) PutUserCheckpoint

func (w *Writer) PutUserCheckpoint(name string, pos int64) error

func (*Writer) VolatilePos

func (w *Writer) VolatilePos() int64
