Documentation ¶
Index ¶
- Variables
- type AcknowledgementType
- type ClientConfig
- type Consumer
- type ConsumerConfig
- type ExchangeConfig
- type Handler
- type HandlerAcknowledgement
- type Message
- type MessageArgs
- type Metric
- type NullMetric
- func (n *NullMetric) ObserveAck(success bool)
- func (n *NullMetric) ObserveMsgDelivered()
- func (n *NullMetric) ObserveMsgPublish(success bool)
- func (n *NullMetric) ObserveNack(success bool)
- func (n *NullMetric) ObserveRabbitMQChanelConnection()
- func (n *NullMetric) ObserveRabbitMQChanelConnectionFailed()
- func (n *NullMetric) ObserveRabbitMQChanelConnectionRetry()
- func (n *NullMetric) ObserveRabbitMQConnection()
- func (n *NullMetric) ObserveRabbitMQConnectionFailed()
- func (n *NullMetric) ObserveRabbitMQConnectionRetry()
- func (n *NullMetric) ObserveReject(success bool)
- type Producer
- type QueueBindConfig
- type QueueConfig
- type RabbitMQClient
- type RabbitMQClientInterface
- type RetryableConsumer
- type RetryableConsumerConfig
- type RetryableProducer
- type RetryableProducerConfig
- type Setup
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrProducerConnection = errors.New("RMQ producer has already closed the connection")
Functions ¶
This section is empty.
Types ¶
type AcknowledgementType ¶
type AcknowledgementType int
const ( Ack AcknowledgementType = iota Nack Reject )
type ClientConfig ¶
type ClientConfig struct { // ConnectionURI is the string used to connect to rabbitmq, e.g `amqp://...` ConnectionURI string // Metric is an interface to collect metrics about the client and consumer // There is NullMetric struct if you want to skip them Metric Metric // ConnectRetryAttempts number of attempts to try and dial the rabbitmq, and create a channel ConnectRetryAttempts int // InitialReconnectDelay delay between each attempt InitialReconnectDelay time.Duration }
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer( client RabbitMQClientInterface, handler Handler, logger logger.StructuredLogger, metric Metric, cfg ConsumerConfig, ) *Consumer
type ConsumerConfig ¶
type ConsumerConfig struct { // PrefetchCount configures how many in-flight "deliveries" are available to the consumer to ack/nack. // ref: https://www.rabbitmq.com/consumer-prefetch.html // There's no default value for the reason that it's very easy to misuse RMQ and have multiple consumers // with too many deliveries in flight which results into badly distributed work load and high memory footprint // of the consumers. PrefetchCount int }
type ExchangeConfig ¶
type Handler ¶
type Handler interface { GetQueueName() string GetConsumerTag() string QueueAutoAck() bool ExclusiveConsumer() bool MustStopOnAckError() bool MustStopOnNAckError() bool MustStopOnRejectError() bool WaitToConsumeInflight() bool ReceiveMessage(ctx context.Context, msg *Message) (acknowledgement HandlerAcknowledgement, err error) }
type HandlerAcknowledgement ¶
type HandlerAcknowledgement struct { Acknowledgement AcknowledgementType Requeue bool }
type Message ¶
type Message struct { // The application specific payload of the message Body []byte // Correlation identifier CorrelationID string // Message headers Headers map[string]interface{} }
Message contains data that is specific to the consumed RabbitMQ message.
type MessageArgs ¶
type MessageArgs struct { // Application or exchange specific fields, // the headers exchange will inspect this field. Headers amqp.Table // Correlation identifier CorrelationID string }
MessageArgs captures the fields related to the message sent to the server.
type Metric ¶
type Metric interface { ObserveRabbitMQConnectionFailed() ObserveRabbitMQConnectionRetry() ObserveRabbitMQConnection() ObserveRabbitMQChanelConnectionFailed() ObserveRabbitMQChanelConnectionRetry() ObserveRabbitMQChanelConnection() ObserveMsgDelivered() ObserveAck(success bool) ObserveNack(success bool) ObserveReject(success bool) ObserveMsgPublish(success bool) }
type NullMetric ¶
type NullMetric struct{}
func (*NullMetric) ObserveAck ¶
func (n *NullMetric) ObserveAck(success bool)
func (*NullMetric) ObserveMsgDelivered ¶
func (n *NullMetric) ObserveMsgDelivered()
func (*NullMetric) ObserveMsgPublish ¶
func (n *NullMetric) ObserveMsgPublish(success bool)
func (*NullMetric) ObserveNack ¶
func (n *NullMetric) ObserveNack(success bool)
func (*NullMetric) ObserveRabbitMQChanelConnection ¶
func (n *NullMetric) ObserveRabbitMQChanelConnection()
func (*NullMetric) ObserveRabbitMQChanelConnectionFailed ¶
func (n *NullMetric) ObserveRabbitMQChanelConnectionFailed()
func (*NullMetric) ObserveRabbitMQChanelConnectionRetry ¶
func (n *NullMetric) ObserveRabbitMQChanelConnectionRetry()
func (*NullMetric) ObserveRabbitMQConnection ¶
func (n *NullMetric) ObserveRabbitMQConnection()
func (*NullMetric) ObserveRabbitMQConnectionFailed ¶
func (n *NullMetric) ObserveRabbitMQConnectionFailed()
func (*NullMetric) ObserveRabbitMQConnectionRetry ¶
func (n *NullMetric) ObserveRabbitMQConnectionRetry()
func (*NullMetric) ObserveReject ¶
func (n *NullMetric) ObserveReject(success bool)
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(client RabbitMQClientInterface, logger logger.StructuredLogger, metric Metric) (*Producer, error)
type QueueBindConfig ¶
type QueueConfig ¶
type RabbitMQClient ¶
type RabbitMQClient struct {
// contains filtered or unexported fields
}
A simple client that tries to connect to rabbitmq and create a channel.
Does not attempt to reconnect if the connection drops.
func (*RabbitMQClient) Close ¶
func (c *RabbitMQClient) Close() error
func (*RabbitMQClient) CreateChannel ¶
type RabbitMQClientInterface ¶
type RabbitMQClientInterface interface { CreateChannel(ctx context.Context) (*amqp.Channel, error) Setup(ctx context.Context, setup *Setup) error Close() error }
func NewRabbitMQClient ¶
func NewRabbitMQClient(ctx context.Context, cfg *ClientConfig) (RabbitMQClientInterface, error)
type RetryableConsumer ¶
type RetryableConsumer struct {
// contains filtered or unexported fields
}
func NewRetryableConsumer ¶
func NewRetryableConsumer( newClientFactory func(ctx context.Context, config *ClientConfig) (RabbitMQClientInterface, error), config RetryableConsumerConfig, logger logger.StructuredLogger, metric Metric, handler Handler, ) *RetryableConsumer
type RetryableConsumerConfig ¶
type RetryableConsumerConfig struct { MaxRetryAttempts int // healthCheckFactor is a number representing how much N multiplied by backoffConfig.Max time is needed // for a block of code to run w/o returning an error, to consider it healthy. // E.g backConfig.Max = 1min, healthCheckFactor = 2, means that code needs to run 2min at least to be healthy // and retried again starting from backoffConfig.Base the next time it has an error. HealthCheckFactor int BackoffConfig *backoff.Config ConsumerConfig ConsumerConfig RabbitClientConfig *ClientConfig }
type RetryableProducer ¶
type RetryableProducer struct {
// contains filtered or unexported fields
}
func NewRetryableProducer ¶
func NewRetryableProducer( newClientFactory func(ctx context.Context, config *ClientConfig) (RabbitMQClientInterface, error), config RetryableProducerConfig, logger logger.StructuredLogger, metric Metric, ) *RetryableProducer
func (*RetryableProducer) Close ¶
func (p *RetryableProducer) Close()
func (*RetryableProducer) Publish ¶
func (p *RetryableProducer) Publish( exchange, key string, mandatory, immediate bool, expiration string, body []byte, args MessageArgs, ) error
type RetryableProducerConfig ¶
type RetryableProducerConfig struct { MaxRetryAttempts int // HealthCheckFactor is a number representing how much N multiplied by backoffConfig.Max time is needed // for a block of code to run w/o returning an error, to consider it healthy. // E.g backConfig.Max = 1min, healthCheckFactor = 2, means that code needs to run 2min at least to be healthy // and retried again starting from backoffConfig.Base the next time it has an error. HealthCheckFactor int BackoffConfig *backoff.Config RabbitClientConfig *ClientConfig }
type Setup ¶
type Setup struct { Exchanges []ExchangeConfig Queues []QueueConfig QueueBindings []QueueBindConfig }
Click to show internal directories.
Click to hide internal directories.