dagflow

package module
v0.0.0-...-ee3be53 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2020 License: MIT Imports: 4 Imported by: 0

README

dagflow

基于DAG的事件流flow驱动

Usage

使用简单,常用就几个方法:

  • Add: 添加任务节点至DagFlow中。
  • Connect: 连接点与点之间的有向关系。
  • Validate: DAG图校验,只有校验通过后才会执行后续步骤。
  • Run: 执行DAG图事件流。

Job节点需要重载以下函数,详细参加example.go:

type JobNode interface {
  // 事件处理
  Exec()
  // 事件处理完成后调用
  Complete()
  // 函数唯一编号
  dag.Hashable
  // 是否完成
  IsFinished() bool
  // 设置结果
  SetFinished(bo bool)
  // 节点唯一编号
  GetTaskID() uint64
}

Example 1

example1

上图中,明显不是有向无环图,因为存在两个root节点12,所以在执行Validate时会报多个root节点的错。

完整用例见:examples/ex1/ex1.go

Example 2

example2

上图中,主要展示的是无根节点的情况,也就是这个是循环的,所以在执行Validate时会报找不到root节点。

完整用例见:examples/ex2/ex2.go

Example 3

example3

上图中,是一个比较简单的有向无环图,在执行Run时,执行顺序会如图所示,依次执行123

完整用例见:examples/ex3/ex3.go

Example 4

example4

上图中,是一个比较简单的有向无环图,在执行Run时,当检测到一个节点后有多个分支的情况时,会采用并发的形式去执行后续的节点,所以执行顺序会是先执行1,然后同时执行23

完整用例见:examples/ex4/ex4.go

Example 5

example5

上图中,是一个相对复杂的有向无环图,结合Example 3Example 4的情况,执行顺序是首先执行1,随后并行执行234,此时执行完4后,就准备执行6,但检测到要执行6的前提会是先执行完成5,所以后面的顺序是执行完5后再执行6

完整用例见:examples/ex5/ex5.go

如果这个小工具对你有帮助的话欢迎留个Star鼓励一下,😆

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DagFlow

type DagFlow struct {
	// contains filtered or unexported fields
}

func (*DagFlow) Add

func (df *DagFlow) Add(v ...JobNode)

func (*DagFlow) Connect

func (df *DagFlow) Connect(from, to JobNode)

func (*DagFlow) DownEdgesLen

func (df *DagFlow) DownEdgesLen(v JobNode) int

func (*DagFlow) DownEdgesList

func (df *DagFlow) DownEdgesList(v JobNode) []interface{}

func (*DagFlow) Replace

func (df *DagFlow) Replace(original, replacement JobNode) bool

func (*DagFlow) Root

func (df *DagFlow) Root() (JobNode, error)

func (*DagFlow) Run

func (df *DagFlow) Run() error

func (*DagFlow) TransitiveReduction

func (df *DagFlow) TransitiveReduction()

func (*DagFlow) UpEdgesLen

func (df *DagFlow) UpEdgesLen(v JobNode) int

func (*DagFlow) UpEdgesList

func (df *DagFlow) UpEdgesList(v JobNode) []interface{}

func (*DagFlow) Validate

func (df *DagFlow) Validate() error

type JobNode

type JobNode interface {
	// 事件处理
	Exec()
	// 事件处理完成后调用
	Complete()
	// 函数唯一编号
	dag.Hashable
	// 是否完成
	IsFinished() bool
	// 设置结果
	SetFinished(bo bool)
}

type RootsMap

type RootsMap struct {
	// contains filtered or unexported fields
}

func (*RootsMap) Add

func (rm *RootsMap) Add(jobNode JobNode)

func (*RootsMap) Len

func (rm *RootsMap) Len() int

func (*RootsMap) List

func (rm *RootsMap) List() []JobNode

func (*RootsMap) UnfinishedLen

func (rm *RootsMap) UnfinishedLen() int

func (*RootsMap) UnfinishedList

func (rm *RootsMap) UnfinishedList() []JobNode

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL