amboy: github.com/mongodb/amboy/queue Index | Files | Directories

package queue

import "github.com/mongodb/amboy/queue"

Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations.

Local Shuffled Queue

The shuffled queue is functionally similar to the LocalUnordered Queue (which is, in fact, a FIFO queue as a result of its implementation); however, the shuffled queue dispatches tasks randomized, using the properties of Go's map type, which is not dependent on insertion order.

Additionally this implementation does not using locking, which may improve performance for some workloads. Intentionally, the implementation retains pointers to all completed tasks, and does not cap the number of pending tasks.

Local Unordered Queue

The unordered queue provides a basic, single-instance, amboy.Queue that runs jobs locally in the context of the application with no persistence layer. The unordered queue does not guarantee any particular execution order, nor does it compute dependences between jobs, but, as an implementation detail, dispatches jobs to workers in a first-in-first-out (e.g. FIFO) model.

By default, LocalUnordered uses the amboy/pool.Workers implementation of amboy.Runner interface.

Index

Package Files

doc.go fixed.go ordered.go priority.go remote.go remote_base.go remote_ordered.go shuffled.go unordered.go

type LocalLimitedSize Uses

type LocalLimitedSize struct {
    // contains filtered or unexported fields
}

LocalLimitedSize implements the amboy.Queue interface, and unlike other implementations, the size of the queue is limited for both incoming tasks and completed tasks. This makes it possible to use these queues in situations as parts of services and in longer-running contexts.

func NewLocalLimitedSize Uses

func NewLocalLimitedSize(workers, capacity int) *LocalLimitedSize

NewLocalLimitedSize constructs a LocalLimitedSize queue instance with the specified number of workers and capacity.

func (*LocalLimitedSize) Complete Uses

func (q *LocalLimitedSize) Complete(ctx context.Context, j amboy.Job)

Complete marks a job complete in the queue.

func (*LocalLimitedSize) Get Uses

func (q *LocalLimitedSize) Get(name string) (amboy.Job, bool)

Get returns a job, by name, from the results storage. This does not retrieve pending tasks.

func (*LocalLimitedSize) Next Uses

func (q *LocalLimitedSize) Next(ctx context.Context) amboy.Job

Next returns the next pending job, and is used by amboy.Runner implementations to fetch work. This operation blocks until a job is available or the context is canceled.

func (*LocalLimitedSize) Put Uses

func (q *LocalLimitedSize) Put(j amboy.Job) error

Put adds a job to the queue, returning an error if the queue isn't opened, a task of that name exists has been completed (and is stored in the results storage,) or is pending, and finally if the queue is at capacity.

func (*LocalLimitedSize) Results Uses

func (q *LocalLimitedSize) Results() <-chan amboy.Job

Results is a generator of all completed tasks in the queue.

func (*LocalLimitedSize) Runner Uses

func (q *LocalLimitedSize) Runner() amboy.Runner

Runner returns the Queue's embedded amboy.Runner instance.

func (*LocalLimitedSize) SetRunner Uses

func (q *LocalLimitedSize) SetRunner(r amboy.Runner) error

SetRunner allows callers to, if the queue has not started, inject a different runner implementation.

func (*LocalLimitedSize) Start Uses

func (q *LocalLimitedSize) Start(ctx context.Context) error

Start starts the runner and initializes the pending task storage. Only produces an error if the underlying runner fails to start.

func (*LocalLimitedSize) Started Uses

func (q *LocalLimitedSize) Started() bool

Started returns true if the queue is open and is processing jobs, and false otherwise.

func (*LocalLimitedSize) Stats Uses

func (q *LocalLimitedSize) Stats() amboy.QueueStats

Stats returns information about the current state of jobs in the queue, and the amount of work completed.

type LocalOrdered Uses

type LocalOrdered struct {
    // contains filtered or unexported fields
}

LocalOrdered implements a dependency aware local queue. The queue will execute tasks ordered by the topological sort of the dependency graph derived from the Edges() output of each job's Dependency object. If no task edges are specified, task ordering should be roughly equivalent to other non-ordered queues. If there are cycles in the dependency graph, the queue will error before starting.

func NewLocalOrdered Uses

func NewLocalOrdered(workers int) *LocalOrdered

NewLocalOrdered constructs an LocalOrdered object. The "workers" argument is passed to a default pool.SimplePool object.

func (*LocalOrdered) Complete Uses

func (q *LocalOrdered) Complete(ctx context.Context, j amboy.Job)

Complete marks a job as complete in the context of this queue instance.

func (*LocalOrdered) Get Uses

func (q *LocalOrdered) Get(name string) (amboy.Job, bool)

Get takes a name and returns a completed job.

func (*LocalOrdered) Next Uses

func (q *LocalOrdered) Next(ctx context.Context) amboy.Job

Next returns a job from the Queue. This call is non-blocking. If there are no pending jobs at the moment, then Next returns an error.

func (*LocalOrdered) Put Uses

func (q *LocalOrdered) Put(j amboy.Job) error

Put adds a job to the queue. If the queue has started dispatching jobs you cannot add new jobs to the queue. Additionally all jobs must have unique names. (i.e. job.ID() values.)

func (*LocalOrdered) Results Uses

func (q *LocalOrdered) Results() <-chan amboy.Job

Results provides an iterator of all "result objects," or completed amboy.Job objects. Does not wait for all results to be complete, and is closed when all results have been exhausted, even if there are more results pending. Other implementations may have different semantics for this method.

func (*LocalOrdered) Runner Uses

func (q *LocalOrdered) Runner() amboy.Runner

Runner returns the embedded task runner.

func (*LocalOrdered) SetRunner Uses

func (q *LocalOrdered) SetRunner(r amboy.Runner) error

SetRunner allows users to substitute alternate Runner implementations at run time. This method fails if the runner has started.

func (*LocalOrdered) Start Uses

func (q *LocalOrdered) Start(ctx context.Context) error

Start starts the runner worker processes organizes the graph and begins dispatching jobs to the workers.

func (*LocalOrdered) Started Uses

func (q *LocalOrdered) Started() bool

Started returns true when the Queue has begun dispatching tasks to runners.

func (*LocalOrdered) Stats Uses

func (q *LocalOrdered) Stats() amboy.QueueStats

Stats returns a statistics object with data about the total number of jobs tracked by the queue.

type LocalPriorityQueue Uses

type LocalPriorityQueue struct {
    // contains filtered or unexported fields
}

LocalPriorityQueue is an amboy.Queue implementation that dispatches jobs in priority order, using the Priority method of the Job interface to determine priority. These queues do not have shared storage.

func NewLocalPriorityQueue Uses

func NewLocalPriorityQueue(workers int) *LocalPriorityQueue

NewLocalPriorityQueue constructs a new priority queue instance and initializes a local worker queue with the specified number of worker processes.

func (*LocalPriorityQueue) Complete Uses

func (q *LocalPriorityQueue) Complete(ctx context.Context, j amboy.Job)

Complete marks a job complete. The operation is asynchronous in this implementation.

func (*LocalPriorityQueue) Get Uses

func (q *LocalPriorityQueue) Get(name string) (amboy.Job, bool)

Get takes the name of a job and returns the job from the queue that matches that ID. Use the second return value to check if a job object with that ID exists in the queue.e

func (*LocalPriorityQueue) Next Uses

func (q *LocalPriorityQueue) Next(ctx context.Context) amboy.Job

Next returns a job for processing the queue. This may be a nil job if the context is canceled. Otherwise, this operation blocks until a job is available for dispatching.

func (*LocalPriorityQueue) Put Uses

func (q *LocalPriorityQueue) Put(j amboy.Job) error

Put adds a job to the priority queue. If the Job already exists, this operation updates it in the queue, potentially reordering the queue accordingly.

func (*LocalPriorityQueue) Results Uses

func (q *LocalPriorityQueue) Results() <-chan amboy.Job

Results is a generator of all jobs that report as "Completed" in the queue.

func (*LocalPriorityQueue) Runner Uses

func (q *LocalPriorityQueue) Runner() amboy.Runner

Runner returns the embedded runner instance, which provides and manages the worker processes.

func (*LocalPriorityQueue) SetRunner Uses

func (q *LocalPriorityQueue) SetRunner(r amboy.Runner) error

SetRunner allows users to override the default embedded runner. This is *only* possible if the queue has not started processing jobs. If you attempt to set the runner after the queue has started the operation returns an error and has no effect.

func (*LocalPriorityQueue) Start Uses

func (q *LocalPriorityQueue) Start(ctx context.Context) error

Start begins the work of the queue. It is a noop, without error, to start a queue that's been started, but the operation can error if there were problems starting the underlying runner instance. All resources are released when the context is canceled.

func (*LocalPriorityQueue) Started Uses

func (q *LocalPriorityQueue) Started() bool

Started reports if the queue has begun processing work.

func (*LocalPriorityQueue) Stats Uses

func (q *LocalPriorityQueue) Stats() amboy.QueueStats

Stats returns an amboy.QueueStats object that reflects the queue's current state.

type LocalShuffled Uses

type LocalShuffled struct {
    // contains filtered or unexported fields
}

LocalShuffled provides a queue implementation that shuffles the order of jobs, relative the insertion order. Unlike some of the other local queue implementations that predate LocalShuffled (e.g. LocalUnordered,) there are no mutexes uses in the implementation.

To use the LocalShuffled queue, simply construct a pointer to a LocalShuffled instance, use SetRunner() to specify the runner, and call the Start() method to begin accepting jobs. You cannot add jobs this queue before without starting it.

func (*LocalShuffled) Complete Uses

func (q *LocalShuffled) Complete(ctx context.Context, j amboy.Job)

Complete marks a job as complete in the internal representation. If the context is canceled after calling Complete but before it executes, no change occurs.

func (*LocalShuffled) Get Uses

func (q *LocalShuffled) Get(name string) (amboy.Job, bool)

Get returns a job based on the specified ID. Considers all pending, completed, and in progress jobs.

func (*LocalShuffled) Next Uses

func (q *LocalShuffled) Next(ctx context.Context) amboy.Job

Next returns a new pending job, and is used by the Runner interface to fetch new jobs. This method returns a nil job object is there are no pending jobs.

func (*LocalShuffled) Put Uses

func (q *LocalShuffled) Put(j amboy.Job) error

Put adds a job to the queue, and returns errors if the queue hasn't started or if a job with the same ID value already exists.

func (*LocalShuffled) Results Uses

func (q *LocalShuffled) Results() <-chan amboy.Job

Results returns all completed jobs processed by the queue.

func (*LocalShuffled) Runner Uses

func (q *LocalShuffled) Runner() amboy.Runner

Runner returns the embedded runner.

func (*LocalShuffled) SetRunner Uses

func (q *LocalShuffled) SetRunner(r amboy.Runner) error

SetRunner modifies the embedded amboy.Runner instance, and return an error if the current runner has started.

func (*LocalShuffled) Start Uses

func (q *LocalShuffled) Start(ctx context.Context) error

Start takes a context object and starts the embedded Runner instance and the queue's own background dispatching thread. Returns an error if there is no embedded runner, but is safe to call multiple times.

func (*LocalShuffled) Started Uses

func (q *LocalShuffled) Started() bool

Started returns true after the queue has started processing work, and false otherwise. When the queue has terminated (as a result of the starting context's cancellation.

func (*LocalShuffled) Stats Uses

func (q *LocalShuffled) Stats() amboy.QueueStats

Stats returns a standard report on the number of pending, running, and completed jobs processed by the queue.

type LocalUnordered Uses

type LocalUnordered struct {
    // contains filtered or unexported fields
}

LocalUnordered implements a local-only, channel based, queue interface, and it is a good prototype for testing, in addition to non-distributed workloads.

func NewLocalUnordered Uses

func NewLocalUnordered(workers int) *LocalUnordered

NewLocalUnordered is a constructor for the LocalUnordered implementation of the Queue interface. The constructor takes a single argument, for the number of workers the Runner instance should have. The channels have a buffer of at least 8 or 2 times the number of workers up a total of 64.

func (*LocalUnordered) Complete Uses

func (q *LocalUnordered) Complete(ctx context.Context, j amboy.Job)

Complete marks a job as complete, moving it from the in progress state to the completed state. This operation is asynchronous and non-blocking.

func (*LocalUnordered) Get Uses

func (q *LocalUnordered) Get(name string) (amboy.Job, bool)

Get takes a name and returns a completed job.

func (*LocalUnordered) Next Uses

func (q *LocalUnordered) Next(ctx context.Context) amboy.Job

Next returns a job from the Queue. This call is non-blocking. If there are no pending jobs at the moment, then Next returns an error. If all jobs are complete, then Next also returns an error.

func (*LocalUnordered) Put Uses

func (q *LocalUnordered) Put(j amboy.Job) error

Put adds a job to the amboy.Job Queue. Returns an error if the Queue has not yet started or if an amboy.Job with the same name (i.e. amboy.Job.ID()) exists.

func (*LocalUnordered) Results Uses

func (q *LocalUnordered) Results() <-chan amboy.Job

Results provides an iterator of all "result objects," or completed amboy.Job objects. Does not wait for all results to be complete, and is closed when all results have been exhausted, even if there are more results pending. Other implementations may have different semantics for this method.

func (*LocalUnordered) Runner Uses

func (q *LocalUnordered) Runner() amboy.Runner

Runner returns the embedded task runner.

func (*LocalUnordered) SetRunner Uses

func (q *LocalUnordered) SetRunner(r amboy.Runner) error

SetRunner allows users to substitute alternate Runner implementations at run time. This method fails if the runner has started.

func (*LocalUnordered) Start Uses

func (q *LocalUnordered) Start(ctx context.Context) error

Start kicks off the background process that dispatches Jobs. Also starts the embedded runner, and errors if it cannot start. Should handle all errors from this method as fatal errors. If you call start on a queue that has been started, subsequent calls to Start() are a noop, and do not return an error.

func (*LocalUnordered) Started Uses

func (q *LocalUnordered) Started() bool

Started returns true when the Queue has begun dispatching tasks to runners.

func (*LocalUnordered) Stats Uses

func (q *LocalUnordered) Stats() amboy.QueueStats

Stats returns a statistics object with data about the total number of jobs tracked by the queue.

type RemoteUnordered Uses

type RemoteUnordered struct {
    // contains filtered or unexported fields
}

RemoteUnordered are queues that use a Driver as backend for job storage and processing and do not impose any additional ordering beyond what's provided by the driver.

func NewRemoteUnordered Uses

func NewRemoteUnordered(size int) *RemoteUnordered

NewRemoteUnordered returns a queue that has been initialized with a local worker pool Runner instance of the specified size.

func (RemoteUnordered) Complete Uses

func (q RemoteUnordered) Complete(ctx context.Context, j amboy.Job)

Complete takes a context and, asynchronously, marks the job complete, in the queue.

func (RemoteUnordered) Driver Uses

func (q RemoteUnordered) Driver() driver.Driver

Driver provides access to the embedded driver instance which provides access to the Queue's persistence layer. This method is not part of the amboy.Queue interface.

func (RemoteUnordered) Get Uses

func (q RemoteUnordered) Get(name string) (amboy.Job, bool)

Get retrieves a job from the queue's storage. The second value reflects the existence of a job of that name in the queue's storage.

func (*RemoteUnordered) Next Uses

func (q *RemoteUnordered) Next(ctx context.Context) amboy.Job

Next returns a Job from the queue. Returns a nil Job object if the context is canceled. The operation is blocking until an undispatched, unlocked job is available. This operation takes a job lock.

func (RemoteUnordered) Put Uses

func (q RemoteUnordered) Put(j amboy.Job) error

Put adds a Job to the queue. It is generally an error to add the same job to a queue more than once, but this depends on the implementation of the underlying driver.

func (RemoteUnordered) Results Uses

func (q RemoteUnordered) Results() <-chan amboy.Job

Results provides a generator that iterates all completed jobs.

func (RemoteUnordered) Runner Uses

func (q RemoteUnordered) Runner() amboy.Runner

Runner returns (a pointer generally) to the instances' embedded amboy.Runner instance. Typically used to call the runner's close method.

func (RemoteUnordered) SetDriver Uses

func (q RemoteUnordered) SetDriver(d driver.Driver) error

SetDriver allows callers to inject at runtime alternate driver instances. It is an error to change Driver instances after starting a queue. This method is not part of the amboy.Queue interface.

func (RemoteUnordered) SetRunner Uses

func (q RemoteUnordered) SetRunner(r amboy.Runner) error

SetRunner allows callers to inject alternate runner implementations before starting the queue. After the queue is started it is an error to use SetRunner.

func (RemoteUnordered) Start Uses

func (q RemoteUnordered) Start(ctx context.Context) error

Start initiates the job dispatching and prcessing functions of the queue. If the queue is started this is a noop, however, if the driver or runner are not initialized, this operation returns an error. To release the resources created when starting the queue, cancel the context used when starting the queue.

func (RemoteUnordered) Started Uses

func (q RemoteUnordered) Started() bool

Started reports if the queue has begun processing jobs.

func (RemoteUnordered) Stats Uses

func (q RemoteUnordered) Stats() amboy.QueueStats

Stats returns a amboy.QueueStats object that reflects the progress jobs in the queue.

type SimpleRemoteOrdered Uses

type SimpleRemoteOrdered struct {
    // contains filtered or unexported fields
}

SimpleRemoteOrdered queue implements the amboy.Queue interface and uses a driver backend, like the RemoteUnordered queue. However, this implementation evaluates and respects dependencies, unlike the RemoteUnordered implementation which executes all tasks.

The term simple differentiates from a queue that schedules tasks in order based on reported edges, which may be more efficient with more complex dependency graphs. Internally SimpleRemoteOrdered and RemoteUnordred share an implementation *except* for the Next method, which differs in task dispatching strategies.

func NewSimpleRemoteOrdered Uses

func NewSimpleRemoteOrdered(size int) *SimpleRemoteOrdered

NewSimpleRemoteOrdered returns a queue with a configured local runner with the specified number of workers.

func (SimpleRemoteOrdered) Complete Uses

func (q SimpleRemoteOrdered) Complete(ctx context.Context, j amboy.Job)

Complete takes a context and, asynchronously, marks the job complete, in the queue.

func (SimpleRemoteOrdered) Driver Uses

func (q SimpleRemoteOrdered) Driver() driver.Driver

Driver provides access to the embedded driver instance which provides access to the Queue's persistence layer. This method is not part of the amboy.Queue interface.

func (SimpleRemoteOrdered) Get Uses

func (q SimpleRemoteOrdered) Get(name string) (amboy.Job, bool)

Get retrieves a job from the queue's storage. The second value reflects the existence of a job of that name in the queue's storage.

func (*SimpleRemoteOrdered) Next Uses

func (q *SimpleRemoteOrdered) Next(ctx context.Context) amboy.Job

Next contains the unique implementation details of the SimpleRemoteOrdered queue. It fetches a job from the backend, skipping all jobs that are: locked (in progress elsewhere,) marked as "Passed" (all work complete,) and Unresolvable (e.g. stuck). For jobs that are Ready to run, it dispatches them immediately.

For job that are Blocked, Next also skips these jobs *but* in hopes that the next time this job is dispatched its dependencies will be ready. if there is only one Edge reported, blocked will attempt to dispatch the dependent job.

func (SimpleRemoteOrdered) Put Uses

func (q SimpleRemoteOrdered) Put(j amboy.Job) error

Put adds a Job to the queue. It is generally an error to add the same job to a queue more than once, but this depends on the implementation of the underlying driver.

func (SimpleRemoteOrdered) Results Uses

func (q SimpleRemoteOrdered) Results() <-chan amboy.Job

Results provides a generator that iterates all completed jobs.

func (SimpleRemoteOrdered) Runner Uses

func (q SimpleRemoteOrdered) Runner() amboy.Runner

Runner returns (a pointer generally) to the instances' embedded amboy.Runner instance. Typically used to call the runner's close method.

func (SimpleRemoteOrdered) SetDriver Uses

func (q SimpleRemoteOrdered) SetDriver(d driver.Driver) error

SetDriver allows callers to inject at runtime alternate driver instances. It is an error to change Driver instances after starting a queue. This method is not part of the amboy.Queue interface.

func (SimpleRemoteOrdered) SetRunner Uses

func (q SimpleRemoteOrdered) SetRunner(r amboy.Runner) error

SetRunner allows callers to inject alternate runner implementations before starting the queue. After the queue is started it is an error to use SetRunner.

func (SimpleRemoteOrdered) Start Uses

func (q SimpleRemoteOrdered) Start(ctx context.Context) error

Start initiates the job dispatching and prcessing functions of the queue. If the queue is started this is a noop, however, if the driver or runner are not initialized, this operation returns an error. To release the resources created when starting the queue, cancel the context used when starting the queue.

func (SimpleRemoteOrdered) Started Uses

func (q SimpleRemoteOrdered) Started() bool

Started reports if the queue has begun processing jobs.

func (SimpleRemoteOrdered) Stats Uses

func (q SimpleRemoteOrdered) Stats() amboy.QueueStats

Stats returns a amboy.QueueStats object that reflects the progress jobs in the queue.

Directories

PathSynopsis
driverCapped Results Storage

Package queue imports 16 packages (graph) and is imported by 6 packages. Updated 2017-09-14. Refresh now. Tools for package owners.