queue

package
v0.1.0
Published: May 3, 2018 License: Apache-2.0 Imports: 14 Imported by: 8

Documentation

Constants

This section is empty.

Variables

var (
	ErrConnectionFailed = errors.NewKind("failed to connect to RabbitMQ: %s")
	ErrOpenChannel      = errors.NewKind("failed to open a channel: %s")
	ErrRetrievingHeader = errors.NewKind("error retrieving '%s' header from message %s")
	ErrRepublishingJobs = errors.NewKind("couldn't republish some jobs : %s")
)
var (
	// ErrAlreadyClosed is the error returned when trying to close an already closed
	// connection.
	ErrAlreadyClosed = errors.NewKind("already closed")
	// ErrEmptyJob is the error returned when an empty job is published.
	ErrEmptyJob = errors.NewKind("invalid empty job")
	// ErrTxNotSupported is the error returned when the transaction receives a
	// callback it does not know how to handle.
	ErrTxNotSupported = errors.NewKind("transactions not supported")
	// ErrUnsupportedProtocol is the error returned when a Broker does not know how
	// to connect to a given URL.
	ErrUnsupportedProtocol = errors.NewKind("unsupported protocol")
)
var ErrCantAck = errors.NewKind("can't acknowledge this message, it does not come from a queue")

Functions

This section is empty.

Types

type AMQPAcknowledger

type AMQPAcknowledger struct {
	// contains filtered or unexported fields
}

AMQPAcknowledger implements the Acknowledger for AMQP.

func (*AMQPAcknowledger) Ack

func (a *AMQPAcknowledger) Ack() error

Ack signals acknowledgement.

func (*AMQPAcknowledger) Reject

func (a *AMQPAcknowledger) Reject(requeue bool) error

Reject signals rejection. If requeue is false, the job will go to the buried queue until Queue.RepublishBuried() is called.

type AMQPBroker

type AMQPBroker struct {
	// contains filtered or unexported fields
}

AMQPBroker implements the Broker interface for AMQP.

func (*AMQPBroker) Close

func (b *AMQPBroker) Close() error

Close closes all the connections managed by the broker.

func (*AMQPBroker) Queue

func (b *AMQPBroker) Queue(name string) (Queue, error)

Queue returns the queue with the given name.

type AMQPJobIter

type AMQPJobIter struct {
	// contains filtered or unexported fields
}

AMQPJobIter implements the JobIter interface for AMQP.

func (*AMQPJobIter) Close

func (i *AMQPJobIter) Close() error

Close closes the channel of the JobIter.

func (*AMQPJobIter) Next

func (i *AMQPJobIter) Next() (*Job, error)

Next returns the next job in the iter.

type AMQPQueue

type AMQPQueue struct {
	// contains filtered or unexported fields
}

AMQPQueue implements the Queue interface for AMQP.

func (*AMQPQueue) Consume

func (q *AMQPQueue) Consume(advertisedWindow int) (JobIter, error)

Consume implements Queue. The advertisedWindow value is the exact number of undelivered jobs in transit, not just the minimum.
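The window idea can be illustrated with a small counter that caps how many jobs are handed out but not yet acknowledged. This is only a sketch: the `window` type and its methods are hypothetical names invented for illustration, not part of this package, and the package itself may enforce the limit differently (for example on the broker side).

```go
package main

import "fmt"

// window is a hypothetical counter that caps the number of jobs handed
// out but not yet acknowledged. It is not part of the queue package.
type window struct {
	slots chan struct{}
}

func newWindow(size int) *window {
	return &window{slots: make(chan struct{}, size)}
}

// tryAcquire reserves a slot for one in-flight job and reports whether
// the window had room.
func (w *window) tryAcquire() bool {
	select {
	case w.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot; call it once the job is acked or rejected.
func (w *window) release() {
	select {
	case <-w.slots:
	default: // nothing in flight; ignore
	}
}

func main() {
	w := newWindow(2)
	fmt.Println(w.tryAcquire(), w.tryAcquire(), w.tryAcquire())
	w.release()
	fmt.Println(w.tryAcquire())
}
```

A real iterator would block in Next instead of returning false; the non-blocking form is used here only to keep the sketch easy to follow.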

func (*AMQPQueue) Publish

func (q *AMQPQueue) Publish(j *Job) error

Publish publishes the given Job to the Queue.

func (*AMQPQueue) PublishDelayed

func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error

PublishDelayed publishes the given Job with a given delay. Delayed messages won't go into the buried queue if they fail.

func (*AMQPQueue) RepublishBuried

func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error

RepublishBuried republishes to the main queue those buried jobs (jobs that timed out without an Ack or were rejected with requeue = false) for which comply returns true, that is, jobs that satisfy the given conditions.
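A minimal sketch of how republish conditions might be evaluated, assuming a job qualifies when at least one condition returns true. `Job` and `RepublishConditionFunc` below are local stand-ins that mirror the documented declarations. The body of `comply` is a guess at what the helper mentioned above does (including the assumption that an empty condition list accepts every job), and `hasErrorType` is a hypothetical condition builder.

```go
package main

import "fmt"

// Job is a local stand-in with only the fields this sketch needs.
type Job struct {
	ID        string
	ErrorType string
}

// RepublishConditionFunc mirrors the documented type.
type RepublishConditionFunc func(job *Job) bool

// comply reports whether job satisfies at least one condition.
// Assumption: with no conditions, every job complies.
func comply(job *Job, conditions ...RepublishConditionFunc) bool {
	if len(conditions) == 0 {
		return true
	}
	for _, c := range conditions {
		if c(job) {
			return true
		}
	}
	return false
}

// hasErrorType is a hypothetical condition builder that matches jobs
// whose ErrorType equals t.
func hasErrorType(t string) RepublishConditionFunc {
	return func(job *Job) bool { return job.ErrorType == t }
}

func main() {
	timeout := &Job{ID: "a", ErrorType: "timeout"}
	parse := &Job{ID: "b", ErrorType: "parse"}
	cond := hasErrorType("timeout")
	fmt.Println(comply(timeout, cond), comply(parse, cond))
}
```

With the real package, a condition of this shape would be passed directly to RepublishBuried.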

func (*AMQPQueue) Transaction

func (q *AMQPQueue) Transaction(txcb TxCallback) error

Transaction executes the given callback inside a transaction.

type Acknowledger

type Acknowledger interface {
	// Ack is called when the Job has finished.
	Ack() error
	// Reject is called if the job has errored. The parameter indicates
	// whether the job should be put back in the queue or not.
	Reject(requeue bool) error
}

Acknowledger represents the object in charge of acknowledgement management for a job. When a job is acknowledged using any of the functions in this interface, it will be considered delivered by the queue.
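The usual pattern is to call exactly one of Ack or Reject once a job has been handled. The sketch below shows that decision with a local copy of the interface; `recorder`, `settle`, and `errWork` are hypothetical names used only for illustration and do not exist in this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Acknowledger mirrors the documented interface.
type Acknowledger interface {
	Ack() error
	Reject(requeue bool) error
}

// recorder is a hypothetical Acknowledger that only records calls.
type recorder struct{ calls []string }

func (r *recorder) Ack() error {
	r.calls = append(r.calls, "ack")
	return nil
}

func (r *recorder) Reject(requeue bool) error {
	r.calls = append(r.calls, fmt.Sprintf("reject(requeue=%t)", requeue))
	return nil
}

// settle acknowledges the job when the handler succeeded and rejects it
// otherwise, passing requeue through to Reject.
func settle(a Acknowledger, handlerErr error, requeue bool) error {
	if handlerErr == nil {
		return a.Ack()
	}
	return a.Reject(requeue)
}

var errWork = errors.New("work failed")

func main() {
	r := &recorder{}
	_ = settle(r, nil, false)     // handler succeeded
	_ = settle(r, errWork, false) // handler failed, do not requeue
	fmt.Println(r.calls)
}
```

Whether to requeue on failure is a policy choice left to the caller; per the AMQPAcknowledger documentation, rejecting with requeue set to false sends the job to the buried queue.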

type Broker

type Broker interface {
	// Queue returns the Queue with the given name.
	Queue(string) (Queue, error)
	// Close closes the connection of the Broker.
	Close() error
}

Broker represents a message broker.

func NewAMQPBroker

func NewAMQPBroker(url string) (Broker, error)

NewAMQPBroker creates a new AMQPBroker.

func NewBroker

func NewBroker(uri string) (Broker, error)

NewBroker creates a new Broker based on the given URI. Possible URIs are

amqp://<host>[:port]
memory://
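Selecting a broker from the URI scheme could look roughly like the sketch below. It is not the package's implementation: `brokerKind` and `errUnsupportedProtocol` are hypothetical names, and the function returns a label rather than constructing a real Broker.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// errUnsupportedProtocol is a local stand-in for ErrUnsupportedProtocol.
var errUnsupportedProtocol = errors.New("unsupported protocol")

// brokerKind reports which broker implementation a URI would select,
// based only on its scheme.
func brokerKind(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	switch u.Scheme {
	case "amqp":
		return "amqp", nil
	case "memory":
		return "memory", nil
	default:
		return "", fmt.Errorf("%w: %q", errUnsupportedProtocol, u.Scheme)
	}
}

func main() {
	for _, uri := range []string{"amqp://localhost:5672", "memory://", "redis://localhost"} {
		kind, err := brokerKind(uri)
		fmt.Println(uri, kind, err)
	}
}
```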

func NewMemoryBroker

func NewMemoryBroker() Broker

NewMemoryBroker creates a new Broker for an in-memory queue.

type Job

type Job struct {
	// ID of the job.
	ID string
	// Priority is the priority level.
	Priority Priority
	// Timestamp is the time of creation.
	Timestamp time.Time
	// Retries is the number of times this job can be processed before being rejected.
	Retries int32
	// ErrorType is the kind of error that made the job fail.
	ErrorType string
	// contains filtered or unexported fields
}

Job contains the information for a job to be published to a queue.

func NewJob

func NewJob() (*Job, error)

NewJob creates a new Job with default values, a new unique ID and current timestamp.

func (*Job) Ack

func (j *Job) Ack() error

Ack is called when the job is finished.

func (*Job) Decode

func (j *Job) Decode(payload interface{}) error

Decode decodes the payload from the wire format.

func (*Job) Encode

func (j *Job) Encode(payload interface{}) error

Encode encodes the payload to the wire format used.

func (*Job) Reject

func (j *Job) Reject(requeue bool) error

Reject is called when the job errors. The parameter is true if and only if the job should be put back in the queue.

func (*Job) SetPriority

func (j *Job) SetPriority(priority Priority)

SetPriority sets the job priority.

type JobIter

type JobIter interface {
	// Next returns the next Job in the iterator. It should block until
	// the job becomes available or while too many undelivered jobs have
	// already been returned (see the argument to Queue.Consume). Returns
	// ErrAlreadyClosed if the iterator is closed.
	Next() (*Job, error)
	io.Closer
}

JobIter represents an iterator over a set of Jobs.

type Priority

type Priority uint8

Priority represents a priority level.

const (
	// PriorityUrgent represents an urgent priority level.
	PriorityUrgent Priority = 8
	// PriorityNormal represents a normal priority level.
	PriorityNormal Priority = 4
	// PriorityLow represents a low priority level.
	PriorityLow Priority = 0
)
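The sketch below orders jobs by these levels, assuming that a higher numeric value means the job should be served first (the documentation gives the values but does not state the ordering). The `job` type and `byPriority` function are hypothetical; the constants are copied from the declaration above.

```go
package main

import (
	"fmt"
	"sort"
)

// Priority and its constants mirror the documented declarations.
type Priority uint8

const (
	PriorityUrgent Priority = 8
	PriorityNormal Priority = 4
	PriorityLow    Priority = 0
)

// job is a hypothetical, reduced job type for this sketch.
type job struct {
	ID       string
	Priority Priority
}

// byPriority returns a copy of jobs ordered from the highest to the
// lowest priority value, keeping insertion order within one level.
func byPriority(jobs []job) []job {
	out := append([]job(nil), jobs...)
	sort.SliceStable(out, func(i, k int) bool {
		return out[i].Priority > out[k].Priority
	})
	return out
}

func main() {
	jobs := []job{
		{"a", PriorityLow},
		{"b", PriorityUrgent},
		{"c", PriorityNormal},
		{"d", PriorityUrgent},
	}
	for _, j := range byPriority(jobs) {
		fmt.Println(j.ID, j.Priority)
	}
}
```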

type Queue

type Queue interface {
	// Publish publishes the given Job to the queue.
	Publish(*Job) error
	// PublishDelayed publishes the given Job to the queue with a given delay.
	PublishDelayed(*Job, time.Duration) error
	// Transaction executes the passed TxCallback inside a transaction.
	Transaction(TxCallback) error
	// Consume returns a JobIter for the queue. It receives the minimum
	// number of undelivered jobs the iterator will allow at any given
	// time (see the Acknowledger interface).
	Consume(advertisedWindow int) (JobIter, error)
	// RepublishBuried republishes to the main queue those jobs complying
	// with at least one of the conditions, leaving the rest in the buried queue.
	RepublishBuried(conditions ...RepublishConditionFunc) error
}

Queue represents a message queue.

type RepublishConditionFunc

type RepublishConditionFunc func(job *Job) bool

RepublishConditionFunc is a function used to filter jobs to republish.

type TxCallback

type TxCallback func(q Queue) error

TxCallback is a function to be called in a transaction.
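One way to picture a transactional callback is as an all-or-nothing batch: jobs published inside the callback become visible only if the callback returns nil. The sketch below assumes that semantics, which the documentation does not spell out. `publisher`, `txCallback`, `memQueue`, and `errAbort` are hypothetical stand-ins, and jobs are plain strings to keep it short.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a reduced, local stand-in for the Queue interface.
type publisher interface {
	Publish(job string) error
}

// txCallback mirrors the shape of TxCallback.
type txCallback func(q publisher) error

// memQueue is a hypothetical in-memory queue.
type memQueue struct{ jobs []string }

func (m *memQueue) Publish(job string) error {
	m.jobs = append(m.jobs, job)
	return nil
}

// transaction runs cb against a staging queue and commits the staged
// jobs only if cb returns nil.
func (m *memQueue) transaction(cb txCallback) error {
	staged := &memQueue{}
	if err := cb(staged); err != nil {
		return err // staged jobs are discarded
	}
	m.jobs = append(m.jobs, staged.jobs...)
	return nil
}

var errAbort = errors.New("abort")

func main() {
	q := &memQueue{}
	_ = q.transaction(func(tx publisher) error {
		return tx.Publish("job-1")
	})
	err := q.transaction(func(tx publisher) error {
		_ = tx.Publish("job-2")
		return errAbort // rolls back job-2
	})
	fmt.Println(q.jobs, err)
}
```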
