listener

package
v0.7.1
Warning

This package is not in the latest version of its module.

Published: Apr 3, 2023 License: MIT Imports: 17 Imported by: 0

README

AWS SNS listener package

The listener package can be used for listening to an Amazon SNS topic. It supports both regular topics and FIFO topics. The original motivation for this project was to be able to subscribe to an SNS topic and identify the shape and content of messages being published, making it easier to build other software. A CLI that wraps this package is available in the root of this repository.

Using the package

Importing

listener uses semantic versioning and conventional commits for releases. The latest published version can be fetched with:

go get github.com/whatsfordinner/aws-sns-listener/pkg/listener

A specific version can be fetched with:

go get github.com/whatsfordinner/aws-sns-listener/pkg/listener@v0.5.0

Tracing

The package uses the OpenTelemetry library for distributed tracing. If an exporter is defined in the calling service, the package will emit spans using that exporter.

Setup

The package uses two AWS APIs for operation:

  • Simple Notification Service for subscribing and unsubscribing to the topic
  • Simple Queue Service for creating a queue, receiving messages from it and then destroying it

The package defines two interfaces for this purpose, SNSAPI and SQSAPI, both of which are shims over the clients provided by v2 of the AWS SDK. The easiest way to use this package is to create those two clients and provide them to listener.New.

Configuration and execution are managed by the listener.Listener struct, which tracks the SNS topic as well as the SQS queue and its subscription to the topic. A new Listener is created with listener.New, which takes the topic ARN and the AWS clients. The simplest possible implementation would be:

// cfg is an aws.Config, for example the one returned by config.LoadDefaultConfig(ctx)
l := listener.New(
    "arn:aws:sns:us-east-1:123456789012:my-topic",
    sns.NewFromConfig(cfg),
    sqs.NewFromConfig(cfg),
)

A more complicated configuration, which overrides the queue name, enables verbose logging and sets a polling interval of 250 milliseconds, would be:

l := listener.New(
    "arn:aws:sns:us-east-1:123456789012:my-topic",
    sns.NewFromConfig(cfg),
    sqs.NewFromConfig(cfg),
    listener.WithQueueName("my-queue"),
    listener.WithVerbose(true),
    listener.WithPollingInterval(250 * time.Millisecond),
)

The final component is the listener.Consumer interface which is used by the package to notify the calling service of messages. It requires only a single method: OnMessage(context.Context, listener.MessageContent). Context is supplied for use with tracing and to notify the consumer in the event of cancellation. An example implementation would be:

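type consumer struct{}

// OnMessage prints each message. Id and Body are *string, so they are read with
// aws.ToString (from github.com/aws/aws-sdk-go-v2/aws), which returns "" for nil.
func (c consumer) OnMessage(ctx context.Context, msg listener.MessageContent) {
    fmt.Printf("Message content:\n\tID: %s\n\tBody: %s\n", aws.ToString(msg.Id), aws.ToString(msg.Body))
}

Runtime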

The package is ready to use once the Listener has been created. Listener has three methods:

  • Setup which creates the SQS queue and subscribes it to the SNS topic
  • Listen which receives messages from the queue and passes them to a listener.Consumer's OnMessage method
  • Teardown which destroys the subscription and queue

A possible implementation would be:

ctx := context.Background()

err := l.Setup(ctx) // our listener from earlier

if err != nil {
    _ = l.Teardown(ctx) // clean up anything Setup created before failing
    panic(err)
}

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt) // stop on Ctrl+C

errCh := make(chan error, 1)

listenCtx, cancel := context.WithCancel(ctx)

go func() {
    errCh <- l.Listen(
        listenCtx,
        consumer{}, // our consumer from earlier
    )
}()

select {
case err := <-errCh: // Listen returned before we asked it to stop
    if err != nil {
        fmt.Println(err.Error()) // log the error but we still want to cleanup
    }
case <-sigCh: // catch SIGINT
    cancel() // notify the goroutine it's time to shutdown
    err := <-errCh // catch any errors while stopping

    if err != nil {
        fmt.Println(err.Error()) // log them but proceed to cleanup
    }
}

_ = l.Teardown(ctx) // be careful not to use cancelled context here

Teardown

It is best not to panic even if an error occurs when setting up or listening. The Teardown method will still be able to run because the struct has kept track of what needs destroying (if it has been created). If you don't care about catching teardown errors specifically, it's fine to defer the method.
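
The advice above can be sketched as a deferred cleanup that runs on a fresh context. This is only an illustration: teardowner, fakeListener, run and demo are hypothetical names standing in for *listener.Listener and the calling service, and the 30 second timeout is an assumed value.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// teardowner is a hypothetical stand-in for the single method of
// *listener.Listener that this sketch needs.
type teardowner interface {
	Teardown(ctx context.Context) error
}

// fakeListener records the state of the context it was torn down with.
type fakeListener struct {
	tornDown     bool
	ctxCancelled bool
}

func (f *fakeListener) Teardown(ctx context.Context) error {
	f.tornDown = true
	f.ctxCancelled = ctx.Err() != nil
	return ctx.Err()
}

// run performs work and always tears down afterwards, even when work fails.
func run(ctx context.Context, l teardowner, work func(context.Context) error) (err error) {
	defer func() {
		// Use a fresh context: ctx may already be cancelled by the time we get here.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		// Report a teardown error only if work itself did not fail.
		if tdErr := l.Teardown(cleanupCtx); tdErr != nil && err == nil {
			err = tdErr
		}
	}()
	return work(ctx)
}

// demo simulates a shutdown that cancels the working context mid-flight.
func demo() (tornDown, ctxCancelled bool, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	l := &fakeListener{}
	err = run(ctx, l, func(ctx context.Context) error {
		cancel() // simulate Ctrl+C arriving while listening
		return ctx.Err()
	})
	return l.tornDown, l.ctxCancelled, err
}

func main() {
	tornDown, ctxCancelled, err := demo()
	fmt.Println("torn down:", tornDown, "with cancelled context:", ctxCancelled, "work error:", err)
}
```

Deferring inside a closure keeps the teardown error available to the caller without letting it mask an earlier failure.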

Documentation

Overview

Package listener provides a way of listening to messages published to an AWS SNS Topic.

Types

type Consumer

type Consumer interface {
	// OnMessage is called when a message is successfully processed from the SQS queue. If no messages are processed then OnMessage won't
	// be called.
	OnMessage(ctx context.Context, msg MessageContent)
}

A Consumer is used by Listen to process messages during the course of operations. Its OnMessage method is provided with the existing context used by the package so that any implementing type is able to propagate traces or be made aware of context cancellations.
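
As an illustration of using that context, here is a minimal sketch of a consumer that forwards message bodies to a channel and gives up if the context is cancelled first. MessageContent is redeclared locally to mirror the documented struct so the example compiles on its own; chanConsumer and deliver are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// MessageContent mirrors the documented listener.MessageContent struct so
// that this sketch compiles without the package.
type MessageContent struct {
	Body *string
	Id   *string
}

// chanConsumer is a hypothetical Consumer that forwards bodies to a channel.
type chanConsumer struct {
	out chan<- string
}

// OnMessage forwards the body, or returns early if the context is cancelled
// before a receiver is ready.
func (c chanConsumer) OnMessage(ctx context.Context, msg MessageContent) {
	if msg.Body == nil {
		return // nothing to forward
	}
	select {
	case c.out <- *msg.Body:
	case <-ctx.Done():
	}
}

// deliver calls OnMessage once and reports what, if anything, was forwarded.
func deliver(body *string) (string, bool) {
	out := make(chan string, 1)
	chanConsumer{out: out}.OnMessage(context.Background(), MessageContent{Body: body})
	select {
	case got := <-out:
		return got, true
	default:
		return "", false
	}
}

func main() {
	body := "hello"
	got, ok := deliver(&body)
	fmt.Println(got, ok)
}
```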

type Listener added in v0.7.0

type Listener struct {
	// PollingInterval is the time between attempts to receive messages from the SQS queue
	PollingInterval time.Duration
	// QueueName is the desired name for the SQS queue. If blank a v4 UUID prefixed with "sns-listener" will be used
	QueueName string
	// TopicArn is the ARN of the SNS topic to be listened to.
	TopicArn string
	// Verbose will enable logging to stderr when true, otherwise logs are discarded
	Verbose bool
	// SnsClient is a user-provided client used to interact with the SNS API
	SnsClient SNSAPI
	// SqsClient is a user-provided client used to interact with the SQS API
	SqsClient SQSAPI
	// contains filtered or unexported fields
}

A Listener manages the resources for listening to a queue. It should not be instantiated directly; use the New function instead.

func New added in v0.7.0

func New(topicArn string, snsClient SNSAPI, sqsClient SQSAPI, opts ...Option) *Listener

New creates a new Listener and returns a pointer to it.

func (*Listener) Listen added in v0.7.0

func (l *Listener) Listen(ctx context.Context, c Consumer) error

Listen is a blocking function that processes messages from the SQS queue as they arrive. It blocks until the context provided to it is cancelled. Each message is passed to the provided Consumer's OnMessage method and then deleted from the queue. Do not pass the same context to Teardown: once that context is cancelled, Teardown will not be able to destroy the resources.
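
To make the blocking behaviour concrete, here is a sketch of a polling loop that stops on context cancellation. It is not the package's implementation: poll, receive, handle and demo are hypothetical, and treating cancellation as a nil error is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// poll calls receive once per interval and hands every message to handle.
// It returns nil when ctx is cancelled and the receive error otherwise.
func poll(ctx context.Context, interval time.Duration,
	receive func(context.Context) ([]string, error), handle func(string)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil // cancellation is the normal way to stop
		case <-ticker.C:
			msgs, err := receive(ctx)
			if err != nil {
				if ctx.Err() != nil {
					return nil // the receive was interrupted by cancellation
				}
				return err
			}
			for _, m := range msgs {
				handle(m)
			}
		}
	}
}

// demo polls a fake queue and cancels after three messages were handled.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	handled := 0
	receive := func(ctx context.Context) ([]string, error) {
		if err := ctx.Err(); err != nil {
			return nil, err // behave like a client call on a cancelled context
		}
		return []string{"msg"}, nil
	}
	handle := func(string) {
		handled++
		if handled == 3 {
			cancel()
		}
	}
	err := poll(ctx, time.Millisecond, receive, handle)
	return handled, err
}

func main() {
	handled, err := demo()
	fmt.Println("handled:", handled, "error:", err)
}
```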

func (*Listener) Setup added in v0.7.0

func (l *Listener) Setup(ctx context.Context) error

Setup will create an SQS queue and subscribe it to the SNS topic. The queue is given a policy that allows the SNS topic to subscribe to it. Once the queue is subscribed to the SNS topic it will start receiving published messages.
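
For reference, a queue policy of the general kind AWS documents for SNS to SQS delivery can be built as below. This is a sketch and the policy the package actually attaches may differ. queuePolicy and sourceArn are hypothetical helpers and the ARNs are placeholders.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type statement struct {
	Effect    string                       `json:"Effect"`
	Principal map[string]string            `json:"Principal"`
	Action    string                       `json:"Action"`
	Resource  string                       `json:"Resource"`
	Condition map[string]map[string]string `json:"Condition"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// queuePolicy returns a policy document that lets the SNS service send
// messages to queueArn, but only on behalf of topicArn.
func queuePolicy(queueArn, topicArn string) (string, error) {
	p := policy{
		Version: "2012-10-17",
		Statement: []statement{{
			Effect:    "Allow",
			Principal: map[string]string{"Service": "sns.amazonaws.com"},
			Action:    "sqs:SendMessage",
			Resource:  queueArn,
			Condition: map[string]map[string]string{
				"ArnEquals": {"aws:SourceArn": topicArn},
			},
		}},
	}
	b, err := json.MarshalIndent(p, "", "  ")
	return string(b), err
}

// sourceArn reads back the topic ARN that a policy document is restricted to.
func sourceArn(doc string) (string, error) {
	var p policy
	if err := json.Unmarshal([]byte(doc), &p); err != nil {
		return "", err
	}
	if len(p.Statement) == 0 {
		return "", fmt.Errorf("policy has no statements")
	}
	return p.Statement[0].Condition["ArnEquals"]["aws:SourceArn"], nil
}

func main() {
	doc, err := queuePolicy(
		"arn:aws:sqs:us-east-1:123456789012:my-queue",
		"arn:aws:sns:us-east-1:123456789012:my-topic",
	)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(doc)
}
```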

func (*Listener) Teardown added in v0.7.0

func (l *Listener) Teardown(ctx context.Context) error

Teardown unsubscribes the queue from the topic and then deletes the queue. It will attempt to do both regardless of the existing state of the infrastructure.

type MessageContent

type MessageContent struct {
	Body *string
	Id   *string
}

A MessageContent maps the message body and message ID of an SQS message to a much more straightforward struct. For the purpose of listening to an SNS topic, the Body contains the full message that was published.
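
When SNS delivers to SQS without raw message delivery, the body is a JSON envelope around the published payload. The sketch below decodes a few of its fields. It assumes raw message delivery is not enabled, which this documentation does not state. snsEnvelope and decodeBody are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// snsEnvelope holds a subset of the fields SNS includes in a notification.
type snsEnvelope struct {
	Type      string `json:"Type"`
	MessageId string `json:"MessageId"`
	TopicArn  string `json:"TopicArn"`
	Subject   string `json:"Subject"`
	Message   string `json:"Message"`
	Timestamp string `json:"Timestamp"`
}

// decodeBody parses a message body, which is a *string in MessageContent.
func decodeBody(body *string) (snsEnvelope, error) {
	var env snsEnvelope
	if body == nil {
		return env, errors.New("message has no body")
	}
	err := json.Unmarshal([]byte(*body), &env)
	return env, err
}

func main() {
	body := `{"Type":"Notification","MessageId":"abc","TopicArn":"arn:aws:sns:us-east-1:123456789012:my-topic","Message":"hello"}`
	env, err := decodeBody(&body)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(env.Type, env.Message)
}
```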

type Option added in v0.7.0

type Option func(l *Listener)

An Option allows for the passing of optional parameters when creating a new Listener.
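
Because Option is a plain function over *Listener and the configuration fields are exported, callers can write their own options. The sketch below uses local mirrors of the documented types to show the pattern. newListener is a hypothetical stand-in for New, WithFastPolling is an invented caller-side option, and the order in which the real New applies defaults is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Listener mirrors the documented configuration fields only.
type Listener struct {
	PollingInterval time.Duration
	QueueName       string
	TopicArn        string
	Verbose         bool
}

// Option mirrors the documented listener.Option type.
type Option func(l *Listener)

// WithQueueName mirrors the documented option of the same name.
func WithQueueName(name string) Option {
	return func(l *Listener) { l.QueueName = name }
}

// WithFastPolling is a hypothetical option defined by the caller.
func WithFastPolling() Option {
	return func(l *Listener) { l.PollingInterval = 100 * time.Millisecond }
}

// newListener applies each option in order and then fills in defaults.
func newListener(topicArn string, opts ...Option) *Listener {
	l := &Listener{TopicArn: topicArn}
	for _, opt := range opts {
		opt(l)
	}
	if l.PollingInterval == 0 {
		l.PollingInterval = time.Second // documented default
	}
	return l
}

func main() {
	l := newListener("arn:aws:sns:us-east-1:123456789012:my-topic",
		WithQueueName("my-queue"), WithFastPolling())
	fmt.Println(l.QueueName, l.PollingInterval)
}
```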

func WithPollingInterval added in v0.7.0

func WithPollingInterval(pollingInterval time.Duration) Option

WithPollingInterval will set PollingInterval to the provided duration. If the duration is 0, PollingInterval defaults to 1 second.

func WithQueueName added in v0.7.0

func WithQueueName(queueName string) Option

WithQueueName will control the name of the SQS queue created by the Listener. When listening to a FIFO topic, the Listener itself will append ".fifo" to the queue name.
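
The suffix rule can be sketched as follows. queueNameFor is a hypothetical helper, not the package's code. It relies on the AWS naming rule that FIFO topic and queue names end in ".fifo", and skipping an already suffixed name is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// queueNameFor appends ".fifo" to queueName when topicArn names a FIFO topic.
func queueNameFor(topicArn, queueName string) string {
	const suffix = ".fifo"
	if strings.HasSuffix(topicArn, suffix) && !strings.HasSuffix(queueName, suffix) {
		return queueName + suffix
	}
	return queueName
}

func main() {
	fmt.Println(queueNameFor("arn:aws:sns:us-east-1:123456789012:my-topic.fifo", "my-queue"))
	fmt.Println(queueNameFor("arn:aws:sns:us-east-1:123456789012:my-topic", "my-queue"))
}
```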

func WithVerbose added in v0.7.0

func WithVerbose(verbose bool) Option

WithVerbose controls whether or not logs will be printed to stderr.

type SNSAPI

type SNSAPI interface {
	Subscribe(ctx context.Context,
		params *sns.SubscribeInput,
		optFns ...func(*sns.Options)) (*sns.SubscribeOutput, error)

	Unsubscribe(ctx context.Context,
		params *sns.UnsubscribeInput,
		optFns ...func(*sns.Options)) (*sns.UnsubscribeOutput, error)
}

SNSAPI is a shim over v2 of the AWS SDK's sns client. The sns client provided by github.com/aws/aws-sdk-go-v2/service/sns automatically satisfies this.

type SQSAPI

type SQSAPI interface {
	CreateQueue(ctx context.Context,
		params *sqs.CreateQueueInput,
		optFns ...func(*sqs.Options)) (*sqs.CreateQueueOutput, error)

	DeleteQueue(ctx context.Context,
		params *sqs.DeleteQueueInput,
		optFns ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error)

	GetQueueAttributes(ctx context.Context,
		params *sqs.GetQueueAttributesInput,
		optFns ...func(*sqs.Options)) (*sqs.GetQueueAttributesOutput, error)

	ReceiveMessage(ctx context.Context,
		params *sqs.ReceiveMessageInput,
		optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)

	DeleteMessage(ctx context.Context,
		params *sqs.DeleteMessageInput,
		optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

SQSAPI is a shim over v2 of the AWS SDK's sqs client. The sqs client provided by github.com/aws/aws-sdk-go-v2/service/sqs automatically satisfies this.
