Documentation ¶
Overview ¶
Package states implements the logic for Direktiv workflow states.
State logic is implemented in Run functions.
The following state properties should be ignored in this package because they're handled by Direktiv:
- catch
- log
- metadata
The state logic dictates how to transform and transition through the returned Transition struct. If the Transition struct is not nil, Direktiv executes the transform and transition logic. A non-empty transition string must refer to a state ID; an empty transition string concludes the workflow. If the Run function returns a nil Transition struct and no error, this tells Direktiv to yield. This is how long-running states are implemented: they yield, putting the instance to sleep while it waits for something to wake it up. Such states should usually call Engine APIs to register the ways in which they can be woken up.
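The return-value rules above can be summarized in a small sketch. The Transition type below is a local copy of the struct documented in this package, and `outcome` is a hypothetical helper written only for illustration; it is not part of the package or of the engine.

```go
package main

import (
	"errors"
	"fmt"
)

// Transition is a local copy of the struct documented in this package.
type Transition struct {
	NextState string
	Transform interface{}
}

// outcome is a hypothetical helper that names what the engine is described
// as doing for each combination of Run return values.
func outcome(t *Transition, err error) string {
	switch {
	case err != nil:
		return "error: " + err.Error()
	case t == nil:
		// nil Transition and no error: the state yields and waits to be woken.
		return "yield"
	case t.NextState == "":
		// The transform is still applied; an empty NextState ends the workflow.
		return "end workflow"
	default:
		return "transition to " + t.NextState
	}
}

func main() {
	fmt.Println(outcome(nil, nil))
	fmt.Println(outcome(&Transition{}, nil))
	fmt.Println(outcome(&Transition{NextState: "b"}, nil))
	fmt.Println(outcome(nil, errors.New("boom")))
}
```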
When Direktiv first executes a state, it calls the Run function with nil wakedata, and the instance has nothing stored in memory. If the state may need to be scheduled repeatedly, the Run function must determine where it stands in that process on each call by checking the instance memory and the wakedata. The format of the wakedata is defined by whichever Engine API triggered the wake-up, but the instance memory is entirely under the control of the Run logic.
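A minimal sketch of that multi-call pattern follows. It is self-contained and makes several simplifying assumptions: `waitState`, `waitMemory`, and `simulate` are hypothetical names, and instance memory is modelled as a plain struct field, whereas real logic would go through the Instance interface (for example SetMemory and UnmarshalMemory) and an Engine API to register a wake-up.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Transition is a local copy of the struct documented in this package.
type Transition struct {
	NextState string
	Transform interface{}
}

// waitMemory is hypothetical state that the Run logic chooses to persist.
type waitMemory struct {
	Registered bool
}

// waitState is a hypothetical two-step state. mem stands in for instance
// memory and is nil the first time the state is executed.
type waitState struct {
	mem  *waitMemory
	next string
}

// Run decides which step it is in by inspecting memory and wakedata.
func (s *waitState) Run(ctx context.Context, wakedata []byte) (*Transition, error) {
	if s.mem == nil {
		// First call: nothing in memory, and wakedata is expected to be nil.
		if len(wakedata) != 0 {
			return nil, errors.New("unexpected wakedata on first call")
		}
		// Real logic would register a wake-up here, then store its memory.
		s.mem = &waitMemory{Registered: true}
		return nil, nil // yield
	}
	// Later call: the state was woken up, so wakedata should be present.
	if len(wakedata) == 0 {
		return nil, errors.New("woken without wakedata")
	}
	return &Transition{NextState: s.next}, nil
}

// simulate plays the role of the engine for two consecutive calls.
func simulate() (string, error) {
	s := &waitState{next: "done"}
	ctx := context.Background()
	first, err := s.Run(ctx, nil)
	if err != nil {
		return "", err
	}
	if first != nil {
		return "", errors.New("expected a yield on the first call")
	}
	second, err := s.Run(ctx, []byte(`{"woken":true}`))
	if err != nil {
		return "", err
	}
	return second.NextState, nil
}

func main() {
	next, err := simulate()
	fmt.Println(next, err)
}
```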
Run functions should be designed so that every call returns in a timely manner. For the entire time the Run function is running, Direktiv holds a cluster-wide mutex on the instance, which makes it impossible to cancel the instance or time it out. Ensure that the mutex is only ever held briefly. Even though the mutex is on the specific instance, each node in the cluster can only hold a limited number of cluster-wide mutexes at any given time. Direktiv uses these cluster-wide locks for many purposes, so failure to return in a timely manner can result in seemingly unrelated problems elsewhere in the cluster.
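One way to keep a call short is to bound any potentially slow step with a context deadline. The sketch below uses only the standard library; `runBounded`, `sleepWork`, and `timedOut` are hypothetical helpers, and nothing here is meant to describe how Direktiv itself enforces its deadlines.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runBounded is a hypothetical helper: it runs work with a deadline and
// returns as soon as either the work finishes or the deadline passes.
func runBounded(ctx context.Context, limit time.Duration, work func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, limit)
	defer cancel()

	// Buffered so the goroutine can always send and exit, even after a timeout.
	done := make(chan error, 1)
	go func() { done <- work(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sleepWork simulates a step that takes d to complete but honours cancellation.
func sleepWork(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether work lasting workMs milliseconds exceeds a limit
// of limitMs milliseconds.
func timedOut(limitMs, workMs int) bool {
	limit := time.Duration(limitMs) * time.Millisecond
	work := time.Duration(workMs) * time.Millisecond
	err := runBounded(context.Background(), limit, sleepWork(work))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow step timed out:", timedOut(20, 2000))
	fmt.Println("fast step timed out:", timedOut(2000, 1))
}
```

Work that cannot finish within the limit is better expressed as a yield followed by a later wake-up than as a longer deadline.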
The mutex held for an instance guarantees that at most one node in the cluster can be running logic for the state at any time. For example, it is safe to register an event listener in a Run function without worrying about race conditions: even if the events are received immediately, the follow-up call to Run will wait until the current call has returned and been cleaned up.
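The ordering guarantee can be illustrated with an in-process stand-in. In this sketch a `sync.Mutex` plays the role of the cluster-wide instance mutex, and the `instance` type, its `run` method, and `simulate` are hypothetical; the real lock is distributed and managed by Direktiv.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// instance is a hypothetical stand-in for a workflow instance. mu models the
// per-instance mutex; trace records the order in which steps execute.
type instance struct {
	mu    sync.Mutex
	wg    sync.WaitGroup
	trace []string
}

// run models one call to a state's Run function.
func (in *instance) run(wakedata []byte) {
	in.mu.Lock()
	defer in.mu.Unlock()

	if wakedata == nil {
		in.trace = append(in.trace, "first call: start")
		// "Register a listener" whose event arrives immediately. The
		// follow-up call blocks on mu until this call has returned.
		in.wg.Add(1)
		go func() {
			defer in.wg.Done()
			in.run([]byte(`{"event":"arrived"}`))
		}()
		in.trace = append(in.trace, "first call: yield")
		return
	}
	in.trace = append(in.trace, "second call: woken")
}

// simulate runs the first call, waits for the follow-up, and returns the trace.
func simulate() string {
	in := &instance{}
	in.run(nil)
	in.wg.Wait()
	return strings.Join(in.trace, " | ")
}

func main() {
	fmt.Println(simulate())
}
```

Because every append to the trace happens while the mutex is held, the follow-up call can never interleave with the first one, however early the event arrives.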
Index ¶
- Constants
- func ISO8601StringtoSecs(timeout string) (int, error)
- func RegisterState(st model.StateType, ...)
- type Child
- type ChildInfo
- type CreateChildArgs
- type Instance
- type Logic
- func Action(instance Instance, state model.State) (Logic, error)
- func ConsumeEvent(instance Instance, state model.State) (Logic, error)
- func Delay(instance Instance, state model.State) (Logic, error)
- func Error(instance Instance, state model.State) (Logic, error)
- func EventsAnd(instance Instance, state model.State) (Logic, error)
- func EventsXor(instance Instance, state model.State) (Logic, error)
- func ForEach(instance Instance, state model.State) (Logic, error)
- func GenerateEvent(instance Instance, state model.State) (Logic, error)
- func Getter(instance Instance, state model.State) (Logic, error)
- func Noop(instance Instance, state model.State) (Logic, error)
- func Parallel(instance Instance, state model.State) (Logic, error)
- func Setter(instance Instance, state model.State) (Logic, error)
- func StateLogic(instance Instance, state model.State) (Logic, error)
- func Switch(instance Instance, state model.State) (Logic, error)
- func Validate(instance Instance, state model.State) (Logic, error)
- type Transition
- type Variable
- type VariableSelector
- type VariableSetter
Constants ¶
const (
	DefaultShortDeadline = time.Second * 5
	DefaultLongDeadline  = time.Minute * 15
)
const (
	ErrCodeJQBadQuery                 = "direktiv.jq.badCommand"
	ErrCodeJQNoResults                = "direktiv.jq.badCommand"
	ErrCodeJQManyResults              = "direktiv.jq.badCommand"
	ErrCodeJQNotObject                = "direktiv.jq.notObject"
	ErrCodeFailedSchemaValidation     = "direktiv.schema.failed"
	ErrCodeJQNotString                = "direktiv.jq.notString"
	ErrCodeInvalidVariableKey         = "direktiv.var.invalidKey"
	ErrCodeInvalidVariableScope       = "direktiv.var.invalidScope"
	ErrCodeAllBranchesFailed          = "direktiv.parallel.allFailed"
	ErrCodeNotArray                   = "direktiv.foreach.badArray"
	ErrCodeInvalidVariablePermissions = "direktiv.var.perms"
)
Variables ¶
This section is empty.
Functions ¶
func ISO8601StringtoSecs ¶
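This page does not document the function's behavior beyond its signature in the index. As a rough illustration of the kind of conversion the name suggests, the sketch below turns a small subset of ISO 8601 durations (days, hours, minutes, and seconds, with integer values only) into seconds. `durationToSecs` is a hypothetical stand-in written for this example; it is not the package's implementation and may differ from it in accepted inputs and in error handling.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// durationRe matches a subset of ISO 8601 durations such as P1D, PT15S,
// or P1DT2H30M. Years, months, weeks, and fractions are not supported.
var durationRe = regexp.MustCompile(`^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$`)

// durationToSecs is a hypothetical stand-in, not the package's function.
func durationToSecs(s string) (int, error) {
	m := durationRe.FindStringSubmatch(s)
	if m == nil {
		return 0, fmt.Errorf("unsupported duration %q", s)
	}
	// Seconds per unit, in the same order as the capture groups: D, H, M, S.
	units := []int{86400, 3600, 60, 1}
	total, seen := 0, false
	for i, unit := range units {
		if m[i+1] == "" {
			continue
		}
		n, err := strconv.Atoi(m[i+1])
		if err != nil {
			return 0, err
		}
		total += n * unit
		seen = true
	}
	if !seen {
		// Rejects bare "P" and "PT", which carry no components.
		return 0, fmt.Errorf("duration %q has no components", s)
	}
	return total, nil
}

func main() {
	for _, in := range []string{"PT15S", "PT1H30M", "P1DT1S", "15s"} {
		secs, err := durationToSecs(in)
		fmt.Println(in, secs, err)
	}
}
```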
Types ¶
type CreateChildArgs ¶
type CreateChildArgs struct {
	Definition model.FunctionDefinition
	Input      []byte
	Timeout    int
	Async      bool
	Files      []model.FunctionFileDefinition
	Iterator   int
}
type Instance ¶
type Instance interface {
	GetInstanceID() uuid.UUID
	GetInstanceData() interface{}
	GetMemory() interface{}
	UnmarshalMemory(x interface{}) error
	GetModel() (*model.Workflow, error)
	PrimeDelayedEvent(event cloudevents.Event)
	SetMemory(ctx context.Context, x interface{}) error
	StoreData(key string, val interface{}) error
	GetVariables(ctx context.Context, vars []VariableSelector) ([]Variable, error)
	Sleep(ctx context.Context, d time.Duration, x interface{}) error
	Raise(ctx context.Context, err *derrors.CatchableError) error
	Log(ctx context.Context, level log.Level, a string, x ...interface{})
	AddAttribute(tag, value string)
	SetVariables(ctx context.Context, vars []VariableSetter) error
	BroadcastCloudevent(ctx context.Context, event *cloudevents.Event, dd int64) error
	ListenForEvents(ctx context.Context, events []*model.ConsumeEventDefinition, all bool) error
	RetrieveSecret(ctx context.Context, secret string) (string, error)
	CreateChild(ctx context.Context, args CreateChildArgs) (Child, error)
	Iterator() (int, bool)
	Deadline(ctx context.Context) time.Time
	LivingChildren(ctx context.Context) []*ChildInfo
}
type Logic ¶
type Logic interface {
	GetID() string
	GetType() model.StateType
	GetLog() interface{}
	GetMetadata() interface{}
	ErrorDefinitions() []model.ErrorDefinition
	GetMemory() interface{}
	Deadline(ctx context.Context) time.Time
	Run(ctx context.Context, wakedata []byte) (*Transition, error)
	LivingChildren(ctx context.Context) []*ChildInfo
}
func Action ¶
Action initializes the logic for executing an 'action' state in a Direktiv workflow instance.
func Delay ¶
Delay initializes the logic for executing a 'delay' state in a Direktiv workflow instance.
func Error ¶
Error initializes the logic for executing an 'error' state in a Direktiv workflow instance.
func ForEach ¶
ForEach initializes the logic for executing a 'foreach' state in a Direktiv workflow instance.
func Noop ¶
Noop initializes the logic for executing a 'noop' state in a Direktiv workflow instance.
func Parallel ¶
Parallel initializes the logic for executing a 'parallel' state in a Direktiv workflow instance.
type Transition ¶
type Transition struct {
	NextState string
	Transform interface{}
}