rabbeasy

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2019 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package rabbeasy is an AMQP wrapper that makes your code easier to test.

It achieves this goal by encouraging the use of the provided interfaces. When your own code base depends on interfaces, it is easier to test with mocks.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Acker

type Acker interface {
	Ack()
}

Acker confirms message with ok

type ClientNotifiable

type ClientNotifiable interface {
	NotifyClose(e *amqp.Error)
}

ClientNotifiable notifies channel status

type CloserConnection

type CloserConnection interface {
	Close() error
}

CloserConnection defines closeable connection interface

type Connection

type Connection struct {
	ConsumerClients            []ConsumerClient
	PublisherClients           []PublisherClient
	ConsumerClientNotifiables  []ConsumerClientNotifiable
	PublisherClientNotifiables []PublisherClientNotifiable
	// contains filtered or unexported fields
}

Connection manages amqp connection

func NewConnection

func NewConnection(param ConnectionParameter) (c *Connection, err error)

NewConnection creates Connection and its underlying *amqp.Connection

Initialises underlying *amqp.Connection and reconnection handle process

func (*Connection) Close

func (c *Connection) Close() (err error)

Close closes amqp connection

func (*Connection) StartConsumer

func (c *Connection) StartConsumer(consumer ConsumerClient) (err error)

StartConsumer creates amqp.Channel and assigns to consumer parameter

func (*Connection) StartNotifiableConsumer

func (c *Connection) StartNotifiableConsumer(consumer ConsumerClientNotifiable) (err error)

StartNotifiableConsumer creates amqp.Channel and assigns to notifiableConsumer parameter

func (*Connection) StartNotifiablePublisher

func (c *Connection) StartNotifiablePublisher(publisher PublisherClientNotifiable) (err error)

StartNotifiablePublisher creates amqp.Channel and assigns to notifiablePublisher parameter

func (*Connection) StartPublisher

func (c *Connection) StartPublisher(publisher PublisherClient) (err error)

StartPublisher creates amqp.Channel and assigns to publisher parameter

type ConnectionConfig

type ConnectionConfig struct {
	Environment       string
	Domain            string
	Name              string
	Number            int
	Host              string
	Port              int
	User              string
	Password          string
	ReconnectInterval time.Duration
	GlobalConsumerConfig
}

ConnectionConfig holds connection settings

func (ConnectionConfig) GetConnectionName

func (c ConnectionConfig) GetConnectionName() (cn string)

GetConnectionName produces connection name

func (ConnectionConfig) GetProperties

func (c ConnectionConfig) GetProperties() (p map[string]interface{})

GetProperties produces properties table

func (ConnectionConfig) GetURL

func (c ConnectionConfig) GetURL() (u string)

GetURL produces url string

type ConnectionParameter

type ConnectionParameter struct {
	Config       ConnectionConfig
	Logger       Logger
	OnConnect    chan interface{}
	OnDisconnect chan *amqp.Error
}

ConnectionParameter holds creation parameters

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer implements messaging consumer

func NewConsumer

func NewConsumer(param ConsumerParameter) (c *Consumer, err error)

NewConsumer creates messaging consumer

func (*Consumer) Destination

func (c *Consumer) Destination() (d ConsumerConfig)

Destination returns the destination configuration

func (*Consumer) From

func (c *Consumer) From(messages <-chan amqp.Delivery)

From connects consumer to new message listener channel

type ConsumerClient

type ConsumerClient interface {
	From(<-chan amqp.Delivery)
	Destination() ConsumerConfig
}

ConsumerClient processes channel messages

type ConsumerClientNotifiable

type ConsumerClientNotifiable interface {
	ConsumerClient
	ClientNotifiable
}

ConsumerClientNotifiable processes channel messages

Also receives channel status

type ConsumerConfig

type ConsumerConfig struct {
	DestinationConfig
	PrefetchCount int
	PrefetchSize  int
}

ConsumerConfig holds consumer configs

type ConsumerConnection

type ConsumerConnection interface {
	StartConsumer(ConsumerClient) error
}

ConsumerConnection defines consumer connection interface

type ConsumerParameter

type ConsumerParameter struct {
	Config     ConsumerConfig
	Connection ConsumerConnection
	Handler    Handler
}

ConsumerParameter holds creation parameters

type DeadLetterSender

type DeadLetterSender interface {
	DeadLetter()
}

DeadLetterSender confirms a message by sending it to discard

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

Delivery implements Message and wraps message delivered

func NewDelivery

func NewDelivery(d *amqp.Delivery) *Delivery

NewDelivery creates message based on an amqp.Delivery

func (*Delivery) Ack

func (d *Delivery) Ack()

Ack confirms message processing for server

func (*Delivery) Body

func (d *Delivery) Body() []byte

Body returns original Delivery body

func (*Delivery) DeadLetter

func (d *Delivery) DeadLetter()

DeadLetter removes the message or sends it to dead-letter

func (*Delivery) Delivery

func (d *Delivery) Delivery() *amqp.Delivery

Delivery returns underlying amqp.Delivery

func (*Delivery) Requeue

func (d *Delivery) Requeue()

Requeue sends the message back to the queue

type Deliveryer

type Deliveryer interface {
	Delivery() *amqp.Delivery
}

Deliveryer returns original received message

type DestinationConfig

type DestinationConfig struct {
	// Shared destination settings: this struct is embedded by both
	// ConsumerConfig and PublisherConfig.
	Queue           string
	Exchange        string
	ExchangeType    string
	RoutingKey      string
	// NOTE(review): the Dlx* fields presumably configure the dead-letter
	// exchange used for discarded messages — confirm against the implementation.
	DlxExchange     string
	DlxExchangeType string
	DlxRoutingKey   string
	Durable         bool
}

DestinationConfig holds exchange destination configs

type GlobalConsumerConfig

type GlobalConsumerConfig struct {
	PrefetchCount int
	PrefetchSize  int
}

GlobalConsumerConfig holds connection global consumer configs

type Handler

type Handler func(d Message)

Handler called when a message is received

Although you can implement business rules directly in the Handler, it is advisable to delegate the treatment to another function or channel.

Example (Basics)
package main

import (
	"fmt"

	"github.com/kimprado/rabbeasy/pkg/rabbeasy"
)

// main wires a Handler that forwards every received message into a channel,
// creates a Consumer with that handler, and prints the body of the first
// message taken from the channel.
func main() {
	var h rabbeasy.Handler // Consumer will invoke handler for each message

	// The channel is typed with the narrowest interface the reader needs
	// (only Body()), so the receiving side does not depend on the full Message.
	var listenerCh = make(chan rabbeasy.MessageBody)
	h = func(m rabbeasy.Message) {
		listenerCh <- m
	}

	// Stop when the consumer cannot be created: without a consumer nothing
	// will ever invoke h, and the receive below would block forever.
	if _, err := rabbeasy.NewConsumer(rabbeasy.ConsumerParameter{
		Handler: h, // Consumer created with handler h reference
	}); err != nil {
		return
	}

	message := <-listenerCh
	fmt.Println(string(message.Body()))
}
Output:

Example (Channel)
package main

import (
	"fmt"

	"github.com/kimprado/rabbeasy/pkg/rabbeasy"
	"github.com/streadway/amqp"
)

// main shows how application code can depend on a small, locally declared
// interface instead of the full rabbeasy.Message, and how a handler can be
// exercised with a mock message without any broker connection.
func main() {
	type MySimpleMessageType interface {
		rabbeasy.MessageBody // Declared only what is used ( Body() )
		rabbeasy.Acker       // Declared only what is used ( Ack()  )
		// rabbeasy.Requeuer         // Not declared for non-use
		// rabbeasy.DeadLetterSender // Not declared for non-use
		// rabbeasy.Deliveryer       // Not declared for non-use
	}

	var listenerCh = make(chan MySimpleMessageType)
	var handler = func(m rabbeasy.Message) {
		listenerCh <- m
	}

	// The send on the unbuffered channel happens in its own goroutine so that
	// main can be the receiver.
	go produceMockMessage(handler)

	message := <-listenerCh
	fmt.Println(string(message.Body()))
	message.Ack()

}

// produceMockMessage invokes the handler once with a mock message, standing in
// for a real consumer delivering a message.
func produceMockMessage(h rabbeasy.Handler) {
	h(&mockMessage{})
}

// mockMessage is a stub message whose body is always "hello"; every other
// method is a no-op.
type mockMessage struct{}

// Body returns the fixed payload "hello".
func (m *mockMessage) Body() []byte {
	return []byte("hello")
}

// Ack does nothing in the mock.
func (m *mockMessage) Ack() {}

// Requeue does nothing in the mock.
func (m *mockMessage) Requeue() {}

// Delivery returns nil because the mock has no underlying amqp.Delivery.
func (m *mockMessage) Delivery() *amqp.Delivery {
	return nil
}

// DeadLetter does nothing in the mock.
func (m *mockMessage) DeadLetter() {}
Output:

hello

type Logger

type Logger interface {
	Errorf(msg string, v ...interface{})
	Warnf(msg string, v ...interface{})
	Infof(msg string, v ...interface{})
	Debugf(msg string, v ...interface{})
	Tracef(msg string, v ...interface{})
}

Logger specifies the interface for logger implementations

type Message

Message wraps message delivered

type MessageBody

type MessageBody interface {
	Body() []byte
}

MessageBody returns bytes of message

type MessageConsumer

type MessageConsumer interface {
	Consume(Handler)
}

MessageConsumer processes channel messages

type MessagePublisher

type MessagePublisher interface {
	Publish(body []byte) error
}

MessagePublisher publishes messages on the channel

type NotifiableConsumer

type NotifiableConsumer struct {
	Consumer
	// contains filtered or unexported fields
}

NotifiableConsumer implements messaging notifiable consumer

func NewNotifiableConsumer

func NewNotifiableConsumer(param NotifiableConsumerParameter) (c *NotifiableConsumer, err error)

NewNotifiableConsumer creates messaging notifiable consumer

func (*NotifiableConsumer) NotifyClose

func (c *NotifiableConsumer) NotifyClose(e *amqp.Error)

NotifyClose handles channel closure event

Forwards message in amqp.Error form to listener receiver

type NotifiableConsumerConnection

type NotifiableConsumerConnection interface {
	StartNotifiableConsumer(ConsumerClientNotifiable) error
}

NotifiableConsumerConnection defines consumer connection interface

type NotifiableConsumerParameter

type NotifiableConsumerParameter struct {
	// contains filtered or unexported fields
}

NotifiableConsumerParameter holds creation parameters

type NotifiablePublisher

type NotifiablePublisher struct {
	Publisher
	// contains filtered or unexported fields
}

NotifiablePublisher implements messaging notifiable publisher

func NewNotifiablePublisher

func NewNotifiablePublisher(param NotifiablePublisherParameter) (p *NotifiablePublisher, err error)

NewNotifiablePublisher creates messaging notifiable publisher

func (*NotifiablePublisher) NotifyClose

func (p *NotifiablePublisher) NotifyClose(e *amqp.Error)

NotifyClose handles channel closure event

Forwards message in amqp.Error form to listener receiver

func (*NotifiablePublisher) Publish

func (p *NotifiablePublisher) Publish(body []byte) (err error)

Publish publishes message in queue or topic

Messages are sent to the destination defined at publisher creation

Example
package main

import (
	"github.com/kimprado/rabbeasy/pkg/rabbeasy"
	"github.com/streadway/amqp"
)

func main() {
	var (
		publisher *rabbeasy.NotifiablePublisher
		param     rabbeasy.NotifiablePublisherParameter
		conn      rabbeasy.NotifiablePublisherConnection
		cfg       rabbeasy.PublisherConfig
		eCh       chan *amqp.Error
		err       error
	)

	cfg = rabbeasy.PublisherConfig{
		DestinationConfig: rabbeasy.DestinationConfig{
			RoutingKey: "sample", // Sending to queue 'sample'
		},
		Default: amqp.Publishing{
			ContentType: "text/plain",
		},
	}

	param = rabbeasy.NotifiablePublisherParameter{
		Config:     cfg,
		Connection: conn,
		Receiver:   eCh,
	}

	publisher, err = rabbeasy.NewNotifiablePublisher(param)
	if err != nil {
		return
	}

	publisher.Publish([]byte("New message to send"))
}
Output:

type NotifiablePublisherConnection

type NotifiablePublisherConnection interface {
	StartNotifiablePublisher(PublisherClientNotifiable) error
}

NotifiablePublisherConnection defines publisher connection interface

type NotifiablePublisherParameter

type NotifiablePublisherParameter struct {
	Config     PublisherConfig
	Connection NotifiablePublisherConnection
	Receiver   chan *amqp.Error
}

NotifiablePublisherParameter holds creation parameters

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher implements messaging publisher

func NewPublisher

func NewPublisher(param PublisherParameter) (p *Publisher, err error)

NewPublisher creates messaging Publisher

func (*Publisher) Destination

func (p *Publisher) Destination() (d PublisherConfig)

Destination returns the destination configuration

func (*Publisher) Publish

func (p *Publisher) Publish(body []byte) (err error)

Publish publishes message in queue or topic

Messages are sent to the destination defined at publisher creation

Example
package main

import (
	"github.com/kimprado/rabbeasy/pkg/rabbeasy"
	"github.com/streadway/amqp"
)

func main() {
	var (
		publisher *rabbeasy.Publisher
		param     rabbeasy.PublisherParameter
		conn      rabbeasy.PublisherConnection
		cfg       rabbeasy.PublisherConfig
		err       error
	)

	cfg = rabbeasy.PublisherConfig{
		DestinationConfig: rabbeasy.DestinationConfig{
			RoutingKey: "sample", // Sending to queue 'sample'
		},
		Default: amqp.Publishing{
			ContentType: "text/plain",
		},
	}

	param = rabbeasy.PublisherParameter{
		Config:     cfg,
		Connection: conn,
	}

	publisher, err = rabbeasy.NewPublisher(param)
	if err != nil {
		return
	}

	publisher.Publish([]byte("New message to send"))
}
Output:

func (*Publisher) PublishIn

func (p *Publisher) PublishIn(amqpCh *amqp.Channel)

PublishIn connects publisher to amqp.Channel

type PublisherClient

type PublisherClient interface {
	PublishIn(*amqp.Channel)
	Destination() PublisherConfig
}

PublisherClient publishes messages on the channel

type PublisherClientNotifiable

type PublisherClientNotifiable interface {
	PublisherClient
	ClientNotifiable
}

PublisherClientNotifiable publishes messages on the channel

Also receives channel status

type PublisherConfig

type PublisherConfig struct {
	DestinationConfig
	Default amqp.Publishing
}

PublisherConfig holds message publishing configs

type PublisherConnection

type PublisherConnection interface {
	StartPublisher(PublisherClient) error
}

PublisherConnection defines publisher connection interface

type PublisherParameter

type PublisherParameter struct {
	Config     PublisherConfig
	Connection PublisherConnection
}

PublisherParameter holds creation parameters

type Requeuer

type Requeuer interface {
	Requeue()
}

Requeuer confirms message with requeue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL