watchable

package
v0.0.0-...-ff5f600
Published: Apr 30, 2016 License: BSD-3-Clause Imports: 20 Imported by: 0

Documentation

Overview

Package watchable provides a Syncbase-specific store.Store wrapper that adds versioned storage for specified key prefixes and maintains a watchable log of the operations performed on versioned records. This log is the basis for client-facing watch as well as for the sync module's internal watching of store updates.

LogEntry records are stored chronologically, using keys of the form "$log:<seq>". Sequence numbers are zero-padded to ensure that the lexicographic order matches the numeric order.

Version number records are stored using keys of the form "$version:<key>", where <key> is the client-specified key.

TODO(razvanm): Switch to package_test. This is a little involved because createStore is used by the other _test.go files in this directory and TestLogEntryTimestamps is poking inside the watchable store to read the sequence number.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddOp

func AddOp(tx *Transaction, op interface{}, precond func() error) error

AddOp provides a generic way to add an arbitrary op to the log. If precond is not nil, it is run with the locks held, and the op is appended only if precond returns a nil error.

func GetAtVersion

func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error)

GetAtVersion returns the value of a managed key at the requested version. This method is used by the Sync module when the responder needs to send objects over the wire. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.

func GetResumeMarker

func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error)

GetResumeMarker returns the ResumeMarker that points to the current end of the event log.

func GetVersion

func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error)

GetVersion returns the current version of a managed key. This method is used by the Sync module when the initiator is attempting to add new versions of objects. Reading the version key is used for optimistic concurrency control. At minimum, an object implementing the StoreReader interface is required since this is a Get operation. TODO(razvanm): find a way to get rid of the type switch.
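Optimistic concurrency control on a version record can be illustrated with a small in-memory model. This is a sketch under stated assumptions: the versions type, updateIfUnchanged, and errConflict are hypothetical, and the real store detects conflicts through its transactions rather than through an explicit compare step.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// versions is a hypothetical in-memory map from key to current version.
type versions map[string][]byte

// errConflict is a hypothetical error for a lost race.
var errConflict = errors.New("version changed since it was read")

// updateIfUnchanged sets the version of key to next only if the current
// version still equals the one the caller read earlier.
func (v versions) updateIfUnchanged(key string, expected, next []byte) error {
	if !bytes.Equal(v[key], expected) {
		return errConflict
	}
	v[key] = next
	return nil
}

func main() {
	v := versions{"k": []byte("v1")}
	seen := v["k"] // read the current version first

	// The first writer wins; the second one acts on a stale read and fails.
	fmt.Println(v.updateIfUnchanged("k", seen, []byte("v2")))
	fmt.Println(v.updateIfUnchanged("k", seen, []byte("v3")))
}
```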

func MakeResumeMarker

func MakeResumeMarker(seq uint64) watch.ResumeMarker

MakeResumeMarker converts a sequence number to a resume marker.

func ManagesKey

func ManagesKey(tx *Transaction, key []byte) bool

ManagesKey returns true if the store used by a transaction manages a particular key.

func NewVersion

func NewVersion() []byte

NewVersion returns a new version for a store entry mutation.

func PutAtVersion

func PutAtVersion(ctx *context.T, tx *Transaction, key, valbuf, version []byte) error

PutAtVersion puts a value for the managed key at the requested version. This method is used by the Sync module exclusively when the initiator adds objects with versions created on other Syncbases. At minimum, an object implementing the Transaction interface is required since this is a Put operation.

func PutVersion

func PutVersion(ctx *context.T, tx *Transaction, key, version []byte) error

PutVersion updates the version of a managed key to the requested version. This method is used by the Sync module exclusively when the initiator selects which of the already stored versions (via PutAtVersion calls) becomes the current version. At minimum, an object implementing the Transaction interface is required since this is a Put operation.
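The two-step flow described for PutAtVersion and PutVersion (store values per version first, then choose the current one) can be sketched with a toy model. Everything here is an assumption for illustration: versionedStore, its methods, and the "@" separator are hypothetical and do not reflect the package's key layout.

```go
package main

import "fmt"

// versionedStore is a hypothetical in-memory model: values are kept per
// (key, version), and a separate head pointer records the current version.
type versionedStore struct {
	data map[string][]byte // "<key>@<version>" -> value (separator is illustrative)
	head map[string]string // key -> current version
}

func newVersionedStore() *versionedStore {
	return &versionedStore{data: map[string][]byte{}, head: map[string]string{}}
}

// putAtVersion stores a value for key at version without changing the head.
func (s *versionedStore) putAtVersion(key, version string, val []byte) {
	s.data[key+"@"+version] = val
}

// putVersion makes version the current version of key.
func (s *versionedStore) putVersion(key, version string) {
	s.head[key] = version
}

// get returns the value at the current version of key, if any.
func (s *versionedStore) get(key string) ([]byte, bool) {
	version, ok := s.head[key]
	if !ok {
		return nil, false
	}
	val, ok := s.data[key+"@"+version]
	return val, ok
}

func main() {
	s := newVersionedStore()
	s.putAtVersion("k", "v1", []byte("one"))
	s.putAtVersion("k", "v2", []byte("two"))
	s.putVersion("k", "v2") // select which stored version is current

	val, ok := s.get("k")
	fmt.Println(string(val), ok)
}
```

Separating the two steps lets several candidate versions coexist until one of them is selected as current.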

func RunInTransaction

func RunInTransaction(st *Store, fn func(tx *Transaction) error) error

RunInTransaction runs the given fn in a transaction, managing retries and commit/abort.

func SetTransactionFromSync

func SetTransactionFromSync(tx *Transaction)

SetTransactionFromSync marks this transaction as created by sync as opposed to one created by an application. The net effect is that, at commit time, the log entries written are marked as made by sync. This allows the sync Watcher to ignore them (echo suppression) because it made these updates. Note: this is an internal function used by sync, not part of the interface.

TODO(rdaoud): support a generic echo-suppression mechanism for apps as well maybe by having a creator ID in the transaction and log entries.

TODO(rdaoud): fold this flag (or creator ID) into Tx options when available.

TODO(razvanm): move to syncbase side by using a generic annotation mechanism.
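Echo suppression on the reading side amounts to skipping entries that carry the sync flag. The sketch below uses a simplified, hypothetical logEntry type with only the fields needed for the illustration; withoutSyncEchoes is likewise a made-up helper name.

```go
package main

import "fmt"

// logEntry is a simplified, hypothetical stand-in for a watch log record.
type logEntry struct {
	Key      string
	FromSync bool
}

// withoutSyncEchoes returns only the entries that were not written by sync,
// so a sync-side watcher does not react to its own updates.
func withoutSyncEchoes(entries []logEntry) []logEntry {
	var out []logEntry
	for _, e := range entries {
		if !e.FromSync {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	entries := []logEntry{{"a", false}, {"b", true}, {"c", false}}
	for _, e := range withoutSyncEchoes(entries) {
		fmt.Println(e.Key)
	}
}
```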

func WatchUpdates

func WatchUpdates(st store.Store) (update <-chan struct{}, cancel func())

WatchUpdates returns a channel that can be used to wait for changes to the database, as well as a cancel function which MUST be called to release the watch resources. If the update channel is closed, the store is closed and no more updates will happen. Otherwise, the channel will have a value available whenever the store has changed since the last receive on the channel.
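The channel semantics described here (at most one pending value, closed when the store closes) match a common coalescing-notification pattern. The sketch below shows that pattern with a buffered channel of capacity one; notifier and its methods are hypothetical and are not the package's internals.

```go
package main

import "fmt"

// notifier is a hypothetical coalescing notification source.
type notifier struct {
	ch chan struct{}
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

// notify records that something changed. It never blocks: if a notification
// is already pending, the new one is merged into it.
func (n *notifier) notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// close signals that no more updates will happen. This sketch is
// single-goroutine; a real implementation must not race close with notify.
func (n *notifier) close() {
	close(n.ch)
}

func main() {
	n := newNotifier()
	n.notify()
	n.notify() // coalesced with the first one

	<-n.ch
	select {
	case <-n.ch:
		fmt.Println("unexpected second value")
	default:
		fmt.Println("two changes, one notification")
	}

	n.close()
	_, ok := <-n.ch
	fmt.Println("channel open:", ok)
}
```

A receiver therefore treats each value as "something changed since the last receive" and re-reads the log, rather than counting values as individual changes.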

Types

type Clock

type Clock interface {
	// Now returns the current time.
	Now() (time.Time, error)
}

Clock is an interface to a generic clock.

type DeleteOp

type DeleteOp struct {
	Key []byte
}

DeleteOp represents a store delete operation.

func (*DeleteOp) FillVDLTarget

func (m *DeleteOp) FillVDLTarget(t vdl.Target, tt *vdl.Type) error

func (*DeleteOp) MakeVDLTarget

func (m *DeleteOp) MakeVDLTarget() vdl.Target

func (DeleteOp) VDLIsZero

func (x DeleteOp) VDLIsZero() bool

func (*DeleteOp) VDLRead

func (x *DeleteOp) VDLRead(dec vdl.Decoder) error

func (DeleteOp) VDLWrite

func (x DeleteOp) VDLWrite(enc vdl.Encoder) error

type DeleteOpTarget

type DeleteOpTarget struct {
	Value *DeleteOp

	vdl.TargetBase
	vdl.FieldsTargetBase
	// contains filtered or unexported fields
}

func (*DeleteOpTarget) FinishField

func (t *DeleteOpTarget) FinishField(_, _ vdl.Target) error

func (*DeleteOpTarget) FinishFields

func (t *DeleteOpTarget) FinishFields(_ vdl.FieldsTarget) error

func (*DeleteOpTarget) StartField

func (t *DeleteOpTarget) StartField(name string) (key, field vdl.Target, _ error)

func (*DeleteOpTarget) StartFields

func (t *DeleteOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)

func (*DeleteOpTarget) ZeroField

func (t *DeleteOpTarget) ZeroField(name string) error

type GetOp

type GetOp struct {
	Key []byte
}

GetOp represents a store get operation.

func (*GetOp) FillVDLTarget

func (m *GetOp) FillVDLTarget(t vdl.Target, tt *vdl.Type) error

func (*GetOp) MakeVDLTarget

func (m *GetOp) MakeVDLTarget() vdl.Target

func (GetOp) VDLIsZero

func (x GetOp) VDLIsZero() bool

func (*GetOp) VDLRead

func (x *GetOp) VDLRead(dec vdl.Decoder) error

func (GetOp) VDLWrite

func (x GetOp) VDLWrite(enc vdl.Encoder) error

type GetOpTarget

type GetOpTarget struct {
	Value *GetOp

	vdl.TargetBase
	vdl.FieldsTargetBase
	// contains filtered or unexported fields
}

func (*GetOpTarget) FinishField

func (t *GetOpTarget) FinishField(_, _ vdl.Target) error

func (*GetOpTarget) FinishFields

func (t *GetOpTarget) FinishFields(_ vdl.FieldsTarget) error

func (*GetOpTarget) StartField

func (t *GetOpTarget) StartField(name string) (key, field vdl.Target, _ error)

func (*GetOpTarget) StartFields

func (t *GetOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)

func (*GetOpTarget) ZeroField

func (t *GetOpTarget) ZeroField(name string) error

type LogEntry

type LogEntry struct {
	// The store operation that was performed.
	Op *vom.RawBytes
	// Time when the operation was committed in nanoseconds since the epoch.
	// Note: We don't use time.Time here because VDL's time.Time consists of
	// {Seconds int64, Nanos int32}, which is more expensive than a single int64.
	CommitTimestamp int64
	// Operation came from sync (used for echo suppression).
	// TODO(razvanm): this field is specific to syncbase. We should add a
	// generic way to add fields and use that instead.
	FromSync bool
	// If true, this entry is followed by more entries that belong to the same
	// commit as this entry.
	Continued bool
}

LogEntry represents a single store operation. This operation may have been part of a transaction, as signified by the Continued boolean. Read-only operations (and read-only transactions) are not logged.

func ReadBatchFromLog

func ReadBatchFromLog(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error)

ReadBatchFromLog returns a batch of watch log records (a transaction) from the given database and the new resume marker at the end of the batch.
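The interplay between the Continued flag and a resume position can be modelled with a slice. In this sketch the resume marker is simply an index into an in-memory log, which is an assumption made for illustration; logEntry and readBatch are hypothetical and the real function reads from a store.Store.

```go
package main

import (
	"errors"
	"fmt"
)

// logEntry is a simplified, hypothetical stand-in for a watch log record.
type logEntry struct {
	Op        string
	Continued bool
}

// readBatch returns the entries of one commit starting at marker, together
// with the position just past that commit. An empty batch means there is
// nothing new to read. The marker must not be negative.
func readBatch(log []logEntry, marker int) ([]logEntry, int, error) {
	var batch []logEntry
	for i := marker; i < len(log); i++ {
		batch = append(batch, log[i])
		if !log[i].Continued {
			return batch, i + 1, nil
		}
	}
	if len(batch) > 0 {
		return nil, marker, errors.New("log ends in the middle of a commit")
	}
	return nil, marker, nil
}

func main() {
	log := []logEntry{{"put a", true}, {"put b", false}, {"delete c", false}}
	marker := 0
	for {
		batch, next, err := readBatch(log, marker)
		if err != nil || len(batch) == 0 {
			break
		}
		fmt.Println(len(batch), "op(s), next marker", next)
		marker = next
	}
}
```

A caller that persists the returned marker can resume after a restart without re-reading commits it has already processed.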

func (*LogEntry) FillVDLTarget

func (m *LogEntry) FillVDLTarget(t vdl.Target, tt *vdl.Type) error

func (*LogEntry) MakeVDLTarget

func (m *LogEntry) MakeVDLTarget() vdl.Target

func (LogEntry) VDLIsZero

func (x LogEntry) VDLIsZero() bool

func (*LogEntry) VDLRead

func (x *LogEntry) VDLRead(dec vdl.Decoder) error

func (LogEntry) VDLWrite

func (x LogEntry) VDLWrite(enc vdl.Encoder) error

type LogEntryTarget

type LogEntryTarget struct {
	Value *LogEntry

	vdl.TargetBase
	vdl.FieldsTargetBase
	// contains filtered or unexported fields
}

func (*LogEntryTarget) FinishField

func (t *LogEntryTarget) FinishField(_, _ vdl.Target) error

func (*LogEntryTarget) FinishFields

func (t *LogEntryTarget) FinishFields(_ vdl.FieldsTarget) error

func (*LogEntryTarget) StartField

func (t *LogEntryTarget) StartField(name string) (key, field vdl.Target, _ error)

func (*LogEntryTarget) StartFields

func (t *LogEntryTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)

func (*LogEntryTarget) ZeroField

func (t *LogEntryTarget) ZeroField(name string) error

type Options

type Options struct {
	// Key prefixes to version and log. If nil, all keys are managed.
	ManagedPrefixes []string
}

Options configures a Store.
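The ManagedPrefixes rule (nil means every key is managed) can be expressed as a simple prefix check. This is a sketch of that rule only; managesKey here is a hypothetical standalone helper, and treating an empty non-nil slice as "no keys managed" is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// managesKey reports whether key falls under one of the managed prefixes.
// A nil prefix list means that all keys are managed.
func managesKey(prefixes []string, key string) bool {
	if prefixes == nil {
		return true
	}
	for _, p := range prefixes {
		if strings.HasPrefix(key, p) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := []string{"r:", "p:"}
	fmt.Println(managesKey(prefixes, "r:row1"))
	fmt.Println(managesKey(prefixes, "x:other"))
	fmt.Println(managesKey(nil, "x:other"))
}
```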

func GetOptions

func GetOptions(st store.Store) (*Options, error)

GetOptions returns the options configured on a watchable.Store. TODO(rdaoud): expose watchable store through an interface and change this function to be a method on the store.

type PutOp

type PutOp struct {
	Key     []byte
	Version []byte
}

PutOp represents a store put operation. The new version is written instead of the value to avoid duplicating the user data in the store. The version is used to access the user data of that specific mutation.

func (*PutOp) FillVDLTarget

func (m *PutOp) FillVDLTarget(t vdl.Target, tt *vdl.Type) error

func (*PutOp) MakeVDLTarget

func (m *PutOp) MakeVDLTarget() vdl.Target

func (PutOp) VDLIsZero

func (x PutOp) VDLIsZero() bool

func (*PutOp) VDLRead

func (x *PutOp) VDLRead(dec vdl.Decoder) error

func (PutOp) VDLWrite

func (x PutOp) VDLWrite(enc vdl.Encoder) error

type PutOpTarget

type PutOpTarget struct {
	Value *PutOp

	vdl.TargetBase
	vdl.FieldsTargetBase
	// contains filtered or unexported fields
}

func (*PutOpTarget) FinishField

func (t *PutOpTarget) FinishField(_, _ vdl.Target) error

func (*PutOpTarget) FinishFields

func (t *PutOpTarget) FinishFields(_ vdl.FieldsTarget) error

func (*PutOpTarget) StartField

func (t *PutOpTarget) StartField(name string) (key, field vdl.Target, _ error)

func (*PutOpTarget) StartFields

func (t *PutOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)

func (*PutOpTarget) ZeroField

func (t *PutOpTarget) ZeroField(name string) error

type ScanOp

type ScanOp struct {
	Start []byte
	Limit []byte
}

ScanOp represents a store scan operation.

func (*ScanOp) FillVDLTarget

func (m *ScanOp) FillVDLTarget(t vdl.Target, tt *vdl.Type) error

func (*ScanOp) MakeVDLTarget

func (m *ScanOp) MakeVDLTarget() vdl.Target

func (ScanOp) VDLIsZero

func (x ScanOp) VDLIsZero() bool

func (*ScanOp) VDLRead

func (x *ScanOp) VDLRead(dec vdl.Decoder) error

func (ScanOp) VDLWrite

func (x ScanOp) VDLWrite(enc vdl.Encoder) error

type ScanOpTarget

type ScanOpTarget struct {
	Value *ScanOp

	vdl.TargetBase
	vdl.FieldsTargetBase
	// contains filtered or unexported fields
}

func (*ScanOpTarget) FinishField

func (t *ScanOpTarget) FinishField(_, _ vdl.Target) error

func (*ScanOpTarget) FinishFields

func (t *ScanOpTarget) FinishFields(_ vdl.FieldsTarget) error

func (*ScanOpTarget) StartField

func (t *ScanOpTarget) StartField(name string) (key, field vdl.Target, _ error)

func (*ScanOpTarget) StartFields

func (t *ScanOpTarget) StartFields(tt *vdl.Type) (vdl.FieldsTarget, error)

func (*ScanOpTarget) ZeroField

func (t *ScanOpTarget) ZeroField(name string) error

type Store

type Store struct {

	// TODO(razvanm): make the clock private. The clock is used only by the
	// addSyncgroupLogRec function from the vsync package.
	Clock Clock // used to provide write timestamps
	// contains filtered or unexported fields
}

func Wrap

func Wrap(st store.Store, clock Clock, opts *Options) (*Store, error)

Wrap returns a *Store that wraps the given store.Store.

func (*Store) Close

func (st *Store) Close() error

Close implements the store.Store interface.

func (*Store) Delete

func (st *Store) Delete(key []byte) error

Delete implements the store.StoreWriter interface.

func (*Store) Get

func (st *Store) Get(key, valbuf []byte) ([]byte, error)

Get implements the store.StoreReader interface.

func (*Store) NewSnapshot

func (st *Store) NewSnapshot() store.Snapshot

NewSnapshot implements the store.Store interface.

func (*Store) NewTransaction

func (st *Store) NewTransaction() store.Transaction

NewTransaction implements the store.Store interface.

func (*Store) NewWatchableTransaction

func (st *Store) NewWatchableTransaction() *Transaction

NewWatchableTransaction implements the Store interface.

func (*Store) Put

func (st *Store) Put(key, value []byte) error

Put implements the store.StoreWriter interface.

func (*Store) Scan

func (st *Store) Scan(start, limit []byte) store.Stream

Scan implements the store.StoreReader interface.

type Transaction

type Transaction struct {
	St *Store
	// contains filtered or unexported fields
}

func (*Transaction) Abort

func (tx *Transaction) Abort() error

Abort implements the store.Transaction interface.

func (*Transaction) Commit

func (tx *Transaction) Commit() error

Commit implements the store.Transaction interface.

func (*Transaction) Delete

func (tx *Transaction) Delete(key []byte) error

Delete implements the store.StoreWriter interface.

func (*Transaction) Get

func (tx *Transaction) Get(key, valbuf []byte) ([]byte, error)

Get implements the store.StoreReader interface.

func (*Transaction) Put

func (tx *Transaction) Put(key, value []byte) error

Put implements the store.StoreWriter interface.

func (*Transaction) Scan

func (tx *Transaction) Scan(start, limit []byte) store.Stream

Scan implements the store.StoreReader interface.

type TransactionOptions

type TransactionOptions struct {
	NumAttempts int // number of attempts; only used by RunInTransaction
}

TODO(razvanm): This is copied from store/util.go.

TODO(sadovsky): Move this to model.go and make it an argument to Store.NewTransaction.
