executor

package
v0.0.0-...-197f627 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package executor ...

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Get

func Get[T any](ctx context.Context, s *Storage, key string) (value *T, err error)

func Set

func Set[T any](ctx context.Context, s *Storage, key string, value T) error

Types

type ActivateTaskCommand

// ActivateTaskCommand declares no fields in this version of the package.
type ActivateTaskCommand struct {
}

ActivateTaskCommand ...

type ActivityBranchStatus

// ActivityBranchStatus holds two integer counters and a list of raw byte
// outputs. ActivityContext points to it through its BranchStatus field.
type ActivityBranchStatus struct {
	Max    int      // NOTE(review): presumably the number of branches expected — confirm
	Done   int      // NOTE(review): presumably the number of branches finished so far — confirm
	Output [][]byte // NOTE(review): presumably one raw output per branch — confirm ordering
}

ActivityBranchStatus ...

type ActivityContext

// ActivityContext groups the identifiers, input, and optional branch and
// iteration bookkeeping of a single activity.
type ActivityContext struct {
	// ID is the identifier of this activity, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}
	ID string

	// NOTE(review): presumably the execution this activity belongs to — confirm.
	ExecutionID string

	// StartStateCommand carries fields with the same names (state name and
	// JSON input); NOTE(review): presumably copied from it — confirm.
	StateName string
	Input     []byte

	// Optional (nil-able) parent references. StartStateCommand documents the
	// same-named fields as present when the state is invoked inside a
	// ParallelState or a MapState respectively.
	ParentBranchID    *string
	ParentIterationID *string

	// Optional (nil-able) progress records for branches and iterations.
	BranchStatus    *ActivityBranchStatus
	IterationStatus *ActivityIterationStatus
}

ActivityContext ...

type ActivityIterationStatus

// ActivityIterationStatus holds two integer counters and a list of raw byte
// outputs. ActivityContext points to it through its IterationStatus field.
type ActivityIterationStatus struct {
	Max    int      // NOTE(review): presumably the number of iterations expected — confirm
	Done   int      // NOTE(review): presumably the number of iterations finished so far — confirm
	Output [][]byte // NOTE(review): presumably one raw output per iteration — confirm ordering
}

ActivityIterationStatus ...

type BranchCompletedEvent

// BranchCompletedEvent names the branch that the event refers to.
type BranchCompletedEvent struct {
	// BranchID is the identifier of the branch, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}/branch/{id}
	BranchID string
}

BranchCompletedEvent ...

type BranchContext

// BranchContext groups the identifier, index, owning activity, and input of a
// single branch.
type BranchContext struct {
	// BranchID is the identifier of the branch, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}/branch/{id}
	BranchID string

	// NOTE(review): StartBranchCommand documents its Index as the serial of
	// the branch in a ParallelState; presumably the same value — confirm.
	Index int

	// NOTE(review): presumably the activity this branch belongs to, as
	// documented on StartBranchCommand.ActivityID — confirm.
	ActivityID string

	// Input holds raw bytes. NOTE(review): presumably a JSON object, like the
	// Input of StartBranchCommand — confirm.
	Input []byte
}

BranchContext ...

type BranchStartedEvent

// BranchStartedEvent names the branch that the event refers to.
type BranchStartedEvent struct {
	// BranchID is the identifier of the branch, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}/branch/{id}
	BranchID string
}

BranchStartedEvent ...

type CompleteBranchCommand

// CompleteBranchCommand carries the identifier, output, and request timestamp
// of a branch that is complete.
type CompleteBranchCommand struct {
	// BranchID is the identifier of the branch.
	BranchID string

	// Output is the result of this branch. It MUST be a valid JSON object.
	Output []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

CompleteBranchCommand is triggered when a branch is complete, and this command is replicated from the leader to the followers.

type CompleteExecutionCommand

// CompleteExecutionCommand carries the identifier, output, and request
// timestamp of an execution that is completed.
type CompleteExecutionCommand struct {
	// ID is the identifier of this execution.
	ID string

	// Output is the result of this execution. It MUST be a valid JSON object.
	Output []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

CompleteExecutionCommand is triggered when an execution is completed, and this command is replicated from the leader to the followers.

type CompleteIterationCommand

// CompleteIterationCommand carries the identifier, output, and request
// timestamp of an iteration that is complete.
type CompleteIterationCommand struct {
	// IterationID is the identifier of the iteration.
	IterationID string

	// Output is the result of this iteration. It MUST be a valid JSON object.
	Output []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

CompleteIterationCommand is triggered when an iteration is complete, and this command is replicated from the leader to the followers.

type CompleteStateCommand

// CompleteStateCommand carries the outcome of one state: its output, a status
// string, and the failure reason and message.
type CompleteStateCommand struct {
	// ActivityID is the identifier of this state's context.
	ActivityID string

	// Output is the result of this state. It MUST be a valid JSON object.
	Output []byte

	// Status is the result of this state, which is one of:
	//  1. `Succeeded`: the state executed successfully.
	//  2. `Failed`: the state failed; this is always caused by the workflow interpreter.
	Status string

	// Reason is the error code string when `Status` is `Failed`.
	// It is a brief CamelCase message.
	Reason string

	// Message is the error message string when `Status` is `Failed`.
	// It is a human-readable message giving details.
	Message string

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

CompleteStateCommand is triggered when a State is complete, and this command is replicated from the leader to the followers.

type CompleteTaskCommand

// CompleteTaskCommand carries the identifier, output, and request timestamp
// of a task that is complete.
type CompleteTaskCommand struct {
	// TaskID is the identifier of this task.
	TaskID string

	// Output is the result of this task. It MUST be a valid JSON object.
	Output []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

CompleteTaskCommand is triggered when a Task is complete, and this command is replicated from the leader to the followers.

type CreateTaskCommand

// CreateTaskCommand describes a task to create: its identifier, the activity
// and execution that own it, the resource it references, and its input.
type CreateTaskCommand struct {
	// ID is the identifier of this task, formatted as
	// `/{tenant}/{namespace}/t{xxxxxxx}/{id}`
	// in which:
	//  1. {tenant}: is the tenant name of the current execution
	//  2. {namespace}: is the namespace name of the current execution
	//  3. {xxxxxxx}: is a 7-character hex string, the hash of the task's type.
	//  4. {id}: is the 8-character hex string of the current `hybrid logical clock` value.
	ID string

	// ActivityID is the identifier of the activity to which this task belongs.
	ActivityID string

	// ExecutionID is the identifier of the execution to which this task belongs.
	ExecutionID string

	// Resource is the resource definition that this task references.
	Resource string

	// Input is the arguments of this task. It MUST be a valid JSON object.
	Input []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

CreateTaskCommand is triggered when a `TaskState`, `WaitState`, or other task is ready to be fired. This command is replicated from the leader to the followers.

type ExecutionCompletedEvent

// ExecutionCompletedEvent names the execution that the event refers to.
type ExecutionCompletedEvent struct {
	// ID is the identifier of the execution, laid out as
	// /{tenant}/{namespace}/execution/{id}
	ID string
}

ExecutionCompletedEvent ...

type ExecutionContext

// ExecutionContext is the value returned by Executor.Run and
// Executor.WaitExecutionDone. Its field set (names and types) matches
// ExecutionStartedEvent, which documents each field.
// NOTE(review): presumably the fields carry the same meaning here — confirm.
type ExecutionContext struct {
	ID string

	WorkflowID string

	Input []byte

	Timestamp uint64

	Status string
}

ExecutionContext ...

type ExecutionStartedEvent

// ExecutionStartedEvent carries the identifiers, input, timestamp, and status
// of an execution.
type ExecutionStartedEvent struct {
	// ID is the identifier of this execution.
	ID string

	// WorkflowID is the identifier of the workflow definition to which this
	// execution belongs.
	WorkflowID string

	// Input is the arguments of this execution. It MUST be a valid JSON object.
	Input []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64

	// Status is the current state of the execution, which is one of:
	// 1. `Pending`: waiting to start the first `State`. Currently it always
	//   transitions to `Running` immediately; once `delay execution` is
	//   supported there will be an observable `Pending` state.
	// 2. `Running`: running a state, task, and so on.
	// 3. `Completing`: an `End` state was reached, and all running states and tasks are being reclaimed.
	// 4. `Succeeded`: the execution finished as `Succeed`.
	// 5. `Failed`: the execution finished as `Fail`.
	// 6. `Deleting`: the execution record is being deleted; all persistent data will be deleted.
	// 7. `Canceling`: the user commanded the execution to stop, or its timeout was reached.
	// 8. `Canceled`: the execution finished as `Cancel`.
	//
	// State transition flow:
	//                                 +-------------+
	//                                 |   Pending   |
	//                                 +------|------+
	//                                        |
	//                                        |
	//                                        |
	//            +-------------+      +------|------+       +-------------+
	//        +----  Completing --------   Running   ---------  Canceling  |
	//        |   +------|------+      +-------------+       +------|------+
	//        |          |                                          |
	//        |          |                                          |
	//        |          |                                          |
	// +------|------+   |  +------+------+                  +------|------+
	// |  Succeeded  |   +---   Failed    ----+              |   Canceled  |
	// +------|------+      +-------------+   |              +------|------+
	//        |                               |                     |
	//        |                               |                     |
	//        |                               |                     |
	//        |                        +------|------+              |
	//        +-------------------------   Deleting  ---------------+
	//                                 +-------------+
	Status string
}

ExecutionStartedEvent is the result of a `StartExecutionCommand` and MUST follow it. When a follower receives this event, it persists the result to storage to stay consistent with the leader. If a new leader is elected and finds a `StartExecutionCommand` that is not followed by any `ExecutionStartedEvent`, it treats the command as unprocessed and reprocesses it.

type Executor

// Executor shows no exported fields in this listing. NewExecutor returns a
// *Executor, and the listing declares the methods Run and WaitExecutionDone on it.
type Executor struct {
	// contains filtered or unexported fields
}

Executor ...

func NewExecutor

func NewExecutor(sm *fsl.StateMachine, handlers map[string]func(*CreateTaskCommand) []byte) *Executor

NewExecutor ...

func (*Executor) Run

func (e *Executor) Run(input []byte) *ExecutionContext

Run ...

func (*Executor) WaitExecutionDone

func (e *Executor) WaitExecutionDone() *ExecutionContext

WaitExecutionDone ...

type IterationCompletedEvent

// IterationCompletedEvent names the iteration that the event refers to.
type IterationCompletedEvent struct {
	// IterationID is the identifier of the iteration, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}/iteration/{id}
	IterationID string
}

IterationCompletedEvent ...

type IterationContext

// IterationContext groups the identifier, index, and owning activity of a
// single iteration.
type IterationContext struct {
	// IterationID is the identifier of the iteration, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}/iteration/{id}
	IterationID string

	Index int
	// NOTE(review): presumably the activity this iteration belongs to — confirm.
	ActivityID string
}

IterationContext ...

type IterationStartedEvent

// IterationStartedEvent names the iteration that the event refers to.
type IterationStartedEvent struct {
	// IterationID is the identifier of the iteration, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}/iteration/{id}
	IterationID string
}

IterationStartedEvent ...

type LogStream

// LogStream shows no exported fields in this listing. The listing declares
// the methods Append and Read on it.
type LogStream struct {
	// contains filtered or unexported fields
}

func (*LogStream) Append

func (s *LogStream) Append(ctx context.Context, v interface{}) error

func (*LogStream) Read

func (s *LogStream) Read(ctx context.Context) <-chan interface{}

type StartBranchCommand

// StartBranchCommand describes a branch to start: its identifier, index,
// owning activity and execution, and input.
type StartBranchCommand struct {
	// BranchID is the identifier of this branch, formatted as
	// `/{ActivityID}/b{id}`
	// in which:
	//  1. {ActivityID}: is the referenced activity
	//  2. {id}: is the index of the current branch, beginning with 0
	BranchID string

	// NOTE(review): presumably the execution this branch belongs to — confirm.
	ExecutionID string

	// Index is the serial number of this branch in the ParallelState.
	Index int

	// ActivityID is the identifier of the activity to which this branch belongs.
	ActivityID string

	// Input is the arguments of this branch. It MUST be a valid JSON object.
	Input []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

StartBranchCommand is triggered when a ParallelState is started, and this command is replicated from the leader to the followers.

type StartExecutionCommand

// StartExecutionCommand describes an execution to start: its identifier, the
// workflow definition it runs, and its input.
type StartExecutionCommand struct {
	// ID is the identifier of this execution, formatted as
	// `/{tenant}/{namespace}/e{xxxxxxx}/{id}`
	// in which:
	//  1. {tenant}: is the tenant name of the current execution
	//  2. {namespace}: is the namespace name of the current execution
	//  3. {xxxxxxx}: is a 7-character hex string. When the execution is started
	//     with an idempotent key, it is the hash of that key; otherwise it is
	//     the hash of the last id.
	//  4. {id}: is the 16-character hex string of the current `hybrid logical clock` value.
	//
	// All executions are partitioned within a namespace. The partition prefix
	// omits the last {id}, which supports idempotency without cross-partition
	// transactions. As a result, a partition may become large.
	ID string

	// WorkflowID is the identifier of the workflow definition to which this
	// execution belongs.
	WorkflowID string

	// Input is the arguments of this execution. It MUST be a valid JSON object.
	Input []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

StartExecutionCommand is triggered when a workflow is ready for execution, and this command is replicated from the leader to the followers.

type StartIterationCommand

// StartIterationCommand describes an iteration to start: its identifier,
// index, owning activity and execution, and input.
type StartIterationCommand struct {
	// IterationID is the identifier of this iteration, formatted as
	// `/{ActivityID}/i{id}`
	// in which:
	//  1. {ActivityID}: is the referenced activity
	//  2. {id}: is the index of the current iteration, beginning with 0
	IterationID string

	// NOTE(review): presumably the execution this iteration belongs to — confirm.
	ExecutionID string

	// Index is the serial number of this iteration in the MapState.
	Index int

	// ActivityID is the identifier of the activity to which this iteration belongs.
	ActivityID string

	// Input is the arguments of this iteration. It MUST be a valid JSON object.
	Input []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

StartIterationCommand is triggered when a MapState is started, and this command is replicated from the leader to the followers.

type StartStateCommand

// StartStateCommand describes a state to start: its activity identifier, the
// owning execution, the state name, optional parent references, and its input.
type StartStateCommand struct {
	// ActivityID is the identifier of this state's context, formatted as
	// `/{ExecutionID}/{id}`
	// in which:
	//  1. {ExecutionID}: is the referenced execution of the current state's context
	//  2. {id}: is the 8-character hex string of the current `hybrid logical clock` value.
	//
	// All activities of an execution share the same prefix and are placed in
	// the same partition.
	ActivityID string

	// ExecutionID is the identifier of the execution to which this state belongs.
	ExecutionID string

	// StateName is the name of the state that is ready for execution. Because
	// every state name must be unique (including within nested states), the
	// name alone is enough to identify it.
	StateName string

	// ParentBranchID is present when this state is invoked inside a ParallelState.
	ParentBranchID *string

	// ParentIterationID is present when this state is invoked inside a MapState.
	ParentIterationID *string

	// Input is the arguments of this state. It MUST be a valid JSON object.
	Input []byte

	// Timestamp is the hybrid logical clock (HLC) value of the time this
	// command was requested.
	Timestamp uint64
}

StartStateCommand is triggered when a State is ready for execution, and this command is replicated from the leader to the followers.

type StateCompletedEvent

// StateCompletedEvent names the activity that the event refers to.
type StateCompletedEvent struct {
	// ActivityID is the identifier of the activity, laid out as
	// /{tenant}/{namespace}/execution/{id}/activity/{id}
	ActivityID string
}

StateCompletedEvent ...

type StateStartedEvent

// StateStartedEvent names the activity that the event refers to.
type StateStartedEvent struct {
	// NOTE(review): presumably the same identifier as
	// StartStateCommand.ActivityID — confirm.
	ActivityID string
}

StateStartedEvent ...

type Storage

// Storage shows no exported fields in this listing. NewStorage returns a
// *Storage, and the listing declares byte-slice Get and Set methods on it.
type Storage struct {
	// contains filtered or unexported fields
}

func NewStorage

func NewStorage(dir string) (*Storage, error)

func (*Storage) Get

func (s *Storage) Get(ctx context.Context, key []byte) (value []byte, err error)

func (*Storage) Set

func (s *Storage) Set(ctx context.Context, key []byte, value []byte) error

type TaskActivatedEvent

// TaskActivatedEvent declares no fields in this version of the package.
type TaskActivatedEvent struct {
}

TaskActivatedEvent ...

type TaskCompletedEvent

// TaskCompletedEvent names the task that the event refers to.
type TaskCompletedEvent struct {
	// NOTE(review): presumably the same identifier as
	// CompleteTaskCommand.TaskID — confirm.
	TaskID string
}

TaskCompletedEvent ...

type TaskContext

// TaskContext holds a single activity identifier.
type TaskContext struct {
	// NOTE(review): presumably the activity to which the task belongs, as on
	// CreateTaskCommand.ActivityID — confirm.
	ActivityID string
}

TaskContext ...

type TaskCreatedEvent

// TaskCreatedEvent names the task that the event refers to.
type TaskCreatedEvent struct {
	// ID is the identifier of the task, laid out as
	// /{tenant}/{namespace}/task/{id}
	// NOTE(review): CreateTaskCommand.ID documents a different layout
	// (`/{tenant}/{namespace}/t{xxxxxxx}/{id}`) — confirm which one applies.
	ID string
}

TaskCreatedEvent ...

type Timestamp

// Timestamp represents a hybrid logical clock value as two 32-bit parts: a
// wall time and a logical counter.
type Timestamp struct {
	// WallTime is the physical Unix epoch time, expressed in seconds.
	WallTime uint32

	// Logical is a sequential counter that captures causality between events
	// whose wall times are equal.
	Logical uint32
}

Timestamp is the representation of a `hybrid logical clock` value.

func ParseTimestamp

func ParseTimestamp(t uint64) Timestamp

func (Timestamp) AsString

func (t Timestamp) AsString() string

func (Timestamp) AsUint64

func (t Timestamp) AsUint64() uint64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL