work

package module
v0.0.0-...-46c4299 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2022 License: LGPL-3.0 Imports: 11 Imported by: 54

README

dowork godoc builds.sr.ht status

dowork is a general purpose task queueing system for Go programs. It queues, executes, and reschedules tasks in a Goroutine in-process.

A global task queue is provided for simple use-cases. To use it:

import (
  "context"

  "git.sr.ht/~sircmpwn/dowork"
)

work.Submit(func(ctx context.Context) error {
    // ...do work...
    return nil
})

This task will be executed in the background. The first time a task is submitted to the global queue, it will be initialized and start running in the background.

To customize options like maximum retries and timeouts, use work.Enqueue:

task := work.NewTask(func(ctx context.Context) error {
    // ...
}).Retries(5).MaxTimeout(10 * time.Minute)
work.Enqueue(task)
task := work.NewTask(func(ctx context.Context) error {
    // ...
}).
    Retries(5).                   // Maximum number of attempts
    MaxTimeout(10 * time.Minute). // Maximum timeout between attempts
    Within(10 * time.Second).     // Deadline for each attempt
    After(func(ctx context.Context, task *work.Task) {
        // Executed once the task completes, successful or not
    })
work.Enqueue(task)

Retries are conducted with an exponential backoff.

You may also manage your own work queues. Use NewQueue() to obtain a queue, (*Queue).Dispatch() to execute all overdue tasks, and (*Queue).Start() to spin up a goroutine and start dispatching tasks automatically.

Use work.Shutdown() or (*Queue).Shutdown() to perform a soft shutdown of the queue, which will stop accepting new tasks and block until all already-queued tasks complete.

Distributed task queues

No such functionality is provided OOTB, but you may find this package useful in doing the actual queueing work for your own distributed work queue. Such an integeration is left as an exercise to the reader.

Instrumentation

Instrumentation is provided via Prometheus and the client_golang library. Relevant metrics are prefixed with queue_ in the metric name.

A common pattern for soft restarting web servers is to shut down the http.Server, allowing the new web server process to start up and begin accepting new connections, then allow the queue to finish executing any pending tasks before terminating the process. If this describes your program, note that you may want to provide Prometheus metrics on a secondary http.Server on a random port, so that you may monitor the queue shutdown. Something similar to the following will set up a secondary HTTP server for this purpose:

import (
    "log"
    "net"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

mux := &http.ServeMux{}
mux.Handle("/metrics", promhttp.Handler())
server := &http.Server{Handler: mux}
listen, err := net.Listen("tcp", ":0")
if err != nil {
    panic(err)
}
log.Printf("Prometheus listening on :%d", listen.Addr().(*net.TCPAddr).Port)
go server.Serve(listen)

Documentation

Overview

dowork is a generic task queueing system for Go programs. It queues, executes, and reschedules tasks in Goroutine in-process.

A global task queue is provided for simple use-cases. To use it:

import (
	"context"

	"git.sr.ht/~sircmpwn/dowork"
)

work.Submit(func(ctx context.Context) error {
	// ...do work...
	return nil
})

This task will be executed in the background. The first time a task is submitted to the global queue, it will be initialized and start running in the background.

To customize options like maximum retries and timeouts, use work.Enqueue:

task := work.NewTask(func(ctx context.Context) error {
	// ...
}).
	Retries(5).			// Maximum number of attempts
	MaxTimeout(10 * time.Minute).	// Maximum timeout between attempts
	Within(10 * time.Second).	// Deadline for each attempt
	After(func(ctx context.Context, task *work.Task) {
		// Executed once the task completes, successful or not
	})
work.Enqueue(task)

Retries are conducted with an exponential backoff.

You may also manage your own work queues. Use NewQueue() to obtain a queue, (*Queue).Dispatch() to execute all overdue tasks, and (*Queue).Start() to spin up a goroutine and start dispatching tasks automatically.

Use work.Shutdown() or (*Queue).Shutdown() to perform a soft shutdown of the queue, which will stop accepting new tasks and block until all already-queued tasks complete.

Index

Constants

This section is empty.

Variables

View Source
var (
	// Returned when a task is attempted which was already successfully completed.
	ErrAlreadyComplete = errors.New("This task was already successfully completed once")

	// If this is returned from a task function, the task shall not be re-attempted.
	ErrDoNotReattempt = errors.New("This task should not be re-attempted")

	// This task has been attempted too many times.
	ErrMaxRetriesExceeded = errors.New("The maximum retries for this task has been exceeded")

	// Set this function to influence the clock that will be used for
	// scheduling re-attempts.
	Now = func() time.Time {
		return time.Now().UTC()
	}
)
View Source
var (
	ErrQueueShuttingDown = errors.New("Queue is shutting down; new tasks are not being accepted")
)

Functions

func Enqueue

func Enqueue(t *Task)

Enqueues a task in the global queue.

func Join

func Join(queues ...*Queue)

Shuts down any number of work queues in parallel and blocks until they're all finished.

func Shutdown

func Shutdown()

Stops accepting new tasks and blocks until all queued tasks are completed.

func Start

func Start()

Ensures that the global queue is started

Types

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(name string) *Queue

Creates a new task queue. The name of the task queue is used in Prometheus label names and must match [a-zA-Z0-9:_] (snake case is used by convention).

func (*Queue) Dispatch

func (q *Queue) Dispatch(ctx context.Context) bool

Attempts any tasks which are due and updates the task schedule. Returns true if there is more work to do.

func (*Queue) Enqueue

func (q *Queue) Enqueue(t *Task) error

Enqueues a task.

An error will only be returned if the queue has been shut down.

func (*Queue) Name

func (q *Queue) Name() string

Name returns the name value of the given Queue

func (*Queue) Now

func (q *Queue) Now(now func() time.Time)

Sets the function the queue will use to obtain the current time.

func (*Queue) Run

func (q *Queue) Run(ctx context.Context)

Runs the task queue. Blocks until the context is cancelled.

func (*Queue) Shutdown

func (q *Queue) Shutdown()

Stops accepting new tasks and blocks until all already-queued tasks are complete. The queue must have been started with Start, not Run.

func (*Queue) Start

func (q *Queue) Start(ctx context.Context)

Starts the task queue in the background. If you wish to use the warm shutdown feature, you must use Start, not Run.

func (*Queue) Submit

func (q *Queue) Submit(fn TaskFunc) (*Task, error)

Creates and enqueues a new task, returning the new task. Note that the caller cannot customize settings on the task without creating a race condition; so attempting to will panic. See NewTask and (*Queue).Enqueue to create tasks with customized options.

An error will only be returned if the queue has been shut down.

type Task

type Task struct {
	Metadata map[string]interface{}
	// contains filtered or unexported fields
}

Stores state for a task which shall be or has been executed. Each task may only be executed successfully once.

func NewTask

func NewTask(fn TaskFunc) *Task

Creates a new task for a given function.

func Submit

func Submit(fn func(ctx context.Context) error) (*Task, error)

See (*Queue).Submit

func (*Task) After

func (t *Task) After(fn func(ctx context.Context, task *Task)) *Task

Sets a function which will be executed once the task is completed, successfully or not. The final result (nil or an error) is passed to the callee.

func (*Task) Attempt

func (t *Task) Attempt(ctx context.Context) (time.Time, error)

Attempts to execute this task.

If successful, the zero time and nil are returned.

Otherwise, the error returned from the task function is returned to the caller. If an error is returned for which errors.Is(err, ErrDoNotReattempt) is true, the caller should not call Attempt again.

func (*Task) Attempts

func (t *Task) Attempts() int

Returns the number of times this task has been attempted

func (*Task) Before

func (t *Task) Before(fn func(ctx context.Context, task *Task)) *Task

Before Sets a function which will be executed before a task is attempted to run. This will be called before every attempt, including retries.

func (*Task) Done

func (t *Task) Done() bool

Returns true if this task was completed, successfully or not.

func (*Task) MaxTimeout

func (t *Task) MaxTimeout(d time.Duration) *Task

Sets the maximum timeout between retries, or zero to exponentially increase the timeout indefinitely. Defaults to 30 minutes.

func (*Task) NextAttempt

func (t *Task) NextAttempt() time.Time

Returns the time the next attempt is scheduled for, or the zero value if it has not been attempted before.

func (*Task) NoJitter

func (t *Task) NoJitter() *Task

Specifies that randomness should not be introduced into the exponential backoff algorithm.

func (*Task) NotBefore

func (t *Task) NotBefore(date time.Time) *Task

Specifies the earliest possible time of the first execution.

func (*Task) Result

func (t *Task) Result() error

Returns the result of the task. The task must have been completed for this to be valid.

func (*Task) Retries

func (t *Task) Retries(n int) *Task

Set the maximum number of retries on failure, or -1 to attempt indefinitely. By default, tasks are not retried on failure.

func (*Task) Within

func (t *Task) Within(deadline time.Duration) *Task

Specifies an upper limit for the duration of each attempt.

type TaskFunc

type TaskFunc func(ctx context.Context) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL