Documentation ¶
Overview ¶
Package rabbit provides functions for creating RabbitMQ consumers that listen to the queue specified in the config file and forward each message body to worker scripts.
Index ¶
- Constants
- type Amqp
- func (a *Amqp) Channel() (chn *amqp.Channel, err error)
- func (a *Amqp) Close() (err error)
- func (a *Amqp) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, ...) (<-chan amqp.Delivery, error)
- func (a *Amqp) Dial(url string) (conn *amqp.Connection, err error)
- func (a *Amqp) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
- func (a *Amqp) NotifyClose(c chan *amqp.Error) chan *amqp.Error
- func (a *Amqp) Qos(prefetchCount, prefetchSize int, global bool) error
- func (a *Amqp) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
- func (a *Amqp) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
- type AmqpChannel
- type AmqpChannelCloser
- type AmqpCloser
- type AmqpConnection
- type AmqpConnectionCloser
- type AmqpConsumer
- type Config
- type Logger
- type MessageForwarder
- type NotifyCloseCallback
- type Rex
- type Script
- type ScriptCaller
Constants ¶
const (
	DefaultPrefetchCount = 10
	DefaultRetryInterval = 30
)
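The page does not say when these defaults take effect. One plausible reading, and it is only an assumption, is that they stand in for Consumer.Prefetch.Count and Consumer.Worker.RetryInterval when those are left unset in the config. A minimal sketch of that idea, with `withDefault` as a hypothetical helper that is not part of the package:

```go
package main

import "fmt"

// Values copied from the package constants above.
const (
	DefaultPrefetchCount = 10
	DefaultRetryInterval = 30
)

// withDefault is a hypothetical helper (not part of package rabbit).
// It returns fallback when the configured value is unset, which this
// sketch takes to mean zero or negative.
func withDefault(configured, fallback int) int {
	if configured <= 0 {
		return fallback
	}
	return configured
}

func main() {
	fmt.Println(withDefault(0, DefaultPrefetchCount)) // unset: falls back to 10
	fmt.Println(withDefault(5, DefaultRetryInterval)) // set: keeps 5
}
```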
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Amqp ¶ added in v0.5.0
type Amqp struct {
	Chan AmqpChannelCloser
	Conn AmqpConnectionCloser
}
func (*Amqp) Dial ¶ added in v0.5.0
func (a *Amqp) Dial(url string) (conn *amqp.Connection, err error)
func (*Amqp) ExchangeDeclare ¶ added in v0.5.0
func (*Amqp) NotifyClose ¶ added in v0.5.0
type AmqpChannel ¶ added in v0.5.0
type AmqpChannel interface {
	Qos(prefetchCount, prefetchSize int, global bool) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}
type AmqpChannelCloser ¶ added in v0.5.0
type AmqpChannelCloser interface {
	AmqpChannel
	AmqpCloser
}
type AmqpCloser ¶ added in v0.5.0
type AmqpCloser interface {
	Close() error
}
type AmqpConnection ¶ added in v0.5.0
type AmqpConnectionCloser ¶ added in v0.5.0
type AmqpConnectionCloser interface {
	AmqpConnection
	AmqpCloser
}
type AmqpConsumer ¶ added in v0.5.0
type AmqpConsumer interface {
	Dial(url string) (*amqp.Connection, error)
	AmqpConnection
	AmqpChannel
	AmqpCloser
}
type Config ¶
type Config struct {
	Connection struct {
		Host     string
		Username string
		Password string
		Vhost    string
		Port     int
	}
	Consumer struct {
		Exchange struct {
			Name       string
			Type       string
			Durable    bool
			AutoDelete bool `mapstructure:"auto_delete"`
		}
		Prefetch struct {
			Count  int
			Global bool
		}
		Queue struct {
			Name       string
			RoutingKey string `mapstructure:"routing_key"`
			Durable    bool
			AutoDelete bool `mapstructure:"auto_delete"`
		}
		Worker struct {
			Script        string
			Count         int
			RetryInterval int `mapstructure:"retry_interval"`
		}
	}
	Logger struct {
		Output    string
		Formatter string
		Level     string
		LogFile   string `mapstructure:"log_file"`
	}
}
Config defines the configuration for Rex. It maps the YAML config file onto a Go struct, which the Logger and Rex structs use and reference directly.
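Since Dial takes a single url string, the Connection fields presumably end up combined into an AMQP URI at some point. How Rex does this is not shown here, so the following is only a sketch: `Connection` is a local copy of the Connection block, and `amqpURL` is a hypothetical helper built on the standard net/url package.

```go
package main

import (
	"fmt"
	"net/url"
)

// Connection mirrors the Connection block of Config for illustration.
type Connection struct {
	Host     string
	Username string
	Password string
	Vhost    string
	Port     int
}

// amqpURL is a hypothetical helper (not part of package rabbit) that
// assembles an amqp:// URI from the connection settings. The vhost is
// percent-encoded so that a vhost such as "/" survives as "%2F".
func amqpURL(c Connection) string {
	u := url.URL{
		Scheme:  "amqp",
		User:    url.UserPassword(c.Username, c.Password),
		Host:    fmt.Sprintf("%s:%d", c.Host, c.Port),
		Path:    "/" + c.Vhost,
		RawPath: "/" + url.PathEscape(c.Vhost),
	}
	return u.String()
}

func main() {
	conn := Connection{
		Host:     "localhost",
		Username: "guest",
		Password: "guest",
		Vhost:    "orders", // example vhost name, chosen for this sketch
		Port:     5672,
	}
	fmt.Println(amqpURL(conn))
}
```

Building the URI with net/url rather than string concatenation keeps special characters in the username, password, or vhost from corrupting the result.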
type Logger ¶
type Logger struct {
	// Embedding logrus.Logger
	log.Logger
	// contains filtered or unexported fields
}
Logger embeds logrus.Logger and so exposes all of its exported fields and methods. It is configurable and can write logs to both stdout and a file.
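Writing the same log line to a console stream and a file is commonly done in Go with io.MultiWriter. Whether Logger does this internally is not shown on this page, so treat the sketch below as an illustration of the general technique. `demo` is a hypothetical function, and the two buffers stand in for the console stream and an opened log file.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// demo writes one line through a fan-out writer and returns what each
// sink received. The buffers are stand-ins for the console stream and
// a log file; this is not the package's own implementation.
func demo(line string) (console, file string) {
	var consoleBuf, fileBuf bytes.Buffer
	w := io.MultiWriter(&consoleBuf, &fileBuf)
	fmt.Fprintln(w, line)
	return consoleBuf.String(), fileBuf.String()
}

func main() {
	console, file := demo("consumer started")
	fmt.Print(console)
	fmt.Print(file)
}
```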
type MessageForwarder ¶ added in v0.5.0
type NotifyCloseCallback ¶ added in v0.5.0
type NotifyCloseCallback func()
type Rex ¶
type Rex struct {
	Amqp    AmqpConsumer
	Config  *Config
	Logger  *Logger
	Script  ScriptCaller
	Forever chan bool
}
Rex represents the RabbitMQ consumer. It has methods to connect to RabbitMQ, create the exchange and queue, bind the queue to the exchange with a routing key, listen on the channel for incoming messages, forward each message body to worker scripts, and ACK/NACK deliveries based on the worker script's return code.
func NewRex ¶
func NewRex(c *Config, l *Logger, a AmqpConsumer, s ScriptCaller) (r *Rex, err error)
NewRex returns a configured Rex instance. It returns a non-nil error if anything goes wrong while connecting to RabbitMQ or creating the queue and exchange.
func (*Rex) Close ¶
func (r *Rex) Close()
Close closes the RabbitMQ connection and channel. It also closes the Logger.
func (*Rex) Consume ¶
Consume listens to the RabbitMQ channel and forwards each message's body to a worker script. Based on the worker script's return code, it acknowledges the delivery when the code is 0 and negatively acknowledges it when the code is 1.
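The exit-code rule can be sketched without a broker. In the example below, `acknowledger` is a trimmed, hypothetical stand-in for the Ack and Nack methods of amqp.Delivery, `fakeDelivery` records which one was called, and `settle` is an illustrative name. Two points are assumptions rather than documented behaviour: every non-zero code is treated like 1, and the Nack asks for a requeue.

```go
package main

import "fmt"

// acknowledger is a hypothetical stand-in for the Ack/Nack methods of
// amqp.Delivery, so the sketch runs without a RabbitMQ connection.
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// fakeDelivery records how it was settled.
type fakeDelivery struct{ outcome string }

func (d *fakeDelivery) Ack(multiple bool) error {
	d.outcome = "ack"
	return nil
}

func (d *fakeDelivery) Nack(multiple, requeue bool) error {
	d.outcome = "nack"
	return nil
}

// settle acknowledges the delivery when the worker exited with 0 and
// negatively acknowledges it otherwise.
// Assumptions: codes other than 0 and 1 are handled like 1, and the
// message is requeued on Nack.
func settle(d acknowledger, exitCode int) error {
	if exitCode == 0 {
		return d.Ack(false)
	}
	return d.Nack(false, true)
}

func main() {
	for _, code := range []int{0, 1} {
		d := &fakeDelivery{}
		if err := settle(d, code); err != nil {
			fmt.Println("settle failed:", err)
			continue
		}
		fmt.Printf("exit code %d: %s\n", code, d.outcome)
	}
}
```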
func (*Rex) NotifyClose ¶ added in v0.5.0
func (r *Rex) NotifyClose(fn NotifyCloseCallback)
NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. Connection exceptions are broadcast to all open channels, and all channels are closed, whereas channel exceptions are broadcast only to listeners on this channel.
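A callback of type NotifyCloseCallback would typically be invoked from a goroutine that waits on the close-notification channel. The sketch below shows that pattern under stated assumptions: a plain `chan error` replaces `chan *amqp.Error`, and `watchClose` and `errServerClosed` are hypothetical names, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// NotifyCloseCallback matches the type declared by the package.
type NotifyCloseCallback func()

// errServerClosed is a hypothetical error used to simulate an exception.
var errServerClosed = errors.New("channel closed by server")

// watchClose is a hypothetical helper. It waits for the first value on
// closed and calls fn if that value is a non-nil error. If the channel
// is closed without an error (a graceful shutdown), fn is not called.
// The returned channel is closed once the watcher has finished.
func watchClose(closed <-chan error, fn NotifyCloseCallback) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err, ok := <-closed; ok && err != nil {
			fn()
		}
	}()
	return done
}

func main() {
	closed := make(chan error, 1)
	done := watchClose(closed, func() {
		fmt.Println("connection lost, callback invoked")
	})
	closed <- errServerClosed // simulate a server-side exception
	<-done
}
```

A callback like this is a natural place to trigger reconnection, though the page does not say what Rex itself does in that case.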