queue

package module
v1.3.2 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 3, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Constants

const RetryCountHeader = "retryCount"

RetryCountHeader is the header for the retry count of the amqp message

Variables

This section is empty.

Functions

func AssertUniqueQueues added in v1.2.0

func AssertUniqueQueues(log logrus.Ext1FieldLogger, confs ...config.Config)

AssertUniqueQueues ensures that the given configurations consist of unique queues
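The uniqueness check can be pictured with the minimal sketch below. It is not the package's implementation: `queueConf` and its `QueueName` field are hypothetical stand-ins for `config.Config`, and what AssertUniqueQueues does when it finds a duplicate (log, panic, or something else) is not stated in this documentation.

```go
package main

import "fmt"

// queueConf is a hypothetical stand-in for config.Config; only the queue
// name matters for this sketch.
type queueConf struct {
	QueueName string
}

// duplicateQueues returns every queue name that appears more than once,
// listed once each, in the order the first repeat was seen.
func duplicateQueues(confs ...queueConf) []string {
	seen := make(map[string]int)
	var dups []string
	for _, c := range confs {
		seen[c.QueueName]++
		if seen[c.QueueName] == 2 {
			dups = append(dups, c.QueueName)
		}
	}
	return dups
}

func main() {
	dups := duplicateQueues(
		queueConf{QueueName: "orders"},
		queueConf{QueueName: "emails"},
		queueConf{QueueName: "orders"},
	)
	fmt.Println("duplicate queues:", dups)
}
```

Returning the duplicates instead of failing immediately keeps the sketch easy to test; a caller could log them or abort startup.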

func AutoSetup added in v1.3.0

func AutoSetup(log logrus.Ext1FieldLogger, queues ...AMQPService)

AutoSetup calls TryCreateQueues and then BindQueuesToExchange

func BindQueuesToExchange added in v1.3.0

func BindQueuesToExchange(log logrus.Ext1FieldLogger, queues ...AMQPService)

BindQueuesToExchange binds the queues to the exchange, unless it is the default exchange, using the queue name as the routing key
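The binding rule can be sketched as follows. In AMQP 0-9-1 the default exchange is named by the empty string and every queue is already bound to it under its own name, so there is nothing to bind in that case. The `queueConf` and `binding` types and their fields are hypothetical; the real function works on AMQPService values and is not reproduced here.

```go
package main

import "fmt"

// queueConf is a hypothetical stand-in for the queue and exchange names
// held in config.Config.
type queueConf struct {
	QueueName string
	Exchange  string
}

// binding is a hypothetical record of one queue-to-exchange bind.
type binding struct {
	Queue      string
	RoutingKey string
	Exchange   string
}

// planBindings lists the binds that would be issued: none for queues on the
// default exchange (empty name), and one per remaining queue with the queue
// name reused as the routing key.
func planBindings(confs ...queueConf) []binding {
	var out []binding
	for _, c := range confs {
		if c.Exchange == "" {
			continue // default exchange: already routed by queue name
		}
		out = append(out, binding{
			Queue:      c.QueueName,
			RoutingKey: c.QueueName,
			Exchange:   c.Exchange,
		})
	}
	return out
}

func main() {
	plan := planBindings(
		queueConf{QueueName: "orders", Exchange: "shop"},
		queueConf{QueueName: "audit", Exchange: ""},
	)
	for _, b := range plan {
		fmt.Printf("bind queue %q to exchange %q with key %q\n", b.Queue, b.Exchange, b.RoutingKey)
	}
}
```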

func CreateMessage added in v1.1.0

func CreateMessage(body interface{}) (amqp.Publishing, error)

CreateMessage creates a message from the given body

func GetKickbackMessage added in v1.1.0

func GetKickbackMessage(maxRetries int64, msg amqp.Delivery) (amqp.Publishing, error)

GetKickbackMessage takes the delivery and creates a message from it for requeuing after a non-fatal error. It returns an error if the maximum number of retries (maxRetries) is exceeded.
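The retry bookkeeping described above might look like the sketch below. It assumes the count is stored as an `int64` under the RetryCountHeader key in the message headers (a plain `map[string]interface{}`, which is the shape of `amqp.Table`). The helper name, the error type, and the exact boundary (`>=` versus `>`) are assumptions, not the package's code.

```go
package main

import "fmt"

// retryCountHeader mirrors the package's RetryCountHeader constant.
const retryCountHeader = "retryCount"

// retriesExceededError is a hypothetical error type for this sketch.
type retriesExceededError struct {
	max int64
}

func (e retriesExceededError) Error() string {
	return fmt.Sprintf("message already retried %d times", e.max)
}

// kickbackHeaders returns a copy of headers with the retry count incremented.
// A missing or non-int64 header is treated as zero retries. Once the count
// has reached maxRetries, it returns an error instead (assumed boundary).
func kickbackHeaders(maxRetries int64, headers map[string]interface{}) (map[string]interface{}, error) {
	count, _ := headers[retryCountHeader].(int64)
	if count >= maxRetries {
		return nil, retriesExceededError{max: maxRetries}
	}
	out := make(map[string]interface{}, len(headers)+1)
	for k, v := range headers {
		out[k] = v
	}
	out[retryCountHeader] = count + 1
	return out, nil
}

func main() {
	var headers map[string]interface{}
	for {
		next, err := kickbackHeaders(2, headers)
		if err != nil {
			fmt.Println("giving up:", err)
			return
		}
		fmt.Println("requeue with retry count", next[retryCountHeader])
		headers = next
	}
}
```

Copying the header map rather than mutating it leaves the original delivery untouched, which matters if the requeue fails and the delivery has to be handled another way.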

func GetNextMessage added in v1.1.0

func GetNextMessage(msg amqp.Delivery, body interface{}) (amqp.Publishing, error)

GetNextMessage is similar to GetKickbackMessage but takes in a new body, and does not increment the retry count
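For contrast with the kickback case, the following sketch builds a follow-up message that carries a new body and leaves the retry count as it was. Two things here are assumptions: that the body is encoded as JSON, and that the previous headers are copied over unchanged. The `message` type is a hypothetical stand-in for `amqp.Delivery` and `amqp.Publishing`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical stand-in holding only the fields this sketch needs.
type message struct {
	Headers map[string]interface{}
	Body    []byte
}

// nextMessage encodes body as JSON (an assumption) and copies the previous
// headers verbatim, so any retry count they hold is carried over unchanged.
func nextMessage(prev message, body interface{}) (message, error) {
	raw, err := json.Marshal(body)
	if err != nil {
		return message{}, err
	}
	headers := make(map[string]interface{}, len(prev.Headers))
	for k, v := range prev.Headers {
		headers[k] = v
	}
	return message{Headers: headers, Body: raw}, nil
}

func main() {
	prev := message{
		Headers: map[string]interface{}{"retryCount": int64(2)},
		Body:    []byte(`{"step":1}`),
	}
	next, err := nextMessage(prev, map[string]int{"step": 2})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(next.Body), next.Headers["retryCount"])
}
```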

func OpenAMQPConnection

func OpenAMQPConnection(conf config.Endpoint) (*amqp.Connection, error)

OpenAMQPConnection attempts to dial a new AMQP connection

func TryCreateQueues added in v1.1.3

func TryCreateQueues(log logrus.Ext1FieldLogger, queues ...AMQPService)

TryCreateQueues attempts to create the given queues, but doesn't error out if it fails
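A best-effort loop of this kind could be sketched as below. `queueCreator` is a hypothetical one-method subset of AMQPService, `fakeQueue` and `stubErr` exist only for the demonstration, and the returned failure count is added here for testability (the real function returns nothing).

```go
package main

import "fmt"

// queueCreator is a hypothetical one-method subset of AMQPService.
type queueCreator interface {
	CreateQueue() error
}

// stubErr is a tiny error type used only by this demonstration.
type stubErr string

func (e stubErr) Error() string { return string(e) }

// fakeQueue is a test double whose CreateQueue returns a preset error.
type fakeQueue struct {
	err error
}

func (q fakeQueue) CreateQueue() error { return q.err }

// tryCreate attempts to create every queue. A failure is reported through
// logf and counted, and the loop carries on with the remaining queues.
func tryCreate(logf func(format string, args ...interface{}), queues ...queueCreator) int {
	failed := 0
	for _, q := range queues {
		if err := q.CreateQueue(); err != nil {
			logf("could not create queue: %v", err)
			failed++
		}
	}
	return failed
}

func main() {
	logf := func(format string, args ...interface{}) {
		fmt.Printf(format+"\n", args...)
	}
	failed := tryCreate(logf,
		fakeQueue{},
		fakeQueue{err: stubErr("access refused")},
		fakeQueue{},
	)
	fmt.Println("queues that failed:", failed)
}
```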

Types

type AMQPMessage

type AMQPMessage interface {
	CreateMessage(body interface{}) (amqp.Publishing, error)
	// GetKickbackMessage takes the delivery and creates a message from it
	// for requeuing on non-fatal error
	GetKickbackMessage(msg amqp.Delivery) (amqp.Publishing, error)
	GetNextMessage(msg amqp.Delivery, body interface{}) (amqp.Publishing, error)
}

AMQPMessage contains utilities for manipulating AMQP messages

func NewAMQPMessage

func NewAMQPMessage(maxRetries int64) AMQPMessage

NewAMQPMessage creates a new AMQPMessage

type AMQPRepository

type AMQPRepository interface {
	GetChannel() (externals.AMQPChannel, error)
	RejectDelivery(msg externals.AMQPDelivery, requeue bool) error
	SwapConn(conn externals.AMQPConnection)
	AddListeners(closeChan chan *amqp.Error, blockChan chan amqp.Blocking)
}

AMQPRepository represents functions for connecting to an AMQP provider

func NewAMQPRepository

func NewAMQPRepository(conn externals.AMQPConnection) AMQPRepository

NewAMQPRepository creates a new AMQPRepository

type AMQPService

type AMQPService interface {
	// Consume immediately starts delivering queued messages.
	Consume() (<-chan amqp.Delivery, error)
	// Send places a message into the queue
	Send(pub amqp.Publishing) error
	// Requeue rejects the oldMsg and queues the newMsg in a transaction
	Requeue(oldMsg amqp.Delivery, newMsg amqp.Publishing) error
	// CreateQueue attempts to publish a queue
	CreateQueue() error

	// CreateExchange will attempt to create an exchange
	CreateExchange() error

	// Channel gets a channel
	Channel() (externals.AMQPChannel, error)

	Config() config.Config
}

AMQPService acts as a simple interface to the command queue

func NewAMQPService

func NewAMQPService(
	conf config.Config,
	repo AMQPRepository,
	log logrus.Ext1FieldLogger) AMQPService

NewAMQPService creates a new AMQPService

func NewService added in v1.1.2

func NewService(conf config.Config, log logrus.Ext1FieldLogger) AMQPService

NewService is a more convenient form of NewAMQPService. It calls log.Panic on error
