dynamap

package module
v0.9.1 Latest
Published: Apr 17, 2024 License: Apache-2.0 Imports: 14 Imported by: 1

README

DynaMap™

DynaMap™ is a lightweight workflow engine written in Go for building business processes based on Business Process Model and Notation 2.0 (BPMN).

The core of DynaMap is agnostic to how processes are started, created, or interacted with, making it suitable for use in command line tools, microservices, web applications, or embedding in software written in another language. Processes are compiled to standalone executables, enabling deployment in a wide variety of environments.

Workflow engines are typically designed to store process state in a specific technology such as an SQL database. This is an appealing approach initially, but it can have deployment, maintenance, and performance limitations in the long run.

DynaMap core does not manage storing state externally, which lets developers customize storage to suit their requirements and trade off performance against durability. For example, in some cases persisting process state is not required, but high throughput and low latency are desired. In other cases, the ability to recover from a system crash and resume the process is more important than the processing speed lost to serialization. DynaMap lets developers and businesses make these tradeoffs at the process level, where different processes can have different storage requirements, instead of the entire engine forcing every process to use the same storage mechanism and schema. This fits well with the database-per-microservice approach. However, DynaMap can work in monolith, microservice, or serverless architectures.

Repositories

The primary repository is located at https://gitlab.com/dynamap. There are several sub repositories:

Dependencies

DynaMap core is designed to have no third-party dependencies beyond Go itself, which keeps the engine lightweight and avoids the need to manage third-party dependencies. Other layers on top of the core engine may contain dependencies, such as DynaMap web services, data stores, and examples.

Getting started

The quickest way to get started is with the Hello World BPMN Process example.

Workflows may be created in open source and commercial BPMN modeling tools. For recommendations please reach out to Quantek Systems.

The DynaMap Examples repository is the primary location for examples of different use cases: https://gitlab.com/dynamap/examples

Web services for processes may be generated using DynaMap Web: https://gitlab.com/dynamap/web

Supported BPMN Elements

The following BPMN elements are currently implemented:

  • None Start Event
  • Timer Start Event (only repeating time cycles, e.g. R/)
  • Exclusive Gateway
  • Parallel Gateway
  • Script Task (Go)
  • User Task
  • None End Event

License

Apache License, Version 2.0

Copyright 2023 Quantek Systems Inc.

https://www.quanteksystems.com

Documentation

Constants

const (
	ElementCreating    = "Creating"
	ElementCreated     = "Created"
	ElementActivating  = "Activating"
	ElementActivated   = "Activated"
	ElementCompleting  = "Completing"
	ElementCompleted   = "Completed"
	ElementTerminating = "Terminating"
	ElementTerminated  = "Terminated"
)
const (
	ProcessInstanceType            = "ProcessInstance"
	TokenType                      = "Token"
	SequenceFlowType               = "SequenceFlow"
	ExclusiveGatewayType           = "ExclusiveGateway"
	ParallelGatewayType            = "ParallelGateway"
	NoneStartEventType             = "NoneStartEvent"
	NoneIntermediateThrowEventType = "NoneIntermediateThrowEvent"
	NoneEndEventType               = "NoneEndEvent"
	TimerStartEventType            = "TimerStartEventType"
	UserTaskType                   = "UserTaskType"
	ScriptTaskType                 = "ScriptTaskType"
	SubProcessType                 = "SubProcess"
)
const (
	R       token = iota // Repeat
	P                    // Duration
	T                    // Time
	Y                    // Year
	M                    // Month or Minute
	D                    // Day
	H                    // Hour
	S                    // Second
	INTEGER              // Integer
	DECIMAL              // Decimal
	SLASH                // /
	ILLEGAL              // Illegal
	EOF                  // End of file
)
const (
	DefaultSubscriberTimeout         = 2 * time.Second // default timeout after which a subscriber is considered slow
	DefaultSubscriberMaxTimeoutCount = 10              // default maximum number of timeouts before a subscriber is removed
)

Variables

var (
	ErrBPMNEngineNotStarted                 = NewBPMNEngineError(nil, "engine: not started.")
	ErrBPMNEngineCancelled                  = NewBPMNEngineError(nil, "engine: cancelled.")
	ErrBPMNEngineIdGeneratorNotSet          = NewBPMNEngineError(nil, "engine: Id Generator not set.")
	ErrBPMNEngineProcessInstanceStoreNotSet = NewBPMNEngineError(nil, "engine: Process Instance Store not set.")
	ErrBPMNEngineTokenStoreNotSet           = NewBPMNEngineError(nil, "engine: Token Store not set.")
	ErrBPMNEngineStateStoreNotSet           = NewBPMNEngineError(nil, "engine: State Store not set.")
	ErrBPMNEngineSendCommandTimeout         = NewBPMNEngineError(nil, "engine: send command timeout.")
	ErrBPMNEngineReceiveCommandTimeout      = NewBPMNEngineError(nil, "engine: receive command timeout.")
	ErrBPMNEngineProcessInstanceNotFound    = NewBPMNEngineError(nil, "engine: process instance not found.")
	ErrBPMNEngineProcessInstanceNotRunning  = NewBPMNEngineError(nil, "engine: process instance not running.")
)
var (
	ErrProcessStateMachineRequired  = NewBPMNEngineError(nil, "process: process instance implementation must implement ProcessStateMachine")
	ErrProcessInstanceNotRunning    = NewBPMNEngineError(nil, "process: process instance is completed")
	ErrProcessSendCommandTimeout    = NewBPMNEngineError(nil, "process: send command timeout.")
	ErrProcessReceiveCommandTimeout = NewBPMNEngineError(nil, "process: receive command timeout.")
)
var (
	ErrDataUnmarshal = NewBPMNEngineError(nil, "")
)
var (
	ErrStoreElementNotFound = NewBPMNEngineError(nil, "element not found")
)

Functions

func TaskData added in v0.9.0

func TaskData[T any](engine *BPMNEngine, key string) T

TaskData returns the data for the given task, indexed by key.
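
The generic signature suggests a typed lookup keyed by string. The following is a minimal sketch of that pattern only, assuming the data lives in a map[string]any; how BPMNEngine actually stores task data is not shown in this documentation, and lookup and store are hypothetical names.

```go
package main

import "fmt"

// lookup is a hypothetical stand-in for a generic, typed accessor.
// It returns the zero value of T when the key is absent or holds another type.
func lookup[T any](store map[string]any, key string) T {
	v, _ := store[key].(T)
	return v
}

func main() {
	store := map[string]any{"approved": true, "amount": 42}
	fmt.Println(lookup[bool](store, "approved"))
	fmt.Println(lookup[int](store, "amount"))
	fmt.Println(lookup[string](store, "missing") == "")
}
```

Returning the zero value keeps call sites short, at the cost of not distinguishing a missing key from a stored zero value.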

Types

type BPMNEngine

type BPMNEngine struct {
	IdGenerator          IdGenerator           // Generates ids for elements
	StartEventProcessors []StartEventProcessor // Start Event Processors to run on startup
	ProcessInstanceStore ProcessInstanceStore  // Stores process instance states received from state stream
	TokenStore           TokenStore            // Stores token states received from state stream
	StateStore           StateStore            // Stores states received from stateStream
	Ctx                  context.Context       // Context for the workflow engine

	// control start rate of processes
	// default is 5 processes in 10 seconds
	// number of processes to start in given span
	ProcessorStartNum uint
	// time span to start above number of processes
	ProcessorStartTime uint
	// contains filtered or unexported fields
}

BPMNEngine manages starting, stopping, and sending commands to process instances.

func NewBPMNEngine

func NewBPMNEngine(g IdGenerator, pis ProcessInstanceStore, ts TokenStore, ss StateStore) *BPMNEngine

NewBPMNEngine creates a new BPMNEngine with the given IdGenerator, ProcessInstanceStore, TokenStore, and StateStore.

func NewMemoryBPMNEngine

func NewMemoryBPMNEngine() *BPMNEngine

NewMemoryBPMNEngine creates a new BPMNEngine with an in-memory id generator, state store, and history store.

func (*BPMNEngine) AddStartEventProcesser

func (b *BPMNEngine) AddStartEventProcesser(processor StartEventProcessor)

AddStartEventProcesser adds a StartEventProcessor to the engine.

func (*BPMNEngine) AddStartEventProcessor added in v0.9.0

func (b *BPMNEngine) AddStartEventProcessor(p StartEventProcessor)

func (*BPMNEngine) GenerateId

func (b *BPMNEngine) GenerateId(elementType string) string

GenerateId generates a new id for a given element type

func (*BPMNEngine) ProcessInstanceCount

func (b *BPMNEngine) ProcessInstanceCount() int

ProcessInstanceCount returns the count of process instances.

func (*BPMNEngine) RunProcessInstanceToCompletion added in v0.4.0

func (b *BPMNEngine) RunProcessInstanceToCompletion(pi *ProcessInstance) error

RunProcessInstanceToCompletion starts a new process instance and runs it to completion synchronously.

func (*BPMNEngine) SendCommand

func (b *BPMNEngine) SendCommand(cmd any) (any, error)

SendCommand sends any command to a process instance

func (*BPMNEngine) SendCommandWithTimeout

func (b *BPMNEngine) SendCommandWithTimeout(cmd any, timeout time.Duration) (any, error)

SendCommandWithTimeout sends any command to a process instance with a timeout

func (*BPMNEngine) Start

func (b *BPMNEngine) Start() error

Start starts the engine and launches goroutines for the state stream, commands, and start event processors.

func (*BPMNEngine) StartWithContext added in v0.9.1

func (b *BPMNEngine) StartWithContext(ctx context.Context, done func()) error

func (*BPMNEngine) Stop

func (b *BPMNEngine) Stop()

Stop stops the engine

func (*BPMNEngine) Wait

func (b *BPMNEngine) Wait()

Wait waits for all process instances to complete.

func (*BPMNEngine) WriteElement added in v0.5.0

func (b *BPMNEngine) WriteElement(state ElementState) error

WriteElement sends the element state to the state stream for writing

func (*BPMNEngine) WriteProcessInstanceState

func (b *BPMNEngine) WriteProcessInstanceState(state ProcessInstanceState) error

WriteProcessInstanceState sends the process instance state to the state stream for writing

func (*BPMNEngine) WriteToken

func (b *BPMNEngine) WriteToken(state TokenState) error

WriteToken sends the token state to the state stream for writing

type BPMNEngineError

type BPMNEngineError struct {
	Inner      error          // Optional wrapped inner error
	Message    string         // Message describing the error
	StackTrace string         // Stacktrace in which the error was produced
	Misc       map[string]any // Optional information
}

BPMNEngineError describes an error produced in the engine.

func NewBPMNEngineError

func NewBPMNEngineError(err error, messagef string, msgArgs ...any) BPMNEngineError

NewBPMNEngineError creates a new error related to the BPMNEngine.

func (BPMNEngineError) Error

func (err BPMNEngineError) Error() string

Error implements the error interface and prints the BPMNEngineError message, inner error and stack trace.

type BaseTask

type BaseTask struct {
	CompletedBy
	Comments
}

type ClockSource added in v0.9.0

type ClockSource interface {
	SpecHandler(spec string, handler func()) error
	Start()
	Stop()
}

ClockSource is the interface for externally clocked timer start processes.

type CmdError

type CmdError interface {
	Error() error
}

type Comments

type Comments struct {
	Comments string
}

type CompleteTasksResp

type CompleteTasksResp struct {
	TasksResp []CompleteUserTaskResp
	Errors    []error
}

func NewCompleteTasksResp

func NewCompleteTasksResp() *CompleteTasksResp

func (CompleteTasksResp) Error added in v0.7.0

func (c CompleteTasksResp) Error() error

type CompleteUserTaskCmd

type CompleteUserTaskCmd struct {
	ProcessInstanceId string
	TaskId            string
	Data              any
}

type CompleteUserTaskResp

type CompleteUserTaskResp struct {
	ProcessInstanceId string
	TaskId            string
	Error             error
}

type CompleteUserTasksCmd

type CompleteUserTasksCmd struct {
	Tasks []CompleteUserTaskCmd
}

type CompletedBy

type CompletedBy struct {
	CompletedById   string
	CompletedByUser string
}

type Duration

type Duration struct {
	Value   string
	Year    string
	Month   string
	Week    string
	Day     string
	Hour    string
	Minute  string
	Second  string
	HasTime bool
}

func (Duration) DayFloat

func (d Duration) DayFloat() (float64, error)

func (Duration) Duration

func (d Duration) Duration() (time.Duration, error)

func (Duration) HourFloat

func (d Duration) HourFloat() (float64, error)

func (Duration) MinuteFloat

func (d Duration) MinuteFloat() (float64, error)

func (Duration) MonthFloat

func (d Duration) MonthFloat() (float64, error)

func (Duration) SecondFloat

func (d Duration) SecondFloat() (float64, error)

func (Duration) YearFloat

func (d Duration) YearFloat() (float64, error)

type ElementReader added in v0.5.0

type ElementReader interface {
	// ReadElementState returns the current state for a given elementType and id.
	ReadElementState(elementType string, id string) (ElementState, error)
	// ReadElementStates returns all states in the store for a specific element type and key
	ReadElementStates(elementType string, key string) ([]ElementState, error)
	// ReadAllElementStates returns all states in the store
	ReadAllElementStates() ([]ElementState, error)
}

ElementReader reads a StateEntry based on filters

type ElementState

type ElementState struct {
	ElementType       string // The type of element, e.g. SequenceFlow, NoneStartEvent, ParallelGateway, etc.
	Key               string // The unique name for an element in a process at design time.
	Id                string // The unique id created at run-time for an instance of the element.
	Status            string // The status of the element state, e.g. Completing, Completed, etc.
	ProcessInstanceId string // The id of the process instance it belongs to
	TokenId           string // The id of the token it belongs to
	Object            any    `json:"-"` // A reference to the underlying element if required for direct access, e.g. to enable custom storage.
	Store             bool   // Whether to store the element state
}

ElementState contains the primary information required to describe the state of any element.

type ElementStateProvider

type ElementStateProvider interface {
	GetElementState() ElementState
}

ElementStateProvider is an interface that provides the state of an element.

type ElementStateSubscriber added in v0.5.0

type ElementStateSubscriber interface {
	SubscribeElementState() (uint64, <-chan ElementState)
	UnsubscribeElementState(uint64)
}

type ElementWriter added in v0.5.0

type ElementWriter interface {
	// WriteElement writes an element state
	WriteElement(state ElementState) error
}

ElementWriter writes a StateEntry

type ExclusiveGateway

type ExclusiveGateway struct {
	ElementState
	Handler     func() ElementStateProvider
	NextElement ElementStateProvider
}

func NewExclusiveGateway

func NewExclusiveGateway(pid, tid, key, id string, handler func() ElementStateProvider) *ExclusiveGateway

func (*ExclusiveGateway) Completed

func (eg *ExclusiveGateway) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ExclusiveGateway) Completing

func (eg *ExclusiveGateway) Completing(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ExclusiveGateway) GetElementState

func (eg *ExclusiveGateway) GetElementState() ElementState

func (*ExclusiveGateway) RefElement

func (eg *ExclusiveGateway) RefElement() any

func (*ExclusiveGateway) RunLifecycle

func (eg *ExclusiveGateway) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type ExecuteCmd

type ExecuteCmd interface {
	ExecuteCmd(cmd any) any
}

type GetProcessInstanceInfoCmd

type GetProcessInstanceInfoCmd struct{}

type GetProcessInstancesCmd

type GetProcessInstancesCmd struct {
}

type GetProcessInstancesResp

type GetProcessInstancesResp struct {
	Data   []ProcessInstanceState
	Errors []error
}

func NewGetProcessInstancesResp

func NewGetProcessInstancesResp() *GetProcessInstancesResp

type GetTasksCmd

type GetTasksCmd struct{}

type GetTasksInfoCmd

type GetTasksInfoCmd struct{}

type GetTasksResp

type GetTasksResp struct {
	Data   []TaskState
	Errors []error
}

func NewGetTasksResp

func NewGetTasksResp() *GetTasksResp

type GetUserTaskCmd

type GetUserTaskCmd struct {
}

type ISO8601DateTime

type ISO8601DateTime struct {
	Value          string
	Duration       Duration
	IsDuration     bool
	IsRepeating    bool
	IsIndefinitely bool
}
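
The IsRepeating and IsIndefinitely flags correspond to the ISO 8601 repeating interval form R[n]/<duration>, where an omitted count means the interval repeats without bound. The sketch below splits such a value under that reading; how this package parses its input is not shown here, and cycle and parseCycle are hypothetical names.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// cycle is a hypothetical, reduced result type for this sketch.
type cycle struct {
	IsRepeating    bool
	IsIndefinitely bool
	Repetitions    int
	Duration       string // the raw duration part, e.g. "PT5S"
}

// parseCycle splits an ISO 8601 repeating interval of the form R[n]/<duration>.
func parseCycle(value string) (cycle, error) {
	if !strings.HasPrefix(value, "R") {
		if !strings.HasPrefix(value, "P") {
			return cycle{}, fmt.Errorf("unsupported value %q", value)
		}
		return cycle{Duration: value}, nil // plain duration, no repetition
	}
	count, rest, ok := strings.Cut(value[1:], "/")
	if !ok || !strings.HasPrefix(rest, "P") {
		return cycle{}, fmt.Errorf("malformed repeating interval %q", value)
	}
	c := cycle{IsRepeating: true, Duration: rest}
	if count == "" {
		c.IsIndefinitely = true // "R/" carries no count
		return c, nil
	}
	n, err := strconv.Atoi(count)
	if err != nil || n < 0 {
		return cycle{}, fmt.Errorf("invalid repetition count %q", count)
	}
	c.Repetitions = n
	return c, nil
}

func main() {
	fmt.Println(parseCycle("R/PT5S"))
	fmt.Println(parseCycle("R3/PT1M"))
}
```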

func NewISO8601DateTime

func NewISO8601DateTime(value string) (ISO8601DateTime, error)

TODO(FEI) improve quote removal

type ISO8601Parser

type ISO8601Parser struct {
	// contains filtered or unexported fields
}

ISO8601Parser represents an ISO8601 parser.

func NewISO8601Parser

func NewISO8601Parser(r io.Reader) *ISO8601Parser

NewISO8601Parser returns a new instance of ISO8601Parser.

func (*ISO8601Parser) Parse

func (p *ISO8601Parser) Parse() (ISO8601DateTime, error)

type ISO8601Scanner

type ISO8601Scanner struct {
	// contains filtered or unexported fields
}

ISO8601Scanner represents a lexical scanner for ISO8601 durations

func NewISO8601Scanner

func NewISO8601Scanner(r io.Reader) *ISO8601Scanner

NewISO8601Scanner returns a new instance of an ISO8601Scanner.

func (*ISO8601Scanner) Scan

func (s *ISO8601Scanner) Scan() (tok token, value string)

Scan returns the next token and literal value.

type IdGenerator

type IdGenerator interface {
	GenerateId(elementType string) string
}
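
Any type with a GenerateId(elementType string) string method satisfies this interface. A minimal sketch of a custom generator follows; prefixedIdGenerator and its id format are hypothetical choices for illustration, not something the package provides.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// prefixedIdGenerator is a hypothetical generator with the documented
// method set: GenerateId(elementType string) string.
type prefixedIdGenerator struct {
	next atomic.Uint64 // safe for concurrent callers
}

// GenerateId returns ids such as "Token-1", using one shared counter.
func (g *prefixedIdGenerator) GenerateId(elementType string) string {
	return fmt.Sprintf("%s-%d", elementType, g.next.Add(1))
}

func main() {
	g := &prefixedIdGenerator{}
	fmt.Println(g.GenerateId("ProcessInstance"))
	fmt.Println(g.GenerateId("Token"))
}
```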

type LifeCycleRunner

type LifeCycleRunner interface {
	RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error
}

LifeCycleRunner runs the life cycle of an element through all required states for the element such as Creating to Completed.

type MemoryIdGenerator

type MemoryIdGenerator struct {
	// contains filtered or unexported fields
}

func NewMemoryIdGenerator

func NewMemoryIdGenerator() *MemoryIdGenerator

func (*MemoryIdGenerator) GenerateId

func (g *MemoryIdGenerator) GenerateId(_ string) string

func (*MemoryIdGenerator) Start

func (g *MemoryIdGenerator) Start(ctx context.Context)

type MemoryProcessInstanceStore added in v0.2.0

type MemoryProcessInstanceStore struct {
	// contains filtered or unexported fields
}

MemoryProcessInstanceStore stores the state of process instances and their tokens

type MemoryStateStore

type MemoryStateStore struct {
	IdGenerator IdGenerator // generates ids for process instances, tokens, and elements

	StoreCompleted bool // flag indicating whether completed process instances, tokens, and elements should be removed when complete.
	// contains filtered or unexported fields
}

MemoryStateStore stores process instance, token, and element states in memory

func NewMemoryStateStore

func NewMemoryStateStore() *MemoryStateStore

NewMemoryStateStore creates a new in memory state store

func (*MemoryStateStore) CreateProcessInstance added in v0.5.0

func (m *MemoryStateStore) CreateProcessInstance(state ProcessInstanceState) (string, error)

func (*MemoryStateStore) ReadAllElementStates added in v0.5.0

func (m *MemoryStateStore) ReadAllElementStates() ([]ElementState, error)

ReadAllElementStates returns all states in the store

func (*MemoryStateStore) ReadElementState added in v0.5.0

func (m *MemoryStateStore) ReadElementState(elementType string, id string) (ElementState, error)

ReadElementState returns the current state for a given elementType and id.

func (*MemoryStateStore) ReadElementStates added in v0.3.0

func (m *MemoryStateStore) ReadElementStates(elementType string, key string) ([]ElementState, error)

ReadElementStates returns all states in the store for a specific element type and key

func (*MemoryStateStore) ReadProcessInstance

func (m *MemoryStateStore) ReadProcessInstance(id string) (ProcessInstanceState, error)

ReadProcessInstance reads the state of a process instance. Does not include tokens or element states.

func (*MemoryStateStore) ReadProcessInstances

func (m *MemoryStateStore) ReadProcessInstances() ([]ProcessInstanceState, error)

ReadProcessInstances reads the state of all process instances. Does not include tokens or element states.

func (*MemoryStateStore) ReadTokens

func (m *MemoryStateStore) ReadTokens() ([]TokenState, error)

ReadTokens reads the state of all tokens

func (*MemoryStateStore) SubscribeElementState added in v0.5.0

func (m *MemoryStateStore) SubscribeElementState() (uint64, <-chan ElementState)

SubscribeElementState creates a new subscription for element changes and returns a new id for the subscriber and a channel to receive notifications. The id is needed to call UnsubscribeElementState.

func (*MemoryStateStore) UnsubscribeElementState added in v0.5.0

func (m *MemoryStateStore) UnsubscribeElementState(id uint64)

UnsubscribeElementState removes a subscriber
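
The subscribe and unsubscribe pair follows a common pattern: hand out an id together with a receive-only channel, and use the id later to remove the subscriber. The sketch below shows that pattern with plain strings instead of ElementState. The broker type is hypothetical, and it drops notifications for subscribers that fall behind, which is a simplification and may differ from the timeout handling implied by DefaultSubscriberTimeout.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical publisher keyed by subscriber id.
type broker struct {
	mu     sync.Mutex
	nextId uint64
	subs   map[uint64]chan string
}

func newBroker() *broker { return &broker{subs: make(map[uint64]chan string)} }

// subscribe registers a subscriber and returns its id and a receive-only channel.
func (b *broker) subscribe() (uint64, <-chan string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextId++
	ch := make(chan string, 8) // small buffer so publish tolerates brief stalls
	b.subs[b.nextId] = ch
	return b.nextId, ch
}

// unsubscribe removes the subscriber and closes its channel.
func (b *broker) unsubscribe(id uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.subs[id]; ok {
		delete(b.subs, id)
		close(ch)
	}
}

// publish delivers status to every subscriber, skipping full channels.
func (b *broker) publish(status string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs {
		select {
		case ch <- status:
		default: // subscriber is not keeping up; skip instead of blocking
		}
	}
}

func main() {
	b := newBroker()
	id, ch := b.subscribe()
	b.publish("Completed")
	fmt.Println(<-ch)
	b.unsubscribe(id)
}
```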

func (*MemoryStateStore) WriteElement added in v0.5.0

func (m *MemoryStateStore) WriteElement(state ElementState) error

WriteElement writes an element's state

func (*MemoryStateStore) WriteProcessInstanceState

func (m *MemoryStateStore) WriteProcessInstanceState(state ProcessInstanceState) error

WriteProcessInstanceState writes the state of a process instance. Does not include tokens or element states.

func (*MemoryStateStore) WriteToken

func (m *MemoryStateStore) WriteToken(state TokenState) error

WriteToken writes the state of a token

type MemoryStateStoreElementSubscriber added in v0.3.0

type MemoryStateStoreElementSubscriber struct {
	// contains filtered or unexported fields
}

MemoryStateStoreElementSubscriber stores communication information about a subscriber to element state changes

type MemoryTokenStore added in v0.2.0

type MemoryTokenStore struct {
	// contains filtered or unexported fields
}

MemoryTokenStore stores the state of tokens

type NoneEndEvent

type NoneEndEvent struct {
	ElementState
}

func NewNoneEndEvent

func NewNoneEndEvent(pid, tid, key, id string, store bool) *NoneEndEvent

func (*NoneEndEvent) GetElementState

func (e *NoneEndEvent) GetElementState() ElementState

func (*NoneEndEvent) RefElement

func (e *NoneEndEvent) RefElement() any

func (*NoneEndEvent) RunLifecycle

func (e *NoneEndEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type NoneIntermediateThrowEvent

type NoneIntermediateThrowEvent struct {
	ElementState
}

func NewNoneIntermediateThrowEvent

func NewNoneIntermediateThrowEvent(pid, tid, key, id string) *NoneIntermediateThrowEvent

func (*NoneIntermediateThrowEvent) GetElementState

func (e *NoneIntermediateThrowEvent) GetElementState() ElementState

func (*NoneIntermediateThrowEvent) RefElement

func (e *NoneIntermediateThrowEvent) RefElement() any

func (*NoneIntermediateThrowEvent) RunLifecycle

func (e *NoneIntermediateThrowEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type ParallelGateway

type ParallelGateway struct {
	ElementState
	CurrentSeqKey string
	InSeqKeys     []string
	OutSeqKeys    []string
}

func NewParallelGateway

func NewParallelGateway(pid, tid, key, id, currentSeqKey string, inSeqKeys, outSeqKeys []string) *ParallelGateway

func (*ParallelGateway) Completed

func (pg *ParallelGateway) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ParallelGateway) Completing

func (pg *ParallelGateway) Completing(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

func (*ParallelGateway) GetElementState

func (pg *ParallelGateway) GetElementState() ElementState

func (*ParallelGateway) RefElement

func (pg *ParallelGateway) RefElement() any

func (*ParallelGateway) RunLifecycle

func (pg *ParallelGateway) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type ParallelGatewayCmd

type ParallelGatewayCmd struct {
	// contains filtered or unexported fields
}

type PingProcessInstanceCmd

type PingProcessInstanceCmd struct{}

type PrimarySecondaryStateStore added in v0.5.0

type PrimarySecondaryStateStore struct {
	Primary   StateStore
	Secondary []StateStore
}

PrimarySecondaryStateStore stores runtime data in the primary store, and replicates data to secondary stores. Read operations occur from the primary store.

func NewPrimarySecondaryStateStore added in v0.5.0

func NewPrimarySecondaryStateStore(primary StateStore, secondary []StateStore) *PrimarySecondaryStateStore

NewPrimarySecondaryStateStore creates a new composite store that stores runtime data in the primary store, and replicates data to secondary stores. Read operations occur from the primary store.
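
The write-to-all, read-from-primary behaviour described above can be sketched with a reduced store interface. Everything here (tokenStore, memStore, replicated) is hypothetical and far smaller than the real StateStore; the error handling, where a failed primary write aborts and secondary failures report the first error, is an assumption.

```go
package main

import "fmt"

// tokenStore is a hypothetical, reduced store interface for this sketch.
type tokenStore interface {
	WriteToken(id, status string) error
	ReadToken(id string) (string, bool)
}

type memStore struct{ tokens map[string]string }

func newMemStore() *memStore { return &memStore{tokens: make(map[string]string)} }

func (m *memStore) WriteToken(id, status string) error { m.tokens[id] = status; return nil }
func (m *memStore) ReadToken(id string) (string, bool) { s, ok := m.tokens[id]; return s, ok }

// replicated writes to the primary first, then to each secondary; reads use the primary only.
type replicated struct {
	primary   tokenStore
	secondary []tokenStore
}

func (r *replicated) WriteToken(id, status string) error {
	if err := r.primary.WriteToken(id, status); err != nil {
		return err // assumption: a failed primary write aborts replication
	}
	var firstErr error
	for _, s := range r.secondary {
		if err := s.WriteToken(id, status); err != nil && firstErr == nil {
			firstErr = err // keep going so the other secondaries still receive the write
		}
	}
	return firstErr
}

func (r *replicated) ReadToken(id string) (string, bool) { return r.primary.ReadToken(id) }

func main() {
	primary, audit := newMemStore(), newMemStore()
	store := &replicated{primary: primary, secondary: []tokenStore{audit}}
	_ = store.WriteToken("t1", "Completed")
	fmt.Println(store.ReadToken("t1"))
	fmt.Println(audit.ReadToken("t1"))
}
```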

func (*PrimarySecondaryStateStore) CreateProcessInstance added in v0.5.0

func (s *PrimarySecondaryStateStore) CreateProcessInstance(state ProcessInstanceState) (string, error)

func (*PrimarySecondaryStateStore) ReadAllElementStates added in v0.5.0

func (s *PrimarySecondaryStateStore) ReadAllElementStates() ([]ElementState, error)

ReadAllElementStates returns all states in the primary store

func (*PrimarySecondaryStateStore) ReadElementState added in v0.5.0

func (s *PrimarySecondaryStateStore) ReadElementState(elementType string, id string) (ElementState, error)

ReadElementState returns the current state from the primary store for a given elementType and id.

func (*PrimarySecondaryStateStore) ReadElementStates added in v0.5.0

func (s *PrimarySecondaryStateStore) ReadElementStates(elementType string, key string) ([]ElementState, error)

ReadElementStates returns all states in the primary store for a specific element type and key

func (*PrimarySecondaryStateStore) ReadProcessInstance added in v0.5.0

func (s *PrimarySecondaryStateStore) ReadProcessInstance(id string) (ProcessInstanceState, error)

ReadProcessInstance reads process instance state from the primary store by id

func (*PrimarySecondaryStateStore) ReadProcessInstances added in v0.5.0

func (s *PrimarySecondaryStateStore) ReadProcessInstances() ([]ProcessInstanceState, error)

ReadProcessInstances reads all process instance state from the primary store

func (*PrimarySecondaryStateStore) ReadTokens added in v0.5.0

func (s *PrimarySecondaryStateStore) ReadTokens() ([]TokenState, error)

ReadTokens reads all tokens from the primary store

func (*PrimarySecondaryStateStore) SubscribeElementState added in v0.5.0

func (s *PrimarySecondaryStateStore) SubscribeElementState() (uint64, <-chan ElementState)

SubscribeElementState subscribes to element state changes in the primary store.

func (*PrimarySecondaryStateStore) UnsubscribeElementState added in v0.5.0

func (s *PrimarySecondaryStateStore) UnsubscribeElementState(u uint64)

UnsubscribeElementState unsubscribes an element state change subscriber from the primary store.

func (*PrimarySecondaryStateStore) WriteElement added in v0.5.0

func (s *PrimarySecondaryStateStore) WriteElement(state ElementState) error

WriteElement writes element state to the primary and secondary stores

func (*PrimarySecondaryStateStore) WriteProcessInstanceState added in v0.5.0

func (s *PrimarySecondaryStateStore) WriteProcessInstanceState(state ProcessInstanceState) error

WriteProcessInstanceState writes process instance state to the primary store and secondary stores.

func (*PrimarySecondaryStateStore) WriteToken added in v0.5.0

func (s *PrimarySecondaryStateStore) WriteToken(state TokenState) error

WriteToken writes token state to the primary and secondary stores

type ProcessInstance

type ProcessInstance struct {
	Mu           *sync.RWMutex
	Key          string
	Id           string
	Version      string
	Status       string
	Created      time.Time
	StartElement ElementStateProvider
	Tokens       []*Token
	BpmnEngine   *BPMNEngine
	Impl         any // Implementation of BPMN Process
	// contains filtered or unexported fields
}

func (*ProcessInstance) Activated

func (p *ProcessInstance) Activated()

func (*ProcessInstance) Activating

func (p *ProcessInstance) Activating()

func (*ProcessInstance) CompleteTask

func (p *ProcessInstance) CompleteTask(cmd CompleteUserTaskCmd) any

func (*ProcessInstance) Completed

func (p *ProcessInstance) Completed()

func (*ProcessInstance) Completing

func (p *ProcessInstance) Completing()

func (*ProcessInstance) Creating added in v0.5.0

func (p *ProcessInstance) Creating() error

func (*ProcessInstance) GetProcessInstanceInfo

func (p *ProcessInstance) GetProcessInstanceInfo() ProcessInstanceState

func (*ProcessInstance) GetTasksInfo

func (p *ProcessInstance) GetTasksInfo() []TaskState

func (*ProcessInstance) InitProcessInstance added in v0.5.0

func (p *ProcessInstance) InitProcessInstance(ctx context.Context, cancel context.CancelFunc) error

func (*ProcessInstance) ProcessInstanceState added in v0.5.0

func (p *ProcessInstance) ProcessInstanceState() ProcessInstanceState

func (*ProcessInstance) ProcessParallelGateway

func (p *ProcessInstance) ProcessParallelGateway(v ParallelGateway) <-chan any

func (*ProcessInstance) RefElement

func (p *ProcessInstance) RefElement() any

func (*ProcessInstance) RemoveToken added in v0.5.0

func (p *ProcessInstance) RemoveToken(token *Token)

func (*ProcessInstance) Run

func (p *ProcessInstance) Run(wg *sync.WaitGroup) error

func (*ProcessInstance) RunToCompletion added in v0.4.0

func (p *ProcessInstance) RunToCompletion(ctx context.Context)

TODO: RunToCompletion is missing some initialization done in Run().

func (*ProcessInstance) Running

func (p *ProcessInstance) Running() bool

func (*ProcessInstance) SendCommand

func (p *ProcessInstance) SendCommand(cmd any) (any, error)

SendCommand is called by an external client to communicate with the process instance.

func (*ProcessInstance) SendCommandWithTimeout

func (p *ProcessInstance) SendCommandWithTimeout(cmd any, timeout time.Duration) (any, error)

func (*ProcessInstance) Shutdown added in v0.5.0

func (p *ProcessInstance) Shutdown()

func (*ProcessInstance) String

func (p *ProcessInstance) String() string

func (*ProcessInstance) Terminated

func (p *ProcessInstance) Terminated()

func (*ProcessInstance) Terminating

func (p *ProcessInstance) Terminating()

func (*ProcessInstance) WriteState

func (p *ProcessInstance) WriteState()

type ProcessInstanceCmd

type ProcessInstanceCmd struct {
	Id  string
	Cmd any
}

type ProcessInstanceCreator added in v0.5.0

type ProcessInstanceCreator interface {
	// CreateProcessInstance creates a new process instance in the store.
	// If the process instance does not already have an id, it is generated
	// by the store and returned as a string. If there is an error, the
	// id will be empty and an error returned.
	CreateProcessInstance(state ProcessInstanceState) (string, error)
}

ProcessInstanceCreator creates a new process instance in a store

type ProcessInstanceDataRetriever

type ProcessInstanceDataRetriever interface {
	ProcessInstanceData() any
}

type ProcessInstanceReader

type ProcessInstanceReader interface {
	ReadProcessInstance(id string) (ProcessInstanceState, error) // Get rid of?
	ReadProcessInstances() ([]ProcessInstanceState, error)
}

ProcessInstanceReader reads the state of process instance(s)

type ProcessInstanceState

type ProcessInstanceState struct {
	Key     string
	Id      string
	Version string
	Status  string
	Created time.Time
	Data    any
}

type ProcessInstanceStore

type ProcessInstanceStore interface {
	ProcessInstanceCreator
	ProcessInstanceReader
	ProcessInstanceWriter
}

ProcessInstanceStore can read and write process instances

type ProcessInstanceWriter

type ProcessInstanceWriter interface {
	WriteProcessInstanceState(state ProcessInstanceState) error
}

ProcessInstanceWriter writes the state of a process to a store

type ProcessStateMachine

type ProcessStateMachine interface {
	GetNextElement(engine *BPMNEngine, g IdGenerator, t *Token, currentElement ElementStateProvider) ElementStateProvider
}
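
GetNextElement takes the current element and returns the one that follows it, so a process implementation is in effect a transition function. The sketch below illustrates that idea with a fixed lookup table. The element and helloProcess types and the element keys are hypothetical, and the real method also receives the engine, an id generator, and the token.

```go
package main

import "fmt"

// element is a hypothetical stand-in for an element identified by its design-time key.
type element struct{ Key string }

// helloProcess is a hypothetical process whose flow is a fixed table of transitions.
type helloProcess struct {
	next map[string]string
}

// nextElement returns the element that follows current, or nil at the end of the flow.
func (p helloProcess) nextElement(current *element) *element {
	key, ok := p.next[current.Key]
	if !ok {
		return nil
	}
	return &element{Key: key}
}

func main() {
	p := helloProcess{next: map[string]string{
		"StartEvent": "SayHello",
		"SayHello":   "EndEvent",
	}}
	e := &element{Key: "StartEvent"}
	for e != nil {
		fmt.Println(e.Key)
		e = p.nextElement(e)
	}
}
```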

type ScriptTask

type ScriptTask struct {
	ElementState
	// contains filtered or unexported fields
}

func NewScriptTask

func NewScriptTask(pid, tid, key, id string, script func(ctx context.Context, bpmnEngine *BPMNEngine, token *Token, task *ScriptTask)) *ScriptTask

func (*ScriptTask) Activated added in v0.7.0

func (st *ScriptTask) Activated(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ScriptTask) Completed

func (st *ScriptTask) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*ScriptTask) GetElementState

func (st *ScriptTask) GetElementState() ElementState

func (*ScriptTask) RunLifecycle

func (st *ScriptTask) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type SendTask

type SendTask struct {
	ElementState
}

func (*SendTask) RunLifecycle

func (s *SendTask) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type SequenceFlow

type SequenceFlow struct {
	ElementState
}

func NewSequenceFlow

func NewSequenceFlow(pid, tid, key, id string) *SequenceFlow

func (*SequenceFlow) GetElementState

func (s *SequenceFlow) GetElementState() ElementState

func (*SequenceFlow) RunLifecycle

func (s *SequenceFlow) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type StartEvent

type StartEvent struct {
	ElementState
}

StartEvent represents a BPMN none start event.

func NewNoneStartEvent

func NewNoneStartEvent(pid, tid, key, id string) *StartEvent

NewNoneStartEvent creates a new none start event.

func (*StartEvent) GetElementState

func (e *StartEvent) GetElementState() ElementState

GetElementState returns the state of the StartEvent instance.

func (*StartEvent) RefElement

func (e *StartEvent) RefElement() any

func (*StartEvent) RunLifecycle

func (e *StartEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type StartEventProcessor

type StartEventProcessor func(context.Context, *BPMNEngine) error

A StartEventProcessor is responsible for starting processes.

type StartProcessInstanceCmd

type StartProcessInstanceCmd struct {
	Instance *ProcessInstance
}

type StartProcessInstanceResp

type StartProcessInstanceResp struct {
	Id  string
	Err error
}

func (StartProcessInstanceResp) Error

func (s StartProcessInstanceResp) Error() error

type SubProcess added in v0.7.0

type SubProcess struct {
	ElementState
	NumberOfInstances int
	StartElementKey   string
	Tokens            []*Token
	BpmnEngine        *BPMNEngine
	// contains filtered or unexported fields
}

func NewSubProcess added in v0.7.0

func NewSubProcess(pid, tid, key, id string, engine *BPMNEngine, g IdGenerator, startElementKey string, numberOfInstances int, getNextElement func(engine *BPMNEngine, g IdGenerator, t *Token, currentElement ElementStateProvider, loopCounter int) ElementStateProvider) *SubProcess

func (*SubProcess) Activated added in v0.7.0

func (sp *SubProcess) Activated(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*SubProcess) Completed added in v0.7.0

func (sp *SubProcess) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*SubProcess) ExecuteProcesses added in v0.7.0

func (sp *SubProcess) ExecuteProcesses(ctx context.Context, bpmnEngine *BPMNEngine, parentToken *Token)

func (*SubProcess) GetElementState added in v0.7.0

func (sp *SubProcess) GetElementState() ElementState

func (*SubProcess) RunLifecycle added in v0.7.0

func (sp *SubProcess) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type TaskState

type TaskState struct {
	ProcessInstanceKey     string
	ProcessInstanceId      string
	ProcessInstanceVersion string
	Key                    string
	Id                     string
	Status                 string
	Data                   any
}

type TimerStartEvent

type TimerStartEvent struct {
	ElementState
}

func NewTimerStartEvent

func NewTimerStartEvent(pid, tid, key, id string) *TimerStartEvent

func (*TimerStartEvent) GetElementState

func (e *TimerStartEvent) GetElementState() ElementState

func (*TimerStartEvent) RefElement

func (e *TimerStartEvent) RefElement() any

func (*TimerStartEvent) RunLifecycle

func (e *TimerStartEvent) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type Token

type Token struct {
	Id                          string
	PrevElement, CurrentElement ElementStateProvider
	BpmnEngine                  *BPMNEngine
	ProcessInstance             *ProcessInstance
	// contains filtered or unexported fields
}

func NewToken

func NewToken(startElement ElementStateProvider, processInstance *ProcessInstance, engine *BPMNEngine) *Token

func (*Token) AcceptingCommands

func (t *Token) AcceptingCommands() bool

func (*Token) CurrentState

func (t *Token) CurrentState() ElementStateProvider

func (*Token) IsComplete

func (t *Token) IsComplete() bool

func (*Token) PreviousState

func (t *Token) PreviousState() ElementStateProvider

func (*Token) Run

func (t *Token) Run(ctx context.Context, wg *sync.WaitGroup)

func (*Token) SendCommand

func (t *Token) SendCommand(ctx context.Context, cmd any, timeout time.Duration) any
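
The signature of SendCommand (a context, an untyped command, a timeout, and an untyped result) suggests a request/reply exchange with a running token, though the internals are not documented here. The sketch below shows a generic Go channel pattern with that same shape. It is not DynaMap's implementation: `worker`, `command`, `errTimeout`, and `demo` are hypothetical names, and returning failures as error values inside the `any` result is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical error value for this sketch.
var errTimeout = errors.New("command timed out")

// command pairs a payload with a channel for the reply.
type command struct {
	payload any
	reply   chan any
}

// worker is a hypothetical stand-in for a running token.
type worker struct {
	cmds chan command
}

// run handles commands until ctx is cancelled.
func (w *worker) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case c := <-w.cmds:
			// The reply channel is buffered, so this send never blocks.
			c.reply <- fmt.Sprintf("handled %v", c.payload)
		}
	}
}

// SendCommand delivers cmd and waits for a reply, giving up after timeout
// or when ctx is cancelled. Failures are returned as error values.
func (w *worker) SendCommand(ctx context.Context, cmd any, timeout time.Duration) any {
	c := command{payload: cmd, reply: make(chan any, 1)}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Phase 1: hand the command to the worker.
	select {
	case w.cmds <- c:
	case <-timer.C:
		return errTimeout
	case <-ctx.Done():
		return ctx.Err()
	}
	// Phase 2: wait for the reply within the same time budget.
	select {
	case r := <-c.reply:
		return r
	case <-timer.C:
		return errTimeout
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo sends one command to a running worker and one to an idle worker.
func demo() (ok any, failed any) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	running := &worker{cmds: make(chan command)}
	go running.run(ctx)
	ok = running.SendCommand(ctx, "ping", time.Second)

	idle := &worker{cmds: make(chan command)} // nothing receives here
	failed = idle.SendCommand(ctx, "ping", 20*time.Millisecond)
	return ok, failed
}

func main() {
	ok, failed := demo()
	fmt.Println(ok)     // prints: handled ping
	fmt.Println(failed) // prints: command timed out
}
```

With a result typed as `any`, callers would typically use a type switch to tell a response value from an error.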

func (*Token) WriteState

func (t *Token) WriteState()

type TokenReader

type TokenReader interface {
	ReadTokens() ([]TokenState, error)
}

TokenReader reads token states from a store.

type TokenState

type TokenState struct {
	Id                string
	ProcessInstanceId string
	CurrentElementId  string
	Complete          bool
}

type TokenStore

type TokenStore interface {
	TokenReader
	TokenWriter
}

TokenStore can read and write tokens.

type TokenWriter

type TokenWriter interface {
	WriteToken(state TokenState) error
}

TokenWriter writes the state of a token to a store.
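
As an illustration of the serialization tradeoff mentioned in the README, the sketch below implements the TokenReader and TokenWriter method set as an append-only JSON log, where reading replays the log and keeps the latest state per token id. This is one possible design under stated assumptions, not the project's own store. `TokenState` is copied from the listing above; `LogTokenStore` is a hypothetical name, and the `bytes.Buffer` stands in for a file or other durable medium.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"sync"
)

// TokenState is copied from the listing above so the sketch compiles
// without importing DynaMap.
type TokenState struct {
	Id                string
	ProcessInstanceId string
	CurrentElementId  string
	Complete          bool
}

// LogTokenStore is a hypothetical append-only store. The buffer stands in
// for a file; each write appends one JSON object followed by a newline.
type LogTokenStore struct {
	mu  sync.Mutex
	log bytes.Buffer
}

// WriteToken appends the state to the log.
func (s *LogTokenStore) WriteToken(state TokenState) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return json.NewEncoder(&s.log).Encode(state)
}

// ReadTokens replays the log and returns the latest state of each token,
// ordered by the first time each token id was written.
func (s *LogTokenStore) ReadTokens() ([]TokenState, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Read from a copy of the bytes so the log itself is not consumed.
	dec := json.NewDecoder(bytes.NewReader(s.log.Bytes()))
	latest := make(map[string]TokenState)
	var order []string
	for {
		var st TokenState
		err := dec.Decode(&st)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, err
		}
		if _, seen := latest[st.Id]; !seen {
			order = append(order, st.Id)
		}
		latest[st.Id] = st
	}
	out := make([]TokenState, 0, len(order))
	for _, id := range order {
		out = append(out, latest[id])
	}
	return out, nil
}

func main() {
	store := &LogTokenStore{}
	writes := []TokenState{
		{Id: "t1", ProcessInstanceId: "pi-1", CurrentElementId: "start"},
		{Id: "t1", ProcessInstanceId: "pi-1", CurrentElementId: "end", Complete: true},
	}
	for _, w := range writes {
		if err := store.WriteToken(w); err != nil {
			panic(err)
		}
	}
	tokens, err := store.ReadTokens()
	if err != nil {
		panic(err)
	}
	for _, t := range tokens {
		fmt.Println(t.Id, t.CurrentElementId, t.Complete) // prints: t1 end true
	}
}
```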

type UserTask

type UserTask[T any] struct {
	ElementState

	CompletedTime time.Time
	Data          T
	// contains filtered or unexported fields
}

func NewUserTask

func NewUserTask[T any](pid, tid, key, id, status string, store bool) *UserTask[T]

func (*UserTask[T]) Activated

func (t *UserTask[T]) Activated(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

func (*UserTask[T]) Clone

func (t *UserTask[T]) Clone() *UserTask[T]

func (*UserTask[T]) Completed

func (t *UserTask[T]) Completed(ctx context.Context, bpmnEngine *BPMNEngine, token *Token)

func (*UserTask[T]) GetElementState

func (t *UserTask[T]) GetElementState() ElementState

func (*UserTask[T]) RefElement

func (t *UserTask[T]) RefElement() any

func (*UserTask[T]) RunLifecycle

func (t *UserTask[T]) RunLifecycle(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

func (*UserTask[T]) RunTask

func (t *UserTask[T]) RunTask(ctx context.Context, bpmnEngine *BPMNEngine, token *Token) error

type WaitingParallelGateway

type WaitingParallelGateway struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples
internal
proxy
cmd
