queue

package
v0.12.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2022 License: BSD-3-Clause, MIT, MIT-0, + 1 more Imports: 8 Imported by: 0

README

queue - same-process durable message queue

go get -u "tawesoft.co.uk/go"
import "tawesoft.co.uk/go/queue"
Links License Stable?
homedocssrc MIT candidate

About

Package queue implements simple, durable/ACID, same-process message queues with best-effort ordering by priority and/or time.

Examples

See examples folder.

Changes

2021-07-06
  • The Queue RetryItem method now takes an Attempt parameter.

  • Calling the Delete() method on a Queue now attempts to avoid deleting an in-memory database opened as ":memory:".

Getting Help

This package is part of tawesoft.co.uk/go, a monorepo for small Go modules maintained by Tawesoft®. Check out that URL for more information about other Go modules from Tawesoft plus community and commercial support options.

Documentation

Overview

Package queue implements simple, durable/ACID, same-process message queues with best-effort ordering by priority and/or time.

Examples

See examples folder.

Package Information

License: MIT (see LICENSE.txt)

Stable: candidate

For more information, documentation, source code, examples, support, links, etc. please see https://www.tawesoft.co.uk/go and https://www.tawesoft.co.uk/go/queue

2021-07-06

    * The Queue RetryItem method now takes an Attempt parameter.

    * Calling the Delete() method on a Queue now attempts to avoid deleting
      an in-memory database opened as ":memory:".

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Item

type Item struct {
	// ID uniquely identifies the item in a single queue
	ID ItemID

	// Priority orders items in the queue so that due items with a higher
	// priority come before other due items, even if they were due sooner.
	Priority int // default 0, limit +/- math.MaxInt16

	// Message is any user-supplied string of bytes
	Message string

	// Attempt records how many times the given item has been unsuccessfully
	// processed and put back in the queue
	Attempt int

	// Created is the time the item was first added to the queue
	// (use `time.Now().UTC()`)
	Created time.Time // UTC

	// RetryAfter is the earliest time the queue will attempt to process the
	// item
	RetryAfter time.Time // UTC
}

func (Item) String

func (i Item) String() string

type ItemID

type ItemID int64

type NewItem

type NewItem struct {
	Message    string
	Priority   int
	Created    time.Time
	RetryAfter time.Time
}

type Queue

type Queue interface {
	// CreateItem places a new Item in the queue
	CreateItem(item NewItem) error

	// PeekItems returns up to `n` items with a priority >= `minPriority`,
	// and with a due time >= `due`. Returned items are ordered by (highest
	// priority, earliest due, earliest created, lowest ID). Items with IDs
	// in `excluding` (which may be nil or empty) are not included.
	PeekItems(n int, minPriority int, due time.Time, excluding []ItemID) ([]Item, error)

	// RetryItem reorders an item in the queue at a later `due` time and a
	// given priority. Also updates its attempt number (e.g. on the case of
	// temporary failure, this will be set to the item's Attempt field plus
	// one. In other cases, such as rescheduling to a better time, it might
	// be kept at the current Attempt value)
	RetryItem(item ItemID, priority int, due time.Time, attempt int) error

	// DeleteItem removes an item from the queue.
	DeleteItem(ItemID) error

	// Close any resources such as database handles.
	Close() error

	// Delete removes a queue database from disk. If you've opened an in-memory
	// database then don't try deleting it!
	Delete() error
}

type QueueService

type QueueService interface {
	// OpenQueue opens (or creates) a new queue with a given name, backed
	// by a file at the given path (the SQLite backend also supports
	// ":memory:" as a target path)
	OpenQueue(name string, path string) (Queue, error)

	// Close any resources such as database handles. You should individually
	// close any open queues first.
	Close() error
}

QueueService defines an interface for the creation of queues.

One such implementation is the NewQueueSqliteService which provides a reliable persistent queues backed by SQLite databases.

func NewQueueSqliteService

func NewQueueSqliteService() (QueueService, error)

NewQueueSqliteService creates a new QueueService implemented by a SQLite backend that persists queues to individual database files.

The SQLite backend may place a limit on the number of attached queue databases per connection (default 7).

SQLite is used in SecureDelete mode so that deleted items are overwritten by zeros on disk to protect possibly sensitive data.

A queue databases is VACUUMed when first attached by OpenQueue

type UUIDService

type UUIDService interface {
	Generate() ([]byte, error)
}

Directories

Path Synopsis
examples
simple
Simple example of creating queues, adding items, peeking at due items.
Simple example of creating queues, adding items, peeking at due items.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL