rabbit

v0.0.0-...-d6e29b8
Published: Aug 23, 2018 | License: MIT | Imports: 9 | Imported by: 0

README

rabbit

A Go RabbitMQ library supporting fabric setup, publishers, and consumers.

Documentation

Overview

Package mq provides a declarative way to integrate with a message broker over AMQP.

Variables

var (
	ErrNoConnection = errors.New("no connection available")
)

var ErrPublisherDead = errors.New("publisher is dead")

ErrPublisherDead indicates that the publisher was canceled. It can be returned from the Write() and Publish() methods.

Types

type BackoffPolicy

type BackoffPolicy struct {
	// contains filtered or unexported fields
}

BackoffPolicy is the default Backoffer implementation.

func (BackoffPolicy) Backoff

func (b BackoffPolicy) Backoff(n int) time.Duration

Backoff implements Backoffer

type Backoffer

type Backoffer interface {
	Backoff(int) time.Duration
}

Backoffer is the interface that defines a backoff strategy.

var DefaultBackoff Backoffer = BackoffPolicy{
	[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000},
}

DefaultBackoff is the default backoff policy. See: http://blog.gopheracademy.com/advent-2014/backoff/
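A table-driven backoff like the one above can be sketched as follows. BackoffPolicy's fields are unexported, so this is not the package's implementation: `tablePolicy` and `jitter` are hypothetical names, and the sketch assumes the table entries are base delays in milliseconds with random jitter added, in the spirit of the linked article.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// tablePolicy is a hypothetical stand-in for BackoffPolicy.
// Assumption: each entry is a base delay in milliseconds.
type tablePolicy struct {
	millis []int
}

// Backoff returns the delay before retry attempt n. Attempts past the end of
// the table reuse the last entry.
func (p tablePolicy) Backoff(n int) time.Duration {
	if len(p.millis) == 0 || n < 0 {
		return 0
	}
	if n >= len(p.millis) {
		n = len(p.millis) - 1
	}
	return time.Duration(jitter(p.millis[n])) * time.Millisecond
}

// jitter spreads a base delay over [millis/2, millis/2+millis) so that many
// clients do not retry in lockstep.
func jitter(millis int) int {
	if millis == 0 {
		return 0
	}
	return millis/2 + rand.Intn(millis)
}

func main() {
	p := tablePolicy{[]int{0, 10, 100, 200, 500, 1000, 2000, 3000, 5000}}
	for n := 0; n < 10; n++ {
		fmt.Printf("attempt %d: wait %v\n", n, p.Backoff(n))
	}
}
```

Any type with a `Backoff(int) time.Duration` method satisfies Backoffer, so a custom strategy of this shape could be passed to the Backoff option.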

type BindingConfig

type BindingConfig struct {
	Destination string  `mapstructure:"destination" json:"destination" yaml:"destination"`
	Source      string  `mapstructure:"source" json:"source" yaml:"source"`
	RoutingKey  string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	Options     Options `mapstructure:"options" json:"options" yaml:"options"`
}

type Bindings

type Bindings []BindingConfig

Bindings describes configuration list for bindings.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main AMQP client wrapper.

func NewClient

func NewClient(opts ...ClientOpt) *Client

NewClient initializes a new Client.

func (*Client) Blocking

func (c *Client) Blocking() <-chan amqp.Blocking

Blocking returns a channel that reports the server's TCP flow control notifications for the connection. The default buffer size is 10. Notifications are dropped if the receiver can't keep up.

func (*Client) Close

func (c *Client) Close()

Close shuts down the client.

func (*Client) Consume

func (c *Client) Consume(cons *Consumer)

Consume is used to declare consumers.

func (*Client) Declare

func (c *Client) Declare(d []Declaration)

Declare is used to declare queues, exchanges, and bindings. Declarations are saved and re-run every time the Client gets a connection.
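The save-and-replay behaviour described above can be sketched with local types. Everything here is hypothetical: `declarer` is a reduced stand-in for the Declarer interface, `fakeChannel` simulates a fresh channel per connection, and `topology` only illustrates the idea of keeping declarations for later replay. It is not the Client's actual implementation.

```go
package main

import "fmt"

// declarer is a simplified, hypothetical stand-in for the Declarer interface.
type declarer interface {
	QueueDeclare(name string) error
}

// declaration mirrors the shape of Declaration: a function applied to a declarer.
type declaration func(declarer) error

// declareQueue returns a declaration that declares the named queue.
func declareQueue(name string) declaration {
	return func(d declarer) error { return d.QueueDeclare(name) }
}

// fakeChannel records what was declared on one simulated connection.
type fakeChannel struct{ queues []string }

func (f *fakeChannel) QueueDeclare(name string) error {
	f.queues = append(f.queues, name)
	return nil
}

// topology keeps declarations so they can be replayed after every reconnect.
type topology struct{ saved []declaration }

// Declare saves the declarations; nothing is sent until apply is called.
func (t *topology) Declare(ds []declaration) { t.saved = append(t.saved, ds...) }

// apply runs every saved declaration against a channel, stopping at the first error.
func (t *topology) apply(d declarer) error {
	for _, decl := range t.saved {
		if err := decl(d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var t topology
	t.Declare([]declaration{declareQueue("orders"), declareQueue("invoices")})

	// Two simulated connections: the same declarations run on each.
	for i := 1; i <= 2; i++ {
		ch := &fakeChannel{}
		if err := t.apply(ch); err != nil {
			fmt.Println("declare failed:", err)
			return
		}
		fmt.Printf("connection %d declared %v\n", i, ch.queues)
	}
}
```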

func (*Client) Errors

func (c *Client) Errors() <-chan error

Errors returns AMQP connection-level errors. The default buffer size is 100. Errors are dropped if the receiver can't keep up.

func (*Client) Loop

func (c *Client) Loop() bool

Loop should be used as the condition of a `for` loop that receives from (*Client).Errors().

It manages the AMQP connection and runs queue and exchange declarations and consumers. It starts returning false once (*Client).Close() has been called.
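The loop shape described above can be sketched against a small interface. `looper`, `fakeClient`, and `run` are hypothetical names used only so the example runs without a broker; with the real package the loop condition would be (*Client).Loop() and the receive would read from (*Client).Errors().

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errReset   = errors.New("connection reset")
	errRefused = errors.New("connection refused")
)

// looper captures the two Client methods used by the loop.
type looper interface {
	Loop() bool
	Errors() <-chan error
}

// fakeClient is a hypothetical stand-in for *Client: Loop reports true until
// Close is called, and Errors delivers queued connection errors.
type fakeClient struct {
	closed bool
	errs   chan error
}

func (f *fakeClient) Loop() bool           { return !f.closed }
func (f *fakeClient) Errors() <-chan error { return f.errs }
func (f *fakeClient) Close()               { f.closed = true }

// run receives errors while the client is alive and returns how many it saw.
// onErr is called for each error and may decide to close the client.
func run(c looper, onErr func(error)) int {
	seen := 0
	for c.Loop() {
		err := <-c.Errors()
		seen++
		onErr(err)
	}
	return seen
}

func main() {
	c := &fakeClient{errs: make(chan error, 2)}
	c.errs <- errReset
	c.errs <- errRefused

	n := run(c, func(err error) {
		fmt.Println("client error:", err)
		if err == errRefused {
			c.Close() // the next Loop() call returns false
		}
	})
	fmt.Println("errors handled:", n)
}
```

In a real program the body of the loop would usually be a `select` that also receives from consumer channels such as Deliveries() and the consumer's Errors().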

func (*Client) Publish

func (c *Client) Publish(pub *Publisher)

Publish is used to declare publishers.

type ClientOpt

type ClientOpt func(*Client)

func Backoff

func Backoff(bo Backoffer) ClientOpt

Backoff is a functional option used in the `NewClient` constructor to define the backoff policy.

func BlockingChan

func BlockingChan(blockingChan chan amqp.Blocking) ClientOpt

BlockingChan is a functional option used in the `NewClient` constructor. It lets client code supply the blocking-notification channel and so keep control over its buffering.

func ErrorsChan

func ErrorsChan(errChan chan error) ClientOpt

ErrorsChan is a functional option used in the `NewClient` constructor. It lets client code supply the error-reporting channel and so keep control over its buffer size. The default buffer size is 100. Errors are dropped if the receiver can't keep up.

func RabbitConfig

func RabbitConfig(config amqp.Config) ClientOpt

RabbitConfig is a functional option used to set up extended amqp configuration.

func URL

func URL(addr string) ClientOpt

URL is a functional option used in the `NewClient` constructor. The default URL is amqp://guest:guest@localhost/.
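The functional-option pattern behind ClientOpt can be sketched with local types. `client`, `withURL`, `withErrorsChan`, and `newClient` are hypothetical stand-ins, not the package's code; only the default URL and the default error buffer size of 100 are taken from the documentation above.

```go
package main

import "fmt"

// client is a hypothetical stand-in for Client, whose real fields are unexported.
type client struct {
	addr    string
	errChan chan error
}

// clientOpt mirrors ClientOpt: a function that mutates the client under construction.
type clientOpt func(*client)

// withURL plays the role of URL.
func withURL(addr string) clientOpt {
	return func(c *client) { c.addr = addr }
}

// withErrorsChan plays the role of ErrorsChan: the caller owns the buffer size.
func withErrorsChan(ch chan error) clientOpt {
	return func(c *client) { c.errChan = ch }
}

// newClient sets the documented defaults first, then applies options in order,
// so a later option overrides an earlier one.
func newClient(opts ...clientOpt) *client {
	c := &client{
		addr:    "amqp://guest:guest@localhost/",
		errChan: make(chan error, 100),
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	def := newClient()
	fmt.Println(def.addr, cap(def.errChan))

	custom := newClient(
		withURL("amqp://user:secret@broker.example/"),
		withErrorsChan(make(chan error, 1)),
	)
	fmt.Println(custom.addr, cap(custom.errChan))
}
```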

type Config

type Config struct {
	DSN            *string       `mapstructure:"dsn" json:"dsn" yaml:"dsn"`
	Host           *string       `mapstructure:"host" json:"host" yaml:"host"`
	Port           *int          `mapstructure:"port" json:"port" yaml:"port"`
	Username       *string       `mapstructure:"username" json:"username" yaml:"username"`
	Password       *string       `mapstructure:"password" json:"password" yaml:"password"`
	Vhost          *string       `mapstructure:"vhost" json:"vhost" yaml:"vhost"`
	SSL            *bool         `mapstructure:"ssl" json:"ssl" yaml:"ssl"`
	ReconnectDelay time.Duration `mapstructure:"reconnect_delay" json:"reconnect_delay" yaml:"reconnect_delay"`
	Exchanges      Exchanges     `mapstructure:"exchanges" json:"exchanges" yaml:"exchanges"`
	Queues         Queues        `mapstructure:"queues" json:"queues" yaml:"queues"`
	Producers      Producers     `mapstructure:"producers" json:"producers" yaml:"producers"`
	Consumers      Consumers     `mapstructure:"consumers" json:"consumers" yaml:"consumers"`
	Bindings       Bindings      `mapstructure:"bindings" json:"bindings" yaml:"bindings"`
}

Config describes all available options for amqp connection creation.
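To show how the `json` struct tags above map onto a document, here is a minimal sketch that decodes JSON into local mirrors of a few fields. `config`, `queueConfig`, `sample`, and `parseConfig` are hypothetical and cover only a subset of Config and QueueConfig; the sample values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// queueConfig and config mirror a subset of QueueConfig and Config,
// keeping only the json tags shown in the documentation.
type queueConfig struct {
	Exchange   string `json:"exchange"`
	Name       string `json:"name"`
	RoutingKey string `json:"routing_key"`
}

type config struct {
	Host           *string       `json:"host"`
	Port           *int          `json:"port"`
	SSL            *bool         `json:"ssl"`
	ReconnectDelay time.Duration `json:"reconnect_delay"`
	Queues         []queueConfig `json:"queues"`
}

// sample is illustrative data. With encoding/json, a time.Duration is decoded
// from a plain integer number of nanoseconds.
const sample = `{
  "host": "localhost",
  "port": 5672,
  "reconnect_delay": 2000000000,
  "queues": [{"name": "orders", "exchange": "shop", "routing_key": "order.created"}]
}`

// parseConfig decodes a JSON document into config.
func parseConfig(data []byte) (config, error) {
	var c config
	err := json.Unmarshal(data, &c)
	return c, err
}

func main() {
	c, err := parseConfig([]byte(sample))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// Pointer fields distinguish "absent" (nil) from an explicit zero value.
	fmt.Println(*c.Host, *c.Port, c.SSL == nil, c.ReconnectDelay, c.Queues[0].Name)
}
```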

func (Config) GetDSN

func (config Config) GetDSN() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer holds the definition of an AMQP consumer.

func NewConsumer

func NewConsumer(queueName string, opts ...ConsumerOpt) *Consumer

NewConsumer is the Consumer constructor.

func (*Consumer) Cancel

func (c *Consumer) Cancel()

Cancel cancels this consumer.

This will CLOSE the Deliveries() channel.

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler ConsumerHandler)

func (*Consumer) Deliveries

func (c *Consumer) Deliveries() <-chan amqp.Delivery

Deliveries returns the channel of deliveries shipped to this consumer. This channel is not closed on disconnects; see Cancel for when it is closed.

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns a channel with AMQP channel-level errors.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context, handler ConsumerHandler)

type ConsumerConfig

type ConsumerConfig struct {
	Name          string  `mapstructure:"name" json:"name" yaml:"name"`
	Queue         string  `mapstructure:"queue" json:"queue" yaml:"queue"`
	Workers       int     `mapstructure:"workers" json:"workers" yaml:"workers"`
	Options       Options `mapstructure:"options" json:"options" yaml:"options"`
	PrefetchCount int     `mapstructure:"prefetch_count" json:"prefetch_count" yaml:"prefetch_count"`
	PrefetchSize  int     `mapstructure:"prefetch_size" json:"prefetch_size" yaml:"prefetch_size"`
}

ConsumerConfig describes consumer's configuration.

type ConsumerHandler

type ConsumerHandler func(ctx context.Context, delivery amqp.Delivery)

type ConsumerOpt

type ConsumerOpt func(*Consumer)

ConsumerOpt is a consumer's functional option type

func AutoAck

func AutoAck() ConsumerOpt

AutoAck sets this consumer to AutoAck mode.

func AutoTag

func AutoTag() ConsumerOpt

AutoTag sets an automatically generated tag of the form:

fmt.Sprintf(QueueName+"-pid-%d@%s", os.Getpid(), os.Hostname())
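The expression above reads as shorthand for the tag format: os.Hostname returns `(string, error)`, so it cannot be passed directly as a Sprintf argument. A runnable sketch of the same format might look like this. `autoTag` is a hypothetical helper, and falling back to "unknown" when the hostname lookup fails is an assumption of this sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"os"
)

// autoTag builds a consumer tag of the form "<queue>-pid-<pid>@<host>".
// The hostname is resolved before formatting because os.Hostname also
// returns an error. The queue name is passed as an argument rather than
// concatenated into the format string, so a '%' in it is not treated as a verb.
func autoTag(queueName string) string {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown" // assumption: fallback chosen for this sketch
	}
	return fmt.Sprintf("%s-pid-%d@%s", queueName, os.Getpid(), host)
}

func main() {
	fmt.Println(autoTag("orders"))
}
```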

func Exclusive

func Exclusive() ConsumerOpt

Exclusive sets this consumer to exclusive mode.

func NoLocal

func NoLocal() ConsumerOpt

NoLocal sets this consumer to NoLocal mode.

func Qos

func Qos(count int) ConsumerOpt

Qos sets the QoS prefetch count on the consumer's channel.

func Tag

func Tag(tag string) ConsumerOpt

Tag sets the consumer tag.

type Consumers

type Consumers []ConsumerConfig

Consumers describes configuration list for consumers.

type Declaration

type Declaration func(Declarer) error

func DeclareExchange

func DeclareExchange(e Exchange) Declaration

func DeclareExchangeBinding

func DeclareExchangeBinding(eb ExchangeBinding) Declaration

func DeclareQueue

func DeclareQueue(q Queue) Declaration

func DeclareQueueBinding

func DeclareQueueBinding(b QueueBinding) Declaration

DeclareQueueBinding declares an AMQP binding between an AMQP queue and an exchange.

type Declarer

type Declarer interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
}

Declarer is implemented by *amqp.Channel

type Exchange

type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Args       amqp.Table
}

Exchange holds the definition of an AMQP exchange.

type ExchangeBinding

type ExchangeBinding struct {
	Destination string
	Key         string
	Source      string
	Args        amqp.Table
}

type ExchangeConfig

type ExchangeConfig struct {
	Name    string  `mapstructure:"name" json:"name" yaml:"name"`
	Type    string  `mapstructure:"type" json:"type" yaml:"type"`
	Options Options `mapstructure:"options" json:"options" yaml:"options"`
}

ExchangeConfig describes exchange's configuration.

type Exchanges

type Exchanges []ExchangeConfig

Exchanges describes configuration list for exchanges.

type MQ

type MQ interface {
	GetConsumer(name string) (*Consumer, error)
	SetConsumerHandler(name string, ctx context.Context, handler ConsumerHandler) error
	GetPublisher(name string) (*Publisher, error)
	Errors() <-chan error
	Close()
}

func New

func New(config Config) (MQ, error)

New configures AMQP from the given Config and returns an MQ.

type Options

type Options map[string]interface{}

Options describes optional configuration.

type ProducerConfig

type ProducerConfig struct {
	BufferSize int     `mapstructure:"buffer_size" json:"buffer_size" yaml:"buffer_size"`
	Exchange   string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
	Name       string  `mapstructure:"name" json:"name" yaml:"name"`
	RoutingKey string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	Options    Options `mapstructure:"options" json:"options" yaml:"options"`
}

ProducerConfig describes producer's configuration.

type Producers

type Producers []ProducerConfig

Producers describes configuration list for publishers.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher holds the definition for AMQP publishing.

func NewPublisher

func NewPublisher(exchange string, key string, opts ...PublisherOpt) *Publisher

NewPublisher is a Publisher constructor

func (*Publisher) Cancel

func (p *Publisher) Cancel()

Cancel cancels this publisher.

func (*Publisher) Publish

func (p *Publisher) Publish(pub amqp.Publishing) error

Publish is used to publish a custom amqp.Publishing.

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.

func (*Publisher) PublishWithRoutingKey

func (p *Publisher) PublishWithRoutingKey(pub amqp.Publishing, key string) error

PublishWithRoutingKey is used to publish a custom amqp.Publishing with a custom routing key.

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.

func (*Publisher) Write

func (p *Publisher) Write(b []byte) (int, error)

Write publishes b using the publisher's template: the input buffer is set as Publishing.Body. The returned int is always len(b).

Implements io.Writer.

WARNING: this is a blocking call; it will not return until a connection is available. The only way to stop it is to call the Cancel() method.
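The blocking Write with a template and Cancel as the only escape can be sketched with channels. This is an illustration under assumptions, not the package's implementation: `publishing` is a reduced stand-in for amqp.Publishing, `publisher` and `newPublisher` are hypothetical, and the `out` channel stands in for whatever hands messages to a live connection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var errPublisherDead = errors.New("publisher is dead")

// publishing is a reduced, hypothetical stand-in for amqp.Publishing.
type publishing struct {
	ContentType string
	Body        []byte
}

// publisher hands messages to a connection goroutine through out.
type publisher struct {
	template publishing
	out      chan publishing
	dead     chan struct{}
}

func newPublisher(template publishing, out chan publishing) *publisher {
	return &publisher{template: template, out: out, dead: make(chan struct{})}
}

// Write copies the template, sets a copy of b as the body, and blocks until
// the message is accepted or the publisher is cancelled. It always returns len(b).
func (p *publisher) Write(b []byte) (int, error) {
	msg := p.template                     // struct copy; the template is not modified
	msg.Body = append([]byte(nil), b...) // io.Writer implementations must not retain b
	select {
	case p.out <- msg:
		return len(b), nil
	case <-p.dead:
		return len(b), errPublisherDead
	}
}

// Cancel unblocks pending and future Write calls. Call it at most once.
func (p *publisher) Cancel() { close(p.dead) }

func main() {
	out := make(chan publishing, 1)
	p := newPublisher(publishing{ContentType: "text/plain"}, out)

	var w io.Writer = p // usable anywhere an io.Writer is accepted
	n, err := io.WriteString(w, "hello")
	fmt.Println(n, err)
	msg := <-out
	fmt.Println(msg.ContentType, string(msg.Body))

	// With nobody receiving, Write would block forever; Cancel releases it.
	blocked := newPublisher(publishing{}, make(chan publishing))
	blocked.Cancel()
	_, err = blocked.Write([]byte("lost"))
	fmt.Println(err)
}
```

Because a blocked call can only be released by Cancel, callers that need a deadline would have to arrange for Cancel to be called from another goroutine, for example from a timer.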

type PublisherOpt

type PublisherOpt func(*Publisher)

PublisherOpt is a functional option type for Publisher

func PublishingTemplate

func PublishingTemplate(t amqp.Publishing) PublisherOpt

PublishingTemplate is a functional option for Publisher. It provides a template amqp.Publishing to save typing.

type Queue

type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Args       amqp.Table
	// contains filtered or unexported fields
}

Queue holds the definition of an AMQP queue.

type QueueBinding

type QueueBinding struct {
	QueueName    string
	ExchangeName string
	Key          string
	Args         amqp.Table
}

QueueBinding is used to declare a binding between an AMQP queue and an AMQP exchange.

type QueueConfig

type QueueConfig struct {
	Exchange       string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
	Name           string  `mapstructure:"name" json:"name" yaml:"name"`
	RoutingKey     string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
	BindingOptions Options `mapstructure:"binding_options" json:"binding_options" yaml:"binding_options"`
	Options        Options `mapstructure:"options" json:"options" yaml:"options"`
}

QueueConfig describes queue's configuration.

type Queues

type Queues []QueueConfig

Queues describes configuration list for queues.
