asynq

package module
v0.12.2
Published: Sep 20, 2020 License: MIT

README

Asynq


Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.

High-level overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.

Task Queue Diagram

Stability and Compatibility

Important Note: Current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

  • Scheduling of tasks to be processed in the future (ProcessAt / ProcessIn)
  • Automatic retries of failed tasks with configurable max retry and retry delay
  • Weighted priority queues and optional strict priority
  • Task deduplication using the Unique option
  • Per-task timeout and deadline
  • Middleware support via ServeMux
  • Ability to pause a queue to stop processing tasks from it
  • Support for Redis Sentinel and Redis Cluster
  • CLI to inspect and manage queues and tasks

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "fmt"

    "github.com/itsursujit/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... construct and return an instance
    return &ImageProcessor{}
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/itsursujit/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background.
To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with "net/http" Handler.

package main

import (
    "log"

    "github.com/itsursujit/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To Learn more about asynq features and APIs, see our Wiki and godoc.

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.


For details on how to use the tool, refer to the tool's README.

Installation

To install asynq library, run the following command:

go get -u github.com/itsursujit/asynq

To install the CLI tool, run the following command:

go get -u github.com/itsursujit/asynq/tools/asynq

Requirements

Dependency Version
Redis v3.0+
Go v1.13+

Contributing

We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq : Many of the design ideas are taken from sidekiq and its Web UI
  • RQ : Client APIs are inspired by rq library.
  • Cobra : Asynq CLI is built with cobra

License

Asynq is released under the MIT license. See LICENSE.

Documentation

Overview

Package asynq provides a framework for a Redis-based distributed task queue.

Asynq uses Redis as a message broker. To connect to redis, specify the connection using one of RedisConnOpt types.

redisConnOpt := asynq.RedisClientOpt{
    Addr:     "127.0.0.1:6379",
    Password: "xxxxx",
    DB:       3,
}

The Client is used to enqueue a task.

client := asynq.NewClient(redisConnOpt)

// Task is created with two parameters: its type and payload.
t := asynq.NewTask(
    "send_email",
    map[string]interface{}{"user_id": 42})

// Enqueue the task to be processed immediately.
res, err := client.Enqueue(t)

// Schedule the task to be processed after one minute.
res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))

The Server is used to run the task processing workers with a given handler.

srv := asynq.NewServer(redisConnOpt, asynq.Config{
    Concurrency: 10,
})

if err := srv.Run(handler); err != nil {
    log.Fatal(err)
}

Handler is an interface type with a method which takes a task and returns an error. Handler should return nil if the processing is successful, otherwise return a non-nil error. If handler panics or returns a non-nil error, the task will be retried in the future.

Example of a type that implements the Handler interface.

type TaskHandler struct {
    // ...
}

func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
    switch task.Type {
    case "send_email":
        id, err := task.Payload.GetInt("user_id")
        // send email
    //...
    default:
        return fmt.Errorf("unexpected task type %q", task.Type)
    }
    return nil
}

Constants

This section is empty.

Variables

var ErrDuplicateTask = errors.New("task already exists")

ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.

ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
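A minimal sketch of checking for this error at enqueue time; the task type, the 24-hour TTL, and the client variable (created with asynq.NewClient) are illustrative assumptions:

t := asynq.NewTask("email:deliver", map[string]interface{}{"user_id": 42})
_, err := client.Enqueue(t, asynq.Unique(24*time.Hour))
switch {
case errors.Is(err, asynq.ErrDuplicateTask):
    // An identical task is already queued within the TTL; skip it.
    log.Println("duplicate task; skipping")
case err != nil:
    log.Fatalf("could not enqueue task: %v", err)
}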

var ErrServerStopped = errors.New("asynq: the server has been stopped")

ErrServerStopped indicates that the operation is now illegal because of the server being stopped.

Functions

func GetMaxRetry

func GetMaxRetry(ctx context.Context) (n int, ok bool)

GetMaxRetry extracts maximum retry from a context, if any.

Return value n indicates the maximum number of times the associated task can be retried if ProcessTask returns a non-nil error.

func GetQueueName

func GetQueueName(ctx context.Context) (qname string, ok bool)

GetQueueName extracts queue name from a context, if any.

Return value qname indicates which queue the task was pulled from.

func GetRetryCount

func GetRetryCount(ctx context.Context) (n int, ok bool)

GetRetryCount extracts retry count from a context, if any.

Return value n indicates the number of times the associated task has been retried so far.

func GetTaskID

func GetTaskID(ctx context.Context) (id string, ok bool)

GetTaskID extracts a task ID from a context, if any.

ID of a task is guaranteed to be unique. ID of a task doesn't change if the task is being retried.
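A sketch of a handler that reads these context values for logging; the handler name and log format are illustrative:

func HandleTask(ctx context.Context, t *asynq.Task) error {
    id, _ := asynq.GetTaskID(ctx)
    qname, _ := asynq.GetQueueName(ctx)
    retried, _ := asynq.GetRetryCount(ctx)
    maxRetry, _ := asynq.GetMaxRetry(ctx)
    log.Printf("task %s (queue=%q): retried %d of max %d", id, qname, retried, maxRetry)
    // ... process the task ...
    return nil
}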

func NotFound

func NotFound(ctx context.Context, task *Task) error

NotFound returns an error indicating that the handler was not found for the given task.

Types

type ActiveTask

type ActiveTask struct {
	*Task
	ID    string
	Queue string
}

ActiveTask is a task that's currently being processed.

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client is responsible for scheduling tasks.

A Client is used to register tasks that should be processed immediately or some time in the future.

Clients are safe for concurrent use by multiple goroutines.

func NewClient

func NewClient(r RedisConnOpt) *Client

NewClient returns a new Client given a redis connection option.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection with redis.

func (*Client) Enqueue

func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error)

Enqueue enqueues the given task to be processed asynchronously.

Enqueue returns the enqueued task's Result and a nil error if the task is enqueued successfully, otherwise a non-nil error.

The argument opts specifies the behavior of task processing. If there are conflicting Option values the last one overrides others. By default, max retry is set to 25 and timeout is set to 30 minutes. If no ProcessAt or ProcessIn options are passed, the task will be processed immediately.

func (*Client) SetDefaultOptions

func (c *Client) SetDefaultOptions(taskType string, opts ...Option)

SetDefaultOptions sets options to be used for a given task type. The argument opts specifies the behavior of task processing. If there are conflicting Option values the last one overrides others.

Default options can be overridden by options passed at enqueue time.

type ClusterNode

type ClusterNode struct {
	// Node ID in the cluster.
	ID string

	// Address of the node.
	Addr string
}

ClusterNode describes a node in redis cluster.

type Config

type Config struct {
	// Maximum number of concurrent processing of tasks.
	//
	// If set to a zero or negative value, NewServer will overwrite the value
	// to the number of CPUs usable by the current process.
	Concurrency int

	Handler *ServeMux

	// Function to calculate retry delay for a failed task.
	//
	// By default, it uses exponential backoff algorithm to calculate the delay.
	//
	// n is the number of times the task has been retried.
	// e is the error returned by the task handler.
	// t is the task in question.
	RetryDelayFunc func(n int, e error, t *Task) time.Duration

	// List of queues to process with given priority value. Keys are the names of the
	// queues and values are the associated priority values.
	//
	// If set to nil or not specified, the server will process only the "default" queue.
	//
	// Priority is treated as follows to avoid starving low priority queues.
	//
	// Example:
	//
	//     Queues: map[string]int{
	//         "critical": 6,
	//         "default":  3,
	//         "low":      1,
	//     }
	//
	// With the above config and given that all queues are not empty, the tasks
	// in "critical", "default", "low" should be processed 60%, 30%, 10% of
	// the time respectively.
	//
	// If a queue has a zero or negative priority value, the queue will be ignored.
	Queues map[string]int

	// StrictPriority indicates whether the queue priority should be treated strictly.
	//
	// If set to true, tasks in the queue with the highest priority are processed first.
	// The tasks in lower priority queues are processed only when those queues with
	// higher priorities are empty.
	StrictPriority bool

	// ErrorHandler handles errors returned by the task handler.
	//
	// HandleError is invoked only if the task handler returns a non-nil error.
	//
	// Example:
	//
	//     func reportError(ctx context.Context, task *asynq.Task, err error) {
	//         retried, _ := asynq.GetRetryCount(ctx)
	//         maxRetry, _ := asynq.GetMaxRetry(ctx)
	//         if retried >= maxRetry {
	//             err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err)
	//         }
	//         errorReportingService.Notify(err)
	//     }
	//
	//     ErrorHandler: asynq.ErrorHandlerFunc(reportError)
	ErrorHandler ErrorHandler

	// Logger specifies the logger used by the server instance.
	//
	// If unset, default logger is used.
	Logger Logger

	// LogLevel specifies the minimum log level to enable.
	//
	// If unset, InfoLevel is used by default.
	LogLevel LogLevel

	// ShutdownTimeout specifies the duration to wait to let workers finish their tasks
	// before forcing them to abort when stopping the server.
	//
	// If unset or zero, default timeout of 8 seconds is used.
	ShutdownTimeout time.Duration

	// HealthCheckFunc is called periodically with any errors encountered during ping to the
	// connected redis server.
	HealthCheckFunc func(error)

	// HealthCheckInterval specifies the interval between healthchecks.
	//
	// If unset or zero, the interval is set to 15 seconds.
	HealthCheckInterval time.Duration
}

Config specifies the server's background-task processing behavior.

type DailyStats

type DailyStats struct {
	// Name of the queue.
	Queue string
	// Total number of tasks being processed during the given date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed to be processed during the given date.
	Failed int
	// Date this stats was taken.
	Date time.Time
}

DailyStats holds aggregate data for a given day for a given queue.

type DeadTask

type DeadTask struct {
	*Task
	ID           string
	Queue        string
	MaxRetry     int
	Retried      int
	LastFailedAt time.Time
	ErrorMsg     string
	// contains filtered or unexported fields
}

DeadTask is a task that has exhausted its retries. A DeadTask won't be retried automatically.

func (*DeadTask) Key

func (t *DeadTask) Key() string

Key returns a key used to delete, run, and kill the task.

type ErrorHandler

type ErrorHandler interface {
	HandleError(ctx context.Context, task *Task, err error)
}

An ErrorHandler handles an error that occurred during task processing.

type ErrorHandlerFunc

type ErrorHandlerFunc func(ctx context.Context, task *Task, err error)

The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as an ErrorHandler. If f is a function with the appropriate signature, ErrorHandlerFunc(f) is an ErrorHandler that calls f.

func (ErrorHandlerFunc) HandleError

func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err error)

HandleError calls fn(ctx, task, err)

type Handler

type Handler interface {
	ProcessTask(context.Context, *Task) error
}

A Handler processes tasks.

ProcessTask should return nil if the processing of a task is successful.

If ProcessTask returns a non-nil error or panics, the task will be retried after a delay.

func NotFoundHandler

func NotFoundHandler() Handler

NotFoundHandler returns a simple task handler that returns a "not found" error.

type HandlerFunc

type HandlerFunc func(context.Context, *Task) error

The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.

func (HandlerFunc) ProcessTask

func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error

ProcessTask calls fn(ctx, task)
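For example, an ordinary function with the matching signature can be adapted into a Handler; printTask here is just an illustration:

func printTask(ctx context.Context, t *asynq.Task) error {
    fmt.Printf("processing task of type %q\n", t.Type)
    return nil
}

// Adapt the function so it can be passed anywhere a Handler is expected.
var h asynq.Handler = asynq.HandlerFunc(printTask)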

type Inspector

type Inspector struct {
	RDB *rdb.RDB
}

Inspector is a client interface to inspect and mutate the state of queues and tasks.

func NewInspector

func NewInspector(r RedisConnOpt) *Inspector

NewInspector returns a new instance of Inspector.
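A sketch of typical Inspector usage against a locally running Redis server; the queue name "default" and the printed fields are illustrative:

inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})
defer inspector.Close()

// Print a snapshot of the queue.
stats, err := inspector.CurrentStats("default")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("pending=%d active=%d retry=%d dead=%d\n", stats.Pending, stats.Active, stats.Retry, stats.Dead)

// Re-enqueue every dead task in the queue.
n, err := inspector.RunAllDeadTasks("default")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("transitioned %d dead tasks back to pending\n", n)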

func (*Inspector) Close

func (i *Inspector) Close() error

Close closes the connection with redis.

func (*Inspector) ClusterKeySlot

func (i *Inspector) ClusterKeySlot(qname string) (int64, error)

ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.

func (*Inspector) ClusterNodes

func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error)

ClusterNodes returns a list of nodes the given queue belongs to.

func (*Inspector) CurrentStats

func (i *Inspector) CurrentStats(qname string) (*QueueStats, error)

CurrentStats returns the current stats of the given queue.

func (*Inspector) DeleteAllActiveTasks

func (i *Inspector) DeleteAllActiveTasks(qname string) (int, error)

func (*Inspector) DeleteAllDeadTasks

func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error)

DeleteAllDeadTasks deletes all dead tasks from the specified queue, and reports the number of tasks deleted.

func (*Inspector) DeleteAllPendingTasks

func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error)

func (*Inspector) DeleteAllRetryTasks

func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error)

DeleteAllRetryTasks deletes all retry tasks from the specified queue, and reports the number of tasks deleted.

func (*Inspector) DeleteAllScheduledTasks

func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error)

DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, and reports the number of tasks deleted.

func (*Inspector) DeleteTaskByKey

func (i *Inspector) DeleteTaskByKey(qname, key string) error

DeleteTaskByKey deletes a task with the given key from the given queue.

func (*Inspector) History

func (i *Inspector) History(qname string, n int) ([]*DailyStats, error)

History returns a list of stats from the last n days.

func (*Inspector) KillAllRetryTasks

func (i *Inspector) KillAllRetryTasks(qname string) (int, error)

KillAllRetryTasks kills all retry tasks within the given queue, and reports the number of tasks killed.

func (*Inspector) KillAllScheduledTasks

func (i *Inspector) KillAllScheduledTasks(qname string) (int, error)

KillAllScheduledTasks kills all scheduled tasks within the given queue, and reports the number of tasks killed.

func (*Inspector) KillTaskByKey

func (i *Inspector) KillTaskByKey(qname, key string) error

KillTaskByKey kills a task with the given key in the given queue.

func (*Inspector) ListActiveTasks

func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error)

ListActiveTasks retrieves active tasks from the specified queue.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListActiveTasksByFilterPayload

func (i *Inspector) ListActiveTasksByFilterPayload(qname string, filters map[string]interface{}, opts ...ListOption) ([]*ActiveTask, error)

ListActiveTasksByFilterPayload retrieves active tasks from the specified queue that match the given payload filters.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListDeadTasks

func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error)

ListDeadTasks retrieves dead tasks from the specified queue. Tasks are sorted by LastFailedAt field in descending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListDeadTasksByFilterPayload

func (i *Inspector) ListDeadTasksByFilterPayload(qname string, filters map[string]interface{}, opts ...ListOption) ([]*DeadTask, error)

ListDeadTasksByFilterPayload retrieves dead tasks from the specified queue that match the given payload filters. Tasks are sorted by LastFailedAt field in descending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListPendingTasks

func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error)

ListPendingTasks retrieves pending tasks from the specified queue.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListPendingTasksByFilterPayload

func (i *Inspector) ListPendingTasksByFilterPayload(qname string, filters map[string]interface{}, opts ...ListOption) ([]*PendingTask, error)

ListPendingTasksByFilterPayload retrieves pending tasks from the specified queue that match the given payload filters.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListRetryTasks

func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error)

ListRetryTasks retrieves retry tasks from the specified queue. Tasks are sorted by NextProcessAt field in ascending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListRetryTasksByFilterPayload

func (i *Inspector) ListRetryTasksByFilterPayload(qname string, filters map[string]interface{}, opts ...ListOption) ([]*RetryTask, error)

ListRetryTasksByFilterPayload retrieves retry tasks from the specified queue that match the given payload filters. Tasks are sorted by NextProcessAt field in ascending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListScheduledTasks

func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error)

ListScheduledTasks retrieves scheduled tasks from the specified queue. Tasks are sorted by NextProcessAt field in ascending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) ListScheduledTasksByFilterPayload

func (i *Inspector) ListScheduledTasksByFilterPayload(qname string, filters map[string]interface{}, opts ...ListOption) ([]*ScheduledTask, error)

ListScheduledTasksByFilterPayload retrieves scheduled tasks from the specified queue that match the given payload filters. Tasks are sorted by NextProcessAt field in ascending order.

By default, it retrieves the first 30 tasks.

func (*Inspector) PauseQueue

func (i *Inspector) PauseQueue(qname string) error

PauseQueue pauses task processing on the specified queue. If the queue is already paused, it will return a non-nil error.

func (*Inspector) Queues

func (i *Inspector) Queues() ([]string, error)

Queues returns a list of all queue names.

func (*Inspector) RunAllDeadTasks

func (i *Inspector) RunAllDeadTasks(qname string) (int, error)

RunAllDeadTasks transitions all dead tasks to pending state within the given queue, and reports the number of tasks transitioned.

func (*Inspector) RunAllRetryTasks

func (i *Inspector) RunAllRetryTasks(qname string) (int, error)

RunAllRetryTasks transitions all retry tasks to pending state within the given queue, and reports the number of tasks transitioned.

func (*Inspector) RunAllScheduledTasks

func (i *Inspector) RunAllScheduledTasks(qname string) (int, error)

RunAllScheduledTasks transitions all scheduled tasks to pending state within the given queue, and reports the number of tasks transitioned.

func (*Inspector) RunTaskByKey

func (i *Inspector) RunTaskByKey(qname, key string) error

RunTaskByKey transitions a task to pending state given a task key and queue name.

func (*Inspector) UnpauseQueue

func (i *Inspector) UnpauseQueue(qname string) error

UnpauseQueue resumes task processing on the specified queue. If the queue is not paused, it will return a non-nil error.

type ListOption

type ListOption interface{}

ListOption specifies behavior of list operation.

func Page

func Page(n int) ListOption

Page returns an option to specify the page number for list operation. The value 1 fetches the first page.

Negative page number is treated as one.

func PageSize

func PageSize(n int) ListOption

PageSize returns an option to specify the page size for list operation.

Negative page size is treated as zero.
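For example, fetching the third page of pending tasks, 50 tasks per page, assuming an Inspector as sketched above:

// Page numbers start at 1; this retrieves tasks 101-150 of the "default" queue.
pending, err := inspector.ListPendingTasks("default", asynq.PageSize(50), asynq.Page(3))
if err != nil {
    log.Fatal(err)
}
for _, t := range pending {
    fmt.Println(t.ID, t.Type)
}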

type LogLevel

type LogLevel int32

LogLevel represents logging level.

It satisfies the flag.Value interface.

const (

	// DebugLevel is the lowest level of logging.
	// Debug logs are intended for debugging and development purposes.
	DebugLevel LogLevel

	// InfoLevel is used for general informational log messages.
	InfoLevel

	// WarnLevel is used for undesired but relatively expected events,
	// which may indicate a problem.
	WarnLevel

	// ErrorLevel is used for undesired and unexpected events that
	// the program can recover from.
	ErrorLevel

	// FatalLevel is used for undesired and unexpected events that
	// the program cannot recover from.
	FatalLevel
)

func (*LogLevel) Set

func (l *LogLevel) Set(val string) error

Set is part of the flag.Value interface.

func (*LogLevel) String

func (l *LogLevel) String() string

String is part of the flag.Value interface.
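Because LogLevel satisfies flag.Value, it can be bound directly to a command-line flag. A sketch follows; the accepted value strings such as "debug" and "info" are an assumption based on the level names above:

package main

import (
	"flag"

	"github.com/itsursujit/asynq"
)

var logLevel asynq.LogLevel

func main() {
	// Accepted values are assumed to mirror the level names (debug, info, warn, error, fatal).
	flag.Var(&logLevel, "loglevel", "minimum log level")
	flag.Parse()

	srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"}, asynq.Config{
		Concurrency: 10,
		LogLevel:    logLevel,
	})
	_ = srv // ... register handlers and run the server ...
}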

type Logger

type Logger interface {
	// Debug logs a message at Debug level.
	Debug(args ...interface{})

	// Info logs a message at Info level.
	Info(args ...interface{})

	// Warn logs a message at Warning level.
	Warn(args ...interface{})

	// Error logs a message at Error level.
	Error(args ...interface{})

	// Fatal logs a message at Fatal level
	// and process will exit with status set to 1.
	Fatal(args ...interface{})
}

Logger supports logging at various log levels.

type MiddlewareFunc

type MiddlewareFunc func(Handler) Handler

MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler. Typically, the returned handler is a closure which does something with the context and task passed to it, and then calls the handler passed as parameter to the MiddlewareFunc.
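A sketch of a logging middleware written as a MiddlewareFunc; it wraps the next handler, times the call, and passes the task through unchanged:

func loggingMiddleware(next asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        log.Printf("start processing %q", t.Type)
        err := next.ProcessTask(ctx, t)
        log.Printf("finished %q in %v (err=%v)", t.Type, time.Since(start), err)
        return err
    })
}

// Attach it to a ServeMux with Use:
//     mux := asynq.NewServeMux()
//     mux.Use(loggingMiddleware)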

type Option

type Option interface{}

Option specifies the task processing behavior.

func Deadline

func Deadline(t time.Time) Option

Deadline returns an option to specify the deadline for the given task. If it reaches the deadline before the Handler returns, then the task will be retried.

If there's a conflicting Timeout option, whichever comes earliest will be used.

func MaxRetry

func MaxRetry(n int) Option

MaxRetry returns an option to specify the max number of times the task will be retried.

Negative retry count is treated as zero retry.

func ProcessAt

func ProcessAt(t time.Time) Option

ProcessAt returns an option to specify when to process the given task.

If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.

func ProcessIn

func ProcessIn(d time.Duration) Option

ProcessIn returns an option to specify when to process the given task relative to the current time.

If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.

func Queue

func Queue(name string) Option

Queue returns an option to specify the queue to enqueue the task into.

Queue name is case-insensitive and the lowercased version is used.

func Timeout

func Timeout(d time.Duration) Option

Timeout returns an option to specify how long a task may run. If the timeout elapses before the Handler returns, then the task will be retried.

Zero duration means no limit.

If there's a conflicting Deadline option, whichever comes earliest will be used.
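For example, passing both options at enqueue time; the values and the client variable are illustrative, and the task is retried if either limit is hit first:

t := asynq.NewTask("image:resize", map[string]interface{}{"src": "some/blobstore/path"})
res, err := client.Enqueue(t,
    asynq.Timeout(30*time.Second),                // at most 30s of processing time
    asynq.Deadline(time.Now().Add(2*time.Hour)),  // and never past this wall-clock time
)
if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("timeout=%v deadline=%v\n", res.Timeout, res.Deadline)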

func Unique

func Unique(ttl time.Duration) Option

Unique returns an option to enqueue a task only if the given task is unique. Task enqueued with this option is guaranteed to be unique within the given ttl. Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued. ErrDuplicateTask error is returned when enqueueing a duplicate task.

Uniqueness of a task is based on the following properties:

  • Task Type
  • Task Payload
  • Queue Name

type Payload

type Payload struct {
	// contains filtered or unexported fields
}

Payload holds arbitrary data needed for task execution.

func (Payload) GetBool

func (p Payload) GetBool(key string) (bool, error)

GetBool returns a boolean value if a boolean type is associated with the key, otherwise reports an error.

func (Payload) GetDuration

func (p Payload) GetDuration(key string) (time.Duration, error)

GetDuration returns a duration value if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetFloat64

func (p Payload) GetFloat64(key string) (float64, error)

GetFloat64 returns a float64 value if a numeric type is associated with the key, otherwise reports an error.

func (Payload) GetInt

func (p Payload) GetInt(key string) (int, error)

GetInt returns an int value if a numeric type is associated with the key, otherwise reports an error.

func (Payload) GetIntSlice

func (p Payload) GetIntSlice(key string) ([]int, error)

GetIntSlice returns a slice of ints if an int slice type is associated with the key, otherwise reports an error.

func (Payload) GetString

func (p Payload) GetString(key string) (string, error)

GetString returns a string value if a string type is associated with the key, otherwise reports an error.

func (Payload) GetStringMap

func (p Payload) GetStringMap(key string) (map[string]interface{}, error)

GetStringMap returns a map of string to empty interface if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapBool

func (p Payload) GetStringMapBool(key string) (map[string]bool, error)

GetStringMapBool returns a map of string to boolean if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapInt

func (p Payload) GetStringMapInt(key string) (map[string]int, error)

GetStringMapInt returns a map of string to int if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapString

func (p Payload) GetStringMapString(key string) (map[string]string, error)

GetStringMapString returns a map of string to string if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapStringSlice

func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error)

GetStringMapStringSlice returns a map of string to string slice if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringSlice

func (p Payload) GetStringSlice(key string) ([]string, error)

GetStringSlice returns a slice of strings if a string slice type is associated with the key, otherwise reports an error.

func (Payload) GetTime

func (p Payload) GetTime(key string) (time.Time, error)

GetTime returns a time value if a correct map type is associated with the key, otherwise reports an error.

func (Payload) Has

func (p Payload) Has(key string) bool

Has reports whether key exists.

type PendingTask

type PendingTask struct {
	*Task
	ID    string
	Queue string
}

PendingTask is a task in a queue and is ready to be processed.

type QueueStats

type QueueStats struct {
	// Name of the queue.
	Queue string
	// Size is the total number of tasks in the queue.
	// The value is the sum of Pending, Active, Scheduled, Retry, and Dead.
	Size int
	// Number of pending tasks.
	Pending int
	// Number of active tasks.
	Active int
	// Number of scheduled tasks.
	Scheduled int
	// Number of retry tasks.
	Retry int
	// Number of dead tasks.
	Dead int
	// Total number of tasks being processed during the given date.
	// The number includes both succeeded and failed tasks.
	Processed int
	// Total number of tasks failed to be processed during the given date.
	Failed int
	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue will not be processed.
	Paused bool
	// Time when this stats was taken.
	Timestamp time.Time
}

QueueStats represents the state of a queue at a certain time.

type RedisClientOpt

type RedisClientOpt struct {
	// Network type to use, either tcp or unix.
	// Default is tcp.
	Network string

	// Redis server address in "host:port" format.
	Addr string

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// Redis DB to select after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}

RedisClientOpt is used to create a redis client that connects to a redis server directly.

type RedisClusterClientOpt

type RedisClusterClientOpt struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up.
	// Command is retried on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}

RedisClusterClientOpt is used to create a redis client that connects to a redis cluster.

type RedisConnOpt

type RedisConnOpt interface{}

RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.

RedisConnOpt represents a sum of following types:

  • RedisClientOpt
  • RedisFailoverClientOpt
  • RedisClusterClientOpt

func ParseRedisURI

func ParseRedisURI(uri string) (RedisConnOpt, error)

ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. It returns a non-nil error if uri cannot be parsed.

Three URI schemes are supported, which are redis:, redis-socket:, and redis-sentinel:. Supported formats are:

redis://[:password@]host[:port][/dbnumber]
redis-socket://[:password@]path[?db=dbnumber]
redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
Example
package main

import (
	"fmt"
	"log"

	"github.com/itsursujit/asynq"
)

func main() {
	rconn, err := asynq.ParseRedisURI("redis://localhost:6379/10")
	if err != nil {
		log.Fatal(err)
	}
	r, ok := rconn.(asynq.RedisClientOpt)
	if !ok {
		log.Fatal("unexpected type")
	}
	fmt.Println(r.Addr)
	fmt.Println(r.DB)
}
Output:

localhost:6379
10

type RedisFailoverClientOpt

type RedisFailoverClientOpt struct {
	// Redis master name that is monitored by sentinels.
	MasterName string

	// Addresses of sentinels in "host:port" format.
	// Use at least three sentinels to avoid problems described in
	// https://redis.io/topics/sentinel.
	SentinelAddrs []string

	// Redis sentinel password.
	SentinelPassword string

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// Redis DB to select after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}

RedisFailoverClientOpt is used to create a redis client that talks to redis sentinels for service discovery and has automatic failover capability.

type Result

type Result struct {
	// ID is a unique identifier for the task.
	ID string

	// ProcessAt indicates when the task should be processed.
	ProcessAt time.Time

	// Retry is the maximum number of retries for the task.
	Retry int

	// Queue is the name of the queue the task is enqueued to.
	Queue string

	// Timeout is the timeout value for the task.
	// Counting for timeout starts when a worker starts processing the task.
	// If task processing doesn't complete within the timeout, the task will be retried.
	// The value zero means no timeout.
	//
	// If deadline is set, min(now+timeout, deadline) is used, where the now is the time when
	// a worker starts processing the task.
	Timeout time.Duration

	// Deadline is the deadline value for the task.
	// If task processing doesn't complete before the deadline, the task will be retried.
	// The value time.Unix(0, 0) means no deadline.
	//
	// If timeout is set, min(now+timeout, deadline) is used, where the now is the time when
	// a worker starts processing the task.
	Deadline time.Time
}

A Result holds an enqueued task's metadata.

type RetryTask

type RetryTask struct {
	*Task
	ID            string
	Queue         string
	NextProcessAt time.Time
	MaxRetry      int
	Retried       int
	ErrorMsg      string
	// contains filtered or unexported fields
}

RetryTask is a task scheduled to be retried in the future.

func (*RetryTask) Key

func (t *RetryTask) Key() string

Key returns a key used to delete, run, and kill the task.

type ScheduledTask

type ScheduledTask struct {
	*Task
	ID            string
	Queue         string
	NextProcessAt time.Time
	// contains filtered or unexported fields
}

ScheduledTask is a task scheduled to be processed in the future.

func (*ScheduledTask) Key

func (t *ScheduledTask) Key() string

Key returns a key used to delete, run, and kill the task.

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux is a multiplexer for asynchronous tasks. It matches the type of each task against a list of registered patterns and calls the handler for the pattern that most closely matches the task's type name.

Longer patterns take precedence over shorter ones, so that if there are handlers registered for both "images" and "images:thumbnails", the latter handler will be called for tasks with a type name beginning with "images:thumbnails" and the former will receive tasks with type name beginning with "images".
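A sketch of that precedence rule, where handleAllImages and handleThumbnails are hypothetical handler functions:

mux := asynq.NewServeMux()
mux.HandleFunc("images", handleAllImages)              // matches any task type beginning with "images"
mux.HandleFunc("images:thumbnails", handleThumbnails)  // longer pattern, takes precedence

// A task of type "images:thumbnails:generate" is dispatched to handleThumbnails,
// while a task of type "images:resize" falls back to handleAllImages.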

func NewServeMux

func NewServeMux() *ServeMux

NewServeMux allocates and returns a new ServeMux.

func (*ServeMux) Handle

func (mux *ServeMux) Handle(pattern string, handler Handler)

Handle registers the handler for the given pattern. If a handler already exists for pattern, Handle panics.

func (*ServeMux) HandleFunc

func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error)

HandleFunc registers the handler function for the given pattern.

func (*ServeMux) Handler

func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string)

Handler returns the handler to use for the given task. It always returns a non-nil handler.

Handler also returns the registered pattern that matches the task.

If there is no registered handler that applies to the task, Handler returns a 'not found' handler which returns an error.

func (*ServeMux) ProcessTask

func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error

ProcessTask dispatches the task to the handler whose pattern most closely matches the task type.

func (*ServeMux) Use

func (mux *ServeMux) Use(mws ...MiddlewareFunc)

Use appends a MiddlewareFunc to the chain. Middlewares are executed in the order that they are applied to the ServeMux.

type Server

type Server struct {
	ID uuid.UUID

	Handler     *ServeMux
	Concurrency int
	// contains filtered or unexported fields
}

Server is responsible for managing the background-task processing.

Server pulls tasks off queues and processes them. If the processing of a task is unsuccessful, server will schedule it for a retry. A task will be retried until either the task gets processed successfully or until it reaches its max retry count.

If a task exhausts its retries, it will be moved to the "dead" queue and will be kept in the queue for some time until a certain condition is met (e.g., queue size reaches a certain limit, or the task has been in the queue for a certain amount of time).

func NewServer

func NewServer(r RedisConnOpt, cfg Config) *Server

NewServer returns a new Server given a redis connection option and background processing configuration.

func (*Server) AddQueue

func (srv *Server) AddQueue(queueName string, priority int) error

func (*Server) Quiet

func (srv *Server) Quiet()

Quiet signals the server to stop pulling new tasks off queues. Quiet should be used before stopping the server.

Example
package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/itsursujit/asynq"
	"golang.org/x/sys/unix"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{Concurrency: 20},
	)

	h := asynq.NewServeMux()
	// ... Register handlers

	if err := srv.Start(h); err != nil {
		log.Fatal(err)
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
	// Handle SIGTERM, SIGINT to exit the program.
	// Handle SIGTSTP to stop processing new tasks.
	for {
		s := <-sigs
		if s == unix.SIGTSTP {
			srv.Quiet() // stop processing new tasks
			continue
		}
		break
	}

	srv.Stop()
}
Output:

func (*Server) RemoveQueue

func (srv *Server) RemoveQueue(queueName string) error

func (*Server) Run

func (srv *Server) Run() error

Run starts the background-task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.

Run returns any error encountered during server startup time. If the server has already been stopped, ErrServerStopped is returned.

Example
package main

import (
	"log"

	"github.com/itsursujit/asynq"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{Concurrency: 20},
	)

	h := asynq.NewServeMux()
	// ... Register handlers

	// Run blocks and waits for os signal to terminate the program.
	if err := srv.Run(h); err != nil {
		log.Fatal(err)
	}
}
Output:

func (*Server) Start

func (srv *Server) Start() error

Start starts the worker server. Once the server has started, it pulls tasks off queues and starts a worker goroutine for each task. Tasks are processed concurrently by the workers, up to the Concurrency value specified at initialization time.

Start returns any error encountered during server startup time. If the server has already been stopped, ErrServerStopped is returned.

func (*Server) Stop

func (srv *Server) Stop()

Stop stops the worker server. It gracefully closes all active workers. The server will wait for active workers to finish processing tasks for the duration specified in Config.ShutdownTimeout. If a worker doesn't finish processing a task within the timeout, the task will be pushed back to Redis.

Example
package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/itsursujit/asynq"
	"golang.org/x/sys/unix"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: ":6379"},
		asynq.Config{Concurrency: 20},
	)

	h := asynq.NewServeMux()
	// ... Register handlers

	if err := srv.Start(h); err != nil {
		log.Fatal(err)
	}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
	<-sigs // wait for termination signal

	srv.Stop()
}
Output:

func (*Server) Tune

func (srv *Server) Tune(noOfWorkers int)

type Task

type Task struct {
	// Type indicates the type of task to be performed.
	Type string

	// Payload holds data needed to perform the task.
	Payload Payload
}

Task represents a unit of work to be performed.

func NewTask

func NewTask(typename string, payload map[string]interface{}) *Task

NewTask returns a new Task given a type name and payload data.

The payload values must be serializable.

Directories

Path Synopsis
internal
asynqtest
Package asynqtest defines test helpers for asynq and its internal packages.
base
Package base defines foundational types and constants used in asynq package.
log
Package log exports logging related types and functions.
rdb
Package rdb encapsulates the interactions with redis.
testbroker
Package testbroker exports a broker implementation that should be used in package testing.
