Documentation ¶
Overview ¶
Package amqp is a wrapper and drop-in replacement package for https://github.com/streadway/amqp with automatic redials, method middleware, and more.
Index ¶
- Constants
- Variables
- type Acknowledger
- type Authentication
- type BasicChannel
- type BasicConfig
- type BasicConfirmation
- type BasicConnection
- type Blocking
- type Channel
- func (channel *Channel) Ack(tag uint64, multiple bool) error
- func (manager *Channel) Close() error
- func (channel *Channel) Confirm(noWait bool) error
- func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (deliveryChan <-chan internal.Delivery, err error)
- func (channel *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) (err error)
- func (channel *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) (err error)
- func (channel *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) (err error)
- func (channel *Channel) ExchangeDelete(name string, ifUnused, noWait bool) (err error)
- func (channel *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args Table) (err error)
- func (channel *Channel) Flow(active bool) error
- func (channel *Channel) Get(queue string, autoAck bool) (msg internal.Delivery, ok bool, err error)
- func (manager *Channel) IsClosed() bool
- func (channel *Channel) Nack(tag uint64, multiple bool, requeue bool) error
- func (channel *Channel) NotifyCancel(cancellations chan string) chan string
- func (manager *Channel) NotifyClose(receiver chan *streadway.Error) chan *streadway.Error
- func (channel *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64)
- func (channel *Channel) NotifyConfirmOrOrphaned(ack, nack, orphaned chan uint64) (chan uint64, chan uint64, chan uint64)
- func (manager *Channel) NotifyDial(receiver chan error) error
- func (manager *Channel) NotifyDisconnect(receiver chan error) error
- func (channel *Channel) NotifyFlow(flowNotifications chan bool) chan bool
- func (channel *Channel) NotifyPublish(confirm chan internal.Confirmation) chan internal.Confirmation
- func (channel *Channel) NotifyReturn(returns chan Return) chan Return
- func (channel *Channel) Publish(exchange string, key string, mandatory bool, immediate bool, msg Publishing) (err error)
- func (channel *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
- func (channel *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
- func (channel *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (queue Queue, err error)
- func (channel *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (queue Queue, err error)
- func (channel *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (count int, err error)
- func (channel *Channel) QueueInspect(name string) (queue Queue, err error)
- func (channel *Channel) QueuePurge(name string, noWait bool) (count int, err error)
- func (channel *Channel) QueueUnbind(name, key, exchange string, args Table) error
- func (channel *Channel) Reject(tag uint64, requeue bool) error
- func (channel *Channel) Test(tb testing.TB) *ChannelTesting
- func (channel *Channel) Tx() error
- func (channel *Channel) TxCommit() error
- func (channel *Channel) TxRollback() error
- type ChannelMiddlewares
- func (config *ChannelMiddlewares) AddAck(middleware amqpmiddleware.Ack)
- func (config *ChannelMiddlewares) AddChannelReconnect(middleware amqpmiddleware.ChannelReconnect)
- func (config *ChannelMiddlewares) AddClose(middleware amqpmiddleware.Close)
- func (config *ChannelMiddlewares) AddConfirm(middleware amqpmiddleware.Confirm)
- func (config *ChannelMiddlewares) AddConsume(middleware amqpmiddleware.Consume)
- func (config *ChannelMiddlewares) AddConsumeEvents(middleware amqpmiddleware.ConsumeEvents)
- func (config *ChannelMiddlewares) AddExchangeBind(middleware amqpmiddleware.ExchangeBind)
- func (config *ChannelMiddlewares) AddExchangeDeclare(middleware amqpmiddleware.ExchangeDeclare)
- func (config *ChannelMiddlewares) AddExchangeDeclarePassive(middleware amqpmiddleware.ExchangeDeclare)
- func (config *ChannelMiddlewares) AddExchangeDelete(middleware amqpmiddleware.ExchangeDelete)
- func (config *ChannelMiddlewares) AddExchangeUnbind(middleware amqpmiddleware.ExchangeUnbind)
- func (config *ChannelMiddlewares) AddFlow(middleware amqpmiddleware.Flow)
- func (config *ChannelMiddlewares) AddGet(middleware amqpmiddleware.Get)
- func (config *ChannelMiddlewares) AddNack(middleware amqpmiddleware.Nack)
- func (config *ChannelMiddlewares) AddNotifyCancel(middleware amqpmiddleware.NotifyCancel)
- func (config *ChannelMiddlewares) AddNotifyCancelEvents(middleware amqpmiddleware.NotifyCancelEvents)
- func (config *ChannelMiddlewares) AddNotifyClose(middleware amqpmiddleware.NotifyClose)
- func (config *ChannelMiddlewares) AddNotifyCloseEvents(middleware amqpmiddleware.NotifyCloseEvents)
- func (config *ChannelMiddlewares) AddNotifyConfirm(middleware amqpmiddleware.NotifyConfirm)
- func (config *ChannelMiddlewares) AddNotifyConfirmEvents(middleware amqpmiddleware.NotifyConfirmEvents)
- func (config *ChannelMiddlewares) AddNotifyConfirmOrOrphaned(middleware amqpmiddleware.NotifyConfirmOrOrphaned)
- func (config *ChannelMiddlewares) AddNotifyConfirmOrOrphanedEvents(middleware amqpmiddleware.NotifyConfirmOrOrphanedEvents)
- func (config *ChannelMiddlewares) AddNotifyDial(middleware amqpmiddleware.NotifyDial)
- func (config *ChannelMiddlewares) AddNotifyDialEvents(middleware amqpmiddleware.NotifyDialEvents)
- func (config *ChannelMiddlewares) AddNotifyDisconnect(middleware amqpmiddleware.NotifyDisconnect)
- func (config *ChannelMiddlewares) AddNotifyDisconnectEvents(middleware amqpmiddleware.NotifyDisconnectEvents)
- func (config *ChannelMiddlewares) AddNotifyFlow(middleware amqpmiddleware.NotifyFlow)
- func (config *ChannelMiddlewares) AddNotifyFlowEvents(middleware amqpmiddleware.NotifyFlowEvents)
- func (config *ChannelMiddlewares) AddNotifyPublish(middleware amqpmiddleware.NotifyPublish)
- func (config *ChannelMiddlewares) AddNotifyPublishEvents(middleware amqpmiddleware.NotifyPublishEvents)
- func (config *ChannelMiddlewares) AddNotifyReturn(middleware amqpmiddleware.NotifyReturn)
- func (config *ChannelMiddlewares) AddNotifyReturnEvents(middleware amqpmiddleware.NotifyReturnEvents)
- func (config *ChannelMiddlewares) AddProviderFactory(factory amqpmiddleware.ProviderFactory)
- func (config *ChannelMiddlewares) AddProviderMethods(provider amqpmiddleware.ProvidesMiddleware) error
- func (config *ChannelMiddlewares) AddPublish(middleware amqpmiddleware.Publish)
- func (config *ChannelMiddlewares) AddQoS(middleware amqpmiddleware.QoS)
- func (config *ChannelMiddlewares) AddQueueBind(middleware amqpmiddleware.QueueBind)
- func (config *ChannelMiddlewares) AddQueueDeclare(middleware amqpmiddleware.QueueDeclare)
- func (config *ChannelMiddlewares) AddQueueDeclarePassive(middleware amqpmiddleware.QueueDeclare)
- func (config *ChannelMiddlewares) AddQueueDelete(middleware amqpmiddleware.QueueDelete)
- func (config *ChannelMiddlewares) AddQueueInspect(middleware amqpmiddleware.QueueInspect)
- func (config *ChannelMiddlewares) AddQueuePurge(middleware amqpmiddleware.QueuePurge)
- func (config *ChannelMiddlewares) AddQueueUnbind(middleware amqpmiddleware.QueueUnbind)
- func (config *ChannelMiddlewares) AddReject(middleware amqpmiddleware.Reject)
- type ChannelTesting
- func (tester *ChannelTesting) ConnTest() *TransportTesting
- func (tester *ChannelTesting) GetMiddlewareProvider(id amqpmiddleware.ProviderTypeID) amqpmiddleware.ProvidesMiddleware
- func (tester *ChannelTesting) ReconnectionCount() uint64
- func (tester *ChannelTesting) UnderlyingChannel() *BasicChannel
- func (tester *ChannelTesting) UnderlyingConnection() *BasicConnection
- type Config
- type Confirmation
- type Connection
- func Dial(url string) (*Connection, error)
- func DialConfig(url string, config Config) (*Connection, error)
- func DialConfigCtx(ctx context.Context, url string, config Config) (*Connection, error)
- func DialCtx(ctx context.Context, url string) (*Connection, error)
- func DialTLS(url string, amqps *tls.Config) (*Connection, error)
- func DialTLSCtx(ctx context.Context, url string, amqps *tls.Config) (*Connection, error)
- func (conn *Connection) Channel() (*Channel, error)
- func (manager *Connection) Close() error
- func (manager *Connection) IsClosed() bool
- func (manager *Connection) NotifyClose(receiver chan *streadway.Error) chan *streadway.Error
- func (manager *Connection) NotifyDial(receiver chan error) error
- func (manager *Connection) NotifyDisconnect(receiver chan error) error
- func (conn *Connection) Test(tb testing.TB) *ConnectionTesting
- type ConnectionMiddlewares
- func (config *ConnectionMiddlewares) AddClose(middleware amqpmiddleware.Close)
- func (config *ConnectionMiddlewares) AddConnectionReconnect(middleware amqpmiddleware.ConnectionReconnect)
- func (config *ConnectionMiddlewares) AddNotifyClose(middleware amqpmiddleware.NotifyClose)
- func (config *ConnectionMiddlewares) AddNotifyCloseEvents(middleware amqpmiddleware.NotifyCloseEvents)
- func (config *ConnectionMiddlewares) AddNotifyDial(middleware amqpmiddleware.NotifyDial)
- func (config *ConnectionMiddlewares) AddNotifyDialEvents(middleware amqpmiddleware.NotifyDialEvents)
- func (config *ConnectionMiddlewares) AddNotifyDisconnect(middleware amqpmiddleware.NotifyDisconnect)
- func (config *ConnectionMiddlewares) AddNotifyDisconnectEvents(middleware amqpmiddleware.NotifyDisconnectEvents)
- func (config *ConnectionMiddlewares) AddProviderFactory(factory amqpmiddleware.ProviderFactory)
- func (config *ConnectionMiddlewares) AddProviderMethods(provider amqpmiddleware.ProvidesMiddleware) error
- type ConnectionTesting
- type Decimal
- type Delivery
- type ErrCantAcknowledgeOrphans
- type Error
- type Publishing
- type Queue
- type Return
- type Table
- type TestReconnectSignaler
- type TransportTesting
- func (tester *TransportTesting) BlockReconnect()
- func (tester *TransportTesting) DisconnectTransport()
- func (tester *TransportTesting) ForceReconnect(ctx context.Context)
- func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler
- func (tester *TransportTesting) TransportLock() *sync.RWMutex
- func (tester *TransportTesting) UnblockReconnect()
Examples ¶
Constants ¶
const ( ContentTooLarge = streadway.ContentTooLarge NoRoute = streadway.NoRoute NoConsumers = streadway.NoConsumers ConnectionForced = streadway.ConnectionForced InvalidPath = streadway.InvalidPath AccessRefused = streadway.AccessRefused NotFound = streadway.NotFound ResourceLocked = streadway.ResourceLocked PreconditionFailed = streadway.PreconditionFailed FrameError = streadway.FrameError SyntaxError = streadway.SyntaxError CommandInvalid = streadway.CommandInvalid ChannelError = streadway.ChannelError UnexpectedFrame = streadway.UnexpectedFrame ResourceError = streadway.ResourceError NotAllowed = streadway.NotAllowed NotImplemented = streadway.NotImplemented InternalError = streadway.InternalError )
Copy over the error codes
const ( Persistent = streadway.Persistent Transient = streadway.Transient )
DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
const ( ExchangeDirect = streadway.ExchangeDirect ExchangeFanout = streadway.ExchangeFanout ExchangeHeaders = streadway.ExchangeHeaders ExchangeTopic = streadway.ExchangeTopic )
Constants for standard AMQP 0-9-1 exchange types.
Variables ¶
var ( ErrChannelMax = streadway.ErrChannelMax ErrClosed = streadway.ErrClosed ErrCommandInvalid = streadway.ErrCommandInvalid ErrCredentials = streadway.ErrCredentials ErrFieldType = streadway.ErrFieldType ErrFrame = streadway.ErrFrame ErrSASL = streadway.ErrSASL ErrSyntax = streadway.ErrSyntax ErrUnexpectedFrame = streadway.ErrUnexpectedFrame ErrVhost = streadway.ErrVhost )
Aliases to sentinel errors
var ErrDuplicateProvider = errors.New(
"amqp middleware provider already registered. providers must only be registered once",
)
ErrDuplicateProvider is a sentinel error returned when am amqpmiddleware.ProvidesMiddleware is registered twice.
var ErrNoMiddlewareMethods = errors.New(
"amqp middleware provider does not implement any middleware methods",
)
ErrNoMiddlewareMethods is a sentinel error returned when am amqpmiddleware.ProvidesMiddleware is registered but has no .
Functions ¶
This section is empty.
Types ¶
type Acknowledger ¶
type Acknowledger = streadway.Acknowledger
Acknowledger notifies the server of successful or failed consumption of deliveries via identifier found in the Delivery.DeliveryTag field.
Applications can provide mock implementations in tests of Delivery handlers.
type Authentication ¶
type Authentication = streadway.Authentication
Authentication interface provides a means for different SASL authentication mechanisms to be used during connection tuning.
type BasicChannel ¶
BasicChannel is an alias to streadway/amqp.Channel, and is made available to avoid having to import both amqp packages if access to the base Channel type is desired
type BasicConfig ¶
BasicConfig is an alias to streadway/amqp.Config, and is made available to avoid having to import both amqp packages if access to the base Config type is desired
type BasicConfirmation ¶
type BasicConfirmation = streadway.Confirmation
BasicConfirmation is an alias to streadway/amqp.Confirmation, and is made available to avoid having to import both amqp packages if access to the base Confirmation type is desired
type BasicConnection ¶
type BasicConnection = streadway.Connection
BasicConnection is an alias to streadway/amqp.Connection, and is made available to avoid having to import both amqp packages if access to the base Connection type is desired
type Blocking ¶
Blocking notifies the server's TCP flow control of the Connection. When a server hits a memory or disk alarm it will block all connections until the resources are reclaimed. Use NotifyBlock on the Connection to receive these events.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel represents an AMQP channel. Used as a context for valid message exchange. Errors on methods with this Channel as a receiver means this channel should be discarded and a new channel established.
---
ROGER NOTE: Channel is a drop-in replacement for streadway/amqp.Channel, with the exception that it automatically recovers from unexpected disconnections.
Unless otherwise noted at the beginning of their descriptions, all methods work exactly as their streadway counterparts, but will automatically re-attempt on ErrClosed errors. All other errors will be returned as normal. Descriptions have been copy-pasted from the streadway library for convenience.
Unlike streadway/amqp.Channel, this channel will remain open when an error is returned. Under the hood, the old, closed, channel will be replaced with a new, fresh, one -- so operation will continue as normal.
Method docstrings are copied from streadway amqp with notes where behavior diverges from the streadway/amqp package.
Example (AddMiddleware) ¶
Add a custom middleware to channels created by a connection.
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware" "github.com/peake100/rogerRabbit-go/pkg/amqptest" ) func main() { // define our new middleware queueDeclareMiddleware := func( next amqpmiddleware.HandlerQueueDeclare, ) amqpmiddleware.HandlerQueueDeclare { return func( ctx context.Context, args amqpmiddleware.ArgsQueueDeclare, ) (amqpmiddleware.ResultsQueueDeclare, error) { fmt.Println("MIDDLEWARE INVOKED FOR QUEUE") fmt.Println("QUEUE NAME :", args.Name) fmt.Println("AUTO-DELETE:", args.AutoDelete) return next(ctx, args) } } // Create a config and add our middleware to it. config := amqp.DefaultConfig() config.ChannelMiddleware.AddQueueDeclare(queueDeclareMiddleware) // Get a new connection to our test broker. connection, err := amqp.DialConfigCtx( context.Background(), amqptest.TestDialAddress, config, ) if err != nil { panic(err) } defer connection.Close() // Get a new channel from our robust connection for publishing. The channel is // created with our default middleware. channel, err := connection.Channel() if err != nil { panic(err) } // Declare our queue, our middleware will be invoked and print a message. _, err = channel.QueueDeclare( "example_middleware", false, true, false, false, nil, ) if err != nil { panic(err) } // MIDDLEWARE INVOKED FOR QUEUE // QUEUE NAME : example_middleware // AUTO-DELETE: true }
Output:
Example (CustomMiddlewareProvider) ¶
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware" "github.com/peake100/rogerRabbit-go/pkg/amqptest" ) // CustomMiddlewareProvider exposes methods for middlewares that need to coordinate. type CustomMiddlewareProvider struct { InvocationCount int } // TypeID implements amqpmiddleware.ProvidesMiddleware and returns a unique type ID // that can be used to fetch middleware values when testing. func (middleware *CustomMiddlewareProvider) TypeID() amqpmiddleware.ProviderTypeID { return "CustomMiddleware" } // QueueDeclare implements amqpmiddleware.ProvidesQueueDeclare. func (middleware *CustomMiddlewareProvider) QueueDeclare( next amqpmiddleware.HandlerQueueDeclare, ) amqpmiddleware.HandlerQueueDeclare { return func( ctx context.Context, args amqpmiddleware.ArgsQueueDeclare, ) (amqpmiddleware.ResultsQueueDeclare, error) { middleware.InvocationCount++ fmt.Printf( "DECLARED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount, ) return next(ctx, args) } } // QueueDelete implements amqpmiddleware.ProvidesQueueDelete. func (middleware *CustomMiddlewareProvider) QueueDelete( next amqpmiddleware.HandlerQueueDelete, ) amqpmiddleware.HandlerQueueDelete { return func( ctx context.Context, args amqpmiddleware.ArgsQueueDelete, ) (amqpmiddleware.ResultsQueueDelete, error) { middleware.InvocationCount++ fmt.Printf( "DELETED: %v, TOTAL: %v\n", args.Name, middleware.InvocationCount, ) return next(ctx, args) } } // NewCustomMiddlewareProvider creates a new CustomMiddlewareProvider. func NewCustomMiddlewareProvider() amqpmiddleware.ProvidesMiddleware { return new(CustomMiddlewareProvider) } func main() { // Create a config and add our middleware provider factory to it. config := amqp.DefaultConfig() config.ChannelMiddleware.AddProviderFactory(NewCustomMiddlewareProvider) // Get a new connection to our test broker. connection, err := amqp.DialConfigCtx( context.Background(), amqptest.TestDialAddress, config, ) if err != nil { panic(err) } defer connection.Close() // Get a new channel from our robust connection for publishing. The channel is // created with our default middleware. channel, err := connection.Channel() if err != nil { panic(err) } // Declare our queue, our middleware will be invoked and print a message. _, err = channel.QueueDeclare( "example_middleware", false, // durable true, // autoDelete false, // exclusive false, // noWait nil, // args ) if err != nil { panic(err) } // Delete our queue, our middleware will be invoked and print a message. _, err = channel.QueueDelete( "example_middleware", false, // ifUnused false, // ifEmpty false, // noWait ) if err != nil { panic(err) } // MIDDLEWARE INVOKED FOR QUEUE // DECLARED: example_middleware, TOTAL: 1 // DELETED: example_middleware, TOTAL: 2 // AUTO-DELETE: true }
Output:
Example (Reconnect) ¶
Channel reconnect examples.
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqptest" "testing" ) func main() { // Get a new connection to our test broker. // // DialCtx is a new function that allows the Dial function to keep attempting // re-dials to the broker until the passed context expires. connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress) if err != nil { panic(err) } // Get a new channel from our robust connection. channel, err := connection.Channel() if err != nil { panic(err) } // We can use the Test method to return a testing harness with some additional // methods. ForceReconnect force-closes the underlying streadway Channel, causing // the robust Channel to reconnect. // // We'll use a dummy *testing.T object here. These methods are designed for tests // only and should not be used in production code. channel.Test(new(testing.T)).ForceReconnect(context.Background()) // We can see here our channel is still open. fmt.Println("IS CLOSED:", channel.IsClosed()) // We can even declare a queue on it queue, err := channel.QueueDeclare( "example_channel_reconnect", // name false, // durable true, // autoDelete false, // exclusive false, // noWait nil, // args ) if err != nil { panic(err) } // Here is the result fmt.Printf("QUEUE : %+v\n", queue) // Explicitly close the connection. This will also close all child channels. err = connection.Close() if err != nil { panic(err) } // Now that we have explicitly closed the connection, the channel will be closed. fmt.Println("IS CLOSED:", channel.IsClosed()) // IS CLOSED: false // QUEUE : {Name:example_channel_reconnect Messages:0 Consumers:0} // IS CLOSED: true }
Output:
func (*Channel) Ack ¶
Ack acknowledges a delivery by its delivery tag when having been consumed with Channel.Consume or Channel.Get.
Ack acknowledges all message received prior to the delivery tag when multiple is true.
See also Delivery.Ack ¶
---
ROGER NOTE: If a tag (or a tag range when acking multiple tags) is from a previously disconnected channel, a ErrCantAcknowledgeOrphans will be returned, which is a new error type specific to the Roger implementation.
When acking multiple tags, it is possible that some of the tags will be from a closed underlying channel, and therefore be orphaned, and some will be from the current channel, and therefore be successful. In such cases, the ErrCantAcknowledgeOrphans will still be returned, and can be checked for which tag ranges could not be acked.
func (*Channel) Close ¶
func (manager *Channel) Close() error
Close the robust connection. This both closes the current connection and keeps it from reconnecting.
func (*Channel) Confirm ¶
Confirm puts this channel into confirm mode so that the client can ensure all publishings have successfully been received by the server. After entering this mode, the server will send a basic.ack or basic.nack message with the deliver tag set to a 1 based incremental index corresponding to every publishing received after the this method returns.
Add a listener to Channel.NotifyPublish to respond to the Confirmations. If Channel.NotifyPublish is not called, the Confirmations will be silently ignored.
The order of acknowledgments is not bound to the order of deliveries.
Ack and Nack confirmations will arrive at some point in the future.
Uncountable mandatory or immediate messages are acknowledged immediately after any Channel.NotifyReturn listeners have been notified. Other messages are acknowledged when all queues that should have the message routed to them have either received acknowledgment of delivery or have enqueued the message, persisting the message if necessary.
When NoWait is true, the client will not wait for a response. A channel exception could occur if the server does not support this method.
---
ROGER NOTE: Tags will be continuous, even in the event of a re-connect. The Channel will take care of matching up caller-facing delivery tags to the current channel's underlying tag.
func (*Channel) Consume ¶
func (channel *Channel) Consume( queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table, ) (deliveryChan <-chan internal.Delivery, err error)
Consume immediately starts delivering queued messages.
Begin receiving on the returned chan Delivery before any other operation on the Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must range over the chan to ensure all deliveries are received. Unreceived deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to call Delivery.Ack after it has successfully processed the delivery. If the consumer is cancelled or the channel or connection is closed any unacknowledged deliveries will be requeued at the end of the same queue.
The consumer is identified by a string that is unique and scoped for all consumers on this channel. If you wish to eventually cancel the consumer, use the same non-empty identifier in Channel.Cancel. An empty string will cause the library to generate a unique identity. The consumer identity will be included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.
When Exclusive is true, the server will ensure that this is the sole consumer from this queue. When Exclusive is false, the server will fairly distribute deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ.
It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.
When NoWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.
Optional arguments can be provided that have specific semantics for the queue or server.
Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will be dropped.
When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed.
---
ROGER NOTE: Unlike the normal consume method, reconnections are handled automatically on channel errors. Delivery tags will appear un-interrupted to the consumer, and the Roger. Rabbit channel will track the lineup of caller-facing delivery tags to the delivery tags of the current underlying channel.
Example (DeliveryTags) ¶
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqptest" "sync" "testing" ) func main() { // Get a new connection to our test broker. connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress) if err != nil { panic(err) } // Get a new channel from our robust connection for consuming. consumeChannel, err := connection.Channel() if err != nil { panic(err) } // Get a new channel from our robust connection for publishing. publishChannel, err := connection.Channel() if err != nil { panic(err) } queueName := "example_delivery_tag_continuity" // Declare the queue we are going to use. queue, err := consumeChannel.QueueDeclare( queueName, // name false, // durable false, // autoDelete false, // exclusive false, // noWait nil, // args ) if err != nil { panic(err) } // Clean up the queue on exit, defer consumeChannel.QueueDelete( queue.Name, false, false, false, ) // Start consuming the channel consume, err := consumeChannel.Consume( queue.Name, "example consumer", // consumer name true, // autoAck false, // exclusive false, // no local false, // no wait nil, // args ) if err != nil { panic(err) } // We'll close this channel when the consumer is exhausted consumeComplete := new(sync.WaitGroup) consumerClosed := make(chan struct{}) // Set the prefetch count to 1, that way we are less likely to lose a message // that in in-flight from the broker in this example. err = consumeChannel.Qos(1, 0, false) if err != nil { panic(err) } // Launch a consumer go func() { // Close the consumeComplete to signal exit defer close(consumerClosed) fmt.Println("STARTING CONSUMER") // Range over the consume channel for delivery := range consume { // Force-reconnect the channel after each delivery. consumeChannel.Test(new(testing.T)).ForceReconnect(context.Background()) // Tick down the consumeComplete WaitGroup consumeComplete.Done() // Print the delivery. Even though we are forcing a new underlying channel // to be connected each time, the delivery tags will still be continuous. fmt.Printf( "DELIVERY %v: %v\n", delivery.DeliveryTag, string(delivery.Body), ) } fmt.Println("DELIVERIES EXHAUSTED") }() // We'll publish 10 test messages. for i := 0; i < 10; i++ { // Add one to the consumeComplete WaitGroup. consumeComplete.Add(1) // Publish a message. Even though the consumer may be force re-connecting the // connection each time, we can keep using the channel. // // NOTE: it is possible that we will drop a message here during a reconnection // event. If we want to be sure all messages reach the broker, we'll need to // publish messages with the Channel in confirmation mode, which we will // show in another example. err = publishChannel.Publish( "", queue.Name, false, false, amqp.Publishing{ Body: []byte(fmt.Sprintf("message %v", i)), }, ) if err != nil { panic(err) } } // Wait for all messages to be received consumeComplete.Wait() // Close the connection err = connection.Close() if err != nil { panic(err) } // Wait for the consumer to exit <-consumerClosed // exit // STARTING CONSUMER // DELIVERY 1: message 0 // DELIVERY 2: message 1 // DELIVERY 3: message 2 // DELIVERY 4: message 3 // DELIVERY 5: message 4 // DELIVERY 6: message 5 // DELIVERY 7: message 6 // DELIVERY 8: message 7 // DELIVERY 9: message 8 // DELIVERY 10: message 9 // DELIVERIES EXHAUSTED }
Output:
Example (Orphan) ¶
package main import ( "context" "errors" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqptest" "testing" ) func main() { // Get a new connection to our test broker. connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress) if err != nil { panic(err) } // Get a new channel from our robust connection for consuming. channel, err := connection.Channel() if err != nil { panic(err) } queueName := "example_delivery_ack_orphan" // Declare the queue we are going to use. _, err = channel.QueueDeclare( queueName, // name false, // durable true, // autoDelete false, // exclusive false, // noWait nil, // args ) if err != nil { panic(err) } // Cleanup channel on exit. defer channel.QueueDelete(queueName, false, false, false) // Start consuming the channel consume, err := channel.Consume( queueName, "example consumer", // consumer name // Auto-ack is set to false false, // autoAck false, // exclusive false, // no local false, // no wait nil, // args ) if err != nil { panic(err) } // publish a message err = channel.Publish( "", // exchange queueName, false, false, amqp.Publishing{ Body: []byte("test message"), }, ) if err != nil { panic(err) } // get the delivery of our published message delivery := <-consume fmt.Println("DELIVERY:", string(delivery.Body)) // Force-close the channel. channel.Test(new(testing.T)).ForceReconnect(context.Background()) // Now that the original underlying channel is closed, it is impossible to ack // the delivery. We will get an error when we try. err = delivery.Ack(false) fmt.Println("ACK ERROR:", err) // This error is an orphan error var orphanErr amqp.ErrCantAcknowledgeOrphans if !errors.As(err, &orphanErr) { panic("error not orphan error") } fmt.Println("FIRST ORPHAN TAG:", orphanErr.OrphanTagFirst) fmt.Println("LAST ORPHAN TAG :", orphanErr.OrphanTagLast) // DELIVERY: test message // ACK ERROR: 1 tags orphaned (1 - 1), 0 tags successfully acknowledged // FIRST ORPHAN TAG: 1 // LAST ORPHAN TAG : 1 }
Output:
func (*Channel) ExchangeBind ¶
func (channel *Channel) ExchangeBind( destination, key, source string, noWait bool, args Table, ) (err error)
ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.
Binding two exchanges with identical arguments will not create duplicate bindings.
Binding one exchange to another with multiple bindings will only deliver a message once. For example if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings will match.
Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing Key is matched.
ExchangeBind("sell", "MSFT", "trade", false, nil) ExchangeBind("buy", "AAPL", "trade", false, nil) Delivery Source Key Destination example exchange exchange ----------------------------------------------- Key: AAPL --> trade ----> MSFT sell \---> AAPL --> buy
When NoWait is true, do not wait for the server to confirm the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to handle these errors.
Optional arguments specific to the exchanges bound can also be specified.
---
ROGER NOTE: All bindings will be remembered and re-declared on reconnection events.
func (*Channel) ExchangeDeclare ¶
func (channel *Channel) ExchangeDeclare( name, kind string, durable, autoDelete, internal, noWait bool, args Table, ) (err error)
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
Errors returned from this method will close the channel.
Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consist of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon.
Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers".
Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges.
Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed.
Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings.
Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding Durable queues to auto-deleted exchanges.
Note: RabbitMQ declares the default exchange types like 'amq.fanout' as Durable, so queues that bind to these pre-declared exchanges must also be Durable.
Exchanges declared as `internal` do not accept accept publishings. Internal exchanges are useful when you wish to implement inter-exchange topologies that should not be exposed to users of the broker.
When NoWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions.
Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters.
---
ROGER NOTE: Exchanges declared with a roger channel will be automatically re-declared upon channel disconnect recovery. Calling ExchangeDelete will remove the exchange from the list of exchanges to be re-declared in case of a disconnect.
This may cases where exchanges deleted by other producers / consumers are automatically re-declared. Future updates may introduce more control over when and how exchanges are re-declared on reconnection.
func (*Channel) ExchangeDeclarePassive ¶
func (channel *Channel) ExchangeDeclarePassive( name, kind string, durable, autoDelete, internal, noWait bool, args Table, ) (err error)
ExchangeDeclarePassive is functionally and parametrically equivalent to ExchangeDeclare, except that it sets the "passive" attribute to true. A passive exchange is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent exchange will cause RabbitMQ to throw an exception. This function can be used to detect the existence of an exchange.
func (*Channel) ExchangeDelete ¶
ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.
When ifUnused is true, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but close the channel with an exception instead. Set this to true if you are not the sole owner of the exchange.
When NoWait is true, do not wait for a server confirmation that the exchange has been deleted. Failing to delete the channel could close the channel. Add a NotifyClose listener to respond to these channel exceptions.
---
ROGER NOTE: Calling ExchangeDelete will remove am exchange from the list of exchanges or relevant bindings to be re-declared on reconnections of the underlying streadway/amqp.Channel object.
func (*Channel) ExchangeUnbind ¶
func (channel *Channel) ExchangeUnbind( destination, key, source string, noWait bool, args Table, ) (err error)
ExchangeUnbind unbinds the destination exchange from the source exchange on the server by removing the routing Key between them. This is the inverse of ExchangeBind. If the binding does not currently exist, an error will be returned.
When NoWait is true, do not wait for the server to confirm the deletion of the binding. If any error occurs the channel will be closed. Add a listener to NotifyClose to handle these errors.
Optional arguments that are specific to the type of exchanges bound can also be provided. These must match the same arguments specified in ExchangeBind to identify the binding.
---
ROGER NOTE: All relevant bindings will be removed from the list of bindings to declare on reconnect.
func (*Channel) Flow ¶
Flow pauses the delivery of messages to consumers on this channel. Channels are opened with flow control active, to open a channel with paused deliveries immediately call this method with `false` after calling Connection.Channel.
When active is `false`, this method asks the server to temporarily pause deliveries until called again with active as `true`.
Channel.Get methods will not be affected by flow control.
This method is not intended to act as window control. Use Channel.Qos to limit the number of unacknowledged messages or bytes in flight instead.
The server may also send us flow methods to throttle our publishings. A well behaving publishing client should add a listener with Channel.NotifyFlow and pause its publishings when `false` is sent on that channel.
Note: RabbitMQ prefers to use TCP push back to control flow for all channels on a connection, so under high volume scenarios, it's wise to open separate Connections for publishings and deliveries.
func (*Channel) Get ¶
func (channel *Channel) Get( queue string, autoAck bool, ) (msg internal.Delivery, ok bool, err error)
Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be preferred.
If there was a delivery waiting on the queue and that delivery was received, the second return value will be true. If there was no delivery waiting or an error occurred, the ok bool will be false.
All deliveries must be acknowledged including those from Channel.Get. Call Delivery.Ack on the returned delivery when you have fully processed this delivery.
When autoAck is true, the server will automatically acknowledge this message so you don'tb have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued.
---
ROGER NOTE: Roger, Rabbit Channel objects will expose a continuous set of delivery tags to the user, even over disconnects, adjusting a message's delivery tags to match its actual underlying tag relative to the current channel is all handled for you.
func (*Channel) IsClosed ¶
func (manager *Channel) IsClosed() bool
IsClosed returns true if the connection is marked as closed, otherwise false is returned.
--
ROGER NOTE: unlike streadway/amqp, which only implements IsClosed() on connection objects, rogerRabbit makes IsClosed() available on both connections and channels. IsClosed() will return false until the connection / channel is shut down, even if the underlying connection is currently disconnected and waiting to reconnectMiddleware.
func (*Channel) Nack ¶
Nack negatively acknowledges a delivery by its delivery tag. Prefer this method to notify the server that you were not able to process this delivery and it must be redelivered or dropped.
See also Delivery.Nack ¶
---
ROGER NOTE: If a tag (or a tag range when nacking multiple tags) is from a previously disconnected channel, a ErrCantAcknowledgeOrphans will be returned, which is a new error type specific to the Roger implementation.
When nacking multiple tags, it is possible that some of the tags will be from a closed underlying channel, and therefore be orphaned, and some will be from the current channel, and therefore be successful. In such cases, the ErrCantAcknowledgeOrphans will still be returned, and can be checked for which tag ranges could not be nacked.
func (*Channel) NotifyCancel ¶
NotifyCancel registers a listener for basic.cancel methods. These can be sent from the server when a queue is deleted or when consuming from a mirrored queue where the master has just failed (and was moved to another node).
The subscription tag is returned to the listener.
func (*Channel) NotifyClose ¶
NotifyClose is as NotifyClose on streadway Connection/Channel.NotifyClose. Subscribers to Close events will not be notified when a reconnection occurs under the hood, only when the roger Connection or Channel is closed by calling the Close method. This mirrors the streadway implementation, where Close events are only sent once when the livesOnce object becomes unusable.
For finer-grained connection status, see NotifyDial and NotifyDisconnect, which will both send individual events when the connection is lost or re-acquired.
func (*Channel) NotifyConfirm ¶
NotifyConfirm calls NotifyPublish and starts a goroutine sending ordered Ack and Nack publicationTag to the respective channels.
For strict ordering, use NotifyPublish instead.
---
ROGER NOTE: The nack channel will receive both tags that were explicitly nacked by the server AND tags that were orphaned due to a connection loss. If you wish to handle Orphaned tags separately, use the new method NotifyConfirmOrOrphaned.
func (*Channel) NotifyConfirmOrOrphaned ¶
func (channel *Channel) NotifyConfirmOrOrphaned( ack, nack, orphaned chan uint64, ) (chan uint64, chan uint64, chan uint64)
NotifyConfirmOrOrphaned is as NotifyConfirm, but with a third queue for delivery tags that were orphaned from disconnect, these tags are routed to the nack channel in NotifyConfirm.
func (*Channel) NotifyDial ¶
NotifyDial is new for robust Roger transportType objects. NotifyDial will send all subscribers an event notification every time we try to re-acquire a connection. This will include both failure AND successes.
func (*Channel) NotifyDisconnect ¶
NotifyDisconnect is new for robust Roger transportType objects. NotifyDisconnect will send all subscribers an event notification every time the underlying connection is lost.
func (*Channel) NotifyFlow ¶
NotifyFlow registers a listener for basic.flow methods sent by the server. When `false` is sent on one of the listener channels, all publishers should pause until a `true` is sent.
The server may ask the producer to pause or restart the flow of Publishings sent by on a channel. This is a simple flow-control mechanism that a server can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned by basic.get-ok methods.
When a new channel is opened, it is active (flow is active). Some applications assume that channels are inactive until started. To emulate this behavior a client MAY open the channel, then pause it.
Publishers should respond to a flow messages as rapidly as possible and the server may disconnect over producing channels that do not respect these messages.
basic.flow-ok methods will always be returned to the server regardless of the number of listeners there are.
To control the flow of deliveries from the server, use the Channel.Flow() method instead.
Note: RabbitMQ will rather use TCP pushback on the network connection instead of sending basic.flow. This means that if a single channel is producing too much on the same connection, all channels using that connection will suffer, including acknowledgments from deliveries. Use different Connections if you desire to interleave consumers and producers in the same process to avoid your basic.ack messages from getting rate limited with your basic.publish messages.
---
ROGER NOTE: Flow notification will be received when an unexpected disconnection occurs. If the broker disconnects, a “false“ notification will be sent unless the last notification from the broker was “false“, and when the connection is re-acquired, a “true“ notification will be sent before resuming relaying notification from the broker. This means that NotifyFlow can be a useful tool for dealing with disconnects, even when using RabbitMQ.
func (*Channel) NotifyPublish ¶
func (channel *Channel) NotifyPublish( confirm chan internal.Confirmation, ) chan internal.Confirmation
NotifyPublish registers a listener for reliable publishing. Receives from this chan for every publish after Channel.Confirm will be in order starting with publicationTag 1.
There will be one and only one Confirmation Publishing starting with the delivery tag of 1 and progressing sequentially until the total number of Publishings have been seen by the server.
Acknowledgments will be received in the order of delivery from the NotifyPublish channels even if the server acknowledges them out of order.
The listener chan will be closed when the Channel is closed.
The capacity of the chan Confirmation must be at least as large as the number of outstanding publishings. Not having enough buffered chans will create a deadlock if you attempt to perform other operations on the Connection or Channel while confirms are in-flight.
It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close().
---
ROGER NOTE:If a channel is disconnected unexpectedly, there may be confirmations in flight that did not reach the client. In cases where a channel connection is re-established missing delivery tags will be reported qs nacked, but an additional Confirmation.DisconnectOrphan field will be set to `true`. It is possible that such messages DID reach the broker, but the Ack messages were lost in the disconnect event.
It's also possible that an orphan is caused from a problem with publishing the message in the first place. For instance, publishing with the “immediate“ flag set to false if the broker does not support it, or if a queue was not declared correctly. If you are getting a lot of orphaned messages, make sure to check what disconnect errors you are receiving.
func (*Channel) NotifyReturn ¶
NotifyReturn registers a listener for basic.return methods. These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
A return struct has a copy of the Publishing along with some error information about why the publishing failed.
---
ROGER NOTE: Because Channel survives over unexpected broker disconnects, it is possible that returns in-flight to the client from the broker will be dropped, and therefore will be missed.
func (*Channel) Publish ¶
func (channel *Channel) Publish( exchange string, key string, mandatory bool, immediate bool, msg Publishing, ) (err error)
Publish sends a Publishing from the client to an exchange on the server.
When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue Name. This is because every declared queue gets an implicit route to the default exchange.
Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.
Publishings can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing Key, or when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.
This can return an error when the channel, connection or socket is closed. The error or lack of an error does not indicate whether the server has received this publishing.
It is possible for publishing to not reach the broker if the underlying socket is shut down without pending publishing packets being flushed from the kernel buffers. The easy way of making it probable that all publishings reach the server is to always call Connection.Close before terminating your publishing application. The way to ensure that all publishings reach the server is to add a listener to Channel.NotifyPublish and put the channel in confirm mode with Channel.Confirm. Publishing delivery tags and their corresponding confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1.
---
ROGER NOTE: Roger, Rabbit Channel objects will expose a continuous set of confirmation and delivery tags to the user, even over disconnects, adjusting a message's confirmation tag to match it's actual underlying tag relative to the current channel is all handled for you.
Example (TagContinuity) ¶
Publication tags remain continuous, even across disconnection events.
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqptest" "sync" "testing" ) func main() { // Get a new connection to our test broker. connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress) if err != nil { panic(err) } // Get a new channel from our robust connection for publishing. publishChannel, err := connection.Channel() if err != nil { panic(err) } // Put the channel into confirmation mode err = publishChannel.Confirm(false) if err != nil { panic(err) } confirmationsReceived := new(sync.WaitGroup) confirmationsComplete := make(chan struct{}) // Create a channel to consume publication confirmations. publishEvents := publishChannel.NotifyPublish(make(chan amqp.Confirmation)) go func() { // Close to signal exit. defer close(confirmationsComplete) // Range over the confirmation channel. for confirmation := range publishEvents { // Mark 1 confirmation as done. confirmationsReceived.Done() // Print confirmation. fmt.Printf( "CONFIRMATION TAG %02d: ACK: %v ORPHAN: %v\n", confirmation.DeliveryTag, confirmation.Ack, // If the confirmation was never received because the channel was // disconnected, then confirmation.Ack will be false, and // confirmation.DisconnectOrphan will be true. confirmation.DisconnectOrphan, ) } }() // Declare the message queue queueName := "example_publish_tag_continuity" _, err = publishChannel.QueueDeclare( queueName, false, true, false, false, nil, ) if err != nil { panic(err) } // We'll publish 10 test messages. for i := 0; i < 10; i++ { // We want to wait here to make sure we got the confirmation from the last // publication before force-closing the connection to show we can handle it. confirmationsReceived.Wait() // Force a reconnection of the underlying channel. publishChannel.Test(new(testing.T)).ForceReconnect(context.Background()) // Increment the confirmation WaitGroup confirmationsReceived.Add(1) // Publish a message. Even though the consumer may be force re-connecting the // connection each time, we can keep using the channel. err = publishChannel.Publish( "", queueName, false, false, amqp.Publishing{ Body: []byte(fmt.Sprintf("message %v", i)), }, ) if err != nil { panic(err) } } // Wait for all confirmations to be received. confirmationsReceived.Wait() // Close the connection. err = connection.Close() if err != nil { panic(err) } // Wait for the confirmation routine to exit. <-confirmationsComplete // Exit. // CONFIRMATION TAG 01: ACK: true ORPHAN: false // CONFIRMATION TAG 02: ACK: true ORPHAN: false // CONFIRMATION TAG 03: ACK: true ORPHAN: false // CONFIRMATION TAG 04: ACK: true ORPHAN: false // CONFIRMATION TAG 05: ACK: true ORPHAN: false // CONFIRMATION TAG 06: ACK: true ORPHAN: false // CONFIRMATION TAG 07: ACK: true ORPHAN: false // CONFIRMATION TAG 08: ACK: true ORPHAN: false // CONFIRMATION TAG 09: ACK: true ORPHAN: false // CONFIRMATION TAG 10: ACK: true ORPHAN: false }
Output:
func (*Channel) Qos ¶
Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
With a prefetch publishCount greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent.
With a prefetch size greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers. This option is ignored when consumers are started with noAck.
When global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel.
Please see the RabbitMQ Consumer Prefetch documentation for an explanation of how the global flag is implemented in RabbitMQ, as it differs from the AMQP 0.9.1 specification in that global Qos settings are limited in scope to channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
To get round-robin behavior between consumers consuming from the same queue on different connections, set the prefetch publishCount to 1, and the next available message on the server will be delivered to the next available consumer.
If your consumer work time is reasonably consistent and not much greater than two times your network round trip time, you will see significant throughput improvements starting with a prefetch publishCount of 2 or slightly greater as described by benchmarks on RabbitMQ.
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
func (*Channel) QueueBind ¶
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing Key matches the binding routing Key.
QueueBind("pagers", "alert", "log", false, nil) QueueBind("emails", "info", "log", false, nil) Delivery Exchange Key Queue ----------------------------------------------- Key: alert --> log ----> alert --> pagers Key: info ---> log ----> info ---> emails Key: debug --> log (none) (dropped)
If a binding with the same Key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.
In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.
QueueBind("pagers", "alert", "amq.topic", false, nil) QueueBind("emails", "info", "amq.topic", false, nil) QueueBind("emails", "#", "amq.topic", false, nil) // match everything Delivery Exchange Key Queue ----------------------------------------------- Key: alert --> amq.topic ----> alert --> pagers Key: info ---> amq.topic ----> # ------> emails \---> info ---/ Key: debug --> amq.topic ----> # ------> emails
It is only possible to bind a Durable queue to a Durable exchange regardless of whether the queue or exchange is auto-deleted. Bindings between Durable queues and exchanges will also be restored on server restart.
If the binding could not complete, an error will be returned and the channel will be closed.
When NoWait is false and the queue could not be bound, the channel will be closed with an error.
func (*Channel) QueueDeclare ¶
func (channel *Channel) QueueDeclare( name string, durable, autoDelete, exclusive, noWait bool, args Table, ) (queue Queue, err error)
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn'tb already exist, or ensures that an existing queue matches the same parameters.
Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing Key matching the queue's Name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing Key of the queue Name.
QueueDeclare("alerts", true, false, false, false, nil) Publish("", "alerts", false, false, Publishing{Body: []byte("...")}) Delivery Exchange Key Queue ----------------------------------------------- Key: alerts -> "" -> alerts -> alerts
The queue Name may be empty, in which case the server will generate a unique Name which will be returned in the Name field of Queue struct.
Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to Durable exchanges.
Non-Durable and Auto-Deleted queues will not be redeclared on server restart and will be deleted by the server after a short time when the last consumer is canceled or the last consumer's channel is closed. Queues with this lifetime can also be deleted normally with QueueDelete. These Durable queues can only be bound to non-Durable exchanges.
Non-Durable and Non-Auto-Deleted queues will remain declared as long as the server is running regardless of how many consumers. This lifetime is useful for temporary topologies that may have long delays between consumer activity. These queues can only be bound to non-Durable exchanges.
Durable and Auto-Deleted queues will be restored on server restart, but without active consumers will not survive and be removed. This Lifetime is unlikely to be useful.
Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same Name.
When NoWait is true, the queue will assume to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection.
When the error return value is not nil, you can assume the queue could not be declared with these parameters, and the channel will be closed.
---
ROGER NOTE: Queues declared with a Roger, Rabbit Channel will be automatically re-declared upon channel disconnect recovery. Calling channel.QueueDelete() will remove the queue from the list of queues to be re-declared in case of a disconnect.
This may cases where queues deleted by other producers / consumers are automatically re-declared. Future updates will introduce more control over when and how queues are re-declared on reconnection.
Example (ReDeclare) ¶
Declared queues are re-declared on disconnect.
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqptest" "testing" ) func main() { // Get a new connection to our test broker. connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress) if err != nil { panic(err) } // Close the connection on exit. defer connection.Close() // Get a new channel from our robust connection for publishing. channel, err := connection.Channel() if err != nil { panic(err) } queueName := "example_queue_declare_robust" // If we try to inspect this queue before declaring it, we will get an error. _, err = channel.QueueInspect(queueName) if err == nil { panic("expected queue inspect error") } fmt.Println("INSPECT ERROR:", err) // Declare the queue. _, err = channel.QueueDeclare( queueName, false, // durable true, // autoDelete false, // exclusive false, // noWait nil, // args ) if err != nil { panic(err) } // Delete the queue to clean up defer channel.QueueDelete(queueName, false, false, false) // Inspect the queue. queue, err := channel.QueueInspect(queueName) if err != nil { panic(err) } fmt.Println("INSPECTION:", queue.Name) // Force a re-connection channel.Test(new(testing.T)).ForceReconnect(context.Background()) // Inspect the queue again, it will already have been re-declared queue, err = channel.QueueInspect(queueName) if err != nil { panic(err) } fmt.Println("INSPECTION:", queue.Name) // Delete the queue to clean up _, err = channel.QueueDelete(queueName, false, false, false) if err != nil { panic(err) } // INSPECT ERROR: Exception (404) Reason: "NOT_FOUND - no queue 'example_queue_declare_robust' in vhost '/'" // INSPECTION: example_queue_declare_robust // INSPECTION: example_queue_declare_robust }
Output:
func (*Channel) QueueDeclarePassive ¶
func (channel *Channel) QueueDeclarePassive( name string, durable, autoDelete, exclusive, noWait bool, args Table, ) (queue Queue, err error)
QueueDeclarePassive is functionally and parametrically equivalent to QueueDeclare, except that it sets the "passive" attribute to true. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception. This function can be used to test for the existence of a queue.
func (*Channel) QueueDelete ¶
func (channel *Channel) QueueDelete( name string, ifUnused, ifEmpty, noWait bool, ) (count int, err error)
QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.
When ifUnused is true, the queue will not be deleted if there are any consumers on the queue. If there are consumers, an error will be returned and the channel will be closed.
When ifEmpty is true, the queue will not be deleted if there are any messages remaining on the queue. If there are messages, an error will be returned and the channel will be closed.
When NoWait is true, the queue will be deleted without waiting for a response from the server. The purged message publishCount will not be meaningful. If the queue could not be deleted, a channel exception will be raised and the channel will be closed.
---
ROGER NOTE: Calling QueueDelete will remove a queue from the list of queues to be re-declared on reconnections of the underlying streadway/amqp.Channel object.
func (*Channel) QueueInspect ¶
QueueInspect passively declares a queue by Name to inspect the current message publishCount and consumer publishCount.
Use this method to check how many messages ready for delivery reside in the queue, how many consumers are receiving deliveries, and whether a queue by this Name already exists.
If the queue by this Name exists, use Channel.QueueDeclare check if it is declared with specific parameters.
If a queue by this Name does not exist, an error will be returned and the channel will be closed.
func (*Channel) QueuePurge ¶
QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed.
When successful, returns the number of messages purged.
If NoWait is true, do not wait for the server response and the number of messages purged will not be meaningful.
func (*Channel) QueueUnbind ¶
QueueUnbind removes a binding between an exchange and queue matching the Key and arguments.
It is possible to send and empty string for the exchange Name which means to unbind the queue from the default exchange.
func (*Channel) Reject ¶
Reject negatively acknowledges a delivery by its delivery tag. Prefer Nack over Reject when communicating with a RabbitMQ server because you can Nack multiple messages, reducing the amount of protocol messages to exchange.
See also Delivery.Reject ¶
---
ROGER NOTE: If a tag (or a tag range when rejecting multiple tags) is from a previously disconnected channel, a ErrCantAcknowledgeOrphans will be returned, which is a new error type specific to the Roger implementation.
When nacking multiple tags, it is possible that some of the tags will be from a closed underlying channel, and therefore be orphaned, and some will be from the current channel, and therefore be successful. In such cases, the ErrCantAcknowledgeOrphans will still be returned, and can be checked for which tag ranges could not be rejected.
func (*Channel) Test ¶
func (channel *Channel) Test(tb testing.TB) *ChannelTesting
Test returns an object with methods for testing the Channel.
Example ¶
package main import ( "context" "fmt" "github.com/peake100/rogerRabbit-go/pkg/amqp" "github.com/peake100/rogerRabbit-go/pkg/amqptest" "testing" "time" ) func main() { // Get a new connection to our test broker. connection, err := amqp.DialCtx(context.Background(), amqptest.TestDialAddress) if err != nil { panic(err) } defer connection.Close() // Get a new channel from our robust connection for publishing. The channel is // created with our default middleware. channel, err := connection.Channel() if err != nil { panic(err) } // Get a channel testing harness. In a real test function, you would pass the // test's *testing.T value. Here, we will just pass a dummy one. testHarness := channel.Test(new(testing.T)) // We can use the test harness to force the channel to reconnect. If a reconnection // does not occur before the passed context expires, the test will be failed. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() testHarness.ForceReconnect(ctx) // We can check how many times a reconnection has occurred. The first time we // connect to the broker is counted, so we should get '2': fmt.Println("RECONNECTION COUNT:", testHarness.ReconnectionCount()) // exit. // RECONNECTION COUNT: 2 }
Output:
func (*Channel) Tx ¶
Tx puts the channel into transaction mode on the server. All publishings and acknowledgments following this method will be atomically committed or rolled back for a single queue. Call either Channel.TxCommit or Channel.TxRollback to leave a this transaction and immediately start a new transaction.
The atomicity across multiple queues is not defined as queue declarations and bindings are not included in the transaction.
The behavior of publishings that are delivered as mandatory or immediate while the channel is in a transaction is not defined.
Once a channel has been put into transaction mode, it cannot be taken out of transaction mode. Use a different channel for non-transactional semantics.
---
ROGER NOTE: transactions are not currently implemented, and calling any of the Tx methods will result in a panic. The author of this library is not familiar with the intricacies of amqp transactions, and how a channel in a transaction state should behave over a disconnect.
PRs to add this functionality are welcome.
func (*Channel) TxCommit ¶
TxCommit atomically commits all publishings and acknowledgments for a single queue and immediately start a new transaction.
Calling this method without having called Channel.Tx is an error.
---
ROGER NOTE: transactions are not currently implemented, and calling any of the Tx methods will result in a panic. The author of this library is not familiar with the intricacies of amqp transactions, and how a channel in a transaction state should behave over a disconnect.
PRs to add this functionality are welcome.
func (*Channel) TxRollback ¶
TxRollback atomically rolls back all publishings and acknowledgments for a single queue and immediately start a new transaction.
Calling this method without having called Channel.Tx is an error.
ROGER NOTE: transactions are not currently implemented, and calling any of the Tx methods will result in a panic. The author of this library is not familiar with the intricacies of amqp transactions, and how a channel in a transaction state should behave over a disconnect.
PRs to add this functionality are welcome.
type ChannelMiddlewares ¶
type ChannelMiddlewares struct {
// contains filtered or unexported fields
}
ChannelMiddlewares holds all middlewares to be registered on transports created with a Config.
func (*ChannelMiddlewares) AddAck ¶
func (config *ChannelMiddlewares) AddAck(middleware amqpmiddleware.Ack)
AddAck adds a new middleware to be invoked on Channel.Ack method calls.
func (*ChannelMiddlewares) AddChannelReconnect ¶
func (config *ChannelMiddlewares) AddChannelReconnect(middleware amqpmiddleware.ChannelReconnect)
AddChannelReconnect adds a new middleware to be invoked on a Channel reconnection event.
func (*ChannelMiddlewares) AddClose ¶
func (config *ChannelMiddlewares) AddClose(middleware amqpmiddleware.Close)
AddClose adds a new middleware to be invoked on Channel.Close method calls.
func (*ChannelMiddlewares) AddConfirm ¶
func (config *ChannelMiddlewares) AddConfirm(middleware amqpmiddleware.Confirm)
AddConfirm adds a new middleware to be invoked on Channel.Confirm method calls.
func (*ChannelMiddlewares) AddConsume ¶
func (config *ChannelMiddlewares) AddConsume(middleware amqpmiddleware.Consume)
AddConsume adds a new middleware to be invoked on Channel.Consume method calls.
NOTE: this is a distinct middleware from AddConsumeEvents, which fires on every delivery sent from the broker. This event only fires once when the Channel.Consume method is first called.
func (*ChannelMiddlewares) AddConsumeEvents ¶
func (config *ChannelMiddlewares) AddConsumeEvents(middleware amqpmiddleware.ConsumeEvents)
AddConsumeEvents adds a new middleware to be invoked on events sent to callers of Channel.Consume.
func (*ChannelMiddlewares) AddExchangeBind ¶
func (config *ChannelMiddlewares) AddExchangeBind(middleware amqpmiddleware.ExchangeBind)
AddExchangeBind adds a new middleware to be invoked on Channel.ExchangeBind method calls.
func (*ChannelMiddlewares) AddExchangeDeclare ¶
func (config *ChannelMiddlewares) AddExchangeDeclare(middleware amqpmiddleware.ExchangeDeclare)
AddExchangeDeclare adds a new middleware to be invoked on Channel.ExchangeDeclare method calls.
func (*ChannelMiddlewares) AddExchangeDeclarePassive ¶
func (config *ChannelMiddlewares) AddExchangeDeclarePassive(middleware amqpmiddleware.ExchangeDeclare)
AddExchangeDeclarePassive adds a new middleware to be invoked on Channel.ExchangeDeclarePassive method calls.
func (*ChannelMiddlewares) AddExchangeDelete ¶
func (config *ChannelMiddlewares) AddExchangeDelete(middleware amqpmiddleware.ExchangeDelete)
AddExchangeDelete adds a new middleware to be invoked on Channel.ExchangeDelete method calls.
func (*ChannelMiddlewares) AddExchangeUnbind ¶
func (config *ChannelMiddlewares) AddExchangeUnbind(middleware amqpmiddleware.ExchangeUnbind)
AddExchangeUnbind adds a new middleware to be invoked on Channel.ExchangeUnbind method calls.
func (*ChannelMiddlewares) AddFlow ¶
func (config *ChannelMiddlewares) AddFlow(middleware amqpmiddleware.Flow)
AddFlow adds a new middleware to be invoked on Channel.Flow method calls.
func (*ChannelMiddlewares) AddGet ¶
func (config *ChannelMiddlewares) AddGet(middleware amqpmiddleware.Get)
AddGet adds a new middleware to be invoked on Channel.Get method calls.
func (*ChannelMiddlewares) AddNack ¶
func (config *ChannelMiddlewares) AddNack(middleware amqpmiddleware.Nack)
AddNack adds a new middleware to be invoked on Channel.Nack method calls.
func (*ChannelMiddlewares) AddNotifyCancel ¶
func (config *ChannelMiddlewares) AddNotifyCancel(middleware amqpmiddleware.NotifyCancel)
AddNotifyCancel adds a new middleware to be invoked on Channel.NotifyCancel method calls.
func (*ChannelMiddlewares) AddNotifyCancelEvents ¶
func (config *ChannelMiddlewares) AddNotifyCancelEvents(middleware amqpmiddleware.NotifyCancelEvents)
AddNotifyCancelEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyCancel.
func (*ChannelMiddlewares) AddNotifyClose ¶
func (config *ChannelMiddlewares) AddNotifyClose(middleware amqpmiddleware.NotifyClose)
AddNotifyClose adds a new middleware to be invoked on Channel.NotifyClose method calls.
func (*ChannelMiddlewares) AddNotifyCloseEvents ¶
func (config *ChannelMiddlewares) AddNotifyCloseEvents(middleware amqpmiddleware.NotifyCloseEvents)
AddNotifyCloseEvents adds a new middleware to be invoked on all events sent to callers of Channel.NotifyClose.
func (*ChannelMiddlewares) AddNotifyConfirm ¶
func (config *ChannelMiddlewares) AddNotifyConfirm(middleware amqpmiddleware.NotifyConfirm)
AddNotifyConfirm adds a new middleware to be invoked on Channel.NotifyConfirm method calls.
func (*ChannelMiddlewares) AddNotifyConfirmEvents ¶
func (config *ChannelMiddlewares) AddNotifyConfirmEvents(middleware amqpmiddleware.NotifyConfirmEvents)
AddNotifyConfirmEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyConfirm.
func (*ChannelMiddlewares) AddNotifyConfirmOrOrphaned ¶
func (config *ChannelMiddlewares) AddNotifyConfirmOrOrphaned(middleware amqpmiddleware.NotifyConfirmOrOrphaned)
AddNotifyConfirmOrOrphaned adds a new middleware to be invoked on Channel.NotifyConfirmOrOrphaned method calls.
func (*ChannelMiddlewares) AddNotifyConfirmOrOrphanedEvents ¶
func (config *ChannelMiddlewares) AddNotifyConfirmOrOrphanedEvents( middleware amqpmiddleware.NotifyConfirmOrOrphanedEvents, )
AddNotifyConfirmOrOrphanedEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyConfirmOrOrphaned.
func (*ChannelMiddlewares) AddNotifyDial ¶
func (config *ChannelMiddlewares) AddNotifyDial(middleware amqpmiddleware.NotifyDial)
AddNotifyDial adds a new middleware to be invoked on Channel.NotifyDial method calls.
func (*ChannelMiddlewares) AddNotifyDialEvents ¶
func (config *ChannelMiddlewares) AddNotifyDialEvents(middleware amqpmiddleware.NotifyDialEvents)
AddNotifyDialEvents adds a new middleware to be invoked on all events sent to callers of Channel.NotifyDial.
func (*ChannelMiddlewares) AddNotifyDisconnect ¶
func (config *ChannelMiddlewares) AddNotifyDisconnect(middleware amqpmiddleware.NotifyDisconnect)
AddNotifyDisconnect adds a new middleware to be invoked on Channel.NotifyDisconnect method calls.
func (*ChannelMiddlewares) AddNotifyDisconnectEvents ¶
func (config *ChannelMiddlewares) AddNotifyDisconnectEvents(middleware amqpmiddleware.NotifyDisconnectEvents)
AddNotifyDisconnectEvents adds a new middleware to be invoked on all events sent to callers of Channel.NotifyDial.
func (*ChannelMiddlewares) AddNotifyFlow ¶
func (config *ChannelMiddlewares) AddNotifyFlow(middleware amqpmiddleware.NotifyFlow)
AddNotifyFlow adds a new middleware to be invoked on Channel.NotifyFlow method calls.
func (*ChannelMiddlewares) AddNotifyFlowEvents ¶
func (config *ChannelMiddlewares) AddNotifyFlowEvents(middleware amqpmiddleware.NotifyFlowEvents)
AddNotifyFlowEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyCancel.
func (*ChannelMiddlewares) AddNotifyPublish ¶
func (config *ChannelMiddlewares) AddNotifyPublish(middleware amqpmiddleware.NotifyPublish)
AddNotifyPublish adds a new middleware to be invoked on Channel.NotifyPublish method calls.
func (*ChannelMiddlewares) AddNotifyPublishEvents ¶
func (config *ChannelMiddlewares) AddNotifyPublishEvents(middleware amqpmiddleware.NotifyPublishEvents)
AddNotifyPublishEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyPublish.
func (*ChannelMiddlewares) AddNotifyReturn ¶
func (config *ChannelMiddlewares) AddNotifyReturn(middleware amqpmiddleware.NotifyReturn)
AddNotifyReturn adds a new middleware to be invoked on Channel.NotifyReturn method calls.
func (*ChannelMiddlewares) AddNotifyReturnEvents ¶
func (config *ChannelMiddlewares) AddNotifyReturnEvents(middleware amqpmiddleware.NotifyReturnEvents)
AddNotifyReturnEvents adds a new middleware to be invoked on events sent to callers of Channel.NotifyReturn.
func (*ChannelMiddlewares) AddProviderFactory ¶
func (config *ChannelMiddlewares) AddProviderFactory(factory amqpmiddleware.ProviderFactory)
AddProviderFactory adds a factory function which creates a new middleware provider value which must implement one of the Middleware Provider interfaces from the amqpmiddleware package, like amqpmiddleware.ProvidesQueueDeclare, and must implement at-minimum amqpmiddleware.ProvidesMiddleware.
When middleware is registered on a new Channel, the provider factory will be called and all provider methods will be registered as middleware.
If you wish the same provider value's methods to be used as middleware for every *Channel created by a *Connection, consider using AddProviderMethods instead.
func (*ChannelMiddlewares) AddProviderMethods ¶
func (config *ChannelMiddlewares) AddProviderMethods(provider amqpmiddleware.ProvidesMiddleware) error
AddProviderMethods adds a amqpmiddleware.ProvidesMiddleware's methods as Middleware. If this method is invoked directly by the user, the same type value's method will be added to all *Channel values created by a *Connection.
If a new provider value should be made for each *Channel, consider using AddProviderFactory instead.
func (*ChannelMiddlewares) AddPublish ¶
func (config *ChannelMiddlewares) AddPublish(middleware amqpmiddleware.Publish)
AddPublish adds a new middleware to be invoked on Channel.Publish method calls.
func (*ChannelMiddlewares) AddQoS ¶
func (config *ChannelMiddlewares) AddQoS(middleware amqpmiddleware.QoS)
AddQoS adds a new middleware to be invoked on Channel.Qos method calls.
func (*ChannelMiddlewares) AddQueueBind ¶
func (config *ChannelMiddlewares) AddQueueBind(middleware amqpmiddleware.QueueBind)
AddQueueBind adds a new middleware to be invoked on Channel.QueueBind method calls.
func (*ChannelMiddlewares) AddQueueDeclare ¶
func (config *ChannelMiddlewares) AddQueueDeclare(middleware amqpmiddleware.QueueDeclare)
AddQueueDeclare adds a new middleware to be invoked on Channel.QueueDeclare method calls.
func (*ChannelMiddlewares) AddQueueDeclarePassive ¶
func (config *ChannelMiddlewares) AddQueueDeclarePassive(middleware amqpmiddleware.QueueDeclare)
AddQueueDeclarePassive adds a new middleware to be invoked on Channel.QueueDeclarePassive method calls.
func (*ChannelMiddlewares) AddQueueDelete ¶
func (config *ChannelMiddlewares) AddQueueDelete(middleware amqpmiddleware.QueueDelete)
AddQueueDelete adds a new middleware to be invoked on Channel.QueueDelete method calls.
func (*ChannelMiddlewares) AddQueueInspect ¶
func (config *ChannelMiddlewares) AddQueueInspect(middleware amqpmiddleware.QueueInspect)
AddQueueInspect adds a new middleware to be invoked on Channel.QueueInspect method calls.
func (*ChannelMiddlewares) AddQueuePurge ¶
func (config *ChannelMiddlewares) AddQueuePurge(middleware amqpmiddleware.QueuePurge)
AddQueuePurge adds a new middleware to be invoked on Channel.QueuePurge method calls.
func (*ChannelMiddlewares) AddQueueUnbind ¶
func (config *ChannelMiddlewares) AddQueueUnbind(middleware amqpmiddleware.QueueUnbind)
AddQueueUnbind adds a new middleware to be invoked on Channel.QueueUnbind method calls.
func (*ChannelMiddlewares) AddReject ¶
func (config *ChannelMiddlewares) AddReject(middleware amqpmiddleware.Reject)
AddReject adds a new middleware to be invoked on Channel.Reject method calls.
type ChannelTesting ¶
type ChannelTesting struct { // TransportTesting embedded common methods between Connections and Channels. *TransportTesting // contains filtered or unexported fields }
ChannelTesting exposes a number of methods designed for testing.
func (*ChannelTesting) ConnTest ¶
func (tester *ChannelTesting) ConnTest() *TransportTesting
ConnTest returns a tester for the RogerConnection feeding this channel.
func (*ChannelTesting) GetMiddlewareProvider ¶
func (tester *ChannelTesting) GetMiddlewareProvider( id amqpmiddleware.ProviderTypeID, ) amqpmiddleware.ProvidesMiddleware
GetMiddlewareProvider returns a middleware provider for inspection or fails the test immediately.
func (*ChannelTesting) ReconnectionCount ¶
func (tester *ChannelTesting) ReconnectionCount() uint64
ReconnectionCount returns the number of times this channel has been reconnected.
func (*ChannelTesting) UnderlyingChannel ¶
func (tester *ChannelTesting) UnderlyingChannel() *BasicChannel
UnderlyingChannel returns the current underlying streadway/amqp.Channel being used.
func (*ChannelTesting) UnderlyingConnection ¶
func (tester *ChannelTesting) UnderlyingConnection() *BasicConnection
UnderlyingConnection returns the current underlying streadway/amqp.Connection being used to feed this Channel.
type Config ¶
type Config struct { // The SASL mechanisms to try in the client request, and the successful // mechanism used on the Connection object. // If SASL is nil, PlainAuth from the URL is used. SASL []Authentication // Vhost specifies the namespace of permissions, exchanges, queues and // bindings on the server. Dial sets this to the path parsed from the URL. Vhost string ChannelMax int // 0 max channels means 2^16 - 1 FrameSize int // 0 max bytes means unlimited Heartbeat time.Duration // less than 1s uses the server's interval // TLSClientConfig specifies the client configuration of the TLS connection // when establishing a tls livesOnce. // If the URL uses an amqps scheme, then an empty tls.Config with the // ServerName from the URL is used. TLSClientConfig *tls.Config // Properties is table of properties that the client advertises to the server. // This is an optional setting - if the application does not set this, // the underlying library will use a generic set of client properties. Properties Table // Connection locale that we expect to always be en_US // Even though servers must return it as per the AMQP 0-9-1 spec, // we are not aware of it being used other than to satisfy the spec requirements Locale string // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig, // then an AMQP connection handshake. // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is // used during TLS and AMQP handshaking. Dial func(network, addr string) (net.Conn, error) // If set to true, the default handlers will not be registered on connection or // channels created as a result of passing this config. NoDefaultMiddleware bool // DefaultLoggerLevel is the logger level for the default logging middleware. // // If NoDefaultMiddleware is true, this setting will have no effect. // // Default: zerolog.InfoLevel. DefaultLoggerLevel zerolog.Level // ConnectionMiddleware holds middleware to add to connection method and event // handlers. ConnectionMiddleware ConnectionMiddlewares // ChannelMiddleware holds middleware to add to channel method and event handlers. ChannelMiddleware ChannelMiddlewares }
Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.
---
ROGER NOTE: This config type is a re-implementation of streadway/amqp.Config. We any code that can declare such a config will work with this type. In the future this type may add additional options for rogerRabbit-go/amqp.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the default config for Dial() as it is in the streadway application.
type Confirmation ¶
type Confirmation = internal.Confirmation
Confirmation is an alias to datamodels.Confirmation.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection manages the serialization and deserialization of frames from IO and dispatches the frames to the appropriate channel. All RPC methods and asynchronous Publishing, Delivery, Ack, Nack and Return messages are multiplexed on this channel. There must always be active receivers for every asynchronous message on this connection.
---
ROGER NOTE: A robust connection acts as a normal connection except that is automatically re-dials the broker when the underlying connection is lost.
Unless otherwise noted at the beginning of their descriptions, all methods work exactly as their streadway counterparts, but will automatically re-attempt on ErrClosed errors. All other errors will be returned as normal. Descriptions have been copy-pasted from the streadway library for convenience.
As this library evolves, other error types may be added to the list of errors we will automatically suppress and re-establish connection for, but in these early days, ErrClosed seems like a good place to start.
func Dial ¶
func Dial(url string) (*Connection, error)
Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the handshake deadline to 30 seconds. After handshake, deadlines are cleared.
Dial uses the zero value of tls.Config when it encounters an amqps:// scheme. It is equivalent to calling DialTLS(amqp, nil).
func DialConfig ¶
func DialConfig(url string, config Config) (*Connection, error)
DialConfig accepts a string in the AMQP URI format and a configuration for the livesOnce and connection setup, returning a new Connection. Defaults to a server heartbeat interval of 10 seconds and sets the initial read deadline to 30 seconds.
func DialConfigCtx ¶
DialConfigCtx is as DialConfig, but endlessly redials the connection until ctx is cancelled. Once returned, cancelling ctx does not affect the connection.
func DialCtx ¶
func DialCtx(ctx context.Context, url string) (*Connection, error)
DialCtx is as Dial, but endlessly redials the connection until ctx is cancelled. Once returned, cancelling ctx does not affect the connection.
func DialTLS ¶
func DialTLS(url string, amqps *tls.Config) (*Connection, error)
DialTLS accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the initial read deadline to 30 seconds.
DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
func DialTLSCtx ¶
DialTLSCtx is as DialTLS, but endlessly redials the connection until ctx is cancelled. Once returned, cancelling ctx does not affect the connection.
func (*Connection) Channel ¶
func (conn *Connection) Channel() (*Channel, error)
Channel opens a unique, concurrent server channelConsume to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened.
---
ROGER NOTE: Unlike the normal channels, roger channels will automatically reconnect on all errors until Channel.Close() is called.
func (*Connection) Close ¶
func (manager *Connection) Close() error
Close the robust connection. This both closes the current connection and keeps it from reconnecting.
func (*Connection) IsClosed ¶
func (manager *Connection) IsClosed() bool
IsClosed returns true if the connection is marked as closed, otherwise false is returned.
--
ROGER NOTE: unlike streadway/amqp, which only implements IsClosed() on connection objects, rogerRabbit makes IsClosed() available on both connections and channels. IsClosed() will return false until the connection / channel is shut down, even if the underlying connection is currently disconnected and waiting to reconnectMiddleware.
func (*Connection) NotifyClose ¶
NotifyClose is as NotifyClose on streadway Connection/Channel.NotifyClose. Subscribers to Close events will not be notified when a reconnection occurs under the hood, only when the roger Connection or Channel is closed by calling the Close method. This mirrors the streadway implementation, where Close events are only sent once when the livesOnce object becomes unusable.
For finer-grained connection status, see NotifyDial and NotifyDisconnect, which will both send individual events when the connection is lost or re-acquired.
func (*Connection) NotifyDial ¶
NotifyDial is new for robust Roger transportType objects. NotifyDial will send all subscribers an event notification every time we try to re-acquire a connection. This will include both failure AND successes.
func (*Connection) NotifyDisconnect ¶
NotifyDisconnect is new for robust Roger transportType objects. NotifyDisconnect will send all subscribers an event notification every time the underlying connection is lost.
func (*Connection) Test ¶
func (conn *Connection) Test(tb testing.TB) *ConnectionTesting
Test returns a ConnectionTesting object with a number of helper methods for testing Connection objects.
type ConnectionMiddlewares ¶
type ConnectionMiddlewares struct {
// contains filtered or unexported fields
}
ConnectionMiddlewares holds the middleware to add to Connection methods and events.
func (*ConnectionMiddlewares) AddClose ¶
func (config *ConnectionMiddlewares) AddClose(middleware amqpmiddleware.Close)
AddClose adds a new middleware to be invoked on a Connection.Close call.
func (*ConnectionMiddlewares) AddConnectionReconnect ¶
func (config *ConnectionMiddlewares) AddConnectionReconnect(middleware amqpmiddleware.ConnectionReconnect)
AddConnectionReconnect adds a new middleware to be invoked when a connection attempts to re-establish a connection.
func (*ConnectionMiddlewares) AddNotifyClose ¶
func (config *ConnectionMiddlewares) AddNotifyClose(middleware amqpmiddleware.NotifyClose)
AddNotifyClose adds a new middleware to be invoked on a Connection.NotifyClose call.
func (*ConnectionMiddlewares) AddNotifyCloseEvents ¶
func (config *ConnectionMiddlewares) AddNotifyCloseEvents(middleware amqpmiddleware.NotifyCloseEvents)
AddNotifyCloseEvents adds a new middleware to be invoked on each Connection.NotifyDial event.
func (*ConnectionMiddlewares) AddNotifyDial ¶
func (config *ConnectionMiddlewares) AddNotifyDial(middleware amqpmiddleware.NotifyDial)
AddNotifyDial adds a new middleware to be invoked on a Connection.NotifyDial call.
func (*ConnectionMiddlewares) AddNotifyDialEvents ¶
func (config *ConnectionMiddlewares) AddNotifyDialEvents(middleware amqpmiddleware.NotifyDialEvents)
AddNotifyDialEvents adds a new middleware to be invoked on each Connection.NotifyDial event.
func (*ConnectionMiddlewares) AddNotifyDisconnect ¶
func (config *ConnectionMiddlewares) AddNotifyDisconnect(middleware amqpmiddleware.NotifyDisconnect)
AddNotifyDisconnect adds a new middleware to be invoked on a Connection.NotifyDisconnect call.
func (*ConnectionMiddlewares) AddNotifyDisconnectEvents ¶
func (config *ConnectionMiddlewares) AddNotifyDisconnectEvents(middleware amqpmiddleware.NotifyDisconnectEvents)
AddNotifyDisconnectEvents adds a new middleware to be invoked on each Connection.NotifyDial event.
func (*ConnectionMiddlewares) AddProviderFactory ¶
func (config *ConnectionMiddlewares) AddProviderFactory(factory amqpmiddleware.ProviderFactory)
AddProviderFactory adds a factory function which creates a new middleware provider value which must implement one of the Middleware Provider interfaces from the amqpmiddleware package, like amqpmiddleware.ProvidesClose.
When middleware is registered on a new Connection, the provider factory will be called and all provider methods will be registered as middleware.
If you wish the same provider value's methods to be used as middleware for every *Connection created by a Config, consider using AddProviderMethods instead.
func (*ConnectionMiddlewares) AddProviderMethods ¶
func (config *ConnectionMiddlewares) AddProviderMethods(provider amqpmiddleware.ProvidesMiddleware) error
AddProviderMethods adds a amqpmiddleware.ProvidesMiddleware's methods as Middleware. If this method is invoked directly by the user, the same type value's method will be added to all *Connection values created by a Config
If a new provider value should be made for each Connection, consider using AddProviderFactory instead.
type ConnectionTesting ¶
type ConnectionTesting struct { TransportTesting // contains filtered or unexported fields }
ConnectionTesting offers methods for running tests with Connection.
func (*ConnectionTesting) UnderlyingConn ¶
func (tester *ConnectionTesting) UnderlyingConn() *BasicConnection
UnderlyingConn returns the current underlying streadway/amqp.Connection.
type Decimal ¶
Decimal matches the AMQP decimal type. Scale is the number of decimal digits Scale == 2, Value == 12345, Decimal == 123.45
type ErrCantAcknowledgeOrphans ¶
type ErrCantAcknowledgeOrphans = defaultmiddlewares.ErrCantAcknowledgeOrphans
ErrCantAcknowledgeOrphans is returned when an acknowledgement method (ack, nack, reject) cannot be completed because the original channel it was consumed from has been closed and replaced with a new one. When part of a multi-ack, it's possible that SOME tags will be orphaned and some will succeed, this error contains detailed information on both groups
type Error ¶
Error captures the code and reason a channelConsume or connection has been closed by the server.
type Publishing ¶
type Publishing = streadway.Publishing
Publishing captures the client message sent to the server. The fields outside of the Headers table included in this struct mirror the underlying fields in the content frame. They use native types for convenience and efficiency.
type Queue ¶
Queue captures the current server state of the queue on the server returned from Channel.QueueDeclare or Channel.QueueInspect.
type Return ¶
Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.
type Table ¶
Table stores user supplied fields of the following types:
bool byte float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types
Functions taking a table will immediately fail when the table contains a value of an unsupported type.
The caller must be specific in which precision of integer it wishes to encode.
Use a type assertion when reading values from a table for type conversion.
RabbitMQ expects int32 for integer values.
type TestReconnectSignaler ¶
type TestReconnectSignaler struct {
// contains filtered or unexported fields
}
TestReconnectSignaler allows us to block until a reconnection occurs during a test.
func (*TestReconnectSignaler) WaitOnReconnect ¶
func (signaler *TestReconnectSignaler) WaitOnReconnect(ctx context.Context)
WaitOnReconnect blocks until a reconnection of the underlying livesOnce occurs. Once the first reconnection event occurs, this object will no longer block and a new signaler will need to be created for the next re-connection.
If no context is passed a context with 3-second timeout will be used.
type TransportTesting ¶
type TransportTesting struct {
// contains filtered or unexported fields
}
TransportTesting provides testing methods for testing Channel and Connection.
func (*TransportTesting) BlockReconnect ¶
func (tester *TransportTesting) BlockReconnect()
BlockReconnect blocks a livesOnce from reconnecting. If too few calls to UnblockReconnect() are made, the block will be removed at the end of the test.
func (*TransportTesting) DisconnectTransport ¶
func (tester *TransportTesting) DisconnectTransport()
DisconnectTransport closes the underlying livesOnce to force a reconnection.
func (*TransportTesting) ForceReconnect ¶
func (tester *TransportTesting) ForceReconnect(ctx context.Context)
ForceReconnect forces a disconnect of the channel or connection and waits for a reconnection to occur or ctx to cancel. If a nil context is passed, a context with a 3-second timeout will be used instead.
func (*TransportTesting) SignalOnReconnect ¶
func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler
SignalOnReconnect returns a signaler that can be used to wait on the next reconnection event of the livesOnce.
func (*TransportTesting) TransportLock ¶
func (tester *TransportTesting) TransportLock() *sync.RWMutex
TransportLock is the underlying lock which controls access to the channel / connection. When held for read or write, a reconnection of the underlying livesOnce cannot occur.
func (*TransportTesting) UnblockReconnect ¶
func (tester *TransportTesting) UnblockReconnect()
UnblockReconnect unblocks the livesOnce from reconnecting after calling BlockReconnect()
Source Files ¶
- channel.go
- channelEventRelays.go
- channelEventRelaysSync.go
- channelHandlers.go
- channelHandlersBuilder.go
- common.go
- config.go
- configChannelMiddleware.go
- configConnMiddleware.go
- connection.go
- connectionDial.go
- connectionTester.go
- consts.go
- dataAliases.go
- doc.go
- errAliases.go
- eventRelayConsume.go
- eventRelayNotifyCancel.go
- eventRelayNotifyFlow.go
- eventRelayNotifyPublish.go
- eventRelayNotifyReturn.go
- streadwayAliases.go
- transportManager.go
- transportManagerHandlers.go
- transportManagerHandlersBuilder.go
- transportManagerReconnect.go
Directories ¶
Path | Synopsis |
---|---|
Package amqpmiddleware defines middleware signatures for methods on *amqp.Channel.
|
Package amqpmiddleware defines middleware signatures for methods on *amqp.Channel. |
Package defaultmiddlewares houses the default middleware for amqp.Channel.
|
Package defaultmiddlewares houses the default middleware for amqp.Channel. |
Package internal houses development helpers and should not be consumed by users.
|
Package internal houses development helpers and should not be consumed by users. |