Documentation ¶
Overview ¶
Package xsqs provides a convenient and flexible way to consume messages from Amazon Simple Queue Service (SQS) reliably and efficiently.
Index ¶
- Constants
- func ContextWithWorkerCtx(ctx context.Context, worker WorkerCtx) context.Context
- func GetQueueURL(client sqsiface.SQSAPI, queue string) (string, error)
- func IsUnrecoverableError(err error) bool
- func UnrecoverableError(err error) error
- type Backoff
- type BulkConsumer
- type BulkError
- type BulkMessageError
- type Client
- type ClientOption
- type Consumer
- type ConsumerOption
- type Handler
- type HandlerFunc
- type ParallelConsumer
- type RetryInput
- type SequentialConsumer
- type Worker
- type WorkerCtx
- type WorkerOption
Constants ¶
const (
	// MaxVisibilityTimeout represents the maximum visibility timeout duration for an SQS message.
	MaxVisibilityTimeout = time.Second*43200 - time.Second*1
	// MinVisibilityTimeout represents the minimum visibility timeout duration for an SQS message.
	MinVisibilityTimeout = time.Second * 10
)
Variables ¶
This section is empty.
Functions ¶
func ContextWithWorkerCtx ¶
ContextWithWorkerCtx returns a new context, derived from the provided parent context, that carries the given WorkerCtx.
func GetQueueURL ¶
GetQueueURL retrieves the queue URL based on the provided queue name. If the queue name is already a URL, it is returned as is. If the queue name is an ARN, the associated queue URL is fetched.
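The three input shapes described above (URL, ARN, plain name) can be told apart with simple string checks. The sketch below is only an assumption about how such a classification could look; parseQueueRef and queueRef are hypothetical names, and the actual lookup that GetQueueURL performs against SQS is not shown.

```go
package main

import (
	"fmt"
	"strings"
)

// queueRef is a hypothetical result type used only in this sketch.
type queueRef struct {
	Kind      string // "url", "arn", or "name"
	Name      string // queue name, when it can be derived
	AccountID string // owning account, only set for well-formed ARNs
}

// parseQueueRef classifies a queue identifier without calling AWS.
func parseQueueRef(queue string) queueRef {
	switch {
	case strings.HasPrefix(queue, "https://") || strings.HasPrefix(queue, "http://"):
		// Already a URL: the last path segment is the queue name.
		return queueRef{Kind: "url", Name: queue[strings.LastIndex(queue, "/")+1:]}
	case strings.HasPrefix(queue, "arn:"):
		// Expected layout: arn:partition:sqs:region:account-id:queue-name
		parts := strings.Split(queue, ":")
		if len(parts) == 6 {
			return queueRef{Kind: "arn", Name: parts[5], AccountID: parts[4]}
		}
		return queueRef{Kind: "arn"}
	default:
		return queueRef{Kind: "name", Name: queue}
	}
}

func main() {
	for _, q := range []string{
		"https://sqs.us-east-1.amazonaws.com/123456789012/orders",
		"arn:aws:sqs:us-east-1:123456789012:orders",
		"orders",
	} {
		fmt.Printf("%+v\n", parseQueueRef(q))
	}
}
```

For an ARN, the extracted name and account ID are the pieces one would typically need to resolve the queue URL through the SQS API.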
func IsUnrecoverableError ¶
IsUnrecoverableError reports whether err is an unrecoverable error, that is, one created using the UnrecoverableError function.
func UnrecoverableError ¶
UnrecoverableError creates a new unrecoverable error. Unrecoverable errors indicate that the processing of a message is not recoverable and should not be retried.
Types ¶
type Backoff ¶
Backoff represents a function that calculates the backoff duration based on the number of attempts and an error. It returns the calculated backoff duration and a boolean indicating whether the message is retryable. If the message is retryable, but the duration is 0, the message won't be deleted from the queue and the VisibilityTimeout will not be updated.
func ConstantBackoff ¶
ConstantBackoff returns a Backoff function that always returns the same duration value for each attempt. The provided duration is used as the fixed backoff duration for all retries.
func ExponentialBackoff ¶
ExponentialBackoff returns a Backoff function that calculates the backoff duration in an exponentially growing manner. The provided max duration specifies the upper limit for the backoff duration.
type BulkConsumer ¶
type BulkConsumer struct {
// contains filtered or unexported fields
}
BulkConsumer is a bulk implementation of the Consumer interface that handles messages in bulk.
func NewBulkConsumer ¶
func NewBulkConsumer(client Client, handler Handler[[]*sqs.Message], opts ...ConsumerOption) *BulkConsumer
NewBulkConsumer creates a consumer for the XSQS worker that handles messages in bulk.
func (*BulkConsumer) Consume ¶
Consume processes the messages in bulk using the provided handler. It handles errors differently based on the error type:
- For UnrecoverableError, all received messages will be deleted.
- For BulkError, it categorizes messages to be deleted or retried based on error types.
type BulkError ¶
type BulkError struct {
Errors []BulkMessageError // List of individual message errors.
}
BulkError represents an error that occurs during bulk message processing. It contains a list of BulkMessageError instances that provide detailed information about individual message errors. This error is intended for use with BulkConsumer.
type BulkMessageError ¶
BulkMessageError represents an error that occurs while processing an individual message in a bulk operation. It contains the MessageID of the message and the corresponding error.
func (BulkMessageError) Error ¶
func (e BulkMessageError) Error() string
Error returns the string representation of a BulkMessageError.
func (BulkMessageError) Unwrap ¶
func (e BulkMessageError) Unwrap() error
Unwrap returns the underlying error of a BulkMessageError. It allows accessing the original error that caused the BulkMessageError.
type Client ¶
type Client interface {
	// Receive retrieves messages from the queue.
	// It returns a slice of messages up to the specified limit.
	Receive(ctx context.Context, limit int) ([]*sqs.Message, error)
	// Delete removes a message from the queue.
	Delete(ctx context.Context, message *sqs.Message) error
	// DeleteBatch removes a batch of messages from the queue.
	DeleteBatch(ctx context.Context, messages []*sqs.Message) error
	// Retry changes the visibility timeout of a message, effectively making it
	// available for processing again after a certain delay.
	Retry(ctx context.Context, input RetryInput) error
	// RetryBatch changes the visibility timeout of a batch of messages, allowing
	// them to be retried after specific delays.
	RetryBatch(ctx context.Context, input []RetryInput) error
}
Client is the interface for interacting with an SQS queue.
type ClientOption ¶
type ClientOption func(*client)
ClientOption represents an option for configuring a client.
func WithVisibilityTimeout ¶
func WithVisibilityTimeout(timeout time.Duration) ClientOption
WithVisibilityTimeout sets the visibility timeout duration for messages in an SQS client.
func WithWaitTimeout ¶
func WithWaitTimeout(timeout time.Duration) ClientOption
WithWaitTimeout sets the duration for which the client waits for a message to arrive in the queue before returning.
type Consumer ¶
Consumer defines an interface for message consumers. Implementations should define the behavior of processing messages.
type ConsumerOption ¶
type ConsumerOption func(*consumerOptions)
ConsumerOption represents an option for configuring a consumer.
func WithBackoff ¶
func WithBackoff(backoff Backoff) ConsumerOption
WithBackoff sets the backoff strategy for a consumer.
type Handler ¶
Handler is an interface that defines the contract for handling messages of type T. The Handle method should process the message and return an error if any. By default, any error is considered retryable. To return an unrecoverable error, see UnrecoverableError.
type HandlerFunc ¶
HandlerFunc is a function type that implements the Handler interface for messages of type T. It allows using functions as handlers.
type ParallelConsumer ¶
type ParallelConsumer struct {
// contains filtered or unexported fields
}
ParallelConsumer processes messages in parallel using a SequentialConsumer for each message.
func NewParallelConsumer ¶
func NewParallelConsumer(client Client, handler Handler[*sqs.Message], opts ...ConsumerOption) *ParallelConsumer
NewParallelConsumer creates a consumer for the XSQS worker that processes messages in parallel, each message in its own goroutine.
type RetryInput ¶
type RetryInput struct {
	Message *sqs.Message
	// The delay before the message becomes visible again.
	Delay time.Duration
}
RetryInput provides the necessary information for retrying a message.
type SequentialConsumer ¶
type SequentialConsumer struct {
// contains filtered or unexported fields
}
SequentialConsumer processes messages one by one.
func NewSequentialConsumer ¶
func NewSequentialConsumer(client Client, handler Handler[*sqs.Message], opts ...ConsumerOption) *SequentialConsumer
NewSequentialConsumer creates a consumer for the XSQS worker that processes messages one by one.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker continuously polls an SQS queue for messages and consumes them using a Consumer implementation.
type WorkerOption ¶
type WorkerOption func(*Worker)
WorkerOption represents an option for configuring a worker.
func WithIdle ¶
func WithIdle(duration time.Duration, drain bool) WorkerOption
WithIdle sets the idle delay duration between ReceiveMessage requests for a Worker. It also specifies whether the Worker should drain remaining messages before entering idle mode.
func WithLogger ¶
func WithLogger(logger logging.Log) WorkerOption
WithLogger sets a logger for an SQS worker.
func WithMessagesLimit ¶
func WithMessagesLimit(limit int) WorkerOption
WithMessagesLimit sets the maximum number of messages to return per polling request for a Worker.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
deduplication | Package deduplication provides middleware for FIFO queue handler. |
examples | |
logging | Package logging provides the logger interface for XSQS. |