dagRun

Version: v0.0.0-...-3a329e9

Warning: This package is not in the latest version of its module.

Published: Apr 12, 2024 | License: MIT | Imports: 10 | Imported by: 0

README

dag-run

A simple library for scheduling multiple tasks concurrently. Tasks are scheduled automatically according to their dependencies and run in parallel wherever possible, minimizing total running time.

go get github.com/ycl2018/dag-run

Features

  • Generic implementation (requires Go 1.18 or later)

  • Simple, lightweight implementation built on sync.WaitGroup

  • Fail fast: if a task returns an error while running, all tasks that have not yet started are canceled

  • TaskManager makes it easy to register and retrieve your tasks

  • Support for injectors

Also supported:

  • Submitting both function tasks and struct tasks

  • Injectors that insert common business logic, such as metrics and monitoring, before and after each task runs

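Example 1: Function tasks

In this example, tasks B and C depend on the completion of task A, and task D depends on the completion of tasks B and C. If any task returns an error, execution terminates early.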

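// Inside a function body; needs the imports "log" and dagRun "github.com/ycl2018/dag-run".
var a = func() error { return nil }
var b = func() error { return nil }
var c = func() error { return nil }
var d = func() error { return nil }

err := dagRun.NewFuncScheduler().
	Submit("A", a). /* arguments: name, task function, dependencies */
	Submit("B", b, "A").
	Submit("C", c, "A").
	Submit("D", d, "B", "C").
	Run()
if err != nil {
	log.Fatal(err)
}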
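B and C each depend only on A, so they can run at the same time. A hypothetical helper, `levels`, makes this concrete: it uses Kahn's topological-sort algorithm to group tasks into waves that could run concurrently. It illustrates the idea and is not part of dag-run's API. For this example it yields [[A] [B C] [D]]. If every task takes the same time, the run needs three sequential steps instead of four.

```go
package main

import (
	"errors"
	"sort"
)

// levels groups tasks into "waves" using Kahn's algorithm. Wave 0 holds the
// tasks with no dependencies; every later wave holds the tasks whose
// dependencies all sit in earlier waves, so tasks in the same wave can run
// concurrently. deps maps each task name to the names it depends on.
func levels(deps map[string][]string) ([][]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string)
	for name, ds := range deps {
		indegree[name] = len(ds)
		for _, d := range ds {
			if _, ok := deps[d]; !ok {
				return nil, errors.New("unknown dependency: " + d)
			}
			dependents[d] = append(dependents[d], name)
		}
	}
	var current []string
	for name, n := range indegree {
		if n == 0 {
			current = append(current, name)
		}
	}
	var waves [][]string
	placed := 0
	for len(current) > 0 {
		sort.Strings(current) // stable order inside a wave
		waves = append(waves, current)
		placed += len(current)
		var next []string
		for _, name := range current {
			for _, child := range dependents[name] {
				indegree[child]--
				if indegree[child] == 0 {
					next = append(next, child)
				}
			}
		}
		current = next
	}
	if placed != len(deps) {
		return nil, errors.New("dependency cycle detected")
	}
	return waves, nil
}
```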

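Example 2: Regular tasks

In real business scenarios, tasks usually run in a well-defined execution environment. That environment may supply task inputs and configuration and collect task results. You define your tasks by implementing the Task interface. Its type parameter T is the type of that execution environment.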

// Task is the interface all your tasks should implement
type Task[T any] interface {
	Name() string
	Dependencies() []string
	Execute(context.Context, T) error
}

This example uses *sync.Map as the execution environment.

package main

import (
	"context"
	"log"
	"sync"

	dagRun "github.com/ycl2018/dag-run"
)

type taskA struct{}
func (ta taskA) Name() string { return "A" }
func (ta taskA) Dependencies() []string { return nil }
func (ta taskA) Execute(ctx context.Context, runCtx *sync.Map) error { return nil }

type taskB struct{}
func (tb taskB) Name() string { return "B" }
func (tb taskB) Dependencies() []string { return []string{"A"} }
func (tb taskB) Execute(ctx context.Context, runCtx *sync.Map) error { return nil }

type taskC struct{}
func (tc taskC) Name() string { return "C" }
func (tc taskC) Dependencies() []string { return []string{"A"} }
func (tc taskC) Execute(ctx context.Context, runCtx *sync.Map) error { return nil }

func main() {
	ds := dagRun.NewScheduler[*sync.Map]()
	// Submit accepts one or more tasks and returns an error if any cannot be registered.
	if err := ds.Submit(taskA{}, taskB{}, taskC{}); err != nil {
		log.Fatal(err)
	}
	// Run blocks until every task has finished or a task has failed.
	if err := ds.Run(context.Background(), &sync.Map{}); err != nil {
		log.Fatal(err)
	}
}

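Injectors

You can supply a custom injector factory. The factory creates an injector for each task, so that pre- and post-processing can run before and after the task executes.

Injector and injector factory definitions: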

type Injector[T any] struct {
	Pre   func(ctx context.Context, runCtx T)
	After func(ctx context.Context, runCtx T, err error) error
}

type InjectorFactory[T any] interface {
	Inject(ctx context.Context, task Task[T]) Injector[T]
}

type InjectorFactoryFunc[T any] func(ctx context.Context, task Task[T]) Injector[T]

func (i InjectorFactoryFunc[T]) Inject(ctx context.Context, task Task[T]) Injector[T] {
	return i(ctx, task)
}

For example, here is a custom injector factory that logs before and after each task runs:

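// ds is the *dagRun.Scheduler[*sync.Map] from Example 2; this also needs the "log" and "time" imports.
ds = ds.WithInjectorFactory(dagRun.InjectorFactoryFunc[*sync.Map](
	func(ctx context.Context, task dagRun.Task[*sync.Map]) dagRun.Injector[*sync.Map] {
		return dagRun.Injector[*sync.Map]{
			// Runs before the task executes.
			Pre: func(ctx context.Context, runCtx *sync.Map) {
				log.Printf("task:%s start at:%s", task.Name(), time.Now())
			},
			// Runs after the task executes; err is the task's result, passed through unchanged.
			After: func(ctx context.Context, runCtx *sync.Map, err error) error {
				log.Printf("task:%s end at:%s", task.Name(), time.Now())
				return err
			},
		}
	}))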

Dumping to DOT

The task DAG you build can be converted to the DOT language for display.

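// a, b, c and d are the functions from Example 1; e is one more task.
var e = func() error { return nil }

dotStr := dagRun.NewFuncScheduler().
	Submit("A", a).
	Submit("B", b, "A").
	Submit("C", c, "A").
	Submit("D", d, "B", "C").
	Submit("E", e, "C", "D").
	Dot()

fmt.Println(dotStr)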

// digraph G {
// "start"[shape=box,color="green"]
// "end"[shape=box,color="red"]
// "A" -> {"B","C"}
// "B" -> {"D"}
// "C" -> {"D","E"}
// "D" -> {"E"}
// "start" -> {"A"}
// {"E"}  -> "end"
// }

The printed DOT text can be rendered as a graph with a DOT renderer such as Graphviz.

Documentation


Variables

var (
	ErrNilTask      = errors.New("dagRun: nil task")
	ErrTaskExist    = errors.New("dagRun: task already exist")
	ErrNoTaskName   = errors.New("dagRun: no task name")
	ErrNilFunc      = errors.New("dagRun: nil func")
	ErrTaskNotExist = errors.New("dagRun: task not found")
	ErrSealed       = errors.New("dagRun: dag is sealed")
)

Functions

func NopeWalker

func NopeWalker(_ Node) error

Types

type DepTask

type DepTask interface {
	Name() string
	Dependencies() []string
}

type FuncScheduler

type FuncScheduler struct {
	// contains filtered or unexported fields
}

FuncScheduler is a simple scheduler for function tasks.

func NewFuncScheduler

func NewFuncScheduler() *FuncScheduler

NewFuncScheduler builds a scheduler for function tasks.

func (*FuncScheduler) Dot

func (d *FuncScheduler) Dot() string

Dot dumps the DAG in the DOT language.

func (*FuncScheduler) Err

func (d *FuncScheduler) Err() error

Err reports any error that has occurred in the scheduler.

func (*FuncScheduler) Run

func (d *FuncScheduler) Run() error

Run starts all tasks and blocks until all of them are done or a critical error occurs.

func (*FuncScheduler) Submit

func (d *FuncScheduler) Submit(name string, f func() error, deps ...string) *FuncScheduler

Submit adds a function task to the scheduler and returns the scheduler for chaining. `name` is the task ID and must be unique; `deps` are the names of the tasks this task depends on; `f` defines the work the task performs.

func (*FuncScheduler) WithInjectorFactory

func (d *FuncScheduler) WithInjectorFactory(injectFac InjectorFactory[types.Nil]) *FuncScheduler

type Graph

type Graph struct {
	Nodes []Node
	Edges map[Node][]Node
	// contains filtered or unexported fields
}

func NewGraph

func NewGraph() *Graph
Example
fmt.Println(g().String())
Output:

[A]-> [C,D,]
[B]-> [A,E,D,]
[C]-> [D,]
[D]-> [E,]
[E]-> []

func (*Graph) AddEdge

func (g *Graph) AddEdge(from Node, to Node)

func (*Graph) AddNode

func (g *Graph) AddNode(n Node)

func (*Graph) BFS

func (g *Graph) BFS(walker Walker) error
Example
graph := g()
_ = graph.BFS(func(node Node) error {
	fmt.Println(node.String())
	return nil
})
Output:

A
C
D
E
B

func (*Graph) DFS

func (g *Graph) DFS(walker Walker) error
Example
graph := g()
// graph.AddEdge(nodes[4], nodes[0])
err := graph.DFS(func(node Node) error {
	fmt.Println(node.String())
	return nil
})
if err != nil {
	fmt.Println("err ", err)
}
Output:

A
C
D
E
B

func (*Graph) DOT

func (g *Graph) DOT() string
Example
fmt.Println(g().DOT())
Output:

digraph G {
"start"[shape=box,color="green"]
"end"[shape=box,color="red"]
"A" -> {"C","D"}
"B" -> {"A","E","D"}
"C" -> {"D"}
"D" -> {"E"}
"start" -> {"B"}
{"E"}  -> "end"
}

func (*Graph) String

func (g *Graph) String() string

type Injector

type Injector[T any] struct {
	Pre   func(ctx context.Context, runCtx T)
	After func(ctx context.Context, runCtx T, err error) error
}

type InjectorFactory

type InjectorFactory[T any] interface {
	Inject(ctx context.Context, task Task[T]) Injector[T]
}

type InjectorFactoryFunc

type InjectorFactoryFunc[T any] func(ctx context.Context, task Task[T]) Injector[T]

func (InjectorFactoryFunc[T]) Inject

func (i InjectorFactoryFunc[T]) Inject(ctx context.Context, task Task[T]) Injector[T]

type Named

type Named interface{ Name() string }

type Node

type Node interface {
	fmt.Stringer
}

type Registry

type Registry[T Named] struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry[T Named]() Registry[T]

func (Registry[T]) Get

func (l Registry[T]) Get(name string) (T, error)

func (Registry[T]) Has

func (l Registry[T]) Has(name string) bool

func (Registry[T]) Register

func (l Registry[T]) Register(t T)
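Registry's fields are unexported, so its internals are not documented. A minimal map-backed sketch with the same method set could look like this. The `job` type is hypothetical and Get's error message is invented. This sketch simply overwrites on a duplicate Register, which may not match the library's behavior.

```go
package main

import "fmt"

// Named mirrors the documented interface.
type Named interface{ Name() string }

// Registry is a hypothetical sketch; value receivers work because copies of
// the struct share the same underlying map.
type Registry[T Named] struct{ items map[string]T }

func NewRegistry[T Named]() Registry[T] { return Registry[T]{items: map[string]T{}} }

// Register stores t under t.Name(), replacing any earlier entry.
func (l Registry[T]) Register(t T) { l.items[t.Name()] = t }

func (l Registry[T]) Has(name string) bool {
	_, ok := l.items[name]
	return ok
}

// Get returns the registered value or an error when the name is unknown.
func (l Registry[T]) Get(name string) (T, error) {
	t, ok := l.items[name]
	if !ok {
		var zero T
		return zero, fmt.Errorf("task %q not found", name)
	}
	return t, nil
}

// job is a hypothetical Named implementation.
type job string

func (j job) Name() string { return string(j) }
```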

type Scheduler

type Scheduler[T any] struct {
	// contains filtered or unexported fields
}

Scheduler is a simple scheduler for typed tasks.

func NewScheduler

func NewScheduler[T any]() *Scheduler[T]

NewScheduler builds a scheduler for typed tasks.

func NewWithInjectorFactory

func NewWithInjectorFactory[T any](injectFac InjectorFactory[T]) *Scheduler[T]

NewWithInjectorFactory is a shortcut for NewScheduler[T]().WithInjectorFactory(injectFac).

func (*Scheduler[T]) CancelWithErr

func (d *Scheduler[T]) CancelWithErr(err error)

CancelWithErr cancels the tasks in the scheduler that have not yet started.

func (*Scheduler[T]) Dot

func (d *Scheduler[T]) Dot() string

Dot dumps the DAG in the DOT language.

func (*Scheduler[T]) Run

func (d *Scheduler[T]) Run(ctx context.Context, x T) error

Run starts all tasks and blocks until all of them are done or a critical error occurs.

func (*Scheduler[T]) Submit

func (d *Scheduler[T]) Submit(tasks ...Task[T]) error

Submit adds typed tasks to the scheduler; every task must implement the Task interface.

func (*Scheduler[T]) SubmitFunc

func (d *Scheduler[T]) SubmitFunc(name string, f func(context.Context, T) error, deps ...string) error

SubmitFunc adds a function task to the scheduler.

func (*Scheduler[T]) WithInjectorFactory

func (d *Scheduler[T]) WithInjectorFactory(injectFac InjectorFactory[T]) *Scheduler[T]

WithInjectorFactory adds an injector factory whose injectors run before and after each task.

type Task

type Task[T any] interface {
	Name() string
	Dependencies() []string
	Execute(context.Context, T) error
}

Task is the interface all your tasks should implement

type TaskManager

type TaskManager[T DepTask] struct {
	Registry[T]
}

func NewTaskManager

func NewTaskManager[T DepTask]() TaskManager[T]

func (TaskManager[T]) GetAllTaskWithDepsByName

func (t TaskManager[T]) GetAllTaskWithDepsByName(taskNames []string) (map[string]T, error)

GetAllTaskWithDepsByName returns the tasks with the given names together with the tasks they depend on.

type Walker

type Walker func(node Node) error
