juju: github.com/juju/juju/state/watcher Index | Files | Directories

package watcher

import "github.com/juju/juju/state/watcher"

The watcher package provides an interface for observing changes to arbitrary MongoDB documents that are maintained via the mgo/txn transaction package.

Index

Package Files

helpers.go hubwatcher.go logger.go txnwatcher.go watcher.go

Variables

var (
    // HubWatcherIdleFunc allows tets to be able to get callbacks
    // when the hub watcher hasn't notified any watchers for a specified time.
    HubWatcherIdleFunc func(string)

    // HubWatcherIdleTime relates to how long the hub needs to wait
    // having notified no watchers to be considered idle.
    HubWatcherIdleTime = 50 * time.Millisecond
)
var (

    // PollStrategy is used to determine how long
    // to delay between poll intervals. A new timer
    // is created each time some watcher event is
    // fired or if the old timer completes.
    //
    // It must not be changed when any watchers are active.
    PollStrategy retry.Strategy = retry.Exponential{
        Initial:  txnWatcherShortWait,
        Factor:   1.5,
        MaxDelay: 5 * time.Second,
    }

    // TxnPollNotifyFunc allows tests to be able to specify
    // callbacks each time the database has been polled and processed.
    TxnPollNotifyFunc func()
)
var Period time.Duration = 5 * time.Second

Period is the delay between each sync. It must not be changed when any watchers are active.

func EnsureErr Uses

func EnsureErr(w Errer) error

EnsureErr returns the error with which w died. Calling it will also return an error if w is still running or was stopped cleanly.

func Stop Uses

func Stop(w Stopper, t *tomb.Tomb)

Stop stops the watcher. If an error is returned by the watcher, t is killed with the error.

type BaseWatcher Uses

type BaseWatcher interface {
    worker.Worker

    Dead() <-chan struct{}
    Err() error

    // Watch will send events on the Change channel whenever the document you
    // are watching is changed. Note that in order to not miss any changes, you
    // should start Watching the document before you read the document.
    // At this low level Watch layer, there will not be an initial event.
    // Instead, Watch is synchronous, the Watch will not return until the
    // watcher is registered.
    // TODO(jam): 2019-01-31 Update Watch() to return an error rather now
    // that it is synchronous
    Watch(collection string, id interface{}, ch chan<- Change)

    // WatchMulti is similar to Watch, it just allows you to watch a set of
    // documents in the same collection in one request. Just like Watch,
    // no event will be sent for documents that don't change.
    WatchMulti(collection string, ids []interface{}, ch chan<- Change) error

    // WatchCollection will give an event if any documents are modified/added/removed
    // from the collection.
    // TODO(jam): 2019-01-31 Update WatchCollection() to return an error rather now
    // that it is synchronous
    WatchCollection(collection string, ch chan<- Change)

    // WatchCollectionWithFilter will give an event if any documents are modified/added/removed
    // from the collection. Filter can be supplied to check if a given document
    // should send an event.
    // TODO(jam): 2019-01-31 Update WatchCollectionWithFilter() to return an error rather now
    // that it is synchronous
    WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)

    // Unwatch is an asynchronous request to stop watching a given watch.
    // It is an error to try to Unwatch something that is not being watched.
    // Note that Unwatch can be called for things that have been registered with
    // either Watch() or WatchMulti(). For WatchCollection or WatchCollectionWithFilter
    // use UnwatchCollection.
    // TODO(jam): 2019-01-31 Currently Unwatching something that isn't watched
    // is a panic, should we make the method synchronous and turn it into an error?
    // Or just turn it into a no-op
    Unwatch(collection string, id interface{}, ch chan<- Change)

    // UnwatchCollection is used when you are done with a watch started with
    // either WatchCollection or WatchCollectionWithFilter. You must pass in
    // the same Change channel. Unwatching a collection that isn't being watched
    // is an error that will panic().
    UnwatchCollection(collection string, ch chan<- Change)
}

BaseWatcher represents watch methods on the worker responsible for watching for database changes.

type Change Uses

type Change struct {
    // C and Id hold the collection name and document _id field value.
    C   string
    Id  interface{}

    // Revno is the latest known value for the document's txn-revno
    // field, or -1 if the document was deleted.
    Revno int64
}

A Change holds information about a document change.

type Clock Uses

type Clock interface {
    Now() time.Time
    After(time.Duration) <-chan time.Time
}

Clock represents the time methods used.

type Errer Uses

type Errer interface {
    Err() error
}

Errer is implemented by all watchers.

type Hub Uses

type Hub interface {
    Publish(topic string, data interface{}) <-chan struct{}
}

Hub represents a pubsub hub. The TxnWatcher only ever publishes events to the hub.

type HubSource Uses

type HubSource interface {
    SubscribeMatch(matcher func(string) bool, handler func(string, interface{})) func()
}

HubSource represents the listening aspects of the pubsub hub.

type HubWatcher Uses

type HubWatcher struct {
    // contains filtered or unexported fields
}

HubWatcher listens to events from the hub and passes them on to the registered watchers.

func NewDead Uses

func NewDead(err error) *HubWatcher

NewDead returns a new watcher that is already dead and always returns the given error from its Err method.

func NewHubWatcher Uses

func NewHubWatcher(config HubWatcherConfig) (*HubWatcher, error)

NewHubWatcher returns a new watcher observing Change events published to the hub.

func (*HubWatcher) Dead Uses

func (w *HubWatcher) Dead() <-chan struct{}

Dead returns a channel that is closed when the watcher has stopped.

func (*HubWatcher) Err Uses

func (w *HubWatcher) Err() error

Err returns the error with which the watcher stopped. It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive if the watcher is still running properly, or the respective error if the watcher is terminating or has terminated with an error.

func (*HubWatcher) Kill Uses

func (w *HubWatcher) Kill()

Kill is part of the worker.Worker interface.

func (*HubWatcher) Report Uses

func (w *HubWatcher) Report() map[string]interface{}

Report conforms to the worker.Runner.Report interface for returning information about the active worker.

func (*HubWatcher) Stats Uses

func (w *HubWatcher) Stats() HubWatcherStats

func (*HubWatcher) Stop Uses

func (w *HubWatcher) Stop() error

Stop stops all the watcher activities.

func (*HubWatcher) Unwatch Uses

func (w *HubWatcher) Unwatch(collection string, id interface{}, ch chan<- Change)

Unwatch stops watching the given collection and document id via ch.

func (*HubWatcher) UnwatchCollection Uses

func (w *HubWatcher) UnwatchCollection(collection string, ch chan<- Change)

UnwatchCollection stops watching the given collection via ch.

func (*HubWatcher) Wait Uses

func (w *HubWatcher) Wait() error

Wait is part of the worker.Worker interface.

func (*HubWatcher) Watch Uses

func (w *HubWatcher) Watch(collection string, id interface{}, ch chan<- Change)

Watch starts watching the given collection and document id. An event will be sent onto ch whenever a matching document's txn-revno field is observed to change after a transaction is applied.

func (*HubWatcher) WatchCollection Uses

func (w *HubWatcher) WatchCollection(collection string, ch chan<- Change)

WatchCollection starts watching the given collection. An event will be sent onto ch whenever the txn-revno field is observed to change after a transaction is applied for any document in the collection.

func (*HubWatcher) WatchCollectionWithFilter Uses

func (w *HubWatcher) WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)

WatchCollectionWithFilter starts watching the given collection. An event will be sent onto ch whenever the txn-revno field is observed to change after a transaction is applied for any document in the collection, so long as the specified filter function returns true when called with the document id value.

func (*HubWatcher) WatchMulti Uses

func (w *HubWatcher) WatchMulti(collection string, ids []interface{}, ch chan<- Change) error

WatchMulti watches a particular collection for several ids. The request is synchronous with the worker loop, so by the time the function returns, we guarantee that the watch is in place. If there is a mistake in the arguments (id is nil, channel is already watching a given id), an error will be returned and no watches will be added.

type HubWatcherConfig Uses

type HubWatcherConfig struct {
    // Hub is the source of the events for the hub watcher.
    Hub HubSource
    // Clock allows tests to control the advancing of time.
    Clock Clock
    // ModelUUID refers to the model that this hub watcher is being
    // started for.
    ModelUUID string
    // Logger is used to control where the log messages for this watcher go.
    Logger Logger
}

HubWatcherConfig contains the configuration parameters required for a NewHubWatcher.

func (HubWatcherConfig) Validate Uses

func (config HubWatcherConfig) Validate() error

Validate ensures that all the values that have to be set are set.

type HubWatcherStats Uses

type HubWatcherStats struct {
    // WatchKeyCount is the number of keys being watched
    WatchKeyCount int
    // WatchCount is the number of watchers (keys can be watched by multiples)
    WatchCount uint64
    // SyncQueueCap is the maximum buffer size for synchronization events
    SyncQueueCap int
    // SyncQueueLen is the current number of events being queued
    SyncQueueLen int
    // SyncLastLen was the length of SyncQueue the last time we flushed
    SyncLastLen int
    // SyncAvgLen is a smoothed average of recent sync lengths
    SyncAvgLen int
    // SyncMaxLen was the longest we've seen SyncQueue when flushing
    SyncMaxLen int
    // SyncEventDocCount is the number of sync events we've generated for specific documents
    SyncEventDocCount uint64
    // SyncEventCollCount is the number of sync events we've generated for documents changed in collections
    SyncEventCollCount uint64
    // RequestCount is the number of requests (reqWatch/reqUnwatch, etc) that we've seen
    RequestCount uint64
    // ChangeCount is the number of changes we've processed
    ChangeCount uint64
}

HubWatcherStats defines a few metrics that the hub watcher tracks

type Logger Uses

type Logger interface {
    Warningf(format string, values ...interface{})
    Infof(format string, values ...interface{})
    Debugf(format string, values ...interface{})
    Tracef(format string, values ...interface{})
}

Logger represents methods called by this package to a logging system.

type Stopper Uses

type Stopper interface {
    Stop() error
}

Stopper is implemented by all watchers.

type TxnWatcher Uses

type TxnWatcher struct {
    // contains filtered or unexported fields
}

A TxnWatcher watches the txns.log collection and publishes all change events to the hub.

func NewTxnWatcher Uses

func NewTxnWatcher(config TxnWatcherConfig) (*TxnWatcher, error)

New returns a new Watcher observing the changelog collection, which must be a capped collection maintained by mgo/txn.

func (*TxnWatcher) Dead Uses

func (w *TxnWatcher) Dead() <-chan struct{}

Dead returns a channel that is closed when the watcher has stopped.

func (*TxnWatcher) Err Uses

func (w *TxnWatcher) Err() error

Err returns the error with which the watcher stopped. It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive if the watcher is still running properly, or the respective error if the watcher is terminating or has terminated with an error.

func (*TxnWatcher) Kill Uses

func (w *TxnWatcher) Kill()

Kill is part of the worker.Worker interface.

func (*TxnWatcher) Report Uses

func (w *TxnWatcher) Report() map[string]interface{}

Report is part of the watcher/runner Reporting interface, to expose runtime details of the watcher.

func (*TxnWatcher) Stop Uses

func (w *TxnWatcher) Stop() error

Stop stops all the watcher activities.

func (*TxnWatcher) Wait Uses

func (w *TxnWatcher) Wait() error

Wait is part of the worker.Worker interface.

type TxnWatcherConfig Uses

type TxnWatcherConfig struct {
    // ChangeLog is usually the tnxs.log collection.
    ChangeLog *mgo.Collection
    // Hub is where the changes are published to.
    Hub Hub
    // Clock allows tests to control the advancing of time.
    Clock Clock
    // Logger is used to control where the log messages for this watcher go.
    Logger Logger
    // IteratorFunc can be overridden in tests to control what values the
    // watcher sees.
    IteratorFunc func() mongo.Iterator
}

TxnWatcherConfig contains the configuration parameters required for a NewTxnWatcher.

func (TxnWatcherConfig) Validate Uses

func (config TxnWatcherConfig) Validate() error

Validate ensures that all the values that have to be set are set.

Directories

PathSynopsis
watchertest

Package watcher imports 11 packages (graph) and is imported by 956 packages. Updated 2019-08-17. Refresh now. Tools for package owners.