pram

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2021 License: MIT Imports: 19 Imported by: 0

README

pram

Build Status codecov Go Report Card

pram is a lightweight messaging framework using AWS SNS/SQS and Google Protobuf with convention based infrastructure creation.

Publisher

Publisher publishes messages to the appropriate topic. The topic ARN is resolved using the PublisherOptions.TopicARNFn function. A Registry instance can be used to resolve/create infrastucture by convention.

Published messages are wrapped with the prampb.Message type, then encoded to a base64 representation of the marshalled byte slice prior to being sent to SNS.

r := pram.NewRegistry(snsClient, sqsClient, pram.WithPrefixNaming("dev", "service"))
p := pram.NewPublisher(snsClient, pram.WithTopicRegistry(r))

err := p.Publish(context.Background(), &testpb.Message{Value: "value"})
if err != nil {
    log.Fatalln(err)
}
Metadata

Message metadata can be modified at the point of publish, for example to add a correlation ID.

err := p.Publish(context.Background(), m, pram.WithCorrelationID(correlationID))

Subscriber

Subscriber receives messages published to the appropriate queue. The queue URL is resolved using the SubscriberOptions.QueueURLFn function. A Registry instance can be used to resolve/create infrastructure by convention.

Handler

Each message subscription requires an implementation of pram.Handler to generate empty messages of the appropriate type and handle received messages. A one-to-one mapping between message types and handlers is assumed, with the message instance from Message guaranteed to be the input to Handle.

type handler struct {}

func (h *handler) Message() proto.Message {
    return new(testpb.Message)
}

func (h *handler) Handle(ctx context.Context, m proto.Message, md pram.Metadata) error {
	tm := m.(*testpb.Message)
	// handle the message
	return nil
}
Subscribe

A message subscription can be created using Subscribe. Each received message will spawn a new goroutine to execute the supplied handler.

By default message receive and handling errors are discarded. This behaviour can be changed using pram.WithErrorHandler.

r := pram.NewRegistry(snsClient, sqsClient, pram.WithPrefixNaming("dev", "service"))
s := pram.NewSubscriber(sqsClient, pram.WithQueueRegistry(r))

// Subscribe will block until the supplied context is cancelled
err := s.Subscribe(context.Background(), new(handler))

While each call to Subscribe is blocking, a single subscriber can handle multiple message types by using goroutines.

r := pram.NewRegistry(snsClient, sqsClient, pram.WithPrefixNaming("dev", "service"))
s := pram.NewSubscriber(sqsClient, pram.WithQueueRegistry(r))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

wg := new(sync.WaitGroup)
for _, h := range []pram.Handler{new(handlerA), new(handlerB)} {
    wg.Add(1)

    go func(h pram.Handler) {
        defer wg.Done()

        err := s.Subscribe(ctx, h)
        if err != nil {
            log.Println(err)
        }
    }(h)
}

wg.Wait()

Registry

Registry is responsible for creating SNS/SQS infrastructure by convention. The adopted naming convention defines how messages will be routed.

By default each published/subscribed message will result in a single SNS topic and associated SQS queue. This effectively results in competing consumers for all message types, which will likely be inappropriate for all but the simplest implementations.

To support typical routing patterns, the registry should be configured to generate queues for each subscribing service. This convention can be applied using pram.WithPrefixNaming.

Example

Service 'a' publishes a message to the dev-package-Message SNS topic. All instances of service 'a' will publish to the same topic.

r := pram.NewRegistry(snsc, sqsc, pram.WithPrefixNaming("dev", "a"))
p := pram.NewPublisher(snsc, pram.WithTopicRegistry(r))

p.Publish(ctx, new(package.Message))

Service 'b' subscribes to the dev-package-Message topic using the dev-b-package-Message SQS queue. All instances of service 'b' will act as competing consumers.

r := pram.NewRegistry(snsc, sqsc, pram.WithPrefixNaming("dev", "b"))
s := pram.NewSubscriber(snsc, sqsc, pram.WithQueueRegistry(r))

s.Subscribte(ctx, new(handler))

Service 'c' subscribes to the same topic, but uses the dev-c-package-Message queue. All instances of service 'c' will act as competing consumers, but are served by a separate queue from that used by service 'b'.

r := pram.NewRegistry(snsc, sqsc, pram.WithPrefixNaming("dev", "c"))
s := pram.NewSubscriber(snsc, sqsc, pram.WithQueueRegistry(r))

s.Subscribte(ctx, new(handler))

Logging

Info level logs, such as infrastructure creation and message publish/receive can be output by providing a pram.Logger implementation to pram.SetLogger. This can be used to understand the underlying AWS SDK calls being made. For example, the following configuration uses a standard library logger.

l := log.New(os.Stdout, "", log.Ldate|log.Ltime)
pram.SetLogger(l)

In the case of message publishing, registry and handler errors are returned immediately to the calling code, so can be handled in the usual manner. For message subscriptions, however, the Subscribe function will only return an error if the required queue cannot be resolved. Handler and AWS SDK errors will not be returned. To log these errors, an handler should be supplied when creating the subscriber.

s := pram.NewSubscriber(sqsClient, pram.WithQueueRegistry(reg), pram.WithErrorHandler(func(err error) {
    pram.Logf("subscriber: %v", err)
}))

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Log

func Log(v ...interface{})

Log logs the input to the configured logger

func Logf

func Logf(format string, a ...interface{})

Logf logs the input to the configured logger

func Marshal

func Marshal(m proto.Message, optFns ...func(*Metadata)) ([]byte, error)

Marshal marshals the specified message

func MessageName

func MessageName(m proto.Message) string

MessageName returns the message name with hyphen separation, e.g. my.package.MessageName -> my-package-MessageName

func SetLogger

func SetLogger(l Logger)

SetLogger sets the logger

func WithCorrelationID

func WithCorrelationID(id string) func(*Metadata)

WithCorrelationID sets the message correlation id

func WithErrorHandler

func WithErrorHandler(fn func(error)) func(*SubscriberOptions)

WithErrorHandler configures the subscriber to use the specified error handler func

func WithPrefixNaming

func WithPrefixNaming(stage, service string) func(*RegistryOptions)

WithPrefixNaming configures the registry to use prefix naming to support complex message routing It applies the following format, assuming a protobuf type name of package.Message:

topic: stage-package-Message
queue: stage-service-package-Message
error: stage-service-package-Message_error

func WithQueueRegistry

func WithQueueRegistry(r *Registry) func(*SubscriberOptions)

WithQueueRegistry configures the subscriber to use the specified registry to resolve queues, creating them if they do not exist

func WithStore

func WithStore(s Store) func(*RegistryOptions)

WithStore configures the registry to use the specified store

func WithTopicRegistry

func WithTopicRegistry(r *Registry) func(*PublisherOptions)

WithTopicRegistry configures the subscriber to use the specified registry to resolve topics, creating them if they do not exist

Types

type Handler

type Handler interface {
	Message() proto.Message
	Handle(ctx context.Context, m proto.Message, md Metadata) error
}

Handler represents a message handler

type Logger

type Logger interface {
	Print(v ...interface{})
	Printf(format string, a ...interface{})
}

Logger represents a logger

type Message

type Message struct {
	Payload proto.Message
	Metadata
}

Message represents a message

func Unmarshal

func Unmarshal(b []byte, m proto.Message) (Message, error)

Unmarshal unmarshals the specified message

type Metadata

type Metadata struct {
	ID            string
	Type          string
	CorrelationID string
	Timestamp     time.Time
}

Metadata represents message metadata

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher represents a publisher

func NewPublisher

func NewPublisher(client SNS, optFns ...func(*PublisherOptions)) *Publisher

NewPublisher returns a new publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, m proto.Message, opts ...func(*Metadata)) error

Publish publishes the specified message

type PublisherOptions

type PublisherOptions struct {
	TopicARNFn func(context.Context, proto.Message) (string, error)
}

PublisherOptions represents a set of publisher options

type QueueOptions

type QueueOptions struct {
	NameFn          func(proto.Message) string
	ErrorNameFn     func(proto.Message) string
	MaxReceiveCount int
}

QueueOptions represents a set of queue options

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry represents an infrastructure registry

func NewRegistry

func NewRegistry(snsc SNS, sqsc SQS, optFns ...func(*RegistryOptions)) *Registry

NewRegistry returns a new registry

func (*Registry) QueueURL

func (r *Registry) QueueURL(ctx context.Context, m proto.Message) (string, error)

QueueURL returns the queue url for the specified message, or registers it if it does not exist

func (*Registry) TopicARN

func (r *Registry) TopicARN(ctx context.Context, m proto.Message) (string, error)

TopicARN returns the topic arn for the specified message, or registers it if it does not exist

type RegistryOptions

type RegistryOptions struct {
	Store Store
	Topic TopicOptions
	Queue QueueOptions
}

RegistryOptions represents a set of registry options

type SNS

type SNS interface {
	Publish(ctx context.Context, params *sns.PublishInput, optFns ...func(*sns.Options)) (*sns.PublishOutput, error)
	aws.SNS
}

SNS represents an sns client interface

type SQS

type SQS interface {
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	aws.SQS
}

SQS represents an sqs client interface

type Store

type Store interface {
	GetOrSetTopicARN(ctx context.Context, topicName string, fn func() (string, error)) (string, error)
	GetOrSetQueueURL(ctx context.Context, queueName string, fn func() (string, error)) (string, error)
}

Store represents a key value store

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents a subscriber

func NewSubscriber

func NewSubscriber(client SQS, optFns ...func(*SubscriberOptions)) *Subscriber

NewSubscriber returns a new subscriber

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, h Handler) error

Subscribe subscribes listens to messages for the specified handler

type SubscriberOptions

type SubscriberOptions struct {
	QueueURLFn               func(context.Context, proto.Message) (string, error)
	ErrorFn                  func(error)
	MaxNumberOfMessages      int
	ReceiveInterval          time.Duration
	WaitTimeSeconds          int
	VisibilityTimeoutSeconds int
}

SubscriberOptions represents a set of subscriber options

type TopicOptions

type TopicOptions struct {
	NameFn func(proto.Message) string
}

TopicOptions represents a set of topic options

Directories

Path Synopsis
internal
aws
aws/mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL