amqp

package
v0.73.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package amqp provides a native consumer for the AMQP protocol.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Batch

type Batch interface {
	// Messages of the batch.
	Messages() []Message
	// ACK deletes all messages and completes the message tracing spans.
	// In case the action will not manage to ACK all the messages, a slice of the failed messages will be returned.
	ACK() ([]Message, error)
	// NACK leaves all messages in the queue and completes all message tracing spans.
	// In case the action will not manage to NACK all the messages, a slice of the failed messages will be returned.
	NACK() ([]Message, error)
}

Batch interface for multiple AWS SQS messages.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implementation of an async component.

func New

func New(url, queue string, proc ProcessorFunc, oo ...OptionFunc) (*Component, error)

New creates a new component with support for functional configuration.

func (*Component) Run

func (c *Component) Run(ctx context.Context) error

Run starts the consumer processing loop messages.

type Message

type Message interface {
	// Context will contain the context to be used for processing.
	// Each context will have a logger setup which can be used to create a logger from context.
	Context() context.Context
	// ID of the message.
	ID() string
	// Body of the message.
	Body() []byte
	// Message will contain the raw AMQP delivery.
	Message() amqp.Delivery
	// Span contains the tracing span of this message.
	Span() opentracing.Span
	// ACK deletes the message from the queue and completes the tracing span.
	ACK() error
	// NACK leaves the message in the queue and completes the tracing span.
	NACK() error
}

Message interface for an AMQP Delivery.

type OptionFunc

type OptionFunc func(*Component) error

OptionFunc definition for configuring the component in a functional way.

func Batching

func Batching(count uint, timeout time.Duration) OptionFunc

Batching option for setting up batching. Allowed values for count is > 1 and timeout > 0.

func Config

func Config(cfg amqp.Config) OptionFunc

Config option for setting AMQP configuration.

func Requeue

func Requeue(requeue bool) OptionFunc

Requeue option for adjusting the requeue policy of a message.

func Retry

func Retry(count uint, delay time.Duration) OptionFunc

Retry option for setting up retries.

func StatsInterval

func StatsInterval(interval time.Duration) OptionFunc

StatsInterval option for setting the interval to retrieve statistics.

type ProcessorFunc

type ProcessorFunc func(context.Context, Batch)

ProcessorFunc definition of an async processor.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL