rabbitmq

package
v0.7.1
Published: Feb 9, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Constants

const (
	Transient  uint8 = amqp.Transient
	Persistent uint8 = amqp.Persistent
)

DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
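The interaction between delivery mode and queue durability described above can be written as a small truth table. The sketch below is illustrative only: the constants are local stand-ins (amqp is assumed to define Transient as 1 and Persistent as 2), and `survivesRestart` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local stand-ins for the package constants. The numeric values are
// assumed to match amqp.Transient and amqp.Persistent.
const (
	Transient  uint8 = 1
	Persistent uint8 = 2
)

// survivesRestart is a hypothetical helper. It reports whether a message
// is restored after a broker restart: only persistent messages that sit
// on durable queues are.
func survivesRestart(deliveryMode uint8, queueDurable bool) bool {
	return deliveryMode == Persistent && queueDurable
}

func main() {
	for _, mode := range []uint8{Transient, Persistent} {
		for _, durable := range []bool{false, true} {
			fmt.Printf("mode=%d durable=%t restored=%t\n",
				mode, durable, survivesRestart(mode, durable))
		}
	}
}
```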

Variables

This section is empty.

Functions

func WithConsumeOptionsBindingExchangeArgs

func WithConsumeOptionsBindingExchangeArgs(args amqp.Table) func(*ConsumeOptions)

WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments, which are specific to the server's implementation of the exchange.

func WithConsumeOptionsBindingExchangeAutoDelete

func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions)

WithConsumeOptionsBindingExchangeAutoDelete sets the binding exchange autoDelete flag. It is itself the option function and is passed without being called.
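The option functions in this package come in two shapes: flag-style options that are themselves a `func(*ConsumeOptions)`, and value-style options that take an argument and return one. A minimal sketch of how both fit a variadic `...func(*ConsumeOptions)` parameter follows. The types are trimmed local stand-ins, and `ensureExchange`, `withExchangeAutoDelete`, `withExchangeName` and `apply` are hypothetical names; the package's real implementation may differ.

```go
package main

import "fmt"

// Trimmed stand-ins for the documented structs.
type BindingExchangeOptions struct {
	Name       string
	AutoDelete bool
}

type ConsumeOptions struct {
	BindingExchange *BindingExchangeOptions
}

// ensureExchange is a hypothetical helper that allocates the nested
// struct on first use so options can be applied in any order.
func ensureExchange(o *ConsumeOptions) *BindingExchangeOptions {
	if o.BindingExchange == nil {
		o.BindingExchange = &BindingExchangeOptions{}
	}
	return o.BindingExchange
}

// Flag-style option: the function itself is the option.
func withExchangeAutoDelete(o *ConsumeOptions) {
	ensureExchange(o).AutoDelete = true
}

// Value-style option: returns a closure that captures the argument.
func withExchangeName(name string) func(*ConsumeOptions) {
	return func(o *ConsumeOptions) {
		ensureExchange(o).Name = name
	}
}

// apply runs each option, in order, against a zero-valued ConsumeOptions.
func apply(optionFuncs ...func(*ConsumeOptions)) ConsumeOptions {
	var o ConsumeOptions
	for _, f := range optionFuncs {
		f(&o)
	}
	return o
}

func main() {
	// Note: no parentheses after the flag-style option.
	o := apply(withExchangeAutoDelete, withExchangeName("events"))
	fmt.Printf("%+v\n", *o.BindingExchange)
}
```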

func WithConsumeOptionsBindingExchangeDurable

func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions)

WithConsumeOptionsBindingExchangeDurable sets the binding exchange durable flag.

func WithConsumeOptionsBindingExchangeInternal

func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions)

WithConsumeOptionsBindingExchangeInternal sets the binding exchange internal flag.

func WithConsumeOptionsBindingExchangeKind

func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions)

WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type

func WithConsumeOptionsBindingExchangeName

func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions)

WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to

func WithConsumeOptionsBindingExchangeNoWait

func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions)

WithConsumeOptionsBindingExchangeNoWait sets the binding exchange noWait flag.

func WithConsumeOptionsBindingNoWait

func WithConsumeOptionsBindingNoWait(options *ConsumeOptions)

WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means that if the queue cannot be bound, the channel will not be closed with an error.

func WithConsumeOptionsConcurrency

func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions)

WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages
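The concurrency setting can be pictured as a fixed pool of goroutines reading from one stream of deliveries. The following is a broker-free sketch of that idea, not the package's implementation: `consume` and `countHandled` are hypothetical names, and a plain `chan string` stands in for the delivery channel.

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts `concurrency` goroutines that each call handler for
// messages read from a shared channel. It returns once the channel has
// been closed and drained.
func consume(msgs <-chan string, concurrency int, handler func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				handler(m)
			}
		}()
	}
	wg.Wait()
}

// countHandled pushes n messages through consume and returns how many
// times the handler ran.
func countHandled(n, concurrency int) int {
	msgs := make(chan string)
	go func() {
		defer close(msgs)
		for i := 0; i < n; i++ {
			msgs <- fmt.Sprintf("msg-%d", i)
		}
	}()

	var mu sync.Mutex
	handled := 0
	consume(msgs, concurrency, func(string) {
		// The handler runs on several goroutines, so shared state
		// needs synchronization.
		mu.Lock()
		handled++
		mu.Unlock()
	})
	return handled
}

func main() {
	fmt.Println(countHandled(10, 3)) // prints 10
}
```

The practical consequence is that a handler used with a concurrency above 1 should be safe to call from multiple goroutines at once.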

func WithConsumeOptionsConsumerAutoAck

func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions)

WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer. If unset, the default (false) is used.

func WithConsumeOptionsConsumerExclusive

func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions)

WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.

func WithConsumeOptionsConsumerName

func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions)

WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer. If unset, a random name will be given.

func WithConsumeOptionsConsumerNoWait

func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions)

WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begins deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.

func WithConsumeOptionsQOSGlobal

func WithConsumeOptionsQOSGlobal(options *ConsumeOptions)

WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection

func WithConsumeOptionsQOSPrefetch

func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions)

WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput. This doesn't affect the handler; messages are still processed one at a time.

func WithConsumeOptionsQueueAutoDelete

func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions)

WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will be deleted when there are no more consumers on it.

func WithConsumeOptionsQueueDurable

func WithConsumeOptionsQueueDurable(options *ConsumeOptions)

WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't be destroyed when the server restarts. It must only be bound to durable exchanges

func WithConsumeOptionsQueueExclusive

func WithConsumeOptionsQueueExclusive(options *ConsumeOptions)

WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means it is only accessible by the connection that declares it and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.

func WithConsumeOptionsQueueNoWait

func WithConsumeOptionsQueueNoWait(options *ConsumeOptions)

WithConsumeOptionsQueueNoWait sets the queue to nowait, which means the queue is assumed to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or when attempting to modify an existing queue from a different connection.

func WithConsumeOptionsQuorum

func WithConsumeOptionsQuorum(options *ConsumeOptions)

WithConsumeOptionsQuorum sets the queue to the quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability.
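In RabbitMQ, a queue's type is chosen at declaration time through the `x-queue-type` queue argument. One plausible way an option like this could work is to set that argument in `QueueArgs`. The sketch below assumes exactly that; `Table` and `ConsumeOptions` are local stand-ins and `withQuorum` is a hypothetical name, so treat it as an illustration rather than the package's source.

```go
package main

import "fmt"

// Table is a stand-in for amqp.Table.
type Table map[string]interface{}

// ConsumeOptions is a trimmed stand-in for the documented struct.
type ConsumeOptions struct {
	QueueArgs Table
}

// withQuorum is a hypothetical option that requests a quorum queue by
// setting the x-queue-type declaration argument.
func withQuorum(o *ConsumeOptions) {
	if o.QueueArgs == nil {
		o.QueueArgs = Table{}
	}
	o.QueueArgs["x-queue-type"] = "quorum"
}

func main() {
	var o ConsumeOptions
	withQuorum(&o)
	fmt.Println(o.QueueArgs["x-queue-type"]) // prints quorum
}
```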

func WithPublishOptionsContentType

func WithPublishOptionsContentType(contentType string) func(*PublishOptions)

WithPublishOptionsContentType returns a function that sets the content type, e.g. "application/json"

func WithPublishOptionsExchange

func WithPublishOptionsExchange(exchange string) func(*PublishOptions)

WithPublishOptionsExchange returns a function that sets the exchange to publish to

func WithPublishOptionsExpiration

func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)

WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per the RabbitMQ spec, it must be a string value in milliseconds.
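Because the expiration is a string of milliseconds rather than a `time.Duration`, a small conversion helper is convenient. The sketch below uses only the standard library; `expirationString` and `defaultTTL` are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// defaultTTL is an example value, not a default of the package.
const defaultTTL = 30 * time.Second

// expirationString converts a duration into the decimal millisecond
// string expected for a per-message TTL.
func expirationString(ttl time.Duration) string {
	return strconv.FormatInt(ttl.Milliseconds(), 10)
}

func main() {
	fmt.Println(expirationString(defaultTTL)) // prints 30000
}
```

The result can then be passed to WithPublishOptionsExpiration, which takes the string form.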

func WithPublishOptionsHeaders

func WithPublishOptionsHeaders(headers amqp.Table) func(*PublishOptions)

WithPublishOptionsHeaders returns a function that sets message header values, e.g. "msg-id"

func WithPublishOptionsImmediate

func WithPublishOptionsImmediate(options *PublishOptions)

WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle

func WithPublishOptionsMandatory

func WithPublishOptionsMandatory(options *PublishOptions)

WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle

func WithPublishOptionsPersistentDelivery

func WithPublishOptionsPersistentDelivery(options *PublishOptions)

WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart. By default, publishings are transient.
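To make the "transient by default" behavior concrete, here is a sketch of how publish options might be resolved: start from a transient default, then apply each option in order. `PublishOptions` is a trimmed stand-in, the constant values are assumed to match amqp, and `buildOptions`, `withPersistentDelivery` and `withContentType` are hypothetical names.

```go
package main

import "fmt"

// Stand-ins for the package constants; values assumed to match amqp.
const (
	Transient  uint8 = 1
	Persistent uint8 = 2
)

// PublishOptions is a trimmed stand-in for the documented struct.
type PublishOptions struct {
	ContentType  string
	DeliveryMode uint8
}

// withPersistentDelivery is a flag-style option.
func withPersistentDelivery(o *PublishOptions) {
	o.DeliveryMode = Persistent
}

// withContentType is a value-style option.
func withContentType(contentType string) func(*PublishOptions) {
	return func(o *PublishOptions) { o.ContentType = contentType }
}

// buildOptions starts from the documented default (transient delivery)
// and applies the given options in order.
func buildOptions(optionFuncs ...func(*PublishOptions)) PublishOptions {
	o := PublishOptions{DeliveryMode: Transient}
	for _, f := range optionFuncs {
		f(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", buildOptions())
	fmt.Printf("%+v\n", buildOptions(withPersistentDelivery, withContentType("application/json")))
}
```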

Types

type BindingExchangeOptions

type BindingExchangeOptions struct {
	Name         string
	Kind         string
	Durable      bool
	AutoDelete   bool
	Internal     bool
	NoWait       bool
	ExchangeArgs amqp.Table
}

BindingExchangeOptions are used when binding to an exchange. It will verify the exchange is created before binding to it.

type ConsumeOptions

type ConsumeOptions struct {
	QueueDurable      bool
	QueueAutoDelete   bool
	QueueExclusive    bool
	QueueNoWait       bool
	QueueArgs         amqp.Table
	BindingExchange   *BindingExchangeOptions
	BindingNoWait     bool
	BindingArgs       amqp.Table
	Concurrency       int
	QOSPrefetch       int
	QOSGlobal         bool
	ConsumerName      string
	ConsumerAutoAck   bool
	ConsumerExclusive bool
	ConsumerNoWait    bool
	ConsumerNoLocal   bool
	ConsumerArgs      amqp.Table
}

ConsumeOptions are used to describe how a new consumer will be created.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer allows you to create and connect to queues for data consumption.

func NewConsumer

func NewConsumer(url string, config *amqp.Config) (*Consumer, error)

NewConsumer returns a new Consumer connected to the given rabbitmq server

func (Consumer) Disconnect

func (consumer Consumer) Disconnect()

Disconnect closes both the channel and the connection. This method doesn't trigger a reconnect and should be used when finishing a program. IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior, such as messages being processed but not acknowledged and therefore requeued by the broker.
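The ordering requirement (StopConsuming first, Disconnect second) can be captured in a small shutdown helper. The sketch below runs without a broker: `consumerLike` mirrors the two documented method signatures, while `shutdown` and `fakeConsumer` are hypothetical names used only to show the call order.

```go
package main

import "fmt"

// consumerLike lists the two documented Consumer methods used here.
type consumerLike interface {
	StopConsuming(consumerName string, noWait bool)
	Disconnect()
}

// shutdown cancels the named consumer, waiting for the server to
// acknowledge (noWait=false), and only then closes the connection.
func shutdown(c consumerLike, consumerName string) {
	c.StopConsuming(consumerName, false)
	c.Disconnect()
}

// fakeConsumer records calls so the order can be inspected.
type fakeConsumer struct{ calls []string }

func (f *fakeConsumer) StopConsuming(consumerName string, noWait bool) {
	f.calls = append(f.calls, "stop:"+consumerName)
}

func (f *fakeConsumer) Disconnect() {
	f.calls = append(f.calls, "disconnect")
}

func main() {
	f := &fakeConsumer{}
	shutdown(f, "worker-1")
	fmt.Println(f.calls) // prints [stop:worker-1 disconnect]
}
```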

func (Consumer) StartConsuming

func (consumer Consumer) StartConsuming(
	handler func(d amqp.Delivery) bool,
	queue string,
	routingKeys []string,
	optionFuncs ...func(*ConsumeOptions),
) error

StartConsuming starts n goroutines, where n is ConsumeOptions.Concurrency. Each goroutine runs the handler on messages consumed from the given queue, which is bound to the given routing key(s). The provided handler is called once for each message. If the provided queue doesn't exist, it will be created on the cluster.

func (Consumer) StopConsuming

func (consumer Consumer) StopConsuming(consumerName string, noWait bool)

StopConsuming stops the consumption of messages. The consumer should be discarded as it's not safe for re-use. This method sends a basic.cancel notification.

The consumerName is the name (consumer tag) of the amqp consumer to cancel. When noWait is true, the method does not wait for the server to acknowledge the cancel. Only use this when you are certain there are no deliveries in flight that require an acknowledgment; otherwise they will arrive and be dropped in the client without an ack, and will not be redelivered to other consumers.

IMPORTANT: The amqp library doesn't provide a way to retrieve a consumer's tag after creation, so you must set the name when creating the consumer if you want to call this function later. A UUIDv4 works well because it should be unique. If you start many consumers, store their names as you create them so that you can loop over the names and stop each consumer.
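A sketch of the bookkeeping this implies: generate a unique name per consumer, remember it, and later loop over the stored names. To stay within the standard library, the example uses 16 random bytes from `crypto/rand` instead of a UUID package. `newConsumerName`, `stopper`, `stopAll` and `recorder` are hypothetical names; only the `StopConsuming` signature comes from the documentation above.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newConsumerName returns "consumer-" followed by 32 random hex
// characters. A UUIDv4 library would serve the same purpose.
func newConsumerName() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return "consumer-" + hex.EncodeToString(b), nil
}

// stopper is the subset of Consumer needed to cancel consumers.
type stopper interface {
	StopConsuming(consumerName string, noWait bool)
}

// stopAll cancels every recorded consumer, waiting for the server's
// acknowledgment each time (noWait=false).
func stopAll(c stopper, names []string) {
	for _, name := range names {
		c.StopConsuming(name, false)
	}
}

// recorder is a fake used to exercise stopAll without a broker.
type recorder struct{ stopped []string }

func (r *recorder) StopConsuming(consumerName string, noWait bool) {
	r.stopped = append(r.stopped, consumerName)
}

func main() {
	var names []string
	for i := 0; i < 3; i++ {
		name, err := newConsumerName()
		if err != nil {
			panic(err)
		}
		// In real code the same name would also be passed to
		// WithConsumeOptionsConsumerName when starting the consumer.
		names = append(names, name)
	}
	r := &recorder{}
	stopAll(r, names)
	fmt.Println(len(r.stopped)) // prints 3
}
```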

type PublishOptions

type PublishOptions struct {
	Exchange string
	// Mandatory fails to publish if there are no queues
	// bound to the routing key
	Mandatory bool
	// Immediate fails to publish if there are no consumers
	// that can ack bound to the queue on the routing key
	Immediate   bool
	ContentType string
	// Transient or Persistent
	DeliveryMode uint8
	// Expiration time in ms that a message will expire from a queue.
	// See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
	Expiration string
	Headers    amqp.Table
}

PublishOptions are used to control how data is published

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher allows you to publish messages safely across an open connection

func NewPublisher

func NewPublisher(url string, config *amqp.Config) (*Publisher, <-chan amqp.Return, error)

NewPublisher returns a new publisher with an open channel to the cluster. If you plan to enforce mandatory or immediate publishing, those failures will be reported on the returned channel of Returns, on which you should set up a listener. Flow controls are automatically handled as they are sent from the server, and publishing will fail with an error when the server is requesting a slowdown.
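A listener on the returns channel is typically a goroutine that ranges over it. The sketch below shows that pattern without a broker. `Return` is a local stand-in carrying two of the fields found on amqp.Return, and `listenReturns` and `demoReturns` are hypothetical names. Whether and when the package closes the real channel is not stated in this documentation, so the close in the demo is an assumption of the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Return is a stand-in for amqp.Return with a subset of its fields.
type Return struct {
	ReplyText  string
	RoutingKey string
}

// listenReturns drains the channel in a goroutine, calling onReturn for
// each undeliverable message. The returned function blocks until the
// channel has been closed and drained.
func listenReturns(returns <-chan Return, onReturn func(Return)) (wait func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for r := range returns {
			onReturn(r)
		}
	}()
	return wg.Wait
}

// demoReturns simulates one returned message per routing key and
// collects the keys seen by the listener.
func demoReturns(keys []string) []string {
	ch := make(chan Return)
	var got []string
	wait := listenReturns(ch, func(r Return) {
		got = append(got, r.RoutingKey)
	})
	for _, k := range keys {
		ch <- Return{ReplyText: "NO_ROUTE", RoutingKey: k}
	}
	close(ch)
	wait()
	return got
}

func main() {
	fmt.Println(demoReturns([]string{"a", "b"})) // prints [a b]
}
```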

func (*Publisher) Publish

func (publisher *Publisher) Publish(
	ctx context.Context,
	data []byte,
	routingKeys []string,
	optionFuncs ...func(*PublishOptions),
) error

Publish publishes the provided data to the given routing keys over the connection

func (Publisher) StopPublishing

func (publisher Publisher) StopPublishing()

StopPublishing stops the publishing of messages. The publisher should be discarded as it's not safe for re-use

type TestEnv

type TestEnv struct {
	*test.TestEnv
	AmqpURL string
}

func NewTestEnvWithParent

func NewTestEnvWithParent(testEnv *test.TestEnv) (*TestEnv, error)

func (*TestEnv) Shutdown

func (env *TestEnv) Shutdown() error
