flowprocess

package module
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2023 License: Apache-2.0 Imports: 6 Imported by: 1

README

Flow and Parallel processing framework

It provides an easy way to create a flow process and significantly improves the efficiency of data processing.

Architecture Diagram

Architecture Diagram

Usage

For example, we count the file words and get the top 10 occurrences of the words. The test file is too small, you can enlarge the file by copying it several times. Let's compare the two ways below:

1、General way
	wordCount := map[string]int{}
	reverse := true
	//You can replace the file with a larger file.
	file := "testfile/2553.txt"
	start := time.Now()
	f, err := os.Open(file)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	//split lines
	for sc.Scan() {
		line := sc.Text()
		sps := splitText(line)
		for i := 0; i < len(sps); i++ {
			st := strings.TrimSpace(sps[i])
			if len(st) > 0 {
				wordCount[st]++
			}
		}
	}
	//sort by word occurrence times desc
	sortedWc := sortWc(wordCount, reverse)

	duration := time.Since(start)

	//print elapsed time
	fmt.Printf("duration(ms):%v\n", duration.Milliseconds())

	//print topN
	topN := 10
	if topN > len(sortedWc) {
		topN = len(sortedWc)
	}
	fmt.Println("sortedWc-top", topN, ":")
	for i := 0; i < topN; i++ {
		fmt.Println(sortedWc[i])
	}

The 'General way' is slow and has lower CPU and IO usage when the file is very large.

2、Flow and Parallel way

We separate IO and CPU operations.

(1) define flownode-0 processor ( read file lines )
//ReadFileProcessor reads file lines and puts each line into an OutTaskChan for the next flow-node to process.
type ReadFileProcessor struct {
	// Filepath is the path of the file whose lines are emitted as tasks.
	Filepath string
}

// Proccess reads the file at g.Filepath line by line and forwards each line
// to outTask for the next flow-node. It returns true (cancel all processing)
// when the surrounding context is cancelled, consistent with the other
// processors in this example.
func (g *ReadFileProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	f, err := os.Open(g.Filepath)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := sc.Text()
		// Select on the send itself so a cancelled context unblocks us even
		// when outTask's buffer is full (a default-branch send could block
		// forever after cancellation).
		select {
		case <-ctx.Done():
			return true
		case outTask <- line:
		}
	}
	// Surface scanner I/O errors instead of silently dropping them,
	// matching how the open error is handled above.
	if err := sc.Err(); err != nil {
		panic(err)
	}
	return false
}
(2) define flownode-1 processor ( split and count )
//SplitAndCountProcessor splits each incoming line into words and counts the word occurrences.
type SplitAndCountProcessor struct {
}

// Proccess consumes lines from inTasks, splits each into words, and counts
// occurrences in a processor-local map; when the input channel is closed it
// emits the partial word-count map to outTask. It returns true (cancel all
// processing) when the surrounding context is cancelled.
func (s *SplitAndCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	wordCount := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return true
		case task, ok := <-inTasks:
			if !ok {
				// Input exhausted: hand the partial counts to the next node.
				// Guard the send with ctx so we cannot block forever if the
				// flow is cancelled while the downstream buffer is full.
				select {
				case <-ctx.Done():
					return true
				case outTask <- wordCount:
				}
				return false
			}
			line := task.(string)
			for _, sp := range splitText(line) {
				if st := strings.TrimSpace(sp); len(st) > 0 {
					wordCount[st]++
				}
			}
		}
	}
}
(3) define flownode-2 processor ( summarize )
//SumWordCountProcessor summarizes the word occurrence counts produced by the upstream nodes.
type SumWordCountProcessor struct {
	// reverse is passed to sortWc when the merged totals are sorted;
	// in the example, true sorts by occurrence times descending.
	reverse   bool
}

// Proccess merges the partial word-count maps produced by the upstream
// SplitAndCountProcessor instances into one total map; when the input channel
// is closed it sorts the totals with sortWc (order controlled by s.reverse)
// and emits the sorted result to outTask. It returns true (cancel all
// processing) when the surrounding context is cancelled.
func (s *SumWordCountProcessor) Proccess(inTasks flowprocess.InTaskChan, outTask flowprocess.OutTaskChan, ctx context.Context) (cancelAllProcess bool) {
	wordCount := map[string]int{}
	for {
		select {
		case <-ctx.Done():
			return true
		case task, ok := <-inTasks:
			if !ok {
				sortedWc := sortWc(wordCount, s.reverse)
				// Guard the final send with ctx so we cannot block forever
				// if the flow is cancelled while the result chan is full.
				select {
				case <-ctx.Done():
					return true
				case outTask <- sortedWc:
				}
				return false
			}
			// Fold this partial map into the running totals.
			for key, val := range task.(map[string]int) {
				wordCount[key] += val
			}
		}
	}
}
(4) define flow process
    start := time.Now()
	fp := flowprocess.NewFlow()
	queneCount := 4000
	//Node-0: read file lines. We define 1 processor to read file.
	fp.AddNodeProcessors(queneCount,
		&ReadFileProcessor{
			//You can replace the file with a larger file.
			Filepath: "testfile/2553.txt",
		})

	//Node-1: split and count. we define 4 parallel processors to split and count.
	fp.AddNodeProcessors(queneCount,
		&SplitAndCountProcessor{},
		&SplitAndCountProcessor{},
		&SplitAndCountProcessor{},
		&SplitAndCountProcessor{},
	)

	result := &SumWordCountProcessor{
		reverse: true,
	}

	//Node-2: we define 1 processor to summarize.
	fp.AddNodeProcessors(1,
		result,
	)

	fp.Start()
	if res, ok := fp.Result(); ok {
		sortedWc := res.([]wordAndCount)
		duration := time.Since(start)
		fmt.Printf("duration(ms):%v\n", duration.Milliseconds())

		topN := 10
		if topN > len(sortedWc) {
			topN = len(sortedWc)
		}
		fmt.Println("sortedWc-top", topN, ":")
		for i := 0; i < topN; i++ {
			fmt.Println(sortedWc[i])
		}
	}

The 'Flow and Parallel way' is faster and has higher CPU and IO usage when the file is very large.

3、A practice

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultProcessor added in v1.0.1

type DefaultProcessor struct {
	// contains filtered or unexported fields
}

func (*DefaultProcessor) Proccess added in v1.0.1

func (p *DefaultProcessor) Proccess(inTaskChan InTaskChan, outTaskChan OutTaskChan, ctx context.Context) (cancelAllProcess bool)

type Flow added in v0.0.4

type Flow interface {
	// AddNodeProcessors adds a flow-processing node.
	// outChanSize is the size of the node's output channel.
	// processors hold the processing logic; each Processor runs on its own goroutine. This method offers the most flexibility.
	AddNodeProcessors(outChanSize int, processors ...Processor) Node
	// AddNodeTaskHandlers adds a flow-processing node.
	// outChanSize is the size of the node's output channel.
	// taskHandlers hold the processing logic; each TaskHandler runs on its own goroutine. Use this method when you need to trace task execution.
	AddNodeTaskHandlers(outChanSize int, taskHandlers ...TaskHandler) Node
	// Start starts the flow-processing engine.
	Start()
	// Await blocks until the flow-processing engine has finished.
	Await()
	// Result waits for an execution result. It blocks until a result is available; the result is valid only when ok is true.
	// When execution produces multiple results, call this method repeatedly to fetch them all.
	Result() (result interface{}, ok bool)
}

Flow 表示流处理引擎。

type InTaskChan added in v0.0.3

type InTaskChan <-chan Task

type Node added in v1.0.0

type Node interface {
	// Done returns a channel for observing node completion
	// (NOTE(review): presumably closed when the node finishes — confirm with the implementation).
	Done() <-chan struct{}
	// Cancel cancels the task execution; the already-submitted tasks will still be executed.
	Cancel()
	// SetProcessorSelector sets the ProcessorSelector that decides which processor's input chan each task goes to.
	SetProcessorSelector(processorSelector ProcessorSelector)

	// SubmitTask submits a task to this node for processing.
	SubmitTask(task Task) error
	// contains filtered or unexported methods
}

Node defines a node to process tasks

func NewNode added in v1.0.0

func NewNode(processors []Processor, outChanSize int, id int, preNode Node) Node

NewNode 创建流处理引擎节点。processors 表示处理逻辑,每个 Processor独占一个goroutine。processorChanBufferSize 为输出通道的大小。id 为节点标识。 preNode 指向上一个节点,取消任务时,会调用preNode.Cancel()。

type NodeKey added in v1.1.0

type NodeKey string
const (
	NODE_ID NodeKey = "nodeId"
)

type OutTaskChan added in v0.0.3

type OutTaskChan chan<- Task

type Processor

type Processor interface {
	// Proccess consumes tasks from inTasks and places results on outTask for the
	// next flow-node; returning cancelAllProcess == true cancels the whole flow.
	Proccess(inTasks InTaskChan, outTask OutTaskChan, ctx context.Context) (cancelAllProcess bool)
}

Processor processes tasks from the previous FlowNode and places the results into the outTask chan, which will be processed by the next FlowNode. A node includes multiple processors which run concurrently.

func NewTaskHandlerProcessors added in v1.0.1

func NewTaskHandlerProcessors(
	taskAt func(task TraceableTask, nodeId int),
	onNewTaskCreated func(task TraceableTask, nodeId int),
	onTaskFinished func(task TraceableTask, nodeId int, err error),
	taskhandlers ...TaskHandler) []Processor

type ProcessorSelector added in v1.2.2

type ProcessorSelector interface {
	// DefineProcessorInTaskChan defines an input chan for each processor; the return value holds the ids of those input chans.
	DefineProcessorInTaskChan() (processorInTaskChanIndexes []int)
	// SelectInTaskChanIndex assigns the task to one of the input chans.
	SelectInTaskChanIndex(task Task) (inTaskChanIndex int)
}

ProcessorSelector 根据任务属性,决定将任务分发给哪个processor。

针对某个node,将输入TaskChan分解成多个TaskChan,作为node内processor的输入。node内的每个processor,都对应一个输入TaskChan。多个node可以使用同一个输入TaskChan。
应用场景:
1、当task之间存在依赖关系,即:task1执行完成之后才能执行task2,此时需要将task1和task2分配到同一个processor

type Task

type Task interface{}

type TaskChan

type TaskChan chan Task

type TaskHandler added in v1.0.1

type TaskHandler interface {
	// Handle processes a task. If the returned err is non-nil, flow processing is interrupted.
	// dispatch forwards a task to the next node.
	Handle(inTask Task, dispatch func(outTask Task) error) (err error)

	// OnCompleted is called back when the current node has finished processing its tasks.
	// If the returned err is non-nil, flow processing is interrupted.
	OnCompleted(dispatch func(outTask Task) error) (err error)
}

TaskHandler 定义任务处理逻辑。

func NewSortTaskHandler added in v1.2.2

func NewSortTaskHandler(taskStartId uint64, maxBlocking int) TaskHandler

NewSortTaskHandler 对任务按照taskId进行排序,taskId越小,越先被转发给下一个节点。 taskStartId:第一个任务的id maxBlocking:队列的最大长度。如果超过最大长度,则会转发队列中taskId最小的任务。

type TaskHandlerAdapter added in v1.1.0

type TaskHandlerAdapter struct {
}

func (TaskHandlerAdapter) OnCompleted added in v1.1.0

func (h TaskHandlerAdapter) OnCompleted(dispatch func(outTask Task) error) (err error)

type TraceableFlow added in v1.2.2

type TraceableFlow interface {
	Flow
	// SetTaskAt registers taskAt, used to trace tasks: it is called back when a TraceableTask arrives at a node.
	SetTaskAt(taskAt func(task TraceableTask, nodeId int))
	// SetOnNewTaskCreated registers onNewTaskCreated, called back when a new TraceableTask is created.
	SetOnNewTaskCreated(onNewTaskCreated func(task TraceableTask, nodeId int))
	// SetOnTaskFinished registers onTaskFinished, used to trace tasks: it is called back when a TraceableTask finishes.
	SetOnTaskFinished(onTaskFinished func(task TraceableTask, nodeId int, err error))
}

func NewFlow added in v1.1.0

func NewFlow() TraceableFlow

NewFlow 创建流处理引擎。

type TraceableTask added in v1.1.0

type TraceableTask interface {
	// TaskId returns the task id, which makes the task traceable.
	TaskId() uint64
	// Inner returns the underlying Task.
	Inner() Task
}

TraceableTask 表示可追踪的task。

func ToTraceableTask added in v1.1.0

func ToTraceableTask(taskId uint64, task Task) TraceableTask

ToTraceableTask 将Task转为TraceableTask。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL