goqueuelite

package module
v0.0.0-...-288dba7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2024 License: MIT Imports: 8 Imported by: 0

README

GoQueueLite

goqueuelito Tiny little queue on top of SQLite written in Go.

sQueueLite is a simplistic, SQLite backed job embedded queue library for Go applications. It provides an easy way to manage and process background jobs, facilitating the scheduling and processing of tasks in an organized and efficient manner.

Warning Is still in heavy development and the API is not finalized yet and might change any moment.

Features

Installation

go get github.com/risico/goqueuelite

Usage

package main

import "github.com/risico/goqueuelite"

func main() {
	params := goqueuelite.Params{
		DatabasePath: "queue.db",
		AutoVacuum:   true,
		AutoPrune:    true,
	}
	queue, err := goqueuelite.New(params)
	if err != nil {
		panic(err)
	}
	defer queue.Close()
}

Enqueuing a Job

data := "Your job data here"
params := goqueuelite.EnqueueParams{
    Namespace: "test_namespace",
    ScheduleAfter: time.Now().Add(1 * time.Hour),
    TTL: 2 * time.Hour,
}
id, err := queue.Enqueue(data, params)
if err != nil {}

Dequeueing a Job

params := goqueuelite.DequeueParams{
    Namespace: "default",
}
message, err := queue.Dequeue(params)
if err != nil {}
fmt.Printf("Message(%+v) \n", message)

Message Payload

type Message struct {
	ID          int64
	Data        any
	Namespace   string
	Status      JobStatus
	Delay       uint64
	LockTime    int
	DoneTime    int
	Retries     int
	ScheduledAt int
	TTL         int
}

Subscribe to namespaces

ch, err = queue.Subscribe("default")
if err != nil { }

for {
    select {
        case mEvent, ok := <-ch:
            if !ok {
                return
            }

            m, err := queue.Lock(mEvent.MessageID)
            // do something with m
    }
}

Marking a Job as Done or Failed

err = queue.Done(messageID)
if err != nil {
	// handle error
}

err = queue.Fail(messageID)
if err != nil {
	// handle error
}

Retrying a job

err = queue.Retry(messageID)
if err != nil {
	// handle error
}

Getting the size of the queue

size, err := queue.Size()

Checking if the queue is empty

isEmpty, err := queue.Empty()

Manual Pruning and Vacuuming

If you have disabled AutoPrune and AutoVacuum, you can manually prune and vacuum the database.

queue.Prune()
queue.Vacuum()

To CGO or to Not CGO

CGO is required and enabled by default as the package makes use of github.com/mattn/go-sqlite3 but if you require cross-compilation and don't want to bother with
CGO you can set the -tags nocgo (alongside CGO_ENABLED=0) and it will switch to using modernc.org/sqlite which does not require CGO.

go build . -tags nocgo

Contributing

Feel free to contribute to this project by opening issues or submitting pull requests for bug fixes or features.

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DequeueParams

type DequeueParams struct {
	// Namespace is the namespace to dequeue from
	Namespace string
}

func (DequeueParams) Defaults

func (p DequeueParams) Defaults() DequeueParams

type EnqueueParams

type EnqueueParams struct {
	// Namespace is the namespace to enqueue the job to
	Namespace string

	// ScheduleAfter is the number of seconds to wait before making the job available
	// for consumption
	ScheduleAfter time.Duration

	// TTL is the number of seconds to keep the job around available for consumption
	TTL time.Duration
}

EnqueueParams are passed into the Queue.Enqueue method

func (EnqueueParams) Defaults

func (p EnqueueParams) Defaults() (EnqueueParams, error)

Defaults sets the default values for the EnqueueParams

type EnqueuedMessageEvent

type EnqueuedMessageEvent struct {
	MessageID int64
	Namespace string
}

type JobStatus

type JobStatus int
const (
	JobStatusReady JobStatus = iota
	JobStatusLocked
	JobStatusDone
	JobStatusFailed
)

type Message

type Message struct {
	ID int64
	// Data holds the data for this job
	Data        any
	Namespace   string
	Status      JobStatus
	Delay       uint64
	LockTime    int
	DoneTime    int
	Retries     int
	ScheduledAt int
	TTL         int
}

type MessagesCh

type MessagesCh chan EnqueuedMessageEvent

type Params

type Params struct {
	// DB is the main link to the database, you can either pass this from outside
	// or if left nil it will try to create it
	DB *sql.DB

	Clock clock.Clock

	// DatabasePath is the path where the database sits (if no sql.DB is being passed)
	DatabasePath string

	// AutoVacuum automatically handles vaccuming the db, if this is not
	// enabled you will have to take care of it by manually calling Queue.Vacuum
	AutoVacuum         bool
	AutoVacuumInterval time.Duration

	// AutoPrune deletes completed jobs
	AutoPrune         bool
	AutoPruneInterval time.Duration

	// DefaultTTL is the default time to live for a job
	DefaultTTL time.Duration
}

Params are passed into the Queue and accept external user input

func (Params) Defaults

func (q Params) Defaults() (Params, error)

Defaults sets the default values for the Params

type Queue

type Queue interface {
	// EnqueueWithParams adds a new job to the Queue with custom parameters
	Enqueue(data any, params EnqueueParams) (int64, error)
	// Dequeue returns the next job in the Queue
	Dequeue(params DequeueParams) (*Message, error)
	// Done marks the job as done
	Done(id int64) error
	// Fail marks the job as failed
	Fail(id int64) error
	// Retry marks the message as ready to be consumed again
	Retry(id int64) error
	// Size returns the size of the queue
	Size() (int, error)
	// Lock provides direct access to lock the message.
	// This is used mostly by the subscription mechanism.
	Lock(messageID int64) (*Message, error)
	// Subscribe returns a channel that will receive messages as they are enqueued
	// this provides a simple way to implement pub/sub.
	// Note that the jobs are not consumed from the queue, they are just sent to the
	// channel as they are enqueued and if work needs to happen on them you'd have to lock
	// them using the Lock(id) method.
	Subscribe(namespace string) (MessagesCh, error)
	// Prune deletes completed jobs
	Prune() error
	// Close clears the auto matically clean system and db file handles
	Close() error
}

Queue describes the main interface of the queue system

func New

func New(params Params) (Queue, error)

New creates a new Queue

type SqliteQueue

type SqliteQueue struct {
	// contains filtered or unexported fields
}

func (*SqliteQueue) Close

func (q *SqliteQueue) Close() error

func (*SqliteQueue) Dequeue

func (q *SqliteQueue) Dequeue(params DequeueParams) (*Message, error)

Dequeue

func (*SqliteQueue) Done

func (q *SqliteQueue) Done(id int64) error

Done marks the job as done

func (*SqliteQueue) Enqueue

func (q *SqliteQueue) Enqueue(data any, params EnqueueParams) (int64, error)

Enqueue adds a new job to the Queue

func (*SqliteQueue) Fail

func (q *SqliteQueue) Fail(id int64) error

Fail marks the job as failed

func (*SqliteQueue) Lock

func (q *SqliteQueue) Lock(messageID int64) (*Message, error)

func (*SqliteQueue) Prune

func (q *SqliteQueue) Prune() error

func (*SqliteQueue) Retry

func (q *SqliteQueue) Retry(id int64) error

Retry marks the message as ready to be consumed again

func (*SqliteQueue) Size

func (q *SqliteQueue) Size() (int, error)

Retry marks the message as ready to be consumed again

func (*SqliteQueue) Subscribe

func (s *SqliteQueue) Subscribe(namespace string) (MessagesCh, error)

func (*SqliteQueue) Vacuum

func (q *SqliteQueue) Vacuum() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL