rabbit

package
v1.0.0
Published: Dec 1, 2016 License: BSD-3-Clause Imports: 12 Imported by: 2

Documentation

Overview

Package rabbit provides functions for creating RabbitMQ consumers that listen on the queue specified in the config file and forward each message body to a worker script.

Constants

const (
	DefaultPrefetchCount = 10
	DefaultRetryInterval = 30
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Amqp added in v0.5.0

type Amqp struct {
	Chan AmqpChannelCloser
	Conn AmqpConnectionCloser
}

func (*Amqp) Channel added in v0.5.0

func (a *Amqp) Channel() (chn *amqp.Channel, err error)

func (*Amqp) Close added in v0.5.0

func (a *Amqp) Close() (err error)

func (*Amqp) Consume added in v0.5.0

func (a *Amqp) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

func (*Amqp) Dial added in v0.5.0

func (a *Amqp) Dial(url string) (conn *amqp.Connection, err error)

func (*Amqp) ExchangeDeclare added in v0.5.0

func (a *Amqp) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

func (*Amqp) NotifyClose added in v0.5.0

func (a *Amqp) NotifyClose(c chan *amqp.Error) chan *amqp.Error

func (*Amqp) Qos added in v0.5.0

func (a *Amqp) Qos(prefetchCount, prefetchSize int, global bool) error

func (*Amqp) QueueBind added in v0.5.0

func (a *Amqp) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

func (*Amqp) QueueDeclare added in v0.5.0

func (a *Amqp) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

type AmqpChannel added in v0.5.0

type AmqpChannel interface {
	Qos(prefetchCount, prefetchSize int, global bool) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}

type AmqpChannelCloser added in v0.5.0

type AmqpChannelCloser interface {
	AmqpChannel
	AmqpCloser
}

type AmqpCloser added in v0.5.0

type AmqpCloser interface {
	Close() error
}

type AmqpConnection added in v0.5.0

type AmqpConnection interface {
	Channel() (*amqp.Channel, error)
}

type AmqpConnectionCloser added in v0.5.0

type AmqpConnectionCloser interface {
	AmqpConnection
	AmqpCloser
}

type AmqpConsumer added in v0.5.0

type AmqpConsumer interface {
	Dial(url string) (*amqp.Connection, error)

	AmqpConnection
	AmqpChannel
	AmqpCloser
}

type Config

type Config struct {
	Connection struct {
		Host     string
		Username string
		Password string
		Vhost    string
		Port     int
	}
	Consumer struct {
		Exchange struct {
			Name       string
			Type       string
			Durable    bool
			AutoDelete bool `mapstructure:"auto_delete"`
		}
		Prefetch struct {
			Count  int
			Global bool
		}
		Queue struct {
			Name       string
			RoutingKey string `mapstructure:"routing_key"`
			Durable    bool
			AutoDelete bool `mapstructure:"auto_delete"`
		}
		Worker struct {
			Script        string
			Count         int
			RetryInterval int `mapstructure:"retry_interval"`
		}
	}
	Logger struct {
		Output    string
		Formatter string
		Level     string
		LogFile   string `mapstructure:"log_file"`
	}
}

Config defines the configuration for Rex. It maps the YAML config file onto a Go struct, which the Logger and Rex structs use and reference directly.
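The Connection fields are the usual ingredients of an AMQP URI. The following is a minimal sketch of how they could be combined into one; the connection type and the amqpURL helper are hypothetical and exist only to make the example self-contained, and the way the package itself builds its dial URL is not documented here.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// connection mirrors the Connection fields of Config. It is redeclared
// here only so that the sketch compiles on its own.
type connection struct {
	Host     string
	Username string
	Password string
	Vhost    string
	Port     int
}

// amqpURL is a hypothetical helper, not part of package rabbit.
// It assumes the vhost name contains no "/" characters; such names
// would need to be percent-encoded first.
func amqpURL(c connection) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(c.Username, c.Password),
		Host:   net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
		Path:   "/" + c.Vhost,
	}
	return u.String()
}

func main() {
	c := connection{
		Host:     "localhost",
		Username: "guest",
		Password: "guest",
		Vhost:    "jobs",
		Port:     5672,
	}
	fmt.Println(amqpURL(c)) // prints amqp://guest:guest@localhost:5672/jobs
}
```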

type Logger

type Logger struct {

	// Embedding logrus.Logger
	log.Logger
	// contains filtered or unexported fields
}

Logger inherits all the exported fields and methods from logrus Logger. It's configurable, and can write logs to both stdout and a file.

func NewLogger

func NewLogger(c *Config) (l *Logger, err error)

NewLogger returns a configured Logger instance. It returns a non-nil error if anything goes wrong when configuring the Logger's output mode, formatter, or log level.

func (*Logger) Close

func (l *Logger) Close()

Close closes the log file if Logger is configured to write to a file.

type MessageForwarder added in v0.5.0

type MessageForwarder func(msgs <-chan amqp.Delivery, retryInterval int)

type NotifyCloseCallback added in v0.5.0

type NotifyCloseCallback func()

type Rex

type Rex struct {
	Amqp    AmqpConsumer
	Config  *Config
	Logger  *Logger
	Script  ScriptCaller
	Forever chan bool
}

Rex represents the RabbitMQ consumer. It has methods to connect to RabbitMQ, create an exchange and a queue, bind the queue to the exchange with a routing key, listen on the channel for incoming messages, forward each message body to a worker script, and ACK/NACK deliveries based on the worker script's return code.

func NewRex

func NewRex(c *Config, l *Logger, a AmqpConsumer, s ScriptCaller) (r *Rex, err error)

NewRex returns a configured Rex instance. It returns a non-nil error if anything goes wrong when connecting to RabbitMQ or when creating the queue and exchange.

func (*Rex) Close

func (r *Rex) Close()

Close closes the RabbitMQ connection and channel. It also closes the Logger.

func (*Rex) Consume

func (r *Rex) Consume() (err error)

Consume listens on the RabbitMQ channel and forwards each message's body to a worker script. Based on the return code of the worker script, it acknowledges the delivery when the code is 0 and negatively acknowledges it when the code is 1.
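The return-code rule described above can be sketched without a broker. In this example acknowledger is a hypothetical interface covering only the Ack and Nack methods of amqp.Delivery, recorder is a test double, and settle is an illustrative helper rather than the package's own code. Two points are assumptions: every non-zero code is treated like 1, and rejected deliveries are requeued.

```go
package main

import "fmt"

// acknowledger is a hypothetical stand-in for the Ack and Nack methods
// of amqp.Delivery, so the sketch runs without a RabbitMQ server.
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// recorder remembers which method was called for each delivery.
type recorder struct{ calls []string }

func (r *recorder) Ack(multiple bool) error {
	r.calls = append(r.calls, "ack")
	return nil
}

func (r *recorder) Nack(multiple, requeue bool) error {
	r.calls = append(r.calls, "nack")
	return nil
}

// settle acknowledges the delivery when the worker script exited with 0
// and negatively acknowledges it otherwise. Treating all non-zero codes
// alike and requeueing on failure are assumptions of this sketch.
func settle(d acknowledger, exitCode int) error {
	if exitCode == 0 {
		return d.Ack(false)
	}
	return d.Nack(false, true)
}

func main() {
	r := &recorder{}
	for _, code := range []int{0, 1} {
		if err := settle(r, code); err != nil {
			fmt.Println("settle failed:", err)
		}
	}
	fmt.Println(r.calls) // prints [ack nack]
}
```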

func (*Rex) NotifyClose added in v0.5.0

func (r *Rex) NotifyClose(fn NotifyCloseCallback)

NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. Connection exceptions are broadcast to all open channels, and all channels are closed, whereas channel exceptions are broadcast only to listeners on that channel.

type Script added in v0.5.0

type Script struct {
	Config *Config
}

func (Script) ExecWith added in v0.5.0

func (s Script) ExecWith(msg []byte) ([]byte, error)

type ScriptCaller added in v0.5.0

type ScriptCaller interface {
	ExecWith([]byte) ([]byte, error)
}
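Because NewRex accepts any ScriptCaller, a test double can stand in for a real worker script. The following sketch redeclares the interface locally so that it runs on its own; upperScript and handle are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// ScriptCaller is redeclared here with the same method set as the
// package's interface, so the sketch compiles without importing it.
type ScriptCaller interface {
	ExecWith([]byte) ([]byte, error)
}

// upperScript is a hypothetical test double. Instead of running an
// external worker script, it returns the message body in upper case.
type upperScript struct{}

func (upperScript) ExecWith(msg []byte) ([]byte, error) {
	return bytes.ToUpper(msg), nil
}

// handle shows how calling code can depend on the interface only.
func handle(s ScriptCaller, body []byte) (string, error) {
	out, err := s.ExecWith(body)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	out, err := handle(upperScript{}, []byte("resize image 42"))
	if err != nil {
		fmt.Println("worker failed:", err)
		return
	}
	fmt.Println(out) // prints RESIZE IMAGE 42
}
```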
