delayed

package
v1.20.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2023 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReadyQueueKey = "delayed:ready:topic.%s" // 就绪队列的 key
	DelayQueueKey = "delayed:delay:topic.%s" // 延迟SortedSet的 key
	DelayJob      = "delayed:job:topic.%s"   // 延迟任务详情存放的 key
)

Variables

View Source
var (
	ErrGetJobIdRetries = fmt.Errorf("获取 JobId 重试次数超过 5 次, 请检查 Redis 连接是否正常")
)

Functions

func DelayTime

func DelayTime(delay int, delayUnit DelayUnit) int64

DelayTime 计算延迟时间

func GetDelayJobKey

func GetDelayJobKey(topic string) string

GetDelayJobKey 获取延迟任务详情存放的 key

func GetDelayQueueKey

func GetDelayQueueKey(topic string) string

GetDelayQueueKey 获取延迟SortedSet的 key

func GetReadyQueueKey

func GetReadyQueueKey(topic string) string

GetReadyQueueKey 获取就绪队列的 key

Types

type Consumer

type Consumer struct {
	sync.Mutex // 互斥锁
	// contains filtered or unexported fields
}

Consumer 消费者

func NewConsumer

func NewConsumer(opts ...ConsumerOption) *Consumer

func (*Consumer) Cancel

func (c *Consumer) Cancel(topic, jobId string) error

func (*Consumer) Close

func (c *Consumer) Close() error

Close 关闭队列

func (*Consumer) Provide

func (c *Consumer) Provide(ctx context.Context) interface{}

func (*Consumer) Register

func (c *Consumer) Register(worker *Worker)

func (*Consumer) Run

func (c *Consumer) Run() error

Run 启动队列

type ConsumerOption

type ConsumerOption func(*ConsumerOptions)

func WithClosedWait

func WithClosedWait(closedWait int) ConsumerOption

func WithMaxRetries

func WithMaxRetries(maxRetries int) ConsumerOption

func WithSleep

func WithSleep(sleep int) ConsumerOption

func WithWorkerNum

func WithWorkerNum(workerNum int) ConsumerOption

type ConsumerOptions

type ConsumerOptions struct {
	Sleep      int // 当就绪队列为空时, 休眠时间, 单位为秒
	ClosedWait int // 关闭队列时, 等待队列中的任务执行完成的时间, 单位为秒
	MaxRetries int // 任务失败后的重试次数
	WorkerNum  int // 任务处理器的数量
}

ConsumerOptions 代表消费者的配置选项

type DelayUnit

type DelayUnit uint8

DelayUnit 延时时间计算类型

const (
	DelayUnitSecond DelayUnit = iota + 1 // 延迟计算单位:秒
	DelayUnitMinute                      // 延迟计算单位:分
	DelayUnitHour                        // 延迟计算单位:时
	DelayUnitDay                         // 延迟计算单位:天
)

type Delayed

type Delayed struct {
	// contains filtered or unexported fields
}

func (*Delayed) Cancel

func (d *Delayed) Cancel(topic, jobId string) error

Cancel 通过指定 topic 和 jobId 取消延迟任务

func (*Delayed) Close

func (d *Delayed) Close() error

func (*Delayed) Dispatch

func (d *Delayed) Dispatch(topic string, payload []byte) (string, error)

Dispatch 用于添加任务

func (*Delayed) DispatchDelay

func (d *Delayed) DispatchDelay(topic string, payload []byte, delay int, delayUnit DelayUnit) (string, error)

DispatchDelay 用于添加延迟任务

func (*Delayed) Provide

func (d *Delayed) Provide(ctx context.Context) interface{}

func (*Delayed) Register

func (d *Delayed) Register(worker *Worker)

func (*Delayed) Run

func (d *Delayed) Run() error

type IConsumer

type IConsumer interface {
	Run() error                  // 启动消费者
	JoinWorker(worker *Worker)   // 加入一个 worker
	Close(context.Context) error // 关闭消费者
}

type IProduct

type IProduct interface {
	Dispatch(topic string, payload interface{}) (string, error)                                      // 添加任务
	DispatchDelay(topic string, payload interface{}, delay int, delayUnit DelayUnit) (string, error) // 添加延迟任务
}

type Job

type Job struct {
	Id        string `msgpack:"1"` // 任务ID
	Topic     string `msgpack:"2"` // 任务主题
	Delay     int64  `msgpack:"3"` // 任务延迟执行的时间戳
	Payload   []byte `msgpack:"4"` // 任务内容
	Timestamp int64  `msgpack:"5"` // 任务投递时间
	Retries   int    `msgpack:"6"` // 任务重试次数
}

Job 代表一个定时任务

type JobHandler

type JobHandler interface {
	Topic() string                          // 获取 Job 的 topic
	OnHandle(context.Context, []byte) error // 处理 Job
	OnError(context.Context, []byte) error  // 处理 Job 处理失败的情况
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Dispatch

func (p *Producer) Dispatch(topic string, payload []byte) (string, error)

Dispatch 向就绪队列投递任务

func (*Producer) DispatchDelay

func (p *Producer) DispatchDelay(topic string, payload []byte, delay int, delayUnit DelayUnit) (string, error)

DispatchDelay 向延迟队列投递任务 delay 为延迟时间, delayUnit 为延迟时间单位

func (*Producer) Provide

func (p *Producer) Provide(context.Context) interface{}

type Queue

type Queue interface {
	Run() error                           // 启动队列
	AddTopic(string) error                // 添加一个 topic
	Push(*Job) error                      // 添加 Job 到队列中, 投递的方向为队列尾部
	Pop(string) (*Job, error)             // 从队列中获取 Job
	PushDelay(*Job) error                 // 将 Job 添加到延迟队列中
	Cancel(string, string) error          // 通过指定 topic 和 jobId 取消延迟任务
	FetchReadyJob(string) ([]*Job, error) // 获取队列中已就绪的 Job
	FetchDelayJob(string) ([]*Job, error) // 获取延迟队列中的所有 Job
	ReadyLen(string) (int, error)         // 获取队列中已就绪的 Job 数量
	DelayLen(string) (int, error)         // 获取延迟队列中的 Job 数量
	Exists(string, string) (bool, error)  // 判断 JobID 是否存在
	Close() error                         // 关闭队列
}

Queue 代表一个队列, 用于存储 Job, 并提供 Job 的增删改查等操作, 以及 Job 的投递和消费, 以及 Job 的迁移, 以及队列的关闭等操作 延迟 Job 会被存储在延迟队列中, 就绪 Job 会被存储在就绪队列中, 就绪 Job 会被消费, 延迟 Job 在延迟时间到达后会被迁移到就绪队列中

func NewRedisQueue

func NewRedisQueue() Queue

type RedisQueue

type RedisQueue struct {
	sync.RWMutex // 读写锁
	// contains filtered or unexported fields
}

func (*RedisQueue) AddTopic

func (rq *RedisQueue) AddTopic(topic string) error

func (*RedisQueue) Cancel

func (rq *RedisQueue) Cancel(topic, jobId string) error

Cancel 取消延迟任务

func (*RedisQueue) Close

func (rq *RedisQueue) Close() error

func (*RedisQueue) DelayLen

func (rq *RedisQueue) DelayLen(topic string) (int, error)

DelayLen 获取延迟队列长度

func (*RedisQueue) Exists

func (rq *RedisQueue) Exists(topic, jobId string) (bool, error)

Exists 判断任务是否存在

func (*RedisQueue) FetchDelayJob

func (rq *RedisQueue) FetchDelayJob(topic string) ([]*Job, error)

FetchDelayJob 获取延迟队列中的任务

func (*RedisQueue) FetchReadyJob

func (rq *RedisQueue) FetchReadyJob(topic string) ([]*Job, error)

FetchReadyJob 获取就绪队列中的任务

func (*RedisQueue) Pop

func (rq *RedisQueue) Pop(topic string) (*Job, error)

func (*RedisQueue) Provide

func (rq *RedisQueue) Provide(context.Context) interface{}

func (*RedisQueue) Push

func (rq *RedisQueue) Push(job *Job) error

Push 将任务推送到就绪队列

func (*RedisQueue) PushDelay

func (rq *RedisQueue) PushDelay(job *Job) error

func (*RedisQueue) ReadyLen

func (rq *RedisQueue) ReadyLen(topic string) (int, error)

ReadyLen 获取就绪队列长度

func (*RedisQueue) Run

func (rq *RedisQueue) Run() error

type Worker

type Worker struct {
	Topic       string              // 任务 topic
	Handler     JobHandler          // 任务处理器
	WorkerCount int                 // 并行任务数
	WorkerPool  *semaphore.Weighted // 通过信号量控制并发协程数
}

Worker 代表一个 topic 的 worker

func NewWorker

func NewWorker(topic string, handler JobHandler, workerCount int) *Worker

NewWorker 用于创建一个 Worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL