rmq

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2018 License: MIT Imports: 9 Imported by: 0

README

Build Status

Overview

A simple wrapper to streadway/amqp for RabbitMQ with support for auto reconnections.

Usage

The library maintains a single connection and channel. It also maintains a map of bindings of exchanges and queues added by the user. Each binding can be configured to be a producer, a consumer, or both.

First, create the connection object with:

// replace the values below with your own
c := rmq.New(&rmq.Config{
		Host:     "localhost",
		Port:     5672,
		Username: "guest",
		Password: "guest",
		Vhost:    "/",
})

err := c.Connect()
...

Next, we will create a binding for a direct-type exchange test-exchange and queue qtest with:

bindId, err := c.AddBinding(&rmq.BindConfig{
		ExchangeOpt: &rmq.ExchangeOptions{
			Name:       "test-exchange",
			Type:       "direct",
			Durable:    false,
			AutoDelete: true,
		},
		QueueOpt: &rmq.QueueOptions{
			QueueName:  "qtest",
			Durable:    false,
			AutoDelete: true,
		},
		QueueBindOpt: &rmq.QueueBindOptions{
			RoutingKey: "rk1",
		},
		// when `ConsumeOpt` is provided, this binding is able to receive
		// messages from the specified exchange/queue
		ConsumeOpt: &rmq.ConsumeOptions{
			ClientTag:  "consumer1",
			FnCallback: func (b []byte) error {
				log.Printf("payload: %s", b)
				return nil
			},
		},
})

// send a message using the binding above
c.Send(bindId, "rk1", []byte("hello world"))

You can also create a send-only binding with:

bindId, err := c.AddBinding(&rmq.BindConfig{
		ExchangeOpt: &rmq.ExchangeOptions{
			Name:       "test-exchange",
			Type:       "direct",
			Durable:    false,
			AutoDelete: true,
		},
		QueueOpt: &rmq.QueueOptions{
			QueueName:  "queue1",
			Durable:    false,
			AutoDelete: true,
		},
		QueueBindOpt: &rmq.QueueBindOptions{
			RoutingKey: "rk1",
		},
})

c.Send(bindId, "rk1", []byte("hello world"))

See the examples directory for a simple receiver/sender implementation.

License

The MIT License

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BindConfig

type BindConfig struct {
	ExchangeOpt  *ExchangeOptions
	QueueOpt     *QueueOptions
	QueueBindOpt *QueueBindOptions
	ConsumeOpt   *ConsumeOptions
}

type Config

type Config struct {
	Host        string
	Port        int
	Username    string
	Password    string
	Vhost       string
	AutoConnect bool
}

type ConsumeOptions

type ConsumeOptions struct {
	ClientTag  string
	NoAck      bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
	FnCallback func([]byte) error
}

type ExchangeOptions

type ExchangeOptions struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

type QueueBindOptions

type QueueBindOptions struct {
	RoutingKey string
	NoWait     bool
	Args       amqp.Table
}

type QueueOptions

type QueueOptions struct {
	QueueName  string
	Durable    bool // will be set to false when queue name is empty (autogenerated name)
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

type RabbitMqBroker

type RabbitMqBroker struct {
	// contains filtered or unexported fields
}

func New

func New(c *Config) *RabbitMqBroker

func (*RabbitMqBroker) AddBinding

func (b *RabbitMqBroker) AddBinding(bc *BindConfig) (string, error)

func (*RabbitMqBroker) Close

func (b *RabbitMqBroker) Close()

func (*RabbitMqBroker) Connect

func (b *RabbitMqBroker) Connect() error

func (*RabbitMqBroker) Send

func (b *RabbitMqBroker) Send(id, key string, payload []byte) error

func (*RabbitMqBroker) SendWithConfig

func (b *RabbitMqBroker) SendWithConfig(id, key string, sc SendConfig) error

type SendConfig

type SendConfig struct {
	Mandatory   bool
	Immediate   bool
	PublishConf *amqp.Publishing
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL