queue

Version: v0.0.4
Published: Mar 4, 2019 License: Apache-2.0 Imports: 10 Imported by: 2

README

Overview

A simple proof of concept (POC) showing how to use Amazon DocumentDB (with MongoDB compatibility) as a message queue. The queue is designed for at-least-once delivery, so messages can be delivered more than once and your application must process them idempotently. When you enqueue a message, you specify a visibility timeout: approximately the maximum time the queue waits after the message is dequeued before making it available to be dequeued again. The clock starts when the message is dequeued; call the Done function on the message before the timeout elapses, or the message will be delivered to another process.
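Because a message can arrive more than once, a consumer needs some way to make repeated deliveries harmless. The following is a minimal sketch of one approach, assuming each message carries a stable ID; processedSet, markIfNew, and handle are hypothetical names and are not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// processedSet remembers which message IDs have already been handled.
// Hypothetical helper: an in-memory map only works within a single process.
type processedSet struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newProcessedSet() *processedSet {
	return &processedSet{seen: make(map[string]bool)}
}

// markIfNew records id and reports whether this is the first time it was seen.
func (p *processedSet) markIfNew(id string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.seen[id] {
		return false
	}
	p.seen[id] = true
	return true
}

// handle applies the side effect at most once per message ID, even if the
// queue delivers the same message again.
func handle(p *processedSet, id string, apply func()) {
	if p.markIfNew(id) {
		apply()
	}
}

func main() {
	p := newProcessedSet()
	count := 0
	// "msg-1" is delivered twice, as can happen with at-least-once delivery.
	for _, id := range []string{"msg-1", "msg-1", "msg-2"} {
		handle(p, id, func() { count++ })
	}
	fmt.Println(count) // prints 2: one side effect per distinct ID
}
```

In a real deployment the record of processed IDs would need to live in shared storage and be written together with the side effect, otherwise a crash between the two steps can still lose or repeat work.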

Note

This uses the MongoDB Go Driver, which was a beta release at the time of writing.

This library has not been run in production and is not ready for prime time. There are bugs and logic errors that will cause production issues, so please only use as a reference of what is possible.

Requirements

This has only been tested on Go 1.11 with MongoDB 3.6.9 (for local development) and docdb3.6 (cloud).

Use

NewQueue

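In the NewQueue function call, the parameters are:

  • The database name
  • The collection name
  • The connection URI
  • The path to the CA file (pem)
  • The timeout (time.Duration)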
if queue, err = NewQueue("test", "queue", connectionUri, "local.pem", time.Second*5); err != nil {
  // Error - failed to connect to the database, there was an issue with the URI or the pem file
}
Enqueue

Once you have the queue client, you can add entries to the queue using the Enqueue function:

Parameters:

  • The context
  • The message payload (string)
  • The visibility timeout, in seconds (int)
if err := queue.Enqueue(context.TODO(), "this is a test", 30); err != nil {
  // Handle the error
}
Dequeue

To get the next available message from the queue, call the Dequeue function:

Parameters:

  • The context
if msg, err := queue.Dequeue(context.TODO()); err != nil {
  // Handle the error
} else if msg != nil {
  // Process the message (msg is nil when no entries are available)

  if err = msg.Done(context.TODO()); err != nil {
    // It is possible that the msg was deleted by another process.
    // If the database is unavailable, the message will be processed again
    // later when it is available.
  }
}
Listen

Although the Dequeue method is available, it is highly recommended that you use the Listen function on the queue struct instead, which returns a channel of messages. The Listen approach also throttles polling (via exponential backoff) when there are database errors or no messages in the queue. If you call Listen, you must call StopListen to close the channel and stop the goroutine(s).

Parameters:

  • The number of goroutine(s) to spawn that call Dequeue
channel := queue.Listen(2)

// This range over the channel will exit when you call StopListen, because
// the channel is closed
for msg := range channel {

  // Process the message

  if err := msg.Done(context.TODO()); err != nil {
    // Handle the error
  }
}

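// When you are done using the message queue, stop listening. The loop above
// only exits after StopListen has been called, so call it from elsewhere,
// for example from a shutdown handler running in another goroutine.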
queue.StopListen()

Documentation

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the queue-processing struct. Keep a handle to this struct for sending and receiving messages.

func NewQueue

func NewQueue(
	dbName,
	collectionName,
	connectionUri,
	caFile string,
	timeout time.Duration,
) (*Queue, error)

NewQueue creates a new queue struct. This method panics if the timeout is negative or if any of the string parameters has a length of zero.

func (*Queue) Dequeue

func (q *Queue) Dequeue(
	ctx context.Context,
) (*QueueMessage, error)

Dequeue pulls the next item off the queue (if available). You must call the Done function on the message when you are done processing, or it will time out and be made visible again. If no entries are available, nil is returned.

func (*Queue) Enqueue

func (q *Queue) Enqueue(
	ctx context.Context,
	payload string,
	visibility int,
) error

Enqueue inserts a new item into the queue. This allows for an empty payload. If visibility is negative, this will panic.

func (*Queue) Listen

func (q *Queue) Listen(count int) <-chan *QueueMessage

Listen returns a channel and polls the database for new messages in separate goroutine(s). The channel created does not buffer. If you call Listen, you must call StopListen on process shutdown, which will close the channel. The count param indicates the number of goroutines to spawn to query the database for new entries. If count is less than 1, this method panics.

func (*Queue) Size

func (q *Queue) Size(ctx context.Context) (int64, error)

Size provides the total depth of the message queue.

func (*Queue) StopListen

func (q *Queue) StopListen()

StopListen must be called when you are ready to shut down the Listen call. This closes the channel returned by the Listen call and terminates the goroutines that call Dequeue.
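
The relationship between Listen and StopListen (several goroutines feeding one unbuffered channel, which is closed on stop) can be modeled with the standard library alone. The sketch below is an assumption about the general pattern, not the package's code: listener, listen, and stopListen are hypothetical, and a buffered channel stands in for the database.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a stand-in for the queue's Listen/StopListen pair.
type listener struct {
	out  chan string   // unbuffered, like the channel Listen returns
	stop chan struct{} // closed to tell the pollers to exit
	wg   sync.WaitGroup
}

// listen starts count goroutines that move messages from src to out
// until stopListen is called. src plays the role of the database.
func listen(count int, src <-chan string) *listener {
	l := &listener{out: make(chan string), stop: make(chan struct{})}
	for i := 0; i < count; i++ {
		l.wg.Add(1)
		go func() {
			defer l.wg.Done()
			for {
				select {
				case <-l.stop:
					return
				case msg := <-src:
					select {
					case l.out <- msg:
					case <-l.stop:
						return
					}
				}
			}
		}()
	}
	return l
}

// stopListen signals the pollers, waits for them to exit, then closes out
// so that a range loop over it terminates.
func (l *listener) stopListen() {
	close(l.stop)
	l.wg.Wait()
	close(l.out)
}

func main() {
	src := make(chan string, 3)
	for _, m := range []string{"a", "b", "c"} {
		src <- m
	}
	l := listen(2, src)
	received := 0
	for range l.out {
		received++
		if received == 3 {
			l.stopListen() // closes l.out, which ends this loop
		}
	}
	fmt.Println(received) // prints 3
}
```

In this model a poller that is stopped while holding a message simply drops it; with a visibility timeout such a message would become available again later, which is consistent with at-least-once delivery.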

type QueueMessage

type QueueMessage struct {
	Id         *primitive.ObjectID `json:"id" bson:"_id"`
	Version    *primitive.ObjectID `json:"version" bson:"version"`
	Visibility int                 `json:"visibility" bson:"visibility"` // Visibility timeout is in seconds
	Created    *time.Time          `json:"created" bson:"created"`
	Payload    string              `json:"payload" bson:"payload"`
	Started    *time.Time          `json:"started" bson:"started"`
	Dequeued   *time.Time          `json:"dequeued" bson:"dequeued"`
	Expire     *time.Time          `json:"expire" bson:"expire"`
	// contains filtered or unexported fields
}

QueueMessage is the queue message structure.

func (*QueueMessage) Done

func (m *QueueMessage) Done(ctx context.Context) error

Done tries to delete the message from the queue. If the visibility timeout expired and the entry was updated, the version will not match and this method will return an error. The error is purely informational, because the entry will be made available to another worker/processor. As a reminder, this queue is for idempotent workloads.
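
The version check described above can be illustrated with an in-memory model. This is a sketch under stated assumptions rather than the package's logic: store, deliver, and done are hypothetical, and an integer counter stands in for the ObjectID stored in the Version field.

```go
package main

import (
	"errors"
	"fmt"
)

// entry models one stored message; its version changes on every delivery.
type entry struct {
	payload string
	version int // stand-in for the ObjectID version field
}

// store is an in-memory stand-in for the queue collection.
type store struct {
	entries map[string]*entry
}

// message is what a worker holds after a dequeue: the ID and the version it saw.
type message struct {
	id      string
	version int
}

var errStale = errors.New("message is gone or was redelivered with a new version")

// deliver hands out the entry and stamps it with a fresh version, as would
// happen on a first dequeue or after the visibility timeout expires.
func (s *store) deliver(id string) (message, bool) {
	e, ok := s.entries[id]
	if !ok {
		return message{}, false
	}
	e.version++
	return message{id: id, version: e.version}, true
}

// done deletes the entry only if the caller still holds the current version.
func (s *store) done(m message) error {
	e, ok := s.entries[m.id]
	if !ok || e.version != m.version {
		return errStale
	}
	delete(s.entries, m.id)
	return nil
}

func main() {
	s := &store{entries: map[string]*entry{"1": {payload: "job"}}}
	first, _ := s.deliver("1")  // worker A dequeues
	second, _ := s.deliver("1") // timeout expired; worker B dequeues
	fmt.Println(s.done(first))  // stale version: returns errStale
	fmt.Println(s.done(second)) // current version: returns nil
}
```

Worker A's error here is harmless: worker B holds the current version and completes the message, which is why the workload has to tolerate both workers having processed it.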
