wfengine

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2021 License: Apache-2.0 Imports: 15 Imported by: 1

Documentation

Index

Constants

View Source
// FlowMaxAutoRetryTimes is the untyped constant 10.
// NOTE(review): presumably the upper bound on automatic retries of a flow
// (compare WfRuntime.RetryTimes) — confirm against the engine's retry logic.
const FlowMaxAutoRetryTimes = 10

Variables

This section is empty.

Functions

func LoadUnCompletedWfRuntime

func LoadUnCompletedWfRuntime(wfRuntime *WfRuntime, wf *ResourceWorkflow) error

Types

type CreateHookFunc

// CreateHookFunc returns the WfHook for the given workflow runtime, or an
// error. CreateWfManager accepts one as its createHookFunc argument.
type CreateHookFunc func(wfRuntime *WfRuntime) (WfHook, error)

type CreateMementoStorageFunc

// CreateMementoStorageFunc returns the WfRuntimeMementoStorage for the given
// state resource, or an error. It is accepted by CreateWfManager and by
// CreateWfRuntimeMementoCareTaker.
type CreateMementoStorageFunc func(resource statemachine.StateResource) (WfRuntimeMementoStorage, error)

type CreateMetaLoaderFunc

// CreateMetaLoaderFunc returns the WfMetaLoader for the given resource type,
// or an error. CreateWfManager accepts one as its createMetaLoaderFunc
// argument.
type CreateMetaLoaderFunc func(resourceType string) (WfMetaLoader, error)

type FlowMeta

// FlowMeta is the YAML-mapped description of a flow: its name, the
// RecoverFromFirstStep flag and the ordered list of step metadata. Every
// field is tagged omitempty, so empty values are left out when marshalled.
//
// NOTE(review): RecoverFromFirstStep presumably makes a recovered flow restart
// at its first step instead of the interrupted one — confirm in the engine.
type FlowMeta struct {
	FlowName             string      `yaml:"flowName,omitempty"`
	RecoverFromFirstStep bool        `yaml:"recoverFromFirstStep,omitempty"`
	Steps                []*StepMeta `yaml:"steps,omitempty"`
}

type FlowStatusType

// FlowStatusType is the string-typed status of a flow. WfRuntime.FlowStatus
// holds one of these values.
type FlowStatusType string

// Flow status values declared by this package.
const (
	FlowStatusPrepared        FlowStatusType = "prepared"
	FlowStatusRunning         FlowStatusType = "running"
	FlowStatusFailed          FlowStatusType = "failed"
	FlowStatusWaiting         FlowStatusType = "waiting"
	FlowStatusFailedCompleted FlowStatusType = "failedAndCompleted" // failed, but in a state that does not need to be retried again
	FlowStatusCompleted       FlowStatusType = "completed"
)

type Recover

// Recover is the collaborator attached to a WfManager through
// WfManager.RegisterRecover. Every method receives the state resource and a
// configuration map keyed by define.WFConfKey.
type Recover interface {
	// GetResourceRecoverInfo returns a recover flag and a state for the
	// resource.
	// NOTE(review): presumably recover reports whether the resource needs to
	// be recovered and preStatus is the state it had before the interruption
	// — confirm with an implementation.
	GetResourceRecoverInfo(resource statemachine.StateResource, conf map[define.WFConfKey]string) (recover bool, preStatus statemachine.State)
	// CancelResourceRecover returns an error on failure.
	// NOTE(review): presumably clears a pending recovery for the resource —
	// confirm with an implementation.
	CancelResourceRecover(resource statemachine.StateResource, conf map[define.WFConfKey]string) error
	// SaveResourceInterruptInfo takes a reason, a message and the name of the
	// previous state, and returns an error on failure.
	// NOTE(review): presumably persists why the resource's flow was
	// interrupted — confirm with an implementation.
	SaveResourceInterruptInfo(resource statemachine.StateResource, conf map[define.WFConfKey]string, reason, message, prevState string) error
}

type ResourceWorkflow

// ResourceWorkflow groups one state resource with the logger, the WfManager
// and the memento caretaker used alongside it. Instances are returned by
// CreateResourceWorkflow and WfManager.CreateResourceWorkflow; its methods
// (Run, RunLastUnCompletedRuntime, RetryInterruptedStep, ...) are the entry
// points for executing flows.
type ResourceWorkflow struct {
	Logger           logr.Logger
	Resource         statemachine.StateResource
	WfManager        *WfManager
	MementoCareTaker *WfRuntimeMementoCareTaker
}

func CreateResourceWorkflow

func CreateResourceWorkflow(
	resource statemachine.StateResource,
	wfManager *WfManager,
) (*ResourceWorkflow, error)

func (*ResourceWorkflow) CommonWorkFlowMainEnter

func (m *ResourceWorkflow) CommonWorkFlowMainEnter(ctx context.Context, resource statemachine.StateResource, flowName string, ignoreUnCompleted bool, eventChecker statemachine.EventChecker) error

func (*ResourceWorkflow) GetLastUnCompletedRuntime

func (m *ResourceWorkflow) GetLastUnCompletedRuntime() (*WfRuntime, error)

func (*ResourceWorkflow) RetryInterruptedStep

func (m *ResourceWorkflow) RetryInterruptedStep() error

func (*ResourceWorkflow) Run

func (m *ResourceWorkflow) Run(
	ctx context.Context,
	flowName string,
	initContext map[string]interface{},
) (err error)

func (*ResourceWorkflow) RunLastUnCompletedRuntime

func (m *ResourceWorkflow) RunLastUnCompletedRuntime(ctx context.Context, flowName string, ignoreOtherUnCompleted bool) (oldWfName string, isWaiting bool, err error)

func (*ResourceWorkflow) RunUnCompletedRuntime

func (m *ResourceWorkflow) RunUnCompletedRuntime(ctx context.Context, wfRuntime *WfRuntime) error

type StepAction

// StepAction is the interface implemented by a workflow step. Values are
// registered through WfManager.RegisterStep / RegisterSteps and returned by
// WfRuntime.NewStepActionIns.
type StepAction interface {
	// Init receives a string-keyed map and a logger and returns an error on
	// failure.
	// NOTE(review): the map is presumably the context handed over from the
	// flow or the previous step — confirm in WfRuntime.Start.
	Init(map[string]interface{}, logr.Logger) error
	// DoStep receives a context.Context and a logger and returns an error on
	// failure.
	DoStep(context.Context, logr.Logger) error
	// Output returns a string-keyed map.
	// NOTE(review): presumably what ends up in StepRuntime.ContextOutput —
	// confirm in WfRuntime.Start.
	Output(logr.Logger) map[string]interface{}
}

type StepGroupMeta

// StepGroupMeta is the YAML-mapped description of a named group of steps.
// WfMetaLoader.GetAllFlowMeta returns a map of these next to the flow
// metadata. Both fields are tagged omitempty.
type StepGroupMeta struct {
	GroupName string      `yaml:"groupName,omitempty"`
	Steps     []*StepMeta `yaml:"steps,omitempty"`
}

type StepMeta

// StepMeta is the YAML-mapped description of a single step; it is the
// argument of WfRuntime.NewStepActionIns. Both fields are tagged omitempty.
//
// NOTE(review): ClassName is presumably the key looked up in
// WfManager.TypeRegistry to instantiate the step — confirm in
// NewStepActionIns.
type StepMeta struct {
	ClassName string `yaml:"className,omitempty"`
	StepName  string `yaml:"stepName,omitempty"`
}

type StepRuntime

// StepRuntime is the JSON-serialisable runtime record of one step: name,
// status, timestamps (kept as strings), output context and retry counter.
// It is the argument passed to the step-level WfHook callbacks and the
// element type of WfRuntime.RunningSteps.
//
// NOTE(review): the StepName tag is "StepName" (capital S) while every other
// tag in this struct is lower camel case. Changing it would change the
// serialised key, so it is left untouched here.
type StepRuntime struct {
	StepName             string                 `json:"StepName,omitempty"`
	StepStatus           StepStatusType         `json:"stepStatus,omitempty"`
	StepStartTime        string                 `json:"stepStartTime,omitempty"`
	LastStepCompleteTime string                 `json:"lastStepCompleteTime,omitempty"`
	ContextOutput        map[string]interface{} `json:"contextOutput,omitempty"` // context that is passed on to the next step.
	RetryTimes           int                    `json:"retryTimes,omitempty"`
}

type StepStatusType

// StepStatusType is the string-typed status of a step. StepRuntime.StepStatus
// holds one of these values.
type StepStatusType string

// Step status values declared by this package.
const (
	StepStatusPrepared  StepStatusType = "prepared"
	StepStatusInited    StepStatusType = "inited"
	StepStatusFailed    StepStatusType = "failed"
	StepStatusCompleted StepStatusType = "completed"
	StepStatusWaiting   StepStatusType = "waiting"
)

type WfHook

// WfHook is the set of callbacks attached to a workflow runtime (see
// WfRuntime.FlowHookIns). Instances are produced by a CreateHookFunc. Every
// callback returns an error.
type WfHook interface {
	// Hook for flow initialization.
	OnWfInit() error

	// Executed when the flow finishes.
	OnWfCompleted() error

	// Executed when the flow is interrupted.
	OnWfInterrupt(*define.InterruptError) error

	// Runs before a flow step is executed.
	OnStepInit(step *StepRuntime) error

	// Executed when a step is in the waiting state.
	OnStepWaiting(step *StepRuntime) error

	// Runs after a flow step has been executed.
	OnStepCompleted(step *StepRuntime) error
}

type WfManager

// WfManager holds the workflow configuration for one resource type: the
// resource type name, a string-keyed map of flow metadata, a string-keyed
// registry of reflect.Type values and a logger. It is returned by
// CreateWfManager and extended through its Register* methods.
//
// NOTE(review): FlowMetaMap is presumably keyed by flow name and TypeRegistry
// by step class name (filled by RegisterStep) — confirm in the
// implementation.
type WfManager struct {
	ResourceType string
	FlowMetaMap  map[string]*FlowMeta
	TypeRegistry map[string]reflect.Type

	Logger logr.Logger
	// contains filtered or unexported fields
}

func CreateWfManager

func CreateWfManager(
	resourceType string,
	workFlowMetaDir string,
	createMetaLoaderFunc CreateMetaLoaderFunc,
	createHookFunc CreateHookFunc,
	createMementoStorageFunc CreateMementoStorageFunc,
) (*WfManager, error)

func (*WfManager) CheckRecovery

func (m *WfManager) CheckRecovery(resource statemachine.StateResource) bool

func (*WfManager) CreateResourceWorkflow

func (m *WfManager) CreateResourceWorkflow(resource statemachine.StateResource) (*ResourceWorkflow, error)

func (*WfManager) GetConfItem

func (m *WfManager) GetConfItem(confKey define.WFConfKey) string

func (*WfManager) RegisterConf

func (m *WfManager) RegisterConf(conf map[define.WFConfKey]string)

func (*WfManager) RegisterLogger

func (m *WfManager) RegisterLogger(logger logr.Logger) *WfManager

func (*WfManager) RegisterRecover

func (m *WfManager) RegisterRecover(recover Recover) *WfManager

func (*WfManager) RegisterStep

func (m *WfManager) RegisterStep(step StepAction)

func (*WfManager) RegisterSteps

func (m *WfManager) RegisterSteps(steps ...StepAction)

type WfMetaLoader

// WfMetaLoader loads workflow metadata. Instances are produced by a
// CreateMetaLoaderFunc.
type WfMetaLoader interface {
	// GetAllFlowMeta takes the workflow metadata directory and returns a
	// string-keyed map of flow metadata, a string-keyed map of step-group
	// metadata, and an error.
	// NOTE(review): the maps are presumably keyed by flow name and group name
	// respectively — confirm with an implementation.
	GetAllFlowMeta(workFlowMetaDir string) (map[string]*FlowMeta, map[string]*StepGroupMeta, error)
}

type WfRuntime

// WfRuntime is the runtime record of one flow execution. The fields with
// JSON names are serialised (omitted when empty); FlowHookIns,
// ResourceWorkflow and Logger are tagged `json:"-"` and are therefore never
// serialised. Instances are returned by CreateWfRuntime and executed with
// Start.
type WfRuntime struct {
	FlowStatus       FlowStatusType         `json:"flowStatus,omitempty"`
	RetryTimes       int                    `json:"retryTimes,omitempty"`
	FlowName         string                 `json:"flowName,omitempty"`
	IgnoreError      bool                   `json:"ignoreError,omitempty"` // whether to ignore alerts.
	StartTime        string                 `json:"startTime,omitempty"`
	CompleteTime     string                 `json:"completeTime,omitempty"`
	ErrorMessage     string                 `json:"errorMessage,omitempty"`
	InitContext      map[string]interface{} `json:"initContext,omitempty"` // initial context; it is passed to the first step.
	RunningSteps     []*StepRuntime         `json:"runningSteps,omitempty"`
	FlowHookIns      WfHook                 `json:"-"` // hook called after a step completes/fails; in the k8s scenario this is where the data is persisted.
	ResourceWorkflow *ResourceWorkflow      `json:"-"`
	Logger           logr.Logger            `json:"-"`
}

+k8s:deepcopy-gen:

func CreateWfRuntime

func CreateWfRuntime(
	flowName string,
	flowContext map[string]interface{},
	wf *ResourceWorkflow,
) (*WfRuntime, error)

func (*WfRuntime) NewStepActionIns

func (r *WfRuntime) NewStepActionIns(stepMeta *StepMeta) (StepAction, error)

func (*WfRuntime) Start

func (r *WfRuntime) Start(ctx context.Context) (err error)

type WfRuntimeMemento

// WfRuntimeMemento is the stored form of a workflow runtime. It exposes no
// exported fields; values are obtained from
// WfRuntimeMementoCareTaker.CreateMemento or LoadMemento and handed back to
// WfRuntimeMementoCareTaker.SaveMemento.
type WfRuntimeMemento struct {
	// contains filtered or unexported fields
}

工作流运行时数据存储

type WfRuntimeMementoCareTaker

// WfRuntimeMementoCareTaker manages the stored workflow runtime data: it
// creates, loads and saves WfRuntimeMemento values and lists stored workflow
// runtimes. It is returned by CreateWfRuntimeMementoCareTaker.
//
// NOTE(review): MementoMap has the same type as the result of
// WfRuntimeMementoStorage.LoadMementoMap, and Name presumably is the
// careTakerName passed to it — confirm in the implementation.
type WfRuntimeMementoCareTaker struct {
	Name              string
	MementoMap        map[string]string
	MementoStorage    WfRuntimeMementoStorage
	Namespace         string
	FlowApplyResource string
}

工作流运行时数据存储管理器

func CreateWfRuntimeMementoCareTaker

func CreateWfRuntimeMementoCareTaker(resource statemachine.StateResource, createStorageFunc CreateMementoStorageFunc) (*WfRuntimeMementoCareTaker, error)

func (*WfRuntimeMementoCareTaker) CreateMemento

func (cm *WfRuntimeMementoCareTaker) CreateMemento(flowName string) (*WfRuntimeMemento, error)

创建一个新的工作流运行时备忘录

func (*WfRuntimeMementoCareTaker) GetAllUnCompletedWorkflowRuntime

func (cm *WfRuntimeMementoCareTaker) GetAllUnCompletedWorkflowRuntime() (map[string]*WfRuntime, error)

获得所有尚未完成的工作流

func (*WfRuntimeMementoCareTaker) GetLastWorkflowRuntime

func (cm *WfRuntimeMementoCareTaker) GetLastWorkflowRuntime() (string, *WfRuntime, error)

获得最后执行的工作流

func (*WfRuntimeMementoCareTaker) LoadMemento

func (cm *WfRuntimeMementoCareTaker) LoadMemento(wf *WfRuntime) (*WfRuntimeMemento, error)

根据已经存在的工作流运行时创建一个备忘录实例,用于更新备忘录

func (*WfRuntimeMementoCareTaker) SaveMemento

func (cm *WfRuntimeMementoCareTaker) SaveMemento(memento *WfRuntimeMemento) error

保存备忘录

type WfRuntimeMementoStorage

// WfRuntimeMementoStorage is the storage backend behind a
// WfRuntimeMementoCareTaker (see its MementoStorage field). Instances are
// produced by a CreateMementoStorageFunc.
type WfRuntimeMementoStorage interface {
	// Save takes a memento key and its content, both strings, and returns an
	// error on failure.
	Save(mementoKey, mementoContent string) error
	// LoadMementoMap takes a caretaker name and returns a string-to-string
	// map, or an error.
	// NOTE(review): the map is presumably memento key -> memento content,
	// mirroring Save — confirm with an implementation.
	LoadMementoMap(careTakerName string) (map[string]string, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL