rabbitmq

package module
v0.1.0 Latest
Published: Jan 4, 2023 | License: MIT | Imports: 9 | Imported by: 1

README

About The Project

...

Getting Started

...

Installation

go get github.com/MashinIvan/rabbitmq

Examples

...

Contributing

...

License

Distributed under the MIT License. See LICENSE.txt for more information.

Contact

Project Link: https://github.com/MashinIvan/rabbitmq

Documentation

Index

Constants

This section is empty.

Variables

var ErrTooManyFailures = errors.New("too many consecutive failures")

Functions

This section is empty.

Types

type Connection

type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}

Connection represents a wrapper over amqp.Connection for use with Server and Publisher.

func NewConnection

func NewConnection(factory ConnectionFactory, backoff backoff.Backoff) (*Connection, error)

NewConnection creates a new *Connection as in NewConnectionWithContext, but uses context.Background.

func NewConnectionWithContext

func NewConnectionWithContext(ctx context.Context, factory ConnectionFactory, backoff backoff.Backoff) (*Connection, error)

NewConnectionWithContext accepts a connection factory and a backoff, and returns a *Connection. When the amqp connection is closed unexpectedly, Connection attempts to reconnect using the backoff. The context is passed to the backoff so that reconnection can be canceled.

func (*Connection) Close

func (c *Connection) Close()

Close closes the underlying *amqp.Connection and notifies all subscribers of the closure with a nil error. Subscribers can subscribe to the close event with NotifyClose.

func (*Connection) IsAlive

func (c *Connection) IsAlive() bool

IsAlive reports whether the underlying amqp.Connection is still open, that is, not closed.

func (*Connection) NotifyClose

func (c *Connection) NotifyClose(ch chan error) chan error

NotifyClose registers a new subscriber for closure events. When the connection is closed, the subscriber receives a notification carrying the closure error.

func (*Connection) NotifyReconnect

func (c *Connection) NotifyReconnect(ch chan error) chan error

NotifyReconnect registers a new subscriber for reconnect events. When the underlying amqp.Connection is shut down unexpectedly and reconnection has finished, the subscriber receives a notification with the reconnect status. Reconnection can finish with an error if it is canceled by the context.

type ConnectionFactory

type ConnectionFactory func() (*amqp.Connection, error)

ConnectionFactory is a wrapper used for reconnecting to rabbitmq server.

type ConsumerParams

type ConsumerParams struct {
	ConsumerName string
	AutoAck      bool
	ConsumerArgs amqp.Table
}

ConsumerParams generalizes amqp consumer settings.

type ControllerFunc

type ControllerFunc func(ctx *DeliveryContext)

ControllerFunc represents controller type used to process delivered messages.

type DeliveryContext

type DeliveryContext struct {
	context.Context
	Delivery amqp.Delivery
	Channel  *amqp.Channel
}

func NewDeliveryContext

func NewDeliveryContext(baseCtx context.Context, delivery amqp.Delivery, ch *amqp.Channel) *DeliveryContext

func (*DeliveryContext) Ack

func (c *DeliveryContext) Ack() bool

func (*DeliveryContext) BindJSON

func (c *DeliveryContext) BindJSON(ptr interface{}) error

func (*DeliveryContext) Nack

func (c *DeliveryContext) Nack(requeue bool, err error) bool

type ExchangeParams

type ExchangeParams struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

ExchangeParams generalizes amqp exchange settings.

type GroupOption

type GroupOption func(g *RouterGroup)

func WithNumWorkers

func WithNumWorkers(workers int) GroupOption

WithNumWorkers tells Server to use multiple amqp.Channel instances, one per worker, for that particular routing group.

func WithRouterEngine

func WithRouterEngine(engine RouterEngine) GroupOption

WithRouterEngine defines which engine is used to route delivered messages to different controllers.
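GroupOption follows the functional options pattern: each option is a function that mutates the group during construction. The sketch below illustrates the pattern only. The group type, its fields, its defaults, and the helper names are assumptions, since RouterGroup's fields are unexported.

```go
package main

import "fmt"

// group is a hypothetical stand-in for RouterGroup; its fields are assumed.
type group struct {
	workers int
	engine  string
}

// option mirrors the shape of GroupOption.
type option func(g *group)

// withWorkers and withEngine play the roles of WithNumWorkers and
// WithRouterEngine in this sketch.
func withWorkers(n int) option      { return func(g *group) { g.workers = n } }
func withEngine(name string) option { return func(g *group) { g.engine = name } }

// newGroup sets assumed defaults first, then applies each option in order,
// so later options override earlier ones.
func newGroup(opts ...option) *group {
	g := &group{workers: 1, engine: "direct"}
	for _, opt := range opts {
		opt(g)
	}
	return g
}

func main() {
	g := newGroup(withWorkers(4), withEngine("topic"))
	fmt.Println(g.workers, g.engine)
}
```

The pattern keeps the constructor signature stable while letting callers set only the parameters they care about.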

type Publisher

type Publisher struct {
	Conn *Connection
	// contains filtered or unexported fields
}

Publisher is used to publish messages to a single exchange. On construction, the publisher creates a new channel and declares the exchange. It can optionally declare a queue as well, and it can optionally retry publish attempts on failure.

func NewPublisher

func NewPublisher(connection *Connection, exchangeParams ExchangeParams, opts ...PublisherOption) (*Publisher, error)

NewPublisher creates a new *Publisher. The Publisher declares the exchange and publishes messages to it.

func (*Publisher) Broken

func (p *Publisher) Broken() bool

Broken reports whether the number of consecutive publish failures is greater than Publisher.consecutiveFailuresAllowed.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, key string, mandatory, immediate bool, msg amqp.Publishing) error

Publish sends a message to rabbitmq server.

type PublisherOption

type PublisherOption func(p *Publisher)

func WithQueueDeclaration

func WithQueueDeclaration(queueParams QueueParams, bindingKey string) PublisherOption

WithQueueDeclaration makes the publisher declare a queue on the rabbitmq server and bind it to the Publisher's exchange using the bindingKey parameter.

func WithRetries

func WithRetries(backoff backoff.Backoff, consecutiveFailuresBeforeBreak uint32) PublisherOption

WithRetries makes Publisher.Publish retry on failure with a backoff. consecutiveFailuresBeforeBreak acts as a simple circuit breaker: when it is greater than 0 and that many consecutive failures have occurred, Publisher.Publish returns ErrTooManyFailures and Publisher.Broken returns true.

type QualityOfService

type QualityOfService struct {
	PrefetchCount int
	PrefetchSize  int
}

QualityOfService generalizes amqp qos settings.

type QueueParams

type QueueParams struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

QueueParams generalizes amqp queue settings.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router represents the level of message routing within the queue. It registers multiple RouterGroup to be used by Server for exchange and queue declarations and queue bindings.

func NewRouter

func NewRouter() *Router

NewRouter creates and returns a new Router.

func (*Router) Group

func (r *Router) Group(exchange ExchangeParams, queue QueueParams, qos QualityOfService, consumer ConsumerParams, opts ...GroupOption) *RouterGroup

Group accepts generalized transport parameters and constructs RouterGroup. It is then used by Server to route messages from defined group queue to controllers based on queue routing.

type RouterEngine

type RouterEngine interface {

	// AddBinding adds routing from bindingKey to controllers
	// When used by a Server, Server binds RouterGroup queue to RouterGroup exchange on that bindingKey
	AddBinding(bindingKey string, controllers ...ControllerFunc)

	// Route selects all suitable controllers according to the configured routing parameters
	Route(routingKey string) []ControllerFunc
}

RouterEngine represents message routing logic within a queue. When a message is delivered to a queue, RouterEngine routes it to the matching controllers based on a specific pattern.

func NewDirectRouterEngine

func NewDirectRouterEngine() RouterEngine

NewDirectRouterEngine returns a RouterEngine that performs direct message routing as described by the AMQP 0-9-1 protocol. For reference on direct routing see https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf.
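Direct routing delivers a message only where the routing key equals the binding key exactly. A minimal sketch of an engine with the same method shapes as RouterEngine follows. The directEngine type and the handler type (a stand-in for ControllerFunc) are assumptions, not the package's implementation.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for ControllerFunc.
type handler func(msg string) string

// directEngine routes on exact equality between binding key and routing key.
type directEngine struct {
	bindings map[string][]handler
}

func newDirectEngine() *directEngine {
	return &directEngine{bindings: make(map[string][]handler)}
}

// AddBinding appends handlers to the list registered under bindingKey.
func (e *directEngine) AddBinding(bindingKey string, hs ...handler) {
	e.bindings[bindingKey] = append(e.bindings[bindingKey], hs...)
}

// Route returns the handlers bound to exactly routingKey, or nil if none.
func (e *directEngine) Route(routingKey string) []handler {
	return e.bindings[routingKey]
}

func main() {
	e := newDirectEngine()
	e.AddBinding("orders.created", func(m string) string { return "created:" + m })
	for _, h := range e.Route("orders.created") {
		fmt.Println(h("42"))
	}
	fmt.Println(len(e.Route("orders.deleted")))
}
```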

func NewTopicRouterEngine

func NewTopicRouterEngine() RouterEngine

NewTopicRouterEngine returns a RouterEngine that performs topic message routing as described by the AMQP 0-9-1 protocol. For reference on topic routing see https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf.
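In AMQP 0-9-1 topic routing, keys are dot-separated words; in a binding pattern, "*" matches exactly one word and "#" matches zero or more words. The matcher below is a sketch of those rules with hypothetical function names. It is not the algorithm this package uses, which may be implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// match reports whether routingKey satisfies the topic pattern.
func match(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares pattern words p against key words k recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may consume zero or more words: try every split point.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		// A literal word must match the next key word exactly.
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(match("orders.*.created", "orders.eu.created"))
	fmt.Println(match("orders.#", "orders.eu.created"))
	fmt.Println(match("orders.*", "orders.eu.created"))
}
```

The recursive form is easy to verify but can backtrack heavily on patterns with several "#" words; a production engine would more likely use a trie or precompiled patterns.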

type RouterGroup

type RouterGroup struct {
	// contains filtered or unexported fields
}

RouterGroup generalizes a consumer's transport: it unites an exchange and a queue as a single transport line.

func (*RouterGroup) Route

func (g *RouterGroup) Route(routingKey string, controllers ...ControllerFunc) *RouterGroup

Route registers message routing for the queue defined in the RouterGroup. Deliveries received on the queue are routed to the matching controllers based on the routingKey parameter. Server also uses routingKey to bind the RouterGroup's queue to the corresponding RouterGroup exchange. The chain of controllers is executed for every matched route.

type Server

type Server struct {
	Conn *Connection
	// contains filtered or unexported fields
}

Server is used to accept messages from the rabbitmq server and route them to controllers registered in Router.

func NewServer

func NewServer(conn *Connection, router *Router) *Server

NewServer constructs a new *Server.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context) error

ListenAndServe blocks while listening for deliveries. It creates a separate channel for each worker in every RouterGroup. Each channel is then used to receive messages and route them to the RouterGroup's controllers.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown performs a graceful shutdown of the Server. First, every channel is closed to new deliveries. Then the Server waits until either the context is canceled or every worker has finished processing its current message.

Directories

Path Synopsis
examples
pkg
