queue

package
v0.0.0-...-2a91a1d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2019 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Overview

Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations.

Local Shuffled Queue

The shuffled queue is functionally similar to the LocalUnordered Queue (which is, in fact, a FIFO queue as a result of its implementation); however, the shuffled queue dispatches tasks randomized, using the properties of Go's map type, which is not dependent on insertion order.

Additionally this implementation does not using locking, which may improve performance for some workloads. Intentionally, the implementation retains pointers to all completed tasks, and does not cap the number of pending tasks.

Local Unordered Queue

The unordered queue provides a basic, single-instance, amboy.Queue that runs jobs locally in the context of the application with no persistence layer. The unordered queue does not guarantee any particular execution order, nor does it compute dependences between jobs, but, as an implementation detail, dispatches jobs to workers in a first-in-first-out (e.g. FIFO) model.

By default, LocalUnordered uses the amboy/pool.Workers implementation of amboy.Runner interface.

Index

Constants

View Source
const LockTimeout = 5 * time.Minute

LockTimeout reflects the distributed lock timeout period.

Variables

This section is empty.

Functions

func NewAdaptiveOrderedLocalQueue

func NewAdaptiveOrderedLocalQueue(workers, capacity int) amboy.Queue

NewAdaptiveOrderedLocalQueue provides a queue implementation that stores jobs in memory, and dispatches tasks based on the dependency information.

Use this implementation rather than LocalOrderedQueue when you need to add jobs *after* starting the queue, and when you want to avoid the higher potential overhead of the remote-backed queues.

func NewLocalLimitedSize

func NewLocalLimitedSize(workers, capacity int) amboy.Queue

NewLocalLimitedSize constructs a LocalLimitedSize queue instance with the specified number of workers and capacity.

func NewLocalOrdered

func NewLocalOrdered(workers int) amboy.Queue

NewLocalOrdered constructs an LocalOrdered object. The "workers" argument is passed to a default pool.SimplePool object.

func NewLocalPriorityQueue

func NewLocalPriorityQueue(workers, capacity int) amboy.Queue

NewLocalPriorityQueue constructs a new priority queue instance and initializes a local worker queue with the specified number of worker processes.

func NewLocalQueueGroup

func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)

NewLocalQueueGroup constructs a new local queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.

func NewLocalUnordered

func NewLocalUnordered(workers int) amboy.Queue

NewLocalUnordered is a constructor for a local queue that does not respect dependency information in dispatching queue jobs.

All jobs are stored in memory and while there is a buffer of pending work, in general the number of buffered jobs is equal to twice the size of the worker pool, up to 64 jobs.

func NewMgoRemoteSingleQueueGroup

func NewMgoRemoteSingleQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, session *mgo.Session, mdbopts MongoDBOptions) (amboy.QueueGroup, error)

NewMgoRemoteSingleQueueGroup constructs a new remote queue group where all queues are stored in a single collection, using the legacy driver. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.

func NewMongoRemoteQueueGroup

func NewMongoRemoteQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, client *mongo.Client, mdbopts MongoDBOptions) (amboy.QueueGroup, error)

NewMongoRemoteQueueGroup constructs a new remote queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.

The MongoRemoteQueue group creats a new collection for every queue, unlike the other remote queue group implementations. This is probably most viable for lower volume workloads; however, the caching mechanism may be more responsive in some situations.

func NewMongoRemoteSingleQueueGroup

func NewMongoRemoteSingleQueueGroup(ctx context.Context, opts RemoteQueueGroupOptions, client *mongo.Client, mdbopts MongoDBOptions) (amboy.QueueGroup, error)

NewMongoRemoteSingleQueueGroup constructs a new remote queue group. If ttl is 0, the queues will not be TTLed except when the client explicitly calls Prune.

func NewSQSFifoQueue

func NewSQSFifoQueue(queueName string, workers int) (amboy.Queue, error)

NewSQSFifoQueue constructs a AWS SQS backed Queue implementation. This queue, generally is ephemeral: tasks are removed from the queue, and therefore may not handle jobs across restarts.

func NewShuffledLocal

func NewShuffledLocal(workers, capacity int) amboy.Queue

NewShuffledLocal provides a queue implementation that shuffles the order of jobs, relative the insertion order.

Types

type Driver

Driver describes the interface between a queue and an out of process persistence layer, like a database.

func NewInternalDriver

func NewInternalDriver() Driver

NewInternalDriver creates a local persistence layer object.

func NewMgoDriver

func NewMgoDriver(name string, opts MongoDBOptions) Driver

NewMgoDriver creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection

func NewMgoGroupDriver

func NewMgoGroupDriver(name string, opts MongoDBOptions, group string) Driver

NewMgoGroupDriver creates a driver object given a name, which serves as a prefix for collection names, and a MongoDB connection

func NewMongoDriver

func NewMongoDriver(name string, opts MongoDBOptions) Driver

NewMongoDriver constructs a MongoDB backed queue driver implementation using the go.mongodb.org/mongo-driver as the database interface.

func NewMongoGroupDriver

func NewMongoGroupDriver(name string, opts MongoDBOptions, group string) Driver

NewMongoGroupDriver is similar to the MongoDriver, except it prefixes job ids with a prefix and adds the group field to the documents in the database which makes it possible to manage distinct queues with a single MongoDB collection.

func NewPriorityDriver

func NewPriorityDriver() Driver

NewPriorityDriver returns an initialized Priority Driver instances.

func OpenNewMgoDriver

func OpenNewMgoDriver(ctx context.Context, name string, opts MongoDBOptions, session *mgo.Session) (Driver, error)

OpenNewMgoDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMgo() and calling *MongoDB.Open().

func OpenNewMgoGroupDriver

func OpenNewMgoGroupDriver(ctx context.Context, name string, opts MongoDBOptions, group string, session *mgo.Session) (Driver, error)

OpenNewMgoGroupDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMgoGroup() and calling *MongoDB.Open().

func OpenNewMongoDriver

func OpenNewMongoDriver(ctx context.Context, name string, opts MongoDBOptions, client *mongo.Client) (Driver, error)

OpenNewMongoDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoDriver() and calling driver.Open().

func OpenNewMongoGroupDriver

func OpenNewMongoGroupDriver(ctx context.Context, name string, opts MongoDBOptions, group string, client *mongo.Client) (Driver, error)

OpenNewMongoGroupDriver constructs and opens a new MongoDB driver instance using the specified session. It is equivalent to calling NewMongoGroupDriver() and calling driver.Open().

type GroupCache

type GroupCache interface {
	Set(string, amboy.Queue, time.Duration) error
	Get(string) amboy.Queue
	Remove(context.Context, string) error
	Prune(context.Context) error
	Close(context.Context) error
	Names() []string
	Len() int
}

GroupCache provides a common mechanism for managing collections of queues, for use in specific group cache situations

func NewCacheWithCleanupHook

func NewCacheWithCleanupHook(ttl time.Duration, hook func(ctx context.Context, id string) error) GroupCache

NewCacheWithCleanupHook defines a cache but allows implementations to add additional cleanup logic to the prune and Close operations.

func NewGroupCache

func NewGroupCache(ttl time.Duration) GroupCache

NewGroupCache produces a GroupCache implementation that supports a default TTL setting, and supports cloning and closing operations.

type LocalQueueGroupOptions

type LocalQueueGroupOptions struct {
	Constructor func(ctx context.Context) (amboy.Queue, error)
	TTL         time.Duration
}

LocalQueueGroupOptions describe options passed to NewLocalQueueGroup.

type LockManager

type LockManager interface {
	Lock(context.Context, amboy.Job) error
	Unlock(context.Context, amboy.Job) error
}

LockManager describes the component of the Driver interface that handles job mutexing.

func NewLockManager

func NewLockManager(ctx context.Context, d Driver) LockManager

NewLockManager configures a Lock manager for use in Driver implementations. This operation does *not* start the background thread. The name *must* be unique per driver/queue combination, to ensure that each driver/queue can have exclusive locks over jobs.

type MongoDBOptions

type MongoDBOptions struct {
	URI             string
	DB              string
	Priority        bool
	CheckWaitUntil  bool
	SkipIndexBuilds bool
	Format          amboy.Format
	WaitInterval    time.Duration
}

MongoDBOptions is a struct passed to the NewMgo constructor to communicate mgoDriver specific settings about the driver's behavior and operation.

func DefaultMongoDBOptions

func DefaultMongoDBOptions() MongoDBOptions

DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database, and *not* using priority ordering of jobs.

type Remote

type Remote interface {
	amboy.Queue
	SetDriver(Driver) error
	Driver() Driver
}

Remote queues extend the queue interface to allow a pluggable-storage backend, or "driver"

func NewRemoteUnordered

func NewRemoteUnordered(size int) Remote

NewRemoteUnordered returns a queue that has been initialized with a local worker pool Runner instance of the specified size.

func NewSimpleRemoteOrdered

func NewSimpleRemoteOrdered(size int) Remote

NewSimpleRemoteOrdered returns a queue with a configured local runner with the specified number of workers.

type RemoteQueueGroupOptions

type RemoteQueueGroupOptions struct {
	// Prefix is a string prepended to the queue collections.
	Prefix string

	// Ordered controls if an order-respecting queue will be
	// created, while default workers sets the defualt number of
	// workers new queues will have if the WorkerPoolSize function
	// is not set.
	Ordered        bool
	DefaultWorkers int

	// WorkerPoolSize determines how many works will be allocated
	// to each queue, based on the queue ID passed to it.
	WorkerPoolSize func(string) int

	// PruneFrequency is how often Prune runs by default.
	PruneFrequency time.Duration

	// BackgroundCreateFrequency is how often the background queue
	// creation runs, in the case that queues may be created in
	// the background without
	BackgroundCreateFrequency time.Duration

	// TTL is how old the oldest task in the queue must be for the collection to be pruned.
	TTL time.Duration
}

RemoteQueueGroupOptions describe options passed to NewRemoteQueueGroup.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL