bcamqp

package module
v0.0.0-...-12bd6f7 Latest
Published: Jan 22, 2021 License: Apache-2.0 Imports: 6 Imported by: 0

README

bcamqp

Library for using RabbitMQ/AMQP with Go

Motivation

This library is a wrapper around the most popular client library: github.com/streadway/amqp

The interface of streadway/amqp closely follows the protocol, which makes it less intuitive to use. It also only provides primitives for you to assemble yourself, without handling the hard parts such as reconnecting.

I think a good library should provide enough abstraction and features around a protocol. This is why I built bcamqp.

Features

  • Named parameter objects instead of positional arguments
  • Abstraction around protocol primitives
  • First crude attempt at reconnection handling

Documentation

Overview

Package bcamqp is a RabbitMQ/AMQP client library.

AMQP is a messaging protocol; RabbitMQ is a server implementation of that protocol. Read more about them here: https://www.rabbitmq.com/tutorials/amqp-concepts.html

bcamqp allows your program to send and receive messages via the AMQP protocol with relative ease. It tries to abstract away the complexities and provide a clear interface following Go's usual style.

There is also basic reconnection logic in place. While the connection is down, the library will block.

Example
broker := New(BrokerOptions{
	Encrypted:     true,
	Address:       "localhost",
	User:          "guest",
	Password:      "guest",
	AutoTimestamp: true,
})

err := broker.Connect()
if err != nil {
	log.Fatalf("connect to broker: %v", err)
}
defer broker.Close()

err = broker.DeclareExchange(ExchangeOptions{
	Name:    "example-exchange",
	Durable: false,
	Type:    Topic, // "#" is only a wildcard on topic exchanges; a direct exchange compares it literally
})
if err != nil {
	log.Fatalf("declare exchange: %v", err)
}

err = broker.DeclareQueue(QueueOptions{
	Name:      "example-queue",
	Durable:   false,
	Exclusive: false,
})
if err != nil {
	log.Fatalf("declare queue: %v", err)
}

err = broker.DeclareBinding(BindingOptions{
	Exchange:   "example-exchange",
	Queue:      "example-queue",
	RoutingKey: "#",
})
if err != nil {
	log.Fatalf("declare binding: %v", err)
}

err = broker.Publish(Message{
	Exchange:    "example-exchange",
	Body:        []byte("test"),
	ContentType: "text/plain",
})
if err != nil {
	log.Fatalf("publish: %v", err)
}

cons, err := broker.Consume(ConsumerOptions{
	Name:      "bcamqp-example",
	Queue:     "example-queue",
	AutoAck:   false,
	Exclusive: false,
})
if err != nil {
	log.Fatalf("consume: %v", err)
}
defer cons.Close()

for msg := range cons.Messages() {
	log.Printf(`%v message with routing key "%s": %s`, msg.Timestamp, msg.RoutingKey, msg.Body)

	err = msg.Ack()
	if err != nil {
		log.Printf("ack message: %v", err)
	}
}

Types

type BindingOptions

type BindingOptions struct {
	Queue      string
	RoutingKey string
	Exchange   string
}

BindingOptions holds options for binding creation

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a logical connection to a broker

func New

func New(options BrokerOptions) *Broker

New configures a new Broker

Connect needs to be called to use the instance

func (*Broker) Close

func (b *Broker) Close() error

Close tries to gracefully shut down the broker connection

func (*Broker) Connect

func (b *Broker) Connect() error

Connect establishes a connection to the broker

func (*Broker) Consume

func (b *Broker) Consume(options ConsumerOptions) (*Consumer, error)

Consume starts a new consumer

func (*Broker) DeclareBinding

func (b *Broker) DeclareBinding(options BindingOptions) error

DeclareBinding creates a binding between an exchange and a queue

func (*Broker) DeclareExchange

func (b *Broker) DeclareExchange(options ExchangeOptions) error

DeclareExchange makes sure that there is an exchange with the specified properties on the broker

It returns an error when an exchange with the specified name already exists but has different properties

func (*Broker) DeclareQueue

func (b *Broker) DeclareQueue(options QueueOptions) error

DeclareQueue makes sure that there is a queue with the specified properties on the broker

It returns an error when a queue with the specified name already exists but has different properties

func (*Broker) Publish

func (b *Broker) Publish(msg Message) error

Publish sends a message to the broker

Publishing is blocking and concurrency-safe
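Since Publish is documented as safe for concurrent use, several goroutines can share one publisher. The sketch below illustrates that calling pattern only. Message, publisher, and countingPublisher are local stand-ins defined here so the example runs without a broker; they mirror the documented Publish signature but are not imported from bcamqp.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a trimmed local stand-in for the documented Message type.
type Message struct {
	Exchange string
	Body     []byte
}

// publisher captures the documented Publish method signature.
type publisher interface {
	Publish(msg Message) error
}

// countingPublisher is a hypothetical in-memory publisher used for illustration.
type countingPublisher struct {
	mu   sync.Mutex
	sent int
}

func (p *countingPublisher) Publish(msg Message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sent++
	return nil
}

// publishAll publishes every body from its own goroutine and returns the
// number of publishes that failed.
func publishAll(p publisher, exchange string, bodies [][]byte) int {
	var wg sync.WaitGroup
	errs := make([]error, len(bodies)) // one slot per goroutine, so no locking is needed
	for i, body := range bodies {
		wg.Add(1)
		go func(i int, body []byte) {
			defer wg.Done()
			errs[i] = p.Publish(Message{Exchange: exchange, Body: body})
		}(i, body)
	}
	wg.Wait()

	failed := 0
	for _, err := range errs {
		if err != nil {
			failed++
		}
	}
	return failed
}

func main() {
	p := &countingPublisher{}
	bodies := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	failed := publishAll(p, "example-exchange", bodies)
	fmt.Println(p.sent, failed)
}
```

Because each call blocks until it returns, the WaitGroup is enough to know that all publishes have finished.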

type BrokerOptions

type BrokerOptions struct {
	Encrypted     bool // Whether to use AMQPS
	Address       string
	User          string
	Password      string
	AutoTimestamp bool // Whether to set the timestamp field, if unset, when publishing messages
}

BrokerOptions holds options for broker setup

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer gets messages from the broker

func (*Consumer) Close

func (c *Consumer) Close() error

Close gracefully shuts down the consumer

Trying to read from a closed consumer

func (*Consumer) ErrChan

func (c *Consumer) ErrChan() <-chan error

ErrChan returns a channel that will emit consumer exceptions

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan Message

Messages returns a channel from which incoming messages can be read
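A consumer that exposes both a message channel and an error channel is typically drained with a select loop. The following is a rough sketch of that pattern using plain channels as stand-ins for the values returned by Messages and ErrChan. The name consumeLoop is hypothetical, and the assumption that the message channel is eventually closed is not confirmed by the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// errConsumer is a hypothetical error used to simulate a consumer exception.
var errConsumer = errors.New("consumer exception")

// consumeLoop reads from msgs until it is closed or until errs delivers an error.
// It returns the number of messages handled and the error that ended the loop, if any.
func consumeLoop(msgs <-chan string, errs <-chan error, handle func(string)) (int, error) {
	handled := 0
	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return handled, nil // message channel closed: nothing more to read
			}
			handle(msg)
			handled++
		case err, ok := <-errs:
			if !ok {
				errs = nil // a nil channel is never selected again
				continue
			}
			return handled, err
		}
	}
}

func main() {
	// Scenario 1: two messages, then the channel is closed.
	msgs := make(chan string, 2)
	msgs <- "first"
	msgs <- "second"
	close(msgs)
	n, err := consumeLoop(msgs, make(chan error), func(m string) { fmt.Println("got", m) })
	fmt.Println(n, err)

	// Scenario 2: no messages, but an error arrives.
	errs := make(chan error, 1)
	errs <- errConsumer
	n, err = consumeLoop(make(chan string), errs, func(string) {})
	fmt.Println(n, err)
}
```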

type ConsumerOptions

type ConsumerOptions struct {
	Name      string // application-defined, e.g. executable name
	Queue     string
	AutoAck   bool
	Exclusive bool
}

ConsumerOptions holds options for consumer setup

type ExchangeOptions

type ExchangeOptions struct {
	Name    string
	Type    ExchangeType
	Durable bool
}

ExchangeOptions holds options for exchange creation

type ExchangeType

type ExchangeType string

ExchangeType specifies the type of the exchange to be created

const (
	Direct  ExchangeType = "direct"
	Fanout  ExchangeType = "fanout"
	Topic   ExchangeType = "topic"
	Headers ExchangeType = "headers"
)

Lists the different exchange types
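The exchange type decides how a message's routing key is compared with a binding's routing key. As a sketch of the AMQP routing rules (not of bcamqp's internals, since routing happens on the broker): fanout ignores keys, direct requires an exact match, and topic treats "*" as exactly one dot-separated word and "#" as zero or more words. The ExchangeType constants are redeclared locally so the example is self-contained; matches and topicMatch are hypothetical helper names.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented exchange types, redeclared for a standalone example.
type ExchangeType string

const (
	Direct  ExchangeType = "direct"
	Fanout  ExchangeType = "fanout"
	Topic   ExchangeType = "topic"
	Headers ExchangeType = "headers"
)

// matches reports whether a message with routingKey would reach a queue
// bound with bindingKey on an exchange of the given type.
func matches(kind ExchangeType, bindingKey, routingKey string) bool {
	switch kind {
	case Fanout:
		return true // keys are ignored
	case Direct:
		return bindingKey == routingKey
	case Topic:
		return topicMatch(strings.Split(bindingKey, "."), strings.Split(routingKey, "."))
	default:
		return false // headers exchanges match on message headers, not on keys
	}
}

// topicMatch compares a binding pattern with a routing key, word by word.
func topicMatch(pattern, words []string) bool {
	if len(pattern) == 0 {
		return len(words) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" may absorb zero or more words; try every possible split.
		for i := 0; i <= len(words); i++ {
			if topicMatch(pattern[1:], words[i:]) {
				return true
			}
		}
		return false
	case "*":
		return len(words) > 0 && topicMatch(pattern[1:], words[1:])
	default:
		return len(words) > 0 && words[0] == pattern[0] && topicMatch(pattern[1:], words[1:])
	}
}

func main() {
	fmt.Println(matches(Direct, "#", ""))                           // false
	fmt.Println(matches(Topic, "#", ""))                            // true
	fmt.Println(matches(Topic, "orders.*", "orders.created"))       // true
	fmt.Println(matches(Topic, "orders.*", "orders.created.eu"))    // false
	fmt.Println(matches(Topic, "orders.#", "orders.created.eu"))    // true
}
```

In particular, a binding key of "#" only acts as a catch-all on a topic exchange; on a direct exchange it is an ordinary string.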

type Message

type Message struct {
	Exchange      string
	RoutingKey    string
	Body          []byte
	Headers       map[string]interface{}
	Timestamp     time.Time // application-defined, may be set to any value
	ContentType   string
	CorrelationID string
	ReplyTo       string
	Expiration    time.Duration
	// contains filtered or unexported fields
}

Message is an AMQP message entity

These fields are part of the AMQP standard or RabbitMQ extensions

func (*Message) Ack

func (m *Message) Ack() error

Ack acknowledges the message, implying that it was processed correctly and completely

func (*Message) Nack

func (m *Message) Nack(requeue bool) error

Nack is used to tell the server that this client is not willing to handle this message, optionally requeueing it for other consumers

func (*Message) Reject

func (m *Message) Reject(requeue bool) error

Reject is used to tell the server that this client is unable to handle this message, optionally requeueing it
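When AutoAck is false, each message needs exactly one of these three responses. The following sketch shows one possible policy for choosing between them based on the outcome of processing. The acknowledger interface mirrors the documented method signatures; settle, errTemporary, errPermanent, and recorder are hypothetical names introduced for illustration, and the policy itself is an assumption rather than something the library prescribes.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger mirrors the three documented Message methods.
type acknowledger interface {
	Ack() error
	Nack(requeue bool) error
	Reject(requeue bool) error
}

// Hypothetical error markers used to classify processing failures.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// settle chooses a response for a message based on the outcome of processing it:
// success is acknowledged, temporary failures are requeued, everything else is rejected.
func settle(m acknowledger, processErr error) error {
	switch {
	case processErr == nil:
		return m.Ack()
	case errors.Is(processErr, errTemporary):
		return m.Nack(true)
	default:
		return m.Reject(false)
	}
}

// recorder is an in-memory stand-in that records which method was called.
type recorder struct{ last string }

func (r *recorder) Ack() error { r.last = "ack"; return nil }

func (r *recorder) Nack(requeue bool) error {
	r.last = fmt.Sprintf("nack requeue=%t", requeue)
	return nil
}

func (r *recorder) Reject(requeue bool) error {
	r.last = fmt.Sprintf("reject requeue=%t", requeue)
	return nil
}

func main() {
	r := &recorder{}
	for _, err := range []error{nil, fmt.Errorf("db busy: %w", errTemporary), errPermanent} {
		if settleErr := settle(r, err); settleErr != nil {
			fmt.Println("settle:", settleErr)
			continue
		}
		fmt.Println(r.last)
	}
}
```

Requeueing a message that can never be processed leads to an endless redelivery loop, which is why the sketch rejects unclassified errors without requeueing.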

type QueueOptions

type QueueOptions struct {
	Name      string
	Durable   bool
	Exclusive bool
}

QueueOptions holds options for queue creation
