task

package module
v0.9.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2023 License: MIT Imports: 17 Imported by: 25

README

Devfeel/DotTask

简约大方的go-task组件
支持cron、loop、queue三种模式

特性

  • 支持配置方式(xml + json + yaml)与代码方式
  • 支持cron、loop、queue三种模式
  • cron模式支持“秒 分 时 日 月 周”配置
  • loop模式支持毫秒级别
  • queue模式支持毫秒级别
  • 上次任务没有停止的情况下不触发下次任务
  • 支持Exception、OnBegin、OnEnd注入点
  • 支持单独执行TaskHandler
  • 支持代码级重设Task的相关设置
  • 支持超时控制
  • 内建Task运行计数信息,包含执行与异常计数
  • 内建针对Task与Counter的OutputHttpHandler,可方便与WebServer自动集成

安装:

go get -u github.com/devfeel/dottask

快速开始:

package main

import (
	"fmt"
	. "github.com/devfeel/dottask"
	"time"
)

var service *TaskService

func Job_Test(ctx *TaskContext) error {
	fmt.Println(time.Now().String(), " => Job_Test")
	//time.Sleep(time.Second * 3)
	return nil
}

func Loop_Test(ctx *TaskContext) error {
	fmt.Println(time.Now().String(), " => Loop_Test")
	time.Sleep(time.Second * 3)
	return nil
}

func main() {

    //step 1: init new task service
	service = StartNewService()

	//step 2: register task handler
	_, err := service.CreateCronTask("testcron", true, "48-5 */2 * * * *", Job_Test, nil)
	if err != nil {
		fmt.Println("service.CreateCronTask error! => ", err.Error())
	}
	_, err = service.CreateLoopTask("testloop", true, 0, 1000, Loop_Test, nil)
	if err != nil {
		fmt.Println("service.CreateLoopTask error! => ", err.Error())
	}

	//step 3: start all task
	service.StartAllTask()

	fmt.Println(service.PrintAllTasks())

	for {
		time.Sleep(time.Hour)
	}

}

配置方式

package main

import (
	"fmt"
	. "github.com/devfeel/dottask"
	"time"
)

var service *TaskService

func Job_Config(ctx *TaskContext) error {
	fmt.Println(time.Now().String(), " => Job_Config")
	//time.Sleep(time.Second * 3)
	return nil
}

func Loop_Config(ctx *TaskContext) error {
	fmt.Println(time.Now().String(), " => Loop_Config")
	time.Sleep(time.Second * 3)
	return nil
}

func RegisterTask(service *TaskService) {
	service.RegisterHandler("Job_Config", Job_Config)
	service.RegisterHandler("Loop_Config", Loop_Config)
}

func main() {
	//step 1: init new task service
	service = StartNewService()

	//step 2: register all task handler
	RegisterTask(service)

	//step 3: load config file
	service.LoadConfig("d:\\task.conf")

	//step 4: start all task
	service.StartAllTask()

	fmt.Println(service.PrintAllTasks())

	for {
		time.Sleep(time.Hour)
	}
}

task.xml.conf:
<?xml version="1.0" encoding="UTF-8"?>
<config>
<global isrun="true" logpath="d:/"/>
<tasks>
    <task taskid="Loop_Config" type="loop" isrun="true" duetime="10000" interval="10" handlername="Loop_Config" />
    <task taskid="Job_Config" type="cron" isrun="true" express="0 */5 * * * *" handlername="Job_Config" />
</tasks>
</config>

关于表达式

  • 关于CronTask的TimeExpress 简单解释:
  • 基本格式:* * * * * *(6列,以空格分隔)
  • f1:第1列表示秒0-59,每一秒用/1 表示。
  • f2:第2列表示分钟0-59。
  • f3:第3列表示小时0-23。
  • f4:第4列表示日期1-31。
  • f5:第5列表示月份1-12。
  • f6:第6列表示星期几0-7,其中0和7均表示为周日。
  • 当f1为 * 时表示每秒都要执行任务,f2为 * 时表示每分钟都要执行程序,其余类推
  • 当f1为 a-b 时表示从第 a 秒钟到第 b 秒钟这段时间内要执行,f2 为 a-b 时表示从第 a 到第 b 分钟都要执行,其余类推
  • 当f1为 */n 时表示每 n 秒钟个时间间隔执行一次,f2 为 */n 表示每 n 小时个时间间隔执行一次,其余类推
  • 当f1为 a, b, c,... 时表示第 a, b, c,... 秒钟要执行,f2 为 a, b, c,... 时表示第 a, b, c...分钟要执行,其余类推
示例:
  • #每天早上7点执行一次调度任务:
  • 0 0 7 * * *
  • #在 12 月内, 每天的早上 6 点到 12 点中,每隔3个小时执行一次调度任务:
  • 0 0 6-12/3 * 12 *
  • #周一到周五每天下午 5:00执行一次调度任务:
  • 0 0 17 * * 1-5
  • #每月每天的午夜 0 点 20 分, 2 点 20 分, 4 点 20 分....执行一次调度任务
  • 0 20 0-23/2 * * *
  • #每月每天的0 点 20 分, 9 点 20 分, 16 点 20 分执行一次调度任务
  • 0 20 0,9,16 * * *

外部依赖

yaml - https://gopkg.in/yaml.v2

如何联系

QQ群:193409346

Documentation

Index

Constants

View Source
const (
	TaskState_Init = "0"
	TaskState_Run  = "1"
	TaskState_Stop = "2"

	TaskType_Loop  = "loop"
	TaskType_Cron  = "cron"
	TaskType_Queue = "queue"

	ConfigType_Xml  = "xml"
	ConfigType_Json = "json"
	ConfigType_Yaml = "yaml"
)
View Source
const (
	Max_WeekDay = 7  //max weekday value
	Min_WeekDay = 0  //min weekday value
	Max_Month   = 12 //max month value
	Min_Month   = 1  //min month value
	Max_Day     = 31 //max day value
	Min_Day     = 1  //min day value
	Max_Hour    = 23 //max hour value
	Min_Hour    = 0  //min hour value
	Max_Minute  = 59 //max minute value
	Min_Minute  = 0  //min minute value
	Max_Second  = 59 //max second value
	Min_Second  = 0  //min second value
)
View Source
const (
	ExpressType_WeekDay = "weekday"
	ExpressType_Month   = "month"
	ExpressType_Day     = "day"
	ExpressType_Hour    = "hour"
	ExpressType_Minute  = "minute"
	ExpressType_Second  = "second"
)
View Source
const (
	DefaultPeriod = time.Second //默认执行周期

)
View Source
const (
	DefaultQueueSize = 1000
)

Variables

View Source
var ErrNotSupportTaskType = errors.New("not support task type")

Functions

func NewFileLogger

func NewFileLogger(filePath string) *fileLogger

func NewFmtLogger

func NewFmtLogger() *fmtLogger

func PackageVersion

func PackageVersion() string

PackageVersion return packageVersion info

func ValidateTaskType

func ValidateTaskType(taskType string) bool

ValidateTaskType validate the TaskType is supported

Types

type AppConfig

type AppConfig struct {
	XMLName xml.Name `xml:"config"`

	Global struct {
		LogPath string `xml:"logpath,attr"  yaml:"logpath"`
		IsRun   bool   `xml:"isrun,attr" yaml:"isrun"`
		Timeout int64  `xml:"timeout,attr" yaml:"timeout"` //全局超时配置,单位为毫秒
	} `xml:"global" yaml:"global"`

	Tasks []struct {
		TaskID      string `xml:"taskid,attr" yaml:"taskid"`           //task编号,需唯一
		IsRun       bool   `xml:"isrun,attr" yaml:"isrun"`             //标识是否允许task执行,默认为false,如设为flash,则启动后不执行task
		TaskType    string `xml:"type,attr" yaml:"type"`               //Task类型,目前支持loop、cron、queue
		DueTime     int64  `xml:"duetime,attr" yaml:"duetime"`         //开始任务的延迟时间(以毫秒为单位),如果<=0则不延迟
		Interval    int64  `xml:"interval,attr" yaml:"interval"`       //loop类型下,两次Task执行之间的间隔,单位为毫秒
		Express     string `xml:"express,attr" yaml:"express"`         //cron类型下,task执行的时间表达式,具体参考readme
		QueueSize   int64  `xml:"queuesize,attr" yaml:"queuesize"`     //queue类型下,queue初始长度
		HandlerName string `xml:"handlername,attr" yaml:"handlername"` //Task对应的HandlerName,需使用RegisterHandler进行统一注册
		HandlerData string `xml:"handlerdata,attr" yaml:"handlerdata"` //Task对应的自定义数据,可在配置源中设置
		Timeout     int64  `xml:"timeout,attr" yaml:"timeout"`         //全局超时配置,单位为毫秒
	} `xml:"tasks>task" yaml:"tasks"`
}

func JsonConfigHandler

func JsonConfigHandler(configFile string) *AppConfig

初始化配置文件(json)

func XmlConfigHandler

func XmlConfigHandler(configFile string) *AppConfig

初始化配置文件(xml)

func YamlConfigHandler

func YamlConfigHandler(configFile string) *AppConfig

初始化配置文件(yaml)

type ConfigHandle

type ConfigHandle func(configSource string) (*AppConfig, error)

type Counter

type Counter interface {
	StartTime() time.Time
	Clear()
	Count() int64
	Dec(int64)
	Inc(int64)
}

Counter incremented and decremented base on int64 value.

func NewCounter

func NewCounter() Counter

NewCounter constructs a new StandardCounter.

type CounterInfo

type CounterInfo struct {
	StartTime    time.Time
	RunCounter   Counter
	ErrorCounter Counter
}

type CronTask

type CronTask struct {
	TaskInfo
	RawExpress string `json:"express"` //运行周期表达式,当TaskType==TaskType_Cron时有效
	// contains filtered or unexported fields
}

CronTask cron task info define

func (*CronTask) GetConfig

func (task *CronTask) GetConfig() *TaskConfig

GetConfig get task config info

func (*CronTask) Reset

func (task *CronTask) Reset(conf *TaskConfig) error

Reset first check conf, then reload conf & restart task special, TaskID can not be reset special, if TaskData is nil, it can not be reset special, if Handler is nil, it can not be reset

func (*CronTask) RunOnce

func (task *CronTask) RunOnce() error

RunOnce do task only once no match Express or Interval no recover panic support for #6 新增RunOnce方法建议

func (*CronTask) Start

func (task *CronTask) Start()

Start start task

type ExceptionHandleFunc

type ExceptionHandleFunc func(*TaskContext, error)

type ExpressSet

type ExpressSet struct {
	// contains filtered or unexported fields
}

func (*ExpressSet) IsMatch

func (e *ExpressSet) IsMatch(t time.Time) bool

type Logger

type Logger interface {
	Error(err error, v interface{})
	Warn(v interface{})
	Info(v interface{})
	Debug(v interface{})
}

type LoopTask

type LoopTask struct {
	TaskInfo
	Interval int64 `json:"interval"` //运行间隔时间,单位毫秒,当TaskType==TaskType_Loop||TaskType_Queue时有效
}

LoopTask loop task info define

func (*LoopTask) GetConfig

func (task *LoopTask) GetConfig() *TaskConfig

GetConfig get task config info

func (*LoopTask) Reset

func (task *LoopTask) Reset(conf *TaskConfig) error

Reset first check conf, then reload conf & restart task special, TaskID can not be reset special, if TaskData is nil, it can not be reset special, if Handler is nil, it can not be reset

func (*LoopTask) RunOnce

func (task *LoopTask) RunOnce() error

RunOnce do task only once no match Express or Interval no recover panic support for #6 新增RunOnce方法建议

func (*LoopTask) Start

func (task *LoopTask) Start()

Start start task

type QueueTask

type QueueTask struct {
	TaskInfo
	Interval    int64 //运行间隔时间,单位毫秒,当TaskType==TaskType_Loop||TaskType_Queue时有效
	MessageChan chan interface{}
}

func (*QueueTask) EnQueue

func (task *QueueTask) EnQueue(value interface{})

EnQueue enqueue value into message queue

func (*QueueTask) GetConfig

func (task *QueueTask) GetConfig() *TaskConfig

GetConfig get task config info

func (*QueueTask) Reset

func (task *QueueTask) Reset(conf *TaskConfig) error

Reset first check conf, then reload conf & restart task

func (*QueueTask) RunOnce

func (task *QueueTask) RunOnce() error

RunOnce do task only once

func (*QueueTask) Start

func (task *QueueTask) Start()

Start start task

type ShowCountInfo

type ShowCountInfo struct {
	TaskID string
	Lable  string
	Count  int64
}

type StandardCounter

type StandardCounter struct {
	// contains filtered or unexported fields
}

StandardCounter is the standard implementation of a Counter

func (*StandardCounter) Clear

func (c *StandardCounter) Clear()

Clear sets the counter to zero.

func (*StandardCounter) Count

func (c *StandardCounter) Count() int64

Count returns the current count.

func (*StandardCounter) Dec

func (c *StandardCounter) Dec(i int64)

Dec decrements the counter by the given amount.

func (*StandardCounter) Inc

func (c *StandardCounter) Inc(i int64)

Inc increments the counter by the given amount.

func (*StandardCounter) StartTime

func (c *StandardCounter) StartTime() time.Time

type Task

type Task interface {
	TaskID() string
	GetConfig() *TaskConfig
	Start()
	Stop()
	SetTimeout(int64)
	RunOnce() error
	SetTaskService(service *TaskService)
	Reset(conf *TaskConfig) error
	CounterInfo() *CounterInfo
}

func NewCronTask

func NewCronTask(taskID string, isRun bool, express string, handler TaskHandle, taskData interface{}) (Task, error)

NewCronTask create new cron task

func NewLoopTask

func NewLoopTask(taskID string, isRun bool, dueTime int64, interval int64, handler TaskHandle, taskData interface{}) (Task, error)

NewLoopTask create new loop task

func NewQueueTask

func NewQueueTask(taskID string, isRun bool, interval int64, handler TaskHandle, taskData interface{}, queueSize int64) (Task, error)

NewQueueTask create new queue task

type TaskConfig

type TaskConfig struct {
	TaskID   string
	TaskType string
	IsRun    bool
	Handler  TaskHandle `json:"-"`
	DueTime  int64
	Interval int64
	Express  string
	TaskData interface{}
}

TaskConfig task config

type TaskContext

type TaskContext struct {
	TaskID         string
	TaskData       interface{} //用于当前Task全局设置的数据项
	Message        interface{} //用于每次Task执行上下文消息传输
	IsEnd          bool        //如果设置该属性为true,则停止当次任务的后续执行,一般用在OnBegin中
	Error          error
	Header         map[string]interface{}
	TimeoutContext context.Context
	TimeoutCancel  context.CancelFunc
	// contains filtered or unexported fields
}

Task上下文信息

type TaskHandle

type TaskHandle func(*TaskContext) error

type TaskInfo

type TaskInfo struct {
	IsRun bool `json:"isrun"`

	TimeTicker *time.Ticker `json:"-"`
	TaskType   string       `json:"tasktype"`

	TaskData interface{}
	State    string `json:"state"`   //匹配 TskState_Init、TaskState_Run、TaskState_Stop
	DueTime  int64  `json:"duetime"` //开始任务的延迟时间(以毫秒为单位),如果<=0则不延迟
	Timeout  int64
	// contains filtered or unexported fields
}

TaskInfo task info define

func (*TaskInfo) CounterInfo

func (task *TaskInfo) CounterInfo() *CounterInfo

func (*TaskInfo) RunOnce

func (task *TaskInfo) RunOnce() error

RunOnce do task only once no match Express or Interval no recover panic support for #6 新增RunOnce方法建议

func (*TaskInfo) SetTaskService

func (task *TaskInfo) SetTaskService(service *TaskService)

SetTaskService Set up the associated service

func (*TaskInfo) SetTimeout

func (task *TaskInfo) SetTimeout(timeout int64)

func (*TaskInfo) Stop

func (task *TaskInfo) Stop()

Stop stop task

func (*TaskInfo) TaskID

func (task *TaskInfo) TaskID() string

TaskID return taskID

type TaskService

type TaskService struct {
	Config *AppConfig

	ExceptionHandler ExceptionHandleFunc
	OnBeforeHandler  TaskHandle
	OnEndHandler     TaskHandle
	// contains filtered or unexported fields
}

TaskService task 容器

func StartNewService

func StartNewService() *TaskService

func (*TaskService) AddTask

func (service *TaskService) AddTask(t Task)

AddTask add new task point

func (*TaskService) Count

func (service *TaskService) Count() int

Count get all task's count

func (*TaskService) CounterOutputHttpHandler

func (service *TaskService) CounterOutputHttpHandler(w http.ResponseWriter, r *http.Request)

CounterOutputHttpHandler Http Handler for output counter info

func (*TaskService) CreateCronTask

func (service *TaskService) CreateCronTask(taskID string, isRun bool, express string, handler TaskHandle, taskData interface{}) (Task, error)

CreateCronTask create new cron task and register to task service

func (*TaskService) CreateLoopTask

func (service *TaskService) CreateLoopTask(taskID string, isRun bool, dueTime int64, interval int64, handler TaskHandle, taskData interface{}) (Task, error)

CreateLoopTask create new loop task and register to task service

func (*TaskService) CreateQueueTask

func (service *TaskService) CreateQueueTask(taskID string, isRun bool, interval int64, handler TaskHandle, taskData interface{}, queueSize int64) (Task, error)

CreateQueueTask create new queue task and register to task service

func (*TaskService) CreateTask

func (service *TaskService) CreateTask(config TaskConfig) (Task, error)

CreateTask create new task with TaskConfig and register to task service

func (*TaskService) GetAllTaskCountInfo

func (service *TaskService) GetAllTaskCountInfo() []ShowCountInfo

GetAllTaskCountInfo return all show count info

func (*TaskService) GetAllTasks

func (service *TaskService) GetAllTasks() map[string]Task

GetAllTasks get all tasks

func (*TaskService) GetHandler

func (service *TaskService) GetHandler(name string) (TaskHandle, bool)

GetHandler get handler by handler name

func (*TaskService) GetTask

func (service *TaskService) GetTask(taskID string) (t Task, exists bool)

GetTask get TaskInfo by TaskID

func (*TaskService) LoadConfig

func (service *TaskService) LoadConfig(configFile string, confType ...interface{}) *TaskService

LoadConfig 如果指定配置文件,初始化配置 Deprecated: Use the LoadFileConfig instead

func (*TaskService) LoadConfigHandler

func (service *TaskService) LoadConfigHandler(configHandler ConfigHandle, configSource string) *TaskService

LoadConfigHandler load config handler and init task config

func (*TaskService) LoadFileConfig

func (service *TaskService) LoadFileConfig(configFile string, confType ...interface{}) *TaskService

LoadFileConfig 如果指定配置文件,初始化配置

func (*TaskService) Logger

func (service *TaskService) Logger() Logger

Logger get Logger

func (*TaskService) PrintAllCronTask

func (service *TaskService) PrintAllCronTask() string

PrintAllCronTask print all task info Deprecated: Use the PrintAllTasks instead

func (*TaskService) PrintAllTaskCounterInfo

func (service *TaskService) PrintAllTaskCounterInfo() string

PrintAllTaskCounterInfo print all task counter data

func (*TaskService) PrintAllTasks

func (service *TaskService) PrintAllTasks() string

PrintAllCronTask print all task info

func (*TaskService) RegisterHandler

func (service *TaskService) RegisterHandler(name string, handler TaskHandle) *TaskService

RegisterHandler register handler by name

func (*TaskService) RemoveAllTask

func (service *TaskService) RemoveAllTask()

RemoveAllTask remove all task

func (*TaskService) RemoveTask

func (service *TaskService) RemoveTask(taskID string)

RemoveTask remove task by taskid

func (*TaskService) SetExceptionHandler

func (service *TaskService) SetExceptionHandler(handler ExceptionHandleFunc)

SetExceptionHandler 设置自定义异常处理方法

func (*TaskService) SetLogger

func (service *TaskService) SetLogger(logger Logger)

SetLogger set logger which Implements Logger interface

func (*TaskService) SetOnBeforeHandler

func (service *TaskService) SetOnBeforeHandler(handler TaskHandle)

SetOnBeforeHandler set handler which exec before task run

func (*TaskService) SetOnEndHandler

func (service *TaskService) SetOnEndHandler(handler TaskHandle)

SetOnEndHandler set handler which exec after task run

func (*TaskService) StartAllTask

func (service *TaskService) StartAllTask()

StartAllTask start all task

func (*TaskService) StopAllTask

func (service *TaskService) StopAllTask()

StopAllTask stop all task

func (*TaskService) TaskOutputHttpHandler

func (service *TaskService) TaskOutputHttpHandler(w http.ResponseWriter, r *http.Request)

TaskOutputHttpHandler Http Handler for output task info

func (*TaskService) UseDefaultLogCounterTask

func (service *TaskService) UseDefaultLogCounterTask()

UseDefaultLogCounterTask use default LogCounterTask in TaskService

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL