rabbitmq

package
v0.0.0-...-54413df Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrProducerConnection = errors.New("RMQ producer has already closed the connection")

Functions

This section is empty.

Types

type AcknowledgementType

type AcknowledgementType int
const (
	Ack AcknowledgementType = iota
	Nack
	Reject
)

type ClientConfig

type ClientConfig struct {
	// ConnectionURI is the string used to connect to RabbitMQ, e.g. `amqp://...`
	ConnectionURI string
	// Metric is an interface to collect metrics about the client and consumer.
	// Use the NullMetric struct if you want to skip them.
	Metric Metric
	// ConnectRetryAttempts is the number of attempts made to dial RabbitMQ and create a channel.
	ConnectRetryAttempts int
	// InitialReconnectDelay is the delay between each attempt.
	InitialReconnectDelay time.Duration
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(
	client RabbitMQClientInterface,
	handler Handler,
	logger logger.StructuredLogger,
	metric Metric,
	cfg ConsumerConfig,
) *Consumer

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

type ConsumerConfig

type ConsumerConfig struct {
	// PrefetchCount configures how many in-flight "deliveries" are available to the consumer to ack/nack.
	// ref: https://www.rabbitmq.com/consumer-prefetch.html
	// There's no default value because it's very easy to misuse RMQ and have multiple consumers
	// with too many deliveries in flight, which results in a badly distributed workload and a high
	// memory footprint for the consumers.
	PrefetchCount int
}

type ExchangeConfig

type ExchangeConfig struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

type Handler

type Handler interface {
	GetQueueName() string
	GetConsumerTag() string
	QueueAutoAck() bool
	ExclusiveConsumer() bool
	MustStopOnAckError() bool
	MustStopOnNAckError() bool
	MustStopOnRejectError() bool
	WaitToConsumeInflight() bool
	ReceiveMessage(ctx context.Context, msg *Message) (acknowledgement HandlerAcknowledgement, err error)
}

type HandlerAcknowledgement

type HandlerAcknowledgement struct {
	Acknowledgement AcknowledgementType
	Requeue         bool
}

type Message

type Message struct {
	// The application specific payload of the message
	Body []byte

	// Correlation identifier
	CorrelationID string
	// Message headers
	Headers map[string]interface{}
}

Message contains data that is specific to the consumed RabbitMQ message.

type MessageArgs

type MessageArgs struct {
	// Application or exchange specific fields,
	// the headers exchange will inspect this field.
	Headers amqp.Table

	// Correlation identifier
	CorrelationID string
}

MessageArgs captures the fields related to the message sent to the server.

type Metric

type Metric interface {
	ObserveRabbitMQConnectionFailed()
	ObserveRabbitMQConnectionRetry()
	ObserveRabbitMQConnection()

	ObserveRabbitMQChanelConnectionFailed()
	ObserveRabbitMQChanelConnectionRetry()
	ObserveRabbitMQChanelConnection()

	ObserveMsgDelivered()
	ObserveAck(success bool)
	ObserveNack(success bool)
	ObserveReject(success bool)
	ObserveMsgPublish(success bool)
}

type NullMetric

type NullMetric struct{}

func (*NullMetric) ObserveAck

func (n *NullMetric) ObserveAck(success bool)

func (*NullMetric) ObserveMsgDelivered

func (n *NullMetric) ObserveMsgDelivered()

func (*NullMetric) ObserveMsgPublish

func (n *NullMetric) ObserveMsgPublish(success bool)

func (*NullMetric) ObserveNack

func (n *NullMetric) ObserveNack(success bool)

func (*NullMetric) ObserveRabbitMQChanelConnection

func (n *NullMetric) ObserveRabbitMQChanelConnection()

func (*NullMetric) ObserveRabbitMQChanelConnectionFailed

func (n *NullMetric) ObserveRabbitMQChanelConnectionFailed()

func (*NullMetric) ObserveRabbitMQChanelConnectionRetry

func (n *NullMetric) ObserveRabbitMQChanelConnectionRetry()

func (*NullMetric) ObserveRabbitMQConnection

func (n *NullMetric) ObserveRabbitMQConnection()

func (*NullMetric) ObserveRabbitMQConnectionFailed

func (n *NullMetric) ObserveRabbitMQConnectionFailed()

func (*NullMetric) ObserveRabbitMQConnectionRetry

func (n *NullMetric) ObserveRabbitMQConnectionRetry()

func (*NullMetric) ObserveReject

func (n *NullMetric) ObserveReject(success bool)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(client RabbitMQClientInterface, logger logger.StructuredLogger, metric Metric) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) Publish

func (p *Producer) Publish(
	exchange,
	key string,
	mandatory,
	immediate bool,
	expiration string,
	body []byte,
	args MessageArgs,
) error

type QueueBindConfig

type QueueBindConfig struct {
	Name     string
	Key      string
	Exchange string
	NoWait   bool
	Args     amqp.Table
}

type QueueConfig

type QueueConfig struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

type RabbitMQClient

type RabbitMQClient struct {
	// contains filtered or unexported fields
}

RabbitMQClient is a simple client that tries to connect to RabbitMQ and create a channel.

Does not attempt to reconnect if the connection drops.

func (*RabbitMQClient) Close

func (c *RabbitMQClient) Close() error

func (*RabbitMQClient) CreateChannel

func (c *RabbitMQClient) CreateChannel(ctx context.Context) (*amqp.Channel, error)

func (*RabbitMQClient) Setup

func (c *RabbitMQClient) Setup(ctx context.Context, setup *Setup) error

type RabbitMQClientInterface

type RabbitMQClientInterface interface {
	CreateChannel(ctx context.Context) (*amqp.Channel, error)
	Setup(ctx context.Context, setup *Setup) error
	Close() error
}

func NewRabbitMQClient

func NewRabbitMQClient(ctx context.Context, cfg *ClientConfig) (RabbitMQClientInterface, error)

type RetryableConsumer

type RetryableConsumer struct {
	// contains filtered or unexported fields
}

func NewRetryableConsumer

func NewRetryableConsumer(
	newClientFactory func(ctx context.Context, config *ClientConfig) (RabbitMQClientInterface, error),
	config RetryableConsumerConfig,
	logger logger.StructuredLogger,
	metric Metric,
	handler Handler,
) *RetryableConsumer

func (*RetryableConsumer) Run

func (c *RetryableConsumer) Run(ctx context.Context) error

type RetryableConsumerConfig

type RetryableConsumerConfig struct {
	MaxRetryAttempts int
	// HealthCheckFactor is the number N such that a block of code must run for N * BackoffConfig.Max
	// without returning an error to be considered healthy.
	// E.g. BackoffConfig.Max = 1min and HealthCheckFactor = 2 mean that the code needs to run for at least
	// 2min to be healthy; the next time it has an error it is retried starting again from BackoffConfig.Base.
	HealthCheckFactor  int
	BackoffConfig      *backoff.Config
	ConsumerConfig     ConsumerConfig
	RabbitClientConfig *ClientConfig
}

type RetryableProducer

type RetryableProducer struct {
	// contains filtered or unexported fields
}

func NewRetryableProducer

func NewRetryableProducer(
	newClientFactory func(ctx context.Context, config *ClientConfig) (RabbitMQClientInterface, error),
	config RetryableProducerConfig,
	logger logger.StructuredLogger,
	metric Metric,
) *RetryableProducer

func (*RetryableProducer) Close

func (p *RetryableProducer) Close()

func (*RetryableProducer) Publish

func (p *RetryableProducer) Publish(
	exchange,
	key string,
	mandatory,
	immediate bool,
	expiration string,
	body []byte,
	args MessageArgs,
) error

type RetryableProducerConfig

type RetryableProducerConfig struct {
	MaxRetryAttempts int
	// HealthCheckFactor is the number N such that a block of code must run for N * BackoffConfig.Max
	// without returning an error to be considered healthy.
	// E.g. BackoffConfig.Max = 1min and HealthCheckFactor = 2 mean that the code needs to run for at least
	// 2min to be healthy; the next time it has an error it is retried starting again from BackoffConfig.Base.
	HealthCheckFactor  int
	BackoffConfig      *backoff.Config
	RabbitClientConfig *ClientConfig
}

type Setup

type Setup struct {
	Exchanges     []ExchangeConfig
	Queues        []QueueConfig
	QueueBindings []QueueBindConfig
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL