goflow

v0.0.0-...-565bf3c
Warning

This package is not in the latest version of its module.

Published: Oct 18, 2020 License: MIT Imports: 9 Imported by: 0

README

GoFlow - Dataflow and Flow-based programming library for Go (golang)

Status of this branch (WIP)

Warning: you are currently on the v1 branch of GoFlow. v1 is a revisit and refactoring of the original GoFlow code, which remained almost unchanged for 7 years. This branch is a work in progress: stability is not guaranteed and the API may change.

If your code depends on the old implementation, you can build it using release 0.1.

--

GoFlow is a lean and opinionated implementation of Flow-based programming in Go that aims at designing applications as graphs of components which react to data that flows through the graph.

The main properties of the proposed model are:

  • Concurrent - graph nodes run in parallel.
  • Structural - applications are described as components, their ports and connections between them.
  • Reactive/active - system's behavior is how components react to events or how they handle their lifecycle.
  • Asynchronous/synchronous - there is no determined order in which events happen, unless you demand such an order.
  • Isolated - sharing is done by communication, state is not shared.

Getting started

If you don't have the Go compiler installed, read the official Go install guide.

Use the go tool to install the package in your packages tree:

go get github.com/trustmaster/goflow

Then you can use it in the import section of your Go programs:

import "github.com/trustmaster/goflow"

Basic Example

Below is a listing of a simple program running a network of two processes.

Greeter example diagram

The first process generates greetings for given names, and the second one prints them on screen. The example demonstrates how components and graphs are defined and how they are embedded into the main program.

package main

import (
	"fmt"
	"github.com/trustmaster/goflow"
)

// Greeter sends greetings
type Greeter struct {
	Name <-chan string // input port
	Res  chan<- string // output port
}

// Process incoming data
func (c *Greeter) Process() {
	// Keep reading incoming packets
	for name := range c.Name {
		greeting := fmt.Sprintf("Hello, %s!", name)
		// Send the greeting to the output port
		c.Res <- greeting
	}
}

// Printer prints its input on screen
type Printer struct {
	Line <-chan string // inport
}

// Process prints a line when it gets it
func (c *Printer) Process() {
	for line := range c.Line {
		fmt.Println(line)
	}
}

// NewGreetingApp defines the app graph
func NewGreetingApp() *goflow.Graph {
	n := goflow.NewGraph()
	// Add processes to the network
	n.Add("greeter", new(Greeter))
	n.Add("printer", new(Printer))
	// Connect them with a channel
	n.Connect("greeter", "Res", "printer", "Line")
	// Our net has 1 inport mapped to greeter.Name
	n.MapInPort("In", "greeter", "Name")
	return n
}

func main() {
	// Create the network
	net := NewGreetingApp()
	// We need a channel to talk to it
	in := make(chan string)
	net.SetInPort("In", in)
	// Run the net
	wait := goflow.Run(net)
	// Now we can send some names and see what happens
	in <- "John"
	in <- "Boris"
	in <- "Hanna"
	// Send end of input
	close(in)
	// Wait until the net has completed its job
	<-wait
}

This looks a bit heavy for such a simple task, but FBP is aimed at more complex things than just printing on screen. In more complex and realistic examples the infrastructure pays off.

You probably have one question left even after reading the comments in the code: why do we need to wait for the finish signal? The flow-based world is asynchronous. While you might expect things to happen in the same sequence as they appear in main(), at runtime they don't necessarily follow that order, and the application might terminate before the network has done its job. To avoid this, we listen on the network's wait channel, which signals when the network has finished its job.
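
The waiting pattern described above can be illustrated with nothing but the standard library. The following is a minimal sketch, not GoFlow's actual implementation: runSketch and Counter are hypothetical names, and the sketch assumes that completion is signalled by closing the wait channel.

```go
package main

import "fmt"

// Component mirrors the documented interface: anything with a Process method.
type Component interface {
	Process()
}

// Wait mirrors the documented type: a channel that signals completion.
type Wait chan struct{}

// runSketch is a hypothetical stand-in for goflow.Run. It starts Process in
// its own goroutine and closes the returned channel once Process returns.
func runSketch(c Component) Wait {
	wait := make(Wait)
	go func() {
		defer close(wait)
		c.Process()
	}()
	return wait
}

// Counter is a hypothetical component that counts the packets it receives.
type Counter struct {
	In    <-chan string
	Count int
}

// Process reads packets until the input port is closed.
func (c *Counter) Process() {
	for range c.In {
		c.Count++
	}
}

func main() {
	in := make(chan string)
	counter := &Counter{In: in}
	wait := runSketch(counter)
	in <- "John"
	in <- "Boris"
	close(in)
	// Without this receive, main could return before Process has finished.
	<-wait
	fmt.Println(counter.Count)
}
```

Reading counter.Count is only safe after the receive from wait, because closing the channel happens after Process has returned.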

Terminology

Here are some Flow-based programming terms used in GoFlow:

  • Component - the basic element that processes data. Its structure consists of input and output ports and state fields. Its behavior is the set of event handlers. In OOP terms a Component is a Class.
  • Connection - a link between 2 ports in the graph. In Go it is a channel of a specific type.
  • Graph - components and connections between them, forming a higher level entity. Graphs may represent composite components or entire applications. In OOP terms a Graph is a Class.
  • Network - a Graph instance running in memory. In OOP terms a Network is an object of the Graph class.
  • Port - a property of a Component or Graph through which it communicates with the outer world. There are input ports (Inports) and output ports (Outports). For GoFlow components it is a channel field.
  • Process - a Component instance running in memory. In OOP terms a Process is an object of the Component class.

More terms can be found in flowbased terms and FBP wiki.

Documentation

Contents
  1. Components
    1. Ports, Events and Handlers
    2. Processes and their lifetime
    3. State
    4. Concurrency
    5. Internal state and Thread-safety
  2. Graphs
    1. Structure definition
    2. Behavior
Package docs

Documentation for the goflow package can be accessed using the standard godoc tool, for example:

godoc github.com/trustmaster/goflow

TODO

  • Integration with NoFlo-UI/Flowhub (in progress)
  • Distributed networks via TCP/IP and UDP
  • Reflection and monitoring of networks

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadComponents

func LoadComponents(paths []string, factory *Factory) ([]string, error)

LoadComponents goes through all paths, opens all plugins in those paths and loads them into the factory. Plugins are denoted by *_goplug.so. The filename must begin with a capital letter.

func NewDefaultGraph

func NewDefaultGraph() interface{}

NewDefaultGraph is a ComponentConstructor for the factory

Types

type Annotation

type Annotation struct {
	// Description tells what the component does
	Description string
	// Icon name in Font Awesome is used for visualization
	Icon string
}

Annotation provides reference information about a component to graph designers and operators

type Component

type Component interface {
	Process()
}

Component is a unit that can start a process

type ComponentInfo

type ComponentInfo struct {
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Icon        string     `json:"icon"`
	Subgraph    bool       `json:"subgraph"`
	InPorts     []PortInfo `json:"inPorts"`
	OutPorts    []PortInfo `json:"outPorts"`
}

ComponentInfo represents a component to a protocol client

type Constructor

type Constructor func() (interface{}, error)

Constructor is used to create a component instance at run-time

type Done

type Done struct{}

Done notifies that the process is finished

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

Factory registers components and creates their instances at run-time

func NewFactory

func NewFactory(config ...FactoryConfig) *Factory

NewFactory creates a new component Factory instance

func (*Factory) Annotate

func (f *Factory) Annotate(componentName string, annotation Annotation) error

Annotate adds human-readable documentation for a component to the runtime

func (*Factory) Create

func (f *Factory) Create(componentName string) (interface{}, error)

Create creates a new instance of a component registered under a specific name.

func (*Factory) Register

func (f *Factory) Register(componentName string, constructor Constructor) error

Register registers a component so that it can be instantiated at run-time
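
To illustrate the register-then-create flow, here is a minimal standard-library sketch of a name-to-constructor registry. It is not GoFlow's Factory: the registry type, newRegistry and the Echo component are hypothetical, and returning an error on duplicate or unknown names is an assumption of this sketch. Only the Constructor shape is taken from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// Constructor has the same shape as the documented Constructor type.
type Constructor func() (interface{}, error)

// registry is a hypothetical, simplified stand-in for a component factory.
type registry struct {
	constructors map[string]Constructor
}

func newRegistry() *registry {
	return &registry{constructors: make(map[string]Constructor)}
}

// Register stores a constructor under a name. Rejecting duplicates is an
// assumption of this sketch.
func (r *registry) Register(name string, c Constructor) error {
	if _, exists := r.constructors[name]; exists {
		return errors.New("component already registered: " + name)
	}
	r.constructors[name] = c
	return nil
}

// Create calls the constructor registered under the given name.
func (r *registry) Create(name string) (interface{}, error) {
	c, ok := r.constructors[name]
	if !ok {
		return nil, errors.New("component not registered: " + name)
	}
	return c()
}

// Echo is a hypothetical component that forwards its input to its output.
type Echo struct {
	In  <-chan string
	Out chan<- string
}

// Process forwards packets until the input port is closed.
func (e *Echo) Process() {
	for s := range e.In {
		e.Out <- s
	}
}

func main() {
	r := newRegistry()
	err := r.Register("echo", func() (interface{}, error) { return new(Echo), nil })
	if err != nil {
		fmt.Println("register:", err)
		return
	}
	inst, err := r.Create("echo")
	if err != nil {
		fmt.Println("create:", err)
		return
	}
	fmt.Printf("%T\n", inst)
}
```

Keeping constructors behind string names is what lets a graph definition refer to components by name and have them instantiated at run-time.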

func (*Factory) Size

func (f *Factory) Size() int

Size returns number of objects in factory

func (*Factory) Unregister

func (f *Factory) Unregister(componentName string) error

Unregister removes a component with a given name from the component registry. It returns an error if no such component is registered.

type FactoryConfig

type FactoryConfig struct {
	RegistryCapacity uint
}

FactoryConfig sets up properties of a Factory

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph represents a graph of processes connected with packet channels.

func LoadJSON

func LoadJSON(filename string, factory *Factory) *Graph

LoadJSON loads a JSON graph definition file into a Graph object that can be run or used in other networks

func NewGraph

func NewGraph(config ...GraphConfig) *Graph

NewGraph returns a new initialized empty graph instance

func ParseJSON

func ParseJSON(js []byte, factory *Factory) *Graph

ParseJSON converts a JSON network definition string into a Graph object that can be run or used in other networks

func (*Graph) Add

func (n *Graph) Add(name string, c interface{}) error

Add adds a new process with a given name to the network.

func (*Graph) AddGraph

func (n *Graph) AddGraph(name string) error

AddGraph adds a new blank graph instance to a network. That instance can be modified then at run-time.

func (*Graph) AddIIP

func (n *Graph) AddIIP(processName, portName string, data interface{}) error

AddIIP adds an Initial Information packet to the network

func (*Graph) AddNew

func (n *Graph) AddNew(processName string, componentName string, f *Factory) error

AddNew creates a new process instance using component factory and adds it to the network.

func (*Graph) Connect

func (n *Graph) Connect(senderName, senderPort, receiverName, receiverPort string) error

Connect connects a sender to a receiver and creates a channel between them using the BufferSize configuration of the graph. Normally such a connection is unbuffered, but you can change this by setting BufferSize > 0 in the graph configuration or by using ConnectBuf() instead. It returns an error if the connection cannot be made.

func (*Graph) ConnectBuf

func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort string, bufferSize int) error

ConnectBuf connects a sender to a receiver using a channel with a buffer of a given size. It returns an error if the connection cannot be made.
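
The practical difference between an unbuffered and a buffered connection can be seen with plain Go channels. This is a small sketch using only the standard library: trySend is a hypothetical helper, and the example says nothing about how GoFlow creates its channels internally.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)  // like a default connection
	buffered := make(chan string, 2) // like a connection with bufferSize 2

	fmt.Println(trySend(unbuffered, "a")) // false: no receiver is waiting
	fmt.Println(trySend(buffered, "a"))   // true: the buffer has room
	fmt.Println(trySend(buffered, "b"))   // true: the buffer has room
	fmt.Println(trySend(buffered, "c"))   // false: the buffer is full
}
```

With an unbuffered connection the sender and receiver proceed in lockstep. A buffer lets a fast sender run ahead of a slow receiver by up to the buffer size.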

func (*Graph) Get

func (n *Graph) Get(name string) interface{}

Get a component by name

func (*Graph) MapInPort

func (n *Graph) MapInPort(name, procName, procPort string)

MapInPort adds an inport to the net and maps it to a contained proc's port.

func (*Graph) MapOutPort

func (n *Graph) MapOutPort(name, procName, procPort string)

MapOutPort adds an outport to the net and maps it to a contained proc's port.

func (*Graph) Process

func (n *Graph) Process()

Process runs the network

func (*Graph) Remove

func (n *Graph) Remove(processName string) error

Remove deletes a process from the graph. First it stops the process if running. Then it disconnects it from other processes and removes the connections from the graph. Then it drops the process itself.

func (*Graph) RemoveIIP

func (n *Graph) RemoveIIP(processName, portName string) error

RemoveIIP detaches an IIP from specific process and port

func (*Graph) SetInPort

func (n *Graph) SetInPort(name string, channel interface{}) error

SetInPort assigns a channel to a network's inport to talk to the outer world.

func (*Graph) SetOutPort

func (n *Graph) SetOutPort(name string, channel interface{}) error

SetOutPort assigns a channel to a network's outport to talk to the outer world. It returns an error if the outport cannot be set.

type GraphConfig

type GraphConfig struct {
	Capacity   uint
	BufferSize int
}

GraphConfig sets up properties for a graph

type InputGuard

type InputGuard struct {
	// contains filtered or unexported fields
}

InputGuard counts number of closed inputs

func NewInputGuard

func NewInputGuard(ports ...string) *InputGuard

NewInputGuard returns a guard for a given number of inputs

func (*InputGuard) Complete

func (g *InputGuard) Complete(port string) bool

Complete is called when a port is closed and returns true when all the ports have been closed
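
The idea of counting closed inputs is useful when a component reads from several inports and should finish only after all of them are closed. The sketch below re-implements that idea with the standard library. The inputGuard type, its methods and the merge function are hypothetical and are not GoFlow's InputGuard; the guard here is also not safe for concurrent use, since it is only touched from one goroutine.

```go
package main

import "fmt"

// inputGuard is a hypothetical guard that tracks which ports are still open.
type inputGuard struct {
	open map[string]bool
}

func newInputGuard(ports ...string) *inputGuard {
	g := &inputGuard{open: make(map[string]bool)}
	for _, p := range ports {
		g.open[p] = true
	}
	return g
}

// complete marks a port as closed and reports whether all ports are closed.
func (g *inputGuard) complete(port string) bool {
	delete(g.open, port)
	return len(g.open) == 0
}

// merge forwards packets from both inputs and closes out once both are closed.
func merge(a, b <-chan int, out chan<- int) {
	guard := newInputGuard("a", "b")
	for {
		select {
		case v, ok := <-a:
			if ok {
				out <- v
				continue
			}
			a = nil // a nil channel is never ready, so this case is disabled
			if guard.complete("a") {
				close(out)
				return
			}
		case v, ok := <-b:
			if ok {
				out <- v
				continue
			}
			b = nil
			if guard.complete("b") {
				close(out)
				return
			}
		}
	}
}

func main() {
	a, b, out := make(chan int), make(chan int), make(chan int)
	go merge(a, b, out)
	go func() { a <- 1; a <- 2; close(a) }()
	go func() { b <- 10; close(b) }()
	sum := 0
	for v := range out {
		sum += v
	}
	fmt.Println(sum) // 13
}
```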

type Message

type Message struct {
	// Protocol is NoFlo protocol identifier:
	// "runtime", "component", "graph" or "network"
	Protocol string `json:"protocol"`
	// Command is a command to be executed within the protocol
	Command string `json:"command"`
	// Payload is JSON-encoded body of the message
	Payload interface{} `json:"payload"`
}

Message represents a single FBP protocol message
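
The JSON tags on Message determine its wire form. The following sketch copies the struct locally so that it is self-contained and round-trips it through encoding/json. The encode and decode helpers are hypothetical, and the command name and payload are illustrative values only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is copied from the documented struct to keep the example self-contained.
type Message struct {
	Protocol string      `json:"protocol"`
	Command  string      `json:"command"`
	Payload  interface{} `json:"payload"`
}

// encode serializes a message to its JSON form.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decode parses a JSON string into a Message. A JSON object payload is
// decoded into a map[string]interface{}.
func decode(s string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	s, err := encode(Message{
		Protocol: "runtime",
		Command:  "getruntime",
		Payload:  map[string]string{"secret": "example"},
	})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	fmt.Println(s)

	m, err := decode(s)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println(m.Protocol, m.Command)
}
```

Because Payload is an interface{}, a receiver has to inspect or re-decode it according to the Protocol and Command fields.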

type PlugIn

type PlugIn interface {
	Component
	SetParams(params map[string]interface{}) error
	GetParams() map[string]interface{}
	GetParam(param string) interface{}
}

PlugIn something

type PlugInS

type PlugInS struct {
	// contains filtered or unexported fields
}

PlugInS something

func (*PlugInS) GetParam

func (s *PlugInS) GetParam(param string) interface{}

func (*PlugInS) GetParams

func (s *PlugInS) GetParams() map[string]interface{}

func (*PlugInS) Process

func (s *PlugInS) Process()

func (*PlugInS) SetParams

func (s *PlugInS) SetParams(params map[string]interface{}) error

type PortInfo

type PortInfo struct {
	ID          string        `json:"id"`
	Type        string        `json:"type"`
	Description string        `json:"description"`
	Addressable bool          `json:"addressable"` // ignored
	Required    bool          `json:"required"`
	Values      []interface{} `json:"values"`  // ignored
	Default     interface{}   `json:"default"` // ignored
}

PortInfo represents a port to a runtime client

type Wait

type Wait chan struct{}

Wait is a channel that signals completion

func Run

func Run(c Component) Wait

Run the component process
