Documentation ¶
Index ¶
- Variables
- type AMQPNotified
- type ChannelKeeper
- type Config
- type Connector
- func (c *Connector) AddAMQPNotifiedListener(h func(n AMQPNotified))
- func (c *Connector) AddDialedListener(h func(r Dialed))
- func (c *Connector) AddRetriedListener(h func(Retried))
- func (c *Connector) Channel(ctx context.Context) (*amqp.Channel, error)
- func (c *Connector) Dial(ctx context.Context, url string) error
- func (c *Connector) DialConfig(ctx context.Context, url string, config amqp.Config) error
- func (c *Connector) StartConsumer(ctx context.Context, consumer Consumer) error
- func (c *Connector) StartMultipleConsumers(ctx context.Context, consumer Consumer, count int) error
- type Consumer
- type Dialed
- type EnsurePublisher
- type FireForgetPublisher
- type LightningPool
- type Pool
- type Publisher
- type Retried
- type RetryDelayFunc
- type RetryPublisher
- type RetryPublisherOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotFound indicates that the RabbitMQ entity doesn't exist.
	ErrNotFound = errors.New("rabbitmq entity not found")
	// ErrNoRoute indicates that no queue is bound that matches the routing key.
	// @see: https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17
	ErrNoRoute = errors.New("queue not bound")
)
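Sentinel errors like these are usually matched with errors.Is, which also sees through wrapping. The sketch below is a standard-library-only illustration: errNotFound and errNoRoute are local stand-ins for the package variables, and classify with its action labels is a hypothetical helper, not part of rabbitroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for rabbitroutine.ErrNotFound and rabbitroutine.ErrNoRoute
// (assumption: the real variables would be used the same way).
var (
	errNotFound = errors.New("rabbitmq entity not found")
	errNoRoute  = errors.New("queue not bound")
)

// classify maps an error to a short, made-up action label.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errNoRoute):
		return "declare-and-bind"
	case errors.Is(err, errNotFound):
		return "declare-entity"
	default:
		return "retry"
	}
}

func main() {
	// errors.Is matches the sentinel even when it has been wrapped with %w.
	wrapped := fmt.Errorf("publish failed: %w", errNoRoute)
	fmt.Println(classify(wrapped))                        // prints "declare-and-bind"
	fmt.Println(classify(errors.New("connection reset"))) // prints "retry"
}
```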
Functions ¶
This section is empty.
Types ¶
type AMQPNotified ¶
AMQPNotified is fired when an AMQP error occurs.
type ChannelKeeper ¶
type ChannelKeeper struct {
// contains filtered or unexported fields
}
ChannelKeeper stores AMQP Channel with Confirmation and Close chans.
func (*ChannelKeeper) Channel ¶
func (k *ChannelKeeper) Channel() *amqp.Channel
Channel returns an amqp.Channel stored in ChannelKeeper.
func (*ChannelKeeper) Close ¶
func (k *ChannelKeeper) Close() error
Close closes RabbitMQ channel stored in ChannelKeeper.
func (*ChannelKeeper) Confirm ¶
func (k *ChannelKeeper) Confirm() <-chan amqp.Confirmation
Confirm returns a channel that will receive amqp.Confirmation when it occurs.
func (*ChannelKeeper) Error ¶
func (k *ChannelKeeper) Error() <-chan *amqp.Error
Error returns a channel that will receive amqp.Error when it occurs.
func (*ChannelKeeper) Return ¶ added in v0.5.0
func (k *ChannelKeeper) Return() <-chan amqp.Return
Return returns a channel that will receive amqp.Return when it occurs.
type Config ¶
type Config struct {
	// ReconnectAttempts defines how many reconnect attempts are made after the connection is lost.
	// The counter is reset once a new connection has been established,
	// so after the next connection loss the full ReconnectAttempts attempts are available again.
	// If the maximum number of reconnect attempts is exceeded, Dial or DialConfig simply returns an error,
	// and handling that situation is up to the caller.
	// In general it is better to allow a practically unlimited number of reconnect attempts
	// and to log errors using Connector.AddRetriedListener (see the examples directory).
	ReconnectAttempts uint
	// Wait is how long to wait between reconnect attempts.
	Wait time.Duration
}
Config stores reconnect options.
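The interaction between ReconnectAttempts and Wait can be pictured as a bounded retry loop. The following is a minimal standard-library sketch under stated assumptions, not the Connector's actual implementation: reconnectConfig, dialWithRetry, flakyDialer and runDemo are hypothetical names, and the dial function is simulated.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// reconnectConfig mirrors the two Config fields described above (hypothetical type).
type reconnectConfig struct {
	ReconnectAttempts uint
	Wait              time.Duration
}

// dialWithRetry calls dial until it succeeds, ctx is done, or the attempt
// budget is used up. It waits cfg.Wait after each failed attempt and returns
// the number of attempts made together with the final error.
func dialWithRetry(ctx context.Context, cfg reconnectConfig, dial func() error) (uint, error) {
	var lastErr error
	for attempt := uint(1); attempt <= cfg.ReconnectAttempts; attempt++ {
		if lastErr = dial(); lastErr == nil {
			return attempt, nil
		}
		select {
		case <-time.After(cfg.Wait):
		case <-ctx.Done():
			return attempt, ctx.Err()
		}
	}
	return cfg.ReconnectAttempts, fmt.Errorf("maximum attempts exceeded: %w", lastErr)
}

// flakyDialer returns a dial function that fails the given number of times
// and succeeds afterwards (a simulated broker).
func flakyDialer(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

// runDemo dials with a 1ms wait and reports the attempts used and whether
// the simulated connection was established.
func runDemo(failures int, maxAttempts uint) (uint, bool) {
	cfg := reconnectConfig{ReconnectAttempts: maxAttempts, Wait: time.Millisecond}
	attempts, err := dialWithRetry(context.Background(), cfg, flakyDialer(failures))
	return attempts, err == nil
}

func main() {
	fmt.Println(runDemo(2, 5))  // prints "3 true"
	fmt.Println(runDemo(10, 3)) // prints "3 false"
}
```

A small attempt budget makes the caller responsible for restarting the dial loop, which is why a very large ReconnectAttempts value combined with logging is the suggested setup.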
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector implements RabbitMQ failover.
func NewConnector ¶
NewConnector returns a new instance of Connector.
func (*Connector) AddAMQPNotifiedListener ¶
func (c *Connector) AddAMQPNotifiedListener(h func(n AMQPNotified))
AddAMQPNotifiedListener registers a listener for the event fired when an AMQP error is received.
NOTE: not concurrency-safe.
func (*Connector) AddDialedListener ¶
AddDialedListener registers a listener for the event fired when a connection is successfully established.
NOTE: not concurrency-safe.
func (*Connector) AddRetriedListener ¶
AddRetriedListener registers a listener for the event fired on each connection attempt.
NOTE: not concurrency-safe.
func (*Connector) Channel ¶
Channel allocates and returns a new amqp.Channel. If an error occurs on the returned channel, a new Channel should be opened.
NOTE: This is a blocking method. It waits until the connection is established.
func (*Connector) Dial ¶ added in v0.2.0
Dial tries to keep the RabbitMQ connection active by catching and handling connection errors. It returns an error only when ctx is done or when the maximum number of reconnect attempts (see Config) is exceeded.
NOTE: This is a blocking method.
func (*Connector) DialConfig ¶ added in v0.2.0
DialConfig is used to configure the RabbitMQ connection with amqp.Config. It tries to keep the RabbitMQ connection active by catching and handling connection errors. It returns an error only when ctx is done or when the maximum number of reconnect attempts (see Config) is exceeded.
NOTE: This is a blocking method.
func (*Connector) StartConsumer ¶
StartConsumer is used to start Consumer.
NOTE: This is a blocking method.
func (*Connector) StartMultipleConsumers ¶
StartMultipleConsumers is used to start Consumer "count" times. Declare is called once and Consume is called "count" times (one goroutine per call), so the consumer can be scaled horizontally.
NOTE: This is a blocking method.
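The "declare once, consume count times" behaviour can be sketched with plain goroutines. This is only an illustration built on the standard library: the worker interface is a simplified stand-in for Consumer (it drops the *amqp.Channel argument), and startMany, countingWorker and runWorkers are hypothetical names rather than library code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// worker is a simplified stand-in for the Consumer interface.
type worker interface {
	Declare(ctx context.Context) error
	Consume(ctx context.Context) error
}

// startMany calls Declare once, then runs Consume in count goroutines and
// blocks until all of them return. It reports the first non-nil error found.
func startMany(ctx context.Context, w worker, count int) error {
	if err := w.Declare(ctx); err != nil {
		return err
	}
	var wg sync.WaitGroup
	errs := make(chan error, count)
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			errs <- w.Consume(ctx)
		}()
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

// countingWorker records how often each method was called.
type countingWorker struct {
	declared int32
	consumed int32
}

func (w *countingWorker) Declare(ctx context.Context) error {
	atomic.AddInt32(&w.declared, 1)
	return nil
}

func (w *countingWorker) Consume(ctx context.Context) error {
	atomic.AddInt32(&w.consumed, 1)
	return nil
}

// runWorkers starts count consumers and returns the Declare and Consume call counts.
func runWorkers(count int) (int32, int32) {
	w := &countingWorker{}
	if err := startMany(context.Background(), w, count); err != nil {
		return 0, 0
	}
	return atomic.LoadInt32(&w.declared), atomic.LoadInt32(&w.consumed)
}

func main() {
	fmt.Println(runWorkers(5)) // prints "1 5"
}
```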
type Consumer ¶
type Consumer interface {
	// Declare is used to declare the required RabbitMQ entities.
	// It is called once before Consume (even when StartMultipleConsumers is used).
	// After any connection or channel problem the RabbitMQ entities are redeclared.
	Declare(ctx context.Context, ch *amqp.Channel) error
	// Consume is used to consume a RabbitMQ queue.
	// It can be called more than once if the consumer is registered with StartMultipleConsumers.
	Consume(ctx context.Context, ch *amqp.Channel) error
}
The Consumer interface provides the functionality for declaring RabbitMQ entities and consuming a queue.
Example ¶
This example demonstrates consuming messages from RabbitMQ queue.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

// Consumer implements the rabbitroutine.Consumer interface.
type Consumer struct {
	ExchangeName string
	QueueName    string
}

// Declare implements the Declare method of rabbitroutine.Consumer.
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {
	err := ch.ExchangeDeclare(
		c.ExchangeName, // name
		"direct",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("failed to declare exchange %v: %v", c.ExchangeName, err)
		return err
	}

	_, err = ch.QueueDeclare(
		c.QueueName, // name
		true,        // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		log.Printf("failed to declare queue %v: %v", c.QueueName, err)
		return err
	}

	err = ch.QueueBind(
		c.QueueName,    // queue name
		c.QueueName,    // routing key
		c.ExchangeName, // exchange
		false,          // no-wait
		nil,            // arguments
	)
	if err != nil {
		log.Printf("failed to bind queue %v: %v", c.QueueName, err)
		return err
	}

	return nil
}

// Consume implements the Consume method of rabbitroutine.Consumer.
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {
	defer log.Println("consume method finished")

	err := ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		log.Printf("failed to set qos: %v", err)
		return err
	}

	msgs, err := ch.Consume(
		c.QueueName,  // queue
		"myconsumer", // consumer name
		false,        // auto-ack
		false,        // exclusive
		false,        // no-local
		false,        // no-wait
		nil,          // args
	)
	if err != nil {
		log.Printf("failed to consume %v: %v", c.QueueName, err)
		return err
	}

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				return amqp.ErrClosed
			}

			content := string(msg.Body)
			fmt.Println("New message:", content)

			err := msg.Ack(false)
			if err != nil {
				log.Printf("failed to Ack message: %v", err)
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// This example demonstrates consuming messages from a RabbitMQ queue.
func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20,
		// How long to wait between reconnect attempts
		Wait: 2 * time.Second,
	})

	conn.AddRetriedListener(func(r rabbitroutine.Retried) {
		log.Printf("try to connect to RabbitMQ: attempt=%d, error=\"%v\"",
			r.ReconnectAttempt, r.Error)
	})

	conn.AddDialedListener(func(_ rabbitroutine.Dialed) {
		log.Printf("RabbitMQ connection successfully established")
	})

	conn.AddAMQPNotifiedListener(func(n rabbitroutine.AMQPNotified) {
		log.Printf("RabbitMQ error received: %v", n.Error)
	})

	consumer := &Consumer{
		ExchangeName: "myexch",
		QueueName:    "myqueue",
	}

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	go func() {
		err := conn.StartMultipleConsumers(ctx, consumer, 5)
		if err != nil {
			log.Println("failed to start consumer:", err)
		}
	}()

	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, os.Interrupt, syscall.SIGTERM)

	// Wait for OS termination signal
	<-sigc
}
type EnsurePublisher ¶
type EnsurePublisher struct {
// contains filtered or unexported fields
}
EnsurePublisher implements the Publisher interface and guarantees delivery of the message to the server. When EnsurePublisher is used, publisher confirms are enabled, which is what provides the delivery guarantee.
@see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
@see https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17
Example ¶
This example demonstrates publishing messages to a RabbitMQ exchange with delivery guarantees provided by EnsurePublisher and publish retries provided by RetryPublisher.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20000,
		// How long to wait between reconnect attempts
		Wait: 2 * time.Second,
	})

	pool := rabbitroutine.NewPool(conn)
	ensurePub := rabbitroutine.NewEnsurePublisher(pool)
	pub := rabbitroutine.NewRetryPublisher(ensurePub)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)

		err := pub.Publish(timeoutCtx, "myexch", "myqueue", amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			log.Println("failed to publish:", err)
		}

		cancel()
	}
}
func NewEnsurePublisher ¶
func NewEnsurePublisher(p *Pool) *EnsurePublisher
NewEnsurePublisher returns a new instance of EnsurePublisher.
func (*EnsurePublisher) Publish ¶
func (p *EnsurePublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
Publish sends msg to an exchange on RabbitMQ and waits to ensure that msg has been successfully received by the server. It returns an error if no queue is bound that matches the routing key. It blocks until the message is successfully delivered, the context is cancelled, or an error is received.
While a reconnect is in progress, publishing cannot complete because an amqp.Channel cannot be obtained. The publisher does not know the state of the connection, so from its point of view a reconnect is the same as "the request took too long to finish". "Too long" is defined by the context.Context passed as the first argument to Publish. If the context's deadline expires, Publish returns the context.DeadlineExceeded error. If the connection is reestablished and Publish has enough time left, the request completes successfully.
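The deadline behaviour described above can be reproduced without a broker. The sketch below assumes a simplified model in which the "channel becomes available" event is a closed Go channel; publish, publishAfterMs and timedOut are hypothetical helpers, not rabbitroutine functions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// publish blocks until a channel is available (ready is closed) or ctx is done.
func publish(ctx context.Context, ready <-chan struct{}) error {
	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// publishAfterMs simulates a connection that is reestablished after
// reconnectMs milliseconds while the caller allows timeoutMs milliseconds.
func publishAfterMs(reconnectMs, timeoutMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	ready := make(chan struct{})
	timer := time.AfterFunc(time.Duration(reconnectMs)*time.Millisecond, func() { close(ready) })
	defer timer.Stop()

	return publish(ctx, ready)
}

// timedOut reports whether err is (or wraps) context.DeadlineExceeded.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// The reconnect finishes well within the timeout, so publishing succeeds.
	fmt.Println(publishAfterMs(5, 500)) // prints "<nil>"
	// The reconnect takes longer than the caller is willing to wait.
	fmt.Println(timedOut(publishAfterMs(500, 20))) // prints "true"
}
```

Choosing the timeout is therefore a trade-off: it should be long enough to ride out a typical reconnect, yet short enough that callers are not blocked indefinitely.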
type FireForgetPublisher ¶ added in v0.4.0
type FireForgetPublisher struct {
// contains filtered or unexported fields
}
FireForgetPublisher implements the Publisher interface and is used to publish messages to a RabbitMQ exchange without delivery guarantees. When FireForgetPublisher is used, publisher confirms are not enabled, so delivery is not guaranteed.
@see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
Example ¶
This example demonstrates publishing messages to a RabbitMQ exchange using FireForgetPublisher.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/furdarius/rabbitroutine"
	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	ctx := context.Background()

	url := "amqp://guest:guest@127.0.0.1:5672/"

	conn := rabbitroutine.NewConnector(rabbitroutine.Config{
		// Max reconnect attempts
		ReconnectAttempts: 20000,
		// How long to wait between reconnect attempts
		Wait: 2 * time.Second,
	})

	pool := rabbitroutine.NewLightningPool(conn)
	pub := rabbitroutine.NewFireForgetPublisher(pool)

	go func() {
		err := conn.Dial(ctx, url)
		if err != nil {
			log.Println("failed to establish RabbitMQ connection:", err)
		}
	}()

	for i := 0; i < 5000; i++ {
		timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)

		err := pub.Publish(timeoutCtx, "myexch", "myqueue", amqp.Publishing{
			Body: []byte(fmt.Sprintf("message %d", i)),
		})
		if err != nil {
			log.Println("failed to publish:", err)
		}

		cancel()
	}
}
func NewFireForgetPublisher ¶ added in v0.4.0
func NewFireForgetPublisher(p *LightningPool) *FireForgetPublisher
NewFireForgetPublisher returns a new instance of FireForgetPublisher.
func (*FireForgetPublisher) Publish ¶ added in v0.4.0
func (p *FireForgetPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
Publish sends msg to an exchange on RabbitMQ.
type LightningPool ¶ added in v0.4.0
type LightningPool struct {
// contains filtered or unexported fields
}
LightningPool stores AMQP Channels without confirm mode, so they will be used without delivery guarantees.
func NewLightningPool ¶ added in v0.4.0
func NewLightningPool(conn *Connector) *LightningPool
NewLightningPool returns a new instance of LightningPool.
func (*LightningPool) Release ¶ added in v0.4.0
func (p *LightningPool) Release(k *amqp.Channel)
Release adds k to the pool.
func (*LightningPool) Size ¶ added in v0.4.0
func (p *LightningPool) Size() int
Size returns the current pool size.
NOTE: not a thread-safe operation.
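The Release and Size operations follow the usual free-list pattern for pooled resources. As a rough standard-library sketch, with a made-up channel type standing in for *amqp.Channel and a hypothetical Get method that this page does not document, a pool of this kind could look as follows. Unlike the Size method documented above, this sketch takes a lock in Size.

```go
package main

import (
	"fmt"
	"sync"
)

// channel is a stand-in for *amqp.Channel (assumption for illustration only).
type channel struct{ id int }

// pool keeps released channels in a free list so they can be reused.
type pool struct {
	mu   sync.Mutex
	free []*channel
	next int
}

// Get returns a released channel if one is available and creates one otherwise.
func (p *pool) Get() *channel {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.free); n > 0 {
		ch := p.free[n-1]
		p.free = p.free[:n-1]
		return ch
	}
	p.next++
	return &channel{id: p.next}
}

// Release adds ch to the pool so a later Get can reuse it.
func (p *pool) Release(ch *channel) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.free = append(p.free, ch)
}

// Size returns the number of channels currently waiting in the pool.
func (p *pool) Size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.free)
}

func main() {
	p := &pool{}
	a := p.Get()
	p.Release(a)
	fmt.Println(p.Size()) // prints "1"
	b := p.Get()
	fmt.Println(a == b, p.Size()) // prints "true 0"
}
```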
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a set of AMQP Channels that may be individually saved and retrieved.
func (*Pool) ChannelWithConfirm ¶
func (p *Pool) ChannelWithConfirm(ctx context.Context) (ChannelKeeper, error)
ChannelWithConfirm returns a ChannelKeeper whose AMQP Channel is in confirm mode.
type Publisher ¶
type Publisher interface {
	// Publish is used to send msg to a RabbitMQ exchange.
	Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
}
Publisher interface provides functionality of publishing to RabbitMQ.
type Retried ¶
Retried is fired when a connection attempt is retried. The event is emitted only if the connection was not established. If the connection was successfully established, the Dialed event is emitted instead.
type RetryDelayFunc ¶ added in v0.5.0
RetryDelayFunc returns how long to wait before retry.
func ConstDelay ¶ added in v0.5.0
func ConstDelay(delay time.Duration) RetryDelayFunc
ConstDelay returns constant delay value.
func LinearDelay ¶ added in v0.5.0
func LinearDelay(delay time.Duration) RetryDelayFunc
LinearDelay returns a delay value that increases linearly with the current attempt.
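The difference between the two strategies is easiest to see side by side. The sketch below is an illustration under assumptions: the signature of RetryDelayFunc is not shown on this page, so delayFunc assumes it takes the attempt number and returns a time.Duration, and linearDelay assumes "linear" means the attempt number multiplied by the base delay. constDelay and linearDelay are local re-creations, not the library functions.

```go
package main

import (
	"fmt"
	"time"
)

// delayFunc is the assumed shape of RetryDelayFunc: attempt number in, wait time out.
type delayFunc func(attempt uint) time.Duration

// constDelay always returns the same delay, regardless of the attempt.
func constDelay(delay time.Duration) delayFunc {
	return func(uint) time.Duration { return delay }
}

// linearDelay returns attempt * delay, so the wait grows with every retry
// (assumed interpretation of "increases linearly").
func linearDelay(delay time.Duration) delayFunc {
	return func(attempt uint) time.Duration {
		return time.Duration(attempt) * delay
	}
}

func main() {
	c := constDelay(100 * time.Millisecond)
	l := linearDelay(100 * time.Millisecond)
	for attempt := uint(1); attempt <= 3; attempt++ {
		fmt.Println(attempt, c(attempt), l(attempt))
	}
	// prints:
	// 1 100ms 100ms
	// 2 100ms 200ms
	// 3 100ms 300ms
}
```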
type RetryPublisher ¶
type RetryPublisher struct {
	Publisher
	// contains filtered or unexported fields
}
RetryPublisher retries publishing a message until the context is done.
func NewRetryPublisher ¶
func NewRetryPublisher(p Publisher, opts ...RetryPublisherOption) *RetryPublisher
NewRetryPublisher returns a new instance of RetryPublisher.
func (*RetryPublisher) Publish ¶
func (p *RetryPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
Publish is used to send msg to a RabbitMQ exchange. It blocks until either the message is delivered or the context is cancelled. An error is returned only if the context is done.
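A retry loop of this kind can be sketched with the standard library alone. The code below is a hedged illustration, not the RetryPublisher implementation: retryPublish and runRetry are hypothetical names, the publish function is simulated, and the 1ms delay is an arbitrary choice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryPublish calls publish until it succeeds or ctx is done, waiting
// delay(attempt) after each failure. It returns the number of attempts made.
func retryPublish(ctx context.Context, publish func() error, delay func(attempt uint) time.Duration) (uint, error) {
	for attempt := uint(1); ; attempt++ {
		if err := publish(); err == nil {
			return attempt, nil
		}
		select {
		case <-time.After(delay(attempt)):
		case <-ctx.Done():
			return attempt, ctx.Err()
		}
	}
}

// runRetry simulates a publisher that fails the given number of times and
// gives the whole operation timeoutMs milliseconds to finish.
func runRetry(failures, timeoutMs int) (uint, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	calls := 0
	publish := func() error {
		calls++
		if calls <= failures {
			return errors.New("channel unavailable")
		}
		return nil
	}

	attempts, err := retryPublish(ctx, publish, func(uint) time.Duration { return time.Millisecond })
	return attempts, err == nil
}

func main() {
	fmt.Println(runRetry(2, 1000)) // prints "3 true"

	// A publisher that keeps failing gives up only when the context expires.
	_, ok := runRetry(1<<30, 30)
	fmt.Println(ok) // prints "false"
}
```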
type RetryPublisherOption ¶ added in v0.5.0
type RetryPublisherOption func(*RetryPublisher)
RetryPublisherOption describes a functional option for configuring RetryPublisher.
func PublishDelaySetup ¶ added in v0.5.0
func PublishDelaySetup(fn RetryDelayFunc) RetryPublisherOption
PublishDelaySetup sets the function used to compute the delay (time.Duration) between publish attempts.
func PublishMaxAttemptsSetup ¶ added in v0.5.0
func PublishMaxAttemptsSetup(maxAttempts uint) RetryPublisherOption
PublishMaxAttemptsSetup sets the limit on the number of publish attempts.
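RetryPublisherOption follows the functional options pattern: each option is a function that mutates the publisher during construction. The sketch below shows the pattern in isolation. All names are local and hypothetical (retryPublisher, option, withMaxAttempts, withDelay), and the defaults of 3 attempts and 10ms are invented for the example, not the library's defaults.

```go
package main

import (
	"fmt"
	"time"
)

// retryPublisher holds the settings that options can change (hypothetical type).
type retryPublisher struct {
	maxAttempts uint
	delay       func(attempt uint) time.Duration
}

// option is a functional option: it receives the publisher and adjusts it.
type option func(*retryPublisher)

// withMaxAttempts limits the number of publish attempts.
func withMaxAttempts(n uint) option {
	return func(p *retryPublisher) { p.maxAttempts = n }
}

// withDelay sets the function that computes the wait before each retry.
func withDelay(fn func(attempt uint) time.Duration) option {
	return func(p *retryPublisher) { p.delay = fn }
}

// newRetryPublisher starts from defaults (invented for this sketch) and
// applies the options in the order they were passed.
func newRetryPublisher(opts ...option) *retryPublisher {
	p := &retryPublisher{
		maxAttempts: 3,
		delay:       func(uint) time.Duration { return 10 * time.Millisecond },
	}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	def := newRetryPublisher()
	custom := newRetryPublisher(
		withMaxAttempts(16),
		withDelay(func(attempt uint) time.Duration {
			return time.Duration(attempt) * 50 * time.Millisecond
		}),
	)
	fmt.Println(def.maxAttempts, def.delay(2))       // prints "3 10ms"
	fmt.Println(custom.maxAttempts, custom.delay(2)) // prints "16 100ms"
}
```

With the real package the equivalent call would presumably combine the documented constructors, for example NewRetryPublisher(ensurePub, PublishMaxAttemptsSetup(16), PublishDelaySetup(LinearDelay(50*time.Millisecond))).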