amqpextra

Version: v1.0.0
Published: Feb 2, 2021 License: MIT Imports: 9 Imported by: 0

README

Extra features for streadway/amqp package.

Documentation

Dialer.

Provides:

  • Auto reconnect.
  • Context aware.
  • Configured by WithXXX options.
  • Dial multiple servers.
  • Notifies ready/unready/closed states.

Examples:

Consumer.

Provides:

  • Auto reconnect.
  • Context aware.
  • Configured by WithXXX options.
  • Can process messages in parallel.
  • Adds message context.
  • Detects queue deletion and reconnects.
  • Notifies ready/unready/closed states.

Examples:
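
The "process messages in parallel" feature listed above can be pictured as a small worker pool. The sketch below uses only the standard library; processParallel and its parameters are hypothetical names chosen for illustration, and this is not necessarily how the package implements parallel consumption.

```go
package main

import (
	"fmt"
	"sync"
)

// processParallel runs handle for every message using the given number of
// worker goroutines and returns how many messages were handled.
// workers must be at least 1.
func processParallel(msgs []string, workers int, handle func(string)) int {
	msgCh := make(chan string)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// each worker pulls messages until the channel is closed
			for m := range msgCh {
				handle(m)
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}

	for _, m := range msgs {
		msgCh <- m
	}
	close(msgCh)
	wg.Wait()

	return handled
}

func main() {
	n := processParallel([]string{"a", "b", "c", "d"}, 2, func(m string) {
		fmt.Println("handled", m)
	})
	fmt.Println("total:", n)
}
```

With more than one worker the order in which messages are handled is not deterministic, so a handler should not rely on ordering.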

Publisher.

Provides:

  • Auto reconnect.
  • Context aware.
  • Configured by WithXXX options.
  • Notifies ready/unready/closed states.
  • Publish can wait until the connection is ready.
  • Adds message context.
  • Publish a message struct (define only what you need).
  • Supports flow control.

Examples:

Consumer middlewares

The consumer can chain middlewares to preprocess a received message.

Here are some built-in middlewares:

  • HasCorrelationID - Nacks the message if it has no correlation ID.
  • HasReplyTo - Nacks the message if it has no reply-to.
  • Logger - Adds a logger to the context.
  • Recover - Recovers the worker from a panic and nacks the message.
  • Expire - Converts the message expiration into a context with a timeout.
  • AckNack - Return middleware.Ack from the handler to ack the message.
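
The chaining idea can be sketched with stand-in types and the standard library only. Delivery, Handler, Middleware, Chain, RequireCorrelationID and Recover below are hypothetical simplifications written for this illustration; they are not the package's actual middleware API, and a real middleware would nack the message where this sketch merely returns an error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Delivery is a hypothetical stand-in for amqp.Delivery.
type Delivery struct {
	CorrelationID string
	Body          []byte
}

// Handler processes one delivery and returns a result.
type Handler func(ctx context.Context, msg Delivery) interface{}

// Middleware wraps a Handler with extra behavior.
type Middleware func(next Handler) Handler

// Chain wraps h so that the first middleware listed runs first.
func Chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// ErrNoCorrelationID is returned for deliveries without a correlation ID.
var ErrNoCorrelationID = errors.New("no correlation id")

// RequireCorrelationID stops the chain when the correlation ID is missing.
func RequireCorrelationID(next Handler) Handler {
	return func(ctx context.Context, msg Delivery) interface{} {
		if msg.CorrelationID == "" {
			return ErrNoCorrelationID
		}
		return next(ctx, msg)
	}
}

// Recover converts a panic in the wrapped handler into an error result.
func Recover(next Handler) Handler {
	return func(ctx context.Context, msg Delivery) (res interface{}) {
		defer func() {
			if r := recover(); r != nil {
				res = fmt.Errorf("recovered: %v", r)
			}
		}()
		return next(ctx, msg)
	}
}

// echoBody is the innermost handler: it returns the body as a string.
func echoBody(ctx context.Context, msg Delivery) interface{} {
	return string(msg.Body)
}

func main() {
	h := Chain(echoBody, Recover, RequireCorrelationID)

	fmt.Println(h(context.Background(), Delivery{CorrelationID: "1", Body: []byte("hi")}))
	fmt.Println(h(context.Background(), Delivery{}))
}
```

Placing Recover first means it also guards the middlewares listed after it, not only the innermost handler.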

Documentation

Overview

Package amqpextra provides a Dialer that redials when the connection is lost.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dial

func Dial(opts ...Option) (*amqp.Connection, error)

Dial returns an established connection or an error. It keeps retrying until a 30-second timeout is reached.
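
The retry-until-timeout behavior described for Dial can be sketched as a loop bounded by a context deadline. This is a standard-library illustration under stated assumptions, not the package's implementation: dialFunc, dialWithRetry and flakyDialer are hypothetical names, the "connection" is just a string, and the durations are shortened so the sketch finishes quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialFunc is a hypothetical stand-in for a function that opens a connection.
type dialFunc func() (string, error)

// dialWithRetry calls dial until it succeeds or the timeout expires,
// waiting retryPeriod between attempts.
func dialWithRetry(dial dialFunc, timeout, retryPeriod time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for {
		conn, err := dial()
		if err == nil {
			return conn, nil
		}

		select {
		case <-ctx.Done():
			// give up and report both the timeout and the last dial error
			return "", fmt.Errorf("dial: %w (last error: %v)", ctx.Err(), err)
		case <-time.After(retryPeriod):
			// wait is over, try again
		}
	}
}

// flakyDialer returns a dialFunc that fails the first n calls.
func flakyDialer(n int) dialFunc {
	calls := 0
	return func() (string, error) {
		calls++
		if calls <= n {
			return "", errors.New("connection refused")
		}
		return "connected", nil
	}
}

func main() {
	conn, err := dialWithRetry(flakyDialer(2), time.Second, 10*time.Millisecond)
	fmt.Println(conn, err)
}
```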

func NewConsumer

func NewConsumer(
	connCh <-chan *Connection,
	opts ...consumer.Option,
) (*consumer.Consumer, error)
Example
package main

import (
	"context"

	"log"

	"github.com/artezh/amqpextra"
	"github.com/artezh/amqpextra/consumer"
	"github.com/streadway/amqp"
)

func main() {
	// you can get connCh from dialer.ConnectionCh() method
	var connCh chan *amqpextra.Connection
	h := consumer.HandlerFunc(
		func(ctx context.Context, msg amqp.Delivery) interface{} {
			// process message
			msg.Ack(false)
			return nil
		})

	// create consumer
	c, err := amqpextra.NewConsumer(
		connCh,
		consumer.WithHandler(h),
		consumer.WithQueue("a_queue"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// close consumer
	c.Close()
	<-c.NotifyClosed()

}

func NewPublisher

func NewPublisher(
	connCh <-chan *Connection,
	opts ...publisher.Option,
) (*publisher.Publisher, error)
Example
package main

import (
	"log"

	"github.com/artezh/amqpextra"
	"github.com/artezh/amqpextra/publisher"
	"github.com/streadway/amqp"
)

func main() {
	// you can get connCh from dialer.ConnectionCh() method
	var connCh chan *amqpextra.Connection

	// create publisher
	p, err := amqpextra.NewPublisher(connCh)
	if err != nil {
		log.Fatal(err)
	}

	// publish a message
	go p.Publish(publisher.Message{
		Key: "test_queue",
		Publishing: amqp.Publishing{
			Body: []byte(`{"foo": "fooVal"}`),
		},
	})

	// close publisher
	p.Close()
	<-p.NotifyClosed()

}

Types

type AMQPConnection

type AMQPConnection interface {
	NotifyClose(chan *amqp.Error) chan *amqp.Error
	Close() error
}

AMQPConnection is an interface for streadway's *amqp.Connection

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection provides access to streadway's *amqp.Connection as well as notification channels. A notification indicates that something has gone wrong with the connection. The client should then get a fresh connection from Dialer.

func (*Connection) AMQPConnection

func (c *Connection) AMQPConnection() *amqp.Connection

AMQPConnection returns streadway's *amqp.Connection

func (*Connection) NotifyLost

func (c *Connection) NotifyLost() chan struct{}

NotifyLost notifies when the current connection is lost and a new one should be requested.

type Dialer

type Dialer struct {
	// contains filtered or unexported fields
}

Dialer is responsible for keeping the connection up. If the connection is lost or closed, it dials a server again and again, waiting between attempts. Dialer keeps the connection up until the Dialer.Close() method is called or the context is canceled.

func NewDialer

func NewDialer(opts ...Option) (*Dialer, error)

NewDialer returns Dialer or a configuration error.

func (*Dialer) Close

func (c *Dialer) Close()

Close initiates the Dialer shutdown. Subscribe to Dialer.NotifyClosed() to know when it has finally closed.

func (*Dialer) Connection

func (c *Dialer) Connection(ctx context.Context) (*amqp.Connection, error)

Connection returns streadway's *amqp.Connection. The client should subscribe to the Dialer ready/unready events (see Dialer.Notify()) in order to know when the connection is lost.

func (*Dialer) ConnectionCh

func (c *Dialer) ConnectionCh() <-chan *Connection

ConnectionCh returns the Connection channel. The channel should be used to get established connections. The client must subscribe to Connection.NotifyLost(). Once the connection is lost, the client must stop using it and get a new one from the Connection channel. The Connection channel is closed when the Dialer is closed, so don't forget to check for a closed channel.

Example

The single-case select in this example is deliberate: its purpose is to stress the connCh close case.

package main

import (
	"log"

	"time"

	"github.com/artezh/amqpextra"
)

func main() {
	dialer, err := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f"))
	if err != nil {
		log.Fatal(err)
	}

	connCh := dialer.ConnectionCh()
	go func() {
		for {
			select {
			case conn, ok := <-connCh:
				if !ok {
					// connection is permanently closed
					return
				}

				<-conn.NotifyLost()
			}
		}
	}()

	time.Sleep(time.Second)
	dialer.Close()

}

func (*Dialer) Consumer

func (c *Dialer) Consumer(opts ...consumer.Option) (*consumer.Consumer, error)

Consumer returns a consumer that supports reconnection.

Example
package main

import (
	"context"

	"github.com/artezh/amqpextra"
	"github.com/artezh/amqpextra/consumer"
	"github.com/streadway/amqp"
)

func main() {
	// open connection
	d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f"))

	h := consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} {
		// process message
		msg.Ack(false)

		return nil
	})

	c, _ := d.Consumer(
		consumer.WithQueue("a_queue"),
		consumer.WithHandler(h),
	)

	// close consumer
	c.Close()

	// close dialer
	d.Close()

}

func (*Dialer) Notify

func (c *Dialer) Notify(stateCh chan State) <-chan State

Notify can be used to subscribe to Dialer ready/unready events.

func (*Dialer) NotifyClosed

func (c *Dialer) NotifyClosed() <-chan struct{}

NotifyClosed can be used to subscribe to the Dialer closed event. Dialer.ConnectionCh() can no longer be used after this point.

func (*Dialer) Publisher

func (c *Dialer) Publisher(opts ...publisher.Option) (*publisher.Publisher, error)

Publisher returns a publisher that supports reconnection.

Example
package main

import (
	"context"
	"time"

	"github.com/artezh/amqpextra"
	"github.com/artezh/amqpextra/publisher"
	"github.com/streadway/amqp"
)

func main() {
	// open connection
	d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f"))

	// create publisher
	p, _ := d.Publisher()

	ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond*100)
	defer cancelFunc()

	// publish a message
	p.Publish(publisher.Message{
		Key:     "test_queue",
		Context: ctx,
		Publishing: amqp.Publishing{
			Body: []byte(`{"foo": "fooVal"}`),
		},
	})

	// close publisher
	p.Close()

	// close connection
	d.Close()

}

type Option

type Option func(c *Dialer)

Option can be used to configure a Dialer.

func WithAMQPDial

func WithAMQPDial(dial func(url string, c amqp.Config) (AMQPConnection, error)) Option

WithAMQPDial configures the dial function. The function takes the URL and an amqp.Config and returns an AMQPConnection.

func WithConnectionProperties

func WithConnectionProperties(props amqp.Table) Option

WithConnectionProperties configures the connection properties set on dial.

func WithContext

func WithContext(ctx context.Context) Option

WithContext configures the Dialer context. The context can be used later to stop the Dialer.

func WithLogger

func WithLogger(l logger.Logger) Option

WithLogger configures the logger used by Dialer.

func WithNotify

func WithNotify(stateCh chan State) Option

WithNotify helps subscribe to Dialer ready/unready events.

func WithRetryPeriod

func WithRetryPeriod(dur time.Duration) Option

WithRetryPeriod configures how long to wait before the next dial attempt. Default: 5 seconds.

func WithURL

func WithURL(urls ...string) Option

WithURL configures the RabbitMQ servers to dial. Dialer dials the URLs in round-robin order.
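
Round-robin selection over several URLs can be sketched in a few lines. The urlPicker type and the host-a/host-b URLs are hypothetical and exist only for this illustration; how the Dialer tracks its position internally is not documented here.

```go
package main

import "fmt"

// urlPicker hands out URLs in round-robin order.
// It is a hypothetical helper, not part of amqpextra,
// and it is not safe for concurrent use.
type urlPicker struct {
	urls []string
	next int
}

// pick returns the next URL and advances the position, wrapping at the end.
func (p *urlPicker) pick() string {
	u := p.urls[p.next]
	p.next = (p.next + 1) % len(p.urls)
	return u
}

func main() {
	p := &urlPicker{urls: []string{
		"amqp://guest:guest@host-a:5672/%2f",
		"amqp://guest:guest@host-b:5672/%2f",
	}}

	// three dial attempts visit host-a, host-b, then host-a again
	for i := 0; i < 3; i++ {
		fmt.Println(p.pick())
	}
}
```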

type Ready

type Ready struct{}

type State

type State struct {
	Ready   *Ready
	Unready *Unready
}

type Unready

type Unready struct {
	Err error
}
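
A subscriber typically inspects which field of State is set. The sketch below copies the three types shown above into a local package so that it runs without a broker; describe is a hypothetical helper, and the assumption that at most one of Ready and Unready is non-nil is inferred from the type layout rather than stated in the documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the types shown above, so the sketch needs no broker.
type Ready struct{}

type Unready struct {
	Err error
}

type State struct {
	Ready   *Ready
	Unready *Unready
}

// describe turns a State into a log-friendly string.
// Assumption: at most one of Ready and Unready is set.
func describe(s State) string {
	switch {
	case s.Ready != nil:
		return "ready"
	case s.Unready != nil:
		return fmt.Sprintf("unready: %v", s.Unready.Err)
	default:
		return "unknown"
	}
}

func main() {
	// a buffered channel stands in for the one passed to Notify or WithNotify
	stateCh := make(chan State, 2)
	stateCh <- State{Unready: &Unready{Err: errors.New("dial failed")}}
	stateCh <- State{Ready: &Ready{}}
	close(stateCh)

	for s := range stateCh {
		fmt.Println(describe(s))
	}
}
```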

Directories

Path Synopsis
mock_consumer
Package mock_consumer is a generated GoMock package.
e2e_test
Package mock_amqpextra is a generated GoMock package.
mock_publisher
Package mock_publisher is a generated GoMock package.
