Documentation ¶
Index ¶
- Variables
- type DelayQueue
- type HandlerOptions
- func WithDropTaskHandler(dropTaskHandler func(taskID string)) HandlerOptions
- func WithFunctions(functions map[string]func()) HandlerOptions
- func WithLoadTaskHandler(loadTaskHandler func() []Task) HandlerOptions
- func WithLoadTickHandler(loadTickHandler func() (tick int64)) HandlerOptions
- func WithLoadTimeHandler(loadTimeHandler func() (timeSecond int64)) HandlerOptions
- func WithSaveTaskHandler(saveTaskHandler func(taskID, taskType string, timestamp, cycle, pos int64)) HandlerOptions
- func WithSaveTickHandler(saveTickHandler func(tick int64)) HandlerOptions
- func WithSaveTimeHandler(saveTimeHandler func(timeSecond int64)) HandlerOptions
- func WithWheelSize(wheelSize int64) HandlerOptions
- type Task
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultDropTaskHandler = func(taskID string) { loadDB() log.Printf("drop task: %s, error: %v", taskID, defaultDatabase.Delete(stringToByte(defaultTaskKey+taskID), &pebble.WriteOptions{Sync: true})) }
View Source
var DefaultLoadTaskHandler = func() []Task { loadDB() var tasks []Task iter := defaultDatabase.NewIter(&pebble.IterOptions{LowerBound: stringToByte(defaultTaskKey)}) for iter.First(); iter.Valid(); iter.Next() { task := Task{} if err := gob.NewDecoder(bytes.NewReader(iter.Value())).Decode(&task); err != nil { panic(err) } log.Printf("load task: %s, type: %s, cycle: %d, pos: %d, time: %s", task.ID, task.Type, task.Cycle, task.Pos, time.Unix(task.Timestamp, 0).Format("2006-01-02 15:04:05")) tasks = append(tasks, task) } _ = iter.Close() return tasks }
View Source
var DefaultLoadTickHandler = func() (tick int64) { loadDB() data, closer, err := defaultDatabase.Get(stringToByte(defaultTickKey)) if err != nil { log.Printf("load tick error: %v", err) return 0 } defer func() { _ = closer.Close() }() err = binary.Read(bytes.NewReader(data), binary.BigEndian, &tick) if err != nil { log.Printf("load tick error: %v", err) } return }
View Source
var DefaultLoadTimeHandler = func() (time int64) { loadDB() data, closer, err := defaultDatabase.Get(stringToByte(defaultTimeKey)) if err != nil { log.Printf("load time error: %v", err) return 0 } defer func() { _ = closer.Close() }() err = binary.Read(bytes.NewReader(data), binary.BigEndian, &time) if err != nil { log.Printf("load time error: %v", err) } return }
View Source
var DefaultSaveTaskHandler = func(taskID, taskType string, timestamp, cycle, pos int64) { loadDB() var task = Task{taskID, taskType, cycle, pos, timestamp, true, nil} var buf bytes.Buffer if err := gob.NewEncoder(&buf).Encode(task); err != nil { panic(err) } log.Printf("save task: %s, type: %s, cycle: %d, pos: %d, time: %s, error: %v", task.ID, task.Type, task.Cycle, task.Pos, time.Unix(task.Timestamp, 0).Format("2006-01-02 15:04:05"), defaultDatabase.Set(stringToByte(defaultTaskKey+taskID), buf.Bytes(), &pebble.WriteOptions{Sync: true})) }
View Source
var DefaultSaveTickHandler = func(tick int64) { loadDB() var buf bytes.Buffer if err := binary.Write(&buf, binary.BigEndian, tick); err != nil { panic(err) } if err := defaultDatabase.Set(stringToByte(defaultTickKey), buf.Bytes(), &pebble.WriteOptions{Sync: true}); err != nil { log.Printf("save tick: %d, error: %v", tick, err) } }
View Source
var DefaultSaveTimeHandler = func(time int64) { loadDB() var buf bytes.Buffer if err := binary.Write(&buf, binary.BigEndian, time); err != nil { panic(err) } if err := defaultDatabase.Set(stringToByte(defaultTimeKey), buf.Bytes(), &pebble.WriteOptions{Sync: true}); err != nil { log.Printf("save time: %d, error: %v", time, err) } }
Functions ¶
This section is empty.
Types ¶
type DelayQueue ¶
type DelayQueue struct {
// contains filtered or unexported fields
}
func New ¶
func New(options ...HandlerOptions) *DelayQueue
New 创建一个时间轮, 默认通过Pebble进行数据持久化, 可自定义持久化方式 wheelSize - 轮盘大小, 推荐60 * 60 * 24 (一天走一轮) options 自定义持久化函数
func (*DelayQueue) AfterFunc ¶
func (dq *DelayQueue) AfterFunc(delaySeconds int64, f func())
AfterFunc 推送一个任务 (非持久化) delaySeconds - 单位 (n秒后) f - 具体执行的函数
func (*DelayQueue) Bind ¶
func (dq *DelayQueue) Bind(taskType string, f func())
Bind 向模块注册一个任务类型绑定其要运行的func taskType - 任务类型 f - 具体执行某个方法
func (*DelayQueue) Push ¶
func (dq *DelayQueue) Push(delaySeconds int64, taskType string)
Push 推送一个任务 (持久化) delaySeconds - 单位 (n秒后) taskType - 任务类型 (与Bind对应, 没有则直接返回)
type HandlerOptions ¶
type HandlerOptions func(*DelayQueue)
func WithDropTaskHandler ¶
func WithDropTaskHandler(dropTaskHandler func(taskID string)) HandlerOptions
func WithFunctions ¶
func WithFunctions(functions map[string]func()) HandlerOptions
func WithLoadTaskHandler ¶
func WithLoadTaskHandler(loadTaskHandler func() []Task) HandlerOptions
func WithLoadTickHandler ¶
func WithLoadTickHandler(loadTickHandler func() (tick int64)) HandlerOptions
func WithLoadTimeHandler ¶
func WithLoadTimeHandler(loadTimeHandler func() (timeSecond int64)) HandlerOptions
func WithSaveTaskHandler ¶
func WithSaveTaskHandler(saveTaskHandler func(taskID, taskType string, timestamp, cycle, pos int64)) HandlerOptions
func WithSaveTickHandler ¶
func WithSaveTickHandler(saveTickHandler func(tick int64)) HandlerOptions
func WithSaveTimeHandler ¶
func WithSaveTimeHandler(saveTimeHandler func(timeSecond int64)) HandlerOptions
func WithWheelSize ¶
func WithWheelSize(wheelSize int64) HandlerOptions
Click to show internal directories.
Click to hide internal directories.