asyncer

package module
v0.3.1
Published: Jan 24, 2024 License: MIT Imports: 7 Imported by: 2

README

asyncer

asyncer is a simple, reliable, and efficient distributed task queue for Go. It wraps the hibiken/asynq package with some predefined settings, so if you need more flexibility, use hibiken/asynq directly.

Installation

To install the asyncer package, use the following command:

go get github.com/dmitrymomot/asyncer

Usage

Queued tasks

In this example, we will create a simple task that prints a greeting message to the console:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/dmitrymomot/asyncer"
	"golang.org/x/sync/errgroup"
)

const (
	redisAddr    = "redis://localhost:6379/0"
	TestTaskName = "queued_task"
)

type TestTaskPayload struct {
	Name string
}

// test task handler function
func testTaskHandler(ctx context.Context, payload TestTaskPayload) error {
	fmt.Printf("Hello, %s!\n", payload.Name)
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	eg, _ := errgroup.WithContext(ctx)

	// Run a new queue server with redis as the broker.
	eg.Go(asyncer.RunQueueServer(
		ctx, redisAddr, nil,
		// Register a handler for the task.
		asyncer.HandlerFunc[TestTaskPayload](TestTaskName, testTaskHandler),
		// ... add more handlers here ...
	))

	// Create a new enqueuer with redis as the broker.
	enqueuer := asyncer.MustNewEnqueuer(redisAddr)
	defer enqueuer.Close()

	// Enqueue a task with payload.
	// The task will be processed immediately.
	for i := 0; i < 10; i++ {
		if err := enqueuer.EnqueueTask(ctx, TestTaskName, TestTaskPayload{
			Name: fmt.Sprintf("Test %d", i),
		}); err != nil {
			panic(err)
		}
		time.Sleep(500 * time.Millisecond)
	}

	// Wait for the queue server to exit.
	if err := eg.Wait(); err != nil {
		panic(err)
	}
}

Scheduled tasks (Cron jobs)

Create a task that prints a timestamped message to the console every second:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/dmitrymomot/asyncer"
	"golang.org/x/sync/errgroup"
)

const (
	redisAddr    = "redis://localhost:6379/0"
	TestTaskName = "scheduled_task"
)

type TestTaskPayload struct {
	Name string
}

// test task handler function
func testTaskHandler(ctx context.Context) error {
	fmt.Println("scheduled test task handler called at", time.Now().Format(time.RFC3339))
	return nil
}

func main() {
	eg, ctx := errgroup.WithContext(context.Background())

	// Run a new queue server with redis as the broker.
	eg.Go(asyncer.RunQueueServer(
		ctx, redisAddr, nil,
		// Register a handler for the task.
		asyncer.ScheduledHandlerFunc(TestTaskName, testTaskHandler),
		// ... add more handlers here ...
	))

	// Run a scheduler with redis as the broker.
	// The scheduler will schedule tasks to be enqueued at a specified time.
	eg.Go(asyncer.RunSchedulerServer(
		ctx, redisAddr, nil,
		// Schedule the scheduled_task task to be enqueued every second.
		asyncer.NewTaskScheduler("@every 1s", TestTaskName),
		// ... add more scheduled tasks here ...
	))

	// Wait for the queue and scheduler servers to exit.
	if err := eg.Wait(); err != nil {
		panic(err)
	}
}

Contributing

Contributions to the asyncer package are welcome! Here are some ways you can contribute:

  • Reporting bugs
  • Covering code with tests
  • Suggesting enhancements
  • Submitting pull requests
  • Sharing the love by telling others about this project

License

This project is licensed under the MIT License - see the LICENSE file for details. This project contains some code from hibiken/asynq package, which is also licensed under the MIT License.

Documentation

Constants

const (
	LogLevelDebug = "debug"
	LogLevelInfo  = "info"
	LogLevelWarn  = "warn"
	LogLevelError = "error"
	LogLevelFatal = "fatal"
)

Log levels string representation.

Variables

var (
	ErrFailedToParseRedisURI            = errors.New("failed to parse redis connection string")
	ErrMissedAsynqClient                = errors.New("missed asynq client")
	ErrFailedToCreateEnqueuerWithClient = errors.New("failed to create enqueuer with asynq client")
	ErrFailedToEnqueueTask              = errors.New("failed to enqueue task")
	ErrFailedToCloseEnqueuer            = errors.New("failed to close enqueuer")
	ErrFailedToStartQueueServer         = errors.New("failed to start queue server")
	ErrFailedToUnmarshalPayload         = errors.New("failed to unmarshal payload")
	ErrFailedToRunQueueServer           = errors.New("failed to run queue server")
	ErrFailedToScheduleTask             = errors.New("failed to schedule task")
	ErrFailedToStartSchedulerServer     = errors.New("failed to start scheduler server")
	ErrCronSpecIsEmpty                  = errors.New("cron spec is empty")
	ErrTaskNameIsEmpty                  = errors.New("task name is empty")
	ErrFailedToRunSchedulerServer       = errors.New("failed to run scheduler server")
)

Predefined errors.
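Sentinel errors like these are usually checked with errors.Is. The sketch below uses only the standard library and a local copy of one sentinel; it assumes the package wraps its sentinels so that errors.Is matches them, which the documentation does not confirm. The enqueue function is a hypothetical stand-in for a failing call.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrFailedToEnqueueTask is a local stand-in for the package-level sentinel
// of the same name.
var ErrFailedToEnqueueTask = errors.New("failed to enqueue task")

// enqueue is a hypothetical function that always fails. It wraps the sentinel
// with %w so callers can still detect it after extra context is added.
func enqueue() error {
	cause := errors.New("redis: connection refused")
	return fmt.Errorf("%w: %v", ErrFailedToEnqueueTask, cause)
}

// isEnqueueFailure reports whether err wraps ErrFailedToEnqueueTask.
func isEnqueueFailure(err error) bool {
	return errors.Is(err, ErrFailedToEnqueueTask)
}

func main() {
	if err := enqueue(); isEnqueueFailure(err) {
		fmt.Println("enqueue failed:", err)
	}
}
```

Comparing with errors.Is rather than == keeps the check working when the error carries additional context.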

Functions

func NewClient

func NewClient(redisConnStr string) (*asynq.Client, asynq.RedisConnOpt, error)

NewClient creates a new instance of the asynq client using the provided Redis connection string. It returns the created client, the Redis connection options, and any error encountered during the process.
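To show what a connection string such as redis://localhost:6379/0 carries, here is a standard-library sketch that splits it into an address and a database index. parseRedisURI and redisTarget are hypothetical names for illustration; how asyncer itself parses the string is not stated in this documentation.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// redisTarget holds the parts of a Redis URI needed to connect.
type redisTarget struct {
	Addr string // host:port
	DB   int    // database index taken from the URI path
}

// parseRedisURI is a hypothetical helper, not part of asyncer.
func parseRedisURI(raw string) (redisTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return redisTarget{}, fmt.Errorf("failed to parse redis connection string: %w", err)
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return redisTarget{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	db := 0
	if p := strings.Trim(u.Path, "/"); p != "" {
		db, err = strconv.Atoi(p)
		if err != nil {
			return redisTarget{}, fmt.Errorf("invalid database index %q: %w", p, err)
		}
	}
	return redisTarget{Addr: u.Host, DB: db}, nil
}

func main() {
	target, err := parseRedisURI("redis://localhost:6379/0")
	if err != nil {
		panic(err)
	}
	fmt.Printf("addr=%s db=%d\n", target.Addr, target.DB)
}
```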

func RunQueueServer added in v0.2.0

func RunQueueServer(ctx context.Context, redisConnStr string, log asynq.Logger, handlers ...TaskHandler) func() error

RunQueueServer starts the queue server and registers the provided task handlers. It returns a function that can be used to run the server in an error group, e.g.:

eg, ctx := errgroup.WithContext(context.Background())
eg.Go(asyncer.RunQueueServer(
	ctx,
	"redis://localhost:6379",
	logger,
	asyncer.HandlerFunc[PayloadStruct1]("task1", task1Handler),
	asyncer.HandlerFunc[PayloadStruct2]("task2", task2Handler),
))

func task1Handler(ctx context.Context, payload PayloadStruct1) error {
	// ... handle task here ...
	return nil
}

func task2Handler(ctx context.Context, payload PayloadStruct2) error {
	// ... handle task here ...
	return nil
}

The function panics if the Redis connection string is invalid. The function returns an error if the server fails to start.

func RunSchedulerServer added in v0.2.0

func RunSchedulerServer(ctx context.Context, redisConnStr string, log asynq.Logger, schedulers ...TaskScheduler) func() error

RunSchedulerServer starts the scheduler server with the provided task schedulers. It returns a function that can be used to run the server in an error group. The handler for each scheduled task is registered on the queue server, e.g.:

eg.Go(asyncer.RunQueueServer(
	ctx,
	"redis://localhost:6379",
	logger,
	asyncer.ScheduledHandlerFunc("scheduled_task_1", scheduledTaskHandler),
))

func scheduledTaskHandler(ctx context.Context) error {
	// ... handle task here ...
	return nil
}

The function returns an error if the server fails to start. The function panics if the Redis connection string is invalid.

Note: the scheduler only triggers the job, so you need to run a queue server as well to process it.

Types

type Enqueuer

type Enqueuer struct {
	// contains filtered or unexported fields
}

Enqueuer is a helper struct for enqueuing tasks. You can encapsulate this struct in your own struct to add queue methods. See pkg/worker/_example/enqueuer.go for an example.

func MustNewEnqueuer added in v0.2.0

func MustNewEnqueuer(redisConn string, opt ...EnqueuerOption) *Enqueuer

MustNewEnqueuer creates a new Enqueuer with the given Redis connection string and options. It panics if an error occurs during the creation of the Enqueuer.

func MustNewEnqueuerWithAsynqClient added in v0.2.0

func MustNewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) *Enqueuer

MustNewEnqueuerWithAsynqClient creates a new Enqueuer with the given Asynq client and options. It panics if an error occurs during the creation of the Enqueuer.

func NewEnqueuer

func NewEnqueuer(redisConn string, opt ...EnqueuerOption) (*Enqueuer, error)

NewEnqueuer creates a new Enqueuer with the given Redis connection string and options. Default values are used if no option is provided. It returns a pointer to the Enqueuer and an error if there was a problem creating the Enqueuer.

func NewEnqueuerWithAsynqClient added in v0.2.0

func NewEnqueuerWithAsynqClient(client *asynq.Client, opt ...EnqueuerOption) (*Enqueuer, error)

NewEnqueuerWithAsynqClient creates a new Enqueuer with the given Asynq client and options. It returns a pointer to the Enqueuer and an error if the Asynq client is nil. The Enqueuer is responsible for enqueueing tasks to the Asynq server. Default values are used if no option is provided. Default values are:

  • queue name: "default"
  • task deadline: 1 minute
  • max retry: 3

func (*Enqueuer) Close added in v0.2.0

func (e *Enqueuer) Close() error

Close closes the Enqueuer and releases any resources associated with it. It returns an error if there was a problem closing the Enqueuer.

func (*Enqueuer) EnqueueTask

func (e *Enqueuer) EnqueueTask(ctx context.Context, taskName string, payload any) error

EnqueueTask enqueues a task to be processed asynchronously. It takes a context, a task name, and a payload as parameters. The task is enqueued with the specified queue name, deadline, maximum retry count, and uniqueness constraint. It returns an error if the task fails to enqueue.

type EnqueuerOption

type EnqueuerOption func(*Enqueuer)

EnqueuerOption is a function that configures an enqueuer.

func WithMaxRetry

func WithMaxRetry(n int) EnqueuerOption

WithMaxRetry configures the max retry. The max retry is the number of times the task will be retried if it fails.

func WithQueueNameEnq

func WithQueueNameEnq(name string) EnqueuerOption

WithQueueNameEnq configures the queue name for enqueuing. The queue name is the name of the queue where the task will be enqueued.

func WithTaskDeadline

func WithTaskDeadline(d time.Duration) EnqueuerOption

WithTaskDeadline configures the task deadline. The task deadline is the time limit for the task to be processed.
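The options above follow the functional options pattern: each one returns a function that mutates the enqueuer before it is used. The sketch below reproduces the pattern with the documented defaults (queue "default", deadline 1 minute, max retry 3). enqueuerConfig, option, and the lower-case constructors are hypothetical stand-ins, since the real Enqueuer fields are unexported.

```go
package main

import (
	"fmt"
	"time"
)

// enqueuerConfig is a hypothetical stand-in for the unexported Enqueuer fields.
type enqueuerConfig struct {
	queueName    string
	taskDeadline time.Duration
	maxRetry     int
}

// option mirrors the shape of EnqueuerOption: a function that mutates the config.
type option func(*enqueuerConfig)

func withQueueName(name string) option {
	return func(c *enqueuerConfig) { c.queueName = name }
}

func withTaskDeadline(d time.Duration) option {
	return func(c *enqueuerConfig) { c.taskDeadline = d }
}

func withMaxRetry(n int) option {
	return func(c *enqueuerConfig) { c.maxRetry = n }
}

// newConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) enqueuerConfig {
	c := enqueuerConfig{queueName: "default", taskDeadline: time.Minute, maxRetry: 3}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withQueueName("emails"), withMaxRetry(5))
	fmt.Println(c.queueName, c.taskDeadline, c.maxRetry)
}
```

With the real package, the equivalent call would pass asyncer.WithQueueNameEnq, asyncer.WithTaskDeadline, or asyncer.WithMaxRetry to NewEnqueuer or MustNewEnqueuer.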

type QueueServer

type QueueServer struct {
	// contains filtered or unexported fields
}

QueueServer is a wrapper for asynq.Server.

func NewQueueServer

func NewQueueServer(redisConnOpt asynq.RedisConnOpt, opts ...QueueServerOption) *QueueServer

NewQueueServer creates a new instance of QueueServer. It takes a redis connection option and optional queue server options. The function returns a pointer to the created QueueServer.

func (*QueueServer) Run

func (srv *QueueServer) Run(handlers ...TaskHandler) func() error

Run starts the queue server and registers the provided task handlers. It returns a function that can be used to run the server in an error group, e.g.:

eg, _ := errgroup.WithContext(context.Background())
eg.Go(queueServer.Run(
	yourapp.NewTaskHandler1(),
	yourapp.NewTaskHandler2(),
))

The function returns an error if the server fails to start.

func (*QueueServer) Shutdown

func (srv *QueueServer) Shutdown()

Shutdown gracefully shuts down the queue server by waiting for all in-flight tasks to finish processing before shutdown.

type QueueServerOption

type QueueServerOption func(*asynq.Config)

QueueServerOption is a function that configures a QueueServer.

func WithQueueConcurrency

func WithQueueConcurrency(concurrency int) QueueServerOption

WithQueueConcurrency sets the queue concurrency.

func WithQueueLogLevel

func WithQueueLogLevel(level string) QueueServerOption

WithQueueLogLevel sets the queue log level.

func WithQueueLogger added in v0.1.1

func WithQueueLogger(logger asynq.Logger) QueueServerOption

WithQueueLogger sets the queue logger.

func WithQueueName

func WithQueueName(name string) QueueServerOption

WithQueueName sets the queue name.

func WithQueueShutdownTimeout

func WithQueueShutdownTimeout(timeout time.Duration) QueueServerOption

WithQueueShutdownTimeout sets the queue shutdown timeout.

func WithQueues

func WithQueues(queues map[string]int) QueueServerOption

WithQueues sets the queues. It panics if the sum of concurrency is not equal to the concurrency set in the config. If you want to increase the concurrency of a queue, you can use asyncer.WithQueueConcurrency before this option.
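The rule that WithQueues enforces can be pictured as a simple sum check. The sketch below is an assumption about the shape of that check, not the package's code: checkQueues is a hypothetical helper, and it returns an error where the real option is documented to panic.

```go
package main

import "fmt"

// checkQueues is a hypothetical helper mirroring the documented rule:
// the per-queue values must add up to the server's total concurrency.
func checkQueues(total int, queues map[string]int) error {
	sum := 0
	for _, n := range queues {
		sum += n
	}
	if sum != total {
		return fmt.Errorf("queue concurrency sums to %d, want %d", sum, total)
	}
	return nil
}

func main() {
	queues := map[string]int{"critical": 6, "default": 3, "low": 1}
	fmt.Println(checkQueues(10, queues)) // the values add up to 10
	fmt.Println(checkQueues(8, queues))  // mismatch: 10 != 8
}
```

This is why the documentation suggests applying asyncer.WithQueueConcurrency first: the total has to be in place before the per-queue values are validated against it.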

type SchedulerServer

type SchedulerServer struct {
	// contains filtered or unexported fields
}

SchedulerServer is a wrapper for asynq.Scheduler.

func NewSchedulerServer

func NewSchedulerServer(redisConnOpt asynq.RedisConnOpt, opts ...SchedulerServerOption) *SchedulerServer

NewSchedulerServer creates a new scheduler client and returns the server.

func (*SchedulerServer) Run

func (srv *SchedulerServer) Run() func() error

Run starts the scheduler server. It returns a function that can be used to run the server in an error group, e.g.:

eg, _ := errgroup.WithContext(context.Background())
eg.Go(schedulerServer.Run())

func (*SchedulerServer) ScheduleTask added in v0.3.0

func (srv *SchedulerServer) ScheduleTask(cronSpec, taskName string) error

ScheduleTask schedules a task based on the given cron specification and task name. It returns an error if the cron specification or task name is empty, or if there was an error registering the task.

func (*SchedulerServer) Shutdown

func (srv *SchedulerServer) Shutdown()

Shutdown gracefully shuts down the scheduler server by waiting for all pending tasks to be processed.

type SchedulerServerOption

type SchedulerServerOption func(*asynq.SchedulerOpts)

SchedulerServerOption is a function that configures a SchedulerServer.

func WithPostEnqueueFunc

func WithPostEnqueueFunc(fn func(info *asynq.TaskInfo, err error)) SchedulerServerOption

WithPostEnqueueFunc sets the scheduler post enqueue function.

func WithPreEnqueueFunc

func WithPreEnqueueFunc(fn func(task *asynq.Task, opts []asynq.Option)) SchedulerServerOption

WithPreEnqueueFunc sets the scheduler pre enqueue function.

func WithSchedulerLocation

func WithSchedulerLocation(timeZone string) SchedulerServerOption

WithSchedulerLocation sets the scheduler location.

func WithSchedulerLogLevel

func WithSchedulerLogLevel(level string) SchedulerServerOption

WithSchedulerLogLevel sets the scheduler log level.

func WithSchedulerLogger added in v0.2.0

func WithSchedulerLogger(logger asynq.Logger) SchedulerServerOption

WithSchedulerLogger sets the scheduler logger.

type TaskHandler added in v0.2.0

type TaskHandler interface {
	// TaskName returns the name of the task. It is used to register the task handler.
	TaskName() string
	// Handle handles the task. It takes a context and a payload as parameters.
	Handle(ctx context.Context, payload []byte) error
}

TaskHandler is an interface for task handlers. It is used to register task handlers in the queue server.

func HandlerFunc added in v0.2.0

func HandlerFunc[Payload any](name string, fn handlerFunc[Payload]) TaskHandler

HandlerFunc is a function that creates a TaskHandler for handling tasks of a specific payload type. It takes a name string and a handler function as parameters and returns a TaskHandler. The name parameter represents the name of the handler, while the fn parameter is the actual handler function. The TaskHandler returned by HandlerFunc is responsible for executing the handler function when a task of the specified payload type is received. The payload type is specified using the generic type parameter Payload.
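The following sketch shows one way a generic adapter like this can bridge a typed handler and the []byte-based TaskHandler interface. It assumes the payload is JSON-encoded, which this documentation does not state; taskHandler, typedHandler, handlerFunc, greeting, and runGreeting are hypothetical names, not the package's implementation.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// taskHandler mirrors the documented TaskHandler interface.
type taskHandler interface {
	TaskName() string
	Handle(ctx context.Context, payload []byte) error
}

// typedHandler adapts a typed function to taskHandler.
type typedHandler[P any] struct {
	name string
	fn   func(ctx context.Context, payload P) error
}

func (h typedHandler[P]) TaskName() string { return h.name }

// Handle decodes the raw payload into P (JSON is an assumption here)
// and passes the typed value to the wrapped function.
func (h typedHandler[P]) Handle(ctx context.Context, raw []byte) error {
	var p P
	if err := json.Unmarshal(raw, &p); err != nil {
		return fmt.Errorf("failed to unmarshal payload: %w", err)
	}
	return h.fn(ctx, p)
}

// handlerFunc is a hypothetical counterpart of asyncer.HandlerFunc.
func handlerFunc[P any](name string, fn func(context.Context, P) error) taskHandler {
	return typedHandler[P]{name: name, fn: fn}
}

type greeting struct{ Name string }

// runGreeting builds a handler, feeds it a raw payload, and returns the
// message the handler produced.
func runGreeting(raw []byte) (string, error) {
	var out string
	h := handlerFunc("greet", func(_ context.Context, g greeting) error {
		out = fmt.Sprintf("Hello, %s!", g.Name)
		return nil
	})
	err := h.Handle(context.Background(), raw)
	return out, err
}

func main() {
	msg, err := runGreeting([]byte(`{"Name":"Gopher"}`))
	fmt.Println(msg, err)
}
```

The benefit of this design is that handler code works with a concrete struct, while decoding and its error handling live in one place.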

func ScheduledHandlerFunc added in v0.2.0

func ScheduledHandlerFunc(name string, fn scheduledHandlerFunc) TaskHandler

ScheduledHandlerFunc is a function that creates a TaskHandler for a scheduled task. It takes a name string and a scheduledHandlerFunc as parameters and returns a TaskHandler. The name parameter specifies the name of the scheduled task, while the fn parameter is the function to be executed when the task is triggered. The returned TaskHandler can be used to register the scheduled task in the queue server.

type TaskScheduler added in v0.3.0

type TaskScheduler interface {
	// TaskName returns the name of the task. It is used to register the task handler.
	TaskName() string
	// Schedule returns the cron spec for the task.
	// For more information about cron spec, see https://pkg.go.dev/github.com/robfig/cron/v3#hdr-CRON_Expression_Format.
	Schedule() string
}

TaskScheduler is an interface for task schedulers. It is used to register task schedulers in the scheduler server.
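Because TaskScheduler is an interface with two exported methods, any type that provides them satisfies it. The sketch below declares a local copy of the interface so it runs without the package; hourlyCleanup and describe are hypothetical names for illustration.

```go
package main

import "fmt"

// taskScheduler mirrors the documented TaskScheduler interface.
type taskScheduler interface {
	TaskName() string
	Schedule() string
}

// hourlyCleanup is a hypothetical scheduler with a fixed name and cron spec.
type hourlyCleanup struct{}

func (hourlyCleanup) TaskName() string { return "hourly_cleanup" }
func (hourlyCleanup) Schedule() string { return "@every 1h" }

// describe accepts any taskScheduler, the way a scheduler server would.
func describe(s taskScheduler) string {
	return fmt.Sprintf("%s runs on %q", s.TaskName(), s.Schedule())
}

func main() {
	fmt.Println(describe(hourlyCleanup{}))
}
```

For the common case of a fixed cron spec and task name, NewTaskScheduler already returns a ready-made value, so a custom type is only worth it when the schedule needs to be computed.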

func NewTaskScheduler added in v0.3.0

func NewTaskScheduler(cronSpec, name string) TaskScheduler

NewTaskScheduler creates a new task scheduler with the given cron spec and name.

Directories

Path Synopsis
example
