package fwk

import "go-hep.org/x/hep/fwk"

Package fwk provides a set of tools to process High Energy Physics event data. fwk is a component-based framework, à la Gaudi, with built-in support for concurrency.

A fwk application consists of a set of components (fwk.Task) which are:

- (optionally) configured
- started
- given the chance to process each event
- stopped

Helper components (fwk.Svc) can provide additional features (such as a whiteboard/event-store service, a data-flow service, ...) but typically do not take part directly in the event processing.

Typically, users will implement fwk.Tasks, e.g.:

type MyTask struct {
  fwk.TaskBase
}

// Configure is called once, after having read the properties
// from the data-cards.
func (tsk *MyTask) Configure(ctx fwk.Context) error { return nil }

// StartTask is called once (sequentially), just before
// the main event-loop processing.
func (tsk *MyTask) StartTask(ctx fwk.Context) error { return nil }

// Process is called for each event, (quite) possibly concurrently.
func (tsk *MyTask) Process(ctx fwk.Context) error { return nil }

// StopTask is called once (sequentially), just after the
// main event-loop processing finished.
func (tsk *MyTask) StopTask(ctx fwk.Context) error { return nil }

A fwk application processes data and leverages concurrency at two different levels:

- event-level concurrency: multiple events are processed concurrently
  at any given time, during the event loop;
- task-level concurrency: during the event loop, multiple tasks are
  executing concurrently.

To ensure the self-consistency of the globally processed event, components need to express their data dependencies (inputs) as well as the data they produce (outputs) for downstream components. This is achieved through the concept of a fwk.Port. A fwk.Port consists of a pair { Name string; Type reflect.Type } where 'Name' is the unique location in the event-store, and 'Type' is the expected Go type of the data at that event-store location.
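
As an illustration of what the { Name, Type } pair makes possible, the sketch below checks a value against a port's declared type. Port mirrors the documented struct, but Electron, newPort and checkPort are hypothetical helpers, and whether fwk performs this exact check is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// Port mirrors the documented fwk.Port pair.
type Port struct {
	Name string
	Type reflect.Type
}

// Electron is a hypothetical event-data type.
type Electron struct{ Pt float64 }

// newPort (hypothetical) builds a Port from a name and a sample value.
func newPort(name string, sample interface{}) Port {
	return Port{Name: name, Type: reflect.TypeOf(sample)}
}

// checkPort (hypothetical) returns an error when the dynamic type of v
// differs from the type declared by the port.
func checkPort(p Port, v interface{}) error {
	if got := reflect.TypeOf(v); got != p.Type {
		return fmt.Errorf("port %q: got type %v, want %v", p.Name, got, p.Type)
	}
	return nil
}

func main() {
	p := newPort("Electrons", []Electron{})
	fmt.Println(checkPort(p, []Electron{{Pt: 10}})) // matching type
	fmt.Println(checkPort(p, []float64{10}))        // mismatched type
}
```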

fwk.Ports can be either INPUT ports or OUTPUT ports. Components declare INPUT ports and OUTPUT ports during the 'Configure' stage of a fwk application, like so:

t := reflect.TypeOf([]Electron{})
err = component.DeclInPort("Electrons", t)
err = component.DeclOutPort("ReScaledElectrons", t)

Then, during the event processing, one gets and puts data from/to the store like so:

func (tsk *MyTask) Process(ctx fwk.Context) (err error) {
   // retrieve the store associated with this event / region-of-interest
   store := ctx.Store()

   v, err := store.Get("Electrons")
   if err != nil {
      return err
   }
   eles := v.([]Electron) // type-assert to the correct (underlying) type

   // create output collection
   out := make([]Electron, 0, len(eles))

   // make sure the collection will be put in the store.
   // err is a named result, so the deferred function can report
   // a failed Put without masking an earlier error.
   defer func() {
      if e := store.Put("ReScaledElectrons", out); err == nil {
         err = e
      }
   }()

   // ... do some massaging with 'eles' and 'out'

   return err
}

Index

Package Files

app.go core.go ctx.go dflow.go error.go fwk.go hsvc.go inputstream.go io.go msgstream.go outputstream.go registry.go runner.go store.go svcbase.go taskbase.go worker.go

func Error

func Error(err error) error

Error returns the original error with the associated stack trace.

func Errorf

func Errorf(format string, args ...interface{}) error

Errorf formats according to a format specifier and returns the string as a value that satisfies error, together with the associated stack trace.

func Register

func Register(t reflect.Type, fct FactoryFunc)

Register registers a type t with the FactoryFunc fct.

fwk.ComponentMgr will then be able to create new values of that type t using the associated FactoryFunc fct. If a type t was already registered, the previous FactoryFunc value will be silently overridden with the new FactoryFunc value.
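
The override behaviour can be sketched with a plain map. This is a simplified stand-in, not fwk's registry: factoryFunc returns a string instead of a Component, and the table is keyed by a type-name string for brevity, whereas Register takes a reflect.Type.

```go
package main

import "fmt"

// factoryFunc is a simplified, hypothetical stand-in for fwk.FactoryFunc.
type factoryFunc func(typ, name string) (string, error)

// registry maps a type name to its factory.
var registry = map[string]factoryFunc{}

// register stores fct for typ, silently replacing any previous factory.
func register(typ string, fct factoryFunc) {
	registry[typ] = fct
}

// create builds a value of type typ named name with the registered factory.
func create(typ, name string) (string, error) {
	fct, ok := registry[typ]
	if !ok {
		return "", fmt.Errorf("no factory registered for type %q", typ)
	}
	return fct(typ, name)
}

func main() {
	register("pkg.MyTask", func(typ, name string) (string, error) {
		return "v1:" + name, nil
	})
	// Registering the same type again replaces the first factory.
	register("pkg.MyTask", func(typ, name string) (string, error) {
		return "v2:" + name, nil
	})
	fmt.Println(create("pkg.MyTask", "t1"))
}
```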

func Registry

func Registry() []string

Registry returns the list of all registered and known components.

type App

type App interface {
    Component
    ComponentMgr
    SvcMgr
    TaskMgr
    PropMgr
    PortMgr

    FSMStater

    Runner
    Scripter() Scripter

    Msg() MsgStream
}

App is the component orchestrating all the other components in a coherent application to process physics events.

func NewApp

func NewApp() App

NewApp creates a default fwk application with sensible default options.

type Component

type Component interface {
    Type() string // Type of the component (ex: "go-hep.org/x/hep/fads.MomentumSmearing")
    Name() string // Name of the component (ex: "MyPropagator")
}

Component is the interface satisfied by all values in fwk.

A component can be asked for:

- its Type() (ex: "go-hep.org/x/hep/fads.MomentumSmearing")
- its Name() (ex: "MyPropagator")

type ComponentMgr

type ComponentMgr interface {
    Component(n string) Component
    HasComponent(n string) bool
    Components() []Component
    New(t, n string) (Component, error)
}

ComponentMgr manages components. ComponentMgr creates and provides access to all the components in a fwk App.

type Configurer

type Configurer interface {
    Component
    Configure(ctx Context) error
}

Configurers are components that can be configured via properties declared or created by the job-options.

type Context

type Context interface {
    ID() int64      // id of this context (e.g. entry number or some kind of event number)
    Slot() int      // slot number in the pool of event sequences
    Store() Store   // data store corresponding to the id+slot
    Msg() MsgStream // messaging for this context (id+slot)

    Svc(n string) (Svc, error) // retrieve an already existing Svc by name
}

Context is the interface to access context-local data.

type DeclPorter

type DeclPorter interface {
    DeclInPort(name string, t reflect.Type) error
    DeclOutPort(name string, t reflect.Type) error
}

DeclPorter is the interface to declare input/output ports for the data flow.

type Deleter

type Deleter interface {
    Delete() error
}

Deleter prepares values to be GC-reclaimed

type FSMStater

type FSMStater interface {
    FSMState() fsm.State
}

FSMStater is the interface used to query the current state of the fwk application

type FactoryFunc

type FactoryFunc func(t, n string, mgr App) (Component, error)

FactoryFunc creates a Component of type t and name n, managed by the fwk.App mgr.

type H1D

type H1D struct {
    ID   HID // unique id
    Hist *hbook.H1D
}

H1D wraps a hbook.H1D for safe concurrent access

func (H1D) Name

func (h H1D) Name() string

func (H1D) Value

func (h H1D) Value() interface{}

type H2D

type H2D struct {
    ID   HID // unique id
    Hist *hbook.H2D
}

H2D wraps a hbook.H2D for safe concurrent access

func (H2D) Name

func (h H2D) Name() string

func (H2D) Value

func (h H2D) Value() interface{}

type HID

type HID string

HID is a histogram, scatter or profile identifier

type Hist

type Hist interface {
    Name() string
    Value() interface{}
}

Hist is a histogram, scatter or profile object that can be saved or loaded by the HistSvc.

type HistSvc

type HistSvc interface {
    Svc

    // BookH1D books a 1D histogram.
    // name should be of the form: "/fwk/streams/<stream-name>/<path>/<histogram-name>"
    BookH1D(name string, nbins int, xmin, xmax float64) (H1D, error)

    // BookH2D books a 2D histogram.
    // name should be of the form: "/fwk/streams/<stream-name>/<path>/<histogram-name>"
    BookH2D(name string, nx int, xmin, xmax float64, ny int, ymin, ymax float64) (H2D, error)

    // BookP1D books a 1D profile.
    // name should be of the form: "/fwk/streams/<stream-name>/<path>/<profile-name>"
    BookP1D(name string, nbins int, xmin, xmax float64) (P1D, error)

    // BookS2D books a 2D scatter.
    // name should be of the form: "/fwk/streams/<stream-name>/<path>/<scatter-name>"
    BookS2D(name string) (S2D, error)

    // FillH1D fills the 1D-histogram id with data x and weight w.
    FillH1D(id HID, x, w float64)

    // FillH2D fills the 2D-histogram id with data (x,y) and weight w.
    FillH2D(id HID, x, y, w float64)

    // FillP1D fills the 1D-profile id with data (x,y) and weight w.
    FillP1D(id HID, x, y, w float64)

    // FillS2D fills the 2D-scatter id with data (x,y).
    FillS2D(id HID, x, y float64)
}

HistSvc is the interface providing access to histograms

type InputStream

type InputStream struct {
    TaskBase
    // contains filtered or unexported fields
}

InputStream implements a task reading data from an InputStreamer.

InputStream is concurrent-safe.

InputStream declares a property 'Ports', a []fwk.Port, which will be used to declare the output ports the streamer will publish, loading in data from the underlying InputStreamer.

InputStream declares a property 'Streamer', a fwk.InputStreamer, which will be used to actually read data from.

func (*InputStream) Configure

func (tsk *InputStream) Configure(ctx Context) error

Configure declares the output ports defined by the 'Ports' property.

func (*InputStream) Process

func (tsk *InputStream) Process(ctx Context) error

Process loads data from the underlying InputStreamer and puts it in the event store.

func (*InputStream) StartTask

func (tsk *InputStream) StartTask(ctx Context) error

StartTask starts the InputStreamer task

func (*InputStream) StopTask

func (tsk *InputStream) StopTask(ctx Context) error

StopTask stops the InputStreamer task

type InputStreamer

type InputStreamer interface {

    // Connect connects the InputStreamer to the underlying io.Reader,
    // and configures it to only read in the data specified in ports.
    Connect(ports []Port) error

    // Read reads the data from the underlying io.Reader
    // and puts it in the store associated with the fwk.Context ctx
    Read(ctx Context) error

    // Disconnect disconnects the InputStreamer from the underlying io.Reader,
    // possibly computing some statistics data.
    // It does not (and can not) close the underlying io.Reader.
    Disconnect() error
}

InputStreamer reads data from the underlying io.Reader and puts it into fwk's Context

type Level

type Level int

Level regulates the verbosity level of a component.

const (
    LvlDebug   Level = -10 // LvlDebug defines the DBG verbosity level
    LvlInfo    Level = 0   // LvlInfo defines the INFO verbosity level
    LvlWarning Level = 10  // LvlWarning defines the WARN verbosity level
    LvlError   Level = 20  // LvlError defines the ERR verbosity level
)

Default verbosity levels.

func (Level) String

func (lvl Level) String() string

String prints the human-readable representation of a Level value.

type MsgStream

type MsgStream interface {
    Debugf(format string, a ...interface{}) (int, error)
    Infof(format string, a ...interface{}) (int, error)
    Warnf(format string, a ...interface{}) (int, error)
    Errorf(format string, a ...interface{}) (int, error)

    Msg(lvl Level, format string, a ...interface{}) (int, error)
}

MsgStream provides access to verbosity-defined formatted messages, à la fmt.Printf.

func NewMsgStream

func NewMsgStream(name string, lvl Level, w WriteSyncer) MsgStream

NewMsgStream creates a new MsgStream value with name name and minimum verbosity level lvl. This MsgStream will print messages into w.
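
A minimum verbosity level suggests a simple threshold test. The sketch below reuses the documented Level values; the enabled helper is hypothetical, and the assumption that a message is emitted exactly when its level is at least the stream's minimum is not confirmed by this page.

```go
package main

import "fmt"

// Level and its constants mirror the values documented for fwk.Level.
type Level int

const (
	LvlDebug   Level = -10
	LvlInfo    Level = 0
	LvlWarning Level = 10
	LvlError   Level = 20
)

// enabled (hypothetical) reports whether a message at level lvl passes
// a stream whose minimum verbosity level is minLvl.
func enabled(minLvl, lvl Level) bool {
	return lvl >= minLvl
}

func main() {
	// With a minimum of LvlInfo, only the debug message is filtered out.
	for _, lvl := range []Level{LvlDebug, LvlInfo, LvlWarning, LvlError} {
		fmt.Println(int(lvl), enabled(LvlInfo, lvl))
	}
}
```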

type OutputStream

type OutputStream struct {
    TaskBase
    // contains filtered or unexported fields
}

OutputStream implements a task writing data to an OutputStreamer.

OutputStream is concurrent-safe.

OutputStream declares a property 'Ports', a []fwk.Port, which will be used to declare the input ports the task will access, writing out data via the underlying OutputStreamer.

OutputStream declares a property 'Streamer', a fwk.OutputStreamer, which will be used to actually write data to.

func (*OutputStream) Configure

func (tsk *OutputStream) Configure(ctx Context) error

Configure declares the input ports defined by the 'Ports' property.

func (*OutputStream) Process

func (tsk *OutputStream) Process(ctx Context) error

Process gets data from the store and writes it out via the underlying OutputStreamer

func (*OutputStream) StartTask

func (tsk *OutputStream) StartTask(ctx Context) error

StartTask starts the OutputStreamer task

func (*OutputStream) StopTask

func (tsk *OutputStream) StopTask(ctx Context) error

StopTask stops the OutputStreamer task

type OutputStreamer

type OutputStreamer interface {

    // Connect connects the OutputStreamer to the underlying io.Writer,
    // and configures it to only write out the data specified in ports.
    Connect(ports []Port) error

    // Write gets the data from the store associated with the fwk.Context ctx
    // and writes it to the underlying io.Writer
    Write(ctx Context) error

    // Disconnect disconnects the OutputStreamer from the underlying io.Writer,
    // possibly computing some statistics data.
    // It does not (and can not) close the underlying io.Writer.
    Disconnect() error
}

OutputStreamer gets data from the Context and writes it to the underlying io.Writer

type P1D

type P1D struct {
    ID      HID // unique id
    Profile *hbook.P1D
}

P1D wraps a hbook.P1D for safe concurrent access

func (P1D) Name

func (p P1D) Name() string

func (P1D) Value

func (p P1D) Value() interface{}

type Port

type Port struct {
    Name string
    Type reflect.Type
}

Port holds the name and type of a data item in a store

type PortMgr

type PortMgr interface {
    DeclInPort(c Component, name string, t reflect.Type) error
    DeclOutPort(c Component, name string, t reflect.Type) error
}

PortMgr is the interface to manage input/output ports for the data flow

type PropMgr

type PropMgr interface {
    DeclProp(c Component, name string, ptr interface{}) error
    SetProp(c Component, name string, value interface{}) error
    GetProp(c Component, name string) (interface{}, error)
    HasProp(c Component, name string) bool
}

PropMgr manages properties attached to components.

type Property

type Property interface {
    DeclProp(name string, ptr interface{}) error
    SetProp(name string, value interface{}) error
    GetProp(name string) (interface{}, error)
}

Property is a key/value pair associated with a component. Properties of a given component can be modified by a job-option or by other components.
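
Because DeclProp takes a pointer, setting a property can update the component's own variable in place. The sketch below shows one way to do that with reflection; the props type is hypothetical and is not how fwk necessarily stores properties.

```go
package main

import (
	"fmt"
	"reflect"
)

// props is a hypothetical property table: name -> pointer to the value.
type props map[string]interface{}

// DeclProp records ptr, which must be a non-nil pointer, under name.
func (p props) DeclProp(name string, ptr interface{}) error {
	rv := reflect.ValueOf(ptr)
	if rv.Kind() != reflect.Ptr || rv.IsNil() {
		return fmt.Errorf("property %q: need a non-nil pointer, got %T", name, ptr)
	}
	p[name] = ptr
	return nil
}

// SetProp stores value into the variable behind the declared pointer.
func (p props) SetProp(name string, value interface{}) error {
	ptr, ok := p[name]
	if !ok {
		return fmt.Errorf("no such property %q", name)
	}
	dst := reflect.ValueOf(ptr).Elem()
	src := reflect.ValueOf(value)
	if !src.IsValid() || !src.Type().AssignableTo(dst.Type()) {
		return fmt.Errorf("property %q: cannot assign %T to %v", name, value, dst.Type())
	}
	dst.Set(src)
	return nil
}

// GetProp returns the current value behind the declared pointer.
func (p props) GetProp(name string) (interface{}, error) {
	ptr, ok := p[name]
	if !ok {
		return nil, fmt.Errorf("no such property %q", name)
	}
	return reflect.ValueOf(ptr).Elem().Interface(), nil
}

func main() {
	ptcut := 10.0 // would typically be a field of the task
	p := props{}
	if err := p.DeclProp("PtCut", &ptcut); err != nil {
		panic(err)
	}
	if err := p.SetProp("PtCut", 25.0); err != nil {
		panic(err)
	}
	fmt.Println(ptcut) // the declared variable itself was updated
}
```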

type Runner

type Runner interface {
    Run() error
}

Runner runs a fwk App in a batch fashion:

- Configure
- Start
- Run event loop
- Stop
- Shutdown

type S2D

type S2D struct {
    ID      HID // unique id
    Scatter *hbook.S2D
}

S2D wraps a hbook.S2D for safe concurrent access

func (S2D) Name

func (s S2D) Name() string

func (S2D) Value

func (s S2D) Value() interface{}

type Scripter

type Scripter interface {
    Configure() error
    Start() error
    Run(evtmax int64) error
    Stop() error
    Shutdown() error
}

Scripter gives finer control to running a fwk App
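
The batch sequence listed for Runner maps one-to-one onto the Scripter methods. The sketch below drives a Scripter through that sequence by hand; the Scripter interface mirrors the documented method set, while recorder and runBatch are hypothetical, and stopping at the first error is an assumption rather than fwk's documented behaviour.

```go
package main

import "fmt"

// Scripter mirrors the documented fwk.Scripter method set.
type Scripter interface {
	Configure() error
	Start() error
	Run(evtmax int64) error
	Stop() error
	Shutdown() error
}

// recorder is a hypothetical Scripter that only records the calls it gets.
type recorder struct{ calls []string }

func (r *recorder) log(s string) error { r.calls = append(r.calls, s); return nil }

func (r *recorder) Configure() error       { return r.log("Configure") }
func (r *recorder) Start() error           { return r.log("Start") }
func (r *recorder) Run(evtmax int64) error { return r.log(fmt.Sprintf("Run(%d)", evtmax)) }
func (r *recorder) Stop() error            { return r.log("Stop") }
func (r *recorder) Shutdown() error        { return r.log("Shutdown") }

// runBatch calls the five stages in order and returns the first error.
func runBatch(s Scripter, evtmax int64) error {
	steps := []func() error{
		s.Configure,
		s.Start,
		func() error { return s.Run(evtmax) },
		s.Stop,
		s.Shutdown,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	if err := runBatch(r, 100); err != nil {
		panic(err)
	}
	fmt.Println(r.calls)
}
```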

type Store

type Store interface {
    Get(key string) (interface{}, error)
    Put(key string, value interface{}) error
    Has(key string) bool
}

Store provides access to a concurrent-safe map[string]interface{} store.
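
For readers implementing or mocking this interface, a mutex-guarded map is enough to satisfy the documented method set. mapStore and newMapStore below are hypothetical and are not fwk's own store.

```go
package main

import (
	"fmt"
	"sync"
)

// mapStore is a hypothetical concurrent-safe store with the documented
// Get/Put/Has method set.
type mapStore struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newMapStore() *mapStore {
	return &mapStore{data: make(map[string]interface{})}
}

// Get returns the value stored under key, or an error if there is none.
func (s *mapStore) Get(key string) (interface{}, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	if !ok {
		return nil, fmt.Errorf("store: no such key %q", key)
	}
	return v, nil
}

// Put stores value under key, replacing any previous value.
func (s *mapStore) Put(key string, value interface{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
	return nil
}

// Has reports whether key is present.
func (s *mapStore) Has(key string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.data[key]
	return ok
}

func main() {
	s := newMapStore()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { // concurrent writers, one key each
			defer wg.Done()
			_ = s.Put(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	v, err := s.Get("key-3")
	fmt.Println(v, err, s.Has("missing"))
}
```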

type StreamControl

type StreamControl struct {
    Ports []Port        // list of ports streamers will read-from or write-to
    Ctx   chan Context  // contexts to read-from or write-to
    Err   chan error    // errors encountered during reading-from or writing-to
    Quit  chan struct{} // closed to signify in/out-streamers should stop reading-from/writing-to
}

StreamControl provides concurrency-safe control to input and output streamers.
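
The three channels suggest a select loop on the streamer side. The sketch below is an assumption about how such a loop could look, not fwk's implementation: control is a trimmed stand-in that carries ints instead of fwk.Context values, and serve is hypothetical.

```go
package main

import "fmt"

// control is a trimmed, hypothetical stand-in for fwk.StreamControl.
type control struct {
	Ctx  chan int      // contexts to handle (ints stand in for fwk.Context)
	Err  chan error    // one result per handled context
	Quit chan struct{} // closed to stop the loop
}

// serve handles contexts one at a time and reports each outcome on Err,
// until Quit is closed.
func serve(ctrl control, handle func(int) error) {
	for {
		select {
		case <-ctrl.Quit:
			return
		case ctx := <-ctrl.Ctx:
			ctrl.Err <- handle(ctx)
		}
	}
}

func main() {
	ctrl := control{
		Ctx:  make(chan int),
		Err:  make(chan error),
		Quit: make(chan struct{}),
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		serve(ctrl, func(evt int) error {
			if evt < 0 {
				return fmt.Errorf("invalid event %d", evt)
			}
			return nil
		})
	}()

	for _, evt := range []int{0, 1, -1} {
		ctrl.Ctx <- evt
		fmt.Println(evt, <-ctrl.Err)
	}
	close(ctrl.Quit) // signal the streamer to stop
	<-done           // wait for serve to return
}
```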

type Svc

type Svc interface {
    Component

    StartSvc(ctx Context) error
    StopSvc(ctx Context) error
}

Svc is a component providing services or helper features. Services are started before the main event loop processing and stopped just after.

type SvcBase

type SvcBase struct {
    // contains filtered or unexported fields
}

SvcBase provides a base implementation for fwk.Svc

func NewSvc

func NewSvc(typ, name string, mgr App) SvcBase

NewSvc creates a new SvcBase of type typ and name name, managed by the fwk.App mgr.

func (*SvcBase) DeclProp

func (svc *SvcBase) DeclProp(n string, ptr interface{}) error

DeclProp declares this service has a property named n, and takes a pointer to the associated value.

func (*SvcBase) FSMState

func (svc *SvcBase) FSMState() fsm.State

FSMState returns the current state of the FSM

func (*SvcBase) GetProp

func (svc *SvcBase) GetProp(name string) (interface{}, error)

GetProp returns the value of the property named name.

func (*SvcBase) Name

func (svc *SvcBase) Name() string

Name returns the name of the underlying service. e.g. "my-service"

func (*SvcBase) SetProp

func (svc *SvcBase) SetProp(name string, value interface{}) error

SetProp sets the property named name to value.

func (*SvcBase) Type

func (svc *SvcBase) Type() string

Type returns the fully qualified type of the underlying service. e.g. "go-hep.org/x/hep/fwk/testdata.svc1"

type SvcMgr

type SvcMgr interface {
    AddSvc(svc Svc) error
    DelSvc(svc Svc) error
    HasSvc(n string) bool
    GetSvc(n string) Svc
    Svcs() []Svc
}

SvcMgr manages services.

type Task

type Task interface {
    Component

    StartTask(ctx Context) error
    Process(ctx Context) error
    StopTask(ctx Context) error
}

Task is a component processing event-level data. Task.Process is called for every component and for every input event.

type TaskBase

type TaskBase struct {
    // contains filtered or unexported fields
}

TaskBase provides a base implementation for fwk.Task

func NewTask

func NewTask(typ, name string, mgr App) TaskBase

NewTask creates a new TaskBase of type typ and name name, managed by the fwk.App mgr.

func (*TaskBase) DeclInPort

func (tsk *TaskBase) DeclInPort(n string, t reflect.Type) error

DeclInPort declares this task has an input Port with name n and type t.

func (*TaskBase) DeclOutPort

func (tsk *TaskBase) DeclOutPort(n string, t reflect.Type) error

DeclOutPort declares this task has an output Port with name n and type t.

func (*TaskBase) DeclProp

func (tsk *TaskBase) DeclProp(n string, ptr interface{}) error

DeclProp declares this task has a property named n, and takes a pointer to the associated value.

func (*TaskBase) FSMState

func (tsk *TaskBase) FSMState() fsm.State

FSMState returns the current state of the FSM

func (*TaskBase) GetProp

func (tsk *TaskBase) GetProp(n string) (interface{}, error)

GetProp returns the value of the property named n.

func (*TaskBase) Name

func (tsk *TaskBase) Name() string

Name returns the name of the underlying task. e.g. "my-task"

func (*TaskBase) SetProp

func (tsk *TaskBase) SetProp(n string, v interface{}) error

SetProp sets the property named n to the value v.

func (*TaskBase) Type

func (tsk *TaskBase) Type() string

Type returns the fully qualified type of the underlying task. e.g. "go-hep.org/x/hep/fwk/testdata.task1"

type TaskMgr

type TaskMgr interface {
    AddTask(tsk Task) error
    DelTask(tsk Task) error
    HasTask(n string) bool
    GetTask(n string) Task
    Tasks() []Task
}

TaskMgr manages tasks.

type WriteSyncer

type WriteSyncer interface {
    io.Writer
    Sync() error
}

WriteSyncer is an io.Writer which can be sync'ed/flushed.
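
Any type with Write and Sync methods satisfies this interface, which includes *os.File from the standard library. For writers without a Sync method, a small adapter can help. In the sketch below, WriteSyncer mirrors the documented interface, while nopSyncer and render are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

// WriteSyncer mirrors the documented fwk.WriteSyncer interface.
type WriteSyncer interface {
	io.Writer
	Sync() error
}

// Compile-time check: *os.File already provides Write and Sync.
var _ WriteSyncer = (*os.File)(nil)

// nopSyncer is a hypothetical adapter that gives any io.Writer
// a Sync method that does nothing.
type nopSyncer struct{ io.Writer }

func (nopSyncer) Sync() error { return nil }

// render writes msg through a nopSyncer-wrapped buffer, syncs it,
// and returns what ended up in the buffer.
func render(msg string) (string, error) {
	buf := new(bytes.Buffer)
	var w WriteSyncer = nopSyncer{buf}
	if _, err := io.WriteString(w, msg); err != nil {
		return "", err
	}
	if err := w.Sync(); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := render("hello fwk\n")
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```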

Directories

Path            Synopsis
fsm
hbooksvc
job
rio
utils/builder   package builder builds a fwk-app binary from a list of go files.
utils/errstack
utils/parallel  The parallel package provides a way of running functions concurrently while limiting the maximum number running at once.
utils/tarjan    package tarjan implements a graph loop detection algorithm called Tarjan's algorithm.

Package fwk imports 16 packages and is imported by 7 packages. Updated 2018-07-28.