goamqp

Version: v0.5.0
Published: May 25, 2022 License: MIT Imports: 15 Imported by: 1

README

go_amqp

Package goamqp provides an opinionated way of using RabbitMQ for event-driven architectures.

Getting Started

Supports Go 1.11+ and uses streadway-amqp to connect to RabbitMQ.

Using Go Modules

Starting with Go 1.13, you can use Go Modules to install goamqp.

Import the goamqp package in your code:

import "gitlab.com/sparetimecoders/goamqp"

Build your project:

go build ./...

A dependency on the latest stable version of goamqp should be added to your go.mod file automatically.

Install the client

If Go modules can't be used:

go get gitlab.com/sparetimecoders/goamqp

Usage

See the 'examples' subdirectory.

Contributing

TODO

References

License

MIT - see LICENSE for more details.

Developing

TODO

Tests

go test ./...

Integration testing

Requires a running RabbitMQ instance, for example:

docker run --name rabbit -p 15672:15672 -p 5672:5672 sparetimecoders/rabbitmq

Run the tests:

go test ./... -tags=integration

Documentation

Overview

Package goamqp provides an opinionated way of using [RabbitMQ](https://www.rabbitmq.com/) for event-driven architectures.

Example
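package main

import (
	"fmt"
	"log"
	"reflect"
	"time"

	"gitlab.com/sparetimecoders/goamqp"
)

func main() {
	// Connection settings for a RabbitMQ broker running locally.
	config := goamqp.AmqpConfig{
		Host:     "localhost",
		Port:     5672,
		Username: "admin",
		Password: "password",
		VHost:    "",
	}
	// The publisher sends IncomingMessage values with the routing key "testkey".
	publisher := goamqp.NewPublisher(goamqp.Route{Type: IncomingMessage{}, Key: "testkey"})

	handler := &TestIncomingMessageHandler{}
	connection := goamqp.New("service", config)
	// Start sets up the listener and the publisher passed as Setup options.
	err := connection.Start(
		goamqp.EventStreamListener("testkey", handler.Process, reflect.TypeOf(IncomingMessage{})),
		goamqp.EventStreamPublisher(publisher),
	)
	if err != nil {
		log.Fatalf("failed to start connection: %v", err)
	}

	// Publish two messages; both are routed with "testkey".
	for _, url := range []string{"FAILED", "OK"} {
		if err := publisher.Publish(IncomingMessage{Url: url}); err != nil {
			log.Printf("failed to publish %q: %v", url, err)
		}
	}
	if err := connection.Close(); err != nil {
		log.Printf("failed to close connection: %v", err)
	}
}

// TestIncomingMessageHandler handles messages received on "testkey".
type TestIncomingMessageHandler struct {
	ctx string
}

// Process matches goamqp.HandlerFunc; it prints the message and returns no response.
func (i TestIncomingMessageHandler) Process(m interface{}, headers goamqp.Headers) (interface{}, error) {
	fmt.Printf("Called process with %v and ctx %v\n", m, i.ctx)
	return nil, nil
}

// IncomingMessage is the event type that is published and consumed.
type IncomingMessage struct {
	Url string
}

// TTL returns one minute for every IncomingMessage.
func (IncomingMessage) TTL() time.Duration {
	return time.Minute
}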
Constants

This section is empty.

Variables

var (
	// ErrEmptySuffix returned when an empty suffix is passed
	ErrEmptySuffix = fmt.Errorf("empty queue suffix not allowed")
)

Functions

func SendingService added in v0.3.0

func SendingService(headers Headers) (string, error)

SendingService returns the name of the service that produced the message. It can be used to send a handlerResponse; see PublishServiceResponse.

Types

type AmqpChannel added in v0.1.1

type AmqpChannel interface {
	QueueBind(queue, key, exchange string, noWait bool, args amqp.Table) error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	NotifyPublish(confirm chan amqp.Confirmation) chan amqp.Confirmation
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
	Confirm(noWait bool) error
	Qos(prefetchCount, prefetchSize int, global bool) error
}

AmqpChannel wraps the amqp.Channel to allow for mocking

type AmqpConfig

type AmqpConfig struct {
	Username string `env:"RABBITMQ_USERNAME,required"`
	Password string `env:"RABBITMQ_PASSWORD,required"`
	Host     string `env:"RABBITMQ_HOST,required"`
	Port     int    `env:"RABBITMQ_PORT" envDefault:"5672"`
	VHost    string `env:"RABBITMQ_VHOST" envDefault:""`
}

AmqpConfig contains the necessary variables for connecting to RabbitMQ.

func ParseAmqpURL

func ParseAmqpURL(amqpURL string) (AmqpConfig, error)

ParseAmqpURL tries to parse the passed string and create a valid AmqpConfig object

func (AmqpConfig) AmqpURL

func (c AmqpConfig) AmqpURL() string

AmqpURL returns a valid connection URL
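To illustrate the relationship between ParseAmqpURL and AmqpURL, here is a minimal round-trip sketch built on net/url. It assumes the conventional `amqp://user:password@host:port/vhost` layout and handles only the plain `amqp` scheme; the exact string the library produces may differ. The `config`, `buildURL` and `parseURL` names are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
	"strings"
)

// config is a local stand-in with the same fields as AmqpConfig.
type config struct {
	Username, Password, Host, VHost string
	Port                            int
}

// buildURL assembles amqp://user:password@host:port/vhost from the fields.
func buildURL(c config) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(c.Username, c.Password),
		Host:   net.JoinHostPort(c.Host, strconv.Itoa(c.Port)),
		Path:   "/" + c.VHost,
	}
	return u.String()
}

// parseURL is the inverse of buildURL; a missing port falls back to 5672.
func parseURL(raw string) (config, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return config{}, err
	}
	if u.Scheme != "amqp" {
		return config{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	c := config{
		Username: u.User.Username(),
		Host:     u.Hostname(),
		Port:     5672,
		VHost:    strings.TrimPrefix(u.Path, "/"),
	}
	c.Password, _ = u.User.Password()
	if p := u.Port(); p != "" {
		if c.Port, err = strconv.Atoi(p); err != nil {
			return config{}, fmt.Errorf("invalid port %q: %w", p, err)
		}
	}
	return c, nil
}

func main() {
	c := config{Username: "admin", Password: "password", Host: "localhost", Port: 5672}
	raw := buildURL(c)
	fmt.Println(raw)
	back, err := parseURL(raw)
	fmt.Println(back == c, err)
}
```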

func (AmqpConfig) String

func (c AmqpConfig) String() string

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a wrapper around the actual amqp.Connection and amqp.Channel

func New

func New(serviceName string, config AmqpConfig) *Connection

New creates a new Connection from config

func NewFromURL

func NewFromURL(serviceName string, amqpURL string) (*Connection, error)

NewFromURL creates a new Connection from a URL

func (*Connection) Close added in v0.1.1

func (c *Connection) Close() error

Close closes the amqp connection, see amqp.Connection.Close

func (*Connection) PublishServiceResponse added in v0.3.0

func (c *Connection) PublishServiceResponse(targetService, routingKey string, msg interface{}) error

PublishServiceResponse sends a message to targetService as a handlerResp.

TODO: Document how messages flow, reference docs.md?

func (*Connection) Start

func (c *Connection) Start(opts ...Setup) error

Start sets up the AMQP queues and exchanges defined by opts

type HandlerFunc added in v0.3.0

type HandlerFunc func(msg interface{}, headers Headers) (response interface{}, err error)

HandlerFunc is used to process an incoming message. If processing fails, an error should be returned. The optional handlerResp is used automatically when setting up a RequestResponseHandler; otherwise it is ignored.
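A handler with this signature might look like the sketch below. The `Headers` and `HandlerFunc` declarations are local copies of the documented types so the snippet compiles on its own, and `OrderCreated`, `OrderAccepted` and `handleOrder` are hypothetical. Whether the library passes the decoded message as a value or as a pointer is not stated here, so the sketch accepts both.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types.
type Headers map[string]interface{}

type HandlerFunc func(msg interface{}, headers Headers) (response interface{}, err error)

// OrderCreated and OrderAccepted are hypothetical event types.
type OrderCreated struct{ ID string }

type OrderAccepted struct{ ID string }

// handleOrder validates the incoming message and returns a response value.
// Returning an error signals that processing failed.
func handleOrder(msg interface{}, _ Headers) (interface{}, error) {
	var order OrderCreated
	switch m := msg.(type) {
	case OrderCreated:
		order = m
	case *OrderCreated:
		if m == nil {
			return nil, errors.New("nil message")
		}
		order = *m
	default:
		return nil, fmt.Errorf("unexpected message type %T", msg)
	}
	if order.ID == "" {
		return nil, errors.New("order without ID")
	}
	return OrderAccepted{ID: order.ID}, nil
}

// Compile-time check that handleOrder has the HandlerFunc signature.
var _ HandlerFunc = handleOrder

func main() {
	resp, err := handleOrder(OrderCreated{ID: "42"}, Headers{})
	fmt.Println(resp, err)
	_, err = handleOrder("not an order", nil)
	fmt.Println(err)
}
```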

func ResponseWrapper added in v0.3.0

func ResponseWrapper(handler HandlerFunc, routingKey string, publisher ServiceResponsePublisher) HandlerFunc

ResponseWrapper is...TODO make this internal?

type Header struct {
	Key   string
	Value interface{}
}

Header represents metadata for the message. It is backed by an amqp.Table, so the same restrictions on the types allowed for Value apply.

type Headers added in v0.3.0

type Headers map[string]interface{}

Headers represents all metadata for the message

func (Headers) Get added in v0.3.0

func (h Headers) Get(key string) interface{}

Get returns the header value for key, or nil if not present
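Because Get returns an interface{} that is nil for a missing key, callers usually need a type assertion before using the value. The sketch below re-declares `Headers` locally with the documented behaviour; the `stringHeader` helper and the header names are hypothetical.

```go
package main

import "fmt"

// Headers is a local copy of the documented type.
type Headers map[string]interface{}

// Get returns the header value for key, or nil if not present.
func (h Headers) Get(key string) interface{} { return h[key] }

// stringHeader returns the header as a string and reports whether it was
// present and actually held a string.
func stringHeader(h Headers, key string) (string, bool) {
	s, ok := h.Get(key).(string)
	return s, ok
}

func main() {
	h := Headers{"tenant": "acme", "attempt": int32(2)}

	if tenant, ok := stringHeader(h, "tenant"); ok {
		fmt.Println("tenant:", tenant)
	}
	// A missing key yields nil, so the assertion fails without panicking.
	if _, ok := stringHeader(h, "missing"); !ok {
		fmt.Println("missing header is absent")
	}
	// A present key of another type is also reported as not a string.
	if _, ok := stringHeader(h, "attempt"); !ok {
		fmt.Println("attempt is not a string")
	}
}
```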

type Logger added in v0.0.9

type Logger interface {
	Debug(string)
	Info(string)
	Warn(string)
	Error(string)
	Fatal(string)
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Warnf(string, ...interface{})
	Errorf(string, ...interface{})
	Fatalf(string, ...interface{})
}

Logger represents the logging API. It maps to the Apex log interface for convenience: https://github.com/apex/log/blob/master/interface.go

type MessageLogger added in v0.1.4

type MessageLogger func(jsonContent []byte, eventType reflect.Type, routingKey string, outgoing bool)

MessageLogger is a func that can be used to log in/outgoing messages for debugging purposes

func NoOpMessageLogger added in v0.3.0

func NoOpMessageLogger() MessageLogger

NoOpMessageLogger is a MessageLogger that does nothing. This is the default implementation if the setup func UseMessageLogger is not used.

func StdOutMessageLogger added in v0.1.4

func StdOutMessageLogger() MessageLogger

StdOutMessageLogger is an example implementation of a MessageLogger that dumps messages with fmt.Printf

type Publisher added in v0.2.0

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is used to send messages

func NewPublisher added in v0.2.0

func NewPublisher(routes ...Route) *Publisher

NewPublisher returns a publisher that can be used to send messages

func (*Publisher) Publish added in v0.2.0

func (p *Publisher) Publish(msg interface{}, headers ...Header) error

Publish publishes a message to a given exchange.

TODO: Document how messages flow, reference docs.md?

type QueueBindingConfig added in v0.4.0

type QueueBindingConfig struct {
	// contains filtered or unexported fields
}

QueueBindingConfig is a wrapper around the actual amqp queue configuration

type QueueBindingConfigSetup added in v0.4.0

type QueueBindingConfigSetup func(config *QueueBindingConfig) error

QueueBindingConfigSetup is a setup function that takes a QueueBindingConfig and applies custom changes to the configuration

func AddQueueNameSuffix added in v0.4.0

func AddQueueNameSuffix(suffix string) QueueBindingConfigSetup

AddQueueNameSuffix appends the provided suffix to the queue name. This is useful when multiple listeners are needed for a routing key in the same service.

type Route added in v0.2.1

type Route struct {
	Type interface{}
	Key  string
}

Route defines the routing key to be used for a message type

type ServiceResponsePublisher added in v0.3.0

type ServiceResponsePublisher func(targetService, routingKey string, msg interface{}) error

ServiceResponsePublisher represents the function that is called to publish a response

type Setup added in v0.1.0

type Setup func(conn *Connection) error

Setup is a setup function that takes a Connection and uses it to set up AMQP, for example by creating exchanges and queues.

func CloseListener added in v0.1.0

func CloseListener(e chan error) Setup

CloseListener receives a callback when the AMQP Channel gets closed

func EventStreamListener added in v0.1.0

func EventStreamListener(routingKey string, handler HandlerFunc, eventType interface{}, opts ...QueueBindingConfigSetup) Setup

EventStreamListener sets up a durable, persistent event stream listener.

TODO: Document how messages flow, reference docs.md?

func EventStreamPublisher added in v0.1.0

func EventStreamPublisher(publisher *Publisher) Setup

EventStreamPublisher sets up an event stream publisher.

TODO: Document how messages flow, reference docs.md?

func PublishNotify added in v0.1.0

func PublishNotify(confirm chan amqp.Confirmation) Setup

PublishNotify see amqp.Channel.Confirm

func QueuePublisher added in v0.5.0

func QueuePublisher(publisher *Publisher, destinationQueueName string) Setup

QueuePublisher sets up a publisher that sends events to a specific queue instead of using the exchange, so-called Sender-Selected distribution: https://www.rabbitmq.com/sender-selected.html

func RequestResponseHandler added in v0.1.0

func RequestResponseHandler(routingKey string, handler HandlerFunc, eventType interface{}) Setup

RequestResponseHandler is a convenience func that sets up a ServiceRequestListener and combines it with PublishServiceResponse.

TODO: Document how messages flow, reference docs.md?

func ServicePublisher added in v0.1.0

func ServicePublisher(targetService string, publisher *Publisher) Setup

ServicePublisher sets up a publisher that sends messages to the targetService.

TODO: Document how messages flow, reference docs.md?

func ServiceRequestListener added in v0.3.0

func ServiceRequestListener(routingKey string, handler HandlerFunc, eventType interface{}) Setup

ServiceRequestListener is a specialization of EventStreamListener. It sets up a durable, persistent listener (exchange->queue) for messages to the service owning the Connection.

TODO: Document how messages flow, reference docs.md?

func ServiceResponseListener added in v0.3.0

func ServiceResponseListener(targetService, routingKey string, handler HandlerFunc, eventType interface{}) Setup

ServiceResponseListener is a specialization of EventStreamListener. It sets up a durable, persistent listener (exchange->queue) for responses from targetService.

TODO: Document how messages flow, reference docs.md?

func TransientEventStreamListener added in v0.1.0

func TransientEventStreamListener(routingKey string, handler HandlerFunc, eventType interface{}) Setup

TransientEventStreamListener sets up an event stream listener that is removed when the connection is closed.

TODO: Document how messages flow, reference docs.md?

func UseLogger added in v0.3.0

func UseLogger(logger Logger) Setup

UseLogger allows a Logger to be used to log errors during processing of messages

func UseMessageLogger added in v0.1.4

func UseMessageLogger(logger MessageLogger) Setup

UseMessageLogger allows a MessageLogger to be used to log incoming and outgoing messages

func WithPrefetchLimit added in v0.2.2

func WithPrefetchLimit(limit int) Setup

WithPrefetchLimit configures the number of messages to prefetch from the server.
