core: go.gazette.dev/core/consumer/store-rocksdb Index | Files

package store_rocksdb

import "go.gazette.dev/core/consumer/store-rocksdb"

Package store_rocksdb implements the consumer.Store interface via an embedded RocksDB instance. To enable recording of RocksDB file operations it defines EnvObserver and WritableFileObserver, which roughly match the C++ interfaces of rocksdb::Env and rocksdb::WritableFile. A "hooked" environment implementation wraps the default rocksdb::Env to dispatch EnvObserver calls upon each matched method call of the delegate rocksdb::Env. This allows observers to inspect file operations initiated by the database as they're happening.

NewRecorder() then adapts a *recoverylog.Recorder to be an EnvObserver, and Store provides the top-level wiring for building a recorded RocksDB instance which satisfies the consumer.Store interface.

This package also offers ArenaIterator, which wraps a gorocksdb.Iterator in order to amortize the number of CGO calls required when iterating through a database, potentially providing a substantial speedup:

BenchmarkIterator/direct-iterator-8                 3000            428699 ns/op
BenchmarkIterator/arena-iterator-8                 20000             73638 ns/op

Index

Package Files

arena_iterator.go hooked_env.go recorder_rocksdb.go store_rocksdb.go

func NewHookedEnv Uses

func NewHookedEnv(obv EnvObserver) *gorocksdb.Env

NewHookedEnv returns a "hooked" RocksDB Environment which delegates to a default RocksDB Environment and then informs the provided EnvObserver of method calls on that Environment.

type ArenaIterator Uses

type ArenaIterator struct {
    // contains filtered or unexported fields
}

ArenaIterator adapts a gorocksdb.Iterator to amortize CGO calls by pre-fetching multiple keys and values into a memory arena. This is generally a performance win in the common case of stepping through via Next() with minimal seeking.

It's interface and semantics matches that of gorocksdb.Iterator, with the exception that Key() and Value() return []byte rather than a gorocksdb.Slice.

func AsArenaIterator Uses

func AsArenaIterator(it *gorocksdb.Iterator, arena []byte) *ArenaIterator

AsArenaIterator adapts a gorocksdb.Iterator to an ArenaIterator that pre-fetches multiple keys & values into |arena| via a single CGO call.

func (*ArenaIterator) Close Uses

func (it *ArenaIterator) Close()

Close closes the iterator.

func (*ArenaIterator) Err Uses

func (it *ArenaIterator) Err() error

Err returns nil if no errors happened during iteration, or the actual error otherwise.

func (*ArenaIterator) Key Uses

func (it *ArenaIterator) Key() []byte

Key returns the key the iterator currently holds.

func (*ArenaIterator) Next Uses

func (it *ArenaIterator) Next()

Next moves the iterator to the next sequential key in the database.

func (*ArenaIterator) Prev Uses

func (it *ArenaIterator) Prev()

Prev moves the iterator to the previous sequential key in the database.

func (*ArenaIterator) Seek Uses

func (it *ArenaIterator) Seek(key []byte)

Seek moves the iterator to the position greater than or equal to the key.

func (*ArenaIterator) SeekForPrev Uses

func (it *ArenaIterator) SeekForPrev(key []byte)

SeekForPrev moves the iterator to the last key that less than or equal to the target key, in contrast with Seek.

func (*ArenaIterator) SeekToFirst Uses

func (it *ArenaIterator) SeekToFirst()

SeekToFirst moves the iterator to the first key in the database.

func (*ArenaIterator) SeekToLast Uses

func (it *ArenaIterator) SeekToLast()

SeekToLast moves the iterator to the last key in the database.

func (*ArenaIterator) Valid Uses

func (it *ArenaIterator) Valid() bool

Valid returns false only when an Iterator has iterated past either the first or the last key in the database.

func (*ArenaIterator) ValidForPrefix Uses

func (it *ArenaIterator) ValidForPrefix(prefix []byte) bool

ValidForPrefix returns false only when an Iterator has iterated past the first or the last key in the database or the specified prefix.

func (*ArenaIterator) Value Uses

func (it *ArenaIterator) Value() []byte

Value returns the value in the database the iterator currently holds.

type EnvObserver Uses

type EnvObserver interface {
    // Invoked just before a new WritableFile is created. Returns a
    // WritableFileObserver which is associated with the result file.
    NewWritableFile(fname string) WritableFileObserver
    // Invoked just before |fname| is deleted.
    DeleteFile(fname string)
    // Invoked just before |dirname| is deleted.
    DeleteDir(dirname string)
    // Invoked just before |src| is renamed to |target|.
    RenameFile(src, target string)
    // Invoked just before |src| is linked to |target|.
    LinkFile(src, target string)
}

EnvObserver allows for observation of mutating Env operations. Consult |Env| in rocksdb/env.h for further details.

func NewRecorder Uses

func NewRecorder(recorder *recoverylog.Recorder) EnvObserver

NewRecorder adapts a recoverylog.Recorder to an EnvObserver.

type Store Uses

type Store struct {
    DB           *rocks.DB
    Env          *rocks.Env
    Options      *rocks.Options
    ReadOptions  *rocks.ReadOptions
    WriteBatch   *rocks.WriteBatch
    WriteOptions *rocks.WriteOptions

    // Cache is a convenient mechanism for consumers to associate shard-specific,
    // in-memory state with a Store, typically for performance reasons.
    // Examples might include:
    //
    // - Records we expect to reduce / aggregate over multiple times in a consumer
    //   transaction, and want to write to the DB only once per transaction (ie,
    //   as part of a consumer Flush).
    // - An LRU of "hot" records we expect to reference again soon.
    //
    // The representation of Cache is up to the consumer; it is not directly used
    // by Store.
    Cache interface{}
    // contains filtered or unexported fields
}

Store implements the consumer.Store interface.

func NewStore Uses

func NewStore(recorder *recoverylog.Recorder) *Store

NewStore builds a Store which is prepared to open its database, but has not yet done so. The caller may wish to further tweak Options and Env settings, and should then call Open to open the database.

func (*Store) Destroy Uses

func (s *Store) Destroy()

Destroy the Store.

func (*Store) Open Uses

func (s *Store) Open() (err error)

Open the RocksDB. After Open, further updates to Env or Options are ignored.

func (*Store) RestoreCheckpoint Uses

func (s *Store) RestoreCheckpoint(_ consumer.Shard) (pc.Checkpoint, error)

func (*Store) StartCommit Uses

func (s *Store) StartCommit(_ consumer.Shard, cp pc.Checkpoint, waitFor client.OpFutures) client.OpFuture

type WritableFileObserver Uses

type WritableFileObserver interface {
    // Invoked when |data| is appended to the file. Note that |data| is owned by
    // RocksDB and must not be referenced after this call.
    Append(data []byte)
    // Invoked when the file is closed.
    Close()
    // Invoked when the file is Synced.
    Sync()
    // Invoked when the file is Fsync'd. Note that this may in turn
    // delegate to sync, and result in a call to the Sync() observer.
    Fsync()
    // Invoked when the file is RangeSync'd.
    RangeSync(offset, nbytes uint64)
}

WritableFileObserver allows for observation of mutating WritableFile operations. Consult |WritableFile| in rocksdb/env.h for further details.

Package store_rocksdb imports 15 packages (graph) and is imported by 9 packages. Updated 2020-06-26. Refresh now. Tools for package owners.