gorabbit

v0.0.0-...-931c294
Published: Jan 29, 2024 License: MIT Imports: 11 Imported by: 0

README

🐇 gorabbit - RabbitMQ client for Golang

📖 Introduction

gorabbit is a RabbitMQ client for Go that wraps the rabbitmq/amqp091-go library. It provides a simple interface for interacting with RabbitMQ and a simple way to create consumers and publishers, which it calls jobs. gorabbit also reconnects and resumes consuming jobs automatically.

📦 Installation

go get github.com/n25a/gorabbit

⚙️ How to use

The following sections show how to use gorabbit in your project.

🐇 Create a connection

First, create a RabbitMQ instance. Then connect it to the RabbitMQ server.

import (
    "github.com/n25a/gorabbit"
)

func main() {
    // Create a rabbitmq instance 
    rabbit := gorabbit.NewRabbitMQ(
        dsn,
        dialTimeout,
        dialRetry,
        ctxTimeout,
        logLevel,
    ) 
    
    // Create a connection
    err := rabbit.Connect()
    if err != nil {
        panic(err)
    }
    
    // Close the connection
    defer rabbit.Close()
}

Each parameter of the NewRabbitMQ function is described below:

  • dsn: RabbitMQ connection string. It should be in the following format: amqp://user:password@host:port/vhost
  • dialTimeout: The timeout for dialing the RabbitMQ server. The default value is 5 seconds.
  • dialRetry: The number of retries for dialing the RabbitMQ server. The default value is 0.
  • ctxTimeout: The timeout for the context passed to the consumer handler. The default value is 1 second.
  • logLevel: The log level for gorabbit. The default value is info. It can be debug, info, warn, error, fatal, panic, or dpanic.
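
The parameter list above can be made concrete with a small sketch. It uses only the standard library: buildDSN is a hypothetical helper (not part of gorabbit) that assembles a connection string in the documented amqp://user:password@host:port/vhost format and escapes special characters in the credentials. The values chosen for the other four parameters are illustrative assumptions, and the sketch assumes a vhost name that contains no slash.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
	"time"
)

// buildDSN is a hypothetical helper, not a gorabbit function. It builds a
// string of the form amqp://user:password@host:port/vhost and lets net/url
// percent-encode characters such as '@' in the password.
func buildDSN(user, password, host string, port int, vhost string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(user, password),
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
		Path:   "/" + vhost,
	}
	return u.String()
}

func main() {
	// Example values for the five NewRabbitMQ parameters (assumptions).
	dsn := buildDSN("guest", "p@ss", "localhost", 5672, "jobs")
	dialTimeout := 5 * time.Second
	var dialRetry uint = 3
	ctxTimeout := time.Second
	logLevel := "info"

	fmt.Println(dsn, dialTimeout, dialRetry, ctxTimeout, logLevel)
}
```

These values would then be passed, in this order, to gorabbit.NewRabbitMQ.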

This function returns a value that implements the RabbitMQ interface, which has the following methods:

  • Connect: Create a connection to the RabbitMQ server.
  • Close: Close the connection to the RabbitMQ server.
  • Declare: Declare a queue and an exchange.
  • StartConsumingJobs: Start the consumers.
  • NewJob: Create a new job instance.
  • ShutdownJobs: Shutdown the consumers.
  • GetConnection: Get the connection to the RabbitMQ server.
  • GetChannel: Get the channel to the RabbitMQ server.

Call the Connect method to connect to the RabbitMQ server, and call the Close method to close the connection when you are done.
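
To illustrate what a dialRetry setting might mean in practice, here is a minimal retry loop using only the standard library. It is a sketch, not gorabbit's implementation: dialWithRetry, errRefused, and the pause between attempts are hypothetical, and it assumes that dialRetry counts additional attempts after the first failed one.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused is a hypothetical error used to simulate a failed dial.
var errRefused = errors.New("connection refused")

// dialWithRetry calls dial once and, on failure, up to `retries` more times,
// sleeping `pause` between attempts. It returns the number of attempts made.
// Treating retries as "extra attempts" is an assumption of this sketch.
func dialWithRetry(dial func() error, retries uint, pause time.Duration) (int, error) {
	var err error
	attempts := 0
	for i := uint(0); i <= retries; i++ {
		attempts++
		if err = dial(); err == nil {
			return attempts, nil
		}
		if i < retries {
			time.Sleep(pause)
		}
	}
	return attempts, fmt.Errorf("dial failed after %d attempts: %w", attempts, err)
}

func main() {
	// Simulate a server that accepts the third connection attempt.
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	}

	attempts, err := dialWithRetry(flaky, 2, 10*time.Millisecond)
	fmt.Println("attempts:", attempts, "error:", err)
}
```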

✍️ Declare a queue and an exchange

Call the Declare method to declare an exchange and its queues. This method has the following parameters:

  • exchangeOption: The exchange options, created with the ExchangeDeclareOption function.
  • queueOption: The queue options, created with the QueueDeclareOption function. You can pass multiple queue options in one call to declare several queues for the same exchange.

import (
    "github.com/n25a/gorabbit"
)

func main() {
    //...
    
    // Close the connection
    defer rabbit.Close()
    
    
    // Declare an exchange 
    exchangeOption := gorabbit.ExchangeDeclareOption(
        name,
        kind,
        durable,
        autoDelete,
        internal,
        noWait,
        args,
    )
	
    // Declare a queue
    queueOption := gorabbit.QueueDeclareOption(
        name,
        durable,
        autoDelete,
        exclusive,
        noWait,
        args,
    )
    
    // Declare a queue and an exchange for rabbit instance
    err = rabbit.Declare(exchangeOption, queueOption)
    if err != nil {
        panic(err)
    }
}

📩 Consume a job

To consume a job, first create a job instance with the NewJob method. Then call the StartConsumingJobs method to start the consumers. This method starts consuming every job created by the gorabbit.RabbitMQ instance.

The NewJob method has the following parameters:

  • handler: The handler function that processes each consumed message. It should have the signature func(ctx context.Context, message []byte) error.
  • jobExchange: The name of the exchange the job uses.
  • jobQueue: The name of the queue the job consumes from.
  • autoAck: Whether consumed messages are acknowledged automatically.
  • justPublish: A flag that marks the job as publish-only. With it set, StartConsumingJobs starts all consumers and skips the jobs that only publish.

import (
    "github.com/n25a/gorabbit"
)

func main() {
    //...
    
    // Declare a queue and an exchange for rabbit instance
    err = rabbit.Declare(exchangeOption, queueOption)
    if err != nil {
        panic(err)
    }
	
    // Create a job instance
    job := rabbit.NewJob(
        handler,
        jobExchange,
        jobQueue,
        autoAck,
        justPublish,
    )

    // Start the consumers
    err = rabbit.StartConsumingJobs()
    if err != nil {
        panic(err)
    }
}

📨 Publish a job

To publish a job, create a job instance and call its Publish method, which publishes the message for that job to the RabbitMQ server. The Publish method has the following parameters:

  • ctx: The context for publishing the job.
  • message: The message to publish, as a []byte.
  • options: Zero or more PublishOption values, created with the functions below:
    • WithContentType: The content type of the message. The default value is text/json.
    • WithDelay: The delay before the message is delivered, in milliseconds. The default value is 0.
    • WithPriority: The priority of the message. The default value is 0.

import (
    "context"
    "encoding/json"

    "github.com/n25a/gorabbit"
)

func main() {
    //...

    // Start the consumers
    err = rabbit.StartConsumingJobs()
    if err != nil {
        panic(err)
    }

    // Marshal the message and publish it
    msg, _ := json.Marshal(struct{
        Body string `json:"body"`
    }{
        Body: "test",
    })
	
    err = job.Publish(
        context.Background(),
        msg,
        gorabbit.WithContentType("text/json"),
        gorabbit.WithDelay(10),
        gorabbit.WithPriority(0),
    )
    if err != nil {
        panic(err)
    }
}

Documentation

Constants

This section is empty.

Variables

var LogLevelStringToZapLevel = map[string]zapcore.Level{
	"debug":  zapcore.DebugLevel,
	"info":   zapcore.InfoLevel,
	"warn":   zapcore.WarnLevel,
	"error":  zapcore.ErrorLevel,
	"dpanic": zapcore.DPanicLevel,
	"panic":  zapcore.PanicLevel,
	"fatal":  zapcore.FatalLevel,
}
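
As a rough illustration of how a log level string could be resolved against the names in this map, the sketch below normalizes the input and falls back to info, the default named in the README. It uses a plain set of names instead of zapcore so that it runs with the standard library alone. resolveLogLevel and knownLevels are hypothetical, and whether gorabbit itself trims, lowercases, or falls back on unknown values is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// knownLevels lists the level names that appear as keys in
// LogLevelStringToZapLevel. It is a local stand-in for that map.
var knownLevels = map[string]bool{
	"debug":  true,
	"info":   true,
	"warn":   true,
	"error":  true,
	"dpanic": true,
	"panic":  true,
	"fatal":  true,
}

// resolveLogLevel is a hypothetical helper: it trims and lowercases the
// input and returns "info" when the name is not a known level.
func resolveLogLevel(s string) string {
	level := strings.ToLower(strings.TrimSpace(s))
	if knownLevels[level] {
		return level
	}
	return "info"
}

func main() {
	for _, in := range []string{"debug", " WARN ", "verbose", ""} {
		fmt.Printf("%q resolves to %q\n", in, resolveLogLevel(in))
	}
}
```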

Functions

func InitLogger

func InitLogger(logLevel string)

Types

type Job

type Job interface {
	Consume(ctxTimeout time.Duration) error
	Publish(ctx context.Context, msg []byte, options ...PublishOption) error
}

type JobHandler

type JobHandler func(ctx context.Context, msg []byte) error
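
A handler with this signature might look like the sketch below, which decodes a JSON message and respects the context deadline. JobHandler is redeclared locally so the example compiles without gorabbit. The task type, handleTask, defaultTimeout, and runWithTimeout are hypothetical: runWithTimeout only imitates how a consumer could apply ctxTimeout to each call and is not the library's code.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"
)

// JobHandler mirrors the documented signature; it is redeclared here only
// to keep the sketch self-contained.
type JobHandler func(ctx context.Context, msg []byte) error

// task is a hypothetical message payload.
type task struct {
	Body string `json:"body"`
}

// defaultTimeout matches the README's stated default for ctxTimeout.
const defaultTimeout = time.Second

// handleTask decodes the message, then gives up if the context is done.
func handleTask(ctx context.Context, msg []byte) error {
	var t task
	if err := json.Unmarshal(msg, &t); err != nil {
		return fmt.Errorf("decode task: %w", err)
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	fmt.Println("processing:", t.Body)
	return nil
}

// runWithTimeout is a hypothetical stand-in for a consumer loop: it gives
// each handler call a context that expires after timeout.
func runWithTimeout(h JobHandler, timeout time.Duration, msg []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return h(ctx, msg)
}

func main() {
	err := runWithTimeout(handleTask, defaultTimeout, []byte(`{"body":"test"}`))
	fmt.Println("valid message error:", err)

	err = runWithTimeout(handleTask, defaultTimeout, []byte(`not json`))
	fmt.Println("invalid message error:", err)
}
```

Returning an error from the handler, rather than panicking, leaves the decision about what to do with a failed message to the caller.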

type Opt

type Opt func() interface{}

func ExchangeDeclareOption

func ExchangeDeclareOption(
	name string,
	kind string,
	durable bool,
	autoDelete bool,
	internal bool,
	noWait bool,
	args amqp.Table,
) Opt

func QueueDeclareOption

func QueueDeclareOption(
	name string,
	durable bool,
	autoDelete bool,
	exclusive bool,
	noWait bool,
	args amqp.Table,
) Opt
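
Because Opt is declared as func() interface{}, a single option type can carry either exchange or queue settings. The sketch below shows one way such options could be produced and consumed with type assertions. Only the Opt type comes from the listing above; exchangeConfig, queueConfig, exchangeOpt, queueOpt, and declare are hypothetical, simplified stand-ins, and the way gorabbit actually interprets the returned value is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// Opt mirrors the documented type; redeclared to keep the sketch standalone.
type Opt func() interface{}

// exchangeConfig and queueConfig are hypothetical, trimmed-down settings.
type exchangeConfig struct {
	Name, Kind string
	Durable    bool
}

type queueConfig struct {
	Name    string
	Durable bool
}

// exchangeOpt captures exchange settings in a closure.
func exchangeOpt(name, kind string, durable bool) Opt {
	return func() interface{} {
		return exchangeConfig{Name: name, Kind: kind, Durable: durable}
	}
}

// queueOpt captures queue settings in a closure.
func queueOpt(name string, durable bool) Opt {
	return func() interface{} {
		return queueConfig{Name: name, Durable: durable}
	}
}

// declare imitates the shape of Declare(exchangeOpt Opt, queueOpts ...Opt):
// one exchange followed by any number of queues for that exchange.
func declare(exchange Opt, queues ...Opt) ([]string, error) {
	ex, ok := exchange().(exchangeConfig)
	if !ok {
		return nil, errors.New("first option must describe an exchange")
	}
	steps := []string{"exchange " + ex.Name}
	for _, q := range queues {
		qc, ok := q().(queueConfig)
		if !ok {
			return nil, errors.New("remaining options must describe queues")
		}
		steps = append(steps, "queue "+qc.Name+" -> "+ex.Name)
	}
	return steps, nil
}

func main() {
	steps, err := declare(
		exchangeOpt("jobs", "direct", true),
		queueOpt("emails", true),
		queueOpt("reports", true),
	)
	fmt.Println(steps, err)
}
```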

type PublishOption

type PublishOption func(p *amqp.Publishing) *amqp.Publishing

func WithContentType

func WithContentType(contentType string) PublishOption

func WithDelay

func WithDelay(delay int64) PublishOption

delay is given in milliseconds.

func WithPriority

func WithPriority(priority uint8) PublishOption
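
The three functions above follow the functional options pattern: each returns a function that modifies a message before it is sent. The sketch below reproduces the pattern with a local publishing struct so that it runs without amqp091-go. All lowercase names are hypothetical. In particular, storing the delay in an x-delay header follows the convention of RabbitMQ's delayed message exchange plugin and is an assumption, not a statement about how gorabbit implements WithDelay.

```go
package main

import "fmt"

// publishing is a hypothetical, trimmed stand-in for amqp.Publishing.
type publishing struct {
	ContentType string
	Priority    uint8
	Headers     map[string]interface{}
	Body        []byte
}

// publishOption has the same shape as the documented PublishOption type.
type publishOption func(p *publishing) *publishing

func withContentType(contentType string) publishOption {
	return func(p *publishing) *publishing {
		p.ContentType = contentType
		return p
	}
}

func withPriority(priority uint8) publishOption {
	return func(p *publishing) *publishing {
		p.Priority = priority
		return p
	}
}

// withDelay records the delay in milliseconds in an "x-delay" header.
// ASSUMPTION: this mirrors the delayed message exchange plugin convention.
func withDelay(delay int64) publishOption {
	return func(p *publishing) *publishing {
		if p.Headers == nil {
			p.Headers = map[string]interface{}{}
		}
		p.Headers["x-delay"] = delay
		return p
	}
}

// buildPublishing starts from the README's default content type and
// applies the options in the order they were passed.
func buildPublishing(body []byte, opts ...publishOption) *publishing {
	p := &publishing{ContentType: "text/json", Body: body}
	for _, opt := range opts {
		p = opt(p)
	}
	return p
}

func main() {
	p := buildPublishing(
		[]byte(`{"body":"test"}`),
		withContentType("application/json"),
		withDelay(10),
		withPriority(5),
	)
	fmt.Println(p.ContentType, p.Priority, p.Headers)
}
```

Since the options are variadic, a caller that is happy with the defaults can omit them entirely.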

type RabbitMQ

type RabbitMQ interface {
	Connect() error
	Close()
	GetConnection() *amqp.Connection
	GetChannel() *amqp.Channel
	NewJob(handler JobHandler, jobExchange string, jobQueue string, autoAck bool, justPublish bool) Job
	StartConsumingJobs() error
	ShutdownJobs()
	Declare(exchangeOpt Opt, queueOpts ...Opt) error
}

func NewRabbitMQ

func NewRabbitMQ(
	dsn string,
	dialTimeout time.Duration,
	dialRetry uint,
	ctxTimeout time.Duration,
	logLevel string,
) RabbitMQ
