rabbitmq

Version: v0.0.2
Published: May 23, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dial

func Dial(opts ...Option) (*amqp.Connection, error)

Dial returns an established connection or an error. It keeps retrying until the 30-second timeout is reached.
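The retry-until-timeout behaviour described above can be sketched as follows. This is a minimal illustration and not the package's implementation: `dialFunc`, `flakyDial`, and `dialWithRetry` are hypothetical names, and a string stands in for the real connection.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialFunc is a hypothetical stand-in for a function that opens a connection.
type dialFunc func() (string, error)

// flakyDial returns a dialFunc that fails the first `failures` times.
func flakyDial(failures int) dialFunc {
	attempts := 0
	return func() (string, error) {
		attempts++
		if attempts <= failures {
			return "", errors.New("connection refused")
		}
		return fmt.Sprintf("connected on attempt %d", attempts), nil
	}
}

// dialWithRetry calls dial until it succeeds or timeout elapses,
// waiting retryPeriod between attempts.
func dialWithRetry(timeout, retryPeriod time.Duration, dial dialFunc) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for {
		conn, err := dial()
		if err == nil {
			return conn, nil
		}
		select {
		case <-ctx.Done():
			return "", fmt.Errorf("dial timed out: last error: %w", err)
		case <-time.After(retryPeriod):
			// Wait period elapsed; try again.
		}
	}
}

func main() {
	// The package documents a 30-second timeout; shorter values keep the demo fast.
	conn, err := dialWithRetry(time.Second, 10*time.Millisecond, flakyDial(2))
	fmt.Println(conn, err)
}
```

Keeping the timeout in a context means the same loop could also be stopped early by cancelling a parent context.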

func NewConsumer

func NewConsumer(
	connCh <-chan *Connection,
	opts ...consumer.Option,
) (*consumer.Consumer, error)

func NewPublisher

func NewPublisher(
	connCh <-chan *Connection,
	opts ...publisher.Option,
) (*publisher.Publisher, error)

func Queue

func Queue(
	ctx context.Context,
	c *Dialer,
	name string,
	durable,
	autDelete,
	exclusive,
	noWait bool,
	args amqp.Table,
) (amqp.Queue, error)

func TempQueue

func TempQueue(
	ctx context.Context,
	c *Dialer,
) (amqp.Queue, error)

Types

type AMQPConnection

type AMQPConnection interface {
	NotifyClose(chan *amqp.Error) chan *amqp.Error
	Close() error
}

AMQPConnection is an interface for streadway's *amqp.Connection

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection provides access to streadway's *amqp.Connection as well as notification channels. A notification indicates that something has gone wrong with the connection. The client should then get a fresh connection from Dialer.

func (*Connection) AMQPConnection

func (c *Connection) AMQPConnection() *amqp.Connection

AMQPConnection returns streadway's *amqp.Connection

func (*Connection) NotifyLost

func (c *Connection) NotifyLost() chan struct{}

NotifyLost notifies when the current connection is lost and a new one should be requested.

type Dialer

type Dialer struct {
	// contains filtered or unexported fields
}

Dialer is responsible for keeping the connection up. If the connection is lost or closed, it dials a server again and again, waiting between attempts. Dialer keeps the connection up until its Dialer.Close() method is called or the context is canceled.

func NewDialer

func NewDialer(opts ...Option) (*Dialer, error)

NewDialer returns Dialer or a configuration error.

func (*Dialer) Close

func (c *Dialer) Close()

Close initiates Dialer shutdown. Subscribe to Dialer.NotifyClosed() to know when the Dialer has finally closed.
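The split between initiating a close and learning that it has finished can be sketched as follows. The `keeper` type is a hypothetical stand-in for Dialer; only the method names `Close` and `NotifyClosed` come from this documentation, and the internals are an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// keeper is a hypothetical stand-in for Dialer's lifecycle handling.
type keeper struct {
	cancel context.CancelFunc
	closed chan struct{}
}

// newKeeper starts a background loop that runs until Close is called.
func newKeeper() *keeper {
	ctx, cancel := context.WithCancel(context.Background())
	k := &keeper{cancel: cancel, closed: make(chan struct{})}
	go func() {
		defer close(k.closed) // signal that shutdown has finished
		<-ctx.Done()          // a real loop would dial and monitor the connection here
	}()
	return k
}

// Close only initiates shutdown; it does not wait for it to finish.
func (k *keeper) Close() { k.cancel() }

// NotifyClosed returns a channel that is closed once shutdown completes.
func (k *keeper) NotifyClosed() <-chan struct{} { return k.closed }

func main() {
	k := newKeeper()
	k.Close()
	<-k.NotifyClosed() // block until the background loop has exited
	fmt.Println("dialer closed")
}
```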

func (*Dialer) Connection

func (c *Dialer) Connection(ctx context.Context) (*amqp.Connection, error)

Connection returns streadway's *amqp.Connection. The client should subscribe to Dialer ready/unready events (see Dialer.Notify) in order to know when the connection is lost.

func (*Dialer) ConnectionCh

func (c *Dialer) ConnectionCh() <-chan *Connection

ConnectionCh returns the Connection channel, which should be used to get established connections. The client must subscribe to Connection.NotifyLost(). Once the connection is lost, the client must stop using it and get a new one from the Connection channel. The Connection channel is closed when Dialer is closed, so check for a closed channel when receiving.
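The receive loop described above can be sketched as follows. The types here are local stand-ins and not the package's own: `conn` imitates *Connection, `simulateDialer` imitates Dialer.ConnectionCh(), and the sketch assumes the NotifyLost channel is closed when a connection is lost.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for the package's *Connection.
type conn struct {
	id   int
	lost chan struct{}
}

// NotifyLost mirrors the documented method name. This sketch assumes the
// channel is closed when the connection is lost.
func (c *conn) NotifyLost() chan struct{} { return c.lost }

// simulateDialer is a hypothetical stand-in for Dialer.ConnectionCh(): it
// hands out n connections, loses each one right after it is received, and
// then closes the channel as a closed Dialer would.
func simulateDialer(n int) <-chan *conn {
	connCh := make(chan *conn)
	go func() {
		defer close(connCh)
		for id := 1; id <= n; id++ {
			c := &conn{id: id, lost: make(chan struct{})}
			connCh <- c
			close(c.lost) // simulate connection loss
		}
	}()
	return connCh
}

// useConnections takes connections until connCh is closed and returns the
// ids of the connections it used, in order.
func useConnections(connCh <-chan *conn) []int {
	var used []int
	for {
		c, ok := <-connCh
		if !ok {
			return used // channel closed: the dialer is gone
		}
		used = append(used, c.id)
		// Work with c here, then wait until it is reported lost
		// before asking for a fresh connection.
		<-c.NotifyLost()
	}
}

func main() {
	fmt.Println(useConnections(simulateDialer(3))) // prints [1 2 3]
}
```

The two-value receive (`c, ok := <-connCh`) is what distinguishes a closed channel from a delivered connection.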

func (*Dialer) Consumer

func (c *Dialer) Consumer(opts ...consumer.Option) (*consumer.Consumer, error)

Consumer returns a consumer that supports reconnection.

func (*Dialer) Notify

func (c *Dialer) Notify(stateCh chan State) <-chan State

Notify can be used to subscribe to Dialer ready/unready events.

func (*Dialer) NotifyClosed

func (c *Dialer) NotifyClosed() <-chan struct{}

NotifyClosed can be used to subscribe to the Dialer closed event. Dialer.ConnectionCh() can no longer be used after this point.

func (*Dialer) Publisher

func (c *Dialer) Publisher(opts ...publisher.Option) (*publisher.Publisher, error)

Publisher returns a publisher that supports reconnection.

type Option

type Option func(c *Dialer)

Option can be used to configure Dialer.
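Option follows the functional-options pattern: each option is a function that mutates the value under construction. The sketch below illustrates the pattern with local stand-in types. `dialer`, its fields, and the validation rule are assumptions made for illustration; only the 5-second default retry period is taken from this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// dialer is a hypothetical stand-in for Dialer with two illustrative fields.
type dialer struct {
	urls        []string
	retryPeriod time.Duration
}

// option mirrors the shape of the documented Option type.
type option func(d *dialer)

// withURL appends servers to dial.
func withURL(urls ...string) option {
	return func(d *dialer) { d.urls = append(d.urls, urls...) }
}

// withRetryPeriod overrides the wait between dial attempts.
func withRetryPeriod(dur time.Duration) option {
	return func(d *dialer) { d.retryPeriod = dur }
}

// newDialer applies defaults, then options, then validates the result.
// The validation rules are assumptions, not the package's actual checks.
func newDialer(opts ...option) (*dialer, error) {
	d := &dialer{retryPeriod: 5 * time.Second} // documented default
	for _, opt := range opts {
		opt(d)
	}
	if len(d.urls) == 0 {
		return nil, errors.New("at least one url is required")
	}
	if d.retryPeriod <= 0 {
		return nil, errors.New("retry period must be positive")
	}
	return d, nil
}

func main() {
	d, err := newDialer(withURL("amqp://guest:guest@localhost:5672/"))
	fmt.Println(d, err)

	_, err = newDialer() // no URL configured
	fmt.Println(err)
}
```

Applying defaults before the options lets callers override only what they need.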

func WithAMQPDial

func WithAMQPDial(dial func(url string, c amqp.Config) (AMQPConnection, error)) Option

WithAMQPDial configures the dial function. The function takes the URL and an amqp.Config and returns an AMQPConnection.

func WithConnectionProperties

func WithConnectionProperties(props amqp.Table) Option

WithConnectionProperties configures the connection properties set on dial.

func WithContext

func WithContext(ctx context.Context) Option

WithContext configures the Dialer context. The context can be used later to stop the Dialer.

func WithLogger

func WithLogger(l logger.Logger) Option

WithLogger configures the logger used by Dialer.

func WithNotify

func WithNotify(stateCh chan State) Option

WithNotify helps subscribe to Dialer ready/unready events.

func WithRetryPeriod

func WithRetryPeriod(dur time.Duration) Option

WithRetryPeriod configures how long to wait before the next dial attempt. Default: 5 seconds.

func WithURL

func WithURL(urls ...string) Option

WithURL configures the RabbitMQ servers to dial. Dialer dials the URLs in round-robin order.
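Round-robin selection over the configured URLs can be sketched as follows. `urlRing` is a hypothetical helper, not part of the package, and the host names are placeholders.

```go
package main

import "fmt"

// urlRing is a hypothetical helper that cycles through a list of URLs.
// It assumes at least one URL has been configured.
type urlRing struct {
	urls []string
	next int
}

// pick returns the next URL and advances, wrapping around at the end.
func (r *urlRing) pick() string {
	u := r.urls[r.next]
	r.next = (r.next + 1) % len(r.urls)
	return u
}

func main() {
	ring := &urlRing{urls: []string{"amqp://rabbit-a:5672", "amqp://rabbit-b:5672"}}
	for i := 0; i < 3; i++ {
		// Each failed dial attempt would move on to the next server.
		fmt.Println(ring.pick())
	}
}
```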

type Ready

type Ready struct{}

type State

type State struct {
	Ready   *Ready
	Unready *Unready
}

type Unready

type Unready struct {
	Err error
}
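A State value carries either a Ready or an Unready pointer. The sketch below redeclares the three types locally, copied from the definitions above, and shows one way a subscriber might interpret them. It assumes that exactly one of the two fields is non-nil per event, which this documentation does not state explicitly; `describe` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type Ready struct{}

type Unready struct {
	Err error
}

type State struct {
	Ready   *Ready
	Unready *Unready
}

// describe is a hypothetical helper that turns a State into a log line.
func describe(s State) string {
	switch {
	case s.Ready != nil:
		return "ready"
	case s.Unready != nil:
		return fmt.Sprintf("unready: %v", s.Unready.Err)
	default:
		return "unknown"
	}
}

func main() {
	// In real use these values would arrive on the channel passed to
	// Dialer.Notify or WithNotify.
	states := []State{
		{Ready: &Ready{}},
		{Unready: &Unready{Err: errors.New("connection reset")}},
	}
	for _, s := range states {
		fmt.Println(describe(s))
	}
}
```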
