async_task_demon

module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2023 License: LGPL-2.1

README

异步任务管理器

OneToManyTaskManagerInterface: mapReduce的简单实现

  • oneToMany : 一个task执行多个handle,每一个oneToManyTask就是执行一个定长的AsyncTaskManager
  • 应用场景:数据汇总,消息分发

AsyncTaskManagerInterface: 事件回调模式的异步任务

  • 可以定长任务
  • 可以Stop
  • todo:从等待队列里删除
  • todo:任务超时处理

工作原理

  • 在加入任务时或任务完成时都会触发"任务从等待队列转到工作队列"的动作,并有触发条件与互斥锁保证了任务不会丢失与不会重复
  • 任务管理者(有默认实现)
  • 任务执行者interface
  • 任务(有默认实现)

注意

  • 大并发操作DB是,一定注意链接数问题
  • 常用的orm,经测试,只有xorm能锁定链接数

测试异步写库

  • orm: xorm,1000的链接数
  • 总任务:100000
  • 并行数:1000
  • 任务执行者:40次写入
  • 用时:261秒,写入4000000条记录

// AsyncTaskManagerInterface  异步任务管理器通用接口
type AsyncTaskManagerInterface interface {
	Start()                                                                              // 开始
	Stop()                                                                               // 停止
	SetWaitListMaxLength(l int)                                                          // 设置等待队列的最大长度,默认:1000
	SetWorkPoolSize(s int)                                                               // 设置并行的数量,默认:100
	AddAsyncTask(t AsyncTaskInterface, h AsyncTaskHandle, cb ...AsyncTaskCallBack) error // 增加一个异步任务
	SetFinishCallBack(f func(*TaskManagerResult))                                        // 设置任务完成的回调
	CheckTaskInWork(tag string) (bool, error)                                            // 查验任务是否在执行
	GetWaitLength() int                                                                  // 获取当前等待队列长度
	GetWorkerLength() int                                                                // 获取当前工作队列长度
	GetManagerStatus() uint8                                                             // 获取管理器状态
	SetTaskMaxLength(s int)                                                              // 设置最大任务数
	SetProgressCallBack(p int, cb ProgressCallBack)                                      // 设置进度回调
}

type AsyncTaskInterface interface {
	GetTag() *TaskTag       // 任务全局唯一标识
	SetStartUnix(end int64) // 设置任务开始时间
	GetStartUnix() int64    // 获取任务开始时间
	SetEndUnix(end int64)   // 设置任务结束时间
	GetEndUnix() int64      // 获取任务结束时间
	SetStatus(s uint8)      // 设置当前任务状态
	GetStatus() uint8       // 获取当前任务状态
	SetError(e error)
	GetError() error
}

type AsyncTaskHandle interface {
	Start(AsyncTaskInterface)              // 开始执行任务
	Stop()                                 // 结束任务,然后拉起回调,返回enmu.UserStopError
	SetHandleCallBack(f AsyncTaskCallBack) // 配置执行的回调
}

// AsyncTaskCallBack  异步任务回调
type AsyncTaskCallBack func(AsyncTaskInterface)

// TaskManagerResult   任务管理器执行结果
type TaskManagerResult struct {
	StartTime int64
	EndTime   int64
	Length    uint64
	TagsMap   map[string]AsyncTaskInterface
	Status    uint8
}

// ProgressCallBack   进度回调
type ProgressCallBack func(*TaskProgress)

// TaskProgress    进度信息
type TaskProgress struct {
	WorkNum    int   // 任务并行数
	AllTaskNum int   // 总任务数,如果没有SetTaskMaxLength(),返回0
	Progress   int   // 当前完成个数
	ParentTime int64 // 上次进度回调的时间
}

// OneToManyTaskInterface   一对多异步任务通用接口
type OneToManyTaskInterface interface {
	AsyncTaskInterface
	SetTaskTag(tag *TaskTag)
	GetHandleTag() *TaskTag
	SetHandleTag(tag *TaskTag)
	GetHandleResult() interface{}
	SetHandleResult(res interface{})
	Copy() OneToManyTaskInterface
}

// OneToManyTaskHandleInterface   一对多异步任务Handle通用接口
type OneToManyTaskHandleInterface interface {
	Start(AsyncTaskInterface)              // 开始执行任务
	Stop()                                 // 结束任务,然后拉起回调,返回enmu.UserStopError
	SetHandleCallBack(f AsyncTaskCallBack) // 配置执行的回调
	GetTarget() *TaskTag                   // 同一任务中唯一
}

// OneToManyTaskManagerInterface  一对多异步任务管理通用接口
type OneToManyTaskManagerInterface interface {
	Start()                                                                                                                  // 开始
	Stop()                                                                                                                   // 停止
	SetWaitListMaxLength(l int)                                                                                              // 设置等待队列的最大长度,默认:1000
	SetWorkPoolSize(s int)                                                                                                   // 设置并行的数量,默认:100
	AddAsyncTask(t OneToManyTaskInterface, cb func(res *OneToManyTaskResult), handles ...OneToManyTaskHandleInterface) error // 增加一个异步任务
	CheckTaskInWork(tag string) (bool, error)                                                                                // 查验任务是否在执行
	GetWaitLength() int                                                                                                      // 获取当前等待队列长度
	GetWorkerLength() int                                                                                                    // 获取当前工作队列长度
	GetManagerStatus() uint8                                                                                                 // 获取管理器状态
}

// OneToManyTaskResult    一对多异步任务 执行结果
type OneToManyTaskResult struct {
	StartTime int64
	EndTime   int64
	Status    uint8
	Task      OneToManyTaskInterface
	ResultMap map[string]interface{}
}


Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL