queues

package
v0.0.1-3
Published: Mar 25, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RabbitMQMessageQueue

type RabbitMQMessageQueue struct {
	*cqueues.MessageQueue

	Interval time.Duration
	// contains filtered or unexported fields
}

Message queue that sends and receives messages via a RabbitMQ message broker.

RabbitMQ is a popular message broker that implements the AMQP protocol.

Configuration parameters:

  • topic: name of MQTT topic to subscribe
  • connection(s):
      • discovery_key: (optional) a key to retrieve the connection from IDiscovery
      • host: host name or IP address
      • port: port number
      • uri: resource URI or connection string with all parameters in it
  • credential(s):
      • store_key: (optional) a key to retrieve the credentials from ICredentialStore
      • username: user name
      • password: user password
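
As a rough illustration of how the connection and credential parameters above could combine into a single connection string, the sketch below builds an AMQP URI using only the standard library. The helper name composeAMQPURI, the flat map of keys, and the rule that an explicit uri wins over host and port are assumptions made for this example; they are not part of this package, and the package's own resolution logic may differ.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// composeAMQPURI is a hypothetical helper (not part of this package) that
// builds an AMQP connection string from flat configuration values.
func composeAMQPURI(cfg map[string]string) string {
	// Assumption: an explicit uri takes precedence over the individual parts.
	if uri := cfg["connection.uri"]; uri != "" {
		return uri
	}
	host := cfg["connection.host"]
	if host == "" {
		host = "localhost"
	}
	port := cfg["connection.port"]
	if port == "" {
		port = "5672" // conventional AMQP port
	}
	u := url.URL{Scheme: "amqp", Host: net.JoinHostPort(host, port)}
	if user := cfg["credential.username"]; user != "" {
		u.User = url.UserPassword(user, cfg["credential.password"])
	}
	return u.String()
}

func main() {
	uri := composeAMQPURI(map[string]string{
		"connection.host":     "localhost",
		"connection.port":     "5672",
		"credential.username": "user",
		"credential.password": "password",
	})
	fmt.Println(uri) // amqp://user:password@localhost:5672
}
```

Note that the host value is a host name and the port value is a number; swapping the two produces an address the broker cannot be reached at.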

References:

  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

Example:

ctx := context.Background()
queue := queues.NewEmptyRabbitMQMessageQueue("myqueue")
queue.Configure(ctx, config.NewConfigParamsFromTuples(
	"exchange", "my_exchange",
	"queue", "my_exchange",
	"options.auto_create", true,
	"connection.host", "localhost",
	"connection.port", "5672",
	"credential.username", "user",
	"credential.password", "password",
))
_ = queue.Open(ctx)
_ = queue.Send(ctx, cqueues.NewMessageEnvelope("", "mymessage", []byte("ABC")))
message, _ := queue.Receive(ctx, 10000*time.Millisecond)
if message != nil {
	// ...
	queue.Complete(ctx, message)
}

func NewEmptyRabbitMQMessageQueue

func NewEmptyRabbitMQMessageQueue(name string) *RabbitMQMessageQueue
	Creates a new instance of the message queue.
	Parameters:
		- name (optional) a queue name.

func NewRabbitMQMessageQueue

func NewRabbitMQMessageQueue(name string, mqChanel *rabbitmq.Channel, queue string) *RabbitMQMessageQueue

func NewRabbitMQMessageQueueFromConfig

func NewRabbitMQMessageQueueFromConfig(name string, config *cconf.ConfigParams) *RabbitMQMessageQueue

func (*RabbitMQMessageQueue) Abandon

func (c *RabbitMQMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) (err error)

Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue.

Important: This method is not supported by RabbitMQ.

Parameters:

  • ctx context.Context
  • message a message to return.

func (*RabbitMQMessageQueue) Clear

func (c *RabbitMQMessageQueue) Clear(ctx context.Context) (err error)
	Clear clears the component state.
	Parameters:
		- ctx context.Context operation context with a transaction id to trace execution through the call chain.
	Returns: error

func (*RabbitMQMessageQueue) Close

func (c *RabbitMQMessageQueue) Close(ctx context.Context) (err error)

Close closes the component and frees used resources.

Parameters:
	- ctx context.Context operation context with a transaction id to trace execution through the call chain.

func (*RabbitMQMessageQueue) Complete

func (c *RabbitMQMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) (err error)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

Important: This method is not supported by RabbitMQ.

Parameters:

  • ctx context.Context
  • message a message to remove.

func (*RabbitMQMessageQueue) Configure

func (c *RabbitMQMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)

Configures the component by passing configuration parameters.

Parameters:

  • ctx context.Context
  • config configuration parameters to be set.

func (*RabbitMQMessageQueue) EndListen

func (c *RabbitMQMessageQueue) EndListen(ctx context.Context)

Ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.

Parameters:

  • ctx context.Context operation context with a transaction id to trace execution through the call chain.

func (*RabbitMQMessageQueue) IsOpen

func (c *RabbitMQMessageQueue) IsOpen() bool

Checks if the component is opened.

Returns: true if the component has been opened and false otherwise.

func (*RabbitMQMessageQueue) Listen

	Listens for incoming messages and blocks the current thread until the queue is closed.
	Parameters:
		- ctx context.Context operation context with a transaction id to trace execution through the call chain.
		- receiver IMessageReceiver callback interface to receive incoming messages.
	Returns: listen error
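
The blocking behaviour of Listen and its release by EndListen can be pictured with a small in-memory stand-in. This is only a sketch of the pattern using channels: the listener type, its fields, and the callback signature are hypothetical and do not reflect how this package talks to the broker.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a hypothetical in-memory stand-in used only to illustrate
// the blocking listen / endListen pairing.
type listener struct {
	messages chan string
	stop     chan struct{}
	once     sync.Once
}

func newListener() *listener {
	return &listener{messages: make(chan string, 8), stop: make(chan struct{})}
}

// listen blocks the calling goroutine, passing each message to receive,
// until endListen is called.
func (l *listener) listen(receive func(msg string)) {
	for {
		select {
		case msg := <-l.messages:
			receive(msg)
		case <-l.stop:
			return
		}
	}
}

// endListen unblocks listen. It is safe to call more than once.
func (l *listener) endListen() {
	l.once.Do(func() { close(l.stop) })
}

func main() {
	l := newListener()
	l.messages <- "first"
	l.messages <- "second"

	var got []string
	l.listen(func(msg string) {
		got = append(got, msg)
		if len(got) == 2 {
			l.endListen() // stop from inside the callback once both arrived
		}
	})
	fmt.Println(got) // [first second]
}
```

In real use the listening call would typically run in its own goroutine, with the stop call issued from elsewhere, for example during shutdown.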

func (*RabbitMQMessageQueue) MoveToDeadLetter

func (c *RabbitMQMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) (err error)

Permanently removes a message from the queue and sends it to the dead letter queue.

Important: This method is not supported by RabbitMQ.

Parameters:

  • ctx context.Context
  • message a message to be removed.

Returns: error

func (*RabbitMQMessageQueue) Open

Opens the component with the given connection and credential parameters.

Parameters:

  • ctx context.Context transaction id to trace execution through call chain.
  • connections connection parameters
  • credential credential parameters

func (*RabbitMQMessageQueue) Peek

func (c *RabbitMQMessageQueue) Peek(ctx context.Context) (result *cqueues.MessageEnvelope, err error)

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns nil.

Parameters:

  • ctx context.Context operation context with a transaction id to trace execution through the call chain.

Returns: a message

func (*RabbitMQMessageQueue) PeekBatch

func (c *RabbitMQMessageQueue) PeekBatch(ctx context.Context, messageCount int64) (result []*cqueues.MessageEnvelope, err error)

PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.

Parameters:

  • ctx context.Context operation context with a transaction id to trace execution through the call chain.
  • messageCount a maximum number of messages to peek.

Returns: a list with messages
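
The peek contract described above (return at most messageCount messages, leave the queue untouched, and return an empty list when nothing is pending) can be sketched over a plain slice. The peekBatch function and its string messages are hypothetical simplifications for illustration, not this package's implementation.

```go
package main

import "fmt"

// peekBatch returns up to messageCount items from the front of pending
// without removing them. It returns an empty, non-nil slice when there
// is nothing to peek.
func peekBatch(pending []string, messageCount int64) []string {
	if messageCount < 0 {
		messageCount = 0
	}
	n := int64(len(pending))
	if messageCount < n {
		n = messageCount
	}
	// Copy so that callers cannot mutate the queue's backing storage.
	result := make([]string, n)
	copy(result, pending[:n])
	return result
}

func main() {
	pending := []string{"a", "b", "c"}
	fmt.Println(peekBatch(pending, 2)) // [a b]
	fmt.Println(len(pending))          // 3
	fmt.Println(len(peekBatch(nil, 5))) // 0
}
```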

func (*RabbitMQMessageQueue) ReadMessageCount

func (c *RabbitMQMessageQueue) ReadMessageCount() (count int64, err error)

ReadMessageCount reads the current number of messages in the queue that are waiting to be delivered.

Returns: the number of messages, or an error.

func (*RabbitMQMessageQueue) Receive

func (c *RabbitMQMessageQueue) Receive(ctx context.Context, waitTimeout time.Duration) (result *cqueues.MessageEnvelope, err error)
	Receive receives an incoming message and removes it from the queue.
	Parameters:
		- ctx context.Context operation context with a transaction id to trace execution through the call chain.
		- waitTimeout the maximum time to wait for a message to arrive.
	Returns: a message
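
A receive call bounded by a wait timeout is commonly built from a select over a message channel and a timer. The sketch below shows that pattern with a hypothetical in-memory memQueue; the context argument is left out for brevity, and none of this is taken from the package's broker-backed implementation.

```go
package main

import (
	"fmt"
	"time"
)

// memQueue is a hypothetical in-memory stand-in for a broker-backed queue.
type memQueue struct {
	messages chan string
}

// receive waits up to waitTimeout for a message and reports whether one
// arrived before the timeout elapsed.
func (q *memQueue) receive(waitTimeout time.Duration) (msg string, ok bool) {
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()
	select {
	case msg = <-q.messages:
		return msg, true
	case <-timer.C:
		return "", false
	}
}

func main() {
	q := &memQueue{messages: make(chan string, 1)}
	q.messages <- "ABC"

	// A message is already buffered, so this returns without waiting.
	msg, ok := q.receive(10000 * time.Millisecond)
	fmt.Println(msg, ok)

	// The queue is now empty, so this call gives up after the timeout.
	_, ok = q.receive(10 * time.Millisecond)
	fmt.Println(ok)
}
```

Because the parameter is a time.Duration, callers state the unit explicitly, as in 10000*time.Millisecond.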

func (*RabbitMQMessageQueue) RenewLock

func (c *RabbitMQMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, lockTimeout time.Duration) (err error)

Renews a lock on a message that makes it invisible to other receivers in the queue. This method is usually used to extend the message processing time.

Important: This method is not supported by RabbitMQ.

Parameters:

  • ctx context.Context
  • message a message to extend its lock.
  • lockTimeout a locking timeout.

func (*RabbitMQMessageQueue) Send

func (c *RabbitMQMessageQueue) Send(ctx context.Context, message *cqueues.MessageEnvelope) (err error)
	Send sends a message into the queue.
	Parameters:
		- ctx context.Context operation context with a transaction id to trace execution through the call chain.
		- message a message envelope to be sent.
