Documentation ¶
Index ¶
- Constants
- Variables
- func SetLogLevel(ctx context.Context, logLevelName string) error
- func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, ...) error
- func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, ...)
- type Client
- func (client *Client) Close() error
- func (client *Client) Consume(ctx context.Context, visibilitySeconds int32) (<-chan types.Message, error)
- func (client *Client) Push(ctx context.Context, record queues.Record) error
- func (client *Client) PushBatch(ctx context.Context, recordchan <-chan queues.Record) error
- func (client *Client) PushDeadRecord(ctx context.Context, record types.Message) error
- func (client *Client) RemoveMessage(ctx context.Context, msg types.Message) error
- func (client *Client) SetMessageVisibility(ctx context.Context, msg types.Message, seconds int32) error
- type SQSJob
Constants ¶
const ComponentID = 6481
go-queueing is 6481: https://github.com/senzing-garage/knowledge-base/blob/main/lists/senzing-product-ids.md
const Prefix = "rabbitmq: "
Log message prefix.
Variables ¶
var IDMessages = map[int]string{ 2005: Prefix + "SQS client is setup! QueueURL: %v, Queue name: %v, DeadLetterQueueURL: %v", 2006: Prefix + "AWS response Message ID: %s", 2007: Prefix + "Successfully sent %v records to the queue", 2008: Prefix + "SQS Client delete message: %v", 2009: Prefix + "SQS Client set message visibility, MessageID: %v", 2010: Prefix + "Jobs added to job queue:: %v", 2011: Prefix + "Job ID: %v, Job count: %d", 2012: Prefix + "Number of consumer workers: %d", 2013: Prefix + "Number of producer workers: %d", 3003: Prefix + "Unable to set log level to %s, error: %w", 4006: Prefix + "Unable to retrieve queue redrive policy, error: %w", 4007: Prefix + "Error unmarshalling redrive policy, error: %w", 4008: Prefix + "Error sending to the dead record queue, error: %w", 4009: Prefix + "Error sending the record, error: %w", 4010: Prefix + "Error sending the record batch, error: %w", 4011: Prefix + "Error sending record in batch, MessageID: %v, error: %v", 4012: Prefix + "Error sending the last record batch, error: %w", 4013: Prefix + "Error receiving records, error: %v", 4014: Prefix + "No records found.", 4015: Prefix + "Error deleting records, error: %v", 4016: Prefix + "Error changing message visibility, MessageID: %v, error: %v", 4017: Prefix + "Error getting delivery channel, error: %v", }
Message templates for g2config implementations.
var IDStatuses = map[int]string{}
Status strings for specific messages.
Functions ¶
func SetLogLevel ¶
The SetLogLevel method sets the level of logging.
Input
- ctx: A context to control lifecycle.
- logLevel: The desired log level. TRACE, DEBUG, INFO, WARN, ERROR, FATAL or PANIC.
func StartManagedConsumer ¶
func StartManagedConsumer(ctx context.Context, urlString string, numberOfWorkers int, g2engine g2api.G2engine, withInfo bool, visibilitySeconds int32, logLevel string, jsonOutput bool) error
Starts a number of workers that read Records from the given queue and add them to Senzing. - Workers restart when they are killed or die. - respond to standard system signals.
func StartManagedProducer ¶
func StartManagedProducer(ctx context.Context, urlString string, numberOfWorkers int, recordchan <-chan queues.Record, logLevel string, jsonOutput bool)
Starts a number of workers that push Records in the record channel to the given queue. - Workers restart when they are killed or die. - respond to standard system signals.
Types ¶
type Client ¶
type Client struct { DeadLetterQueueURL string QueueName string QueueURL *string // desired / default delay durations MaxDelay time.Duration ReconnectDelay time.Duration ResendDelay time.Duration RoutingKey string // contains filtered or unexported fields }
func NewClient ¶
func NewClient(ctx context.Context, urlString string, logLevel string, jsonOutput bool) (*Client, error)
New creates a single SQS client
func (*Client) Consume ¶
func (client *Client) Consume(ctx context.Context, visibilitySeconds int32) (<-chan types.Message, error)
Consume will continuously put queue messages on the channel.
func (*Client) Push ¶
Push will push data onto the queue and wait for a response. TODO: work on resend with delay...
func (*Client) PushBatch ¶
Push will push data onto the queue and wait for a response. TODO: work on resend with delay????
func (*Client) PushDeadRecord ¶
PushDeadRecord will push an erroneous record onto the DLQ. TODO: work on resend with delay...
func (*Client) RemoveMessage ¶
Remove a message from the SQS queue