workflow

package
v0.0.0-...-da789b0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type Config

type Config struct {
	ManagerType ManagerType `yaml:"manager"`
}

func NewConfig

func NewConfig(provider config.Provider) (Config, error)

func NewDefaultConfig

func NewDefaultConfig() Config

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

func NewConnector

func NewConnector() *Connector

func (*Connector) Add

func (c *Connector) Add(nodeID string, o *graph.IO)

func (*Connector) Connect

func (c *Connector) Connect(ctx context.Context) rxgo.Observable

func (*Connector) Dispose

func (c *Connector) Dispose()

func (*Connector) Get

func (c *Connector) Get(nodeID string) (*graph.IO, bool)

type EdgeBuilder

type EdgeBuilder[U graph.ProtoEdge] func(message U) graph.Edge

EdgeBuilder is a function that builds an edge from a proto message.

type Logger

type Logger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Warn(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(wm *WorkflowManager) *Manager

func (*Manager) Id

func (m *Manager) Id() string

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) (context.Context, *Connector)

func (*Manager) Stop

func (m *Manager) Stop() error

type ManagerBuilder

type ManagerBuilder struct {
	// contains filtered or unexported fields
}

func NewManagerBuilder

func NewManagerBuilder(wm *WorkflowManager) *ManagerBuilder

func (*ManagerBuilder) Build

func (m *ManagerBuilder) Build() (*Manager, error)

func (*ManagerBuilder) WithReq

type ManagerType

type ManagerType string
const (
	MemoryManagerType   ManagerType = "memory"
	TemporalManagerType ManagerType = "temporal"
)

type MemoryLogger

type MemoryLogger struct{}

func (MemoryLogger) Debug

func (m MemoryLogger) Debug(msg string, keyvals ...interface{})

func (MemoryLogger) Error

func (m MemoryLogger) Error(msg string, keyvals ...interface{})

func (MemoryLogger) Info

func (m MemoryLogger) Info(msg string, keyvals ...interface{})

func (MemoryLogger) Warn

func (m MemoryLogger) Warn(msg string, keyvals ...interface{})

type NodeBuilder

type NodeBuilder[T graph.ProtoNode] func(message T) graph.Node

NodeBuilder is a function that builds a node from a proto message.

type RunningWorkflow

type RunningWorkflow struct {
	Trace  *gen.WorkflowTrace
	Ctx    context.Context
	Cancel context.CancelFunc
	// contains filtered or unexported fields
}

type Workflow

type Workflow struct {
	ID        string
	ProjectID string
	Graph     graphlib.Graph[string, string]
	// TODO breadchris this should be a deterministic value based on the workflow node slice
	NodeLookup map[string]graph.Node
	EdgeLookup map[string]graph.Edge
}

Workflow is a directed graph of nodes that represent a workflow. The builder interface is immutable allowing for extensions.

func (*Workflow) EnumerateProviders

func (w *Workflow) EnumerateProviders() []*gen.EnumeratedProvider

func (*Workflow) GetNode

func (w *Workflow) GetNode(id string) (graph.Node, error)

func (*Workflow) GetNodeInfo

func (w *Workflow) GetNodeInfo(n graph.Node) (*graph.Info, error)

TODO breadchris separate "infer" and "collect" type information TODO breadchris node.Type should be passed workflow since a node's info can depend on other nodes

func (*Workflow) GetNodeProvider

func (w *Workflow) GetNodeProvider(id string) (graph.Node, error)

func (*Workflow) WireNodes

func (w *Workflow) WireNodes(
	ctx context.Context,
	nodeID string,
	input rxgo.Observable,
	manager *Manager,
) (*graph.IO, error)

WireNodes wires the nodes in the workflow together and returns an observable that can be subscribed to. Nodes are executed when an event is received on the input observable.

type WorkflowBuilder

type WorkflowBuilder[T graph.ProtoNode, U graph.ProtoEdge] struct {
	// contains filtered or unexported fields
}

WorkflowBuilder is a builder for a workflow. It is immutable allowing for extensions.

func Default

func Default() *WorkflowBuilder[*gen.Node, *gen.Edge]

func NewBuilder

func NewBuilder[T graph.ProtoNode, U graph.ProtoEdge]() *WorkflowBuilder[T, U]

func (*WorkflowBuilder[T, U]) Build

func (w *WorkflowBuilder[T, U]) Build() (*Workflow, error)

func (*WorkflowBuilder[T, U]) WithBuiltEdges

func (w *WorkflowBuilder[T, U]) WithBuiltEdges(edges ...graph.Edge) *WorkflowBuilder[T, U]

func (*WorkflowBuilder[T, U]) WithBuiltNodes

func (w *WorkflowBuilder[T, U]) WithBuiltNodes(nodes ...graph.Node) *WorkflowBuilder[T, U]

func (*WorkflowBuilder[T, U]) WithEdgeTypes

func (w *WorkflowBuilder[T, U]) WithEdgeTypes(n U, builder EdgeBuilder[U]) *WorkflowBuilder[T, U]

func (*WorkflowBuilder[T, U]) WithNodeTypes

func (w *WorkflowBuilder[T, U]) WithNodeTypes(n T, builder NodeBuilder[T]) *WorkflowBuilder[T, U]

func (*WorkflowBuilder[T, U]) WithNodes

func (w *WorkflowBuilder[T, U]) WithNodes(nodes ...T) *WorkflowBuilder[T, U]

func (*WorkflowBuilder[T, U]) WithProtoProject

func (w *WorkflowBuilder[T, U]) WithProtoProject(project graph.ProtoProject[T, U]) *WorkflowBuilder[T, U]

type WorkflowManager

type WorkflowManager struct {
	// contains filtered or unexported fields
}

func NewWorkflowManager

func NewWorkflowManager() *WorkflowManager

func (*WorkflowManager) Start

func (*WorkflowManager) Stop

func (m *WorkflowManager) Stop(id string) error

func (*WorkflowManager) Traces

func (m *WorkflowManager) Traces() []*gen.WorkflowTrace

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL