go-mq: github.com/cheshir/go-mq Index | Files | Directories

package mq

import "github.com/cheshir/go-mq"

Package mq provides an ability to integrate with message broker via AMQP in a declarative way.


Package Files

async_producer.go config.go consumer.go consumer_registries.go internal_producer.go mq.go producer_registry.go sync_producer.go worker_status.go

type AsyncProducer Uses

type AsyncProducer interface {
    // Produce sends message to broker. Returns immediately.
    Produce(data []byte)

AsyncProducer describes available methods for producer. This kind of producer is asynchronous. All occurred errors will be accessible with MQ.Error().

type Config Uses

type Config struct {
    DSN            string        `mapstructure:"dsn" json:"dsn" yaml:"dsn"`
    ReconnectDelay time.Duration `mapstructure:"reconnect_delay" json:"reconnect_delay" yaml:"reconnect_delay"`
    TestMode       bool          `mapstructure:"test_mode" json:"test_mode" yaml:"test_mode"`
    Exchanges      Exchanges     `mapstructure:"exchanges" json:"exchanges" yaml:"exchanges"`
    Queues         Queues        `mapstructure:"queues" json:"queues" yaml:"queues"`
    Producers      Producers     `mapstructure:"producers" json:"producers" yaml:"producers"`
    Consumers      Consumers     `mapstructure:"consumers" json:"consumers" yaml:"consumers"`

Config describes all available options for amqp connection creation.

type Consumer Uses

type Consumer interface {
    // Consume runs consumer's workers with specified handler.
    Consume(handler ConsumerHandler)

Consumer describes available methods for consumer.

type ConsumerConfig Uses

type ConsumerConfig struct {
    Name          string  `mapstructure:"name" json:"name" yaml:"name"`
    Queue         string  `mapstructure:"queue" json:"queue" yaml:"queue"`
    Workers       int     `mapstructure:"workers" json:"workers" yaml:"workers"`
    Options       Options `mapstructure:"options" json:"options" yaml:"options"`
    PrefetchCount int     `mapstructure:"prefetch_count" json:"prefetch_count" yaml:"prefetch_count"`
    PrefetchSize  int     `mapstructure:"prefetch_size" json:"prefetch_size" yaml:"prefetch_size"`

ConsumerConfig describes consumer's configuration.

type ConsumerHandler Uses

type ConsumerHandler func(message Message)

ConsumerHandler describes handler function signature. It will be called for each obtained message.

type Consumers Uses

type Consumers []ConsumerConfig

Consumers describes configuration list for consumers.

type DeliveryMode Uses

type DeliveryMode int

DeliveryMode describes an AMQP message delivery mode.

const (
    NonPersistent DeliveryMode = 1
    Persistent                 = 2

List of available values for `delivery_mode` producer option.

type ExchangeConfig Uses

type ExchangeConfig struct {
    Name    string  `mapstructure:"name" json:"name" yaml:"name"`
    Type    string  `mapstructure:"type" json:"type" yaml:"type"`
    Options Options `mapstructure:"options" json:"options" yaml:"options"`

ExchangeConfig describes exchange's configuration.

type Exchanges Uses

type Exchanges []ExchangeConfig

Exchanges describes configuration list for exchanges.

type MQ Uses

type MQ interface {
    // Consumer returns consumer object by its name.
    Consumer(name string) (Consumer, error)
    // SetConsumerHandler allows you to set handler callback without getting consumer.
    SetConsumerHandler(name string, handler ConsumerHandler) error
    // AsyncProducer returns async producer. Should be used in most cases.
    AsyncProducer(name string) (AsyncProducer, error)
    // SyncProducer returns sync producer.
    SyncProducer(name string) (SyncProducer, error)
    // Error returns channel with all occurred errors.
    // Errors from sync producer won't be accessible.
    Error() <-chan error
    // Close stop all consumers and producers and close connection to broker.

MQ describes methods provided by message broker adapter.

func New Uses

func New(config Config) (MQ, error)

New initializes AMQP connection to the message broker and returns adapter that provides an ability to get configured consumers and producers, read occurred errors and shutdown all workers.

type Message Uses

type Message interface {
    Ack(multiple bool) error
    Nack(multiple, request bool) error
    Reject(requeue bool) error
    Body() []byte

Message describes available methods of the message obtained from queue.

type Options Uses

type Options map[string]interface{}

Options describes optional configuration.

type ProducerConfig Uses

type ProducerConfig struct {
    Sync       bool    `mapstructure:"sync" json:"sync" yaml:"sync"`
    BufferSize int     `mapstructure:"buffer_size" json:"buffer_size" yaml:"buffer_size"`
    Exchange   string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
    Name       string  `mapstructure:"name" json:"name" yaml:"name"`
    RoutingKey string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
    Options    Options `mapstructure:"options" json:"options" yaml:"options"`

ProducerConfig describes producer's configuration.

type Producers Uses

type Producers []ProducerConfig

Producers describes configuration list for producers.

type QueueConfig Uses

type QueueConfig struct {
    Exchange       string  `mapstructure:"exchange" json:"exchange" yaml:"exchange"`
    Name           string  `mapstructure:"name" json:"name" yaml:"name"`
    RoutingKey     string  `mapstructure:"routing_key" json:"routing_key" yaml:"routing_key"`
    BindingOptions Options `mapstructure:"binding_options" json:"binding_options" yaml:"binding_options"`
    Options        Options `mapstructure:"options" json:"options" yaml:"options"`

QueueConfig describes queue's configuration.

type Queues Uses

type Queues []QueueConfig

Queues describes configuration list for queues.

type SyncProducer Uses

type SyncProducer interface {
    // Produce sends message to broker. Waits for result (ok, error).
    Produce(data []byte) error

SyncProducer describes available methods for synchronous producer.



Package mq imports 10 packages (graph) and is imported by 1 packages. Updated 2020-10-08. Refresh now. Tools for package owners.