delayqueue

package module
v0.0.0-...-03004a7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2021 License: MIT Imports: 14 Imported by: 0

README

Delay Queue

单层时间轮 基于Go实现, 主要能力:
  1. Push(delaySeconds int64, taskType string)
  2. AfterFunc(delaySeconds int64, f func())
具体介绍:
  • 支持持久化的单层时间轮
  • 通过 lock-free 栈链存储任务
  • 通过 pebble 进行数据持久化

如何使用

下载
go get github.com/invxp/delayqueue
简单示例:
package main

import (
	"github.com/invxp/delayqueue"
	"log"
	"time"
)

func main() {
	// Create a new DelayQueue.
	dq := delayqueue.New()

	// Schedule a function to run 5 seconds from now.
	const delaySeconds int64 = 5
	dq.AfterFunc(delaySeconds, func() { log.Println("After 5 second function") })

	// Run blocks, so start it in its own goroutine.
	go dq.Run()

	// Wait 10 seconds, then shut the queue down.
	time.Sleep(10 * time.Second)

	log.Println("close delay queue error", dq.Close())
}

测试用例可以这样做:
$ go test -v -race -run XXXXX (XXXXX 为具体测试方法名)
PASS / FAIL
或测试全部用例:
$ go test -v -race

TODO

  1. 优化过期的Task处理方式(多层时间轮, 感觉没啥必要)
  2. 支持分布式数据同步(Raft, CP, 放弃性能保证一致性)

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultDropTaskHandler is the default hook for dropping a task. It calls
// loadDB, deletes the record stored under defaultTaskKey+taskID from
// defaultDatabase with the Sync write option enabled, and logs the task ID
// together with the result of the delete (nil on success).
var DefaultDropTaskHandler = func(taskID string) {
	loadDB()
	key := stringToByte(defaultTaskKey + taskID)
	err := defaultDatabase.Delete(key, &pebble.WriteOptions{Sync: true})
	log.Printf("drop task: %s, error: %v", taskID, err)
}
View Source
// DefaultLoadTaskHandler is the default hook for loading persisted tasks. It
// calls loadDB, scans defaultDatabase starting at defaultTaskKey, gob-decodes
// every value whose key carries the defaultTaskKey prefix into a Task, logs
// it, and returns the decoded tasks in iteration order. The result is nil
// when no task is stored.
//
// It panics if a stored task value cannot be gob-decoded.
var DefaultLoadTaskHandler = func() []Task {
	loadDB()
	prefix := stringToByte(defaultTaskKey)
	iter := defaultDatabase.NewIter(&pebble.IterOptions{LowerBound: prefix})
	// Release the iterator on every exit path, including the decode panic
	// below, and surface a close failure instead of silently dropping it.
	defer func() {
		if err := iter.Close(); err != nil {
			log.Printf("load task: close iterator error: %v", err)
		}
	}()

	var tasks []Task
	for iter.First(); iter.Valid(); iter.Next() {
		// LowerBound only fixes where the scan starts. Keys are visited in
		// sorted order, so the first key without the task prefix marks the
		// end of the task records; anything after it (for example entries
		// written under other keys) must not be decoded as a Task.
		// NOTE(review): assumes the database uses byte-wise key ordering — confirm.
		if !bytes.HasPrefix(iter.Key(), prefix) {
			break
		}
		var task Task
		if err := gob.NewDecoder(bytes.NewReader(iter.Value())).Decode(&task); err != nil {
			panic(err)
		}
		log.Printf("load task: %s, type: %s, cycle: %d, pos: %d, time: %s", task.ID, task.Type, task.Cycle, task.Pos, time.Unix(task.Timestamp, 0).Format("2006-01-02 15:04:05"))
		tasks = append(tasks, task)
	}
	return tasks
}
View Source
// DefaultLoadTickHandler is the default hook for loading the persisted tick.
// It calls loadDB, reads the value stored under defaultTickKey and decodes it
// as a big-endian int64. If the lookup fails the error is logged and 0 is
// returned; if decoding fails the error is logged and tick is returned as-is.
var DefaultLoadTickHandler = func() (tick int64) {
	loadDB()
	raw, closer, getErr := defaultDatabase.Get(stringToByte(defaultTickKey))
	if getErr != nil {
		log.Printf("load tick error: %v", getErr)
		return 0
	}
	// The close error is deliberately discarded, as in the other handlers.
	defer func() { _ = closer.Close() }()

	if readErr := binary.Read(bytes.NewReader(raw), binary.BigEndian, &tick); readErr != nil {
		log.Printf("load tick error: %v", readErr)
	}
	return tick
}
View Source
// DefaultLoadTimeHandler is the default hook for loading the persisted time
// value. It calls loadDB, reads the value stored under defaultTimeKey and
// decodes it as a big-endian int64. If the lookup fails the error is logged
// and 0 is returned; if decoding fails the error is logged and the result is
// returned as-is.
//
// Note that the result is named time, which shadows the time package inside
// this function.
var DefaultLoadTimeHandler = func() (time int64) {
	loadDB()
	raw, closer, getErr := defaultDatabase.Get(stringToByte(defaultTimeKey))
	if getErr != nil {
		log.Printf("load time error: %v", getErr)
		return 0
	}
	// The close error is deliberately discarded, as in the other handlers.
	defer func() { _ = closer.Close() }()

	if readErr := binary.Read(bytes.NewReader(raw), binary.BigEndian, &time); readErr != nil {
		log.Printf("load time error: %v", readErr)
	}
	return time
}
View Source
// DefaultSaveTaskHandler is the default hook for persisting a task. It calls
// loadDB, builds a Task from the arguments, gob-encodes it, stores the bytes
// under defaultTaskKey+taskID with the Sync write option enabled, and logs
// the task together with the result of the write (nil on success).
//
// It panics if the task cannot be gob-encoded.
var DefaultSaveTaskHandler = func(taskID, taskType string, timestamp, cycle, pos int64) {
	loadDB()

	// Positional literal: the first five values fill ID, Type, Cycle, Pos and
	// Timestamp; the trailing true and nil fill fields that are not shown in
	// this listing.
	task := Task{taskID, taskType, cycle, pos, timestamp, true, nil}

	var encoded bytes.Buffer
	if err := gob.NewEncoder(&encoded).Encode(task); err != nil {
		panic(err)
	}

	addedAt := time.Unix(task.Timestamp, 0).Format("2006-01-02 15:04:05")
	key := stringToByte(defaultTaskKey + taskID)
	setErr := defaultDatabase.Set(key, encoded.Bytes(), &pebble.WriteOptions{Sync: true})
	log.Printf("save task: %s, type: %s, cycle: %d, pos: %d, time: %s, error: %v", task.ID, task.Type, task.Cycle, task.Pos, addedAt, setErr)
}
View Source
// DefaultSaveTickHandler is the default hook for persisting the tick. It
// calls loadDB, encodes tick as an 8-byte big-endian integer and stores it
// under defaultTickKey with the Sync write option enabled. A failed write is
// logged; nothing is logged on success.
var DefaultSaveTickHandler = func(tick int64) {
	loadDB()

	// Same 8 big-endian bytes that binary.Write emits for an int64.
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(tick))

	err := defaultDatabase.Set(stringToByte(defaultTickKey), encoded[:], &pebble.WriteOptions{Sync: true})
	if err == nil {
		return
	}
	log.Printf("save tick: %d, error: %v", tick, err)
}
View Source
// DefaultSaveTimeHandler is the default hook for persisting the time value.
// It calls loadDB, encodes time as an 8-byte big-endian integer and stores it
// under defaultTimeKey with the Sync write option enabled. A failed write is
// logged; nothing is logged on success.
//
// Note that the parameter is named time, which shadows the time package
// inside this function.
var DefaultSaveTimeHandler = func(time int64) {
	loadDB()

	// Same 8 big-endian bytes that binary.Write emits for an int64.
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(time))

	err := defaultDatabase.Set(stringToByte(defaultTimeKey), encoded[:], &pebble.WriteOptions{Sync: true})
	if err == nil {
		return
	}
	log.Printf("save time: %d, error: %v", time, err)
}

Functions

This section is empty.

Types

type DelayQueue

// DelayQueue is the timing wheel returned by New. Its fields are unexported
// and not shown in this listing.
type DelayQueue struct {
	// contains filtered or unexported fields
}

func New

func New(options ...HandlerOptions) *DelayQueue

New 创建一个时间轮, 默认通过 Pebble 进行数据持久化, 可自定义持久化方式。options - 自定义选项: 轮盘大小通过 WithWheelSize 设置 (推荐 60 * 60 * 24, 一天走一轮), 持久化函数通过各 WithXxxHandler 选项自定义。

func (*DelayQueue) AfterFunc

func (dq *DelayQueue) AfterFunc(delaySeconds int64, f func())

AfterFunc 推送一个任务 (非持久化) delaySeconds - 单位 (n秒后) f - 具体执行的函数

func (*DelayQueue) Bind

func (dq *DelayQueue) Bind(taskType string, f func())

Bind 向模块注册一个任务类型绑定其要运行的func taskType - 任务类型 f - 具体执行某个方法

func (*DelayQueue) Close

func (dq *DelayQueue) Close() error

Close 终止

func (*DelayQueue) Push

func (dq *DelayQueue) Push(delaySeconds int64, taskType string)

Push 推送一个任务 (持久化) delaySeconds - 单位 (n秒后) taskType - 任务类型 (与Bind对应, 没有则直接返回)

func (*DelayQueue) Run

func (dq *DelayQueue) Run()

Run 执行任务 (阻塞)

type HandlerOptions

type HandlerOptions func(*DelayQueue)

func WithDropTaskHandler

func WithDropTaskHandler(dropTaskHandler func(taskID string)) HandlerOptions

func WithFunctions

func WithFunctions(functions map[string]func()) HandlerOptions

func WithLoadTaskHandler

func WithLoadTaskHandler(loadTaskHandler func() []Task) HandlerOptions

func WithLoadTickHandler

func WithLoadTickHandler(loadTickHandler func() (tick int64)) HandlerOptions

func WithLoadTimeHandler

func WithLoadTimeHandler(loadTimeHandler func() (timeSecond int64)) HandlerOptions

func WithSaveTaskHandler

func WithSaveTaskHandler(saveTaskHandler func(taskID, taskType string, timestamp, cycle, pos int64)) HandlerOptions

func WithSaveTickHandler

func WithSaveTickHandler(saveTickHandler func(tick int64)) HandlerOptions

func WithSaveTimeHandler

func WithSaveTimeHandler(saveTimeHandler func(timeSecond int64)) HandlerOptions

func WithWheelSize

func WithWheelSize(wheelSize int64) HandlerOptions

type Task

// Task describes a single task placed on the timing wheel.
type Task struct {
	ID        string // task ID, used for persistence
	Type      string // task type
	Cycle     int64  // number of wheel rotations for the task; the task is executed when this equals 0
	Pos       int64  // position of the task on the timing wheel
	Timestamp int64  // time the task was first added
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL