Documentation ¶
Index ¶
- Constants
- func BatchMessages(messages []Message, chunkSize int) [][]Message
- func DefaultWorkerBackoffFunc(name string, count int, err error)
- type Acker
- type BackoffFunc
- type BatchEnqueuer
- type ChannelQueue
- type Enqueuer
- type HandlerFunc
- type MemoryQueue
- type Message
- type Queue
- type Receiver
- type ReceivingAcker
- type RedisPoolQueue
- func (q *RedisPoolQueue) Ack(_ *Message) error
- func (q *RedisPoolQueue) BatchEnqueue(msgs []Message) error
- func (q *RedisPoolQueue) Enqueue(m *Message) error
- func (q *RedisPoolQueue) Receive(ch chan Message) error
- func (q *RedisPoolQueue) Send(m Message) error
- func (q *RedisPoolQueue) SendBatch(messages []Message) error
- type RedisQueue
- type Runner
- type SQSQueue
- type Worker
- type Writer
Constants ¶
const (
	// SQSQueueAttributeTrace is the optional SQS Message attribute that holds a trace ID.
	SQSQueueAttributeTrace = "trace"
	// SQSTypeString states the data type of an SQS Message attribute.
	SQSTypeString = "String"
	// SQSDefaultMaxMessages requests the maximum number of messages permitted per receive call.
	SQSDefaultMaxMessages = int64(10)
	// SQSDefaultWaitTime is the maximum time permitted to listen for messages from SQS.
	SQSDefaultWaitTime = int64(20)
)
Variables ¶
This section is empty.
Functions ¶
func BatchMessages ¶
BatchMessages splits a []Message slice into batches of a given size.
Some queue providers such as SQS have limits on batch enqueueing.
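The splitting behaviour described above can be sketched as follows. This is a hypothetical re-implementation for illustration, not the package's own code: the local Message type is a reduced stand-in, batchMessages is an invented name, and the handling of a non-positive chunk size is an assumption.

```go
package main

import "fmt"

// Message is a local stand-in for the package's Message type,
// reduced to two fields for illustration.
type Message struct {
	ID      string
	Payload string
}

// batchMessages is a hypothetical helper that splits messages into
// consecutive chunks of at most chunkSize elements.
func batchMessages(messages []Message, chunkSize int) [][]Message {
	if chunkSize <= 0 {
		// Assumption: a non-positive size means "return everything as one batch".
		return [][]Message{messages}
	}
	var batches [][]Message
	for start := 0; start < len(messages); start += chunkSize {
		end := start + chunkSize
		if end > len(messages) {
			end = len(messages)
		}
		// Each batch is a sub-slice that shares memory with the input.
		batches = append(batches, messages[start:end])
	}
	return batches
}

func main() {
	msgs := make([]Message, 25)
	for i := range msgs {
		msgs[i] = Message{ID: fmt.Sprintf("msg-%d", i)}
	}
	// SQS accepts at most 10 entries per batch request.
	for i, b := range batchMessages(msgs, 10) {
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```

Each batch can then be handed to a BatchEnqueue call, one request per batch.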
Types ¶
type BackoffFunc ¶
type BatchEnqueuer ¶
BatchEnqueuer is an interface for queueing batches of messages.
type ChannelQueue ¶
type ChannelQueue struct {
Input chan Message
}
ChannelQueue is a queue that manages messages over a channel.
Ensure the channel size is sufficient.
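Why the channel size matters can be sketched with a small stand-in type. The documentation does not say whether Enqueue blocks when the channel is full, so the channelQueue type and its enqueue method below are hypothetical; they use a non-blocking send purely to make the capacity limit visible.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the package's Message type.
type Message struct {
	Payload string
}

// channelQueue is a hypothetical stand-in, not the package's ChannelQueue.
type channelQueue struct {
	Input chan Message
}

// errFull is returned when the buffered channel has no free slot.
var errFull = errors.New("channel full")

// enqueue attempts a non-blocking send. A plain send (q.Input <- *m) would
// instead block until a reader frees a slot.
func (q *channelQueue) enqueue(m *Message) error {
	select {
	case q.Input <- *m:
		return nil
	default:
		return errFull
	}
}

func main() {
	// A buffer of 2 accepts two messages before any reader is needed.
	q := &channelQueue{Input: make(chan Message, 2)}
	for _, p := range []string{"a", "b", "c"} {
		if err := q.enqueue(&Message{Payload: p}); err != nil {
			fmt.Printf("enqueue %q: %v\n", p, err)
			continue
		}
		fmt.Printf("enqueue %q: ok\n", p)
	}
}
```

With an unbuffered or undersized channel and no concurrent reader, a blocking send would stall the producer, which is the situation the note above warns about.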
func NewChannelQueue ¶
func NewChannelQueue(input chan Message) *ChannelQueue
func (*ChannelQueue) Ack ¶
func (q *ChannelQueue) Ack(m *Message) error
func (*ChannelQueue) BatchEnqueue ¶
func (q *ChannelQueue) BatchEnqueue(msg []Message) error
func (*ChannelQueue) Enqueue ¶
func (q *ChannelQueue) Enqueue(m *Message) error
type HandlerFunc ¶
type MemoryQueue ¶
type MemoryQueue struct {
Items []Message
}
func NewMemoryQueue ¶
func NewMemoryQueue() *MemoryQueue
func (*MemoryQueue) Ack ¶
func (q *MemoryQueue) Ack(m *Message) error
func (*MemoryQueue) BatchEnqueue ¶
func (q *MemoryQueue) BatchEnqueue(msg []Message) error
func (*MemoryQueue) Enqueue ¶
func (q *MemoryQueue) Enqueue(m *Message) error
func (*MemoryQueue) Receive ¶
func (q *MemoryQueue) Receive(ch chan Message) error
type Message ¶
type Message struct {
	ID      string
	Handle  string
	Payload string
	Trace   string
	// GroupID is used by FIFO queues to ensure FIFO delivery for groups of messages.
	GroupID string `json:"group_id,omitempty"`
}
Message carries message data to and from queue implementations.
func BuildMessageFromSQSMessage ¶
func BuildMessageFromSQSMessage(r *events.SQSMessage) *Message
BuildMessageFromSQSMessage converts an events.SQSMessage into a Message.
type Queue ¶
type Queue interface {
	Receiver
	Enqueuer
	BatchEnqueuer
	Acker
}
Queue is the complete interface for a read/write, reliable queue.
type ReceivingAcker ¶
type RedisPoolQueue ¶
type RedisPoolQueue struct {
	Topic string
	// contains filtered or unexported fields
}
RedisPoolQueue works with a Redis instance to satisfy most of the Queue interface, except Ack, which Redis does not support.
func NewRedisPoolQueue ¶
func NewRedisPoolQueue(pool *redigo.Pool, topic string) *RedisPoolQueue
func (*RedisPoolQueue) Ack ¶
func (q *RedisPoolQueue) Ack(_ *Message) error
Ack is not supported by Redis, so this is a no-op implementation.
func (*RedisPoolQueue) BatchEnqueue ¶
func (q *RedisPoolQueue) BatchEnqueue(msgs []Message) error
func (*RedisPoolQueue) Enqueue ¶
func (q *RedisPoolQueue) Enqueue(m *Message) error
func (*RedisPoolQueue) Receive ¶
func (q *RedisPoolQueue) Receive(ch chan Message) error
Receive reads messages from the topic and pushes them to the channel.
func (*RedisPoolQueue) Send ¶
func (q *RedisPoolQueue) Send(m Message) error
Send adds a message to the queue.
func (*RedisPoolQueue) SendBatch ¶
func (q *RedisPoolQueue) SendBatch(messages []Message) error
type RedisQueue ¶
type RedisQueue struct {
	Topic string
	// contains filtered or unexported fields
}
RedisQueue works with a Redis instance to satisfy most of the Queue interface, except Ack, which Redis does not support.
func NewRedisQueue ¶
func NewRedisQueue(conn redis.Conn, topic string) *RedisQueue
func (*RedisQueue) Ack ¶
func (q *RedisQueue) Ack(_ *Message) error
Ack is not supported by Redis, so this is a no-op implementation.
func (*RedisQueue) BatchEnqueue ¶
func (q *RedisQueue) BatchEnqueue(msgs []Message) error
func (*RedisQueue) Enqueue ¶
func (q *RedisQueue) Enqueue(m *Message) error
func (*RedisQueue) Receive ¶
func (q *RedisQueue) Receive(ch chan<- Message) error
Receive reads messages from the topic and pushes them to the channel.
func (*RedisQueue) Send ¶
func (q *RedisQueue) Send(m Message) error
Send adds a message to the queue.
func (*RedisQueue) SendBatch ¶
func (q *RedisQueue) SendBatch(messages []Message) error
type SQSQueue ¶
SQSQueue provides access to AWS SQS.
func NewSQSFIFOQueue ¶
NewSQSFIFOQueue returns a new SQSQueue configured to interact with a FIFO queue.
func NewSQSQueue ¶
NewSQSQueue returns a new SQSQueue.
By default the env var AWS_DEFAULT_REGION will be used to determine which region to use.
func (SQSQueue) Ack ¶
Ack implements the Acker interface.
SQS requires specific removal of messages after reading.
func (SQSQueue) BatchEnqueue ¶
BatchEnqueue implements the BatchEnqueuer interface.
type Worker ¶
type Worker struct {
	Name         string
	Queue        ReceivingAcker
	ChannelWrite chan Message
	HandleFunc   HandlerFunc
	BackoffFunc  BackoffFunc
	// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(name string, r ReceivingAcker, h HandlerFunc) *Worker
type Writer ¶
type Writer interface {
	Enqueuer
	BatchEnqueuer
}
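Depending on Writer rather than the full Queue keeps producer code from receiving or acknowledging messages. The sketch below illustrates this under stated assumptions: the Enqueuer and BatchEnqueuer definitions are inferred from the Enqueue and BatchEnqueue method signatures listed on this page, and sliceQueue and publish are hypothetical names.

```go
package main

import "fmt"

// Message is a local stand-in for the package's Message type.
type Message struct {
	ID      string
	Payload string
}

// Enqueuer and BatchEnqueuer are assumed definitions, inferred from the
// method signatures shown for the queue implementations.
type Enqueuer interface {
	Enqueue(m *Message) error
}

type BatchEnqueuer interface {
	BatchEnqueue(msgs []Message) error
}

// Writer composes the two write-side interfaces.
type Writer interface {
	Enqueuer
	BatchEnqueuer
}

// sliceQueue is a hypothetical in-memory Writer used for illustration.
type sliceQueue struct {
	Items []Message
}

func (q *sliceQueue) Enqueue(m *Message) error {
	q.Items = append(q.Items, *m)
	return nil
}

func (q *sliceQueue) BatchEnqueue(msgs []Message) error {
	q.Items = append(q.Items, msgs...)
	return nil
}

// publish needs only write access, so it accepts a Writer.
func publish(w Writer, payloads ...string) error {
	if len(payloads) == 1 {
		return w.Enqueue(&Message{Payload: payloads[0]})
	}
	msgs := make([]Message, 0, len(payloads))
	for _, p := range payloads {
		msgs = append(msgs, Message{Payload: p})
	}
	return w.BatchEnqueue(msgs)
}

func main() {
	q := &sliceQueue{}
	if err := publish(q, "single"); err != nil {
		fmt.Println("publish failed:", err)
	}
	if err := publish(q, "first", "second"); err != nil {
		fmt.Println("publish failed:", err)
	}
	fmt.Println(len(q.Items), "messages queued")
}
```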