worker

package
v0.1.3
Warning

This package is not in the latest version of its module.

Published: Aug 17, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

README

worker

Helpers for running background tasks.

Documentation

Functions

func NewClient added in v0.1.2

func NewClient(redisConnStr string) (*asynq.Client, asynq.RedisConnOpt, error)

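NewClient creates a new asynq client from the given Redis connection string. It returns the client together with the corresponding asynq.RedisConnOpt, the type that NewQueueServer and NewSchedulerServer accept.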

Types

type Enqueuer

type Enqueuer struct {
	// contains filtered or unexported fields
}

Enqueuer is a helper struct for enqueuing tasks. You can encapsulate this struct in your own struct to add queue methods. See pkg/worker/_example/enqueuer.go for an example.

func NewEnqueuer

func NewEnqueuer(client *asynq.Client, opt ...EnqueuerOption) *Enqueuer

NewEnqueuer creates a new enqueuer. It accepts EnqueuerOption values to configure the enqueuer; any setting without an option keeps its default value. The defaults are:

  • queue name: "default"
  • task deadline: 1 minute
  • max retry: 3

func (*Enqueuer) EnqueueTask

func (e *Enqueuer) EnqueueTask(ctx context.Context, task *asynq.Task) error

EnqueueTask enqueues a task to the queue. This function returns an error if the task could not be enqueued. The task is enqueued with the following options:

  • queue name: e.queueName
  • task deadline: e.taskDeadline
  • max retry: e.maxRetry
  • unique: e.taskDeadline

type EnqueuerOption

type EnqueuerOption func(*Enqueuer)

EnqueuerOption is a function that configures an enqueuer.

func WithMaxRetry

func WithMaxRetry(n int) EnqueuerOption

WithMaxRetry configures the max retry, which is the maximum number of times a failed task is retried.

func WithQueueNameEnq

func WithQueueNameEnq(name string) EnqueuerOption

WithQueueNameEnq configures the queue name for enqueuing. The queue name is the name of the queue where the task will be enqueued.

func WithTaskDeadline

func WithTaskDeadline(d time.Duration) EnqueuerOption

WithTaskDeadline configures the task deadline. The task deadline is the time limit for the task to be processed.

type QueueServer

type QueueServer struct {
	*asynq.Server
}

QueueServer is a wrapper for asynq.Server.

func NewQueueServer

func NewQueueServer(redisConnOpt asynq.RedisConnOpt, log asynq.Logger, opts ...QueueServerOption) *QueueServer

NewQueueServer creates a new queue server that wraps asynq.Server.

func (*QueueServer) Run

func (srv *QueueServer) Run(handlers ...taskHandler) func() error

Run creates a new queue client, registers the task handlers, and runs the server. It returns a function that can be used to run the server in an error group. E.g.:

eg, ctx := errgroup.WithContext(context.Background())
eg.Go(queueServer.Run(
	NewTaskHandler1(),
	NewTaskHandler2(),
))

func (*QueueServer) Shutdown added in v0.1.2

func (srv *QueueServer) Shutdown()

Shutdown gracefully shuts down the queue server by waiting for all in-flight tasks to finish processing before shutdown.

type QueueServerOption

type QueueServerOption func(*asynq.Config)

QueueServerOption is a function that configures a QueueServer.

func WithQueueConcurrency

func WithQueueConcurrency(concurrency int) QueueServerOption

WithQueueConcurrency sets the queue concurrency.

func WithQueueLogLevel

func WithQueueLogLevel(level string) QueueServerOption

WithQueueLogLevel sets the queue log level.

func WithQueueName

func WithQueueName(name string) QueueServerOption

WithQueueName sets the queue name.

func WithQueueShutdownTimeout

func WithQueueShutdownTimeout(timeout time.Duration) QueueServerOption

WithQueueShutdownTimeout sets the queue shutdown timeout.

func WithQueues

func WithQueues(queues map[string]int) QueueServerOption

WithQueues sets the queues. It panics if the sum of concurrency is not equal to the concurrency set in the config. If you want to increase the concurrency of a queue, you can use worker.WithQueueConcurrency before this option.
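The condition that triggers the panic can be sketched as a check that the per-queue values add up to the configured total. `validateQueues` is a hypothetical helper written for illustration; the package itself operates on `*asynq.Config` and panics, whereas this version returns an error so the result is easy to inspect.

```go
package main

import "fmt"

// validateQueues reports an error when the per-queue values do not add up
// to the total concurrency. The package is documented to panic in this
// situation; returning an error here is a simplification.
func validateQueues(total int, queues map[string]int) error {
	sum := 0
	for _, n := range queues {
		sum += n
	}
	if sum != total {
		return fmt.Errorf("queue concurrency sums to %d, want %d", sum, total)
	}
	return nil
}

func main() {
	queues := map[string]int{"critical": 6, "default": 3, "low": 1}
	fmt.Println(validateQueues(10, queues))
	fmt.Println(validateQueues(20, queues))
}
```

This is why the total has to be set first: if the overall concurrency is raised with worker.WithQueueConcurrency before WithQueues is applied, the sum is compared against the new total.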

type SchedulerServer

type SchedulerServer struct {
	*asynq.Scheduler
}

SchedulerServer is a wrapper for asynq.Scheduler.

func NewSchedulerServer

func NewSchedulerServer(redisConnOpt asynq.RedisConnOpt, log asynq.Logger, opts ...SchedulerServerOption) *SchedulerServer

NewSchedulerServer creates a new scheduler server that wraps asynq.Scheduler.

func (*SchedulerServer) Run

func (srv *SchedulerServer) Run(handlers ...schedulerHandler) func() error

Run runs the scheduler server. It returns a function that can be used to run the server in an error group. E.g.:

eg, ctx := errgroup.WithContext(context.Background())
eg.Go(schedulerServer.Run(
	NewSchedulerHandler1(),
	NewSchedulerHandler2(),
))

func (*SchedulerServer) Shutdown added in v0.1.2

func (srv *SchedulerServer) Shutdown()

Shutdown gracefully shuts down the scheduler server by waiting for all pending tasks to be processed.

type SchedulerServerOption

type SchedulerServerOption func(*asynq.SchedulerOpts)

SchedulerServerOption is a function that configures a SchedulerServer.

func WithPostEnqueueFunc

func WithPostEnqueueFunc(fn func(info *asynq.TaskInfo, err error)) SchedulerServerOption

WithPostEnqueueFunc sets the scheduler post enqueue function.

func WithPreEnqueueFunc

func WithPreEnqueueFunc(fn func(task *asynq.Task, opts []asynq.Option)) SchedulerServerOption

WithPreEnqueueFunc sets the scheduler pre enqueue function.

func WithSchedulerLocation

func WithSchedulerLocation(timeZone string) SchedulerServerOption

WithSchedulerLocation sets the scheduler location.
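WithSchedulerLocation takes the time zone as a string, while the `Location` field of `asynq.SchedulerOpts` is a `*time.Location`, so the name has to be resolved at some point. One plausible way to do that, shown here as an assumption rather than the package's actual code, is `time.LoadLocation`; `resolveLocation` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// resolveLocation converts a time zone name such as "UTC" into a
// *time.Location, wrapping the error so the offending name is visible.
func resolveLocation(name string) (*time.Location, error) {
	loc, err := time.LoadLocation(name)
	if err != nil {
		return nil, fmt.Errorf("invalid time zone %q: %w", name, err)
	}
	return loc, nil
}

func main() {
	for _, name := range []string{"UTC", "Not/AZone"} {
		loc, err := resolveLocation(name)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("resolved:", loc)
	}
}
```

Zone names other than "UTC" and "Local" depend on a time zone database being available at run time, so validating the configured value at startup is a reasonable precaution.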

func WithSchedulerLogLevel

func WithSchedulerLogLevel(level string) SchedulerServerOption

WithSchedulerLogLevel sets the scheduler log level.
