Documentation ¶
Overview ¶
Package rabbeasy is an AMQP wrapper that makes your code easier to test.
It achieves this goal by encouraging the use of the provided interfaces. When your own code base depends on interfaces, it is easier to test with mocks.
Index ¶
- type Acker
- type ClientNotifiable
- type CloserConnection
- type Connection
- func (c *Connection) Close() (err error)
- func (c *Connection) StartConsumer(consumer ConsumerClient) (err error)
- func (c *Connection) StartNotifiableConsumer(consumer ConsumerClientNotifiable) (err error)
- func (c *Connection) StartNotifiablePublisher(publisher PublisherClientNotifiable) (err error)
- func (c *Connection) StartPublisher(publisher PublisherClient) (err error)
- type ConnectionConfig
- type ConnectionParameter
- type Consumer
- type ConsumerClient
- type ConsumerClientNotifiable
- type ConsumerConfig
- type ConsumerConnection
- type ConsumerParameter
- type DeadLetterSender
- type Delivery
- type Deliveryer
- type DestinationConfig
- type GlobalConsumerConfig
- type Handler
- type Logger
- type Message
- type MessageBody
- type MessageConsumer
- type MessagePublisher
- type NotifiableConsumer
- type NotifiableConsumerConnection
- type NotifiableConsumerParameter
- type NotifiablePublisher
- type NotifiablePublisherConnection
- type NotifiablePublisherParameter
- type Publisher
- type PublisherClient
- type PublisherClientNotifiable
- type PublisherConfig
- type PublisherConnection
- type PublisherParameter
- type Requeuer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClientNotifiable ¶
ClientNotifiable notifies channel status
type CloserConnection ¶
type CloserConnection interface {
Close() error
}
CloserConnection defines closeable connection interface
type Connection ¶
type Connection struct { ConsumerClients []ConsumerClient PublisherClients []PublisherClient ConsumerClientNotifiables []ConsumerClientNotifiable PublisherClientNotifiables []PublisherClientNotifiable // contains filtered or unexported fields }
Connection manages amqp connection
func NewConnection ¶
func NewConnection(param ConnectionParameter) (c *Connection, err error)
NewConnection creates Connection and its underlying *amqp.Connection
Initialises the underlying *amqp.Connection and the reconnection handling process
func (*Connection) StartConsumer ¶
func (c *Connection) StartConsumer(consumer ConsumerClient) (err error)
StartConsumer creates amqp.Channel and assigns to consumer parameter
func (*Connection) StartNotifiableConsumer ¶
func (c *Connection) StartNotifiableConsumer(consumer ConsumerClientNotifiable) (err error)
StartNotifiableConsumer creates an amqp.Channel and assigns it to the consumer parameter
func (*Connection) StartNotifiablePublisher ¶
func (c *Connection) StartNotifiablePublisher(publisher PublisherClientNotifiable) (err error)
StartNotifiablePublisher creates an amqp.Channel and assigns it to the publisher parameter
func (*Connection) StartPublisher ¶
func (c *Connection) StartPublisher(publisher PublisherClient) (err error)
StartPublisher creates amqp.Channel and assigns to publisher parameter
type ConnectionConfig ¶
type ConnectionConfig struct { Environment string Domain string Name string Number int Host string Port int User string Password string ReconnectInterval time.Duration GlobalConsumerConfig }
ConnectionConfig holds connection settings
func (ConnectionConfig) GetConnectionName ¶
func (c ConnectionConfig) GetConnectionName() (cn string)
GetConnectionName produces connection name
func (ConnectionConfig) GetProperties ¶
func (c ConnectionConfig) GetProperties() (p map[string]interface{})
GetProperties produces properties table
func (ConnectionConfig) GetURL ¶
func (c ConnectionConfig) GetURL() (u string)
GetURL produces url string
type ConnectionParameter ¶
type ConnectionParameter struct { Config ConnectionConfig Logger Logger OnConnect chan interface{} OnDisconnect chan *amqp.Error }
ConnectionParameter holds creation parameters
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer implements messaging consumer
func NewConsumer ¶
func NewConsumer(param ConsumerParameter) (c *Consumer, err error)
NewConsumer creates messaging consumer
func (*Consumer) Destination ¶
func (c *Consumer) Destination() (d ConsumerConfig)
Destination returns the consumer's destination configuration
type ConsumerClient ¶
type ConsumerClient interface { From(<-chan amqp.Delivery) Destination() ConsumerConfig }
ConsumerClient processes channel messages
type ConsumerClientNotifiable ¶
type ConsumerClientNotifiable interface { ConsumerClient ClientNotifiable }
ConsumerClientNotifiable processes channel messages
Also receives channel status
type ConsumerConfig ¶
type ConsumerConfig struct { DestinationConfig PrefetchCount int PrefetchSize int }
ConsumerConfig holds consumer configs
type ConsumerConnection ¶
type ConsumerConnection interface {
StartConsumer(ConsumerClient) error
}
ConsumerConnection defines consumer connection interface
type ConsumerParameter ¶
type ConsumerParameter struct { Config ConsumerConfig Connection ConsumerConnection Handler Handler }
ConsumerParameter holds creation parameters
type DeadLetterSender ¶
type DeadLetterSender interface {
DeadLetter()
}
DeadLetterSender confirms a message by discarding it or sending it to dead-letter
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
Delivery implements Message and wraps message delivered
func NewDelivery ¶
NewDelivery creates message based on an amqp.Delivery
func (*Delivery) DeadLetter ¶
func (d *Delivery) DeadLetter()
DeadLetter removes the message or sends it to dead-letter
type Deliveryer ¶
Deliveryer returns original received message
type DestinationConfig ¶
type DestinationConfig struct { // ConsumerConfig // PublisherConfig Queue string Exchange string ExchangeType string RoutingKey string DlxExchange string DlxExchangeType string DlxRoutingKey string Durable bool }
DestinationConfig holds exchange destination configs
type GlobalConsumerConfig ¶
GlobalConsumerConfig holds connection global consumer configs
type Handler ¶
type Handler func(d Message)
Handler called when a message is received
Although you can implement business rules directly in the Handler, it is advisable to delegate the processing to another function or channel.
Example (Basics) ¶
package main import ( "fmt" "github.com/kimprado/rabbeasy/pkg/rabbeasy" ) func main() { var h rabbeasy.Handler // Consumer will invoke handler for each message var listenerCh = make(chan rabbeasy.MessageBody) h = func(m rabbeasy.Message) { listenerCh <- m } rabbeasy.NewConsumer(rabbeasy.ConsumerParameter{ Handler: h, // Consumer created with handler h reference }) message := <-listenerCh fmt.Println(string(message.Body())) }
Output:
Example (Channel) ¶
package main import ( "fmt" "github.com/kimprado/rabbeasy/pkg/rabbeasy" "github.com/streadway/amqp" ) func main() { type MySimpleMessageType interface { rabbeasy.MessageBody // Declared only what is used ( Body() ) rabbeasy.Acker // Declared only what is used ( Ack() ) // rabbeasy.Requeuer // Not declared for non-use // rabbeasy.DeadLetterSender // Not declared for non-use // rabbeasy.Deliveryer // Not declared for non-use } var listenerCh = make(chan MySimpleMessageType) var handler = func(m rabbeasy.Message) { listenerCh <- m } go produceMockMassege(handler) message := <-listenerCh fmt.Println(string(message.Body())) message.Ack() } func produceMockMassege(h rabbeasy.Handler) { h(&mockMessage{}) } type mockMessage struct{} func (m *mockMessage) Body() []byte { return []byte("hello") } func (m *mockMessage) Ack() {} func (m *mockMessage) Requeue() {} func (m *mockMessage) Delivery() *amqp.Delivery { return nil } func (m *mockMessage) DeadLetter() {}
Output: hello
type Logger ¶
type Logger interface { Errorf(msg string, v ...interface{}) Warnf(msg string, v ...interface{}) Infof(msg string, v ...interface{}) Debugf(msg string, v ...interface{}) Tracef(msg string, v ...interface{}) }
Logger specifies the interface for logger implementations
type Message ¶
type Message interface { MessageBody Acker Requeuer DeadLetterSender Deliveryer }
Message wraps message delivered
type MessageBody ¶
type MessageBody interface {
Body() []byte
}
MessageBody returns bytes of message
type MessageConsumer ¶
type MessageConsumer interface {
Consume(Handler)
}
MessageConsumer processes channel messages
type MessagePublisher ¶
MessagePublisher publishes messages on the channel
type NotifiableConsumer ¶
type NotifiableConsumer struct { Consumer // contains filtered or unexported fields }
NotifiableConsumer implements messaging notifiable consumer
func NewNotifiableConsumer ¶
func NewNotifiableConsumer(param NotifiableConsumerParameter) (c *NotifiableConsumer, err error)
NewNotifiableConsumer creates messaging notifiable consumer
func (*NotifiableConsumer) NotifyClose ¶
func (c *NotifiableConsumer) NotifyClose(e *amqp.Error)
NotifyClose handles channel closure event
Forwards the event, as an *amqp.Error, to the listener's receiver
type NotifiableConsumerConnection ¶
type NotifiableConsumerConnection interface {
StartNotifiableConsumer(ConsumerClientNotifiable) error
}
NotifiableConsumerConnection defines the notifiable consumer connection interface
type NotifiableConsumerParameter ¶
type NotifiableConsumerParameter struct {
// contains filtered or unexported fields
}
NotifiableConsumerParameter holds creation parameters
type NotifiablePublisher ¶
type NotifiablePublisher struct { Publisher // contains filtered or unexported fields }
NotifiablePublisher implements messaging notifiable publisher
func NewNotifiablePublisher ¶
func NewNotifiablePublisher(param NotifiablePublisherParameter) (p *NotifiablePublisher, err error)
NewNotifiablePublisher creates messaging notifiable publisher
func (*NotifiablePublisher) NotifyClose ¶
func (p *NotifiablePublisher) NotifyClose(e *amqp.Error)
NotifyClose handles channel closure event
Forwards the event, as an *amqp.Error, to the listener's receiver
func (*NotifiablePublisher) Publish ¶
func (p *NotifiablePublisher) Publish(body []byte) (err error)
Publish publishes a message to a queue or topic
Messages are sent to the destination defined at publisher creation
Example ¶
package main import ( "github.com/kimprado/rabbeasy/pkg/rabbeasy" "github.com/streadway/amqp" ) func main() { var ( publisher *rabbeasy.NotifiablePublisher param rabbeasy.NotifiablePublisherParameter conn rabbeasy.NotifiablePublisherConnection cfg rabbeasy.PublisherConfig eCh chan *amqp.Error err error ) cfg = rabbeasy.PublisherConfig{ DestinationConfig: rabbeasy.DestinationConfig{ RoutingKey: "sample", // Sending to queue 'sample' }, Default: amqp.Publishing{ ContentType: "text/plain", }, } param = rabbeasy.NotifiablePublisherParameter{ Config: cfg, Connection: conn, Receiver: eCh, } publisher, err = rabbeasy.NewNotifiablePublisher(param) if err != nil { return } publisher.Publish([]byte("New message to send")) }
Output:
type NotifiablePublisherConnection ¶
type NotifiablePublisherConnection interface {
StartNotifiablePublisher(PublisherClientNotifiable) error
}
NotifiablePublisherConnection defines the notifiable publisher connection interface
type NotifiablePublisherParameter ¶
type NotifiablePublisherParameter struct { Config PublisherConfig Connection NotifiablePublisherConnection Receiver chan *amqp.Error }
NotifiablePublisherParameter holds creation parameters
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher implements messaging publisher
func NewPublisher ¶
func NewPublisher(param PublisherParameter) (p *Publisher, err error)
NewPublisher creates messaging Publisher
func (*Publisher) Destination ¶
func (p *Publisher) Destination() (d PublisherConfig)
Destination returns the publisher's destination configuration
func (*Publisher) Publish ¶
Publish publishes a message to a queue or topic
Messages are sent to the destination defined at publisher creation
Example ¶
package main import ( "github.com/kimprado/rabbeasy/pkg/rabbeasy" "github.com/streadway/amqp" ) func main() { var ( publisher *rabbeasy.Publisher param rabbeasy.PublisherParameter conn rabbeasy.PublisherConnection cfg rabbeasy.PublisherConfig err error ) cfg = rabbeasy.PublisherConfig{ DestinationConfig: rabbeasy.DestinationConfig{ RoutingKey: "sample", // Sending to queue 'sample' }, Default: amqp.Publishing{ ContentType: "text/plain", }, } param = rabbeasy.PublisherParameter{ Config: cfg, Connection: conn, } publisher, err = rabbeasy.NewPublisher(param) if err != nil { return } publisher.Publish([]byte("New message to send")) }
Output:
type PublisherClient ¶
type PublisherClient interface { PublishIn(*amqp.Channel) Destination() PublisherConfig }
PublisherClient publishes messages on the channel
type PublisherClientNotifiable ¶
type PublisherClientNotifiable interface { PublisherClient ClientNotifiable }
PublisherClientNotifiable publishes messages on the channel
Also receives channel status
type PublisherConfig ¶
type PublisherConfig struct { DestinationConfig Default amqp.Publishing }
PublisherConfig holds message publishing configs
type PublisherConnection ¶
type PublisherConnection interface {
StartPublisher(PublisherClient) error
}
PublisherConnection defines publisher connection interface
type PublisherParameter ¶
type PublisherParameter struct { Config PublisherConfig Connection PublisherConnection }
PublisherParameter holds creation parameters