Documentation ¶
Index ¶
- type MessageQueue
- func (mq *MessageQueue) Close()
- func (mq *MessageQueue) Connect(username string, password string, host string, port string) error
- func (mq *MessageQueue) ConsumeFromQueue(chID lanternmq.ChannelID, qName string) (lanternmq.Messages, error)
- func (mq *MessageQueue) CreateChannel() (lanternmq.ChannelID, error)
- func (mq *MessageQueue) DeclareExchange(chID lanternmq.ChannelID, name string, exchangeType string) error
- func (mq *MessageQueue) DeclareExchangeReceiveQueue(chID lanternmq.ChannelID, exchangeName string, qName string, routingKey string) error
- func (mq *MessageQueue) DeclareQueue(chID lanternmq.ChannelID, qName string) error
- func (mq *MessageQueue) NumConcurrentMsgs(chID lanternmq.ChannelID, num int) error
- func (mq *MessageQueue) ProcessMessages(ctx context.Context, msgs lanternmq.Messages, handler lanternmq.MessageHandler, ...)
- func (mq *MessageQueue) PublishToExchange(chID lanternmq.ChannelID, name string, routingKey string, message string) error
- func (mq *MessageQueue) PublishToQueue(chID lanternmq.ChannelID, qName string, message string) error
- func (mq *MessageQueue) QueueExists(chID lanternmq.ChannelID, qName string) (bool, error)
- type Messages
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue is a wrapper around RabbitMQ's implementation of queues and implements the lanternmq.MessageQueue interface. It allows the user to:
* connect to a queueing service
* create a channel for that queueing service
* state how many messages a receiver can process at one time
* declare a durable queue, and send and receive from that queue
* declare a durable exchange, and send and receive from that exchange
  - potential exchange options are: 'direct', 'topic', 'headers', and 'fanout'
* close the MessageQueue, which includes closing all channels and the connection to the underlying service.
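The order of operations listed above can be sketched as follows. This is a minimal illustration, not the package's own code: `ChannelID`, the `queue` interface, and `fakeQueue` are local stand-ins (the import path and the underlying type of lanternmq.ChannelID are not shown in this documentation), and the credentials, host, and port are placeholder values.

```go
package main

import (
	"errors"
	"fmt"
)

// ChannelID is a stand-in for lanternmq.ChannelID (assumption: the real
// underlying type is not shown in this documentation).
type ChannelID int

// queue lists a subset of the methods documented for MessageQueue.
type queue interface {
	Connect(username, password, host, port string) error
	CreateChannel() (ChannelID, error)
	NumConcurrentMsgs(chID ChannelID, num int) error
	DeclareQueue(chID ChannelID, qName string) error
	PublishToQueue(chID ChannelID, qName string, message string) error
	Close()
}

// fakeQueue is a hypothetical in-memory stand-in so the sketch runs
// without a RabbitMQ broker.
type fakeQueue struct {
	connected bool
	nextID    ChannelID
	queues    map[string][]string
}

func (f *fakeQueue) Connect(username, password, host, port string) error {
	f.connected = true
	f.queues = make(map[string][]string)
	return nil
}

func (f *fakeQueue) CreateChannel() (ChannelID, error) {
	// Mirrors the documented rule: a channel needs an existing connection.
	if !f.connected {
		return 0, errors.New("not connected")
	}
	f.nextID++
	return f.nextID, nil
}

func (f *fakeQueue) NumConcurrentMsgs(chID ChannelID, num int) error { return nil }

func (f *fakeQueue) DeclareQueue(chID ChannelID, qName string) error { return nil }

func (f *fakeQueue) PublishToQueue(chID ChannelID, qName string, message string) error {
	f.queues[qName] = append(f.queues[qName], message)
	return nil
}

func (f *fakeQueue) Close() { f.connected = false }

// sendOne connects, creates a channel, declares the queue, publishes one
// message, and closes the queue when it returns.
func sendOne(mq queue, qName, message string) error {
	if err := mq.Connect("guest", "guest", "localhost", "5672"); err != nil {
		return err
	}
	defer mq.Close()

	chID, err := mq.CreateChannel()
	if err != nil {
		return err
	}
	if err := mq.NumConcurrentMsgs(chID, 1); err != nil {
		return err
	}
	if err := mq.DeclareQueue(chID, qName); err != nil {
		return err
	}
	return mq.PublishToQueue(chID, qName, message)
}

func main() {
	fq := &fakeQueue{}
	if err := sendOne(fq, "jobs", "hello"); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println("queued:", fq.queues["jobs"])
}
```

Writing the helper against a small interface rather than the concrete type keeps it testable without a running broker; a real *MessageQueue would be passed in the same way if its method set matches.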
func (*MessageQueue) Close ¶
func (mq *MessageQueue) Close()
Close closes each channel that's been created, and then closes the connection to the underlying RabbitMQ message service.
func (*MessageQueue) ConsumeFromQueue ¶
func (mq *MessageQueue) ConsumeFromQueue(chID lanternmq.ChannelID, qName string) (lanternmq.Messages, error)
ConsumeFromQueue opens a receive channel for amqp.Delivery objects for the queue with name 'qName' over the channel with ID 'chID'. ConsumeFromQueue wraps amqp.Delivery in a lanternmq.Messages object. ConsumeFromQueue creates the receive channel using the RabbitMQ Consume method with the following arguments:
* queue: qName
* consumer: ""
* autoAck: false
* exclusive: false
* noLocal: false
* noWait: false
* args: nil
func (*MessageQueue) CreateChannel ¶
func (mq *MessageQueue) CreateChannel() (lanternmq.ChannelID, error)
CreateChannel creates a channel to the RabbitMQ service that has already been connected to. If the RabbitMQ service has not been connected to already, an error is returned. Otherwise, the channel's ID is returned.
func (*MessageQueue) DeclareExchange ¶
func (mq *MessageQueue) DeclareExchange(chID lanternmq.ChannelID, name string, exchangeType string) error
DeclareExchange declares an exchange named 'name' of type 'exchangeType' over the channel with ID 'chID'. It uses RabbitMQ's ExchangeDeclare method with the following arguments:
* name: name
* kind: exchangeType
* durable: true
* autoDelete: false
* internal: false
* noWait: false
* args: nil
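The type documentation lists 'direct', 'topic', 'headers', and 'fanout' as the possible exchange types. This documentation does not say whether DeclareExchange itself rejects other values, so a caller might check the string first. The helper below is a hypothetical sketch and is not part of the package.

```go
package main

import "fmt"

// validExchangeType reports whether t is one of the exchange types
// listed in the MessageQueue documentation. The function name is
// hypothetical; it is not part of the package.
func validExchangeType(t string) bool {
	switch t {
	case "direct", "topic", "headers", "fanout":
		return true
	}
	return false
}

func main() {
	for _, t := range []string{"topic", "queue"} {
		fmt.Printf("%q valid: %v\n", t, validExchangeType(t))
	}
}
```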
func (*MessageQueue) DeclareExchangeReceiveQueue ¶
func (mq *MessageQueue) DeclareExchangeReceiveQueue(chID lanternmq.ChannelID, exchangeName string, qName string, routingKey string) error
DeclareExchangeReceiveQueue creates a queue named 'qName' to receive messages from exchange named 'exchangeName' with routing key 'routingKey' over the channel with ID 'chID'. It uses the RabbitMQ method QueueDeclare with the following arguments:
* name: qName
* durable: false
* autoDelete: false
* exclusive: true
* noWait: false
* args: nil
It then calls the RabbitMQ method QueueBind with the following arguments:
* name: qName
* key: routingKey
* exchange: exchangeName
* noWait: false
* args: nil
func (*MessageQueue) DeclareQueue ¶
func (mq *MessageQueue) DeclareQueue(chID lanternmq.ChannelID, qName string) error
DeclareQueue creates a queue with the given name on the given channel using RabbitMQ's QueueDeclare method with the following arguments:
* name: qName
* durable: true
* autoDelete: false
* exclusive: false
* noWait: false
* args: nil
func (*MessageQueue) NumConcurrentMsgs ¶
func (mq *MessageQueue) NumConcurrentMsgs(chID lanternmq.ChannelID, num int) error
NumConcurrentMsgs defines how many messages the user can process from the channel at one time.
func (*MessageQueue) ProcessMessages ¶
func (mq *MessageQueue) ProcessMessages(ctx context.Context, msgs lanternmq.Messages, handler lanternmq.MessageHandler, args *map[string]interface{}, errs chan<- error)
ProcessMessages takes 'msgs', which wraps a receive channel for amqp.Delivery objects, and processes each Delivery object by retrieving the message from the Delivery object and providing that along with 'args' to the lanternmq.MessageHandler 'handler'. An acknowledgement is sent to the sender after each message is processed. If there's an error processing a message, the error is sent to the 'errs' channel. ProcessMessages should be called as a goroutine. Example:
go mq.ProcessMessages(ctx, msgs, handler, nil, errs)
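Because ProcessMessages runs as a goroutine and reports failures on 'errs', the caller needs to keep reading from that channel. The sketch below shows one way to do that. It does not use the real package: `processMessages` is a local stand-in that mimics the behavior described above over a plain channel of message bodies, and the `handler` signature is an assumption, since the definition of lanternmq.MessageHandler is not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handler is an assumed shape for lanternmq.MessageHandler: it receives
// the message body together with 'args'.
type handler func(msg []byte, args *map[string]interface{}) error

// processMessages is a stand-in for ProcessMessages. It passes each
// message to h and forwards handler errors to errs until msgs is closed
// or ctx is cancelled.
func processMessages(ctx context.Context, msgs <-chan []byte, h handler, args *map[string]interface{}, errs chan<- error) {
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-msgs:
			if !ok {
				return
			}
			if err := h(m, args); err != nil {
				select {
				case errs <- err:
				case <-ctx.Done():
					return
				}
			}
		}
	}
}

// run feeds bodies through processMessages and collects the errors the
// handler reported.
func run(bodies []string) []error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := make(chan []byte)
	errs := make(chan error)
	done := make(chan struct{})

	// Example handler: rejects empty messages.
	h := func(msg []byte, _ *map[string]interface{}) error {
		if len(msg) == 0 {
			return errors.New("empty message")
		}
		return nil
	}

	// Consumer goroutine, started the same way as in the example above.
	go func() {
		processMessages(ctx, msgs, h, nil, errs)
		close(done)
	}()

	// Producer goroutine standing in for the broker.
	go func() {
		for _, b := range bodies {
			msgs <- []byte(b)
		}
		close(msgs)
	}()

	// Drain errs until the consumer goroutine has finished.
	var got []error
	for {
		select {
		case err := <-errs:
			got = append(got, err)
		case <-done:
			return got
		}
	}
}

func main() {
	got := run([]string{"first", "", "third"})
	fmt.Println("handler errors:", len(got))
}
```

With the real package, cancelling the context passed as 'ctx' is presumably how the goroutine is stopped, but this documentation does not describe the cancellation behavior, so treat that as an assumption.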
func (*MessageQueue) PublishToExchange ¶
func (mq *MessageQueue) PublishToExchange(chID lanternmq.ChannelID, name string, routingKey string, message string) error
PublishToExchange sends 'message' to the exchange 'name' over the channel with ID 'chID' with routing key 'routingKey'. It uses RabbitMQ's Publish method with the following arguments:
* exchange: name
* key: routingKey
* mandatory: false
* immediate: false
* publishing:
  - ContentType: "text/plain"
  - Body: []byte(message)
func (*MessageQueue) PublishToQueue ¶
func (mq *MessageQueue) PublishToQueue(chID lanternmq.ChannelID, qName string, message string) error
PublishToQueue publishes 'message' on the queue with name 'qName' over the channel with ID 'chID' by calling the RabbitMQ Publish method with the following arguments:
* exchange: ""
* key: qName
* mandatory: false
* immediate: false
* publishing:
  - DeliveryMode: amqp.Persistent
  - ContentType: "text/plain"
  - Body: []byte(message)
func (*MessageQueue) QueueExists ¶
func (mq *MessageQueue) QueueExists(chID lanternmq.ChannelID, qName string) (bool, error)
QueueExists checks whether or not a queue already exists. If so, it returns (true, nil). If not, it returns (false, nil). If an error is encountered, it returns (false, err).
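The three return cases can be handled as in the sketch below, which declares a queue only when QueueExists reports that it is missing. `ChannelID`, `queueDeclarer`, `fakeBroker`, and `ensureQueue` are hypothetical names introduced for illustration; the fake stands in for a real MessageQueue so that the example runs without a broker.

```go
package main

import "fmt"

// ChannelID is a stand-in for lanternmq.ChannelID (assumption: the real
// underlying type is not shown in this documentation).
type ChannelID int

// queueDeclarer lists the two documented methods this sketch relies on.
type queueDeclarer interface {
	QueueExists(chID ChannelID, qName string) (bool, error)
	DeclareQueue(chID ChannelID, qName string) error
}

// fakeBroker is a hypothetical in-memory stand-in for a MessageQueue.
type fakeBroker struct{ queues map[string]bool }

func (f *fakeBroker) QueueExists(_ ChannelID, qName string) (bool, error) {
	return f.queues[qName], nil
}

func (f *fakeBroker) DeclareQueue(_ ChannelID, qName string) error {
	f.queues[qName] = true
	return nil
}

// ensureQueue declares qName only if it does not exist yet. It returns
// true when it declared the queue and false when the queue was already
// there or an error occurred.
func ensureQueue(mq queueDeclarer, chID ChannelID, qName string) (bool, error) {
	exists, err := mq.QueueExists(chID, qName)
	if err != nil {
		// (false, err): the check itself failed.
		return false, fmt.Errorf("checking queue %q: %w", qName, err)
	}
	if exists {
		// (true, nil): nothing to do.
		return false, nil
	}
	// (false, nil): the queue is missing, so declare it.
	if err := mq.DeclareQueue(chID, qName); err != nil {
		return false, fmt.Errorf("declaring queue %q: %w", qName, err)
	}
	return true, nil
}

func main() {
	fb := &fakeBroker{queues: map[string]bool{}}
	for i := 0; i < 2; i++ {
		created, err := ensureQueue(fb, 1, "jobs")
		fmt.Println("created:", created, "err:", err)
	}
}
```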