Documentation ¶
Overview ¶
Package amqpextra provides a Dialer that dials again in case the connection is lost.
Index ¶
- func Dial(opts ...Option) (*amqp.Connection, error)
- func NewConsumer(connCh <-chan *Connection, opts ...consumer.Option) (*consumer.Consumer, error)
- func NewPublisher(connCh <-chan *Connection, opts ...publisher.Option) (*publisher.Publisher, error)
- type AMQPConnection
- type Connection
- type Dialer
- func (c *Dialer) Close()
- func (c *Dialer) Connection(ctx context.Context) (*amqp.Connection, error)
- func (c *Dialer) ConnectionCh() <-chan *Connection
- func (c *Dialer) Consumer(opts ...consumer.Option) (*consumer.Consumer, error)
- func (c *Dialer) Notify(stateCh chan State) <-chan State
- func (c *Dialer) NotifyClosed() <-chan struct{}
- func (c *Dialer) Publisher(opts ...publisher.Option) (*publisher.Publisher, error)
- type Option
- func WithAMQPDial(dial func(url string, c amqp.Config) (AMQPConnection, error)) Option
- func WithConnectionProperties(props amqp.Table) Option
- func WithContext(ctx context.Context) Option
- func WithLogger(l logger.Logger) Option
- func WithNotify(stateCh chan State) Option
- func WithRetryPeriod(dur time.Duration) Option
- func WithURL(urls ...string) Option
- type Ready
- type State
- type Unready
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Dial ¶
func Dial(opts ...Option) (*amqp.Connection, error)
Dial returns an established connection or an error. It keeps retrying until the 30-second timeout is reached.
func NewConsumer ¶
Example ¶
package main import ( "context" "log" "github.com/artezh/amqpextra" "github.com/artezh/amqpextra/consumer" "github.com/streadway/amqp" ) func main() { // you can get connCh from dialer.ConnectionCh() method var connCh chan *amqpextra.Connection h := consumer.HandlerFunc( func(ctx context.Context, msg amqp.Delivery) interface{} { // process message msg.Ack(false) return nil }) // create consumer c, err := amqpextra.NewConsumer( connCh, consumer.WithHandler(h), consumer.WithQueue("a_queue"), ) if err != nil { log.Fatal(err) } // close consumer c.Close() <-c.NotifyClosed() }
Output:
func NewPublisher ¶
func NewPublisher( connCh <-chan *Connection, opts ...publisher.Option, ) (*publisher.Publisher, error)
Example ¶
package main import ( "log" "github.com/artezh/amqpextra" "github.com/artezh/amqpextra/publisher" "github.com/streadway/amqp" ) func main() { // you can get connCh from dialer.ConnectionCh() method var connCh chan *amqpextra.Connection // create publisher p, err := amqpextra.NewPublisher(connCh) if err != nil { log.Fatal(err) } // publish a message go p.Publish(publisher.Message{ Key: "test_queue", Publishing: amqp.Publishing{ Body: []byte(`{"foo": "fooVal"}`), }, }) // close publisher p.Close() <-p.NotifyClosed() }
Output:
Types ¶
type AMQPConnection ¶
AMQPConnection is an interface for streadway's *amqp.Connection
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection provides access to streadway's *amqp.Connection as well as notification channels. A notification indicates that something wrong has happened to the connection. The client should get a fresh connection from Dialer.
func (*Connection) AMQPConnection ¶
func (c *Connection) AMQPConnection() *amqp.Connection
AMQPConnection returns streadway's *amqp.Connection
func (*Connection) NotifyLost ¶
func (c *Connection) NotifyLost() chan struct{}
NotifyLost notifies when the current connection is lost and a new one should be requested.
type Dialer ¶
type Dialer struct {
// contains filtered or unexported fields
}
Dialer is responsible for keeping the connection up. If the connection is lost or closed, it tries to dial a server again and again with some wait periods. Dialer keeps the connection up until its Dialer.Close() method is called or the context is canceled.
func (*Dialer) Close ¶
func (c *Dialer) Close()
Close initiates Dialer close. Subscribe to Dialer.NotifyClosed() to know when it has finally closed.
func (*Dialer) Connection ¶
Connection returns streadway's *amqp.Connection. The client should subscribe to the ready/unready events via Dialer.Notify() in order to know when the connection is lost.
func (*Dialer) ConnectionCh ¶
func (c *Dialer) ConnectionCh() <-chan *Connection
ConnectionCh returns the Connection channel. The channel should be used to get established connections. The client must subscribe to Connection.NotifyLost(). Once the connection is lost, the client must stop using the current connection and get a new one from the Connection channel. The Connection channel is closed when Dialer is closed. Don't forget to check whether the channel has been closed.
Example ¶
nolint:gosimple // the purpose of select case is to stress the connCh close case.
package main import ( "log" "time" "github.com/artezh/amqpextra" ) func main() { dialer, err := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f")) if err != nil { log.Fatal(err) } connCh := dialer.ConnectionCh() go func() { for { select { case conn, ok := <-connCh: if !ok { // connection is permanently closed return } <-conn.NotifyLost() } } }() time.Sleep(time.Second) dialer.Close() }
Output:
func (*Dialer) Consumer ¶
Consumer returns a consumer that supports the reconnection feature.
Example ¶
package main import ( "context" "github.com/artezh/amqpextra" "github.com/artezh/amqpextra/consumer" "github.com/streadway/amqp" ) func main() { // open connection d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f")) h := consumer.HandlerFunc(func(ctx context.Context, msg amqp.Delivery) interface{} { // process message msg.Ack(false) return nil }) c, _ := d.Consumer( consumer.WithQueue("a_queue"), consumer.WithHandler(h), ) // close consumer c.Close() // close dialer d.Close() }
Output:
func (*Dialer) NotifyClosed ¶
func (c *Dialer) NotifyClosed() <-chan struct{}
NotifyClosed can be used to subscribe to the Dialer closed event. Dialer.ConnectionCh() can no longer be used after this point.
func (*Dialer) Publisher ¶
Publisher returns a publisher that supports the reconnection feature.
Example ¶
package main import ( "context" "time" "github.com/artezh/amqpextra" "github.com/artezh/amqpextra/publisher" "github.com/streadway/amqp" ) func main() { // open connection d, _ := amqpextra.NewDialer(amqpextra.WithURL("amqp://guest:guest@localhost:5672/%2f")) // create publisher p, _ := d.Publisher() ctx, cancelFunc := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancelFunc() // publish a message p.Publish(publisher.Message{ Key: "test_queue", Context: ctx, Publishing: amqp.Publishing{ Body: []byte(`{"foo": "fooVal"}`), }, }) // close publisher p.Close() // close connection d.Close() }
Output:
type Option ¶
type Option func(c *Dialer)
Option could be used to configure Dialer
func WithAMQPDial ¶
WithAMQPDial configures the dial function. The function takes the url and amqp.Config and returns AMQPConnection.
func WithConnectionProperties ¶
WithConnectionProperties configures the connection properties set on dial.
func WithContext ¶
WithContext configures the Dialer context. The context can be used later to stop the Dialer.
func WithLogger ¶
WithLogger configures the logger used by Dialer.
func WithNotify ¶
WithNotify helps subscribe to Dialer ready/unready events.
func WithRetryPeriod ¶
WithRetryPeriod configures how much time to wait before the next dial attempt. Default: 5sec.
Directories ¶
Path | Synopsis |
---|---|
mock_consumer
Package mock_consumer is a generated GoMock package.
|
Package mock_consumer is a generated GoMock package. |
e2e_test
|
|
Package mock_amqpextra is a generated GoMock package.
|
Package mock_amqpextra is a generated GoMock package. |
mock_publisher
Package mock_publisher is a generated GoMock package.
|
Package mock_publisher is a generated GoMock package. |