timer

package
v0.0.0-...-66c0c6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2024 License: MIT Imports: 11 Imported by: 0

README

Why Timing Wheels

时间轮动态图

常规(传统)的任务队列调度,需要不断的轮询任务队列,然后进行调度。这种方式会存在两个问题:

  • 轮询的开销
  • 调度的精度

尽可能不损失调度精度的前提下,减少调度的开销。 比如使用全局唯一的时间轮来进行批量化调度,充分利用协程/多线程资源,避免存在多个不同的调度中心,导致资源浪费。 主要的表现是,时间轮可以大批量地管理各种延时任务和周期任务。

Timing Wheels Implementation vs Traditional Implementation

Traditional Implementation

  • JDK Timer + DelayQueue(小根堆-优先级队列)
  • Java ScheduledThreadPoolExecutor(DelayQueue + thread pool)

Timing Wheels Implementation

hash table + linked list + ticker(心跳信号、节拍器、555 信号发生器)

hash table + linked list + DelayQueue + channel + timer (轮询 DelayQueue, 在第一个快到期元素的差值大于 0 的时候,进行休眠,避免空转)

  • Kafka

单层级时间轮

这种实现可能会存在调度时间不精确的现象。

多层级时间轮

通过对时间进行层级划分的方式来提高精确度。

实现细节讨论

slot/bucket 如何划分时间范围?

按照 kafka 的实现,timing wheel 的时间包括 startMs, tickMs, wheelSize, interval 和 currentTimeMs,到 slot 上就是 expirationMs。

但是 kafka 的时间是靠 DelayQueue 在 timeoutMs 时间内把所有过期的 buckets 取出来,再取出内部的 tasks 进行调度和 reinsert 操作。

在 golang 中,有 ticker 作为高精度的时间信号发生器来驱动时间轮的转动,所以不需要 DelayQueue 来进行过期的 bucket 的取出和 reinsert 操作。

ticker 的精度就是至关重要了,如果精度不够,那么就会导致调度时间不准确,比如跨度大,会导致在 slot 里面比较远离 expirationMs 的任务被延后调度。

例子: 假设 tickMs = 10ms,它相当于 kafka 中的 interval,那么 slot 的划分就是 [0, 10), [10, 20), [20, 30) 这样的。

如果 task 被设置的任务间隔是 2ms,那么它第一次应该是在 [0, 10) 的 slot 里面,但是触发调度的时间点却是在 10ms,这样就导致了调度时间不准确。

虽然可以通过限制提交进入时间轮的最小时间间隔来避免这个问题,但这样会导致时间轮可面向的场景变少,比如无法支持 1ms 的时间间隔。

slot/bucket 为什么要使用多层级?

多层级的时间轮是为了方便存放更多的不同时间跨度的任务且减少 slot 的数量,跨度大的任务放在时间间隔大的时间轮上面,这操作是逐级向上找的,直到找到合适的时间轮。

时间轮的转动

时间在向前流逝,就会让最底层时间轮的 slot 不断向前移动,同时也在不断更新所有时间轮的基准时间点,形成时间和时间轮同时向前流逝的效果。

执行任务的时候,实际上只有最底层的 slot 里面的任务才会被执行,其他层级的 slot 里面的任务只是在时间轮转动的时候,把任务移动到下一层级合适时间间隔的 slot 里面。

为什么只有最底层的 slot 里面的任务才会被执行?

因为最底层的 slot 里面的任务的时间间隔是最小的,所以它的时间间隔是最精确的,其他层级的 slot 里面的任务的时间间隔都是最底层的 slot 里面的任务的时间间隔的整数倍。

为什么每一层的时间轮可以不断地承载任务?

使用(逻辑上/物理上)的环境数据结构,可以在 slot 移动过程中把已经执行的任务的 slot 变更为下一个周期可以使用的 slot(复用)。

这里有一个闭环逻辑,底层时间轮的实际执行时间是要加上上层时间轮的周期的。

时间轮就是靠空间换取一定的时间,查的快(时间短)就是要付出一定空间上的代价。

移动

slot/bucket 如何实现多层级?

任务的提交涉及到时间跨度大的问题,比如秒级,分钟级,小时级,天级,月级,年级,这样的时间跨度,如果都放在一个时间轮里面,那么就会导致时间轮的大小很大, 比如 365 * 24 * 60 * 60 * 1000 = 31536000000,这样的时间轮就会有 31536000000 个 slot,这样的时间轮是不现实的。

所以需要对时间轮进行分层,比如秒级的时间轮,分钟级的时间轮,小时级的时间轮,天级的时间轮,月级的时间轮,年级的时间轮。

但是这样又会导致时间轮的每一次层的 slot 数量不一致。

但是实际上,设置相同的即可,因为相同的数量,在时间跨度小的任务非常多的时间轮上,可以减少 slot 的数量,虽然会导致在时间跨度大的时间轮上,要增加 slot 的数量。

动态时间轮升级和降级

task 分类

  • 一次性延迟执行任务。 这种必然有一个距离当前时间的延迟间隔,假设为 delayMs,那么它的过期时间就是 currentTimeMs + delayMs
  • 小周期性执行任务。 这种任务有一个固定的周期,假设为 periodMs,那么它的过期时间就是 currentTimeMs + periodMs
  • 大跨度的周期性执行任务。 这种需要计算它的下一次过期时间 expiredMs,因为大跨度的执行,需要考虑年,月,日的变化(极端的时候 还需要考虑到秒级别时间的补偿之类)
  • 周期间隔不连续的,而且次数有限制的任务。 这种任务的周期间隔是不连续的,比如 1s,2s,3s,4s,5s,10s,20s,30s,40s,50s,60s
  • 周期间隔不连续,但是循环执行的任务。 这种任务的周期间隔是不连续的,但是是循环执行的,比如 1s,2s,3s,4s,5s,10s,1s, 2s, 3s, 4s, 5s, 10s, 1s, 2s, 3s, 4s, 5s, 10s ...

参数说明

  • tickMs u ms
  • slotSize n
  • startMs 基准时间,时间轮被初始化的时间
  • interval = tickMs * wheelSize
  • slot (circle array,使用取余模拟循环效果)
  • task list (linked list,每个 slot 下挂载着对应的任务)
  • tasks map 记录任务信息,方便取消任务;全局唯一
  • delay queue 用于存放过期的任务,方便调度器调度;全局唯一;主要是存放 slot;不是任务过期,而是 slot 过期要清空或者重新分配它里面的任务

slot 的切分

令 u 为时间单元,一个大小为 n 的时间轮有 n 个桶,能够持有 n * u 个时间间隔的定时任务。 那么每个 slot 都持有进入相应时间范围的定时任务。

  • 第一个 slot 持有 [0, u) 范围的任务
  • 第二个 slot 持有 [u, 2u) 范围的任务
  • 第 n 个 slot 持有 [u * (n - 1), u * n) 范围的任务 随着时间单元 u 的数量持续增加,也就是 slot 不断地被移动,slot 移动之后其中所有的定时任务都会过期。 由于任务已经过期,此时就不会有任务被添加大当前的 slot 中,调度器应该立刻运行过期的任务。 因为空 slot 在下一轮是可用的,所以如果当前的 slot 对应时间 t,那么它会在 tick 后变成 [t + u * n, t + (n + 1) * u) 的 slot。 简单地说,当前 slot 中的任务会移动到下一个间隔了 t ms 的 slot 中。

slot 切分的间隔需要小一点,比如 2ms,5ms 这样的,防止间隔跨度大,导致调度时间偏移(不准确)

待续

使用 SkipList + hash table 实现时间轮的 slot/bucket 的查找 (delay queue)

是否能使用 RedBlackTree 实现时间轮的 slot/bucket 的查找

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTimingWheelStopped                   = errors.New("timing wheel stopped")
	ErrTimingWheelTaskNotFound              = errors.New("task not found")
	ErrTimingWheelTaskEmptyJobID            = errors.New("empty job id in task")
	ErrTimingWheelEmptyJob                  = errors.New("empty job in task")
	ErrTimingWheelTaskIsExpired             = errors.New("task is expired")
	ErrTimingWheelTaskUnableToBeAddedToSlot = errors.New("task unable to be added to slot")
	ErrTimingWheelTaskUnableToBeRemoved     = errors.New("task unable to be removed")
	ErrTimingWheelTaskTooShortExpiration    = errors.New("task expiration is too short")
	ErrTimingWheelUnknownScheduler          = errors.New("unknown schedule")
	ErrTimingWheelTaskCancelled             = errors.New("task cancelled")
)

Functions

func MillisToUTCTime

func MillisToUTCTime(millis int64) time.Time

Types

type Job

type Job func(ctx context.Context, metadata JobMetadata)

Job is the function that will be executed by the timing wheel

type JobID

type JobID string

JobID is the unique identifier of a job

type JobMetadata

type JobMetadata interface {
	// GetJobID returns the jobID of the job, unique identifier.
	GetJobID() JobID
	// GetExpiredMs returns the expirationMs of the job.
	GetExpiredMs() int64
	// GetRestLoopCount returns the rest loop count.
	GetRestLoopCount() int64
	// GetJobType returns the job type.
	GetJobType() JobType
}

JobMetadata describes the metadata of a job Each slot in the timing wheel is a linked list of jobs

type JobType

type JobType uint8
const (
	OnceJob JobType = iota
	RepeatedJob
)

func (JobType) String

func (t JobType) String() string

type ScheduledTask

type ScheduledTask interface {
	Task
	UpdateNextScheduledMs()
}

ScheduledTask is the interface that wraps the repeat Job

func NewRepeatTask

func NewRepeatTask(
	ctx context.Context,
	jobID JobID,
	beginMs int64,
	scheduler Scheduler,
	job Job,
) ScheduledTask

type Scheduler

type Scheduler interface {

	// GetRestLoopCount returns the rest loop count.
	// If the rest loop count is -1, it means that the task will run forever unless cancel manually.
	GetRestLoopCount() int64
	// contains filtered or unexported methods
}

func NewFiniteScheduler

func NewFiniteScheduler(intervals ...time.Duration) Scheduler

func NewInfiniteScheduler

func NewInfiniteScheduler(intervals ...time.Duration) Scheduler

type Task

type Task interface {
	JobMetadata
	GetJobMetadata() JobMetadata
	// GetJob returns the job function.
	GetJob() Job
	// GetSlot returns the slot of the job.
	GetSlot() TimingWheelSlot

	// GetPreviousSlotMetadata returns the previous slot metadata of the job.
	GetPreviousSlotMetadata() TimingWheelSlotMetadata

	Cancel() bool
	Cancelled() bool
	// contains filtered or unexported methods
}

Task is the interface that wraps the Job

func NewOnceTask

func NewOnceTask(
	ctx context.Context,
	jobID JobID,
	expiredMs int64,
	job Job,
) Task

type TaskHandler

type TaskHandler func(Task) // Core function

TaskHandler is a function that reinserts a task into the timing wheel. It means that the task should be executed periodically or repeatedly for a certain times. Reinsert will add current task to next slot, higher level slot (overflow wheel) or the same level slot (current wheel) depending on the expirationMs of the task. When the task is reinserted, the expirationMs of the task should be updated.

  1. Check if the task is cancelled. If so, stop reinserting.
  2. Check if the task's loop count is greater than 0. If so, decrease the loop count and reinsert.
  3. Check if the task's loop count is -1 (run forever unless cancel manually). If so, reinsert and update the expirationMs.

type TimingWheel

type TimingWheel interface {
	TimingWheelCommonMetadata
	GetInterval() int64
	GetCurrentTimeMs() int64
}

TimingWheel slots is private, they should be provided by the implementation

type TimingWheelCommonMetadata

type TimingWheelCommonMetadata interface {
	// GetTickMs returns the baseline tick ms (interval) of the timing wheel.
	GetTickMs() int64
	// GetSlotSize returns the slot size of the timing wheel.
	GetSlotSize() int64
	// GetStartMs returns the start ms of the timing wheel.
	GetStartMs() int64
}

type TimingWheelOptions

type TimingWheelOptions func(tw *timingWheel)

func WithTimingWheelSlotSize

func WithTimingWheelSlotSize(slotSize int64) TimingWheelOptions

func WithTimingWheelTickMs

func WithTimingWheelTickMs(basicTickMs time.Duration) TimingWheelOptions

type TimingWheelSlot

type TimingWheelSlot interface {
	TimingWheelSlotMetadata
	// GetMetadata returns the metadata of the slot.
	GetMetadata() TimingWheelSlotMetadata
	// AddTask adds a task to the slot.
	AddTask(Task)
	// RemoveTask removes a task from the slot.
	RemoveTask(Task) bool
	// Flush flushes all tasks in the slot generally,
	// but it should be called in a loop.
	Flush(TaskHandler)
}

TimingWheelSlot is the interface that wraps the slot, in kafka, it is called bucket.

func NewXSlot

func NewXSlot() TimingWheelSlot

type TimingWheelSlotMetadata

type TimingWheelSlotMetadata interface {
	// GetExpirationMs returns the expirationMs of the slot.
	GetExpirationMs() int64

	// GetSlotID returns the slotID of the slot, easy for debugging.
	GetSlotID() int64

	// GetLevel returns the level of the slot, easy for debugging.
	GetLevel() int64
	// contains filtered or unexported methods
}

type TimingWheels

type TimingWheels interface {
	TimingWheelCommonMetadata
	// GetTaskCounter returns the current task count of the timing wheel.
	GetTaskCounter() int64
	// AddTask adds a task to the timing wheels.
	AddTask(task Task) error
	// CancelTask cancels a task by jobID.
	CancelTask(jobID JobID) error
	// Shutdown stops the timing wheels
	Shutdown()
	// AfterFunc schedules a function to run after the duration delayMs.
	AfterFunc(delayMs time.Duration, fn Job) (Task, error)
	// ScheduleFunc schedules a function to run at a certain time generated by the schedule.
	ScheduleFunc(schedFn func() Scheduler, fn Job) (Task, error)
}

func NewTimingWheels

func NewTimingWheels(ctx context.Context, startMs int64, opts ...TimingWheelOptions) TimingWheels

NewTimingWheels creates a new timing wheel. @param startMs the start time in milliseconds, example value time.Now().UnixMilli().

Same as the kafka, Time.SYSTEM.hiResClockMs() is used.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL