consumer

package
v1.27.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2024 License: MIT Imports: 10 Imported by: 0

README

Consumer

To build a super performant SQS consumer in Golang, there are several best practices to follow. Here are some recommendations:

  • Use long polling: Use long polling instead of short polling to receive messages from SQS. Long polling reduces the number of requests and therefore reduces the cost and latency. Set the WaitTimeSeconds parameter to a high value (up to 20 seconds).

  • Use multiple goroutines: Consume messages from SQS using multiple goroutines to process messages concurrently. Create a worker pool and dispatch incoming messages to the workers. This approach can improve performance by taking advantage of multiple cores and minimizing idle time.

  • Reduce message processing time: Keep message processing time to a minimum to avoid blocking the worker pool. If possible, move the heavy processing to another service or to a batch processing job.

  • Use batch deletion: Use batch deletion to delete messages from SQS. This approach reduces the number of API calls and can significantly improve performance.

  • Enable buffering: Use buffering to reduce the number of API calls. Buffer messages in memory or on disk before processing them. This approach can be especially useful when processing large messages or when processing messages at high volumes.

package main

import (
	"log"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

const (
	maxNumberOfMessages = 10
	waitTimeSeconds     = 20
)

func main() {
	// Create an AWS session
	sess, err := session.NewSession(&aws.Config{})
	if err != nil {
		log.Fatalf("Failed to create AWS session: %v", err)
	}

	// Create an SQS client
	sqsClient := sqs.New(sess)

	// Create a WaitGroup to synchronize worker goroutines
	var wg sync.WaitGroup

	// Create a channel to receive messages
	msgChan := make(chan *sqs.Message)

	// Create a worker pool
	numWorkers := 10
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, msgChan, sqsClient, &wg)
	}

	// Create a receive message input
	input := &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String("<QUEUE_URL>"),
		MaxNumberOfMessages: aws.Int64(maxNumberOfMessages),
		WaitTimeSeconds:     aws.Int64(waitTimeSeconds),
	}

	// Continuously receive messages from SQS and dispatch to worker pool
	for {
		output, err := sqsClient.ReceiveMessage(input)
		if err != nil {
			log.Printf("Failed to receive messages: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}

		if len(output.Messages) == 0 {
			continue
		}

		// Dispatch messages to worker pool
		for _, message := range output.Messages {
			msgChan <- message
		}
	}

	// Wait for worker pool to complete
	close(msgChan)
	wg.Wait()
}

// Worker function to process messages
func worker(id int, msgChan <-chan *sqs.Message, sqsClient *sqs.SQS, wg *sync.WaitGroup) {
	defer wg.Done()

	// Process incoming messages
	for message := range msgChan {
	    log.Printf("Worker %d received message: %s", id, *message.Body)

	    // Process message here...

	    // Delete the message from the queue
	    if _, err := sqsClient.DeleteMessage(&s,sqs.DeleteMessageInput{
                QueueUrl: aws.String("<QUEUE_URL>"),
                ReceiptHandle: message.ReceiptHandle,
                }); err != nil {
        
            log.Printf("Failed to delete message: %v", err)
        }
    }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerClient

type ConsumerClient struct {
	// contains filtered or unexported fields
}

ConsumerClient encapsulates fields related to a client that consumes messages from a queue. It uses a logger for debugging, an instrumentation client for performance monitoring, and an SQS client to interact with the message queue.

func New

func New(options ...Option) (*ConsumerClient, error)

The New function creates a new ConsumerClient instance with optional configuration options.

example: consumer := NewConsumerClient(

WithLogger(yourLogger),
WithInstrumentationClient(yourInstrumentationClient),
WithSQSClient(yourSQSClient),
WithQueueURL(&yourQueueURL),
WithDLQURL(&yourDLQURL),
WithConcurrencyFactor(yourConcurrencyFactor),
WithQueuePollingDuration(yourPollingDuration),
WithMessageProcessTimeout(yourMessageTimeout),
WithMessageHandler(yourMessageHandlerFunc),
WithBackoffDuration(yourBackoffDuration),
WithBatchSize(yourBatchSize),
WithWaitTimeSecond(yourWaitTimeSeconds),

)

func (*ConsumerClient) Start

func (c *ConsumerClient) Start()

Start initiates the polling of the SQS queue in a separate goroutine.

Example: poller := NewSQSPoller("us-west-2", "https://sqs.us-west-2.amazonaws.com/1234567890/myqueue",

"https://sqs.us-west-2.amazonaws.com/1234567890/mydlq", exampleHandler)

poller.Start()

func (*ConsumerClient) Stop

func (c *ConsumerClient) Stop()

Stop sends a signal to the poller to stop polling.

Example: poller := NewSQSPoller("us-west-2", "https://sqs.us-west-2.amazonaws.com/1234567890/myqueue",

"https://sqs.us-west-2.amazonaws.com/1234567890/mydlq", exampleHandler)

poller.Start() time.Sleep(10 * time.Second) // Let it poll for 10 seconds poller.Stop()

func (*ConsumerClient) Validate

func (c *ConsumerClient) Validate() error

Validate validates whether all the required parameters for the consumer client have been set. It checks if the `SqsClient`, `Logger`, `NewRelicClient`, `QueueUrl`, `ConcurrencyFactor`, `MessageProcessTimeout`, and `QueuePollingDuration` fields are not nil or zero. If any of these fields are nil or zero, it returns an error indicating that the consumer client is invalid.

type IConsumer

type IConsumer interface {
	Start()
	Stop()
}

IConsumer provides an interface for consuming messages either concurrently or in a naive, sequential manner. Start: Begins processing messages concurrently. Stop: Halts the processing of messages.

type MessageProcessorFunc

type MessageProcessorFunc = func(ctx context.Context, message *sqs.Message) error

MessageProcessorFunc defines the function type that processes messages from SQS. An error should be returned if processing fails, leading the message to be moved to the DLQ.

type Option

type Option func(*ConsumerClient)

func WithBackoffDuration

func WithBackoffDuration(duration time.Duration) Option

func WithBatchSize

func WithBatchSize(size int64) Option

func WithConcurrencyFactor

func WithConcurrencyFactor(factor int) Option

func WithDLQURL

func WithDLQURL(dlqURL *string) Option

func WithInstrumentationClient

func WithInstrumentationClient(ic *instrumentation.Client) Option

func WithLogger

func WithLogger(logger *zap.Logger) Option

func WithMessageHandler

func WithMessageHandler(handler MessageProcessorFunc) Option

func WithMessageProcessTimeout

func WithMessageProcessTimeout(timeout time.Duration) Option

func WithQueuePollingDuration

func WithQueuePollingDuration(duration time.Duration) Option

func WithQueueURL

func WithQueueURL(url *string) Option

func WithSQSClient

func WithSQSClient(client *sqs.SQS) Option

func WithWaitTimeSecond

func WithWaitTimeSecond(waitTime int64) Option

type SQSAPI

type SQSAPI interface {
	SendMessageWithContext(ctx aws.Context, input *sqs.SendMessageInput, opts ...request.Option) (*sqs.SendMessageOutput, error)
	ReceiveMessageWithContext(ctx aws.Context, input *sqs.ReceiveMessageInput, opts ...request.Option) (*sqs.ReceiveMessageOutput, error)
	DeleteMessageWithContext(ctx aws.Context, input *sqs.DeleteMessageInput, opts ...request.Option) (*sqs.DeleteMessageOutput, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL