storage

package
v1.1.1039 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: MIT Imports: 40 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NatsConfig string

NatsConfig holds the current nats configuration for SHAR.

Functions

func NoOpServiceTaskConsumerFn added in v1.1.927

func NoOpServiceTaskConsumerFn(_ context.Context, _ string) error

NoOpServiceTaskConsumerFn no op service task consumer fn

func NoOpWorkFlowProcessMappingFn added in v1.1.927

func NoOpWorkFlowProcessMappingFn(_ context.Context, _ *model.Workflow, _ *model.Process) (uint64, error)

NoOpWorkFlowProcessMappingFn no op workflow to process mapping fn

func WithEmbargo

func WithEmbargo(embargo int) *publishEmbargoOption

WithEmbargo allows the specification of an embargo time on a workflow state message

Types

type NamespaceKvs added in v1.1.797

type NamespaceKvs struct {
	// contains filtered or unexported fields
}

NamespaceKvs defines all of the key value stores shar needs to operate

type Nats

type Nats struct {
	// contains filtered or unexported fields
}

Nats contains the engine functions that communicate with NATS.

func New

func New(conn *nats.Conn, txConn *nats.Conn, storageType jetstream.StorageType, concurrency int, allowOrphanServiceTasks bool, telCfg telemetry.Config) (*Nats, error)

New creates a new instance of the NATS communication layer.

func (*Nats) CheckProcessTaskDeprecation added in v1.1.670

func (s *Nats) CheckProcessTaskDeprecation(ctx context.Context, workflow *model.Workflow, processName string) error

CheckProcessTaskDeprecation checks if all the tasks in a process have not been deprecated.

func (*Nats) CloseUserTask

func (s *Nats) CloseUserTask(ctx context.Context, trackingID string) error

CloseUserTask removes a completed user task.

func (*Nats) Conn

func (s *Nats) Conn() common.NatsConn

Conn returns the active nats connection

func (*Nats) CreateExecution added in v1.0.645

func (s *Nats) CreateExecution(ctx context.Context, execution *model.Execution) (*model.Execution, error)

CreateExecution given a workflow, starts a new execution and returns its ID

func (*Nats) CreateJob

func (s *Nats) CreateJob(ctx context.Context, job *model.WorkflowState) (string, error)

CreateJob stores a workflow task state.

func (*Nats) CreateProcessInstance

func (s *Nats) CreateProcessInstance(ctx context.Context, executionId string, parentProcessID string, parentElementID string, processName string, workflowName string, workflowId string) (*model.ProcessInstance, error)

CreateProcessInstance creates a new instance of a process and attaches it to the workflow instance.

func (*Nats) DeleteJob

func (s *Nats) DeleteJob(ctx context.Context, trackingID string) error

DeleteJob removes a workflow task state.

func (*Nats) DeleteNamespace added in v1.1.927

func (s *Nats) DeleteNamespace(ctx context.Context, ns string) error

DeleteNamespace deletes the key-value store for the specified namespace in SHAR. It iterates over all the key-value stores and deletes them one by one. The function returns nil if all key-value stores are successfully deleted.

func (*Nats) DeprecateTaskSpec added in v1.1.670

func (s *Nats) DeprecateTaskSpec(ctx context.Context, uid []string) error

DeprecateTaskSpec deprecates one or more task specs by ID.

func (*Nats) DestroyProcessInstance

func (s *Nats) DestroyProcessInstance(ctx context.Context, state *model.WorkflowState, pi *model.ProcessInstance, execution *model.Execution) error

DestroyProcessInstance deletes a process instance and removes the workflow instance dependent on all process instances being satisfied.

func (*Nats) EnsureServiceTaskConsumer added in v1.1.670

func (s *Nats) EnsureServiceTaskConsumer(ctx context.Context, uid string) error

EnsureServiceTaskConsumer creates or updates a service task consumer.

func (*Nats) GetElement

func (s *Nats) GetElement(ctx context.Context, state *model.WorkflowState) (*model.Element, error)

GetElement gets the definition for the current element given a workflow state.

func (*Nats) GetExecutableWorkflowIds added in v1.1.670

func (s *Nats) GetExecutableWorkflowIds(ctx context.Context) ([]string, error)

GetExecutableWorkflowIds returns a list of all workflow Ids that contain executable processes

func (*Nats) GetExecution added in v1.0.645

func (s *Nats) GetExecution(ctx context.Context, executionID string) (*model.Execution, error)

GetExecution retrieves an execution given its ID.

func (*Nats) GetGatewayInstance

func (s *Nats) GetGatewayInstance(ctx context.Context, gatewayInstanceID string) (*model.Gateway, error)

GetGatewayInstance - returns a gateway instance from the KV store.

func (*Nats) GetGatewayInstanceID

func (s *Nats) GetGatewayInstanceID(state *model.WorkflowState) (string, string, error)

GetGatewayInstanceID - returns a gateawy instance ID and a satisfying route to that gateway.

func (*Nats) GetJob

func (s *Nats) GetJob(ctx context.Context, trackingID string) (*model.WorkflowState, error)

GetJob gets a workflow task state.

func (*Nats) GetLatestVersion

func (s *Nats) GetLatestVersion(ctx context.Context, workflowName string) (string, error)

GetLatestVersion queries the workflow versions table for the latest entry

func (*Nats) GetOldState

func (s *Nats) GetOldState(ctx context.Context, id string) (*model.WorkflowState, error)

GetOldState gets a task state given its tracking ID.

func (*Nats) GetProcessHistory

func (s *Nats) GetProcessHistory(ctx context.Context, processInstanceId string, wch chan<- *model.ProcessHistoryEntry, errs chan<- error)

GetProcessHistory fetches the history object for a process.

func (*Nats) GetProcessIdFor added in v1.1.725

func (s *Nats) GetProcessIdFor(ctx context.Context, startEventMessageName string) (string, error)

GetProcessIdFor retrieves the processId that a begun by a message start event

func (*Nats) GetProcessInstance

func (s *Nats) GetProcessInstance(ctx context.Context, processInstanceID string) (*model.ProcessInstance, error)

GetProcessInstance returns a process instance for a given process ID

func (*Nats) GetProcessInstanceStatus

func (s *Nats) GetProcessInstanceStatus(ctx context.Context, id string, wch chan<- *model.WorkflowState, errs chan<- error)

GetProcessInstanceStatus returns a list of workflow statuses for the specified process instance ID.

func (*Nats) GetTaskSpecByUID added in v1.0.623

func (s *Nats) GetTaskSpecByUID(ctx context.Context, uid string) (*model.TaskSpec, error)

GetTaskSpecByUID fetches a task spec from the database.

func (*Nats) GetTaskSpecUID added in v1.0.623

func (s *Nats) GetTaskSpecUID(ctx context.Context, name string) (string, error)

GetTaskSpecUID fetches

func (*Nats) GetTaskSpecUsage added in v1.1.670

func (s *Nats) GetTaskSpecUsage(ctx context.Context, uid []string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsage returns the usage report for a list of task specs.

func (*Nats) GetTaskSpecUsageByName added in v1.1.670

func (s *Nats) GetTaskSpecUsageByName(ctx context.Context, name string) (*model.TaskSpecUsageReport, error)

GetTaskSpecUsageByName produces a report of running and executable places where the task spec is in use.

func (*Nats) GetTaskSpecVersions added in v1.1.670

func (s *Nats) GetTaskSpecVersions(ctx context.Context, name string) (*model.TaskSpecVersions, error)

GetTaskSpecVersions fetches the versions of a given task spec name

func (*Nats) GetUserTaskIDs

func (s *Nats) GetUserTaskIDs(ctx context.Context, owner string) (*model.UserTasks, error)

GetUserTaskIDs gets a list of tasks given an owner.

func (*Nats) GetWorkflow

func (s *Nats) GetWorkflow(ctx context.Context, workflowID string) (*model.Workflow, error)

GetWorkflow - retrieves a workflow model given its ID

func (*Nats) GetWorkflowNameFor added in v1.0.645

func (s *Nats) GetWorkflowNameFor(ctx context.Context, processName string) (string, error)

GetWorkflowNameFor - get the worflow name a process is associated with

func (*Nats) GetWorkflowVersions

func (s *Nats) GetWorkflowVersions(ctx context.Context, workflowName string, wch chan<- *model.WorkflowVersion, errs chan<- error)

GetWorkflowVersions - returns a list of versions for a given workflow.

func (*Nats) HasValidProcess

func (s *Nats) HasValidProcess(ctx context.Context, processInstanceId, executionId string) (*model.ProcessInstance, *model.Execution, error)

HasValidProcess - checks for a valid process and instance for a workflow process and instance ids

func (*Nats) Heartbeat added in v1.1.731

func (s *Nats) Heartbeat(ctx context.Context, req *model.HeartbeatRequest) error

Heartbeat saves a client status to the client KV.

func (*Nats) KvsFor added in v1.1.797

func (s *Nats) KvsFor(ctx context.Context, ns string) (*NamespaceKvs, error)

KvsFor retrieves the shar KVs for a given namespace. If they do not exist for a namespace, it will initialise them and store them in a map for future lookup.

func (*Nats) ListExecutableProcesses added in v1.1.1032

func (s *Nats) ListExecutableProcesses(ctx context.Context, wch chan<- *model.ListExecutableProcessesItem, errs chan<- error)

ListExecutableProcesses returns a list of all the executable processes in SHAR. It retrieves the current SHAR namespace from the context and fetches the workflow versions for that namespace from the key-value store. It then iterates through each workflow version and loads the corresponding workflow. For each process in the workflow, it creates a ListExecutableProcessesItem object and populates it with the process name, workflow name, and the executable start parameters obtained from the workflow's start events. It sends each ListExecutableProcessesItem object to the wch channel.

Parameters: - ctx: The context containing the SHAR namespace. - wch: The channel for sending the list of executable processes. - errs: The channel for sending any errors that occur.

Returns: Nothing. Errors are sent to the errs channel if encountered.

func (*Nats) ListExecutionProcesses added in v1.0.645

func (s *Nats) ListExecutionProcesses(ctx context.Context, id string) ([]string, error)

ListExecutionProcesses gets the current processIDs for an execution.

func (*Nats) ListExecutions added in v1.0.645

func (s *Nats) ListExecutions(ctx context.Context, workflowName string, wch chan<- *model.ListExecutionItem, errs chan<- error)

ListExecutions returns a list of running workflows and versions given a workflow Name

func (*Nats) ListTaskSpecUIDs added in v1.1.725

func (s *Nats) ListTaskSpecUIDs(ctx context.Context, deprecated bool) ([]string, error)

ListTaskSpecUIDs lists UIDs of active (and optionally deprecated) tasks specs.

func (*Nats) ListWorkflows

func (s *Nats) ListWorkflows(ctx context.Context, res chan<- *model.ListWorkflowResponse, errs chan<- error)

ListWorkflows returns a list of all the workflows in SHAR.

func (*Nats) Log added in v1.1.754

func (s *Nats) Log(ctx context.Context, req *model.LogRequest) error

Log publishes LogRequest to WorkflowTelemetry Logs subject

func (*Nats) OwnerID

func (s *Nats) OwnerID(ctx context.Context, name string) (string, error)

OwnerID gets a unique identifier for a task owner.

func (*Nats) OwnerName

func (s *Nats) OwnerName(ctx context.Context, id string) (string, error)

OwnerName retrieves an owner name given an ID.

func (*Nats) ProcessServiceTasks added in v1.1.927

func (s *Nats) ProcessServiceTasks(ctx context.Context, wf *model.Workflow, svcTaskConsFn ServiceTaskConsumerFn, wfProcessMappingFn WorkflowProcessMappingFn) error

ProcessServiceTasks iterates over service tasks in the processes of a given workflow setting, validating them and setting their uid into their element definitions

func (*Nats) PublishMessage

func (s *Nats) PublishMessage(ctx context.Context, name string, key string, vars []byte) error

PublishMessage publishes a workflow message.

func (*Nats) PublishWorkflowState

func (s *Nats) PublishWorkflowState(ctx context.Context, stateName string, state *model.WorkflowState, opts ...PublishOpt) error

PublishWorkflowState publishes a SHAR state object to a given subject

func (*Nats) PutTaskSpec added in v1.0.623

func (s *Nats) PutTaskSpec(ctx context.Context, spec *model.TaskSpec) (string, error)

PutTaskSpec writes a task spec to the database.

func (*Nats) RecordHistory added in v1.1.927

func (s *Nats) RecordHistory(ctx context.Context, state *model.WorkflowState, historyType model.ProcessHistoryType) error

RecordHistory records into the history KV.

func (*Nats) RecordHistoryActivityComplete

func (s *Nats) RecordHistoryActivityComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryActivityComplete records the activity completion into the history object.

func (*Nats) RecordHistoryActivityExecute

func (s *Nats) RecordHistoryActivityExecute(ctx context.Context, state *model.WorkflowState) error

RecordHistoryActivityExecute records the activity execute into the history object.

func (*Nats) RecordHistoryProcessAbort

func (s *Nats) RecordHistoryProcessAbort(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessAbort records the process aborting into the history object.

func (*Nats) RecordHistoryProcessComplete

func (s *Nats) RecordHistoryProcessComplete(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessComplete records the process completion into the history object.

func (*Nats) RecordHistoryProcessSpawn

func (s *Nats) RecordHistoryProcessSpawn(ctx context.Context, state *model.WorkflowState, newProcessInstanceID string) error

RecordHistoryProcessSpawn records the process spawning a new process into the history object.

func (*Nats) RecordHistoryProcessStart

func (s *Nats) RecordHistoryProcessStart(ctx context.Context, state *model.WorkflowState) error

RecordHistoryProcessStart records the process start into the history object.

func (*Nats) SaveState

func (s *Nats) SaveState(ctx context.Context, id string, state *model.WorkflowState) error

SaveState saves the task state.

func (*Nats) SetAbort

func (s *Nats) SetAbort(processor services.AbortFunc)

SetAbort sets the function called when a workflow object aborts.

func (*Nats) SetCompleteActivity

func (s *Nats) SetCompleteActivity(processor services.CompleteActivityFunc)

SetCompleteActivity sets the callback which generates complete activity events.

func (*Nats) SetCompleteActivityProcessor

func (s *Nats) SetCompleteActivityProcessor(processor services.CompleteActivityProcessorFunc)

SetCompleteActivityProcessor sets the callback fired when an activity completes.

func (*Nats) SetCompleteJobProcessor

func (s *Nats) SetCompleteJobProcessor(processor services.CompleteJobProcessorFunc)

SetCompleteJobProcessor sets the callback for completed tasks.

func (*Nats) SetEventProcessor

func (s *Nats) SetEventProcessor(processor services.EventProcessorFunc)

SetEventProcessor sets the callback for processing workflow activities.

func (*Nats) SetLaunchFunc

func (s *Nats) SetLaunchFunc(processor services.LaunchFunc)

SetLaunchFunc sets the callback used to start child workflows.

func (*Nats) SetMessageProcessor

func (s *Nats) SetMessageProcessor(processor services.MessageProcessorFunc)

SetMessageProcessor sets the callback used to create new workflow instances based on a timer.

func (*Nats) SetTraversalProvider

func (s *Nats) SetTraversalProvider(provider services.TraversalFunc)

SetTraversalProvider sets the callback used to handle traversals.

func (*Nats) Shutdown

func (s *Nats) Shutdown()

Shutdown signals the engine to stop processing.

func (*Nats) StartProcessing

func (s *Nats) StartProcessing(ctx context.Context) error

StartProcessing begins listening to all the message processing queues.

func (*Nats) StoreWorkflow

func (s *Nats) StoreWorkflow(ctx context.Context, wf *model.Workflow) (string, error)

StoreWorkflow stores a workflow definition and returns a unique ID

func (*Nats) XDestroyProcessInstance added in v1.0.645

func (s *Nats) XDestroyProcessInstance(ctx context.Context, state *model.WorkflowState) error

XDestroyProcessInstance terminates a running process instance with a cancellation reason and error

type PublishOpt

type PublishOpt interface {
	Apply(n *publishOptions)
}

PublishOpt represents an option that can be used when publishing a workflow state

type ServiceTaskConsumerFn added in v1.1.927

type ServiceTaskConsumerFn func(ctx context.Context, id string) error

ServiceTaskConsumerFn defines the type of a function that ensures existence of a service task consumer

type WorkflowProcessMappingFn added in v1.1.927

type WorkflowProcessMappingFn func(ctx context.Context, wf *model.Workflow, i *model.Process) (uint64, error)

WorkflowProcessMappingFn defines the type of a function that creates a workflow to process mapping

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL