syncbase

package
v2.5.0
Published: Jan 15, 2020 License: Apache-2.0 Imports: 12 Imported by: 12

Documentation

Overview

Package syncbase defines common structures used in multiple datasync transports. The following reusable structures are defined:

- KeyValProtoWatcher maintains watch registrations/subscriptions.
- Registry of the latest revisions of values per each key, synchronized by datasync.
- Default implementation of Events & Iterators interfaces defined in data_api.go.
- Events & Iterators in this package are reused by some datasync transports.

Constants

This section is empty.

Variables

var (
	// PropagateChangesTimeout defines timeout used during
	// change propagation after which it will return an error.
	PropagateChangesTimeout = time.Second * 20
)

Functions

func AggregateDone

func AggregateDone(events []func(chan error), done chan error)

AggregateDone can be reused to avoid repetitive code that triggers a slice of events and waits until all of them are finished.

Types

type Adapter

type Adapter struct {
	Watcher   datasync.KeyValProtoWatcher
	Publisher datasync.KeyProtoValWriter
}

Adapter implements datasync.TransportAdapter but allows the Watch/Put functions to be optionally implemented.

func (*Adapter) Put

func (adapter *Adapter) Put(key string, data proto.Message) error

Put writes the key-value pair through the Publisher field (a datasync.KeyProtoValWriter).

func (*Adapter) Watch

func (adapter *Adapter) Watch(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)

Watch registers the given channels through the Watcher field (a datasync.KeyValProtoWatcher).
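
The idea of an adapter whose delegates are optional can be sketched as follows. Everything here is a simplified stand-in: writer replaces datasync.KeyProtoValWriter, values are plain strings instead of proto.Message, and treating a missing delegate as a no-op is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// writer is a hypothetical stand-in for datasync.KeyProtoValWriter.
type writer interface {
	Put(key string, value string) error
}

// adapter mirrors the shape of Adapter: the delegate may be left unset.
type adapter struct {
	Publisher writer
}

// Put forwards to the publisher if one is configured.
// Assumption: with no publisher the call is a silent no-op.
func (a *adapter) Put(key, value string) error {
	if a.Publisher == nil {
		return nil
	}
	return a.Publisher.Put(key, value)
}

// memWriter is a trivial in-memory writer used for the demonstration.
type memWriter struct{ data map[string]string }

func (w *memWriter) Put(key, value string) error {
	w.data[key] = value
	return nil
}

func main() {
	// Without a publisher, Put does nothing and reports no error.
	empty := &adapter{}
	fmt.Println(empty.Put("k", "v"))

	// With a publisher, Put is delegated.
	w := &memWriter{data: map[string]string{}}
	full := &adapter{Publisher: w}
	_ = full.Put("k", "v")
	fmt.Println(w.data["k"])
}
```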

type Change

type Change struct {
	datasync.KeyVal
	// contains filtered or unexported fields
}

Change represents a single key-value pair plus its change type.

func NewChange

func NewChange(key string, value proto.Message, rev int64, changeType datasync.Op) *Change

NewChange creates a new instance of Change.

func NewChangeBytes

func NewChangeBytes(key string, value []byte, rev int64, changeType datasync.Op) *Change

NewChangeBytes creates a new instance of Change from a value given as a byte slice.

func (*Change) GetChangeType

func (kv *Change) GetChangeType() datasync.Op

GetChangeType returns the type of the change.

type ChangeEvent

type ChangeEvent struct {
	Changes []datasync.ProtoWatchResp
	// contains filtered or unexported fields
}

ChangeEvent is a simple structure that implements interface datasync.ChangeEvent.

func (*ChangeEvent) Done

func (ev *ChangeEvent) Done(err error)

Done propagates the call to the delegate. If the delegate is nil, the error (if any) is logged.

func (*ChangeEvent) GetChanges

func (ev *ChangeEvent) GetChanges() []datasync.ProtoWatchResp

GetChanges returns the list of changes for the change event.

func (*ChangeEvent) GetContext

func (ev *ChangeEvent) GetContext() context.Context

GetContext returns the context associated with the event.

type ChangeResp

type ChangeResp struct {
	Key        string
	ChangeType datasync.Op
	CurrVal    datasync.LazyValue
	CurrRev    int64
	PrevVal    datasync.LazyValue
}

ChangeResp represents a single change in the change event.

func (*ChangeResp) GetChangeType

func (ev *ChangeResp) GetChangeType() datasync.Op

GetChangeType returns the type of the event.

func (*ChangeResp) GetKey

func (ev *ChangeResp) GetKey() string

GetKey returns the Key associated with the change.

func (*ChangeResp) GetPrevValue

func (ev *ChangeResp) GetPrevValue(prevVal proto.Message) (prevExists bool, err error)

GetPrevValue returns the value before the change.

func (*ChangeResp) GetRevision

func (ev *ChangeResp) GetRevision() int64

GetRevision returns the revision associated with the change (see the comments in the interface datasync.ChangeEvent).

func (*ChangeResp) GetValue

func (ev *ChangeResp) GetValue(val proto.Message) (err error)

GetValue fills val with the current value of the change (see the comments in the interface datasync.ChangeEvent).

type DoneCallback

type DoneCallback struct {
	Callback func(error)
}

DoneCallback is a small reusable part that is embedded in other events using composition. It implements datasync.CallbackResult.

func (*DoneCallback) Done

func (ev *DoneCallback) Done(err error)

Done propagates the error to the callback.

type DoneChannel

type DoneChannel struct {
	DoneChan chan error
}

DoneChannel is a small reusable part that is embedded in other events using composition. It implements datasync.CallbackResult.

func NewDoneChannel

func NewDoneChannel(doneChan chan error) *DoneChannel

NewDoneChannel creates a new instance of DoneChannel.

func (*DoneChannel) Done

func (ev *DoneChannel) Done(err error)

Done propagates the error to the channel.

type KVIterator

type KVIterator struct {
	// contains filtered or unexported fields
}

KVIterator is a simple in-memory implementation of data.Iterator.

func NewKVIterator

func NewKVIterator(data []datasync.KeyVal) *KVIterator

NewKVIterator creates a new instance of KVIterator.

func (*KVIterator) GetNext

func (it *KVIterator) GetNext() (kv datasync.KeyVal, allReceived bool)

GetNext returns the next key-value pair. The allReceived flag is true once all pairs have been returned.
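
A consumer of an iterator with this (value, allReceived) shape typically loops until allReceived is true. The sketch below uses hypothetical stand-in types (pair, sliceIterator) rather than the real datasync.KeyVal and KVIterator, and assumes the iterator walks a slice in order.

```go
package main

import "fmt"

// pair is a hypothetical stand-in for datasync.KeyVal.
type pair struct {
	key   string
	value string
}

// sliceIterator is a hypothetical in-memory iterator over a slice.
type sliceIterator struct {
	data  []pair
	index int
}

// next returns the next pair; allReceived is true when nothing is left.
func (it *sliceIterator) next() (kv pair, allReceived bool) {
	if it.index >= len(it.data) {
		return pair{}, true
	}
	kv = it.data[it.index]
	it.index++
	return kv, false
}

func main() {
	it := &sliceIterator{data: []pair{{"a", "1"}, {"b", "2"}}}
	for {
		kv, allReceived := it.next()
		if allReceived {
			break
		}
		fmt.Println(kv.key, kv.value)
	}
}
```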

type KeyVal

type KeyVal struct {
	datasync.LazyValue
	// contains filtered or unexported fields
}

KeyVal represents a single key-value pair.

func NewKeyVal

func NewKeyVal(key string, value datasync.LazyValue, rev int64) *KeyVal

NewKeyVal creates a new instance of KeyVal.

func (*KeyVal) GetKey

func (kv *KeyVal) GetKey() string

GetKey returns the key of the pair.

func (*KeyVal) GetRevision

func (kv *KeyVal) GetRevision() int64

GetRevision returns the revision associated with the latest change in the key-value pair.

type KeyValBytes

type KeyValBytes struct {
	// contains filtered or unexported fields
}

KeyValBytes represents a single key-value pair.

func NewKeyValBytes

func NewKeyValBytes(key string, value []byte, rev int64) *KeyValBytes

NewKeyValBytes creates a new instance of KeyValBytes.

func (*KeyValBytes) GetKey

func (kv *KeyValBytes) GetKey() string

GetKey returns the key of the pair.

func (*KeyValBytes) GetRevision

func (kv *KeyValBytes) GetRevision() int64

GetRevision returns the revision associated with the latest change in the key-value pair.

func (*KeyValBytes) GetValue

func (kv *KeyValBytes) GetValue(message proto.Message) error

GetValue returns the value of the pair.

type PrevRevisions

type PrevRevisions struct {
	// contains filtered or unexported fields
}

PrevRevisions maintains a map of keys to values together with their revisions.

func NewLatestRev

func NewLatestRev() *PrevRevisions

NewLatestRev is a constructor.

func (*PrevRevisions) Cleanup

func (r *PrevRevisions) Cleanup()

Cleanup removes all data from the registry.

func (*PrevRevisions) Del

func (r *PrevRevisions) Del(key string) (found bool, prev datasync.KeyVal)

Del deletes the entry from the revisions and returns the previous value.

func (*PrevRevisions) Get

func (r *PrevRevisions) Get(key string) (found bool, value datasync.KeyVal)

Get gets the last proto.Message with its revision.

func (*PrevRevisions) ListKeys

func (r *PrevRevisions) ListKeys() (ret []string)

ListKeys returns all stored keys.

func (*PrevRevisions) Put

func (r *PrevRevisions) Put(key string, val datasync.LazyValue) (
	found bool, prev datasync.KeyVal, currRev int64)

Put updates the entry in the revisions and returns the previous value.

func (*PrevRevisions) PutWithRevision

func (r *PrevRevisions) PutWithRevision(key string, inCurrent datasync.KeyVal) (
	found bool, prev datasync.KeyVal)

PutWithRevision updates the entry in the revisions, using the revision carried by inCurrent, and returns the previous value.
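
A registry that remembers the latest value and revision per key, and hands back the previous entry on every write, can be sketched as follows. The types entry and revisions are hypothetical and use plain strings; incrementing the revision by one per key on each put is an assumption of this sketch and may not match how PrevRevisions assigns revisions.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for a stored key-value with revision.
type entry struct {
	value string
	rev   int64
}

// revisions is a hypothetical, mutex-protected registry of latest entries.
type revisions struct {
	mu   sync.Mutex
	data map[string]entry
}

func newRevisions() *revisions {
	return &revisions{data: map[string]entry{}}
}

// put stores value and returns the previous entry plus the new revision.
// Assumption: the revision grows by one per key on every put.
func (r *revisions) put(key, value string) (found bool, prev entry, currRev int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	prev, found = r.data[key]
	currRev = prev.rev + 1
	r.data[key] = entry{value: value, rev: currRev}
	return found, prev, currRev
}

// del removes the entry and returns what was stored before.
func (r *revisions) del(key string) (found bool, prev entry) {
	r.mu.Lock()
	defer r.mu.Unlock()
	prev, found = r.data[key]
	delete(r.data, key)
	return found, prev
}

func main() {
	r := newRevisions()
	r.put("/a", "v1")
	found, prev, rev := r.put("/a", "v2")
	fmt.Println(found, prev.value, rev)
	found, prev = r.del("/a")
	fmt.Println(found, prev.value)
}
```

Returning the previous entry from each write is what lets a caller build a change notification that carries both the old and the new value.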

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry of subscriptions and latest revisions. This structure contains reusable code extracted from various datasync implementations. Thanks to this code, datasync plugins do not need to repeat the code related to the management of subscriptions.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates reusable registry of subscriptions for a particular datasync plugin.

func (*Registry) LastRev

func (adapter *Registry) LastRev() *PrevRevisions

LastRev is only a getter.

func (*Registry) PropagateChanges

func (adapter *Registry) PropagateChanges(ctx context.Context, txData map[string]datasync.ChangeValue) error

PropagateChanges fills registered channels with the data.

func (*Registry) PropagateResync

func (adapter *Registry) PropagateResync(ctx context.Context, txData map[string]datasync.ChangeValue) error

PropagateResync fills registered channels with the data.

func (*Registry) Subscriptions

func (adapter *Registry) Subscriptions() map[string]*Subscription

Subscriptions returns the current subscriptions.

func (*Registry) Watch

func (adapter *Registry) Watch(resyncName string, changeChan chan datasync.ChangeEvent,
	resyncChan chan datasync.ResyncEvent, keyPrefixes ...string) (datasync.WatchRegistration, error)

Watch only appends channels.
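
The split between Watch (which only records channels) and the Propagate methods (which fill them) can be sketched with a small prefix-matching fan-out. The types registry and subscription below are hypothetical simplifications: values are strings, each change set is delivered as a map, and matching by strings.HasPrefix is an assumption about how key prefixes are applied.

```go
package main

import (
	"fmt"
	"strings"
)

// subscription is a hypothetical, simplified stand-in for Subscription.
type subscription struct {
	changeChan  chan map[string]string
	keyPrefixes []string
}

// registry is a hypothetical stand-in that stores subscriptions by name.
type registry struct {
	subs map[string]*subscription
}

// watch only records the channel and prefixes; it sends nothing.
func (r *registry) watch(name string, ch chan map[string]string, prefixes ...string) {
	r.subs[name] = &subscription{changeChan: ch, keyPrefixes: prefixes}
}

// propagate delivers to each subscriber the changes matching its prefixes.
// The send blocks if the subscriber's channel is full or has no receiver.
func (r *registry) propagate(tx map[string]string) {
	for _, sub := range r.subs {
		matched := map[string]string{}
		for key, val := range tx {
			for _, prefix := range sub.keyPrefixes {
				if strings.HasPrefix(key, prefix) {
					matched[key] = val
					break
				}
			}
		}
		if len(matched) > 0 {
			sub.changeChan <- matched
		}
	}
}

func main() {
	r := &registry{subs: map[string]*subscription{}}
	ch := make(chan map[string]string, 1)
	r.watch("demo", ch, "/vnf/")

	r.propagate(map[string]string{"/vnf/if1": "up", "/other/x": "1"})
	fmt.Println(<-ch)
}
```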

type ResyncEventDB

type ResyncEventDB struct {
	*DoneChannel
	// contains filtered or unexported fields
}

ResyncEventDB implements the interface datasync.ResyncEvent (see comments in there).

func NewResyncEventDB

func NewResyncEventDB(ctx context.Context, its map[string]datasync.KeyValIterator) *ResyncEventDB

NewResyncEventDB creates a new instance of ResyncEventDB using the given map of iterators.

func (*ResyncEventDB) GetContext

func (ev *ResyncEventDB) GetContext() context.Context

GetContext returns the context associated with the event.

func (*ResyncEventDB) GetValues

func (ev *ResyncEventDB) GetValues() map[string]datasync.KeyValIterator

GetValues returns values of the event.

type Subscription

type Subscription struct {
	ResyncName  string
	ChangeChan  chan datasync.ChangeEvent
	ResyncChan  chan datasync.ResyncEvent
	CloseChan   chan string
	KeyPrefixes []string
}

Subscription represents a single subscription for Registry.

type WatchDataReg

type WatchDataReg struct {
	ResyncName string
	// contains filtered or unexported fields
}

WatchDataReg implements interface datasync.WatchDataRegistration.

func (*WatchDataReg) Close

func (reg *WatchDataReg) Close() error

Close stops watching of particular KeyPrefixes.

func (*WatchDataReg) Register

func (reg *WatchDataReg) Register(resyncName, keyPrefix string) error

Register starts watching a particular key prefix. It returns an error if the key prefix to be added already exists.

func (*WatchDataReg) Unregister

func (reg *WatchDataReg) Unregister(keyPrefix string) error

Unregister stops watching a particular key prefix. It returns an error if the key prefix to be removed does not exist or if the channel used to close the goroutine is nil.
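
The error conditions described for Register and Unregister can be sketched with a set of prefixes. The type prefixReg is hypothetical, omits the resync name and the close channel, and its error messages are invented for illustration.

```go
package main

import "fmt"

// prefixReg is a hypothetical stand-in that tracks watched key prefixes.
type prefixReg struct {
	prefixes map[string]struct{}
}

// register adds a prefix and fails if it is already present.
func (r *prefixReg) register(prefix string) error {
	if _, ok := r.prefixes[prefix]; ok {
		return fmt.Errorf("key prefix %q is already registered", prefix)
	}
	r.prefixes[prefix] = struct{}{}
	return nil
}

// unregister removes a prefix and fails if it was never registered.
func (r *prefixReg) unregister(prefix string) error {
	if _, ok := r.prefixes[prefix]; !ok {
		return fmt.Errorf("key prefix %q is not registered", prefix)
	}
	delete(r.prefixes, prefix)
	return nil
}

func main() {
	reg := &prefixReg{prefixes: map[string]struct{}{}}
	fmt.Println(reg.register("/vnf/"))
	fmt.Println(reg.register("/vnf/"))
	fmt.Println(reg.unregister("/vnf/"))
	fmt.Println(reg.unregister("/vnf/"))
}
```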

Directories

Path Synopsis
Package msg contains the definition of PROTOBUF structures and gRPC service, and helpers for mapping between PROTOBUF structures & the datasync_api.go.