core: go.gazette.dev/core/consumer

package consumer

import "go.gazette.dev/core/consumer"

Package consumer is a framework for distributed, stateful consumption of Gazette journals.

Most users will want to use package go.gazette.dev/core/mainboilerplate/runconsumer to build complete, configured framework applications. This package focuses on interface definitions and implementation of the consumer runtime.

The primary entry point to this package is the Application interface, which users implement to power the event-driven callback logic of the consumer. Service is then the top-level runtime of a consumer process, cooperatively executing an Application across a number of distributed peer processes and servicing the gRPC shard API.

A number of convenience functions are also supplied for interfacing with a remote Shard server endpoint.

Index

Package Files

interfaces.go key_space.go recovery.go resolver.go service.go shard.go shard_api.go store_json_file.go store_sql.go transaction.go

Variables

var ErrResolverStopped = errors.New("resolver has stopped serving local shards")

ErrResolverStopped is returned by Resolver if a ShardID resolves to a local shard, but the Resolver has already been asked to stop serving local shards. This happens when the consumer is shutting down abnormally (eg, due to Etcd lease keep-alive failure) but its local KeySpace doesn't reflect a re-assignment of the Shard, and we therefore cannot hope to proxy the request to another server. Resolve fails in this case to encourage the immediate completion of related RPCs, as we're probably also trying to drain the gRPC server.

func ApplyShards

func ApplyShards(ctx context.Context, sc pc.ShardClient, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

ApplyShards applies shard changes detailed in the ApplyRequest via the consumer Apply RPC. Changes are applied as a single Etcd transaction. If the change list is larger than an Etcd transaction can accommodate, ApplyShardsInBatches should be used instead. ApplyResponse statuses other than OK are mapped to an error.

func ApplyShardsInBatches

func ApplyShardsInBatches(ctx context.Context, sc pc.ShardClient, req *pc.ApplyRequest, size int) (*pc.ApplyResponse, error)

ApplyShardsInBatches is like ApplyShards, but chunks the ApplyRequest into batches of the given size, which should be less than Etcd's maximum configured transaction size (usually 128). If size is 0 all changes will be attempted in a single transaction. Be aware that ApplyShardsInBatches may only partially succeed, with some batches having applied and others not. The final ApplyResponse is returned, unless an error occurs. ApplyResponse statuses other than OK are mapped to an error.
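The batching behavior described above can be sketched with a small, self-contained helper. This is only an illustration under stated assumptions: chunk is a hypothetical function, and plain strings stand in for the changes of a pc.ApplyRequest. It is not the package's implementation.

```go
package main

import "fmt"

// chunk splits changes into consecutive batches of at most size elements.
// A size of zero (or less) yields a single batch holding every change,
// mirroring the documented "size is 0" behavior.
// chunk is a hypothetical helper, not part of package consumer.
func chunk(changes []string, size int) [][]string {
	if size <= 0 || size >= len(changes) {
		return [][]string{changes}
	}
	var batches [][]string
	for len(changes) > size {
		batches = append(batches, changes[:size])
		changes = changes[size:]
	}
	return append(batches, changes)
}

func main() {
	changes := []string{"shard-a", "shard-b", "shard-c", "shard-d", "shard-e"}
	for i, b := range chunk(changes, 2) {
		fmt.Printf("batch %d: %v\n", i, b)
	}
}
```

Because each batch would be applied as its own Etcd transaction, a failure while applying a later batch leaves the earlier batches applied, which is the partial-success case noted above.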

func FetchHints

func FetchHints(ctx context.Context, sc pc.ShardClient, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

FetchHints is a convenience for invoking the GetHints RPC. It maps a response validation failure or non-OK status to an error.

func ListShards

func ListShards(ctx context.Context, sc pc.ShardClient, req *pc.ListRequest) (*pc.ListResponse, error)

ListShards is a convenience for invoking the List RPC. It maps a response validation failure or non-OK status to an error.

func NewKeySpace

func NewKeySpace(prefix string) *keyspace.KeySpace

NewKeySpace returns a consumer KeySpace suitable for use with an Allocator, which groups member processes over the given prefix. It decodes allocator Items as ShardSpec messages, Members as ConsumerSpecs, and Assignments as ReplicaStatus values.

func ShardIsConsistent

func ShardIsConsistent(shard allocator.Item, assignment keyspace.KeyValue, all keyspace.KeyValues) bool

ShardIsConsistent returns true if no replicas of the shard are currently back-filling.

func StatShard

func StatShard(ctx context.Context, rc pc.RoutedShardClient, req *pc.StatRequest) (*pc.StatResponse, error)

StatShard is a convenience for invoking the Stat RPC. It maps a response validation failure or non-OK status to an error.

func VerifyReferencedJournals

func VerifyReferencedJournals(ctx context.Context, jc pb.JournalClient, req *pc.ApplyRequest) error

VerifyReferencedJournals ensures the referential integrity of journals (sources and recovery logs, and their content types) referenced by Shards of the ApplyRequest. It returns a descriptive error if any invalid references are found.

type Application

type Application interface {
    // NewStore constructs a Store for the Shard around the initialized
    // file-system *Recorder. If the ShardSpec does not have a configured
    // recovery log, then *Recorder will be nil.
    NewStore(Shard, *recoverylog.Recorder) (Store, error)
    // NewMessage returns a zero-valued Message of an appropriate representation
    // for the JournalSpec.
    NewMessage(*pb.JournalSpec) (message.Message, error)
    // ConsumeMessage consumes a read-committed message within the scope of a
    // transaction. It should use the provided Publisher to PublishUncommitted
    // messages to other journals. Doing so enables Gazette to properly sequence
    // messages and ensure they are either acknowledged or rolled-back atomically
    // with this consumer transaction.
    ConsumeMessage(Shard, Store, message.Envelope, *message.Publisher) error
    // FinalizeTxn indicates a consumer transaction is ending, and that the
    // Application must flush any in-memory transaction state or caches, and
    // begin any deferred journal appends. At completion:
    //  * All transaction messages must have been published to the provided Publisher,
    //  * All raw transaction []byte content must have been appended via the shard
    //    JournalClient, and
    //  * All in-memory state must be marshalled to the active Store transaction.
    FinalizeTxn(Shard, Store, *message.Publisher) error
}

Application is the interface provided by user applications running as Gazette consumers. Only unrecoverable errors should be returned by Application. A returned error will abort processing of an assigned Shard, and will update the assignment's ReplicaStatus to FAILED.

Gazette consumers process messages within pipelined transactions. A transaction begins upon the first call to ConsumeMessage, which is invoked for each read-committed message of a source journal. In the course of the transaction many more messages may be passed to ConsumeMessage. When consuming a message the Application is free to:

1) Begin or continue a transaction with its Store.
2) Publish exactly-once Messages to other journals via the provided Publisher.
3) Append raw at-least-once []bytes via the Shard's JournalClient.
4) Keep in-memory-only aggregates such as counts.

Messages published via PublishUncommitted will be visible to read-committed readers once the consumer transaction completes. Read-uncommitted readers will see them while the transaction continues to run. Similarly writes issued directly through the shard JournalClient are also readable while the transaction runs.

A transaction *must* continue to run while asynchronous appends of the _prior_ transaction are ongoing (including appends to the shard recovery log and appends of post-commit acknowledgements). The transaction will continue to process messages during this time until the ShardSpec's MaxTxnDuration is reached, at which point the transaction will stop reading messages but continue to wait for prior appends to finish.

It must also continue to run until the ShardSpec's MinTxnDuration is reached.

Assuming both of those conditions are satisfied, the transaction will close upon encountering a stall in a buffered channel of decoded input messages. If a stall isn't forthcoming (as is frequent at high write rates), it will close upon reaching the ShardSpec's MaxTxnDuration.
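The close conditions above can be restated as a small decision function. This is a minimal sketch, not the runtime's logic: txnState, mayRead, and shouldClose are hypothetical names, and the fields are simplified stand-ins for the ShardSpec durations and runtime signals described in the text.

```go
package main

import (
	"fmt"
	"time"
)

// txnState is a hypothetical snapshot of the inputs to the close decision.
type txnState struct {
	elapsed       time.Duration // Time since the transaction began.
	minDuration   time.Duration // Stand-in for the ShardSpec's MinTxnDuration.
	maxDuration   time.Duration // Stand-in for the ShardSpec's MaxTxnDuration.
	priorFinished bool          // Have the prior transaction's appends completed?
	inputStalled  bool          // Is the buffer of decoded input messages empty?
}

// mayRead reports whether the transaction may still consume messages.
// Reading stops once the maximum duration is reached.
func mayRead(s txnState) bool { return s.elapsed < s.maxDuration }

// shouldClose reports whether the transaction should close now.
func shouldClose(s txnState) bool {
	if !s.priorFinished || s.elapsed < s.minDuration {
		return false // Both preconditions must hold before closing.
	}
	return s.inputStalled || s.elapsed >= s.maxDuration
}

func main() {
	s := txnState{
		elapsed:       300 * time.Millisecond,
		minDuration:   100 * time.Millisecond,
		maxDuration:   time.Second,
		priorFinished: true,
		inputStalled:  true,
	}
	fmt.Println("may read:", mayRead(s), "should close:", shouldClose(s))
}
```

Note the case where the maximum duration has passed but the prior transaction is unfinished: mayRead is false and shouldClose is also false, so the transaction waits without reading.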

Upon transaction close, FinalizeTxn is called. At this point the Application must publish any pending messages and/or begin related journal appends, and must flush any in-memory caches or aggregates into its Store transaction (simply starting appends is sufficient: the Application does _not_ need to wait for journal appends to complete).
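As an illustration of the flush that FinalizeTxn is expected to perform, the following sketch keeps in-memory counts while messages are consumed and moves them into a store transaction when the transaction closes. The types are hypothetical stand-ins: mapStore models a pending Store transaction, and the method signatures are simplified relative to the Application interface.

```go
package main

import "fmt"

// mapStore is a hypothetical stand-in for an open Store transaction: a set
// of pending writes which would later be committed with the Checkpoint.
type mapStore struct{ pending map[string]int }

// counterApp keeps per-key counts in memory while a transaction runs.
type counterApp struct{ counts map[string]int }

// ConsumeMessage updates only the in-memory aggregate.
func (a *counterApp) ConsumeMessage(key string) {
	a.counts[key]++
}

// FinalizeTxn flushes the aggregate into the store transaction and clears
// it, so the next transaction starts from an empty cache.
func (a *counterApp) FinalizeTxn(s *mapStore) {
	for k, n := range a.counts {
		s.pending[k] += n
		delete(a.counts, k)
	}
}

func main() {
	app := &counterApp{counts: map[string]int{}}
	store := &mapStore{pending: map[string]int{}}

	for _, key := range []string{"a", "b", "a"} {
		app.ConsumeMessage(key)
	}
	app.FinalizeTxn(store)
	fmt.Println(store.pending, len(app.counts))
}
```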

StartCommit of the Store is then called with a Checkpoint. For correct exactly-once processing semantics, the Checkpoint must be written in a single transaction alongside all other Store mutations made within the scope of this consumer transaction:

* Eg for `store-rocksdb`, all store mutations and the Checkpoint are
  written together within a single RocksDB WriteBatch.
* For `SQLStore`, verification of the write fence, INSERTS, UPDATES, and the
  Checkpoint itself are written within a single BEGIN/COMMIT transaction.
* Similarly, `store-sqlite` persists Checkpoints within the scope of the
  current SQL transaction.

Note that other, non-transactional Store mutations are permitted, but will have a weaker at-least-once processing guarantee with respect to Store state. This can make sense for applications filling caches in BigTable, Memcache, or other systems which support transactions only over a single key (ie, check-and-set). In this case the Store should apply all mutations, followed by a fenced CAS Checkpoint update. So long as the Checkpoint itself is fenced properly, messages will continue to have exactly-once semantics (though be cautious of publishing messages which are derived from other Store keys that may have been updated more than once).

Once the commit completes, acknowledgements of messages published during the transaction are written to applicable journals, which informs downstream readers that those messages have committed and may now be consumed.

Transactions are fully pipelined: once StartCommit has returned, the next consumer transaction immediately begins processing even though the prior transaction continues to commit in the background (and could even fail). In fact, at this point there's no guarantee that any journal writes of the previous transaction have even _started_, including those to the recovery log.

To make this work safely and correctly, transactions use barriers to ensure that background operations are started and complete in the correct order: for example, that the prior transaction doesn't commit until all its writes to other journals have also completed, and that writes of message acknowledgements don't start until mutations & the checkpoint have committed to the Store.
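The barrier ordering described above can be sketched with futures built from channels. This is a simplified model rather than the runtime's code: future, startOp, and runTxn are hypothetical names, and each background operation is reduced to recording its name.

```go
package main

import (
	"fmt"
	"sync"
)

// future is a minimal stand-in for an OpFuture: a channel closed on completion.
type future chan struct{}

// startOp runs fn in the background once every waitFor future has resolved,
// and returns a future which resolves after fn returns.
func startOp(fn func(), waitFor ...future) future {
	done := make(future)
	go func() {
		defer close(done)
		for _, f := range waitFor {
			<-f
		}
		fn()
	}()
	return done
}

// runTxn starts the background operations of one transaction and returns
// the order in which they ran.
func runTxn() []string {
	var (
		mu     sync.Mutex
		events []string
	)
	record := func(name string) func() {
		return func() {
			mu.Lock()
			defer mu.Unlock()
			events = append(events, name)
		}
	}
	appends := startOp(record("journal appends"))
	commit := startOp(record("store commit"), appends)
	acks := startOp(record("acknowledgements"), commit)

	<-acks // All three operations have completed once acks resolves.
	return events
}

func main() {
	fmt.Println(runTxn())
}
```

All three operations are started immediately, yet the store commit cannot run before the journal appends finish, and acknowledgements cannot run before the commit does.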

While those operations complete in the background, the next transaction will consume new messages concurrently. Its one constraint is that it may not itself start to commit until its predecessor transaction has fully completed. This means that transactions will almost always exhibit some degree of batching, which will depend on the rate of incoming messages, the latency to Gazette, and the latency and speed of a utilized external store. If the prior commit takes so long that the successor transaction reaches its maximum duration, then that successor will stall without processing further messages until its predecessor commits. This is the _only_ case under which a consumer can be blocked from processing ready messages.

type BeginFinisher

type BeginFinisher interface {
    // BeginTxn is called to notify the Application that a transaction is beginning
    // (and a call to ConsumeMessage will be immediately forthcoming), allowing
    // the Application to perform any preparatory work. For consumers doing
    // extensive aggregation, it may be beneficial to focus available compute
    // resource on a small number of transactions while completely stalling
    // others: this can be accomplished by blocking in BeginTxn until a semaphore
    // is acquired. A call to BeginTxn is always paired with a call to FinishedTxn.
    BeginTxn(Shard, Store) error
    // FinishedTxn is notified that a previously begun transaction has started to
    // commit, or has errored. It allows the Application to perform related cleanup
    // (eg, releasing a previously acquired semaphore). Note transactions are
    // pipelined, and commit operations of this transaction may still be ongoing.
    // FinishedTxn can await the provided OpFuture for its final status.
    FinishedTxn(Shard, Store, OpFuture)
}

BeginFinisher is an optional interface of Application which is informed when consumer transactions begin or finish.
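The semaphore pattern mentioned in the BeginTxn comment might look like the following sketch. limiter is a hypothetical type, and its methods omit the Shard, Store, and OpFuture arguments of the real interface so that the example stays self-contained.

```go
package main

import "fmt"

// limiter bounds the number of concurrently running transactions using a
// buffered channel as a counting semaphore.
type limiter struct{ sem chan struct{} }

func newLimiter(n int) *limiter { return &limiter{sem: make(chan struct{}, n)} }

// BeginTxn blocks until a slot is free, then claims it.
func (l *limiter) BeginTxn() error {
	l.sem <- struct{}{}
	return nil
}

// FinishedTxn releases the slot claimed by the paired BeginTxn.
func (l *limiter) FinishedTxn() { <-l.sem }

// running reports how many transactions currently hold a slot.
func (l *limiter) running() int { return len(l.sem) }

func main() {
	l := newLimiter(2)
	_ = l.BeginTxn()
	_ = l.BeginTxn()
	fmt.Println("running:", l.running())
	l.FinishedTxn()
	fmt.Println("running:", l.running())
}
```

A fuller version would probably also select on the shard's Context while waiting, so that a blocked BeginTxn can be abandoned when the process is no longer responsible for the shard.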

type JSONFileStore

type JSONFileStore struct {
    // State is a user-provided instance which is un/marshal-able to JSON.
    State interface{}
    // contains filtered or unexported fields
}

JSONFileStore is a simple Store implementation which materializes itself as a JSON-encoded file, which is re-written at the commit of every consumer transaction.

func NewJSONFileStore

func NewJSONFileStore(rec *recoverylog.Recorder, state interface{}) (*JSONFileStore, error)

NewJSONFileStore returns a new JSONFileStore. |state| is the runtime instance of the Store's state, which is decoded into, encoded from, and retained as JSONFileStore.State.

func (*JSONFileStore) Destroy

func (s *JSONFileStore) Destroy()

Destroy the JSONFileStore directory and state file.

func (*JSONFileStore) RestoreCheckpoint

func (s *JSONFileStore) RestoreCheckpoint(Shard) (pc.Checkpoint, error)

RestoreCheckpoint returns the checkpoint encoded in the recovered JSON state file.

func (*JSONFileStore) StartCommit

func (s *JSONFileStore) StartCommit(_ Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture

StartCommit marshals the in-memory state and Checkpoint into a recorded JSON state file.
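To make the idea of a state file that carries its own checkpoint concrete, here is a minimal sketch using only the standard library. It is not JSONFileStore's code: fileState, commit, restore, and roundTrip are hypothetical, the checkpoint is reduced to a map of journal offsets, and the write-then-rename step is an assumption made for the sketch rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// fileState is a hypothetical on-disk layout pairing user state with a
// simplified checkpoint.
type fileState struct {
	State      map[string]int   `json:"state"`
	Checkpoint map[string]int64 `json:"checkpoint"` // Journal name => read offset.
}

// commit writes fs to a temporary file and renames it over path, so a
// reader observes either the previous file or the new one.
func commit(path string, fs fileState) error {
	b, err := json.Marshal(fs)
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, b, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// restore reads the state and checkpoint back from path.
func restore(path string) (fileState, error) {
	var fs fileState
	b, err := os.ReadFile(path)
	if err != nil {
		return fs, err
	}
	err = json.Unmarshal(b, &fs)
	return fs, err
}

// roundTrip commits fs into a fresh temporary directory and restores it.
func roundTrip(fs fileState) (fileState, error) {
	dir, err := os.MkdirTemp("", "json-store")
	if err != nil {
		return fileState{}, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "state.json")
	if err := commit(path, fs); err != nil {
		return fileState{}, err
	}
	return restore(path)
}

func main() {
	got, err := roundTrip(fileState{
		State:      map[string]int{"clicks": 3},
		Checkpoint: map[string]int64{"events/part-000": 1024},
	})
	fmt.Println(got, err)
}
```

Because state and checkpoint live in one file, a restore can never observe state from one transaction paired with the checkpoint of another.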

type OpFuture

type OpFuture = client.OpFuture

OpFuture represents an operation which is executing in the background. Aliased for brevity from the "client" package.

type OpFutures

type OpFutures = client.OpFutures

type Resolution

type Resolution struct {
    Status pc.Status
    // Header captures the resolved consumer ProcessId, effective Etcd Revision,
    // and Route of the shard resolution.
    Header pb.Header
    // Spec of the Shard at the current Etcd revision.
    Spec *pc.ShardSpec
    // Shard processing context, or nil if this process is not primary for the ShardID.
    Shard Shard
    // Store of the Shard, or nil if this process is not primary for the ShardID.
    Store Store
    // Done releases Shard & Store, and must be called when no longer needed.
    // Iff Shard & Store are nil, so is Done.
    Done func()
}

Resolution is the result of resolving a ShardID to a responsible consumer process.

type ResolveArgs

type ResolveArgs struct {
    Context context.Context
    // ShardID to be resolved.
    ShardID pc.ShardID
    // Whether we may resolve to another consumer peer. If false and this
    // process is not assigned as shard primary, Resolve returns status
    // NOT_SHARD_PRIMARY.
    MayProxy bool
    // If this request is a proxy from a consumer process peer, Header is
    // the forwarded Header of that peer's Resolution. ProxyHeader is used
    // to ensure this resolver first reads through the Etcd revision which
    // was effective at the peer at time of resolution, and to sanity check
    // the consistency of those resolutions.
    ProxyHeader *pb.Header
    // ReadThrough is journals and offsets which must have been read through in a
    // completed consumer transaction before Resolve may return (blocking if
    // required). Offsets of journals not read by this shard are ignored.
    //
    // ReadThrough aids in the implementation of services which "read their
    // own writes", where those writes propagate through one or more consumer
    // applications: callers pass offsets of their appends to Resolve, which
    // will block until those appends have been processed.
    // See also: Shard.Progress.
    ReadThrough pb.Offsets
}

ResolveArgs parametrize a request to resolve a ShardID to a current, responsible consumer process.
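The ReadThrough condition can be illustrated as a comparison between a shard's progress and the requested offsets. The sketch below is an assumption-laden model: offsets is a stand-in for pb.Offsets, and readThroughMet is a hypothetical helper that only evaluates the condition once, where Resolve would block until it holds.

```go
package main

import "fmt"

// offsets maps a journal name to a byte offset. It stands in for pb.Offsets.
type offsets map[string]int64

// readThroughMet reports whether progress covers every requested offset.
// Journals absent from progress are treated as not read by the shard, and
// are ignored.
func readThroughMet(progress, want offsets) bool {
	for journal, offset := range want {
		if cur, ok := progress[journal]; ok && cur < offset {
			return false
		}
	}
	return true
}

func main() {
	progress := offsets{"events/part-000": 1000}

	fmt.Println(readThroughMet(progress, offsets{"events/part-000": 900}))
	fmt.Println(readThroughMet(progress, offsets{"events/part-000": 1100}))
	fmt.Println(readThroughMet(progress, offsets{"other/journal": 5000}))
}
```

A caller implementing "read your own writes" would pass the end offsets of its appends as the requested offsets, as the ReadThrough comment describes.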

type Resolver

type Resolver struct {
    // contains filtered or unexported fields
}

Resolver applies the current allocator.State to map journals to shards, and shards to responsible consumer processes. It also monitors the State to manage the lifetime of local shards which are assigned to this process.

func NewResolver

func NewResolver(state *allocator.State, newShard func(keyspace.KeyValue) *shard) *Resolver

NewResolver returns a Resolver derived from the allocator.State, which will use the newShard closure to instantiate a *shard for each local Assignment of the local ConsumerSpec.

func (*Resolver) Resolve

func (r *Resolver) Resolve(args ResolveArgs) (res Resolution, err error)

Resolve resolves a ShardID to a responsible consumer process. If this process is the assigned primary for a ShardID but its Store is not yet ready (eg, because it's still playing back from its recovery log), then Resolve will block until the Store is ready.

func (*Resolver) ShardsWithSource

func (r *Resolver) ShardsWithSource(journal pb.Journal) []*pc.ShardSpec

ShardsWithSource returns specs having the journal as a source, in ShardID order. APIs presented by consumer applications often want to map an app-defined key to an associated journal through a message.MappingFunc, and from there to a responsible shard against which the API request is ultimately dispatched. "Responsible" is up to the application: while it's common to have 1:1 assignment between shards and source journals, other patterns are possible and the application must make an appropriate selection from among the returned ShardSpecs for its use case.

var mapping message.MappingFunc = ...
var mappedID pc.ShardID

if journal, _, err := mapping(key); err != nil {
    // Handle error.
} else if specs := resolver.ShardsWithSource(journal); len(specs) == 0 {
    err = fmt.Errorf("no ShardSpec is consuming mapped journal %s", journal)
    // Handle error.
} else {
    mappedID = specs[0].Id
}

var resolution, err = svc.Resolver.Resolve(consumer.ResolveArgs{
    ShardID:     mappedID,
    ...
})

type SQLStore

type SQLStore struct {
    DB  *sql.DB

    // Cache is provided for application use in the temporary storage of
    // in-memory state associated with a SQLStore. Eg, Cache might store records
    // which have been read this transaction and modified in-memory, and which
    // will be written out during FinalizeTxn.
    //
    // The representation of Cache is up to the application; it is not directly
    // used by SQLStore.
    Cache interface{}
    // contains filtered or unexported fields
}

SQLStore is a Store implementation which utilizes a remote database having a "database/sql" compatible driver. For each consumer transaction, SQLStore begins a corresponding SQL transaction which may be obtained via Transaction, through which reads and writes to the store should be issued. A table "gazette_checkpoints" is also required, to which consumer checkpoints are persisted, and having a schema like:

CREATE TABLE gazette_checkpoints (
  shard_fqn  TEXT    PRIMARY KEY NOT NULL,
  fence      INTEGER NOT NULL,
  checkpoint BLOB    NOT NULL
);

The user is responsible for creating this table. Exact data types may vary with the store dialect, but "shard_fqn" must be its PRIMARY KEY. StartCommit writes the checkpoint to this table before committing the transaction.

func NewSQLStore

func NewSQLStore(db *sql.DB) *SQLStore

NewSQLStore returns a new SQLStore using the *DB.

func (*SQLStore) Destroy

func (s *SQLStore) Destroy()

Destroy rolls back an existing transaction, but does not close the *DB instance previously passed to NewSQLStore.

func (*SQLStore) RestoreCheckpoint

func (s *SQLStore) RestoreCheckpoint(shard Shard) (cp pc.Checkpoint, _ error)

RestoreCheckpoint issues a SQL transaction which SELECTS the most recent Checkpoint of this shard FQN and also increments its "fence" column.

func (*SQLStore) StartCommit

func (s *SQLStore) StartCommit(shard Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture

StartCommit starts a background commit of the current transaction. While it runs, a new transaction may be started. The returned OpFuture will fail if the "fence" column has been further modified from the result previously read and updated by RestoreCheckpoint.
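The interplay of RestoreCheckpoint and StartCommit around the "fence" column can be modeled in memory. The sketch below is not SQLStore's implementation and issues no SQL: table, row, fencedStore, and errFenced are hypothetical, and a map stands in for the gazette_checkpoints table.

```go
package main

import (
	"errors"
	"fmt"
)

// row models one row of the gazette_checkpoints table.
type row struct {
	fence      int64
	checkpoint string
}

// table is an in-memory stand-in for gazette_checkpoints, keyed on shard FQN.
type table map[string]*row

// fencedStore remembers the fence value it installed when it restored.
type fencedStore struct {
	fqn   string
	fence int64
}

var errFenced = errors.New("checkpoint fence was updated by another store")

// restore increments the row's fence and returns the stored checkpoint,
// along with a store bound to the new fence value.
func restore(t table, fqn string) (*fencedStore, string) {
	r, ok := t[fqn]
	if !ok {
		r = &row{}
		t[fqn] = r
	}
	r.fence++
	return &fencedStore{fqn: fqn, fence: r.fence}, r.checkpoint
}

// commit writes the checkpoint only if the fence is unchanged since restore.
func (s *fencedStore) commit(t table, checkpoint string) error {
	r := t[s.fqn]
	if r.fence != s.fence {
		return errFenced
	}
	r.checkpoint = checkpoint
	return nil
}

func main() {
	tbl := table{}
	old, _ := restore(tbl, "app/shard-0")
	fmt.Println(old.commit(tbl, "cp-1"))

	newer, cp := restore(tbl, "app/shard-0") // Another process takes over.
	fmt.Println(cp, old.commit(tbl, "cp-2"), newer.commit(tbl, "cp-3"))
}
```

In a real SQL store the fence check and the checkpoint write would have to happen inside the same database transaction as the application's own mutations, which is what gives the commit its all-or-nothing character.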

func (*SQLStore) Transaction

func (s *SQLStore) Transaction(ctx context.Context, txOpts *sql.TxOptions) (_ *sql.Tx, err error)

Transaction returns or (if not already begun) begins a SQL transaction. Optional *TxOptions have an effect only if a transaction has not already been started.

type Service

type Service struct {
    // Application served by the Service.
    App Application
    // Resolver of Service shards.
    Resolver *Resolver
    // Distributed allocator state of the service.
    State *allocator.State
    // Loopback connection which defaults to the local server, but is wired with
    // a pb.DispatchBalancer. Consumer applications may use Loopback to proxy
    // application-specific RPCs to peer consumer instances, after performing
    // shard resolution.
    Loopback *grpc.ClientConn
    // Journal client for use by consumer applications.
    Journals pb.RoutedJournalClient
    // Etcd client for use by consumer applications.
    Etcd *clientv3.Client
    // contains filtered or unexported fields
}

Service is the top-level runtime entity of a Gazette Consumer process. It drives local shard processing in response to allocator.State, powers shard resolution, and is also an implementation of ShardServer.

func NewService

func NewService(app Application, state *allocator.State, rjc pb.RoutedJournalClient, lo *grpc.ClientConn, etcd *clientv3.Client) *Service

NewService constructs a new Service of the Application, driven by allocator.State.

func (*Service) Apply

func (srv *Service) Apply(ctx context.Context, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

Apply dispatches the ShardServer.Apply API.

func (*Service) GetHints

func (srv *Service) GetHints(ctx context.Context, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

GetHints dispatches the ShardServer.GetHints API.

func (*Service) List

func (srv *Service) List(ctx context.Context, req *pc.ListRequest) (*pc.ListResponse, error)

List dispatches the ShardServer.List API.

func (*Service) QueueTasks

func (svc *Service) QueueTasks(tasks *task.Group, server *server.Server)

QueueTasks queues tasks which watch the Service KeySpace and serve any local assignments reflected therein, until the Context is cancelled or an error occurs. All local shards are gracefully stopped prior to return, even when exiting due to an error.

func (*Service) Stat

func (srv *Service) Stat(ctx context.Context, req *pc.StatRequest) (*pc.StatResponse, error)

Stat dispatches the ShardServer.Stat API.

func (*Service) Stopping

func (svc *Service) Stopping() <-chan struct{}

Stopping returns a channel which signals when the Service is in the process of shutting down. Consumer applications with long-lived RPCs should use this signal to begin graceful cleanup of outstanding RPCs.
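A long-lived RPC handler might watch the Stopping channel as in the sketch below. serveStream is a hypothetical function: its stopping argument stands in for the channel returned by Service.Stopping, and the response stream is reduced to a channel of strings.

```go
package main

import "fmt"

// serveStream sends queued responses until the queue is closed or stopping
// is signaled, and returns the number of responses sent.
func serveStream(stopping <-chan struct{}, responses <-chan string, send func(string)) int {
	var sent int
	for {
		select {
		case <-stopping:
			return sent // Begin graceful cleanup of this RPC.
		case r, ok := <-responses:
			if !ok {
				return sent // The stream ended on its own.
			}
			send(r)
			sent++
		}
	}
}

func main() {
	stopping := make(chan struct{}) // Never closed in this run.
	responses := make(chan string, 2)
	responses <- "first"
	responses <- "second"
	close(responses)

	n := serveStream(stopping, responses, func(s string) { fmt.Println("sent", s) })
	fmt.Println("total:", n)
}
```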

type Shard

type Shard interface {
    // Context of this shard. Notably, the Context will be cancelled when this
    // process is no longer responsible for the shard.
    Context() context.Context
    // Fully-qualified name of this shard, AKA the Etcd key of the ShardSpec.
    // That key is used as the Shard FQN because it composes the consumer
    // application prefix with this ShardID, and is universally unique
    // within the backing Etcd cluster. An FQN can only conflict if another
    // consumer deployment, utilizing another Etcd cluster, also reuses the
    // same application prefix and ShardID.
    FQN() string
    // Current Spec of the shard. Fields of a returned Spec instance will never
    // change, but the instance returned by Spec may change over time to reflect
    // updated Etcd states.
    Spec() *pc.ShardSpec
    // Assignment of the shard to this process. Fields of a returned KeyValue
    // instance will never change, but the instance returned by Assignment may
    // change over time to reflect updated Etcd states.
    Assignment() keyspace.KeyValue
    // JournalClient to be used for raw journal []byte appends made on behalf
    // of this Shard. Consistent use of this client enables Gazette to ensure
    // that all writes issued within a consumer transaction have completed prior
    // to that transaction being committed. Put differently, if a consumed message
    // triggers a raw append to a journal, Gazette can guarantee that append will
    // occur at-least-once no matter how this Shard or brokers may fail.
    JournalClient() client.AsyncJournalClient
    // Progress of the Shard as-of its most recent completed transaction.
    // readThrough is offsets of source journals which have been read
    // through. publishAt is journals and offsets this Shard has published
    // through, including acknowledgements. If a read message A results in this
    // Shard publishing messages B, and A falls within readThrough, then all
    // messages B (& their acknowledgements) fall within publishAt.
    //
    // While readThrough is comprehensive and persists across Shard faults,
    // note that publishAt is *advisory* and not necessarily complete: it
    // includes only journals written to since this Shard was assigned to
    // this process.
    Progress() (readThrough, publishAt pb.Offsets)
}

Shard is the processing context of a ShardSpec which is assigned to the local consumer process.

type Store

type Store interface {
    // StartCommit starts an asynchronous, atomic "commit" to the Store of state
    // updates from this transaction along with the Checkpoint. If Store uses an
    // external transactional system, then StartCommit must fail if another
    // process has invoked RestoreCheckpoint after *this* Store instance did
    // so. Put differently, Store cannot commit a Checkpoint that another Store
    // may never see (because it continues from an earlier Checkpoint that it
    // restored).
    //
    // In general terms, this means StartCommit must verify a "write fence"
    // previously installed by RestoreCheckpoint as part of its transaction.
    // Embedded Stores which use recovery logs can rely on the write fence of
    // the log itself, implemented via journal registers.
    //
    // StartCommit may immediately begin a transaction with the external system
    // (if one hasn't already been started from ConsumeMessage), but cannot
    // allow it to commit until all waitFor OpFutures have completed
    // successfully. A failure of one of these OpFutures must also fail this
    // commit.
    //
    // StartCommit will never be called if the OpFuture returned by a previous
    // StartCommit hasn't yet completed. However, ConsumeMessage *will* be
    // called while a previous StartCommit continues to run. Stores should
    // account for this, typically by starting a new transaction which runs
    // alongside the old.
    StartCommit(_ Shard, _ pc.Checkpoint, waitFor OpFutures) OpFuture
    // RestoreCheckpoint recovers the most recent Checkpoint previously committed
    // to the Store. It is called just once, at Shard start-up. If an external
    // system is used, it should install a transactional "write fence" to ensure
    // that an older Store instance of another process cannot successfully
    // StartCommit after this RestoreCheckpoint returns.
    RestoreCheckpoint(Shard) (pc.Checkpoint, error)
    // Destroy releases all resources associated with the Store (eg, local files).
    // It is guaranteed that the Store is no longer being used or referenced at
    // the onset of this call.
    Destroy()
}

Store is a durable and transactional storage backend used to persist arbitrary Application-defined states alongside the consumer Checkpoints which produced those states. The particular means by which the Store represents transactions varies from implementation to implementation, and is not modeled by this interface. However for correct exactly-once processing semantics, it must be the case that Store modifications made by Applications are made in transactions which are committed by StartCommit, and which incorporate the Checkpoint provided to StartCommit.

Often Stores are implemented as embedded databases which record their file operations into a provided `recoverylog.Recorder`. Stores which instead utilize an external transactional system (eg, an RDBMS) are also supported.

Application implementations control the selection, initialization, and usage of an appropriate Store for their use case.

Directories

Path           Synopsis
protocol       Package protocol defines the consumer datamodel, validation behaviors, and gRPC APIs which are shared across clients and consumer application servers.
recoverylog    Package recoverylog specifies a finite state machine for recording and replaying observed filesystem operations into a Gazette journal.
shardspace     Package shardspace provides mechanisms for mapping a collection of ShardSpecs into a minimally-described, semi hierarchical structure, and for mapping back again.
store-rocksdb  Package store_rocksdb implements the consumer.Store interface via an embedded RocksDB instance.
store-sqlite   Package store_sqlite implements the consumer.Store interface via an embedded SQLite instance.

Package consumer imports 33 packages and is imported by 23 packages. Updated 2020-06-18.