Documentation ¶
Overview ¶
Package mgojq is a wrapper for mgo that turns MongoDB into a job queue.
Example ¶
// get jobs collection coll := Wrap(db.C("jobs")) // ensure indexes err := coll.EnsureIndexes(7 * 24 * time.Hour) if err != nil { panic(err) } // create a worker pool pool := NewPool(1, 100*time.Millisecond, 1*time.Hour) // register worker pool.Register("Adder", func(c *Collection, j *Job, q <-chan struct{}) error { r := j.Params["a"].(int) + j.Params["b"].(int) c.Complete(j.ID, bson.M{"r": r}) return nil }) // start pool pool.Start(coll) defer pool.Close() // add job id, err := coll.Enqueue("Adder", bson.M{"a": 10, "b": 5}, 0) if err != nil { panic(err) } // wait some time time.Sleep(200 * time.Millisecond) // get job job, err := coll.Fetch(id) if err != nil { panic(err) } fmt.Printf("%s: %d\n", job.Status, job.Result["r"].(int))
Output: completed: 15
Index ¶
- Constants
- type Bulk
- type Collection
- func (c *Collection) Bulk() *Bulk
- func (c *Collection) Cancel(id bson.ObjectId, reason string) error
- func (c *Collection) Complete(id bson.ObjectId, result bson.M) error
- func (c *Collection) Dequeue(names []string, timeout time.Duration) (*Job, error)
- func (c *Collection) Enqueue(name string, params bson.M, delay time.Duration) (bson.ObjectId, error)
- func (c *Collection) EnsureIndexes(removeAfter time.Duration) error
- func (c *Collection) Fail(id bson.ObjectId, error string, delay time.Duration) error
- func (c *Collection) Fetch(id bson.ObjectId) (*Job, error)
- type Job
- type Pool
- type Worker
Examples ¶
Constants ¶
const ( StatusEnqueued = "enqueued" StatusDequeued = "dequeued" StatusCompleted = "completed" StatusFailed = "failed" StatusCancelled = "cancelled" )
The available job statuses.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bulk ¶
type Bulk struct {
// contains filtered or unexported fields
}
A Bulk represents an operation that can be used to enqueue multiple jobs at once. It is a wrapper around the mgo.Bulk type.
func (*Bulk) Enqueue ¶
Enqueue will queue the insert in the bulk operation. The returned id is only valid if the bulk operation run successfully,.
type Collection ¶
type Collection struct {
// contains filtered or unexported fields
}
A Collection represents a job queue enabled collection. It is a wrapper around the mgo.Collection type.
func Wrap ¶
func Wrap(coll *mgo.Collection) *Collection
Wrap will take a mgo.Collection and return a Collection.
func (*Collection) Cancel ¶
func (c *Collection) Cancel(id bson.ObjectId, reason string) error
Cancel will cancel the specified job with the specified reason.
func (*Collection) Complete ¶
Complete will complete the specified job and set the specified result.
func (*Collection) Enqueue ¶
func (c *Collection) Enqueue(name string, params bson.M, delay time.Duration) (bson.ObjectId, error)
Enqueue will enqueue a job using the specified name and params. If a delay is specified the job will not dequeued until the specified time has passed. If not error is returned the returned job id is valid.
func (*Collection) EnsureIndexes ¶
func (c *Collection) EnsureIndexes(removeAfter time.Duration) error
EnsureIndexes will ensure that the necessary indexes have been created. If removeAfter is specified, jobs are automatically removed when their ended timestamp falls behind the specified duration. Warning: this also applies to failed jobs!
Note: It is recommended to create custom indexes that support the exact nature of data and access patterns.
type Job ¶
type Job struct { // The unique id of the job. ID bson.ObjectId `bson:"_id"` // The name of the job. Name string // The params that have been supplied on creation. Params bson.M // The current status of the job. Status string // The time when the job was created. Created time.Time // The time until the job is delayed for execution. Delayed time.Time `bson:",omitempty"` // The time when the job was the last time dequeued. Started time.Time `bson:",omitempty"` // The time when the job was ended (completed, failed or cancelled). Ended time.Time `bson:",omitempty"` // Attempts can be used to determine if a job should be cancelled after too // many attempts. Attempts int // The supplied result submitted during completion. Result bson.M `bson:",omitempty"` // The error from the last failed attempt. Error string `bson:",omitempty"` // The reason that has been submitted when job was cancelled. Reason string `bson:",omitempty"` }
A Job as it is returned by Dequeue.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages multiple goroutines that dequeue jobs.
func (*Pool) Close ¶
func (p *Pool) Close() <-chan struct{}
Close will close the pool and initiate the shutdown procedure. The returned channel will be closed when the pool has shut down.
func (*Pool) Start ¶
func (p *Pool) Start(coll *Collection) <-chan struct{}
Start will start the worker pool. The worker pool will dequeue and process jobs until an error occurs. The returned channel will be closed when the pool is shutting down either because of an error or Close has been called.
type Worker ¶
type Worker func(c *Collection, j *Job, quit <-chan struct{}) error
Worker is a function that processes a job. The function must complete, fail or cancel the job on its own. If the provided channel is closes the worker should immediately finish the job and cancel long running jobs.