gotask

package module
v0.0.0-...-1a1a4f2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2022 License: MulanPSL-2.0 Imports: 14 Imported by: 0

README

gotask

介绍

基于go语言的异步任务框架

软件架构

软件架构说明

安装教程
  1. xxxx
  2. xxxx
  3. xxxx
使用说明
  1. xxxx
  2. xxxx
  3. xxxx
参与贡献
  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request
特技
  1. 使用 Readme_XXX.md 来支持不同的语言,例如 Readme_en.md, Readme_zh.md
  2. Gitee 官方博客 blog.gitee.com
  3. 你可以 https://gitee.com/explore 这个地址来了解 Gitee 上的优秀开源项目
  4. GVP 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
  5. Gitee 官方提供的使用手册 https://gitee.com/help
  6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 https://gitee.com/gitee-stars/

Documentation

Index

Constants

View Source
const (
	CREATED  = "created"
	ACQUIRED = "acquired"
	ENQUEUED = "enqeued"
	RUNNING  = "running"
	FAILED   = "failed"
	SUCESSED = "sucessed"
	FINISHED = "finished"
)

Variables

View Source
var (

	// gotasks builtin queue
	FatalQueueName = "fatal"
)

Functions

func AckWhen

func AckWhen(i AckWhenStatus)

AckWhen set when will the ack be sent to broker

func ArgsMapToStruct

func ArgsMapToStruct(am ArgsMap, s interface{}) error

ArgsMapToStruct Convert ArgsMap to struct, e.g. err := ArgsMapToStruct(am, &yourStruct)

func NewConcurrencyLimiter

func NewConcurrencyLimiter(limit int) *concurrencyLimiter

创建新的并发控制器

func Register

func Register(jobName string, handlers ...JobHandler)

func Run

func Run(ctx context.Context, nproc int, queueNames ...string)

Run a worker that listen on queues

func UseRedisBroker

func UseRedisBroker(redisURL string, brokerOptions ...RedisBrokerOption) error

Types

type AckWhenStatus

type AckWhenStatus int

gotasks is a job/task framework for Golang.

Note that job will be executed in register order, and every job handle function must have a signature which match gotasks.JobHandler, which receives a ArgsMap and return a ArgsMap which will be arguments input for next handler.

const (
	AckWhenAcquired AckWhenStatus = iota
	AckWhenSucceed
)

type ArgsMap

type ArgsMap map[string]interface{}

func MapToArgsMap

func MapToArgsMap(v interface{}) ArgsMap

MapToArgsMap Convert golang map to ArgsMap, e.g. am := MapToArgsMap(yourStruct)

func StructToArgsMap

func StructToArgsMap(v interface{}) ArgsMap

StructToArgsMap Convert struct to ArgsMap, e.g. am := StructToArgsMap(yourStruct)

type Broker

type Broker interface {
	Acquire(string) (*Task, error)
	Ack(*Task) bool
	Update(*Task) error
	Enqueue(*Task) string
	QueueLen(string) int64
}

type JobHandler

type JobHandler func(string, ArgsMap) (ArgsMap, error)

func Reentrant

func Reentrant(handler JobHandler, options ...ReentrantOption) JobHandler

type Queue

type Queue struct {
	Name     string
	MaxLimit int
	Async    bool

	// monitor
	MonitorInterval int
}

func NewQueue

func NewQueue(name string, options ...QueueOption) *Queue

func (*Queue) Enqueue

func (q *Queue) Enqueue(jobName string, argsMap ArgsMap) string

type QueueOption

type QueueOption func(*Queue)

func WithAsyncHandleTask

func WithAsyncHandleTask(async bool) QueueOption

func WithMaxLimit

func WithMaxLimit(max int) QueueOption

func WithMonitorInterval

func WithMonitorInterval(seconds int) QueueOption

type RedisBroker

type RedisBroker struct {
	TaskTTL int
}

func (*RedisBroker) Ack

func (r *RedisBroker) Ack(task *Task) bool

func (*RedisBroker) Acquire

func (r *RedisBroker) Acquire(queueName string) (*Task, error)

func (*RedisBroker) Enqueue

func (r *RedisBroker) Enqueue(task *Task) string

func (*RedisBroker) QueueLen

func (r *RedisBroker) QueueLen(queueName string) int64

func (*RedisBroker) Update

func (r *RedisBroker) Update(task *Task) error

type RedisBrokerOption

type RedisBrokerOption func(rb *RedisBroker)

func WithRedisTaskTTL

func WithRedisTaskTTL(ttl int) RedisBrokerOption

type ReentrantOption

type ReentrantOption func(*ReentrantOptions)

func WithMaxTimes

func WithMaxTimes(max int) ReentrantOption

func WithSleepyMS

func WithSleepyMS(ms int) ReentrantOption

type ReentrantOptions

type ReentrantOptions struct {
	MaxTimes int
	SleepyMS int
}

type Task

type Task struct {
	ID                  string    `json:"task_id"`               //任务ID
	CreatedAt           time.Time `json:"created_at"`            // 创建任务的时间
	UpdatedAt           time.Time `json:"updated_at"`            // 最近一次更新任务的时间
	QueueName           string    `json:"queue_name"`            // 任务队列的名字
	JobName             string    `json:"job_name"`              // 任务的名称
	ResultArgsMap       ArgsMap   `json:"result_args_map"`       // 任务执行结果的map
	CurrentHandlerIndex int       `json:"current_handler_index"` // 当前任务所在task执行队列的索引id
	OriginalArgsMap     ArgsMap   `json:"original_args_map"`     //  从消息队列请求的参数map
	ResultLog           string    `json:"result_log"`            // 返回的
	Status              status    `json:"status"`                // 任务的当前状态
	Errlog              string    `json:"err_log"`               //任务的错误结果日志
}

func NewTask

func NewTask(queueName, jobName string, originalArgsMap ArgsMap) *Task

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL