amboy: Index | Files

package queue

import ""

Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations.

Local Shuffled Queue

The shuffled queue is functionally similar to the LocalUnordered Queue (which is, in fact, a FIFO queue as a result of its implementation); however, the shuffled queue dispatches tasks randomized, using the properties of Go's map type, which is not dependent on insertion order.

Additionally this implementation does not using locking, which may improve performance for some workloads. Intentionally, the implementation retains pointers to all completed tasks, and does not cap the number of pending tasks.

Local Unordered Queue

The unordered queue provides a basic, single-instance, amboy.Queue that runs jobs locally in the context of the application with no persistence layer. The unordered queue does not guarantee any particular execution order, nor does it compute dependences between jobs, but, as an implementation detail, dispatches jobs to workers in a first-in-first-out (e.g. FIFO) model.

By default, LocalUnordered uses the amboy/pool.Workers implementation of amboy.Runner interface.


Package Files

adaptive_order.go adaptive_order_storage.go doc.go driver.go driver_internal.go driver_locker.go driver_mongodb.go driver_priority.go fixed.go ordered.go priority.go priority_storage.go remote.go remote_base.go remote_ordered.go shuffled.go sqs.go unordered.go util.go


const LockTimeout = 5 * time.Minute

LockTimeout reflects the distributed lock timeout period.

func NewAdaptiveOrderedLocalQueue Uses

func NewAdaptiveOrderedLocalQueue(workers int) amboy.Queue

NewAdaptiveOrderedLocalQueue provides a queue implementation that stores jobs in memory, and dispatches tasks based on the dependency information.

Use this implementation rather than LocalOrderedQueue when you need to add jobs *after* starting the queue, and when you want to avoid the higher potential overhead of the remote-backed queues.

func NewLocalLimitedSize Uses

func NewLocalLimitedSize(workers, capacity int) amboy.Queue

NewLocalLimitedSize constructs a LocalLimitedSize queue instance with the specified number of workers and capacity.

func NewLocalOrdered Uses

func NewLocalOrdered(workers int) amboy.Queue

NewLocalOrdered constructs an LocalOrdered object. The "workers" argument is passed to a default pool.SimplePool object.

func NewLocalPriorityQueue Uses

func NewLocalPriorityQueue(workers int) amboy.Queue

NewLocalPriorityQueue constructs a new priority queue instance and initializes a local worker queue with the specified number of worker processes.

func NewLocalUnordered Uses

func NewLocalUnordered(workers int) amboy.Queue

NewLocalUnordered is a constructor for a local queue that does not respect dependency information in dispatching queue jobs.

All jobs are stored in memory and while there is a buffer of pending work, in general the number of buffered jobs is equal to twice the size of the worker pool, up to 64 jobs.

func NewSQSFifoQueue Uses

func NewSQSFifoQueue(queueName string, workers int) (amboy.Queue, error)

func NewShuffledLocal Uses

func NewShuffledLocal(workers int) amboy.Queue

NewShuffledLocal provides a queue implementation that shuffles the order of jobs, relative the insertion order.

type Driver Uses

type Driver interface {
    ID() string
    Open(context.Context) error

    Get(string) (amboy.Job, error)
    Put(amboy.Job) error
    Save(amboy.Job) error
    SaveStatus(amboy.Job, amboy.JobStatusInfo) error

    Jobs() <-chan amboy.Job
    Next(context.Context) amboy.Job

    Stats() amboy.QueueStats
    JobStats(context.Context) <-chan amboy.JobStatusInfo


Driver describes the interface between a queue and an out of process persistence layer, like a database.

func NewInternalDriver Uses

func NewInternalDriver() Driver

NewInternalDriver creates a local persistence layer object.

func NewMongoDBDriver Uses

func NewMongoDBDriver(name string, opts MongoDBOptions) Driver

NewMongoDBDriver creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection

func NewPriorityDriver Uses

func NewPriorityDriver() Driver

NewPriorityDriver returns an initialized Priority Driver instances.

func OpenNewMongoDBDriver Uses

func OpenNewMongoDBDriver(ctx context.Context, name string, opts MongoDBOptions, session *mgo.Session) (Driver, error)

OpenNewMongoDBDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoDB() and calling *MongoDB.Open().

type LockManager Uses

type LockManager interface {
    Lock(context.Context, amboy.Job) error
    Unlock(amboy.Job) error

LockManager describes the component of the Driver interface that handles job mutexing.

func NewLockManager Uses

func NewLockManager(ctx context.Context, d Driver) LockManager

NewLockManager configures a Lock manager for use in Driver implementations. This operation does *not* start the background thread. The name *must* be unique per driver/queue combination, to ensure that each driver/queue can have exclusive locks over jobs.

type MongoDBOptions Uses

type MongoDBOptions struct {
    URI            string
    DB             string
    Priority       bool
    CheckWaitUntil bool

MongoDBOptions is a struct passed to the NewMongoDB constructor to communicate mongoDB specific settings about the driver's behavior and operation.

func DefaultMongoDBOptions Uses

func DefaultMongoDBOptions() MongoDBOptions

DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database, and *not* using priority ordering of jobs.

type Remote Uses

type Remote interface {
    SetDriver(Driver) error
    Driver() Driver

Remote queues extend the queue interface to allow a pluggable-storage backend, or "driver"

func NewRemoteUnordered Uses

func NewRemoteUnordered(size int) Remote

NewRemoteUnordered returns a queue that has been initialized with a local worker pool Runner instance of the specified size.

func NewSimpleRemoteOrdered Uses

func NewSimpleRemoteOrdered(size int) Remote

NewSimpleRemoteOrdered returns a queue with a configured local runner with the specified number of workers.

Package queue imports 30 packages (graph) and is imported by 25 packages. Updated 2018-11-21. Refresh now. Tools for package owners.