amqp

package
v0.4.19
Published: Apr 23, 2021 License: MIT Imports: 17 Imported by: 1

Documentation

Overview

Package amqp is a wrapper and drop-in replacement package for https://github.com/streadway/amqp with automatic redials, method middleware, and more.

Constants

const (
	ContentTooLarge    = streadway.ContentTooLarge
	NoRoute            = streadway.NoRoute
	NoConsumers        = streadway.NoConsumers
	ConnectionForced   = streadway.ConnectionForced
	InvalidPath        = streadway.InvalidPath
	AccessRefused      = streadway.AccessRefused
	NotFound           = streadway.NotFound
	ResourceLocked     = streadway.ResourceLocked
	PreconditionFailed = streadway.PreconditionFailed
	FrameError         = streadway.FrameError
	SyntaxError        = streadway.SyntaxError
	CommandInvalid     = streadway.CommandInvalid
	ChannelError       = streadway.ChannelError
	UnexpectedFrame    = streadway.UnexpectedFrame
	ResourceError      = streadway.ResourceError
	NotAllowed         = streadway.NotAllowed
	NotImplemented     = streadway.NotImplemented
	InternalError      = streadway.InternalError
)

Error codes copied over from streadway/amqp.
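AMQP 0-9-1 classifies each reply code as either a soft error, which closes only the channel, or a hard error, which closes the whole connection. The sketch below illustrates that split using local copies of a few of the values. `isSoftError` is a hypothetical helper written for this illustration and is not part of this package.

```go
package main

import "fmt"

// Local copies of a few reply codes. The values follow the AMQP 0-9-1
// reply-code table; they are redefined here so the sketch has no dependencies.
const (
	contentTooLarge    = 311
	noRoute            = 312
	noConsumers        = 313
	connectionForced   = 320
	accessRefused      = 403
	notFound           = 404
	resourceLocked     = 405
	preconditionFailed = 406
	frameError         = 501
)

// isSoftError is a hypothetical helper: it reports whether a reply code closes
// only the channel (soft) rather than the whole connection (hard).
func isSoftError(code int) bool {
	switch code {
	case contentTooLarge, noRoute, noConsumers,
		accessRefused, notFound, resourceLocked, preconditionFailed:
		return true
	}
	return false
}

func main() {
	for _, code := range []int{notFound, connectionForced, frameError} {
		fmt.Printf("code %d soft=%v\n", code, isSoftError(code))
	}
}
```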

const (
	Persistent = streadway.Persistent
	Transient  = streadway.Transient
)

DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
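The interaction between delivery mode and queue durability can be read as a small truth table. The sketch below is illustrative only: `survivesRestart` is a hypothetical helper, and the constants are local stand-ins carrying the same values as Transient (1) and Persistent (2).

```go
package main

import "fmt"

// Local stand-ins for the Transient and Persistent constants (same values).
const (
	transient  uint8 = 1
	persistent uint8 = 2
)

// survivesRestart is a hypothetical helper: a message is restored after a
// broker restart only when it is persistent AND sits on a durable queue.
func survivesRestart(deliveryMode uint8, durableQueue bool) bool {
	return deliveryMode == persistent && durableQueue
}

func main() {
	// Print all four combinations of delivery mode and queue durability.
	for _, mode := range []uint8{transient, persistent} {
		for _, durable := range []bool{false, true} {
			fmt.Printf(
				"mode=%d durable=%-5v restored=%v\n",
				mode, durable, survivesRestart(mode, durable),
			)
		}
	}
}
```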

const (
	ExchangeDirect  = streadway.ExchangeDirect
	ExchangeFanout  = streadway.ExchangeFanout
	ExchangeHeaders = streadway.ExchangeHeaders
	ExchangeTopic   = streadway.ExchangeTopic
)

Constants for standard AMQP 0-9-1 exchange types.

Variables

var (
	ErrChannelMax      = streadway.ErrChannelMax
	ErrClosed          = streadway.ErrClosed
	ErrCommandInvalid  = streadway.ErrCommandInvalid
	ErrCredentials     = streadway.ErrCredentials
	ErrFieldType       = streadway.ErrFieldType
	ErrFrame           = streadway.ErrFrame
	ErrSASL            = streadway.ErrSASL
	ErrSyntax          = streadway.ErrSyntax
	ErrUnexpectedFrame = streadway.ErrUnexpectedFrame
	ErrVhost           = streadway.ErrVhost
)

Aliases to sentinel errors

var ErrDuplicateProvider = errors.New(
	"amqp middleware provider already registered. providers must only be registered once",
)

ErrDuplicateProvider is a sentinel error returned when an amqpmiddleware.ProvidesMiddleware is registered twice.

var ErrNoMiddlewareMethods = errors.New(
	"amqp middleware provider does not implement any middleware methods",
)

ErrNoMiddlewareMethods is a sentinel error returned when an amqpmiddleware.ProvidesMiddleware is registered but does not implement any middleware methods.
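Sentinel errors like these are normally checked with `errors.Is`, which also matches when the error has been wrapped. The sketch below shows the pattern with a hypothetical registry. `registry`, `register`, and the local `errDuplicateProvider` are stand-ins for illustration and make no claim about how the package registers providers internally.

```go
package main

import (
	"errors"
	"fmt"
)

// errDuplicateProvider is a local stand-in for ErrDuplicateProvider.
var errDuplicateProvider = errors.New("provider already registered")

// registry is a hypothetical provider registry keyed by type ID.
type registry struct {
	seen map[string]bool
}

// register records typeID and returns a wrapped errDuplicateProvider when the
// same ID is registered a second time.
func (r *registry) register(typeID string) error {
	if r.seen[typeID] {
		return fmt.Errorf("register %q: %w", typeID, errDuplicateProvider)
	}
	if r.seen == nil {
		r.seen = make(map[string]bool)
	}
	r.seen[typeID] = true
	return nil
}

func main() {
	reg := &registry{}
	fmt.Println("first :", reg.register("CustomMiddleware"))

	err := reg.register("CustomMiddleware")
	fmt.Println("second:", err)
	// errors.Is sees through the wrapping added by fmt.Errorf.
	fmt.Println("is duplicate:", errors.Is(err, errDuplicateProvider))
}
```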

Functions

This section is empty.

Types

type Acknowledger

type Acknowledger = streadway.Acknowledger

Acknowledger notifies the server of successful or failed consumption of deliveries via identifier found in the Delivery.DeliveryTag field.

Applications can provide mock implementations in tests of Delivery handlers.

type Authentication

type Authentication = streadway.Authentication

Authentication interface provides a means for different SASL authentication mechanisms to be used during connection tuning.

type BasicChannel

type BasicChannel = streadway.Channel

BasicChannel is an alias to streadway/amqp.Channel, and is made available to avoid having to import both amqp packages if access to the base Channel type is desired.

type BasicConfig

type BasicConfig = streadway.Config

BasicConfig is an alias to streadway/amqp.Config, and is made available to avoid having to import both amqp packages if access to the base Config type is desired.

type BasicConfirmation

type BasicConfirmation = streadway.Confirmation

BasicConfirmation is an alias to streadway/amqp.Confirmation, and is made available to avoid having to import both amqp packages if access to the base Confirmation type is desired.

type BasicConnection

type BasicConnection = streadway.Connection

BasicConnection is an alias to streadway/amqp.Connection, and is made available to avoid having to import both amqp packages if access to the base Connection type is desired.

type Blocking

type Blocking = streadway.Blocking

Blocking notifies the server's TCP flow control of the Connection. When a server hits a memory or disk alarm it will block all connections until the resources are reclaimed. Use NotifyBlock on the Connection to receive these events.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel represents an AMQP channel. Used as a context for valid message exchange. Errors on methods with this Channel as a receiver mean this channel should be discarded and a new channel established.

---

ROGER NOTE: Channel is a drop-in replacement for streadway/amqp.Channel, with the exception that it automatically recovers from unexpected disconnections.

Unless otherwise noted at the beginning of their descriptions, all methods work exactly as their streadway counterparts, but will automatically re-attempt on ErrClosed errors. All other errors will be returned as normal. Descriptions have been copy-pasted from the streadway library for convenience.

Unlike streadway/amqp.Channel, this channel will remain open when an error is returned. Under the hood, the old, closed, channel will be replaced with a new, fresh, one -- so operation will continue as normal.

Method docstrings are copied from streadway amqp with notes where behavior diverges from the streadway/amqp package.
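The re-attempt behavior described above can be pictured as a loop that retries only on the closed-channel sentinel and passes every other result through. This is a minimal sketch under stated assumptions, not the package's implementation: `errClosed` stands in for ErrClosed, and `retryOnClosed` and `simulate` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errClosed is a local stand-in for ErrClosed.
var errClosed = errors.New("channel/connection is not open")

// retryOnClosed is a hypothetical helper: it re-runs op while op fails with
// errClosed and ctx is still live. Any other result is returned as-is.
func retryOnClosed(ctx context.Context, op func() error) error {
	for {
		err := op()
		if !errors.Is(err, errClosed) {
			return err // success, or an error that should not be retried
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr // caller gave up while we were retrying
		}
	}
}

// simulate runs retryOnClosed against an operation that fails with errClosed
// the given number of times before succeeding, and reports the attempt count.
func simulate(failures int) (attempts int, err error) {
	err = retryOnClosed(context.Background(), func() error {
		attempts++
		if attempts <= failures {
			return errClosed
		}
		return nil
	})
	return attempts, err
}

func main() {
	attempts, err := simulate(2)
	fmt.Println("attempts:", attempts, "err:", err)
}
```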

Example (AddMiddleware)

Add a custom middleware to channels created by a connection.

package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
)

func main() {
	// define our new middleware
	queueDeclareMiddleware := func(
		next amqpmiddleware.HandlerQueueDeclare,
	) amqpmiddleware.HandlerQueueDeclare {
		return func(
			ctx context.Context, args amqpmiddleware.ArgsQueueDeclare,
		) (amqpmiddleware.ResultsQueueDeclare, error) {
			fmt.Println("MIDDLEWARE INVOKED FOR QUEUE")
			fmt.Println("QUEUE NAME :", args.Name)
			fmt.Println("AUTO-DELETE:", args.AutoDelete)
			return next(ctx, args)
		}
	}

	// Create a config and add our middleware to it.
	config := amqp.DefaultConfig()
	config.ChannelMiddleware.AddQueueDeclare(queueDeclareMiddleware)

	// Get a new connection to our test broker.
	connection, err := amqp.DialConfigCtx(
		context.Background(), amqptest.TestDialAddress, config,
	)
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	// Get a new channel from our robust connection for publishing. The channel is
	// created with our default middleware.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// Declare our queue, our middleware will be invoked and print a message.
	_, err = channel.QueueDeclare(
		"example_middleware",
		false,
		true,
		false,
		false,
		nil,
	)
	if err != nil {
		panic(err)
	}

	// MIDDLEWARE INVOKED FOR QUEUE
	// QUEUE NAME : example_middleware
	// AUTO-DELETE: true
}
Output:

Example (CustomMiddlewareProvider)
package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
)

// CustomMiddlewareProvider exposes methods for middlewares that need to coordinate.
type CustomMiddlewareProvider struct {
	InvocationCount int
}

// TypeID implements amqpmiddleware.ProvidesMiddleware and returns a unique type ID
// that can be used to fetch middleware values when testing.
func (middleware *CustomMiddlewareProvider) TypeID() amqpmiddleware.ProviderTypeID {
	return "CustomMiddleware"
}

// QueueDeclare implements amqpmiddleware.ProvidesQueueDeclare.
func (middleware *CustomMiddlewareProvider) QueueDeclare(
	next amqpmiddleware.HandlerQueueDeclare,
) amqpmiddleware.HandlerQueueDeclare {
	return func(
		ctx context.Context, args amqpmiddleware.ArgsQueueDeclare,
	) (amqpmiddleware.ResultsQueueDeclare, error) {
		middleware.InvocationCount++
		fmt.Printf(
			"DECLARED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount,
		)
		return next(ctx, args)
	}
}

// QueueDelete implements amqpmiddleware.ProvidesQueueDelete.
func (middleware *CustomMiddlewareProvider) QueueDelete(
	next amqpmiddleware.HandlerQueueDelete,
) amqpmiddleware.HandlerQueueDelete {
	return func(
		ctx context.Context, args amqpmiddleware.ArgsQueueDelete,
	) (amqpmiddleware.ResultsQueueDelete, error) {
		middleware.InvocationCount++
		fmt.Printf(
			"DELETED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount,
		)
		return next(ctx, args)
	}
}

// NewCustomMiddlewareProvider creates a new CustomMiddlewareProvider.
func NewCustomMiddlewareProvider() amqpmiddleware.ProvidesMiddleware {
	return new(CustomMiddlewareProvider)
}

func main() {
	// Create a config and add our middleware provider factory to it.
	config := amqp.DefaultConfig()
	config.ChannelMiddleware.AddProviderFactory(NewCustomMiddlewareProvider)

	// Get a new connection to our test broker.
	connection, err := amqp.DialConfigCtx(
		context.Background(), amqptest.TestDialAddress, config,
	)
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	// Get a new channel from our robust connection for publishing. The channel is
	// created with our default middleware.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// Declare our queue, our middleware will be invoked and print a message.
	_, err = channel.QueueDeclare(
		"example_middleware",
		false, // durable
		true,  // autoDelete
		false, // exclusive
		false, // noWait
		nil,   // args
	)
	if err != nil {
		panic(err)
	}

	// Delete our queue, our middleware will be invoked and print a message.
	_, err = channel.QueueDelete(
		"example_middleware",
		false, // ifUnused
		false, // ifEmpty
		false, // noWait
	)
	if err != nil {
		panic(err)
	}

	// DECLARED: example_middleware, TOTAL: 1
	// DELETED: example_middleware, TOTAL: 2
}
Output:

Example (Reconnect)

Channel reconnect examples.

package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"testing"
)

func main() {
	// Get a new connection to our test broker.
	//
	// DialCtx is a new function that allows the Dial function to keep attempting
	// re-dials to the broker until the passed context expires.
	connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}

	// Get a new channel from our robust connection.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// We can use the Test method to return a testing harness with some additional
	// methods. ForceReconnect force-closes the underlying streadway Channel, causing
	// the robust Channel to reconnect.
	//
	// We'll use a dummy *testing.T object here. These methods are designed for tests
	// only and should not be used in production code.
	channel.Test(new(testing.T)).ForceReconnect(context.Background())

	// We can see here our channel is still open.
	fmt.Println("IS CLOSED:", channel.IsClosed())

	// We can even declare a queue on it
	queue, err := channel.QueueDeclare(
		"example_channel_reconnect", // name
		false,                       // durable
		true,                        // autoDelete
		false,                       // exclusive
		false,                       // noWait
		nil,                         // args
	)
	if err != nil {
		panic(err)
	}

	// Here is the result
	fmt.Printf("QUEUE    : %+v\n", queue)

	// Explicitly close the connection. This will also close all child channels.
	err = connection.Close()
	if err != nil {
		panic(err)
	}

	// Now that we have explicitly closed the connection, the channel will be closed.
	fmt.Println("IS CLOSED:", channel.IsClosed())

	// IS CLOSED: false
	// QUEUE    : {Name:example_channel_reconnect Messages:0 Consumers:0}
	// IS CLOSED: true
}
Output:

func (*Channel) Ack

func (channel *Channel) Ack(tag uint64, multiple bool) error

Ack acknowledges a delivery by its delivery tag when having been consumed with Channel.Consume or Channel.Get.

Ack acknowledges all messages received prior to the delivery tag when multiple is true.

See also Delivery.Ack

---

ROGER NOTE: If a tag (or a tag range when acking multiple tags) is from a previously disconnected channel, an ErrCantAcknowledgeOrphans will be returned, which is a new error type specific to the Roger implementation.

When acking multiple tags, it is possible that some of the tags will be from a closed underlying channel, and therefore be orphaned, and some will be from the current channel, and therefore be successful. In such cases, the ErrCantAcknowledgeOrphans will still be returned, and can be checked for which tag ranges could not be acked.
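The mixed case described above can be modeled by counting which tags in the acked range predate the last reconnect. The sketch below is a hypothetical model, not the package's bookkeeping: `splitAck` and its `offset` parameter (the number of caller-facing tags issued by earlier underlying channels) are assumptions made for illustration.

```go
package main

import "fmt"

// splitAck is a hypothetical model of a multiple-ack covering the
// caller-facing tags lastAcked+1 through tag. Tags at or below offset were
// delivered by an earlier, now-closed underlying channel and are orphaned.
// The remaining tags belong to the current channel and can be acknowledged.
func splitAck(lastAcked, tag, offset uint64) (orphaned, acked uint64) {
	for t := lastAcked + 1; t <= tag; t++ {
		if t <= offset {
			orphaned++
		} else {
			acked++
		}
	}
	return orphaned, acked
}

func main() {
	// Five deliveries outstanding, the first three from before a reconnect.
	orphaned, acked := splitAck(0, 5, 3)
	fmt.Printf("%d tags orphaned, %d tags acknowledged\n", orphaned, acked)
}
```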

func (*Channel) Close

func (manager *Channel) Close() error

Close closes the robust channel. This both closes the current underlying channel and keeps it from reconnecting.

func (*Channel) Confirm

func (channel *Channel) Confirm(noWait bool) error

Confirm puts this channel into confirm mode so that the client can ensure all publishings have successfully been received by the server. After entering this mode, the server will send a basic.ack or basic.nack message with the delivery tag set to a 1-based incremental index corresponding to every publishing received after this method returns.

Add a listener to Channel.NotifyPublish to respond to the Confirmations. If Channel.NotifyPublish is not called, the Confirmations will be silently ignored.

The order of acknowledgments is not bound to the order of deliveries.

Ack and Nack confirmations will arrive at some point in the future.

Uncountable mandatory or immediate messages are acknowledged immediately after any Channel.NotifyReturn listeners have been notified. Other messages are acknowledged when all queues that should have the message routed to them have either received acknowledgment of delivery or have enqueued the message, persisting the message if necessary.

When NoWait is true, the client will not wait for a response. A channel exception could occur if the server does not support this method.

---

ROGER NOTE: Tags will be continuous, even in the event of a re-connect. The Channel will take care of matching up caller-facing delivery tags to the current channel's underlying tag.

func (*Channel) Consume

func (channel *Channel) Consume(
	queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table,
) (deliveryChan <-chan internal.Delivery, err error)

Consume immediately starts delivering queued messages.

Begin receiving on the returned chan Delivery before any other operation on the Connection or Channel.

Continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must range over the chan to ensure all deliveries are received. Unreceived deliveries will block all methods on the same connection.

All deliveries in AMQP must be acknowledged. It is expected of the consumer to call Delivery.Ack after it has successfully processed the delivery. If the consumer is cancelled or the channel or connection is closed any unacknowledged deliveries will be requeued at the end of the same queue.

The consumer is identified by a string that is unique and scoped for all consumers on this channel. If you wish to eventually cancel the consumer, use the same non-empty identifier in Channel.Cancel. An empty string will cause the library to generate a unique identity. The consumer identity will be included in every Delivery in the ConsumerTag field.

When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.

When Exclusive is true, the server will ensure that this is the sole consumer from this queue. When Exclusive is false, the server will fairly distribute deliveries across multiple consumers.

The noLocal flag is not supported by RabbitMQ.

It's advisable to use separate connections for Channel.Publish and Channel.Consume so as not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.

When NoWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.

Optional arguments can be provided that have specific semantics for the queue or server.

Inflight messages, limited by Channel.Qos, will be buffered until received from the returned chan.

When the Channel or Connection is closed, all buffered and inflight messages will be dropped.

When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed.

---

ROGER NOTE: Unlike the normal consume method, reconnections are handled automatically on channel errors. Delivery tags will appear un-interrupted to the consumer, and the Roger, Rabbit channel will track the lineup of caller-facing delivery tags to the delivery tags of the current underlying channel.
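One way to picture this tag lineup is an offset that grows on every reconnect, since each new underlying channel restarts its delivery tags at 1. The `tagTracker` type below is a hypothetical model for illustration and is not taken from the package source.

```go
package main

import "fmt"

// tagTracker is a hypothetical model of continuous caller-facing tags.
type tagTracker struct {
	offset uint64 // tags handed out by previous underlying channels
	last   uint64 // most recent caller-facing tag
}

// callerTag converts a tag from the current underlying channel, which
// restarts at 1 after every reconnect, into a continuous caller-facing tag.
func (t *tagTracker) callerTag(underlying uint64) uint64 {
	t.last = t.offset + underlying
	return t.last
}

// underlyingTag reverses callerTag for the current channel. ok is false when
// the tag predates the last reconnect and can no longer be acknowledged.
func (t *tagTracker) underlyingTag(caller uint64) (tag uint64, ok bool) {
	if caller <= t.offset {
		return 0, false
	}
	return caller - t.offset, true
}

// reconnect records that a fresh underlying channel has replaced the old one.
func (t *tagTracker) reconnect() {
	t.offset = t.last
}

func main() {
	tracker := &tagTracker{}
	fmt.Println(tracker.callerTag(1), tracker.callerTag(2))

	tracker.reconnect()
	// The new underlying channel starts again at 1, but callers see tag 3.
	fmt.Println(tracker.callerTag(1))
}
```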

Example (DeliveryTags)
package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"sync"
	"testing"
)

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}

	// Get a new channel from our robust connection for consuming.
	consumeChannel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// Get a new channel from our robust connection for publishing.
	publishChannel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	queueName := "example_delivery_tag_continuity"

	// Declare the queue we are going to use.
	queue, err := consumeChannel.QueueDeclare(
		queueName, // name
		false,     // durable
		false,     // autoDelete
		false,     // exclusive
		false,     // noWait
		nil,       // args
	)
	if err != nil {
		panic(err)
	}

	// Clean up the queue on exit.
	defer consumeChannel.QueueDelete(
		queue.Name, false, false, false,
	)

	// Start consuming the channel
	consume, err := consumeChannel.Consume(
		queue.Name,
		"example consumer", // consumer name
		true,               // autoAck
		false,              // exclusive
		false,              // no local
		false,              // no wait
		nil,                // args
	)
	if err != nil {
		panic(err)
	}

	// consumeComplete counts messages still awaiting delivery. consumerClosed is
	// closed when the consumer goroutine exits.
	consumeComplete := new(sync.WaitGroup)
	consumerClosed := make(chan struct{})

	// Set the prefetch count to 1, that way we are less likely to lose a message
	// that is in-flight from the broker in this example.
	err = consumeChannel.Qos(1, 0, false)
	if err != nil {
		panic(err)
	}

	// Launch a consumer
	go func() {
		// Close consumerClosed to signal exit
		defer close(consumerClosed)

		fmt.Println("STARTING CONSUMER")

		// Range over the consume channel
		for delivery := range consume {
			// Force-reconnect the channel after each delivery.
			consumeChannel.Test(new(testing.T)).ForceReconnect(context.Background())

			// Tick down the consumeComplete WaitGroup
			consumeComplete.Done()

			// Print the delivery. Even though we are forcing a new underlying channel
			// to be connected each time, the delivery tags will still be continuous.
			fmt.Printf(
				"DELIVERY %v: %v\n", delivery.DeliveryTag, string(delivery.Body),
			)
		}

		fmt.Println("DELIVERIES EXHAUSTED")
	}()

	// We'll publish 10 test messages.
	for i := 0; i < 10; i++ {
		// Add one to the consumeComplete WaitGroup.
		consumeComplete.Add(1)

		// Publish a message. Even though the consumer may be force re-connecting the
		// connection each time, we can keep using the channel.
		//
		// NOTE: it is possible that we will drop a message here during a reconnection
		// event. If we want to be sure all messages reach the broker, we'll need to
		// publish messages with the Channel in confirmation mode, which we will
		// show in another example.
		err = publishChannel.Publish(
			"",
			queue.Name,
			false,
			false,
			amqp.Publishing{
				Body: []byte(fmt.Sprintf("message %v", i)),
			},
		)
		if err != nil {
			panic(err)
		}
	}

	// Wait for all messages to be received
	consumeComplete.Wait()

	// Close the connection
	err = connection.Close()
	if err != nil {
		panic(err)
	}

	// Wait for the consumer to exit
	<-consumerClosed

	// exit

	// STARTING CONSUMER
	// DELIVERY 1: message 0
	// DELIVERY 2: message 1
	// DELIVERY 3: message 2
	// DELIVERY 4: message 3
	// DELIVERY 5: message 4
	// DELIVERY 6: message 5
	// DELIVERY 7: message 6
	// DELIVERY 8: message 7
	// DELIVERY 9: message 8
	// DELIVERY 10: message 9
	// DELIVERIES EXHAUSTED
}
Output:

Example (Orphan)
package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"testing"
)

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}

	// Get a new channel from our robust connection for consuming.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	queueName := "example_delivery_ack_orphan"

	// Declare the queue we are going to use.
	_, err = channel.QueueDeclare(
		queueName, // name
		false,     // durable
		true,      // autoDelete
		false,     // exclusive
		false,     // noWait
		nil,       // args
	)
	if err != nil {
		panic(err)
	}

	// Clean up the queue on exit.
	defer channel.QueueDelete(queueName, false, false, false)

	// Start consuming the channel
	consume, err := channel.Consume(
		queueName,
		"example consumer", // consumer name
		// Auto-ack is set to false
		false, // autoAck
		false, // exclusive
		false, // no local
		false, // no wait
		nil,   // args
	)
	if err != nil {
		panic(err)
	}

	// publish a message
	err = channel.Publish(
		"", // exchange
		queueName,
		false,
		false,
		amqp.Publishing{
			Body: []byte("test message"),
		},
	)
	if err != nil {
		panic(err)
	}

	// get the delivery of our published message
	delivery := <-consume
	fmt.Println("DELIVERY:", string(delivery.Body))

	// Force-close the channel.
	channel.Test(new(testing.T)).ForceReconnect(context.Background())

	// Now that the original underlying channel is closed, it is impossible to ack
	// the delivery. We will get an error when we try.
	err = delivery.Ack(false)
	fmt.Println("ACK ERROR:", err)

	// This error is an orphan error
	var orphanErr amqp.ErrCantAcknowledgeOrphans
	if !errors.As(err, &orphanErr) {
		panic("error not orphan error")
	}

	fmt.Println("FIRST ORPHAN TAG:", orphanErr.OrphanTagFirst)
	fmt.Println("LAST ORPHAN TAG :", orphanErr.OrphanTagLast)

	// DELIVERY: test message
	// ACK ERROR: 1 tags orphaned (1 - 1), 0 tags successfully acknowledged
	// FIRST ORPHAN TAG: 1
	// LAST ORPHAN TAG : 1
}
Output:

func (*Channel) ExchangeBind

func (channel *Channel) ExchangeBind(
	destination, key, source string, noWait bool, args Table,
) (err error)

ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.

Binding two exchanges with identical arguments will not create duplicate bindings.

Binding one exchange to another with multiple bindings will only deliver a message once. For example if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings will match.

Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing Key is matched.

ExchangeBind("sell", "MSFT", "trade", false, nil)
ExchangeBind("buy", "AAPL", "trade", false, nil)

Delivery       Source      Key      Destination
example        exchange             exchange
-----------------------------------------------
Key: AAPL  --> trade ----> MSFT     sell
                     \---> AAPL --> buy
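The diagram can also be traced in code. The sketch below models the two bindings and looks up where a message would be forwarded. It assumes exact routing-key matching (as on a direct exchange), and `binding` and `route` are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// binding mirrors the argument order of ExchangeBind: destination, key, source.
type binding struct {
	destination, key, source string
}

// route returns the destination exchanges a message would be forwarded to.
// It assumes exact routing-key matching, as on a direct exchange.
func route(bindings []binding, source, key string) []string {
	var destinations []string
	for _, b := range bindings {
		if b.source == source && b.key == key {
			destinations = append(destinations, b.destination)
		}
	}
	return destinations
}

func main() {
	bindings := []binding{
		{"sell", "MSFT", "trade"},
		{"buy", "AAPL", "trade"},
	}
	// A message published to "trade" with key AAPL matches only the second binding.
	fmt.Println(route(bindings, "trade", "AAPL"))
}
```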

When NoWait is true, do not wait for the server to confirm the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to handle these errors.

Optional arguments specific to the exchanges bound can also be specified.

---

ROGER NOTE: All bindings will be remembered and re-declared on reconnection events.

func (*Channel) ExchangeDeclare

func (channel *Channel) ExchangeDeclare(
	name, kind string, durable, autoDelete, internal, noWait bool, args Table,
) (err error)

ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.

Errors returned from this method will close the channel.

Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consist of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon.

Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers".

Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges.

Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed.

Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings.

Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding Durable queues to auto-deleted exchanges.
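The four lifetimes above come down to two independent flags. As a compact summary, the sketch below prints the matrix. `lifetime` and `exchangeLifetime` are hypothetical names introduced for this illustration only.

```go
package main

import "fmt"

// lifetime summarizes how an exchange behaves for a given flag combination.
type lifetime struct {
	survivesRestart    bool // restored after a server restart
	deletedWhenUnbound bool // removed once no bindings remain
}

// exchangeLifetime is a hypothetical helper mapping the durable and autoDelete
// flags onto the behavior described in the text.
func exchangeLifetime(durable, autoDelete bool) lifetime {
	return lifetime{survivesRestart: durable, deletedWhenUnbound: autoDelete}
}

func main() {
	for _, durable := range []bool{true, false} {
		for _, autoDelete := range []bool{false, true} {
			l := exchangeLifetime(durable, autoDelete)
			fmt.Printf(
				"durable=%-5v autoDelete=%-5v survivesRestart=%-5v deletedWhenUnbound=%v\n",
				durable, autoDelete, l.survivesRestart, l.deletedWhenUnbound,
			)
		}
	}
}
```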

Note: RabbitMQ declares the default exchange types like 'amq.fanout' as Durable, so queues that bind to these pre-declared exchanges must also be Durable.

Exchanges declared as `internal` do not accept publishings. Internal exchanges are useful when you wish to implement inter-exchange topologies that should not be exposed to users of the broker.

When NoWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions.

Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters.

---

ROGER NOTE: Exchanges declared with a roger channel will be automatically re-declared upon channel disconnect recovery. Calling ExchangeDelete will remove the exchange from the list of exchanges to be re-declared in case of a disconnect.

This may result in exchanges deleted by other producers / consumers being automatically re-declared. Future updates may introduce more control over when and how exchanges are re-declared on reconnection.
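The re-declare behavior can be thought of as a replay list that ExchangeDeclare appends to and ExchangeDelete removes from. The `topology` type below is a hypothetical model of that idea, not the package's internal structure.

```go
package main

import "fmt"

// topology is a hypothetical record of declared exchanges, kept in
// declaration order so they can be replayed after a reconnect.
type topology struct {
	exchanges []string
}

// declare remembers name unless it is already recorded.
func (t *topology) declare(name string) {
	for _, existing := range t.exchanges {
		if existing == name {
			return
		}
	}
	t.exchanges = append(t.exchanges, name)
}

// remove forgets name so that it is no longer replayed.
func (t *topology) remove(name string) {
	kept := t.exchanges[:0]
	for _, existing := range t.exchanges {
		if existing != name {
			kept = append(kept, existing)
		}
	}
	t.exchanges = kept
}

// replay returns a copy of the exchanges to re-declare on a fresh channel.
func (t *topology) replay() []string {
	return append([]string(nil), t.exchanges...)
}

func main() {
	topo := &topology{}
	topo.declare("orders")
	topo.declare("audit")
	topo.remove("orders") // the local equivalent of calling ExchangeDelete

	// "audit" is replayed even if another client deleted it on the broker,
	// because this record only learns about deletions made through it.
	fmt.Println(topo.replay())
}
```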

func (*Channel) ExchangeDeclarePassive

func (channel *Channel) ExchangeDeclarePassive(
	name, kind string, durable, autoDelete, internal, noWait bool, args Table,
) (err error)

ExchangeDeclarePassive is functionally and parametrically equivalent to ExchangeDeclare, except that it sets the "passive" attribute to true. A passive exchange is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent exchange will cause RabbitMQ to throw an exception. This function can be used to detect the existence of an exchange.

func (*Channel) ExchangeDelete

func (channel *Channel) ExchangeDelete(
	name string, ifUnused, noWait bool,
) (err error)

ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.

When ifUnused is true, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but closes the channel with an exception instead. Set this to true if you are not the sole owner of the exchange.

When NoWait is true, do not wait for a server confirmation that the exchange has been deleted. Failing to delete the exchange could close the channel. Add a NotifyClose listener to respond to these channel exceptions.

---

ROGER NOTE: Calling ExchangeDelete will remove an exchange from the list of exchanges or relevant bindings to be re-declared on reconnections of the underlying streadway/amqp.Channel object.

func (*Channel) ExchangeUnbind

func (channel *Channel) ExchangeUnbind(
	destination, key, source string, noWait bool, args Table,
) (err error)

ExchangeUnbind unbinds the destination exchange from the source exchange on the server by removing the routing Key between them. This is the inverse of ExchangeBind. If the binding does not currently exist, an error will be returned.

When NoWait is true, do not wait for the server to confirm the deletion of the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to handle these errors.

Optional arguments that are specific to the type of exchanges bound can also be provided. These must match the same arguments specified in ExchangeBind to identify the binding.

---

ROGER NOTE: All relevant bindings will be removed from the list of bindings to declare on reconnect.

func (*Channel) Flow

func (channel *Channel) Flow(active bool) error

Flow pauses the delivery of messages to consumers on this channel. Channels are opened with flow control active, to open a channel with paused deliveries immediately call this method with `false` after calling Connection.Channel.

When active is `false`, this method asks the server to temporarily pause deliveries until called again with active as `true`.

Channel.Get methods will not be affected by flow control.

This method is not intended to act as window control. Use Channel.Qos to limit the number of unacknowledged messages or bytes in flight instead.

The server may also send us flow methods to throttle our publishings. A well behaving publishing client should add a listener with Channel.NotifyFlow and pause its publishings when `false` is sent on that channel.

Note: RabbitMQ prefers to use TCP push back to control flow for all channels on a connection, so under high volume scenarios, it's wise to open separate Connections for publishings and deliveries.

func (*Channel) Get

func (channel *Channel) Get(
	queue string,
	autoAck bool,
) (msg internal.Delivery, ok bool, err error)

Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be preferred.

If there was a delivery waiting on the queue and that delivery was received, the second return value will be true. If there was no delivery waiting or an error occurred, the ok bool will be false.

All deliveries must be acknowledged including those from Channel.Get. Call Delivery.Ack on the returned delivery when you have fully processed this delivery.

When autoAck is true, the server will automatically acknowledge this message so you don't have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued.

---

ROGER NOTE: Roger, Rabbit Channel objects will expose a continuous set of delivery tags to the user, even over disconnects. Adjusting a message's delivery tag to match its actual underlying tag relative to the current channel is all handled for you.

func (*Channel) IsClosed

func (manager *Channel) IsClosed() bool

IsClosed returns true if the connection is marked as closed, otherwise false is returned.

---

ROGER NOTE: unlike streadway/amqp, which only implements IsClosed() on connection objects, rogerRabbit makes IsClosed() available on both connections and channels. IsClosed() will return false until the connection / channel is shut down, even if the underlying connection is currently disconnected and waiting to reconnect.

func (*Channel) Nack

func (channel *Channel) Nack(tag uint64, multiple bool, requeue bool) error

Nack negatively acknowledges a delivery by its delivery tag. Prefer this method to notify the server that you were not able to process this delivery and it must be redelivered or dropped.

See also Delivery.Nack

---

ROGER NOTE: If a tag (or a tag range when nacking multiple tags) is from a previously disconnected channel, an ErrCantAcknowledgeOrphans will be returned, which is a new error type specific to the Roger implementation.

When nacking multiple tags, it is possible that some of the tags will be from a closed underlying channel, and therefore be orphaned, and some will be from the current channel, and therefore be successful. In such cases, the ErrCantAcknowledgeOrphans will still be returned, and can be checked for which tag ranges could not be nacked.
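
To make the idea of orphaned tags concrete, here is a simplified model of continuous delivery tags across reconnects. It is a sketch under stated assumptions, not this package's implementation: tagTracker and all of its methods are hypothetical, and the only behavior taken from the text above is that tags stay continuous and that tags from an earlier underlying channel can no longer be acknowledged.

```go
package main

import "fmt"

// tagTracker is a simplified, hypothetical model of continuous delivery tags.
type tagTracker struct {
	offset uint64 // tags issued on earlier underlying channels
	latest uint64 // highest continuous tag handed to the caller so far
}

// external converts a tag from the current underlying channel (which restarts
// at 1 after every reconnect) into the continuous tag shown to the caller.
func (t *tagTracker) external(underlying uint64) uint64 {
	tag := t.offset + underlying
	if tag > t.latest {
		t.latest = tag
	}
	return tag
}

// reconnect records that every tag issued so far belongs to a dead channel.
func (t *tagTracker) reconnect() { t.offset = t.latest }

// isOrphan reports whether a continuous tag came from an earlier underlying
// channel and therefore can no longer be acked, nacked, or rejected.
func (t *tagTracker) isOrphan(tag uint64) bool { return tag <= t.offset }

func main() {
	tracker := &tagTracker{}
	first := tracker.external(1)
	second := tracker.external(2)
	tracker.reconnect()
	third := tracker.external(1) // underlying tag restarts at 1

	fmt.Println(first, second, third)                              // prints "1 2 3"
	fmt.Println(tracker.isOrphan(second), tracker.isOrphan(third)) // prints "true false"
}
```

In this model, a multiple-nack up to tag 3 would straddle the reconnect: tags 1 and 2 are orphans while tag 3 can still be nacked, which is the mixed case described above.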

func (*Channel) NotifyCancel

func (channel *Channel) NotifyCancel(cancellations chan string) chan string

NotifyCancel registers a listener for basic.cancel methods. These can be sent from the server when a queue is deleted or when consuming from a mirrored queue where the master has just failed (and was moved to another node).

The subscription tag is returned to the listener.

func (*Channel) NotifyClose

func (manager *Channel) NotifyClose(
	receiver chan *streadway.Error,
) chan *streadway.Error

NotifyClose behaves like NotifyClose on the streadway Connection and Channel types. Subscribers to Close events will not be notified when a reconnection occurs under the hood, only when the roger Connection or Channel is closed by calling the Close method. This mirrors the streadway implementation, where Close events are only sent once when the livesOnce object becomes unusable.

For finer-grained connection status, see NotifyDial and NotifyDisconnect, which will both send individual events when the connection is lost or re-acquired.

func (*Channel) NotifyConfirm

func (channel *Channel) NotifyConfirm(
	ack, nack chan uint64,
) (chan uint64, chan uint64)

NotifyConfirm calls NotifyPublish and starts a goroutine sending ordered Ack and Nack publicationTag to the respective channels.

For strict ordering, use NotifyPublish instead.

---

ROGER NOTE: The nack channel will receive both tags that were explicitly nacked by the server AND tags that were orphaned due to a connection loss. If you wish to handle Orphaned tags separately, use the new method NotifyConfirmOrOrphaned.

func (*Channel) NotifyConfirmOrOrphaned

func (channel *Channel) NotifyConfirmOrOrphaned(
	ack, nack, orphaned chan uint64,
) (chan uint64, chan uint64, chan uint64)

NotifyConfirmOrOrphaned is as NotifyConfirm, but with a third channel for delivery tags that were orphaned by a disconnect. In NotifyConfirm, these tags are routed to the nack channel.
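
A minimal sketch of consuming the three tag streams might look like the following. It uses plain `chan uint64` values filled by hand so that it runs without a broker. In real use the channels would be the ones returned by NotifyConfirmOrOrphaned. The names tallyConfirms and confirmTally are hypothetical, and the sketch assumes the channels stay open while it reads.

```go
package main

import "fmt"

// confirmTally counts how many tags arrived on each stream.
type confirmTally struct{ acked, nacked, orphaned int }

// tallyConfirms reads from the three channels until total tags have been seen.
func tallyConfirms(ack, nack, orphaned <-chan uint64, total int) confirmTally {
	var tally confirmTally
	for seen := 0; seen < total; seen++ {
		select {
		case <-ack:
			tally.acked++
		case <-nack:
			tally.nacked++
		case <-orphaned:
			tally.orphaned++
		}
	}
	return tally
}

func main() {
	// Buffers sized to the number of outstanding publishings.
	ack := make(chan uint64, 3)
	nack := make(chan uint64, 3)
	orphaned := make(chan uint64, 3)

	// Simulated results: tags 1 and 3 confirmed, tag 2 lost to a disconnect.
	ack <- 1
	orphaned <- 2
	ack <- 3

	fmt.Printf("%+v\n", tallyConfirms(ack, nack, orphaned, 3))
	// prints "{acked:2 nacked:0 orphaned:1}"
}
```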

func (*Channel) NotifyDial

func (manager *Channel) NotifyDial(
	receiver chan error,
) error

NotifyDial is new for robust Roger transportType objects. NotifyDial will send all subscribers an event notification every time we try to re-acquire a connection. This will include both failures AND successes.

func (*Channel) NotifyDisconnect

func (manager *Channel) NotifyDisconnect(
	receiver chan error,
) error

NotifyDisconnect is new for robust Roger transportType objects. NotifyDisconnect will send all subscribers an event notification every time the underlying connection is lost.
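
One way to combine the two event streams is a small monitor that tracks the current connection state. The sketch below is hedged: it assumes that a nil value on the dial channel signals a successful dial, which the text above does not state explicitly, and that both channels are eventually closed. watchConnection, connectionSummary, and the two error values are hypothetical. The channels are driven by hand here. In real use they would be the receivers registered with NotifyDial and NotifyDisconnect.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical errors used only to drive the sketch.
var (
	errDialRefused    = errors.New("dial refused")
	errConnectionLost = errors.New("connection lost")
)

// connectionSummary is reported once both event channels have been closed.
type connectionSummary struct {
	connected    bool
	disconnects  int
	dialFailures int
}

// watchConnection consumes dial and disconnect events until both channels are
// closed. It assumes a nil dial event means the dial attempt succeeded.
func watchConnection(dial, disconnect <-chan error) connectionSummary {
	summary := connectionSummary{connected: true}
	for dial != nil || disconnect != nil {
		select {
		case err, ok := <-dial:
			if !ok {
				dial = nil // stop selecting on a closed channel
				continue
			}
			summary.connected = err == nil
			if err != nil {
				summary.dialFailures++
			}
		case _, ok := <-disconnect:
			if !ok {
				disconnect = nil
				continue
			}
			summary.connected = false
			summary.disconnects++
		}
	}
	return summary
}

func main() {
	dial := make(chan error)
	disconnect := make(chan error)
	result := make(chan connectionSummary)
	go func() { result <- watchConnection(dial, disconnect) }()

	// Unbuffered sends keep the event order deterministic.
	disconnect <- errConnectionLost
	dial <- errDialRefused
	dial <- nil
	close(dial)
	close(disconnect)

	fmt.Printf("%+v\n", <-result)
	// prints "{connected:true disconnects:1 dialFailures:1}"
}
```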

func (*Channel) NotifyFlow

func (channel *Channel) NotifyFlow(flowNotifications chan bool) chan bool

NotifyFlow registers a listener for basic.flow methods sent by the server. When `false` is sent on one of the listener channels, all publishers should pause until a `true` is sent.

The server may ask the producer to pause or restart the flow of Publishings sent on a channel. This is a simple flow-control mechanism that a server can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned by basic.get-ok methods.

When a new channel is opened, it is active (flow is active). Some applications assume that channels are inactive until started. To emulate this behavior a client MAY open the channel, then pause it.

Publishers should respond to flow messages as rapidly as possible, and the server may disconnect over-producing channels that do not respect these messages.

basic.flow-ok methods will always be returned to the server regardless of the number of listeners there are.

To control the flow of deliveries from the server, use the Channel.Flow() method instead.

Note: RabbitMQ will rather use TCP pushback on the network connection instead of sending basic.flow. This means that if a single channel is producing too much on the same connection, all channels using that connection will suffer, including acknowledgments from deliveries. Use different Connections if you desire to interleave consumers and producers in the same process to avoid your basic.ack messages from getting rate limited with your basic.publish messages.

---

ROGER NOTE: Flow notifications will be received when an unexpected disconnection occurs. If the broker disconnects, a `false` notification will be sent unless the last notification from the broker was `false`, and when the connection is re-acquired, a `true` notification will be sent before resuming relaying notifications from the broker. This means that NotifyFlow can be a useful tool for dealing with disconnects, even when using RabbitMQ.
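
A publisher can react to these notifications with a small gate that blocks while flow is paused. The following is only a sketch: flowGate and its methods are hypothetical and not part of this package. In real use, listen would range over the channel returned by NotifyFlow. Here apply is called directly so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// flowGate (hypothetical) blocks publishers while the most recent flow
// notification is false.
type flowGate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	active bool
}

func newFlowGate() *flowGate {
	gate := &flowGate{active: true} // channels start with flow active
	gate.cond = sync.NewCond(&gate.mu)
	return gate
}

// apply records one flow notification and wakes any waiting publishers.
func (g *flowGate) apply(active bool) {
	g.mu.Lock()
	g.active = active
	g.mu.Unlock()
	g.cond.Broadcast()
}

// listen applies notifications until the channel is closed.
func (g *flowGate) listen(notifications <-chan bool) {
	for active := range notifications {
		g.apply(active)
	}
}

// wait blocks until flow is active. A publisher would call it before Publish.
func (g *flowGate) wait() {
	g.mu.Lock()
	defer g.mu.Unlock()
	for !g.active {
		g.cond.Wait()
	}
}

func (g *flowGate) isActive() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.active
}

func main() {
	gate := newFlowGate()

	gate.apply(false) // for example, the connection was lost
	fmt.Println("active:", gate.isActive()) // prints "active: false"

	released := make(chan struct{})
	go func() {
		gate.wait()
		close(released)
	}()

	gate.apply(true) // for example, the connection was re-acquired
	<-released
	fmt.Println("active:", gate.isActive()) // prints "active: true"
}
```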

func (*Channel) NotifyPublish

func (channel *Channel) NotifyPublish(
	confirm chan internal.Confirmation,
) chan internal.Confirmation

NotifyPublish registers a listener for reliable publishing. Receives from this chan for every publish after Channel.Confirm will be in order starting with publicationTag 1.

There will be one and only one Confirmation Publishing starting with the delivery tag of 1 and progressing sequentially until the total number of Publishings have been seen by the server.

Acknowledgments will be received in the order of delivery from the NotifyPublish channels even if the server acknowledges them out of order.

The listener chan will be closed when the Channel is closed.

The capacity of the chan Confirmation must be at least as large as the number of outstanding publishings. Not having enough buffered chans will create a deadlock if you attempt to perform other operations on the Connection or Channel while confirms are in-flight.

It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close().

---

ROGER NOTE: If a channel is disconnected unexpectedly, there may be confirmations in flight that did not reach the client. In cases where a channel connection is re-established, missing delivery tags will be reported as nacked, but an additional Confirmation.DisconnectOrphan field will be set to `true`. It is possible that such messages DID reach the broker, but the Ack messages were lost in the disconnect event.

It's also possible that an orphan is caused by a problem with publishing the message in the first place. For instance, publishing with the `immediate` flag set to true when the broker does not support it, or publishing to a queue that was not declared correctly. If you are getting a lot of orphaned messages, make sure to check what disconnect errors you are receiving.
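
The distinction between a broker nack and a disconnect orphan can be sketched as a small classifier. The confirmation struct below is a local stand-in carrying only the three fields used in the TagContinuity example of this documentation. classify is a hypothetical helper. The buffered channel is sized to the number of outstanding publishings, as the capacity note above requires.

```go
package main

import "fmt"

// confirmation is a local stand-in with the fields DeliveryTag, Ack, and
// DisconnectOrphan described in this documentation.
type confirmation struct {
	DeliveryTag      uint64
	Ack              bool
	DisconnectOrphan bool
}

// classify (hypothetical helper) separates explicit broker nacks from tags
// whose confirmation was lost in a disconnect.
func classify(c confirmation) string {
	switch {
	case c.Ack:
		return "acked"
	case c.DisconnectOrphan:
		// The message may still have reached the broker, so blindly
		// republishing can produce a duplicate.
		return "orphaned"
	default:
		return "nacked"
	}
}

func main() {
	const outstanding = 3
	confirmations := make(chan confirmation, outstanding)
	confirmations <- confirmation{DeliveryTag: 1, Ack: true}
	confirmations <- confirmation{DeliveryTag: 2}
	confirmations <- confirmation{DeliveryTag: 3, DisconnectOrphan: true}
	close(confirmations)

	for c := range confirmations {
		fmt.Printf("tag %d: %s\n", c.DeliveryTag, classify(c))
	}
	// prints:
	// tag 1: acked
	// tag 2: nacked
	// tag 3: orphaned
}
```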

func (*Channel) NotifyReturn

func (channel *Channel) NotifyReturn(returns chan Return) chan Return

NotifyReturn registers a listener for basic.return methods. These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.

A return struct has a copy of the Publishing along with some error information about why the publishing failed.

---

ROGER NOTE: Because Channel survives over unexpected broker disconnects, it is possible that returns in-flight to the client from the broker will be dropped, and therefore will be missed.

func (*Channel) Publish

func (channel *Channel) Publish(
	exchange string,
	key string,
	mandatory bool,
	immediate bool,
	msg Publishing,
) (err error)

Publish sends a Publishing from the client to an exchange on the server.

When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue Name. This is because every declared queue gets an implicit route to the default exchange.

Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.

Publishings can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing Key, or when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.

This can return an error when the channel, connection or socket is closed. The error or lack of an error does not indicate whether the server has received this publishing.

It is possible for publishing to not reach the broker if the underlying socket is shut down without pending publishing packets being flushed from the kernel buffers. The easy way of making it probable that all publishings reach the server is to always call Connection.Close before terminating your publishing application. The way to ensure that all publishings reach the server is to add a listener to Channel.NotifyPublish and put the channel in confirm mode with Channel.Confirm. Publishing delivery tags and their corresponding confirmations start at 1. Exit when all publishings are confirmed.

When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1.

---

ROGER NOTE: Roger, Rabbit Channel objects will expose a continuous set of confirmation and delivery tags to the user, even over disconnects. Adjusting a message's confirmation tag to match its actual underlying tag relative to the current channel is all handled for you.

Example (TagContinuity)

Publication tags remain continuous, even across disconnection events.

package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"sync"
	"testing"
)

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}

	// Get a new channel from our robust connection for publishing.
	publishChannel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// Put the channel into confirmation mode
	err = publishChannel.Confirm(false)
	if err != nil {
		panic(err)
	}

	confirmationsReceived := new(sync.WaitGroup)
	confirmationsComplete := make(chan struct{})

	// Create a channel to consume publication confirmations.
	publishEvents := publishChannel.NotifyPublish(make(chan amqp.Confirmation))
	go func() {
		// Close to signal exit.
		defer close(confirmationsComplete)

		// Range over the confirmation channel.
		for confirmation := range publishEvents {
			// Mark 1 confirmation as done.
			confirmationsReceived.Done()

			// Print confirmation.
			fmt.Printf(
				"CONFIRMATION TAG %02d: ACK: %v ORPHAN: %v\n",
				confirmation.DeliveryTag,
				confirmation.Ack,
				// If the confirmation was never received because the channel was
				// disconnected, then confirmation.Ack will be false, and
				// confirmation.DisconnectOrphan will be true.
				confirmation.DisconnectOrphan,
			)
		}
	}()

	// Declare the message queue
	queueName := "example_publish_tag_continuity"
	_, err = publishChannel.QueueDeclare(
		queueName,
		false,
		true,
		false,
		false,
		nil,
	)
	if err != nil {
		panic(err)
	}

	// We'll publish 10 test messages.
	for i := 0; i < 10; i++ {
		// We want to wait here to make sure we got the confirmation from the last
		// publication before force-closing the connection to show we can handle it.
		confirmationsReceived.Wait()

		// Force a reconnection of the underlying channel.
		publishChannel.Test(new(testing.T)).ForceReconnect(context.Background())

		// Increment the confirmation WaitGroup
		confirmationsReceived.Add(1)

		// Publish a message. Even though the consumer may be force re-connecting the
		// connection each time, we can keep using the channel.
		err = publishChannel.Publish(
			"",
			queueName,
			false,
			false,
			amqp.Publishing{
				Body: []byte(fmt.Sprintf("message %v", i)),
			},
		)
		if err != nil {
			panic(err)
		}
	}

	// Wait for all confirmations to be received.
	confirmationsReceived.Wait()

	// Close the connection.
	err = connection.Close()
	if err != nil {
		panic(err)
	}

	// Wait for the confirmation routine to exit.
	<-confirmationsComplete

	// Exit.

	// CONFIRMATION TAG 01: ACK: true ORPHAN: false
	// CONFIRMATION TAG 02: ACK: true ORPHAN: false
	// CONFIRMATION TAG 03: ACK: true ORPHAN: false
	// CONFIRMATION TAG 04: ACK: true ORPHAN: false
	// CONFIRMATION TAG 05: ACK: true ORPHAN: false
	// CONFIRMATION TAG 06: ACK: true ORPHAN: false
	// CONFIRMATION TAG 07: ACK: true ORPHAN: false
	// CONFIRMATION TAG 08: ACK: true ORPHAN: false
	// CONFIRMATION TAG 09: ACK: true ORPHAN: false
	// CONFIRMATION TAG 10: ACK: true ORPHAN: false
}
Output:

func (*Channel) Qos

func (channel *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.

With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent.

With a prefetch size greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers. This option is ignored when consumers are started with noAck.

When global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel.

Please see the RabbitMQ Consumer Prefetch documentation for an explanation of how the global flag is implemented in RabbitMQ, as it differs from the AMQP 0.9.1 specification in that global Qos settings are limited in scope to channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).

To get round-robin behavior between consumers consuming from the same queue on different connections, set the prefetch count to 1, and the next available message on the server will be delivered to the next available consumer.

If your consumer work time is reasonably consistent and not much greater than two times your network round trip time, you will see significant throughput improvements starting with a prefetch count of 2 or slightly greater as described by benchmarks on RabbitMQ.

http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

func (*Channel) QueueBind

func (channel *Channel) QueueBind(
	name, key, exchange string, noWait bool, args Table,
) error

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing Key matches the binding routing Key.

QueueBind("pagers", "alert", "log", false, nil)
QueueBind("emails", "info", "log", false, nil)

Delivery       Exchange  Key       Queue
-----------------------------------------------
Key: alert --> log ----> alert --> pagers
Key: info ---> log ----> info ---> emails
Key: debug --> log       (none)    (dropped)

If a binding with the same Key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.

In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.

QueueBind("pagers", "alert", "amq.topic", false, nil)
QueueBind("emails", "info", "amq.topic", false, nil)
QueueBind("emails", "#", "amq.topic", false, nil) // match everything

Delivery       Exchange        Key       Queue
-----------------------------------------------
Key: alert --> amq.topic ----> alert --> pagers
Key: info ---> amq.topic ----> # ------> emails
                         \---> info ---/
Key: debug --> amq.topic ----> # ------> emails
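
To illustrate why a publishing that matches several bindings is still routed to a queue only once, here is a simplified model of topic matching and de-duplicated routing. It is a sketch of the general AMQP topic rules ("*" matches exactly one word, "#" matches zero or more words), not broker or library code. topicMatch, matchWords, route, and binding are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// binding pairs a queue with the binding key used in QueueBind.
type binding struct{ queue, key string }

// topicMatch reports whether a routing key matches a topic binding pattern.
func topicMatch(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" may absorb zero words, or one word and then try again.
		if matchWords(pattern[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pattern, key[1:])
	case "*":
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

// route returns each matching queue once, however many bindings matched.
func route(bindings []binding, routingKey string) []string {
	seen := make(map[string]bool)
	var queues []string
	for _, b := range bindings {
		if topicMatch(b.key, routingKey) && !seen[b.queue] {
			seen[b.queue] = true
			queues = append(queues, b.queue)
		}
	}
	return queues
}

func main() {
	bindings := []binding{
		{queue: "pagers", key: "alert"},
		{queue: "emails", key: "info"},
		{queue: "emails", key: "#"},
	}
	fmt.Println(route(bindings, "info"))  // prints "[emails]" (two bindings match, one delivery)
	fmt.Println(route(bindings, "debug")) // prints "[emails]"
}
```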

It is only possible to bind a Durable queue to a Durable exchange regardless of whether the queue or exchange is auto-deleted. Bindings between Durable queues and exchanges will also be restored on server restart.

If the binding could not complete, an error will be returned and the channel will be closed.

When NoWait is false and the queue could not be bound, the channel will be closed with an error.

func (*Channel) QueueDeclare

func (channel *Channel) QueueDeclare(
	name string, durable, autoDelete, exclusive, noWait bool, args Table,
) (queue Queue, err error)

QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.

Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing Key matching the queue's Name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing Key of the queue Name.

QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})

Delivery       Exchange  Key       Queue
-----------------------------------------------
Key: alerts -> ""     -> alerts -> alerts

The queue Name may be empty, in which case the server will generate a unique Name which will be returned in the Name field of Queue struct.

Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to Durable exchanges.

Non-Durable and Auto-Deleted queues will not be redeclared on server restart and will be deleted by the server after a short time when the last consumer is canceled or the last consumer's channel is closed. Queues with this lifetime can also be deleted normally with QueueDelete. These queues can only be bound to non-Durable exchanges.

Non-Durable and Non-Auto-Deleted queues will remain declared as long as the server is running regardless of how many consumers. This lifetime is useful for temporary topologies that may have long delays between consumer activity. These queues can only be bound to non-Durable exchanges.

Durable and Auto-Deleted queues will be restored on server restart, but without active consumers will not survive and be removed. This Lifetime is unlikely to be useful.
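
The four lifetime combinations above can be summarized as a small lookup. This is only a restatement of the preceding paragraphs in code form. queueLifetime is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// queueLifetime (hypothetical helper) summarizes how a queue behaves for each
// combination of the durable and autoDelete flags, per the text above.
func queueLifetime(durable, autoDelete bool) string {
	switch {
	case durable && !autoDelete:
		return "survives server restarts; remains without consumers or bindings"
	case !durable && autoDelete:
		return "not redeclared on restart; deleted after the last consumer goes away"
	case !durable && !autoDelete:
		return "remains while the server is running, regardless of consumers"
	default: // durable && autoDelete
		return "restored on restart, then removed without active consumers"
	}
}

func main() {
	for _, durable := range []bool{true, false} {
		for _, autoDelete := range []bool{false, true} {
			fmt.Printf("durable=%-5v autoDelete=%-5v %s\n",
				durable, autoDelete, queueLifetime(durable, autoDelete))
		}
	}
}
```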

Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same Name.

When NoWait is true, the queue will be assumed to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection.

When the error return value is not nil, you can assume the queue could not be declared with these parameters, and the channel will be closed.

---

ROGER NOTE: Queues declared with a Roger, Rabbit Channel will be automatically re-declared upon channel disconnect recovery. Calling channel.QueueDelete() will remove the queue from the list of queues to be re-declared in case of a disconnect.

This may cause cases where queues deleted by other producers / consumers are automatically re-declared. Future updates will introduce more control over when and how queues are re-declared on reconnection.
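
The bookkeeping described in this note can be pictured as a registry that remembers declarations, forgets deletions, and replays what is left after a reconnect. The sketch below is a simplified model and not this package's implementation. declaredQueues, queueSettings, and their methods are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// queueSettings holds the flags a queue was declared with.
type queueSettings struct{ durable, autoDelete, exclusive bool }

// declaredQueues (hypothetical) remembers queues to re-declare on reconnect.
type declaredQueues map[string]queueSettings

// remember records a successful declaration.
func (d declaredQueues) remember(name string, settings queueSettings) { d[name] = settings }

// forget drops a queue, as would happen after a QueueDelete call.
func (d declaredQueues) forget(name string) { delete(d, name) }

// replay re-declares every remembered queue in name order.
func (d declaredQueues) replay(declare func(name string, settings queueSettings) error) error {
	names := make([]string, 0, len(d))
	for name := range d {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		if err := declare(name, d[name]); err != nil {
			return fmt.Errorf("re-declaring %q: %w", name, err)
		}
	}
	return nil
}

func main() {
	queues := declaredQueues{}
	queues.remember("alerts", queueSettings{durable: true})
	queues.remember("emails", queueSettings{autoDelete: true})
	queues.forget("emails") // deleted through this channel, so not replayed

	err := queues.replay(func(name string, settings queueSettings) error {
		fmt.Println("re-declaring", name) // prints "re-declaring alerts"
		return nil
	})
	fmt.Println("error:", err) // prints "error: <nil>"
}
```

The model also shows the caveat above: a queue deleted by some other client is never passed to forget, so it would be replayed on the next reconnect.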

Example (ReDeclare)

Declared queues are re-declared on disconnect.

package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"testing"
)

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}

	// Close the connection on exit.
	defer connection.Close()

	// Get a new channel from our robust connection for publishing.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	queueName := "example_queue_declare_robust"

	// If we try to inspect this queue before declaring it, we will get an error.
	_, err = channel.QueueInspect(queueName)
	if err == nil {
		panic("expected queue inspect error")
	}
	fmt.Println("INSPECT ERROR:", err)

	// Declare the queue.
	_, err = channel.QueueDeclare(
		queueName,
		false, // durable
		true,  // autoDelete
		false, // exclusive
		false, // noWait
		nil,   // args
	)
	if err != nil {
		panic(err)
	}

	// Delete the queue to clean up
	defer channel.QueueDelete(queueName, false, false, false)

	// Inspect the queue.
	queue, err := channel.QueueInspect(queueName)
	if err != nil {
		panic(err)
	}
	fmt.Println("INSPECTION:", queue.Name)

	// Force a re-connection
	channel.Test(new(testing.T)).ForceReconnect(context.Background())

	// Inspect the queue again, it will already have been re-declared
	queue, err = channel.QueueInspect(queueName)
	if err != nil {
		panic(err)
	}
	fmt.Println("INSPECTION:", queue.Name)

	// Delete the queue to clean up
	_, err = channel.QueueDelete(queueName, false, false, false)
	if err != nil {
		panic(err)
	}

	// INSPECT ERROR: Exception (404) Reason: "NOT_FOUND - no queue 'example_queue_declare_robust' in vhost '/'"
	// INSPECTION: example_queue_declare_robust
	// INSPECTION: example_queue_declare_robust
}
Output:

func (*Channel) QueueDeclarePassive

func (channel *Channel) QueueDeclarePassive(
	name string, durable, autoDelete, exclusive, noWait bool, args Table,
) (queue Queue, err error)

QueueDeclarePassive is functionally and parametrically equivalent to QueueDeclare, except that it sets the "passive" attribute to true. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception. This function can be used to test for the existence of a queue.

func (*Channel) QueueDelete

func (channel *Channel) QueueDelete(
	name string, ifUnused, ifEmpty, noWait bool,
) (count int, err error)

QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.

When ifUnused is true, the queue will not be deleted if there are any consumers on the queue. If there are consumers, an error will be returned and the channel will be closed.

When ifEmpty is true, the queue will not be deleted if there are any messages remaining on the queue. If there are messages, an error will be returned and the channel will be closed.

When NoWait is true, the queue will be deleted without waiting for a response from the server. The purged message count will not be meaningful. If the queue could not be deleted, a channel exception will be raised and the channel will be closed.

---

ROGER NOTE: Calling QueueDelete will remove a queue from the list of queues to be re-declared on reconnections of the underlying streadway/amqp.Channel object.

func (*Channel) QueueInspect

func (channel *Channel) QueueInspect(name string) (queue Queue, err error)

QueueInspect passively declares a queue by Name to inspect the current message count and consumer count.

Use this method to check how many messages ready for delivery reside in the queue, how many consumers are receiving deliveries, and whether a queue by this Name already exists.

If the queue by this Name exists, use Channel.QueueDeclare to check whether it is declared with specific parameters.

If a queue by this Name does not exist, an error will be returned and the channel will be closed.

func (*Channel) QueuePurge

func (channel *Channel) QueuePurge(name string, noWait bool) (
	count int, err error,
)

QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed.

When successful, returns the number of messages purged.

If NoWait is true, do not wait for the server response and the number of messages purged will not be meaningful.

func (*Channel) QueueUnbind

func (channel *Channel) QueueUnbind(name, key, exchange string, args Table) error

QueueUnbind removes a binding between an exchange and queue matching the Key and arguments.

It is possible to send an empty string for the exchange Name, which means to unbind the queue from the default exchange.

func (*Channel) Reject

func (channel *Channel) Reject(tag uint64, requeue bool) error

Reject negatively acknowledges a delivery by its delivery tag. Prefer Nack over Reject when communicating with a RabbitMQ server because you can Nack multiple messages, reducing the amount of protocol messages to exchange.

See also Delivery.Reject

---

ROGER NOTE: If a tag (or a tag range when rejecting multiple tags) is from a previously disconnected channel, an ErrCantAcknowledgeOrphans will be returned, which is a new error type specific to the Roger implementation.

When nacking multiple tags, it is possible that some of the tags will be from a closed underlying channel, and therefore be orphaned, and some will be from the current channel, and therefore be successful. In such cases, the ErrCantAcknowledgeOrphans will still be returned, and can be checked for which tag ranges could not be rejected.

func (*Channel) Test

func (channel *Channel) Test(tb testing.TB) *ChannelTesting

Test returns an object with methods for testing the Channel.

Example
package main

import (
	"context"
	"fmt"
	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"testing"
	"time"
)

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	// Get a new channel from our robust connection for publishing. The channel is
	// created with our default middleware.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// Get a channel testing harness. In a real test function, you would pass the
	// test's *testing.T value. Here, we will just pass a dummy one.
	testHarness := channel.Test(new(testing.T))

	// We can use the test harness to force the channel to reconnect. If a reconnection
	// does not occur before the passed context expires, the test will be failed.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	testHarness.ForceReconnect(ctx)

	// We can check how many times a reconnection has occurred. The first time we
	// connect to the broker is counted, so we should get '2':
	fmt.Println("RECONNECTION COUNT:", testHarness.ReconnectionCount())

	// exit.

	// RECONNECTION COUNT: 2
}
Output:

func (*Channel) Tx

func (channel *Channel) Tx() error

Tx puts the channel into transaction mode on the server. All publishings and acknowledgments following this method will be atomically committed or rolled back for a single queue. Call either Channel.TxCommit or Channel.TxRollback to leave this transaction and immediately start a new transaction.

The atomicity across multiple queues is not defined as queue declarations and bindings are not included in the transaction.

The behavior of publishings that are delivered as mandatory or immediate while the channel is in a transaction is not defined.

Once a channel has been put into transaction mode, it cannot be taken out of transaction mode. Use a different channel for non-transactional semantics.

---

ROGER NOTE: transactions are not currently implemented, and calling any of the Tx methods will result in a panic. The author of this library is not familiar with the intricacies of amqp transactions, and how a channel in a transaction state should behave over a disconnect.

PRs to add this functionality are welcome.
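
Code that is shared between streadway/amqp channels and Roger channels may want to guard against this panic. One possible approach, sketched below, converts the panic into an error. The transactor interface copies only the documented Tx signature. tryTx, panickingChannel, and the panic message are hypothetical and used for illustration only.

```go
package main

import "fmt"

// transactor is a local stand-in for the documented Tx method.
type transactor interface {
	Tx() error
}

// tryTx (hypothetical helper) calls Tx and converts a panic into an error.
func tryTx(ch transactor) (err error) {
	defer func() {
		if recovered := recover(); recovered != nil {
			err = fmt.Errorf("transactions unavailable: %v", recovered)
		}
	}()
	return ch.Tx()
}

// panickingChannel is a hypothetical fake whose Tx method panics.
type panickingChannel struct{}

func (panickingChannel) Tx() error { panic("transactions not implemented") }

func main() {
	fmt.Println(tryTx(panickingChannel{}))
	// prints "transactions unavailable: transactions not implemented"
}
```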

func (*Channel) TxCommit

func (channel *Channel) TxCommit() error

TxCommit atomically commits all publishings and acknowledgments for a single queue and immediately starts a new transaction.

Calling this method without having called Channel.Tx is an error.

---

ROGER NOTE: transactions are not currently implemented, and calling any of the Tx methods will result in a panic. The author of this library is not familiar with the intricacies of amqp transactions, and how a channel in a transaction state should behave over a disconnect.

PRs to add this functionality are welcome.

func (*Channel) TxRollback

func (channel *Channel) TxRollback() error

TxRollback atomically rolls back all publishings and acknowledgments for a single queue and immediately starts a new transaction.

Calling this method without having called Channel.Tx is an error.

---

ROGER NOTE: transactions are not currently implemented, and calling any of the Tx methods will result in a panic. The author of this library is not familiar with the intricacies of amqp transactions, and how a channel in a transaction state should behave over a disconnect.

PRs to add this functionality are welcome.

type ChannelMiddlewares

type ChannelMiddlewares struct {
	// contains filtered or unexported fields
}

ChannelMiddlewares holds all middlewares to be registered on transports created with a Config.

func (*ChannelMiddlewares) AddAck

func (config *ChannelMiddlewares) AddAck(middleware amqpmiddleware.Ack)

AddAck adds a new middleware to be invoked on Channel.Ack method calls.

func (*ChannelMiddlewares) AddChannelReconnect

func (config *ChannelMiddlewares) AddChannelReconnect(middleware amqpmiddleware.ChannelReconnect)

AddChannelReconnect adds a new middleware to be invoked on a Channel reconnection event.

func (*ChannelMiddlewares) AddClose

func (config *ChannelMiddlewares) AddClose(middleware amqpmiddleware.Close)

AddClose adds a new middleware to be invoked on Channel.Close method calls.

func (*ChannelMiddlewares) AddConfirm

func (config *ChannelMiddlewares) AddConfirm(middleware amqpmiddleware.Confirm)

AddConfirm adds a new middleware to be invoked on Channel.Confirm method calls.

func (*ChannelMiddlewares) AddConsume

func (config *ChannelMiddlewares) AddConsume(middleware amqpmiddleware.Consume)

AddConsume adds a new middleware to be invoked on Channel.Consume method calls.

NOTE: this is a distinct middleware from AddConsumeEvents, which fires on every delivery sent from the broker. This event only fires once when the Channel.Consume method is first called.
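The difference between the two hooks can be sketched with plain functions. The types below (`consumeHandler`, `eventHandler`) and the helper `run` are hypothetical stand-ins invented for this illustration; they are not the real amqpmiddleware signatures. The sketch only shows that a method-level wrapper runs once per Consume call, while an event-level wrapper runs once per delivery.

```go
package main

import "fmt"

// consumeHandler and eventHandler are hypothetical stand-ins for the handler
// types in amqpmiddleware; the real signatures differ.
type consumeHandler func(queue string) <-chan string
type eventHandler func(delivery string) string

// countConsumeCalls wraps a consume handler and counts each method call.
func countConsumeCalls(calls *int, next consumeHandler) consumeHandler {
	return func(queue string) <-chan string {
		*calls++
		return next(queue)
	}
}

// countEvents wraps an event handler and counts each delivery passed through.
func countEvents(events *int, next eventHandler) eventHandler {
	return func(delivery string) string {
		*events++
		return next(delivery)
	}
}

// run simulates a single Consume call that yields the given deliveries and
// reports how many times each kind of middleware fired.
func run(deliveries []string) (consumeCalls, eventCalls int) {
	consume := countConsumeCalls(&consumeCalls, func(queue string) <-chan string {
		out := make(chan string, len(deliveries))
		for _, d := range deliveries {
			out <- d
		}
		close(out)
		return out
	})
	onEvent := countEvents(&eventCalls, func(d string) string { return d })

	for d := range consume("jobs") {
		onEvent(d)
	}
	return consumeCalls, eventCalls
}

func main() {
	c, e := run([]string{"a", "b", "c"})
	fmt.Println(c, e) // prints "1 3"
}
```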

func (*ChannelMiddlewares) AddConsumeEvents

func (config *ChannelMiddlewares) AddConsumeEvents(middleware amqpmiddleware.ConsumeEvents)

AddConsumeEvents adds a new middleware to be invoked on events sent to callers of Channel.Consume.

func (*ChannelMiddlewares) AddExchangeBind

func (config *ChannelMiddlewares) AddExchangeBind(middleware amqpmiddleware.ExchangeBind)

AddExchangeBind adds a new middleware to be invoked on Channel.ExchangeBind method calls.

func (*ChannelMiddlewares) AddExchangeDeclare

func (config *ChannelMiddlewares) AddExchangeDeclare(middleware amqpmiddleware.ExchangeDeclare)

AddExchangeDeclare adds a new middleware to be invoked on Channel.ExchangeDeclare method calls.

func (*ChannelMiddlewares) AddExchangeDeclarePassive

func (config *ChannelMiddlewares) AddExchangeDeclarePassive(middleware amqpmiddleware.ExchangeDeclare)

AddExchangeDeclarePassive adds a new middleware to be invoked on Channel.ExchangeDeclarePassive method calls.

func (*ChannelMiddlewares) AddExchangeDelete

func (config *ChannelMiddlewares) AddExchangeDelete(middleware amqpmiddleware.ExchangeDelete)

AddExchangeDelete adds a new middleware to be invoked on Channel.ExchangeDelete method calls.

func (*ChannelMiddlewares) AddExchangeUnbind

func (config *ChannelMiddlewares) AddExchangeUnbind(middleware amqpmiddleware.ExchangeUnbind)

AddExchangeUnbind adds a new middleware to be invoked on Channel.ExchangeUnbind method calls.

func (*ChannelMiddlewares) AddFlow

func (config *ChannelMiddlewares) AddFlow(middleware amqpmiddleware.Flow)

AddFlow adds a new middleware to be invoked on Channel.Flow method calls.

func (*ChannelMiddlewares) AddGet

func (config *ChannelMiddlewares) AddGet(middleware amqpmiddleware.Get)

AddGet adds a new middleware to be invoked on Channel.Get method calls.

func (*ChannelMiddlewares) AddNack

func (config *ChannelMiddlewares) AddNack(middleware amqpmiddleware.Nack)

AddNack adds a new middleware to be invoked on Channel.Nack method calls.

func (*ChannelMiddlewares) AddNotifyCancel

func (config *ChannelMiddlewares) AddNotifyCancel(middleware amqpmiddleware.NotifyCancel)

AddNotifyCancel adds a new middleware to be invoked on Channel.NotifyCancel method calls.

func (*ChannelMiddlewares) AddNotifyCancelEvents

func (config *ChannelMiddlewares) AddNotifyCancelEvents(middleware amqpmiddleware.NotifyCancelEvents)

AddNotifyCancelEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyCancel.

func (*ChannelMiddlewares) AddNotifyClose

func (config *ChannelMiddlewares) AddNotifyClose(middleware amqpmiddleware.NotifyClose)

AddNotifyClose adds a new middleware to be invoked on Channel.NotifyClose method calls.

func (*ChannelMiddlewares) AddNotifyCloseEvents

func (config *ChannelMiddlewares) AddNotifyCloseEvents(middleware amqpmiddleware.NotifyCloseEvents)

AddNotifyCloseEvents adds a new middleware to be invoked on all events sent to callers of Channel.NotifyClose.

func (*ChannelMiddlewares) AddNotifyConfirm

func (config *ChannelMiddlewares) AddNotifyConfirm(middleware amqpmiddleware.NotifyConfirm)

AddNotifyConfirm adds a new middleware to be invoked on Channel.NotifyConfirm method calls.

func (*ChannelMiddlewares) AddNotifyConfirmEvents

func (config *ChannelMiddlewares) AddNotifyConfirmEvents(middleware amqpmiddleware.NotifyConfirmEvents)

AddNotifyConfirmEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyConfirm.

func (*ChannelMiddlewares) AddNotifyConfirmOrOrphaned

func (config *ChannelMiddlewares) AddNotifyConfirmOrOrphaned(middleware amqpmiddleware.NotifyConfirmOrOrphaned)

AddNotifyConfirmOrOrphaned adds a new middleware to be invoked on Channel.NotifyConfirmOrOrphaned method calls.

func (*ChannelMiddlewares) AddNotifyConfirmOrOrphanedEvents

func (config *ChannelMiddlewares) AddNotifyConfirmOrOrphanedEvents(
	middleware amqpmiddleware.NotifyConfirmOrOrphanedEvents,
)

AddNotifyConfirmOrOrphanedEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyConfirmOrOrphaned.

func (*ChannelMiddlewares) AddNotifyDial

func (config *ChannelMiddlewares) AddNotifyDial(middleware amqpmiddleware.NotifyDial)

AddNotifyDial adds a new middleware to be invoked on Channel.NotifyDial method calls.

func (*ChannelMiddlewares) AddNotifyDialEvents

func (config *ChannelMiddlewares) AddNotifyDialEvents(middleware amqpmiddleware.NotifyDialEvents)

AddNotifyDialEvents adds a new middleware to be invoked on all events sent to callers of Channel.NotifyDial.

func (*ChannelMiddlewares) AddNotifyDisconnect

func (config *ChannelMiddlewares) AddNotifyDisconnect(middleware amqpmiddleware.NotifyDisconnect)

AddNotifyDisconnect adds a new middleware to be invoked on Channel.NotifyDisconnect method calls.

func (*ChannelMiddlewares) AddNotifyDisconnectEvents

func (config *ChannelMiddlewares) AddNotifyDisconnectEvents(middleware amqpmiddleware.NotifyDisconnectEvents)

AddNotifyDisconnectEvents adds a new middleware to be invoked on all events sent to callers of Channel.NotifyDisconnect.

func (*ChannelMiddlewares) AddNotifyFlow

func (config *ChannelMiddlewares) AddNotifyFlow(middleware amqpmiddleware.NotifyFlow)

AddNotifyFlow adds a new middleware to be invoked on Channel.NotifyFlow method calls.

func (*ChannelMiddlewares) AddNotifyFlowEvents

func (config *ChannelMiddlewares) AddNotifyFlowEvents(middleware amqpmiddleware.NotifyFlowEvents)

AddNotifyFlowEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyFlow.

func (*ChannelMiddlewares) AddNotifyPublish

func (config *ChannelMiddlewares) AddNotifyPublish(middleware amqpmiddleware.NotifyPublish)

AddNotifyPublish adds a new middleware to be invoked on Channel.NotifyPublish method calls.

func (*ChannelMiddlewares) AddNotifyPublishEvents

func (config *ChannelMiddlewares) AddNotifyPublishEvents(middleware amqpmiddleware.NotifyPublishEvents)

AddNotifyPublishEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyPublish.

func (*ChannelMiddlewares) AddNotifyReturn

func (config *ChannelMiddlewares) AddNotifyReturn(middleware amqpmiddleware.NotifyReturn)

AddNotifyReturn adds a new middleware to be invoked on Channel.NotifyReturn method calls.

func (*ChannelMiddlewares) AddNotifyReturnEvents

func (config *ChannelMiddlewares) AddNotifyReturnEvents(middleware amqpmiddleware.NotifyReturnEvents)

AddNotifyReturnEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyReturn.

func (*ChannelMiddlewares) AddProviderFactory

func (config *ChannelMiddlewares) AddProviderFactory(factory amqpmiddleware.ProviderFactory)

AddProviderFactory adds a factory function which creates a new middleware provider value which must implement one of the Middleware Provider interfaces from the amqpmiddleware package, like amqpmiddleware.ProvidesQueueDeclare, and must implement at-minimum amqpmiddleware.ProvidesMiddleware.

When middleware is registered on a new Channel, the provider factory will be called and all provider methods will be registered as middleware.

If you wish the same provider value's methods to be used as middleware for every *Channel created by a *Connection, consider using AddProviderMethods instead.
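The trade-off between a factory and a shared provider value comes down to where state lives. The sketch below uses hypothetical stand-in types (`provider`, `channel`, `openChannels`), not the real amqpmiddleware interfaces. It assumes only what the text above states: a factory is called once per new Channel, while a shared value is reused by every Channel.

```go
package main

import "fmt"

// provider is a hypothetical middleware provider that keeps per-value state.
type provider struct{ publishes int }

// onPublish stands in for a provider method registered as middleware.
func (p *provider) onPublish() { p.publishes++ }

// channel is a hypothetical stand-in for a *Channel holding one provider.
type channel struct{ p *provider }

// openChannels simulates opening n channels. Each channel obtains its
// provider from newProvider and then performs one publish.
func openChannels(n int, newProvider func() *provider) []*channel {
	chans := make([]*channel, n)
	for i := range chans {
		chans[i] = &channel{p: newProvider()}
		chans[i].p.onPublish()
	}
	return chans
}

func main() {
	// Factory style: a fresh provider per channel, so state is isolated.
	perChannel := openChannels(2, func() *provider { return &provider{} })
	fmt.Println(perChannel[0].p.publishes, perChannel[1].p.publishes) // prints "1 1"

	// Shared style: one provider value reused by every channel.
	shared := &provider{}
	sharedChans := openChannels(2, func() *provider { return shared })
	fmt.Println(sharedChans[0].p.publishes) // prints "2"
}
```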

func (*ChannelMiddlewares) AddProviderMethods

func (config *ChannelMiddlewares) AddProviderMethods(provider amqpmiddleware.ProvidesMiddleware) error

AddProviderMethods adds an amqpmiddleware.ProvidesMiddleware's methods as middleware. If this method is invoked directly by the user, the same provider value's methods will be added to all *Channel values created by a *Connection.

If a new provider value should be made for each *Channel, consider using AddProviderFactory instead.

func (*ChannelMiddlewares) AddPublish

func (config *ChannelMiddlewares) AddPublish(middleware amqpmiddleware.Publish)

AddPublish adds a new middleware to be invoked on Channel.Publish method calls.

func (*ChannelMiddlewares) AddQoS

func (config *ChannelMiddlewares) AddQoS(middleware amqpmiddleware.QoS)

AddQoS adds a new middleware to be invoked on Channel.Qos method calls.

func (*ChannelMiddlewares) AddQueueBind

func (config *ChannelMiddlewares) AddQueueBind(middleware amqpmiddleware.QueueBind)

AddQueueBind adds a new middleware to be invoked on Channel.QueueBind method calls.

func (*ChannelMiddlewares) AddQueueDeclare

func (config *ChannelMiddlewares) AddQueueDeclare(middleware amqpmiddleware.QueueDeclare)

AddQueueDeclare adds a new middleware to be invoked on Channel.QueueDeclare method calls.

func (*ChannelMiddlewares) AddQueueDeclarePassive

func (config *ChannelMiddlewares) AddQueueDeclarePassive(middleware amqpmiddleware.QueueDeclare)

AddQueueDeclarePassive adds a new middleware to be invoked on Channel.QueueDeclarePassive method calls.

func (*ChannelMiddlewares) AddQueueDelete

func (config *ChannelMiddlewares) AddQueueDelete(middleware amqpmiddleware.QueueDelete)

AddQueueDelete adds a new middleware to be invoked on Channel.QueueDelete method calls.

func (*ChannelMiddlewares) AddQueueInspect

func (config *ChannelMiddlewares) AddQueueInspect(middleware amqpmiddleware.QueueInspect)

AddQueueInspect adds a new middleware to be invoked on Channel.QueueInspect method calls.

func (*ChannelMiddlewares) AddQueuePurge

func (config *ChannelMiddlewares) AddQueuePurge(middleware amqpmiddleware.QueuePurge)

AddQueuePurge adds a new middleware to be invoked on Channel.QueuePurge method calls.

func (*ChannelMiddlewares) AddQueueUnbind

func (config *ChannelMiddlewares) AddQueueUnbind(middleware amqpmiddleware.QueueUnbind)

AddQueueUnbind adds a new middleware to be invoked on Channel.QueueUnbind method calls.

func (*ChannelMiddlewares) AddReject

func (config *ChannelMiddlewares) AddReject(middleware amqpmiddleware.Reject)

AddReject adds a new middleware to be invoked on Channel.Reject method calls.

type ChannelTesting

type ChannelTesting struct {
	// TransportTesting embedded common methods between Connections and Channels.
	*TransportTesting
	// contains filtered or unexported fields
}

ChannelTesting exposes a number of methods designed for testing.

func (*ChannelTesting) ConnTest

func (tester *ChannelTesting) ConnTest() *TransportTesting

ConnTest returns a tester for the RogerConnection feeding this channel.

func (*ChannelTesting) GetMiddlewareProvider

func (tester *ChannelTesting) GetMiddlewareProvider(
	id amqpmiddleware.ProviderTypeID,
) amqpmiddleware.ProvidesMiddleware

GetMiddlewareProvider returns a middleware provider for inspection or fails the test immediately.

func (*ChannelTesting) ReconnectionCount

func (tester *ChannelTesting) ReconnectionCount() uint64

ReconnectionCount returns the number of times this channel has been reconnected.

func (*ChannelTesting) UnderlyingChannel

func (tester *ChannelTesting) UnderlyingChannel() *BasicChannel

UnderlyingChannel returns the current underlying streadway/amqp.Channel being used.

func (*ChannelTesting) UnderlyingConnection

func (tester *ChannelTesting) UnderlyingConnection() *BasicConnection

UnderlyingConnection returns the current underlying streadway/amqp.Connection being used to feed this Channel.

type Config

type Config struct {
	// The SASL mechanisms to try in the client request, and the successful
	// mechanism used on the Connection object.
	// If SASL is nil, PlainAuth from the URL is used.
	SASL []Authentication

	// Vhost specifies the namespace of permissions, exchanges, queues and
	// bindings on the server. Dial sets this to the path parsed from the URL.
	Vhost string

	ChannelMax int           // 0 max channels means 2^16 - 1
	FrameSize  int           // 0 max bytes means unlimited
	Heartbeat  time.Duration // less than 1s uses the server's interval

	// TLSClientConfig specifies the client configuration of the TLS connection
	// when establishing a TLS transport.
	// If the URL uses an amqps scheme, then an empty tls.Config with the
	// ServerName from the URL is used.
	TLSClientConfig *tls.Config

	// Properties is table of properties that the client advertises to the server.
	// This is an optional setting - if the application does not set this,
	// the underlying library will use a generic set of client properties.
	Properties Table

	// Connection locale that we expect to always be en_US
	// Even though servers must return it as per the AMQP 0-9-1 spec,
	// we are not aware of it being used other than to satisfy the spec requirements
	Locale string

	// Dial returns a net.Conn prepared for a TLS handshake with TLSClientConfig,
	// then an AMQP connection handshake.
	// If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
	// used during TLS and AMQP handshaking.
	Dial func(network, addr string) (net.Conn, error)

	// If set to true, the default handlers will not be registered on connection or
	// channels created as a result of passing this config.
	NoDefaultMiddleware bool

	// DefaultLoggerLevel is the logger level for the default logging middleware.
	//
	// If NoDefaultMiddleware is true, this setting will have no effect.
	//
	// Default: zerolog.InfoLevel.
	DefaultLoggerLevel zerolog.Level

	// ConnectionMiddleware holds middleware to add to connection method and event
	// handlers.
	ConnectionMiddleware ConnectionMiddlewares

	// ChannelMiddleware holds middleware to add to channel method and event handlers.
	ChannelMiddleware ChannelMiddlewares
}

Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.

---

ROGER NOTE: This config type is a re-implementation of streadway/amqp.Config. Any code that can declare such a config will work with this type. In the future this type may add additional options for rogerRabbit-go/amqp.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default config for Dial() as it is in the streadway library.

type Confirmation

type Confirmation = internal.Confirmation

Confirmation is an alias to internal.Confirmation.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection manages the serialization and deserialization of frames from IO and dispatches the frames to the appropriate channel. All RPC methods and asynchronous Publishing, Delivery, Ack, Nack and Return messages are multiplexed on this channel. There must always be active receivers for every asynchronous message on this connection.

---

ROGER NOTE: A robust connection acts as a normal connection except that it automatically re-dials the broker when the underlying connection is lost.

Unless otherwise noted at the beginning of their descriptions, all methods work exactly as their streadway counterparts, but will automatically re-attempt on ErrClosed errors. All other errors will be returned as normal. Descriptions have been copy-pasted from the streadway library for convenience.

As this library evolves, other error types may be added to the list of errors we will automatically suppress and re-establish connection for, but in these early days, ErrClosed seems like a good place to start.

func Dial

func Dial(url string) (*Connection, error)

Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the handshake deadline to 30 seconds. After handshake, deadlines are cleared.

Dial uses the zero value of tls.Config when it encounters an amqps:// scheme. It is equivalent to calling DialTLS(url, nil).

func DialConfig

func DialConfig(url string, config Config) (*Connection, error)

DialConfig accepts a string in the AMQP URI format and a configuration for the transport and connection setup, returning a new Connection. Defaults to a server heartbeat interval of 10 seconds and sets the initial read deadline to 30 seconds.

func DialConfigCtx

func DialConfigCtx(
	ctx context.Context, url string, config Config,
) (*Connection, error)

DialConfigCtx is as DialConfig, but endlessly redials the connection until ctx is cancelled. Once returned, cancelling ctx does not affect the connection.

func DialCtx

func DialCtx(ctx context.Context, url string) (*Connection, error)

DialCtx is as Dial, but endlessly redials the connection until ctx is cancelled. Once returned, cancelling ctx does not affect the connection.

func DialTLS

func DialTLS(url string, amqps *tls.Config) (*Connection, error)

DialTLS accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the initial read deadline to 30 seconds.

DialTLS uses the provided tls.Config when encountering an amqps:// scheme.

func DialTLSCtx

func DialTLSCtx(ctx context.Context, url string, amqps *tls.Config) (*Connection, error)

DialTLSCtx is as DialTLS, but endlessly redials the connection until ctx is cancelled. Once returned, cancelling ctx does not affect the connection.

func (*Connection) Channel

func (conn *Connection) Channel() (*Channel, error)

Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened.

---

ROGER NOTE: Unlike the normal channels, roger channels will automatically reconnect on all errors until Channel.Close() is called.

func (*Connection) Close

func (manager *Connection) Close() error

Close the robust connection. This both closes the current connection and keeps it from reconnecting.

func (*Connection) IsClosed

func (manager *Connection) IsClosed() bool

IsClosed returns true if the connection is marked as closed, otherwise false is returned.

---

ROGER NOTE: unlike streadway/amqp, which only implements IsClosed() on connection objects, rogerRabbit makes IsClosed() available on both connections and channels. IsClosed() will return false until the connection / channel is shut down, even if the underlying connection is currently disconnected and waiting to reconnect.

func (*Connection) NotifyClose

func (manager *Connection) NotifyClose(
	receiver chan *streadway.Error,
) chan *streadway.Error

NotifyClose behaves like NotifyClose on the streadway Connection and Channel types. Subscribers to Close events will not be notified when a reconnection occurs under the hood, only when the roger Connection or Channel is closed by calling the Close method. This mirrors the streadway implementation, where Close events are sent only once, when the transport object becomes unusable.

For finer-grained connection status, see NotifyDial and NotifyDisconnect, which will both send individual events when the connection is lost or re-acquired.

func (*Connection) NotifyDial

func (manager *Connection) NotifyDial(
	receiver chan error,
) error

NotifyDial is new for robust Roger transport objects. NotifyDial will send all subscribers an event notification every time we try to re-acquire a connection. This will include both failures and successes.

func (*Connection) NotifyDisconnect

func (manager *Connection) NotifyDisconnect(
	receiver chan error,
) error

NotifyDisconnect is new for robust Roger transport objects. NotifyDisconnect will send all subscribers an event notification every time the underlying connection is lost.

func (*Connection) Test

func (conn *Connection) Test(tb testing.TB) *ConnectionTesting

Test returns a ConnectionTesting object with a number of helper methods for testing Connection objects.

type ConnectionMiddlewares

type ConnectionMiddlewares struct {
	// contains filtered or unexported fields
}

ConnectionMiddlewares holds the middleware to add to Connection methods and events.

func (*ConnectionMiddlewares) AddClose

func (config *ConnectionMiddlewares) AddClose(middleware amqpmiddleware.Close)

AddClose adds a new middleware to be invoked on a Connection.Close call.

func (*ConnectionMiddlewares) AddConnectionReconnect

func (config *ConnectionMiddlewares) AddConnectionReconnect(middleware amqpmiddleware.ConnectionReconnect)

AddConnectionReconnect adds a new middleware to be invoked when a connection attempts to re-establish a connection.

func (*ConnectionMiddlewares) AddNotifyClose

func (config *ConnectionMiddlewares) AddNotifyClose(middleware amqpmiddleware.NotifyClose)

AddNotifyClose adds a new middleware to be invoked on a Connection.NotifyClose call.

func (*ConnectionMiddlewares) AddNotifyCloseEvents

func (config *ConnectionMiddlewares) AddNotifyCloseEvents(middleware amqpmiddleware.NotifyCloseEvents)

AddNotifyCloseEvents adds a new middleware to be invoked on each Connection.NotifyClose event.

func (*ConnectionMiddlewares) AddNotifyDial

func (config *ConnectionMiddlewares) AddNotifyDial(middleware amqpmiddleware.NotifyDial)

AddNotifyDial adds a new middleware to be invoked on a Connection.NotifyDial call.

func (*ConnectionMiddlewares) AddNotifyDialEvents

func (config *ConnectionMiddlewares) AddNotifyDialEvents(middleware amqpmiddleware.NotifyDialEvents)

AddNotifyDialEvents adds a new middleware to be invoked on each Connection.NotifyDial event.

func (*ConnectionMiddlewares) AddNotifyDisconnect

func (config *ConnectionMiddlewares) AddNotifyDisconnect(middleware amqpmiddleware.NotifyDisconnect)

AddNotifyDisconnect adds a new middleware to be invoked on a Connection.NotifyDisconnect call.

func (*ConnectionMiddlewares) AddNotifyDisconnectEvents

func (config *ConnectionMiddlewares) AddNotifyDisconnectEvents(middleware amqpmiddleware.NotifyDisconnectEvents)

AddNotifyDisconnectEvents adds a new middleware to be invoked on each Connection.NotifyDisconnect event.

func (*ConnectionMiddlewares) AddProviderFactory

func (config *ConnectionMiddlewares) AddProviderFactory(factory amqpmiddleware.ProviderFactory)

AddProviderFactory adds a factory function which creates a new middleware provider value which must implement one of the Middleware Provider interfaces from the amqpmiddleware package, like amqpmiddleware.ProvidesClose.

When middleware is registered on a new Connection, the provider factory will be called and all provider methods will be registered as middleware.

If you wish the same provider value's methods to be used as middleware for every *Connection created by a Config, consider using AddProviderMethods instead.

func (*ConnectionMiddlewares) AddProviderMethods

func (config *ConnectionMiddlewares) AddProviderMethods(provider amqpmiddleware.ProvidesMiddleware) error

AddProviderMethods adds an amqpmiddleware.ProvidesMiddleware's methods as middleware. If this method is invoked directly by the user, the same provider value's methods will be added to all *Connection values created by a Config.

If a new provider value should be made for each Connection, consider using AddProviderFactory instead.

type ConnectionTesting

type ConnectionTesting struct {
	TransportTesting
	// contains filtered or unexported fields
}

ConnectionTesting offers methods for running tests with Connection.

func (*ConnectionTesting) UnderlyingConn

func (tester *ConnectionTesting) UnderlyingConn() *BasicConnection

UnderlyingConn returns the current underlying streadway/amqp.Connection.

type Decimal

type Decimal = streadway.Decimal

Decimal matches the AMQP decimal type. Scale is the number of decimal digits. For example, Scale == 2 and Value == 12345 represent the decimal 123.45.
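The Scale/Value relationship can be checked with a few lines of arithmetic. The `decimal` struct below is a local stand-in assumed to mirror the two fields named above, so the sketch runs without importing the amqp package; `toFloat` and `format` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"math"
)

// decimal is a local stand-in with the two fields described above.
type decimal struct {
	Scale uint8 // number of digits after the decimal point
	Value int32 // unscaled integer value
}

// toFloat divides the unscaled value by 10^Scale.
func toFloat(d decimal) float64 {
	return float64(d.Value) / math.Pow10(int(d.Scale))
}

// format renders the decimal with exactly Scale fractional digits.
func format(d decimal) string {
	return fmt.Sprintf("%.*f", int(d.Scale), toFloat(d))
}

func main() {
	d := decimal{Scale: 2, Value: 12345}
	fmt.Println(format(d)) // prints "123.45"
}
```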

type Delivery

type Delivery = internal.Delivery

Delivery is an alias to internal.Delivery.

type ErrCantAcknowledgeOrphans

type ErrCantAcknowledgeOrphans = defaultmiddlewares.ErrCantAcknowledgeOrphans

ErrCantAcknowledgeOrphans is returned when an acknowledgement method (ack, nack, reject) cannot be completed because the original channel it was consumed from has been closed and replaced with a new one. When part of a multi-ack, it is possible that some tags will be orphaned while others succeed; this error contains detailed information on both groups.

type Error

type Error = streadway.Error

Error captures the code and reason a channel or connection has been closed by the server.

type Publishing

type Publishing = streadway.Publishing

Publishing captures the client message sent to the server. The fields outside of the Headers table included in this struct mirror the underlying fields in the content frame. They use native types for convenience and efficiency.

type Queue

type Queue = streadway.Queue

Queue captures the current server state of the queue on the server returned from Channel.QueueDeclare or Channel.QueueInspect.

type Return

type Return = streadway.Return

Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

type Table

type Table = streadway.Table

Table stores user supplied fields of the following types:

bool
byte
float32
float64
int
int16
int32
int64
nil
string
time.Time
amqp.Decimal
amqp.Table
[]byte
[]interface{} - containing above types

Functions taking a table will immediately fail when the table contains a value of an unsupported type.

The caller must be specific in which precision of integer it wishes to encode.

Use a type assertion when reading values from a table for type conversion.

RabbitMQ expects int32 for integer values.

type TestReconnectSignaler

type TestReconnectSignaler struct {
	// contains filtered or unexported fields
}

TestReconnectSignaler allows us to block until a reconnection occurs during a test.

func (*TestReconnectSignaler) WaitOnReconnect

func (signaler *TestReconnectSignaler) WaitOnReconnect(ctx context.Context)

WaitOnReconnect blocks until a reconnection of the underlying transport occurs. Once the first reconnection event occurs, this object will no longer block and a new signaler will need to be created for the next reconnection.

If a nil context is passed, a context with a 3-second timeout will be used.

type TransportTesting

type TransportTesting struct {
	// contains filtered or unexported fields
}

TransportTesting provides testing methods for testing Channel and Connection.

func (*TransportTesting) BlockReconnect

func (tester *TransportTesting) BlockReconnect()

BlockReconnect blocks a transport from reconnecting. If too few calls to UnblockReconnect() are made, the block will be removed at the end of the test.

func (*TransportTesting) DisconnectTransport

func (tester *TransportTesting) DisconnectTransport()

DisconnectTransport closes the underlying transport to force a reconnection.

func (*TransportTesting) ForceReconnect

func (tester *TransportTesting) ForceReconnect(ctx context.Context)

ForceReconnect forces a disconnect of the channel or connection and waits for a reconnection to occur or ctx to cancel. If a nil context is passed, a context with a 3-second timeout will be used instead.

func (*TransportTesting) SignalOnReconnect

func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler

SignalOnReconnect returns a signaler that can be used to wait on the next reconnection event of the transport.

func (*TransportTesting) TransportLock

func (tester *TransportTesting) TransportLock() *sync.RWMutex

TransportLock is the underlying lock which controls access to the channel / connection. When held for read or write, a reconnection of the underlying transport cannot occur.

func (*TransportTesting) UnblockReconnect

func (tester *TransportTesting) UnblockReconnect()

UnblockReconnect unblocks the transport from reconnecting after calling BlockReconnect().

Directories

Path Synopsis
Package amqpmiddleware defines middleware signatures for methods on *amqp.Channel.
Package defaultmiddlewares houses the default middleware for amqp.Channel.
Package internal houses development helpers and should not be consumed by users.
