gorabbit

package module
v1.0.1 Latest Latest
Published: Apr 8, 2024 License: MIT Imports: 13 Imported by: 0

README

Gorabbit

Gorabbit is a wrapper that provides high level and robust RabbitMQ operations through a client or a manager.

This wrapper depends on the official Go RabbitMQ client library.

Installation

Go module
go get github.com/KardinalAI/gorabbit
Environment variables

The client's and manager's Mode can also be set via an environment variable that will override the manually entered value.

GORABBIT_MODE: debug    # possible values: release or debug

The client and manager can also be completely disabled via the following environment variable:

GORABBIT_DISABLED: true     # possible values: true, false, 1, or 0 
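To make the accepted values and the override rule concrete, here is a minimal sketch of how an application could interpret both variables itself. The helpers parseMode and parseDisabled are hypothetical and not part of gorabbit; the library's own parsing may differ.

```go
package main

import (
	"fmt"
	"os"
)

// parseMode returns the environment value when it is "debug" or "release".
// Any other value (including an unset variable) keeps the manually entered one.
// Hypothetical helper, not part of gorabbit.
func parseMode(envValue, manual string) string {
	switch envValue {
	case "debug", "release":
		return envValue
	default:
		return manual
	}
}

// parseDisabled accepts the four documented values: true, false, 1 and 0.
// The second result reports whether the input was recognized.
func parseDisabled(envValue string) (disabled bool, ok bool) {
	switch envValue {
	case "true", "1":
		return true, true
	case "false", "0":
		return false, true
	default:
		return false, false
	}
}

func main() {
	mode := parseMode(os.Getenv("GORABBIT_MODE"), "release")
	disabled, recognized := parseDisabled(os.Getenv("GORABBIT_DISABLED"))
	fmt.Println(mode, disabled, recognized)
}
```

Treating unrecognized values as "keep the manual setting" is a choice made for this sketch only.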

Always-on mechanism

Here is a visual representation of the always-on mechanism of a connection and channel when the KeepAlive flag is set to true.

[Figure: always-on mechanism]
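Since the diagram is not reproduced here, the general idea of a retry loop with a fixed delay can be sketched as follows. This is only an illustration of the technique: connectWithRetry, errDown and the bounded attempt count are assumptions made for the example and do not describe gorabbit's internal connection manager.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDown simulates a failed connection attempt.
var errDown = errors.New("connection down")

// connectWithRetry calls connect until it succeeds, sleeping delay between
// attempts. maxAttempts bounds the loop so that the sketch terminates; an
// always-on mechanism would typically keep retrying while the client lives.
func connectWithRetry(connect func() error, delay time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = connect(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
		}
	}
	return maxAttempts, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// Simulated dial that fails twice before succeeding.
	dial := func() error {
		calls++
		if calls < 3 {
			return errDown
		}
		return nil
	}
	attempts, err := connectWithRetry(dial, 10*time.Millisecond, 5)
	fmt.Println(attempts, err) // 3 <nil>
}
```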

Client

The gorabbit client offers 2 main functionalities:

  • Publishing
  • Consuming

Additionally, the client also provides a ready check and a health check.

Client initialization

A client can be initialized via the constructor NewClient. This constructor takes ClientOptions as an optional parameter.

Client options
Property             Description                                                Default Value
Host                 The hostname of the RabbitMQ server                        127.0.0.1
Port                 The port of the RabbitMQ server                            5672
Username             The plain authentication username                          guest
Password             The plain authentication password                          guest
Vhost                The specific vhost to use when connecting to CloudAMQP
UseTLS               The flag that activates the use of TLS (amqps)             false
KeepAlive            The flag that activates retry and re-connect mechanisms    true
RetryDelay           The delay between each retry and re-connection             3 seconds
MaxRetry             The max number of retries when a message fails to process  5
PublishingCacheTTL   The time to live of a failed publish held in the cache     60 seconds
PublishingCacheSize  The max number of failed publishes kept in the cache       128
Mode                 The mode defines whether logs are shown or not             Release
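As an illustration of how the connection properties relate to each other, the sketch below assembles an AMQP URI from them, switching the scheme to amqps when UseTLS is set. The connOptions type and the amqpURI function are hypothetical stand-ins written for this example; how gorabbit builds its connection string internally is not shown in this document.

```go
package main

import (
	"fmt"
	"net/url"
)

// connOptions mirrors a few of the documented properties.
// Hypothetical local type, not gorabbit's ClientOptions.
type connOptions struct {
	Host     string
	Port     uint
	Username string
	Password string
	Vhost    string
	UseTLS   bool
}

// amqpURI builds scheme://user:pass@host:port/vhost.
func amqpURI(o connOptions) string {
	scheme := "amqp"
	if o.UseTLS {
		scheme = "amqps"
	}
	u := url.URL{
		Scheme: scheme,
		User:   url.UserPassword(o.Username, o.Password),
		Host:   fmt.Sprintf("%s:%d", o.Host, o.Port),
		Path:   "/" + o.Vhost,
	}
	return u.String()
}

func main() {
	defaults := connOptions{Host: "127.0.0.1", Port: 5672, Username: "guest", Password: "guest"}
	fmt.Println(amqpURI(defaults)) // amqp://guest:guest@127.0.0.1:5672/

	secured := defaults
	secured.UseTLS = true
	secured.Port = 5671
	secured.Vhost = "prod"
	fmt.Println(amqpURI(secured)) // amqps://guest:guest@127.0.0.1:5671/prod
}
```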
Client with default options

Passing nil options will trigger the client to use default values (host, port, credentials, etc...) via DefaultClientOptions().

client := gorabbit.NewClient(nil)

You can also explicitly pass DefaultClientOptions() for a cleaner initialization.

client := gorabbit.NewClient(gorabbit.DefaultClientOptions())

Finally, passing the result of NewClientOptions() also applies the default value to every property that is not overridden.

client := gorabbit.NewClient(gorabbit.NewClientOptions())
Client with options from environment variables

You can instantiate a client from environment variables, without manually specifying options in the code.

client := gorabbit.NewClientFromEnv()

The following environment variables are supported:

  • RABBITMQ_HOST: Defines the host,
  • RABBITMQ_PORT: Defines the port,
  • RABBITMQ_USERNAME: Defines the username,
  • RABBITMQ_PASSWORD: Defines the password,
  • RABBITMQ_VHOST: Defines the vhost,
  • RABBITMQ_USE_TLS: Defines whether to use TLS or not.

Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.
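The fallback behaviour described above can be sketched with the standard library alone. The envOptions type and the fromEnv function are hypothetical; the default values are taken from the options table, and falling back to the default on an unparsable value is an assumption of this sketch rather than documented library behaviour.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envOptions holds the subset of options read in this sketch.
type envOptions struct {
	Host   string
	Port   uint
	UseTLS bool
}

// fromEnv reads values through lookup (os.LookupEnv in a real program) and
// keeps the documented default for every missing or unparsable key.
func fromEnv(lookup func(string) (string, bool)) envOptions {
	opts := envOptions{Host: "127.0.0.1", Port: 5672, UseTLS: false}
	if v, ok := lookup("RABBITMQ_HOST"); ok && v != "" {
		opts.Host = v
	}
	if v, ok := lookup("RABBITMQ_PORT"); ok {
		if p, err := strconv.ParseUint(v, 10, 16); err == nil {
			opts.Port = uint(p)
		}
	}
	if v, ok := lookup("RABBITMQ_USE_TLS"); ok {
		if b, err := strconv.ParseBool(v); err == nil {
			opts.UseTLS = b
		}
	}
	return opts
}

func main() {
	fmt.Printf("%+v\n", fromEnv(os.LookupEnv))
}
```

Passing the lookup function as a parameter keeps the sketch easy to exercise without touching the real process environment.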

Client with custom options

We can input custom values for a specific property, either via the built-in builder or via direct struct initialization.

Client options using the builder

NewClientOptions() and DefaultClientOptions() both return an instance of *ClientOptions that can act as a builder.

options := gorabbit.NewClientOptions().
    SetMode(gorabbit.Debug).
    SetCredentials("root", "password").
    SetRetryDelay(5 * time.Second)

client := gorabbit.NewClient(options)

ℹ There is a setter method for each property.

Client options using struct initialization

ClientOptions is an exported type, so it can be used directly.

options := gorabbit.ClientOptions{
    Host:     "localhost",
    Port:     5673,
    Username: "root",
    Password: "password",
    ...
}

client := gorabbit.NewClient(&options)

⚠ Direct initialization via the struct does not use default values on missing properties, so be sure to fill in every property available.
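The reason is plain Go semantics: fields omitted from a struct literal take their zero value. The sketch below shows this with a local options type that mirrors a subset of the documented fields; the type and the defaults function are stand-ins for illustration, with default values copied from the options table.

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors a subset of the documented ClientOptions fields.
// Local stand-in for illustration only.
type options struct {
	Host       string
	Port       uint
	KeepAlive  bool
	RetryDelay time.Duration
	MaxRetry   uint
}

// defaults returns the default values listed in the options table.
func defaults() options {
	return options{
		Host:       "127.0.0.1",
		Port:       5672,
		KeepAlive:  true,
		RetryDelay: 3 * time.Second,
		MaxRetry:   5,
	}
}

func main() {
	// Omitted fields are zero: KeepAlive is false, RetryDelay and MaxRetry are 0.
	partial := options{Host: "localhost", Port: 5673}
	fmt.Printf("%+v\n", partial)

	// Starting from the defaults and overriding keeps the remaining values.
	full := defaults()
	full.Host = "localhost"
	full.Port = 5673
	fmt.Printf("%+v\n", full)
}
```

In the partial literal, KeepAlive silently ends up false, which is why the builder is the safer route when only a few properties need to change.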

Client disconnection

To prevent connection leaks, always disconnect a client once it is no longer needed.

client := gorabbit.NewClient(gorabbit.DefaultClientOptions())
defer client.Disconnect()
Publishing

To send a message, the client offers two simple methods: Publish and PublishWithOptions. The required arguments for publishing are:

  • Exchange (which exchange the message should be sent to)
  • Routing Key
  • Payload (interface{}, the object will be marshalled internally)

Example of sending a simple string

err := client.Publish("events_exchange", "event.foo.bar.created", "foo string")

Example of sending an object

type foo struct {
    Action string
}

err := client.Publish("events_exchange", "event.foo.bar.created", foo{Action: "bar"})
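To show what "marshalled internally" can mean for the two payloads above, here is a small sketch that assumes JSON as the wire format. That assumption is not stated in this README, and the encode helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type foo struct {
	Action string
}

// encode marshals a payload to JSON, the wire format assumed in this sketch.
func encode(payload interface{}) (string, error) {
	b, err := json.Marshal(payload)
	return string(b), err
}

func main() {
	s, _ := encode("foo string")
	fmt.Println(s) // "foo string" (a JSON string, quotes included)

	o, _ := encode(foo{Action: "bar"})
	fmt.Println(o) // {"Action":"bar"}
}
```

Under that assumption, a consumer handler receiving the second message would unmarshal the []byte payload back into a struct with a matching Action field.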

Optionally, you can set the message's Priority and DeliveryMode via the PublishWithOptions method.

options := gorabbit.SendOptions().
    SetPriority(gorabbit.PriorityMedium).
    SetMode(gorabbit.Persistent)

err := client.PublishWithOptions("events_exchange", "event.foo.bar.created", "foo string", options)

ℹ If the KeepAlive flag is set to true when initializing the client, failed publishing will be cached once and re-published as soon as the channel is back up.

[Figure: publishing safeguard]
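The combination of PublishingCacheTTL and PublishingCacheSize suggests a bounded cache whose entries expire. A minimal sketch of that idea follows; the publishCache type is hypothetical, timestamps are plain seconds to keep the example deterministic, and rejecting new entries when the cache is full is an assumption, since the README does not say what happens in that case.

```go
package main

import "fmt"

// entry is a failed publish waiting to be re-sent.
type entry struct {
	payload   string
	expiresAt int64 // seconds
}

// publishCache is a bounded buffer whose entries expire after ttl seconds.
// Hypothetical sketch, not gorabbit's implementation.
type publishCache struct {
	ttl     int64
	maxSize int
	items   []entry
}

// evict drops every entry whose time to live has elapsed.
func (c *publishCache) evict(now int64) {
	kept := c.items[:0]
	for _, e := range c.items {
		if e.expiresAt > now {
			kept = append(kept, e)
		}
	}
	c.items = kept
}

// add stores a failed publish and reports whether there was room for it.
func (c *publishCache) add(payload string, now int64) bool {
	c.evict(now)
	if len(c.items) >= c.maxSize {
		return false
	}
	c.items = append(c.items, entry{payload: payload, expiresAt: now + c.ttl})
	return true
}

// drain returns the still-valid payloads in insertion order and empties the
// cache, as would happen once the channel is back up.
func (c *publishCache) drain(now int64) []string {
	c.evict(now)
	out := make([]string, 0, len(c.items))
	for _, e := range c.items {
		out = append(out, e.payload)
	}
	c.items = nil
	return out
}

func main() {
	c := &publishCache{ttl: 60, maxSize: 128}
	c.add("first", 0)
	c.add("second", 30)
	fmt.Println(c.drain(70)) // [second]: "first" expired at t=60
}
```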

Consuming

To consume messages, gorabbit offers a simple asynchronous method, RegisterConsumer, that takes a MessageConsumer as its argument. Error handling, acknowledgement, negative acknowledgement and rejection are all handled internally by the consumer.

err := client.RegisterConsumer(gorabbit.MessageConsumer{
    Queue:             "events_queue",
    Name:              "toto_consumer",
    PrefetchSize:      0,
    PrefetchCount:     10,
    AutoAck:           false,
    ConcurrentProcess: false,
    Handlers: gorabbit.MQTTMessageHandlers{
        "event.foo.bar.created": func(payload []byte) error {
            fmt.Println(string(payload))

            return nil
        },
    },
})
  • Queue: The queue to consume messages from
  • Name: Unique identifier for the consumer
  • PrefetchSize: The maximum size of messages that can be processed at the same time
  • PrefetchCount: The maximum number of messages that can be processed at the same time
  • AutoAck: Automatic acknowledgement of messages upon reception
  • ConcurrentProcess: Asynchronous handling of deliveries
  • Handlers: A list of handlers for specified routes

NB: RabbitMQ Wildcards are also supported. If multiple routing keys have the same handler, a wildcard can be used, for example: event.foo.bar.* or event.foo.#.
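For readers unfamiliar with topic wildcards, the matching rules can be sketched as follows: `*` stands for exactly one dot-separated word and `#` for zero or more words. The matchTopic function is a hypothetical illustration of those rules and is not claimed to be how gorabbit resolves handlers.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey matches pattern under topic
// wildcard rules: "*" is exactly one word, "#" is zero or more words.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" may consume zero or more words: try every possible split.
		for i := 0; i <= len(key); i++ {
			if matchWords(pattern[1:], key[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one word.
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		// A literal word must match exactly.
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	fmt.Println(matchTopic("event.foo.bar.*", "event.foo.bar.created"))    // true
	fmt.Println(matchTopic("event.foo.bar.*", "event.foo.bar.created.v2")) // false
	fmt.Println(matchTopic("event.foo.#", "event.foo.bar.created"))        // true
	fmt.Println(matchTopic("event.foo.#", "event.baz"))                    // false
}
```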

ℹ If the KeepAlive flag is set to true when initializing the client, consumers will auto-reconnect after a connection loss. Reconnection is retried indefinitely, so consuming from a non-existent queue will trigger an error repeatedly. Other consumers are not affected, because each consumer has its own channel.

[Figure: consumer safeguard]

Ready and Health checks

The client offers IsReady() and IsHealthy() checks that can be used for monitoring.

Ready: Verifies that connections are open and ready to launch new operations.

Healthy: Verifies that both connections and channels are open and ready, and that ongoing operations are working (consumers are consuming).
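One common way to use such checks is to expose them as HTTP probes. The sketch below wires two endpoints to an interface containing IsReady and IsHealthy, the two methods listed on the client. The checker interface, the fakeClient type and the /ready and /health paths are assumptions made for this example; a real program would pass its gorabbit client instead of the fake.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// checker is the subset of the client needed for probes.
type checker interface {
	IsReady() bool
	IsHealthy() bool
}

// statusFor maps a check result to an HTTP status code.
func statusFor(ok bool) int {
	if ok {
		return http.StatusOK
	}
	return http.StatusServiceUnavailable
}

// probeHandler turns a boolean check into an HTTP handler.
func probeHandler(check func() bool) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(statusFor(check()))
	}
}

// fakeClient stands in for a real client in this sketch.
type fakeClient struct{ ready, healthy bool }

func (f fakeClient) IsReady() bool   { return f.ready }
func (f fakeClient) IsHealthy() bool { return f.healthy }

func main() {
	var c checker = fakeClient{ready: true, healthy: false}

	mux := http.NewServeMux()
	mux.Handle("/ready", probeHandler(c.IsReady))
	mux.Handle("/health", probeHandler(c.IsHealthy))

	// Exercise both endpoints in-process instead of starting a server.
	for _, path := range []string{"/ready", "/health"} {
		rec := httptest.NewRecorder()
		mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
		fmt.Println(path, rec.Code) // /ready 200, then /health 503
	}
}
```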

Manager

The gorabbit manager offers multiple management operations:

  • Exchange, queue and bindings creation
  • Exchange and queue deletion
  • Queue evaluation: Exists, number of messages
  • Queue operations: Pop message, push message, purge

⚠ A manager should only be used for either testing RabbitMQ functionalities or setting up a RabbitMQ server. The manager does not provide robust mechanisms of retry and reconnection like the client.

Manager initialization

A manager can be initialized via the constructor NewManager. This constructor takes ManagerOptions as an optional parameter.

Manager options
Property             Description                                                Default Value
Host                 The hostname of the RabbitMQ server                        127.0.0.1
Port                 The port of the RabbitMQ server                            5672
Username             The plain authentication username                          guest
Password             The plain authentication password                          guest
Vhost                The specific vhost to use when connecting to CloudAMQP
UseTLS               The flag that activates the use of TLS (amqps)             false
Mode                 The mode defines whether logs are shown or not             Release
Manager with default options

Passing nil options will trigger the manager to use default values (host, port, credentials, etc...) via DefaultManagerOptions().

manager, err := gorabbit.NewManager(nil)

You can also explicitly pass DefaultManagerOptions() for a cleaner initialization.

manager, err := gorabbit.NewManager(gorabbit.DefaultManagerOptions())

Finally, passing the result of NewManagerOptions() also applies the default value to every property that is not overridden.

manager, err := gorabbit.NewManager(gorabbit.NewManagerOptions())
Manager with options from environment variables

You can instantiate a manager from environment variables, without manually specifying options in the code.

manager, err := gorabbit.NewManagerFromEnv()

The following environment variables are supported:

  • RABBITMQ_HOST: Defines the host,
  • RABBITMQ_PORT: Defines the port,
  • RABBITMQ_USERNAME: Defines the username,
  • RABBITMQ_PASSWORD: Defines the password,
  • RABBITMQ_VHOST: Defines the vhost,
  • RABBITMQ_USE_TLS: Defines whether to use TLS or not.

Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.

Manager with custom options

We can input custom values for a specific property, either via the built-in builder or via direct struct initialization.

Manager options using the builder

NewManagerOptions() and DefaultManagerOptions() both return an instance of *ManagerOptions that can act as a builder.

options := gorabbit.NewManagerOptions().
    SetMode(gorabbit.Debug).
    SetCredentials("root", "password")

manager, err := gorabbit.NewManager(options)

ℹ There is a setter method for each property.

Manager options using struct initialization

ManagerOptions is an exported type, so it can be used directly.

options := gorabbit.ManagerOptions{
    Host:     "localhost",
    Port:     5673,
    Username: "root",
    Password: "password",
    Mode:     gorabbit.Debug,
}

manager, err := gorabbit.NewManager(&options)

⚠ Direct initialization via the struct does not use default values on missing properties, so be sure to fill in every property available.

Manager disconnection

To prevent connection leaks, always disconnect a manager once it is no longer needed.

manager, err := gorabbit.NewManager(gorabbit.DefaultManagerOptions())
// Check err before using the manager.
defer manager.Disconnect()
Manager operations

The manager offers all the operations necessary to manage a RabbitMQ server.

Exchange creation

Creates an exchange with optional arguments.

err := manager.CreateExchange(gorabbit.ExchangeConfig{
    Name:      "events_exchange",
    Type:      gorabbit.ExchangeTypeTopic,
    Persisted: false,
    Args:      nil,
})
Queue creation

Creates a queue with optional arguments and bindings if declared.

err := manager.CreateQueue(gorabbit.QueueConfig{
    Name:      "events_queue",
    Durable:   false,
    Exclusive: false,
    Args:      nil,
    Bindings: []gorabbit.BindingConfig{
        {
            RoutingKey: "event.foo.bar.created",
            Exchange:   "events_exchange",
        },
    },
})
Binding creation

Binds a queue to an exchange via a given routing key.

err := manager.BindExchangeToQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created")
Queue messages count

Returns the number of messages in a queue, or an error if the queue does not exist. This method can also evaluate the existence of a queue.

messageCount, err := manager.GetNumberOfMessages("events_queue")
Push message

Pushes a single message to a given exchange.

err := manager.PushMessageToExchange("events_exchange", "event.foo.bar.created", "single_message_payload")
Pop message

Retrieves a single message from a given queue and auto acknowledges it if autoAck is set to true.

message, err := manager.PopMessageFromQueue("events_queue", true)
Purge queue

Deletes all messages from a given queue.

err := manager.PurgeQueue("events_queue")
Delete queue

Deletes a given queue.

err := manager.DeleteQueue("events_queue")
Delete exchange

Deletes a given exchange.

err := manager.DeleteExchange("events_exchange")
Setup from schema definition file

You can set up exchanges, queues and bindings automatically by referencing a RabbitMQ Schema Definition JSON file.

err := manager.SetupFromDefinitions("/path/to/definitions.json")

⚠ The standard RabbitMQ definitions file contains configurations for users, vhosts and permissions. Those configurations are not taken into consideration in the SetupFromDefinitions method.
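To give an idea of what such a file contains, the sketch below parses a tiny definitions document and keeps only exchanges, queues and bindings. The definitions type is a trimmed-down, hypothetical counterpart of the SchemaDefinitions type listed in the documentation, and the sample JSON is made up from the names used in this README.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// definitions keeps only the sections used for setup. Other sections of the
// file, such as users, are ignored simply because no field maps to them.
type definitions struct {
	Exchanges []struct {
		Name    string `json:"name"`
		Type    string `json:"type"`
		Durable bool   `json:"durable"`
	} `json:"exchanges"`
	Queues []struct {
		Name    string `json:"name"`
		Durable bool   `json:"durable"`
	} `json:"queues"`
	Bindings []struct {
		Source      string `json:"source"`
		Destination string `json:"destination"`
		RoutingKey  string `json:"routing_key"`
	} `json:"bindings"`
}

// sample is an illustrative definitions file, not one exported from a server.
const sample = `{
  "users": [{"name": "guest"}],
  "exchanges": [{"name": "events_exchange", "type": "topic", "durable": true}],
  "queues": [{"name": "events_queue", "durable": true}],
  "bindings": [{"source": "events_exchange", "destination": "events_queue",
                "destination_type": "queue", "routing_key": "event.foo.bar.created"}]
}`

// parseDefinitions decodes the JSON document into the trimmed-down type.
func parseDefinitions(data []byte) (definitions, error) {
	var d definitions
	err := json.Unmarshal(data, &d)
	return d, err
}

func main() {
	d, err := parseDefinitions([]byte(sample))
	if err != nil {
		panic(err)
	}
	for _, b := range d.Bindings {
		fmt.Printf("%s -> %s via %s\n", b.Source, b.Destination, b.RoutingKey)
	}
}
```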

Launch Local RabbitMQ Server

To quickly run a local RabbitMQ server in a Docker container, run the following command:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

This launches a local RabbitMQ server on port 5672 and the management dashboard on port 15672, reachable at localhost:15672 with the username "guest" and the password "guest".

License

Gorabbit is licensed under the MIT License.

Documentation

Constants

const (
	Release = "release"
	Debug   = "debug"
)

Logging Modes.

Types

type BindingConfig

type BindingConfig struct {
	RoutingKey string `yaml:"routing_key"`
	Exchange   string `yaml:"exchange"`
}

type ClientOptions

type ClientOptions struct {
	// Host is the RabbitMQ server host name.
	Host string

	// Port is the RabbitMQ server port number.
	Port uint

	// Username is the RabbitMQ server allowed username.
	Username string

	// Password is the RabbitMQ server allowed password.
	Password string

	// Vhost is used for CloudAMQP connections to set the specific vhost.
	Vhost string

	// UseTLS defines whether we use amqp or amqps protocol.
	UseTLS bool

	// KeepAlive will determine whether the re-connection and retry mechanisms should be triggered.
	KeepAlive bool

	// RetryDelay will define the delay for the re-connection and retry mechanism.
	RetryDelay time.Duration

	// MaxRetry will define the number of retries when an amqpMessage could not be processed.
	MaxRetry uint

	// PublishingCacheTTL defines the time to live for each publishing cache item.
	PublishingCacheTTL time.Duration

	// PublishingCacheSize defines the max length of the publishing cache.
	PublishingCacheSize uint64

	// Mode will specify whether logs are enabled or not.
	Mode string
}

ClientOptions holds all necessary properties to launch a successful connection with an MQTTClient.

func DefaultClientOptions

func DefaultClientOptions() *ClientOptions

DefaultClientOptions will return a ClientOptions with default values.

func NewClientOptions

func NewClientOptions() *ClientOptions

NewClientOptions is the exported builder for a ClientOptions and will offer setter methods for an easy construction. Any non-assigned field will be set to default through DefaultClientOptions.

func NewClientOptionsFromEnv

func NewClientOptionsFromEnv() *ClientOptions

NewClientOptionsFromEnv will generate a ClientOptions from environment variables. Empty values will be taken as default through the DefaultClientOptions.

func (*ClientOptions) SetCredentials

func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions

SetCredentials will assign the Username and Password.

func (*ClientOptions) SetHost

func (c *ClientOptions) SetHost(host string) *ClientOptions

SetHost will assign the Host.

func (*ClientOptions) SetKeepAlive

func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions

SetKeepAlive will assign the KeepAlive status.

func (*ClientOptions) SetMaxRetry

func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions

SetMaxRetry will assign the max retry count.

func (*ClientOptions) SetMode

func (c *ClientOptions) SetMode(mode string) *ClientOptions

SetMode will assign the Mode if valid.

func (*ClientOptions) SetPort

func (c *ClientOptions) SetPort(port uint) *ClientOptions

SetPort will assign the Port.

func (*ClientOptions) SetPublishingCacheSize

func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions

SetPublishingCacheSize will assign the publishing cache max length.

func (*ClientOptions) SetPublishingCacheTTL

func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions

SetPublishingCacheTTL will assign the publishing cache item TTL.

func (*ClientOptions) SetRetryDelay

func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions

SetRetryDelay will assign the retry delay.

func (*ClientOptions) SetUseTLS

func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions

SetUseTLS will assign the UseTLS status.

func (*ClientOptions) SetVhost

func (c *ClientOptions) SetVhost(vhost string) *ClientOptions

SetVhost will assign the Vhost.

type DeliveryMode

type DeliveryMode uint8
const (
	Transient  DeliveryMode = 1
	Persistent DeliveryMode = 2
)

func (DeliveryMode) Uint8

func (d DeliveryMode) Uint8() uint8

type ExchangeConfig

type ExchangeConfig struct {
	Name      string                 `yaml:"name"`
	Type      ExchangeType           `yaml:"type"`
	Persisted bool                   `yaml:"persisted"`
	Args      map[string]interface{} `yaml:"args"`
}

type ExchangeType

type ExchangeType string
const (
	ExchangeTypeTopic   ExchangeType = "topic"
	ExchangeTypeDirect  ExchangeType = "direct"
	ExchangeTypeFanout  ExchangeType = "fanout"
	ExchangeTypeHeaders ExchangeType = "headers"
)

func (ExchangeType) String

func (e ExchangeType) String() string

type MQTTClient

type MQTTClient interface {
	// Disconnect launches the disconnection process.
	// This operation disables the client permanently.
	Disconnect() error

	// Publish will send the desired payload through the selected channel.
	//	- exchange is the name of the exchange targeted for event publishing.
	//	- routingKey is the route that the exchange will use to forward the message.
	//	- payload is the object you want to send as a byte array.
	// Returns an error if the connection to the RabbitMQ server is down.
	Publish(exchange, routingKey string, payload interface{}) error

	// PublishWithOptions will send the desired payload through the selected channel.
	//	- exchange is the name of the exchange targeted for event publishing.
	//	- routingKey is the route that the exchange will use to forward the message.
	//	- payload is the object you want to send as a byte array.
	// Optionally you can add publishingOptions for extra customization.
	// Returns an error if the connection to the RabbitMQ server is down.
	PublishWithOptions(exchange, routingKey string, payload interface{}, options *PublishingOptions) error

	// RegisterConsumer will register a MessageConsumer for internal queue subscription and message processing.
	// The MessageConsumer will hold a list of MQTTMessageHandlers to internalize message processing.
	// Based on the return of error of each handler, the process of acknowledgment, rejection and retry of messages is
	// fully handled internally.
	// Furthermore, connection losses and channel errors are also internally handled by the connectionManager that will keep consumers
	// alive if and when necessary.
	RegisterConsumer(consumer MessageConsumer) error

	// IsReady returns true if the client is fully operational and connected to the RabbitMQ server.
	IsReady() bool

	// IsHealthy returns true if the client is ready (IsReady) and all channels are operating successfully.
	IsHealthy() bool

	// GetHost returns the host used to initialize the client.
	GetHost() string

	// GetPort returns the port used to initialize the client.
	GetPort() uint

	// GetUsername returns the username used to initialize the client.
	GetUsername() string

	// GetVhost returns the vhost used to initialize the client.
	GetVhost() string

	// IsDisabled returns whether the client is disabled or not.
	IsDisabled() bool
}

MQTTClient is a simple MQTT interface that offers basic client operations such as:

  • Publishing
  • Consuming
  • Disconnecting
  • Ready and health checks

func NewClient

func NewClient(options *ClientOptions) MQTTClient

NewClient will instantiate a new MQTTClient. If options is set to nil, the DefaultClientOptions will be used.

func NewClientFromEnv

func NewClientFromEnv() MQTTClient

NewClientFromEnv will instantiate a new MQTTClient from environment variables.

type MQTTManager

type MQTTManager interface {
	// Disconnect launches the disconnection process.
	// This operation disables the manager permanently.
	Disconnect() error

	// CreateQueue will create a new queue from QueueConfig.
	CreateQueue(config QueueConfig) error

	// CreateExchange will create a new exchange from ExchangeConfig.
	CreateExchange(config ExchangeConfig) error

	// BindExchangeToQueueViaRoutingKey will bind an exchange to a queue via a given routingKey.
	// Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist.
	BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error

	// GetNumberOfMessages retrieves the number of messages currently sitting in a given queue.
	// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
	GetNumberOfMessages(queue string) (int, error)

	// PushMessageToExchange pushes a message to a given exchange with a given routing key.
	// Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist.
	PushMessageToExchange(exchange, routingKey string, payload interface{}) error

	// PopMessageFromQueue retrieves the first message of a queue. The message can then be auto-acknowledged or not.
	// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist or is empty.
	PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error)

	// PurgeQueue will empty a queue of all its current messages.
	// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
	PurgeQueue(queue string) error

	// DeleteQueue permanently deletes an existing queue.
	// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
	DeleteQueue(queue string) error

	// DeleteExchange permanently deletes an existing exchange.
	// Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist.
	DeleteExchange(exchange string) error

	// SetupFromDefinitions loads a definitions.json file and automatically sets up exchanges, queues and bindings.
	SetupFromDefinitions(path string) error

	// GetHost returns the host used to initialize the manager.
	GetHost() string

	// GetPort returns the port used to initialize the manager.
	GetPort() uint

	// GetUsername returns the username used to initialize the manager.
	GetUsername() string

	// GetVhost returns the vhost used to initialize the manager.
	GetVhost() string

	// IsDisabled returns whether the manager is disabled or not.
	IsDisabled() bool
}

MQTTManager is a simple MQTT interface that offers basic management operations such as:

  • Creation of queue, exchange and bindings
  • Deletion of queues and exchanges
  • Purge of queues
  • Queue evaluation (existence and number of messages)

func NewManager

func NewManager(options *ManagerOptions) (MQTTManager, error)

NewManager will instantiate a new MQTTManager. If options is set to nil, the DefaultManagerOptions will be used.

func NewManagerFromEnv

func NewManagerFromEnv() (MQTTManager, error)

NewManagerFromEnv will instantiate a new MQTTManager from environment variables.

type MQTTMessageHandlerFunc

type MQTTMessageHandlerFunc func(payload []byte) error

MQTTMessageHandlerFunc is the function that will be called when a delivery is received.

type MQTTMessageHandlers

type MQTTMessageHandlers map[string]MQTTMessageHandlerFunc

MQTTMessageHandlers is a wrapper that holds a map[string]MQTTMessageHandlerFunc.

func (MQTTMessageHandlers) FindFunc

func (mh MQTTMessageHandlers) FindFunc(routingKey string) MQTTMessageHandlerFunc

func (MQTTMessageHandlers) Validate

func (mh MQTTMessageHandlers) Validate() error

Validate verifies that all routing keys in the handlers are properly formatted and allowed.

type ManagerOptions

type ManagerOptions struct {
	// Host is the RabbitMQ server host name.
	Host string

	// Port is the RabbitMQ server port number.
	Port uint

	// Username is the RabbitMQ server allowed username.
	Username string

	// Password is the RabbitMQ server allowed password.
	Password string

	// Vhost is used for CloudAMQP connections to set the specific vhost.
	Vhost string

	// UseTLS defines whether we use amqp or amqps protocol.
	UseTLS bool

	// Mode will specify whether logs are enabled or not.
	Mode string
}

ManagerOptions holds all necessary properties to launch a successful connection with an MQTTManager.

func DefaultManagerOptions

func DefaultManagerOptions() *ManagerOptions

DefaultManagerOptions will return a ManagerOptions with default values.

func NewManagerOptions

func NewManagerOptions() *ManagerOptions

NewManagerOptions is the exported builder for a ManagerOptions and will offer setter methods for an easy construction. Any non-assigned field will be set to default through DefaultManagerOptions.

func NewManagerOptionsFromEnv

func NewManagerOptionsFromEnv() *ManagerOptions

NewManagerOptionsFromEnv will generate a ManagerOptions from environment variables. Empty values will be taken as default through the DefaultManagerOptions.

func (*ManagerOptions) SetCredentials

func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions

SetCredentials will assign the username and password.

func (*ManagerOptions) SetHost

func (m *ManagerOptions) SetHost(host string) *ManagerOptions

SetHost will assign the host.

func (*ManagerOptions) SetMode

func (m *ManagerOptions) SetMode(mode string) *ManagerOptions

SetMode will assign the mode if valid.

func (*ManagerOptions) SetPort

func (m *ManagerOptions) SetPort(port uint) *ManagerOptions

SetPort will assign the port.

func (*ManagerOptions) SetUseTLS

func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions

SetUseTLS will assign the UseTLS status.

func (*ManagerOptions) SetVhost

func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions

SetVhost will assign the Vhost.

type MessageConsumer

type MessageConsumer struct {
	// Queue defines the queue from which we want to consume messages.
	Queue string

	// Name is a unique identifier of the consumer. Should be as explicit as possible.
	Name string

	// PrefetchSize defines the max size of messages that are allowed to be processed at the same time.
	// This property is dropped if AutoAck is set to true.
	PrefetchSize int

	// PrefetchCount defines the max number of messages that are allowed to be processed at the same time.
	// This property is dropped if AutoAck is set to true.
	PrefetchCount int

	// AutoAck defines whether a message is directly acknowledged or not when being consumed.
	AutoAck bool

	// ConcurrentProcess will make MQTTMessageHandlers run concurrently for faster consumption, if set to true.
	ConcurrentProcess bool

	// Handlers is the list of defined handlers.
	Handlers MQTTMessageHandlers
}

MessageConsumer holds all the information needed to consume messages.

func (MessageConsumer) HashCode

func (c MessageConsumer) HashCode() string

HashCode returns a unique identifier for the defined consumer.

type MessagePriority

type MessagePriority uint8
const (
	PriorityLowest  MessagePriority = 1
	PriorityVeryLow MessagePriority = 2
	PriorityLow     MessagePriority = 3
	PriorityMedium  MessagePriority = 4
	PriorityHigh    MessagePriority = 5
	PriorityHighest MessagePriority = 6
)

func (MessagePriority) Uint8

func (m MessagePriority) Uint8() uint8

type PublishingOptions

type PublishingOptions struct {
	MessagePriority *MessagePriority
	DeliveryMode    *DeliveryMode
}

func SendOptions

func SendOptions() *PublishingOptions

func (*PublishingOptions) SetMode

func (*PublishingOptions) SetPriority

func (m *PublishingOptions) SetPriority(priority MessagePriority) *PublishingOptions

type QueueConfig

type QueueConfig struct {
	Name      string                 `yaml:"name"`
	Durable   bool                   `yaml:"durable"`
	Exclusive bool                   `yaml:"exclusive"`
	Args      map[string]interface{} `yaml:"args"`
	Bindings  []BindingConfig        `yaml:"bindings"`
}

type RabbitMQEnvs

type RabbitMQEnvs struct {
	Host     string `env:"RABBITMQ_HOST"`
	Port     uint   `env:"RABBITMQ_PORT"`
	Username string `env:"RABBITMQ_USERNAME"`
	Password string `env:"RABBITMQ_PASSWORD"`
	Vhost    string `env:"RABBITMQ_VHOST"`
	UseTLS   bool   `env:"RABBITMQ_USE_TLS"`
}

type SchemaDefinitions

type SchemaDefinitions struct {
	Exchanges []struct {
		Name       string `json:"name"`
		Vhost      string `json:"vhost"`
		Type       string `json:"type"`
		Durable    bool   `json:"durable"`
		AutoDelete bool   `json:"auto_delete"`
		Internal   bool   `json:"internal"`
		Arguments  struct {
		} `json:"arguments"`
	} `json:"exchanges"`
	Queues []struct {
		Name       string `json:"name"`
		Vhost      string `json:"vhost"`
		Durable    bool   `json:"durable"`
		AutoDelete bool   `json:"auto_delete"`
		Arguments  struct {
		} `json:"arguments"`
	} `json:"queues"`
	Bindings []struct {
		Source          string `json:"source"`
		Vhost           string `json:"vhost"`
		Destination     string `json:"destination"`
		DestinationType string `json:"destination_type"`
		RoutingKey      string `json:"routing_key"`
		Arguments       struct {
		} `json:"arguments"`
	} `json:"bindings"`
}
