disque

Version: v0.0.0-...-ae073e1
Published: Mar 22, 2019 License: MIT Imports: 5 Imported by: 2

README

Golang client for Disque, the Persistent Distributed Job Priority Queue.

  • Persistent - Jobs can be either in-memory or persisted on disk[1].
  • Distributed - Disque pool. Multiple producers, multiple consumers.
  • Job Priority Queue - Multiple queues. Consumers Get() from higher priority queues first.
  • Fault tolerant - Jobs must be replicated to N nodes before Add() returns. Consumer must Ack() the job within a specified RetryAfter timeout or the job will be re-queued automatically.

Note: The examples below ignore error handling for readability.

Producer

package main

import (
    "github.com/goware/disque"
)

func main() {
    // Connect to Disque pool.
    jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
    defer jobs.Close()

    // Enqueue three jobs with different priorities.
    // The payloads are placeholder strings; Add() takes the job data as a string.
    jobs.Add("data1", "high")
    jobs.Add("data2", "low")
    job3, _ := jobs.Add("data3", "urgent")

    // Block until job3 is done (ACKed by a consumer).
    jobs.Wait(job3)
}

Consumer (worker)

package main

import (
    "github.com/goware/disque"
)

// Process stands in for application-specific work; replace it with your own.
func Process(data string) error { return nil }

func main() {
    // Connect to Disque pool.
    jobs, _ := disque.New("127.0.0.1:7711") // Accepts more arguments.
    defer jobs.Close()

    for {
        // Get job from highest priority queue possible. Blocks by default.
        job, _ := jobs.Get("urgent", "high", "low") // Left-to-right priority.

        // Do some hard work with the job data.
        if err := Process(job.Data); err != nil {
            // Failed. Re-queue the job and skip the Ack below.
            jobs.Nack(job)
            continue
        }

        // Acknowledge (dequeue) the job.
        jobs.Ack(job)
    }
}

Default configuration

Config option   Default value   Description
Timeout         0               Block on each operation until it returns.
Replicate       0               Job doesn't need to be replicated before Add() returns.
Delay           0               Job is enqueued immediately.
RetryAfter      0               Don't re-queue job automatically.
TTL             0               Job lives until it's ACKed.
MaxLen          0               Unlimited queue.

Custom configuration

jobs, _ := disque.New("127.0.0.1:7711")

config := disque.Config{
    Timeout:    time.Second,    // Each operation fails after 1s timeout elapses.
    Replicate:  2,              // Replicates job to 2+ nodes before Add() returns.
    Delay:      time.Hour,      // Schedules the job (enqueues after 1h).
    RetryAfter: time.Minute,    // Re-queues the job after 1min of not being ACKed.
    TTL:        24 * time.Hour, // Removes the job from the queue after one day.
    MaxLen:     1000,           // Fails if there are 1000+ jobs in the queue.
}

// Apply globally.
jobs.Use(config)

// Apply to a single operation.
jobs.With(config).Add(data, "queue")

// Apply single option to a single operation.
jobs.Timeout(time.Second).Get("queue", "queue2")
jobs.MaxLen(1000).RetryAfter(time.Minute).Add(data, "queue")
jobs.TTL(24 * time.Hour).Add(data, "queue")

License

Disque is licensed under the MIT License.

Documentation

Types

type Config

type Config struct {
	Timeout    time.Duration // Each operation fails after a specified timeout elapses. Blocks by default.
	Replicate  int           // Replicate job to at least N nodes before Add() returns.
	Delay      time.Duration // Schedule the job on Add() - enqueue after a specified time.
	RetryAfter time.Duration // Re-queue job after a specified time elapses between Get() and Ack().
	TTL        time.Duration // Remove the job from the queue after a specified time.
	MaxLen     int           // Fail on Add() if there are more than N jobs in the queue.
}

Config represents Disque configuration for certain operations.

type Job

type Job struct {
	ID                   string
	Data                 string
	Queue                string
	State                string
	TTL                  time.Duration
	Delay                time.Duration
	Retry                time.Duration
	CreatedAt            time.Time
	Replication          int64
	Nacks                int64
	AdditionalDeliveries int64
}

Job represents a job/message returned from a Disque server.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a Redis connection to a Disque pool with a certain Disque configuration.

func New

func New(address string, extra ...string) (*Pool, error)

New creates a new connection to a given Disque Pool.

func NewWithPool

func NewWithPool(pool *redis.Pool) *Pool

func (*Pool) Ack

func (pool *Pool) Ack(job *Job) error

Ack acknowledges (dequeues/removes) a job from its queue.

func (*Pool) ActiveLen

func (pool *Pool) ActiveLen(queue string) (int, error)

ActiveLen returns the number of active jobs taken from a given queue.

func (*Pool) Add

func (pool *Pool) Add(data string, queue string) (*Job, error)

Add enqueues a new job with the specified data to a given queue.

func (*Pool) Close

func (pool *Pool) Close() error

Close closes the connection to a Disque Pool.

func (*Pool) Delay

func (pool *Pool) Delay(delay time.Duration) *Pool

Delay option applied to a single operation.

func (*Pool) Fetch

func (pool *Pool) Fetch(ID string) (*Job, error)

Fetch finds a job by its ID and returns its details.

func (*Pool) Get

func (pool *Pool) Get(queues ...string) (*Job, error)

Get returns the first available job from the highest-priority queue possible (left-to-right priority).
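Left-to-right priority means the order of the queue names is the priority order: the first listed queue that has a job waiting wins. A small in-memory model of that selection rule, with a made-up pickQueue helper and a plain map standing in for the server's queues:

```go
package main

import "fmt"

// pickQueue returns the first queue, in the order given, that has a job
// waiting, together with that job. It is a hypothetical helper that only
// models the selection rule; the real selection happens on the server.
func pickQueue(pending map[string][]string, queues ...string) (queue, job string, ok bool) {
	for _, q := range queues {
		if jobs := pending[q]; len(jobs) > 0 {
			return q, jobs[0], true
		}
	}
	return "", "", false
}

func main() {
	pending := map[string][]string{
		"low":  {"cleanup"},
		"high": {"send-email"},
	}

	// "urgent" is empty, so the job comes from "high", not from "low".
	q, job, ok := pickQueue(pending, "urgent", "high", "low")
	fmt.Println(q, job, ok)
}
```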

func (*Pool) Len

func (pool *Pool) Len(queue string) (int, error)

Len returns the length of a given queue.

func (*Pool) MaxLen

func (pool *Pool) MaxLen(maxlen int) *Pool

MaxLen option applied to a single operation.

func (*Pool) Nack

func (pool *Pool) Nack(job *Job) error

Nack re-queues a job back into its queue.

func (*Pool) Ping

func (pool *Pool) Ping() error

Ping returns nil if Disque Pool is alive, error otherwise.

func (*Pool) Replicate

func (pool *Pool) Replicate(replicate int) *Pool

Replicate option applied to a single operation.

func (*Pool) RetryAfter

func (pool *Pool) RetryAfter(after time.Duration) *Pool

RetryAfter option applied to a single operation.

func (*Pool) TTL

func (pool *Pool) TTL(ttl time.Duration) *Pool

TTL option applied to a single operation.

func (*Pool) Timeout

func (pool *Pool) Timeout(timeout time.Duration) *Pool

Timeout option applied to a single operation.

func (*Pool) Use

func (pool *Pool) Use(conf Config) *Pool

Use applies the given config to every subsequent operation on this connection.

func (*Pool) Wait

func (pool *Pool) Wait(job *Job) error

Wait blocks until the given job is ACKed. Native WAITJOB discussed upstream at https://github.com/antirez/disque/issues/168.

func (*Pool) With

func (pool *Pool) With(conf Config) *Pool

With applies the given config to a single operation.
