rabbitmq

package module
v1.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2023 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JsonContentType = "application/json"
)

Variables

View Source
var (
	NullableChannelError                      = NewRabbitMQError("channel cant be null")
	NotFoundQueueDefinitionError              = NewRabbitMQError("not found queue definition")
	InvalidDispatchParamsError                = NewRabbitMQError("register dispatch with invalid parameters")
	QueueDefinitionNotFoundError              = NewRabbitMQError("any queue definition was founded to the given queue")
	ReceivedMessageWithUnformattedHeaderError = NewRabbitMQError("received message with unformatted headers")
	RetryableError                            = NewRabbitMQError("error to process this message, retry latter")
)

Functions

func LogMessage added in v1.0.258

func LogMessage(msg ...string) string

func NewDispatcher

func NewDispatcher(logger logging.Logger, channel AMQPChannel, queueDefinitions map[string]*QueueDefinition) *dispatcher

func NewPublisher added in v1.0.258

func NewPublisher(logger logging.Logger, configs *configs.Configs, channel AMQPChannel) *publisher

func NewRabbitMQError added in v1.0.258

func NewRabbitMQError(msg string) error

func NewTopology

func NewTopology(l logging.Logger) *topology

Types

type AMQPChannel

type AMQPChannel interface {
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}

func NewChannel added in v1.0.258

func NewChannel(cfg *configs.RabbitMQConfigs, logger logging.Logger) (AMQPChannel, error)

type AMQPConnection

type AMQPConnection interface {
	Channel() (*amqp.Channel, error)
}

type ConsumerDefinition added in v1.0.258

type ConsumerDefinition struct {
	// contains filtered or unexported fields
}

type ConsumerHandler

type ConsumerHandler = func(ctx context.Context, msg any, metadata any) error

type Dispatcher

type Dispatcher interface {
	Register(queue string, msg any, handler ConsumerHandler) error
	ConsumeBlocking(ch chan os.Signal)
}

type ExchangeBindingDefinition added in v1.0.258

type ExchangeBindingDefinition struct {
	// contains filtered or unexported fields
}

func NewExchangeBiding added in v1.0.258

func NewExchangeBiding() *ExchangeBindingDefinition

type ExchangeDefinition added in v1.0.258

type ExchangeDefinition struct {
	// contains filtered or unexported fields
}

func NewDirectExchange added in v1.0.258

func NewDirectExchange(name string) *ExchangeDefinition

func NewDirectExchanges added in v1.0.258

func NewDirectExchanges(names []string) []*ExchangeDefinition

func NewFanoutExchange added in v1.0.258

func NewFanoutExchange(name string) *ExchangeDefinition

func NewFanoutExchanges added in v1.0.258

func NewFanoutExchanges(names []string) []*ExchangeDefinition

func (*ExchangeDefinition) Delete added in v1.0.258

func (*ExchangeDefinition) Durable added in v1.0.258

func (*ExchangeDefinition) Params added in v1.0.258

func (e *ExchangeDefinition) Params(p map[string]any) *ExchangeDefinition

type ExchangeKind

type ExchangeKind string
var (
	FanoutExchange ExchangeKind = "fanout"
	DirectExchange ExchangeKind = "direct"
)

func (ExchangeKind) String added in v1.0.258

func (k ExchangeKind) String() string

type Publisher added in v1.0.258

type Publisher interface {
	SimplePublish(ctx context.Context, target string, msg any) error
	Publish(ctx context.Context, exchange, key string, msg any) error
}

type QueueBindingDefinition added in v1.0.258

type QueueBindingDefinition struct {
	// contains filtered or unexported fields
}

func NewQueueBinding added in v1.0.258

func NewQueueBinding() *QueueBindingDefinition

func (*QueueBindingDefinition) Exchange added in v1.0.258

func (*QueueBindingDefinition) Queue added in v1.0.258

func (*QueueBindingDefinition) RoutingKey added in v1.0.258

type QueueDefinition added in v1.0.258

type QueueDefinition struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(name string) *QueueDefinition

func (*QueueDefinition) DLQName added in v1.0.258

func (q *QueueDefinition) DLQName() string

func (*QueueDefinition) Delete added in v1.0.258

func (q *QueueDefinition) Delete(d bool) *QueueDefinition

func (*QueueDefinition) Durable added in v1.0.258

func (q *QueueDefinition) Durable(d bool) *QueueDefinition

func (*QueueDefinition) Exclusive added in v1.0.258

func (q *QueueDefinition) Exclusive(e bool) *QueueDefinition

func (*QueueDefinition) RetryName added in v1.0.258

func (q *QueueDefinition) RetryName() string

func (*QueueDefinition) WithDQL added in v1.0.258

func (q *QueueDefinition) WithDQL() *QueueDefinition

func (*QueueDefinition) WithRetry added in v1.0.258

func (q *QueueDefinition) WithRetry(ttl time.Duration, retries int64) *QueueDefinition

func (*QueueDefinition) WithTTL added in v1.0.258

func (q *QueueDefinition) WithTTL(ttl time.Duration) *QueueDefinition

type RabbitMQError added in v1.0.258

type RabbitMQError struct {
	// contains filtered or unexported fields
}

func (*RabbitMQError) Error added in v1.0.258

func (e *RabbitMQError) Error() string

type Topology

type Topology interface {
	Channel(c AMQPChannel) Topology
	Queue(q *QueueDefinition) Topology
	Queues(queues []*QueueDefinition) Topology
	Exchange(e *ExchangeDefinition) Topology
	Exchanges(e []*ExchangeDefinition) Topology
	ExchangeBinding(b *ExchangeBindingDefinition) Topology
	QueueBinding(b *QueueBindingDefinition) Topology
	GetQueuesDefinition() map[string]*QueueDefinition
	GetQueueDefinition(queueName string) (*QueueDefinition, error)
	Apply() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL