Documentation ¶
Overview ¶
AMQP driver implementation
Index ¶
- Constants
- func Drivers() []string
- func GetBool(values url.Values, key string, def bool) bool
- func GetInt(values url.Values, key string, def int) int
- func GetString(values url.Values, key string, def string) string
- func NewAMQPURLSettings(values url.Values) *amqpURLSettings
- func Register(name string, driver MessageQueueDriverer)
- type AMQPDSN
- type AMQPDriver
- type MessageQueueDriverer
- type MessageQueuer
- type QueueMessage
- type SubscribeMessage
Constants ¶
View Source
const ( RELIABLE_URL_PARAM = "reliable" AUTO_ACK_URL_PARAM = "auto_ack" EXCHANGE_URL_PARAM = "exchange" DEF_AMQP_RELIABLE = true DEF_AMQP_AUTO_ACK = false DEF_AMQP_PREFIX = "errgoq" DEF_EXCHANGE = "errgo" )
View Source
const ( URL_PARAM_NAME_AUTO_ACK = "auto_ack" URL_PARAM_PREFIX = "prefix" // default values DEFAULT_AUTO_ACK = false DEFAULT_PREFIX = "errgoq" )
View Source
const ( // default values for redisMessageQueue MAX_IDLE = 10 MAX_ACTIVE = 10 IDLE_TIMEOUT = 500 RETRY_NON_ACKED_TIMEOUT = 600 AUTO_ACK = false REQUEUE_NON_ACKED_NUM = 10 )
View Source
const ( RETRY_QUEUE = "ergoq-retry:" MESSAGE_COUNTER = "ergoq-counter:" )
Variables ¶
This section is empty.
Functions ¶
func NewAMQPURLSettings ¶
func Register ¶
func Register(name string, driver MessageQueueDriverer)
Register makes a message queue driver available by the provided name. If Register is called twice with the same name or if driver is nil, it panics.
Types ¶
type AMQPDriver ¶
type AMQPDriver struct{}
func (*AMQPDriver) OpenConnection ¶
func (a *AMQPDriver) OpenConnection(connection interface{}, settings string) (MessageQueuer, error)
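OpenConnection opens a message queuer using an already instantiated connection. The settings argument is a URL-encoded parameter string, e.g. "auto_ack=true&exchange=exchange".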
type MessageQueueDriverer ¶
type MessageQueueDriverer interface { // "opens" message queuer Open(dsn string) (MessageQueuer, error) // Opens queuer by connection // settings is url encoded params e.g. "auto_ack=true&exchange=exchange" OpenConnection(connection interface{}, settings string) (MessageQueuer, error) }
type MessageQueuer ¶
type MessageQueuer interface { // Pushes message to queue Push(queue string, message []byte) error // Pops message from queue Pop(queue string) (QueueMessage, error) // Publishes message to queue(fanout for all subscribers) Publish(queue string, message []byte) error // Subscribes to queue(s) Subscribe(quit <-chan struct{}, queues ...string) (chan SubscribeMessage, chan error) }
func OpenConnection ¶
func OpenConnection(driverName string, connection interface{}, settings ...string) (MessageQueuer, error)
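OpenConnection opens a message queue using the driver registered under driverName and an already instantiated connection. Optional settings strings may be supplied.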
type QueueMessage ¶
type QueueMessage interface { // queue Queue() string // returns contents of message Message() []byte // Acknowledges message Ack() error // returns id of message Id() string }
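QueueMessage is the interface for a message returned by MessageQueuer.Pop. It exposes the queue name, the message contents, the message id, and an Ack method to acknowledge the message.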
type SubscribeMessage ¶
type SubscribeMessage interface { // returns queue where was message published Queue() string // returns content of message Message() []byte }
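SubscribeMessage is the interface for a message delivered through MessageQueuer.Subscribe. It exposes the queue the message was published to and the message contents.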
func NewSubscriberMessage ¶
func NewSubscriberMessage(queue string, message []byte) SubscribeMessage
Click to show internal directories.
Click to hide internal directories.