store

package
v0.0.0-...-891c386 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 3, 2021 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

The Store serves as an intermediary between the Server and the p2p Node Host. Vault clients should be able to write durably to the Store without connections to Vault peers. Both services act as clients of the Store and watch it for changes to stream to either the Vault client or the host's peers.

Design notes on schema

Badger is a pure K/V store without a table/collection abstraction, but keeping a large number of DBs open is likely to run into ulimit usability issues (not to mention spawning large numbers of goroutines), so we want to use a single table space.

EntryIDs are 30 bytes, sortable by timestamp and including a random component sufficient to prevent collisions.

Because peers can come and go, we may get "historical" entries from those peers from a large time window in the past (configurable by the community but expected to be on the order of days and weeks, not seconds).

Therefore, in addition to the Entries keyspace, we'll have an ArrivalIndex keyspace (with zero-values) which gives us ordered keys for when entries were received by this peer. A client that comes online can use its last ArrivalIndex to ask the vault for the keys that it missed while disconnected.

The ArrivalIndex can be "aged out".
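
The bookmark and aging-out ideas above can be modelled without Badger at all. The following is a minimal sketch, assuming the ArrivalIndex keys are available as a slice already sorted in ascending byte order (the order a K/V iterator would yield them). The names missedSince and ageOut are hypothetical and are not part of this package.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// missedSince returns the keys that sort strictly after bookmark.
// Assumption: keys is sorted in ascending byte order.
func missedSince(keys [][]byte, bookmark []byte) [][]byte {
	i := sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(keys[i], bookmark) > 0
	})
	return keys[i:]
}

// ageOut drops every key that sorts before cutoff and keeps the rest.
// Assumption: keys is sorted in ascending byte order.
func ageOut(keys [][]byte, cutoff []byte) [][]byte {
	i := sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(keys[i], cutoff) >= 0
	})
	return keys[i:]
}

func main() {
	// Toy two-byte keys standing in for real ArrivalIndex keys.
	keys := [][]byte{{0, 1}, {0, 2}, {0, 5}, {1, 0}}

	// A client whose last bookmark was {0, 2} missed two entries.
	fmt.Println(len(missedSince(keys, []byte{0, 2})))

	// Aging out everything older than {0, 2} keeps three keys.
	fmt.Println(len(ageOut(keys, []byte{0, 2})))
}
```

Because the keys sort by arrival time, both operations reduce to a binary search followed by a slice, which is why an ordered keyspace is enough and no secondary structure is needed.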

Entry key schema (63 bytes):

bytes  00:31  channel ID (sha256)
byte   32     table space (Entries) + reserved control bits
bytes  33:62  entry ID

ArrivalIndex key schema (69 bytes):

bytes  00:31  channel ID (sha256)
byte   32     table space (ArrivalIndex) + reserved control bits
bytes  33:38  timestamp in unix seconds (UTC, big endian)
bytes  39:68  entry ID
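
To make the two layouts concrete, here is a minimal sketch that assembles a 63-byte entry key and a 69-byte ArrivalIndex key from their parts. The function names and the table-space tag values (0x01 and 0x02) are assumptions made for illustration; the package's real tags and control bits are not documented here.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"time"
)

const (
	// Hypothetical table-space tags; the real values are not documented.
	spaceEntries      byte = 0x01
	spaceArrivalIndex byte = 0x02

	entryIDLen = 30
)

// entryKey lays out channel ID (32) + table space (1) + entry ID (30),
// for a total of 63 bytes.
func entryKey(channel [32]byte, entryID [entryIDLen]byte) []byte {
	key := make([]byte, 0, 63)
	key = append(key, channel[:]...)
	key = append(key, spaceEntries)
	key = append(key, entryID[:]...)
	return key
}

// arrivalKey lays out channel ID (32) + table space (1) + timestamp (6) +
// entry ID (30), for a total of 69 bytes.
func arrivalKey(channel [32]byte, arrivedUnix int64, entryID [entryIDLen]byte) []byte {
	var ts [8]byte
	binary.BigEndian.PutUint64(ts[:], uint64(arrivedUnix))

	key := make([]byte, 0, 69)
	key = append(key, channel[:]...)
	key = append(key, spaceArrivalIndex)
	key = append(key, ts[2:]...) // low 6 bytes, still big endian
	key = append(key, entryID[:]...)
	return key
}

func main() {
	channel := sha256.Sum256([]byte("example-channel"))
	var id [entryIDLen]byte // a real entry ID would be timestamp-sortable

	fmt.Println(len(entryKey(channel, id)))
	fmt.Println(len(arrivalKey(channel, time.Now().UTC().Unix(), id)))
}
```

Writing the timestamp big endian is what makes byte-wise key order match arrival order within a channel, so a prefix scan over the ArrivalIndex table space returns entries in the order this peer received them.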

The entry value schema is a protos.Msg with Op and ReqID left unset.

Constants

const (
	Tail = math.MaxUint64
)

Variables

var (
	ErrorInvalidEntryID = errors.New("invalid entry ID")
	ErrorKeyExists      = errors.New("entry ID already exists")
	ErrorKeyNotFound    = errors.New("key not found")
)

Functions

This section is empty.

Types

type ArrivalKey

type ArrivalKey = []byte

type BadgerLogger

type BadgerLogger struct {
	// contains filtered or unexported fields
}

BadgerLogger wraps our logger. The badger.Logger interface says it's "implemented by any logging system that is used for standard logs", but it turns out that only klog implements this interface, and klog is still missing structured fields. Badger also adds newlines at the end of its log lines, for some reason.

func (BadgerLogger) Debugf

func (l BadgerLogger) Debugf(msg string, objs ...interface{})

func (BadgerLogger) Errorf

func (l BadgerLogger) Errorf(msg string, objs ...interface{})

func (BadgerLogger) Infof

func (l BadgerLogger) Infof(msg string, objs ...interface{})

func (BadgerLogger) Warningf

func (l BadgerLogger) Warningf(msg string, objs ...interface{})
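
The adapter pattern described for BadgerLogger can be sketched with the standard library alone. This is an illustration under assumptions, not the package's implementation: it uses the stdlib log package where the real type wraps the project's own logger, and the names leveledLogger, trimLogger, and format are hypothetical. The local interface only mirrors the four methods listed above.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"strings"
)

// leveledLogger mirrors the four methods listed for BadgerLogger.
type leveledLogger interface {
	Errorf(string, ...interface{})
	Warningf(string, ...interface{})
	Infof(string, ...interface{})
	Debugf(string, ...interface{})
}

// trimLogger is a hypothetical adapter that forwards to a stdlib logger.
type trimLogger struct {
	out *log.Logger
}

// format renders the message and strips the trailing newlines that the
// caller may have appended.
func format(level, msg string, objs ...interface{}) string {
	return level + " " + strings.TrimRight(fmt.Sprintf(msg, objs...), "\n")
}

func (l trimLogger) Errorf(msg string, objs ...interface{}) {
	l.out.Print(format("ERROR", msg, objs...))
}

func (l trimLogger) Warningf(msg string, objs ...interface{}) {
	l.out.Print(format("WARN", msg, objs...))
}

func (l trimLogger) Infof(msg string, objs ...interface{}) {
	l.out.Print(format("INFO", msg, objs...))
}

func (l trimLogger) Debugf(msg string, objs ...interface{}) {
	l.out.Print(format("DEBUG", msg, objs...))
}

// Compile-time check that the adapter satisfies the interface.
var _ leveledLogger = trimLogger{}

func main() {
	var buf bytes.Buffer
	l := trimLogger{out: log.New(&buf, "", 0)}
	l.Infof("compaction finished in %dms\n", 12)
	fmt.Printf("%q\n", buf.String())
}
```

Trimming in one helper keeps the four level methods trivial, and the compile-time assertion catches any drift between the adapter and the interface it is meant to satisfy.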

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is an abstraction around the Store that tracks state for subscribers.

func (*Channel) Append

func (c *Channel) Append(entry *pb.Msg) (StoreKey, error)

Append creates a new entry and returns the ArrivalIndex key for that entry, so that clients can use it as a bookmark.

func (*Channel) EntryKey

func (c *Channel) EntryKey(entryID []byte) []byte

func (*Channel) Get

func (c *Channel) Get(key StoreKey) (*pb.Msg, error)

Get accesses a key directly, implemented for p2p snapshot and restore, as well as debugging and testing.

func (*Channel) Subscribe

func (c *Channel) Subscribe(pctx context.Context, target SubscriptionTarget, opts *StreamOpts)

Subscribe sets up a subscription that will fire the target's Publish callback with each entry between the `start` and `max` IDs.

func (*Channel) URI

func (c *Channel) URI() string

type ChannelID

type ChannelID = [32]byte

type Config

type Config struct {
	DataDir      string
	DB           badger.Options
	HasDiscovery bool
	Log          *log.Entry
}

func DevConfig

func DevConfig() *Config

func (*Config) Init

func (cfg *Config) Init()

Init overrides the configuration values of the Config object with those passed in as flags.

type Store

type Store struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, cfg *Config) (*Store, error)

func (*Store) Channel

func (s *Store) Channel(id ChannelID) (*Channel, error)

Channel creates a new channel and starts its watcher, or returns one we've registered previously.

func (*Store) ChannelUpdates

func (s *Store) ChannelUpdates() chan *Channel

func (*Store) Get

func (s *Store) Get(key StoreKey) (*pb.Msg, error)

Get accesses a key directly, implemented for p2p snapshot and restore, as well as debugging and testing.

func (*Store) Upsert

func (s *Store) Upsert(key StoreKey, entry *pb.Msg) error

Upsert writes an entry directly, implemented for p2p snapshot and restore, as well as debugging and testing. It does not write into the index. The caller is responsible for anything it cares about in the EntryHeader.

type StoreKey

type StoreKey = []byte

type StreamOptFlags

type StreamOptFlags uint8
const (
	OptNone     StreamOptFlags = 0
	OptKeysOnly StreamOptFlags = 1 << iota
	OptSkipFirst
	OptFromHead
	OptFromGenesis
	OptFromIndex
)
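
One detail of the declaration above is easy to miss: iota counts constant specs from the top of the block, so it is already 1 on the OptKeysOnly line, and the first real flag is 2 rather than 1. The sketch below copies the declaration so that it runs standalone and prints the resulting values. The helper has is hypothetical and not part of the package.

```go
package main

import "fmt"

type StreamOptFlags uint8

// Copied from the declaration above so the example is self-contained.
const (
	OptNone     StreamOptFlags = 0
	OptKeysOnly StreamOptFlags = 1 << iota // iota is 1 here, so the value is 2
	OptSkipFirst
	OptFromHead
	OptFromGenesis
	OptFromIndex
)

// has reports whether every bit in want is set in f (hypothetical helper).
// Note that has(f, OptNone) is true for any f, since OptNone has no bits.
func has(f, want StreamOptFlags) bool {
	return f&want == want
}

func main() {
	fmt.Println(OptKeysOnly, OptSkipFirst, OptFromHead, OptFromGenesis, OptFromIndex)

	flags := OptKeysOnly | OptFromGenesis
	fmt.Println(has(flags, OptKeysOnly), has(flags, OptSkipFirst))
}
```

This prints 2 4 8 16 32 followed by true false. The largest flag is 32, so all five fit in the uint8 underlying type, and they can be combined with bitwise OR as shown.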

type StreamOpts

type StreamOpts struct {
	Seek  []byte
	Max   uint64
	Flags StreamOptFlags
}

type SubscriptionTarget

type SubscriptionTarget interface {
	Publish(*pb.Msg)
	Done()
	ID() helpers.UUID
}
