harego

package module
v2.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

README

Harego

High-level library on top of amqp.

Build Status

Harego

  1. Description
  2. Usage
  3. Development

Description

A harego.Consumer/harego.Publisher is a concurrent safe queue manager for RabbitMQ, and a high-level implementation on top of amqp library. A Consumer/Publisher creates one or more workers for publishing/consuming messages. The default values are chosen to make the Consumer/Publisher a durable queue working with the default exchange and topic kind. Consumer/Publisher can be configure by passing provided ConfigFunc functions to NewConsumer/NewPublisher constructors.

The Consume() method will call the provided HandlerFunc with the next available message on the next available worker. The return value of the HandlerFunc decided what would happen to the message. The Consume worker will delay before act on the ack for the amount of time the HandlerFunc returns as the second value.

You can increase the worker sizes by passing Workers(n) to the NewClient constructor.

When the Close() method is called, all connections will be closed and the Client will be useless. You can create a new object for more works.

Note

This library is in beta phase and the API might change until we reach a stable release.

Usage

Consumer

The only requirement for the NewConsumer function is a Connector to connect to the broker when needed:

// to use an address:
harego.NewConsumer(harego.URLConnector(address))
// to use an amqp connection:
harego.NewConsumer(harego.AMQPConnector(conn))

The connector is used when the connection is lost, so the Client can initiate a new connection.

In this setup the myqueue is bound to the myexchange exchange, and handler is called for each message that are read from this queue:

consumer, err := harego.NewConsumer(harego.URLConnector(address),
	harego.ExchangeName("myexchange"),
	harego.QueueName("myqueue"),
)
// handle the error.
err = consumer.Consume(ctx, func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
	return harego.AckTypeAck, 0
})
// handle the error.

You can create multiple workers in the above example for concurrently handle multiple messages:

consumer, err := harego.NewConsumer(harego.URLConnector(address),
	harego.ExchangeName("myexchange"),
	harego.QueueName("myqueue"),
	harego.Workers(20),
)
// handle the error.
err = consumer.Consume(ctx, func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
	return harego.AckTypeAck, 0
})
// handle the error.

The handler will receive 20 messages concurrently and the Ack is sent for each message separately.

Publisher

In this setup the message is sent to the myexchange exchange:

publisher, err := harego.NewPublisher(harego.URLConnector(address),
	harego.ExchangeName("myexchange"),
)
// handle the error.
err = publisher.Publish(&amqp.Publishing{
	Body: []byte(msg),
})
// handle the error.
Delays

If the returned duration is 0, the acknowledgement is sent to the broker immediately. Otherwise Consume function sleeps for that duration before it's been sent. Please note that the delay will cause the current handler to sleep for this duration, therefore you need enough workers to be able to handle next available messages.

Requeueing

If you return a harego.AckTypeRequeue from the handler, the message is sent back to the same queue. This means this message will be consumed after all messages in the queue is consumed.

Development

Prerequisite

This project supports Go >= 1.20. To run targets from the Makefile you need to install GNU make. You also need docker installed for integration tests.

If you have asdf installed, the .tool-versions file will set the go version to the latest supported version.

In order to install dependencies:

make dependencies

This also installs reflex to help with development process.

To run this application you need to provide the following settings as environment variables or application arguments:

RABBITMQ_PORT
RABBITMQ_ADDR
RABBITMQ_ADMIN_PORT
RABBITMQ_USER
RABBITMQ_PASSWORD
RABBITMQ_VH
Running Tests

To watch for file changes and run unittest:

make unittest
# or to run them with race flag:
make unittest_race

There is also a integration_test target for running integration tests.

Make Examples
make unittest
make unittest run=TestMyTest # runs a specific test with regexp
make unittest dir=./db/...   # runs tests in a package
make unittest dir=./db/... run=TestSomethingElse
make unittest flags="-race -count=2"

Please see the Makefile for more targets.

Mocks

To generate mocks run:

make mocks

RabbitMQ

For convenience you can trigger the integration_deps target to setup required RabbitMQ instance:

make integration_deps

Documentation

Overview

Package harego contains the logic for communicating with RabbitMQ.

Publisher

A Publisher wraps an exchange in a concurrent safe manner for publishing messages to RabbitMQ. Zero value is not usable therefore a Publisher should be constructed with NewPublisher() function.

The only requirement for a Publisher to operate is a connector that returns a connection to the broker. There is also a helper function that can create a new connection from the address of the broker. The Publisher will create 1 worker by default for publishing messages. The default exchange of the Publisher is "default" and it is set with the "topic" type. The default delivery method is persistent. You can use provided ConfigFunc functions to change the Publisher's behaviour.

You should call the Close() method when you are done with this object, otherwise it will leak goroutines.

NewPublisher

NewPublisher returns a Publisher instance. You can configure the object by providing ConfigFunc functions. See ConfigFunc documentation for more information.

pub, err := harego.NewPublisher(harego.URLConnector("amqp://"),
	harego.Workers(6),
)
// handle error

Publish

If the publisher is setup with multiple workers, each Publish call will go through the next available worker.

Consumer

A Consumer wraps a queue in a concurrent safe manner for receiving messages from RabbitMQ. If the Consumer doesn't have a queue name set, it will not create a queue. Zero value is not usable, therefore a Consumer should be constructed with NewConsumer() function.

The only requirement for a Consumer to operate is a connector that returns a connection to the broker. There is also a helper function that can create a new connection from the address of the broker. The Consumer will create 1 worker by default for consuming messages. The default delivery method is persistent. You can use provided ConfigFunc functions to change the Consumer's behaviour.

Make sure to provide a queue name. The Consumer binds the queue to the given exchange.

You should call the Close() method when you are done with this object, otherwise it will leak goroutines.

NewConsumer

NewConsumer returns an Consumer instance. You can configure the object by providing ConfigFunc functions. See ConfigFunc documentation for more information.

cons, err := harego.NewConsumer(harego.URLConnector("amqp://"),
	harego.Workers(6),
)
// handle error

Consume

Consume calls the handler with the next available message from the next available worker. It stops handling messages when the context is done or the consumer is closed. Consume internally creates a Publisher for requeueing messages. By default messages are consumed with false autoAck.

Make sure you have enough workers so the Consume is not clogged on delays.

Index

Examples

Constants

View Source
const (
	// DeliveryModeTransient means higher throughput but messages will not be
	// restored on broker restart. The delivery mode of publishings is
	// unrelated to the durability of the queues they reside on. Transient
	// messages will not be restored to durable queues.
	DeliveryModeTransient = DeliveryMode(amqp.Transient)

	// DeliveryModePersistent messages will be restored to durable queues and
	// lost on non-durable queues during server restart.
	DeliveryModePersistent = DeliveryMode(amqp.Persistent)
)

Variables

View Source
var (
	// ErrInput is returned when an input is invalid.
	ErrInput = errors.New("invalid input")

	// ErrNilHnadler is returned when the handler is nil.
	ErrNilHnadler = errors.New("handler can not be nil")

	// ErrClosed is returned when the Client is closed and is being reused.
	ErrClosed = errors.New("exchange is already closed")

	// ErrAlreadyConfigured is returned when an already configured client is
	// about to receive new configuration.
	ErrAlreadyConfigured = errors.New("client is already configured")
)

Functions

func AutoAck

func AutoAck(c *config)

AutoAck sets the consuming ack behaviour. The default is false.

func AutoDelete

func AutoDelete(c *config)

AutoDelete marks the exchange and queues with autoDelete property which causes the messages to be automatically removed from the queue when consumed.

func ExclusiveQueue

func ExclusiveQueue(c *config)

ExclusiveQueue marks the queue as exclusive. Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.

func Internal

func Internal(c *config)

Internal sets the exchange to be internal.

func NoWait

func NoWait(c *config)

NoWait marks the exchange as noWait. When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error.

func NotDurable

func NotDurable(c *config)

NotDurable marks the exchange and the queue not to be durable. Default is durable.

Types

type AckType

type AckType int

AckType specifies how the message is acknowledged to RabbotMQ.

const (
	// AckTypeAck causes the message to be removed in broker. The multiple value
	// is false, causing the broker to act on one message.
	AckTypeAck AckType = iota

	// AckTypeNack causes the message to be requeued in broker. The multiple
	// value is false, causing the broker to act on one message.
	AckTypeNack

	// AckTypeReject causes the message to be dropped in broker. AckTypeNack
	// must not be used to select or requeue messages the client wishes not to
	// handle, rather it is to inform the server that the client is incapable
	// of handling this message at this time.
	AckTypeReject

	// AckTypeRequeue causes the message to be requeued back to the end of the
	// queue.
	AckTypeRequeue
)

func (AckType) IsValid

func (a AckType) IsValid() bool

IsValid returns true if the object is within the valid boundries.

func (AckType) String

func (i AckType) String() string

type Channel

type Channel interface {
	// ExchangeDeclare declares an exchange on the server. If the exchange does
	// not already exist, the server will create it. If the exchange exists,
	// the server verifies that it is of the provided type, durability and
	// auto-delete flags.
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

	// Publish sends a Publishing from the client to an exchange on the server.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

	// QueueDeclare declares a queue to hold messages and deliver to consumers.
	// Declaring creates a queue if it doesn't already exist, or ensures that
	// an existing queue matches the same parameters.
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

	// QueueBind binds an exchange to a queue so that publishings to the
	// exchange will be routed to the queue when the publishing routing key
	// matches the binding routing key.
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

	// Consume immediately starts delivering queued messages.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

	// Qos controls how many messages or how many bytes the server will try to
	// keep on the network for consumers before receiving delivery acks. The
	// intent of Qos is to make sure the network buffers stay full between the
	// server and client.
	Qos(prefetchCount, prefetchSize int, global bool) error

	// Close initiate a clean channel closure by sending a close message with
	// the error code set to '200'.
	Close() error

	// NotifyClose registers a listener for when the server sends a channel or
	// connection exception in the form of a Connection.Close or Channel.Close
	// method. Connection exceptions will be broadcast to all open channels and
	// all channels will be closed, where channel exceptions will only be
	// broadcast to listeners to this channel.
	NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
}

A Channel can operate queues. This is a subset of the amqp.Channel api.

type ConfigFunc

type ConfigFunc func(*config)

ConfigFunc is a function for setting up the Client. You should not use this type outside of the NewConsumer or NewPublisher function calls.

func Buffer

func Buffer(n int) ConfigFunc

Buffer sets the amount of messages each worker can keep in their channels.

func ConsumerName

func ConsumerName(name string) ConfigFunc

ConsumerName sets the consumer name of the consuming queue.

func Context

func Context(ctx context.Context) ConfigFunc

Context sets a context on the object that would stop it when the context is cancelled. The default context has no condition for cancellation.

func DeprecatedLogger added in v2.1.0

func DeprecatedLogger(l logger) ConfigFunc

DeprecatedLogger lets the user to provide their own logger. The default logger is a noop struct. Deprecated: please use the new Logger function.

func ExchangeName

func ExchangeName(name string) ConfigFunc

ExchangeName sets the exchange name. For each worker, and additional string will be appended for the worker number.

func Logger

func Logger(l logr.Logger) ConfigFunc

Logger lets the user to provide their own logger. The default logger is a noop struct.

func PrefetchCount

func PrefetchCount(i int) ConfigFunc

PrefetchCount sets how many items should be prefetched for consumption. With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent.

func PrefetchSize

func PrefetchSize(i int) ConfigFunc

PrefetchSize sets the prefetch size of the Qos. If it is greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.

func QueueArgs

func QueueArgs(args amqp.Table) ConfigFunc

QueueArgs sets the args possed to the QueueDeclare method.

func QueueName

func QueueName(name string) ConfigFunc

QueueName sets the queue name.

func RetryDelay

func RetryDelay(d time.Duration) ConfigFunc

RetryDelay sets the time delay for attempting to reconnect. The default value is 100ms.

func RoutingKey

func RoutingKey(key string) ConfigFunc

RoutingKey sets the routing key of the queue.

func WithDeliveryMode

func WithDeliveryMode(mode DeliveryMode) ConfigFunc

WithDeliveryMode sets the default delivery mode of messages.

func WithExchangeType

func WithExchangeType(t ExchangeType) ConfigFunc

WithExchangeType sets the exchange type. The default is ExchangeTypeTopic.

func WithLogrus added in v2.1.0

func WithLogrus(l logrus.FieldLogger) ConfigFunc

WithLogrus is a helper that sets an already setup logrus instance as the logger.

func WithPanicHandler added in v2.1.0

func WithPanicHandler(h PanicHandler) ConfigFunc

WithPanicHandler sets a callback for handling panics during consuming messages. The default handler will log the panic with a traceback and returns a AckTypeRequeue with 1 sec delay. You should not panic during this handler!

func Workers

func Workers(n int) ConfigFunc

Workers sets the worker count for consuming messages.

type Connector

type Connector func() (RabbitMQ, error)

A Connector should return a live connection. It will be called during the Client initialisation and during reconnection process.

func AMQPConnector

func AMQPConnector(r *amqp.Connection) Connector

AMQPConnector uses r everytime the Client needs a new connection. You should make sure r keep being alive.

func URLConnector

func URLConnector(url string) Connector

URLConnector creates a new connection from url.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a concurrent safe construct for consuming messages from queues. It creates multiple workers for safe communication. Zero value is not usable, therefore you should construct a usable Consumer by calling the NewConsumer constructor. nolint:govet // most likely not an issue, but cleaner this way.

func NewConsumer

func NewConsumer(connector Connector, conf ...ConfigFunc) (*Consumer, error)

NewConsumer returns a Consumer capable of publishing and consuming messages. The default Consumer config uses the "default" exchange of the "topic" type, both exchange and queues will be marked as "durable", messages will be persistent, and the consumer gets a random name. The connector value should provide a live connection. The connector value is used during reconnection process.

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the channel and the connection. A closed Consumer is not usable. nolint:dupl // They are quite different.

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler HandlerFunc) error

Consume is a bloking call that passes each message to the handler and stops handling messages when the context is done. If the handler returns false, the message is returned back to the queue. If the context is cancelled, the Consumer remains operational but no messages will be deliverd to this handler. Consume returns an error if you don't specify a queue name.

type DeliveryMode

type DeliveryMode uint8

DeliveryMode is the DeliveryMode of a amqp.Publishing message.

func (DeliveryMode) IsValid

func (d DeliveryMode) IsValid() bool

IsValid returns true if the object is within the valid boundries.

func (DeliveryMode) String

func (i DeliveryMode) String() string

type ExchangeType

type ExchangeType int

ExchangeType is the kind of exchange.

const (
	// ExchangeTypeDirect defines a direct exchange.
	ExchangeTypeDirect ExchangeType = iota

	// ExchangeTypeFanout defines a fanout exchange.
	ExchangeTypeFanout

	// ExchangeTypeTopic defines a topic exchange.
	ExchangeTypeTopic

	// ExchangeTypeHeaders defines a headers exchange.
	ExchangeTypeHeaders
)

func (ExchangeType) IsValid

func (e ExchangeType) IsValid() bool

IsValid returns true if the object is within the valid boundries.

func (ExchangeType) String

func (e ExchangeType) String() string

type HandlerFunc

type HandlerFunc func(msg *amqp.Delivery) (a AckType, delay time.Duration)

A HandlerFunc receives a message when it is available. The returned AckType dictates how to deal with the message. The delay can be 0 or any duration. The consumer will sleep this amount before sending Ack. If the user needs to requeue the message, they may mutate the message if required. Mutate the msg at your own peril.

Example (Ack)

ExampleHandlerFunc_ack instructs the consumer to drop the message immediately.

var fn harego.HandlerFunc = func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
	// logic for working with msg.Body goes here.
	return harego.AckTypeAck, 0
}
got, delay := fn(&amqp.Delivery{})
fmt.Printf("Got %s and will delay for %s", got, delay)
Output:

Got AckTypeAck and will delay for 0s
Example (Reject)

ExampleHandlerFunc_reject instructs the consumer to reject the message after 100ms. This will cause the consumer to sleep, therefore you need to make sure there are enough workers to respond to other tasks.

var fn harego.HandlerFunc = func(*amqp.Delivery) (harego.AckType, time.Duration) {
	// logic for working with msg.Body goes here.
	return harego.AckTypeReject, 100 * time.Millisecond
}
got, delay := fn(&amqp.Delivery{})
fmt.Printf("Got %s and will delay for %s", got, delay)
Output:

Got AckTypeReject and will delay for 100ms
Example (Requeue)

ExampleHandlerFunc_requeue instructs the consumer to put the messaage at the end of the queue after 1 second.

var fn harego.HandlerFunc = func(*amqp.Delivery) (harego.AckType, time.Duration) {
	// logic for working with msg.Body goes here.
	return harego.AckTypeRequeue, time.Second
}
got, delay := fn(&amqp.Delivery{})
fmt.Printf("Got %s and will delay for %s", got, delay)
Output:

Got AckTypeRequeue and will delay for 1s

type PanicHandler added in v2.1.0

type PanicHandler func(msg *amqp.Delivery, r any) (a AckType, delay time.Duration)

PanicHandler is used when the consumer encounters a panic during the handling of a message. The message is passed as is and the result of the recover() call is passed as the second argument. You must not panic in this handler!

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a concurrent safe construct for publishing a message to exchanges, and consuming messages from queues. It creates multiple workers for safe communication. Zero value is not usable. nolint:govet // most likely not an issue, but cleaner this way.

func NewPublisher

func NewPublisher(connector Connector, conf ...ConfigFunc) (*Publisher, error)

NewPublisher returns a Client capable of publishing and consuming messages. The default Client config uses the "default" exchange of the "topic" type, both exchange and queues will be marked as "durable", messages will be persistent, and the consumer gets a random name. The connector value should provide a live connection. The connector value is used during reconnection process.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the channel and the connection. A closed client is not usable.

func (*Publisher) Publish

func (p *Publisher) Publish(msg *amqp.Publishing) error

Publish sends the msg to the broker via the next available workers.

type RabbitMQ

type RabbitMQ interface {
	Channel() (Channel, error)
	Close() error
}

RabbitMQ defines a rabbitmq exchange.

Directories

Path Synopsis
Package internal provides some internal functionalities for the library.
Package internal provides some internal functionalities for the library.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL