Documentation ¶
Overview ¶
Package goamqp provides an opiniated way of using [rabbitmq](https://www.rabbitmq.com/) for event-driven architectures.
Example ¶
package main import ( "fmt" "gitlab.com/sparetimecoders/goamqp" "reflect" "time" ) func main() { config := goamqp.AmqpConfig{ Host: "localhost", Port: 5672, Username: "admin", Password: "password", VHost: "", } publisher := goamqp.NewPublisher(goamqp.Route{Type: IncomingMessage{}, Key: "testkey"}) handler := &TestIncomingMessageHandler{} connection := goamqp.New("service", config) _ = connection.Start( goamqp.EventStreamListener("testkey", handler.Process, reflect.TypeOf(IncomingMessage{})), goamqp.EventStreamPublisher(publisher), ) _ = publisher.Publish(IncomingMessage{"FAILED"}) _ = publisher.Publish(IncomingMessage{"OK"}) _ = connection.Close() } type TestIncomingMessageHandler struct { ctx string } func (i TestIncomingMessageHandler) Process(m interface{}, headers goamqp.Headers) (interface{}, error) { fmt.Printf("Called process with %v and ctx %v\n", m, i.ctx) return nil, nil } type IncomingMessage struct { Url string } func (IncomingMessage) TTL() time.Duration { return time.Minute }
Output:
Index ¶
- Variables
- func SendingService(headers Headers) (string, error)
- type AmqpChannel
- type AmqpConfig
- type Connection
- type HandlerFunc
- type Header
- type Headers
- type Logger
- type MessageLogger
- type Publisher
- type QueueBindingConfig
- type QueueBindingConfigSetup
- type Route
- type ServiceResponsePublisher
- type Setup
- func CloseListener(e chan error) Setup
- func EventStreamListener(routingKey string, handler HandlerFunc, eventType interface{}, ...) Setup
- func EventStreamPublisher(publisher *Publisher) Setup
- func PublishNotify(confirm chan amqp.Confirmation) Setup
- func QueuePublisher(publisher *Publisher, destinationQueueName string) Setup
- func RequestResponseHandler(routingKey string, handler HandlerFunc, eventType interface{}) Setup
- func ServicePublisher(targetService string, publisher *Publisher) Setup
- func ServiceRequestListener(routingKey string, handler HandlerFunc, eventType interface{}) Setup
- func ServiceResponseListener(targetService, routingKey string, handler HandlerFunc, eventType interface{}) Setup
- func TransientEventStreamListener(routingKey string, handler HandlerFunc, eventType interface{}) Setup
- func UseLogger(logger Logger) Setup
- func UseMessageLogger(logger MessageLogger) Setup
- func WithPrefetchLimit(limit int) Setup
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEmptySuffix returned when an empty suffix is passed ErrEmptySuffix = fmt.Errorf("empty queue suffix not allowed") )
Functions ¶
func SendingService ¶ added in v0.3.0
SendingService returns the name of the service that produced the message Can be used to send a handlerResponse, see PublishServiceResponse
Types ¶
type AmqpChannel ¶ added in v0.1.1
type AmqpChannel interface { QueueBind(queue, key, exchange string, noWait bool, args amqp.Table) error Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) NotifyPublish(confirm chan amqp.Confirmation) chan amqp.Confirmation NotifyClose(c chan *amqp.Error) chan *amqp.Error Confirm(noWait bool) error Qos(prefetchCount, prefetchSize int, global bool) error }
AmqpChannel wraps the amqp.Channel to allow for mocking
type AmqpConfig ¶
type AmqpConfig struct { Username string `env:"RABBITMQ_USERNAME,required"` Password string `env:"RABBITMQ_PASSWORD,required"` Host string `env:"RABBITMQ_HOST,required"` Port int `env:"RABBITMQ_PORT" envDefault:"5672"` VHost string `env:"RABBITMQ_VHOST" envDefault:""` }
AmqpConfig contains the necessary variables for connecting to RabbitMQ.
func ParseAmqpURL ¶
func ParseAmqpURL(amqpURL string) (AmqpConfig, error)
ParseAmqpURL tries to parse the passed string and create a valid AmqpConfig object
func (AmqpConfig) AmqpURL ¶
func (c AmqpConfig) AmqpURL() string
AmqpURL returns a valid connection url
func (AmqpConfig) String ¶
func (c AmqpConfig) String() string
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is a wrapper around the actual amqp.Connection and amqp.Channel
func New ¶
func New(serviceName string, config AmqpConfig) *Connection
New creates a new Connection from config
func NewFromURL ¶
func NewFromURL(serviceName string, amqpURL string) (*Connection, error)
NewFromURL creates a new Connection from an URL
func (*Connection) Close ¶ added in v0.1.1
func (c *Connection) Close() error
Close closes the amqp connection, see amqp.Connection.Close
func (*Connection) PublishServiceResponse ¶ added in v0.3.0
func (c *Connection) PublishServiceResponse(targetService, routingKey string, msg interface{}) error
PublishServiceResponse sends a message to targetService as a handlerResp TODO Document how messages flow, reference docs.md?
func (*Connection) Start ¶
func (c *Connection) Start(opts ...Setup) error
Start setups the amqp queues and exchanges defined by opts
type HandlerFunc ¶ added in v0.3.0
HandlerFunc is used to process an incoming message If processing fails, an error should be returned The optional handlerResp is used automatically when setting up a RequestResponseHandler, otherwise ignored
func ResponseWrapper ¶ added in v0.3.0
func ResponseWrapper(handler HandlerFunc, routingKey string, publisher ServiceResponsePublisher) HandlerFunc
ResponseWrapper is...TODO make this internal?
type Header ¶ added in v0.3.0
type Header struct { Key string Value interface{} }
Header represent meta-data for the message This is backed by an amqp.Table so the same restrictions regarding the type allowed for Value applies
type Headers ¶ added in v0.3.0
type Headers map[string]interface{}
Headers represent all meta-data for the message
type Logger ¶ added in v0.0.9
type Logger interface { Debug(string) Info(string) Warn(string) Error(string) Fatal(string) Debugf(string, ...interface{}) Infof(string, ...interface{}) Warnf(string, ...interface{}) Errorf(string, ...interface{}) Fatalf(string, ...interface{}) }
Logger represents the logging API Maps to Apex log interface for convenience https://github.com/apex/log/blob/master/interface.go
type MessageLogger ¶ added in v0.1.4
type MessageLogger func(jsonContent []byte, eventType reflect.Type, routingKey string, outgoing bool)
MessageLogger is a func that can be used to log in/outgoing messages for debugging purposes
func NoOpMessageLogger ¶ added in v0.3.0
func NoOpMessageLogger() MessageLogger
NoOpMessageLogger is a MessageLogger that will do nothing This is the default implementation if the setup func UseMessageLogger is not used
func StdOutMessageLogger ¶ added in v0.1.4
func StdOutMessageLogger() MessageLogger
StdOutMessageLogger is an example implementation of a MessageLogger that dumps messages with fmt.Printf
type Publisher ¶ added in v0.2.0
type Publisher struct {
// contains filtered or unexported fields
}
Publisher is used to send messages
func NewPublisher ¶ added in v0.2.0
NewPublisher returns a publisher that can be used to send messages
type QueueBindingConfig ¶ added in v0.4.0
type QueueBindingConfig struct {
// contains filtered or unexported fields
}
QueueBindingConfig is a wrapper around the actual amqp queue configuration
type QueueBindingConfigSetup ¶ added in v0.4.0
type QueueBindingConfigSetup func(config *QueueBindingConfig) error
QueueBindingConfigSetup is a setup function that takes a QueueBindingConfig and provide custom changes to the configuration
func AddQueueNameSuffix ¶ added in v0.4.0
func AddQueueNameSuffix(suffix string) QueueBindingConfigSetup
AddQueueNameSuffix appends the provided suffix to the queue name Useful when multiple listeners are needed for a routing key in the same service
type Route ¶ added in v0.2.1
type Route struct { Type interface{} Key string }
Route defines the routing key to be used for a message type
type ServiceResponsePublisher ¶ added in v0.3.0
ServiceResponsePublisher represents the function that is called to publish a response
type Setup ¶ added in v0.1.0
type Setup func(conn *Connection) error
Setup is a setup function that takes a Connection and use it to setup AMQP An example is to create exchanges and queues
func CloseListener ¶ added in v0.1.0
CloseListener receives a callback when the AMQP Channel gets closed
func EventStreamListener ¶ added in v0.1.0
func EventStreamListener(routingKey string, handler HandlerFunc, eventType interface{}, opts ...QueueBindingConfigSetup) Setup
EventStreamListener sets up ap a durable, persistent event stream listener TODO Document how messages flow, reference docs.md?
func EventStreamPublisher ¶ added in v0.1.0
EventStreamPublisher sets up ap a event stream publisher TODO Document how messages flow, reference docs.md?
func PublishNotify ¶ added in v0.1.0
func PublishNotify(confirm chan amqp.Confirmation) Setup
PublishNotify see amqp.Channel.Confirm
func QueuePublisher ¶ added in v0.5.0
QueuePublisher sets up a publisher that will send events to a specific queue instead of using the exchange, so called Sender-Selected distribution https://www.rabbitmq.com/sender-selected.html#:~:text=The%20RabbitMQ%20broker%20treats%20the,key%20if%20they%20are%20present.
func RequestResponseHandler ¶ added in v0.1.0
func RequestResponseHandler(routingKey string, handler HandlerFunc, eventType interface{}) Setup
RequestResponseHandler is a convenience func to setup ServiceRequestListener and combines it with PublishServiceResponse TODO Document how messages flow, reference docs.md?
func ServicePublisher ¶ added in v0.1.0
ServicePublisher sets up ap a publisher, that sends messages to the targetService TODO Document how messages flow, reference docs.md?
func ServiceRequestListener ¶ added in v0.3.0
func ServiceRequestListener(routingKey string, handler HandlerFunc, eventType interface{}) Setup
ServiceRequestListener is a specialization of EventStreamListener It sets up ap a durable, persistent listener (exchange->queue) for message to the service owning the Connection TODO Document how messages flow, reference docs.md?
func ServiceResponseListener ¶ added in v0.3.0
func ServiceResponseListener(targetService, routingKey string, handler HandlerFunc, eventType interface{}) Setup
ServiceResponseListener is a specialization of EventStreamListener It sets up ap a durable, persistent listener (exchange->queue) for responses from targetService TODO Document how messages flow, reference docs.md?
func TransientEventStreamListener ¶ added in v0.1.0
func TransientEventStreamListener(routingKey string, handler HandlerFunc, eventType interface{}) Setup
TransientEventStreamListener sets up ap a event stream listener that will get removed when the connection is closed TODO Document how messages flow, reference docs.md?
func UseLogger ¶ added in v0.3.0
UseLogger allows a Logger to be used to log errors during processing of messages
func UseMessageLogger ¶ added in v0.1.4
func UseMessageLogger(logger MessageLogger) Setup
UseMessageLogger allows a MessageLogger to be used when log in/outgoing messages
func WithPrefetchLimit ¶ added in v0.2.2
WithPrefetchLimit configures the number of messages to prefetch from the server.