light_flow

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: MIT Imports: 12 Imported by: 0

README

LightFlow

English, 中文

LightFlow is a declarative task arrage framework.

Users only need to focus on the timing of task execution, while the framework automatically handles the orchestration of tasks.

  • Declarative : Focus on execution timing using functional programming.
  • Mergeable Processes: Seamlessly integrate registered processes into the current workflow being constructed.
  • Multilevel Callbacks: Support callbacks at various levels, allowing flexible inclusion of callback logic.
  • Multilevel Configuration: Each level can be configured with priorities for smaller-level configurations.
  • Unique Context Mechanism: Connect step contexts with direct or indirect dependencies and isolate step contexts without dependencies.
  • Maximum Execution Coverage: Even if a step fails, other steps that do not depend on it will continue to execute.
Installation
go get github.com/Bilibotter/light-flow
Usage
import (
	"fmt"
	flow "github.com/Bilibotter/light-flow"
	"strings"
)

func Step1(ctx flow.StepCtx) (result interface{}, err error) {
	// Context can get value set by workflow input
	if input, exists := ctx.Get("flow-input"); exists {
		fmt.Printf("%s get workflow input, value = '%s'\n", ctx.ContextName(), input.(string))
	}
	ctx.Set("step1-key", 1)
	result = "Step1 Result"
	return
}

func Step2(ctx flow.StepCtx) (result interface{}, err error) {
	if value, exists := ctx.Get("step1-key"); exists {
		fmt.Printf("%s get Step1 key, value = '%d'\n", ctx.ContextName(), value.(int))
	}
	// Context can get related step's execute result by step name
	if step1Result, exists := ctx.GetResult("Step1"); exists {
		fmt.Printf("%s get Step1 result, value = `%s`\n", ctx.ContextName(), step1Result.(string))
	}
	return
}

func AfterStepCallback(step *flow.Step) (keepOn bool, err error) {
	if step.Exceptions() != nil {
		fmt.Printf("%s executed faield %s occur\n", step.Name, strings.Join(step.Exceptions(), ","))
	}
	return true, nil
}

func init() {
	workflow := flow.RegisterFlow("WorkFlow")
	process := workflow.Process("Process")
	process.Step(Step1)
	// Step2 depends on Step1
	process.Step(Step2, Step1)
	// Use the callback function to handle errors during step execution
	workflow.AfterStep(false, AfterStepCallback)
}

func main() {
	// Complete the workflow with initial input values
	flow.DoneFlow("WorkFlow", map[string]interface{}{"flow-input": "Hello world!"})
}

Documentation

Index

Constants

View Source
const (
	Before = "Before"
	After  = "After"
)

Variables

View Source
var (
	Pending      = &statusEnum{0, "Pending"}
	Running      = &statusEnum{0b1, "Running"}
	Pause        = &statusEnum{0b1 << 1, "Pause"}
	Success      = &statusEnum{0b1 << 15, "Success"}
	NormalMask   = &statusEnum{0b1<<16 - 1, "NormalMask"}
	Cancel       = &statusEnum{0b1 << 16, "Cancel"}
	Timeout      = &statusEnum{0b1 << 17, "Timeout"}
	Panic        = &statusEnum{0b1 << 18, "Panic"}
	Error        = &statusEnum{0b1 << 19, "Error"}
	Stop         = &statusEnum{0b1 << 20, "Stop"}
	CallbackFail = &statusEnum{0b1 << 21, "CallbackFail"}
	Failed       = &statusEnum{0b1 << 31, "Failed"}
	// AbnormalMask An abnormal step status will cause the cancellation of dependent unexecuted steps.
	AbnormalMask = &statusEnum{NormalMask.flag << 16, "AbnormalMask"}
)

these variable are used to indicate the status of the unit

Functions

func BuildRunFlow

func BuildRunFlow(name string, input map[string]any) *runFlow

func DoneArgs

func DoneArgs(name string, args ...any) resultI

func DoneFlow

func DoneFlow(name string, input map[string]any) resultI

func SetIdGenerator

func SetIdGenerator(method func() string)

Types

type FlowConfig

type FlowConfig struct {
	ProcessConfig `flow:"skip"`
	// contains filtered or unexported fields
}

func CreateDefaultConfig

func CreateDefaultConfig() *FlowConfig

func (*FlowConfig) AfterFlow

func (fc *FlowConfig) AfterFlow(must bool, callback func(*WorkFlow) (keepOn bool, err error)) *callback[*WorkFlow]

func (*FlowConfig) BeforeFlow

func (fc *FlowConfig) BeforeFlow(must bool, callback func(*WorkFlow) (keepOn bool, err error)) *callback[*WorkFlow]

type FlowController

type FlowController interface {
	Done() []*Future
	ListProcess() []string
	ProcessController(name string) controller
	// contains filtered or unexported methods
}

func AsyncArgs

func AsyncArgs(name string, args ...any) FlowController

func AsyncFlow

func AsyncFlow(name string, input map[string]any) FlowController

type FlowMeta

type FlowMeta struct {
	FlowConfig
	// contains filtered or unexported fields
}

func RegisterFlow

func RegisterFlow(name string) *FlowMeta

func (*FlowMeta) NoUseDefault

func (fm *FlowMeta) NoUseDefault() *FlowMeta

func (*FlowMeta) Process

func (fm *FlowMeta) Process(name string) *ProcessMeta

type Future

type Future struct {
	// contains filtered or unexported fields
}

func (*Future) Done

func (f *Future) Done()

Done method waits for the corresponding process to complete.

func (Future) Exceptions added in v0.0.5

func (s Future) Exceptions() []string

Exceptions return contain exception's message

func (Future) ExplainStatus added in v0.0.5

func (s Future) ExplainStatus() []string

ExplainStatus function explains the status represented by the provided bitmask. The function checks the status against predefined abnormal and normal flags, and returns a slice of strings containing the names of the matching flags. Parameter status is the bitmask representing the status. The returned slice contains the names of the matching flags in the layer they were found. If abnormal flags are found, normal flags will be ignored.

func (Future) GetId

func (bi Future) GetId() string

func (Future) GetName

func (bi Future) GetName() string

func (Future) Has added in v0.0.5

func (s Future) Has(enum *statusEnum) bool

func (Future) Normal added in v0.0.5

func (s Future) Normal() bool

Normal return true if not exception occur

func (Future) Success added in v0.0.5

func (s Future) Success() bool

Success return true if finish running and success

type Process added in v0.0.5

type Process struct {
	FlowId string
	// contains filtered or unexported fields
}

func (*Process) ContextName added in v0.0.5

func (pi *Process) ContextName() string

Fix ContextName return first step name

func (*Process) GetId added in v0.0.5

func (bi *Process) GetId() string

func (*Process) GetName added in v0.0.5

func (bi *Process) GetName() string

type ProcessConfig

type ProcessConfig struct {
	StepConfig
	ProcTimeout       time.Duration
	ProcNotUseDefault bool
	// contains filtered or unexported fields
}

func (*ProcessConfig) AfterProcess

func (pc *ProcessConfig) AfterProcess(must bool, callback func(*Process) (keepOn bool, err error)) *callback[*Process]

func (*ProcessConfig) AfterStep

func (pc *ProcessConfig) AfterStep(must bool, callback func(*Step) (keepOn bool, err error)) *callback[*Step]

func (*ProcessConfig) BeforeProcess

func (pc *ProcessConfig) BeforeProcess(must bool, callback func(*Process) (keepOn bool, err error)) *callback[*Process]

func (*ProcessConfig) BeforeStep

func (pc *ProcessConfig) BeforeStep(must bool, callback func(*Step) (keepOn bool, err error)) *callback[*Step]

func (*ProcessConfig) NotUseDefault

func (pc *ProcessConfig) NotUseDefault()

func (*ProcessConfig) ProcessTimeout

func (pc *ProcessConfig) ProcessTimeout(timeout time.Duration) *ProcessConfig

func (*ProcessConfig) StepsRetry

func (pc *ProcessConfig) StepsRetry(retry int) *ProcessConfig

func (*ProcessConfig) StepsTimeout

func (pc *ProcessConfig) StepsTimeout(timeout time.Duration) *ProcessConfig

type ProcessMeta

type ProcessMeta struct {
	ProcessConfig
	// contains filtered or unexported fields
}

func (*ProcessMeta) Merge

func (pm *ProcessMeta) Merge(name string)

Merge will not merge config, because has not effective design to not use merged config.

func (*ProcessMeta) NameStep added in v0.0.6

func (pm *ProcessMeta) NameStep(run func(ctx StepCtx) (any, error), name string, depends ...any) *StepMeta

func (*ProcessMeta) Step

func (pm *ProcessMeta) Step(run func(ctx StepCtx) (any, error), depends ...any) *StepMeta

func (*ProcessMeta) Tail

func (pm *ProcessMeta) Tail(run func(ctx StepCtx) (any, error), alias ...string) *StepMeta

type Step added in v0.0.5

type Step struct {
	StepCtx
	ProcessId string
	FlowId    string
	Start     time.Time
	End       time.Time
	Err       error
	// contains filtered or unexported fields
}

func (*Step) Error added in v0.0.5

func (si *Step) Error() error

func (Step) GetId added in v0.0.5

func (bi Step) GetId() string

func (Step) GetName added in v0.0.5

func (bi Step) GetName() string

type StepConfig

type StepConfig struct {
	StepTimeout time.Duration
	StepRetry   int
}

type StepCtx added in v0.0.5

type StepCtx interface {
	GetEndValues(key string) map[string]any
	GetResult(key string) (value any, exist bool)
	// contains filtered or unexported methods
}

type StepMeta

type StepMeta struct {
	StepConfig
	// contains filtered or unexported fields
}

func (*StepMeta) Next

func (meta *StepMeta) Next(run func(ctx StepCtx) (any, error), alias ...string) *StepMeta

func (*StepMeta) Priority

func (meta *StepMeta) Priority(priority map[string]any)

func (*StepMeta) Retry

func (meta *StepMeta) Retry(retry int) *StepMeta

func (*StepMeta) Same

func (meta *StepMeta) Same(run func(ctx StepCtx) (any, error), alias ...string) *StepMeta

func (*StepMeta) Timeout

func (meta *StepMeta) Timeout(timeout time.Duration) *StepMeta

type WorkFlow added in v0.0.5

type WorkFlow struct {
	// contains filtered or unexported fields
}

func (WorkFlow) GetId added in v0.0.5

func (bi WorkFlow) GetId() string

func (WorkFlow) GetName added in v0.0.5

func (bi WorkFlow) GetName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL