core_message_consumer

package module
v0.0.0-...-eeb1421
Published: Mar 7, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Logger                *zap.Logger
	NewRelicClient        *newrelic.Application
	MetricEngine          *telemetry.MetricsEngine
	ServiceMetrics        *telemetry.ServiceMetrics
	Client                *core_message_queue.SqsQueueHandle
	QueueUrl              *string
	ConcurrencyFactor     int
	QueuePollingDuration  time.Duration
	MessageProcessTimeout time.Duration
}

func NewConsumer

func NewConsumer(params *ConsumerParams) (*Consumer, error)

NewConsumer instantiates a new AWS SQS consumer from the provided parameters.

func (*Consumer) ConcurrentConsumer

func (c *Consumer) ConcurrentConsumer(f MessageProcessorFunc)

ConcurrentConsumer creates a limited parallel queue and continues to poll AWS until the concurrency limit is reached. This is implemented as a "token bucket" using a buffered channel, so the approach is limited only by AWS throughput.
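The buffered-channel token bucket described above can be sketched as follows. This is an illustrative stand-in, not the package's actual implementation: `maxInFlight` and its fake no-op "processing" are hypothetical, and the channel capacity plays the role of ConcurrencyFactor.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxInFlight "processes" n fake messages with at most limit running
// concurrently, using a buffered channel as the token bucket, and
// returns the highest concurrency level actually observed.
func maxInFlight(n, limit int) int64 {
	tokens := make(chan struct{}, limit) // the "token bucket"
	var wg sync.WaitGroup
	var inFlight, peak int64

	for i := 0; i < n; i++ {
		tokens <- struct{}{} // block until a worker token is free
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-tokens }() // return the token to the bucket

			cur := atomic.AddInt64(&inFlight, 1)
			// record the peak concurrency we ever reach
			for {
				p := atomic.LoadInt64(&peak)
				if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
					break
				}
			}
			atomic.AddInt64(&inFlight, -1) // "process" the message (no-op here)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	// The channel capacity guarantees no more than 4 handlers run at once.
	fmt.Println(maxInFlight(100, 4) <= 4) // prints "true"
}
```

Because sends on a full buffered channel block, the polling loop stalls exactly when all worker tokens are in use, which is the behavior ConcurrentConsumer relies on.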

Some scenarios require a different amount of resources per message, depending on the message type (say you want your handler to process anywhere from 1 to N emails in a single message).

To stay within our limits, we could introduce a time-based token bucket algorithm, which ensures we don't process more than N emails over a period of time (such as 1 minute) by grabbing the exact number of "worker tokens" from the pool corresponding to the email count in the message. Also, if your code can time out, a good approach is to impose timeouts and cancellation based on Go's context.WithCancel function. Check out the golang.org/x/sync/semaphore library to build a production-grade solution: the mechanics are the same as in our example, abstracted into a library, so instead of using a channel to limit our operations we call semaphore.Acquire, which likewise blocks execution until "worker tokens" are returned to the pool.
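The weighted variant, where a message grabs as many tokens as it carries emails, can be sketched with the same buffered channel. The `msg` type and `processAll` helper below are hypothetical, not part of this package's API; note that with a single acquiring goroutine this is safe, but with multiple acquirers partial acquisition can deadlock, which is exactly why semaphore.Weighted from golang.org/x/sync is the sturdier choice.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a hypothetical message carrying a variable amount of work
// (1..N emails); each message must need no more tokens than poolSize,
// or the acquiring loop would block forever.
type msg struct{ emails int }

// processAll is a single polling loop that grabs m.emails tokens per
// message, runs the handler concurrently, and returns the total number
// of emails processed.
func processAll(msgs []msg, poolSize int) int {
	tokens := make(chan struct{}, poolSize)
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0

	for _, m := range msgs {
		// Grab exactly m.emails worker tokens, blocking until the
		// bucket has enough capacity free.
		for i := 0; i < m.emails; i++ {
			tokens <- struct{}{}
		}
		wg.Add(1)
		go func(m msg) {
			defer wg.Done()
			mu.Lock()
			total += m.emails // stand-in for processing the emails
			mu.Unlock()
			for i := 0; i < m.emails; i++ {
				<-tokens // refill the bucket as work completes
			}
		}(m)
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(processAll([]msg{{1}, {3}, {2}, {5}, {1}}, 5)) // prints 12
}
```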

Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/model/domain-analysis
Ref: https://docs.microsoft.com/en-us/azure/architecture/microservices/design/interservice-communication

func (*Consumer) NaiveConsumer

func (c *Consumer) NaiveConsumer(f MessageProcessorFunc)

Since a standard AWS SQS receive call returns at most 10 messages, the naive approach is to process them in parallel, then fetch the next batch.

With this approach, throughput is bounded by the slowest message in each batch: roughly (1 second / slowest message processing time) * 10. For example, with the slowest message taking 50 ms, that gives (1000 ms / 50 ms) * 10 = 200 messages per second of processing time, minus network latency, which can eat up much of the projected capacity.

type ConsumerConfigs

type ConsumerConfigs struct {
	ConcurrencyFactor     int
	MessageProcessTimeout time.Duration
	QueuePollingDuration  time.Duration
}

type ConsumerParams

type ConsumerParams struct {
	QueueURl       *string
	Logger         *zap.Logger
	NrClient       *newrelic.Application
	MetricsEngine  *telemetry.MetricsEngine
	ServiceMetrics *telemetry.ServiceMetrics
	AwsClient      *core_message_queue.SqsQueueHandle
	Config         *ConsumerConfigs
}

type IConsumer

type IConsumer interface {
	ConcurrentConsumer(f MessageProcessorFunc)
	NaiveConsumer(f MessageProcessorFunc)
}

type MessageProcessorFunc

type MessageProcessorFunc = func(ctx context.Context, message *core_message_queue.Message) error

MessageProcessorFunc serves as the logic used to process each incoming message from a message queue.
