Documentation ¶
Index ¶
- type AMQPNotified
- type ChannelKeeper
- type Config
- type Connector
- func (c *Connector) AddAMQPNotifiedListener(h func(n AMQPNotified))
- func (c *Connector) AddDialedListener(h func(r Dialed))
- func (c *Connector) AddRetriedListener(h func(Retried))
- func (c *Connector) Channel(ctx context.Context) (*amqp.Channel, error)
- func (c *Connector) Dial(ctx context.Context, url string) error
- func (c *Connector) DialConfig(ctx context.Context, url string, config amqp.Config) error
- func (c *Connector) StartConsumer(ctx context.Context, consumer Consumer) error
- func (c *Connector) StartMultipleConsumers(ctx context.Context, consumer Consumer, count int) error
- type Consumer
- type Dialed
- type EnsurePublisher
- type FireForgetPublisher
- type LightningPool
- type Pool
- type Publisher
- type Retried
- type RetryPublisher
Types ¶
type AMQPNotified ¶
AMQPNotified is fired when an AMQP error occurs.
type ChannelKeeper ¶
type ChannelKeeper struct {
// contains filtered or unexported fields
}
ChannelKeeper stores an AMQP Channel together with its Confirmation and Close channels.
func (*ChannelKeeper) Channel ¶
func (k *ChannelKeeper) Channel() *amqp.Channel
Channel returns the amqp.Channel stored in ChannelKeeper.
func (*ChannelKeeper) Close ¶
func (k *ChannelKeeper) Close() error
Close closes the RabbitMQ channel stored in ChannelKeeper.
func (*ChannelKeeper) Confirm ¶
func (k *ChannelKeeper) Confirm() <-chan amqp.Confirmation
Confirm returns a channel that receives an amqp.Confirmation each time one occurs.
func (*ChannelKeeper) Error ¶
func (k *ChannelKeeper) Error() <-chan *amqp.Error
Error returns a channel that receives an amqp.Error when one occurs.
type Config ¶
type Config struct {
	// Max reconnect attempts.
	ReconnectAttempts uint
	// How long to wait between reconnect.
	Wait time.Duration
}
Config stores reconnect options.
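The two options describe a bounded retry loop: try to connect, and on failure wait before trying again, up to a maximum number of attempts. The sketch below illustrates that idea with the standard library only. The names retryConfig, dialWithRetry and demoDial are hypothetical, and how the Connector itself applies these options is not shown on this page, so treat this as an illustration under those assumptions rather than the library's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryConfig is a hypothetical local type that mirrors the two Config fields.
type retryConfig struct {
	ReconnectAttempts uint
	Wait              time.Duration
}

// dialWithRetry calls dial until it succeeds, the attempts are used up,
// or ctx is done. It returns the number of attempts that were made.
func dialWithRetry(ctx context.Context, cfg retryConfig, dial func() error) (uint, error) {
	lastErr := errors.New("no attempts were made")
	for attempt := uint(1); attempt <= cfg.ReconnectAttempts; attempt++ {
		if lastErr = dial(); lastErr == nil {
			return attempt, nil
		}
		// Wait before the next attempt, but stop early if ctx is done.
		select {
		case <-time.After(cfg.Wait):
		case <-ctx.Done():
			return attempt, ctx.Err()
		}
	}
	return cfg.ReconnectAttempts, fmt.Errorf("giving up: %w", lastErr)
}

// demoDial simulates a broker that accepts the third connection attempt.
func demoDial() (uint, error) {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	cfg := retryConfig{ReconnectAttempts: 5, Wait: time.Millisecond}
	return dialWithRetry(context.Background(), cfg, dial)
}

func main() {
	attempts, err := demoDial()
	fmt.Println("attempts:", attempts, "error:", err)
}
```

A larger Wait reduces load on a broker that is restarting; a larger ReconnectAttempts lets the caller ride out longer outages.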
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector implements RabbitMQ failover.
func NewConnector ¶
NewConnector returns a new instance of Connector.
func (*Connector) AddAMQPNotifiedListener ¶
func (c *Connector) AddAMQPNotifiedListener(h func(n AMQPNotified))
AddAMQPNotifiedListener registers an event listener that is called when an AMQP error is received.
NOTE: not concurrency-safe.
func (*Connector) AddDialedListener ¶
func (c *Connector) AddDialedListener(h func(r Dialed))
AddDialedListener registers an event listener that is called when a connection is successfully established.
NOTE: not concurrency-safe.
func (*Connector) AddRetriedListener ¶
func (c *Connector) AddRetriedListener(h func(Retried))
AddRetriedListener registers an event listener that is called when an attempt to establish a connection is retried.
NOTE: not concurrency-safe.
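Because the Add...Listener methods are not concurrency-safe, the usual pattern is to register every listener first and only then start the goroutines that can emit events. The sketch below shows that ordering with a hypothetical emitter type that stores listeners in a plain slice without locking. The types retried and emitter are stand-ins invented for illustration, not the library's own types.

```go
package main

import (
	"errors"
	"fmt"
)

// retried is a hypothetical stand-in for the Retried event.
type retried struct {
	Attempt int
	Err     error
}

// emitter keeps listeners in a plain slice without any locking, so all
// registration must be finished before another goroutine starts emitting.
type emitter struct {
	listeners []func(retried)
}

// AddRetriedListener appends h to the listener list (not concurrency-safe).
func (e *emitter) AddRetriedListener(h func(retried)) {
	e.listeners = append(e.listeners, h)
}

// emit calls every registered listener in registration order.
func (e *emitter) emit(r retried) {
	for _, h := range e.listeners {
		h(r)
	}
}

// demoListeners registers a listener, then emits three events from a
// separate goroutine and returns the attempt numbers the listener saw.
func demoListeners() []int {
	var seen []int
	e := &emitter{}

	// Step 1: register listeners while only one goroutine touches e.
	e.AddRetriedListener(func(r retried) { seen = append(seen, r.Attempt) })

	// Step 2: start the goroutine that produces events.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 1; i <= 3; i++ {
			e.emit(retried{Attempt: i, Err: errors.New("dial failed")})
		}
	}()
	<-done // closing done orders the writes to seen before this read
	return seen
}

func main() {
	fmt.Println(demoListeners())
}
```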
func (*Connector) Channel ¶
func (c *Connector) Channel(ctx context.Context) (*amqp.Channel, error)
Channel allocates and returns a new amqp.Channel. On error, a new Channel should be opened.
NOTE: This is a blocking method. It waits until the connection is established.
func (*Connector) Dial ¶ added in v0.2.0
func (c *Connector) Dial(ctx context.Context, url string) error
Dial tries to keep the RabbitMQ connection active by catching and handling connection errors. It returns an error only when ctx is done.
NOTE: This is a blocking method.
func (*Connector) DialConfig ¶ added in v0.2.0
func (c *Connector) DialConfig(ctx context.Context, url string, config amqp.Config) error
DialConfig is used to configure the RabbitMQ connection with amqp.Config. It tries to keep the RabbitMQ connection active by catching and handling connection errors. It returns an error only when ctx is done.
NOTE: This is a blocking method.
func (*Connector) StartConsumer ¶
func (c *Connector) StartConsumer(ctx context.Context, consumer Consumer) error
StartConsumer is used to start a Consumer.
NOTE: This is a blocking method.
func (*Connector) StartMultipleConsumers ¶
func (c *Connector) StartMultipleConsumers(ctx context.Context, consumer Consumer, count int) error
StartMultipleConsumers is used to start a Consumer "count" times. The Declare method is called once, and Consume is called "count" times (one goroutine per call), so you can scale the consumer horizontally.
NOTE: This is a blocking method.
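The "declare once, consume count times" behaviour can be pictured as a small fan-out. The sketch below is a standard-library illustration with hypothetical names (worker, startMultiple, countingWorker); it drops the *amqp.Channel parameter and the redeclare-on-failure logic, so it shows only the call pattern and is not the library's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// worker is a hypothetical stand-in for Consumer without the channel argument.
type worker interface {
	Declare(ctx context.Context) error
	Consume(ctx context.Context) error
}

// startMultiple calls Declare once, then runs Consume in count goroutines.
// It blocks until every Consume call returns and reports the first error.
func startMultiple(ctx context.Context, w worker, count int) error {
	if err := w.Declare(ctx); err != nil {
		return err
	}
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := w.Consume(ctx); err != nil {
				once.Do(func() { firstErr = err })
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// countingWorker records how often each method was called.
type countingWorker struct {
	declared int32
	consumed int32
}

func (w *countingWorker) Declare(ctx context.Context) error {
	atomic.AddInt32(&w.declared, 1)
	return nil
}

func (w *countingWorker) Consume(ctx context.Context) error {
	atomic.AddInt32(&w.consumed, 1)
	return nil
}

// demoStartMultiple runs five consumers and returns the call counts.
func demoStartMultiple() (declared, consumed int32) {
	w := &countingWorker{}
	if err := startMultiple(context.Background(), w, 5); err != nil {
		return 0, 0
	}
	return atomic.LoadInt32(&w.declared), atomic.LoadInt32(&w.consumed)
}

func main() {
	d, c := demoStartMultiple()
	fmt.Println("declared:", d, "consumed:", c)
}
```

Since Consume runs concurrently in several goroutines, any state shared through the Consumer value needs its own synchronization.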
type Consumer ¶
type Consumer interface {
	// Declare is used to declare the required RabbitMQ entities.
	// It is called once before Consume (even when StartMultipleConsumers is used).
	// On any problem with the connection or channel, the RabbitMQ entities are redeclared.
	Declare(ctx context.Context, ch *amqp.Channel) error

	// Consume is used to consume a RabbitMQ queue.
	// It can be called more than once if registered with StartMultipleConsumers.
	Consume(ctx context.Context, ch *amqp.Channel) error
}
The Consumer interface provides methods for declaring RabbitMQ entities and consuming a queue.
Example ¶
This example demonstrates consuming messages from a RabbitMQ queue.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/furdarius/rabbitroutine"
	"github.com/streadway/amqp"
)

// Consumer implements the rabbitroutine.Consumer interface.
type Consumer struct {
	ExchangeName string
	QueueName    string
}

// Declare implements the rabbitroutine.Consumer Declare method.
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {
	err := ch.ExchangeDeclare(
		c.ExchangeName, // name
		"direct",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("failed to declare exchange %v: %v", c.ExchangeName, err)
		return err
	}

	_, err = ch.QueueDeclare(
		c.QueueName, // name
		true,        // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		log.Printf("failed to declare queue %v: %v", c.QueueName, err)
		return err
	}

	err = ch.QueueBind(
		c.QueueName,    // queue name
		c.QueueName,    // routing key
		c.ExchangeName, // exchange
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("failed to bind queue %v: %v", c.QueueName, err)
		return err
	}

	return nil
}

// Consume implements the rabbitroutine.Consumer Consume method.
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {
	defer log.Println("consume method finished")

	err := ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		log.Printf("failed to set qos: %v", err)
		return err
	}

	msgs, err := ch.Consume(
		c.QueueName,  // queue
		"myconsumer", // consumer name
		false,        // auto-ack
		false,        // exclusive
		false,        // no-local
		false,        // no-wait
		nil,          // args
	)
	if err != nil {
		log.Printf("failed to consume %v: %v", c.QueueName, err)
		return err
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return amqp.ErrClosed
			}

			content := string(msg.Body)
			fmt.Println("New message:", content)

			err := msg.Ack(false)
			if err != nil {
				log.Printf("failed to Ack message: %v", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// This example demonstrates consuming messages from a RabbitMQ queue.
func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20,
		// How long to wait between reconnects
		Wait: 2 * time.Second,
	})

	conn.AddRetriedListener(func(r rabbitroutine.Retried) {
		log.Printf("try to connect to RabbitMQ: attempt=%d, error=\"%v\"",
			r.ReconnectAttempt, r.Error)
	})

	conn.AddDialedListener(func(_ rabbitroutine.Dialed) {
		log.Printf("RabbitMQ connection successfully established")
	})

	conn.AddAMQPNotifiedListener(func(n rabbitroutine.AMQPNotified) {
		log.Printf("RabbitMQ error received: %v", n.Error)
	})

	consumer := &Consumer{
		ExchangeName: "myexch",
		QueueName:    "myqueue",
	}

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	go func() {
		err := conn.StartMultipleConsumers(ctx, consumer, 5)
		if err != nil {
			log.Println("failed to start consumer:", err)
		}
	}()

	sigc := make(chan os.Signal, 1)
	// os.Interrupt is SIGINT, so it is listed only once.
	signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)

	// Wait for OS termination signal
	<-sigc
}
type EnsurePublisher ¶
type EnsurePublisher struct {
// contains filtered or unexported fields
}
EnsurePublisher implements the Publisher interface and is used to publish messages to a RabbitMQ exchange. It blocks until the message is successfully delivered, the context is cancelled, or an error is received. With EnsurePublisher, publisher confirms are enabled, which provides delivery guarantees. See http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
Example ¶
This example demonstrates publishing messages to a RabbitMQ exchange with delivery guarantees provided by EnsurePublisher and publishing retries provided by RetryPublisher.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/furdarius/rabbitroutine"
	"github.com/streadway/amqp"
)

func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20000,
		// How long to wait between reconnects
		Wait: 2 * time.Second,
	})

	pool := rabbitroutine.NewPool(conn)
	ensurePub := rabbitroutine.NewEnsurePublisher(pool)
	pub := rabbitroutine.NewRetryPublisher(ensurePub)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)

		err := pub.Publish(timeoutCtx, "myexch", "myqueue", amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			log.Println("failed to publish:", err)
		}

		cancel()
	}
}
func NewEnsurePublisher ¶
func NewEnsurePublisher(p *Pool) *EnsurePublisher
NewEnsurePublisher returns a new instance of EnsurePublisher.
func (*EnsurePublisher) Publish ¶
func (p *EnsurePublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
Publish sends msg to an exchange on RabbitMQ and waits to ensure that msg has been successfully received by the server. It blocks until the message is successfully delivered, the context is cancelled, or an error is received.
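The three ways a confirmed publish can end (confirmation, channel error, context done) map naturally onto a single select statement. The sketch below illustrates that wait with the standard library only. The confirmation type, errNack and waitConfirm are hypothetical stand-ins and assumptions made for illustration; they are not taken from the library's source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// confirmation is a hypothetical stand-in for amqp.Confirmation.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

var errNack = errors.New("broker negatively acknowledged the message")

// waitConfirm blocks until a confirmation arrives, a channel error arrives,
// or ctx is done, whichever happens first.
func waitConfirm(ctx context.Context, confirms <-chan confirmation, errs <-chan error) error {
	select {
	case c := <-confirms:
		if !c.Ack {
			return errNack
		}
		return nil
	case err := <-errs:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoConfirm exercises the ack path and the timeout path.
func demoConfirm() (acked, timedOut bool) {
	confirms := make(chan confirmation, 1)
	errs := make(chan error, 1)

	// Ack path: a confirmation is already waiting.
	confirms <- confirmation{DeliveryTag: 1, Ack: true}
	acked = waitConfirm(context.Background(), confirms, errs) == nil

	// Timeout path: nothing arrives before the deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	err := waitConfirm(ctx, confirms, errs)
	timedOut = errors.Is(err, context.DeadlineExceeded)

	return acked, timedOut
}

func main() {
	acked, timedOut := demoConfirm()
	fmt.Println("acked:", acked, "timed out:", timedOut)
}
```

Passing a context with a timeout, as the publishing example on this page does, bounds how long a caller waits for the broker.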
type FireForgetPublisher ¶ added in v0.4.0
type FireForgetPublisher struct {
// contains filtered or unexported fields
}
FireForgetPublisher implements the Publisher interface and is used to publish messages to a RabbitMQ exchange without delivery guarantees. With FireForgetPublisher, publisher confirms are not enabled, so there are no delivery guarantees. See http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
Example ¶
This example demonstrates publishing messages to a RabbitMQ exchange using FireForgetPublisher.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/furdarius/rabbitroutine"
	"github.com/streadway/amqp"
)

func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20000,
		// How long to wait between reconnects
		Wait: 2 * time.Second,
	})

	pool := rabbitroutine.NewLightningPool(conn)
	pub := rabbitroutine.NewFireForgetPublisher(pool)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)

		err := pub.Publish(timeoutCtx, "myexch", "myqueue", amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			log.Println("failed to publish:", err)
		}

		cancel()
	}
}
func NewFireForgetPublisher ¶ added in v0.4.0
func NewFireForgetPublisher(p *LightningPool) *FireForgetPublisher
NewFireForgetPublisher returns a new instance of FireForgetPublisher.
func (*FireForgetPublisher) Publish ¶ added in v0.4.0
func (p *FireForgetPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
Publish sends msg to an exchange on RabbitMQ.
type LightningPool ¶ added in v0.4.0
type LightningPool struct {
// contains filtered or unexported fields
}
LightningPool stores AMQP Channels without confirm mode, so they will be used without delivery guarantees.
func NewLightningPool ¶ added in v0.4.0
func NewLightningPool(conn *Connector) *LightningPool
NewLightningPool returns a new instance of LightningPool.
func (*LightningPool) Release ¶ added in v0.4.0
func (p *LightningPool) Release(k *amqp.Channel)
Release adds k to the pool.
func (*LightningPool) Size ¶ added in v0.4.0
func (p *LightningPool) Size() int
Size returns the current pool size. NOTE: not a thread-safe operation.
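The general idea of a pool with Release and Size can be sketched as a stack of idle items that is refilled when callers hand items back. The code below is a standard-library illustration only: conn, pool and the Get method are hypothetical names, and this sketch reads the size under a mutex, which differs from the Size documented above.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for *amqp.Channel.
type conn struct{ id int }

// pool is a hypothetical sketch of a reuse pool; it is not LightningPool.
type pool struct {
	mu   sync.Mutex
	idle []*conn
	next int
}

// Get returns an idle item if one exists, otherwise it creates a new one.
func (p *pool) Get() *conn {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.idle); n > 0 {
		c := p.idle[n-1]
		p.idle = p.idle[:n-1]
		return c
	}
	p.next++
	return &conn{id: p.next}
}

// Release adds c back to the pool so that a later Get can reuse it.
func (p *pool) Release(c *conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.idle = append(p.idle, c)
}

// Size returns the number of idle items. This sketch takes the lock,
// unlike the Size documented above, which is noted as not thread-safe.
func (p *pool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.idle)
}

// demoPool checks that a released item is handed out again.
func demoPool() (reused bool, size int) {
	p := &pool{}
	a := p.Get()
	p.Release(a)
	b := p.Get()
	reused = a == b
	p.Release(b)
	return reused, p.Size()
}

func main() {
	reused, size := demoPool()
	fmt.Println("reused:", reused, "size:", size)
}
```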
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a set of AMQP Channels that may be individually saved and retrieved.
func (*Pool) ChannelWithConfirm ¶
func (p *Pool) ChannelWithConfirm(ctx context.Context) (ChannelKeeper, error)
ChannelWithConfirm returns a ChannelKeeper holding an AMQP Channel in confirm mode.
type Publisher ¶
type Publisher interface {
	// Publish is used to send msg to a RabbitMQ exchange.
	Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
}
The Publisher interface provides the functionality of publishing to RabbitMQ.
type Retried ¶
Retried is fired when a connection retry occurs. The event is emitted only if the connection was not established. If the connection was successfully established, the Dialed event is emitted instead.
type RetryPublisher ¶
type RetryPublisher struct {
	Publisher
	// contains filtered or unexported fields
}
RetryPublisher implements the Publisher interface and is used to publish messages to a RabbitMQ exchange. It blocks until the message is successfully delivered or the context is cancelled. On error, the publisher retries publishing msg.
func NewRetryPublisher ¶
func NewRetryPublisher(p Publisher) *RetryPublisher
NewRetryPublisher returns a new instance of RetryPublisher.
func NewRetryPublisherWithDelay ¶
func NewRetryPublisherWithDelay(p Publisher, delay time.Duration) *RetryPublisher
NewRetryPublisherWithDelay returns a new instance of RetryPublisher with the given delay between retries.
func (*RetryPublisher) Publish ¶
func (p *RetryPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
Publish is used to send msg to a RabbitMQ exchange. It blocks until the message is delivered or the context is cancelled. An error is returned only if the context is done.
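A retry-until-done publisher of this kind amounts to a loop that keeps calling the wrapped Publish, sleeps for the delay after each failure, and stops only on success or when the context is done. The sketch below shows that loop using the standard library. publishFunc, retryPublish and demoRetry are hypothetical names; the exchange, key and msg arguments are left out for brevity, and the real retry logic may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publishFunc is a hypothetical stand-in for a wrapped Publisher.Publish call.
type publishFunc func(ctx context.Context) error

// retryPublish calls publish until it succeeds or ctx is done, waiting
// delay between attempts. It returns the number of attempts made.
func retryPublish(ctx context.Context, publish publishFunc, delay time.Duration) (int, error) {
	for attempt := 1; ; attempt++ {
		if err := publish(ctx); err == nil {
			return attempt, nil
		}
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			// The only error ever returned is the context's error.
			return attempt, ctx.Err()
		}
	}
}

// demoRetry runs one publish that succeeds on the third try and one that
// never succeeds and is stopped by a context deadline.
func demoRetry() (attempts int, gaveUp bool) {
	calls := 0
	flaky := func(ctx context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("channel closed")
		}
		return nil
	}
	attempts, _ = retryPublish(context.Background(), flaky, time.Millisecond)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	alwaysFail := func(ctx context.Context) error {
		return errors.New("broker unavailable")
	}
	_, err := retryPublish(ctx, alwaysFail, 5*time.Millisecond)

	return attempts, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	attempts, gaveUp := demoRetry()
	fmt.Println("attempts:", attempts, "gave up on deadline:", gaveUp)
}
```

Because the loop has no attempt limit, callers that must not block indefinitely should always pass a context with a deadline or a cancel function.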