taskq

v4.0.0-beta.4

This package is not in the latest version of its module.

Published: May 10, 2023 License: BSD-2-Clause Imports: 23 Imported by: 13

README

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends


taskq is brought to you by ⭐ uptrace/uptrace. Uptrace is an open source and blazingly fast distributed tracing tool powered by OpenTelemetry and ClickHouse. Give it a star as well!

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatic scaling of the number of goroutines used to fetch (fetcher) and process (worker) messages.
  • Global rate limiting.
  • Global limit of workers.
  • Call once: messages with the same name are deduplicated and processed only once.
  • Automatic retries with exponential backoff.
  • Automatic pausing when all messages in the queue fail.
  • Fallback handler for processing failed messages.
  • Message batching: the SQS and IronMQ backends add/delete messages in batches.
  • Automatic message compression using snappy / s2.

Getting started

To get started, see the Golang Task Queue documentation.

Producer:

import (
    "context"
    "time"

    "github.com/vmihailenco/taskq/redisq/v4"
    "github.com/vmihailenco/taskq/v4"
)

// Create a queue factory.
var QueueFactory = redisq.NewFactory()

// Create a queue.
var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueConfig{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})

// Register a task.
var CountTask = taskq.RegisterTask("counter", &taskq.TaskConfig{
    Handler: func() error {
        IncrLocalCounter()
        return nil
    },
})

ctx := context.Background()

// And start producing.
for {
    // Call the task without any args.
    err := MainQueue.AddJob(ctx, CountTask.NewJob())
    if err != nil {
        panic(err)
    }
    time.Sleep(time.Second)
}

Consumer:

// Start consuming the queue.
if err := MainQueue.Start(context.Background()); err != nil {
    log.Fatal(err)
}
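
In a long-running worker process it is common to stop consumers gracefully on shutdown. Below is a minimal sketch using the Factory methods documented later on this page (StartConsumers, StopConsumers, Close); the signal handling comes from the standard library and is only illustrative.

ctx := context.Background()

// Start consumers for every queue registered with the factory.
if err := QueueFactory.StartConsumers(ctx); err != nil {
    log.Fatal(err)
}

// Wait for SIGINT/SIGTERM.
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig

// Stop consumers, then close the queues.
if err := QueueFactory.StopConsumers(); err != nil {
    log.Print(err)
}
if err := QueueFactory.Close(); err != nil {
    log.Print(err)
}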

Contributors

Thanks to all the people who already contributed!

Documentation

Overview

Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.

Example (CustomRateLimit)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/vmihailenco/taskq/memqueue/v4"
	"github.com/vmihailenco/taskq/v4"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&taskq.QueueConfig{
		Name: "test",
	})
	task := taskq.RegisterTask("Example_customRateLimit", &taskq.TaskConfig{
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	ctx := context.Background()
	q.AddJob(ctx, task.NewJob())

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 3s
Example (MessageDelay)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueConfig{
	Name: "test",
})
task := taskq.RegisterTask("Example_messageDelay", &taskq.TaskConfig{
	Handler: func() {
		fmt.Println("processed with delay", timeSince(start))
	},
})

ctx := context.Background()
msg := task.NewJob()
msg.Delay = time.Second
_ = q.AddJob(ctx, msg)

// Wait for all messages to be processed.
_ = q.Close()
Output:

processed with delay 1s
Example (Once)
q := memqueue.NewQueue(&taskq.QueueConfig{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask("Example_once", &taskq.TaskConfig{
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
})

ctx := context.Background()
for i := 0; i < 10; i++ {
	msg := task.NewJob("world")
	// Call once in a second.
	msg.OnceInPeriod(time.Second)

	_ = q.AddJob(ctx, msg)
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world
Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueConfig{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask("Example_rateLimit", &taskq.TaskConfig{
	Handler: func() {},
})

const n = 5

ctx := context.Background()
for i := 0; i < n; i++ {
	_ = q.AddJob(ctx, task.NewJob())
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s
Example (RetryOnError)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueConfig{
	Name: "test",
})
task := taskq.RegisterTask("Example_retryOnError", &taskq.TaskConfig{
	Handler: func() error {
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

ctx := context.Background()
q.AddJob(ctx, task.NewJob())

// Wait for all messages to be processed.
_ = q.Close()
Output:

retried in 0s
retried in 1s
retried in 3s

Constants

This section is empty.

Variables

var ErrAsyncTask = errors.New("taskq: async task")

var ErrDuplicate = errors.New("taskq: message with such name already exists")

ErrDuplicate is returned when adding a duplicate message to the queue.
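
For example, a producer that relies on named jobs (see OnceInPeriod below) can treat ErrDuplicate as a non-fatal outcome. A minimal sketch, assuming a queue q, a registered task, and a ctx as in the examples above:

job := task.NewJob("world")
job.OnceInPeriod(time.Second) // generates a name, so duplicates are rejected

if err := q.AddJob(ctx, job); err != nil {
    if errors.Is(err, taskq.ErrDuplicate) {
        // A job with the same name was already added in this period; safe to ignore.
    } else {
        log.Print(err)
    }
}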

Functions

func SetLogger

func SetLogger(logger logr.Logger)

SetLogger configures the logger used internally by taskq.
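
SetLogger accepts a logr.Logger, so any logr-compatible adapter can be plugged in. A minimal sketch using the go-logr stdr adapter for the standard library logger (the adapter choice is just an example, not something taskq requires):

import (
    "log"
    "os"

    "github.com/go-logr/stdr"
    "github.com/vmihailenco/taskq/v4"
)

func init() {
    // Route taskq's internal logging to the standard library logger.
    taskq.SetLogger(stdr.New(log.New(os.Stderr, "taskq: ", log.LstdFlags)))
}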

func SetUnknownTaskConfig

func SetUnknownTaskConfig(opt *TaskConfig)

func Version

func Version() string

Version returns the current release version.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewConsumer

func NewConsumer(q Queue) *Consumer

NewConsumer creates a new Consumer for the queue using the provided processing options.

func StartConsumer

func StartConsumer(ctx context.Context, q Queue) *Consumer

StartConsumer creates a new Consumer for the queue and starts it.

func (*Consumer) AddHook

func (c *Consumer) AddHook(hook ConsumerHook)

AddHook adds a hook into message processing.

func (*Consumer) AddJob

func (c *Consumer) AddJob(ctx context.Context, job *Job) error

func (*Consumer) Len

func (c *Consumer) Len() int

func (*Consumer) Options

func (c *Consumer) Options() *QueueConfig

func (*Consumer) Process

func (c *Consumer) Process(ctx context.Context, job *Job) error

Process is a low-level API to process a message, bypassing the internal queue.

func (*Consumer) ProcessAll

func (c *Consumer) ProcessAll(ctx context.Context) error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
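
ProcessAll can be convenient in tests, where the queue should be drained deterministically instead of running Start in the background. A minimal sketch, assuming a queue q whose consumer has not been started, plus a registered task and ctx as in the examples above:

// Enqueue a few jobs, then drain the queue synchronously.
for i := 0; i < 3; i++ {
    _ = q.AddJob(ctx, task.NewJob(i))
}
if err := q.Consumer().ProcessAll(ctx); err != nil {
    log.Print(err)
}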

func (*Consumer) ProcessOne

func (c *Consumer) ProcessOne(ctx context.Context) error

ProcessOne processes at most one message in the queue.

func (*Consumer) Purge

func (c *Consumer) Purge(ctx context.Context) error

Purge discards messages from the internal queue.

func (*Consumer) Put

func (c *Consumer) Put(ctx context.Context, job *Job)

func (*Consumer) Queue

func (c *Consumer) Queue() Queue

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start starts consuming messages in the queue.

func (*Consumer) Stats

func (c *Consumer) Stats() *ConsumerStats

Stats returns processor stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop is StopTimeout with a 30 second timeout.

func (*Consumer) StopTimeout

func (c *Consumer) StopTimeout(timeout time.Duration) error

StopTimeout waits up to the given timeout for workers to finish processing current messages and then stops them.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerHook

type ConsumerHook interface {
	BeforeProcessJob(context.Context, *ProcessJobEvent) context.Context
	AfterProcessJob(context.Context, *ProcessJobEvent)
}
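
A hook sees every processed job, which makes it a natural place for metrics or tracing. A minimal sketch of a hook that logs job durations; the type name and logging are illustrative, and it assumes the consumer populates ProcessJobEvent.StartTime before hooks run:

type durationHook struct{}

func (durationHook) BeforeProcessJob(ctx context.Context, evt *taskq.ProcessJobEvent) context.Context {
	// Nothing to record here; StartTime is assumed to be set by the consumer.
	return ctx
}

func (durationHook) AfterProcessJob(ctx context.Context, evt *taskq.ProcessJobEvent) {
	log.Printf("job %s took %s", evt.Job.TaskName, time.Since(evt.StartTime))
}

// Register the hook on a consumer:
// consumer.AddHook(durationHook{})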

type ConsumerStats

type ConsumerStats struct {
	NumWorker  uint32
	NumFetcher uint32

	BufferSize uint32
	Buffered   uint32

	InFlight  uint32
	Processed uint32
	Retries   uint32
	Fails     uint32
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Factory

type Factory interface {
	RegisterQueue(*QueueConfig) Queue
	Range(func(Queue) bool)
	StartConsumers(context.Context) error
	StopConsumers() error
	Close() error
}

Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.

type Handler

type Handler interface {
	HandleJob(ctx context.Context, msg *Job) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Job) error

func (HandlerFunc) HandleJob

func (fn HandlerFunc) HandleJob(ctx context.Context, msg *Job) error

type Job

type Job struct {
	// SQS/IronMQ message id.
	ID string `msgpack:"1,omitempty,alias:ID"`

	// Optional name for the message. Jobs with the same name
	// are processed only once.
	Name string `msgpack:"-"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"-"`

	// Args passed to the handler.
	Args []interface{} `msgpack:"-"`

	// Binary representation of the args.
	ArgsCompression string `msgpack:"2,omitempty,alias:ArgsCompression"`
	ArgsBin         []byte `msgpack:"3,alias:ArgsBin"`

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationID string `msgpack:"-"`

	// The number of times the message has been reserved or released.
	ReservedCount int `msgpack:"4,omitempty,alias:ReservedCount"`

	TaskName string `msgpack:"5,alias:TaskName"`
	Err      error  `msgpack:"-"`
	// contains filtered or unexported fields
}

Job is used to create and retrieve messages from a queue.

func NewJob

func NewJob(args ...interface{}) *Job

func (*Job) MarshalArgs

func (m *Job) MarshalArgs() ([]byte, error)

func (*Job) MarshalBinary

func (m *Job) MarshalBinary() ([]byte, error)

func (*Job) OnceInPeriod

func (m *Job) OnceInPeriod(period time.Duration, args ...interface{})

OnceInPeriod uses the period and the args to generate a message name such that a message with those args is added to the queue at most once in the given period. If args are not provided, the message args are used instead.
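
For instance, passing explicit args lets you deduplicate on a stable key even when the job args vary. A minimal sketch (sendDigestTask and userID are hypothetical):

job := sendDigestTask.NewJob(userID, time.Now())
// Deduplicate per user per hour, ignoring the varying timestamp arg.
job.OnceInPeriod(time.Hour, userID)
_ = q.AddJob(ctx, job)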

func (*Job) OnceWithDelay

func (m *Job) OnceWithDelay(delay time.Duration)

func (*Job) OnceWithSchedule

func (m *Job) OnceWithSchedule(tm time.Time)

func (*Job) SetDelay

func (m *Job) SetDelay(delay time.Duration)

SetDelay sets the message delay.

func (*Job) String

func (m *Job) String() string

func (*Job) UnmarshalBinary

func (m *Job) UnmarshalBinary(b []byte) error

type ProcessJobEvent

type ProcessJobEvent struct {
	Job       *Job
	StartTime time.Time

	Stash map[interface{}]interface{}
}

type Queue

type Queue interface {
	fmt.Stringer
	Name() string
	Options() *QueueConfig
	Consumer() QueueConsumer

	Len(ctx context.Context) (int, error)
	AddJob(ctx context.Context, msg *Job) error
	ReserveN(ctx context.Context, n int, waitTimeout time.Duration) ([]Job, error)
	Release(ctx context.Context, msg *Job) error
	Delete(ctx context.Context, msg *Job) error
	Purge(ctx context.Context) error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type QueueConfig

type QueueConfig struct {
	// Queue name.
	Name string

	NumWorker int
	// Global limit of concurrently running workers across all servers.
	// Overrides NumWorker.
	WorkerLimit int
	// Maximum number of goroutines fetching messages.
	// Default is 8 * number of CPUs.
	NumFetcher int

	// Number of messages reserved by a fetcher in the queue in one request.
	// Default is 10 messages.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	// Default is 5 minutes.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Size of the buffer where reserved messages are stored.
	// Default is the same as ReservationSize.
	BufferSize int

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit redis_rate.Limit

	// Optional rate limiter. The default is to use Redis.
	RateLimiter *redis_rate.Limiter

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage

	// Optional message handler. The default is the global Tasks registry.
	Handler Handler

	// ConsumerIdleTimeout is the time after which an idle consumer is deleted.
	// Default is 6 hours.
	ConsumerIdleTimeout time.Duration

	// SchedulerBackoffTime is the backoff time for the scheduler, which cleans up
	// zombie consumers, requeues pending messages, and so on. The default is a random
	// value between roughly 1s and 1.5s; the actual backoff is between
	// SchedulerBackoffTime and SchedulerBackoffTime+250ms. Increase it so the
	// scheduler does not slow down Redis when using the Redis queue.
	SchedulerBackoffTime time.Duration
	// contains filtered or unexported fields
}
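
A fuller configuration than the README example might also set worker and rate limits explicitly. A sketch with illustrative values (not recommendations), assuming the factory and Redis client from the README:

var ordersQueue = QueueFactory.RegisterQueue(&taskq.QueueConfig{
    Name:  "orders",
    Redis: Redis, // go-redis client used for metadata, rate limiting, and deduplication

    NumWorker:   16, // per-process workers
    WorkerLimit: 64, // global cap across all servers; overrides NumWorker
    RateLimit:   redis_rate.PerSecond(100),

    ReservationSize:    10,
    ReservationTimeout: 5 * time.Minute,
})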

func (*QueueConfig) Init

func (opt *QueueConfig) Init()

type QueueConsumer

type QueueConsumer interface {
	// AddHook adds a hook into message processing.
	AddHook(hook ConsumerHook)
	Queue() Queue
	Options() *QueueConfig
	Len() int
	// Stats returns processor stats.
	Stats() *ConsumerStats
	AddJob(ctx context.Context, job *Job) error
	// Start starts consuming messages in the queue.
	Start(ctx context.Context) error
	// Stop is StopTimeout with 30 seconds timeout.
	Stop() error
	// StopTimeout waits workers for timeout duration to finish processing current
	// messages and stops workers.
	StopTimeout(timeout time.Duration) error
	// ProcessAll starts workers to process messages in the queue and then stops
	// them when all messages are processed.
	ProcessAll(ctx context.Context) error
	// ProcessOne processes at most one message in the queue.
	ProcessOne(ctx context.Context) error
	// Process is low-level API to process message bypassing the internal queue.
	Process(ctx context.Context, msg *Job) error
	Put(ctx context.Context, msg *Job)
	// Purge discards messages from the internal queue.
	Purge(ctx context.Context) error
	String() string
}

QueueConsumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

type Redis

type Redis interface {
	redis.Scripter

	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
}

type Storage

type Storage interface {
	Exists(ctx context.Context, key string) bool
}

func NewLocalStorage

func NewLocalStorage() Storage
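
LocalStorage keeps deduplication keys in process memory, which can be enough for the in-memory queue in tests. A hedged sketch, assuming Storage is the store consulted for message-name deduplication (see QueueConfig.Storage above):

q := memqueue.NewQueue(&taskq.QueueConfig{
    Name:    "test",
    Storage: taskq.NewLocalStorage(), // name-based dedup without Redis
})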

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(name string) *Task

func RegisterTask

func RegisterTask(name string, opt *TaskConfig) *Task

func (*Task) HandleJob

func (t *Task) HandleJob(ctx context.Context, msg *Job) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) NewJob

func (t *Task) NewJob(args ...interface{}) *Job

func (*Task) Options

func (t *Task) Options() *TaskConfig

func (*Task) String

func (t *Task) String() string

type TaskConfig

type TaskConfig struct {
	// Function called to process a message.
	// There are three permitted types of signature:
	// 1. A zero-argument function.
	// 2. A function whose arguments are assignable in type from those which are passed in the message.
	// 3. A function which takes a single `*Job` argument.
	// The handler function may also optionally take a Context as a first argument and
	// may optionally return an error. If the handler takes a Context, it is invoked with
	// the same Context as the one passed to `StartConsumer`. If the handler returns a
	// non-nil error, the message processing fails and the message is retried.
	Handler interface{}
	// Function called to process a failed message after the specified number of
	// retries have all failed.
	// The FallbackHandler accepts the same types of function as the Handler.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement
	// to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	// Default is 64 retries.
	RetryLimit int
	// Minimum backoff time between retries.
	// Default is 30 seconds.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	// Default is 30 minutes.
	MaxBackoff time.Duration
	// contains filtered or unexported fields
}
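
The three permitted handler signatures described above, plus the optional context argument, error return, and FallbackHandler, look like this in practice. A sketch; the task names and handler bodies are illustrative:

// 1. Zero-argument handler.
taskq.RegisterTask("tick", &taskq.TaskConfig{
    Handler: func() error { return nil },
})

// 2. Handler whose arguments are decoded from the job args.
taskq.RegisterTask("send-email", &taskq.TaskConfig{
    Handler: func(ctx context.Context, to, subject string) error {
        // ctx is the same Context that was passed to StartConsumer.
        return nil
    },
    // Called after all retries have failed; accepts the same signature forms.
    FallbackHandler: func(to, subject string) error {
        return nil
    },
})

// 3. Handler that receives the raw *taskq.Job.
taskq.RegisterTask("raw", &taskq.TaskConfig{
    Handler: func(job *taskq.Job) error { return nil },
})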

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}
var Tasks TaskMap

func (*TaskMap) Get

func (r *TaskMap) Get(name string) *Task

func (*TaskMap) HandleJob

func (r *TaskMap) HandleJob(ctx context.Context, msg *Job) error

func (*TaskMap) Range

func (r *TaskMap) Range(fn func(name string, task *Task) bool)

func (*TaskMap) Register

func (r *TaskMap) Register(name string, opt *TaskConfig) (*Task, error)

func (*TaskMap) Reset

func (r *TaskMap) Reset()

func (*TaskMap) Unregister

func (r *TaskMap) Unregister(task *Task)
