amqpirq

package module
v0.0.0-...-e612aa3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2017 License: Apache-2.0 Imports: 7 Imported by: 0

README

Interruptible connectivity for Go RabbitMQ Client Library

A wrapper for Go RabbitMQ Client Library providing auto-reconnecting functionality to amqp.Connection.

Maximum number of re-connect attempts can be configured by setting int value conn.MaxAttempts. By default the value is unlimited (-1). After a network failure or any other connection issue, an attempt is going to be made to connect after conn.Delay seconds (by default 30).

It is possible to start multiple listeners on a single connection in separate goroutines. The underlying amqp.Connection is expected to be thread safe.

An interruptible connection can be configured using one of three Dial[...] functions that map to related amqp.Dial[...] functions respectively: amqpirq.Dial, amqpirq.DialTLS and amqpirq.DialConfig.

Status

Beta

Build Status Coverage Status GoDoc

Usage

Go get
go get -u github.com/go-amqpirq/amqpirq
Import
import "github.com/go-amqpirq/amqpirq"
Examples

Depending on level of control required by implementing application, it is possible to integrate amqpirq with business logic for amqp.Delivery, amqp.Channel or amqp.Connection

amqp.Delivery

Implement amqpirq.DeliveryConsumer interface providing your business logic to handle inbound amqp.Delivery. NOTE: remember to acknowledge the delivery by invoking d.Ack or d.Reject as required by the business flow of your application:

type MyDeliveryConsumer struct {
}
 
func (*MyDeliveryConsumer) Consume(ch *amqp.Channel, d *amqp.Delivery) {
        defer d.Ack(false)
        doStuff(&d)
}

Configure parallel connection worker for the consumer MyDeliveryConsumer, e.g.:

...
conn, err := amqpirq.Dial("amqp://guest:guest@127.0.0.1:5672//")
if err != nil {
        panic(err)
}
defer conn.Close()
 
consumer := new(MyDeliveryConsumer)
queueMaker := func(ch *amqp.Channel) (amqp.Queue, error) { return amqpirq.NamedReplyQueue(ch, "work_queue") }
numWorkers := 16
worker, err := NewParallelConnectionWorker(queueMaker, numWorkers, consumer)
if err != nil {
        panic(err)
}
go conn.Listen(worker)
...
amqp.Channel

Implement amqpirq.ChannelWorker interface to integrate requirements for interacting directly with amqp.Channel, e.g.:

type MyChannelWorker struct {
}
 
func (*MyChannelWorker) Do(ch *amqp.Channel, done <-chan struct{}) {
        // do things with channel
        <-done
}

Configure connection worker for the channel processor MyChannelWorker, e.g.:

...
conn, err := amqpirq.Dial("amqp://guest:guest@127.0.0.1:5672//")
if err != nil {
        panic(err)
}
defer conn.Close()
 
processor := new(MyChannelWorker)
worker := NewConnectionWorker(processor)
if err != nil {
        panic(err)
}
go conn.Listen(worker)
...
amqp.Connection

Implement amqpirq.ConnectionWorker interface and start amqpirq.Connection.Listen using specialised worker, e.g.:

type MyConnectionWorker struct {
}
 
func (*MyConnectionWorker) Do(conn *amqp.Connection, done <-chan struct{}) {
        // to things with connection
        <-done
}

...
conn, err := amqpirq.Dial("amqp://guest:guest@127.0.0.1:5672//")
if err != nil {
        panic(err)
}
defer conn.Close()
 
go conn.Listen(new(MyConnectionWorker))
...

Testing

To run all the tests, make sure you have RabbitMQ running on a host, export the environment variable AMQP_URI= and run go test -v ./....

License

Apache License v2 - see LICENSE for more details.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// NamedReplyQueue is a lambda for a amqp.Queue key definition on
	// amqp.Channel ch. The queue is defined as durable, non-exclusive
	NamedReplyQueue = func(ch *amqp.Channel, key string) (amqp.Queue, error) {
		return ch.QueueDeclare(
			key,
			true,
			false,
			false,
			false,
			nil,
		)
	}
)

Functions

This section is empty.

Types

type ChannelWorker

type ChannelWorker interface {
	Do(*amqp.Channel, <-chan struct{})
}

ChannelWorker is enabling configuring queues to implement client code depending on amqp.Channel. The Do function is usually invoked in a goroutine and the channel is closed when either a connection to the broker is lost or the amqpirq.Connection is closed.

func NewFixedChannelWorker

func NewFixedChannelWorker(queue func(ch *amqp.Channel) (amqp.Queue, error), size int, consumer DeliveryConsumer) ChannelWorker

NewFixedChannelWorker configures fixed size number of workers on amqp.Queue declared by lambda queue

func NewFixedChannelWorkerName

func NewFixedChannelWorkerName(key string, size int, consumer DeliveryConsumer) ChannelWorker

NewFixedChannelWorker configures fixed size number of workers on amqp.Queue configured using NamedReplyQueue with name key

type Connection

type Connection struct {
	// MaxAttempts is after how many unsuccessful attempts an error
	// is returned
	MaxAttempts int
	// Delay is number of seconds to wait before re-attempting connection
	Delay uint
	// contains filtered or unexported fields
}

Connection facilitates interruptible connectivity to RabbitMQ broker, re-attempting connects until max attempts (default unlimited) with configured delay (default 30 secods)

func Dial

func Dial(url string) (*Connection, error)

Dial is a wrapper around amqp.Dial that accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth.

func DialConfig

func DialConfig(url string, config amqp.Config) (*Connection, error)

DialConfig is a wrapper around amqp.DialConfig that accepts a string in the AMQP URI format and a configuration for the transport and connection setup, returning a new Connection.

func DialTLS

func DialTLS(url string, amqps *tls.Config) (*Connection, error)

DialTLS is a wrapper around amqp.DialTLS that accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth.

func (*Connection) Close

func (conn *Connection) Close()

Close sends a done signal to all workers

func (*Connection) LastError

func (conn *Connection) LastError() error

LastError returns last connection error (if any)

func (*Connection) Listen

func (conn *Connection) Listen(worker ConnectionWorker) (err error)

type ConnectionWorker

type ConnectionWorker interface {
	Do(*amqp.Connection, <-chan struct{})
}

ConnectionWorker is enabling configuring channels and queues to implement client code depending on amqp.Connection to the broker. The Do function is usually invoked in a goroutine and the channel is closed when either a connection to the broker is lost or the amqpirq.Connection is closed.

func NewConnectionWorker

func NewConnectionWorker(worker ChannelWorker) ConnectionWorker

NewConnectionWorker returns new ConnectionWorker with .

func NewParallelConnectionWorker

func NewParallelConnectionWorker(queue func(ch *amqp.Channel) (amqp.Queue, error), size int, consumer DeliveryConsumer) (ConnectionWorker, error)

NewParallelConnectionWorker returns new ConnectionWorker with a fixed pool size for the queue configured using lambda queue. Inbound messages are processed using DeliveryConsumer consumer.

func NewParallelConnectionWorkerName

func NewParallelConnectionWorkerName(key string, size int, consumer DeliveryConsumer) (ConnectionWorker, error)

NewParallelConnectionWorker returns new ConnectionWorker with a fixed pool size for the queue key configured using NamedReplyQueue. Inbound messages are processed using DeliveryConsumer consumer.

type DeliveryConsumer

type DeliveryConsumer interface {
	Consume(*amqp.Channel, *amqp.Delivery)
}

DeliveryConsumer is an interface for handling amqp.Delivery messages consumed from amqp.Channel on a queue

type FixedChannelWorker

type FixedChannelWorker struct {
	// contains filtered or unexported fields
}

FixedChannelWorker is a fixed prefetch parallel processing worker

func (FixedChannelWorker) Do

func (worker FixedChannelWorker) Do(ch *amqp.Channel, done <-chan struct{})

type ParallelConnectionWorker

type ParallelConnectionWorker struct {
	// contains filtered or unexported fields
}

ParallelConnectionWorker is a parallel and asynchronous implementation of ConnectionWorker

func (ParallelConnectionWorker) Do

func (worker ParallelConnectionWorker) Do(conn *amqp.Connection, done <-chan struct{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL