rabbitmq

package module
v0.0.0-...-8793eb2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2023 License: MIT Imports: 7 Imported by: 0

README

Overview

Main goal of this repository is to be able automatically re-connect to rabbitmq and resume consuming messages.

  • auto-reconnection
  • restore-consumers

All connection, publishing, consuming logic here are just handy wrappers of "github.com/rabbitmq/amqp091-go". I assume that you have a good understanding of rabbitmq and leave you a full example of how it may look like in your project in cmd directory. Otherwise, start learning here: https://www.rabbitmq.com/getstarted.html

Install

go get -u github.com/alifcapital/rabbitmq

Contributing

Pull requests are welcome. For any changes, please open an issue first to discuss what you would like to change.

Documentation

Overview

Package rabbitmq - all structs like SmthParams are wrappers for amqp091-go library function arguments for documentation and usage examples see: - https://github.com/rabbitmq/amqp091-go - https://www.rabbitmq.com/documentation.html

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dial

func Dial(cfg DialConfig) (*amqp.Connection, error)

Dial a handy wrapper for base "github.com/rabbitmq/amqp091-go" DialConfig function

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

func (*Client) Close

func (c *Client) Close() error

func (*Client) Consume

func (c *Client) Consume(consumer AMQPConsumer) error

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

type ClientConfig

type ClientConfig struct {
	// this callback will be invoked whenever network failure happens or node shuts down
	NetworkErrCallback func(*amqp.Error)
	// note that this interval is taken into account when on reconnecting multiple times in row
	AutoRecoveryInterval time.Duration
	// this callback will be invoked whenever connection retry fails or resource freeing returns error
	// returning value as bool should indicate whether to keep retrying recovery or not
	AutoRecoveryErrCallback func(error) bool
	// this callback will be called if restoring a consumer fails after recovery procedure
	ConsumerAutoRecoveryErrCallback func(AMQPConsumer, error)

	// configurations for setting up dial and new connection
	DialConfig

	PublisherConfirmEnabled bool
	PublisherConfirmNowait  bool

	ConsumerQos          int
	ConsumerPrefetchSize int
	ConsumerGlobal       bool
}

type ConsumerFunc

type ConsumerFunc func(ctx context.Context, msg amqp.Delivery)

func (ConsumerFunc) Consume

func (f ConsumerFunc) Consume(ctx context.Context, msg amqp.Delivery)

type ConsumerParams

type ConsumerParams struct {
	RoutingKeys []string
	ConsumerID  string
	AutoAck     bool
	Exclusive   bool
	NoLocal     bool
	Nowait      bool
	Args        amqp.Table
}

type DialConfig

type DialConfig struct {
	User       string
	Password   string
	Host       string
	Port       string
	AMQPConfig amqp.Config
}

type ExchangeParams

type ExchangeParams struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	Nowait     bool
	Args       amqp.Table
	// custom flag
	DeclareExchange bool
}

type IConsumer

type IConsumer interface {
	Consume(ctx context.Context, msg amqp.Delivery)
}

An IConsumer handles an amqp message (delivery - in terms of github.com/rabbitmq/amqp091-go library).

Underlying implementations are responsible for either `Acknowledge` or `Reject` of the message.

HINT: one can consume messages the same way as http request handling via http.Handler i.e. building chain of middlewares for: panic-handling, logging, tracing and error-handling ...etc.

type LogConsumer

type LogConsumer struct {
	// contains filtered or unexported fields
}

LogConsumer consumer which logs incoming messages this is just an example, not aimed for production usage

func (*LogConsumer) Consume

func (l *LogConsumer) Consume(ctx context.Context, msg amqp.Delivery)

type QueueBindParams

type QueueBindParams struct {
	Nowait bool
	Args   amqp.Table
}

type QueueParams

type QueueParams struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Nowait     bool
	Args       amqp.Table
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL