watchable

package
v0.0.0-...-ba1c585 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2017 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Overview

Package watchable provides a Syncbase-specific store.Store wrapper that provides versioned storage for specified prefixes and maintains a watchable log of operations performed on versioned records. This log forms the basis for the implementation of client-facing watch as well as the sync module's internal watching of store updates.

LogEntry records are stored chronologically, using keys of the form "$log:<seq>". Sequence numbers are zero-padded to ensure that the lexicographic order matches the numeric order.

Version number records are stored using keys of the form "$version:<key>", where <key> is the client-specified key.

TODO(razvanm): Switch to package_test. This is a little involved because createStore is used by the other _test.go files in this directory and TestLogEntryTimestamps is poking inside the watchable store to read the sequence number.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddOp

func AddOp(tx *Transaction, op interface{}, precond func() error) error

AddOp provides a generic way to add an arbitrary op to the log. If precond is not nil it will be run with the locks held and the append will only happen if the precond returns an error.

func GetAtVersion

func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error)

GetAtVersion returns the value of a managed key at the requested version. This method is used by the Sync module when the responder needs to send objects over the wire. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.

func GetResumeMarker

func GetResumeMarker(sntx store.SnapshotOrTransaction) (watch.ResumeMarker, error)

GetResumeMarker returns the ResumeMarker that points to the current end of the event log.

func GetVersion

func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error)

GetVersion returns the current version of a managed key. This method is used by the Sync module when the initiator is attempting to add new versions of objects. Reading the version key is used for optimistic concurrency control. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.

func MakeResumeMarker

func MakeResumeMarker(seq uint64) watch.ResumeMarker

MakeResumeMarker converts a sequence number to the resume marker.

func ManagesKey

func ManagesKey(tx *Transaction, key []byte) bool

ManagesKey returns true if the store used by a transaction manages a particular key.

func NewVersion

func NewVersion() []byte

NewVersion returns a new version for a store entry mutation.

func PutAtVersion

func PutAtVersion(ctx *context.T, tx *Transaction, key, valbuf, version []byte) error

PutAtVersion puts a value for the managed key at the requested version. This method is used by the Sync module exclusively when the initiator adds objects with versions created on other Syncbases. At minimum, an object implementing the Transaction interface is required since this is a Put operation.

func PutVersion

func PutVersion(ctx *context.T, tx *Transaction, key, version []byte) error

PutVersion updates the version of a managed key to the requested version. This method is used by the Sync module exclusively when the initiator selects which of the already stored versions (via PutAtVersion calls) becomes the current version. At minimum, an object implementing the Transaction interface is required since this is a Put operation.

func RunInTransaction

func RunInTransaction(st *Store, fn func(tx *Transaction) error) error

RunInTransaction runs the given fn in a transaction, managing retries and commit/abort.

func SetTransactionFromSync

func SetTransactionFromSync(tx *Transaction)

SetTransactionFromSync marks this transaction as created by sync as opposed to one created by an application. The net effect is that, at commit time, the log entries written are marked as made by sync. This allows the sync Watcher to ignore them (echo suppression) because it made these updates. Note: this is an internal function used by sync, not part of the interface. TODO(rdaoud): support a generic echo-suppression mechanism for apps as well maybe by having a creator ID in the transaction and log entries. TODO(rdaoud): fold this flag (or creator ID) into Tx options when available. TODO(razvanm): move to syncbase side by using a generic annotation mechanism.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client encapsulates a channel used to notify watch clients of store updates and an iterator over the watch log.

func (*Client) Err

func (c *Client) Err() error

Err returns the error that caused the client to stop watching. If the error is nil, the client is active. Otherwise: * ErrCanceled - watch was canceled by the client. * ErrAborted - watcher was closed (store was closed, possibly destroyed). * ErrUnknownResumeMarker - watch was started with an invalid or too old resume marker. * other errors - NextBatchFromLog encountered an error.

func (*Client) NextBatchFromLog

func (c *Client) NextBatchFromLog(st store.Store) ([]*LogEntry, watch.ResumeMarker, error)

NextBatchFromLog returns the next batch of watch log records (transaction) from the given database and the resume marker at the end of the batch. If there is no batch available, it returns a nil slice and the same resume marker as the previous NextBatchFromLog call. The returned log entries are guaranteed to point to existing data versions until either the client is stopped or NextBatchFromLog is called again. If the client is stopped, NextBatchFromLog returns the same error as Err.

func (*Client) Wait

func (c *Client) Wait() <-chan struct{}

Wait returns the update channel that can be used to wait for new changes in the store. If the update channel is closed, the client is stopped and no more updates will happen. Otherwise, the channel will have a value available whenever the store has changed since the last receive on the channel.

type Clock

type Clock interface {
	// Now returns the current time.
	Now() (time.Time, error)
}

Clock is an interface to a generic clock.

type DeleteOp

type DeleteOp struct {
	Key []byte
}

DeleteOp represents a store delete operation.

func (DeleteOp) VDLIsZero

func (x DeleteOp) VDLIsZero() bool

func (*DeleteOp) VDLRead

func (x *DeleteOp) VDLRead(dec vdl.Decoder) error

func (DeleteOp) VDLReflect

func (DeleteOp) VDLReflect(struct {
	Name string `vdl:"v.io/x/ref/services/syncbase/store/watchable.DeleteOp"`
})

func (DeleteOp) VDLWrite

func (x DeleteOp) VDLWrite(enc vdl.Encoder) error

type GetOp

type GetOp struct {
	Key []byte
}

GetOp represents a store get operation.

func (GetOp) VDLIsZero

func (x GetOp) VDLIsZero() bool

func (*GetOp) VDLRead

func (x *GetOp) VDLRead(dec vdl.Decoder) error

func (GetOp) VDLReflect

func (GetOp) VDLReflect(struct {
	Name string `vdl:"v.io/x/ref/services/syncbase/store/watchable.GetOp"`
})

func (GetOp) VDLWrite

func (x GetOp) VDLWrite(enc vdl.Encoder) error

type LogEntry

type LogEntry struct {
	// The store operation that was performed.
	Op *vom.RawBytes
	// Time when the operation was committed in nanoseconds since the epoch.
	// Note: We don't use time.Time here because VDL's time.Time consists of
	// {Seconds int64, Nanos int32}, which is more expensive than a single int64.
	CommitTimestamp int64
	// Operation came from sync (used for echo suppression).
	// TODO(razvanm): this field is specific to syncbase. We should add a
	// generic way to add fields and use that instead.
	FromSync bool
	// If true, this entry is followed by more entries that belong to the same
	// commit as this entry.
	Continued bool
}

LogEntry represents a single store operation. This operation may have been part of a transaction, as signified by the Continued boolean. Read-only operations (and read-only transactions) are not logged.

func (LogEntry) VDLIsZero

func (x LogEntry) VDLIsZero() bool

func (*LogEntry) VDLRead

func (x *LogEntry) VDLRead(dec vdl.Decoder) error

func (LogEntry) VDLReflect

func (LogEntry) VDLReflect(struct {
	Name string `vdl:"v.io/x/ref/services/syncbase/store/watchable.LogEntry"`
})

func (LogEntry) VDLWrite

func (x LogEntry) VDLWrite(enc vdl.Encoder) error

type Options

type Options struct {
	// Key prefixes to version and log.
	ManagedPrefixes []string
}

Options configures a Store.

func GetOptions

func GetOptions(st store.Store) (*Options, error)

GetOptions returns the options configured on a watchable.Store. TODO(rdaoud): expose watchable store through an interface and change this function to be a method on the store.

type PutOp

type PutOp struct {
	Key     []byte
	Version []byte
}

PutOp represents a store put operation. The new version is written instead of the value to avoid duplicating the user data in the store. The version is used to access the user data of that specific mutation.

func (PutOp) VDLIsZero

func (x PutOp) VDLIsZero() bool

func (*PutOp) VDLRead

func (x *PutOp) VDLRead(dec vdl.Decoder) error

func (PutOp) VDLReflect

func (PutOp) VDLReflect(struct {
	Name string `vdl:"v.io/x/ref/services/syncbase/store/watchable.PutOp"`
})

func (PutOp) VDLWrite

func (x PutOp) VDLWrite(enc vdl.Encoder) error

type ScanOp

type ScanOp struct {
	Start []byte
	Limit []byte
}

ScanOp represents a store scan operation.

func (ScanOp) VDLIsZero

func (x ScanOp) VDLIsZero() bool

func (*ScanOp) VDLRead

func (x *ScanOp) VDLRead(dec vdl.Decoder) error

func (ScanOp) VDLReflect

func (ScanOp) VDLReflect(struct {
	Name string `vdl:"v.io/x/ref/services/syncbase/store/watchable.ScanOp"`
})

func (ScanOp) VDLWrite

func (x ScanOp) VDLWrite(enc vdl.Encoder) error

type Store

type Store struct {

	// TODO(razvanm): make the clock private. The clock is used only by the
	// addSyncgroupLogRec function from the vsync package.
	Clock Clock // used to provide write timestamps
	// contains filtered or unexported fields
}

func Wrap

func Wrap(st store.Store, clock Clock, opts *Options) (*Store, error)

Wrap returns a *Store that wraps the given store.Store.

func (*Store) Close

func (st *Store) Close() error

Close implements the store.Store interface.

func (*Store) Delete

func (st *Store) Delete(key []byte) error

Delete implements the store.StoreWriter interface.

func (*Store) Get

func (st *Store) Get(key, valbuf []byte) ([]byte, error)

Get implements the store.StoreReader interface.

func (*Store) NewSnapshot

func (st *Store) NewSnapshot() store.Snapshot

NewSnapshot implements the store.Store interface.

func (*Store) NewTransaction

func (st *Store) NewTransaction() store.Transaction

NewTransaction implements the store.Store interface.

func (*Store) NewWatchableTransaction

func (st *Store) NewWatchableTransaction() *Transaction

NewWatchableTransaction implements the Store interface.

func (*Store) Put

func (st *Store) Put(key, value []byte) error

Put implements the store.StoreWriter interface.

func (*Store) Scan

func (st *Store) Scan(start, limit []byte) store.Stream

Scan implements the store.StoreReader interface.

func (*Store) UpdateLogStart

func (st *Store) UpdateLogStart(syncMarker watch.ResumeMarker) (watch.ResumeMarker, error)

UpdateLogStart takes as input the resume marker of the sync watcher and returns the new log start, computed as the earliest resume marker of all active watchers including the sync watcher. The new log start is persisted before being returned, making it safe to garbage collect earlier log entries. syncMarker is assumed to monotonically increase, always remaining between the log start and end (inclusive).

func (*Store) WatchUpdates

func (st *Store) WatchUpdates(resumeMarker watch.ResumeMarker) (_ *Client, cancel func())

WatchUpdates returns a Client which supports waiting for changes and iterating over the watch log starting from resumeMarker, as well as a cancel function which MUST be called to release watch resources. Returns a stopped Client if the resume marker is invalid or pointing to an already garbage collected segment of the log.

type Transaction

type Transaction struct {
	St *Store
	// contains filtered or unexported fields
}

func (*Transaction) Abort

func (tx *Transaction) Abort() error

Abort implements the store.Transaction interface.

func (*Transaction) Commit

func (tx *Transaction) Commit() error

Commit implements the store.Transaction interface.

func (*Transaction) Delete

func (tx *Transaction) Delete(key []byte) error

Delete implements the store.StoreWriter interface.

func (*Transaction) Get

func (tx *Transaction) Get(key, valbuf []byte) ([]byte, error)

Get implements the store.StoreReader interface.

func (*Transaction) Put

func (tx *Transaction) Put(key, value []byte) error

Put implements the store.StoreWriter interface.

func (*Transaction) Scan

func (tx *Transaction) Scan(start, limit []byte) store.Stream

Scan implements the store.StoreReader interface.

type TransactionOptions

type TransactionOptions struct {
	NumAttempts int // number of attempts; only used by RunInTransaction
}

TODO(razvanm): This is copied from store/util.go. TODO(sadovsky): Move this to model.go and make it an argument to Store.NewTransaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL