que-go: github.com/bgentry/que-go

package que

import "github.com/bgentry/que-go"

Package que-go is a fully interoperable Golang port of Chris Hanks' Ruby Que queueing library for PostgreSQL. Que uses PostgreSQL's advisory locks for speed and reliability. See the original Que documentation for more details: https://github.com/chanks/que

Because que-go is an interoperable port of Que, you can enqueue jobs in Ruby (e.g. from a Rails app) and write your workers in Go. Or, if you only want to write a limited set of jobs in Go, you can leave most of your workers in Ruby and add a few Go workers on a different queue name.

PostgreSQL Driver pgx

Instead of using database/sql and the more popular pq PostgreSQL driver, this package uses the pgx driver: https://github.com/jackc/pgx

Because Que uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock.

Pq and the built-in database/sql interfaces do not offer this functionality, so we'd have to implement our own connection pool. Fortunately, pgx already has a perfectly usable one built for us. Even better, it offers better performance than pq due largely to its use of binary encoding.

Prepared Statements

que-go relies on prepared statements for performance. As of now these have to be initialized manually on your connection pool like so:

pgxpool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
    ConnConfig:   pgxcfg,
    AfterConnect: que.PrepareStatements,
})

If you have suggestions on how to cleanly do this automatically, please open an issue!

Usage

Here is a complete example showing worker setup and two jobs enqueued, one with a delay:

type printNameArgs struct {
    Name string
}

printName := func(j *que.Job) error {
    var args printNameArgs
    if err := json.Unmarshal(j.Args, &args); err != nil {
        return err
    }
    fmt.Printf("Hello %s!\n", args.Name)
    return nil
}

pgxcfg, err := pgx.ParseURI(os.Getenv("DATABASE_URL"))
if err != nil {
    log.Fatal(err)
}

pgxpool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
    ConnConfig:   pgxcfg,
    AfterConnect: que.PrepareStatements,
})
if err != nil {
    log.Fatal(err)
}
defer pgxpool.Close()

qc := que.NewClient(pgxpool)
wm := que.WorkMap{
    "PrintName": printName,
}
workers := que.NewWorkerPool(qc, wm, 2) // create a pool w/ 2 workers
go workers.Start() // work jobs in another goroutine

args, err := json.Marshal(printNameArgs{Name: "bgentry"})
if err != nil {
    log.Fatal(err)
}

j := &que.Job{
    Type:  "PrintName",
    Args:  args,
}
if err := qc.Enqueue(j); err != nil {
    log.Fatal(err)
}

j = &que.Job{ // reuse j; a second := in the same scope would not compile
    Type:  "PrintName",
    RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
    Args:  args,
}
if err := qc.Enqueue(j); err != nil {
    log.Fatal(err)
}

time.Sleep(35 * time.Second) // wait long enough for the delayed job to run

workers.Shutdown()

Index

Package Files

doc.go que.go sql.go util.go worker.go

Variables

var ErrAgain = errors.New("maximum number of LockJob attempts reached")

ErrAgain is returned by LockJob if a job could not be retrieved from the queue after several attempts because of concurrently running transactions. This error should not be returned unless the queue is under extremely heavy concurrency.

var ErrMissingType = errors.New("job type must be specified")

ErrMissingType is returned when you attempt to enqueue a job with no Type specified.

func PrepareStatements

func PrepareStatements(conn *pgx.Conn) error

PrepareStatements prepares the required statements to run que on the provided *pgx.Conn. Typically it is used as an AfterConnect func for a *pgx.ConnPool. Every connection used by que must have the statements prepared ahead of time.

func PrepareStatementsWithPreparer

func PrepareStatementsWithPreparer(p Preparer) error

PrepareStatementsWithPreparer prepares the required statements to run que on the provided Preparer. This func can be used to prepare statements on a *pgx.ConnPool after it is created, or on a *pgx.Tx. Every connection used by que must have the statements prepared ahead of time.

type Client

type Client struct {
    // contains filtered or unexported fields
}

Client is a Que client that can add jobs to the queue and remove jobs from the queue.

func NewClient

func NewClient(pool *pgx.ConnPool) *Client

NewClient creates a new Client that uses the pgx pool.

func (*Client) Enqueue

func (c *Client) Enqueue(j *Job) error

Enqueue adds a job to the queue.

func (*Client) EnqueueInTx

func (c *Client) EnqueueInTx(j *Job, tx *pgx.Tx) error

EnqueueInTx adds a job to the queue within the scope of the transaction tx. This allows you to guarantee that an enqueued job will either be committed or rolled back atomically with other changes in the course of this transaction.

It is the caller's responsibility to Commit or Rollback the transaction after this function is called.
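The commit-or-rollback responsibility described above can be sketched as follows. So that it runs without a database, transaction and fakeTx are hypothetical stand-ins for the Commit and Rollback methods of *pgx.Tx, and withEnqueue is an illustrative helper that is not part of que-go.

```go
package main

import (
	"errors"
	"fmt"
)

// errEnqueue simulates a failed enqueue in this sketch.
var errEnqueue = errors.New("enqueue failed")

// transaction is a hypothetical stand-in for the Commit and Rollback
// methods of *pgx.Tx.
type transaction interface {
	Commit() error
	Rollback() error
}

// withEnqueue runs change and then enqueue, both of which are expected to
// use the same transaction. It commits only if both succeed; otherwise it
// rolls back, so neither the change nor the job is persisted.
func withEnqueue(tx transaction, change, enqueue func() error) error {
	if err := change(); err != nil {
		tx.Rollback()
		return err
	}
	if err := enqueue(); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit()
}

// fakeTx records which of Commit or Rollback was called.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

func main() {
	tx := &fakeTx{}
	err := withEnqueue(tx,
		func() error { return nil },        // the other changes succeed
		func() error { return errEnqueue }, // the enqueue fails
	)
	fmt.Println(err, tx.committed, tx.rolledBack)
}
```

With a real transaction, the enqueue callback would presumably wrap a call such as qc.EnqueueInTx(j, tx), using the qc and j values from the Usage example.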

func (*Client) LockJob

func (c *Client) LockJob(queue string) (*Job, error)

LockJob attempts to retrieve a Job from the database in the specified queue. If a job is found, a session-level Postgres advisory lock is created for the Job's ID. If no job is found, nil will be returned instead of an error.

Because Que uses session-level advisory locks, we have to hold the same connection throughout the process of getting a job, working it, deleting it, and removing the lock.

After the Job has been worked, call Delete() or Error() on it to record the outcome, and then call Done() to remove the lock and return the database connection to the pool.
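The lock, work, and release sequence can be sketched as follows. So that it runs without a database, lockedJob and fakeJob are hypothetical stand-ins for the Delete, Error, and Done methods of *que.Job, and workLocked is an illustrative helper rather than part of que-go. A real caller would first check that LockJob returned a non-nil Job.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom simulates a failing job in this sketch.
var errBoom = errors.New("boom")

// lockedJob is a hypothetical stand-in for the methods of *que.Job that
// finish a job which has already been locked.
type lockedJob interface {
	Delete() error
	Error(msg string) error
	Done()
}

// workLocked runs work for a locked job. On success the job is deleted; on
// failure the error message is saved on the job. In both cases Done runs
// last, releasing the lock and the connection.
func workLocked(j lockedJob, work func() error) error {
	defer j.Done()
	if err := work(); err != nil {
		return j.Error(err.Error())
	}
	return j.Delete()
}

// fakeJob records the order in which its methods are called.
type fakeJob struct{ calls []string }

func (f *fakeJob) Delete() error          { f.calls = append(f.calls, "Delete"); return nil }
func (f *fakeJob) Error(msg string) error { f.calls = append(f.calls, "Error: "+msg); return nil }
func (f *fakeJob) Done()                  { f.calls = append(f.calls, "Done") }

func main() {
	ok := &fakeJob{}
	if err := workLocked(ok, func() error { return nil }); err != nil {
		fmt.Println("unexpected:", err)
	}
	fmt.Println(ok.calls)

	failed := &fakeJob{}
	if err := workLocked(failed, func() error { return errBoom }); err != nil {
		fmt.Println("unexpected:", err)
	}
	fmt.Println(failed.calls)
}
```

Deferring Done keeps the connection from leaking if the work function returns early.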

type Job

type Job struct {
    // ID is the unique database ID of the Job. It is ignored on job creation.
    ID  int64

    // Queue is the name of the queue. It defaults to the empty queue "".
    Queue string

    // Priority is the priority of the Job. The default priority is 100, and a
    // lower number means a higher priority. A priority of 5 would be very
    // important.
    Priority int16

    // RunAt is the time that this job should be executed. It defaults to now(),
    // meaning the job will execute immediately. Set it to a value in the future
    // to delay a job's execution.
    RunAt time.Time

    // Type corresponds to the Ruby job_class. If you are interoperating with
    // Ruby, you should pick suitable Ruby class names (such as MyJob).
    Type string

    // Args must be the bytes of a valid JSON value, such as an encoded object.
    Args []byte

    // ErrorCount is the number of times this job has attempted to run, but
    // failed with an error. It is ignored on job creation.
    ErrorCount int32

    // LastError is the error message or stack trace from the last time the job
    // failed. It is ignored on job creation.
    LastError pgtype.Text
    // contains filtered or unexported fields
}

Job is a single unit of work for Que to perform.
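Since Args must hold valid JSON, it can help to validate hand-built payloads before enqueueing them. The following is a minimal sketch using only encoding/json; checkArgs is a hypothetical helper, not part of que-go, and printNameArgs mirrors the struct from the Usage example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// printNameArgs mirrors the argument struct from the Usage example.
type printNameArgs struct {
	Name string
}

// checkArgs is a hypothetical pre-flight check that reports whether raw
// could be used as Job.Args, that is, whether it is valid JSON.
func checkArgs(raw []byte) error {
	if !json.Valid(raw) {
		return errors.New("args are not valid JSON")
	}
	return nil
}

func main() {
	// Output of json.Marshal is valid JSON by construction.
	encoded, err := json.Marshal(printNameArgs{Name: "bgentry"})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(encoded), checkArgs(encoded))

	// A bare, unquoted word is not valid JSON.
	fmt.Println(checkArgs([]byte("bgentry")))
}
```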

func (*Job) Conn

func (j *Job) Conn() *pgx.Conn

Conn returns the pgx connection that this job is locked to. You may initiate transactions on this connection or use it as you please until you call Done(). At that point, this conn will be returned to the pool and it is unsafe to keep using it. This function will return nil if the Job's connection has already been released with Done().

func (*Job) Delete

func (j *Job) Delete() error

Delete marks this job as complete by deleting it from the database.

You must also later call Done() to return this job's database connection to the pool.

func (*Job) Done

func (j *Job) Done()

Done releases the Postgres advisory lock on the job and returns the database connection to the pool.

func (*Job) Error

func (j *Job) Error(msg string) error

Error marks the job as failed and schedules it to be reworked. An error message or backtrace can be provided as msg, which will be saved on the job. It will also increase the error count.

You must also later call Done() to return this job's database connection to the pool.

type Preparer

type Preparer interface {
    Prepare(name, sql string) (*pgx.PreparedStatement, error)
}

Preparer defines the interface for types that support preparing statements. This includes *pgx.ConnPool, *pgx.Conn, and *pgx.Tx.

type WorkFunc

type WorkFunc func(j *Job) error

WorkFunc is a function that performs a Job. If an error is returned, the job is reenqueued with exponential backoff.
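This page does not state the backoff schedule. As an illustration only, the sketch below assumes the default retry interval of the Ruby Que library, error_count^4 + 3 seconds; retryDelay is a hypothetical helper, and the schedule que-go actually applies should be checked against its source.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay is a hypothetical helper that returns the wait before the next
// attempt after errorCount failures, assuming the schedule
// errorCount^4 + 3 seconds (an assumption, not confirmed by this page).
func retryDelay(errorCount int) time.Duration {
	n := int64(errorCount)
	return time.Duration(n*n*n*n+3) * time.Second
}

func main() {
	for count := 1; count <= 5; count++ {
		fmt.Printf("failure %d: retry in %v\n", count, retryDelay(count))
	}
}
```

Under this assumption the delay grows from a few seconds after the first failure to more than ten minutes after the fifth.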

type WorkMap

type WorkMap map[string]WorkFunc

WorkMap is a map of Job names to WorkFuncs that are used to perform Jobs of a given type.
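The lookup a worker performs against a WorkMap can be sketched with local types. Here job, workFunc, workMap, and dispatch are hypothetical stand-ins that mirror que.Job, que.WorkFunc, and que.WorkMap so the sketch runs on its own. As the NewWorker documentation notes, a job type that is not registered is treated as an error.

```go
package main

import "fmt"

// job, workFunc, and workMap are hypothetical stand-ins mirroring the shapes
// of que.Job, que.WorkFunc, and que.WorkMap.
type job struct{ Type string }
type workFunc func(j *job) error
type workMap map[string]workFunc

// dispatch looks up the handler registered for the job's Type and runs it.
// An unregistered type is reported as an error rather than silently skipped.
func dispatch(wm workMap, j *job) error {
	wf, ok := wm[j.Type]
	if !ok {
		return fmt.Errorf("no WorkFunc registered for job type %q", j.Type)
	}
	return wf(j)
}

func main() {
	wm := workMap{
		"PrintName": func(j *job) error {
			fmt.Println("working", j.Type)
			return nil
		},
	}
	fmt.Println(dispatch(wm, &job{Type: "PrintName"}))
	fmt.Println(dispatch(wm, &job{Type: "SendEmail"}))
}
```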

type Worker

type Worker struct {
    // Interval is the amount of time that this Worker should sleep before trying
    // to find another Job.
    Interval time.Duration

    // Queue is the name of the queue to pull Jobs off of. The default value, "",
    // is usable and is the default for both que-go and the ruby que library.
    Queue string
    // contains filtered or unexported fields
}

Worker is a single worker that pulls jobs off the specified Queue. If no Job is found, the Worker will sleep for the duration given by Interval.

func NewWorker

func NewWorker(c *Client, m WorkMap) *Worker

NewWorker returns a Worker that fetches Jobs from the Client and executes them using WorkMap. If the type of Job is not registered in the WorkMap, it's considered an error and the job is re-enqueued with a backoff.

Workers default to an Interval of 5 seconds, which can be overridden by setting the environment variable QUE_WAKE_INTERVAL. The default Queue is the nameless queue "", which can be overridden by setting QUE_QUEUE. Either of these settings can be changed on the returned Worker before it is started with Work().
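The format of QUE_WAKE_INTERVAL is not stated here. The sketch below assumes it is a whole number of seconds and shows one way such an override with a fallback could be read; wakeInterval is a hypothetical helper, not the function que-go uses.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// wakeInterval is a hypothetical helper that parses raw as a positive whole
// number of seconds (an assumed format) and falls back to def when raw is
// empty, not an integer, or not positive.
func wakeInterval(raw string, def time.Duration) time.Duration {
	if n, err := strconv.Atoi(raw); err == nil && n > 0 {
		return time.Duration(n) * time.Second
	}
	return def
}

func main() {
	// 5 seconds is the documented default Interval.
	interval := wakeInterval(os.Getenv("QUE_WAKE_INTERVAL"), 5*time.Second)
	fmt.Println("worker interval:", interval)
}
```

A value computed this way could also be assigned to the Interval field of the returned Worker before Work() is called.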

func (*Worker) Shutdown

func (w *Worker) Shutdown()

Shutdown tells the worker to finish processing its current job and then stop. There is currently no timeout for in-progress jobs. This function blocks until the Worker has stopped working. It should only be called on an active Worker.

func (*Worker) Work

func (w *Worker) Work()

Work pulls jobs off the Worker's Queue at its Interval. This function only returns after Shutdown() is called, so it should be run in its own goroutine.

func (*Worker) WorkOne

func (w *Worker) WorkOne() (didWork bool)

type WorkerPool

type WorkerPool struct {
    WorkMap  WorkMap
    Interval time.Duration
    Queue    string
    // contains filtered or unexported fields
}

WorkerPool is a pool of Workers, each working jobs from the queue Queue at the specified Interval using the WorkMap.

func NewWorkerPool

func NewWorkerPool(c *Client, wm WorkMap, count int) *WorkerPool

NewWorkerPool creates a new WorkerPool with count workers using the Client c.

func (*WorkerPool) Shutdown

func (w *WorkerPool) Shutdown()

Shutdown sends a Shutdown signal to each of the Workers in the WorkerPool and waits for them all to finish shutting down.

func (*WorkerPool) Start

func (w *WorkerPool) Start()

Start starts all of the Workers in the WorkerPool.

Package que imports 11 packages and is imported by 9 packages. Updated 2018-12-10.