singu

Version: v0.1.1 (Latest)
Published: Jan 28, 2020 License: MIT Imports: 9 Imported by: 0

README

singu

singu provides a unified, simple API for interacting with various message queue implementations:

  • Enqueue/Dequeue messages.
  • Retrieve list of orphan messages.

Latest release v0.1.1.

Queue Usage Flow

  • Create an IQueue instance. Pre-made implementations can be used out-of-the-box, see below.
  • Call IQueue.Queue(...) to enqueue a message.
  • Call IQueue.Take(...) to dequeue a message.
  • Do something with the message.
    • When done, call IQueue.Finish(msgId).
    • If not done and the message needs to be re-queued, call IQueue.Requeue(msgId, ...) to put the message back to the queue.

Queue Storage Implementation

A queue has two message storages:

  • Queue storage: (required) main storage where messages are enqueued and dequeued. Queue storage is implemented as a FIFO list.
  • Ephemeral storage: (optional) messages taken from queue storage are temporarily stored in ephemeral storage until finished or re-queued.
  • IQueue.Take() moves the dequeued message from queue storage to ephemeral storage.
  • IQueue.Finish(...) removes the message from ephemeral storage.
  • IQueue.Requeue(...) moves the message back from ephemeral storage to queue storage.

A queue implementation must implement queue storage; ephemeral storage is optional.

The purpose of the ephemeral storage is to make sure messages are not lost if the application crashes between IQueue.Take() and IQueue.Finish(...) (or IQueue.Requeue(...)).
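
To make the two-storage model concrete, here is a minimal sketch with a FIFO list as queue storage and a map as ephemeral storage. The twoStageQueue and item types are hypothetical and written for illustration; this is not the library's implementation, and re-queueing to the tail is a choice made for the sketch.

```go
package main

import (
	"container/list"
	"fmt"
)

// item is a hypothetical message record used only in this sketch.
type item struct {
	id      string
	payload string
}

// twoStageQueue illustrates the queue/ephemeral split.
type twoStageQueue struct {
	queue     *list.List      // queue storage: FIFO list of item values
	ephemeral map[string]item // ephemeral storage: taken but unfinished items
}

func newTwoStageQueue() *twoStageQueue {
	return &twoStageQueue{queue: list.New(), ephemeral: map[string]item{}}
}

// put appends an item to the tail of queue storage.
func (q *twoStageQueue) put(it item) { q.queue.PushBack(it) }

// take moves the head item from queue storage to ephemeral storage.
func (q *twoStageQueue) take() (item, bool) {
	front := q.queue.Front()
	if front == nil {
		return item{}, false
	}
	it := q.queue.Remove(front).(item)
	q.ephemeral[it.id] = it
	return it, true
}

// finish removes the item from ephemeral storage for good.
func (q *twoStageQueue) finish(id string) { delete(q.ephemeral, id) }

// requeue moves the item from ephemeral storage back to queue storage.
// This sketch appends to the tail; an implementation may choose the head.
func (q *twoStageQueue) requeue(id string) bool {
	it, ok := q.ephemeral[id]
	if !ok {
		return false
	}
	delete(q.ephemeral, id)
	q.queue.PushBack(it)
	return true
}

func main() {
	q := newTwoStageQueue()
	q.put(item{"a", "first"})
	q.put(item{"b", "second"})

	it, _ := q.take()
	fmt.Println("taken:", it.id, "queued:", q.queue.Len(), "ephemeral:", len(q.ephemeral))

	q.requeue(it.id)
	fmt.Println("after requeue, queued:", q.queue.Len(), "ephemeral:", len(q.ephemeral))

	it, _ = q.take()
	q.finish(it.id)
	fmt.Println("finished:", it.id, "queued:", q.queue.Len(), "ephemeral:", len(q.ephemeral))
}
```

If the process stopped between take and finish, the item would still be sitting in the ephemeral map, which is what makes recovery possible when that storage is persistent.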

Orphan Messages

If the application crashes in between IQueue.Take() and IQueue.Finish(...) (or IQueue.Requeue(...)), there could be orphan messages left in the ephemeral storage. To deal with orphan messages:

  • Call IQueue.OrphanMessages(numSeconds, numMessages int) ([]*QueueMessage, error) to retrieve all messages that have been staying in the ephemeral storage for more than numSeconds seconds.
  • Call IQueue.Finish(...) on each message to completely remove the orphan message, or
  • Call IQueue.Requeue(...) to re-queue the message.

Built-in Queue Implementations

Implementation   Bounded Size   Persistent   Ephemeral Storage   Multi-Clients
In-memory        Optional       No           Yes                 No
LevelDB          Optional       Yes (*)      Yes                 No

  • Bounded Size: size of queue/ephemeral storage is bounded.
    • A queue implementation can set a hard limit on the maximum number of messages that can be stored in queue/ephemeral storage.
    • If no hard limit is set:
      • In-memory queue: number of messages is limited by memory capacity.
      • LevelDB queue: number of messages is limited by disk capacity.
  • Persistent: queue messages are persistent between application restarts.
  • Ephemeral Storage: supports retrieval of orphan messages.
  • Multi-Clients: multiple clients can share the same queue backend storage.

In-memory Queue

The built-in in-memory queue implementation implements queue storage using a FIFO linked-list and ephemeral storage using a map.

Messages in in-memory queues are not persistent between application restarts.

LevelDB Queue

The built-in LevelDB queue implementation uses LevelDB as storage backend.

Messages in LevelDB queues are persistent between application restarts.

License

MIT - see LICENSE.md.

Documentation

Overview

Package singu defines queue message struct and queue API.

Constants

const (
	// SizeNotSupported is returned if queue implementation does not support counting number of messages in storage
	SizeNotSupported = -1
)
const (
	// Version of singu
	Version = "0.1.1"
)

Variables

var (
	// ErrorOperationNotSupported is returned when the queue implementation does not support the invoked operation
	ErrorOperationNotSupported = errors.New("operation not supported")

	// ErrorQueueIsFull is returned when queue storage is full and cannot accept any more messages
	ErrorQueueIsFull = errors.New("queue storage is full")

	// ErrorEphemeralIsFull is returned when ephemeral storage is full and cannot accept any more messages
	ErrorEphemeralIsFull = errors.New("ephemeral storage is full")
)

Functions

func UniqueId

func UniqueId() string

UniqueId returns a unique id as string

Types

type IQueue

type IQueue interface {
	// Name returns queue's name.
	Name() string

	// QueueStorageCapacity returns the maximum number of messages queue storage can hold, or SizeNotSupported if queue storage has unlimited capacity.
	QueueStorageCapacity() (int, error)

	// EphemeralStorageCapacity returns the maximum number of messages ephemeral storage can hold, or SizeNotSupported if ephemeral storage has unlimited capacity.
	EphemeralStorageCapacity() (int, error)

	// IsEphemeralStorageEnabled returns true if ephemeral storage is supported, false otherwise.
	IsEphemeralStorageEnabled() bool

	// Queue enqueues a message: put the message to the tail of queue storage.
	// This function returns the enqueued QueueMessage with Id and QueueTimestamp fields filled.
	Queue(msg *QueueMessage) (*QueueMessage, error)

	// Requeue moves the enqueued message from ephemeral back to queue storage.
	//	- id: id of the message to be re-queued
	//	- silent: if true, message's requeue count and queue timestamp will not be updated; if false, message's requeue count is increased and queue timestamp is updated
	//
	// This function returns the enqueued QueueMessage with Id and QueueTimestamp fields filled.
	//
	// Notes:
	//	- message is put to head or tail of queue storage depending on queue implementation
	Requeue(id string, silent bool) (*QueueMessage, error)

	// Finish is called to signal that the message can now be removed from ephemeral storage.
	Finish(id string) error

	// Take dequeues a message: move a message from the head of queue storage to ephemeral storage and return the message.
	// Nil is returned if queue storage is empty.
	Take() (*QueueMessage, error)

	// OrphanMessages returns all messages that have been staying in ephemeral storage for more than a specific number of seconds.
	//	- numSeconds: messages older than <numSeconds> will be returned
	//	- numMessages: limit number of returned messages, value less than or equal to zero means 'no limit'
	//
	// Note: order of returned messages depends on queue implementation
	OrphanMessages(numSeconds, numMessages int) ([]*QueueMessage, error)

	// QueueSize returns the number of messages currently in queue storage.
	QueueSize() (int, error)

	// EphemeralSize returns the number of messages currently in ephemeral storage.
	EphemeralSize() (int, error)
}

IQueue defines API to access queue messages.

Queue implementation:

  • Queue storage to store queue messages. Messages are put to the tail and taken from the head of queue storage in FIFO manner.
  • Messages taken from queue storage are temporarily stored in ephemeral storage until Finish or Requeue is called.
  • Ephemeral storage is optional, depends on queue implementation.

Queue usage flow:

  • Create an IQueue instance.
  • Call IQueue.Queue(msg) to put messages to the queue.
  • Call IQueue.Take() to take messages from the queue.
  • Do something with the message.
  • When done, call IQueue.Finish(id).
  • If not done and the message needs to be re-queued, call IQueue.Requeue(id, true/false) to put the message back to the queue.

func NewInmemQueue

func NewInmemQueue(name string, queueCapacity int, ephemeralDisabled bool, ephemeralCapacity int) IQueue

NewInmemQueue creates a new InmemQueue instance.

  • name: queue's name
  • queueCapacity: if zero or negative, queue storage has unlimited capacity; otherwise the number of messages that can be stored in queue storage is capped at the specified number
  • ephemeralDisabled: if true, ephemeral storage is disabled
  • ephemeralCapacity: if zero or negative, ephemeral storage has unlimited capacity; otherwise ephemeral storage is capped at the specified number

type InmemQueue

type InmemQueue struct {
	// contains filtered or unexported fields
}

InmemQueue is in-memory queue implementation.

  • If queue message's id is not set, this queue implementation will assign one. Otherwise, the pre-set message id is used.

func (*InmemQueue) Destroy

func (q *InmemQueue) Destroy()

Destroy cleans up the queue instance

func (*InmemQueue) EphemeralSize

func (q *InmemQueue) EphemeralSize() (int, error)

EphemeralSize implements IQueue.EphemeralSize

func (*InmemQueue) EphemeralStorageCapacity

func (q *InmemQueue) EphemeralStorageCapacity() (int, error)

EphemeralStorageCapacity implements IQueue.EphemeralStorageCapacity

func (*InmemQueue) Finish

func (q *InmemQueue) Finish(id string) error

Finish implements IQueue.Finish

func (*InmemQueue) Init

func (q *InmemQueue) Init() error

Init initializes the queue instance

func (*InmemQueue) IsEphemeralStorageEnabled

func (q *InmemQueue) IsEphemeralStorageEnabled() bool

IsEphemeralStorageEnabled implements IQueue.IsEphemeralStorageEnabled

func (*InmemQueue) Name

func (q *InmemQueue) Name() string

Name implements IQueue.Name

func (*InmemQueue) OrphanMessages

func (q *InmemQueue) OrphanMessages(numSeconds, numMessages int) ([]*QueueMessage, error)

OrphanMessages implements IQueue.OrphanMessages

func (*InmemQueue) Queue

func (q *InmemQueue) Queue(msg *QueueMessage) (*QueueMessage, error)

Queue implements IQueue.Queue

func (*InmemQueue) QueueSize

func (q *InmemQueue) QueueSize() (int, error)

QueueSize implements IQueue.QueueSize

func (*InmemQueue) QueueStorageCapacity

func (q *InmemQueue) QueueStorageCapacity() (int, error)

QueueStorageCapacity implements IQueue.QueueStorageCapacity

func (*InmemQueue) Requeue

func (q *InmemQueue) Requeue(id string, silent bool) (*QueueMessage, error)

Requeue implements IQueue.Requeue

func (*InmemQueue) Take

func (q *InmemQueue) Take() (*QueueMessage, error)

Take implements IQueue.Take

type QueueMessage

type QueueMessage struct {
	Id             string    `json:"id"`           // message's unique id
	Timestamp      time.Time `json:"time"`         // message's creation timestamp
	QueueTimestamp time.Time `json:"qtime"`        // message's last-queued timestamp, maintained by queue implementation
	TakenTimestamp time.Time `json:"ttime"`        // message's taken timestamp, maintained by queue implementation
	NumRequeues    int       `json:"num_requeues"` // number of times the message has been re-queued, maintained by queue implementation
	Payload        []byte    `json:"payload"`      // message's payload
}

QueueMessage represents a queue message.

func CloneQueueMessage

func CloneQueueMessage(msg QueueMessage) QueueMessage

CloneQueueMessage clones a QueueMessage instance
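
One reason a clone helper can matter for a struct like this: plain assignment copies the Payload slice header, not the underlying bytes. Whether CloneQueueMessage copies the payload is not stated here. The sketch below only illustrates the difference between a shallow and a deep copy, using a hypothetical local msg type and deepCopy function.

```go
package main

import "fmt"

// msg is a hypothetical stand-in with the same kind of byte-slice payload.
type msg struct {
	ID      string
	Payload []byte
}

// deepCopy returns a copy whose Payload does not share memory with the original.
func deepCopy(m msg) msg {
	c := m
	c.Payload = append([]byte(nil), m.Payload...)
	return c
}

func main() {
	original := msg{ID: "1", Payload: []byte("abc")}
	shallow := original        // shares the payload bytes with original
	deep := deepCopy(original) // owns its own payload bytes

	original.Payload[0] = 'X'

	fmt.Println("shallow:", string(shallow.Payload))
	fmt.Println("deep:   ", string(deep.Payload))
}
```

The shallow copy observes the change made through the original, while the deep copy keeps the bytes it was created with.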

func NewQueueMessage

func NewQueueMessage(payload []byte) *QueueMessage

NewQueueMessage creates a new QueueMessage instance with provided payload

Directories

Path Synopsis
Package leveldb contains queue implementation using LevelDB as backend storage.
