rmq

package
v0.0.3
Published: Sep 19, 2020 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelOpts added in v0.0.2

type ChannelOpts struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}

ChannelOpts ...

func DefaultChannelOpts added in v0.0.2

func DefaultChannelOpts() *ChannelOpts

DefaultChannelOpts ...

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a RabbitMQ client object

func GetRMQClient

func GetRMQClient(
	username, password, url, port, vhost string,
	secure bool) *Client

GetRMQClient returns an RMQ client
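
The arguments to GetRMQClient resemble the parts of an AMQP URI. As a rough sketch only, the helper below shows how such parts could be combined; `buildAMQPURI` is a hypothetical name, and whether rmq builds its connection string this way is an assumption, not something this page confirms.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// buildAMQPURI is a hypothetical helper, not part of rmq. It shows one way the
// GetRMQClient arguments could map onto an AMQP URI.
func buildAMQPURI(username, password, host, port, vhost string, secure bool) string {
	scheme := "amqp"
	if secure {
		scheme = "amqps" // TLS variant of the scheme
	}
	return fmt.Sprintf("%s://%s@%s/%s",
		scheme,
		url.UserPassword(username, password).String(), // escapes reserved characters
		net.JoinHostPort(host, port),
		url.PathEscape(vhost), // the default vhost "/" becomes "%2F"
	)
}

func main() {
	fmt.Println(buildAMQPURI("guest", "guest", "localhost", "5672", "/", false))
	fmt.Println(buildAMQPURI("app", "s3cret", "mq.example.com", "5671", "orders", true))
}
```

Escaping the vhost matters because the default virtual host is named "/", which would otherwise be read as a path separator.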

func (*Client) ExchangeDeclare

func (c *Client) ExchangeDeclare(name string, opts *DeclareExchangeOpts, connOpts *ConnectOpts) error

ExchangeDeclare declares an exchange on the RabbitMQ server

name is the name of the exchange

opts holds the options for declaring the exchange

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

func (*Client) ExchangeDelete

func (c *Client) ExchangeDelete(name string, ifUnused, noWait bool, connOpts *ConnectOpts) error

ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.

When ifUnused is true, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings, the server does not delete it but closes the channel with an exception instead. Set this to true if you are not the sole owner of the exchange.

When noWait is true, do not wait for a server confirmation that the exchange has been deleted. Failing to delete the exchange could close the channel. Add a NotifyClose listener to respond to these channel exceptions.

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

func (*Client) Publish

func (c *Client) Publish(msg amqp.Publishing, exchange, key string, opts *PublishOpts, connOpts *ConnectOpts) error

Publish publishes a message to the exchange

msg is the message to publish on the exchange

exchange is the name of the exchange the message is published to

key is the routing key used to route the message from the exchange to queues

opts holds the options for publishing the message

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

func (*Client) QueueBind

func (c *Client) QueueBind(
	exchange, queue, key string,
	opts *QueueBindOpts,
	connOpts *ConnectOpts) error

QueueBind binds a queue to an exchange with provided routing key on the RabbitMQ server

exchange is the name of the exchange to bind the queue to

queue is the name of the queue to bind to the exchange

key is the routing key used to route messages from the exchange to the queue

opts holds the queue binding options

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.
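
To illustrate what the binding key does, here is a small model of direct-exchange routing, where a message reaches every queue bound with a key equal to its routing key. The `binding` type and `routeDirect` function are illustrative names only; this models broker behavior for the "direct" exchange kind and is not code from rmq.

```go
package main

import "fmt"

// binding pairs a queue with the routing key it was bound under.
type binding struct {
	queue string
	key   string
}

// routeDirect models direct-exchange routing: a message goes to every queue
// whose binding key equals the message's routing key. Illustrative only.
func routeDirect(bindings []binding, routingKey string) []string {
	var queues []string
	for _, b := range bindings {
		if b.key == routingKey {
			queues = append(queues, b.queue)
		}
	}
	return queues
}

func main() {
	bindings := []binding{
		{queue: "orders", key: "order.created"},
		{queue: "audit", key: "order.created"},
		{queue: "billing", key: "invoice.paid"},
	}
	fmt.Println(routeDirect(bindings, "order.created"))
	fmt.Println(routeDirect(bindings, "unknown"))
}
```

Other exchange kinds such as "fanout" or "topic" match keys differently, so this sketch applies to direct exchanges only.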

func (*Client) QueueDeclare

func (c *Client) QueueDeclare(
	name string,
	opts *DeclareQueueOpts,
	connOpts *ConnectOpts) (amqp.Queue, error)

QueueDeclare declares a queue on the RabbitMQ server

name is the name of the queue

opts holds the options for declaring the queue

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

func (*Client) QueueDelete

func (c *Client) QueueDelete(
	queue string,
	opts *QueueDeleteOpts,
	connOpts *ConnectOpts) error

QueueDelete deletes a queue from the server

queue is the name of the queue to delete

opts holds the options for deleting the queue

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

func (*Client) QueuePurge

func (c *Client) QueuePurge(queue string, noWait bool, connOpts *ConnectOpts) error

QueuePurge purges messages from the queue

queue is the name of the queue that needs to be purged of messages

When noWait is true, do not wait for the server response; the number of messages purged will not be meaningful.

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

func (*Client) Subscribe

func (c *Client) Subscribe(
	ctx context.Context,
	queue string,
	opts *SubscribeOpts,
	chanOpts *ChannelOpts,
	connOpts *ConnectOpts,
	handler func(amqp.Delivery) (amqp.Publishing, error),
) error

Subscribe subscribes you to receive messages from a queue. It processes one message at a time and responds back with a message if required. You can subscribe to a queue indefinitely in case you want to keep on processing new messages.

ctx is the context used to signal cancellation through ctx.Done()

queue is the name of the queue from which it will receive messages

opts holds the subscribe options, such as the correlation ID to look for, whether to listen indefinitely, and whether to publish the response from the handler

chanOpts holds the channel options: prefetch count, prefetch size, and the global flag

connOpts controls reconnection if the connection closes or fails, including the number of retries to attempt and the interval between them.

handler is a function that processes each incoming message and returns a response (optional, see the PublishResponse flag) and an error.
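
The following sketch shows the shape of a handler and a one-message-at-a-time loop that stops when the context is done. It is not rmq's implementation: `Delivery` and `Publishing` are simplified local stand-ins for amqp.Delivery and amqp.Publishing, and `consume`, `echo`, and `demo` are hypothetical names used so the example runs without a broker.

```go
package main

import (
	"context"
	"fmt"
)

// Delivery and Publishing are simplified local stand-ins for amqp.Delivery and
// amqp.Publishing so the sketch compiles without the AMQP library.
type Delivery struct {
	CorrelationId string
	Body          []byte
}

type Publishing struct {
	CorrelationId string
	Body          []byte
}

// consume is a hypothetical loop, not rmq's implementation. It handles one
// delivery at a time and stops when ctx is done or the deliveries channel closes.
func consume(ctx context.Context, deliveries <-chan Delivery,
	handler func(Delivery) (Publishing, error)) ([]Publishing, error) {
	var replies []Publishing
	for {
		select {
		case <-ctx.Done():
			return replies, ctx.Err()
		case d, ok := <-deliveries:
			if !ok {
				return replies, nil
			}
			reply, err := handler(d)
			if err != nil {
				return replies, err
			}
			replies = append(replies, reply)
		}
	}
}

// echo replies with the same body and correlation ID it received.
func echo(d Delivery) (Publishing, error) {
	return Publishing{CorrelationId: d.CorrelationId, Body: d.Body}, nil
}

// demo feeds two in-memory deliveries through consume.
func demo() ([]Publishing, error) {
	deliveries := make(chan Delivery, 2)
	deliveries <- Delivery{CorrelationId: "1", Body: []byte("ping")}
	deliveries <- Delivery{CorrelationId: "2", Body: []byte("pong")}
	close(deliveries)
	return consume(context.Background(), deliveries, echo)
}

func main() {
	replies, err := demo()
	fmt.Println(len(replies), err)
}
```

With the real package, a handler of the same shape would be passed as the last argument to Subscribe, and the returned message would presumably be published only when PublishResponse is set.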

type ConnectOpts added in v0.0.3

type ConnectOpts struct {
	ReconnectRetries  int           // Number of retries for reconnecting
	ReconnectInterval time.Duration // Interval to wait before retrying connection
}

ConnectOpts specifies how the client reconnects if the connection closes or fails
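
As a rough illustration of how the two fields could drive a retry loop, consider the sketch below. `withRetry` and `demo` are hypothetical helpers, the struct is a local mirror of ConnectOpts, and counting ReconnectRetries as retries after the first attempt is an assumption; the package's actual behavior may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ConnectOpts mirrors the documented struct so the sketch is self-contained.
type ConnectOpts struct {
	ReconnectRetries  int
	ReconnectInterval time.Duration
}

// withRetry is a hypothetical helper, not rmq's implementation. It runs op once
// and then up to opts.ReconnectRetries more times, sleeping between attempts.
func withRetry(opts ConnectOpts, op func() error) (attempts int, err error) {
	for {
		attempts++
		if err = op(); err == nil {
			return attempts, nil
		}
		if attempts > opts.ReconnectRetries {
			return attempts, fmt.Errorf("giving up after %d attempts: %w", attempts, err)
		}
		time.Sleep(opts.ReconnectInterval)
	}
}

// demo simulates a connection that fails twice before succeeding.
func demo(retries int) (int, error) {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	opts := ConnectOpts{ReconnectRetries: retries, ReconnectInterval: time.Millisecond}
	return withRetry(opts, dial)
}

func main() {
	fmt.Println(demo(5)) // enough retries: succeeds on the third attempt
	fmt.Println(demo(1)) // too few retries: gives up after the second attempt
}
```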

func DefaultConnectOpts added in v0.0.3

func DefaultConnectOpts() *ConnectOpts

DefaultConnectOpts returns default connect options

type DeclareExchangeOpts

type DeclareExchangeOpts struct {
	Kind        string     // default amqp.ExchangeDirect
	Durable     bool       // default true
	AutoDeleted bool       // default false
	Internal    bool       // default false
	NoWait      bool       // default false
	Args        amqp.Table // default nil
}

DeclareExchangeOpts ...

Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers".

Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges.

Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed.

Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings.

Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding durable queues to auto-deleted exchanges.
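
The four combinations above can be summarized in a small lookup. `exchangeLifetime` is an illustrative helper that only restates the text; it is not part of rmq.

```go
package main

import "fmt"

// exchangeLifetime summarizes the four Durable/AutoDeleted combinations
// described above. It is an illustrative helper, not part of rmq.
func exchangeLifetime(durable, autoDeleted bool) string {
	switch {
	case durable && !autoDeleted:
		return "survives restarts; kept when unbound"
	case !durable && autoDeleted:
		return "lost on restart; deleted when unbound"
	case !durable && !autoDeleted:
		return "lost on restart; kept when unbound"
	default: // durable && autoDeleted
		return "survives restarts; deleted when unbound"
	}
}

func main() {
	for _, durable := range []bool{true, false} {
		for _, autoDeleted := range []bool{false, true} {
			fmt.Printf("Durable=%t AutoDeleted=%t: %s\n",
				durable, autoDeleted, exchangeLifetime(durable, autoDeleted))
		}
	}
}
```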

Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable.

Exchanges declared as `internal` do not accept publishings. Internal exchanges are useful when you wish to implement inter-exchange topologies that should not be exposed to users of the broker.

When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions.

Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters.

func DefaultDeclareExchangeOpts

func DefaultDeclareExchangeOpts() *DeclareExchangeOpts

DefaultDeclareExchangeOpts returns default DeclareExchangeOpts

type DeclareQueueOpts

type DeclareQueueOpts struct {
	Durable    bool       // default true
	AutoDelete bool       // default false
	Exclusive  bool       // default false
	NoWait     bool       // default false
	Args       amqp.Table // default nil
}

DeclareQueueOpts ...

Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to durable exchanges.

Non-Durable and Auto-Deleted queues will not be redeclared on server restart and will be deleted by the server after a short time when the last consumer is canceled or the last consumer's channel is closed. Queues with this lifetime can also be deleted normally with QueueDelete. These non-durable queues can only be bound to non-durable exchanges.

Non-Durable and Non-Auto-Deleted queues will remain declared as long as the server is running regardless of how many consumers. This lifetime is useful for temporary topologies that may have long delays between consumer activity. These queues can only be bound to non-durable exchanges.

Durable and Auto-Deleted queues will be restored on server restart, but without active consumers they will not survive and will be removed. This lifetime is unlikely to be useful.

Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.

When noWait is true, the queue is assumed to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or when attempting to modify an existing queue from a different connection.

func DefaultDeclareQueueOpts

func DefaultDeclareQueueOpts() *DeclareQueueOpts

DefaultDeclareQueueOpts ...

type PublishOpts

type PublishOpts struct {
	Mandatory bool // default false
	Immediate bool // default false
}

PublishOpts ...

func DefaultPublishOpts

func DefaultPublishOpts() *PublishOpts

DefaultPublishOpts ...

type QueueBindOpts

type QueueBindOpts struct {
	NoWait bool       // default false
	Args   amqp.Table // default nil
}

QueueBindOpts ...

func DefaultQueueBindOpts

func DefaultQueueBindOpts() *QueueBindOpts

DefaultQueueBindOpts ...

type QueueDeleteOpts

type QueueDeleteOpts struct {
	IfUnused bool // default false
	IfEmpty  bool // default false
	NoWait   bool // default false
}

QueueDeleteOpts ...

func DefaultQueueDeleteOpts

func DefaultQueueDeleteOpts() *QueueDeleteOpts

DefaultQueueDeleteOpts ...

type SubscribeOpts

type SubscribeOpts struct {
	CorrelationID      string // Correlation ID
	Reconnect          bool   // Reconnect if connection closed
	ListenIndefinitely bool   // Listen indefinitely
	PublishResponse    bool   // Publish response from handler
}

SubscribeOpts ...

func DefaultSubscribeOpts added in v0.0.2

func DefaultSubscribeOpts() *SubscribeOpts

DefaultSubscribeOpts ...
