wfexec

package
v0.0.0-...-2873e01 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxIteratorBufferSize uint = 1000
)

Variables

View Source
var (
	MaxIteratorBufferSize uint = DefaultMaxIteratorBufferSize
)

Functions

func Delay

func Delay(until time.Time) *delayed

func ErrorHandler

func ErrorHandler(h Step, results *expr.Vars) *errHandler

func ExclGateway

func ExclGateway(pp ...*GatewayPath) (*exclGateway, error)

ExclGateway fn initializes exclusive gateway

func ForkGateway

func ForkGateway() *forkGateway

ForkGateway fn initializes fork gateway No arguments are required; Graph Graph config is used to determine all possible fork paths on the fly

func GenericResourceNextCheck

func GenericResourceNextCheck(useLimit bool, ptr, buffSize, total, limit uint, hasMore bool) bool

func GetContextCallStack

func GetContextCallStack(ctx context.Context) []uint64

func InclGateway

func InclGateway(pp ...*GatewayPath) (*inclGateway, error)

InclGateway fn initializes inclusive gateway

func JoinGateway

func JoinGateway(ss ...Step) *joinGateway

JoinGateway fn initializes join gateway with all paths that are expected to be partial

func LoopBreak

func LoopBreak() *loopBreak

func LoopContinue

func LoopContinue() *loopContinue

func NewGenericStep

func NewGenericStep(fn execFn) *genericStep

func Prompt

func Prompt(ownerId uint64, ref string, payload *expr.Vars) *prompted

func Resume

func Resume() *resumed

func SetContextCallStack

func SetContextCallStack(ctx context.Context, ss []uint64) context.Context

func Termination

func Termination() *termination

Types

type ExecRequest

type ExecRequest struct {
	SessionID uint64
	StateID   uint64

	// Current input received on session resume
	Input *expr.Vars

	// Current scope
	Scope *expr.Vars

	// Helps with gateway join/merge steps
	// that needs info about the step it's currently merging
	Parent Step
}

ExecRequest is passed to Exec() functions and contains all information for step execution

type ExecResponse

type ExecResponse interface{}

type Frame

type Frame struct {
	CreatedAt time.Time  `json:"createdAt"`
	SessionID uint64     `json:"sessionID"`
	StateID   uint64     `json:"stateID"`
	Input     *expr.Vars `json:"input"`
	Scope     *expr.Vars `json:"scope"`
	Results   *expr.Vars `json:"results"`
	ParentID  uint64     `json:"parentID"`
	StepID    uint64     `json:"stepID"`
	NextSteps []uint64   `json:"nextSteps"`

	// How much time from the 1st step to the start of this step in milliseconds
	ElapsedTime uint `json:"elapsedTime"`

	// How much time it took to execute this step in milliseconds
	StepTime uint `json:"stepTime"`

	Action string `json:"action,omitempty"`
	Error  string `json:"error,omitempty"`
}

type GatewayPath

type GatewayPath struct {
	// contains filtered or unexported fields
}

GatewayPath structure is used by subset of gateway nodes

It allows to evaluate test Expression to help determine the gateway if a certain path should be used or not

func NewGatewayPath

func NewGatewayPath(s Step, t pathTester) (gwp *GatewayPath, err error)

NewGatewayPath validates Expression and returns initialized GatewayPath

type GatewayPaths

type GatewayPaths []*GatewayPath

GatewayPath structure is used by subset of gateway nodes

It allows to evaluate test Expression to help determine the gateway if a certain path should be used or not

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

list of Graph steps with relations

func NewGraph

func NewGraph() *Graph

func (*Graph) AddParent

func (g *Graph) AddParent(c, p Step)

func (*Graph) AddStep

func (g *Graph) AddStep(s Step, cc ...Step)

func (*Graph) Children

func (g *Graph) Children(s Step) Steps

func (*Graph) Exec

func (*Graph) Len

func (g *Graph) Len() int

func (*Graph) Orphans

func (g *Graph) Orphans() (oo Steps)

func (*Graph) Parents

func (g *Graph) Parents(s Step) Steps

func (*Graph) StepByID

func (g *Graph) StepByID(ID uint64) Step

type Iterator

type Iterator interface {
	// Is the given step this iterator step
	Is(Step) bool

	// Initialize iterator
	Start(context.Context, *expr.Vars) error

	// Break fn is called when loop is forcefully broken
	Break() Step

	Iterator() Step

	// Next is called before each iteration and returns
	// 1st step of the iteration branch and variables that are added to the scope
	Next(context.Context, *expr.Vars) (Step, *expr.Vars, error)
}

Iterator can be returned from Exec fn as ExecResponse

It helps session's exec fn() to properly navigate through graph by calling is/break/iterator/next function

func GenericIterator

func GenericIterator(iter, next, exit Step, h IteratorHandler) Iterator

GenericIterator creates a wrapper around IteratorHandler and returns genericIterator that implements Iterator interface

type IteratorHandler

type IteratorHandler interface {
	Start(context.Context, *expr.Vars) error
	More(context.Context, *expr.Vars) (bool, error)
	Next(context.Context, *expr.Vars) (*expr.Vars, error)
}

type PendingPrompt

type PendingPrompt struct {
	Ref       string     `json:"ref"`
	SessionID uint64     `json:"sessionID,string"`
	CreatedAt time.Time  `json:"createdAt"`
	StateID   uint64     `json:"stateID,string"`
	Payload   *expr.Vars `json:"payload"`
	OwnerId   uint64     `json:"-"`

	Original *prompted `json:"-"`
}

type ResultEvaluator

type ResultEvaluator interface {
	EvalResults(ctx context.Context, results *expr.Vars) (out *expr.Vars, err error)
}

type ResumedPrompt

type ResumedPrompt struct {
	StateID uint64 `json:"stateID,string"`
	OwnerId uint64 `json:"-"`
}

type Session

type Session struct {
	// contains filtered or unexported fields
}

func NewSession

func NewSession(ctx context.Context, g *Graph, oo ...SessionOpt) *Session

func (*Session) AllPendingPrompts

func (s *Session) AllPendingPrompts() (out []*PendingPrompt)

AllPendingPrompts returns all pending prompts for all user

func (*Session) Cancel

func (s *Session) Cancel()

func (*Session) Delayed

func (s *Session) Delayed() bool

func (*Session) Error

func (s *Session) Error() error

func (*Session) Exec

func (s *Session) Exec(ctx context.Context, step Step, scope *expr.Vars) error

func (*Session) ID

func (s *Session) ID() uint64

func (*Session) Idle

func (s *Session) Idle() bool

func (*Session) Prompted

func (s *Session) Prompted() bool

func (*Session) Result

func (s *Session) Result() *expr.Vars

func (*Session) Resume

func (s *Session) Resume(ctx context.Context, stateId uint64, input *expr.Vars) (*ResumedPrompt, error)

func (*Session) Status

func (s *Session) Status() SessionStatus

func (*Session) Stop

func (s *Session) Stop()

func (*Session) Suspended

func (s *Session) Suspended() bool

Suspended returns true if the workflow has either delayed or prompted steps

func (*Session) UnsentPendingPrompts

func (s *Session) UnsentPendingPrompts() (out []*PendingPrompt)

UnsentPendingPrompts returns unsent pending prompts for all user

func (*Session) UserPendingPrompts

func (s *Session) UserPendingPrompts(ownerId uint64) (out []*PendingPrompt)

UserPendingPrompts prompts fn returns all owner's pending prompts on this session

func (*Session) Wait

func (s *Session) Wait(ctx context.Context) error

Wait does not wait for the whole wf to be complete but until:

  • context timeout
  • idle state
  • error in error queue

func (*Session) WaitUntil

func (s *Session) WaitUntil(ctx context.Context, expected ...SessionStatus) error

WaitUntil blocks until workflow session gets into expected status

type SessionOpt

type SessionOpt func(*Session)

func SetCallStack

func SetCallStack(id ...uint64) SessionOpt

func SetDumpStacktraceOnPanic

func SetDumpStacktraceOnPanic(dump bool) SessionOpt

func SetHandler

func SetHandler(fn StateChangeHandler) SessionOpt

func SetLogger

func SetLogger(log *zap.Logger) SessionOpt

func SetWorkerIntervalSuspended

func SetWorkerIntervalSuspended(i time.Duration) SessionOpt

func SetWorkerIntervalWaiter

func SetWorkerIntervalWaiter(i time.Duration) SessionOpt

func SetWorkflowID

func SetWorkflowID(workflowID uint64) SessionOpt

type SessionStatus

type SessionStatus int
const (
	SessionActive SessionStatus = iota
	SessionPrompted
	SessionDelayed
	SessionFailed
	SessionCompleted
	SessionCanceled
)

func (SessionStatus) String

func (s SessionStatus) String() string

type State

type State struct {
	// contains filtered or unexported fields
}

state holds information about Session ID

func FinalState

func FinalState(ses *Session, scope *expr.Vars) *State

func NewState

func NewState(ses *Session, owner auth.Identifiable, caller, current Step, scope *expr.Vars) *State

func (*State) Error

func (s *State) Error() string

func (State) MakeFrame

func (s State) MakeFrame() *Frame

func (State) MakeRequest

func (s State) MakeRequest() *ExecRequest

func (State) Next

func (s State) Next(current Step, scope *expr.Vars) *State

type StateChangeHandler

type StateChangeHandler func(SessionStatus, *State, *Session)

type Step

type Step interface {
	ID() uint64
	SetID(uint64)
	Exec(context.Context, *ExecRequest) (ExecResponse, error)
}

type StepIdentifier

type StepIdentifier struct {
	// contains filtered or unexported fields
}

func (*StepIdentifier) ID

func (i *StepIdentifier) ID() uint64

func (*StepIdentifier) SetID

func (i *StepIdentifier) SetID(id uint64)

type Steps

type Steps []Step

func (Steps) Contains

func (ss Steps) Contains(steps ...Step) bool

func (Steps) IDs

func (ss Steps) IDs() []uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL