Documentation ¶
Index ¶
- type RabbitMQMessageQueue
- func (c *RabbitMQMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
- func (c *RabbitMQMessageQueue) Clear(ctx context.Context) (err error)
- func (c *RabbitMQMessageQueue) Close(ctx context.Context) (err error)
- func (c *RabbitMQMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
- func (c *RabbitMQMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
- func (c *RabbitMQMessageQueue) EndListen(ctx context.Context)
- func (c *RabbitMQMessageQueue) IsOpen() bool
- func (c *RabbitMQMessageQueue) Listen(ctx context.Context, receiver cqueues.IMessageReceiver) error
- func (c *RabbitMQMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
- func (c *RabbitMQMessageQueue) Open(ctx context.Context) error
- func (c *RabbitMQMessageQueue) Peek(ctx context.Context) (result *cqueues.MessageEnvelope, err error)
- func (c *RabbitMQMessageQueue) PeekBatch(ctx context.Context, messageCount int64) (result []*cqueues.MessageEnvelope, err error)
- func (c *RabbitMQMessageQueue) ReadMessageCount() (count int64, err error)
- func (c *RabbitMQMessageQueue) Receive(ctx context.Context, waitTimeout time.Duration) (result *cqueues.MessageEnvelope, err error)
- func (c *RabbitMQMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, ...) (err error)
- func (c *RabbitMQMessageQueue) Send(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type RabbitMQMessageQueue ¶
type RabbitMQMessageQueue struct { *cqueues.MessageQueue Interval time.Duration // contains filtered or unexported fields }
Message queue that sends and receives messages via MQTT message broker.
MQTT is a popular light-weight protocol to communicate IoT devices.
Configuration parameters:
topic: name of MQTT topic to subscribe
connection(s):
discovery_key: (optional) a key to retrieve the connection from IDiscovery
host: host name or IP address
port: port number
uri: resource URI or connection string with all parameters in it
credential(s):
store_key: (optional) a key to retrieve the credentials from ICredentialStore
username: user name
password: user password
References:
- *:logger:*:*:1.0 (optional) ILogger components to pass log messages - *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements - *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections - *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
Example:
ctx := context.Background() queue := queues.NewEmptyRabbitMQMessageQueue("myqueue") queue.Configure(ctx, config.NewConfigParamsFromTuples( "exchange", "my_exchange", "queue", "my_exchange", "options.auto_create", true, "connection.host", "5672", "connection.port", "localhost", "credential.username", "user", "credential.password", "password", )) _ = queue.Open(ctx) _ = queue.Send(ctx, cqueues.NewMessageEnvelope("", "mymessage", []byte("ABC"))) message, _ := queue.Receive(ctx, 10000*time.Millisecond) if message != nil { // ... queue.Complete(ctx, message) }
func NewEmptyRabbitMQMessageQueue ¶
func NewEmptyRabbitMQMessageQueue(name string) *RabbitMQMessageQueue
Creates a new instance of the message queue. Parameters: - name(optional) a queue name.
func NewRabbitMQMessageQueue ¶
func NewRabbitMQMessageQueue(name string, mqChanel *rabbitmq.Channel, queue string) *RabbitMQMessageQueue
func NewRabbitMQMessageQueueFromConfig ¶
func NewRabbitMQMessageQueueFromConfig(name string, config *cconf.ConfigParams) *RabbitMQMessageQueue
func (*RabbitMQMessageQueue) Abandon ¶
func (c *RabbitMQMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt.Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue. Important: This method is not supported by RabbitMQ. Parameters:
- ctx context.Context
- message a message to return.
func (*RabbitMQMessageQueue) Clear ¶
func (c *RabbitMQMessageQueue) Clear(ctx context.Context) (err error)
Clear method are clears component state. Parameters: - ctx context.Context transaction id to trace execution through call chain. Returns: error
func (*RabbitMQMessageQueue) Close ¶
func (c *RabbitMQMessageQueue) Close(ctx context.Context) (err error)
Close mwthod are closes component and frees used resources.
Parameters: - ctx context.Context transaction id to trace execution through call chain.
func (*RabbitMQMessageQueue) Complete ¶
func (c *RabbitMQMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Permanently removes a message from the queue. This method is usually used to remove the message after successful processing. Important: This method is not supported by RabbitMQ. Parameters:
- ctx context.Context
- message a message to remove.
func (*RabbitMQMessageQueue) Configure ¶
func (c *RabbitMQMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)
Configures component by passing configuration parameters. Parameters:
- ctx context.Context
- config configuration parameters to be set.
func (*RabbitMQMessageQueue) EndListen ¶
func (c *RabbitMQMessageQueue) EndListen(ctx context.Context)
Ends listening for incoming messages. When this method is call listen unblocks the thread and execution continues. Parameters:
- ctx context.Context transaction id to trace execution through call chain.
func (*RabbitMQMessageQueue) IsOpen ¶
func (c *RabbitMQMessageQueue) IsOpen() bool
Checks if the component is opened. Retruns : true if the component has been opened and false otherwise.
func (*RabbitMQMessageQueue) Listen ¶
func (c *RabbitMQMessageQueue) Listen(ctx context.Context, receiver cqueues.IMessageReceiver) error
Listens for incoming messages and blocks the current thread until queue is closed. Parameters: - ctx context.Context transaction id to trace execution through call chain. - receiver IMessageReceiver callback interface to receive incoming messages. Returns: listen error
func (*RabbitMQMessageQueue) MoveToDeadLetter ¶
func (c *RabbitMQMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Permanently removes a message from the queue and sends it to dead letter queue. Important: This method is not supported by RabbitMQ. Parameters:
- ctx context.Context
- message a message to be removed.
Returns: error
func (*RabbitMQMessageQueue) Open ¶
func (c *RabbitMQMessageQueue) Open(ctx context.Context) error
Opens the component with given connection and credential parameters. Parameters:
- ctx context.Context transaction id to trace execution through call chain.
- connections connection parameters
- credential credential parameters
func (*RabbitMQMessageQueue) Peek ¶
func (c *RabbitMQMessageQueue) Peek(ctx context.Context) (result *cqueues.MessageEnvelope, err error)
Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns nil. Parameters:
- ctx context.Context transaction id to trace execution through call chain.
Returns: a message
func (*RabbitMQMessageQueue) PeekBatch ¶
func (c *RabbitMQMessageQueue) PeekBatch(ctx context.Context, messageCount int64) (result []*cqueues.MessageEnvelope, err error)
PeekBatch method are peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list. Parameters:
- ctx context.Context transaction id to trace execution through call chain.
- messageCount a maximum number of messages to peek.
Returns: a list with messages
func (*RabbitMQMessageQueue) ReadMessageCount ¶
func (c *RabbitMQMessageQueue) ReadMessageCount() (count int64, err error)
ReadMessageCount method are reads the current number of messages in the queue to be delivered. Returns count int64, err error number of messages or error.
func (*RabbitMQMessageQueue) Receive ¶
func (c *RabbitMQMessageQueue) Receive(ctx context.Context, waitTimeout time.Duration) (result *cqueues.MessageEnvelope, err error)
Receive method are receives an incoming message and removes it from the queue. Parameters: - ctx context.Context transaction id to trace execution through call chain. - waitTimeout a timeout in milliseconds to wait for a message to come. Returns: a message
func (*RabbitMQMessageQueue) RenewLock ¶
func (c *RabbitMQMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, lockTimeout time.Duration) (err error)
Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time. Important: This method is not supported by RabbitMQ. Parameters:
- ctx context.Context
- message a message to extend its lock.
- lockTimeout a locking timeout in milliseconds.
func (*RabbitMQMessageQueue) Send ¶
func (c *RabbitMQMessageQueue) Send(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
Send method are sends a message into the queue. Parameters: - ctx context.Context transaction id to trace execution through call chain. - message a message envelop to be sent.