core: go.gazette.dev/core/consumer/store-sqlite Index | Files

package store_sqlite

import "go.gazette.dev/core/consumer/store-sqlite"

Package store_sqlite implements the consumer.Store interface via an embedded SQLite instance.

Database Representation

SQLite records state across a very long lived database file, and short-lived transactional log files (journals, or write-ahead logs; see https://www.sqlite.org/fileformat.html).

The central strategy of this package is to provide a SQLite VFS implementation (https://www.sqlite.org/vfs.html) with "hooks" to record mutations of these files made by the database, as they occur, into a recovery log. Recovering a database consists of restoring the on-disk representation of the main database and associated transaction logs.

SQLite transaction logs are well-suited to this strategy, as they're append- only and (typically) short-lived. The main database file is not. It has unbounded lifetime and is read & written randomly by the SQLite engine. Directly representing the main database as a recoverylog.FNode would result in an unbounded recovery horizon, as every operation ever done against the database would need to be replayed. There must be a means of "compacting" away earlier recorded DB mutations which are no longer needed in order to tighten the recovery playback horizon to a more reasonable bound.

SQLite does make certain guarantees with respect to the main database file, particularly that reads and writes are always page-aligned, and of page size. The database file can thus be thought of as an associative map of page offsets and their current content. This package therefore employs a RocksDB instance, recorded into the same recovery log, into which page offsets and values are written and read. This adds some overhead to page reads and writes, but provides a much tighter bound to the recovery playback horizon (since RocksDB compactions will eventually discard old versions of pages that have since been re-written).

Stores may have multiple associated SQLite databases: a primary database, and arbitrary secondary databases ATTACH'd to the primary. Distinct databases are tracked as distinct column families of a single RocksDB.

Transaction logs (SQLite journals & the WAL) are directly recorded into the recovery log. SQLite provides multiple "journal_mode" settings, such as PERSIST and TRUNCATE, which cause a log file to be re-used indefinitely: this implementation detects and maps these operations into new recorded logs, to ensure a tighter recovery playback horizon.

Batch Atomic Writes

Where SQLITE_BATCH_ATOMIC_WRITE is enabled, the Store VFS presents itself as a supporting file system, with atomic write semantics implemented by a RocksDB WriteBatchWithIndex. This arrangement allows SQLite to by-pass writing a rollback journal altogether in almost all cases, and page mutations are written only once to the RocksDB WAL (reducing write amplification).

For remaining cases where a rollback journal is required (eg, because the SQLite pager's dirty page cache spills to disk, or because the transaction involves multiple journals), the choice of _journal_mode doesn't much matter but TRUNCATE or PERSIST have fewer CGO calls.

Note that _without_ BATCH_ATOMIC_WRITE mode, rollback journals result in substantial write amplification on a per-transaction basis, as each page is written first to the rollback journal (before its mutation) and also to the RocksDB WAL (after its mutation).

For this reason if BATCH_ATOMIC_WRITE is unavailable, use of journal_mode=WAL is recommended, as each page mutation is then written only once during a transaction to the SQLite WAL (and not to RocksDB). Later, when a WAL checkpoint is executed, only the most-recent version of a page is then written to RocksDB (and its WAL).

Buffering and the Synchronous Pragma

The Store VFS employs buffering in its C++ bindings to amortize and minimize the number of calls which must cross the CGO barrier (as these can be quite slow). It's completely reliant on the "sync" signals delivered by the SQLite engine to understand when buffered data must be delivered over CGO and sequenced into the recovery log. For this reason, the PRAGMA *must be*:

PRAGMA synchronous=FULL;

The default SQLite URI flags used by the Store set this pragma. SQLite behavior for any lower setting, such as synchronous=NORMAL, is to not sync the SQLite write-ahead log after each transaction commit, which is incompatible with this implementation and the expectations of the Store interface (specifically, that after StartCommit all mutations of the transaction have been flushed to the recovery log).

As Store databases are by nature ephemeral, and durability is provided by the recovery log, the Store VFS *does not* propagate sync signals to OS file delegates, and never waits for a host disk to sync. "synchronous=FULL" thus has negligible performance overhead as compared to "synchronous=OFF".

Index

Package Files

doc.go store.go store_test_support.go

func SQLiteCompiledOptions Uses

func SQLiteCompiledOptions() (map[string]struct{}, error)

SQLiteCompiledOptions returns the set of compile-time options that the linked SQLite library was built with. See https://www.sqlite.org/compile.html for a full listing. Note the "SQLITE_" prefix is dropped in the returned set:

map[string]struct{
    "COMPILER=gcc-8.3.0": {},
    "ENABLE_BATCH_ATOMIC_WRITE": {},
    "ENABLE_COLUMN_METADATA": {},
    "ENABLE_DBSTAT_VTAB": {},
    ... etc ...
}

type Store Uses

type Store struct {
    // PageDBOptions are options of the RocksDB which backs SQLite DBs.
    // NewStore initializes PageDBOptions with reasonable defaults, however
    // clients may wish to further customize.
    PageDBOptions *gorocksdb.Options
    // PageDBColumnOptions are column family RocksDB options applied to each
    // column family which backs a SQLite DB. NewStore initializes
    // PageDBColumnOptions with reasonable defaults, however clients may wish
    // to further customize.
    PageDBColumnOptions *gorocksdb.Options
    // PageDBColumns are names of RocksDB column families which back SQLite DBs,
    // and their associated *ColumnFamilyHandle.
    //
    // Typically only one SQLite DB is used, but additional DBs may also be
    // created via the SQLite "ATTACH" command, and each one is represented as
    // a RocksDB column family named after the DB (relative to the Recorder root).
    // One additional "default" family also exists, but is not used. Note attached
    // DBs must use this Store's Sqlite VFS (see also: URIForDB).
    // PageDBColumns is populated by NewStore.
    PageDBColumns map[string]*gorocksdb.ColumnFamilyHandle
    // PageDBEnv is the recorded RocksDB environment of the RocksDB
    // backing SQLite DB pages. NewStore initializes PageDBEnv.
    PageDBEnv *gorocksdb.Env
    // SQLiteURIValues of the filename URI used to open the primary SQLite
    // database. NewStore initializes SQLiteURIValues, but clients may wish to
    // customize URI parameters passed to the database prior to Open being
    // called, eg with:
    //
    // * "cache=shared" to enable SQLite's shared-cache mode, typically in
    //   combination with either "PRAGMA read_uncommitted" or go-sqlite3
    //   compiled with the "sqlite_unlock_notify" build tag (otherwise
    //   shared-cache mode increases the likelihood of "database is locked"
    //   errors; see https://github.com/mattn/go-sqlite3/issues/632).
    // * "_journal_mode=WAL" to enable SQLite WAL mode.
    //
    // See also the set of URI parameters supported by github.com/mattn/go-sqlite3`
    // and by SQLite itself (https://www.sqlite.org/uri.html). The "vfs" parameter
    // is specific to this particular Store, and must not be modified.
    SQLiteURIValues url.Values
    // SQLiteDB is the opened SQLite primary database. Clients may wish to use
    // this instance to perform read-only queries against the database outside
    // of consumer transactions. However, all mutations of the database should
    // be made through the store's Transaction. SQLiteDB is not set until Open.
    SQLiteDB *sql.DB
    // Cache is provided for application use in the temporary storage of
    // in-memory state associated with a Store. Eg, Cache might store records
    // which have been read and modified this transaction, and which will be
    // written out during FinalizeTxn.
    //
    // The representation of Cache is up to the application; it is not directly
    // used by Store.
    Cache interface{}
    // SQL statements prepared against the SQLiteDB (or a transaction thereof),
    // in the order provided to Open.
    Stmts []*sql.Stmt
    // contains filtered or unexported fields
}

Store is a consumer.Store implementation which wraps a primary SQLite database, as well as multiple auxiliary ATTACH'd databases. All file mutations of the database are sequenced into a recoverylog.Recorder for fault tolerance.

func NewStore Uses

func NewStore(recorder *recoverylog.Recorder) (*Store, error)

NewStore builds a new Store instance around the Recorder. The caller must Open the returned Store before use.

func (*Store) Destroy Uses

func (s *Store) Destroy()

Destroy the Store, removing the local processing directory and freeing associated resources.

func (*Store) Open Uses

func (s *Store) Open(bootstrapSQL string, statements ...string) error

Open the Store. The provided bootstrapSQL is executed against the DB before any other action is taken (this is a good opportunity to set PRAGMAs, create tables & indexes if they don't exist, set triggers, etc). The "gazette_checkpoint" table is then created, and any provided statements are prepared and added to Store.Stmts in the same order as provided to Open.

store.Open(
    // Create myTable if it doesn't yet exist:
    `CREATE TABLE IF NOT EXISTS myTable (
        id BLOB PRIMARY KEY NOT NULL,
        valueOne  INTEGER,
        valueTwo  TEXT
    );`,
    // Statement for querying on valueOne:
    `SELECT id, valueTwo FROM myTable WHERE valueOne > ?;`,
    // Statement for upserting into myTable:
    `INSERT INTO myTable(id, valueOne, valueTwo) VALUES(:id, :one, :two)
        ON CONFLICT(id) DO UPDATE SET valueOne = valueOne + :one, valueTwo = :two`,
)
// store.Stmts[0] is now prepared for queries, and store.Stmts[1] for upserts.

func (*Store) RestoreCheckpoint Uses

func (s *Store) RestoreCheckpoint(_ consumer.Shard) (cp pc.Checkpoint, _ error)

RestoreCheckpoint SELECTS the most recent Checkpoint of this Store.

func (*Store) StartCommit Uses

func (s *Store) StartCommit(_ consumer.Shard, cp pc.Checkpoint, waitFor client.OpFutures) client.OpFuture

StartCommit commits the current transaction to the local SQLite database, and returns a recorder barrier which resolves once all mutations of the database have been persisted to the recovery log.

func (*Store) Transaction Uses

func (s *Store) Transaction(ctx context.Context, txOpts *sql.TxOptions) (_ *sql.Tx, err error)

Transaction returns or (if not already begun) begins a SQL transaction. Optional *TxOptions have an effect only if Transaction begins a new SQL transaction.

func (*Store) URIForDB Uses

func (s *Store) URIForDB(name string) string

URIForDB takes a SQLite database name and returns a suitable SQLite URI given the Store's current SQLiteURIValues. Use URIForDB to map a relative database name (which is held constant across Store recoveries) to a URI specific to this *Store instance and which is suited for ATTACH-ing to the primary database. Eg:

URIForDB("sept_2019.db") =>
  "file:sept_2019.db?_journal_mode=WAL&other-setting=bar&vfs=store-123456&..."

Package store_sqlite imports 23 packages (graph) and is imported by 6 packages. Updated 2020-06-27. Refresh now. Tools for package owners.