broker

v0.0.23
Published: Jun 10, 2021 License: MIT Imports: 6 Imported by: 0

README

Broker

Offers an interface over basic message queue options. Broker supports AWS SQS and a dead-simple, slice-based in-memory broker.

broker.go

This section summarizes the important types defined in broker.go.

type QueueDefinition

QueueDefinition houses configuration needed to create a queue. It is agnostic to underlying provider, but should contain all of the information needed to uniquely identify a queue on any provider.

Queue interface

Queue provides us with a way to communicate with a message queue.

MessageBroker interface

MessageBroker is an interface used to communicate with a message broker. It makes the call to create a queue given a QueueDefinition.

QueueManager

QueueManager is a struct that makes managing queues easier. It holds several kinds of state: the QueueDefinitions, the queues themselves, Prometheus gauges, and callbacks. As such, there should only be one instance of QueueManager per client.

Documentation

Index

Constants

This section is empty.

Variables

var ErrQueueNoMessages = errors.New("no messages")

ErrQueueNoMessages indicates that there were no messages in the queue.

var ErrQueuePollError = errors.New("poll error")

ErrQueuePollError indicates a temporary problem during queue polling.
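A caller that polls a queue will usually want to tell these two sentinel values apart from other failures. The following is a minimal sketch, not the package's own code: it declares local stand-ins for the two variables so it compiles on its own, and `classify` and its return strings are hypothetical. Whether the package wraps these errors before returning them is not stated here, so the sketch uses `errors.Is`, which handles both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors (assumption: same messages).
var (
	ErrQueueNoMessages = errors.New("no messages")
	ErrQueuePollError  = errors.New("poll error")
)

// classify is a hypothetical helper that decides what a polling loop does next.
func classify(err error) string {
	switch {
	case err == nil:
		return "handle"
	case errors.Is(err, ErrQueueNoMessages):
		return "idle" // queue is empty: wait, then poll again
	case errors.Is(err, ErrQueuePollError):
		return "retry" // temporary problem: back off, then retry
	default:
		return "fail"
	}
}

func main() {
	wrapped := fmt.Errorf("receive: %w", ErrQueuePollError)
	fmt.Println(classify(nil), classify(ErrQueueNoMessages), classify(wrapped))
}
```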

Functions

func DLQCanonicalName

func DLQCanonicalName(prefix string, name QueueName) string

DLQCanonicalName returns the full name of the dead letter queue (the short name plus the prefix) as it is known by the broker.

Types

type MessageBroker

type MessageBroker interface {
	// CreateQueue creates the specified queue if it does not exist or
	// returns the existing queue with the name specified in the
	// QueueDefinition. If a dead letter queue has been specified for
	// the queue, it will be returned as the second queue in returned
	// queue slice.
	CreateQueue(qd *QueueDefinition) ([]Queue, error)
}

MessageBroker is an interface used to interact with a message broker.

type MessageConsumer

type MessageConsumer interface {
	// Consume processes the message. If true is returned, the
	// message is acknowledged and removed from the message queue. If false
	// is returned, this message will reappear on the message queue.
	Consume(qm *QueueManager, q Queue, msg ReceiveMessage) bool
}

MessageConsumer is an interface for processing messages received from the broker.
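As an illustration of the true/false contract described in the interface comment, here is a minimal sketch of a consumer. The types at the top are reduced local stand-ins for the package types so the example compiles on its own; `order`, `orderConsumer`, and `fakeMessage` are hypothetical, and the JSON payload format is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Reduced local stand-ins for the package types used by Consume.
type QueueManager struct{}
type Queue interface{ String() string }
type ReceiveMessage interface {
	Unmarshal(v interface{}) error
	ID() string
}
type MessageConsumer interface {
	Consume(qm *QueueManager, q Queue, msg ReceiveMessage) bool
}

// order is a hypothetical payload.
type order struct {
	SKU string `json:"sku"`
}

// orderConsumer records the SKU of every order it accepts.
type orderConsumer struct{ seen []string }

func (c *orderConsumer) Consume(qm *QueueManager, q Queue, msg ReceiveMessage) bool {
	var o order
	if err := msg.Unmarshal(&o); err != nil {
		return false // not acknowledged: the message will reappear on the queue
	}
	c.seen = append(c.seen, o.SKU)
	return true // acknowledged: the message is removed from the queue
}

var _ MessageConsumer = (*orderConsumer)(nil)

// fakeMessage is a test double holding a raw JSON body.
type fakeMessage struct {
	id   string
	body []byte
}

func (m fakeMessage) Unmarshal(v interface{}) error { return json.Unmarshal(m.body, v) }
func (m fakeMessage) ID() string                    { return m.id }

func main() {
	c := &orderConsumer{}
	ok := c.Consume(nil, nil, fakeMessage{id: "1", body: []byte(`{"sku":"abc"}`)})
	bad := c.Consume(nil, nil, fakeMessage{id: "2", body: []byte(`not json`)})
	fmt.Println(ok, bad, c.seen)
}
```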

type MessageJSON

type MessageJSON struct {
	GroupID *string
	V       interface{}
}

MessageJSON is a SendMessage implementation for JSON messages.

func (*MessageJSON) GroupId added in v0.0.12

func (m *MessageJSON) GroupId() *string

GroupId returns a pointer to the GroupID of this message, or nil if it hasn't been set.

func (*MessageJSON) Marshal

func (m *MessageJSON) Marshal() ([]byte, error)

Marshal implements the SendMessage interface. It converts the message payload to JSON or returns an error.
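The sketch below shows how a type with these two methods satisfies SendMessage. It is not the package's source: `messageJSON` is a local stand-in that mirrors the documented fields, and encoding only the `V` field with `encoding/json` is an assumption about how the payload is serialized.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SendMessage mirrors the interface documented in this package.
type SendMessage interface {
	Marshal() ([]byte, error)
	GroupId() *string
}

// messageJSON is a local stand-in with the same fields as MessageJSON.
type messageJSON struct {
	GroupID *string
	V       interface{}
}

// GroupId returns the group pointer, which is nil when no group was set.
func (m *messageJSON) GroupId() *string { return m.GroupID }

// Marshal encodes only the payload V (assumption: plain encoding/json).
func (m *messageJSON) Marshal() ([]byte, error) { return json.Marshal(m.V) }

// Compile-time check that the stand-in satisfies the interface.
var _ SendMessage = (*messageJSON)(nil)

func main() {
	group := "orders"
	msg := &messageJSON{GroupID: &group, V: map[string]int{"qty": 2}}
	body, err := msg.Marshal()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body), *msg.GroupId())
}
```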

type OnQueueCreation

type OnQueueCreation func(qm *QueueManager, q Queue, name QueueName)

type Queue

type Queue interface {
	// SendOne sends a single message to the queue.
	SendOne(SendMessage) error
	// SendMany sends multiple messages to the queue.
	SendMany([]SendMessage) error
	// Receive a message from the queue. Don't forget to call the Message's
	// Ack() method once the message has been handled!
	ReceiveOne() (ReceiveMessage, error)
	// Put this ReceiveMessage back on the queue
	Requeue(ReceiveMessage) error
	// Fetch metadata by name from broker.
	FetchAttributes([]string) (map[string]string, error)
	// Remove this queue from the broker. (For testing CreateQueue)
	Delete() error
	// Pretty print the name of the queue
	String() string
	// MoveMessages moves the contents of one queue to another. It returns
	// the number of messages moved.
	MoveMessages(Queue) (int, error)
	// GetApproximateNumberOfMessages returns the approximate number of
	// messages currently on the queue.
	GetApproximateNumberOfMessages() (*int, error)
}

Queue is a simplified interface for acting on a message queue.

type QueueDefinition

type QueueDefinition struct {
	// The name of the queue minus the prefix. This is how we refer
	// to the queue in the code and from the command line.
	Name QueueName
	// The name of the queue as it exists on the message broker. This
	// is the name plus a prefix.
	CanonicalName string
	// Queue configuration options
	Options map[string]string
	// Queue (redrive) policy options
	Policy map[string]string
	// Tags are key value pairs that are not used internally, rather they're
	// meant to allow consumers to easily annotate for classification or configuration
	Tags map[string]string

	// If specified, this is the name of the dead letter queue as it exists
	// on the broker. If set, this queue will be created and configured as
	// a dead letter queue for this queue. In addition, any policy options
	// in Policy will be applied to the queue during queue creation.
	DLQ *QueueDefinition
	// If enabled, the contents of the message will be compressed while
	// stored on the queue. Messages will still be received uncompressed.
	UseGzip bool
	// contains filtered or unexported fields
}

QueueDefinition houses all the configuration needed to create a queue.

func NewQueueDefinition

func NewQueueDefinition(name QueueName, prefix string,
	consumer MessageConsumer, opts ...QueueDefinitionOption) *QueueDefinition

NewQueueDefinition is the QueueDefinition constructor.

type QueueDefinitionOption

type QueueDefinitionOption func(*QueueDefinition)

QueueDefinitionOption is a functional option that can modify a QueueDefinition. Used to create a more readable constructor.

func WithDLQ

WithDLQ configures the queue with a dead letter queue.

func WithGzip

func WithGzip() QueueDefinitionOption

WithGzip enables transparent message compression and decompression.

func WithQueueOptions

func WithQueueOptions(opts map[string]string) QueueDefinitionOption

WithQueueOptions adds broker options to the QueueDefinition.

func WithQueuePolicy

func WithQueuePolicy(policy map[string]string) QueueDefinitionOption

WithQueuePolicy adds broker policy options to the QueueDefinition.

func WithTags added in v0.0.13

func WithTags(tags map[string]string) QueueDefinitionOption

WithTags tags the queue with external properties.
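The option functions above follow Go's functional options pattern. The following is a minimal sketch of that pattern using local, lowercase stand-ins rather than the package's real types; in particular, merging the supplied map into `Options` (rather than replacing it) is an assumption, and the `VisibilityTimeout` key is only an illustrative value.

```go
package main

import "fmt"

// queueDefinition is a reduced stand-in for QueueDefinition.
type queueDefinition struct {
	Name    string
	Options map[string]string
	UseGzip bool
}

// option mirrors the shape of QueueDefinitionOption.
type option func(*queueDefinition)

// withGzip turns on the compression flag.
func withGzip() option {
	return func(qd *queueDefinition) { qd.UseGzip = true }
}

// withQueueOptions copies the given entries into Options (assumed merge semantics).
func withQueueOptions(opts map[string]string) option {
	return func(qd *queueDefinition) {
		for k, v := range opts {
			qd.Options[k] = v
		}
	}
}

// newQueueDefinition sets defaults, then applies each option in order.
func newQueueDefinition(name string, opts ...option) *queueDefinition {
	qd := &queueDefinition{Name: name, Options: map[string]string{}}
	for _, opt := range opts {
		opt(qd)
	}
	return qd
}

func main() {
	qd := newQueueDefinition("emails",
		withGzip(),
		withQueueOptions(map[string]string{"VisibilityTimeout": "30"}),
	)
	fmt.Println(qd.Name, qd.UseGzip, qd.Options["VisibilityTimeout"])
}
```

Because every option is an ordinary function value, callers list only the settings they care about and the constructor signature stays stable as new options are added.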

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager simplifies queue access and queue creation.

func NewQueueManager

func NewQueueManager(qDefs []*QueueDefinition, mb MessageBroker, opts ...QueueManagerOption) *QueueManager

NewQueueManager is the QueueManager constructor.

func (*QueueManager) Delete

func (qm *QueueManager) Delete(name QueueName) error

Delete removes the named queue from the broker and removes the queue from its cache.

func (*QueueManager) Get

func (qm *QueueManager) Get(name QueueName) Queue

Get returns the requested queue or dies trying.

func (*QueueManager) GetByNameString

func (qm *QueueManager) GetByNameString(name string) Queue

GetByNameString returns the Queue using the string name of the Queue. It halts program execution if an invalid queue is given (a queue without a QueueDefinition).

func (*QueueManager) GetDLQName

func (qm *QueueManager) GetDLQName(name QueueName) QueueName

GetDLQName returns the QueueName of the dead-letter queue configured for the specified queue.

func (*QueueManager) GetQDef

func (qm *QueueManager) GetQDef(name QueueName) *QueueDefinition

GetQDef returns the QueueDefinition of the named queue or dies trying. This does not require that the queue be created first.

func (*QueueManager) Process

func (qm *QueueManager) Process(q Queue, opts *QueueProcessingOptions)

Process receives messages from the message broker and passes each one to the message consumer defined for each queue in its QueueDefinition. This method will run until the StopReceive method is called on the queue.

func (*QueueManager) ProcessOne

func (qm *QueueManager) ProcessOne(q Queue)

ProcessOne fetches one message residing in the queue, processes it, and returns. It is meant for use in tests.

func (*QueueManager) QueueName

func (qm *QueueManager) QueueName(name string) QueueName

QueueName returns the QueueName that matches the string or dies trying.

func (*QueueManager) SendMany added in v0.0.8

func (qm *QueueManager) SendMany(queueName QueueName, msgs []SendMessage) error

func (*QueueManager) SendOne

func (qm *QueueManager) SendOne(queueName QueueName, msg SendMessage) error

SendOne sends a message to the specified queue.

func (*QueueManager) TestQueuesCreate

func (qm *QueueManager) TestQueuesCreate()

TestQueuesCreate initializes all the queues defined in the queue definitions held by the QueueManager.

func (*QueueManager) TestQueuesDestroy

func (qm *QueueManager) TestQueuesDestroy()

TestQueuesDestroy removes the state of the queues from the broker so that new queues can be created/initialized.

type QueueManagerOption

type QueueManagerOption func(*QueueManager)

QueueManagerOption is a functional option that can modify a QueueManager.

func WithDefaultConsumer

func WithDefaultConsumer(consumer MessageConsumer) QueueManagerOption

WithDefaultConsumer specifies a message consumer to use if one was not specified in the QueueDefinition.

func WithOnQueueCreationCallback

func WithOnQueueCreationCallback(cb OnQueueCreation) QueueManagerOption

WithOnQueueCreationCallback adds a callback to be called when queues are created.

type QueueName

type QueueName string

QueueName is the name of the queue as used by the developer when specifying queue names to process from the command line. It is prefixed with a string (environment or username) when the queue is created or accessed by MessageBroker implementations.

func (QueueName) String

func (qn QueueName) String() string

String returns the name we use to refer to the queue in log messages and on the command line. The actual name of the queue is a combination of strings, depending on environment, etc.

type QueueProcessingOptions

type QueueProcessingOptions struct {
	// amount of parallelism desired (number of consumer go routines)
	Parallelism int
	// consumer of broker library (controller) will pass a message to stop
	Stop chan bool
	// signal to controller that it's safe to exit
	Done chan bool
	// if true, will requeue this message on SIGTERM
	RequeueOnSigterm bool
}

QueueProcessingOptions controls how the queue is processed.

func NewQueueProcessingOptions

func NewQueueProcessingOptions(parallelism int) *QueueProcessingOptions

NewQueueProcessingOptions constructs queue processing options so we do not have to duplicate it.
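The field comments suggest a handshake in which the controller sends on Stop and then waits on Done before exiting. The sketch below illustrates that handshake under stated assumptions: `processingOptions` is a reduced local stand-in, and `run` is a hypothetical loop, not the package's Process method, whose exact behavior is not documented here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// processingOptions is a reduced stand-in for QueueProcessingOptions.
type processingOptions struct {
	Parallelism int
	Stop        chan bool
	Done        chan bool
}

// run is a hypothetical processing loop: it starts Parallelism workers,
// waits for a Stop signal, lets the workers finish, then signals Done.
func run(opts *processingOptions, work func()) {
	quit := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < opts.Parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-quit:
					return
				default:
					work()
				}
			}
		}()
	}
	<-opts.Stop       // controller asks us to stop
	close(quit)       // tell every worker to exit after its current item
	wg.Wait()         // wait for in-flight work to finish
	opts.Done <- true // tell the controller it is safe to exit
}

func main() {
	opts := &processingOptions{Parallelism: 2, Stop: make(chan bool), Done: make(chan bool)}
	var handled int64
	go run(opts, func() {
		atomic.AddInt64(&handled, 1)
		time.Sleep(time.Millisecond) // simulated work
	})
	opts.Stop <- true
	<-opts.Done
	fmt.Println("stopped cleanly")
}
```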

type ReceiveMessage

type ReceiveMessage interface {
	// Ack deletes the message from the queue. This *must be done* or
	// the message will reappear on the queue.
	Ack() error
	// Unmarshal decodes the message payload, as it was received, into v.
	Unmarshal(v interface{}) error
	// Returns the unique identifier representing the message.
	ID() string
	// Sets the number of seconds this message should be invisible to other consumers
	// This overrides the default visibility set on the queue
	SetVisibilityTimeout(n int) error
}

ReceiveMessage is the abstraction of the message that sits on the queue.
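To make the decode-then-Ack order concrete, here is a minimal sketch of an in-memory implementation that could serve as a test double. `memMessage` and `handle` are hypothetical names, the JSON payload is an assumption, and the error returned on a second Ack is a choice made for this sketch rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ReceiveMessage mirrors the interface documented in this package.
type ReceiveMessage interface {
	Ack() error
	Unmarshal(v interface{}) error
	ID() string
	SetVisibilityTimeout(n int) error
}

// memMessage is a hypothetical in-memory message for tests.
type memMessage struct {
	id      string
	body    []byte
	acked   bool
	timeout int
}

func (m *memMessage) Ack() error {
	if m.acked {
		return errors.New("already acked")
	}
	m.acked = true
	return nil
}

func (m *memMessage) Unmarshal(v interface{}) error { return json.Unmarshal(m.body, v) }
func (m *memMessage) ID() string                    { return m.id }

func (m *memMessage) SetVisibilityTimeout(n int) error {
	if n < 0 {
		return errors.New("negative timeout")
	}
	m.timeout = n
	return nil
}

var _ ReceiveMessage = (*memMessage)(nil)

// handle decodes the payload and acknowledges the message only on success.
func handle(msg ReceiveMessage) (string, error) {
	var payload struct {
		Task string `json:"task"`
	}
	if err := msg.Unmarshal(&payload); err != nil {
		return "", err // not acked, so the message would be redelivered
	}
	return payload.Task, msg.Ack()
}

func main() {
	msg := &memMessage{id: "m-1", body: []byte(`{"task":"resize"}`)}
	task, err := handle(msg)
	fmt.Println(task, err, msg.acked)
}
```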

type SendMessage

type SendMessage interface {
	// Marshal serializes the payload for transport to the message queue.
	Marshal() ([]byte, error)

	// GroupId returns the group that this message belongs to
	GroupId() *string
}

SendMessage is the abstraction of the message that is provided to the broker.

Directories

Path Synopsis
lib
