asynq

package module
v0.0.0-...-41df229 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 14 Imported by: 0

README

Asynq Build Status

Simple, efficent asynchronous task processing library in Go.

Table of Contents

Overview

Asynq provides a simple interface to asynchronous task processing.

Asynq also ships with a CLI to monitor the queues and take manual actions if needed.

Asynq provides:

  • Clear separation of task producer and consumer
  • Ability to schedule task processing in the future
  • Automatic retry of failed tasks with exponential backoff
  • Ability to configure max retry count per task
  • Ability to configure max number of worker goroutines to process tasks
  • Unix signal handling to safely shutdown background processing
  • Enhanced reliability TODO(brianbinbin): link to wiki page describing this.
  • CLI to query and mutate queues state for mointoring and administrative purposes

Requirements

Dependency Version
Redis v2.6+
Go v1.12+
github.com/go-redis/redis v.7.0+

Installation

go get github.com/brianbinbin/asynq

Getting Started

  1. Import asynq in your file.
import "github.com/brianbinbin/asynq"
  1. Create a Client instance to create tasks.
func main() {
    r := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    }
    client := asynq.NewClient(r)

    t1 := asynq.Task{
        Type: "send_welcome_email",
        Payload: map[string]interface{}{
          "recipient_id": 1234,
        },
    }

    t2 := asynq.Task{
        Type: "send_reminder_email",
        Payload: map[string]interface{}{
          "recipient_id": 1234,
        },
    }

    // process the task immediately.
    err := client.Schedule(&t1, time.Now())

    // process the task 24 hours later.
    err = client.Schedule(&t2, time.Now().Add(24 * time.Hour))

    // specify the max number of retry (default: 25)
    err = client.Schedule(&t1, time.Now(), asynq.MaxRetry(1))
}
  1. Create a Background instance to process tasks.
func main() {
    r := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    }
    bg := asynq.NewBackground(r, &asynq.Config{
        Concurrency: 20,
    })

    // Blocks until signal TERM or INT is received.
    // For graceful shutdown, send signal TSTP to stop processing more tasks
    // before sending TERM or INT signal.
    bg.Run(handler)
}

The argument to (*asynq.Background).Run is an interface asynq.Handler which has one method ProcessTask.

// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
    ProcessTask(*Task) error
}

The simplest way to implement a handler is to define a function with the same signature and use asynq.HandlerFunc adapter type when passing it to Run.

func handler(t *asynq.Task) error {
    switch t.Type {
    case "send_welcome_email":
        id, err := t.Payload.GetInt("recipient_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to %d\n", id)

    // ... handle other types ...

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}

func main() {
    r := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    }
    bg := asynq.NewBackground(r, &asynq.Config{
        Concurrency: 20,
    })

    // Use asynq.HandlerFunc adapter for a handler function
    bg.Run(asynq.HandlerFunc(handler))
}

License

Asynq is released under the MIT license. See LICENSE.

Documentation

Overview

Package asynq provides a framework for background task processing.

The Client is used to register a task to be processed at the specified time.

client := asynq.NewClient(redis)

t := asynq.Task{
    Type:    "send_email",
    Payload: map[string]interface{}{"user_id": 42},
}

err := client.Schedule(&t, time.Now().Add(time.Minute))

The Background is used to run the background task processing with a given handler.

bg := asynq.NewBackground(redis, &asynq.Config{
    Concurrency: 10,
})

bg.Run(handler)

Handler is an interface with one method ProcessTask which takes a task and returns an error. Handler should return nil if the processing is successful, otherwise return a non-nil error. If handler returns a non-nil error, the task will be retried in the future.

Example of a type that implements the Handler interface.

type TaskHandler struct {
    // ...
}

func (h *TaskHandler) ProcessTask(task *asynq.Task) error {
    switch task.Type {
    case "send_email":
        id, err := task.Payload.GetInt("user_id")
        // send email
    case "generate_thumbnail":
        // generate thumbnail image
    //...
    default:
        return fmt.Errorf("unepected task type %q", task.Type)
    }
    return nil
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Background

type Background struct {
	// contains filtered or unexported fields
}

Background is responsible for managing the background-task processing.

Background manages background queues to process tasks and retry if necessary. If the processing of a task is unsuccessful, background will schedule it for a retry with an exponential backoff until either the task gets processed successfully or it exhausts its max retry count.

Once a task exhausts its retries, it will be moved to the "dead" queue and will be kept in the queue for some time until a certain condition is met (e.g., queue size reaches a certain limit, or the task has been in the queue for a certain amount of time).

func NewBackground

func NewBackground(r *redis.Client, cfg *Config) *Background

NewBackground returns a new Background instance given a redis client and background processing configuration.

func (*Background) Run

func (bg *Background) Run(handler Handler)

Run starts the background-task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all pending workers and other goroutines to process the tasks.

type Client

type Client struct {
	// contains filtered or unexported fields
}

A Client is responsible for scheduling tasks.

A Client is used to register tasks that should be processed immediately or some time in the future.

Clients are safe for concurrent use by multiple goroutines.

func NewClient

func NewClient(r *redis.Client) *Client

NewClient and returns a new Client given a redis configuration.

func (*Client) Schedule

func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error

Schedule registers a task to be processed at the specified time.

Schedule returns nil if the task is registered successfully, otherwise returns non-nil error.

opts specifies the behavior of task processing. If there are conflicting Option the last one overrides the ones before.

type Config

type Config struct {
	// Maximum number of concurrent workers to process tasks.
	//
	// If set to zero or negative value, NewBackground will overwrite the value to one.
	Concurrency int

	// Function to calculate retry delay for a failed task.
	//
	// By default, it uses exponential backoff algorithm to calculate the delay.
	//
	// n is the number of times the task has been retried.
	// e is the error returned by the task handler.
	// t is the task in question. t is read-only, the function should not mutate t.
	RetryDelayFunc func(n int, e error, t *Task) time.Duration
}

Config specifies the background-task processing behavior.

type Handler

type Handler interface {
	ProcessTask(*Task) error
}

A Handler processes a task.

ProcessTask should return nil if the processing of a task is successful.

If ProcessTask return a non-nil error or panics, the task will be retried after delay.

Note: The argument task is ready only, ProcessTask should not mutate the task.

type HandlerFunc

type HandlerFunc func(*Task) error

The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.

func (HandlerFunc) ProcessTask

func (fn HandlerFunc) ProcessTask(task *Task) error

ProcessTask calls fn(task)

type Option

type Option interface{}

Option specifies the processing behavior for the associated task.

func MaxRetry

func MaxRetry(n int) Option

MaxRetry returns an option to specify the max number of times a task will be retried.

Negative retry count is treated as zero retry.

type Payload

type Payload map[string]interface{}

Payload is an arbitrary data needed for task execution. The values have to be JSON serializable.

func (Payload) GetBool

func (p Payload) GetBool(key string) (bool, error)

GetBool returns a boolean value if a boolean type is associated with the key, otherwise reports an error.

func (Payload) GetDuration

func (p Payload) GetDuration(key string) (time.Duration, error)

GetDuration returns a duration value if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetFloat64

func (p Payload) GetFloat64(key string) (float64, error)

GetFloat64 returns a float64 value if a numeric type is associated with the key, otherwise reports an error.

func (Payload) GetInt

func (p Payload) GetInt(key string) (int, error)

GetInt returns an int value if a numeric type is associated with the key, otherwise reports an error.

func (Payload) GetIntSlice

func (p Payload) GetIntSlice(key string) ([]int, error)

GetIntSlice returns a slice of ints if a int slice type is associated with the key, otherwise reports an error.

func (Payload) GetString

func (p Payload) GetString(key string) (string, error)

GetString returns a string value if a string type is associated with the key, otherwise reports an error.

func (Payload) GetStringMap

func (p Payload) GetStringMap(key string) (map[string]interface{}, error)

GetStringMap returns a map of string to empty interface if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapBool

func (p Payload) GetStringMapBool(key string) (map[string]bool, error)

GetStringMapBool returns a map of string to boolean if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapInt

func (p Payload) GetStringMapInt(key string) (map[string]int, error)

GetStringMapInt returns a map of string to int if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapString

func (p Payload) GetStringMapString(key string) (map[string]string, error)

GetStringMapString returns a map of string to string if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringMapStringSlice

func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error)

GetStringMapStringSlice returns a map of string to string slice if a correct map type is associated with the key, otherwise reports an error.

func (Payload) GetStringSlice

func (p Payload) GetStringSlice(key string) ([]string, error)

GetStringSlice returns a slice of strings if a string slice type is associated with the key, otherwise reports an error.

func (Payload) GetTime

func (p Payload) GetTime(key string) (time.Time, error)

GetTime returns a time value if a correct map type is associated with the key, otherwise reports an error.

func (Payload) Has

func (p Payload) Has(key string) bool

Has reports whether key exists.

type Task

type Task struct {
	// Type indicates the kind of the task to be performed.
	Type string

	// Payload holds data needed to process the task.
	Payload Payload
}

Task represents a task to be performed.

Directories

Path Synopsis
internal
asynqtest
Package asynqtest defines test helpers for asynq and its internal packages.
Package asynqtest defines test helpers for asynq and its internal packages.
base
Package base defines foundational types and constants used in asynq package.
Package base defines foundational types and constants used in asynq package.
rdb
Package rdb encapsulates the interactions with redis.
Package rdb encapsulates the interactions with redis.
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL