amboy: Index | Files

package queue

import ""

Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations.

Local Shuffled Queue

The shuffled queue is functionally similar to the LocalUnordered Queue (which is, in fact, a FIFO queue as a result of its implementation); however, the shuffled queue dispatches tasks randomized, using the properties of Go's map type, which is not dependent on insertion order.

Additionally this implementation does not using locking, which may improve performance for some workloads. Intentionally, the implementation retains pointers to all completed tasks, and does not cap the number of pending tasks.

Local Unordered Queue

The unordered queue provides a basic, single-instance, amboy.Queue that runs jobs locally in the context of the application with no persistence layer. The unordered queue does not guarantee any particular execution order, nor does it compute dependences between jobs, but, as an implementation detail, dispatches jobs to workers in a first-in-first-out (e.g. FIFO) model.

By default, LocalUnordered uses the amboy/pool.Workers implementation of amboy.Runner interface.


Package Files

adaptive_order.go adaptive_order_storage.go doc.go driver.go driver_internal.go driver_locker.go driver_mgo.go driver_mongo.go driver_priority.go fixed.go fixed_storage.go ordered.go priority.go priority_storage.go remote.go remote_base.go remote_ordered.go shuffled.go sqs.go unordered.go util.go


const LockTimeout = 5 * time.Minute

LockTimeout reflects the distributed lock timeout period.

func NewAdaptiveOrderedLocalQueue Uses

func NewAdaptiveOrderedLocalQueue(workers, capacity int) amboy.Queue

NewAdaptiveOrderedLocalQueue provides a queue implementation that stores jobs in memory, and dispatches tasks based on the dependency information.

Use this implementation rather than LocalOrderedQueue when you need to add jobs *after* starting the queue, and when you want to avoid the higher potential overhead of the remote-backed queues.

func NewLocalLimitedSize Uses

func NewLocalLimitedSize(workers, capacity int) amboy.Queue

NewLocalLimitedSize constructs a LocalLimitedSize queue instance with the specified number of workers and capacity.

func NewLocalOrdered Uses

func NewLocalOrdered(workers int) amboy.Queue

NewLocalOrdered constructs an LocalOrdered object. The "workers" argument is passed to a default pool.SimplePool object.

func NewLocalPriorityQueue Uses

func NewLocalPriorityQueue(workers, capacity int) amboy.Queue

NewLocalPriorityQueue constructs a new priority queue instance and initializes a local worker queue with the specified number of worker processes.

func NewLocalUnordered Uses

func NewLocalUnordered(workers int) amboy.Queue

NewLocalUnordered is a constructor for a local queue that does not respect dependency information in dispatching queue jobs.

All jobs are stored in memory and while there is a buffer of pending work, in general the number of buffered jobs is equal to twice the size of the worker pool, up to 64 jobs.

func NewSQSFifoQueue Uses

func NewSQSFifoQueue(queueName string, workers int) (amboy.Queue, error)

func NewShuffledLocal Uses

func NewShuffledLocal(workers, capacity int) amboy.Queue

NewShuffledLocal provides a queue implementation that shuffles the order of jobs, relative the insertion order.

type Driver Uses

type Driver interface {
    ID() string
    Open(context.Context) error

    Get(context.Context, string) (amboy.Job, error)
    Put(context.Context, amboy.Job) error
    Save(context.Context, amboy.Job) error
    SaveStatus(context.Context, amboy.Job, amboy.JobStatusInfo) error

    Jobs(context.Context) <-chan amboy.Job
    Next(context.Context) amboy.Job

    Stats(context.Context) amboy.QueueStats
    JobStats(context.Context) <-chan amboy.JobStatusInfo


Driver describes the interface between a queue and an out of process persistence layer, like a database.

func NewInternalDriver Uses

func NewInternalDriver() Driver

NewInternalDriver creates a local persistence layer object.

func NewMgoDriver Uses

func NewMgoDriver(name string, opts MongoDBOptions) Driver

NewMgoDriver creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection

func NewMongoDriver Uses

func NewMongoDriver(name string, opts MongoDBOptions) Driver

func NewPriorityDriver Uses

func NewPriorityDriver() Driver

NewPriorityDriver returns an initialized Priority Driver instances.

func OpenNewMgoDriver Uses

func OpenNewMgoDriver(ctx context.Context, name string, opts MongoDBOptions, session *mgo.Session) (Driver, error)

OpenNewMgoDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMgo() and calling *MongoDB.Open().

func OpenNewMongoDriver Uses

func OpenNewMongoDriver(ctx context.Context, name string, opts MongoDBOptions, client *mongo.Client) (Driver, error)

OpenNewMongoDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoDriver() and calling driver.Open().

type LockManager Uses

type LockManager interface {
    Lock(context.Context, amboy.Job) error
    Unlock(context.Context, amboy.Job) error

LockManager describes the component of the Driver interface that handles job mutexing.

func NewLockManager Uses

func NewLockManager(ctx context.Context, d Driver) LockManager

NewLockManager configures a Lock manager for use in Driver implementations. This operation does *not* start the background thread. The name *must* be unique per driver/queue combination, to ensure that each driver/queue can have exclusive locks over jobs.

type MongoDBOptions Uses

type MongoDBOptions struct {
    URI            string
    DB             string
    Priority       bool
    CheckWaitUntil bool

MongoDBOptions is a struct passed to the NewMgo constructor to communicate mgoDriver specific settings about the driver's behavior and operation.

func DefaultMongoDBOptions Uses

func DefaultMongoDBOptions() MongoDBOptions

DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database, and *not* using priority ordering of jobs.

type Remote Uses

type Remote interface {
    SetDriver(Driver) error
    Driver() Driver

Remote queues extend the queue interface to allow a pluggable-storage backend, or "driver"

func NewRemoteUnordered Uses

func NewRemoteUnordered(size int) Remote

NewRemoteUnordered returns a queue that has been initialized with a local worker pool Runner instance of the specified size.

func NewSimpleRemoteOrdered Uses

func NewSimpleRemoteOrdered(size int) Remote

NewSimpleRemoteOrdered returns a queue with a configured local runner with the specified number of workers.

Package queue imports 34 packages (graph) and is imported by 33 packages. Updated 2019-02-18. Refresh now. Tools for package owners.