amboy: github.com/mongodb/amboy/queue/driver Index | Files

package driver

import "github.com/mongodb/amboy/queue/driver"

Capped Results Storage

The CappedResultsStorage type provides a fixed size for results storage, for local queues (that don't have external storage) in the context of long running applications.

Index

Package Files

capped_results_storage.go interface.go internal.go locker.go mongodb.go priority.go priority_storage.go

type CappedResultStorage Uses

type CappedResultStorage struct {
    Cap int
    // contains filtered or unexported fields
}

CappedResultStorage provides a fixed size storage structure for queue results, and is used as the backend for a queue implementation.

func NewCappedResultStorage Uses

func NewCappedResultStorage(cap int) *CappedResultStorage

NewCappedResultStorage creates a new structure for storing queue results.

func (*CappedResultStorage) Add Uses

func (s *CappedResultStorage) Add(j amboy.Job)

Add inserts a job into the structure.

func (*CappedResultStorage) Contents Uses

func (s *CappedResultStorage) Contents() <-chan amboy.Job

Contents is a generator that produces all jobs in the results storage. The order is random.

func (*CappedResultStorage) Get Uses

func (s *CappedResultStorage) Get(name string) (amboy.Job, bool)

Get retrieves an object from the results storage by name. If the object doesn't exist, the second value is false.

func (*CappedResultStorage) Size Uses

func (s *CappedResultStorage) Size() int

Size returns the current number of results in the storage structure.

type Driver Uses

type Driver interface {
    Open(context.Context) error
    Close()

    Get(string) (amboy.Job, error)
    Save(amboy.Job) error
    SaveStatus(amboy.Job, amboy.JobStatusInfo) error

    Jobs() <-chan amboy.Job
    Next() amboy.Job

    Stats() amboy.QueueStats

    // The Lock and Unlock methods are typically provided by the
    // LockManager type.
    Lock(amboy.Job) error
    Unlock(amboy.Job) error
}

Driver describes the interface between a queue and an out of process persistence layer, like a database.

type Internal Uses

type Internal struct {
    *LockManager
    // contains filtered or unexported fields
}

Internal implements the driver interface, but rather than connecting to a remote data source, this implementation is mostly for testing the queue implementation locally, and providing a proof of concept for the remote driver. May also be useful for converting a remote queue into a local-only architecture in a dependency-injection situation.

func NewInternal Uses

func NewInternal() *Internal

NewInternal creates a local persistence layer object.

func (*Internal) Close Uses

func (d *Internal) Close()

Close is a noop for the Internal implementation, and exists to satisfy the Driver interface.

func (*Internal) Get Uses

func (d *Internal) Get(name string) (amboy.Job, error)

Get retrieves a job object from the persistence system based on the name (ID) of the job. If no job exists by this name, the error is non-nil.

func (*Internal) Jobs Uses

func (d *Internal) Jobs() <-chan amboy.Job

Jobs is a generator of all Job objects stored by the driver. There is no additional filtering of the jobs produced by this generator.

func (*Internal) Next Uses

func (d *Internal) Next() amboy.Job

Next returns a job that is not complete from the queue. If there are no pending jobs, then this method returns nil, but does not block.

func (*Internal) Open Uses

func (d *Internal) Open(ctx context.Context) error

Open is a noop for the Internal implementation, and exists to satisfy the Driver interface.

func (*Internal) Save Uses

func (d *Internal) Save(j amboy.Job) error

Save takes a job and persists it in the storage for this driver. If there is no job with a matching ID, then this operation returns an error.

func (*Internal) SaveStatus Uses

func (d *Internal) SaveStatus(j amboy.Job, stat amboy.JobStatusInfo) error

SaveStatus persists only the status document in the job in the persistence layer. If the job does not exist, this method produces an error.

func (*Internal) Stats Uses

func (d *Internal) Stats() amboy.QueueStats

Stats iterates through all of the jobs stored in the driver and determines how many locked, completed, and pending jobs are stored in the queue.

type LockManager Uses

type LockManager struct {
    // contains filtered or unexported fields
}

LockManager provides an implementation of the Lock and Unlock methods to be composed by amboy/queue/driver.Driver implementations.

LockManagers open a single background process that updates all tracked locks at an interval, less than the configured lockTimeout to avoid locks growing stale.

func NewLockManager Uses

func NewLockManager(name string, d Driver) *LockManager

NewLockManager configures a Lock manager for use in Driver implementations. This operation does *not* start the background thread. The name *must* be unique per driver/queue combination, to ensure that each driver/queue can have exclusive locks over jobs.

func (*LockManager) Lock Uses

func (l *LockManager) Lock(j amboy.Job) error

Lock takes an exclusive lock on the specified job and instructs a background process to update it continually.

Returns an error if the Lock is already locked or if there's a problem updating the document.

func (*LockManager) Open Uses

func (l *LockManager) Open(ctx context.Context)

Open starts the background thread for the Lock manager if it does not already exist.

func (*LockManager) Unlock Uses

func (l *LockManager) Unlock(j amboy.Job) error

Unlock removes this process' exclusive lock on the specified job and instructs the background job to begin updating the lock regularly. Returns an error if no lock exists or if there was a problem updating the lock in the persistence layer.

type MongoDB Uses

type MongoDB struct {
    *LockManager
    // contains filtered or unexported fields
}

MongoDB is a type that represents and wraps a queues persistence of jobs *and* locks to a MongoDB instance.

func NewMongoDB Uses

func NewMongoDB(name string, opts MongoDBOptions) *MongoDB

NewMongoDB creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection

func OpenNewMongoDB Uses

func OpenNewMongoDB(ctx context.Context, name string, opts MongoDBOptions, session *mgo.Session) (*MongoDB, error)

OpenNewMongoDB constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoDB() and calling *MongoDB.Open().

func (*MongoDB) Close Uses

func (d *MongoDB) Close()

Close terminates the connection to the database server.

func (*MongoDB) Get Uses

func (d *MongoDB) Get(name string) (amboy.Job, error)

Get takes the name of a job and returns an amboy.Job object from the persistence layer for the job matching that unique id.

func (*MongoDB) Jobs Uses

func (d *MongoDB) Jobs() <-chan amboy.Job

Jobs returns a channel containing all jobs persisted by this driver. This includes all completed, pending, and locked jobs. Errors, including those with connections to MongoDB or with corrupt job documents, are logged.

func (*MongoDB) Next Uses

func (d *MongoDB) Next() amboy.Job

Next returns one job, not marked complete from the database.

func (*MongoDB) Open Uses

func (d *MongoDB) Open(ctx context.Context) error

Open creates a connection to MongoDB, and returns an error if there's a problem connecting.

func (*MongoDB) Save Uses

func (d *MongoDB) Save(j amboy.Job) error

Save takes a job object and updates that job in the persistence layer. Replaces or updates an existing job with the same ID

func (*MongoDB) SaveStatus Uses

func (d *MongoDB) SaveStatus(j amboy.Job, stat amboy.JobStatusInfo) error

SaveStatus persists only the status document in the job in the persistence layer. If the job does not exist, or the underlying status document has changed incompatibly this operation produces an error.

func (*MongoDB) Stats Uses

func (d *MongoDB) Stats() amboy.QueueStats

Stats returns a Stats object that contains information about the state of the queue in the persistence layer. This operation performs a number of asynchronous queries to collect data, and in an active system with a number of active queues, stats may report incongruous data.

type MongoDBOptions Uses

type MongoDBOptions struct {
    URI      string
    DB       string
    Priority bool
}

MongoDBOptions is a struct passed to the NewMongoDB constructor to communicate MongoDB specific settings about the driver's behavior and operation.

func DefaultMongoDBOptions Uses

func DefaultMongoDBOptions() MongoDBOptions

DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database, and *not* using priority ordering of jobs.

type Priority Uses

type Priority struct {
    *LockManager
    // contains filtered or unexported fields
}

Priority implements the Driver interface, wrapping a PriorityStorage instance. This allows "local" (i.e. intraprocess) shared queues that dispatch jobs in priority order.

func NewPriority Uses

func NewPriority() *Priority

NewPriority returns an initialized Priority Driver instances.

func (*Priority) Close Uses

func (p *Priority) Close()

Close release all resources associated with the Driver instance.

func (*Priority) Get Uses

func (p *Priority) Get(name string) (amboy.Job, error)

Get returns a job object, specified by name/ID from the backing storage. If the job doesn't exist the error value is non-nil.

func (*Priority) Jobs Uses

func (p *Priority) Jobs() <-chan amboy.Job

Jobs returns an iterator of all Job objects tracked by the Driver.

func (*Priority) Next Uses

func (p *Priority) Next() amboy.Job

Next returns the next, highest priority Job from the Driver's backing storage. If there are no queued jobs, the job object is nil.

func (*Priority) Open Uses

func (p *Priority) Open(ctx context.Context) error

Open initilizes the resources of the Driver, and is part of the Driver interface. In the case of the Priority Driver, this operation cannot error.

func (*Priority) Save Uses

func (p *Priority) Save(j amboy.Job) error

Save updates the stored version of the job in the Driver's backing storage. If the job is not tracked by the Driver, this operation is an error.

func (*Priority) SaveStatus Uses

func (p *Priority) SaveStatus(j amboy.Job, stat amboy.JobStatusInfo) error

SaveStatus persists only the status document in the job in the persistence layer. If the job does not exist, this method produces an error.

func (*Priority) Stats Uses

func (p *Priority) Stats() amboy.QueueStats

Stats returns a report of the Driver's current state in the form of a driver.Stats document.

type PriorityStorage Uses

type PriorityStorage struct {
    // contains filtered or unexported fields
}

PriorityStorage is a local storage system for Jobs in priority order. Used by the LocalPriorityQueue, and wrapped by the LocalPriorityDriver for use in remote queues.

func NewPriorityStorage Uses

func NewPriorityStorage() *PriorityStorage

NewPriorityStorage returns an initialized PriorityStorage object.

func (*PriorityStorage) Contents Uses

func (s *PriorityStorage) Contents() <-chan amboy.Job

Contents returns a generator of all jobs. tracked by this instance. This includes completed jobs.

func (*PriorityStorage) Get Uses

func (s *PriorityStorage) Get(name string) (amboy.Job, bool)

Get returns a job from the queue's storage by name, with the boolean value used to validate the job's existence.

func (*PriorityStorage) JobServer Uses

func (s *PriorityStorage) JobServer(ctx context.Context, jobs chan amboy.Job)

JobServer takes a channel constructed outside of this instance, and pushes jobs from the priority queue through that channel. The JobServer does not push nil jobs through the channel.

func (*PriorityStorage) Pending Uses

func (s *PriorityStorage) Pending() int

Pending returns the total number of pending jobs waiting for dispatch.

func (*PriorityStorage) Pop Uses

func (s *PriorityStorage) Pop() amboy.Job

Pop returns the next highest priority job from the queue. If there are no Jobs in the queue, Pop returns nil.

func (*PriorityStorage) Push Uses

func (s *PriorityStorage) Push(j amboy.Job)

Push inserts a job into the priority queue. If the Job exists (by ID), then this operation updates the existing job.

func (*PriorityStorage) Size Uses

func (s *PriorityStorage) Size() int

Size returns the total number of jobs stored in the instance.

Package driver imports 13 packages (graph) and is imported by 2 packages. Updated 2017-04-18. Refresh now. Tools for package owners.