Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var ErrTooManyFailures = errors.New("too many consecutive failures")
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}
Connection represents a wrapper over amqp.Connection for use with Server and Publisher.
func NewConnection ¶
func NewConnection(factory ConnectionFactory, backoff backoff.Backoff) (*Connection, error)
NewConnection creates a new *Connection as in NewConnectionWithContext, but uses context.Background.
func NewConnectionWithContext ¶
func NewConnectionWithContext(ctx context.Context, factory ConnectionFactory, backoff backoff.Backoff) (*Connection, error)
NewConnectionWithContext accepts a connection factory and a backoff, and returns a *Connection. When the amqp connection is closed unexpectedly, Connection attempts to reconnect using the backoff. The context is passed to the backoff so that reconnection can be canceled.
func (*Connection) Close ¶
func (c *Connection) Close()
Close closes the underlying *amqp.Connection. It notifies all subscribers of the closure with a nil error. Subscribers can subscribe to the close event with NotifyClose.
func (*Connection) IsAlive ¶
func (c *Connection) IsAlive() bool
IsAlive reports whether the underlying amqp.Connection is still open, that is, has not been closed.
func (*Connection) NotifyClose ¶
func (c *Connection) NotifyClose(ch chan error) chan error
NotifyClose registers a new subscriber for closure events. When the connection is closed, the subscriber receives a notification carrying the closure error.
func (*Connection) NotifyReconnect ¶
func (c *Connection) NotifyReconnect(ch chan error) chan error
NotifyReconnect registers a new subscriber for reconnect events. When the underlying amqp.Connection shuts down unexpectedly and reconnection has finished, the subscriber receives a notification with the reconnect status. Reconnection can finish with an error if it is canceled by the context.
type ConnectionFactory ¶
type ConnectionFactory func() (*amqp.Connection, error)
ConnectionFactory is a function that opens a new *amqp.Connection. It is used for reconnecting to the rabbitmq server.
type ConsumerParams ¶
ConsumerParams generalizes amqp consumer settings.
type ControllerFunc ¶
type ControllerFunc func(ctx *DeliveryContext)
ControllerFunc represents controller type used to process delivered messages.
type DeliveryContext ¶
func NewDeliveryContext ¶
func (*DeliveryContext) Ack ¶
func (c *DeliveryContext) Ack() bool
func (*DeliveryContext) BindJSON ¶
func (c *DeliveryContext) BindJSON(ptr interface{}) error
type ExchangeParams ¶
type ExchangeParams struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}
ExchangeParams generalizes amqp exchange settings.
type GroupOption ¶
type GroupOption func(g *RouterGroup)
func WithNumWorkers ¶
func WithNumWorkers(workers int) GroupOption
WithNumWorkers tells Server to use multiple amqp.Channel instances, one per worker, for that particular routing group.
func WithRouterEngine ¶
func WithRouterEngine(engine RouterEngine) GroupOption
WithRouterEngine defines which engine is used to route delivered messages to different controllers.
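GroupOption has the shape of the functional options pattern: each option is a function that mutates the group being built. The sketch below illustrates the pattern with local, hypothetical types (`routerGroup`, `withNumWorkers`, `withEngineName`, `newGroup`); the defaults of one worker and a "direct" engine are assumptions made for the example only.

```go
package main

import "fmt"

// routerGroup is a simplified, hypothetical stand-in for RouterGroup.
type routerGroup struct {
	numWorkers int
	engine     string
}

// groupOption mirrors the shape of GroupOption: a function that mutates the group.
type groupOption func(g *routerGroup)

// withNumWorkers returns an option that sets the worker count.
func withNumWorkers(workers int) groupOption {
	return func(g *routerGroup) { g.numWorkers = workers }
}

// withEngineName returns an option that selects an engine by name.
func withEngineName(name string) groupOption {
	return func(g *routerGroup) { g.engine = name }
}

// newGroup starts from assumed defaults, then applies options in order.
func newGroup(opts ...groupOption) *routerGroup {
	g := &routerGroup{numWorkers: 1, engine: "direct"}
	for _, opt := range opts {
		opt(g)
	}
	return g
}

func main() {
	g := newGroup(withNumWorkers(4), withEngineName("topic"))
	fmt.Println(g.numWorkers, g.engine)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.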
type Publisher ¶
type Publisher struct {
	Conn *Connection
	// contains filtered or unexported fields
}
Publisher is used to publish messages to a single exchange. On construction, the publisher creates a new channel and declares an exchange. Optionally, it can also declare a queue and retry publish attempts on failure.
func NewPublisher ¶
func NewPublisher(connection *Connection, exchangeParams ExchangeParams, opts ...PublisherOption) (*Publisher, error)
NewPublisher creates a new *Publisher. Publisher is used to declare exchange and publish messages to this exchange.
type PublisherOption ¶
type PublisherOption func(p *Publisher)
func WithQueueDeclaration ¶
func WithQueueDeclaration(queueParams QueueParams, bindingKey string) PublisherOption
WithQueueDeclaration makes the publisher declare a queue on the rabbitmq server and bind it to the Publisher exchange using the bindingKey parameter.
func WithRetries ¶
func WithRetries(backoff backoff.Backoff, consecutiveFailuresBeforeBreak uint32) PublisherOption
WithRetries makes Publisher.Publish retry on failure with a backoff. consecutiveFailuresBeforeBreak acts as a simple circuit breaker: when it is greater than 0 and that many consecutive failures have occurred, Publisher.Publish returns ErrTooManyFailures and Publisher.Broken returns true.
type QualityOfService ¶
QualityOfService generalizes amqp qos settings.
type QueueParams ¶
type QueueParams struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}
QueueParams generalizes amqp queue settings.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router represents the level of message routing within the queue. It registers multiple RouterGroup to be used by Server for exchange and queue declarations and queue bindings.
func (*Router) Group ¶
func (r *Router) Group(exchange ExchangeParams, queue QueueParams, qos QualityOfService, consumer ConsumerParams, opts ...GroupOption) *RouterGroup
Group accepts generalized transport parameters and constructs a RouterGroup. Server then uses it to route messages from the group's queue to controllers based on queue routing.
type RouterEngine ¶
type RouterEngine interface {
	// AddBinding adds routing from bindingKey to controllers
	// When used by a Server, Server binds RouterGroup queue to RouterGroup exchange on that bindingKey
	AddBinding(bindingKey string, controllers ...ControllerFunc)
	// Route selects all suitable controllers according to the configured routing parameters
	Route(routingKey string) []ControllerFunc
}
RouterEngine represents message routing logic within a queue. When a message is delivered to a queue, RouterEngine is used to route messages to different controllers based on a specific pattern.
func NewDirectRouterEngine ¶
func NewDirectRouterEngine() RouterEngine
NewDirectRouterEngine is used for direct message routing as described by amqp0-9-1 protocol. For reference on direct routing see https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf.
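Direct routing selects controllers whose binding key equals the routing key exactly. A minimal sketch of an engine with the same two methods follows; `deliveryContext`, `controllerFunc`, and `directEngine` are simplified local stand-ins, not the package's types, and the map-based lookup is only one plausible way to implement it.

```go
package main

import "fmt"

// deliveryContext and controllerFunc are simplified stand-ins for
// DeliveryContext and ControllerFunc.
type deliveryContext struct{ Body []byte }
type controllerFunc func(ctx *deliveryContext)

// directEngine maps each binding key to the controllers registered for it.
type directEngine struct {
	bindings map[string][]controllerFunc
}

func newDirectEngine() *directEngine {
	return &directEngine{bindings: make(map[string][]controllerFunc)}
}

// AddBinding appends controllers to the chain for bindingKey.
func (e *directEngine) AddBinding(bindingKey string, controllers ...controllerFunc) {
	e.bindings[bindingKey] = append(e.bindings[bindingKey], controllers...)
}

// Route returns the controllers whose binding key equals routingKey exactly.
func (e *directEngine) Route(routingKey string) []controllerFunc {
	return e.bindings[routingKey]
}

func main() {
	e := newDirectEngine()
	e.AddBinding("orders.created", func(ctx *deliveryContext) {
		fmt.Printf("handling %s\n", ctx.Body)
	})
	for _, c := range e.Route("orders.created") {
		c(&deliveryContext{Body: []byte("order 1")})
	}
	fmt.Println("controllers for orders.deleted:", len(e.Route("orders.deleted")))
}
```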
func NewTopicRouterEngine ¶
func NewTopicRouterEngine() RouterEngine
NewTopicRouterEngine is a topic routing mechanism to route messages as described by amqp0-9-1 protocol. For reference on topic routing see https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf.
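In AMQP 0-9-1 topic routing, keys are dot-separated words; `*` in a binding key matches exactly one word and `#` matches zero or more words. The matcher below is a sketch of that rule only. `topicMatch` and `matchWords` are hypothetical helpers, and the package may implement matching differently.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether routing key matches the binding pattern,
// treating both as dot-separated words.
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

// matchWords compares pattern words p against key words k.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" absorbs zero or more words: try every possible split point.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(topicMatch("orders.*.created", "orders.eu.created"))
	fmt.Println(topicMatch("orders.*.created", "orders.created"))
	fmt.Println(topicMatch("orders.#", "orders.eu.created"))
}
```

The recursion is simple rather than fast; a production engine would more likely precompile bindings into a trie.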
type RouterGroup ¶
type RouterGroup struct {
// contains filtered or unexported fields
}
RouterGroup generalizes a consumer's transport: it unites an exchange and a queue into a single transport line.
func (*RouterGroup) Route ¶
func (g *RouterGroup) Route(routingKey string, controllers ...ControllerFunc) *RouterGroup
Route registers message routing for the queue defined in RouterGroup. Deliveries received on that queue are routed to the matching controllers based on the routingKey parameter. Server also uses routingKey to bind the RouterGroup queue to the corresponding RouterGroup exchange. The chain of controllers is executed for every matched route.
type Server ¶
type Server struct {
	Conn *Connection
	// contains filtered or unexported fields
}
Server is used to accept messages from the rabbitmq server and route them to controllers registered in Router.
func NewServer ¶
func NewServer(conn *Connection, router *Router) *Server
NewServer constructs a new *Server.
func (*Server) ListenAndServe ¶
ListenAndServe blocks while listening for deliveries. It creates a separate channel for each worker in every RouterGroup. Each channel is then used to receive messages and route them to the RouterGroup controllers.