queue

Version: v1.0.6
Published: Mar 7, 2019 License: Apache-2.0 Imports: 8 Imported by: 0

README

go-queue

Queue is a generic interface that abstracts the implementation details of queue systems.

Similar to the package database/sql, this package implements a common interface to interact with different queue systems, in a unified way.

Currently, only AMQP queues and an in-memory queue are supported.

Installation

The recommended way to install go-queue is:

go get -u gopkg.in/src-d/go-queue.v1/...

Usage

This example shows how to publish and consume a Job using the in-memory implementation, which is useful in unit tests.

Each queue implementation that NewBroker should support must be imported, as the example shows.

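import (
	...
	"gopkg.in/src-d/go-queue.v1"
	_ "gopkg.in/src-d/go-queue.v1/memory"
)

...

// Connect to the in-memory broker and get a queue by name.
b, err := queue.NewBroker("memory://")
if err != nil {
	log.Fatal(err)
}

q, err := b.Queue("test-queue")
if err != nil {
	log.Fatal(err)
}

// Create a job and encode its payload.
j, err := queue.NewJob()
if err != nil {
	log.Fatal(err)
}

if err := j.Encode("hello world!"); err != nil {
	log.Fatal(err)
}

if err := q.Publish(j); err != nil {
	log.Fatal(err)
}

// Consume with at most one unacknowledged job at a time.
iter, err := q.Consume(1)
if err != nil {
	log.Fatal(err)
}

consumedJob, err := iter.Next()
if err != nil {
	log.Fatal(err)
}

var payload string
if err := consumedJob.Decode(&payload); err != nil {
	log.Fatal(err)
}

fmt.Println(payload)
// Output: hello world!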

Configuration

AMQP

The list of available variables is:

  • AMQP_BACKOFF_MIN (default: 20ms): Minimum time to wait before retrying the connection or queue channel assignment.
  • AMQP_BACKOFF_MAX (default: 30s): Maximum time to wait before retrying the connection or queue channel assignment.
  • AMQP_BACKOFF_FACTOR (default: 2): Multiplying factor for each increment step on the retry.
  • AMQP_BURIED_QUEUE_SUFFIX (default: .buriedQueue): Suffix for the buried queue name.
  • AMQP_BURIED_EXCHANGE_SUFFIX (default: .buriedExchange): Suffix for the buried exchange name.
  • AMQP_BURIED_TIMEOUT (default: 500): Time in milliseconds to wait for new jobs from the buried queue.
  • AMQP_RETRIES_HEADER (default: x-retries): Message header to set the number of retries.
  • AMQP_ERROR_HEADER (default: x-error-type): Message header to set the error type.
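
The three backoff variables above describe a minimum, a maximum and a multiplying factor. A minimal sketch of how such values usually combine into a retry schedule follows. It assumes plain exponential growth capped at the maximum; the function backoffDelay and the constant names are hypothetical and are not taken from the amqp package.

```go
package main

import (
	"fmt"
	"time"
)

// The values mirror the documented defaults; the names are illustrative only.
const (
	backoffMin    = 20 * time.Millisecond // AMQP_BACKOFF_MIN
	backoffMax    = 30 * time.Second      // AMQP_BACKOFF_MAX
	backoffFactor = 2                     // AMQP_BACKOFF_FACTOR
)

// backoffDelay returns the wait before retry number attempt (0-based):
// backoffMin multiplied by backoffFactor once per previous attempt,
// never exceeding backoffMax. This is an assumed schedule, not the
// library's implementation.
func backoffDelay(attempt int) time.Duration {
	d := backoffMin
	for i := 0; i < attempt; i++ {
		d *= backoffFactor
		if d >= backoffMax {
			return backoffMax
		}
	}
	return d
}

func main() {
	for attempt := 0; attempt < 13; attempt++ {
		fmt.Printf("retry %2d: wait %v\n", attempt, backoffDelay(attempt))
	}
}
```

Under these assumptions the delay doubles from 20ms on each retry and stays at 30s once the cap is reached.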

License

Apache License Version 2.0, see LICENSE

Documentation

Constants

This section is empty.

Variables

var (
	// ErrAlreadyClosed is the error returned when trying to close an already closed
	// connection.
	ErrAlreadyClosed = errors.NewKind("already closed")
	// ErrEmptyJob is the error returned when an empty job is published.
	ErrEmptyJob = errors.NewKind("invalid empty job")
	// ErrTxNotSupported is the error returned when the transaction receives a
	// callback it does not know how to handle.
	ErrTxNotSupported = errors.NewKind("transactions not supported")
)
var (
	// ErrUnsupportedProtocol is the error returned when a Broker does not know
	// how to connect to a given URI.
	ErrUnsupportedProtocol = errors.NewKind("unsupported protocol: %s")
	// ErrMalformedURI is the error returned when a Broker does not know
	// how to parse a given URI.
	ErrMalformedURI = errors.NewKind("malformed connection URI: %s")
)
var ErrCantAck = errors.NewKind("can't acknowledge this message, it does not come from a queue")

ErrCantAck is the error returned when the Job does not come from a queue.

Functions

func Register

func Register(name string, b BrokerBuilder)

Register registers a new BrokerBuilder to be used by NewBroker. It should be called from an init function in implementation packages such as `amqp` and `memory`.
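
The registration mechanism resembles the driver registry of database/sql. The sketch below illustrates the pattern with trimmed-down stand-ins for Broker, BrokerBuilder, Register and NewBroker. It assumes that the registered name is matched against the URI scheme, which the usage example ("memory://") suggests but this page does not state. fakeBroker and the "fake" scheme are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
)

// Broker is a trimmed-down stand-in for the package's Broker interface.
type Broker interface {
	Close() error
}

// BrokerBuilder has the same shape as the documented type.
type BrokerBuilder func(uri string) (Broker, error)

// registry maps a name to its builder. Treating the name as the URI
// scheme is an assumption of this sketch.
var registry = map[string]BrokerBuilder{}

// Register stores a builder under the given name.
func Register(name string, b BrokerBuilder) {
	registry[name] = b
}

// NewBroker parses the URI and dispatches to the builder registered
// for its scheme.
func NewBroker(uri string) (Broker, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return nil, fmt.Errorf("malformed connection URI: %s", uri)
	}
	build, ok := registry[u.Scheme]
	if !ok {
		return nil, fmt.Errorf("unsupported protocol: %s", u.Scheme)
	}
	return build(uri)
}

// fakeBroker is a hypothetical implementation used only for illustration.
type fakeBroker struct{}

func (fakeBroker) Close() error { return nil }

// An implementation package would register itself from init, so that a
// blank import is enough to make it available.
func init() {
	Register("fake", func(uri string) (Broker, error) {
		return fakeBroker{}, nil
	})
}

func main() {
	if _, err := NewBroker("fake://"); err != nil {
		fmt.Println("unexpected error:", err)
	}
	_, err := NewBroker("other://host")
	fmt.Println(err)
}
```

Registering from init is what lets a blank import select the implementations compiled into a binary.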

Types

type Acknowledger

type Acknowledger interface {
	// Ack is called when the Job has finished.
	Ack() error
	// Reject is called if the job has errored. The parameter indicates
	// whether the job should be put back in the queue or not.
	Reject(requeue bool) error
}

Acknowledger represents the object in charge of acknowledgement management for a job. When a job is acknowledged using any of the functions in this interface, it will be considered delivered by the queue.
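
A consumer typically chooses between Ack and Reject based on the outcome of its handler. The following sketch shows one possible policy against a copy of the interface above. The recorder type, the settle function and the two error values are hypothetical and exist only to make the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// Acknowledger copies the interface documented above.
type Acknowledger interface {
	Ack() error
	Reject(requeue bool) error
}

// recorder is a hypothetical Acknowledger that only records the outcome.
type recorder struct {
	state string
}

func (r *recorder) Ack() error {
	r.state = "acked"
	return nil
}

func (r *recorder) Reject(requeue bool) error {
	if requeue {
		r.state = "requeued"
	} else {
		r.state = "rejected"
	}
	return nil
}

// Hypothetical error values standing in for a handler's failure modes.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// settle acknowledges a job according to the result of its handler:
// success is acked, a temporary failure is put back in the queue, and
// anything else is rejected without requeueing.
func settle(a Acknowledger, handlerErr error) error {
	switch {
	case handlerErr == nil:
		return a.Ack()
	case errors.Is(handlerErr, errTemporary):
		return a.Reject(true)
	default:
		return a.Reject(false)
	}
}

func main() {
	for _, handlerErr := range []error{nil, errTemporary, errPermanent} {
		r := &recorder{}
		if err := settle(r, handlerErr); err != nil {
			fmt.Println("settle failed:", err)
			continue
		}
		fmt.Printf("handler error %v: job %s\n", handlerErr, r.state)
	}
}
```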

type Broker

type Broker interface {
	// Queue returns a Queue from the Broker with the given name.
	Queue(string) (Queue, error)
	// Close closes the connection of the Broker.
	Close() error
}

Broker represents a message broker.

func NewBroker

func NewBroker(uri string) (Broker, error)

NewBroker creates a new Broker based on the given URI. To make an implementation available, import its package, for example:

import _ "gopkg.in/src-d/go-queue.v1/amqp"

type BrokerBuilder

type BrokerBuilder func(uri string) (Broker, error)

BrokerBuilder instantiates a new Broker based on the given uri.

type Job

type Job struct {
	// ID of the job.
	ID string
	// Priority is the priority level.
	Priority Priority
	// Timestamp is the time of creation.
	Timestamp time.Time
	// Retries is the number of times this job can be processed before being rejected.
	Retries int32
	// ErrorType is the kind of error that made the job fail.
	ErrorType string
	// ContentType is the content type of the job.
	ContentType string
	// Raw is the raw content of the job.
	Raw []byte
	// Acknowledger is the acknowledgement management system for the job.
	Acknowledger Acknowledger
}

Job contains the information for a job to be published to a queue.

func NewJob

func NewJob() (*Job, error)

NewJob creates a new Job with default values, a new unique ID and current timestamp.

func (*Job) Ack

func (j *Job) Ack() error

Ack is called when the job is finished.

func (*Job) Decode

func (j *Job) Decode(payload interface{}) error

Decode decodes the payload from the wire format.

func (*Job) Encode

func (j *Job) Encode(payload interface{}) error

Encode encodes the payload to the wire format used.

func (*Job) Reject

func (j *Job) Reject(requeue bool) error

Reject is called when the job errors. The parameter is true if and only if the job should be put back in the queue.

func (*Job) SetPriority

func (j *Job) SetPriority(priority Priority)

SetPriority sets the priority of the job.

func (*Job) Size

func (j *Job) Size() int

Size returns the size of the message.

type JobIter

type JobIter interface {
	// Next returns the next Job in the iterator. It should block until
	// a new job becomes available or while too many undelivered jobs have
	// been already returned (see the argument to Queue.Consume). Returns
	// ErrAlreadyClosed if the iterator is closed.
	Next() (*Job, error)
	io.Closer
}

JobIter represents an iterator over a set of Jobs.

type Priority

type Priority uint8

Priority represents a priority level.

const (
	// PriorityUrgent represents an urgent priority level.
	PriorityUrgent Priority = 8
	// PriorityNormal represents a normal priority level.
	PriorityNormal Priority = 4
	// PriorityLow represents a low priority level.
	PriorityLow Priority = 0
)

type Queue

type Queue interface {
	// Publish publishes the given Job to the queue.
	Publish(*Job) error
	// PublishDelayed publishes the given Job to the queue with a given delay.
	PublishDelayed(*Job, time.Duration) error
	// Transaction executes the passed TxCallback inside a transaction.
	Transaction(TxCallback) error
	// Consume returns a JobIter for the queue.  It receives the maximum
	// number of unacknowledged jobs the iterator will allow at any given
	// time (see the Acknowledger interface).
	Consume(advertisedWindow int) (JobIter, error)
	// RepublishBuried republishes to the main queue those jobs complying
	// with one of the conditions, leaving the rest of them in the buried queue.
	RepublishBuried(conditions ...RepublishConditionFunc) error
}

Queue represents a message queue.
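
The argument to Consume bounds how many unacknowledged jobs an iterator hands out. The sketch below illustrates that idea with a buffered channel used as a counting semaphore. windowIter, newWindowIter and inFlight are hypothetical names, payloads are plain strings, and none of this reflects how the memory or amqp implementations are written.

```go
package main

import "fmt"

// windowIter is a hypothetical iterator that allows at most `window`
// unacknowledged jobs at any time.
type windowIter struct {
	jobs  chan string   // pending payloads
	slots chan struct{} // one token per unacknowledged job
}

func newWindowIter(window int, payloads ...string) *windowIter {
	it := &windowIter{
		jobs:  make(chan string, len(payloads)),
		slots: make(chan struct{}, window),
	}
	for _, p := range payloads {
		it.jobs <- p
	}
	return it
}

// Next blocks while the window is full or while no job is pending.
// The returned ack function frees one slot and must be called once.
func (it *windowIter) Next() (payload string, ack func()) {
	it.slots <- struct{}{} // blocks when `window` jobs are unacknowledged
	payload = <-it.jobs
	return payload, func() { <-it.slots }
}

// inFlight reports how many jobs are currently unacknowledged.
func (it *windowIter) inFlight() int {
	return len(it.slots)
}

func main() {
	it := newWindowIter(2, "a", "b", "c")

	_, ackA := it.Next()
	_, _ = it.Next()
	fmt.Println("unacknowledged:", it.inFlight()) // the window is now full

	ackA() // acknowledging a job frees a slot
	p, _ := it.Next()
	fmt.Println("next:", p, "unacknowledged:", it.inFlight())
}
```

With a window of 2, a third call to Next before any acknowledgement would block, which is the behavior the JobIter comment describes.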

type RepublishConditionFunc

type RepublishConditionFunc func(job *Job) bool

RepublishConditionFunc is a function used to filter jobs to republish.

type RepublishConditions

type RepublishConditions []RepublishConditionFunc

RepublishConditions is an alias for a list of RepublishConditionFunc.

func (RepublishConditions) Comply

func (c RepublishConditions) Comply(job *Job) bool

Comply checks if the Job matches any of the defined conditions.
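
The "matches any" rule can be sketched as follows, using a reduced Job with only two of the documented fields. The helper errorTypeIs and the condition values are hypothetical. The text above does not say what happens with an empty list; this sketch returns false in that case, which may differ from the library.

```go
package main

import "fmt"

// Job keeps only the fields this sketch needs.
type Job struct {
	ErrorType string
	Retries   int32
}

// The two types below have the same shape as the documented ones.
type RepublishConditionFunc func(job *Job) bool

type RepublishConditions []RepublishConditionFunc

// Comply reports whether the job matches at least one condition.
// Returning false for an empty list is an assumption of this sketch.
func (c RepublishConditions) Comply(job *Job) bool {
	for _, condition := range c {
		if condition(job) {
			return true
		}
	}
	return false
}

// errorTypeIs is a hypothetical helper that builds a condition matching
// jobs by their ErrorType.
func errorTypeIs(kind string) RepublishConditionFunc {
	return func(job *Job) bool { return job.ErrorType == kind }
}

func main() {
	conditions := RepublishConditions{
		errorTypeIs("timeout"),
		func(job *Job) bool { return job.Retries > 0 },
	}

	jobs := []*Job{
		{ErrorType: "timeout"},
		{ErrorType: "parse", Retries: 3},
		{ErrorType: "parse"},
	}
	for _, job := range jobs {
		fmt.Printf("%+v complies: %v\n", *job, conditions.Comply(job))
	}
}
```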

type TxCallback

type TxCallback func(q Queue) error

TxCallback is a function to be called in a transaction.
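
One way to picture a transaction callback is as a function that publishes to a staging queue whose contents reach the real queue only if the callback returns nil. The sketch below assumes that all-or-nothing behavior, which this page does not spell out. memQueue is a hypothetical in-memory stand-in with string payloads, and TxCallback is redefined against it.

```go
package main

import (
	"errors"
	"fmt"
)

// memQueue is a hypothetical in-memory queue holding string payloads.
type memQueue struct {
	jobs []string
}

func (q *memQueue) Publish(payload string) error {
	q.jobs = append(q.jobs, payload)
	return nil
}

// TxCallback mirrors the documented type, adapted to the stand-in queue.
type TxCallback func(q *memQueue) error

// Transaction runs cb against a staging queue and copies the staged
// jobs to q only if cb returns nil (assumed semantics).
func (q *memQueue) Transaction(cb TxCallback) error {
	staging := &memQueue{}
	if err := cb(staging); err != nil {
		return err // staged jobs are dropped
	}
	q.jobs = append(q.jobs, staging.jobs...)
	return nil
}

// errAbort is a hypothetical error used to cancel a transaction.
var errAbort = errors.New("abort")

func main() {
	q := &memQueue{}

	// Both jobs are published because the callback succeeds.
	err := q.Transaction(func(tx *memQueue) error {
		if err := tx.Publish("a"); err != nil {
			return err
		}
		return tx.Publish("b")
	})
	fmt.Println("committed:", len(q.jobs), "error:", err)

	// Nothing is published because the callback fails.
	err = q.Transaction(func(tx *memQueue) error {
		if err := tx.Publish("c"); err != nil {
			return err
		}
		return errAbort
	})
	fmt.Println("after abort:", len(q.jobs), "error:", err)
}
```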
