queues

package
v1.0.1
Published: Oct 7, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaMessageQueue

type KafkaMessageQueue struct {
	*cqueues.MessageQueue

	// The dependency resolver.
	DependencyResolver *cref.DependencyResolver
	// The logger.
	Logger *clog.CompositeLogger
	// The Kafka connection component.
	Connection *connect.KafkaConnection
	// contains filtered or unexported fields
}

KafkaMessageQueue is a message queue that sends and receives messages via a Kafka message broker.

Configuration parameters:

  • topic: name of the Kafka topic to subscribe to
  • group_id: (optional) consumer group id (default: default)
  • from_beginning: (optional) restarts receiving messages from the beginning (default: false)
  • read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)
  • autocommit: (optional) turns autocommit on or off (default: true)
  • connection(s):
      • discovery_key: (optional) a key to retrieve the connection from IDiscovery
      • host: host name or IP address
      • port: port number
      • uri: resource URI or connection string with all parameters in it
  • credential(s):
      • store_key: (optional) a key to retrieve the credentials from ICredentialStore
      • username: user name
      • password: user password
  • options:
      • read_partitions: (optional) list of partition indexes to be read (default: all, set for example: "1;5;7")
      • write_partition: (optional) index of the partition to write to (default: auto (-1))
      • autosubscribe: (optional) true to automatically subscribe on open (default: false)
      • log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
      • connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
      • max_retries: (optional) maximum retry attempts (default: 5)
      • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
      • request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
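The defaults and the "1;5;7" partition list format above can be illustrated with a small standalone sketch. It does not use the library's own configuration types: queueOptions, parsePartitions, resolveOptions and the flattened key "options.read_partitions" are all hypothetical names chosen for this illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// queueOptions is a hypothetical holder for a few of the parameters listed above.
type queueOptions struct {
	Topic          string
	GroupID        string
	Autocommit     bool
	ReadPartitions []int32 // nil is treated as "all partitions" in this sketch
}

// parsePartitions turns a string such as "1;5;7" into partition indexes.
// An empty string yields nil.
func parsePartitions(s string) ([]int32, error) {
	if strings.TrimSpace(s) == "" {
		return nil, nil
	}
	var out []int32
	for _, part := range strings.Split(s, ";") {
		n, err := strconv.ParseInt(strings.TrimSpace(part), 10, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid partition index %q: %w", part, err)
		}
		out = append(out, int32(n))
	}
	return out, nil
}

// resolveOptions applies the documented defaults to a flat key/value map.
// The flattened key names are an assumption of this sketch.
func resolveOptions(cfg map[string]string) (queueOptions, error) {
	opts := queueOptions{Topic: cfg["topic"], GroupID: "default", Autocommit: true}
	if v, ok := cfg["group_id"]; ok {
		opts.GroupID = v
	}
	if v, ok := cfg["autocommit"]; ok {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return opts, fmt.Errorf("invalid autocommit %q: %w", v, err)
		}
		opts.Autocommit = b
	}
	parts, err := parsePartitions(cfg["options.read_partitions"])
	if err != nil {
		return opts, err
	}
	opts.ReadPartitions = parts
	return opts, nil
}

func main() {
	opts, err := resolveOptions(map[string]string{
		"topic":                   "mytopic",
		"options.read_partitions": "1;5;7",
	})
	fmt.Println(opts, err)
}
```

Parameters that are left out fall back to the documented defaults, so only the topic is strictly needed in this sketch.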

References:

  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials
  • *:connection:kafka:*:1.0 (optional) Shared connection to Kafka service

See MessageQueue and MessagingCapabilities.

Example:

	ctx := context.Background()
	queue := NewKafkaMessageQueue("myqueue")
	queue.Configure(ctx, cconf.NewConfigParamsFromTuples(
		"topic", "mytopic",
		"connection.protocol", "kafka",
		"connection.host", "localhost",
		"connection.port", 9092, // default Kafka broker port
	))

	// Errors are ignored here for brevity; check them in real code.
	_ = queue.Open(ctx, "123")

	_ = queue.Send(ctx, "123", cqueues.NewMessageEnvelope("", "mymessage", "ABC"))

	// waitTimeout is a time.Duration, so multiply by a time unit.
	message, err := queue.Receive(ctx, "123", 10000*time.Millisecond)
	if err == nil && message != nil {
		// ... process the message ...
		_ = queue.Complete(ctx, message)
	}

func NewKafkaMessageQueue

func NewKafkaMessageQueue(name string) *KafkaMessageQueue

Creates a new instance of the queue component. Parameters:

  • name (optional) a queue name.

func (*KafkaMessageQueue) Abandon

func (c *KafkaMessageQueue) Abandon(ctx context.Context, message *cqueues.MessageEnvelope) error

Abandon returns a message to the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message that could not be processed at the moment so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue. Parameters:

  • ctx context.Context operation context
  • message *cqueues.MessageEnvelope a message to return.

Returns: error or nil for success.
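The choice between completing and abandoning a message might look like the sketch below. It is only an illustration under stated assumptions: message, settler, recorder, errTransient and errPermanent are hypothetical stand-ins and are not part of this package, and treating unrecoverable errors as "complete and drop" is one possible policy, chosen here because MoveToDeadLetter is documented as unsupported by Kafka.

```go
package main

import (
	"errors"
	"fmt"
)

// message and settler are hypothetical stand-ins for *cqueues.MessageEnvelope
// and the subset of queue methods used here; they are not the library types.
type message struct{ ID string }

type settler interface {
	Complete(msg *message) error
	Abandon(msg *message) error
}

// errTransient marks failures worth retrying; errPermanent marks the rest.
// Both are assumptions of this sketch.
var (
	errTransient = errors.New("transient failure")
	errPermanent = errors.New("permanent failure")
)

// settle completes the message on success, abandons it on a transient
// failure so it can be received again, and otherwise removes it permanently.
func settle(q settler, msg *message, procErr error) error {
	switch {
	case procErr == nil:
		return q.Complete(msg)
	case errors.Is(procErr, errTransient):
		return q.Abandon(msg)
	default:
		// Unrecoverable: remove the message instead of redelivering it forever.
		return q.Complete(msg)
	}
}

// recorder is an in-memory fake that records which method was called.
type recorder struct{ calls []string }

func (r *recorder) Complete(msg *message) error {
	r.calls = append(r.calls, "complete:"+msg.ID)
	return nil
}

func (r *recorder) Abandon(msg *message) error {
	r.calls = append(r.calls, "abandon:"+msg.ID)
	return nil
}

func main() {
	q := &recorder{}
	_ = settle(q, &message{ID: "1"}, nil)
	_ = settle(q, &message{ID: "2"}, fmt.Errorf("broker busy: %w", errTransient))
	_ = settle(q, &message{ID: "3"}, errPermanent)
	fmt.Println(q.calls)
}
```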

func (*KafkaMessageQueue) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*KafkaMessageQueue) Clear

func (c *KafkaMessageQueue) Clear(ctx context.Context, correlationId string) error

Clear clears the component state. Parameters:

  • ctx context.Context operation context
  • correlationId string (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*KafkaMessageQueue) Close

func (c *KafkaMessageQueue) Close(ctx context.Context, correlationId string) (err error)

Closes the component and frees used resources. Parameters:

  • ctx context.Context operation context
  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*KafkaMessageQueue) Complete

func (c *KafkaMessageQueue) Complete(ctx context.Context, message *cqueues.MessageEnvelope) error

Complete permanently removes a message from the queue. This method is usually used to remove the message after successful processing. Parameters:

  • ctx context.Context operation context
  • message *cqueues.MessageEnvelope a message to remove.

Returns: error or nil for success.

func (*KafkaMessageQueue) Configure

func (c *KafkaMessageQueue) Configure(ctx context.Context, config *cconf.ConfigParams)

Configures component by passing configuration parameters. Parameters:

  • ctx context.Context operation context
  • config configuration parameters to be set.

func (*KafkaMessageQueue) ConsumeClaim

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*KafkaMessageQueue) EndListen

func (c *KafkaMessageQueue) EndListen(ctx context.Context, correlationId string)

EndListen ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues. Parameters:

  • ctx context.Context operation context
  • correlationId string (optional) transaction id to trace execution through call chain.

func (*KafkaMessageQueue) IsOpen

func (c *KafkaMessageQueue) IsOpen() bool

Checks if the component is opened. Returns true if the component has been opened and false otherwise.

func (*KafkaMessageQueue) Listen

func (c *KafkaMessageQueue) Listen(ctx context.Context, correlationId string, receiver cqueues.IMessageReceiver) error

Listens for incoming messages and blocks the current thread until the queue is closed. Parameters:

  • ctx context.Context operation context
  • correlationId string (optional) transaction id to trace execution through call chain.
  • receiver cqueues.IMessageReceiver a receiver to receive incoming messages.

See IMessageReceiver and Receive.
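The blocking behaviour of Listen and the way EndListen releases it can be modelled with plain channels. The following is a standalone sketch, not the package's implementation: miniQueue, its fields and the callback signature are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a hypothetical in-memory stand-in, not the Kafka queue itself.
type miniQueue struct {
	incoming chan string   // messages delivered to the listener
	stop     chan struct{} // closed by EndListen to unblock Listen
	once     sync.Once
}

func newMiniQueue() *miniQueue {
	return &miniQueue{incoming: make(chan string), stop: make(chan struct{})}
}

// Listen blocks the calling goroutine and hands every message to receive
// until EndListen is called.
func (q *miniQueue) Listen(receive func(msg string)) {
	for {
		select {
		case msg := <-q.incoming:
			receive(msg)
		case <-q.stop:
			return
		}
	}
}

// EndListen unblocks Listen; calling it more than once is safe.
func (q *miniQueue) EndListen() {
	q.once.Do(func() { close(q.stop) })
}

func main() {
	q := newMiniQueue()
	var received []string
	done := make(chan struct{})

	// Listen blocks, so it runs in its own goroutine.
	go func() {
		defer close(done)
		q.Listen(func(msg string) { received = append(received, msg) })
	}()

	q.incoming <- "first"
	q.incoming <- "second"
	q.EndListen()
	<-done // Listen has returned, so reading received is now safe.
	fmt.Println(received)
}
```

Because Listen blocks, callers typically start it in a separate goroutine and keep a way to call EndListen from elsewhere, as main does here.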

func (*KafkaMessageQueue) MoveToDeadLetter

func (c *KafkaMessageQueue) MoveToDeadLetter(ctx context.Context, message *cqueues.MessageEnvelope) error

Permanently removes a message from the queue and sends it to the dead letter queue. Important: this method is not supported by Kafka. Parameters:

  • ctx context.Context operation context
  • message *cqueues.MessageEnvelope a message to be removed.

Returns: error or nil for success.

func (*KafkaMessageQueue) OnMessage

func (c *KafkaMessageQueue) OnMessage(ctx context.Context, msg *connect.KafkaMessage)

Callback for processing messages from Kafka. Parameters:

  • ctx context.Context operation context
  • msg *connect.KafkaMessage consumer message

func (*KafkaMessageQueue) Open

func (c *KafkaMessageQueue) Open(ctx context.Context, correlationId string) (err error)

Opens the component. Parameters:

  • ctx context.Context operation context
  • correlationId (optional) transaction id to trace execution through call chain.

Returns: error or nil if no errors occurred.

func (*KafkaMessageQueue) Peek

func (c *KafkaMessageQueue) Peek(ctx context.Context, correlationId string) (*cqueues.MessageEnvelope, error)

Peek peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns nil. Parameters:

  • ctx context.Context operation context
  • correlationId string (optional) transaction id to trace execution through call chain.

Returns: the message or an error.

func (*KafkaMessageQueue) PeekBatch

func (c *KafkaMessageQueue) PeekBatch(ctx context.Context, correlationId string, messageCount int64) ([]*cqueues.MessageEnvelope, error)

PeekBatch peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list. Important: this method is not supported by Kafka. Parameters:

  • ctx context.Context operation context
  • correlationId (optional) transaction id to trace execution through call chain.
  • messageCount a maximum number of messages to peek.

Returns: a list of messages or an error.

func (*KafkaMessageQueue) ReadMessageCount

func (c *KafkaMessageQueue) ReadMessageCount() (int64, error)

ReadMessageCount reads the current number of messages in the queue waiting to be delivered. Returns: number of messages or error.

func (*KafkaMessageQueue) Ready

func (c *KafkaMessageQueue) Ready() chan bool

Returns: the bool channel that carries the ready flag.

func (*KafkaMessageQueue) Receive

func (c *KafkaMessageQueue) Receive(ctx context.Context, correlationId string, waitTimeout time.Duration) (*cqueues.MessageEnvelope, error)

Receive receives an incoming message and removes it from the queue. Parameters:

  • ctx context.Context operation context
  • correlationId string (optional) transaction id to trace execution through call chain.
  • waitTimeout time.Duration how long to wait for a message to arrive.

Returns: the received message or an error.
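Waiting for a message with a time.Duration timeout can be sketched with a select statement. This is a standalone illustration, not the package's implementation: receiveWithTimeout and the string channel are hypothetical stand-ins for the queue and the broker.

```go
package main

import (
	"fmt"
	"time"
)

// receiveWithTimeout waits up to waitTimeout for a message on incoming.
// It returns nil when nothing arrives in time, which is why callers check
// the result for nil. The channel is a hypothetical stand-in for the broker.
func receiveWithTimeout(incoming <-chan string, waitTimeout time.Duration) *string {
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	select {
	case msg := <-incoming:
		return &msg
	case <-timer.C:
		return nil
	}
}

func main() {
	incoming := make(chan string, 1)

	// Nothing queued: the call gives up after 50 ms and returns nil.
	if receiveWithTimeout(incoming, 50*time.Millisecond) == nil {
		fmt.Println("no message within the timeout")
	}

	// One message queued: it is returned without waiting for the timeout.
	incoming <- "ABC"
	if msg := receiveWithTimeout(incoming, 10*time.Second); msg != nil {
		fmt.Println(*msg)
	}
}
```

Since a time.Duration counts nanoseconds, a bare number such as 10000 would mean ten microseconds; multiplying by time.Millisecond or time.Second states the unit explicitly.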

func (*KafkaMessageQueue) RenewLock

func (c *KafkaMessageQueue) RenewLock(ctx context.Context, message *cqueues.MessageEnvelope, lockTimeout time.Duration) (err error)

RenewLock renews a lock on a message that makes it invisible to other receivers in the queue. This method is usually used to extend the message processing time. Important: this method is not supported by Kafka. Parameters:

  • ctx context.Context operation context
  • message *cqueues.MessageEnvelope a message to extend its lock.
  • lockTimeout time.Duration a locking timeout.

Returns: error or nil for success.

func (*KafkaMessageQueue) Send

func (c *KafkaMessageQueue) Send(ctx context.Context, correlationId string, envelop *cqueues.MessageEnvelope) error

Send sends a message into the queue. Parameters:

  • ctx context.Context operation context
  • correlationId string (optional) transaction id to trace execution through call chain.
  • envelop *cqueues.MessageEnvelope a message envelope to be sent.

Returns: error or nil for success.

func (*KafkaMessageQueue) SetReady

func (c *KafkaMessageQueue) SetReady(chFlag chan bool)

Sets the bool channel that carries the ready flag for the consumer. Parameters:

  • chFlag chan bool the bool channel

func (*KafkaMessageQueue) SetReferences

func (c *KafkaMessageQueue) SetReferences(ctx context.Context, references cref.IReferences)

Sets references to dependent components. Parameters:

  • ctx context.Context operation context
  • references references to locate the component dependencies.

func (*KafkaMessageQueue) Setup

Setup is run at the beginning of a new session, before ConsumeClaim. It sends the ready flag into the channel. Returns: error
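How SetReady, Ready and Setup could fit together is sketched below with a hypothetical consumer type. This is a model built only from the descriptions above, not the package's code; waitReady and the buffered channel are assumptions of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// consumer is a hypothetical stand-in for the queue's consumer-group handler.
type consumer struct {
	ready chan bool
}

// SetReady installs the channel used to signal readiness.
func (c *consumer) SetReady(ch chan bool) { c.ready = ch }

// Ready returns the channel on which the ready flag is delivered.
func (c *consumer) Ready() chan bool { return c.ready }

// Setup runs at the start of a session and sends the ready flag.
func (c *consumer) Setup() error {
	c.ready <- true
	return nil
}

// waitReady blocks until the flag arrives or the timeout expires.
func waitReady(c *consumer, timeout time.Duration) error {
	select {
	case <-c.Ready():
		return nil
	case <-time.After(timeout):
		return errors.New("consumer was not ready in time")
	}
}

func main() {
	c := &consumer{}
	// Buffered so that Setup does not block when nobody is waiting yet.
	c.SetReady(make(chan bool, 1))

	// In this sketch Setup is called by hand; a session start would trigger it.
	go func() { _ = c.Setup() }()

	if err := waitReady(c, time.Second); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("consumer is ready")
}
```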

func (*KafkaMessageQueue) UnsetReferences

func (c *KafkaMessageQueue) UnsetReferences(ctx context.Context)

Unsets (clears) previously set references to dependent components. Parameters:

  • ctx context.Context operation context
