disco

package module
v0.0.0-...-e2e3fa6 Latest
Published: Dec 17, 2016 License: MIT Imports: 6 Imported by: 0

README

Disco - GoDoc

A flexible, idiomatic approach to a Go Disque client.

The Project

Disco attempts to provide two ways of using Disque: a low-level API that offers more flexibility and control for users who need it, and a high-level API for the most common usage patterns.

High Level Api

The high-level API attempts to provide a common usage pattern in an idiomatic Go manner. Ideally, it simplifies Disque usage by sparing you the nuts and bolts of the low-level API.

Funnels

Funnels are an abstraction on top of a disco.Pool: they provide Go channels that you can use to enqueue or receive jobs from Disque.

  // See GoDoc for further details on connection Pool options.
  pool, _ := disco.NewPool(2, 5, 1, time.Second * 200)
  funnel := pool.NewFunnel("disco-test-queue", "other-queue")
  defer funnel.Close()

  // Enqueue jobs simply by sending them to the Outgoing channel.
  funnel.Outgoing <- disco.Job{Queue: "disco-test-queue", Payload: []byte("this-is-the-payload")}

  // Receive jobs from Disque through the Incoming channel. You can use common Go
  // patterns such as a select statement to handle timeouts or other kinds of errors.
  select {
  case job, ok := <-funnel.Incoming:
    if ok {
      fmt.Println(string(job.Payload)) //=> "this-is-the-payload"
    }
  case <-time.After(time.Second):
    // Handle timeout (or not)
  }

A funnel will also manage the job's lifecycle for you: jobs received via the Incoming channel will be acknowledged in Disque automatically (you'll still have the option to put it back in the queue if need be) and jobs fetched from Disque after the funnel is closed will be automatically NAcked so as not to lose data.
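
To try the channel pattern without a running Disque server, the sketch below uses a hypothetical in-memory stand-in for a funnel. The names `memoryFunnel`, `newMemoryFunnel` and `receive` are invented for this example and are not part of disco; only the `Incoming`/`Outgoing` channel shape and the `Job` fields are taken from the documentation on this page.

```go
package main

import (
	"fmt"
	"time"
)

// Job mirrors the fields of disco.Job as documented on this page.
type Job struct {
	Queue   string
	ID      string
	Payload []byte
}

// memoryFunnel is a hypothetical stand-in for a funnel: it exposes the
// same two channels but never talks to Disque.
type memoryFunnel struct {
	Incoming chan Job
	Outgoing chan Job
}

// newMemoryFunnel starts a goroutine that forwards every job sent to
// Outgoing straight to Incoming, mimicking a round trip through a queue.
func newMemoryFunnel() *memoryFunnel {
	f := &memoryFunnel{
		Incoming: make(chan Job, 16),
		Outgoing: make(chan Job, 16),
	}
	go func() {
		for job := range f.Outgoing {
			f.Incoming <- job
		}
		// Outgoing was closed: signal consumers that no more jobs will arrive.
		close(f.Incoming)
	}()
	return f
}

// receive waits for one job and gives up after the timeout. The boolean is
// false on timeout and when Incoming has been closed.
func receive(f *memoryFunnel, timeout time.Duration) (Job, bool) {
	select {
	case job, ok := <-f.Incoming:
		return job, ok
	case <-time.After(timeout):
		return Job{}, false
	}
}

func main() {
	f := newMemoryFunnel()
	f.Outgoing <- Job{Queue: "disco-test-queue", Payload: []byte("this-is-the-payload")}

	if job, ok := receive(f, time.Second); ok {
		fmt.Println(string(job.Payload))
	}
	close(f.Outgoing)
}
```

The sketch uses `time.After` for the timeout because it fires once, whereas `time.Tick` creates a repeating ticker.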

Low Level Api

Connections

Connections represent a persistent connection to a Disque cluster; they are the most basic form of Disco usage. Disco is built on top of Redigo, and the Connection struct is a Disque-specific wrapper around a Redigo Conn interface, which means you can send commands to Disque directly.

  // Will connect to the Disque nodes specified in the $DISQUE_NODES env variable.
  connection, err := disco.NewConnection(100)

  // Alternatively, connect to an explicit comma-separated list of nodes.
  connection, err = disco.NewConnectionToURLS(100, "localhost:7701,localhost:7702,localhost:7703")

  connection.Do("PING")

Connection Pools

In most cases it's better to have a global connection pool that your application uses instead of creating connections manually each time.

  // Will connect to the Disque nodes specified in the $DISQUE_NODES env variable.
  // Args are: Max idle connections, max active, cycle and idle timeout.
  // see GoDoc for further details
  pool, err := disco.NewPool(2, 5, 1, time.Second * 200)

  connection := pool.Get()
  defer connection.Close()

  connection.Do("PING")

Wrappers for Disque commands

AddJob

The ADDJOB command is one of the two most used commands; it enqueues a payload in a given queue in Disque.

  connection, _ := disco.NewConnection(100)

  id, err := connection.AddJob("disco-test-queue", "this-is-the-payload", time.Second * 10)

GetJob

GETJOB is the other fundamental Disque command: it fetches enqueued jobs from a list of specified queues.

Keep in mind that this is a blocking call.

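  // Enqueue a job first so there is something to fetch.
  _, _ = connection.AddJob("disco-test-queue", "this-is-the-payload", time.Second * 10)
  job, err := connection.GetJob(1, time.Second * 10, "disco-test-queue")
  if err != nil {
    // Handle the error.
  }

  fmt.Println(string(job.Payload)) //=> "this-is-the-payload"

Ack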

Wrapper around the 'ACKJOB' command.

ACKJOB acknowledges the execution of one or more jobs via job IDs; this wrapper takes a single job ID.

  job, _ := connection.GetJob(1, time.Second * 10, "disco-test-queue")

  connection.Ack(job.ID)

NAck

Wrapper around the 'NACK' command.

The NACK command tells Disque to put the job back in the queue as soon as possible.

  job, _ := connection.GetJob(1, time.Second * 10, "disco-test-queue")

  connection.NAck(job.ID)
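
A common way to combine the two wrappers is to acknowledge a job when its handler succeeds and put it back in the queue when the handler fails. The sketch below illustrates that decision with a fake connection so it runs without Disque. The `acknowledger` interface, the `recorder` type, `process` and `errFailed` are hypothetical names for this example; only the `Ack(jobID string) error` and `NAck(jobID string) error` signatures come from the API listed on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Job mirrors the fields of disco.Job as documented on this page.
type Job struct {
	Queue   string
	ID      string
	Payload []byte
}

// acknowledger lists the two Connection methods used here. The interface
// itself is an assumption made for this example.
type acknowledger interface {
	Ack(jobID string) error
	NAck(jobID string) error
}

// recorder is a fake acknowledger that remembers which command each job got.
type recorder struct {
	calls map[string]string
}

func (r *recorder) Ack(jobID string) error {
	r.calls[jobID] = "ACKJOB"
	return nil
}

func (r *recorder) NAck(jobID string) error {
	r.calls[jobID] = "NACK"
	return nil
}

// errFailed is a sample handler error used in main.
var errFailed = errors.New("handler failed")

// process runs handle on the job, then acknowledges it on success or puts
// it back in the queue on failure. The handler's error is returned unless
// the NACK itself fails.
func process(conn acknowledger, job Job, handle func(Job) error) error {
	if err := handle(job); err != nil {
		if nackErr := conn.NAck(job.ID); nackErr != nil {
			return nackErr
		}
		return err
	}
	return conn.Ack(job.ID)
}

func main() {
	conn := &recorder{calls: map[string]string{}}

	_ = process(conn, Job{ID: "job-1"}, func(Job) error { return nil })
	_ = process(conn, Job{ID: "job-2"}, func(Job) error { return errFailed })

	fmt.Println(conn.calls["job-1"], conn.calls["job-2"])
}
```

Because `process` depends only on the small interface, a `*disco.Connection` could be passed in its place, assuming its methods match the signatures listed in the Types section.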

Contributing

You'll need gpm installed in order to pull in the necessary dependencies.

$ git clone git@github.com:pote/disco.git && cd disco
$ source .env.sample # feel free to cp it to .env and make any config changes you deem necessary.

$ make # Will pull dependencies if necessary, build the project and run the test suite.

Documentation

Overview

Disco is a flexible, idiomatic approach to a Go Disque client.

It attempts to provide two ways of using Disque: a low-level API that offers more flexibility and control for users who need it, and a high-level API for the most common usage patterns.

High Level Api

The high-level API attempts to provide a common usage pattern in an idiomatic Go manner. Ideally, it simplifies Disque usage by sparing you the nuts and bolts of the low-level API.

Funnels

Funnels are an abstraction on top of a `disco.Pool`: they provide Go channels that you can use to enqueue or receive jobs from Disque.

// See GoDoc for further details on connection Pool creation.
pool, _ := disco.NewPool(2, 5, 1, time.Second * 200)
funnel := pool.NewFunnel("disco-test-queue", "other-queue")
defer funnel.Close()

// Enqueue jobs simply by sending them to the Outgoing channel.
funnel.Outgoing <- disco.Job{Queue: "disco-test-queue", Payload: []byte("this-is-the-payload")}

// Receive jobs from Disque through the Incoming channel. You can use common Go
// patterns such as a select statement to handle timeouts or other kinds of errors.
select {
case job, ok := <-funnel.Incoming:
  if ok {
    fmt.Println(string(job.Payload)) //=> "this-is-the-payload"
  }
case <-time.After(time.Second):
  // Handle timeout (or not)
}

A funnel will also manage the job's lifecycle for you: jobs received via the `Incoming` channel will be acknowledged in Disque automatically (you'll still have the option to put it back in the queue if need be) and jobs fetched from Disque after the funnel is closed will be automatically NAcked so as not to lose data.

Types

type Connection

type Connection struct {
	redis.Conn

	Cycle int
	Nodes []string
}

A Disque connection.

func NewConnection

func NewConnection(cycle int) (Connection, error)

Creates a new connection to the disque nodes specified in the `DISQUE_NODES` environment variable.

func NewConnectionToNodes

func NewConnectionToNodes(cycle int, nodes ...string) (Connection, error)

Creates a new connection to an array of Disque nodes.

func NewConnectionToURLS

func NewConnectionToURLS(cycle int, nodes string) (Connection, error)

Creates a new connection to a list of comma-separated disque node URLs.
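
As an illustration of the comma-separated format, the sketch below splits such a string into individual node addresses. How disco itself parses the list is not shown on this page, so `splitNodes` is a hypothetical helper; trimming whitespace, skipping empty entries, and the local fallback addresses are choices made for this example only.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// splitNodes turns "host:port,host:port" into a slice of addresses,
// trimming surrounding whitespace and skipping empty entries.
func splitNodes(urls string) []string {
	var nodes []string
	for _, part := range strings.Split(urls, ",") {
		if node := strings.TrimSpace(part); node != "" {
			nodes = append(nodes, node)
		}
	}
	return nodes
}

func main() {
	// Read the list from DISQUE_NODES, falling back to local addresses
	// (an assumption for this example) when the variable is unset.
	urls := os.Getenv("DISQUE_NODES")
	if urls == "" {
		urls = "localhost:7701,localhost:7702,localhost:7703"
	}
	fmt.Println(splitNodes(urls))
}
```

The resulting slice has the shape accepted by the variadic `nodes ...string` parameter of `NewConnectionToNodes`.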

func (*Connection) Ack

func (c *Connection) Ack(jobID string) error

Wrapper around Disque's `ACKJOB` call

func (*Connection) AddJob

func (c *Connection) AddJob(queue string, payload string, timeout time.Duration) (string, error)

Wrapper around Disque's `ADDJOB` call.

func (*Connection) GetJob

func (c *Connection) GetJob(count int, timeout time.Duration, queues ...string) (Job, error)

Wrapper around Disque's `GETJOB` call.

func (*Connection) NAck

func (c *Connection) NAck(jobID string) error

Wrapper around Disque's `NACK` call

type Funnel

type Funnel struct {
	Queues      []string
	Incoming    chan Job
	Outgoing    chan Job
	Connections *Pool

	FetchTimeout time.Duration
	FetchCount   int

	Closed bool
}

A funnel is a high-level API for Disque usage: it acts as a bridge between Disque and Go native channels, allowing for idiomatic interaction with the datastore.

func NewFunnel

func NewFunnel(pool *Pool, fetchCount int, fetchTimeout time.Duration, queues ...string) Funnel

Creates a new funnel with a specific queue configuration and starts the appropriate goroutines to keep its Go channels synchronized with Disque.

func (*Funnel) Close

func (f *Funnel) Close()

Marks the funnel as closed, which in turn closes its internal Go channels gracefully.

func (*Funnel) Dispatch

func (f *Funnel) Dispatch()

Listens to the `Outgoing` channel in the funnel and dispatches any messages it receives to their appropriate queue, taking a connection from the funnel's pool.

This is a blocking call that is launched in a goroutine when NewFunnel is called. You won't regularly call it directly, but it is left as a public method to allow for more flexible use cases.

func (*Funnel) Listen

func (f *Funnel) Listen()

Takes a connection from the funnel's Disque connection pool and uses it to fetch jobs from the funnel's configured queues.

This is a blocking call that is launched in a goroutine when NewFunnel is called. You won't regularly call it directly, but it is left as a public method to allow for more flexible use cases.
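
The general shape of such a fetch loop can be sketched as follows. This is not disco's implementation: `listen`, `simulate` and the `fetch`, `closed` and `nack` callbacks are hypothetical names, and the loop only illustrates the behaviour described in the README, where a job fetched after the funnel is closed is NAcked rather than delivered.

```go
package main

import "fmt"

// Job mirrors the fields of disco.Job as documented on this page.
type Job struct {
	Queue   string
	ID      string
	Payload []byte
}

// listen fetches jobs until fetch reports that nothing is left. A job
// fetched after closed() returns true is handed to nack instead of the
// incoming channel, and the loop stops.
func listen(fetch func() (Job, bool), closed func() bool, nack func(Job), incoming chan<- Job) {
	for {
		job, ok := fetch()
		if !ok {
			return
		}
		if closed() {
			nack(job)
			return
		}
		incoming <- job
	}
}

// simulate runs listen over an in-memory list of jobs and treats the
// funnel as closed once closeAfter jobs have been delivered.
func simulate(jobs []Job, closeAfter int) (delivered, nacked []string) {
	incoming := make(chan Job, len(jobs))
	fetch := func() (Job, bool) {
		if len(jobs) == 0 {
			return Job{}, false
		}
		job := jobs[0]
		jobs = jobs[1:]
		return job, true
	}
	closed := func() bool { return len(incoming) >= closeAfter }
	nack := func(job Job) { nacked = append(nacked, job.ID) }

	listen(fetch, closed, nack, incoming)
	close(incoming)
	for job := range incoming {
		delivered = append(delivered, job.ID)
	}
	return delivered, nacked
}

func main() {
	jobs := []Job{{ID: "job-1"}, {ID: "job-2"}, {ID: "job-3"}}
	delivered, nacked := simulate(jobs, 2)
	fmt.Println(delivered, nacked)
}
```

Passing the fetch and NACK steps in as functions keeps the loop testable without a connection. A real loop would presumably call `GetJob` and `NAck` on a connection taken from the pool.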

type Job

type Job struct {
	Queue   string
	ID      string
	Payload []byte
}

Wraps a Disque job in a Go struct.

type Pool

type Pool struct {
	Connections redis.Pool
	Cycle       int
	Nodes       []string
}

A Disque connection pool.

func NewPool

func NewPool(maxIdle, maxActive, cycle int, idleTimeout time.Duration) (Pool, error)

Creates a new Pool with connections to the disque nodes specified in the `DISQUE_NODES` environment variable.

func NewPoolToNodes

func NewPoolToNodes(maxIdle, maxActive, cycle int, idleTimeout time.Duration, nodes ...string) (Pool, error)

Creates a new Pool with connections to an array of Disque nodes.

func NewPoolToURLS

func NewPoolToURLS(maxIdle, maxActive, cycle int, idleTimeout time.Duration, urls string) (Pool, error)

Creates a new Pool with connections to a list of comma-separated disque node URLs.

func (*Pool) Get

func (p *Pool) Get() Connection

Returns a disco.Connection from the Pool.

func (*Pool) NewFunnel

func (p *Pool) NewFunnel(queues ...string) Funnel

Creates a funnel using this Pool.

func (*Pool) NewFunnelWithOptions

func (p *Pool) NewFunnelWithOptions(fetchCount int, fetchTimeout time.Duration, queues ...string) Funnel

Creates a funnel using this Pool, allowing for custom configuration options.
