Documentation ¶
Index ¶
- Constants
- type BindingConfig
- type ClientOptions
- func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions
- func (c *ClientOptions) SetHost(host string) *ClientOptions
- func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions
- func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions
- func (c *ClientOptions) SetMode(mode string) *ClientOptions
- func (c *ClientOptions) SetPort(port uint) *ClientOptions
- func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions
- func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions
- func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions
- func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions
- func (c *ClientOptions) SetVhost(vhost string) *ClientOptions
- type DeliveryMode
- type ExchangeConfig
- type ExchangeType
- type MQTTClient
- type MQTTManager
- type MQTTMessageHandlerFunc
- type MQTTMessageHandlers
- type ManagerOptions
- func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions
- func (m *ManagerOptions) SetHost(host string) *ManagerOptions
- func (m *ManagerOptions) SetMode(mode string) *ManagerOptions
- func (m *ManagerOptions) SetPort(port uint) *ManagerOptions
- func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions
- func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions
- type MessageConsumer
- type MessagePriority
- type PublishingOptions
- type QueueConfig
- type RabbitMQEnvs
- type SchemaDefinitions
Constants ¶
const ( Release = "release" Debug = "debug" )
Logging Modes.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BindingConfig ¶
type ClientOptions ¶
type ClientOptions struct { // Host is the RabbitMQ server host name. Host string // Port is the RabbitMQ server port number. Port uint // Username is the RabbitMQ server allowed username. Username string // Password is the RabbitMQ server allowed password. Password string // Vhost is used for CloudAMQP connections to set the specific vhost. Vhost string // UseTLS defines whether we use amqp or amqps protocol. UseTLS bool // KeepAlive will determine whether the re-connection and retry mechanisms should be triggered. KeepAlive bool // RetryDelay will define the delay for the re-connection and retry mechanism. RetryDelay time.Duration // MaxRetry will define the number of retries when an amqpMessage could not be processed. MaxRetry uint // PublishingCacheTTL defines the time to live for each publishing cache item. PublishingCacheTTL time.Duration // PublishingCacheSize defines the max length of the publishing cache. PublishingCacheSize uint64 // Mode will specify whether logs are enabled or not. Mode string }
ClientOptions holds all necessary properties to launch a successful connection with an MQTTClient.
func DefaultClientOptions ¶
func DefaultClientOptions() *ClientOptions
DefaultClientOptions will return a ClientOptions with default values.
func NewClientOptions ¶
func NewClientOptions() *ClientOptions
NewClientOptions is the exported builder for a ClientOptions and will offer setter methods for an easy construction. Any non-assigned field will be set to default through DefaultClientOptions.
func NewClientOptionsFromEnv ¶
func NewClientOptionsFromEnv() *ClientOptions
NewClientOptionsFromEnv will generate a ClientOptions from environment variables. Empty values will be taken as default through the DefaultClientOptions.
func (*ClientOptions) SetCredentials ¶
func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions
SetCredentials will assign the Username and Password.
func (*ClientOptions) SetHost ¶
func (c *ClientOptions) SetHost(host string) *ClientOptions
SetHost will assign the Host.
func (*ClientOptions) SetKeepAlive ¶
func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions
SetKeepAlive will assign the KeepAlive status.
func (*ClientOptions) SetMaxRetry ¶
func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions
SetMaxRetry will assign the max retry count.
func (*ClientOptions) SetMode ¶
func (c *ClientOptions) SetMode(mode string) *ClientOptions
SetMode will assign the Mode if valid.
func (*ClientOptions) SetPort ¶
func (c *ClientOptions) SetPort(port uint) *ClientOptions
SetPort will assign the Port.
func (*ClientOptions) SetPublishingCacheSize ¶
func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions
SetPublishingCacheSize will assign the publishing cache max length.
func (*ClientOptions) SetPublishingCacheTTL ¶
func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions
SetPublishingCacheTTL will assign the publishing cache item TTL.
func (*ClientOptions) SetRetryDelay ¶
func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions
SetRetryDelay will assign the retry delay.
func (*ClientOptions) SetUseTLS ¶
func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions
SetUseTLS will assign the UseTLS status.
func (*ClientOptions) SetVhost ¶
func (c *ClientOptions) SetVhost(vhost string) *ClientOptions
SetVhost will assign the Vhost.
type DeliveryMode ¶
type DeliveryMode uint8
const ( Transient DeliveryMode = 1 Persistent DeliveryMode = 2 )
func (DeliveryMode) Uint8 ¶
func (d DeliveryMode) Uint8() uint8
type ExchangeConfig ¶
type ExchangeConfig struct { Name string `yaml:"name"` Type ExchangeType `yaml:"type"` Persisted bool `yaml:"persisted"` Args map[string]interface{} `yaml:"args"` }
type ExchangeType ¶
type ExchangeType string
const ( ExchangeTypeTopic ExchangeType = "topic" ExchangeTypeDirect ExchangeType = "direct" ExchangeTypeFanout ExchangeType = "fanout" ExchangeTypeHeaders ExchangeType = "headers" )
func (ExchangeType) String ¶
func (e ExchangeType) String() string
type MQTTClient ¶
type MQTTClient interface { // Disconnect launches the disconnection process. // This operation disables to client permanently. Disconnect() error // Publish will send the desired payload through the selected channel. // - exchange is the name of the exchange targeted for event publishing. // - routingKey is the route that the exchange will use to forward the message. // - payload is the object you want to send as a byte array. // Returns an error if the connection to the RabbitMQ server is down. Publish(exchange, routingKey string, payload interface{}) error // PublishWithOptions will send the desired payload through the selected channel. // - exchange is the name of the exchange targeted for event publishing. // - routingKey is the route that the exchange will use to forward the message. // - payload is the object you want to send as a byte array. // Optionally you can add publishingOptions for extra customization. // Returns an error if the connection to the RabbitMQ server is down. PublishWithOptions(exchange, routingKey string, payload interface{}, options *PublishingOptions) error // RegisterConsumer will register a MessageConsumer for internal queue subscription and message processing. // The MessageConsumer will hold a list of MQTTMessageHandlers to internalize message processing. // Based on the return of error of each handler, the process of acknowledgment, rejection and retry of messages is // fully handled internally. // Furthermore, connection lost and channel errors are also internally handled by the connectionManager that will keep consumers // alive if and when necessary. RegisterConsumer(consumer MessageConsumer) error // IsReady returns true if the client is fully operational and connected to the RabbitMQ. IsReady() bool // IsHealthy returns true if the client is ready (IsReady) and all channels are operating successfully. IsHealthy() bool // GetHost returns the host used to initialize the client. GetHost() string // GetPort returns the port used to initialize the client. GetPort() uint // GetUsername returns the username used to initialize the client. GetUsername() string // GetVhost returns the vhost used to initialize the client. GetVhost() string // IsDisabled returns whether the client is disabled or not. IsDisabled() bool }
MQTTClient is a simple MQTT interface that offers basic client operations such as:
- Publishing
- Consuming
- Disconnecting
- Ready and health checks
func NewClient ¶
func NewClient(options *ClientOptions) MQTTClient
NewClient will instantiate a new MQTTClient. If options is set to nil, the DefaultClientOptions will be used.
func NewClientFromEnv ¶
func NewClientFromEnv() MQTTClient
NewClientFromEnv will instantiate a new MQTTClient from environment variables.
type MQTTManager ¶
type MQTTManager interface { // Disconnect launches the disconnection process. // This operation disables to manager permanently. Disconnect() error // CreateQueue will create a new queue from QueueConfig. CreateQueue(config QueueConfig) error // CreateExchange will create a new exchange from ExchangeConfig. CreateExchange(config ExchangeConfig) error // BindExchangeToQueueViaRoutingKey will bind an exchange to a queue via a given routingKey. // Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist. BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error // GetNumberOfMessages retrieves the number of messages currently sitting in a given queue. // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. GetNumberOfMessages(queue string) (int, error) // PushMessageToExchange pushes a message to a given exchange with a given routing key. // Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist. PushMessageToExchange(exchange, routingKey string, payload interface{}) error // PopMessageFromQueue retrieves the first message of a queue. The message can then be auto-acknowledged or not. // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist or is empty. PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error) // PurgeQueue will empty a queue of all its current messages. // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. PurgeQueue(queue string) error // DeleteQueue permanently deletes an existing queue. // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist. DeleteQueue(queue string) error // DeleteExchange permanently deletes an existing exchange. // Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist. DeleteExchange(exchange string) error // SetupFromDefinitions loads a definitions.json file and automatically sets up exchanges, queues and bindings. SetupFromDefinitions(path string) error // GetHost returns the host used to initialize the manager. GetHost() string // GetPort returns the port used to initialize the manager. GetPort() uint // GetUsername returns the username used to initialize the manager. GetUsername() string // GetVhost returns the vhost used to initialize the manager. GetVhost() string // IsDisabled returns whether the manager is disabled or not. IsDisabled() bool }
MQTTManager is a simple MQTT interface that offers basic management operations such as:
- Creation of queue, exchange and bindings
- Deletion of queues and exchanges
- Purge of queues
- Queue evaluation (existence and number of messages)
func NewManager ¶
func NewManager(options *ManagerOptions) (MQTTManager, error)
NewManager will instantiate a new MQTTManager. If options is set to nil, the DefaultManagerOptions will be used.
func NewManagerFromEnv ¶
func NewManagerFromEnv() (MQTTManager, error)
NewManagerFromEnv will instantiate a new MQTTManager from environment variables.
type MQTTMessageHandlerFunc ¶
MQTTMessageHandlerFunc is the function that will be called when a delivery is received.
type MQTTMessageHandlers ¶
type MQTTMessageHandlers map[string]MQTTMessageHandlerFunc
MQTTMessageHandlers is a wrapper that holds a map[string]MQTTMessageHandlerFunc.
func (MQTTMessageHandlers) FindFunc ¶
func (mh MQTTMessageHandlers) FindFunc(routingKey string) MQTTMessageHandlerFunc
func (MQTTMessageHandlers) Validate ¶
func (mh MQTTMessageHandlers) Validate() error
Validate verifies that all routing keys in the handlers are properly formatted and allowed.
type ManagerOptions ¶
type ManagerOptions struct { // Host is the RabbitMQ server host name. Host string // Port is the RabbitMQ server port number. Port uint // Username is the RabbitMQ server allowed username. Username string // Password is the RabbitMQ server allowed password. Password string // Vhost is used for CloudAMQP connections to set the specific vhost. Vhost string // UseTLS defines whether we use amqp or amqps protocol. UseTLS bool // Mode will specify whether logs are enabled or not. Mode string }
ManagerOptions holds all necessary properties to launch a successful connection with an MQTTManager.
func DefaultManagerOptions ¶
func DefaultManagerOptions() *ManagerOptions
DefaultManagerOptions will return a ManagerOptions with default values.
func NewManagerOptions ¶
func NewManagerOptions() *ManagerOptions
NewManagerOptions is the exported builder for a ManagerOptions and will offer setter methods for an easy construction. Any non-assigned field will be set to default through DefaultManagerOptions.
func NewManagerOptionsFromEnv ¶
func NewManagerOptionsFromEnv() *ManagerOptions
NewManagerOptionsFromEnv will generate a ManagerOptions from environment variables. Empty values will be taken as default through the DefaultManagerOptions.
func (*ManagerOptions) SetCredentials ¶
func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions
SetCredentials will assign the username and password.
func (*ManagerOptions) SetHost ¶
func (m *ManagerOptions) SetHost(host string) *ManagerOptions
SetHost will assign the host.
func (*ManagerOptions) SetMode ¶
func (m *ManagerOptions) SetMode(mode string) *ManagerOptions
SetMode will assign the mode if valid.
func (*ManagerOptions) SetPort ¶
func (m *ManagerOptions) SetPort(port uint) *ManagerOptions
SetPort will assign the port.
func (*ManagerOptions) SetUseTLS ¶
func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions
SetUseTLS will assign the UseTLS status.
func (*ManagerOptions) SetVhost ¶
func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions
SetVhost will assign the Vhost.
type MessageConsumer ¶
type MessageConsumer struct { // Queue defines the queue from which we want to consume messages. Queue string // Name is a unique identifier of the consumer. Should be as explicit as possible. Name string // PrefetchSize defines the max size of messages that are allowed to be processed at the same time. // This property is dropped if AutoAck is set to true. PrefetchSize int // PrefetchCount defines the max number of messages that are allowed to be processed at the same time. // This property is dropped if AutoAck is set to true. PrefetchCount int // AutoAck defines whether a message is directly acknowledged or not when being consumed. AutoAck bool // ConcurrentProcess will make MQTTMessageHandlers run concurrently for faster consumption, if set to true. ConcurrentProcess bool // Handlers is the list of defined handlers. Handlers MQTTMessageHandlers }
MessageConsumer holds all the information needed to consume messages.
func (MessageConsumer) HashCode ¶
func (c MessageConsumer) HashCode() string
HashCode returns a unique identifier for the defined consumer.
type MessagePriority ¶
type MessagePriority uint8
const ( PriorityLowest MessagePriority = 1 PriorityVeryLow MessagePriority = 2 PriorityLow MessagePriority = 3 PriorityMedium MessagePriority = 4 PriorityHigh MessagePriority = 5 PriorityHighest MessagePriority = 6 )
func (MessagePriority) Uint8 ¶
func (m MessagePriority) Uint8() uint8
type PublishingOptions ¶
type PublishingOptions struct { MessagePriority *MessagePriority DeliveryMode *DeliveryMode }
func SendOptions ¶
func SendOptions() *PublishingOptions
func (*PublishingOptions) SetMode ¶
func (m *PublishingOptions) SetMode(mode DeliveryMode) *PublishingOptions
func (*PublishingOptions) SetPriority ¶
func (m *PublishingOptions) SetPriority(priority MessagePriority) *PublishingOptions
type QueueConfig ¶
type QueueConfig struct { Name string `yaml:"name"` Durable bool `yaml:"durable"` Exclusive bool `yaml:"exclusive"` Args map[string]interface{} `yaml:"args"` Bindings []BindingConfig `yaml:"bindings"` }
type RabbitMQEnvs ¶
type SchemaDefinitions ¶
type SchemaDefinitions struct { Exchanges []struct { Name string `json:"name"` Vhost string `json:"vhost"` Type string `json:"type"` Durable bool `json:"durable"` AutoDelete bool `json:"auto_delete"` Internal bool `json:"internal"` Arguments struct { } `json:"arguments"` } `json:"exchanges"` Queues []struct { Name string `json:"name"` Vhost string `json:"vhost"` Durable bool `json:"durable"` AutoDelete bool `json:"auto_delete"` Arguments struct { } `json:"arguments"` } `json:"queues"` Bindings []struct { Source string `json:"source"` Vhost string `json:"vhost"` Destination string `json:"destination"` DestinationType string `json:"destination_type"` RoutingKey string `json:"routing_key"` Arguments struct { } `json:"arguments"` } `json:"bindings"` }