rabbitmq

package
v0.0.0-...-3a37e37 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue is a wrapper around RabbitMQ's implementation of queues and implements the lanternmq.MessageQueue interface. It allows the user to: * connect to a queueing service * create a channel for that queueing service * state how many messages a receiver can process at one time * declare a durable queue, and send and receive from that queue * declare a durable exchange, and send and receive from that exchange

  • potential exchange options are: 'direct', 'topic', 'headers', and 'fanout'

* close the MessageQueue, which includes closing all channels and the connection to the underlying service.

func (*MessageQueue) Close

func (mq *MessageQueue) Close()

Close closes each channel that's been created, and then closes the connection to the underlying RabbitMQ message service.

func (*MessageQueue) Connect

func (mq *MessageQueue) Connect(username string, password string, host string, port string) error

Connect creates a connection to a RabbitMQ service.

func (*MessageQueue) ConsumeFromQueue

func (mq *MessageQueue) ConsumeFromQueue(chID lanternmq.ChannelID, qName string) (lanternmq.Messages, error)

ConsumeFromQueue opens a receive channel for amqp.Delivery objects for the queue with name 'qName' over the channel with ID 'chID'. ConsumeFromQueue wraps amqp.Delivery in a lanternmq.Messages object. ConsumeFromQueue creates the receive channel using the RabbitMQ Consume method with the following arguments: queue: qName consumer: "" autoAck: false exclusive: false noLocal: false noWait: false args: nil

func (*MessageQueue) CreateChannel

func (mq *MessageQueue) CreateChannel() (lanternmq.ChannelID, error)

CreateChannel creates a channel to the RabbitMQ service that has already been connected to. If the RabbitMQ service has not been connected to already, an error is thrown. The channel's ID is returned.

func (*MessageQueue) DeclareExchange

func (mq *MessageQueue) DeclareExchange(chID lanternmq.ChannelID, name string, exchangeType string) error

DeclareExchange creates a target named 'name' and exchangeType 'exchangeType' over the channel with ID 'chID'. It uses RabbitMQ's ExchangeDeclare method with the following arguments: name: name kind: exchangeType durable: true autoDelete: false internal: false noWait: false args: nil

func (*MessageQueue) DeclareExchangeReceiveQueue

func (mq *MessageQueue) DeclareExchangeReceiveQueue(chID lanternmq.ChannelID, exchangeName string, qName string, routingKey string) error

DeclareExchangeReceiveQueue creates a queue named 'qName' to receive messages from exchange named 'exchangeName' with routing key 'routingKey' over the channel with ID 'chID'. It uses the RabbitMQ method QueueDeclare with the following arguments: name: qName durable: false autoDelete: false exclusive: true noWait: false args: nil

It then calls the RabbitMQ method QueueBind with the following arguments: name: qName key: routingKey exchange: exchangeName noWait: false args: nil

func (*MessageQueue) DeclareQueue

func (mq *MessageQueue) DeclareQueue(chID lanternmq.ChannelID, qName string) error

DeclareQueue creates a queue with the given name on the given channel using RabbitMQ's QueueDeclare method with the following arguments: * name: qName * durable: true * autoDelete: false * exclusive: false * noWait: false * args: nil

func (*MessageQueue) NumConcurrentMsgs

func (mq *MessageQueue) NumConcurrentMsgs(chID lanternmq.ChannelID, num int) error

NumConcurrentMsgs defines how many messages the user can process from the channel at one time.

func (*MessageQueue) ProcessMessages

func (mq *MessageQueue) ProcessMessages(ctx context.Context, msgs lanternmq.Messages, handler lanternmq.MessageHandler, args *map[string]interface{}, errs chan<- error)

ProcessMessages takes 'msgs', which wraps a receive channel for amqp.Delivery objects, and processes each Delivery object by retrieving the message from the Delivery object and providing that along with 'args' to the lanternmq.MessageHandler 'handler'. An acknowledgement is sent to the sender after each message is processed. If there's an error processing a message, the error is sent to the 'errs' channel. ProcessMessages should be called as a goroutine. Example:

go mq.ProcessMessages(msgs, handler, nil, errs)

func (*MessageQueue) PublishToExchange

func (mq *MessageQueue) PublishToExchange(chID lanternmq.ChannelID, name string, routingKey string, message string) error

PublishToExchange sends 'message' to the exchange 'name' over the channel with ID 'chID' with routing key 'routingKey'. It uses RabbitMQ's Publish method with the following arguments: exchange: name key: routingKey mandatory: false immediate: false publishing:

ContentType: "text/plain"
Body: []byte(message)

func (*MessageQueue) PublishToQueue

func (mq *MessageQueue) PublishToQueue(chID lanternmq.ChannelID, qName string, message string) error

PublishToQueue publishes 'message' on the queue with name 'qName' over the channenl with ID 'chID' by calling the RabbitMQ Publish method with the following arguments: exchange: "" key: qName mandatory: false immediate: false publishing:

DeliveryMode: amqp.Persistent
ContentType: "text/plain"
Body: []byte(message)

func (*MessageQueue) QueueExists

func (mq *MessageQueue) QueueExists(chID lanternmq.ChannelID, qName string) (bool, error)

QueueExists checks whether or not a queue already exists. If so, it returns (true, nil). If not, it returns (false, nil). If an error is encountered, it returns (false, err).

type Messages

type Messages struct {
	// contains filtered or unexported fields
}

Messages wraps the delivery channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL