mq

v2.0.2
Published: Apr 28, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

README

mq

MQ is a generic interface that abstracts the implementation details of message queue systems.

Like the database/sql package, it provides a common interface for interacting with different message queue systems in a unified way.

Installation

The recommended way to install mq is:

go get github.com/go-mq/mq/v2

Usage

This example shows how to publish and consume a Job using the in-memory implementation, which is particularly useful for unit tests.

Each mq implementation that NewBroker should support must be imported, as the example shows.

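package main

import (
    "fmt"
    "log"

    "github.com/go-mq/mq/v2"
    _ "github.com/go-mq/mq/v2/memory" // registers the "memory" protocol
)

func main() {
    b, err := mq.NewBroker("memory://")
    if err != nil {
        log.Fatal(err)
    }

    q, err := b.Queue("test-queue")
    if err != nil {
        log.Fatal(err)
    }

    // Encode the payload before publishing; an empty job is rejected
    // with ErrEmptyJob.
    j := mq.NewJob()
    if err := j.Encode("hello world!"); err != nil {
        log.Fatal(err)
    }

    if err := q.Publish(j); err != nil {
        log.Fatal(err)
    }

    // Allow at most one unacknowledged job at a time.
    iter, err := q.Consume(1)
    if err != nil {
        log.Fatal(err)
    }

    consumedJob, err := iter.Next()
    if err != nil {
        log.Fatal(err)
    }

    var payload string
    if err := consumedJob.Decode(&payload); err != nil {
        log.Fatal(err)
    }

    fmt.Println(payload)
    // Output: hello world!
}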

License

Apache License Version 2.0, see LICENSE

Documentation

Constants

const (
	ContentTypeMsgpack  = "application/msgpack"
	ContentTypeJSON     = "application/json"
	ContentTypeYAML     = "application/yaml"
	ContentTypeProtobuf = "application/protobuf"
)

Variables

var (
	// ErrAlreadyClosed is the error returned when trying to close an already closed
	// connection.
	ErrAlreadyClosed = errors.NewKind("already closed")
	// ErrEmptyJob is the error returned when an empty job is published.
	ErrEmptyJob = errors.NewKind("invalid empty job")
	// ErrTxNotSupported is the error returned when the transaction receives a
	// callback it does not know how to handle.
	ErrTxNotSupported = errors.NewKind("transactions not supported")
)
var (
	// ErrUnsupportedProtocol is the error returned when a Broker does not know
	// how to connect to a given URI.
	ErrUnsupportedProtocol = errors.NewKind("unsupported protocol: %s")
	// ErrMalformedURI is the error returned when a Broker does not know
	// how to parse a given URI.
	ErrMalformedURI = errors.NewKind("malformed connection URI: %s")
)
var ErrCantAck = errors.NewKind("can't acknowledge this message, it does not come from a queue")

ErrCantAck is the error returned when the Job does not come from a queue.

Functions

func Register

func Register(name string, b BrokerBuilder)

Register registers a new BrokerBuilder to be used by NewBroker. Implementation packages such as `amqp` and `memory` should call it from an init function.
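The registration pattern described above can be sketched with the standard library alone. This is an illustration of the idea, not mq's source: `register`, `newBroker`, `builders` and `nopBroker` are hypothetical names, the `Broker` interface is trimmed to `Close`, and the assumption that the builder is looked up by the URI scheme is inferred from the `memory://` example and the ErrUnsupportedProtocol message.

```go
package main

import (
	"fmt"
	"net/url"
	"sync"
)

// Broker and BrokerBuilder are local stand-ins shaped like the documented
// types; they are redeclared here so the sketch compiles without mq.
type Broker interface {
	Close() error
}

type BrokerBuilder func(uri string) (Broker, error)

var (
	mu       sync.RWMutex
	builders = map[string]BrokerBuilder{}
)

// register stores a builder under a protocol name (hypothetical helper).
func register(name string, b BrokerBuilder) {
	mu.Lock()
	defer mu.Unlock()
	builders[name] = b
}

// newBroker picks the builder whose name matches the URI scheme.
// Looking up by scheme is an assumption of this sketch.
func newBroker(uri string) (Broker, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return nil, fmt.Errorf("malformed connection URI: %s", uri)
	}
	mu.RLock()
	b, ok := builders[u.Scheme]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unsupported protocol: %s", u.Scheme)
	}
	return b(uri)
}

// nopBroker is a placeholder implementation used only for the demo.
type nopBroker struct{}

func (nopBroker) Close() error { return nil }

// An implementation package would do this in its own init function, which
// is why a blank import is enough to make the protocol available.
func init() {
	register("memory", func(string) (Broker, error) { return nopBroker{}, nil })
}

func main() {
	if _, err := newBroker("memory://"); err != nil {
		fmt.Println("unexpected:", err)
	}
	_, err := newBroker("kafka://localhost")
	fmt.Println(err) // unsupported protocol: kafka
}
```

Keeping the registry behind a mutex makes registration safe even if builders are added outside init functions.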

Types

type Acknowledger

type Acknowledger interface {
	// Ack is called when the Job has finished.
	Ack() error
	// Reject is called if the job has errored. The parameter indicates
	// whether the job should be put back in the queue or not.
	Reject(requeue bool) error
}

Acknowledger represents the object in charge of acknowledgement management for a job. When a job is acknowledged using any of the functions in this interface, it will be considered delivered by the queue.
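One way a consumer might choose between Ack and Reject is sketched below. The interface is redeclared locally so the example runs on its own; `recorder`, `settle`, `errPermanent` and `errTimeout` are hypothetical names, and the policy of requeueing only transient failures is an assumption, not something mq prescribes.

```go
package main

import (
	"errors"
	"fmt"
)

// Acknowledger mirrors the interface documented above; it is redeclared
// locally so the sketch compiles without importing mq.
type Acknowledger interface {
	Ack() error
	Reject(requeue bool) error
}

// recorder is a hypothetical Acknowledger that only records the outcome.
type recorder struct{ state string }

func (r *recorder) Ack() error {
	r.state = "acked"
	return nil
}

func (r *recorder) Reject(requeue bool) error {
	if requeue {
		r.state = "requeued"
	} else {
		r.state = "dropped"
	}
	return nil
}

// Hypothetical handler errors used to drive the demo.
var (
	errPermanent = errors.New("permanent failure")
	errTimeout   = errors.New("timeout")
)

// settle acknowledges a job exactly once, based on the handler's result.
func settle(a Acknowledger, handlerErr error) error {
	switch {
	case handlerErr == nil:
		return a.Ack()
	case errors.Is(handlerErr, errPermanent):
		return a.Reject(false) // do not put the job back in the queue
	default:
		return a.Reject(true) // transient failure: requeue for another attempt
	}
}

func main() {
	for _, handlerErr := range []error{nil, errTimeout, errPermanent} {
		r := &recorder{}
		if err := settle(r, handlerErr); err != nil {
			fmt.Println("settle failed:", err)
			continue
		}
		fmt.Println(r.state)
	}
	// Prints: acked, requeued, dropped (one per line).
}
```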

type Broker

type Broker interface {
	// Queue returns a Queue from the Broker with the given name.
	Queue(string) (Queue, error)
	// Close closes the connection of the Broker.
	Close() error
}

Broker represents a message broker.

func NewBroker

func NewBroker(uri string) (Broker, error)

NewBroker creates a new Broker based on the given URI. To register an implementation, import its package, for example:

import _ "github.com/go-mq/mq/v2/amqp"

type BrokerBuilder

type BrokerBuilder func(uri string) (Broker, error)

BrokerBuilder instantiates a new Broker based on the given uri.

type Job

type Job struct {
	// ID of the job.
	ID string
	// Priority is the priority level.
	Priority Priority
	// Timestamp is the time of creation.
	Timestamp time.Time
	// Retries is the number of times this job can be processed before being rejected.
	Retries int32
	// ErrorType is the kind of error that made the job fail.
	ErrorType string
	// ContentType of the job
	ContentType string
	// Raw content of the Job
	Raw []byte
	// Acknowledger is the acknowledgement management system for the job.
	Acknowledger Acknowledger
}

Job contains the information for a job to be published to a queue.

func NewJob

func NewJob() *Job

NewJob creates a new Job with default values, a new unique ID and current timestamp.

func (*Job) Ack

func (j *Job) Ack() error

Ack is called when the job is finished.

func (*Job) Decode

func (j *Job) Decode(payload interface{}) error

Decode decodes the payload from the wire format.

func (*Job) Encode

func (j *Job) Encode(payload interface{}) error

Encode encodes the payload to the wire format used.
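The Encode and Decode pair can be pictured as a round trip through the Raw and ContentType fields. The sketch below is not mq's implementation: the `job` type and its methods are hypothetical, and JSON from the standard library stands in for whichever wire format the package actually uses, which this page does not state.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

const contentTypeJSON = "application/json"

// job is a hypothetical stand-in holding only the fields the sketch needs.
type job struct {
	ContentType string
	Raw         []byte
}

// encode serializes the payload into Raw and records the content type.
func (j *job) encode(payload interface{}) error {
	raw, err := json.Marshal(payload)
	if err != nil {
		return err
	}
	j.ContentType = contentTypeJSON
	j.Raw = raw
	return nil
}

// decode deserializes Raw into payload, which must be a pointer.
func (j *job) decode(payload interface{}) error {
	if j.ContentType != contentTypeJSON {
		return errors.New("unsupported content type: " + j.ContentType)
	}
	return json.Unmarshal(j.Raw, payload)
}

// roundTrip encodes a string into a job and decodes it back.
func roundTrip(in string) (string, error) {
	j := &job{}
	if err := j.encode(in); err != nil {
		return "", err
	}
	var out string
	if err := j.decode(&out); err != nil {
		return "", err
	}
	return out, nil
}

func main() {
	out, err := roundTrip("hello world!")
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(out) // hello world!
}
```

Storing the content type next to the raw bytes lets the consumer pick the matching decoder without knowing how the producer was configured.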

func (*Job) Reject

func (j *Job) Reject(requeue bool) error

Reject is called when the job errors. The parameter is true if and only if the job should be put back in the queue.

func (*Job) SetPriority

func (j *Job) SetPriority(priority Priority)

SetPriority sets the job priority.

func (*Job) Size

func (j *Job) Size() int

Size returns the size of the message.

type JobIter

type JobIter interface {
	// Next returns the next Job in the iterator. It should block until
	// a new job becomes available or while too many undelivered jobs have
	// been already returned (see the argument to Queue.Consume). Returns
	// ErrAlreadyClosed if the iterator is closed.
	Next() (*Job, error)
	io.Closer
}

JobIter represents an iterator over a set of Jobs.
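The blocking behaviour described in the Next comment can be sketched with a buffered channel acting as the window of unacknowledged jobs. Everything here is hypothetical (`windowIter`, `newWindowIter`, the `Ack` method on the iterator) and assumes an advertised window of at least 1; real implementations acknowledge through the Job, not the iterator.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type job struct{ ID string }

var errAlreadyClosed = errors.New("already closed")

// windowIter is a hypothetical iterator; it is not mq's implementation.
type windowIter struct {
	jobs   <-chan *job
	window chan struct{} // holds one token per unacknowledged job
	closed chan struct{}
	once   sync.Once
}

// newWindowIter assumes advertisedWindow >= 1.
func newWindowIter(jobs <-chan *job, advertisedWindow int) *windowIter {
	return &windowIter{
		jobs:   jobs,
		window: make(chan struct{}, advertisedWindow),
		closed: make(chan struct{}),
	}
}

// Next blocks while the window is full or no job is available.
func (it *windowIter) Next() (*job, error) {
	select {
	case <-it.closed: // fail fast once the iterator is closed
		return nil, errAlreadyClosed
	default:
	}
	select {
	case it.window <- struct{}{}: // reserve a slot in the window
	case <-it.closed:
		return nil, errAlreadyClosed
	}
	select {
	case j, ok := <-it.jobs:
		if !ok { // source exhausted: treated like a closed iterator here
			<-it.window
			return nil, errAlreadyClosed
		}
		return j, nil
	case <-it.closed:
		<-it.window
		return nil, errAlreadyClosed
	}
}

// Ack frees one slot in the window; it does nothing if none is taken.
func (it *windowIter) Ack() {
	select {
	case <-it.window:
	default:
	}
}

// Close unblocks pending Next calls; a second Close reports an error.
func (it *windowIter) Close() error {
	err := errAlreadyClosed
	it.once.Do(func() {
		close(it.closed)
		err = nil
	})
	return err
}

func main() {
	src := make(chan *job, 2)
	src <- &job{ID: "a"}
	src <- &job{ID: "b"}

	it := newWindowIter(src, 1)
	first, _ := it.Next()
	fmt.Println(first.ID, "in flight:", len(it.window)) // a in flight: 1
	it.Ack() // without this, the next call would block
	second, _ := it.Next()
	fmt.Println(second.ID) // b
	_ = it.Close()
	_, err := it.Next()
	fmt.Println(err) // already closed
}
```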

type Marshaler

type Marshaler interface {
	Marshal() ([]byte, error)
}

type Priority

type Priority uint8

Priority represents a priority level.

const (
	// PriorityUrgent represents an urgent priority level.
	PriorityUrgent Priority = 8
	// PriorityNormal represents a normal priority level.
	PriorityNormal Priority = 4
	// PriorityLow represents a low priority level.
	PriorityLow Priority = 0
)

type Queue

type Queue interface {
	// Publish publishes the given Job to the queue.
	Publish(*Job) error
	// PublishDelayed publishes the given Job to the queue with a given delay.
	PublishDelayed(*Job, time.Duration) error
	// Transaction executes the passed TxCallback inside a transaction.
	Transaction(TxCallback) error
	// Consume returns a JobIter for the queue.  It receives the maximum
	// number of unacknowledged jobs the iterator will allow at any given
	// time (see the Acknowledger interface).
	Consume(advertisedWindow int) (JobIter, error)
	// RepublishBuried republishes to the main queue those jobs that comply
	// with at least one of the conditions, leaving the rest in the buried queue.
	RepublishBuried(conditions ...RepublishConditionFunc) error
}

Queue represents a message queue.

type RepublishConditionFunc

type RepublishConditionFunc func(job *Job) bool

RepublishConditionFunc is a function used to filter jobs to republish.

type RepublishConditions

type RepublishConditions []RepublishConditionFunc

RepublishConditions is an alias for a list of RepublishConditionFunc.

func (RepublishConditions) Comply

func (c RepublishConditions) Comply(job *Job) bool

Comply checks if the Job matches any of the defined conditions.
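The "any of the conditions" check can be sketched as a short loop over predicate functions. The types below are local stand-ins with hypothetical names, and returning false for an empty list is an assumption of this sketch; the package may treat that case differently.

```go
package main

import "fmt"

// job holds only the fields the example conditions inspect.
type job struct {
	ErrorType string
	Retries   int32
}

// conditionFunc and conditions are local stand-ins shaped like
// RepublishConditionFunc and RepublishConditions.
type conditionFunc func(j *job) bool

type conditions []conditionFunc

// comply reports whether at least one condition accepts the job.
// An empty list matches nothing in this sketch (an assumption).
func (c conditions) comply(j *job) bool {
	for _, cond := range c {
		if cond(j) {
			return true
		}
	}
	return false
}

func main() {
	c := conditions{
		func(j *job) bool { return j.ErrorType == "timeout" },
		func(j *job) bool { return j.Retries > 0 },
	}
	fmt.Println(c.comply(&job{ErrorType: "timeout"}))   // true
	fmt.Println(c.comply(&job{ErrorType: "bad-input"})) // false
}
```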

type TxCallback

type TxCallback func(q Queue) error

TxCallback is a function to be called in a transaction.
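A transaction driven by a callback can be sketched as "stage, then commit only on success". This is a minimal in-memory illustration under that assumption, not how any mq backend implements Transaction; `memQueue`, `publishAll` and the string payloads are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var errEmptyJob = errors.New("invalid empty job")

// memQueue is a hypothetical in-memory queue of string payloads.
type memQueue struct{ jobs []string }

// txCallback is shaped like TxCallback but uses the local queue type.
type txCallback func(q *memQueue) error

func (q *memQueue) publish(payload string) error {
	if payload == "" {
		return errEmptyJob
	}
	q.jobs = append(q.jobs, payload)
	return nil
}

// transaction runs cb against a staging queue and commits the staged
// jobs only if cb returns nil; on error nothing reaches q.
func (q *memQueue) transaction(cb txCallback) error {
	staged := &memQueue{}
	if err := cb(staged); err != nil {
		return err
	}
	q.jobs = append(q.jobs, staged.jobs...)
	return nil
}

// publishAll builds a callback that publishes every payload in order.
func publishAll(payloads ...string) txCallback {
	return func(q *memQueue) error {
		for _, p := range payloads {
			if err := q.publish(p); err != nil {
				return err
			}
		}
		return nil
	}
}

func main() {
	q := &memQueue{}

	err := q.transaction(publishAll("a", "", "c"))
	fmt.Println(err, len(q.jobs)) // invalid empty job 0

	err = q.transaction(publishAll("a", "b"))
	fmt.Println(err, len(q.jobs)) // <nil> 2
}
```

Passing the queue into the callback, as TxCallback does, keeps the publishing code identical inside and outside a transaction.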

type Unmarshaler

type Unmarshaler interface {
	Unmarshal([]byte) error
}
