rabbitmq

package module
v0.2.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2023 License: MIT Imports: 10 Imported by: 0

README

go-rabbitmq

Wrapper of rabbitmq/amqp091-go that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Supported by Boot.dev

Deploy

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.

Goal

The goal with go-rabbitmq is to provide most (but not all) of the nitty-gritty functionality of Streadway's AMQP, but to make it easier to work with via a higher-level API. go-rabbitmq is also built specifically for Rabbit, not for the AMQP protocol. In particular, we want:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling
  • TCP block handling

⚙️ Installation

Inside a Go module:

go get github.com/DizoftTeam/go-rabbitmq

🚀 Quick Start Consumer

Take note of the optional options parameters after the queue name. The queue will be declared automatically, but the exchange will not. You'll also probably want to bind to at least one routing key.

conn, err := rabbitmq.NewConn(
	"amqp://guest:guest@localhost",
	rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

consumer, err := rabbitmq.NewConsumer(
	conn,
	func(d rabbitmq.Delivery) rabbitmq.Action {
		log.Printf("consumed: %v", string(d.Body))
		// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
		return rabbitmq.Ack
	},
	"my_queue",
	rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
	rabbitmq.WithConsumerOptionsExchangeName("events"),
	rabbitmq.WithConsumerOptionsExchangeDeclare,
)
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

🚀 Quick Start Publisher

The exchange is not declared by default, that's why I recommend using the following options.

conn, err := rabbitmq.NewConn(
	"amqp://guest:guest@localhost",
	rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

publisher, err := rabbitmq.NewPublisher(
	conn,
	rabbitmq.WithPublisherOptionsLogging,
	rabbitmq.WithPublisherOptionsExchangeName("events"),
	rabbitmq.WithPublisherOptionsExchangeDeclare,
)
if err != nil {
	log.Fatal(err)
}
defer publisher.Close()

err = publisher.Publish(
	[]byte("hello, world"),
	[]string{"my_routing_key"},
	rabbitmq.WithPublishOptionsContentType("application/json"),
	rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
	log.Println(err)
}

Other usage examples

See the examples directory for more ideas.

Options and configuring

  • By default, queues are declared if they didn't already exist by new consumers
  • By default, routing-key bindings are declared by consumers if you're using WithConsumerOptionsRoutingKey
  • By default, exchanges are not declared by publishers or consumers if they didn't already exist, hence WithPublisherOptionsExchangeDeclare and WithConsumerOptionsExchangeDeclare.

Read up on all the options in the GoDoc, there are quite a few of them. I try to pick sane and simple defaults.

Closing and resources

Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

Stability

Note that the API is currently in v0. I don't plan on huge changes, but there may be some small breaking changes before we hit v1.

💬 Contact

Twitter Follow

Submit an issue here on GitHub

Transient Dependencies

My goal is to keep dependencies limited to 1, github.com/rabbitmq/amqp091-go.

👏 Contributing

I would love your help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

Documentation

Index

Constants

View Source
const (
	Transient  uint8 = amqp.Transient
	Persistent uint8 = amqp.Persistent
)

DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.

Variables

This section is empty.

Functions

func WithConnectionOptionsConfig

func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions)

WithConnectionOptionsConfig sets the Config used in the connection

func WithConnectionOptionsLogger

func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions)

WithConnectionOptionsLogger sets logging to true on the consumer options and sets the

func WithConnectionOptionsLogging

func WithConnectionOptionsLogging(options *ConnectionOptions)

WithConnectionOptionsLogging sets logging to true on the consumer options and sets the

func WithConnectionOptionsReconnectInterval

func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions)

WithConnectionOptionsReconnectInterval sets the reconnection interval

func WithConsumerOptionsBinding

func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions)

WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to the zero value. If you want to declare your bindings for example, be sure to set Declare=true

func WithConsumerOptionsConcurrency

func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions)

WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages

func WithConsumerOptionsConsumerAutoAck

func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions)

WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset the default will be used (false)

func WithConsumerOptionsConsumerExclusive

func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions)

WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.

func WithConsumerOptionsConsumerName

func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions)

WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given

func WithConsumerOptionsConsumerNoWait

func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions)

WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.

func WithConsumerOptionsExchangeArgs

func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions)

WithConsumerOptionsExchangeArgs adds optional args to the exchange

func WithConsumerOptionsExchangeAutoDelete

func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions)

WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange

func WithConsumerOptionsExchangeDeclare

func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions)

WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance

func WithConsumerOptionsExchangeDurable

func WithConsumerOptionsExchangeDurable(options *ConsumerOptions)

WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange

func WithConsumerOptionsExchangeInternal

func WithConsumerOptionsExchangeInternal(options *ConsumerOptions)

WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange

func WithConsumerOptionsExchangeKind

func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions)

WithConsumerOptionsExchangeKind ensures the queue is a durable queue

func WithConsumerOptionsExchangeName

func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions)

WithConsumerOptionsExchangeName sets the exchange name

func WithConsumerOptionsExchangeNoWait

func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions)

WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange

func WithConsumerOptionsExchangePassive

func WithConsumerOptionsExchangePassive(options *ConsumerOptions)

WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange

func WithConsumerOptionsLogger

func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions)

WithConsumerOptionsLogger sets logging to a custom interface. Use WithConsumerOptionsLogging to just log to stdout.

func WithConsumerOptionsLogging

func WithConsumerOptionsLogging(options *ConsumerOptions)

WithConsumerOptionsLogging uses a default logger that writes to std out

func WithConsumerOptionsQOSGlobal

func WithConsumerOptionsQOSGlobal(options *ConsumerOptions)

WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection

func WithConsumerOptionsQOSPrefetch

func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions)

WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput. This doesn't affect the handler, messages are still processed one at a time.

func WithConsumerOptionsQueueArgs

func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions)

WithConsumerOptionsQueueArgs adds optional args to the queue

func WithConsumerOptionsQueueAutoDelete

func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions)

WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue

func WithConsumerOptionsQueueDurable

func WithConsumerOptionsQueueDurable(options *ConsumerOptions)

WithConsumerOptionsQueueDurable ensures the queue is a durable queue

func WithConsumerOptionsQueueExclusive

func WithConsumerOptionsQueueExclusive(options *ConsumerOptions)

WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue

func WithConsumerOptionsQueueNoDeclare

func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions)

WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's existance upon startup

func WithConsumerOptionsQueueNoWait

func WithConsumerOptionsQueueNoWait(options *ConsumerOptions)

WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue

func WithConsumerOptionsQueuePassive

func WithConsumerOptionsQueuePassive(options *ConsumerOptions)

WithConsumerOptionsQueuePassive ensures the queue is a passive queue

func WithConsumerOptionsQueueQuorum

func WithConsumerOptionsQueueQuorum(options *ConsumerOptions)

WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability

func WithConsumerOptionsRoutingKey

func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions)

WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options

func WithPublishOptionsAppID

func WithPublishOptionsAppID(appID string) func(*PublishOptions)

WithPublishOptionsAppID returns a function that sets the application id

func WithPublishOptionsContentEncoding

func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions)

WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e. "utf-8"

func WithPublishOptionsContentType

func WithPublishOptionsContentType(contentType string) func(*PublishOptions)

WithPublishOptionsContentType returns a function that sets the content type, i.e. "application/json"

func WithPublishOptionsCorrelationID

func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions)

WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier

func WithPublishOptionsExchange

func WithPublishOptionsExchange(exchange string) func(*PublishOptions)

WithPublishOptionsExchange returns a function that sets the exchange to publish to

func WithPublishOptionsExpiration

func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)

WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a string value in milliseconds.

func WithPublishOptionsHeaders

func WithPublishOptionsHeaders(headers Table) func(*PublishOptions)

WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id"

func WithPublishOptionsImmediate

func WithPublishOptionsImmediate(options *PublishOptions)

WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle

func WithPublishOptionsMandatory

func WithPublishOptionsMandatory(options *PublishOptions)

WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle

func WithPublishOptionsMessageID

func WithPublishOptionsMessageID(messageID string) func(*PublishOptions)

WithPublishOptionsMessageID returns a function that sets the message identifier

func WithPublishOptionsPersistentDelivery

func WithPublishOptionsPersistentDelivery(options *PublishOptions)

WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart. By default publishings are transient

func WithPublishOptionsPriority

func WithPublishOptionsPriority(priority uint8) func(*PublishOptions)

WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9

func WithPublishOptionsReplyTo

func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions)

WithPublishOptionsReplyTo returns a function that sets the reply to field

func WithPublishOptionsTimestamp

func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions)

WithPublishOptionsTimestamp returns a function that sets the timestamp for the message

func WithPublishOptionsType

func WithPublishOptionsType(messageType string) func(*PublishOptions)

WithPublishOptionsType returns a function that sets the message type name

func WithPublishOptionsUserID

func WithPublishOptionsUserID(userID string) func(*PublishOptions)

WithPublishOptionsUserID returns a function that sets the user id i.e. "user"

func WithPublisherOptionsConfirm added in v0.2.10

func WithPublisherOptionsConfirm(options *PublisherOptions)

WithPublisherOptionsConfirm enables confirm mode on the connection this is required if publisher confirmations should be used

func WithPublisherOptionsExchangeArgs

func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions)

WithPublisherOptionsExchangeArgs adds optional args to the exchange

func WithPublisherOptionsExchangeAutoDelete

func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions)

WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange

func WithPublisherOptionsExchangeDeclare

func WithPublisherOptionsExchangeDeclare(options *PublisherOptions)

WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance

func WithPublisherOptionsExchangeDurable

func WithPublisherOptionsExchangeDurable(options *PublisherOptions)

WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange

func WithPublisherOptionsExchangeInternal

func WithPublisherOptionsExchangeInternal(options *PublisherOptions)

WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange

func WithPublisherOptionsExchangeKind

func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions)

WithPublisherOptionsExchangeKind ensures the queue is a durable queue

func WithPublisherOptionsExchangeName

func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions)

WithPublisherOptionsExchangeName sets the exchange name

func WithPublisherOptionsExchangeNoWait

func WithPublisherOptionsExchangeNoWait(options *PublisherOptions)

WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange

func WithPublisherOptionsExchangePassive

func WithPublisherOptionsExchangePassive(options *PublisherOptions)

WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange

func WithPublisherOptionsLogger

func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions)

WithPublisherOptionsLogger sets logging to a custom interface. Use WithPublisherOptionsLogging to just log to stdout.

func WithPublisherOptionsLogging

func WithPublisherOptionsLogging(options *PublisherOptions)

WithPublisherOptionsLogging sets logging to true on the publisher options and sets the

Types

type Action

type Action int

Action is an action that occurs after processed this delivery

const (
	// Ack default ack this msg after you have successfully processed this delivery.
	Ack Action = iota
	// NackDiscard the message will be dropped or delivered to a server configured dead-letter queue.
	NackDiscard
	// NackRequeue deliver this message to a different consumer.
	NackRequeue
	// Message acknowledgement is left to the user using the msg.Ack() method
	Manual
)

type Binding

type Binding struct {
	RoutingKey string
	BindingOptions
}

Binding describes the bhinding of a queue to a routing key on an exchange

type BindingOptions

type BindingOptions struct {
	NoWait  bool
	Args    Table
	Declare bool
}

BindingOptions describes the options a binding can have

type Config

type Config amqp.Config

Config wraps amqp.Config Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.

type Confirmation

type Confirmation struct {
	amqp.Confirmation
	ReconnectionCount int
}

Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag is reset to 0, meaning you can use ReconnectionCount+DeliveryTag to ensure uniqueness

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn manages the connection to a rabbit cluster it is intended to be shared across publishers and consumers

func NewConn

func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)

NewConn creates a new connection manager

func (*Conn) Close

func (conn *Conn) Close() error

Close closes the connection, it's not safe for re-use. You should also close any consumers and publishers before closing the connection

func (*Conn) RegisterReconnectHook added in v0.2.8

func (conn *Conn) RegisterReconnectHook(hook func(error))

type ConnectionOptions

type ConnectionOptions struct {
	ReconnectInterval time.Duration
	Logger            Logger
	Config            Config
}

ConnectionOptions are used to describe how a new consumer will be created.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer allows you to create and connect to queues for data consumption.

func NewConsumer

func NewConsumer(
	conn *Conn,
	handler Handler,
	queue string,
	optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error)

NewConsumer returns a new Consumer connected to the given rabbitmq server it also starts consuming on the given connection with automatic reconnection handling Do not reuse the returned consumer for anything other than to close it

func (*Consumer) Close

func (consumer *Consumer) Close()

Close cleans up resources and closes the consumer. It does not close the connection manager, just the subscription to the connection manager and the consuming goroutines. Only call once.

type ConsumerOptions

type ConsumerOptions struct {
	RabbitConsumerOptions RabbitConsumerOptions
	QueueOptions          QueueOptions
	ExchangeOptions       ExchangeOptions
	Bindings              []Binding
	Concurrency           int
	Logger                logger.Logger
	QOSPrefetch           int
	QOSGlobal             bool
}

ConsumerOptions are used to describe how a new consumer will be created. If QueueOptions is not nil, the options will be used to declare a queue If ExchangeOptions is not nil, it will be used to declare an exchange If there are Bindings, the queue will be bound to them

type Declarator added in v0.2.0

type Declarator struct {
	// contains filtered or unexported fields
}

func NewDeclarator added in v0.2.0

func NewDeclarator(conn *Conn) (*Declarator, error)

func (*Declarator) BindExchanges added in v0.2.4

func (d *Declarator) BindExchanges(bindings []ExchangeBinding) error

func (*Declarator) Close added in v0.2.2

func (d *Declarator) Close()

func (*Declarator) DeclareExchange added in v0.2.4

func (d *Declarator) DeclareExchange(optionFuncs ...func(*PublisherOptions)) error

func (*Declarator) DeclareQueue added in v0.2.4

func (d *Declarator) DeclareQueue(queue string, optionFuncs ...func(*ConsumerOptions)) error

type Delivery

type Delivery struct {
	amqp.Delivery
}

Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.

type ExchangeBinding added in v0.2.2

type ExchangeBinding struct {
	From       string
	To         string
	RoutingKey string
	Args       Table
	NoWait     bool
}

type ExchangeOptions

type ExchangeOptions struct {
	Name       string
	Kind       string // possible values: empty string for default exchange or direct, topic, fanout
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Passive    bool // if false, a missing exchange will be created on the server
	Args       Table
	Declare    bool
}

ExchangeOptions are used to configure an exchange. If the Passive flag is set the client will only check if the exchange exists on the server and that the settings match, no creation attempt will be made.

type Handler

type Handler func(d Delivery) (action Action)

Handler defines the handler of each Delivery and return Action

type Logger

type Logger logger.Logger

Logger is describes a logging structure. It can be set using WithPublisherOptionsLogger() or WithConsumerOptionsLogger().

type PublishOptions

type PublishOptions struct {
	Exchange string
	// Mandatory fails to publish if there are no queues
	// bound to the routing key
	Mandatory bool
	// Immediate fails to publish if there are no consumers
	// that can ack bound to the queue on the routing key
	Immediate bool
	// MIME content type
	ContentType string
	// Transient (0 or 1) or Persistent (2)
	DeliveryMode uint8
	// Expiration time in ms that a message will expire from a queue.
	// See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
	Expiration string
	// MIME content encoding
	ContentEncoding string
	// 0 to 9
	Priority uint8
	// correlation identifier
	CorrelationID string
	// address to to reply to (ex: RPC)
	ReplyTo string
	// message identifier
	MessageID string
	// message timestamp
	Timestamp time.Time
	// message type name
	Type string
	// creating user id - ex: "guest"
	UserID string
	// creating application id
	AppID string
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers Table
}

PublishOptions are used to control how data is published

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher allows you to publish messages safely across an open connection

func NewPublisher

func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error)

NewPublisher returns a new publisher with an open channel to the cluster. If you plan to enforce mandatory or immediate publishing, those failures will be reported on the channel of Returns that you should setup a listener on. Flow controls are automatically handled as they are sent from the server, and publishing will fail with an error when the server is requesting a slowdown

func (*Publisher) Close

func (publisher *Publisher) Close()

Close closes the publisher and releases resources The publisher should be discarded as it's not safe for re-use Only call Close() once

func (*Publisher) NotifyPublish

func (publisher *Publisher) NotifyPublish(handler func(p Confirmation))

NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option These notifications are shared across an entire connection, so if you're creating multiple publishers on the same connection keep that in mind

func (*Publisher) NotifyReturn

func (publisher *Publisher) NotifyReturn(handler func(r Return))

NotifyReturn registers a listener for basic.return methods. These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. These notifications are shared across an entire connection, so if you're creating multiple publishers on the same connection keep that in mind

func (*Publisher) Publish

func (publisher *Publisher) Publish(
	data []byte,
	routingKeys []string,
	optionFuncs ...func(*PublishOptions),
) error

Publish publishes the provided data to the given routing keys over the connection.

func (*Publisher) PublishWithContext

func (publisher *Publisher) PublishWithContext(
	ctx context.Context,
	data []byte,
	routingKeys []string,
	optionFuncs ...func(*PublishOptions),
) error

PublishWithContext publishes the provided data to the given routing keys over the connection.

func (*Publisher) PublishWithDeferredConfirmWithContext

func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
	ctx context.Context,
	data []byte,
	routingKeys []string,
	optionFuncs ...func(*PublishOptions),
) (PublisherConfirmation, error)

PublishWithContext publishes the provided data to the given routing keys over the connection. if the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned. This confirmation can be used to check if the message was actually published or wait for this to happen.

type PublisherConfirmation

type PublisherConfirmation []*amqp.DeferredConfirmation

type PublisherOptions

type PublisherOptions struct {
	ExchangeOptions ExchangeOptions
	Logger          Logger
	ConfirmMode     bool
}

PublisherOptions are used to describe a publisher's configuration. Logger is a custom logging interface.

type QueueOptions

type QueueOptions struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Passive    bool // if false, a missing queue will be created on the server
	Args       Table
	Declare    bool
}

QueueOptions are used to configure a queue. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception.

type RabbitConsumerOptions

type RabbitConsumerOptions struct {
	Name      string
	AutoAck   bool
	Exclusive bool
	NoWait    bool
	NoLocal   bool
	Args      Table
}

RabbitConsumerOptions are used to configure the consumer on the rabbit server

type Return

type Return struct {
	amqp.Return
}

Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

type Table

type Table map[string]interface{}

Table stores user supplied fields of the following types:

bool
byte
float32
float64
int
int16
int32
int64
nil
string
time.Time
amqp.Decimal
amqp.Table
[]byte
[]interface{} - containing above types

Functions taking a table will immediately fail when the table contains a value of an unsupported type.

The caller must be specific in which precision of integer it wishes to encode.

Use a type assertion when reading values from a table for type conversion.

RabbitMQ expects int32 for integer values.

Directories

Path Synopsis
examples
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL