rabbitroutine

v0.8.2
Published: Mar 3, 2024 License: MIT Imports: 8 Imported by: 9

README

RabbitMQ Failover Routine

A lightweight library that handles RabbitMQ auto-reconnection and publish retries for you, so you do not have to write that failover logic yourself.

rabbitroutine solves your RabbitMQ reconnection problems.

Usage

go get github.com/furdarius/rabbitroutine
Consuming

Implement the Consumer interface and register your implementation with StartConsumer or StartMultipleConsumers. Whenever a connection is established (for the first time or after a reconnect), the Declare method is called; use it to declare the RabbitMQ entities you need.

Usage example:


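// Consumer is your own RabbitMQ consumer implementing the rabbitroutine.Consumer interface.
type Consumer struct{}
// Declare declares the required RabbitMQ entities (exchanges, queues, bindings).
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error { return nil }
// Consume reads messages from the queue until ctx is done or the channel is closed.
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error { return nil }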

url := "amqp://guest:guest@127.0.0.1:5672/"

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
    // How long to wait between reconnect attempts
    Wait: 2 * time.Second,
})

ctx := context.Background()

go func() {
    err := conn.Dial(ctx, url)
    if err != nil {
        log.Println(err)
    }
}()

consumer := &Consumer{}
go func() {
    err := conn.StartConsumer(ctx, consumer)
    if err != nil {
        log.Println(err)
    }
}()

The full example demonstrates consuming messages.

Publishing

Two publishers are implemented: FireForgetPublisher and EnsurePublisher. Either can be wrapped with RetryPublisher to repeat publishing on errors and ride out short-term network problems.

Usage example:

ctx := context.Background()

url := "amqp://guest:guest@127.0.0.1:5672/"

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
    // How long to wait between reconnect attempts
    Wait: 2 * time.Second,
})

pool := rabbitroutine.NewPool(conn)
ensurePub := rabbitroutine.NewEnsurePublisher(pool)
pub := rabbitroutine.NewRetryPublisher(
    ensurePub,
    rabbitroutine.PublishMaxAttemptsSetup(16),
    rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)),
)

go conn.Dial(ctx, url)

err := pub.Publish(ctx, "myexch", "myqueue", amqp.Publishing{Body: []byte("message")})
if err != nil {
    log.Println("publish error:", err)
}

The full example demonstrates publishing messages.

Contributing

Pull requests are very welcome. When you create a pull request, include a test or example that covers your change, and make sure your commits represent coherent changes with a stated reason for the change.

To run the integration tests, make sure you have RabbitMQ running on any host:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

Then export the environment variable AMQP_URL=amqp://host/ and run go test -tags integration.

AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -race -cpu=1,2 -tags integration -timeout 5s

Use golangci-lint to check code with linters:

golangci-lint run ./...

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNotFound indicates that a RabbitMQ entity doesn't exist.
	ErrNotFound = errors.New("rabbitmq entity not found")
	// ErrNoRoute indicates that no queue is bound that matches the routing key.
	// @see: https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17
	ErrNoRoute = errors.New("queue not bound")
)

Functions

This section is empty.

Types

type AMQPNotified

type AMQPNotified struct {
	Error *amqp.Error
}

AMQPNotified is fired when an AMQP error occurs.

type ChannelKeeper

type ChannelKeeper struct {
	// contains filtered or unexported fields
}

ChannelKeeper stores an AMQP Channel together with its Confirmation and Close chans.

func (*ChannelKeeper) Channel

func (k *ChannelKeeper) Channel() *amqp.Channel

Channel returns an amqp.Channel stored in ChannelKeeper.

func (*ChannelKeeper) Close

func (k *ChannelKeeper) Close() error

Close closes the RabbitMQ channel stored in ChannelKeeper.

func (*ChannelKeeper) Confirm

func (k *ChannelKeeper) Confirm() <-chan amqp.Confirmation

Confirm returns a channel that will receive amqp.Confirmation when it occurs.

func (*ChannelKeeper) Error

func (k *ChannelKeeper) Error() <-chan *amqp.Error

Error returns a channel that will receive amqp.Error when it occurs.

func (*ChannelKeeper) Return added in v0.5.0

func (k *ChannelKeeper) Return() <-chan amqp.Return

Return returns a channel that will receive amqp.Return when it occurs.

type Config

type Config struct {
	// ReconnectAttempts defines how many reconnect attempts are made after the connection is lost.
	// The counter is reset once a new connection has been established,
	// so on the next outage there will again be no fewer than ReconnectAttempts attempts to reconnect.
	// When the maximum number of reconnect attempts is exceeded, Dial or DialConfig simply returns an error.
	// Handling that situation is up to the caller.
	// In general it is better to leave ReconnectAttempts unlimited and log errors using Connector.AddRetriedListener (see examples dir).
	ReconnectAttempts uint
	// How long to wait between reconnect attempts.
	Wait time.Duration
}

Config stores reconnect options.
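The reset behaviour described for ReconnectAttempts can be illustrated with a small stand-alone sketch. It does not use the library: config, connectFunc and keepConnected are hypothetical names, and treating ReconnectAttempts == 0 as "no limit" is an assumption made for this example only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// config mirrors the two documented Config fields.
type config struct {
	ReconnectAttempts uint
	Wait              time.Duration
}

// connectFunc is a hypothetical dialer: on success it returns a channel
// that is closed when the connection breaks.
type connectFunc func() (broken <-chan struct{}, err error)

// keepConnected reconnects until ctx is done or the attempt limit is hit.
// Assumption: ReconnectAttempts == 0 means "no limit" in this sketch.
func keepConnected(ctx context.Context, cfg config, connect connectFunc) error {
	for {
		var broken <-chan struct{}
		// The attempt counter starts over for every new outage.
		for attempt := uint(1); ; attempt++ {
			var err error
			if broken, err = connect(); err == nil {
				break
			}
			if cfg.ReconnectAttempts > 0 && attempt >= cfg.ReconnectAttempts {
				return err
			}
			select {
			case <-time.After(cfg.Wait):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		select {
		case <-broken: // connection lost, start a fresh series of attempts
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// runDemo fails once, connects, loses the connection immediately,
// then fails every later attempt until the limit of 3 is reached.
func runDemo() (calls int, err error) {
	connect := func() (<-chan struct{}, error) {
		calls++
		if calls == 2 {
			broken := make(chan struct{})
			close(broken) // simulate a connection that breaks right away
			return broken, nil
		}
		return nil, errors.New("connection refused")
	}
	cfg := config{ReconnectAttempts: 3, Wait: time.Millisecond}
	err = keepConnected(context.Background(), cfg, connect)
	return calls, err
}

func main() {
	calls, err := runDemo()
	fmt.Println("connect calls:", calls, "error:", err)
}
```

In the demo the dialer is called five times: one failure and one success before the outage, then three failures after it, which shows that the earlier failure did not count against the second series.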

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector implements RabbitMQ failover.

func NewConnector

func NewConnector(cfg Config) *Connector

NewConnector returns a new instance of Connector.

func (*Connector) AddAMQPNotifiedListener

func (c *Connector) AddAMQPNotifiedListener(h func(n AMQPNotified))

AddAMQPNotifiedListener registers an event listener that is called when an AMQP error is received.

NOTE: not concurrency-safe.

func (*Connector) AddDialedListener

func (c *Connector) AddDialedListener(h func(r Dialed))

AddDialedListener registers an event listener that is called when a connection is successfully established.

NOTE: not concurrency-safe.

func (*Connector) AddRetriedListener

func (c *Connector) AddRetriedListener(h func(Retried))

AddRetriedListener registers an event listener that is called on connection retry attempts.

NOTE: not concurrency-safe.

func (*Connector) Channel

func (c *Connector) Channel(ctx context.Context) (*amqp.Channel, error)

Channel allocates and returns a new amqp.Channel. On error, a new Channel should be opened.

NOTE: It is a blocking method: it waits until the connection is established.

func (*Connector) Dial added in v0.2.0

func (c *Connector) Dial(ctx context.Context, url string) error

Dial tries to keep the RabbitMQ connection active by catching and handling connection errors. It returns an error only when ctx is done or, as described for Config, when the ReconnectAttempts limit is exceeded.

NOTE: It is a blocking method.

func (*Connector) DialConfig added in v0.2.0

func (c *Connector) DialConfig(ctx context.Context, url string, config amqp.Config) error

DialConfig configures the RabbitMQ connection with amqp.Config. It tries to keep the RabbitMQ connection active by catching and handling connection errors. It returns an error only when ctx is done or, as described for Config, when the ReconnectAttempts limit is exceeded.

NOTE: It is a blocking method.

func (*Connector) StartConsumer

func (c *Connector) StartConsumer(ctx context.Context, consumer Consumer) error

StartConsumer is used to start Consumer.

NOTE: It is a blocking method.

func (*Connector) StartMultipleConsumers

func (c *Connector) StartMultipleConsumers(ctx context.Context, consumer Consumer, count int) error

StartMultipleConsumers starts Consumer "count" times. Declare is called once and Consume is called "count" times (one goroutine per call), so you can scale the consumer horizontally.

NOTE: It is a blocking method.
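The "Declare once, Consume count times" behaviour can be sketched without the library as follows. The consumer interface here is a trimmed, hypothetical version (the documented methods also receive an *amqp.Channel), startMultiple is an illustrative name, and the sketch leaves out the reconnect handling that the Connector performs.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// consumer is a trimmed, hypothetical version of the Consumer interface.
type consumer interface {
	Declare(ctx context.Context) error
	Consume(ctx context.Context) error
}

// startMultiple calls Declare once, then runs Consume in count goroutines.
// It blocks until all of them finish and returns the first non-nil error.
func startMultiple(ctx context.Context, c consumer, count int) error {
	if err := c.Declare(ctx); err != nil {
		return err
	}
	errs := make(chan error, count)
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			errs <- c.Consume(ctx)
		}()
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

// countingConsumer records how often each method is called.
type countingConsumer struct {
	declared, consumed int32
}

func (c *countingConsumer) Declare(context.Context) error {
	atomic.AddInt32(&c.declared, 1)
	return nil
}

func (c *countingConsumer) Consume(context.Context) error {
	atomic.AddInt32(&c.consumed, 1)
	return nil
}

// runDemo starts five consumers and reports the call counts.
func runDemo() (declared, consumed int32, err error) {
	c := &countingConsumer{}
	err = startMultiple(context.Background(), c, 5)
	return atomic.LoadInt32(&c.declared), atomic.LoadInt32(&c.consumed), err
}

func main() {
	declared, consumed, err := runDemo()
	fmt.Println("declared:", declared, "consumed:", consumed, "err:", err)
}
```

Because Consume may run in several goroutines at once, any state shared by a real Consumer implementation needs the same kind of synchronization as the counters above.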

type Consumer

type Consumer interface {
	// Declare is used to declare the required RabbitMQ entities.
	// It is called once before Consume (even when StartMultipleConsumers is used).
	// On any problem with the connection or channel, the RabbitMQ entities are redeclared.
	Declare(ctx context.Context, ch *amqp.Channel) error
	// Consume is used to consume a RabbitMQ queue.
	// It can be called more than once if you register the consumer with StartMultipleConsumers.
	Consume(ctx context.Context, ch *amqp.Channel) error
}

The Consumer interface provides declaring of RabbitMQ entities and consuming of a queue.

Example

This example demonstrates consuming messages from RabbitMQ queue.

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

// Consumer implements the rabbitroutine.Consumer interface.
type Consumer struct {
	ExchangeName string
	QueueName    string
}

// Declare implements the Declare method of the rabbitroutine.Consumer interface.
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {
	err := ch.ExchangeDeclare(
		c.ExchangeName, // name
		"direct",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("failed to declare exchange %v: %v", c.ExchangeName, err)

		return err
	}

	_, err = ch.QueueDeclare(
		c.QueueName, // name
		true,        // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		log.Printf("failed to declare queue %v: %v", c.QueueName, err)

		return err
	}

	err = ch.QueueBind(
		c.QueueName,    // queue name
		c.QueueName,    // routing key
		c.ExchangeName, // exchange
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("failed to bind queue %v: %v", c.QueueName, err)

		return err
	}

	return nil
}

// Consume implements the Consume method of the rabbitroutine.Consumer interface.
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {
	defer log.Println("consume method finished")

	err := ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		log.Printf("failed to set qos: %v", err)

		return err
	}

	msgs, err := ch.Consume(
		c.QueueName,  // queue
		"myconsumer", // consumer name
		false,        // auto-ack
		false,        // exclusive
		false,        // no-local
		false,        // no-wait
		nil,          // args
	)
	if err != nil {
		log.Printf("failed to consume %v: %v", c.QueueName, err)

		return err
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return amqp.ErrClosed
			}

			content := string(msg.Body)

			fmt.Println("New message:", content)

			err := msg.Ack(false)
			if err != nil {
				log.Printf("failed to Ack message: %v", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// This example demonstrates consuming messages from RabbitMQ queue.
func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20,
		// How long to wait between reconnect attempts
		Wait: 2 * time.Second,
	})

	conn.AddRetriedListener(func(r rabbitroutine.Retried) {
		log.Printf("try to connect to RabbitMQ: attempt=%d, error=\"%v\"",
			r.ReconnectAttempt, r.Error)
	})

	conn.AddDialedListener(func(_ rabbitroutine.Dialed) {
		log.Printf("RabbitMQ connection successfully established")
	})

	conn.AddAMQPNotifiedListener(func(n rabbitroutine.AMQPNotified) {
		log.Printf("RabbitMQ error received: %v", n.Error)
	})

	consumer := &Consumer{
		ExchangeName: "myexch",
		QueueName:    "myqueue",
	}

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	go func() {
		err := conn.StartMultipleConsumers(ctx, consumer, 5)
		if err != nil {
			log.Println("failed to start consumer:", err)
		}
	}()

	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)

	// Wait for OS termination signal
	<-sigc
}

type Dialed

type Dialed struct{}

Dialed is fired when a connection has been successfully established.

type EnsurePublisher

type EnsurePublisher struct {
	// contains filtered or unexported fields
}

EnsurePublisher implements the Publisher interface and guarantees delivery of the message to the server. When EnsurePublisher is used, publisher confirms are enabled, which provides the delivery guarantee.

@see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
@see https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17

Example

This example demonstrates publishing messages to a RabbitMQ exchange with delivery guarantees provided by EnsurePublisher and publishing retries provided by RetryPublisher.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20000,
		// How long to wait between reconnect attempts
		Wait: 2 * time.Second,
	})

	pool := rabbitroutine.NewPool(conn)
	ensurePub := rabbitroutine.NewEnsurePublisher(pool)
	pub := rabbitroutine.NewRetryPublisher(ensurePub)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)

		err := pub.Publish(timeoutCtx, "myexch", "myqueue", amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			log.Println("failed to publish:", err)
		}

		cancel()
	}
}

func NewEnsurePublisher

func NewEnsurePublisher(p *Pool) *EnsurePublisher

NewEnsurePublisher returns a new instance of EnsurePublisher.

func (*EnsurePublisher) Publish

func (p *EnsurePublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error

Publish sends msg to an exchange on the RabbitMQ server and waits to ensure that msg has been successfully received by the server. It returns an error if no queue is bound that matches the routing key. It blocks until the message is successfully delivered, the context is cancelled, or an error is received.

While a reconnect is in progress, Publish cannot finish because no amqp.Channel can be obtained. The publisher does not know the state of the connection, so for the publisher a reconnection is the same as "the request took too long to finish". "Too long" is defined by the context.Context passed as the first argument to Publish. If the context's deadline expires, Publish returns the context.DeadlineExceeded error. If the connection is re-established and Publish still has enough time to finish, the request completes successfully.

type FireForgetPublisher added in v0.4.0

type FireForgetPublisher struct {
	// contains filtered or unexported fields
}

FireForgetPublisher implements the Publisher interface and publishes messages to a RabbitMQ exchange without delivery guarantees. When FireForgetPublisher is used, publisher confirms are not enabled, so delivery is not guaranteed. @see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

Example

This example demonstrates publishing messages to a RabbitMQ exchange using FireForgetPublisher.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20000,
		// How long to wait between reconnect attempts
		Wait: 2 * time.Second,
	})

	pool := rabbitroutine.NewLightningPool(conn)
	pub := rabbitroutine.NewFireForgetPublisher(pool)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)

		err := pub.Publish(timeoutCtx, "myexch", "myqueue", amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			log.Println("failed to publish:", err)
		}

		cancel()
	}
}

func NewFireForgetPublisher added in v0.4.0

func NewFireForgetPublisher(p *LightningPool) *FireForgetPublisher

NewFireForgetPublisher returns a new instance of FireForgetPublisher.

func (*FireForgetPublisher) Publish added in v0.4.0

func (p *FireForgetPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error

Publish sends msg to an exchange on the RabbitMQ server.

type LightningPool added in v0.4.0

type LightningPool struct {
	// contains filtered or unexported fields
}

LightningPool stores AMQP Channels without confirm mode, so they will be used without delivery guarantees.

func NewLightningPool added in v0.4.0

func NewLightningPool(conn *Connector) *LightningPool

NewLightningPool returns a new instance of LightningPool.

func (*LightningPool) Channel added in v0.4.0

func (p *LightningPool) Channel(ctx context.Context) (*amqp.Channel, error)

Channel returns an AMQP Channel.

func (*LightningPool) Release added in v0.4.0

func (p *LightningPool) Release(k *amqp.Channel)

Release adds k to the pool.

func (*LightningPool) Size added in v0.4.0

func (p *LightningPool) Size() int

Size returns the current pool size. NOTE: not a thread-safe operation.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a set of AMQP Channels that may be individually saved and retrieved.

func NewPool

func NewPool(conn *Connector) *Pool

NewPool returns a new instance of Pool.

func (*Pool) ChannelWithConfirm

func (p *Pool) ChannelWithConfirm(ctx context.Context) (ChannelKeeper, error)

ChannelWithConfirm returns a ChannelKeeper whose AMQP Channel is in confirm mode.

func (*Pool) Release

func (p *Pool) Release(k ChannelKeeper)

Release adds k to the pool.

func (*Pool) Size added in v0.4.0

func (p *Pool) Size() int

Size returns the current pool size. NOTE: not a thread-safe operation.
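The idea of a pool whose channels are "individually saved and retrieved" can be sketched as follows. This is a generic illustration under stated assumptions, not the library's implementation: channel, pool, get, release and size are hypothetical names, and unlike the documented Size method this sketch guards size with a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// channel is a hypothetical stand-in for an AMQP channel.
type channel struct{ id int }

// pool keeps released channels so that later callers can reuse them.
type pool struct {
	mu     sync.Mutex
	idle   []*channel
	nextID int
}

// get returns an idle channel if one exists, otherwise it opens a new one.
func (p *pool) get() *channel {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.idle); n > 0 {
		ch := p.idle[n-1]
		p.idle = p.idle[:n-1]
		return ch
	}
	p.nextID++
	return &channel{id: p.nextID}
}

// release puts ch back into the pool for reuse.
func (p *pool) release(ch *channel) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.idle = append(p.idle, ch)
}

// size reports how many idle channels are currently stored.
func (p *pool) size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.idle)
}

func main() {
	p := &pool{}
	first := p.get()
	p.release(first)
	second := p.get() // reuses the channel that was just released
	fmt.Println("reused:", first == second, "idle:", p.size())
}
```

Reusing channels this way avoids opening a new channel for every publish, which is the usual motivation for pooling them.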

type Publisher

type Publisher interface {
	// Publish used to send msg to RabbitMQ exchange.
	Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
}

Publisher interface provides functionality of publishing to RabbitMQ.

type Retried

type Retried struct {
	ReconnectAttempt uint
	Error            error
}

Retried is fired when a connection attempt is retried. The event is emitted only if the connection was not established. If the connection was successfully established, the Dialed event is emitted instead.

type RetryDelayFunc added in v0.5.0

type RetryDelayFunc func(attempt uint) time.Duration

RetryDelayFunc returns how long to wait before retry.

func ConstDelay added in v0.5.0

func ConstDelay(delay time.Duration) RetryDelayFunc

ConstDelay returns a RetryDelayFunc that always yields the same delay value.

func LinearDelay added in v0.5.0

func LinearDelay(delay time.Duration) RetryDelayFunc

LinearDelay returns a RetryDelayFunc whose delay value increases linearly with the current attempt.
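To make the difference between the two delay strategies concrete, here is a stand-alone sketch of functions with the same shape as RetryDelayFunc. The names retryDelayFunc, constDelay and linearDelay are local to this example, and the formula attempt * base is an assumption; the library's exact calculation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// retryDelayFunc mirrors the documented RetryDelayFunc signature.
type retryDelayFunc func(attempt uint) time.Duration

// constDelay always returns the same delay, whatever the attempt number.
func constDelay(delay time.Duration) retryDelayFunc {
	return func(uint) time.Duration { return delay }
}

// linearDelay grows the delay in proportion to the attempt number.
// Assumption: delay = attempt * base.
func linearDelay(base time.Duration) retryDelayFunc {
	return func(attempt uint) time.Duration {
		return time.Duration(attempt) * base
	}
}

func main() {
	constant := constDelay(10 * time.Millisecond)
	linear := linearDelay(10 * time.Millisecond)
	for attempt := uint(1); attempt <= 3; attempt++ {
		fmt.Println("attempt", attempt, "const:", constant(attempt), "linear:", linear(attempt))
	}
}
```

A linear delay backs off gradually while the broker is unavailable, whereas a constant delay keeps retrying at a fixed rate.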

type RetryPublisher

type RetryPublisher struct {
	Publisher
	// contains filtered or unexported fields
}

RetryPublisher retries publishing a message until the context is done.

func NewRetryPublisher

func NewRetryPublisher(p Publisher, opts ...RetryPublisherOption) *RetryPublisher

NewRetryPublisher returns a new instance of RetryPublisher.

func (*RetryPublisher) Publish

func (p *RetryPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error

Publish sends msg to a RabbitMQ exchange. It blocks until either the message is delivered or the context is cancelled. An error is returned when the context is done; a limit set with PublishMaxAttemptsSetup also bounds the number of attempts.
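The retry behaviour can be sketched as a loop around any publish function. This is a minimal stand-alone illustration, not the library's code: publishFunc and retryPublish are hypothetical names, the message is reduced to a string, and returning the last publish error once the attempt limit is used up is an assumption of this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishFunc is a hypothetical stand-in for Publisher.Publish.
type publishFunc func(ctx context.Context, body string) error

// retryPublish calls publish until it succeeds, ctx is done,
// or maxAttempts calls have been made.
func retryPublish(ctx context.Context, publish publishFunc, maxAttempts uint,
	delay func(attempt uint) time.Duration, body string) error {
	var err error
	for attempt := uint(0); attempt < maxAttempts; attempt++ {
		if err = publish(ctx, body); err == nil {
			return nil
		}
		// Wait before the next attempt, but give up as soon as ctx is done.
		select {
		case <-time.After(delay(attempt)):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err // last publish error (nil if maxAttempts is 0)
}

// runDemo publishes through a publisher that fails twice before succeeding.
func runDemo() (calls int, err error) {
	flaky := func(ctx context.Context, body string) error {
		calls++
		if calls < 3 {
			return errors.New("temporary network problem")
		}
		return nil
	}
	delay := func(attempt uint) time.Duration {
		return time.Duration(attempt) * time.Millisecond
	}
	err = retryPublish(context.Background(), flaky, 16, delay, "message")
	return calls, err
}

func main() {
	calls, err := runDemo()
	fmt.Println("publish calls:", calls, "err:", err)
}
```

Combining a per-call context timeout with an attempt limit gives two independent bounds on how long a single publish can take.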

type RetryPublisherOption added in v0.5.0

type RetryPublisherOption func(*RetryPublisher)

RetryPublisherOption describes a functional option for configuring RetryPublisher.

func PublishDelaySetup added in v0.5.0

func PublishDelaySetup(fn RetryDelayFunc) RetryPublisherOption

PublishDelaySetup sets the function that returns the delay (time.Duration) between publish attempts.

func PublishMaxAttemptsSetup added in v0.5.0

func PublishMaxAttemptsSetup(maxAttempts uint) RetryPublisherOption

PublishMaxAttemptsSetup sets the limit on publish attempts.

Directories

Path Synopsis
examples
