Documentation ¶
Overview ¶
Package bcamqp is a RabbitMQ/AMQP client library.
AMQP is a messaging protocol; RabbitMQ is a server implementation of that protocol. Read more about them here: https://www.rabbitmq.com/tutorials/amqp-concepts.html
bcamqp allows your program to send and receive messages via the AMQP protocol with relative ease. It tries to abstract away the complexities and provide a clear interface following Go's usual style.
There is also basic reconnection logic in place. While the connection is down, the library will block.
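Because calls can block while the connection is down, a caller may want to bound how long it waits. The following is a minimal sketch of one way to do that in plain Go. It is not part of bcamqp: withTimeout, ErrTimeout, slowCall and fastCall are hypothetical names, and slowCall only stands in for a library call made during an outage.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTimeout is returned when the wrapped call does not finish in time.
// Hypothetical helper error, not part of bcamqp.
var ErrTimeout = errors.New("operation timed out")

const (
	shortWait = 50 * time.Millisecond
	longWait  = time.Second
)

// withTimeout runs fn in its own goroutine and waits at most d for it to return.
// If fn is still blocked after d, withTimeout returns ErrTimeout while fn keeps
// running in the background until it unblocks on its own.
func withTimeout(d time.Duration, fn func() error) error {
	// Buffered so the goroutine can deliver its result and exit
	// even if nobody is waiting for it anymore.
	done := make(chan error, 1)
	go func() { done <- fn() }()

	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case err := <-done:
		return err
	case <-timer.C:
		return ErrTimeout
	}
}

// slowCall stands in for a call such as broker.Publish(msg) that blocks
// while the connection is down.
func slowCall() error {
	time.Sleep(300 * time.Millisecond)
	return nil
}

// fastCall stands in for the same call on a healthy connection.
func fastCall() error { return nil }

func main() {
	fmt.Println(withTimeout(shortWait, slowCall)) // operation timed out
	fmt.Println(withTimeout(longWait, fastCall))  // <nil>
}
```

Note that the timed-out call is abandoned, not cancelled. If the wrapped call is a publish, it may still go through once the connection is restored.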
Example ¶
broker := New(BrokerOptions{
	Encrypted:     true,
	Address:       "localhost",
	User:          "guest",
	Password:      "guest",
	AutoTimestamp: true,
})
err := broker.Connect()
if err != nil {
	log.Fatalf("connect to broker: %v", err)
}
defer broker.Close()

err = broker.DeclareExchange(ExchangeOptions{
	Name:    "example-exchange",
	Durable: false,
	Type:    Direct,
})
if err != nil {
	log.Fatalf("declare exchange: %v", err)
}

err = broker.DeclareQueue(QueueOptions{
	Name:      "example-queue",
	Durable:   false,
	Exclusive: false,
})
if err != nil {
	log.Fatalf("declare queue: %v", err)
}

err = broker.DeclareBinding(BindingOptions{
	Exchange:   "example-exchange",
	Queue:      "example-queue",
	RoutingKey: "#",
})
if err != nil {
	log.Fatalf("declare binding: %v", err)
}

err = broker.Publish(Message{
	Exchange:    "example-exchange",
	Body:        []byte("test"),
	ContentType: "text/plain",
})
if err != nil {
	log.Fatalf("publish: %v", err)
}

cons, err := broker.Consume(ConsumerOptions{
	Name:      "bcamqp-example",
	Queue:     "example-queue",
	AutoAck:   false,
	Exclusive: false,
})
if err != nil {
	log.Fatalf("consume: %v", err)
}
defer cons.Close()

for msg := range cons.Messages() {
	log.Printf(`%v message with routing key "%s": %s`, msg.Timestamp, msg.RoutingKey, msg.Body)
	err = msg.Ack()
	if err != nil {
		log.Printf("ack message: %v", err)
	}
}
Index ¶
- type BindingOptions
- type Broker
- func New(options BrokerOptions) *Broker
- func (b *Broker) Close() error
- func (b *Broker) Connect() error
- func (b *Broker) Consume(options ConsumerOptions) (*Consumer, error)
- func (b *Broker) DeclareBinding(options BindingOptions) error
- func (b *Broker) DeclareExchange(options ExchangeOptions) error
- func (b *Broker) DeclareQueue(options QueueOptions) error
- func (b *Broker) Publish(msg Message) error
- type BrokerOptions
- type Consumer
- type ConsumerOptions
- type ExchangeOptions
- type ExchangeType
- type Message
- type QueueOptions
Types ¶
type BindingOptions ¶
BindingOptions holds options for binding creation
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a logical connection to a broker
func New ¶
func New(options BrokerOptions) *Broker
New configures a new Broker
Connect must be called before the instance can be used
func (*Broker) Consume ¶
func (b *Broker) Consume(options ConsumerOptions) (*Consumer, error)
Consume starts a new consumer
func (*Broker) DeclareBinding ¶
func (b *Broker) DeclareBinding(options BindingOptions) error
DeclareBinding creates a binding between an exchange and a queue
func (*Broker) DeclareExchange ¶
func (b *Broker) DeclareExchange(options ExchangeOptions) error
DeclareExchange makes sure that there is an exchange with the specified properties on the broker
It returns an error when an exchange with the specified name already exists but has different properties
func (*Broker) DeclareQueue ¶
func (b *Broker) DeclareQueue(options QueueOptions) error
DeclareQueue makes sure that there is a queue with the specified properties on the broker
It returns an error when a queue with the specified name already exists but has different properties
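The declare semantics described for exchanges and queues (create if missing, accept an identical re-declaration, reject a conflicting one) can be modeled in a few lines. This is only an in-memory sketch under stated assumptions: fakeServer, queueProps, declareQueue and errMismatch are hypothetical names, and the real check is performed by the broker, not by code like this.

```go
package main

import (
	"errors"
	"fmt"
)

// queueProps holds the queue properties used in the package example
// (Durable, Exclusive). Hypothetical type for this sketch only.
type queueProps struct {
	Durable   bool
	Exclusive bool
}

// fakeServer is a hypothetical in-memory stand-in for the broker side.
type fakeServer struct {
	queues map[string]queueProps
}

// errMismatch models the error reported for a conflicting re-declaration.
var errMismatch = errors.New("queue exists with different properties")

func newFakeServer() *fakeServer {
	return &fakeServer{queues: make(map[string]queueProps)}
}

// declareQueue creates the queue if it is missing, succeeds if an identical
// queue already exists, and fails if the existing queue differs.
func (s *fakeServer) declareQueue(name string, p queueProps) error {
	if existing, ok := s.queues[name]; ok {
		if existing != p {
			return errMismatch
		}
		return nil // identical re-declaration changes nothing
	}
	s.queues[name] = p
	return nil
}

func main() {
	s := newFakeServer()
	fmt.Println(s.declareQueue("example-queue", queueProps{Durable: true}))  // <nil>
	fmt.Println(s.declareQueue("example-queue", queueProps{Durable: true}))  // <nil>
	fmt.Println(s.declareQueue("example-queue", queueProps{Durable: false})) // queue exists with different properties
}
```

In practice this means a declaration can be repeated safely on every program start, as long as all programs agree on the properties.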
type BrokerOptions ¶
type BrokerOptions struct {
	Encrypted     bool // Whether to use AMQPS
	Address       string
	User          string
	Password      string
	AutoTimestamp bool // Whether to set the (unset) timestamp field when publishing messages
}
BrokerOptions holds options for broker setup
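The AutoTimestamp comment suggests that only messages without a timestamp are stamped. A minimal sketch of that behavior follows, assuming that "unset" means the zero value of time.Time. The source does not confirm this detail, and message, stampIfUnset, fixedNow and explicitTime are hypothetical names used for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// message is a reduced, hypothetical stand-in for the package's Message type.
type message struct {
	Body      []byte
	Timestamp time.Time
}

// Fixed points in time so the example is deterministic.
var (
	fixedNow     = time.Date(2024, time.January, 2, 3, 4, 5, 0, time.UTC)
	explicitTime = time.Date(2020, time.June, 1, 0, 0, 0, 0, time.UTC)
)

// stampIfUnset returns msg with Timestamp set to now when Timestamp is the
// zero value. A timestamp chosen by the application is left untouched.
// Assumption: "unset" is modeled as time.Time's zero value.
func stampIfUnset(msg message, now time.Time) message {
	if msg.Timestamp.IsZero() {
		msg.Timestamp = now
	}
	return msg
}

func main() {
	auto := stampIfUnset(message{Body: []byte("test")}, fixedNow)
	kept := stampIfUnset(message{Body: []byte("test"), Timestamp: explicitTime}, fixedNow)
	fmt.Println(auto.Timestamp.Equal(fixedNow))     // true
	fmt.Println(kept.Timestamp.Equal(explicitTime)) // true
}
```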
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer gets messages from the broker
func (*Consumer) Close ¶
Close gracefully shuts down the consumer
Trying to read from a closed consumer
type ConsumerOptions ¶
type ConsumerOptions struct {
	Name      string // application-defined, e.g. executable name
	Queue     string
	AutoAck   bool
	Exclusive bool
}
ConsumerOptions holds options for consumer setup
type ExchangeOptions ¶
type ExchangeOptions struct {
	Name    string
	Type    ExchangeType
	Durable bool
}
ExchangeOptions holds options for exchange creation
type ExchangeType ¶
type ExchangeType string
ExchangeType specifies the type of the exchange to be created
const (
	Direct  ExchangeType = "direct"
	Fanout  ExchangeType = "fanout"
	Topic   ExchangeType = "topic"
	Headers ExchangeType = "headers"
)
Lists the different exchange types
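The exchange type decides how a message's routing key is compared with the routing key of a binding. The sketch below models the standard AMQP rules for direct, fanout and topic exchanges. It is an illustration, not code from bcamqp: routes, matchTopic, splitWords and matchWords are hypothetical helpers, the type and constants are local copies of the ones listed above, and headers exchanges are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the exchange types listed above, for illustration only.
type ExchangeType string

const (
	Direct ExchangeType = "direct"
	Fanout ExchangeType = "fanout"
	Topic  ExchangeType = "topic"
)

// routes reports whether a message published with routingKey reaches a queue
// bound with bindingKey on an exchange of type t.
func routes(t ExchangeType, bindingKey, routingKey string) bool {
	switch t {
	case Fanout:
		return true // routing keys are ignored
	case Direct:
		return bindingKey == routingKey // exact, literal comparison
	case Topic:
		return matchTopic(bindingKey, routingKey)
	default:
		return false // headers exchanges match on headers; not modeled here
	}
}

// matchTopic applies topic rules: keys are dot-separated words,
// "*" matches exactly one word and "#" matches zero or more words.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(splitWords(pattern), splitWords(routingKey))
}

// splitWords splits a key into words; the empty key has no words.
func splitWords(s string) []string {
	if s == "" {
		return nil
	}
	return strings.Split(s, ".")
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// Either "#" consumes nothing, or it consumes one word and stays active.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	case "*":
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(routes(Topic, "orders.*", "orders.created"))    // true
	fmt.Println(routes(Topic, "orders.*", "orders.created.eu")) // false
	fmt.Println(routes(Topic, "#", ""))                         // true
	fmt.Println(routes(Direct, "#", ""))                        // false
}
```

The last two lines show that "#" is a wildcard only on a topic exchange. On a direct exchange it is compared literally, so a binding key of "#" matches only messages published with the routing key "#".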
type Message ¶
type Message struct {
	Exchange      string
	RoutingKey    string
	Body          []byte
	Headers       map[string]interface{}
	Timestamp     time.Time // application-defined, may be set to any value
	ContentType   string
	CorrelationID string
	ReplyTo       string
	Expiration    time.Duration
	// contains filtered or unexported fields
}
Message is an AMQP message entity
These fields are part of the AMQP standard or RabbitMQ extensions
func (*Message) Ack ¶
Ack acknowledges the message, implying that it was processed correctly and completely
type QueueOptions ¶
QueueOptions holds options for queue creation