mailbox

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MessageConsumedKey is the key used to record the number of messages
	// successfully consumed.
	MessageConsumedKey = "messages_consumed_total"
	// MessageConsumedFailedKey is the key used to record the number of
	// messages that failed to be consumed.
	MessageConsumedFailedKey = "messages_consumed_failed_total"
)
View Source
const (
	DefaultMaxAttempts    = 5
	DefaultAttemptTimeout = 10 * time.Second
)

DefaultMaxAttempts is the default maximum number of attempts.

View Source
const MaxExponentialBackoff = 5 * time.Minute

Variables

View Source
var ErrNoMessage = errors.New("no message in mailbox")

Error returned when the mailbox is empty.

View Source
var (
	// ErrNoRouteMatch is returned when no route matches the message.
	ErrNoRouteMatch = errors.New("no route matches the message")
)
View Source
var (
	// ErrNoTx is returned when the transaction is not found in the context.
	ErrNoTx = errors.New("no transaction found in context")
)

Functions

func ExponentialBackoff

func ExponentialBackoff(i int, _ Message) time.Duration

Follows kubernetes exponential backoff, i.e. 10s, 20s, 40s, ..., capped at 5 minutes.

Types

type ConsumeFn

type ConsumeFn = func(context.Context, Message) error

ConsumeFn is the function that is called by the consumer on a message. If the function returns an error, the message is re-queued and will be processed again until success.

func RoutingConsumer

func RoutingConsumer(routes ...Route) ConsumeFn

RoutingConsumer returns a ConsumeFn that routes the message to the first route that matches the message according to the order of the routes. If no route matches the message, it returns ErrNoRouteMatch.

func WithMoveToMailbox

func WithMoveToMailbox(m Mailbox) ConsumeFn

WithMoveToMailbox returns a ConsumeFn that moves the message to the mailbox. This can be paired with WithRetryConsume to implement a dead-letter queue when this ConsumeFn is used as the Final function. In order to use this ConsumeFn, the consumer's transactor must be configured with recursive transactions. The enqueueing of the message is done in the same transaction as the processing of the message. This ensures that the message is not lost if the transaction is rolled back.

type ConsumeMiddleware

type ConsumeMiddleware = func(ConsumeFn) ConsumeFn

ConsumeMiddleware is a function that wraps a ConsumeFn. It is used to implement middleware that can be applied to the ConsumeFn.

func WithObservabilityConsume

func WithObservabilityConsume(p ObservabilityPolicy) ConsumeMiddleware

WithObservabilityConsume returns a ConsumeMiddleware that wraps the ConsumeFn with observability, notably logging and metrics. See ObservabilityPolicy for more details. Ideally, this middleware should immediately wrap the ConsumeFn such that retry attempts are also logged.

func WithRetryPolicyConsume

func WithRetryPolicyConsume(p RetryPolicy) ConsumeMiddleware

WithRetryConsume returns a ConsumeMiddleware that wraps the ConsumeFn with a retry policy. See RetryPolicy for more details.

type Consumer

type Consumer interface {
	// Consume consumes messages from the mailbox. It is safe to call this
	// method concurrently. This method is meant to be called in a loop, the
	// caller is responsible to apply back-pressure if needed.
	//
	// It is recommended that the function:
	//   - is idempotent, it may be called multiple times for the same message
	//   - returns quickly to avoid holding the row lock for too long
	//   - does not call other methods on the consumer to avoid deadlocks
	//
	// The following errors are returned:
	//   - nil: a message was successfully consumed
	//   - ErrNoMessage: the mailbox is empty and no message was consumed
	// 	 - Any other error: an error occurred while consuming a message
	//
	// The consumer does not have any dead-letter mechanism. If the function
	// returns an error, the message is re-queued and will be processed again.
	// It is the responsibility of the caller to handle a maximum number of
	// retries and/or to move the message to a dead-letter queue, see the
	// various middlewares for more details, e.g. WithTimeoutConsume,
	// WithRetryPolicyConsume.
	Consume(context.Context) error

	// Size returns the number of messages in the mailbox.
	Size(context.Context) (size int64, err error)
}

Consumer consumes messages from the mailbox. Once a message is processed, it is removed from the mailbox. The draining is controlled by the caller via the Consume method. The caller is responsible to call this method in a loop. The consumer does not have any background goroutines.

func NewConsumer

func NewConsumer(ctx context.Context, transactor tx.Transactor, table string, consume ConsumeFn) (Consumer, error)

NewConsumer creates a new consumer. The table must exist and have the same schema as required by Mailbox. The consumer does not have any background goroutines, the caller is responsible to drive the draining in an infinite loop.

The context is not persisted and is only used to validate the database connection and schema validation.

type Mailbox

type Mailbox interface {
	// Put adds a message to the mailbox. If the method returns an error,
	// the message is not added to the mailbox and caller is responsible
	// for retrying and/or rolling back the transaction. The transaction is
	// explicitly passed allowing the caller to control the transaction boundaries.
	Put(context.Context, *sql.Tx, Message) error
}

Mailbox is a message queue backed by Postgres.

func NewMailbox

func NewMailbox(table string) Mailbox

NewMailbox creates a new mailbox. The table must exist and have the following schema:

- id VARCHAR PRIMARY KEY - metadata JSONB - payload BYTEA - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP

The table name must be fully qualified, for example: "public.mailbox".

type MatchFn

type MatchFn func(Message) bool

Filter is a function that returns true if the message should be given to this consumer.

type Message

type Message struct {
	// The message identifier.
	ID string
	// The message metadata.
	Metadata map[string]string
	// The message payload.
	Payload []byte
}

Message is a message in the mailbox.

type Metadata

type Metadata = internal.Metadata

type ObservabilityPolicy

type ObservabilityPolicy struct {
	// Metrics is the expvar map used to record metrics. If nil, no metrics
	// are recorded. Two metrics are recorded:
	//   - messages_consumed_total: the number of messages successfully consumed
	//   - messages_consumed_failed_total: the number of messages that failed to be consumed
	// The metrics keys are statically defined by MessageConsumedKey and
	// MessageConsumedFailedKey.
	Metrics *expvar.Map
	// Logger is the logger used to log events. If nil, no logging is done.
	// By default, only errors are logged. See LogSuccess for more details.
	Logger *slog.Logger
	// Attrs is the function that returns the attributes to log. If nil, no
	// attributes are extracted and logged. See slog's documentation for more
	// details on attributes.
	Attrs func(context.Context, Message) []any
	// If set, successful message consumption is logged. Errors are always
	// logged. If nil, no logging is done.
	LogSuccess bool
}

ObservabilityPolicy controls how a message is logged with the WithObservabilityConsume middleware. It also supports recording metrics. It exposes, at a high level, the following:

  • the logger used to log errors
  • the attributes to log
  • whether successful message consumption is logged
  • the expvar map used to record metrics

type RetryPolicy

type RetryPolicy struct {
	// Backoff is the function that returns the duration to wait before
	// retrying. The argument is the number of retries and the message. If
	// the function returns 0, it retries immediately. By default, it uses
	// an exponential backoff similar to Kubernetes, e.g. 10s, 20s, 40s, ...,
	// capped at 5 minutes.
	Backoff func(int, Message) time.Duration
	// Final is invoked after all retries have been exhausted. For example,
	// it could be used to move the message to a dead-letter queue, or it could
	// be used to log the message. If the function returns an error, the
	// message is retried. By default, it swallows the error. Note that this
	// function is not recovered from.
	Final ConsumeFn
	// MaxAttempts is the maximum number of attempts before giving up. If the
	// value not strictly positive, it is set to DefaultMaxAttempts.
	MaxAttempts int
	// AttemptTimeout is the timeout for each attempt. If the value is not
	// strictly positive, it is set to DefaultAttemptTimeout.
	AttemptTimeout time.Duration
}

RetryPolicy controls how a message is retried with the WithRetryConsume middleware. It exposes, at a high level, the following:

  • the backoff function
  • the final function, invoked after all retries have been exhausted
  • the maximum number of attempts
  • the timeout for each attempt
  • it recovers from panics and return an error

type Route

type Route struct {
	Match   MatchFn
	Consume ConsumeFn
}

Route is a pair of MatchFn and ConsumeFn. They define the routing rules for a message.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL