Documentation ¶
Index ¶
- Variables
- func LoadPrivateState[SK ~string, SV any](state ogcore.State, key string) SV
- func LoadState[SV any](state ogcore.State, key string) SV
- func SavePrivateState[SK ~string](state ogcore.State, key string, val any, overwrite bool)
- func SaveState(state ogcore.State, key string, val any, overwrite bool)
- func UpdatePrivateState[SK ~string, SV any](state ogcore.State, key string, updateFunc func(oldVal SV) (val SV)) error
- func UpdateState[SV any](state ogcore.State, key string, updateFunc func(oldVal SV) (val SV)) error
- type BaseCluster
- type BaseNode
- type BaseState
- type BaseWrapper
- type Builder
- type Element
- func (e *Element) AsVirtual() *Element
- func (e *Element) GetRequiredFactories() map[string]bool
- func (e *Element) Implement(virtualElem *Element, isDefault bool) *Element
- func (e *Element) Params(key string, val any) *Element
- func (e *Element) SetVirtual(isVirtual bool) *Element
- func (e *Element) UseFactory(name string, subElements ...*Element) *Element
- func (e *Element) UseFn(fn func() error) *Element
- func (e *Element) UseNode(node ogcore.Node) *Element
- func (e *Element) UsePrivateFactory(factory func() ogcore.Node, subElements ...*Element) *Element
- func (e *Element) Wrap(wrappers ...string) *Element
- type FuncNode
- type Op
- type PGraph
- type Pipeline
- func (pipeline *Pipeline) Check() error
- func (pipeline *Pipeline) DumpDOT() ([]byte, error)
- func (pipeline *Pipeline) DumpGraph() ([]byte, error)
- func (pipeline *Pipeline) ForEachElem(op func(e *Element)) *Pipeline
- func (pipeline *Pipeline) LoadGraph(data []byte) error
- func (pipeline *Pipeline) Register(e *Element, ops ...Op) *Pipeline
- func (pipeline *Pipeline) RegisterInterrupt(handler ogcore.InterruptHandler, on ...string) *Pipeline
- func (pipeline *Pipeline) ResetPool()
- func (pipeline *Pipeline) Run(ctx context.Context, state ogcore.State) error
- func (pipeline *Pipeline) SetPoolCache(size int, warmup bool) error
Constants ¶
This section is empty.
Variables ¶
View Source
var Branch = func(elements ...*Element) Op { return func(pipeline *Pipeline, element *Element) { if len(elements) == 0 { return } var prev, next *Element prev = element for i := range elements { next = elements[i] if pipeline.elements[next.Name] == nil { pipeline.Register(next) } if pipeline.elements[next.Name] == next { pipeline.graph.AddEdge(prev.Name, next.Name) } prev = next } } }
Register(a, ograph.Branch(b, c, d)) => a->b->c->d
View Source
var DependOn = func(dependencies ...*Element) Op { return func(pipeline *Pipeline, element *Element) { for _, dep := range dependencies { if pipeline.elements[dep.Name] == nil { pipeline.Register(dep) } if pipeline.elements[dep.Name] == dep { pipeline.graph.AddEdge(dep.Name, element.Name) } } } }
View Source
// ErrFactoryNotFound is a sentinel error carrying the message
// "factory not found". Match it with errors.Is.
//
// NOTE(review): presumably reported when an element names a factory that was
// never registered with the Builder — confirm against the call sites.
var ErrFactoryNotFound = errors.New("factory not found")
View Source
var Then = func(nextElements ...*Element) Op { return func(pipeline *Pipeline, element *Element) { for _, next := range nextElements { if pipeline.elements[next.Name] == nil { pipeline.Register(next) } if pipeline.elements[next.Name] == next { pipeline.graph.AddEdge(element.Name, next.Name) } } } }
Functions ¶
func LoadPrivateState ¶
func SavePrivateState ¶
func UpdatePrivateState ¶
Types ¶
type BaseCluster ¶
func (*BaseCluster) Join ¶
func (cluster *BaseCluster) Join(nodes []ogcore.Node)
type BaseWrapper ¶
func (*BaseWrapper) Wrap ¶
func (wrapper *BaseWrapper) Wrap(node ogcore.Node)
type Builder ¶
func (*Builder) RegisterFactory ¶
type Element ¶
type Element struct { Virtual bool Name string FactoryName string Wrappers []string ParamsMap map[string]any DefaultImpl string Singleton ogcore.Node `json:"-"` PrivateFactory func() ogcore.Node `json:"-"` SubElements []*Element ImplElements []*Element }
func NewElement ¶
func (*Element) GetRequiredFactories ¶
func (*Element) SetVirtual ¶
func (*Element) UseFactory ¶
func (*Element) UsePrivateFactory ¶
type FuncNode ¶
func NewFuncNode ¶
type Pipeline ¶
// Pipeline embeds BaseNode and Builder and exposes the Interrupters,
// ParallelismLimit and DisablePool settings. This listing omits the unexported
// fields; the Branch, DependOn and Then ops use two of them, `elements`
// (indexed by element name) and `graph` (receives AddEdge calls).
//
// NOTE(review): ParallelismLimit presumably caps how many nodes run
// concurrently, and DisablePool presumably turns off the node pool managed by
// SetPoolCache / ResetPool — confirm both against the implementation.
type Pipeline struct { BaseNode Builder Interrupters []ogcore.Interrupter ParallelismLimit int DisablePool bool // contains filtered or unexported fields }
func NewPipeline ¶
func NewPipeline() *Pipeline
func (*Pipeline) ForEachElem ¶
func (*Pipeline) RegisterInterrupt ¶
func (pipeline *Pipeline) RegisterInterrupt(handler ogcore.InterruptHandler, on ...string) *Pipeline
Source Files ¶
Click to show internal directories.
Click to hide internal directories.