mqutils

package
v0.0.0-...-8793eb2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewExchangeName

func NewExchangeName(appName, exchangeName string) string

NewExchangeName placeholder in format "{owner}.{name_you_desire}" example result: "subscriptions.primary"

func NewMessage

func NewMessage(id string, body []byte) amqp.Publishing

NewMessage assumes body to be json, for other formats write own factory function

func NewQueueName

func NewQueueName(appName, appEnv, queueName string) string

NewQueueName placeholder in format: "{owner}.{ENV}.{name_you_desire}" example result: "subscriptions.prod.primary" ENV specification help you keep testing queues separate from production, and also run any debugging listening on production data

func Publish

func Publish(ctx context.Context, exchange, key string, msg amqp.Publishing, client *rabbitmq.Client) error

Types

type AckFailedError

type AckFailedError struct {
	// contains filtered or unexported fields
}

func NewAckFailedError

func NewAckFailedError(msg amqp.Delivery) *AckFailedError

func (*AckFailedError) Error

func (err *AckFailedError) Error() string

type ErrorHandler

type ErrorHandler func(ctx context.Context, msg amqp.Delivery, err error) bool

ErrorHandler should handle error during Handler phase and return indicator of `requeue` i.e. should message be redelivered again

type Handler

type Handler func(ctx context.Context, msg amqp.Delivery) error

Handler is a function should be some kind of controller.Method where you can handle a kind of business logic or call underlying function and return error which will be handled by underlying ErrorHandler

type HandlerNotFoundError

type HandlerNotFoundError struct {
	// contains filtered or unexported fields
}

func NewHandlerNotFoundError

func NewHandlerNotFoundError(msg amqp.Delivery) *HandlerNotFoundError

func (*HandlerNotFoundError) Error

func (err *HandlerNotFoundError) Error() string

type Middleware

type Middleware func(next rabbitmq.IConsumer) rabbitmq.IConsumer

func NewConsumerTraceLoggerMid

func NewConsumerTraceLoggerMid() Middleware

NewConsumerTraceLoggerMid example of Middleware which logs a message into traces

func NewPanicRecoveryMiddleware

func NewPanicRecoveryMiddleware(cb PanicRecoveryCallback) Middleware

func NewTracerMiddleware

func NewTracerMiddleware() Middleware

type NackFailedError

type NackFailedError struct {
	// contains filtered or unexported fields
}

func NewNackFailedError

func NewNackFailedError(msg amqp.Delivery) *NackFailedError

func (*NackFailedError) Error

func (err *NackFailedError) Error() string

type PanicRecoveryCallback

type PanicRecoveryCallback func(ctx context.Context, msg amqp.Delivery, recErr any)

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a generic container for keeping rabbitmq clients

func NewPool

func NewPool() *Pool

func (*Pool) Close

func (p *Pool) Close() error

func (*Pool) Get

func (p *Pool) Get(clientName string) (*rabbitmq.Client, bool)

func (*Pool) Register

func (p *Pool) Register(cfg rabbitmq.ClientConfig) (*rabbitmq.Client, error)

func (*Pool) Set

func (p *Pool) Set(clientName string, client *rabbitmq.Client)

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(errorHandler ErrorHandler, mids ...Middleware) *Router

func (*Router) Consume

func (r *Router) Consume(ctx context.Context, msg amqp.Delivery)

func (*Router) GetEventNames

func (r *Router) GetEventNames() []string

func (*Router) RegisterEventHandler

func (r *Router) RegisterEventHandler(eventName string, handler Handler, middlewares ...Middleware)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL