pamqp

v0.4.0
Published: Sep 7, 2021 License: MIT Imports: 8 Imported by: 0

README

pamqp

pamqp offers an opinionated AMQP pub/sub messaging framework using the excellent amqp package and Google Protobuf. It is fundamentally a simplified version of pram for AMQP.

Publisher

Publisher publishes messages to the appropriate exchange. The target exchange is resolved using Options.ExchangeNameFn and assumes the existence of an exchange with the name format package-message by default. A Registry instance can be supplied to resolve/create infrastructure by convention.

Messages are published using the Publish function, which takes an optional function to update the outgoing metadata.

p.Publish(ctx, new(testpb.Message), func(md *pamqp.Metadata) {
    md.CorrelationID = pamqp.NewID()
})

Each publisher opens a single AMQP channel that should be closed using the Close function.
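
The optional functions passed to Publish follow Go's functional-option pattern. The sketch below shows how such functions could be applied to outgoing metadata. It uses a local Metadata type that copies the fields documented for pamqp.Metadata, and buildMetadata is a hypothetical helper; the defaults it sets are assumptions for illustration, not pamqp's actual behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// Metadata mirrors the fields documented for pamqp.Metadata.
// It is a local stand-in so the sketch runs without an AMQP broker.
type Metadata struct {
	ID            string
	Type          string
	ContentType   string
	CorrelationID string
	Timestamp     time.Time
	Headers       map[string]interface{}
}

// buildMetadata is a hypothetical helper: it fills in defaults and then
// applies each caller-supplied function in order, so later functions win.
func buildMetadata(id string, now time.Time, mdFns ...func(*Metadata)) Metadata {
	md := Metadata{
		ID:        id,
		Timestamp: now,
		Headers:   map[string]interface{}{},
	}
	for _, fn := range mdFns {
		fn(&md)
	}
	return md
}

// exampleMetadata builds metadata the way a Publish call site might.
func exampleMetadata() Metadata {
	return buildMetadata("msg-1", time.Unix(0, 0).UTC(),
		func(md *Metadata) { md.CorrelationID = "corr-1" },
		func(md *Metadata) { md.Headers["tenant"] = "acme" },
	)
}

func main() {
	md := exampleMetadata()
	fmt.Println(md.ID, md.CorrelationID, md.Headers["tenant"])
}
```

Keeping each adjustment in its own small function lets call sites combine them freely without widening the Publish signature.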

Consumer

Consumer receives messages published to the appropriate queue. The target queue is resolved using Options.QueueNameFn and assumes the existence of a queue with the name format package-message by default. A Registry instance can be supplied to resolve/create infrastructure by convention.

Messages are handled by calling Consume, which blocks until the supplied context is cancelled. A dedicated channel is opened for each call and closed once the context has been cancelled. This allows a single consumer to handle multiple message types by calling Consume from separate goroutines.

Each call to Consume expects a dedicated Handler implementation for the particular message type.

type handler struct{}

func (h *handler) Message() proto.Message {
	return new(testpb.Message)
}

func (h *handler) Handle(ctx context.Context, m proto.Message, md pamqp.Metadata) error {
	tm, ok := m.(*testpb.Message)
	if !ok {
		return fmt.Errorf("unexpected message type %T", m)
	}

	_ = tm // handle message
	return nil
}
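
Since each Consume call blocks until its context is cancelled, handling several message types usually means running one call per goroutine. The following is a minimal sketch of that arrangement. The consume function is a stand-in that only mimics the blocking behaviour described above, and runConsumers is a hypothetical helper; neither is part of pamqp.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// consume is a stand-in for a blocking Consumer.Consume call: it signals
// that it has started and then returns only once ctx is cancelled.
func consume(ctx context.Context, name string, started chan<- string) error {
	started <- name
	<-ctx.Done()
	return nil
}

// runConsumers starts one goroutine per handler name, waits until every
// one is running, cancels the shared context and waits for all to return.
func runConsumers(names []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	started := make(chan string, len(names))

	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			if err := consume(ctx, n, started); err != nil {
				fmt.Println("consume failed:", err)
			}
		}(name)
	}

	seen := make([]string, 0, len(names))
	for range names {
		seen = append(seen, <-started)
	}

	cancel()  // unblocks every consume call
	wg.Wait() // all goroutines have returned
	sort.Strings(seen)
	return seen
}

func main() {
	fmt.Println(runConsumers([]string{"orders", "payments"}))
}
```

In a real service the context would normally be cancelled by a shutdown signal rather than immediately after start-up.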

Calls to Consume will only return an error if infrastructure resolution fails. Handler errors will not result in an error being returned. Instead, Options.ErrorFn can be configured to log handler errors. By default handler errors are logged using the standard logger.
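
To illustrate the error flow described above, the sketch below routes handler failures to a callback with the same shape as Options.ErrorFn instead of returning them. The dispatch loop and the trimmed Metadata type are hypothetical stand-ins written for this example; pamqp's internal delivery loop is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// Metadata is a trimmed local stand-in for pamqp.Metadata.
type Metadata struct {
	ID string
}

// errorFn has the same shape as the documented Options.ErrorFn field.
type errorFn func(Metadata, error)

// logError follows the documented default of using the standard logger.
func logError(md Metadata, err error) {
	log.Printf("message %s failed: %v", md.ID, err)
}

// dispatch is a hypothetical delivery loop: it calls handle for each
// message and reports failures to onErr instead of returning them.
func dispatch(ids []string, handle func(Metadata) error, onErr errorFn) int {
	handled := 0
	for _, id := range ids {
		md := Metadata{ID: id}
		if err := handle(md); err != nil {
			onErr(md, err)
			continue
		}
		handled++
	}
	return handled
}

// failedIDs runs dispatch with a handler that rejects one message and
// collects the IDs passed to the error callback.
func failedIDs() (int, []string) {
	var failed []string
	handle := func(md Metadata) error {
		if md.ID == "2" {
			return errors.New("boom")
		}
		return nil
	}
	n := dispatch([]string{"1", "2", "3"}, handle, func(md Metadata, err error) {
		logError(md, err)
		failed = append(failed, md.ID)
	})
	return n, failed
}

func main() {
	n, failed := failedIDs()
	fmt.Println(n, failed)
}
```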

Registry

Registry is responsible for the creation of AMQP infrastructure by convention. By default the registry will create a fanout exchange and an associated queue for each message type. Because every consumer of a message type then reads from the same queue, all consumers effectively act as competing consumers.

To support more typical routing patterns, the registry should be configured to generate a separate queue for each consuming service. This convention can be applied with pamqp.WithConsumerNaming.

Example

Service 'a' publishes a message to the package.message exchange. All instances of service 'a' will publish to the same exchange.

r, _ := pamqp.NewRegistry(conn, pamqp.WithConsumerNaming("a"))
p, _ := pamqp.NewPublisher(conn, pamqp.WithRegistry(r))

p.Publish(ctx, new(package.Message))

Service 'b' consumes published messages by creating a service-specific queue named b.package.message and binding it to the exchange. All instances of service 'b' will act as competing consumers.

r, _ := pamqp.NewRegistry(conn, pamqp.WithConsumerNaming("b"))
c := pamqp.NewConsumer(conn, pamqp.WithRegistry(r))

c.Consume(ctx, new(handler))

Service 'c' consumes messages from the same exchange by creating a second service-specific queue named c.package.message. This is bound to the same exchange, resulting in fanout behaviour, but with all instances of service 'c' acting as competing consumers.

r, _ := pamqp.NewRegistry(conn, pamqp.WithConsumerNaming("c"))
c := pamqp.NewConsumer(conn, pamqp.WithRegistry(r))

c.Consume(ctx, new(handler))
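
The resulting topology can be modelled in memory to make the delivery behaviour concrete. This is only a sketch under simplifying assumptions: the queue type, the round-robin delivery and the instance names are all hypothetical, and a real broker distributes work between competing consumers according to its own rules.

```go
package main

import "fmt"

// queue models a service-specific queue: every message lands on the queue
// once and is handed to exactly one of the service's instances.
type queue struct {
	name      string
	instances []string
	next      int
	delivered map[string][]string // instance -> messages received
}

func newQueue(service, exchange string, instances ...string) *queue {
	return &queue{
		name:      service + "." + exchange, // e.g. b.package.message
		instances: instances,
		delivered: map[string][]string{},
	}
}

// deliver hands the message to one instance, round-robin, which is a
// simplification of how a broker shares work between competing consumers.
func (q *queue) deliver(msg string) {
	inst := q.instances[q.next%len(q.instances)]
	q.next++
	q.delivered[inst] = append(q.delivered[inst], msg)
}

// fanout copies each message to every bound queue.
func fanout(queues []*queue, msgs ...string) {
	for _, m := range msgs {
		for _, q := range queues {
			q.deliver(m)
		}
	}
}

// simulate wires service b (two instances) and service c (one instance)
// to the same exchange and publishes two messages.
func simulate() (b, c *queue) {
	b = newQueue("b", "package.message", "b-1", "b-2")
	c = newQueue("c", "package.message", "c-1")
	fanout([]*queue{b, c}, "m1", "m2")
	return b, c
}

func main() {
	b, c := simulate()
	fmt.Println(b.name, b.delivered)
	fmt.Println(c.name, c.delivered)
}
```

Both services see every message, while the two instances of service 'b' split the messages between them.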

Middleware

Both Publisher and Consumer allow middleware to be specified using Options.MiddlewareFn.

func publishLogging(n pamqp.HandlerFunc) pamqp.HandlerFunc {
	return func(ctx context.Context, m proto.Message, md pamqp.Metadata) error {
		if err := n(ctx, m, md); err != nil {
			return err
		}

		log.Printf("message published: %s", md.ID)
		return nil
	}
}
p, _ := pamqp.NewPublisher(conn, pamqp.WithRegistry(r), pamqp.WithMiddleware(publishLogging))

While the MiddlewareFunc signature is identical for both publishers and consumers, it is rare that a single middleware function would be valid for both scenarios. Care should also be taken not to swallow errors in consumer middleware functions, as this will result in the message being acked.
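
A consumer-side middleware that respects this warning might look like the sketch below: it logs a failure and then returns the error unchanged. The types are local stand-ins (the message is an empty interface rather than proto.Message so that only the standard library is needed), and consumeLogging is a hypothetical name.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// Local stand-ins for the pamqp types; the message is an empty interface
// here instead of proto.Message so the sketch needs only the standard library.
type Metadata struct{ ID string }
type HandlerFunc func(context.Context, interface{}, Metadata) error
type MiddlewareFunc func(HandlerFunc) HandlerFunc

// consumeLogging logs a failed delivery and then returns the error
// unchanged, so the caller still sees that handling failed.
func consumeLogging(next HandlerFunc) HandlerFunc {
	return func(ctx context.Context, m interface{}, md Metadata) error {
		err := next(ctx, m, md)
		if err != nil {
			log.Printf("message %s failed: %v", md.ID, err)
		}
		return err // do not replace with nil
	}
}

var errHandler = errors.New("handler failed")

// runFailing wraps a handler that always fails and reports whether the
// original error made it through the middleware.
func runFailing() bool {
	h := consumeLogging(func(context.Context, interface{}, Metadata) error {
		return errHandler
	})
	err := h(context.Background(), "payload", Metadata{ID: "42"})
	return errors.Is(err, errHandler)
}

func main() {
	fmt.Println(runFailing())
}
```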

Documentation

Constants

This section is empty.

Variables

var (
	// UTCNow returns the current utc time
	UTCNow = func() time.Time {
		return time.Now().UTC()
	}

	// NewID returns a unique id
	NewID = func() string {
		return uuid.NewString()
	}
)
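
Declaring these hooks as package-level function variables means they can be reassigned, which is typically done in tests to obtain deterministic values. The sketch below demonstrates that pattern with local stand-ins of the same shape. The stamp and withFixedClock helpers are hypothetical, and the stand-in NewID returns a fixed placeholder rather than a UUID to stay dependency-free.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins declared the same way as pamqp.UTCNow and pamqp.NewID.
// The real NewID returns a UUID string; a fixed placeholder is used here
// to keep the sketch dependency-free.
var (
	UTCNow = func() time.Time { return time.Now().UTC() }
	NewID  = func() string { return "generated-id" }
)

// stamp is a hypothetical consumer of the two hooks.
func stamp() string {
	return NewID() + "@" + UTCNow().Format(time.RFC3339)
}

// withFixedClock swaps both hooks for fixed values, runs fn and restores
// the originals, which is the usual shape of a test helper.
func withFixedClock(fn func() string) string {
	oldNow, oldID := UTCNow, NewID
	defer func() { UTCNow, NewID = oldNow, oldID }()

	UTCNow = func() time.Time { return time.Date(2021, 9, 7, 12, 0, 0, 0, time.UTC) }
	NewID = func() string { return "id-1" }
	return fn()
}

func main() {
	fmt.Println(withFixedClock(stamp))
}
```

Reassigning package-level variables is not safe when tests run in parallel, so overrides of this kind are best kept to sequential tests.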

Functions

func MessageName

func MessageName(m proto.Message) string

MessageName returns the message name

func WithConsumerNaming added in v0.2.0

func WithConsumerNaming(consumer string, prefixes ...string) func(*RegistryOptions)

WithConsumerNaming applies a consumer prefix naming convention to queue names. This ensures that each separate consumer uses a dedicated queue bound to the publisher exchange.

func WithMiddleware

func WithMiddleware(mw ...MiddlewareFunc) func(*Options)

WithMiddleware configures the publisher/consumer to use the specified middleware

func WithRegistry

func WithRegistry(r *Registry) func(*Options)

WithRegistry configures the publisher/consumer to use the specified registry

Types

type Channel

type Channel interface {
	ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) error
	QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueBind(name string, key string, exchange string, noWait bool, args amqp.Table) error
	Publish(exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error
	Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Close() error
}

Channel represents an amqp channel interface

type Consumer added in v0.2.0

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a consumer

func NewConsumer added in v0.2.0

func NewConsumer(c *amqp.Connection, optFns ...func(*Options)) *Consumer

NewConsumer returns a new consumer

func (*Consumer) Consume added in v0.2.0

func (c *Consumer) Consume(ctx context.Context, h Handler) error

Consume consumes messages for the specified handler. The call blocks until the supplied context is cancelled.

type Handler

type Handler interface {
	Message() proto.Message
	Handle(context.Context, proto.Message, Metadata) error
}

Handler represents a message handler

type HandlerFunc

type HandlerFunc func(context.Context, proto.Message, Metadata) error

HandlerFunc represents a handler func

type Metadata

type Metadata struct {
	ID            string
	Type          string
	ContentType   string
	CorrelationID string
	Timestamp     time.Time
	Headers       map[string]interface{}
}

Metadata represents message metadata

type MiddlewareFunc

type MiddlewareFunc func(HandlerFunc) HandlerFunc

MiddlewareFunc represents a middleware func

func ChainMiddleware

func ChainMiddleware(m ...MiddlewareFunc) MiddlewareFunc

ChainMiddleware returns a middleware func that wraps the specified funcs
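
One common way to compose middleware is shown in the sketch below, where the first function supplied becomes the outermost wrapper. That ordering is an assumption of this example; the order pamqp.ChainMiddleware applies is not stated here. The types are local stand-ins (an empty interface replaces proto.Message), and chain, tag and traceOrder are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the pamqp types, using only the standard library.
type Metadata struct{ ID string }
type HandlerFunc func(context.Context, interface{}, Metadata) error
type MiddlewareFunc func(HandlerFunc) HandlerFunc

// chain composes middleware so that the first argument becomes the
// outermost wrapper. This ordering is an assumption of the sketch.
func chain(mws ...MiddlewareFunc) MiddlewareFunc {
	return func(next HandlerFunc) HandlerFunc {
		for i := len(mws) - 1; i >= 0; i-- {
			next = mws[i](next)
		}
		return next
	}
}

// tag returns a middleware that records when it is entered and left.
func tag(name string, trace *[]string) MiddlewareFunc {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, m interface{}, md Metadata) error {
			*trace = append(*trace, name+":in")
			err := next(ctx, m, md)
			*trace = append(*trace, name+":out")
			return err
		}
	}
}

// traceOrder runs a handler wrapped by two tagged middleware functions
// and returns the order in which they executed.
func traceOrder() []string {
	var trace []string
	h := chain(tag("a", &trace), tag("b", &trace))(
		func(context.Context, interface{}, Metadata) error {
			trace = append(trace, "handler")
			return nil
		})
	_ = h(context.Background(), nil, Metadata{})
	return trace
}

func main() {
	fmt.Println(traceOrder())
}
```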

type Options

type Options struct {
	ChannelFn      func(*amqp.Connection) (Channel, error)
	ExchangeNameFn func(proto.Message) (string, error)
	QueueNameFn    func(proto.Message) (string, error)
	RoutingKeyFn   func(proto.Message) (string, error)
	MiddlewareFn   MiddlewareFunc
	ErrorFn        func(Metadata, error)
}

Options represents a set of options

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher represents a message publisher

func NewPublisher

func NewPublisher(c *amqp.Connection, optFns ...func(*Options)) (*Publisher, error)

NewPublisher returns a new publisher

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the underlying channel

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, m proto.Message, mdFns ...func(*Metadata)) error

Publish publishes the specified message

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry represents an exchange/queue registry

func NewRegistry

func NewRegistry(conn *amqp.Connection, optFns ...func(*RegistryOptions)) (*Registry, error)

NewRegistry returns a new registry

func (*Registry) Close

func (r *Registry) Close() error

Close closes the underlying channel

func (*Registry) Exchange

func (r *Registry) Exchange(m proto.Message) (string, error)

Exchange ensures that the specified exchange exists and returns the name

func (*Registry) Queue

func (r *Registry) Queue(m proto.Message) (string, error)

Queue ensures that the specified queue exists and returns the name

type RegistryOptions

type RegistryOptions struct {
	ChannelFn                func(*amqp.Connection) (Channel, error)
	ExchangeNameFn           func(proto.Message) string
	QueueNameFn              func(proto.Message) string
	DeadLetterExchangeNameFn func(proto.Message) string
	DeadLetterQueueNameFn    func(proto.Message) string
	DeadLetterKeyFn          func(proto.Message) string
}

RegistryOptions represents a set of registry options

Directories

Path       Synopsis
example
internal
mocks      Package mocks is a generated GoMock package.
