schedule

package
v0.0.0-...-3f9f090 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2023 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

调度模块,负责从元数据库读取并解析调度信息。 将需要执行的任务发送给执行模块,并读取返回信息。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckErr

func CheckErr(info string, err error)

CheckErr检查错误信息,若有错误则打印并抛出异常。

func Copy

func Copy(copy_to interface{}, copy_from interface{}) (err error)

Copy复制对象 来自github.com/jinzhu/copier

func GetNow

func GetNow() time.Time

获取当前时间

func PrintErr

func PrintErr(info string, err error)

PrintErr打印错误信息

func Restore

func Restore(batchId string, scdId int64) (err error)

ExecSchedule.Restore(batchId string)方法修复执行指定的调度。 根据传入的batchId,构建调度执行结构,并调用Run方法执行其中的任务

func TruncDate

func TruncDate(cyc string, now time.Time) time.Time

时间取整

Types

type ExecJob

type ExecJob struct {
	// contains filtered or unexported fields

} // }}}

作业执行信息结构

func ExecJobWarper

func ExecJobWarper(batchId string, j *Job) *ExecJob

根据传入的batchId和Job参数来构建一个调度的执行结构,并返回。

func (*ExecJob) InitExecJob

func (ej *ExecJob) InitExecJob(es *ExecSchedule) (err error)

初始化作业执行链,并返回。

func (*ExecJob) Log

func (j *ExecJob) Log() (err error)

保存执行日志

func (*ExecJob) Start

func (ej *ExecJob) Start() (err error)

设置ExecJob的状态为开始,并记录到log中

func (*ExecJob) TaskDone

func (ej *ExecJob) TaskDone(et *ExecTask) (err error)

type ExecSchedule

type ExecSchedule struct {
	// contains filtered or unexported fields

} // }}}

调度执行信息结构

func ExecScheduleWarper

func ExecScheduleWarper(s *Schedule) *ExecSchedule

根据传入的Schedule参数来构建一个调度的执行结构,并返回。

func (*ExecSchedule) InitExecSchedule

func (es *ExecSchedule) InitExecSchedule() (err error)

初始化调度的执行结构,使之包含完整的执行链。

func (*ExecSchedule) Log

func (s *ExecSchedule) Log() (err error)

保存执行日志

func (*ExecSchedule) Pause

func (es *ExecSchedule) Pause()

Pause暂停调度执行

func (*ExecSchedule) Run

func (es *ExecSchedule) Run()

ExecSchedule.Run()方法执行调度任务。 过程中会维护一个Chan *ExecTask类型变量staskChan,用来传递执行完成的Task。 通过遍历Schedule下的全部Task,找出可执行的Task(依赖列表为空的Task),启动线程执行task.Run 方法,并将staskChan传给它。当Task执行结束后会把自己放入staskChan中,处理的另一部分监控着 staskChan,从其中取出执行完毕的task后,会从其它任务的依赖列表中将已执行完毕的task删除, 并重新找出依赖列表为空的task,启动线程运行它的Run方法。 全部执行结束后,设置Schedule的下次启动时间。

func (*ExecSchedule) RunTasks

func (es *ExecSchedule) RunTasks() (err error)

执行参数ets中符合运行条件的任务

func (*ExecSchedule) Start

func (es *ExecSchedule) Start() (err error)

ExecSchedule执行前状态记录

func (*ExecSchedule) TaskDone

func (es *ExecSchedule) TaskDone(et *ExecTask) (finish bool, err error)

调度中的一个任务完成,更新状态。 当调度中全部任务完成后,将调度执行体从全局列表中移除,并设置下次启动时间。

type ExecTask

type ExecTask struct {
	// contains filtered or unexported fields

} // }}}

任务执行信息结构

func ExecTaskWarper

func ExecTaskWarper(ej *ExecJob, t *Task) *ExecTask

根据传入的batchId和Job参数来构建一个调度的执行结构,并返回。

func (*ExecTask) InitExecTask

func (et *ExecTask) InitExecTask(es *ExecSchedule) error

初始化Task执行结构

func (*ExecTask) Log

func (t *ExecTask) Log() (err error)

保存执行日志

func (*ExecTask) Run

func (et *ExecTask) Run(taskChan chan *ExecTask)

Run方法负责执行任务。 首先会判断是否符合执行条件,符合则执行 执行时会从任务执行结构中取出需要执行的信息,通过RPC发送给执行模块执行。 完成后更新执行信息,并将任务置入taskChan变量中,供后续处理。

type GlobalConfigStruct

type GlobalConfigStruct struct {
	L           *logrus.Logger   //log对象
	HiveConn    *sql.DB          //元数据库链接
	LogConn     *sql.DB          //日志数据库链接
	ManagerPort string           //管理模块的web服务端口
	Port        string           //Schedule与Worker模块通信端口
	Schedules   *ScheduleManager //包含全部Schedule列表的结构

} // }}}

GlobalConfigStruct结构中定义了程序中的一些配置信息

func DefaultGlobal

func DefaultGlobal() *GlobalConfigStruct

返回GlobalConfigStruct的默认值。

type Job

type Job struct {
	Id           int64            //作业ID
	ScheduleId   int64            //调度ID
	ScheduleCyc  string           //调度周期
	Name         string           //作业名称
	Desc         string           //作业说明
	PreJobId     int64            //上级作业ID
	PreJob       *Job             `json:"-"` //上级作业
	NextJobId    int64            //下级作业ID
	NextJob      *Job             `json:"-"` //下级作业
	Tasks        map[string]*Task //作业中的任务
	TaskCnt      int              //调度中任务数量
	CreateUserId int64            //创建人
	CreateTime   time.Time        //创人
	ModifyUserId int64            //修改人
	ModifyTime   time.Time        //修改时间

} // }}}

作业信息结构

func (*Job) DeleteTask

func (j *Job) DeleteTask(taskid int64) (err error)

删除作业任务映射关系至元数据库

func (*Job) InitJob

func (j *Job) InitJob(s *Schedule) error

根据Job.Id初始化Job结构,从元数据库获取Job的基本信息初始化后 继续初始化Job所属的Task列表,同时递归调用自身,初始化下级Job结构 失败返回error信息。

func (*Job) InitTasksForJob

func (j *Job) InitTasksForJob(s *Schedule) error

初始化Job下的Tasks信息,从元数据库取到Job下所有的TaskId后 调用方法初始化Task并加至Job的Tasks成员中,同时也添加到全局Tasks列表 出错返回错误信息

func (*Job) UpdateTask

func (j *Job) UpdateTask(task *Task) (err error)

UpdateTask更新Job中指定Task的信息。 它会根据参数查找本Job下符合的Task,找到后更新信息 并调用Task的add方法进行持久化操作。

type Reply

type Reply struct {
	Err    string //错误信息
	Stdout string //标准输出

} // }}}

type Schedule

type Schedule struct {
	Id          int64           //调度ID
	Name        string          //调度名称
	Count       int8            //调度次数
	Cyc         string          //调度周期
	StartSecond []time.Duration //启动时间
	StartMonth  []int           //启动月份
	NextStart   time.Time       //下次启动时间
	TimeOut     int64           //最大执行时间
	JobId       int64           //作业ID
	Job         *Job            //作业
	Jobs        []*Job          //作业列表
	Tasks       []*Task         `json:"-"` //任务列表

	Desc         string    //调度说明
	JobCnt       int       //调度中作业数量
	TaskCnt      int       //调度中任务数量
	CreateUserId int64     //创建人
	CreateTime   time.Time //创人
	ModifyUserId int64     //修改人
	ModifyTime   time.Time //修改时间
	// contains filtered or unexported fields

} // }}}

调度信息结构

func (*Schedule) Add

func (s *Schedule) Add() error

增加Schedule信息

func (*Schedule) AddJob

func (s *Schedule) AddJob(job *Job) error

在调度中添加一个Job,AddJob会接收传入的Job类型的参数,并调用它的 Add()方法进行持久化操作。成功后把它添加到调度链中,添加时若调度 下无Job则将Job直接添加到调度中,否则添加到调度中的任务链末端。

func (*Schedule) AddScheduleStart

func (s *Schedule) AddScheduleStart() error

addStart将Schedule的启动列表持久化到数据库 添加前先调用delStart方法将Schedule中的原有启动列表清空 需要注意的是:内存中的启动列表单位为纳秒,存储前需要转成秒 若成功则开始添加,失败返回err信息

func (*Schedule) AddTask

func (s *Schedule) AddTask(task *Task) error

增加Task,将参数中的Task加入Schedule中,并调用其add方法持久化。

func (*Schedule) Delete

func (s *Schedule) Delete() error

Delete方法删除Schedule下的Job、Task信息并持久化。

func (*Schedule) DeleteJob

func (s *Schedule) DeleteJob(id int64) error

DeleteJob删除调度中最后一个Job,它会接收传入的Job Id,并查看是否 调度中最后一个Job,是,检查Job下有无Task,无,则执行删除操作,完成 后,将该Job的前一个Job的nextJob指针置0,更新调度信息。 出错或不符条件则返回error信息

func (*Schedule) DeleteTask

func (s *Schedule) DeleteTask(id int64) error

DeleteTask方法用来删除指定id的Task。首先会根据传入参数在Schedule的Tasks列 表中查出对应的Task。然后将其从Tasks列表中去除,将其从所属Job中去除,调用 Task的Delete方法删除Task的依赖关系,完成后删除元数据库的信息。 没找到对应Task或删除失败,返回error信息。

func (*Schedule) GetJobById

func (s *Schedule) GetJobById(id int64) (*Job, error)

GetJobById遍历Jobs列表,返回调度中指定Id的Job,若没找到返回nil

func (*Schedule) GetTaskById

func (s *Schedule) GetTaskById(id int64) *Task

GetTaskById根据传入的id查找Tasks中对应的Task,没有则返回nil。

func (*Schedule) InitSchedule

func (s *Schedule) InitSchedule() error

从元数据库初始化Schedule结构,先从元数据库获取Schedule的信息,完成后 根据其中的Jobid继续从元数据库读取job信息,并初始化。完成后继续初始化下级Job, 同时将初始化完成的Job和Task添加到Schedule的Jobs、Tasks成员中。

func (*Schedule) Timer

func (s *Schedule) Timer()

按时启动Schedule,Timer中会根据Schedule的周期以及启动时间计算下次 启动的时间,并依据此设置一个定时器按时唤醒,Schedule唤醒后,会重新 从元数据库初始化一下信息,生成执行结构ExecSchedule,执行其Run方法

func (*Schedule) UpdateJob

func (s *Schedule) UpdateJob(job *Job) error

UpdateJob用来在调度中添加一个Job UpdateJob会接收传入的Job类型的参数,修改调度中对应的Job信息,完成后 调用Job自身的update方法进行持久化操作。

func (*Schedule) UpdateSchedule

func (s *Schedule) UpdateSchedule() error

UpdateSchedule方法会将传入参数的信息更新到Schedule结构并持久化到数据库中 在持久化之前会调用addStart方法将启动列表持久化

type ScheduleManager

type ScheduleManager struct {
	ScheduleList     []*Schedule              //全部的调度列表
	ExecScheduleList map[string]*ExecSchedule //当前执行的调度列表
	Global           *GlobalConfigStruct      //配置信息

} // }}}

ScheduleManager通过成员ScheduleList持有全部的Schedule。 并提供获取、增加、删除以及启动、停止Schedule的功能。

func (*ScheduleManager) AddExecSchedule

func (sl *ScheduleManager) AddExecSchedule(es *ExecSchedule)

增加一个调度执行结构

func (*ScheduleManager) AddSchedule

func (sl *ScheduleManager) AddSchedule(s *Schedule) error

增加Schedule,将参数中的Schedule加入的列表中,并调用其Add方法持久化。

func (*ScheduleManager) DeleteSchedule

func (sl *ScheduleManager) DeleteSchedule(id int64) error

从当前ScheduleList列表中移除指定id的Schedule。 完成后,调用Schedule自身的Delete方法,删除其中的Job、Task信息并做持久化操作。 失败返回error信息

func (*ScheduleManager) GetScheduleById

func (sl *ScheduleManager) GetScheduleById(id int64) *Schedule

查找当前ScheduleList列表中指定id的Schedule,并返回。 查不到返回nil

func (*ScheduleManager) InitScheduleList

func (sl *ScheduleManager) InitScheduleList()

初始化ScheduleList,设置全局变量g

func (*ScheduleManager) RemoveExecSchedule

func (sl *ScheduleManager) RemoveExecSchedule(batchId string)

移除一个调度执行结构

func (*ScheduleManager) StartListener

func (sl *ScheduleManager) StartListener()

开始监听Schedule,遍历列表中的Schedule并启动它的Timer方法。

func (*ScheduleManager) StartScheduleById

func (sl *ScheduleManager) StartScheduleById(id int64) error

启动指定的Schedule,从ScheduleList中获取到指定id的Schedule后,从元数据库获取 Schedule的信息初始化一下调度链,然后调用它自身的Timer方法,启动监听。 失败返回error信息。

type Task

type Task struct {
	Id           int64             // 任务的ID
	Address      string            // 任务的执行地址
	Name         string            // 任务名称
	TaskType     int64             // 任务类型
	ScheduleCyc  string            //调度周期
	TaskCyc      string            //调度周期
	StartSecond  time.Duration     //周期内启动时间
	Cmd          string            // 任务执行的命令或脚本、函数名等。
	Desc         string            //任务说明
	TimeOut      int64             // 设定超时时间,0表示不做超时限制。单位秒
	Param        []string          // 任务的参数信息
	Attr         map[string]string // 任务的属性信息
	JobId        int64             //所属作业ID
	RelTasksId   []int64           //依赖的任务Id
	RelTasks     map[string]*Task  //`json:"-"` //依赖的任务
	RelTaskCnt   int64             //依赖的任务数量
	CreateUserId int64             //创建人
	CreateTime   time.Time         //创人
	ModifyUserId int64             //修改人
	ModifyTime   time.Time         //修改时间

} // }}}

任务信息结构

func (*Task) AddRelTask

func (t *Task) AddRelTask(rt *Task) (err error)

增加依赖的任务

func (*Task) AddTask

func (t *Task) AddTask() (err error)

AddTask方法持久化当前的Task信息。 调用add方法将Task基本信息持久化。 完成后处理作业关联信息、Task依赖关系、参数列表。

func (*Task) Delete

func (t *Task) Delete() (err error)

删除Task,依次删除Param、RelTask关系、Task

func (*Task) DeleteRelTask

func (t *Task) DeleteRelTask(relid int64) error

删除依赖的任务关系

func (*Task) InitTask

func (t *Task) InitTask(s *Schedule) error

根据Task.Id从元数据库获取信息初始化Task结构,包含以下动作 初始化Task基本信息

Task属性信息
Task的参数信息
依赖的Task列表

失败返回错误信息。

func (*Task) UpdateTask

func (t *Task) UpdateTask() error

更新Task信息到元数据库。 更新基本信息后,更新参数信息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL