sche

package module
v0.0.0-...-e490bf4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2022 License: MIT Imports: 10 Imported by: 1

README

go-sche

LICENSE codecov GitHub Stars

背景

任务调度是很常见的需求,一般的任务调度系统包含异步、回调等模块,实现相对比较复杂。

go-sche 是一个基于 cron、gonal 实现的轻量级任务调度库。

目标

1、基于 cron、gonal 实现任务调度

  • Cron
  • Gonal

2、实现 memory、postgres 存储

  • Memory Store
  • Postgres Store

使用

1、初始化Scheduler

// 依赖
import "github.com/czasg/go-sche"
// 初始化调度
scheduler := sche.NewScheduler()

2、新增任务

_ = scheduler.AddTask(&sche.Task{
    Name: "task1",
    Label: map[string]string{
        "gonal标签": "gonal标签",
    },
    Trig: "* * * * *",
})

3、启动调度(阻塞)

_ = scheduler.Start(context.Background())

4、更新任务(基于ID)

_ = scheduler.AddTask(&sche.Task{
    ID: 1,
    Name: "task1",
    Label: map[string]string{
        "gonal标签": "gonal标签",
    },
    Trig: "*/30 * * * *",
})

5、删除任务(基于ID)

_ = scheduler.DelTask(&sche.Task{
    ID: 1,
})

4.Demo

package main

import (
	"context"
	"fmt"
	"github.com/czasg/go-sche"
	"github.com/czasg/gonal"
	"time"
)

func handler(ctx context.Context, labels gonal.Labels, data []byte) {
	fmt.Println(labels, string(data))
}

func main() {
	// 绑定 gonal 标签
	labels := map[string]string{"test": "test"}
	gonal.BindHandler(labels, handler)
	// 创建任务
	task := sche.Task{
		Name:  "task1",
		Label: labels,
		Trig:  "* * * * *",
	}
	// 初始化调度对象
	scheduler := sche.NewScheduler()
	// 后台挂起调度
	ctx, cancel := context.WithCancel(context.Background())
	go scheduler.Start(ctx)
	// 新增任务
	_ = scheduler.AddTask(&task)
	time.Sleep(time.Second * 5)
	task.Trig = "*/3 * * * *"
	_ = scheduler.UpdateTask(&task)
	time.Sleep(time.Second * 10)
	_ = scheduler.DelTask(&task)
	time.Sleep(time.Second * 5)
	cancel()
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	StoreInvalidTaskErr = errors.New("store invalid task.")
	StoreNoTaskErr      = errors.New("store no task.")
)
View Source
var (
	MaxDateTime = time.Date(9999, 1, 1, 0, 0, 0, 0, time.Local)
)

Functions

This section is empty.

Types

type Notify

type Notify struct {
	// contains filtered or unexported fields
}

func (*Notify) Notify

func (n *Notify) Notify()

func (*Notify) Wait

func (n *Notify) Wait(t time.Time) <-chan struct{}

type Scheduler

type Scheduler struct {
	Store Store
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(stores ...Store) *Scheduler

func (*Scheduler) AddTask

func (s *Scheduler) AddTask(task *Task) error

func (*Scheduler) DelTask

func (s *Scheduler) DelTask(task *Task) error

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

func (*Scheduler) UpdateTask

func (s *Scheduler) UpdateTask(task *Task) error

type Store

type Store interface {
	Todo(now time.Time) ([]*Task, error)
	GetNextRunTime() (time.Time, error)
	AddTask(task *Task) error
	UpdateTask(task *Task) error
	DelTask(task *Task) error
	GetTaskByID(id int64) (*Task, error)
}

type StoreMemory

type StoreMemory struct {
	Index    int64
	Tasks    *list.List
	TasksMap map[int64]*list.Element
	Lock     sync.Mutex
}

func NewStoreMemory

func NewStoreMemory() *StoreMemory

func (*StoreMemory) AddTask

func (s *StoreMemory) AddTask(task *Task) error

func (*StoreMemory) DelTask

func (s *StoreMemory) DelTask(task *Task) error

func (*StoreMemory) GetNextRunTime

func (s *StoreMemory) GetNextRunTime() (time.Time, error)

func (*StoreMemory) GetTaskByID

func (s *StoreMemory) GetTaskByID(id int64) (*Task, error)

func (*StoreMemory) Todo

func (s *StoreMemory) Todo(now time.Time) ([]*Task, error)

func (*StoreMemory) UpdateTask

func (s *StoreMemory) UpdateTask(task *Task) error

type Task

type Task struct {
	ID          int64             `json:"id" pg:",pk"`
	Name        string            `json:"name" pg:",use_zero"`
	Label       map[string]string `json:"label" pg:",use_zero"`
	Trig        Trig              `json:"trig" pg:",use_zero"`
	LastRunTime time.Time         `json:"last_run_time" pg:",use_zero"`
	NextRunTime time.Time         `json:"next_run_time" pg:",use_zero"`
	Suspended   bool              `json:"suspended" pg:",use_zero"`
}

func (*Task) Run

func (t *Task) Run() error

type Trig

type Trig string

func (Trig) GetNextRunTime

func (t Trig) GetNextRunTime(previous time.Time) time.Time

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL