rabbids

package module
v0.0.0-...-6e64f46 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2020 License: MIT Imports: 23 Imported by: 0

README

Rabbids

A library to create AMQP consumers and producers nicely.

Software License Go Coverage Status Go Doc Go Report Card

  • A wrapper over amqp to make possible declare all the blocks (exchanges, queues, dead-letters, bindings) from a YAML or struct.
  • Handle connection problems
    • reconnect when a connection is lost or closed.
    • retry with exponential backoff for sending messages
  • Go channel API for the producer (we are fans of github.com/rafaeljesus/rabbus API).
  • Support for multiple connections.
  • Delayed messages - send messages to arrive in the queue only after the time duration is passed.
  • The consumer uses a handler approach, so it's possible to add middlewares wrapping the handler

Installation

go get -u github.com/leveeml/rabbids

Usage

We create some examples inside the _example directory. To run the examples first you need a rabbitMQ server running. If you didn't have a server already running you can run one with docker:

docker run -d -p 15672:15672 -p 5672:5672 rabbitmq:3-management

The examples expect an ENV var RABBITMQ_ADDRESS with the amqp address.

In one console tab run the consumer:

cd _examples
export RABBITMQ_ADDRESS=amqp://0.0.0.0:5672
go run consumer/main.go

In another tab run the producer:

cd _examples
export RABBITMQ_ADDRESS=amqp://0.0.0.0:5672
go run producer/main.go

Or send some delayed messages:

cd _examples
export RABBITMQ_ADDRESS=amqp://0.0.0.0:5672
go run delay-message/main.go

Delayed Messages

The delayed message implementation is based on the implementation created by the NServiceBus project. For more information go to the docs here.

MessageHandler

MessageHandler is an interface expected by a consumer to process the messages from rabbitMQ. See the godocs for more details. If you don't need the close something you can use the rabbids.MessageHandlerFunc to pass a function as a MessageHandler.

Concurency

Every consumer runs on a separated goroutine and by default process every message (call the MessageHandler) synchronously but it's possible to change that and process the messages with a pool of goroutines. To make this you need to set the worker attribute inside the ConsumerConfig with the number of concurrent workers you need. example.

Documentation

Index

Constants

View Source
const (
	Version        = "0.0.1"
	DefaultTimeout = 2 * time.Second
	DefaultSleep   = 500 * time.Millisecond
	DefaultRetries = 5
)
View Source
const (
	MaxDelay              time.Duration = ((1 << maxNumberOfBitsToUse) - 1) * time.Second
	DelayDeliveryExchange string        = "rabbids.delay-delivery"
)

Variables

This section is empty.

Functions

func NoOPLoggerFN

func NoOPLoggerFN(message string, fields Fields)

func StartSupervisor

func StartSupervisor(rabbids *Rabbids, intervalChecks time.Duration) (stop func(), err error)

StartSupervisor init a new supervisor that will start all the consumers from Rabbids and check if the consumers are alive, if not alive it will be restarted. It returns the stop function to gracefully shutdown the consumers and an error if fail to create the consumers the first time.

Types

type Binding

type Binding struct {
	Exchange    string   `mapstructure:"exchange"`
	RoutingKeys []string `mapstructure:"routing_keys"`
	Options     Options  `mapstructure:"options"`
}

Binding describe how a queue connects to a exchange.

type Config

type Config struct {
	// Connections describe the connections used by consumers.
	Connections map[string]Connection `mapstructure:"connections"`
	// Exchanges have all the exchanges used by consumers.
	// This exchanges are declared on startup of the rabbids client.
	Exchanges map[string]ExchangeConfig `mapstructure:"exchanges"`
	// DeadLetters have all the deadletters queues used internally by other queues
	// This will be declared at startup of the rabbids client.
	DeadLetters map[string]DeadLetter `mapstructure:"dead_letters"`
	// Consumers describes configuration list for consumers.
	Consumers map[string]ConsumerConfig `mapstructure:"consumers"`
	// Registered Message handlers used by consumers
	Handlers map[string]MessageHandler
}

Config describes all available options to declare all the components used by rabbids Consumers and Producers.

func ConfigFromFile

func ConfigFromFile(file File) (*Config, error)

ConfigFromFilename read a YAML file and convert it into a Config struct with all the configuration to build the Consumers and producers. Also, it Is possible to use environment variables values inside the YAML file. The syntax is like the syntax used inside the docker-compose file. To use a required variable just use like this: ${ENV_NAME} and to put an default value you can use: ${ENV_NAME:=some-value} inside any value. If a required variable didn't exist, an error will be returned.

func ConfigFromFilename

func ConfigFromFilename(filename string) (*Config, error)

ConfigFromFilename is a wrapper to open the file and pass to ConfigFromFile.

func (*Config) RegisterHandler

func (c *Config) RegisterHandler(consumerName string, h MessageHandler)

RegisterHandler is used to set the MessageHandler used by one Consumer. The consumerName MUST be equal as the name used by the Consumer (the key inside the map of consumers).

type Connection

type Connection struct {
	DSN     string        `mapstructure:"dsn"`
	Timeout time.Duration `mapstructure:"timeout"`
	Sleep   time.Duration `mapstructure:"sleep"`
	Retries int           `mapstructure:"retries"`
}

Connection describe a config for one connection.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a high level rabbitMQ consumer.

func (*Consumer) Alive

func (c *Consumer) Alive() bool

Alive returns true if the tomb is not in a dying or dead state.

func (*Consumer) Kill

func (c *Consumer) Kill()

Kill will try to stop the internal work.

func (*Consumer) Name

func (c *Consumer) Name() string

Name return the consumer name.

func (*Consumer) Run

func (c *Consumer) Run()

Run start a goroutine to consume messages from a queue and pass to one runner.

type ConsumerConfig

type ConsumerConfig struct {
	Connection    string      `mapstructure:"connection"`
	Workers       int         `mapstructure:"workers"`
	PrefetchCount int         `mapstructure:"prefetch_count"`
	DeadLetter    string      `mapstructure:"dead_letter"`
	Queue         QueueConfig `mapstructure:"queue"`
	Options       Options     `mapstructure:"options"`
}

ConsumerConfig describes consumer's configuration.

type DeadLetter

type DeadLetter struct {
	Queue QueueConfig `mapstructure:"queue"`
}

DeadLetter describe all the dead letters queues to be declared before declare other queues.

type ExchangeConfig

type ExchangeConfig struct {
	Type    string  `mapstructure:"type"`
	Options Options `mapstructure:"options"`
}

ExchangeConfig describes exchange's configuration.

type Fields

type Fields map[string]interface{}

type File

type File interface {
	io.Reader
	Stat() (os.FileInfo, error)
}

File represents the file operations needed to works with our config loader.

type LoggerFN

type LoggerFN func(message string, fields Fields)

type Message

type Message struct {
	amqp.Delivery
}

Message is an ampq.Delivery with some helper methods used by our systems.

type MessageHandler

type MessageHandler interface {
	// Handle a single message, this method MUST be safe for concurrent use
	Handle(m Message)
	// Close the handler, this method is called when the consumer is closing
	Close()
}

MessageHandler is the base interface used to consumer AMPQ messages.

type MessageHandlerFunc

type MessageHandlerFunc func(m Message)

MessageHandlerFunc implements the MessageHandler interface.

func (MessageHandlerFunc) Close

func (h MessageHandlerFunc) Close()

func (MessageHandlerFunc) Handle

func (h MessageHandlerFunc) Handle(m Message)

type Options

type Options struct {
	Durable    bool       `mapstructure:"durable"`
	Internal   bool       `mapstructure:"internal"`
	AutoDelete bool       `mapstructure:"auto_delete"`
	Exclusive  bool       `mapstructure:"exclusive"`
	NoWait     bool       `mapstructure:"no_wait"`
	NoLocal    bool       `mapstructure:"no_local"`
	AutoAck    bool       `mapstructure:"auto_ack"`
	Args       amqp.Table `mapstructure:"args"`
}

Options describes optionals configuration for consumer, queue, bindings and exchanges declaration.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is an high level rabbitMQ producer instance.

func NewProducer

func NewProducer(dsn string, opts ...ProducerOption) (*Producer, error)

NewProcucer create a new high level rabbitMQ producer instance

dsn is a string in the AMQP URI format the ProducerOptions can be:

rabbids.WithLogger     - to set a logger instance
rabbids.WithFactory    - to use one instance of a factory.
                         when added the factory is used to declare the topics
                         in the first time the topic is used.
rabbids.WithSerializer - used to set a specific serializer
                         the default is the a JSON serializer.

func (*Producer) Close

func (p *Producer) Close() error

Close will close all the underline channels and close the connection with rabbitMQ. Any Emit call after calling the Close method will panic.

func (*Producer) Emit

func (p *Producer) Emit() chan<- Publishing

Emit emits a message to rabbitMQ but does not wait for the response from the broker. Errors with the Publishing (encoding, validation) or with the broker will be sent to the EmitErr channel. It's your responsibility to handle these errors somehow.

func (*Producer) EmitErr

func (p *Producer) EmitErr() <-chan PublishingError

EmitErr returns a channel used to receive all the errors from Emit channel. The error handle is not required but and the send inside this channel is buffered. WARNING: If the channel gets full, new errors will be dropped to avoid stop the producer internal loop.

func (*Producer) GetAMQPChannel

func (p *Producer) GetAMQPChannel() *amqp.Channel

GetAMQPChannel returns the current connection channel.

func (*Producer) GetAMQPConnection

func (p *Producer) GetAMQPConnection() *amqp.Connection

GetAGetAMQPConnection returns the current amqp connetion.

func (*Producer) Send

func (p *Producer) Send(m Publishing) error

Send a message to rabbitMQ. In case of connection errors, the send will block and retry until the reconnection is done. It returns an error if the Serializer returned an error OR the connection error persisted after the retries.

type ProducerOption

type ProducerOption func(*Producer) error

ProducerOption represents an option function to add some functionality or change the producer state on creation time.

func WithCustomName

func WithCustomName(name string) ProducerOption

func WithLogger

func WithLogger(log LoggerFN) ProducerOption

WithLogger will override the default logger (no Operation Log).

func WithSerializer

func WithSerializer(s Serializer) ProducerOption

type Publishing

type Publishing struct {
	// Exchange name
	Exchange string
	// The routing key
	Key string
	// Data to be encoded inside the message
	Data interface{}
	// Delay is the duration to wait until the message is delivered to the queue.
	// The max delay period is 268,435,455 seconds, or about 8.5 years.
	Delay time.Duration

	amqp.Publishing
	// contains filtered or unexported fields
}

Publishing have the fields for sending a message to rabbitMQ.

func NewDelayedPublishing

func NewDelayedPublishing(queue string, delay time.Duration, data interface{}, options ...PublishingOption) Publishing

SendWithDelay send a message to arrive the queue only after the time is passed. The minimum delay is one second, if the delay is less than the minimum, the minimum will be used. The max delay period is 268,435,455 seconds, or about 8.5 years.

func NewPublishing

func NewPublishing(exchange, key string, data interface{}, options ...PublishingOption) Publishing

NewPublishing create a message to be sent by some consumer.

type PublishingError

type PublishingError struct {
	Publishing
	Err error
}

PublishingError is returned by the async error reporting. When an async publishing message is sent and an error happens the Publishing and the error will be sent to the EmitErr channel. To get this channel, call the EmitErr method inside the producer.

type PublishingOption

type PublishingOption func(*Publishing)

PublishingOption represents an option you can pass to setup some data inside the Publishing.

func WithPriority

func WithPriority(v int) PublishingOption

WithPriority change the priority of the Publishing message.

type QueueConfig

type QueueConfig struct {
	Name     string    `mapstructure:"name"`
	Bindings []Binding `mapstructure:"bindings"`
	Options  Options   `mapstructure:"options"`
}

QueueConfig describes queue's configuration.

type Rabbids

type Rabbids struct {
	// contains filtered or unexported fields
}

Rabbids is the main block used to create and run rabbitMQ consumers and producers.

func New

func New(config *Config, log LoggerFN) (*Rabbids, error)

func (*Rabbids) CreateConsumer

func (r *Rabbids) CreateConsumer(name string) (*Consumer, error)

CreateConsumer create a new consumer for a specific name using the config provided.

func (*Rabbids) CreateConsumers

func (r *Rabbids) CreateConsumers() ([]*Consumer, error)

CreateConsumers will iterate over config and create all the consumers.

func (*Rabbids) CreateProducer

func (r *Rabbids) CreateProducer(connectionName string, customOpts ...ProducerOption) (*Producer, error)

CreateConsumer create a new consumer using the connection inside the config.

type Serializer

type Serializer interface {
	Marshal(interface{}) ([]byte, error)
	// Name return the name used on the content type of the messsage
	Name() string
}

Serializer is the base interface for all message serializers.

Directories

Path Synopsis
_examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL