states

package
v0.8.4
Warning

This package is not in the latest version of its module.

Published: Mar 28, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package states implements the logic for Direktiv workflow states.

State logic is implemented in Run functions.

The following state properties should be ignored in this package because they're handled by Direktiv:

* catch
* log
* metadata

The state logic dictates how to transform and transition through the returned Transition struct. If the Transition struct is not nil, Direktiv executes the transform and transition logic. A non-empty transition string must refer to a state ID; an empty one concludes the workflow. If the Run function returns a nil Transition struct and no error, this tells Direktiv to yield. This is how long-running states can be implemented: they yield, putting the instance to sleep while it waits for something to wake it up. Such states should usually call Engine APIs to register the ways in which they can be woken up.
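The rules above can be summarized in a small sketch. The Transition struct below mirrors the one documented in this package; the outcome helper and the errBoom value are hypothetical names introduced only for illustration and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Transition mirrors the struct documented in this package.
type Transition struct {
	NextState string
	Transform interface{}
}

// errBoom is a hypothetical error used only by this example.
var errBoom = errors.New("boom")

// outcome is a hypothetical helper (not part of the package) that classifies
// the result of a Run call according to the rules described above.
func outcome(t *Transition, err error) string {
	switch {
	case err != nil:
		return "error" // the state failed
	case t == nil:
		return "yield" // nil Transition and no error: the instance sleeps
	case t.NextState == "":
		return "end" // empty transition string: the workflow concludes
	default:
		return "transition:" + t.NextState // must refer to a state ID
	}
}

func main() {
	fmt.Println(outcome(nil, nil))
	fmt.Println(outcome(&Transition{}, nil))
	fmt.Println(outcome(&Transition{NextState: "b"}, nil))
	fmt.Println(outcome(nil, errBoom))
}
```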

When Direktiv first executes a state, it calls the Run function with nil wakedata, and the instance has nothing stored in memory. If the state may need to be scheduled repeatedly, the Run function must determine where it stands in that process on each call by checking what's in instance memory and the wakedata. The wakedata is defined in various ways by the Engine APIs, but the instance memory is entirely under the control of the Run logic.
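A minimal sketch of this pattern follows. It assumes a hypothetical in-memory fakeInstance that implements only GetMemory, SetMemory, and Sleep with the signatures listed in the Instance interface, and a hypothetical waitLogic state. How the real engine interprets the last argument of Sleep is not stated here, so the fake simply ignores it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Transition mirrors the struct documented in this package.
type Transition struct {
	NextState string
	Transform interface{}
}

// fakeInstance is a hypothetical stand-in for the Instance interface.
type fakeInstance struct {
	memory interface{}
	slept  time.Duration
}

func (i *fakeInstance) GetMemory() interface{} { return i.memory }

func (i *fakeInstance) SetMemory(ctx context.Context, x interface{}) error {
	i.memory = x
	return nil
}

// Sleep only records the requested duration; the real engine would arrange a wake-up.
func (i *fakeInstance) Sleep(ctx context.Context, d time.Duration, x interface{}) error {
	i.slept = d
	return nil
}

// waitLogic is a hypothetical state that yields once and then transitions.
type waitLogic struct {
	inst *fakeInstance
}

func (l *waitLogic) Run(ctx context.Context, wakedata []byte) (*Transition, error) {
	// First call: nil wakedata and empty memory. Record progress, then yield.
	if len(wakedata) == 0 && l.inst.GetMemory() == nil {
		if err := l.inst.SetMemory(ctx, "waiting"); err != nil {
			return nil, err
		}
		if err := l.inst.Sleep(ctx, time.Second, "wake"); err != nil {
			return nil, err
		}
		return nil, nil // nil Transition and nil error means yield
	}
	// Later call: memory shows that we already yielded, so move on.
	return &Transition{NextState: "next-state"}, nil
}

// runTwice simulates the first call and one follow-up call.
func runTwice() (bool, interface{}, string, error) {
	inst := &fakeInstance{}
	logic := &waitLogic{inst: inst}
	ctx := context.Background()

	first, err := logic.Run(ctx, nil)
	if err != nil {
		return false, nil, "", err
	}
	yielded := first == nil
	memory := inst.GetMemory()

	second, err := logic.Run(ctx, []byte("wake"))
	if err != nil {
		return yielded, memory, "", err
	}
	if second == nil {
		return yielded, memory, "", errors.New("expected a transition on the second call")
	}
	return yielded, memory, second.NextState, nil
}

func main() {
	fmt.Println(runTwice())
}
```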

Run functions should be designed so that every call returns in a timely manner. For the entire time a Run function is running, Direktiv holds a cluster-wide mutex on the instance, which makes it impossible to cancel the instance or time it out. Ensure that this is always only temporary. Although the mutex is on the specific instance, each node in the cluster can hold only a limited number of cluster-wide mutexes at any given time. Direktiv uses these cluster-wide locks for many purposes; failure to return in a timely manner can therefore result in seemingly unrelated problems in the cluster.
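One possible way to keep a call short is to bound any blocking work with a context timeout. The sketch below is an assumption about how a state author might do this, not something this package prescribes; runBounded, slowWork, timedOut, and demo are hypothetical names, and the work function is expected to honor context cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runBounded is a hypothetical helper that gives work at most limit to finish.
// work must honor ctx; otherwise its goroutine keeps running in the background.
func runBounded(ctx context.Context, limit time.Duration, work func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, limit)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- work(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// slowWork blocks until its context is cancelled.
func slowWork(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// timedOut reports whether err was caused by the time limit.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// demo runs one quick and one slow piece of work.
func demo() (error, bool) {
	quickErr := runBounded(context.Background(), time.Second, func(context.Context) error {
		return nil
	})
	slowErr := runBounded(context.Background(), 10*time.Millisecond, slowWork)
	return quickErr, timedOut(slowErr)
}

func main() {
	quickErr, slowTimedOut := demo()
	fmt.Println(quickErr, slowTimedOut)
}
```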

The mutex held for an instance guarantees that at most one node in the cluster can be running logic for the state at a time. For example, it is safe to register an event listener in a Run function without worrying about race conditions: even if the events are received immediately, the follow-up call to Run will wait until the current call has returned and been cleaned up.
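The ordering guarantee can be illustrated with a local simulation. This sketch uses a plain sync.Mutex as a stand-in for Direktiv's cluster-wide lock, and a goroutine as a stand-in for an event that arrives immediately; the instance type and its run method are hypothetical and do not reflect the engine's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// instance is a hypothetical stand-in for a workflow instance.
type instance struct {
	mu    sync.Mutex // stand-in for the cluster-wide instance lock
	wg    sync.WaitGroup
	trace []string
}

// run simulates one call to a Run function while holding the instance lock.
func (i *instance) run(wakedata []byte) {
	i.mu.Lock()
	defer i.mu.Unlock()

	if wakedata == nil {
		i.trace = append(i.trace, "first:start")
		// "Register a listener": the event fires immediately, but the
		// follow-up call blocks on the lock until this call returns.
		i.wg.Add(1)
		go func() {
			defer i.wg.Done()
			i.run([]byte("event"))
		}()
		i.trace = append(i.trace, "first:end")
		return
	}
	i.trace = append(i.trace, "second:"+string(wakedata))
}

// simulate runs the first call and waits for the follow-up call to finish.
func simulate() []string {
	i := &instance{}
	i.run(nil)
	i.wg.Wait()
	return i.trace
}

func main() {
	fmt.Println(simulate())
}
```

Because the follow-up call cannot acquire the lock before the first call releases it, the trace always records the first call completing before the second one starts.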


Constants

const (
	DefaultShortDeadline = time.Second * 5
	DefaultLongDeadline  = time.Minute * 15
)
const (
	ErrCodeJQBadQuery                 = "direktiv.jq.badCommand"
	ErrCodeJQNoResults                = "direktiv.jq.badCommand"
	ErrCodeJQManyResults              = "direktiv.jq.badCommand"
	ErrCodeJQNotObject                = "direktiv.jq.notObject"
	ErrCodeFailedSchemaValidation     = "direktiv.schema.failed"
	ErrCodeJQNotString                = "direktiv.jq.notString"
	ErrCodeInvalidVariableKey         = "direktiv.var.invalidKey"
	ErrCodeInvalidVariableScope       = "direktiv.var.invalidScope"
	ErrCodeAllBranchesFailed          = "direktiv.parallel.allFailed"
	ErrCodeNotArray                   = "direktiv.foreach.badArray"
	ErrCodeInvalidVariablePermissions = "direktiv.var.perms"
)


Functions

func ISO8601StringtoSecs

func ISO8601StringtoSecs(timeout string) (int, error)

func RegisterState

func RegisterState(st model.StateType, initializer func(instance Instance, state model.State) (Logic, error))
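RegisterState has no description here, so the following is only a sketch of how an initializer registry with this shape could behave. Every type below is a simplified local stand-in (the real signature uses model.StateType, model.State, Instance, and Logic), and the registry map, newLogic, simpleState, simpleLogic, and newSimple are hypothetical names.

```go
package main

import "fmt"

// Simplified stand-ins for the types used in the real signature.
type StateType string

type State interface{ GetType() StateType }

type Instance interface{}

type Logic interface{ GetID() string }

// Initializer has the same shape as RegisterState's second parameter.
type Initializer func(instance Instance, state State) (Logic, error)

// registry is a hypothetical lookup table keyed by state type.
var registry = map[StateType]Initializer{}

// RegisterState associates an initializer with a state type.
func RegisterState(st StateType, initializer Initializer) {
	registry[st] = initializer
}

// newLogic is a hypothetical lookup that builds the logic for a state.
func newLogic(instance Instance, state State) (Logic, error) {
	initializer, ok := registry[state.GetType()]
	if !ok {
		return nil, fmt.Errorf("no logic registered for state type %q", state.GetType())
	}
	return initializer(instance, state)
}

// simpleState and simpleLogic are hypothetical example implementations.
type simpleState struct {
	id  string
	typ StateType
}

func (s simpleState) GetType() StateType { return s.typ }

type simpleLogic struct{ id string }

func (l simpleLogic) GetID() string { return l.id }

func newSimple(instance Instance, state State) (Logic, error) {
	s, ok := state.(simpleState)
	if !ok {
		return nil, fmt.Errorf("unexpected state implementation %T", state)
	}
	return simpleLogic{id: s.id}, nil
}

func main() {
	RegisterState("noop", newSimple)

	logic, err := newLogic(nil, simpleState{id: "a", typ: "noop"})
	fmt.Println(logic, err)

	_, err = newLogic(nil, simpleState{id: "b", typ: "unknown"})
	fmt.Println(err)
}
```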

Types

type Child

type Child interface {
	Run(ctx context.Context)
	Info() ChildInfo
}

type ChildInfo

type ChildInfo struct {
	ID          string
	Complete    bool
	Type        string
	Attempts    int
	Results     interface{}
	ServiceName string
}

type CreateChildArgs

type CreateChildArgs struct {
	Definition model.FunctionDefinition
	Input      []byte
	Timeout    int
	Async      bool
	Files      []model.FunctionFileDefinition
	Iterator   int
}

type Instance

type Instance interface {
	GetInstanceID() uuid.UUID
	GetInstanceData() interface{}
	GetMemory() interface{}
	UnmarshalMemory(x interface{}) error
	GetModel() (*model.Workflow, error)
	PrimeDelayedEvent(event cloudevents.Event)
	SetMemory(ctx context.Context, x interface{}) error
	StoreData(key string, val interface{}) error
	GetVariables(ctx context.Context, vars []VariableSelector) ([]Variable, error)
	Sleep(ctx context.Context, d time.Duration, x interface{}) error
	Raise(ctx context.Context, err *derrors.CatchableError) error
	Log(ctx context.Context, level log.Level, a string, x ...interface{})
	AddAttribute(tag, value string)
	SetVariables(ctx context.Context, vars []VariableSetter) error
	BroadcastCloudevent(ctx context.Context, event *cloudevents.Event, dd int64) error
	ListenForEvents(ctx context.Context, events []*model.ConsumeEventDefinition, all bool) error
	RetrieveSecret(ctx context.Context, secret string) (string, error)
	CreateChild(ctx context.Context, args CreateChildArgs) (Child, error)
	Iterator() (int, bool)
	Deadline(ctx context.Context) time.Time
	LivingChildren(ctx context.Context) []*ChildInfo
}

type Logic

type Logic interface {
	GetID() string
	GetType() model.StateType
	GetLog() interface{}
	GetMetadata() interface{}
	ErrorDefinitions() []model.ErrorDefinition
	GetMemory() interface{}
	Deadline(ctx context.Context) time.Time
	Run(ctx context.Context, wakedata []byte) (*Transition, error)
	LivingChildren(ctx context.Context) []*ChildInfo
}

func Action

func Action(instance Instance, state model.State) (Logic, error)

Action initializes the logic for executing an 'action' state in a Direktiv workflow instance.

func ConsumeEvent

func ConsumeEvent(instance Instance, state model.State) (Logic, error)

func Delay

func Delay(instance Instance, state model.State) (Logic, error)

Delay initializes the logic for executing a 'delay' state in a Direktiv workflow instance.

func Error

func Error(instance Instance, state model.State) (Logic, error)

Error initializes the logic for executing an 'error' state in a Direktiv workflow instance.

func EventsAnd

func EventsAnd(instance Instance, state model.State) (Logic, error)

func EventsXor

func EventsXor(instance Instance, state model.State) (Logic, error)

func ForEach

func ForEach(instance Instance, state model.State) (Logic, error)

ForEach initializes the logic for executing a 'foreach' state in a Direktiv workflow instance.

func GenerateEvent

func GenerateEvent(instance Instance, state model.State) (Logic, error)

func Getter

func Getter(instance Instance, state model.State) (Logic, error)

func Noop

func Noop(instance Instance, state model.State) (Logic, error)

Noop initializes the logic for executing a 'noop' state in a Direktiv workflow instance.

func Parallel

func Parallel(instance Instance, state model.State) (Logic, error)

Parallel initializes the logic for executing a 'parallel' state in a Direktiv workflow instance.

func Setter

func Setter(instance Instance, state model.State) (Logic, error)

func StateLogic

func StateLogic(instance Instance, state model.State) (Logic, error)

func Switch

func Switch(instance Instance, state model.State) (Logic, error)

Switch initializes the logic for executing a 'switch' state in a Direktiv workflow instance.

func Validate

func Validate(instance Instance, state model.State) (Logic, error)

type Transition

type Transition struct {
	NextState string
	Transform interface{}
}

type Variable

type Variable struct {
	Scope string
	Key   string
	Data  []byte
}

type VariableSelector

type VariableSelector struct {
	Scope string
	Key   string
}

type VariableSetter

type VariableSetter struct {
	Scope    string
	Key      string
	MIMEType string
	Data     []byte
}
