asynq

Version: v1.2.10 (latest version of this package)

Note: this package is not in the latest version of its module.

Published: May 22, 2024 | License: MIT | Imports: 13 | Imported by: 0

README

Asynq

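Asynq is a distributed task queue and asynchronous processing library written in Go and backed by Redis. It is similar to Celery for Python. Its author, Ken Hibino, works at Google.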

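Features

  • Guarantees that each task is executed at least once
  • Tasks can be persisted once they are written to Redis
  • Failed tasks are retried automatically
  • Tasks are recovered automatically when a worker crashes
  • Task priorities are supported
  • Tasks can be orchestrated (scheduled)
  • A task can be given a deadline or a maximum run time (timeout)
  • Middleware is supported
  • The unique option prevents duplicate execution of a task, ensuring uniqueness
  • Redis Cluster and Redis Sentinel are supported for high availability
  • The author provides a Web UI and a CLI tool for inspecting task execution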

Installing the command-line tool

go install github.com/hibiken/asynq/tools/asynq@latest

Deploying a development environment with Docker

Redis
docker pull bitnami/redis:latest
docker pull bitnami/redis-exporter:latest

docker run -itd \
    --name redis-test \
    -p 6379:6379 \
    -e ALLOW_EMPTY_PASSWORD=yes \
    bitnami/redis:latest
Asynqmon
docker pull hibiken/asynqmon:latest

docker run -d \
    --name asynq \
    -p 8080:8080 \
    hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379


Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LogDebug added in v1.1.0

func LogDebug(args ...interface{})

func LogDebugf added in v1.1.0

func LogDebugf(format string, args ...interface{})

func LogError added in v1.1.0

func LogError(args ...interface{})

func LogErrorf added in v1.1.0

func LogErrorf(format string, args ...interface{})

func LogFatal added in v1.1.0

func LogFatal(args ...interface{})

func LogFatalf added in v1.1.0

func LogFatalf(format string, args ...interface{})

func LogInfo added in v1.1.0

func LogInfo(args ...interface{})

func LogInfof added in v1.1.0

func LogInfof(format string, args ...interface{})

func LogWarn added in v1.1.0

func LogWarn(args ...interface{})

func LogWarnf added in v1.1.0

func LogWarnf(format string, args ...interface{})

func RegisterSubscriber added in v1.1.0

func RegisterSubscriber[T any](srv *Server, taskType string, handler func(string, *T) error) error

RegisterSubscriber registers a task subscriber whose handler receives a typed payload (*T).
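The generic signature above suggests that a typed handler is wrapped into the untyped MessageHandler and Binder pair accepted by the Server method. The following is a minimal, standard-library-only sketch of how such a wrapper could work. It is not the package's actual implementation: the local types only mirror the declarations on this page, and adapt and EmailTask are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the types documented on this page.
type MessagePayload any
type MessageHandler func(string, MessagePayload) error
type Binder func() any

// adapt is a hypothetical helper (not part of the package). It turns a typed
// handler into an untyped handler plus a binder that allocates a fresh *T.
func adapt[T any](handler func(string, *T) error) (MessageHandler, Binder) {
	h := func(taskType string, payload MessagePayload) error {
		typed, ok := payload.(*T)
		if !ok {
			return fmt.Errorf("task %q: unexpected payload type %T", taskType, payload)
		}
		return handler(taskType, typed)
	}
	b := func() any { return new(T) }
	return h, b
}

// EmailTask is a hypothetical payload type used only in this example.
type EmailTask struct {
	To string
}

func main() {
	var sent []string
	h, b := adapt(func(taskType string, t *EmailTask) error {
		if t.To == "" {
			return errors.New("missing recipient")
		}
		sent = append(sent, t.To)
		return nil
	})

	// The binder yields an empty *EmailTask; a real server would decode the
	// message body into it before calling the handler.
	p := b().(*EmailTask)
	p.To = "a@example.com"
	fmt.Println(h("email:send", p), sent)

	// A payload of the wrong type is rejected instead of panicking.
	fmt.Println(h("email:send", "wrong type"))
}
```

The benefit of this shape is that application code never performs the type assertion itself; it lives in one place inside the wrapper.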

func RegisterSubscriberWithCtx added in v1.2.10

func RegisterSubscriberWithCtx[T any](srv *Server, taskType string,
	handler func(context.Context, string, *T) error) error

RegisterSubscriberWithCtx registers a task subscriber whose handler also receives a context.

Types

type Binder added in v1.1.0

type Binder func() any

type HandlerData added in v1.1.0

type HandlerData struct {
	Handler MessageHandler
	Binder  Binder
}

type MessageHandler added in v1.1.0

type MessageHandler func(string, MessagePayload) error

type MessageHandlerMap added in v1.1.0

type MessageHandlerMap map[string]HandlerData

type MessagePayload added in v1.1.0

type MessagePayload any
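The types above fit together as a registry: MessageHandlerMap maps a task type to a HandlerData, whose Binder supplies an empty payload value and whose Handler consumes it. Below is a rough sketch of a dispatch step built on that idea, using only the standard library. The dispatch function, the ResizeJob type, and the choice of JSON as the encoding are all assumptions made for illustration; the package's real decoding path (see WithCodec) may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the type declarations shown on this page.
type MessagePayload any
type MessageHandler func(string, MessagePayload) error
type Binder func() any
type HandlerData struct {
	Handler MessageHandler
	Binder  Binder
}
type MessageHandlerMap map[string]HandlerData

// dispatch is a hypothetical helper: it looks up the handler for taskType,
// asks the binder for an empty payload, decodes raw into it (JSON is an
// assumption), and then invokes the handler.
func dispatch(handlers MessageHandlerMap, taskType string, raw []byte) error {
	hd, ok := handlers[taskType]
	if !ok {
		return fmt.Errorf("no handler registered for task type %q", taskType)
	}
	var payload MessagePayload = raw // without a binder, pass the raw bytes through
	if hd.Binder != nil {
		payload = hd.Binder()
		if err := json.Unmarshal(raw, payload); err != nil {
			return fmt.Errorf("decode %q payload: %w", taskType, err)
		}
	}
	return hd.Handler(taskType, payload)
}

// ResizeJob is a hypothetical payload type used only in this example.
type ResizeJob struct {
	Width  int `json:"width"`
	Height int `json:"height"`
}

func main() {
	handlers := MessageHandlerMap{
		"image:resize": {
			Handler: func(taskType string, p MessagePayload) error {
				job := p.(*ResizeJob)
				fmt.Printf("%s -> %dx%d\n", taskType, job.Width, job.Height)
				return nil
			},
			Binder: func() any { return &ResizeJob{} },
		},
	}

	fmt.Println(dispatch(handlers, "image:resize", []byte(`{"width":640,"height":480}`)))
	fmt.Println(dispatch(handlers, "unknown", nil))
}
```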

type Server

type Server struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) Endpoint

func (s *Server) Endpoint() (*url.URL, error)

func (*Server) Name

func (s *Server) Name() string

func (*Server) NewPeriodicTask

func (s *Server) NewPeriodicTask(cronSpec, typeName string, msg broker.Any, opts ...asynq.Option) (string, error)

NewPeriodicTask enqueues a new periodic task scheduled by a cron spec.

func (*Server) NewTask

func (s *Server) NewTask(typeName string, msg broker.Any, opts ...asynq.Option) error

NewTask enqueues a new task.

func (*Server) NewWaitResultTask added in v1.2.8

func (s *Server) NewWaitResultTask(typeName string, msg broker.Any, opts ...asynq.Option) error

NewWaitResultTask enqueues a new task and waits for the result.

func (*Server) QueryPeriodicTaskEntryID added in v1.2.1

func (s *Server) QueryPeriodicTaskEntryID(typeName string) string

func (*Server) RegisterSubscriber added in v1.1.0

func (s *Server) RegisterSubscriber(taskType string, handler MessageHandler, binder Binder) error

RegisterSubscriber registers a task subscriber.

func (*Server) RegisterSubscriberWithCtx added in v1.2.10

func (s *Server) RegisterSubscriberWithCtx(taskType string,
	handler func(context.Context, string, MessagePayload) error, binder Binder) error

RegisterSubscriberWithCtx registers a task subscriber whose handler also receives a context.

func (*Server) RemoveAllPeriodicTask added in v1.2.1

func (s *Server) RemoveAllPeriodicTask()

func (*Server) RemovePeriodicTask added in v1.1.0

func (s *Server) RemovePeriodicTask(typeName string) error

RemovePeriodicTask removes a periodic task.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the server.

func (*Server) Stop

func (s *Server) Stop(_ context.Context) error

Stop stops the server.

type ServerOption

type ServerOption func(o *Server)

func WithAddress

func WithAddress(addr string) ServerOption

func WithCodec added in v1.1.0

func WithCodec(c string) ServerOption

func WithConcurrency

func WithConcurrency(concurrency int) ServerOption

func WithConfig added in v1.2.5

func WithConfig(c asynq.Config) ServerOption

func WithDelayedTaskCheckInterval

func WithDelayedTaskCheckInterval(tm time.Duration) ServerOption

func WithDialTimeout

func WithDialTimeout(timeout time.Duration) ServerOption

func WithEnableKeepAlive added in v1.1.0

func WithEnableKeepAlive(enable bool) ServerOption

WithEnableKeepAlive enables or disables keep-alive.

func WithErrorHandler

func WithErrorHandler(fn asynq.ErrorHandler) ServerOption

func WithGroupGracePeriod

func WithGroupGracePeriod(tm time.Duration) ServerOption

func WithGroupMaxDelay

func WithGroupMaxDelay(tm time.Duration) ServerOption

func WithGroupMaxSize

func WithGroupMaxSize(sz int) ServerOption

func WithHealthCheckFunc

func WithHealthCheckFunc(fn func(error)) ServerOption

func WithHealthCheckInterval

func WithHealthCheckInterval(tm time.Duration) ServerOption

func WithIsFailure added in v1.2.5

func WithIsFailure(c asynq.Config) ServerOption

func WithLocation

func WithLocation(name string) ServerOption

func WithMiddleware

func WithMiddleware(m ...asynq.MiddlewareFunc) ServerOption

func WithQueues

func WithQueues(queues map[string]int) ServerOption

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) ServerOption

func WithRedisAuth

func WithRedisAuth(userName, password string) ServerOption

func WithRedisDatabase

func WithRedisDatabase(db int) ServerOption

func WithRedisPassword added in v1.1.0

func WithRedisPassword(password string) ServerOption

func WithRedisPoolSize

func WithRedisPoolSize(size int) ServerOption

func WithRetryDelayFunc

func WithRetryDelayFunc(fn asynq.RetryDelayFunc) ServerOption

func WithStrictPriority

func WithStrictPriority(val bool) ServerOption

func WithTLSConfig

func WithTLSConfig(c *tls.Config) ServerOption

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) ServerOption
