sqlq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: MIT Imports: 8 Imported by: 0

README

SQLQ

SQLQ (read as "sql-queue") is a dead simple SQLite3 based job queue library/tool.

Note: This is mostly meant for simple single node batch job processing use-cases and not meant to be used as a distributed job queue system.

Usage

import "github.com/spy16/sqlq"

func main() {
    // connect to sqlite3.
    q, err := sqlq.Open("my-queue.db", nil)
    if err != nil {
        panic(err)
    }
    defer q.Close()


    // enqueue items on the queue.
    // this can be exposed as http api or something.
    _ = q.Push(ctx, Item{
        ID: "job1",
        Type:"job-category",
        Payload:"arbitrary data for executing job",
    })

    // run the poll-excute loop
    log.Fatalf("exited: %v", q.Run(ctx, myExecutorFunc))
}

func myExecutorFunc(ctx context.Context, item sqlq.Item) error {
    // do your thing

    return nil
    // return sqlq.ErrFail to fail immediately
    // return sqlq.ErrSkip to skip this item
    // return any other error to signal retry.
}

Documentation

Index

Constants

View Source
const (
	StatusDone    = "DONE"    // fn finished successfully.
	StatusFailed  = "FAILED"  // all attempts failed or fn returned ErrFail.
	StatusPending = "PENDING" // attempts are still remaining.
	StatusSkipped = "SKIPPED" // fn returned ErrSkip
)

Status values an item on the queue can have.

Variables

View Source
var (
	// ErrSkip can be returned by ApplyFn to indicate that the queued item
	// be skipped immediately.
	ErrSkip = errors.New("skip")

	// ErrFail can be returned by ApplyFn to indicate no further retries
	// should be attempted.
	ErrFail = errors.New("failed")
)

Functions

This section is empty.

Types

type ApplyFn

type ApplyFn func(ctx context.Context, item Item) error

ApplyFn is invoked by the queue instance when an item is available for execution.

type Item

type Item struct {
	ID          string `json:"id"`
	Type        string `json:"type"`
	Payload     string `json:"payload"`
	Attempt     int    `json:"attempt"`
	MaxAttempts int    `json:"max_attempts"`
}

Item represents an item on the queue.

type Options

type Options struct {
	PollInt      time.Duration
	FnTimeout    time.Duration
	MaxAttempts  int
	RetryBackoff time.Duration
}

Options represents optional queue configurations.

type SQLQueue

type SQLQueue struct {
	// contains filtered or unexported fields
}

SQLQueue implements a simple disk-backed queue using SQLite3 database. A single table is used to store the queue items with their insertion timestamp defining the execution order.

func Open

func Open(sqlFile string, opts *Options) (*SQLQueue, error)

Open opens a SQLite3 DB file and returns the queue instance based on it. If the `queue` table is not present, it will be created.

func (*SQLQueue) Push

func (q *SQLQueue) Push(ctx context.Context, items ...Item) error

Push enqueues all items into the queue with pending status.

func (*SQLQueue) Run

func (q *SQLQueue) Run(ctx context.Context, fn ApplyFn) error

Run starts the worker loop that fetches next item from the queue and applies the given func. Runs until context is cancelled. fn can return nil, ErrFail, ErrSkipped to move to DONE, FAILED or SKIPPED terminal statuses directly. If fn returns any other error, it will remain in PENDING state and will be retried after sometime.

func (*SQLQueue) Stats

func (q *SQLQueue) Stats() ([]Stats, error)

Stats returns entire queue statistics broken down by type.

func (*SQLQueue) String

func (q *SQLQueue) String() string

type Stats

type Stats struct {
	Type    string `json:"type"`
	Total   int    `json:"total"`
	Done    int    `json:"done"`
	Pending int    `json:"pending"`
	Failed  int    `json:"failed"`
	Skipped int    `json:"skipped"`
}

Stats represents queue status break down by type.

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL