rabbitmq

package
v0.0.0-...-3fe35c7 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2019 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

const ReconnectDelay = 5 * time.Second

Variables

This section is empty.

Functions

This section is empty.

Types

type Binding

type Binding struct {
	// contains filtered or unexported fields
}

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection wraps amqp.Connection and adds reconnection functionality.

func NewConnection

func NewConnection(addr string, logger *zap.Logger) *Connection

func (*Connection) Channel

func (c *Connection) Channel() (*amqp.Channel, error)

func (*Connection) Connect

func (c *Connection) Connect() (err error)

Connect dials the specified AMQP server address.

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

func (*Connection) Shutdown

func (c *Connection) Shutdown()

Shutdown stops the reconnector and terminates any existing connections.
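
The package does not show how Connection reconnects internally. As a rough illustration of the general idea, the following sketch retries a dial function with a fixed pause between attempts until it succeeds or a stop channel is closed. The names connectWithRetry, dialFunc, flakyDialer, and errStopped are hypothetical and not part of this package, and the delay is shortened from the documented ReconnectDelay so the example finishes quickly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// reconnectDelay plays the role of the documented ReconnectDelay constant.
// It is shortened here (assumption) so the sketch runs quickly.
const reconnectDelay = 10 * time.Millisecond

// errStopped is returned when the retry loop is stopped before a dial succeeds.
var errStopped = errors.New("reconnect stopped")

// dialFunc is a hypothetical stand-in for dialing the AMQP server.
type dialFunc func() error

// connectWithRetry calls dial until it succeeds or stop is closed, waiting
// delay between failed attempts. It returns the number of attempts made.
func connectWithRetry(stop <-chan struct{}, dial dialFunc, delay time.Duration) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts, nil
		}
		timer := time.NewTimer(delay)
		select {
		case <-stop:
			timer.Stop()
			return attempts, errStopped
		case <-timer.C:
			// Delay elapsed; try again.
		}
	}
}

// flakyDialer returns a dialFunc that fails the given number of times
// and succeeds afterwards.
func flakyDialer(failures int) dialFunc {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	// A nil stop channel never fires, so the loop runs until dial succeeds.
	attempts, err := connectWithRetry(nil, flakyDialer(2), reconnectDelay)
	fmt.Println(attempts, err)
}
```

Closing the stop channel is one simple way a Shutdown-style method could end such a loop; the real implementation may use a different mechanism.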

type Declaration

type Declaration func(Declarator) error

func AutoBinding

func AutoBinding(routingKey, queue, exchange string) Declaration

func AutoExchange

func AutoExchange(name string) Declaration

func AutoQueue

func AutoQueue(name string) Declaration

func DeclareBinding

func DeclareBinding(b *Binding) Declaration

func DeclareExchange

func DeclareExchange(e *Exchange) Declaration

func DeclareQueue

func DeclareQueue(q *Queue) Declaration

type Declarator

type Declarator interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

Declarator is implemented by *amqp.Channel.
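
Because Declaration is a function that takes a Declarator, declarations can be collected in a list and applied later to any value with the right method set. The sketch below shows that pattern against a recording fake. Table and Queue are local stand-ins for amqp.Table and amqp.Queue so the example compiles with the standard library only, and queueDecl, exchangeDecl, bindingDecl, and applyAll are hypothetical helpers; the flags they pass (durable, "topic" kind) are assumptions, not the values this package uses.

```go
package main

import "fmt"

// Table and Queue are local stand-ins for amqp.Table and amqp.Queue.
type Table map[string]interface{}
type Queue struct{ Name string }

// Declarator copies the documented method set, using the stand-in types.
type Declarator interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
	QueueBind(name, key, exchange string, noWait bool, args Table) error
}

// Declaration matches the documented function type.
type Declaration func(Declarator) error

// recorder is a fake Declarator that records every call it receives.
type recorder struct{ calls []string }

func (r *recorder) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
	r.calls = append(r.calls, "queue:"+name)
	return Queue{Name: name}, nil
}

func (r *recorder) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
	r.calls = append(r.calls, "exchange:"+name)
	return nil
}

func (r *recorder) QueueBind(name, key, exchange string, noWait bool, args Table) error {
	r.calls = append(r.calls, "bind:"+key+":"+name+"->"+exchange)
	return nil
}

// queueDecl is a hypothetical helper; the flag values are assumptions.
func queueDecl(name string) Declaration {
	return func(d Declarator) error {
		_, err := d.QueueDeclare(name, true, false, false, false, nil)
		return err
	}
}

// exchangeDecl is a hypothetical helper; the "topic" kind is an assumption.
func exchangeDecl(name string) Declaration {
	return func(d Declarator) error {
		return d.ExchangeDeclare(name, "topic", true, false, false, false, nil)
	}
}

// bindingDecl is a hypothetical helper that binds queue to exchange.
func bindingDecl(routingKey, queue, exchange string) Declaration {
	return func(d Declarator) error {
		return d.QueueBind(queue, routingKey, exchange, false, nil)
	}
}

// applyAll runs each declaration in order and stops at the first error.
func applyAll(d Declarator, decls ...Declaration) error {
	for _, decl := range decls {
		if err := decl(d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	err := applyAll(r,
		exchangeDecl("events"),
		queueDecl("audit"),
		bindingDecl("user.created", "audit", "events"),
	)
	fmt.Println(r.calls, err)
}
```

The same list of declarations could be replayed against a fresh channel after a reconnect, which is one reason to keep them as values rather than calling the channel directly.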

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

type PublishExchange

type PublishExchange string

type Publisher

type Publisher interface {
	Publish(routingKey string, event interface{}) error
}
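
Since Publisher is a one-method interface, application code can depend on it rather than on a concrete session, and tests can substitute an in-memory fake. The following is a minimal sketch of that idea; memoryPublisher, published, and notifyUserCreated are hypothetical names invented for illustration and are not part of this package.

```go
package main

import "fmt"

// Publisher matches the documented interface.
type Publisher interface {
	Publish(routingKey string, event interface{}) error
}

// published records one call to Publish.
type published struct {
	RoutingKey string
	Event      interface{}
}

// memoryPublisher is a hypothetical fake that stores events in memory
// instead of sending them to a broker.
type memoryPublisher struct {
	sent []published
}

// Publish appends the event to the in-memory log and never fails.
func (m *memoryPublisher) Publish(routingKey string, event interface{}) error {
	m.sent = append(m.sent, published{RoutingKey: routingKey, Event: event})
	return nil
}

// notifyUserCreated is hypothetical application code that only depends on
// the Publisher interface.
func notifyUserCreated(p Publisher, userID string) error {
	return p.Publish("user.created", map[string]string{"id": userID})
}

func main() {
	m := &memoryPublisher{}
	if err := notifyUserCreated(m, "42"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(len(m.sent), m.sent[0].RoutingKey)
}
```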

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

type Session

type Session struct {
	// contains filtered or unexported fields
}

func NewSession

func NewSession(addr string, logger *zap.Logger) *Session

func (*Session) AddPublisher

func (s *Session) AddPublisher(exchangeName, routingKey string) error

AddPublisher is a convenience wrapper that prepares the session for publishing on a specific exchange. It ensures that the target exchange is declared when Declare() is called.

func (*Session) AddSubscription

func (s *Session) AddSubscription(exchangeName, queueName, routingKey string, handler Subscriber) error

AddSubscription is a wrapper that uses the Auto*() functions to quickly add an exchange, queue, and binding to the declarations list. It also registers the subscriber handler function in the subscriber map. If no connection for the consumer exists, it is established at this point. This happens only once, even if you add multiple subscriptions.
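
The bookkeeping described above can be sketched roughly as follows: record three declarations, store the handler in a map, and connect at most once. This is an illustration under assumptions, not the package's implementation. Delivery is a local stand-in for amqp.Delivery, the session type and its fields are hypothetical, keying the subscriber map by queue name is a guess, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sync"
)

// Delivery is a local stand-in for amqp.Delivery.
type Delivery struct {
	RoutingKey string
	Body       []byte
}

// Subscriber mirrors the documented handler type, using the stand-in Delivery.
type Subscriber func(delivery Delivery)

// session is a hypothetical, simplified model of the bookkeeping only.
type session struct {
	connectOnce  sync.Once
	connects     int                   // how many times the consumer connection was set up
	declarations []string              // stand-in for the declarations list
	subscribers  map[string]Subscriber // keyed by queue name (assumption)
}

func newSession() *session {
	return &session{subscribers: make(map[string]Subscriber)}
}

// addSubscription queues the exchange, queue, and binding declarations,
// registers the handler, and sets up the consumer connection only once.
func (s *session) addSubscription(exchange, queue, routingKey string, handler Subscriber) {
	s.declarations = append(s.declarations,
		"exchange:"+exchange,
		"queue:"+queue,
		"binding:"+routingKey,
	)
	s.subscribers[queue] = handler
	s.connectOnce.Do(func() {
		// A real implementation would dial the broker here.
		s.connects++
	})
}

// dispatch hands a delivery to the handler registered for queue and
// reports whether a handler was found.
func (s *session) dispatch(queue string, d Delivery) bool {
	handler, ok := s.subscribers[queue]
	if !ok {
		return false
	}
	handler(d)
	return true
}

func main() {
	s := newSession()
	s.addSubscription("events", "audit", "user.created", func(d Delivery) {
		fmt.Println("audit got", d.RoutingKey)
	})
	s.addSubscription("events", "mailer", "user.deleted", func(d Delivery) {
		fmt.Println("mailer got", d.RoutingKey)
	})
	s.dispatch("audit", Delivery{RoutingKey: "user.created"})
	fmt.Println(s.connects, len(s.declarations))
}
```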

func (*Session) Consume

func (s *Session) Consume()

func (*Session) Declare

func (s *Session) Declare() error

Declare goes through all declarations and uses the consumer/producer connection to obtain a channel and perform the declarations.

func (*Session) Publish

func (s *Session) Publish(routingKey string, event interface{}) error

Publish takes the event, marshals it into a proto.Message, and then sends it on its journey to the spe

func (*Session) Shutdown

func (s *Session) Shutdown()

Shutdown closes all existing connections, but waits for any in-flight messages to be processed first. Finally, the session context is cancelled, which stops any child goroutines.
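
The ordering described above (wait for in-flight work, then cancel the context) is commonly built from a sync.WaitGroup and a context.CancelFunc. The sketch below shows that general pattern only; the worker type and its methods are hypothetical and do not reflect how Session is actually implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for a session that processes messages.
type worker struct {
	ctx      context.Context
	cancel   context.CancelFunc
	inFlight sync.WaitGroup

	mu        sync.Mutex
	processed int
}

func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	return &worker{ctx: ctx, cancel: cancel}
}

// handle simulates processing one message in its own goroutine.
func (w *worker) handle(d time.Duration) {
	w.inFlight.Add(1)
	go func() {
		defer w.inFlight.Done()
		time.Sleep(d) // pretend to do some work
		w.mu.Lock()
		w.processed++
		w.mu.Unlock()
	}()
}

// shutdown first waits for all in-flight messages, then cancels the
// context so that any goroutines watching ctx.Done() can exit.
func (w *worker) shutdown() {
	w.inFlight.Wait()
	w.cancel()
}

// count returns the number of messages processed so far.
func (w *worker) count() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.processed
}

func main() {
	w := newWorker()
	for i := 0; i < 3; i++ {
		w.handle(10 * time.Millisecond)
	}
	w.shutdown()
	fmt.Println(w.count(), w.ctx.Err())
}
```

Cancelling only after the wait means handlers that check the context never see it cancelled while they are still working on a message.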

type Subscriber

type Subscriber func(delivery amqp.Delivery)
