rabbitmqstore

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

RabbitMQ Store

rabbitmqstore is a Go package designed to simplify the usage of RabbitMQ for message-based communication between microservices. It provides an easy-to-use interface to create, manage, and interact with listeners for specific exchanges and queues, allowing developers to focus on implementing business logic rather than dealing with the intricacies of setting up and managing RabbitMQ connections and channels.

Features

  • Connection and channel management
  • Listener registration for specific exchanges, queues, and binding keys
  • Listener deregistration
  • Handler function updates for registered listeners
  • Access to the underlying RabbitMQ channel for custom operations

Installation

To install the rabbitmqstore package, use the following command:

go get -u github.com/problem-company-toolkit/rabbitmqstore

Usage

Import the package
import (
	"github.com/problem-company-toolkit/rabbitmqstore"
)
Create a new RabbitMQ store

Create a new RabbitMQ store instance by providing the RabbitMQ connection URL:

options := rabbitmqstore.Options{
	URL: "amqp://guest:guest@localhost:5672/",
}

store, err := rabbitmqstore.New(options)
if err != nil {
	panic(err)
}
Register a listener

Register a listener to handle messages for a specific exchange, queue, and binding key:

opts := rabbitmqstore.RegisterListenerOpts{
	Exchange:     "my.exchange",
	Queue:        "my.queue",
	BindingKey:   "my.binding.key",
	ExchangeType: "topic", // optional, defaults to "topic"
	Handler: func(d amqp091.Delivery) {
		fmt.Printf("Received message: %s\n", string(d.Body))
	},
}

listener, err := store.RegisterListener(opts)
if err != nil {
	panic(err)
}
Update the handler function

Update the handler function for an existing listener:

newHandler := func(d amqp091.Delivery) {
	fmt.Printf("New handler received message: %s\n", string(d.Body))
}

listener.UpdateHandler(newHandler)
Deregister a listener

Deregister a listener to stop processing messages for the associated exchange, queue, and binding key:

store.CloseListener(listener.GetID())
Access the underlying RabbitMQ channel

Access the underlying RabbitMQ channel to perform custom operations:

channel := store.GetChannel()

// Perform custom operations using the channel
Close all connections

Close all RabbitMQ connections and channels associated with the store:

err = store.CloseAll()
if err != nil {
	panic(err)
}

Logging

rabbitmqstore provides customizable logging capabilities to keep track of the events and errors occurring in the package. There are two ways to configure the logger:

  1. Passing in your own preconfigured *zap.Logger.
  2. Using the LoggerOpts field in the Options struct, allowing you to set the desired log level, log encoding, and custom logger.
  3. Setting the log level using the environment variable RABBITMQSTORE_LOG_LEVEL.
Configuring logger using LoggerOpts
import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// Custom logger configuration
loggerConfig := zap.Config{
	Level:             zap.NewAtomicLevelAt(zap.InfoLevel),
	Development:       false,
	DisableCaller:     true,
	DisableStacktrace: true,
	OutputPaths:       []string{"stdout"},
	ErrorOutputPaths:  []string{"stderr"},
	Encoding:          "json",
	EncoderConfig: zapcore.EncoderConfig{
		TimeKey:        "timestamp",
		LevelKey:       "level",
		MessageKey:     "message",
		CallerKey:      "caller",
		EncodeTime:     zapcore.ISO8601TimeEncoder,
		EncodeLevel:    zapcore.LowercaseLevelEncoder,
		EncodeDuration: zapcore.StringDurationEncoder,
		EncodeCaller:   zapcore.ShortCallerEncoder,
	},
}
logger, err := loggerConfig.Build()
if err != nil {
	panic(err)
}

options := rabbitmqstore.Options{
	URL: "amqp://guest:guest@localhost:5672/",
	LoggerOpts: rabbitmqstore.LoggerOptions{
		Logger: logger,
	},
}

store, err := rabbitmqstore.New(options)
if err != nil {
	panic(err)
}
Configuring logger using environment variable

Set the log level using the environment variable RABBITMQSTORE_LOG_LEVEL. Supported log levels are:

  • debug
  • info
  • warn
  • fatal
  • panic
  • dpanic

If an invalid log level is provided, the default log level (WarnLevel) will be used.

The package uses a default log level of WarnLevel and a default log encoding of console.

If the log level is set to debug, then rabbitmqstore will log all publish and received messages, including their contents.

Testing

You can test this package by running the command ginkgo while inside the devcontainer.

Documentation

Index

Constants

View Source
const (
	DEFAULT_LOG_LEVEL    = zapcore.WarnLevel
	DEFAULT_LOG_ENCODING = "console"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DeclareExchangeOpts

type DeclareExchangeOpts struct {
	// Required.
	Exchange string
	Durable  bool

	// Defaults to topic.
	Kind string
}

type Listener

type Listener interface {
	GetID() string
	GetExchange() string
	GetQueueName() string
	GetBindingKey() string
	GetExchangeType() string
	UpdateHandler(ListenerHandlerFunc)
}

type ListenerHandlerFunc

type ListenerHandlerFunc = func(amqp091.Delivery)

type LoggerOpts

type LoggerOpts struct {
	Logger   *zap.Logger
	Encoding string
	LogLevel *zapcore.Level
}

type MessageHandler

type MessageHandler func(amqp091.Delivery)

type Options

type Options struct {
	// Required if Connection is not provided.
	URL string

	// Required if URL is not provided.
	Connection *amqp091.Connection

	LoggerOpts LoggerOpts
}

type PublishOpts

type PublishOpts struct {
	Context    context.Context
	Exchange   string
	RoutingKey string

	Mandatory bool
	Immediate bool
	Message   amqp091.Publishing
}

type RegisterListenerOpts

type RegisterListenerOpts struct {
	Exchange     string
	Queue        string
	RoutingKey   string
	ExchangeType string
	Handler      func(amqp091.Delivery)
}

type Store

type Store interface {
	RegisterListener(RegisterListenerOpts) (Listener, error)
	CloseListener(id string)
	GetListeners() map[string]Listener

	// Declares a list of exchanges. Useful for initializing the exchanges that the store will use.
	DeclareExchanges([]DeclareExchangeOpts) error

	CloseAll() error

	// Retrieves the channel. But you should most likely not use this directly.
	// You already have access to publishing and consuming messages through the parent struct.
	// This is only for fringe cases where the basic Store functionality is not enough.
	GetChannel() *amqp091.Channel

	Publish(PublishOpts) error

	Reconnect() error
}

func New

func New(opts Options) (Store, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL