Documentation ¶
Index ¶
- Constants
- func AssertUniqueQueues(log logrus.Ext1FieldLogger, confs ...config.Config)
- func AutoSetup(log logrus.Ext1FieldLogger, queues ...AMQPService)
- func BindQueuesToExchange(log logrus.Ext1FieldLogger, queues ...AMQPService)
- func CreateMessage(body interface{}) (amqp.Publishing, error)
- func GetKickbackMessage(maxRetries int64, msg amqp.Delivery) (amqp.Publishing, error)
- func GetNextMessage(msg amqp.Delivery, body interface{}) (amqp.Publishing, error)
- func OpenAMQPConnection(conf config.Endpoint) (*amqp.Connection, error)
- func TryCreateQueues(log logrus.Ext1FieldLogger, queues ...AMQPService)
- type AMQPMessage
- type AMQPRepository
- type AMQPService
Constants ¶
const RetryCountHeader = "retryCount"
RetryCountHeader is the header for the retry count of the amqp message
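As a rough illustration of how a consumer might read this header, the sketch below looks up the retry count in a message's header table. `Table` is a stand-in for the AMQP client's header map and `retryCount` is a hypothetical helper, not part of this package; the integer types handled are an assumption about what a broker may hand back.

```go
package main

// RetryCountHeader mirrors the constant documented above.
const RetryCountHeader = "retryCount"

// Table is a stand-in for the AMQP client's header table type.
type Table map[string]interface{}

// retryCount is a hypothetical helper. It returns the retry count stored
// in the headers, or 0 when the header is missing or is not one of the
// integer types handled below.
func retryCount(headers Table) int64 {
	switch v := headers[RetryCountHeader].(type) {
	case int64:
		return v
	case int32:
		return int64(v)
	case int:
		return int64(v)
	default:
		return 0
	}
}
```

Treating a missing header as zero means a message that has never been retried needs no special handling.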
Variables ¶
This section is empty.
Functions ¶
func AssertUniqueQueues ¶ added in v1.2.0
func AssertUniqueQueues(log logrus.Ext1FieldLogger, confs ...config.Config)
AssertUniqueQueues ensures that the configurations consist of unique queues
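The uniqueness check can be pictured with the sketch below. It works on plain queue names because the fields of config.Config are not shown here; `duplicateQueueNames` is a hypothetical helper, and what AssertUniqueQueues does when it finds a duplicate is not specified above.

```go
package main

// duplicateQueueNames is a hypothetical helper. It returns every queue
// name that appears more than once, in the order the first repeat of
// each name is encountered.
func duplicateQueueNames(names ...string) []string {
	seen := make(map[string]int, len(names))
	var dups []string
	for _, n := range names {
		seen[n]++
		// Record a name exactly once, when its second occurrence is seen.
		if seen[n] == 2 {
			dups = append(dups, n)
		}
	}
	return dups
}
```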
func AutoSetup ¶ added in v1.3.0
func AutoSetup(log logrus.Ext1FieldLogger, queues ...AMQPService)
AutoSetup calls TryCreateQueues and then BindQueuesToExchange
func BindQueuesToExchange ¶ added in v1.3.0
func BindQueuesToExchange(log logrus.Ext1FieldLogger, queues ...AMQPService)
BindQueuesToExchange binds the queues to the exchange, unless it is the default exchange, using the queue name as the routing key
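The binding rule described above might look roughly like the following. `queueBinder` is a hypothetical, narrowed stand-in for a channel's bind method (real AMQP clients take more parameters), and `recordingBinder` exists only to make the sketch easy to exercise. In AMQP 0-9-1 the default exchange is the one named with the empty string, and every queue is already routed through it by queue name, which is why no explicit binding is needed in that case.

```go
package main

// queueBinder is a hypothetical, narrowed stand-in for the channel
// method that binds a queue to an exchange.
type queueBinder interface {
	QueueBind(queue, routingKey, exchange string) error
}

// bindQueue binds queue to exchange with the queue name as the routing
// key. The default exchange (empty name) is skipped.
func bindQueue(ch queueBinder, queue, exchange string) error {
	if exchange == "" {
		return nil
	}
	return ch.QueueBind(queue, queue, exchange)
}

// recordingBinder is a test double that records each bind request as
// {queue, routingKey, exchange}.
type recordingBinder struct {
	calls [][3]string
}

func (r *recordingBinder) QueueBind(queue, routingKey, exchange string) error {
	r.calls = append(r.calls, [3]string{queue, routingKey, exchange})
	return nil
}
```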
func CreateMessage ¶ added in v1.1.0
func CreateMessage(body interface{}) (amqp.Publishing, error)
CreateMessage creates a message from the given body
func GetKickbackMessage ¶ added in v1.1.0
func GetKickbackMessage(maxRetries int64, msg amqp.Delivery) (amqp.Publishing, error)
GetKickbackMessage takes the delivery and creates a message from it for requeuing on a non-fatal error. It returns an error if the number of retries is exceeded.
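A minimal sketch of the kickback idea follows, using stand-in `Delivery` and `Publishing` types rather than the real AMQP client types. The `kickback` function and `errRetriesExceeded` are hypothetical, and treating a count equal to maxRetries as exhausted is an assumption; the package may draw the boundary differently.

```go
package main

import "errors"

// RetryCountHeader mirrors the constant documented in this package.
const RetryCountHeader = "retryCount"

// Delivery and Publishing are stand-ins for the AMQP client types.
type Delivery struct {
	Headers map[string]interface{}
	Body    []byte
}

type Publishing struct {
	Headers map[string]interface{}
	Body    []byte
}

// errRetriesExceeded is a hypothetical sentinel error.
var errRetriesExceeded = errors.New("maximum retries exceeded")

// kickback copies the delivery into a new message with the retry count
// incremented. Assumption: a count that has reached maxRetries is an error.
func kickback(maxRetries int64, msg Delivery) (Publishing, error) {
	// A missing or non-int64 header is treated as zero retries so far.
	count, _ := msg.Headers[RetryCountHeader].(int64)
	if count >= maxRetries {
		return Publishing{}, errRetriesExceeded
	}
	// Copy the headers so the original delivery is left untouched.
	headers := make(map[string]interface{}, len(msg.Headers)+1)
	for k, v := range msg.Headers {
		headers[k] = v
	}
	headers[RetryCountHeader] = count + 1
	return Publishing{Headers: headers, Body: msg.Body}, nil
}
```

Carrying the count in a header keeps the retry state with the message itself, so any consumer that picks it up can enforce the limit.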
func GetNextMessage ¶ added in v1.1.0
func GetNextMessage(msg amqp.Delivery, body interface{}) (amqp.Publishing, error)
GetNextMessage is similar to GetKickbackMessage but takes in a new body, and does not increment the retry count
func OpenAMQPConnection ¶
func OpenAMQPConnection(conf config.Endpoint) (*amqp.Connection, error)
OpenAMQPConnection attempts to dial a new AMQP connection
func TryCreateQueues ¶ added in v1.1.3
func TryCreateQueues(log logrus.Ext1FieldLogger, queues ...AMQPService)
TryCreateQueues attempts to create the given queues, but doesn't error out if it fails
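The "attempt, log, and carry on" behaviour can be sketched as below. `queueCreator` is a narrowed stand-in for AMQPService, `logf` stands in for the logger, and `fakeQueue` is a test double. Returning the failure count is an addition for illustration only; the documented function returns nothing.

```go
package main

import "errors"

// queueCreator is a narrowed stand-in for AMQPService, keeping only
// the method this sketch needs.
type queueCreator interface {
	CreateQueue() error
}

// tryCreateAll is a hypothetical helper. It calls CreateQueue on every
// queue, reports failures through logf, and keeps going. It returns the
// number of queues that failed.
func tryCreateAll(logf func(format string, args ...interface{}), queues ...queueCreator) int {
	failed := 0
	for i, q := range queues {
		if err := q.CreateQueue(); err != nil {
			logf("queue %d: create failed: %v", i, err)
			failed++
		}
	}
	return failed
}

// fakeQueue is a test double whose CreateQueue fails on demand.
type fakeQueue struct {
	fail bool
}

func (f fakeQueue) CreateQueue() error {
	if f.fail {
		return errors.New("create failed")
	}
	return nil
}
```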
Types ¶
type AMQPMessage ¶
type AMQPMessage interface {
	CreateMessage(body interface{}) (amqp.Publishing, error)
	// GetKickbackMessage takes the delivery and creates a message from it
	// for requeuing on non-fatal error
	GetKickbackMessage(msg amqp.Delivery) (amqp.Publishing, error)
	GetNextMessage(msg amqp.Delivery, body interface{}) (amqp.Publishing, error)
}
AMQPMessage contains utilities for manipulating AMQP messages
func NewAMQPMessage ¶
func NewAMQPMessage(maxRetries int64) AMQPMessage
NewAMQPMessage creates a new AMQPMessage
type AMQPRepository ¶
type AMQPRepository interface {
	GetChannel() (externals.AMQPChannel, error)
	RejectDelivery(msg externals.AMQPDelivery, requeue bool) error
	SwapConn(conn externals.AMQPConnection)
	AddListeners(closeChan chan *amqp.Error, blockChan chan amqp.Blocking)
}
AMQPRepository represents functions for connecting to an AMQP provider
func NewAMQPRepository ¶
func NewAMQPRepository(conn externals.AMQPConnection) AMQPRepository
NewAMQPRepository creates a new AMQPRepository
type AMQPService ¶
type AMQPService interface {
	// Consume immediately starts delivering queued messages.
	Consume() (<-chan amqp.Delivery, error)
	// Send places a message into the queue
	Send(pub amqp.Publishing) error
	// Requeue rejects the oldMsg and queues the newMsg in a transaction
	Requeue(oldMsg amqp.Delivery, newMsg amqp.Publishing) error
	// CreateQueue attempts to publish a queue
	CreateQueue() error
	// CreateExchange will attempt to create an exchange
	CreateExchange() error
	// Channel gets a channel
	Channel() (externals.AMQPChannel, error)
	Config() config.Config
}
AMQPService acts as a simple interface to the command queue
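One way a consumer loop could combine Consume and Requeue is sketched below. Everything here is a stand-in: `service` keeps only two of the interface's methods, `Delivery` and `Publishing` replace the AMQP client types, and `memoryService`, `drain`, and `errTransient` are hypothetical names. In real use the replacement message would more likely come from GetKickbackMessage so that the retry count is tracked.

```go
package main

import "errors"

// Delivery and Publishing are stand-ins for the AMQP client types.
type Delivery struct{ Body []byte }
type Publishing struct{ Body []byte }

// service is a narrowed stand-in for AMQPService.
type service interface {
	Consume() (<-chan Delivery, error)
	Requeue(oldMsg Delivery, newMsg Publishing) error
}

// errTransient is a hypothetical non-fatal handler error.
var errTransient = errors.New("transient failure")

// drain reads deliveries until the channel closes. A delivery whose
// handler fails is requeued with the same body.
func drain(svc service, handle func(Delivery) error) error {
	msgs, err := svc.Consume()
	if err != nil {
		return err
	}
	for msg := range msgs {
		if herr := handle(msg); herr != nil {
			if rerr := svc.Requeue(msg, Publishing{Body: msg.Body}); rerr != nil {
				return rerr
			}
		}
	}
	return nil
}

// memoryService is an in-memory test double for service.
type memoryService struct {
	pending  []Delivery
	requeued []Publishing
}

// Consume returns a closed channel preloaded with the pending deliveries.
func (m *memoryService) Consume() (<-chan Delivery, error) {
	ch := make(chan Delivery, len(m.pending))
	for _, d := range m.pending {
		ch <- d
	}
	close(ch)
	return ch, nil
}

// Requeue records the replacement message instead of publishing it.
func (m *memoryService) Requeue(_ Delivery, newMsg Publishing) error {
	m.requeued = append(m.requeued, newMsg)
	return nil
}
```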
func NewAMQPService ¶
func NewAMQPService(conf config.Config, repo AMQPRepository, log logrus.Ext1FieldLogger) AMQPService
NewAMQPService creates a new AMQPService
func NewService ¶ added in v1.1.2
func NewService(conf config.Config, log logrus.Ext1FieldLogger) AMQPService
NewService is a more convenient form of NewAMQPService. It calls log.Panic on error