Documentation ¶
Overview ¶
Package mq provides a declarative way to integrate with a message broker over AMQP.
Index ¶
- Variables
- type BackoffPolicy
- type Backoffer
- type BindingConfig
- type Bindings
- type Client
- type ClientOpt
- type Config
- type Consumer
- type ConsumerConfig
- type ConsumerHandler
- type ConsumerOpt
- type Consumers
- type Declaration
- type Declarer
- type Exchange
- type ExchangeBinding
- type ExchangeConfig
- type Exchanges
- type MQ
- type Options
- type ProducerConfig
- type Producers
- type Publisher
- type PublisherOpt
- type Queue
- type QueueBinding
- type QueueConfig
- type Queues
Constants ¶
This section is empty.
Variables ¶
var (
ErrNoConnection = errors.New("no connection available")
)
var ErrPublisherDead = errors.New("publisher is dead")
ErrPublisherDead indicates that the publisher was canceled. It may be returned from the Write() and Publish() methods.
Functions ¶
This section is empty.
Types ¶
type BackoffPolicy ¶
type BackoffPolicy struct {
// contains filtered or unexported fields
}
BackoffPolicy is the default Backoffer implementation.
type Backoffer ¶
Backoffer is the interface that holds a backoff strategy.
var DefaultBackoff Backoffer = BackoffPolicy{
	[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
}
DefaultBackoff is the default Backoffer. See: http://blog.gopheracademy.com/advent-2014/backoff/
type BindingConfig ¶
type BindingConfig struct {
	Destination string  `mapstructure:"destination" json:"destination" yaml:"destination"`
	Source      string  `mapstructure:"source" json:"source" yaml:"source"`
	RoutingKey  string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	Options     Options `mapstructure:"options" json:"options" yaml:"options"`
}
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main AMQP client wrapper.
func (*Client) Blocking ¶
Blocking notifies about the server's TCP flow control on the connection. The default buffer size is 10. Messages are dropped if the receiver cannot keep up.
func (*Client) Declare ¶
func (c *Client) Declare(d []Declaration)
Declare is used to declare queues, exchanges, and bindings. The declarations are saved and re-run every time the Client gets a connection.
func (*Client) Errors ¶
Errors returns AMQP connection-level errors. The default buffer size is 100. Messages are dropped if the receiver cannot keep up.
type ClientOpt ¶
type ClientOpt func(*Client)
func Backoff ¶
Backoff is a functional option that defines the backoff policy. It is used in the `NewClient` constructor.
func BlockingChan ¶
BlockingChan is a functional option that initializes the blocking reporting channel in client code, so the caller keeps control over buffering. It is used in the `NewClient` constructor.
func ErrorsChan ¶
ErrorsChan is a functional option that initializes the error reporting channel in client code, so the caller keeps control over the buffer size. The default buffer size is 100. Messages are dropped if the receiver cannot keep up. It is used in the `NewClient` constructor.
func RabbitConfig ¶
RabbitConfig is a functional option used to set up extended AMQP configuration.
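ClientOpt follows the functional options pattern: each option is a function that mutates the value under construction. The sketch below illustrates the pattern with hypothetical local types (`client`, `option`, `withErrors`, `withBlocking`, `newClient`). It does not reproduce the package's real constructor or option signatures, and `chan bool` is only a placeholder for the blocking notification type.

```go
package main

import "fmt"

// client is a hypothetical stand-in for a type configured through options.
type client struct {
	errs     chan error
	blocking chan bool // placeholder element type for this sketch
}

// option has the same shape as a functional option: func(*T).
type option func(*client)

// withErrors replaces the default error channel with one owned by the caller.
func withErrors(ch chan error) option {
	return func(c *client) { c.errs = ch }
}

// withBlocking replaces the default blocking channel with one owned by the caller.
func withBlocking(ch chan bool) option {
	return func(c *client) { c.blocking = ch }
}

// newClient applies defaults first, then each option in order.
func newClient(opts ...option) *client {
	c := &client{
		errs:     make(chan error, 100),
		blocking: make(chan bool, 10),
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newClient(withErrors(make(chan error, 500)))
	fmt.Println(cap(c.errs), cap(c.blocking))
}
```

Because defaults are set before the options run, callers only pass options for the settings they want to override.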
type Config ¶
type Config struct {
	DSN            *string       `mapstructure:"dsn" json:"dsn" yaml:"dsn"`
	Host           *string       `mapstructure:"host" json:"host" yaml:"host"`
	Port           *int          `mapstructure:"port" json:"port" yaml:"port"`
	Username       *string       `mapstructure:"username" json:"username" yaml:"username"`
	Password       *string       `mapstructure:"password" json:"password" yaml:"password"`
	Vhost          *string       `mapstructure:"vhost" json:"vhost" yaml:"vhost"`
	SSL            *bool         `mapstructure:"ssl" json:"ssl" yaml:"ssl"`
	ReconnectDelay time.Duration `mapstructure:"reconnect_delay" json:"reconnect_delay" yaml:"reconnect_delay"`
	Exchanges      Exchanges     `mapstructure:"exchanges" json:"exchanges" yaml:"exchanges"`
	Queues         Queues        `mapstructure:"queues" json:"queues" yaml:"queues"`
	Producers      Producers     `mapstructure:"producers" json:"producers" yaml:"producers"`
	Consumers      Consumers     `mapstructure:"consumers" json:"consumers" yaml:"consumers"`
	Bindings       Bindings      `mapstructure:"bindings" json:"bindings" yaml:"bindings"`
}
Config describes all available options for amqp connection creation.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer holds the definition of an AMQP consumer.
func NewConsumer ¶
func NewConsumer(queueName string, opts ...ConsumerOpt) *Consumer
NewConsumer is the Consumer constructor.
func (*Consumer) Cancel ¶
func (c *Consumer) Cancel()
Cancel this consumer.
This closes the Deliveries() channel.
func (*Consumer) Deliveries ¶
Deliveries returns the deliveries shipped to this consumer. The channel is never closed on disconnects; it is closed only by Cancel().
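Because the deliveries channel survives disconnects and is closed only on cancel, a worker can simply range over it and exit when the loop ends. The sketch below models that contract with a hypothetical local `consumer` type carrying string payloads. It is not the package's Consumer, whose Deliveries signature is not shown in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a hypothetical model of the documented contract:
// the channel stays open until Cancel is called.
type consumer struct {
	deliveries chan string
	once       sync.Once
}

func newConsumer(buffer int) *consumer {
	return &consumer{deliveries: make(chan string, buffer)}
}

// Deliveries exposes the channel as receive-only.
func (c *consumer) Deliveries() <-chan string { return c.deliveries }

// Cancel closes the deliveries channel; calling it twice is safe.
func (c *consumer) Cancel() {
	c.once.Do(func() { close(c.deliveries) })
}

// drain reads until the channel is closed and returns everything received.
func drain(c *consumer) []string {
	var got []string
	for d := range c.Deliveries() {
		got = append(got, d)
	}
	return got
}

func main() {
	c := newConsumer(4)
	c.deliveries <- "a"
	c.deliveries <- "b"
	c.Cancel()
	fmt.Println(drain(c))
}
```

Values already buffered before the close are still delivered, so the range loop sees them before it terminates.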
type ConsumerConfig ¶
type ConsumerConfig struct {
	Name          string  `mapstructure:"name" json:"name" yaml:"name"`
	Queue         string  `mapstructure:"queue" json:"queue" yaml:"queue"`
	Workers       int     `mapstructure:"workers" json:"workers" yaml:"workers"`
	Options       Options `mapstructure:"options" json:"options" yaml:"options"`
	PrefetchCount int     `mapstructure:"prefetch_count" json:"prefetch_count" yaml:"prefetch_count"`
	PrefetchSize  int     `mapstructure:"prefetch_size" json:"prefetch_size" yaml:"prefetch_size"`
}
ConsumerConfig describes consumer's configuration.
type ConsumerOpt ¶
type ConsumerOpt func(*Consumer)
ConsumerOpt is a consumer's functional option type
func AutoTag ¶
func AutoTag() ConsumerOpt
AutoTag sets an automatically generated tag of the form:
fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
type Consumers ¶
type Consumers []ConsumerConfig
Consumers describes configuration list for consumers.
type Declaration ¶
func DeclareExchange ¶
func DeclareExchange(e Exchange) Declaration
func DeclareExchangeBinding ¶
func DeclareExchangeBinding(eb ExchangeBinding) Declaration
func DeclareQueue ¶
func DeclareQueue(q Queue) Declaration
func DeclareQueueBinding ¶
func DeclareQueueBinding(b QueueBinding) Declaration
DeclareQueueBinding declares an AMQP binding between an AMQP queue and an exchange.
type Declarer ¶
type Declarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}
Declarer is implemented by *amqp.Channel
type ExchangeBinding ¶
type ExchangeConfig ¶
type ExchangeConfig struct {
	Name    string  `mapstructure:"name" json:"name" yaml:"name"`
	Type    string  `mapstructure:"type" json:"type" yaml:"type"`
	Options Options `mapstructure:"options" json:"options" yaml:"options"`
}
ExchangeConfig describes exchange's configuration.
type Exchanges ¶
type Exchanges []ExchangeConfig
Exchanges describes configuration list for exchanges.
type MQ ¶
type ProducerConfig ¶
type ProducerConfig struct {
	BufferSize int     `mapstructure:"buffer_size" json:"buffer_size" yaml:"buffer_size"`
	Exchange   string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
	Name       string  `mapstructure:"name" json:"name" yaml:"name"`
	RoutingKey string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	Options    Options `mapstructure:"options" json:"options" yaml:"options"`
}
ProducerConfig describes producer's configuration.
type Producers ¶
type Producers []ProducerConfig
Producers describes configuration list for publishers.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher holds the definition for AMQP publishing.
func NewPublisher ¶
func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher
NewPublisher is a Publisher constructor
func (*Publisher) Publish ¶
func (p *Publisher) Publish(pub amqp.Publishing) error
Publish is used to publish a custom amqp.Publishing.
WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.
func (*Publisher) PublishWithRoutingKey ¶
func (p *Publisher) PublishWithRoutingKey(pub amqp.Publishing, key string) error
PublishWithRoutingKey is used to publish a custom amqp.Publishing with a custom routing key.
WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.
type PublisherOpt ¶
type PublisherOpt func(*Publisher)
PublisherOpt is a functional option type for Publisher
func PublishingTemplate ¶
func PublishingTemplate(t amqp.Publishing) PublisherOpt
PublishingTemplate is a Publisher functional option. It provides a template amqp.Publishing to save typing.
type Queue ¶
type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Args       amqp.Table
	// contains filtered or unexported fields
}
Queue holds the definition of an AMQP queue.
type QueueBinding ¶
QueueBinding is used to declare a binding between an AMQP Queue and an AMQP Exchange.
type QueueConfig ¶
type QueueConfig struct {
	Exchange       string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
	Name           string  `mapstructure:"name" json:"name" yaml:"name"`
	RoutingKey     string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	BindingOptions Options `mapstructure:"binding_options" json:"binding_options" yaml:"binding_options"`
	Options        Options `mapstructure:"options" json:"options" yaml:"options"`
}
QueueConfig describes queue's configuration.