xsqs

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2023 License: MIT Imports: 13 Imported by: 0

README

XSQS - AWS SQS Consumer

Coverage

XSQS is a powerful Go library that simplifies and enhances the process of consuming messages from Amazon Simple Queue Service (SQS). With XSQS, you can seamlessly handle messages in an efficient and reliable manner, enabling you to focus on building robust applications.

Features

  • Simplified Message Consumption: XSQS abstracts away the complexities of working with SQS, allowing you to focus on processing messages rather than dealing with low-level details.
  • Error Handling Strategies: Handle errors with ease using customizable backoff and retry mechanisms. Mark messages as unrecoverable when certain errors occur to prevent endless retries.
  • Flexible Processing Strategies: Choose between sequential, parallel, or bulk processing strategies based on your application's needs.
  • Middleware Support: Enhance XSQS capabilities with middleware, enabling you to add custom logic before or after message processing. Create reusable components to address specific use cases.
  • Extended Deduplication: Extend SQS FIFO queue deduplication with XSQS's built-in deduplication feature. Handle duplicate messages more effectively, complementing SQS's deduplication 5 minutes interval.

Installation

To install XSQS, use the following command:

go get github.com/yklyahin/xsqs

Make sure to grant the necessary SQS policies:

"sqs:ReceiveMessage"
"sqs:DeleteMessage"
"sqs:DeleteMessageBatch"
"sqs:ChangeMessageVisibility"
"sqs:ChangeMessageVisibilityBatch"
"sqs:GetQueueUrl"

Usage

Here's an example of how to use XSQS to consume messages from an SQS queue:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/yklyahin/xsqs"
)

func main() {
	// Create an SQS client
	sqsClient := sqs.New(session.Must(session.NewSession()))
	// Create a XSQS client
	xsqsClient := xsqs.NewClient(sqsClient, "arn:aws:sqs:eu-west-1:100000000000:my-queue")
	// Create a message handler
	messageHandler := xsqs.HandlerFunc[*sqs.Message](handleMessage)
	// Create a consumer
	consumer := xsqs.NewParallelConsumer(
		xsqsClient, 
		messageHandler, 
		xsqs.WithBackoff(xsqs.ExponentialBackoff(time.Hour * 5)),
	)
	// Create a worker with options
	worker := xsqs.NewWorker("my-worker", xsqsClient, consumer)
	// Start the worker in a separate goroutine
	go worker.Start(context.Background())
}

func handleMessage(ctx context.Context, message *sqs.Message) error {
	// Process the message
	fmt.Println("Received message:", *message.Body)
	// Simulate processing time
	time.Sleep(2 * time.Second)
	return nil
}
Error handling

By default any of the XSQS consumers will keep retrying to process a message on any error.
If a consumer returns UnrecoverableError, it indicates that the error cannot be retried, and the message will be deleted from the queue.

import (
	"os"
	"fmt"

	"github.com/yklyahin/xsqs"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func handleMessage(ctx context.Context, message *sqs.Message) error {
	file, err := os.Open("filepath")
	if err != nil {
		// This message won't be retried
		return xsqs.UnrecoverableError(fmt.Errorf("failed to open the file: %w", err))
	}
	// Do something
	return nil
}
Deduplication

XSQS introduces an advanced deduplication feature to prevent duplicate message processing. This capability complements SQS FIFO queue deduplication, particularly for scenarios involving duplicate messages arriving after the 5-minute interval.

Examples

For more examples and detailed usage instructions, please refer to the examples directory.

Contributing

Contributions are welcome! If you encounter any issues or have suggestions for improvements, please open an issue on GitHub. Feel free to fork the repository and submit pull requests for any enhancements.

License

XSQS is released under the MIT License.

Documentation

Overview

Package xsqs is the library that provides a convenient and flexible way to consume messages from Amazon Simple Queue Service (SQS) in a reliable and efficient manner.

Index

Constants

View Source
const (
	// MaxVisibilityTimeout represents the maximum visibility timeout duration for an SQS message.
	MaxVisibilityTimeout = time.Second*43200 - time.Second*1

	// MinVisibilityTimeout represents the minimum visibility timeout duration for an SQS message.
	MinVisibilityTimeout = time.Second * 10
)

Variables

This section is empty.

Functions

func ContextWithWorkerCtx

func ContextWithWorkerCtx(ctx context.Context, worker WorkerCtx) context.Context

ContextWithWorkerCtx returns a new context with WorkerCtx derived from the provided parent context.

func GetQueueURL

func GetQueueURL(client sqsiface.SQSAPI, queue string) (string, error)

GetQueueURL retrieves the queue URL based on the provided queue name. If the queue name is already a URL, it is returned as is. If the queue name is an ARN, the associated queue URL is fetched.

func IsUnrecoverableError

func IsUnrecoverableError(err error) bool

IsUnrecoverableError checks if an error is an unrecoverable error. It returns true if the error is an unrecoverable error created using the UnrecoverableError function.

func UnrecoverableError

func UnrecoverableError(err error) error

UnrecoverableError creates a new unrecoverable error. Unrecoverable errors indicate that the processing of a message is not recoverable and should not be retried.

Types

type Backoff

type Backoff func(attemtps int, err error) (time.Duration, bool)

Backoff represents a function that calculates the backoff duration based on the number of attempts and an error. It returns the calculated backoff duration and a boolean indicating whether the message is retryable. If the message is retryable, but the duration is 0, the message won't be deleted from the queue and the VisibilityTimeout will not be updated.

func ConstantBackoff

func ConstantBackoff(duration time.Duration) Backoff

ConstantBackoff returns a Backoff function that always returns the same duration value for each attempt. The provided duration is used as the fixed backoff duration for all retries.

func ExponentialBackoff

func ExponentialBackoff(max time.Duration) Backoff

ExponentialBackoff returns a Backoff function that calculates the backoff duration in an exponentially growing manner. The provided max duration specifies the upper limit for the backoff duration.

type BulkConsumer

type BulkConsumer struct {
	// contains filtered or unexported fields
}

BulkConsumer is a bulk implementation of the Consumer interface that handles messages in bulk.

func NewBulkConsumer

func NewBulkConsumer(client Client, handler Handler[[]*sqs.Message], opts ...ConsumerOption) *BulkConsumer

NewBulkConsumer creates a consumer for the XSQS worker that handles messages in bulk.

func (*BulkConsumer) Consume

func (c *BulkConsumer) Consume(ctx context.Context, messages []*sqs.Message) error

Consume processes the messages in bulk using the provided handler. It handles errors differently based on the error type: - For UnrecoverableError, all received messages will be deleted. - For BulkError, it categorizes messages to be deleted or retried based on error types.

type BulkError

type BulkError struct {
	Errors []BulkMessageError // List of individual message errors.
}

BulkError represents an error that occurs during bulk message processing. It contains a list of BulkMessageError instances that provide detailed information about individual message errors. This error is intended for use with BulkConsumer.

func (BulkError) Error

func (e BulkError) Error() string

Error returns the string representation of a BulkError.

type BulkMessageError

type BulkMessageError struct {
	MessageID string
	Err       error
}

BulkMessageError represents an error that occurs while processing an individual message in a bulk operation. It contains the MessageID of the message and the corresponding error.

func (BulkMessageError) Error

func (e BulkMessageError) Error() string

Error returns the string representation of a BulkMessageError.

func (BulkMessageError) Unwrap

func (e BulkMessageError) Unwrap() error

Unwrap returns the underlying error of a BulkMessageError. It allows accessing the original error that caused the BulkMessageError.

type Client

type Client interface {
	// Receive retrieves messages from the queue.
	// It returns a slice of messages up to the specified limit.
	Receive(ctx context.Context, limit int) ([]*sqs.Message, error)
	// Delete removes a message from the queue.
	Delete(ctx context.Context, message *sqs.Message) error
	// DeleteBatch removes a batch of messages from the queue.
	DeleteBatch(ctx context.Context, messages []*sqs.Message) error
	// Retry changes the visibility timeout of a message, effectively making it
	// available for processing again after a certain delay.
	Retry(ctx context.Context, input RetryInput) error
	// RetryBatch changes the visibility timeout of a batch of messages, allowing
	// them to be retried after specific delays.
	RetryBatch(ctx context.Context, input []RetryInput) error
}

Client interface for SQS

func NewClient

func NewClient(api sqsiface.SQSAPI, queue string, opts ...ClientOption) (Client, error)

NewClient create the client for SQS

type ClientOption

type ClientOption func(*client)

ClientOption represents an option for configuring a client.

func WithVisibilityTimeout

func WithVisibilityTimeout(timeout time.Duration) ClientOption

WithVisibilityTimeout sets the visibility timeout duration for messages in an SQS client.

func WithWaitTimeout

func WithWaitTimeout(timeout time.Duration) ClientOption

WithWaitTimeout sets the duration for which the client waits for a message to arrive in the queue before returning.

type Consumer

type Consumer interface {
	Consume(ctx context.Context, messages []*sqs.Message) error
}

Consumer defines an interface for message consumers. Implementations should define the behavior of processing messages.

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption represents an option for configuring a consumer.

func WithBackoff

func WithBackoff(backoff Backoff) ConsumerOption

WithBackoff sets the backoff strategy for a consumer

type Handler

type Handler[T any] interface {
	Handle(ctx context.Context, message T) error
}

Handler is an interface that defines the contract for handling messages of type T. The Handle method should process the message and return an error if any. By default, any error is considered retryable. To return an unrecoverable error, see UnrecoverableError.

type HandlerFunc

type HandlerFunc[T any] func(ctx context.Context, message T) error

HandlerFunc is a function type that implements the Handler interface for messages of type T. It allows using functions as handlers.

func (HandlerFunc[T]) Handle

func (fn HandlerFunc[T]) Handle(ctx context.Context, message T) error

Handle calls the underlying HandlerFunc function.

type ParallelConsumer

type ParallelConsumer struct {
	// contains filtered or unexported fields
}

ParallelConsumer processes messages in parallel using a SequentialConsumer for each message.

func NewParallelConsumer

func NewParallelConsumer(client Client, handler Handler[*sqs.Message], opts ...ConsumerOption) *ParallelConsumer

NewParallelConsumer creates a consumer for the XSQS worker that processes messages in parallel, each message in its own goroutine.

func (*ParallelConsumer) Consume

func (c *ParallelConsumer) Consume(ctx context.Context, messages []*sqs.Message) error

Consume processes the messages in parallel by creating separate goroutines for each message.

type RetryInput

type RetryInput struct {
	Message *sqs.Message
	// The delay before the message becomes visible again.
	Delay time.Duration
}

RetryInput provides the necessary information for retrying a message.

type SequentialConsumer

type SequentialConsumer struct {
	// contains filtered or unexported fields
}

SequentialConsumer processes messages one by one

func NewSequentialConsumer

func NewSequentialConsumer(client Client, handler Handler[*sqs.Message], opts ...ConsumerOption) *SequentialConsumer

NewSequentialConsumer creates a consumer for the XSQS worker that processes messages one by one.

func (*SequentialConsumer) Consume

func (c *SequentialConsumer) Consume(ctx context.Context, messages []*sqs.Message) error

Consume processes the messages one by one using the provided handler. It delegates the processing of each message to the handler and takes appropriate action based on the error result.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker continuously polls an SQS queue for messages and consumes them using a Consumer implementation.

func NewWorker

func NewWorker(name string, client Client, consumer Consumer, opts ...WorkerOption) *Worker

NewWorker creates a new Worker with the given name.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

Start begins the worker's operation, which includes polling the SQS queue for messages. It takes a context as input to control the worker's lifecycle.

type WorkerCtx

type WorkerCtx struct {
	Name   string // Name of the worker.
	Logger logging.Log
}

WorkerCtx stores the context information for an SQS worker.

func GetWorkerCtxFromContext

func GetWorkerCtxFromContext(ctx context.Context) (WorkerCtx, bool)

GetWorkerCtxFromContext retrieves the WorkerCtx from the provided context. It returns the retrieved WorkerCtx and a boolean indicating whether the WorkerCtx was found in the context.

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption represents an option for configuring a worker.

func WithIdle

func WithIdle(duration time.Duration, drain bool) WorkerOption

WithIdle sets the idle delay duration between ReceiveMessage requests for a Worker. It also specifies whether the Worker should drain remaining messages before entering idle mode.

func WithLogger

func WithLogger(logger logging.Log) WorkerOption

WithLogger sets a logger for an SQS worker.

func WithMessagesLimit

func WithMessagesLimit(limit int) WorkerOption

WithMessagesLimit sets the maximum number of messages to return per polling request for a Worker.

Directories

Path Synopsis
Package deduplication provides middleware for FIFO queue handler.
Package deduplication provides middleware for FIFO queue handler.
examples
Package logging provides the logger interface for XSQS
Package logging provides the logger interface for XSQS

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL