queue

package
v1.1.28

Warning: This package is not in the latest version of its module.

Published: May 1, 2024 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrQueueDoesNotExist is returned when the MultiSQSProducer tries to send
	// a message to a queue that is not defined.
	ErrQueueDoesNotExist = errors.New("queue does not exist")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	NumberOfProcessors uint8  `mapstructure:"number_of_processors"`
	Endpoint           string `mapstructure:"endpoint"`
	QueueArn           string `mapstructure:"queue_arn"`
	WaitTime           int64  `mapstructure:"wait_time"`
	Timeout            int64  `mapstructure:"timeout"`
	Disabled           bool   `mapstructure:"disabled"`
	LogMessages        bool   `mapstructure:"log_messages"`
}

Config holds the configuration required to consume messages from an SQS queue.

type MessageProcessor

type MessageProcessor interface {
	Process(context.Context, []byte) error
}

MessageProcessor defines the functions required by the SQSConsumer to process a message.

type MultiSQSProducer

type MultiSQSProducer struct {
	// contains filtered or unexported fields
}

MultiSQSProducer sends messages to different named queues.

func NewMultiSQSProducer

func NewMultiSQSProducer(queues map[string]string, endpoint string, log log.Logger) (*MultiSQSProducer, error)

NewMultiSQSProducer creates a new MultiSQSProducer from a map whose keys are queue names and whose values are the ARNs of those queues.

func (*MultiSQSProducer) Send

func (m *MultiSQSProducer) Send(queueName string, body string) error

Send sends a message to the queue with the given name. If the queue is not defined in the producer, ErrQueueDoesNotExist is returned.

type SQSConsumer

type SQSConsumer struct {
	// contains filtered or unexported fields
}

SQSConsumer reads and consumes SQS messages.

func NewConsumer

func NewConsumer(c Config, processor MessageProcessor, store messageStore, log log.Logger) (*SQSConsumer, error)

NewConsumer creates and initializes an SQSConsumer.

func (*SQSConsumer) StarProcessing

func (s *SQSConsumer) StarProcessing(ctx context.Context, wg *sync.WaitGroup)

StarProcessing starts processing messages by reading from the queue and passing them to the MessageProcessor.

type SQSProducer

type SQSProducer struct {
	// contains filtered or unexported fields
}

SQSProducer sends messages to a single SQS queue.

func NewSQSProducer

func NewSQSProducer(queueARN string, endpoint string, log log.Logger) (*SQSProducer, error)

NewSQSProducer creates a new SQSProducer that sends messages to the given queueARN.

func (*SQSProducer) SendMessage

func (s *SQSProducer) SendMessage(body string) error

SendMessage sends a message to the queue the producer was created for.

type UpdateProcessorGroup

type UpdateProcessorGroup struct {
	// contains filtered or unexported fields
}

UpdateProcessorGroup defines a group of check notification processors.

func NewUpdateProcessorGroup

func NewUpdateProcessorGroup(c Config, processor MessageProcessor, store messageStore, log log.Logger) (UpdateProcessorGroup, error)

NewUpdateProcessorGroup creates an UpdateProcessorGroup using the given config, message processor, message store and logger.

func (*UpdateProcessorGroup) StartProcessing

func (u *UpdateProcessorGroup) StartProcessing(ctx context.Context)

StartProcessing signals all the consumers in the group to start processing messages in the queue.

func (*UpdateProcessorGroup) WaitFinish

func (u *UpdateProcessorGroup) WaitFinish()

WaitFinish blocks the calling goroutine until all the consumers have finished processing messages.
