Documentation ¶
Index ¶
- Constants
- Variables
- func LogMessage(msg string) string
- func LogMsgWithMessageId(msg, msgID string) (string, zapcore.Field)
- func LogMsgWithType(msg, typ, msgID string) (string, zapcore.Field)
- type AMQPChannel
- type AMQPConnection
- type BindingOpts
- type ConsumerHandler
- type DeadLetterOpts
- type DelayedOpts
- type DeliveryMetadata
- type Dispatcher
- type ExchangeKind
- type ExchangeOpts
- type FallbackType
- type IRabbitMQMessaging
- type MockAMQPChannel
- func (m *MockAMQPChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, ...) (<-chan amqp.Delivery, error)
- func (m *MockAMQPChannel) ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
- func (m *MockAMQPChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
- func (m *MockAMQPChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
- func (m *MockAMQPChannel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
- func (m *MockAMQPChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
- type MockAMQPConnection
- type MockRabbitMQMessaging
- func (m *MockRabbitMQMessaging) ApplyBinds() IRabbitMQMessaging
- func (m *MockRabbitMQMessaging) Build() (IRabbitMQMessaging, error)
- func (m *MockRabbitMQMessaging) Consume() error
- func (m *MockRabbitMQMessaging) Declare(opts *Topology) IRabbitMQMessaging
- func (m *MockRabbitMQMessaging) Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error
- func (m *MockRabbitMQMessaging) RegisterDispatcher(event string, handler ConsumerHandler, t any) error
- type PublishOpts
- type QueueOpts
- type RabbitMQMessaging
- func (m *RabbitMQMessaging) ApplyBinds() IRabbitMQMessaging
- func (m *RabbitMQMessaging) Build() (IRabbitMQMessaging, error)
- func (m *RabbitMQMessaging) Consume() error
- func (m *RabbitMQMessaging) Declare(opts *Topology) IRabbitMQMessaging
- func (m *RabbitMQMessaging) Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error
- func (m *RabbitMQMessaging) RegisterDispatcher(queue string, handler ConsumerHandler, t any) error
- type Retry
- type Topology
Constants ¶
View Source
const ( DIRECT_EXCHANGE ExchangeKind = "direct" FANOUT_EXCHANGE ExchangeKind = "fanout" TOPIC_EXCHANGE ExchangeKind = "topic" HEADERS_EXCHANGE ExchangeKind = "headers" DELAY_EXCHANGE ExchangeKind = "x-delayed-message" DLQ_FALLBACK FallbackType = "dlq" RETRY_FALLBACK FallbackType = "delayed" DeclareErrorMessage = "[RabbitMQ::Connect] failure to declare %s: %s" BindErrorMessage = "[RabbitMQ::Connect] failure to bind %s: %s" JsonContentType = "application/json" AMQPHeaderNumberOfRetry = "x-count" AMQPHeaderTraceID = "x-trace-id" AMQPHeaderDelay = "x-delay" )
Variables ¶
View Source
var ( ErrorConnection = errors.New("messaging failure to connect to rabbitmq") ErrorChannel = errors.New("messaging error to stablish amqp channel") ErrorRegisterDispatcher = errors.New("messaging unformatted dispatcher params") ErrorRetryable = errors.New("messaging failure to process send to retry latter") ErrorReceivedMessageValidator = errors.New("messaging unformatted received message") ErrorQueueDeclaration = errors.New("to use dql feature the bind exchanges must be declared first") )
Functions ¶
func LogMessage ¶
Types ¶
type AMQPChannel ¶
type AMQPChannel interface { ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error }
AMQPChannel is an abstraction over the default AMQP channel that makes unit testing easier
type AMQPConnection ¶
type BindingOpts ¶
type BindingOpts struct { RoutingKey string // contains filtered or unexported fields }
BindingOpts binds configuration
type ConsumerHandler ¶
type ConsumerHandler = func(msg any, metadata *DeliveryMetadata) error
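ConsumerHandler is the function signature for message handlers: it receives the message and its delivery metadata and returns an error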
type DeadLetterOpts ¶
DeadLetterOpts parameters to configure DLQ
type DelayedOpts ¶
DelayedOpts parameters to configure retry queue exchange
type DeliveryMetadata ¶
type DeliveryMetadata struct { MessageId string XCount int64 Type string TraceId string Headers map[string]interface{} }
DeliveryMetadata amqp message received
type Dispatcher ¶
type Dispatcher struct { Queue string Topology *Topology MsgType string ReflectedType reflect.Value Handler ConsumerHandler }
Dispatcher struct to register a message handler
type ExchangeKind ¶
type ExchangeKind string
type ExchangeOpts ¶
type ExchangeOpts struct { Name string Type ExchangeKind Bindings []string }
ExchangeOpts exchanges to declare
type FallbackType ¶
type FallbackType string
type IRabbitMQMessaging ¶
type IRabbitMQMessaging interface { // Declare a new topology Declare(opts *Topology) IRabbitMQMessaging // ApplyBinds binds an exchange/queue with the following parameters without extra RabbitMQ configurations such as Dead Letter. ApplyBinds() IRabbitMQMessaging // Publish a message Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error // Create a new goroutine for each registered dispatcher // // When a message arrives, some validations are made and, based on the configured topology, the message may be sent to the DLQ or to retry Consume() error // RegisterDispatcher adds the handler and msg type // // Each time a message arrives, we check the queue and get the available handlers for that queue. // Afterwards we coerce the msg type to check which handler expects this msg type RegisterDispatcher(event string, handler ConsumerHandler, t any) error // Build the topology configured Build() (IRabbitMQMessaging, error) }
IRabbitMQMessaging is the RabbitMQ builder
type MockAMQPChannel ¶
func NewMockAMQPChannel ¶
func NewMockAMQPChannel() *MockAMQPChannel
func (*MockAMQPChannel) ExchangeBind ¶
func (*MockAMQPChannel) ExchangeDeclare ¶
func (*MockAMQPChannel) Publish ¶
func (m *MockAMQPChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
func (*MockAMQPChannel) QueueDeclare ¶
type MockAMQPConnection ¶
func NewMockAMQPConnection ¶
func NewMockAMQPConnection() *MockAMQPConnection
type MockRabbitMQMessaging ¶
func NewMockRabbitMQMessaging ¶
func NewMockRabbitMQMessaging() *MockRabbitMQMessaging
func (*MockRabbitMQMessaging) ApplyBinds ¶
func (m *MockRabbitMQMessaging) ApplyBinds() IRabbitMQMessaging
func (*MockRabbitMQMessaging) Build ¶
func (m *MockRabbitMQMessaging) Build() (IRabbitMQMessaging, error)
func (*MockRabbitMQMessaging) Consume ¶
func (m *MockRabbitMQMessaging) Consume() error
func (*MockRabbitMQMessaging) Declare ¶
func (m *MockRabbitMQMessaging) Declare(opts *Topology) IRabbitMQMessaging
func (*MockRabbitMQMessaging) Publisher ¶
func (m *MockRabbitMQMessaging) Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error
func (*MockRabbitMQMessaging) RegisterDispatcher ¶
func (m *MockRabbitMQMessaging) RegisterDispatcher(event string, handler ConsumerHandler, t any) error
type PublishOpts ¶
type PublishOpts struct { Type string Count int64 TraceId string MessageId string Delay time.Duration }
PublishOpts options applied when publishing a message
type RabbitMQMessaging ¶
type RabbitMQMessaging struct { Err error // contains filtered or unexported fields }
RabbitMQMessaging is the implementation of IRabbitMQMessaging
func (*RabbitMQMessaging) ApplyBinds ¶
func (m *RabbitMQMessaging) ApplyBinds() IRabbitMQMessaging
func (*RabbitMQMessaging) Build ¶
func (m *RabbitMQMessaging) Build() (IRabbitMQMessaging, error)
func (*RabbitMQMessaging) Consume ¶
func (m *RabbitMQMessaging) Consume() error
func (*RabbitMQMessaging) Declare ¶
func (m *RabbitMQMessaging) Declare(opts *Topology) IRabbitMQMessaging
func (*RabbitMQMessaging) Publisher ¶
func (m *RabbitMQMessaging) Publisher(exchange, routingKey string, msg any, opts *PublishOpts) error
func (*RabbitMQMessaging) RegisterDispatcher ¶
func (m *RabbitMQMessaging) RegisterDispatcher(queue string, handler ConsumerHandler, t any) error
type Topology ¶
type Topology struct { Queue *QueueOpts Exchange *ExchangeOpts Binding *BindingOpts // contains filtered or unexported fields }
Topology is used to declare and bind queues and exchanges, and to configure DLQ and retry
func (*Topology) ApplyBinds ¶
func (d *Topology) ApplyBinds()
func (*Topology) RemoveBinds ¶
func (d *Topology) RemoveBinds()
Click to show internal directories.
Click to hide internal directories.