frostdb

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2024 License: Apache-2.0 Imports: 54 Imported by: 0

README


Go Reference Go Report Card Build Discord

This project is still in its infancy, consider it not production-ready, probably has various consistency and correctness problems and all API will change!

FrostDB is an embeddable wide-column columnar database written in Go. It features semi-structured schemas, uses Apache Parquet for storage, and Apache Arrow at query time. Building on top of Apache Arrow, FrostDB provides a query builder and various optimizers (using DataFrame-like APIs).

FrostDB is optimized for use cases where the majority of interactions are writes, with occasional analytical queries over this data. FrostDB was built specifically for Parca for Observability use cases.

Read the announcement blog post to learn about what made us create it: https://www.polarsignals.com/blog/posts/2022/05/04/introducing-arcticdb/ (FrostDB was originally called ArcticDB)

Why you should use FrostDB

Columnar data stores have become incredibly popular for analytics. Structuring data in columns instead of rows leverages the architecture of modern hardware, allowing for efficient processing of data. A columnar data store might be right for you if you have workloads where you write a lot of data and need to perform analytics on that data.

FrostDB is similar to many other embeddable columnar databases such as DuckDB

FrostDB may be a better fit for you if:

  • Are developing a Go program
  • Want to embed a columnar database in your program instead of running a separate database server
  • Have immutable datasets that don't require updating or deleting
  • Your data contains dynamic columns, where the number of columns in the schema may increase at runtime

FrostDB is likely not suitable for your needs if:

  • You aren't developing in Go
  • You require a standalone database server
  • You need to modify or delete your data
  • You query by rows instead of columns

Getting Started

You can explore the examples directory for sample code using FrostDB. Below is a snippet from the simple database example. It creates a database with a dynamic column schema, inserts some data, and queries it back out.

https://github.com/dreamsxin/frostdb/blob/ee6970eff139c58a45998a87c02b661f32be5cbe/examples/simple/simple.go#L17-L69

Design choices

FrostDB was specifically built for Observability workloads. This resulted in several characteristics that make it unique.

Table Of Contents:

Columnar layout

Observability data is most useful when it is highly dimensional and those dimensions can be searched and aggregated by efficiently. Contrary to many relational databases (MySQL, PostgreSQL, CockroachDB, TiDB, etc.) that store data all data belonging to a single row together, a columnar layout stores all data of the same column in one contiguous chunk of data, making it very efficient to scan and aggregate data for any column. FrostDB uses Apache Parquet for storage, and Apache Arrow at query time. Apache Parquet is used for storage to make use of its efficient encodings to save on memory and disk space. Apache Arrow is used at query time as a foundation to vectorize the query execution.

Dynamic Columns

While columnar databases already exist, most require a static schema. However, Observability workloads differ in that data their schemas are not static, meaning not all columns are pre-defined. Wide column databases already exist, but typically are not strictly typed (e.g. document databases), and most wide-column databases are row-based databases, not columnar databases.

Take a Prometheus time-series for example. Prometheus time-series are uniquely identified by the combination of their label-sets:

http_requests_total{path="/api/v1/users", code="200"} 12

This model does not map well into a static schema, as label-names cannot be known upfront. The most suitable data-type some columnar databases have to offer is a map, however, maps have the same problems as row-based databases, where all values of a map in a row are stored together, resulting in an inability to exploit the advantages of a columnar layout. A FrostDB schema can define a column to be dynamic, causing a column to be created on the fly when a new label-name is seen.

A FrostDB schema for Prometheus could look like this:

type Prometheus struct {
	Labels    map[string]string `frostdb:",rle_dict,asc(1),null_first"`
	Timestamp int64             `frostdb:",asc(0)"`
	Value     float64
}

Note: We are aware that Prometheus uses double-delta encoding for timestamps and XOR encoding for values. This schema is purely an example to highlight the dynamic columns feature.

With this schema, all rows are expected to have a timestamp and a value but can vary in their columns prefixed with labels.. In this schema all dynamically created columns are still Dictionary and run-length encoded and must be of type string.

Immutable

There are only writes and reads. All data is immutable.

FrostDB maintains inserted data in an Log-structured merge-tree(LSM) like index. This index is implemented as lists of Parts. A Part containers either an Arrow record or a Parquet file. The first level (L0) contains a list of Arrrow records inserted as-is into the list. Upon reaching the maximum configured size of the level the level will be compacted into a single Parquet file and added to the next level of the index. This process continues for each configured level of the index until a file is written into the final level of the index.

LSM Index compacting into higher levels

Upon the size of the entire index reaching the configured max in-memory size the index is rotated out. It can be either configured to be dropped entirely or to be written out to your storage of choice.

At query time FrostDB will scan each part in the in the index. To maintain fast queries FrostDB leverages the sparse index features of Parquet files, such as bloom filters and min and max values of columns in each row group such that only the row groups that contain data that can satisfy the query are processed.

Snapshot isolation

FrostDB has snapshot isolation, however, it comes with a few caveats that should be well understood. It does not have read-after-write consistency as the intended use is for users reading data that are not the same as the entity writing data to it. To see new data the user re-runs a query. Choosing to trade-off read-after-write consistency allows for mechanisms to increase throughput significantly. FrostDB releases write transactions in batches. It essentially only ensures write atomicity and that writes are not torn when reading. Since data is immutable, those characteristics together result in snapshot isolation.

More concretely, FrostDB maintains a watermark indicating that all transactions equal and lower to the watermark are safe to be read. Only write transactions obtain a new transaction ID, while reads use the transaction ID of the watermark to identify data that is safe to be read. The watermark is only increased when strictly monotonic, consecutive transactions have finished. This means that a low write transaction can block higher write transactions to become available to be read. To ensure progress is made, write transactions have a timeout.

This mechanism is inspired by a mix of Google Spanner, Google Percolator and Highly Available Transactions.

Transactions are released in batches indicated by the watermark

Acknowledgments

FrostDB stands on the shoulders of giants. Shout out to Segment for creating the incredible parquet-go library as well as InfluxData for starting and various contributors after them working on Go support for Apache Arrow.

Documentation

Index

Constants

View Source
const (
	B   = 1
	KiB = 1024 * B
	MiB = 1024 * KiB
	GiB = 1024 * MiB
	TiB = 1024 * GiB
)
View Source
const DefaultBlockReaderLimit = 10

DefaultBlockReaderLimit is the concurrency limit for reading blocks.

Variables

View Source
var (
	ErrNoSchema     = fmt.Errorf("no schema")
	ErrTableClosing = fmt.Errorf("table closing")
)

Functions

func DefaultIndexConfig

func DefaultIndexConfig() []*index.LevelConfig

DefaultIndexConfig returns the default level configs used. This is a function So that any modifications to the result will not affect the default config.

func LoadSnapshot

func LoadSnapshot(ctx context.Context, db *DB, tx uint64, r io.ReaderAt, size int64, dir string, truncateWAL bool) (uint64, error)

func NewTableConfig

func NewTableConfig(
	schema proto.Message,
	options ...TableOption,
) *tablepb.TableConfig

func SnapshotDir

func SnapshotDir(db *DB, tx uint64) string

func StoreSnapshot

func StoreSnapshot(ctx context.Context, tx uint64, db *DB, snapshot io.Reader) error

func WriteSnapshot

func WriteSnapshot(ctx context.Context, tx uint64, db *DB, w io.Writer) error

Types

type CloseOption

type CloseOption func(*closeOptions)

func WithClearStorage

func WithClearStorage() CloseOption

type Closer

type Closer interface {
	Close(cleanup bool) error
}

type ColumnStore

type ColumnStore struct {
	// contains filtered or unexported fields
}

func New

func New(
	options ...Option,
) (*ColumnStore, error)

func (*ColumnStore) Close

func (s *ColumnStore) Close() error

Close persists all data from the columnstore to storage. It is no longer valid to use the coumnstore for reads or writes, and the object should not longer be reused.

func (*ColumnStore) DB

func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*DB, error)

DB gets or creates a database on the given ColumnStore with the given options. Note that if the database already exists, the options will be applied cumulatively to the database.

func (*ColumnStore) DBs

func (s *ColumnStore) DBs() []string

DBs returns all the DB names of this column store.

func (*ColumnStore) DatabasesDir

func (s *ColumnStore) DatabasesDir() string

func (*ColumnStore) DropDB

func (s *ColumnStore) DropDB(name string) error

func (*ColumnStore) GetDB

func (s *ColumnStore) GetDB(name string) (*DB, error)

type DB

type DB struct {
	// contains filtered or unexported fields
}

func (*DB) Close

func (db *DB) Close(options ...CloseOption) error

func (*DB) GetTable

func (db *DB) GetTable(name string) (*Table, error)

func (*DB) HighWatermark

func (db *DB) HighWatermark() uint64

HighWatermark returns the current high watermark.

func (*DB) Table

func (db *DB) Table(name string, config *tablepb.TableConfig) (*Table, error)

Table will get or create a new table with the given name and config. If a table already exists with the given name, it will have it's configuration updated.

func (*DB) TableNames

func (db *DB) TableNames() []string

TableNames returns the names of all the db's tables.

func (*DB) TableProvider

func (db *DB) TableProvider() *DBTableProvider

func (*DB) Wait

func (db *DB) Wait(tx uint64)

Wait is a blocking function that returns once the high watermark has equaled or exceeded the transaction id. Wait makes no differentiation between completed and aborted transactions.

type DBOption

type DBOption func(*DB) error

func WithCompactionAfterOpen

func WithCompactionAfterOpen(compact bool, tableNames []string) DBOption

type DBTableProvider

type DBTableProvider struct {
	// contains filtered or unexported fields
}

func NewDBTableProvider

func NewDBTableProvider(db *DB) *DBTableProvider

func (*DBTableProvider) GetTable

func (p *DBTableProvider) GetTable(name string) (logicalplan.TableReader, error)

type DataSink

type DataSink interface {
	fmt.Stringer
	Upload(ctx context.Context, name string, r io.Reader) error
	Delete(ctx context.Context, name string) error
}

DataSink is a remote destination for data.

type DataSinkSource

type DataSinkSource interface {
	DataSink
	DataSource
}

DataSinkSource is a convenience interface for a data source and sink.

type DataSource

type DataSource interface {
	fmt.Stringer
	Scan(ctx context.Context, prefix string, schema *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error
	Prefixes(ctx context.Context, prefix string) ([]string, error)
}

DataSource is remote source of data that can be queried.

type DefaultObjstoreBucket

type DefaultObjstoreBucket struct {
	storage.Bucket
	// contains filtered or unexported fields
}

DefaultObjstoreBucket is the default implementation of the DataSource and DataSink interface.

func (*DefaultObjstoreBucket) Prefixes

func (b *DefaultObjstoreBucket) Prefixes(ctx context.Context, prefix string) ([]string, error)

func (*DefaultObjstoreBucket) ProcessFile

func (b *DefaultObjstoreBucket) ProcessFile(ctx context.Context, blockDir string, lastBlockTimestamp uint64, filter expr.TrueNegativeFilter, callback func(context.Context, any) error) error

ProcessFile will process a bucket block parquet file.

func (*DefaultObjstoreBucket) Scan

func (b *DefaultObjstoreBucket) Scan(ctx context.Context, prefix string, _ *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error

func (*DefaultObjstoreBucket) String

func (b *DefaultObjstoreBucket) String() string

type DefaultObjstoreBucketOption

type DefaultObjstoreBucketOption func(*DefaultObjstoreBucket)

func StorageWithBlockReaderLimit

func StorageWithBlockReaderLimit(limit int) DefaultObjstoreBucketOption

func StorageWithLogger

func StorageWithLogger(logger log.Logger) DefaultObjstoreBucketOption

func StorageWithTracer

func StorageWithTracer(tracer trace.Tracer) DefaultObjstoreBucketOption

type ErrCreateSchemaWriter

type ErrCreateSchemaWriter struct {
	// contains filtered or unexported fields
}

func (ErrCreateSchemaWriter) Error

func (e ErrCreateSchemaWriter) Error() string

type ErrReadRow

type ErrReadRow struct {
	// contains filtered or unexported fields
}

func (ErrReadRow) Error

func (e ErrReadRow) Error() string

type ErrTableNotFound

type ErrTableNotFound struct {
	TableName string
}

func (ErrTableNotFound) Error

func (e ErrTableNotFound) Error() string

type ErrWriteRow

type ErrWriteRow struct {
	// contains filtered or unexported fields
}

func (ErrWriteRow) Error

func (e ErrWriteRow) Error() string

type GenericTable

type GenericTable[T any] struct {
	*Table
	// contains filtered or unexported fields
}

GenericTable is a wrapper around *Table that writes structs of type T. It consist of a generic arrow.Record builder that ingests structs of type T. The generated record is then passed to (*Table).InsertRecord.

Struct tag `frostdb` is used to pass options for the schema for T.

This api is opinionated.

  • Nested Columns are not supported

Tags

Use `frostdb` to define tags that customizes field values. You can express everything needed to construct schema v1alpha1.

Tags are defined as a comma separated list. The first item is the column name. Column name is optional, when omitted it is derived from the field name (snake_cased)

Supported Tags

    delta_binary_packed | Delta binary packed encoding.
                 brotli | Brotli compression.
                    asc | Sorts in ascending order.Use asc(n) where n is an integer for sorting order.
                   gzip | GZIP compression.
                 snappy | Snappy compression.
delta_length_byte_array | Delta Length Byte Array encoding.
       delta_byte_array | Delta Byte Array encoding.
                   desc | Sorts in descending order.Use desc(n) where n is an integer for sorting order
                lz4_raw | LZ4_RAW compression.
               pre_hash | Prehash the column before storing it.
             null_first | When used wit asc nulls are smallest and with des nulls are largest.
                   zstd | ZSTD compression.
               rle_dict | Dictionary run-length encoding.
                  plain | Plain encoding.

Example tagged Sample struct

type Sample struct {
	ExampleType string      `frostdb:"example_type,rle_dict,asc(0)"`
	Labels      []Label     `frostdb:"labels,rle_dict,null,dyn,asc(1),null_first"`
	Stacktrace  []uuid.UUID `frostdb:"stacktrace,rle_dict,asc(3),null_first"`
	Timestamp   int64       `frostdb:"timestamp,asc(2)"`
	Value       int64       `frostdb:"value"`
}

Dynamic columns

Field of type map<string, T> is a dynamic column by default.

type Example struct {
	// Use supported tags to customize the column value
	Labels map[string]string `frostdb:"labels"`
}

Repeated columns

Fields of type []int64, []float64, []bool, and []string are supported. These are represented as arrow.LIST.

Generated schema for the repeated columns applies all supported tags. By default repeated fields are nullable. You can safely pass nil slices for repeated columns.

func NewGenericTable

func NewGenericTable[T any](db *DB, name string, mem memory.Allocator, options ...TableOption) (*GenericTable[T], error)

func (*GenericTable[T]) Release

func (t *GenericTable[T]) Release()

func (*GenericTable[T]) Write

func (t *GenericTable[T]) Write(ctx context.Context, values ...T) (uint64, error)

Write builds arrow.Record directly from values and calls (*Table).InsertRecord.

type Option

type Option func(*ColumnStore) error

func WithActiveMemorySize

func WithActiveMemorySize(size int64) Option

func WithCompactionAfterRecovery

func WithCompactionAfterRecovery(tableNames []string) Option

func WithIndexConfig

func WithIndexConfig(indexConfig []*index.LevelConfig) Option

func WithIndexDegree

func WithIndexDegree(indexDegree int) Option

func WithLogger

func WithLogger(logger log.Logger) Option

func WithManualBlockRotation

func WithManualBlockRotation() Option

func WithReadOnlyStorage

func WithReadOnlyStorage(ds DataSource) Option

func WithReadWriteStorage

func WithReadWriteStorage(ds DataSinkSource) Option

func WithRecoveryConcurrency

func WithRecoveryConcurrency(concurrency int) Option

WithRecoveryConcurrency limits the number of databases that are recovered simultaneously when calling frostdb.New. This helps limit memory usage on recovery.

func WithRegistry

func WithRegistry(reg prometheus.Registerer) Option

func WithSnapshotTriggerSize

func WithSnapshotTriggerSize(size int64) Option

WithSnapshotTriggerSize specifies a size in bytes of uncompressed inserts that will trigger a snapshot of the whole database. This can be larger than the active memory size given that the active memory size tracks the size of *compressed* data, while snapshots are triggered based on the *uncompressed* data inserted into the database. The reason this choice was made is that if a database instance crashes, it is forced to reread all uncompressed inserts since the last snapshot from the WAL, which could potentially lead to unrecoverable OOMs on startup. Defining the snapshot trigger in terms of uncompressed bytes limits the memory usage on recovery to at most the snapshot trigger size (as long as snapshots were successful). If 0, snapshots are disabled. Note that snapshots (if enabled) are also triggered on block rotation of any database table. Snapshots are complementary to the WAL and will also be disabled if the WAL is disabled.

func WithSplitSize

func WithSplitSize(size int) Option

func WithStoragePath

func WithStoragePath(path string) Option

func WithTracer

func WithTracer(tracer trace.Tracer) Option

func WithWAL

func WithWAL() Option

func WithWriteOnlyStorage

func WithWriteOnlyStorage(ds DataSink) Option

type ParquetWriter

type ParquetWriter interface {
	Flush() error
	WriteRows([]parquet.Row) (int, error)
	io.Closer
}

type Sync

type Sync interface {
	Sync() error
}

type Table

type Table struct {
	// contains filtered or unexported fields
}

func (*Table) ActiveBlock

func (t *Table) ActiveBlock() *TableBlock

func (*Table) ActiveWriteBlock

func (t *Table) ActiveWriteBlock() (*TableBlock, func(), error)

func (*Table) EnsureCompaction

func (t *Table) EnsureCompaction() error

func (*Table) IndexConfig

func (t *Table) IndexConfig() []*index.LevelConfig

IndexConfig returns the index configuration for the table. It makes a copy of the column store index config and injects it's compactParts method.

func (*Table) InsertRecord

func (t *Table) InsertRecord(ctx context.Context, record arrow.Record) (uint64, error)

func (*Table) Iterator

func (t *Table) Iterator(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	callbacks []logicalplan.Callback,
	options ...logicalplan.Option,
) error

Iterator iterates in order over all granules in the table. It stops iterating when the iterator function returns false.

func (*Table) RotateBlock

func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bool) error

func (*Table) Schema

func (t *Table) Schema() *dynparquet.Schema

func (*Table) SchemaIterator

func (t *Table) SchemaIterator(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	callbacks []logicalplan.Callback,
	options ...logicalplan.Option,
) error

SchemaIterator iterates in order over all granules in the table and returns all the schemas seen across the table.

func (*Table) View

func (t *Table) View(ctx context.Context, fn func(ctx context.Context, tx uint64) error) error

type TableBlock

type TableBlock struct {
	// contains filtered or unexported fields
}

func (*TableBlock) EnsureCompaction

func (t *TableBlock) EnsureCompaction() error

EnsureCompaction forces a TableBlock compaction.

func (*TableBlock) Index

func (t *TableBlock) Index() *index.LSM

Index provides atomic access to the table index.

func (*TableBlock) InsertRecord

func (t *TableBlock) InsertRecord(_ context.Context, tx uint64, record arrow.Record) error

func (*TableBlock) Persist

func (t *TableBlock) Persist() error

Persist uploads the block to the underlying bucket.

func (*TableBlock) Serialize

func (t *TableBlock) Serialize(writer io.Writer) error

Serialize the table block into a single Parquet file.

func (*TableBlock) Size

func (t *TableBlock) Size() int64

Size returns the cumulative size of all buffers in the table. This is roughly the size of the table in bytes.

type TableOption

type TableOption func(*tablepb.TableConfig) error

func FromConfig

func FromConfig(config *tablepb.TableConfig) TableOption

FromConfig sets the table configuration from the given config. NOTE: that this does not override the schema even though that is included in the passed in config.

func WithBlockReaderLimit

func WithBlockReaderLimit(n int) TableOption

WithBlockReaderLimit sets the limit of go routines that will be used to read persisted block files. A negative number indicates no limit.

func WithRowGroupSize

func WithRowGroupSize(numRows int) TableOption

WithRowGroupSize sets the size in number of rows for each row group for parquet files. A <= 0 value indicates no limit.

func WithUniquePrimaryIndex

func WithUniquePrimaryIndex(unique bool) TableOption

func WithoutWAL

func WithoutWAL() TableOption

WithoutWAL disables the WAL for this table.

type TxNode

type TxNode struct {
	// contains filtered or unexported fields
}

type TxPool

type TxPool struct {
	// contains filtered or unexported fields
}

func NewTxPool

func NewTxPool(watermark *atomic.Uint64) *TxPool

NewTxPool returns a new TxPool and starts the pool cleaner routine. The transaction pool is used to keep track of completed transactions. It does this by inserting completed transactions into an ordered linked list.

Ex: insert: 12 [9]->[10]->[13] => [9]->[10]->[12]->[13]

Inserting a new node triggers the pool cleaner routine to run. The pool cleaner's job is to increment a high-watermark counter when it encounters contiguous transactions in the list, and then remove those elements in the pool.

Ex: watermark: 7 insert: 8 [9]->[10]->[13] => [8]->[9]->[10]->[13] (cleaner notified)

[8]->[9]->[10]->[13]

^ watermark++; delete 8

[9]->[10]->[13]

^ watermark++; delete 9

[10]->[13]

^ watermark++; delete 9

[13] watermark: 10

TxPool is a sorted lockless linked-list described in https://timharris.uk/papers/2001-disc.pdf

func (*TxPool) Insert

func (l *TxPool) Insert(tx uint64)

Insert performs an insertion sort of the given tx.

func (*TxPool) Iterate

func (l *TxPool) Iterate(iterate func(txn uint64) bool)

func (*TxPool) Stop

func (l *TxPool) Stop()

Stop stops the TxPool's cleaner goroutine.

type WAL

type WAL interface {
	Close() error
	Log(tx uint64, record *walpb.Record) error
	LogRecord(tx uint64, table string, record arrow.Record) error
	// Replay replays WAL records from the given first index. If firstIndex is
	// 0, the first index read from the WAL is used (i.e. given a truncation,
	// using 0 is still valid). If the given firstIndex is less than the WAL's
	// first index on disk, the replay happens from the first index on disk.
	// If the handler panics, the WAL implementation will truncate the WAL up to
	// the last valid index.
	Replay(tx uint64, handler wal.ReplayHandlerFunc) error
	Truncate(tx uint64) error
	Reset(nextTx uint64) error
	FirstIndex() (uint64, error)
	LastIndex() (uint64, error)
}

Directories

Path Synopsis
cmd
parquet-tool Module
examples
gen
internal
builder
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL