rabbitmq

package
v0.0.0-...-56c5f8c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Exchange kinds; these are the values an ExchangeOpts.Type can take.
const (
	DIRECT_EXCHANGE  ExchangeKind = "direct"
	FANOUT_EXCHANGE  ExchangeKind = "fanout"
	TOPIC_EXCHANGE   ExchangeKind = "topic"
	HEADERS_EXCHANGE ExchangeKind = "headers"
	DELAY_EXCHANGE   ExchangeKind = "x-delayed-message"
)

// Fallback kinds: dead-letter queue or delayed retry.
const (
	DLQ_FALLBACK   FallbackType = "dlq"
	RETRY_FALLBACK FallbackType = "delayed"
)

// Format strings for declare/bind failures; each expects two %s arguments.
const (
	DeclareErrorMessage = "[RabbitMQ::Connect] failure to declare %s: %s"
	BindErrorMessage    = "[RabbitMQ::Connect] failure to bind %s: %s"
)

// JsonContentType is the JSON content type string.
const JsonContentType = "application/json"

// AMQP header keys for the retry counter, the trace id and the delay.
const (
	AMQPHeaderNumberOfRetry = "x-count"
	AMQPHeaderTraceID       = "x-trace-id"
	AMQPHeaderDelay         = "x-delay"
)

Variables

View Source
// Sentinel errors exposed by this package. Compare against them with
// errors.Is rather than by message text.
var (
	// ErrorConnection reports a failure to connect to RabbitMQ.
	ErrorConnection = errors.New("messaging failure to connect to rabbitmq")
	// ErrorChannel reports a failure to establish the AMQP channel.
	ErrorChannel = errors.New("messaging error to establish amqp channel")
	// ErrorRegisterDispatcher reports malformed dispatcher parameters.
	ErrorRegisterDispatcher = errors.New("messaging unformatted dispatcher params")
	// ErrorRetryable reports a processing failure that should be sent to retry later.
	ErrorRetryable = errors.New("messaging failure to process send to retry later")
	// ErrorReceivedMessageValidator reports a malformed received message.
	ErrorReceivedMessageValidator = errors.New("messaging unformatted received message")
	// ErrorQueueDeclaration reports that the DLQ feature was requested before
	// the bound exchanges were declared.
	ErrorQueueDeclaration = errors.New("to use dlq feature the bind exchanges must be declared first")
)

Functions

func LogMessage

func LogMessage(msg string) string

func LogMsgWithMessageId

func LogMsgWithMessageId(msg, msgID string) (string, zapcore.Field)

func LogMsgWithType

func LogMsgWithType(msg, typ, msgID string) (string, zapcore.Field)

Types

type AMQPChannel

// AMQPChannel lists the channel operations used by this package: declaring
// and binding exchanges and queues, consuming, and publishing.
// NOTE(review): the signatures appear to mirror the methods of *amqp.Channel —
// confirm against the amqp library version in use.
type AMQPChannel interface {
	// ExchangeDeclare declares the exchange name of the given kind.
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	// ExchangeBind binds the destination exchange to the source exchange using key.
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
	// QueueDeclare declares the queue name and returns its amqp.Queue.
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	// QueueBind binds the queue name to exchange using key.
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	// Consume returns a receive-only channel of deliveries for queue.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	// Publish sends msg to exchange with routing key.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}

AMQPChannel is an abstraction over the default AMQP channel that makes unit testing easier.

type AMQPConnection

// AMQPConnection is the connection abstraction: any value that can hand out
// an *amqp.Channel.
type AMQPConnection interface {
	// Channel returns a channel, or an error if one could not be obtained.
	Channel() (*amqp.Channel, error)
}

type BindingOpts

// BindingOpts holds the binding configuration referenced by Topology.Binding.
type BindingOpts struct {
	// RoutingKey is the routing key of the binding.
	RoutingKey string
	// contains filtered or unexported fields
}

BindingOpts holds the binding configuration.

type ConsumerHandler

// ConsumerHandler is the callback type accepted by RegisterDispatcher. It
// receives the message value and its delivery metadata and returns an error.
type ConsumerHandler = func(msg any, metadata *DeliveryMetadata) error

ConsumerHandler is the callback invoked with a received message and its delivery metadata; it returns an error.

type DeadLetterOpts

// DeadLetterOpts holds the parameters used to configure the DLQ.
type DeadLetterOpts struct {
	// QueueName is the name of the dead-letter queue.
	QueueName string
	// ExchangeName is the name of the dead-letter exchange.
	ExchangeName string
	// RoutingKey is the routing key used for the dead-letter binding.
	RoutingKey string
}

DeadLetterOpts parameters to configure DLQ

type DelayedOpts

// DelayedOpts holds the parameters used to configure the retry queue and
// exchange.
type DelayedOpts struct {
	// QueueName is the name of the retry queue.
	QueueName string
	// ExchangeName is the name of the retry exchange.
	ExchangeName string
	// RoutingKey is the routing key used for the retry binding.
	RoutingKey string
}

DelayedOpts parameters to configure retry queue exchange

type DeliveryMetadata

// DeliveryMetadata describes a received AMQP message; it is the second
// argument handed to a ConsumerHandler.
type DeliveryMetadata struct {
	MessageId string         // identifier of the message
	XCount    int64          // presumably the retry counter carried in the "x-count" header — TODO confirm
	Type      string         // message type
	TraceId   string         // trace identifier
	Headers   map[string]any // message headers; same type as map[string]interface{}
}

DeliveryMetadata amqp message received

type Dispatcher

// Dispatcher is the record used to register a message handler.
type Dispatcher struct {
	// Queue is the queue name.
	Queue string
	// Topology is the topology associated with the dispatcher.
	Topology *Topology
	// MsgType is the message type name.
	MsgType string
	// ReflectedType is a reflect.Value for the message type.
	// NOTE(review): presumably derived from the `t any` argument of
	// RegisterDispatcher — confirm.
	ReflectedType reflect.Value
	// Handler is the callback for the message.
	Handler ConsumerHandler
}

Dispatcher is the struct used to register a message handler.

type ExchangeKind

// ExchangeKind is the string type of the *_EXCHANGE constants and of
// ExchangeOpts.Type.
type ExchangeKind string

type ExchangeOpts

// ExchangeOpts describes an exchange to declare.
type ExchangeOpts struct {
	// Name is the exchange name.
	Name string
	// Type is the exchange kind (one of the ExchangeKind constants).
	Type ExchangeKind
	// Bindings is a list of binding names.
	// NOTE(review): whether these are exchange names or routing keys is not
	// visible here — confirm.
	Bindings []string
}

ExchangeOpts exchanges to declare

type FallbackType

// FallbackType is the string type of the DLQ_FALLBACK and RETRY_FALLBACK
// constants.
type FallbackType string

type IRabbitMQMessaging

// IRabbitMQMessaging is the builder-style API of this package: Declare and
// ApplyBinds return IRabbitMQMessaging, so calls can be chained before Build.
type IRabbitMQMessaging interface {
	// Declare declares a new topology.
	Declare(opts *Topology) IRabbitMQMessaging

	// ApplyBinds binds an exchange/queue without extra RabbitMQ configurations such as Dead Letter.
	// NOTE(review): the previous comment mentioned "the following parameters", but this method takes none — confirm which parameters were meant.
	ApplyBinds() IRabbitMQMessaging

	// Publisher publishes a message.
	Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error

	// Consume creates a new goroutine for each registered dispatcher.
	//
	// When a message arrives, some validations are made and, based on the configured topology, the message may be sent to the DLQ or to retry.
	Consume() error

	// RegisterDispatcher adds the handler and the message type.
	//
	// Each time a message arrives, the queue is checked and the handlers available for that queue are looked up.
	// Then the message type is coerced to find which handler expects that message type.
	RegisterDispatcher(event string, handler ConsumerHandler, t any) error

	// Build builds the configured topology.
	Build() (IRabbitMQMessaging, error)
}

IRabbitMQMessaging is RabbitMQ Builder

func New

func New(cfg *env.Configs, logger logging.ILogger) IRabbitMQMessaging

New(...) creates a new instance of IRabbitMQMessaging.

New(...) connects to the RabbitMQ broker and establishes a channel.

type MockAMQPChannel

// MockAMQPChannel is a test double whose method set matches AMQPChannel.
// It embeds mock.Mock.
// NOTE(review): mock.Mock is presumably testify's mock type — confirm from the imports.
type MockAMQPChannel struct {
	mock.Mock
}

func NewMockAMQPChannel

func NewMockAMQPChannel() *MockAMQPChannel

func (*MockAMQPChannel) Consume

func (m *MockAMQPChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)

func (*MockAMQPChannel) ExchangeBind

func (m *MockAMQPChannel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error

func (*MockAMQPChannel) ExchangeDeclare

func (m *MockAMQPChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error

func (*MockAMQPChannel) Publish

func (m *MockAMQPChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

func (*MockAMQPChannel) QueueBind

func (m *MockAMQPChannel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

func (*MockAMQPChannel) QueueDeclare

func (m *MockAMQPChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)

type MockAMQPConnection

// MockAMQPConnection is a test double whose Channel method matches
// AMQPConnection. It embeds mock.Mock.
type MockAMQPConnection struct {
	mock.Mock
}

func NewMockAMQPConnection

func NewMockAMQPConnection() *MockAMQPConnection

func (*MockAMQPConnection) Channel

func (m *MockAMQPConnection) Channel() (*amqp.Channel, error)

type MockRabbitMQMessaging

// MockRabbitMQMessaging is a test double for IRabbitMQMessaging. It embeds
// mock.Mock.
type MockRabbitMQMessaging struct {
	mock.Mock
}

func NewMockRabbitMQMessaging

func NewMockRabbitMQMessaging() *MockRabbitMQMessaging

func (*MockRabbitMQMessaging) ApplyBinds

func (m *MockRabbitMQMessaging) ApplyBinds() IRabbitMQMessaging

func (*MockRabbitMQMessaging) Build

func (*MockRabbitMQMessaging) Consume

func (m *MockRabbitMQMessaging) Consume() error

func (*MockRabbitMQMessaging) Declare

func (*MockRabbitMQMessaging) Publisher

func (m *MockRabbitMQMessaging) Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error

func (*MockRabbitMQMessaging) RegisterDispatcher

func (m *MockRabbitMQMessaging) RegisterDispatcher(event string, handler ConsumerHandler, t any) error

type PublishOpts

// PublishOpts carries the optional per-message settings passed to Publisher.
type PublishOpts struct {
	// Type is the message type.
	Type string
	// Count is a counter attached to the message.
	// NOTE(review): presumably the retry count sent as the "x-count" header — confirm.
	Count int64
	// TraceId is the trace identifier.
	TraceId string
	// MessageId is the message identifier.
	MessageId string
	// Delay is a delay for the message.
	// NOTE(review): presumably sent as the "x-delay" header — confirm.
	Delay time.Duration
}

PublishOpts holds the optional settings for a published message.

type QueueOpts

// QueueOpts is the queue declaration configuration.
type QueueOpts struct {
	// Name is the queue name.
	Name string
	// TTL is a time-to-live duration for the queue.
	// NOTE(review): whether this applies to messages or to the queue itself is not visible here — confirm.
	TTL time.Duration
	// Retryable, when set, holds the retry configuration.
	Retryable *Retry
	// WithDeadLatter toggles the dead-letter feature for the queue.
	// The name is spelled "Latter" in the exported API; it is kept for compatibility.
	WithDeadLatter bool
}

QueueOpts declare queue configuration

type RabbitMQMessaging

// RabbitMQMessaging is the implementation of IRabbitMQMessaging.
type RabbitMQMessaging struct {
	// Err is an exported error field.
	// NOTE(review): presumably records a failure from the chained builder calls,
	// which cannot return an error themselves — confirm.
	Err error
	// contains filtered or unexported fields
}

RabbitMQMessaging is the implementation of IRabbitMQMessaging

func (*RabbitMQMessaging) ApplyBinds

func (m *RabbitMQMessaging) ApplyBinds() IRabbitMQMessaging

func (*RabbitMQMessaging) Build

func (*RabbitMQMessaging) Consume

func (m *RabbitMQMessaging) Consume() error

func (*RabbitMQMessaging) Declare

func (m *RabbitMQMessaging) Declare(opts *Topology) IRabbitMQMessaging

func (*RabbitMQMessaging) Publisher

func (m *RabbitMQMessaging) Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error

func (*RabbitMQMessaging) RegisterDispatcher

func (m *RabbitMQMessaging) RegisterDispatcher(queue string, handler ConsumerHandler, t any) error

type Retry

// Retry is the retry configuration referenced by QueueOpts.Retryable.
type Retry struct {
	// NumberOfRetry is the number of retries.
	NumberOfRetry int64
	// DelayBetween is the delay between retries.
	DelayBetween time.Duration
}

Retry configures the number of retries and the delay between them.

type Topology

// Topology is used to declare and bind queues and exchanges, and to
// configure DLQ and retry.
type Topology struct {
	// Queue is the queue configuration.
	Queue *QueueOpts
	// Exchange is the exchange configuration.
	Exchange *ExchangeOpts
	// Binding is the binding configuration.
	Binding *BindingOpts
	// contains filtered or unexported fields
}

Topology is used to declare and bind queues and exchanges, and to configure DLQ and retry.

func (*Topology) ApplyBinds

func (d *Topology) ApplyBinds()

func (*Topology) RemoveBinds

func (d *Topology) RemoveBinds()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL