sqs

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: Apache-2.0 Imports: 20 Imported by: 2

Documentation

Index

Constants

View Source
const ComponentID = 6481

go-queueing is 6481: https://github.com/senzing-garage/knowledge-base/blob/main/lists/senzing-product-ids.md

View Source
const Prefix = "rabbitmq: "

Log message prefix.

Variables

View Source
var IDMessages = map[int]string{

	2005: Prefix + "SQS client is setup! QueueURL: %v, Queue name: %v, DeadLetterQueueURL: %v",
	2006: Prefix + "AWS response Message ID: %s",
	2007: Prefix + "Successfully sent %v records to the queue",
	2008: Prefix + "SQS Client delete message: %v",
	2009: Prefix + "SQS Client set message visibility, MessageID: %v",
	2010: Prefix + "Jobs added to job queue:: %v",
	2011: Prefix + "Job ID: %v, Job count: %d",
	2012: Prefix + "Number of consumer workers: %d",
	2013: Prefix + "Number of producer workers: %d",

	3003: Prefix + "Unable to set log level to %s, error: %w",

	4006: Prefix + "Unable to retrieve queue redrive policy, error: %w",
	4007: Prefix + "Error unmarshalling redrive policy, error: %w",
	4008: Prefix + "Error sending to the dead record queue, error: %w",
	4009: Prefix + "Error sending the record, error: %w",
	4010: Prefix + "Error sending the record batch, error: %w",
	4011: Prefix + "Error sending record in batch, MessageID: %v, error: %v",
	4012: Prefix + "Error sending the last record batch, error: %w",
	4013: Prefix + "Error receiving records, error: %v",
	4014: Prefix + "No records found.",
	4015: Prefix + "Error deleting records, error: %v",
	4016: Prefix + "Error changing message visibility, MessageID: %v, error: %v",
	4017: Prefix + "Error getting delivery channel, error: %v",
}

Message templates for g2config implementations.

View Source
var IDStatuses = map[int]string{}

Status strings for specific messages.

Functions

func SetLogLevel

func SetLogLevel(ctx context.Context, logLevelName string) error

The SetLogLevel method sets the level of logging.

Input

  • ctx: A context to control lifecycle.
  • logLevel: The desired log level. TRACE, DEBUG, INFO, WARN, ERROR, FATAL or PANIC.

func StartManagedConsumer

func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, g2engine g2api.G2engine, withInfo bool, visibilitySeconds int32, logLevel string, jsonOutput bool) error

Starts a number of workers that read Records from the given queue and add them to Senzing. - Workers restart when they are killed or die. - respond to standard system signals.

func StartManagedProducer

func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string, jsonOutput bool)

Starts a number of workers that push Records in the record channel to the given queue. - Workers restart when they are killed or die. - respond to standard system signals.

Types

type Client

type Client struct {
	DeadLetterQueueURL string
	QueueName          string
	QueueURL           *string
	// desired / default delay durations
	MaxDelay       time.Duration
	ReconnectDelay time.Duration
	ResendDelay    time.Duration
	RoutingKey     string
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, urlString string, logLevel string, jsonOutput bool) (*Client, error)

New creates a single SQS client

func (*Client) Close

func (client *Client) Close() error

Close will cleanly shutdown the channel and connection.

func (*Client) Consume

func (client *Client) Consume(ctx context.Context, visibilitySeconds int32) (<-chan types.Message, error)

Consume will continuously put queue messages on the channel.

func (*Client) Push

func (client *Client) Push(ctx context.Context, record queues.Record) error

Push will push data onto the queue and wait for a response. TODO: work on resend with delay...

func (*Client) PushBatch

func (client *Client) PushBatch(ctx context.Context, recordchan <-chan queues.Record) error

Push will push data onto the queue and wait for a response. TODO: work on resend with delay????

func (*Client) PushDeadRecord

func (client *Client) PushDeadRecord(ctx context.Context, record types.Message) error

PushDeadRecord will push an erroneous record onto the DLQ. TODO: work on resend with delay...

func (*Client) RemoveMessage

func (client *Client) RemoveMessage(ctx context.Context, msg types.Message) error

Remove a message from the SQS queue

func (*Client) SetMessageVisibility

func (client *Client) SetMessageVisibility(ctx context.Context, msg types.Message, seconds int32) error

Remove a message from the SQS queue

type SQSJob

type SQSJob struct {
	// contains filtered or unexported fields
}

define a structure that will implement the Job interface

func (*SQSJob) Execute

func (j *SQSJob) Execute(ctx context.Context, visibilitySeconds int32) error

Job interface implementation: Execute() is run once for each Job

func (*SQSJob) OnError

func (j *SQSJob) OnError(err error)

Whenever Execute() returns an error or panics, this is called

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL