pipeline

package
v0.14.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// 事件类型
	WfEventNoraml WfEventType = "Normal"   // 正常事件类型,包括状态更新等
	WfEventWarn   WfEventType = "Warnning" // 警告事件类型,不影响业务运行,但需要关注
	WfEventError  WfEventType = "Error"    // 异常事件类型,影响业务运行,例如其他模块函数返回异常

	// 事件值
	WfEventFailureOptionsTriggered WfEventValue = "FailureOpitons"
	WfEventJobUpdate               WfEventValue = "JobUpdate"
	WfEventRunUpdate               WfEventValue = "RunUpdate"
	WfEventDagUpdate               WfEventValue = "DagUpdate"
	WfEventJobSubmitErr            WfEventValue = "JobSubmitErr"
	WfEventJobWatchErr             WfEventValue = "JobWatchErr"
	WfEventJobStopErr              WfEventValue = "JobStopErr"
)

Variables

This section is empty.

Functions

func NewBaseComponentRuntime added in v0.14.3

func NewBaseComponentRuntime(name, fullname string, component schema.Component, seq int, ctx context.Context, failureOpitonsCtx context.Context,
	eventChannel chan<- WorkflowEvent, config *runConfig, parentDagID string) *baseComponentRuntime

func NewInnerSolver added in v0.14.3

func NewInnerSolver(cp schema.Component, runtimeName string, config *runConfig) *innerSolver

func NewParallelismManager added in v0.14.3

func NewParallelismManager(parallelism int) *parallelismManager

func NewReferenceSolver added in v0.14.3

func NewReferenceSolver(ws *schema.WorkflowSource) *referenceSolver

func NewRunConfig added in v0.14.3

func NewRunConfig(workflowSource *schema.WorkflowSource, mainFS *schema.FsMount, userName, runID string, logger *logrus.Entry,
	callbacks WorkflowCallbacks, pplSource string) *runConfig

Types

type BaseJob

type BaseJob struct {
	ID         string            `json:"jobID"`
	Name       string            `json:"name"`       // step名字,不同run的不同step,必须拥有不同名字
	Command    string            `json:"command"`    // 区别于step,是替换后的,可以直接运行
	Parameters map[string]string `json:"parameters"` // 区别于step,是替换后的,可以直接运行
	Artifacts  schema.Artifacts  `json:"artifacts"`  // 区别于step,是替换后的,可以直接运行
	Env        map[string]string `json:"env"`
	StartTime  string            `json:"startTime"`
	EndTime    string            `json:"endTime"`
	Status     schema.JobStatus  `json:"status"`
	Message    string            `json:"message"`
}

func NewBaseJob

func NewBaseJob(name string) *BaseJob

type BaseWorkflow

type BaseWorkflow struct {
	Name   string                 `json:"name,omitempty"`
	RunID  string                 `json:"runId,omitempty"`
	Desc   string                 `json:"desc,omitempty"`
	Params map[string]interface{} `json:"params,omitempty"`
	Extra  map[string]string      `json:"extra,omitempty"` // 可以存放一些ID,fsId,userId等
	Source schema.WorkflowSource  `json:"-"`               // Yaml string
	// contains filtered or unexported fields
}

func NewBaseWorkflow

func NewBaseWorkflow(wfSource schema.WorkflowSource, runID string, params map[string]interface{}, extra map[string]string) BaseWorkflow

type CacheCalculator

type CacheCalculator interface {

	// 计算第一层 fingerprint
	CalculateFirstFingerprint() (fingerprint string, err error)

	// 计算 第二层 fingerprint
	CalculateSecondFingerprint() (fingerprint string, err error)
	// contains filtered or unexported methods
}

func NewCacheCalculator

func NewCacheCalculator(job PaddleFlowJob, cacheConfig schema.Cache, logger *logrus.Entry,
	mainFs *schema.FsMount, extraFs []schema.FsMount) (CacheCalculator, error)

调用方应该保证在启用了 cache 功能的情况下才会调用NewCacheCalculator

func NewConservativeCacheCalculator

func NewConservativeCacheCalculator(job PaddleFlowJob, cacheConfig schema.Cache, logger *logrus.Entry,
	mainFs *schema.FsMount, extraFs []schema.FsMount) (CacheCalculator, error)

调用方应该保证在启用了 cache 功能的情况下才会调用NewConservativeCacheCalculator

type ConditionCalculator added in v0.14.3

type ConditionCalculator struct {
	// contains filtered or unexported fields
}

func NewConditionCalculator added in v0.14.3

func NewConditionCalculator(condition string) *ConditionCalculator

type CtxAndCancel added in v0.14.3

type CtxAndCancel struct {
	// contains filtered or unexported fields
}

type DagRuntime added in v0.14.3

type DagRuntime struct {

	// 用于解析依赖参数模板
	*DependencySolver

	// dagruntime 的全局唯一标识符
	ID string
	// contains filtered or unexported fields
}

func NewDagRuntime added in v0.14.3

func NewDagRuntime(name, fullName string, dag *schema.WorkflowSourceDag, seq int, ctx context.Context, failureOpitonsCtx context.Context,
	eventChannel chan<- WorkflowEvent, config *runConfig, parentDagID string) *DagRuntime

func (DagRuntime) CalculateCondition added in v0.14.3

func (crt DagRuntime) CalculateCondition() (bool, error)

func (*DagRuntime) CancellNotReadyComponent added in v0.14.3

func (drt *DagRuntime) CancellNotReadyComponent(subComponent schema.Component, reason string)

func (*DagRuntime) CreateSubRuntimeAccordingView added in v0.14.3

func (drt *DagRuntime) CreateSubRuntimeAccordingView(view schema.ComponentView, name string) componentRuntime

func (*DagRuntime) GetSubComponentArtifactPaths added in v0.14.3

func (drt *DagRuntime) GetSubComponentArtifactPaths(componentName string, artName string) (string, error)

func (*DagRuntime) GetSubComponentParameterValue added in v0.14.3

func (drt *DagRuntime) GetSubComponentParameterValue(componentName string, paramName string) (interface{}, error)

func (*DagRuntime) Listen added in v0.14.3

func (drt *DagRuntime) Listen()

监听由子节点发送过来的信号

func (*DagRuntime) ProcessFailureOptions added in v0.14.3

func (drt *DagRuntime) ProcessFailureOptions(event WorkflowEvent)

func (*DagRuntime) ProcessFailureOptionsWithContinue added in v0.14.3

func (drt *DagRuntime) ProcessFailureOptionsWithContinue(component schema.Component)

func (*DagRuntime) ProcessFailureOptionsWithFailFast added in v0.14.3

func (drt *DagRuntime) ProcessFailureOptionsWithFailFast()

func (*DagRuntime) Restart added in v0.14.3

func (drt *DagRuntime) Restart(dagView *schema.DagView)

重新执行 TODO

func (*DagRuntime) Resume added in v0.14.3

func (drt *DagRuntime) Resume(dagView *schema.DagView)

func (*DagRuntime) Start added in v0.14.3

func (drt *DagRuntime) Start()

开始执行 runtime 不返回error,直接通过 event 向上冒泡

func (*DagRuntime) Stop added in v0.14.3

func (drt *DagRuntime) Stop()

type DependencySolver added in v0.14.3

type DependencySolver struct {
	*DagRuntime
}

用于解析可能存在依赖关系的模版 如 parameter,artifact 字段,因为其有可能会引用上游节点,以及父节点(子节点) 的相关信息

func NewDependencySolver added in v0.14.3

func NewDependencySolver(dr *DagRuntime) *DependencySolver

func (DependencySolver) CalculateCondition added in v0.14.3

func (crt DependencySolver) CalculateCondition() (bool, error)

func (*DependencySolver) ResolveAfterDone added in v0.14.3

func (ds *DependencySolver) ResolveAfterDone() error

ResolveAfterDone: 当dag运行成功时调用,解析输出artifact 模版

func (*DependencySolver) ResolveBeforeRun added in v0.14.3

func (ds *DependencySolver) ResolveBeforeRun(subComponent schema.Component) error

type Job

type Job interface {
	Job() BaseJob
	Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts)
	Validate() error
	Start() (string, error)
	Stop() error
	Check() (schema.JobStatus, error)
	Watch()
	Started() bool
	Succeeded() bool
	Failed() bool
	Terminated() bool
	Skipped() bool
	NotEnded() bool
	JobID() string
}

type LocalJob

type LocalJob struct {
	BaseJob
	Pid string
}

---------------------------------------------------------------------------- Local Process Job ----------------------------------------------------------------------------

type PaddleFlowJob

type PaddleFlowJob struct {
	BaseJob
	Image string
	// contains filtered or unexported fields
}

----------------------------------------------------------------------------

K8S Job

----------------------------------------------------------------------------

func NewPaddleFlowJob

func NewPaddleFlowJob(name, image, userName string, eventChannel chan<- WorkflowEvent, mainFS *schema.FsMount, extraFS []schema.FsMount) *PaddleFlowJob

func NewPaddleFlowJobWithJobView added in v0.14.3

func NewPaddleFlowJobWithJobView(view *schema.JobView, image string, eventChannel chan<- WorkflowEvent,
	mainFS *schema.FsMount, extraFS []schema.FsMount, userName string) *PaddleFlowJob

func (*PaddleFlowJob) Cancelled

func (pfj *PaddleFlowJob) Cancelled() bool

func (*PaddleFlowJob) Check

func (pfj *PaddleFlowJob) Check() (schema.JobStatus, error)

查作业状态接口

func (*PaddleFlowJob) Failed

func (pfj *PaddleFlowJob) Failed() bool

func (*PaddleFlowJob) Job

func (pfj *PaddleFlowJob) Job() BaseJob

func (*PaddleFlowJob) JobID added in v0.14.3

func (pfj *PaddleFlowJob) JobID() string

func (*PaddleFlowJob) NotEnded

func (pfj *PaddleFlowJob) NotEnded() bool

func (*PaddleFlowJob) Skipped

func (pfj *PaddleFlowJob) Skipped() bool

func (*PaddleFlowJob) Start

func (pfj *PaddleFlowJob) Start() (string, error)

发起作业接口

func (*PaddleFlowJob) Started

func (pfj *PaddleFlowJob) Started() bool

func (*PaddleFlowJob) Stop

func (pfj *PaddleFlowJob) Stop() error

停止作业接口

func (*PaddleFlowJob) Succeeded

func (pfj *PaddleFlowJob) Succeeded() bool

func (*PaddleFlowJob) Terminated

func (pfj *PaddleFlowJob) Terminated() bool

func (*PaddleFlowJob) Update

func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[string]string,
	artifacts *schema.Artifacts)

发起作业接口

func (*PaddleFlowJob) Validate

func (pfj *PaddleFlowJob) Validate() error

校验job参数

func (*PaddleFlowJob) Watch

func (pfj *PaddleFlowJob) Watch()

同步watch作业接口

type PathToModTime added in v0.14.3

type PathToModTime struct {
	ModTime map[string]string `json:"omitempty"`
}

type RuntimeStatus added in v0.14.3

type RuntimeStatus = schema.JobStatus
var (
	StatusRuntimeInit        RuntimeStatus = schema.StatusJobInit
	StatusRuntimePending     RuntimeStatus = schema.StatusJobPending
	StatusRuntimeRunning     RuntimeStatus = schema.StatusJobRunning
	StatusRuntimeFailed      RuntimeStatus = schema.StatusJobFailed
	StatusRuntimeSucceeded   RuntimeStatus = schema.StatusJobSucceeded
	StatusRuntimeTerminating RuntimeStatus = schema.StatusJobTerminating
	StatusRuntimeTerminated  RuntimeStatus = schema.StatusJobTerminated
	StatusRuntimeCancelled   RuntimeStatus = schema.StatusJobCancelled
	StatusRuntimeSkipped     RuntimeStatus = schema.StatusJobSkipped
)

type StepRuntime added in v0.14.3

type StepRuntime struct {
	CacheRunID string
	CacheJobID string
	// contains filtered or unexported fields
}

func NewStepRuntime added in v0.14.3

func NewStepRuntime(name, fullName string, step *schema.WorkflowSourceStep, seq int, ctx context.Context,
	failureOpitonsCtx context.Context, eventChannel chan<- WorkflowEvent, config *runConfig, ParentDagID string) *StepRuntime

func (StepRuntime) CalculateCondition added in v0.14.3

func (crt StepRuntime) CalculateCondition() (bool, error)

func (*StepRuntime) Execute added in v0.14.3

func (srt *StepRuntime) Execute()

运行步骤

func (*StepRuntime) Listen added in v0.14.3

func (srt *StepRuntime) Listen()

func (*StepRuntime) Restart added in v0.14.3

func (srt *StepRuntime) Restart(view *schema.JobView)

Restart: 根据 jobView 来重新运行step 只有step的状态为failed,terminated的时候,才会重新运行

func (*StepRuntime) Resume added in v0.14.3

func (srt *StepRuntime) Resume(view *schema.JobView)

func (*StepRuntime) Start added in v0.14.3

func (srt *StepRuntime) Start()

func (*StepRuntime) Stop added in v0.14.3

func (srt *StepRuntime) Stop()

type WfEventType

type WfEventType string

type WfEventValue

type WfEventValue string

type Workflow

type Workflow struct {
	BaseWorkflow
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(wfSource schema.WorkflowSource, runID string, params map[string]interface{}, extra map[string]string,
	callbacks WorkflowCallbacks) (*Workflow, error)

实例化一个Workflow,并返回

func (*Workflow) NewWorkflowRuntime added in v0.14.3

func (wf *Workflow) NewWorkflowRuntime() error

初始化 workflow runtime

func (*Workflow) Restart

func (wf *Workflow) Restart(entryPointView *schema.DagView, postProcessView schema.PostProcessView)

Restart 从 DB 中恢复重启 workflow Restart 调用逻辑:1. NewWorkflow 2. SetWorkflowRuntime 3. Restart

func (*Workflow) Resume added in v0.14.3

func (wf *Workflow) Resume(entryPointView *schema.DagView, postProcessView schema.PostProcessView, runStatus string, stopForce bool)

func (*Workflow) Start

func (wf *Workflow) Start()

Start to run a workflow

func (*Workflow) Status

func (wf *Workflow) Status() string

func (*Workflow) Stop

func (wf *Workflow) Stop(force bool)

Stop a workflow

type WorkflowCallbacks

type WorkflowCallbacks struct {
	GetJobCb        func(jobID string) (schema.JobView, error)
	UpdateRuntimeCb func(id string, event interface{}) (int64, bool)
	LogCacheCb      func(req schema.LogRunCacheRequest) (string, error)
	ListCacheCb     func(firstFp, fsID, source string) ([]models.RunCache, error)
	LogArtifactCb   func(req schema.LogRunArtifactRequest) error
}

type WorkflowEvent

type WorkflowEvent struct {
	Type    WfEventType
	Event   WfEventValue
	Message string
	Extra   map[string]interface{}
}

func NewWorkflowEvent

func NewWorkflowEvent(e WfEventValue, msg string, extra map[string]interface{}) *WorkflowEvent

实例化

type WorkflowRuntime

type WorkflowRuntime struct {
	EventChan chan WorkflowEvent
	// contains filtered or unexported fields
}

工作流运行时

func NewWorkflowRuntime

func NewWorkflowRuntime(rc *runConfig) *WorkflowRuntime

func (*WorkflowRuntime) IsCompleted

func (wfr *WorkflowRuntime) IsCompleted() bool

func (*WorkflowRuntime) Listen

func (wfr *WorkflowRuntime) Listen()

func (*WorkflowRuntime) Restart

func (wfr *WorkflowRuntime) Restart(entryPointView *schema.DagView,
	postProcessView schema.PostProcessView)

Restart: 重新运行

func (*WorkflowRuntime) Resume added in v0.14.3

func (wfr *WorkflowRuntime) Resume(entryPointView *schema.DagView, postProcessView schema.PostProcessView,
	runStatus string, stopForce bool)

func (*WorkflowRuntime) Start

func (wfr *WorkflowRuntime) Start()

运行

func (*WorkflowRuntime) Status

func (wfr *WorkflowRuntime) Status() string

func (*WorkflowRuntime) Stop

func (wfr *WorkflowRuntime) Stop(force bool) error

Stop 停止 Workflow do not call ctx_cancel(), which will be called when all steps has terminated eventually. 这里不通过 cancel channel 去取消 Step 的原因是防止有多个地方向通过一个 channel 传递东西,防止runtime hang 住

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL