Documentation ¶
Index ¶
- Variables
- func DLQCanonicalName(prefix string, name QueueName) string
- type MessageBroker
- type MessageConsumer
- type MessageJSON
- type OnQueueCreation
- type Queue
- type QueueDefinition
- type QueueDefinitionOption
- type QueueManager
- func (qm *QueueManager) Delete(name QueueName) error
- func (qm *QueueManager) Get(name QueueName) Queue
- func (qm *QueueManager) GetByNameString(name string) Queue
- func (qm *QueueManager) GetDLQName(name QueueName) QueueName
- func (qm *QueueManager) GetQDef(name QueueName) *QueueDefinition
- func (qm *QueueManager) Process(q Queue, opts *QueueProcessingOptions)
- func (qm *QueueManager) ProcessOne(q Queue)
- func (qm *QueueManager) QueueName(name string) QueueName
- func (qm *QueueManager) SendMany(queueName QueueName, msgs []SendMessage) error
- func (qm *QueueManager) SendOne(queueName QueueName, msg SendMessage) error
- func (qm *QueueManager) TestQueuesCreate()
- func (qm *QueueManager) TestQueuesDestroy()
- type QueueManagerOption
- type QueueName
- type QueueProcessingOptions
- type ReceiveMessage
- type SendMessage
Constants ¶
This section is empty.
Variables ¶
var ErrQueueNoMessages = errors.New("no messages")
ErrQueueNoMessages indicates that there were no messages in the queue.
var ErrQueuePollError = errors.New("poll error")
ErrQueuePollError indicates a temporary problem during queue polling.
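A caller that polls a queue will usually want to treat these two errors differently. The following is a minimal sketch, assuming the errors may arrive wrapped and can be matched with `errors.Is`; the `classify` helper and the locally declared error variables are hypothetical stand-ins for illustration, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented sentinel errors.
var (
	ErrQueueNoMessages = errors.New("no messages")
	ErrQueuePollError  = errors.New("poll error")
)

// classify is a hypothetical helper that maps a polling error to an action.
func classify(err error) string {
	switch {
	case err == nil:
		return "handle" // a message was received
	case errors.Is(err, ErrQueueNoMessages):
		return "idle" // queue is empty, poll again later
	case errors.Is(err, ErrQueuePollError):
		return "retry" // temporary problem, retry the poll
	default:
		return "fail" // unknown error, surface it to the caller
	}
}

func main() {
	wrapped := fmt.Errorf("receive: %w", ErrQueuePollError)
	fmt.Println(classify(nil), classify(ErrQueueNoMessages), classify(wrapped))
}
```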
Functions ¶
func DLQCanonicalName ¶
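func DLQCanonicalName(prefix string, name QueueName) string
DLQCanonicalName returns the full name of the dead letter queue, that is, the short name with the prefix, as it is known by the broker.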
Types ¶
type MessageBroker ¶
type MessageBroker interface {
	// CreateQueue creates the specified queue if it does not exist or
	// returns the existing queue with the name specified in the
	// QueueDefinition. If a dead letter queue has been specified for
	// the queue, it will be returned as the second queue in the returned
	// queue slice.
	CreateQueue(qd *QueueDefinition) ([]Queue, error)
}
MessageBroker is an interface used to interact with a message broker.
type MessageConsumer ¶
type MessageConsumer interface {
	// Consume processes the message. If true is returned, the
	// message is acknowledged and removed from the message queue. If false
	// is returned, this message will reappear on the message queue.
	Consume(qm *QueueManager, q Queue, msg ReceiveMessage) bool
}
MessageConsumer is an interface for processing messages received from the broker.
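As an illustration of the true/false contract described above, here is a minimal sketch of a consumer. The `QueueManager`, `Queue`, and `ReceiveMessage` types are trimmed-down local stand-ins, and `rawMessage` and `orderConsumer` are hypothetical names invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Minimal local stand-ins for the documented types; the real package
// defines richer versions of each.
type QueueManager struct{}

type Queue interface{ String() string }

type ReceiveMessage interface {
	Unmarshal(v interface{}) error
	ID() string
}

type MessageConsumer interface {
	Consume(qm *QueueManager, q Queue, msg ReceiveMessage) bool
}

// rawMessage is a hypothetical in-memory message used only in this sketch.
type rawMessage struct {
	id   string
	body []byte
}

func (m rawMessage) Unmarshal(v interface{}) error { return json.Unmarshal(m.body, v) }
func (m rawMessage) ID() string                    { return m.id }

// orderConsumer is a hypothetical consumer that acknowledges a message only
// when its payload decodes into an order with a positive ID.
type orderConsumer struct{}

func (orderConsumer) Consume(qm *QueueManager, q Queue, msg ReceiveMessage) bool {
	var order struct {
		ID int `json:"id"`
	}
	if err := msg.Unmarshal(&order); err != nil {
		return false // not acknowledged: the message reappears on the queue
	}
	return order.ID > 0
}

func main() {
	var c MessageConsumer = orderConsumer{}
	fmt.Println(c.Consume(nil, nil, rawMessage{"m1", []byte(`{"id":7}`)}))
	fmt.Println(c.Consume(nil, nil, rawMessage{"m2", []byte(`not json`)}))
}
```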
type MessageJSON ¶
type MessageJSON struct {
	GroupID *string
	V       interface{}
}
MessageJSON is a SendMessage implementation for JSON messages.
func (*MessageJSON) GroupId ¶ added in v0.0.12
func (m *MessageJSON) GroupId() *string
GroupId returns a pointer to the GroupID of this message, or nil if it hasn't been set
func (*MessageJSON) Marshal ¶
func (m *MessageJSON) Marshal() ([]byte, error)
Marshal implements the SendMessage interface and converts the message payload to JSON or returns an error.
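The sketch below shows how a type of this shape can satisfy SendMessage. The struct and interface are re-created locally, and the method bodies (a plain `json.Marshal` of `V`) are an assumption about a plausible implementation rather than the package's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SendMessage mirrors the documented interface.
type SendMessage interface {
	Marshal() ([]byte, error)
	GroupId() *string
}

// MessageJSON is a local re-creation of the documented struct; the method
// bodies are assumptions made for this sketch.
type MessageJSON struct {
	GroupID *string
	V       interface{}
}

// Marshal encodes the payload V as JSON.
func (m *MessageJSON) Marshal() ([]byte, error) { return json.Marshal(m.V) }

// GroupId returns the group pointer, which is nil when no group was set.
func (m *MessageJSON) GroupId() *string { return m.GroupID }

func main() {
	group := "orders"
	var msg SendMessage = &MessageJSON{GroupID: &group, V: map[string]int{"id": 7}}
	body, err := msg.Marshal()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body), *msg.GroupId())
}
```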
type OnQueueCreation ¶
type OnQueueCreation func(qm *QueueManager, q Queue, name QueueName)
type Queue ¶
type Queue interface {
	// SendOne sends a single message to the queue.
	SendOne(SendMessage) error
	// SendMany sends multiple messages to the queue.
	SendMany([]SendMessage) error
	// Receive a message from the queue. Don't forget to call the Message's
	// Ack() method once the message has been handled!
	ReceiveOne() (ReceiveMessage, error)
	// Put this ReceiveMessage back on the queue.
	Requeue(ReceiveMessage) error
	// Fetch metadata by name from the broker.
	FetchAttributes([]string) (map[string]string, error)
	// Remove this queue from the broker. (For testing CreateQueue)
	Delete() error
	// Pretty print the name of the queue.
	String() string
	// MoveMessages moves the contents of one queue to another. It returns
	// the number of messages moved.
	MoveMessages(Queue) (int, error)
	// GetApproximateNumberOfMessages returns the approximate number of
	// messages in the queue.
	GetApproximateNumberOfMessages() (*int, error)
}
Queue is a simplified interface for acting on a message queue.
type QueueDefinition ¶
type QueueDefinition struct {
	// The name of the queue minus the prefix. This is how we refer
	// to the queue in the code and from the command line.
	Name QueueName
	// The name of the queue as it exists on the message broker. This
	// is the name plus a prefix.
	CanonicalName string
	// Queue configuration options.
	Options map[string]string
	// Queue (redrive) policy options.
	Policy map[string]string
	// Tags are key-value pairs that are not used internally; they are
	// meant to let consumers annotate queues for classification or configuration.
	Tags map[string]string
	// If specified, this is the definition of the dead letter queue as it exists
	// on the broker. If set, this queue will be created and configured as
	// a dead letter queue for this queue. In addition, any policy options
	// in Policy will be applied to the queue during queue creation.
	DLQ *QueueDefinition
	// If enabled, the contents of the message will be compressed while
	// stored on the queue. Messages will still be received uncompressed.
	UseGzip bool
	// contains filtered or unexported fields
}
QueueDefinition houses all the configuration needed to create a queue.
func NewQueueDefinition ¶
func NewQueueDefinition(name QueueName, prefix string, consumer MessageConsumer, opts ...QueueDefinitionOption) *QueueDefinition
NewQueueDefinition is the QueueDefinition constructor.
type QueueDefinitionOption ¶
type QueueDefinitionOption func(*QueueDefinition)
QueueDefinitionOption is a functional option that can modify a QueueDefinition. Used to create a more readable constructor.
func WithDLQ ¶
func WithDLQ(d *QueueDefinition) QueueDefinitionOption
WithDLQ configures the queue with a dead letter queue.
func WithGzip ¶
func WithGzip() QueueDefinitionOption
WithGzip enables transparent message compression and decompression.
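To make "transparent" concrete: the payload would be compressed before it is stored and decompressed before it is handed back to the consumer. The sketch below shows such a round trip with the standard `compress/gzip` package. The `compress` and `decompress` helpers are hypothetical, and how this package actually encodes compressed payloads for the broker is not documented here.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compress gzips a payload before it would be handed to the broker.
func compress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		return nil, err
	}
	// Close flushes remaining data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress restores the original payload on the receiving side.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	stored, err := compress([]byte(`{"id":7}`))
	if err != nil {
		panic(err)
	}
	received, err := decompress(stored)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(received))
}
```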
func WithQueueOptions ¶
func WithQueueOptions(opts map[string]string) QueueDefinitionOption
WithQueueOptions adds broker options to the QueueDefinition.
func WithQueuePolicy ¶
func WithQueuePolicy(policy map[string]string) QueueDefinitionOption
WithQueuePolicy adds broker policy options to the QueueDefinition.
func WithTags ¶ added in v0.0.13
func WithTags(tags map[string]string) QueueDefinitionOption
WithTags tags the queue with external properties.
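The option functions above follow the functional options pattern. Here is a minimal sketch of how such options compose, using a trimmed-down local `QueueDefinition` and a hypothetical `newQueueDefinition` constructor that omits the prefix and consumer parameters of the real one; the option bodies are assumptions.

```go
package main

import "fmt"

// QueueDefinition is a trimmed-down local stand-in for the documented struct.
type QueueDefinition struct {
	Name    string
	Tags    map[string]string
	UseGzip bool
}

// QueueDefinitionOption mutates a QueueDefinition during construction.
type QueueDefinitionOption func(*QueueDefinition)

// WithGzip turns on the compression flag.
func WithGzip() QueueDefinitionOption {
	return func(qd *QueueDefinition) { qd.UseGzip = true }
}

// WithTags attaches caller-defined tags.
func WithTags(tags map[string]string) QueueDefinitionOption {
	return func(qd *QueueDefinition) { qd.Tags = tags }
}

// newQueueDefinition is a hypothetical, simplified constructor: it builds
// the base value and then applies each option in the order given.
func newQueueDefinition(name string, opts ...QueueDefinitionOption) *QueueDefinition {
	qd := &QueueDefinition{Name: name}
	for _, opt := range opts {
		opt(qd)
	}
	return qd
}

func main() {
	qd := newQueueDefinition("emails", WithGzip(), WithTags(map[string]string{"team": "growth"}))
	fmt.Println(qd.Name, qd.UseGzip, qd.Tags["team"])
}
```

Because each option is just a function, new settings can be added later without changing the constructor's signature.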
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager simplifies queue access and queue creation.
func NewQueueManager ¶
func NewQueueManager(qDefs []*QueueDefinition, mb MessageBroker, opts ...QueueManagerOption) *QueueManager
NewQueueManager is the QueueManager constructor.
func (*QueueManager) Delete ¶
func (qm *QueueManager) Delete(name QueueName) error
Delete removes the named queue from the broker and removes the queue from its cache.
func (*QueueManager) Get ¶
func (qm *QueueManager) Get(name QueueName) Queue
Get returns the requested queue or dies trying.
func (*QueueManager) GetByNameString ¶
func (qm *QueueManager) GetByNameString(name string) Queue
GetByNameString returns the Queue using the string name of the Queue. It halts program execution if an invalid queue (one without a QueueDefinition) is given.
func (*QueueManager) GetDLQName ¶
func (qm *QueueManager) GetDLQName(name QueueName) QueueName
GetDLQName returns the QueueName of the dead-letter queue configured for the specified queue.
func (*QueueManager) GetQDef ¶
func (qm *QueueManager) GetQDef(name QueueName) *QueueDefinition
GetQDef returns the QueueDefinition of the named queue or dies trying. This does not require that the queue be created first.
func (*QueueManager) Process ¶
func (qm *QueueManager) Process(q Queue, opts *QueueProcessingOptions)
Process receives messages from the message broker and passes each one to the message consumer defined for each queue in its QueueDefinition. This method will run until the StopReceive method is called on the queue.
func (*QueueManager) ProcessOne ¶
func (qm *QueueManager) ProcessOne(q Queue)
ProcessOne fetches one message residing in the queue, processes it, and returns. It is meant for use in tests.
func (*QueueManager) QueueName ¶
func (qm *QueueManager) QueueName(name string) QueueName
QueueName returns the QueueName that matches the string or dies trying.
func (*QueueManager) SendMany ¶ added in v0.0.8
func (qm *QueueManager) SendMany(queueName QueueName, msgs []SendMessage) error
func (*QueueManager) SendOne ¶
func (qm *QueueManager) SendOne(queueName QueueName, msg SendMessage) error
SendOne sends a message to the specified queue.
func (*QueueManager) TestQueuesCreate ¶
func (qm *QueueManager) TestQueuesCreate()
TestQueuesCreate initializes all the queues defined in the queue definitions held by the QueueManager.
func (*QueueManager) TestQueuesDestroy ¶
func (qm *QueueManager) TestQueuesDestroy()
TestQueuesDestroy removes the state of the queues from the broker so that new queues can be created/initialized.
type QueueManagerOption ¶
type QueueManagerOption func(*QueueManager)
QueueManagerOption is a functional option that can modify a QueueManager.
func WithDefaultConsumer ¶
func WithDefaultConsumer(consumer MessageConsumer) QueueManagerOption
WithDefaultConsumer specifies a message consumer to use if one was not specified in the QueueDefinition.
func WithOnQueueCreationCallback ¶
func WithOnQueueCreationCallback(cb OnQueueCreation) QueueManagerOption
WithOnQueueCreationCallback adds a callback to be called when queues are created.
type QueueName ¶
type QueueName string
QueueName is the name of the queue as used by the developer when specifying queue names to process from the command line. Queue names are prefixed with a string (environment or username) when created or accessed by MessageBroker implementations.
type QueueProcessingOptions ¶
type QueueProcessingOptions struct {
	// amount of parallelism desired (number of consumer go routines)
	Parallelism int
	// consumer of broker library (controller) will pass a message to stop
	Stop chan bool
	// signal to controller that it's safe to exit
	Done chan bool
	// if true, will requeue this message on SIGTERM
	RequeueOnSigterm bool
}
QueueProcessingOptions controls how the queue is processed.
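The Stop and Done channels suggest a simple shutdown handshake between the controller and the processing loop. The sketch below illustrates one way such a handshake can work: a hypothetical `process` function starts `Parallelism` workers, waits for a value on Stop, and sends on Done once every worker has returned. This is an assumption about the intended usage, not the package's actual Process implementation, and the local struct omits RequeueOnSigterm.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// QueueProcessingOptions is a local stand-in with the channel fields only.
type QueueProcessingOptions struct {
	Parallelism int
	Stop        chan bool
	Done        chan bool
}

// process is a hypothetical processing loop: it runs work on Parallelism
// goroutines until Stop is signalled, then reports completion on Done.
func process(opts *QueueProcessingOptions, work func()) {
	quit := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < opts.Parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-quit:
					return // controller asked us to stop
				default:
					work() // stands in for receiving and consuming a message
				}
			}
		}()
	}
	<-opts.Stop       // block until the controller asks for shutdown
	close(quit)       // tell every worker to finish its current item
	wg.Wait()         // wait for in-flight work to complete
	opts.Done <- true // signal that it is safe to exit
}

func main() {
	opts := &QueueProcessingOptions{Parallelism: 4, Stop: make(chan bool), Done: make(chan bool)}
	var handled int64
	go process(opts, func() { atomic.AddInt64(&handled, 1) })
	opts.Stop <- true
	<-opts.Done
	fmt.Println("all workers stopped after", atomic.LoadInt64(&handled), "work items")
}
```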
func NewQueueProcessingOptions ¶
func NewQueueProcessingOptions(parallelism int) *QueueProcessingOptions
NewQueueProcessingOptions constructs queue processing options so we do not have to duplicate it.
type ReceiveMessage ¶
type ReceiveMessage interface {
	// Ack deletes the message from the queue. This *must be done* or
	// the message will reappear on the queue.
	Ack() error
	// Unmarshal decodes the message payload, as it was received, into v.
	Unmarshal(v interface{}) error
	// ID returns the unique identifier representing the message.
	ID() string
	// SetVisibilityTimeout sets the number of seconds this message should be
	// invisible to other consumers. This overrides the default visibility
	// set on the queue.
	SetVisibilityTimeout(n int) error
}
ReceiveMessage is the abstraction of the message that sits on the queue.
type SendMessage ¶
type SendMessage interface {
	// Marshal serializes the payload for transport to the message queue.
	Marshal() ([]byte, error)
	// GroupId returns the group that this message belongs to
	GroupId() *string
}
SendMessage is the abstraction of the message that is provided to the broker.