queue

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2022 License: MIT Imports: 14 Imported by: 1

README

Queue

A package that provides queue implementations for Redis (RedisPool) and SQS (SQSQueue).

Licence

The MIT License (MIT)

Copyright (c) 2022 Scott Barr

See LICENSE

Documentation

Index

Constants

View Source
const (
	// SQSQueueAttributeTrace is the optional SQS Message attribute that holds a trace ID.
	SQSQueueAttributeTrace = "trace"

	// SQSTypeString is used to state the data type of an SQS Message attribute.
	SQSTypeString = "String"

	// SQSDefaultMaxMessages is used to take the maximum messages permitted.
	SQSDefaultMaxMessages = int64(10)

	// SQSDefaultWaitTime is to specify the maximum time permitted to listen for messages from SQS.
	SQSDefaultWaitTime = int64(20)
)

Variables

This section is empty.

Functions

func BatchMessages

func BatchMessages(messages []Message, chunkSize int) [][]Message

BatchMessages splits a []Message slice into batches of a given size.

Some queue providers such as SQS have limits on batch enqueueing.

func DefaultWorkerBackoffFunc

func DefaultWorkerBackoffFunc(name string, count int, err error)

Types

type Acker

type Acker interface {
	Ack(*Message) error
}

type BackoffFunc

type BackoffFunc func(string, int, error)

type BatchEnqueuer

type BatchEnqueuer interface {
	BatchEnqueue([]Message) error
}

BatchEnqueuer is an interface for queueing batches of messages.

type ChannelQueue

type ChannelQueue struct {
	Input chan Message
}

ChannelQueue is a queue that manages messages over a channel.

Ensure the channel size is sufficient.

func NewChannelQueue

func NewChannelQueue(input chan Message) *ChannelQueue

func (*ChannelQueue) Ack

func (q *ChannelQueue) Ack(m *Message) error

func (*ChannelQueue) BatchEnqueue

func (q *ChannelQueue) BatchEnqueue(msg []Message) error

func (*ChannelQueue) Enqueue

func (q *ChannelQueue) Enqueue(m *Message) error

func (*ChannelQueue) Receive

func (q *ChannelQueue) Receive(ch chan Message, done chan bool) error

type Enqueuer

type Enqueuer interface {
	Enqueue(*Message) error
}

Enqueuer is an interface for queueing messages.

type HandlerFunc

type HandlerFunc func(Message) error

type MemoryQueue

type MemoryQueue struct {
	Items []Message
}

func NewMemoryQueue

func NewMemoryQueue() *MemoryQueue

func (*MemoryQueue) Ack

func (q *MemoryQueue) Ack(m *Message) error

func (*MemoryQueue) BatchEnqueue

func (q *MemoryQueue) BatchEnqueue(msg []Message) error

func (*MemoryQueue) Enqueue

func (q *MemoryQueue) Enqueue(m *Message) error

func (*MemoryQueue) Receive

func (q *MemoryQueue) Receive(ch chan Message) error

type Message

type Message struct {
	ID      string
	Handle  string
	Payload string
	Trace   string

	// GroupID is used by FIFO queues to ensure FIFO delivery for groups of messages.
	GroupID string `json:"group_id,omitempty"`
}

Message is carries message data to and from queue implementations.

func BuildMessageFromSQSMessage

func BuildMessageFromSQSMessage(r *events.SQSMessage) *Message

BuildMessageFromSQSEventsMessage converts an events.SQSMessage into a Message.

type Queue

type Queue interface {
	Receiver
	Enqueuer
	BatchEnqueuer
	Acker
}

Queue is the complete interface for a read/write, relaible QuUeue.

func New

func New(uri string) (Queue, error)

type Receiver

type Receiver interface {
	Receive(chan Message) error
}

type ReceivingAcker

type ReceivingAcker interface {
	Receiver
	Acker
}

type RedisPoolQueue

type RedisPoolQueue struct {
	Topic string
	// contains filtered or unexported fields
}

RedisQueue works with a Redis instance to satisfy most of the Queue unterface, except

func NewRedisPoolQueue

func NewRedisPoolQueue(pool *redigo.Pool, topic string) *RedisPoolQueue

func (*RedisPoolQueue) Ack

func (q *RedisPoolQueue) Ack(_ *Message) error

Ack is not support by Redis, so provide a NOOP implementation.

func (*RedisPoolQueue) BatchEnqueue

func (q *RedisPoolQueue) BatchEnqueue(msgs []Message) error

func (*RedisPoolQueue) Enqueue

func (q *RedisPoolQueue) Enqueue(m *Message) error

func (*RedisPoolQueue) Receive

func (q *RedisPoolQueue) Receive(ch chan Message) error

Receive reads messages from the topic and pushes them to the channel.

func (*RedisPoolQueue) Send

func (q *RedisPoolQueue) Send(m Message) error

Enqueue adds a message to the queue.

func (*RedisPoolQueue) SendBatch

func (q *RedisPoolQueue) SendBatch(messages []Message) error

type RedisQueue

type RedisQueue struct {
	Topic string
	// contains filtered or unexported fields
}

RedisQueue works with a Redis instance to satisfy most of the Queue unterface, except

func NewRedisQueue

func NewRedisQueue(conn redis.Conn, topic string) *RedisQueue

func (*RedisQueue) Ack

func (q *RedisQueue) Ack(_ *Message) error

Ack is not support by Redis, so provide a NOOP implementation.

func (*RedisQueue) BatchEnqueue

func (q *RedisQueue) BatchEnqueue(msgs []Message) error

func (*RedisQueue) Enqueue

func (q *RedisQueue) Enqueue(m *Message) error

func (*RedisQueue) Receive

func (q *RedisQueue) Receive(ch chan<- Message) error

Receive reads messages from the topic and pushes them to the channel.

func (*RedisQueue) Send

func (q *RedisQueue) Send(m Message) error

Enqueue adds a message to the queue.

func (*RedisQueue) SendBatch

func (q *RedisQueue) SendBatch(messages []Message) error

type Runner

type Runner interface {
	Start() error
	Stop()
}

type SQSQueue

type SQSQueue struct {
	URL         string
	FIFO        bool
	MaxMessages int64
	WaitTime    int64
}

SQSQueue is provides access to AWS SQS.

func NewSQSFIFOQueue

func NewSQSFIFOQueue(url string) SQSQueue

NewSQSFIFOQueue returna a new SQSQueue configued to interact with a FIFO queue.

func NewSQSQueue

func NewSQSQueue(url string) SQSQueue

NewSQSQuere returns a new SQSQueue.

By default the env var AWS_DEFAULT_REGION will be used to determine which region to use.

func (SQSQueue) Ack

func (s SQSQueue) Ack(m *Message) error

Ack implements the Acker interface.

SQS requires specific removal of messages after reading.

func (SQSQueue) BatchEnqueue

func (s SQSQueue) BatchEnqueue(messages []Message) error

BatchEnqueue implements the BatchEnqueuer interface.

func (SQSQueue) Enqueue

func (s SQSQueue) Enqueue(m *Message) error

Enqueue implements the Enqueuer interface.

func (SQSQueue) Receive

func (s SQSQueue) Receive(messages chan Message) error

Receive takes messages from SQS and writes them to the channel.

func (SQSQueue) Send

func (s SQSQueue) Send(m Message) error

Send writes a message to SQS.

func (SQSQueue) SendBatch

func (s SQSQueue) SendBatch(messages []Message) error

SendBatch sends a batch of messages to SQS.

type Worker

type Worker struct {
	Name         string
	Queue        ReceivingAcker
	ChannelWrite chan Message
	HandleFunc   HandlerFunc
	BackoffFunc  BackoffFunc
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(name string, r ReceivingAcker, h HandlerFunc) *Worker

func (*Worker) Start

func (w *Worker) Start() error

func (*Worker) Stop

func (w *Worker) Stop()

type Writer

type Writer interface {
	Enqueuer
	BatchEnqueuer
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL