mgojq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2019 License: MIT Imports: 4 Imported by: 0

README

mgojq

Build Status Coverage Status GoDoc Release Go Report Card

A wrapper for mgo that turns MongoDB into a job queue.

Example

// get jobs collection
coll := Wrap(db.C("jobs"))

// ensure indexes
err := coll.EnsureIndexes(7 * 24 * time.Hour)
if err != nil {
    panic(err)
}

// create a worker pool
pool := NewPool(1, 100*time.Millisecond, 1*time.Hour)

// register worker
pool.Register("Adder", func(c *Collection, j *Job, q <-chan struct{}) error {
    r := j.Params["a"].(int) + j.Params["b"].(int)
    c.Complete(j.ID, bson.M{"r": r})
    return nil
})

// start pool
pool.Start(coll)
defer pool.Close()

// add job
id, err := coll.Enqueue("Adder", bson.M{"a": 10, "b": 5}, 0)
if err != nil {
    panic(err)
}

// wait some time
time.Sleep(200 * time.Millisecond)

// get job
job, err := coll.Fetch(id)
if err != nil {
    panic(err)
}

fmt.Printf("%s: %d\n", job.Status, job.Result["r"].(int))

// Output:
// completed: 15

Documentation

Overview

Package mgojq is a wrapper for mgo that turns MongoDB into a job queue.

Example
// get jobs collection
coll := Wrap(db.C("jobs"))

// ensure indexes
err := coll.EnsureIndexes(7 * 24 * time.Hour)
if err != nil {
	panic(err)
}

// create a worker pool
pool := NewPool(1, 100*time.Millisecond, 1*time.Hour)

// register worker
pool.Register("Adder", func(c *Collection, j *Job, q <-chan struct{}) error {
	r := j.Params["a"].(int) + j.Params["b"].(int)
	c.Complete(j.ID, bson.M{"r": r})
	return nil
})

// start pool
pool.Start(coll)
defer pool.Close()

// add job
id, err := coll.Enqueue("Adder", bson.M{"a": 10, "b": 5}, 0)
if err != nil {
	panic(err)
}

// wait some time
time.Sleep(200 * time.Millisecond)

// get job
job, err := coll.Fetch(id)
if err != nil {
	panic(err)
}

fmt.Printf("%s: %d\n", job.Status, job.Result["r"].(int))
Output:

completed: 15

Index

Examples

Constants

View Source
const (
	StatusEnqueued  = "enqueued"
	StatusDequeued  = "dequeued"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
	StatusCancelled = "cancelled"
)

The available job statuses.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bulk

type Bulk struct {
	// contains filtered or unexported fields
}

A Bulk represents an operation that can be used to enqueue multiple jobs at once. It is a wrapper around the mgo.Bulk type.

func (*Bulk) Cancel

func (b *Bulk) Cancel(id bson.ObjectId, reason string)

Cancel will queue the cancel in the bulk operation.

func (*Bulk) Complete

func (b *Bulk) Complete(id bson.ObjectId, result bson.M)

Complete will queue the complete in the bulk operation.

func (*Bulk) Enqueue

func (b *Bulk) Enqueue(name string, params bson.M, delay time.Duration) bson.ObjectId

Enqueue will queue the insert in the bulk operation. The returned id is only valid if the bulk operation run successfully,.

func (*Bulk) Fail

func (b *Bulk) Fail(id bson.ObjectId, error string, delay time.Duration)

Fail will queue the fail in the bulk operation.

func (*Bulk) Run

func (b *Bulk) Run() error

Run will insert all queued insert operations.

type Collection

type Collection struct {
	// contains filtered or unexported fields
}

A Collection represents a job queue enabled collection. It is a wrapper around the mgo.Collection type.

func Wrap

func Wrap(coll *mgo.Collection) *Collection

Wrap will take a mgo.Collection and return a Collection.

func (*Collection) Bulk

func (c *Collection) Bulk() *Bulk

Bulk will return a new bulk operation.

func (*Collection) Cancel

func (c *Collection) Cancel(id bson.ObjectId, reason string) error

Cancel will cancel the specified job with the specified reason.

func (*Collection) Complete

func (c *Collection) Complete(id bson.ObjectId, result bson.M) error

Complete will complete the specified job and set the specified result.

func (*Collection) Dequeue

func (c *Collection) Dequeue(names []string, timeout time.Duration) (*Job, error)

Dequeue will try to dequeue a job.

func (*Collection) Enqueue

func (c *Collection) Enqueue(name string, params bson.M, delay time.Duration) (bson.ObjectId, error)

Enqueue will enqueue a job using the specified name and params. If a delay is specified the job will not dequeued until the specified time has passed. If not error is returned the returned job id is valid.

func (*Collection) EnsureIndexes

func (c *Collection) EnsureIndexes(removeAfter time.Duration) error

EnsureIndexes will ensure that the necessary indexes have been created. If removeAfter is specified, jobs are automatically removed when their ended timestamp falls behind the specified duration. Warning: this also applies to failed jobs!

Note: It is recommended to create custom indexes that support the exact nature of data and access patterns.

func (*Collection) Fail

func (c *Collection) Fail(id bson.ObjectId, error string, delay time.Duration) error

Fail will fail the specified job with the specified error. Delay can be set enforce a delay until the job can be dequeued again.

func (*Collection) Fetch

func (c *Collection) Fetch(id bson.ObjectId) (*Job, error)

Fetch will load the job with the specified id.

type Job

type Job struct {
	// The unique id of the job.
	ID bson.ObjectId `bson:"_id"`

	// The name of the job.
	Name string

	// The params that have been supplied on creation.
	Params bson.M

	// The current status of the job.
	Status string

	// The time when the job was created.
	Created time.Time

	// The time until the job is delayed for execution.
	Delayed time.Time `bson:",omitempty"`

	// The time when the job was the last time dequeued.
	Started time.Time `bson:",omitempty"`

	// The time when the job was ended (completed, failed or cancelled).
	Ended time.Time `bson:",omitempty"`

	// Attempts can be used to determine if a job should be cancelled after too
	// many attempts.
	Attempts int

	// The supplied result submitted during completion.
	Result bson.M `bson:",omitempty"`

	// The error from the last failed attempt.
	Error string `bson:",omitempty"`

	// The reason that has been submitted when job was cancelled.
	Reason string `bson:",omitempty"`
}

A Job as it is returned by Dequeue.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages multiple goroutines that dequeue jobs.

func NewPool

func NewPool(size int, interval, timeout time.Duration) *Pool

NewPool will create a new pool.

func (*Pool) Close

func (p *Pool) Close() <-chan struct{}

Close will close the pool and initiate the shutdown procedure. The returned channel will be closed when the pool has shut down.

func (*Pool) Register

func (p *Pool) Register(name string, worker Worker)

Register will register the specified worker for the specified job name.

func (*Pool) Start

func (p *Pool) Start(coll *Collection) <-chan struct{}

Start will start the worker pool. The worker pool will dequeue and process jobs until an error occurs. The returned channel will be closed when the pool is shutting down either because of an error or Close has been called.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait will wait until the pool has shut down and will return the error.

type Worker

type Worker func(c *Collection, j *Job, quit <-chan struct{}) error

Worker is a function that processes a job. The function must complete, fail or cancel the job on its own. If the provided channel is closes the worker should immediately finish the job and cancel long running jobs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL