amqp

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 6 Imported by: 0

README

AMQP Interface

This library is a wrapper around the streadway AMQP library that provides an interface for a relatively simple set of use cases.

The library is designed to be resilient and self-healing after failures. It will automatically reconnect to the underlying AMQP host if it is disconnected.

Consuming

This library assumes that the user wants to consume basic messages from an AMQP exchange. The following conditions are assumed:

  • Messages will be consumed and immediately acknowledged.
  • Messages bodies represent JSON
  • A single queue, with a single routing-key to an exchange is sufficient
  • If the required AMQP exchange does not exist, it will be created

The consumer library uses the Observer pattern to allow messages retrieved from an AMQP queue to be published to the caller. The caller specifies a function that will be called every time a message is received. Design note: A go channel could have been used to communicate messages to the caller. This option was not taken due to the semantics around queue handling, blocking, etc. By calling a function, the onus is on the caller to handle the dynamics of processing the message or potentially queuing it without the library worrying about blocking.

The library will block until the message processing function returns.

Usage

The following example shows how to connect to and use the AMQP library to consume messages.

// Process each incoming message
func ProcessMessage(message *amqp.Payload) (err error)  {
  fmt.Println(string(message.Body))
}

// Create a new consumer and start a goroutine that will continuously listen for incoming messages
consumer = amqp.NewConsumer(broker, ExchangeType, AMQPQueueName, BindingKey)
consumer.SetObserver(ProcessMessage)
consumer.Start()

// Block forever
<-make(chan bool)

Producing

This library assumes that the user wants to publish basic messages to an AMQP exchange. The following conditions are assumed:

  • Messages will be converted to JSON for publishing
  • Messages are transient
  • Messages do not have a priority
  • Messages support a routing key
  • No additional AMQP headers will be added to the outgoing message
Usage
output := make(chan amqp.AMQPPayload)

 producer := amqp.NewProducer(amqp.ProducerConfig{
  Source:            "myapp",
  ExchangeName:      "myexchange",
  ExchangeType:      "topic",
  PublishingChannel: output,
 })

// Create a goroutine that will listen for messages to be published to AMQP
go producer.Start()

// Send a message
output <- amqp.AMQPPayload{
     RoutingKey: "mykey.mydestination",
     Body:       "sample payload",
    }

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AMQPPayload

type AMQPPayload struct {
	RoutingKey string
	Body       string
	Headers    *amqp.Table
}

AMQPPayload represents a single message to be published to an AMQP queue

type BrokerConfig

type BrokerConfig struct {
	Host              string
	VHost             string
	Port              string
	User              string
	Password          string
	ConsumerName      string
	ExchangeName      string
	ConnectionTimeout string
}

BrokerConfig contains the fundamental configuration values required to connect to an AMQP host.

type Consumer

type Consumer struct {
	LastHeartbeat time.Time
	// contains filtered or unexported fields
}

Consumer refers to a connection to an AMQP instance that reads messages.

func NewConsumer

func NewConsumer(broker BrokerConfig, exchangeType string, queueName string, bindingKeys []string) *Consumer

NewConsumer creates a new, initialized instance of an AMQP message consumer using the configuration information supplied in config. New incoming AMQP messages are sent to the handler function provided.

func (*Consumer) SetAutoDelete added in v1.2.0

func (c *Consumer) SetAutoDelete(value bool)

SetAutoDelete marks the queue so that RabbitMQ will automatically delete the queue when the client disconnects.

func (*Consumer) SetDurable added in v1.2.0

func (c *Consumer) SetDurable(value bool)

SetDurable marks the queue to determine if the values stored in the queue will persist across restarts

func (*Consumer) SetExclusive added in v1.2.0

func (c *Consumer) SetExclusive(value bool)

func (*Consumer) SetObserver added in v1.0.2

func (c *Consumer) SetObserver(obs MessageProcessor)

SetObserver provides a function to be called whenever a message is received from AMQP

func (*Consumer) SetQueueTTL added in v1.2.1

func (c *Consumer) SetQueueTTL(value int)

SetQueueTTL sets the time to live for the queue in milliseconds

func (*Consumer) SetSource

func (c *Consumer) SetSource(source string)

SetSource sets a prefix that is appended to any log messages generated by the consumer

func (*Consumer) Start

func (c *Consumer) Start()

Start connects to a previously defined AMQP queue and begins consuming messages. All messages consumed are sent to the MessageProcessor provided in NewConsumer

This call blocks until a panic occurs.

type IConsumer added in v1.0.2

type IConsumer interface {
	// Start instructs the consumer to begin listening for and consuming inforation from AMQP
	Start()

	// SetObserver provides a function that will be called every time the consumer receives a new message
	SetObserver(obs MessageProcessor)
}

Consumer is an interface specification that defines the operations that can be performed on an AMQP consumer. The basic consumer consists of a goroutine that actively retrieves messages from AMQP and an observer function that is called for every message retrieved.

type MessageProcessor

type MessageProcessor func(message *Payload) error

MessageProcessor is a type of function that is called whenever a message is received from an AMQP queue.

type Payload added in v1.0.1

type Payload struct {
	ContentType     string
	ContentEncoding string
	CorrelationId   string
	Timestamp       time.Time
	AppId           string
	Exchange        string
	RoutingKey      string
	Headers         amqp.Table
	Body            []byte
}

Payload represents the contents of a message retrieved from an AMQP queue.

type Producer

type Producer struct {
	LastHeartbeat time.Time
	// contains filtered or unexported fields
}

Producer represents a user that produces messages to be published to an AMQP queue.

func NewProducer

func NewProducer(broker BrokerConfig, config ProducerConfig) *Producer

NewProducer creates a new, initialized instance of AMQPPublsher

func (*Producer) IsConnected

func (p *Producer) IsConnected() bool

IsConnected returns true if the publisher is connected to an AMQP instance and false otherwise

func (*Producer) Start

func (p *Producer) Start()

Start will connect to AMQP and start a loop to publish any message retrieved from the input channel

type ProducerConfig

type ProducerConfig struct {
	Source            string
	ExchangeName      string
	ExchangeType      string
	PublishingChannel chan AMQPPayload
}

ProducerConfig is configuration that is specific to outgoing connections to an AMQP host

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL