rabbitmq

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2024 License: MIT Imports: 11 Imported by: 0

README

About The Project

Wrapper over the Go amqp library that simplifies consumer and producer usage with additional features. Inspired by gin.

Key features:

  • Autoreconnect for amqp.Client
  • Message router for single queue deliveries with both direct and topic routings
  • Server abstraction over amqp consumer
  • Consumers graceful shutdown
  • Producer retries with backoff and circuit breaker

Installation

go get github.com/Maksumys/go-hare

Examples

See examples

License

Licensed under the MIT License

Documentation

Index

Constants

View Source
const (
	DefaultCircuitBreakerConsecutiveFailuresAllowed = 10
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Connection

type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(conn *amqp.Connection, factory ConnectionFactory, logger *slog.Logger) *Connection

func (*Connection) Close

func (c *Connection) Close()

func (*Connection) IsAlive

func (c *Connection) IsAlive() bool

func (*Connection) NotifyClose

func (c *Connection) NotifyClose(ch chan error) chan error

func (*Connection) NotifyReconnect

func (c *Connection) NotifyReconnect(ch chan error) chan error

type ConnectionFactory

type ConnectionFactory func() (*amqp.Connection, error)

type Consumer

type Consumer struct {
	Conn    *Connection
	Channel *amqp.Channel
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conn *Connection, queueParams QueueParams, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Consume

func (c *Consumer) Consume(consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) <-chan amqp.Delivery

func (*Consumer) DeclaredQueue

func (c *Consumer) DeclaredQueue() amqp.Queue

type ConsumerOption

type ConsumerOption func(p *Consumer)

func WithConsumerQos

func WithConsumerQos(qos QualityOfService) ConsumerOption

func WithExchangeDeclare

func WithExchangeDeclare(exchangeParams ExchangeParams, queueBindingKey string) ConsumerOption

WithExchangeDeclare будет создавать exchange при создании канала подключения. Если queueBindingKey = "", биндинг будет осуществляться по названию очереди.

type ConsumerParams

type ConsumerParams struct {
	ConsumerName string
	AutoAck      bool
	ConsumerArgs amqp.Table
}

type ControllerFunc

type ControllerFunc func(ctx *DeliveryContext)

type DeliveryContext

type DeliveryContext struct {
	context.Context
	Delivery amqp.Delivery
	Channel  *amqp.Channel

	Acked  bool
	Nacked bool
	// contains filtered or unexported fields
}

func NewDeliveryContext

func NewDeliveryContext(
	baseCtx context.Context,
	delivery amqp.Delivery,
	ch *amqp.Channel,
	handlers []ControllerFunc,
) *DeliveryContext

func (*DeliveryContext) Ack

func (c *DeliveryContext) Ack() bool

func (*DeliveryContext) BindJSON

func (c *DeliveryContext) BindJSON(ptr interface{}) error

func (*DeliveryContext) Get added in v0.1.2

func (c *DeliveryContext) Get(key string) (any, bool)

func (*DeliveryContext) Nack

func (c *DeliveryContext) Nack(requeue bool, err error) bool

func (*DeliveryContext) Next

func (c *DeliveryContext) Next()

func (*DeliveryContext) Set added in v0.1.2

func (c *DeliveryContext) Set(key string, value any)

type ExchangeParams

type ExchangeParams struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table
}

type GroupOption

type GroupOption func(g *RouterGroup)

func WithNumWorkers

func WithNumWorkers(workers int) GroupOption

func WithRouterEngine

func WithRouterEngine(engine RouterEngine) GroupOption

type Publisher

type Publisher struct {
	Conn *Connection
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(connection *Connection, exchangeParams ExchangeParams, opts ...PublisherOption) (*Publisher, error)

func (*Publisher) Broken

func (p *Publisher) Broken() bool

Broken возвращает true в случае, если количество последовательных ошибок Publish > DefaultCircuitBreakerConsecutiveFailuresAllowed.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, key string, mandatory, immediate bool, msg amqp.Publishing) (err error)

type PublisherOption

type PublisherOption func(p *Publisher)

func WithLogger

func WithLogger(logger *slog.Logger) PublisherOption

func WithQueueDeclaration

func WithQueueDeclaration(queueParams QueueParams, bindingKey string) PublisherOption

func WithRetries

func WithRetries() PublisherOption

type QualityOfService

type QualityOfService struct {
	PrefetchCount int
	PrefetchSize  int
}

type QueueParams

type QueueParams struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter() *Router

func (*Router) Group

func (r *Router) Group(exchange ExchangeParams, queue QueueParams, qos QualityOfService, consumer ConsumerParams, opts ...GroupOption) *RouterGroup

Group обобщает транспорт consumer'а, объединяя в себе декларацию и параметры exchangeParams и queueParams.

type RouterEngine

type RouterEngine interface {
	// AddBinding добавляет маршрутизацию контроллеров в роутер
	AddBinding(bindingKey string, controllers ...ControllerFunc)
	// Route подбирает все подходящие контроллеры по настроенным параметрам маршрутизации
	Route(routingKey string) []ControllerFunc
}

func NewDirectRouterEngine

func NewDirectRouterEngine() RouterEngine

func NewTopicRouterEngine

func NewTopicRouterEngine() RouterEngine

type RouterGroup

type RouterGroup struct {
	// contains filtered or unexported fields
}

func (*RouterGroup) Route

func (g *RouterGroup) Route(routingKey string, controllers ...ControllerFunc) *RouterGroup

Route регистрирует маршрутизацию сообщений для конкретной очереди в рамках одной RouterGroup. При чтении из очереди сообщения будут маршрутизироваться в соответствующие контроллеры по параметру RoutingKey.

type Server

type Server struct {
	Conn *Connection
	// contains filtered or unexported fields
}

func NewServer

func NewServer(conn *Connection, router *Router) *Server

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context) error

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Directories

Path Synopsis
examples
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL