line

Version: v0.0.0-...-f1b9ca1
Published: Jun 24, 2022 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Warning: This package is not in the latest version of its module.

README

Line

What is Line?
  • A concurrency model similar to pipeline execution. A LINE contains an input queue and a series of STAGES, which it executes in sequence.
  • Each STAGE can contain several WORKERS, which wait for inputs, execute a user-defined function in separate goroutines, and pass the output to the next STAGE.
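
To make the model above concrete, here is a minimal sketch of stages and workers built from plain goroutines and channels. It does not use Line itself, and it is not how Line is implemented; runStage and runPipeline are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runStage is a hypothetical helper (not part of Line). It starts `workers`
// goroutines that read from in, apply fn, and write to the returned channel.
// The output channel is closed once every worker has finished.
func runStage(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// runPipeline feeds items through two chained stages and collects the
// results. Results are sorted because workers may finish in any order.
func runPipeline(items []int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, it := range items {
			in <- it
		}
	}()
	doubled := runStage(in, 3, func(v int) int { return v * 2 })
	plusOne := runStage(doubled, 2, func(v int) int { return v + 1 })

	var results []int
	for v := range plusOne {
		results = append(results, v)
	}
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3, 4})) // prints [3 5 7 9]
}
```

Each stage here has its own worker count, which is the knob that Line exposes per stage.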
When to use?

If your program processes items in a way similar to the code below:

//fetching items
items := fetchItems()

//handle each item
for _, item := range items {
   err := doSomethingA(item)
   if err != nil {
       continue
   }
   err = doSomethingB(item)
   if err != nil {
       continue
   }
   err = doSomethingC(item)
   ...
}

consider refactoring it as follows, for better scalability and clarity.

type Func func(item Item) error
type FuncList []Func

// Handle runs every stage on each item, stopping at the first error for that item.
func (fl FuncList) Handle(items []Item) {
    for _, item := range items {
        for _, stage := range fl {
            if err := stage(item); err != nil {
                break
            }
        }
    }
}

//define stages
var stages FuncList = []Func{doSomethingA, doSomethingB, doSomethingC, ...}

//fetching items
items := fetchItems()

//handle them
stages.Handle(items)

Further, you may want these functions to run in parallel to improve performance, or want to adjust them easily. That is what Line aims to do: it links those functions together and lets you manage them, for example by adding or removing stages.

How to use?

First, split your process into several reasonable stages. Then define the struct that is passed between your stages, and define each stage function with the signature required by line.WorkFunc. The input parameter, of type *line.M, has an Item() method that returns your object.

// your custom struct 
type MyItem struct {
    paramA interface{}
    paramB interface{}
    ...
}

// your stage function
func YourStage1Func(ctx line.ExecContext, input *line.M) (output *line.M, err error) {
    //you need to assert item's type.
    item := input.Item().(*MyItem)

    if err = doSomethingWithItem(item); err != nil {
        // if you return an error, the process will be interrupted and an ErrHandler will handle this error.
        return nil, err
    }

    // send to next stage, if there is no next stage, the process is done.
    return input, nil
}

func YourStage2Func(ctx line.ExecContext, input *line.M) (output *line.M, err error) {
    item := input.Item().(*MyItem)
    doSomethingElseWithItem(item)
    return input, nil
}

Create a Line, set its stages with your options, then run it.

l := line.New(line.WithMaxQueueLen(10), line.WithPQSupported())

//create your stages with your function and options.
stages := []*line.Stage{
    //WithWorkerNum represents how many goroutines are created to exec your stage function.
    line.NewStage("stage1Name", YourStage1Func, line.WithWorkerNum(10)),
    line.NewStage("stage2Name", YourStage2Func, line.WithWorkerNum(10)),
}

//Set and run.
l.SetStages(stages).Run()

Now you can feed objects into the line, with options such as a priority.

item := &MyItem{}
// non-blocking.
l.Input(item, line.WithPriority(10))

item2 := &MyItem2{}
// blocks until item2 is done.
l.InputAndWait(item2, line.WithPriority(20))

items := fetchSomeItems()
// batch input, blocks until all items are done.
l.InputAndWait(items, line.WithBatch())
Features
  • Dynamic stage adjustment: add or remove stages while the line is running.
  • Dynamic resizing of the concurrency level.
  • Priority input queue support.
  • Both blocking and non-blocking input.
  • Pausing and resuming.

Documentation

Constants

This section is empty.

Variables

var (
	ErrNoStage     = errors.New("There is no stage. ")
	ErrAlreadyOpen = errors.New("Line is already open. ")
	ErrDupName     = errors.New("Stage name already exist. ")
)

Functions

This section is empty.

Types

type ErrHandler

type ErrHandler func(msg ErrorMsg)

type ErrorMsg

type ErrorMsg struct {
	StageName string
	WorkerID  string
	OccurAt   time.Time
	M         *M
	Err       error
}
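
As an illustration of how an ErrHandler might be used, the sketch below counts errors per stage. ErrorMsg and ErrHandler are local stand-ins that mirror the declarations above (the M field is omitted) so the example compiles on its own; errorCollector is a hypothetical helper. The mutex reflects an assumption that handlers can be called from several worker goroutines at once.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrorMsg and ErrHandler are local stand-ins mirroring the documented
// declarations. The M field is left out to keep the sketch self-contained.
type ErrorMsg struct {
	StageName string
	WorkerID  string
	OccurAt   time.Time
	Err       error
}

type ErrHandler func(msg ErrorMsg)

// errorCollector is a hypothetical helper that counts errors per stage.
type errorCollector struct {
	mu     sync.Mutex
	counts map[string]int
}

func newErrorCollector() *errorCollector {
	return &errorCollector{counts: make(map[string]int)}
}

// Handler returns a function with the ErrHandler signature.
func (c *errorCollector) Handler() ErrHandler {
	return func(msg ErrorMsg) {
		c.mu.Lock()
		defer c.mu.Unlock()
		c.counts[msg.StageName]++
	}
}

// Count reports how many errors were recorded for the given stage.
func (c *errorCollector) Count(stage string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[stage]
}

func main() {
	c := newErrorCollector()
	h := c.Handler()
	h(ErrorMsg{StageName: "stage1", WorkerID: "w-1", OccurAt: time.Now(), Err: errors.New("boom")})
	h(ErrorMsg{StageName: "stage1", WorkerID: "w-2", OccurAt: time.Now(), Err: errors.New("bang")})
	fmt.Println(c.Count("stage1")) // prints 2
}
```

With the real package, a function of this shape would be passed through WithErrHandler or Stage.SetErrHandler.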

type ExecContext

type ExecContext struct {
	context.Context
	WorkerUUID string
}

type InputOption

type InputOption func(*inputOption)

func WithBatch

func WithBatch() InputOption

WithBatch treats the input parameter as a slice of inputs. Make sure the input parameter's kind is a slice when using this option.
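
The slice requirement can be illustrated with the reflect package. The sketch below is not Line's implementation; expandBatch is a hypothetical helper showing one way a value of unknown type can be checked for slice kind and split into individual items.

```go
package main

import (
	"fmt"
	"reflect"
)

// expandBatch is a hypothetical helper, not part of Line. It rejects any
// value whose kind is not a slice and otherwise returns its elements.
func expandBatch(obj interface{}) ([]interface{}, error) {
	v := reflect.ValueOf(obj)
	if v.Kind() != reflect.Slice {
		return nil, fmt.Errorf("expected a slice, got %T", obj)
	}
	items := make([]interface{}, 0, v.Len())
	for i := 0; i < v.Len(); i++ {
		items = append(items, v.Index(i).Interface())
	}
	return items, nil
}

func main() {
	items, err := expandBatch([]string{"a", "b", "c"})
	fmt.Println(len(items), err) // prints 3 <nil>

	_, err = expandBatch("not a slice")
	fmt.Println(err != nil) // prints true
}
```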

func WithHighestPriority

func WithHighestPriority() InputOption

WithHighestPriority sets the priority to the highest.

func WithPriority

func WithPriority(priority uint) InputOption

WithPriority sets the priority of the input.

func WithWait

func WithWait() InputOption

WithWait makes the Input function return only after all the input objects have been processed.
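
InputOption follows Go's functional options pattern. The sketch below shows how such options are typically applied; the inputOption fields and the applyOptions function are assumptions for illustration, since the real struct is unexported and its fields are not documented.

```go
package main

import "fmt"

// inputOption is a local stand-in. Its field names are assumptions.
type inputOption struct {
	priority uint
	wait     bool
	batch    bool
}

// InputOption mirrors the documented type: a function that mutates options.
type InputOption func(*inputOption)

func WithPriority(priority uint) InputOption {
	return func(o *inputOption) { o.priority = priority }
}

func WithWait() InputOption {
	return func(o *inputOption) { o.wait = true }
}

func WithBatch() InputOption {
	return func(o *inputOption) { o.batch = true }
}

// applyOptions is hypothetical. It starts from zero values and applies each
// option in order, so a later option overrides an earlier one.
func applyOptions(opts ...InputOption) inputOption {
	var o inputOption
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(WithPriority(10), WithWait())
	fmt.Println(o.priority, o.wait, o.batch) // prints 10 true false
}
```

The same pattern applies to LineOption and StageOption.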

type Line

type Line struct {
	// contains filtered or unexported fields
}

func New

func New(option ...LineOption) *Line

New creates a new Line with the given options.

func (*Line) AppendStages

func (l *Line) AppendStages(stages []*Stage, after ...string) *Line

AppendStages appends stages at a specific position. If you omit the after parameter, or the named stage does not exist in the current line, the stages are appended at the end. If after is an empty string, the stages are inserted at the front. If the line is running, it is stopped first and restarted after the operation completes.
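
The positioning rules can be shown on a plain slice of stage names. insertAfter below is a hypothetical helper written only to illustrate the documented rules; it is not Line's code and ignores the stop/restart behaviour.

```go
package main

import "fmt"

// insertAfter is a hypothetical helper that applies the documented rules:
// no after argument or an unknown name appends at the end, an empty string
// inserts at the front, and a known name inserts right after that stage.
func insertAfter(current, added []string, after ...string) []string {
	pos := len(current) // default: append at the end
	if len(after) > 0 {
		if after[0] == "" {
			pos = 0
		} else {
			for i, name := range current {
				if name == after[0] {
					pos = i + 1
					break
				}
			}
		}
	}
	out := make([]string, 0, len(current)+len(added))
	out = append(out, current[:pos]...)
	out = append(out, added...)
	out = append(out, current[pos:]...)
	return out
}

func main() {
	cur := []string{"a", "b", "c"}
	fmt.Println(insertAfter(cur, []string{"x"}, "a"))    // prints [a x b c]
	fmt.Println(insertAfter(cur, []string{"x"}))         // prints [a b c x]
	fmt.Println(insertAfter(cur, []string{"x"}, ""))     // prints [x a b c]
	fmt.Println(insertAfter(cur, []string{"x"}, "nope")) // prints [a b c x]
}
```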

func (*Line) GetStage

func (l *Line) GetStage(stageName string) *Stage

GetStage returns the Stage with the given name.

func (*Line) Input

func (l *Line) Input(obj interface{}, opt ...InputOption)

Input pushes your input object(s) to the input queue with the given options.

func (*Line) InputAndWait

func (l *Line) InputAndWait(obj interface{}, opt ...InputOption)

InputAndWait pushes your input object(s) to the input queue and waits until all of them are done.

func (*Line) IsOpen

func (l *Line) IsOpen() bool

IsOpen reports whether the line is fetching from the queue.

func (*Line) RemoveStage

func (l *Line) RemoveStage(stageName string) *Line

RemoveStage removes the stage with the given name. If the line is running, it is stopped first and restarted after the operation completes.

func (*Line) Run

func (l *Line) Run()

Run starts the line. If the line is already started, it does nothing.

func (*Line) SetStages

func (l *Line) SetStages(stages []*Stage) *Line

SetStages sets your stages. If the line is running, it is stopped first and restarted after the operation completes.

func (*Line) Stop

func (l *Line) Stop() (switched bool)

Stop stops fetching items from the queue.

func (*Line) StopAndWait

func (l *Line) StopAndWait() (switched bool)

StopAndWait stops fetching items from the queue and waits until nothing is running.

type LineOption

type LineOption func(*lineOption)

func WithCustomQueue

func WithCustomQueue(q Queue) LineOption

WithCustomQueue sets custom input queue.

func WithMaxQueueLen

func WithMaxQueueLen(max int) LineOption

WithMaxQueueLen sets the maximum length of the input queue. It has no effect when a custom queue is used.

func WithPQSupported

func WithPQSupported() LineOption

WithPQSupported enables priority queue support.

type M

type M struct {
	// contains filtered or unexported fields
}

func (*M) Item

func (m *M) Item() interface{}

type Queue

type Queue interface {
	Enqueue(item *M, priority ...uint)
	Dequeue() *M
	Len() int
	Close()
	IsClosed() bool
}
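
A custom queue for WithCustomQueue has to satisfy this interface. The following is a minimal sketch built on container/heap, under several assumptions the source does not confirm: a higher number means higher priority, Dequeue returns nil when the queue is empty rather than blocking, and items enqueued after Close are dropped. M and Queue are local stand-ins so the example compiles on its own; heapQueue is a hypothetical name.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// M and Queue are local stand-ins that mirror the documented declarations.
type M struct{ item interface{} }

func (m *M) Item() interface{} { return m.item }

type Queue interface {
	Enqueue(item *M, priority ...uint)
	Dequeue() *M
	Len() int
	Close()
	IsClosed() bool
}

// entry pairs a message with its priority and an insertion sequence number.
type entry struct {
	m        *M
	priority uint
	seq      uint64
}

// entryHeap implements heap.Interface. Higher priority pops first (an
// assumption); equal priorities pop in insertion order.
type entryHeap []entry

func (h entryHeap) Len() int      { return len(h) }
func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h entryHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() interface{} {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

// heapQueue is a hypothetical Queue implementation guarded by a mutex.
type heapQueue struct {
	mu     sync.Mutex
	h      entryHeap
	seq    uint64
	closed bool
}

func (q *heapQueue) Enqueue(item *M, priority ...uint) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return // assumption: input after Close is dropped
	}
	var p uint
	if len(priority) > 0 {
		p = priority[0]
	}
	q.seq++
	heap.Push(&q.h, entry{m: item, priority: p, seq: q.seq})
}

// Dequeue returns nil when the queue is empty (an assumption; the real
// contract may expect Dequeue to block instead).
func (q *heapQueue) Dequeue() *M {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.h) == 0 {
		return nil
	}
	return heap.Pop(&q.h).(entry).m
}

func (q *heapQueue) Len() int       { q.mu.Lock(); defer q.mu.Unlock(); return len(q.h) }
func (q *heapQueue) Close()         { q.mu.Lock(); defer q.mu.Unlock(); q.closed = true }
func (q *heapQueue) IsClosed() bool { q.mu.Lock(); defer q.mu.Unlock(); return q.closed }

// Compile-time check that heapQueue satisfies Queue.
var _ Queue = (*heapQueue)(nil)

func main() {
	q := &heapQueue{}
	q.Enqueue(&M{item: "low"}, 1)
	q.Enqueue(&M{item: "high"}, 9)
	fmt.Println(q.Dequeue().Item()) // prints high
}
```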

type Stage

type Stage struct {
	// contains filtered or unexported fields
}

func NewStage

func NewStage(name string, workFunc WorkFunc, option ...StageOption) *Stage

func (*Stage) GetWorkerNum

func (s *Stage) GetWorkerNum() int

func (*Stage) ResizeWorkerNum

func (s *Stage) ResizeWorkerNum(to int)

func (*Stage) SetErrHandler

func (s *Stage) SetErrHandler(to ErrHandler)

func (*Stage) SetFunc

func (s *Stage) SetFunc(to WorkFunc)

func (*Stage) SetTimeout

func (s *Stage) SetTimeout(to time.Duration)

type StageOption

type StageOption func(o *stageOption)

func WithErrHandler

func WithErrHandler(handler ErrHandler) StageOption

func WithTimeout

func WithTimeout(timeout time.Duration) StageOption

func WithWorkerNum

func WithWorkerNum(workerNum int) StageOption

type WorkFunc

type WorkFunc func(ctx ExecContext, input *M) (output *M, err error)
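
Because ExecContext embeds context.Context, a stage function can watch for cancellation while it works. The sketch below assumes that a stage timeout (for example one set with WithTimeout) is delivered through that embedded context, which the source does not state. M, ExecContext and WorkFunc are local stand-ins mirroring the declarations above; sleepStage and demo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented declarations.
type M struct{ item interface{} }

func (m *M) Item() interface{} { return m.item }

type ExecContext struct {
	context.Context
	WorkerUUID string
}

type WorkFunc func(ctx ExecContext, input *M) (output *M, err error)

// sleepStage is a hypothetical stage function that simulates work lasting d
// and gives up early when the embedded context is cancelled.
func sleepStage(d time.Duration) WorkFunc {
	return func(ctx ExecContext, input *M) (*M, error) {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-timer.C:
			return input, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// demo calls the stage twice under a 100ms deadline: once with short work,
// once with work that outlasts the deadline.
func demo() (fastErr, slowErr error) {
	in := &M{item: "payload"}

	ctx1, cancel1 := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel1()
	_, fastErr = sleepStage(time.Millisecond)(ExecContext{Context: ctx1, WorkerUUID: "w-1"}, in)

	ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel2()
	_, slowErr = sleepStage(5*time.Second)(ExecContext{Context: ctx2, WorkerUUID: "w-2"}, in)

	return fastErr, slowErr
}

func main() {
	fastErr, slowErr := demo()
	fmt.Println("fast:", fastErr)
	fmt.Println("slow:", slowErr)
}
```

Returning ctx.Err() hands the failure to the stage's ErrHandler, in line with the README's description of returned errors.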
