异步任务管理器
OneToManyTaskManagerInterface: mapReduce的简单实现
- oneToMany : 一个task执行多个handle,每一个oneToManyTask就是执行一个定长的AsyncTaskManager
- 应用场景:数据汇总,消息分发
AsyncTaskManagerInterface: 事件回调模式的异步任务
- 可以定长任务
- 可以Stop
- todo:从等待队列里删除
- todo:任务超时处理
工作原理
- 在加入任务时或任务完成时都会触发"任务从等待队列转到工作队列"的动作,并有触发条件与互斥锁保证了任务不会丢失与不会重复
- 任务管理者(有默认实现)
- 任务执行者interface
- 任务(有默认实现)
注意
- 大并发操作DB是,一定注意链接数问题
- 常用的orm,经测试,只有xorm能锁定链接数
测试异步写库
- orm: xorm,1000的链接数
- 总任务:100000
- 并行数:1000
- 任务执行者:40次写入
- 用时:261秒,写入4000000条记录
// AsyncTaskManagerInterface 异步任务管理器通用接口
type AsyncTaskManagerInterface interface {
Start() // 开始
Stop() // 停止
SetWaitListMaxLength(l int) // 设置等待队列的最大长度,默认:1000
SetWorkPoolSize(s int) // 设置并行的数量,默认:100
AddAsyncTask(t AsyncTaskInterface, h AsyncTaskHandle, cb ...AsyncTaskCallBack) error // 增加一个异步任务
SetFinishCallBack(f func(*TaskManagerResult)) // 设置任务完成的回调
CheckTaskInWork(tag string) (bool, error) // 查验任务是否在执行
GetWaitLength() int // 获取当前等待队列长度
GetWorkerLength() int // 获取当前工作队列长度
GetManagerStatus() uint8 // 获取管理器状态
SetTaskMaxLength(s int) // 设置最大任务数
SetProgressCallBack(p int, cb ProgressCallBack) // 设置进度回调
}
type AsyncTaskInterface interface {
GetTag() *TaskTag // 任务全局唯一标识
SetStartUnix(end int64) // 设置任务开始时间
GetStartUnix() int64 // 获取任务开始时间
SetEndUnix(end int64) // 设置任务结束时间
GetEndUnix() int64 // 获取任务结束时间
SetStatus(s uint8) // 设置当前任务状态
GetStatus() uint8 // 获取当前任务状态
SetError(e error)
GetError() error
}
type AsyncTaskHandle interface {
Start(AsyncTaskInterface) // 开始执行任务
Stop() // 结束任务,然后拉起回调,返回enmu.UserStopError
SetHandleCallBack(f AsyncTaskCallBack) // 配置执行的回调
}
// AsyncTaskCallBack 异步任务回调
type AsyncTaskCallBack func(AsyncTaskInterface)
// TaskManagerResult 任务管理器执行结果
type TaskManagerResult struct {
StartTime int64
EndTime int64
Length uint64
TagsMap map[string]AsyncTaskInterface
Status uint8
}
// ProgressCallBack 进度回调
type ProgressCallBack func(*TaskProgress)
// TaskProgress 进度信息
type TaskProgress struct {
WorkNum int // 任务并行数
AllTaskNum int // 总任务数,如果没有SetTaskMaxLength(),返回0
Progress int // 当前完成个数
ParentTime int64 // 上次进度回调的时间
}
// OneToManyTaskInterface 一对多异步任务通用接口
type OneToManyTaskInterface interface {
AsyncTaskInterface
SetTaskTag(tag *TaskTag)
GetHandleTag() *TaskTag
SetHandleTag(tag *TaskTag)
GetHandleResult() interface{}
SetHandleResult(res interface{})
Copy() OneToManyTaskInterface
}
// OneToManyTaskHandleInterface 一对多异步任务Handle通用接口
type OneToManyTaskHandleInterface interface {
Start(AsyncTaskInterface) // 开始执行任务
Stop() // 结束任务,然后拉起回调,返回enmu.UserStopError
SetHandleCallBack(f AsyncTaskCallBack) // 配置执行的回调
GetTarget() *TaskTag // 同一任务中唯一
}
// OneToManyTaskManagerInterface 一对多异步任务管理通用接口
type OneToManyTaskManagerInterface interface {
Start() // 开始
Stop() // 停止
SetWaitListMaxLength(l int) // 设置等待队列的最大长度,默认:1000
SetWorkPoolSize(s int) // 设置并行的数量,默认:100
AddAsyncTask(t OneToManyTaskInterface, cb func(res *OneToManyTaskResult), handles ...OneToManyTaskHandleInterface) error // 增加一个异步任务
CheckTaskInWork(tag string) (bool, error) // 查验任务是否在执行
GetWaitLength() int // 获取当前等待队列长度
GetWorkerLength() int // 获取当前工作队列长度
GetManagerStatus() uint8 // 获取管理器状态
}
// OneToManyTaskResult 一对多异步任务 执行结果
type OneToManyTaskResult struct {
StartTime int64
EndTime int64
Status uint8
Task OneToManyTaskInterface
ResultMap map[string]interface{}
}