flowgraph

Version: v0.0.0-...-900e8f8
Published: Apr 24, 2021 | License: BSD-3-Clause | Imports: 4 | Imported by: 0

README

Getting Started
go get -u github.com/vectaport/flowgraph
go test
Overview

Flowgraphs are built out of hubs interconnected by streams. The hubs are implemented with goroutines that use select to wait on incoming data or back-pressure handshakes. The data and handshakes travel on streams implemented with channels of empty interfaces for forward flow (interface{}) and channels of empty structs for back-pressure (struct{}).
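The hub-and-stream mechanism described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the `stream` struct, the `source`, `hub`, and `collect` functions, and the exact handshake protocol are all assumptions made for the example.

```go
package main

import "fmt"

// stream is a hypothetical stand-in for a flowgraph stream.
type stream struct {
	data chan interface{} // forward flow
	ack  chan struct{}    // back-pressure: "there is room again"
}

func newStream() stream {
	return stream{data: make(chan interface{}), ack: make(chan struct{}, 1)}
}

// source sends each value and waits for the handshake before the next one.
func source(out stream, vals []interface{}) {
	for _, v := range vals {
		out.data <- v
		<-out.ack
	}
	close(out.data)
}

// hub applies f to each incoming value. It fires only when it holds a
// value and the downstream side has reported empty space.
func hub(in, out stream, f func(interface{}) interface{}) {
	var pending interface{}
	hasPending, room := false, true
	for {
		if hasPending && room {
			out.data <- f(pending) // forward flow downstream
			in.ack <- struct{}{}   // back-pressure handshake upstream
			hasPending, room = false, false
		}
		data := in.data
		if hasPending {
			data = nil // a nil channel is never ready in select
		}
		select {
		case v, ok := <-data:
			if !ok {
				close(out.data)
				return
			}
			pending, hasPending = v, true
		case <-out.ack:
			room = true
		}
	}
}

// collect drains a stream, acknowledging every value it consumes.
func collect(in stream) []interface{} {
	var got []interface{}
	for v := range in.data {
		got = append(got, v)
		in.ack <- struct{}{}
	}
	return got
}

// runPipeline wires source -> hub -> collect, doubling each int.
func runPipeline(vals []interface{}) []interface{} {
	in, out := newStream(), newStream()
	go source(in, vals)
	go hub(in, out, func(v interface{}) interface{} { return v.(int) * 2 })
	return collect(out)
}

func main() {
	fmt.Println(runPipeline([]interface{}{1, 2, 3})) // prints [2 4 6]
}
```

The hub never sends downstream until it has seen an acknowledgement for its previous value, so at most one value is in flight per stream.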

The user of this package is completely isolated from the details of goroutines, channels, and select. The user only has to provide functions on empty-interface values that transform incoming data into outgoing data, as needed for each hub of the flowgraph under construction. The package can also log each data flow and transformation at the desired level of detail for debugging and monitoring.

The package allows for correct-by-construction dataflow systems that avoid deadlock and gridlock by using back-pressure to manage empty space. It also supports looping constructs that can operate at the same efficiency as pipeline structures using channel buffering within the loop.
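To see why empty space matters inside a loop, consider a single goroutine that feeds values back to itself. The sketch below uses only plain channels and is an assumption-laden illustration, not how the package builds its While or During hubs; `loopIterations` is a hypothetical name. With an unbuffered feedback channel the send inside the loop would block forever, while one buffered slot lets the value circulate.

```go
package main

import "fmt"

// loopIterations feeds start into a one-hub loop and reports how many
// trips the value made around the loop before reaching zero.
func loopIterations(start int) int {
	loop := make(chan int, 1) // one slot of empty space inside the loop
	done := make(chan int)

	go func() {
		trips := 0
		for n := range loop {
			if n <= 0 {
				done <- trips
				return
			}
			trips++
			loop <- n - 1 // feedback: relies on the empty slot to avoid deadlock
		}
	}()

	loop <- start
	return <-done
}

func main() {
	fmt.Println(loopIterations(3)) // prints 3
}
```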

All of this is made available with an API designed to directly underlie a future HDL for a flowgraph language.

Documentation

Overview

Package flowgraph supports scalable asynchronous development: build systems out of hubs interconnected by streams of data. See https://github.com/vectaport/flowgraph/wiki for more.

Index

Constants

const EOF = Error("EOF")

EOF marks the end of flow. It is transmitted when end-of-file occurs and promises that no more data will follow.

Variables

This section is empty.

Functions

func ParseFlags

func ParseFlags()

ParseFlags parses the command line flags for this package.

Types

type Breaker

type Breaker interface {
	Break() bool
	Clear()
}

Breaker returns true to break out of a loop.
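A minimal sketch of a type that satisfies Breaker, with the interface copied locally so the example compiles on its own. The `countBreaker` type and `runLoop` function are hypothetical, and the idea that Clear resets state for the next run is an assumption; the documentation does not say when the package calls it.

```go
package main

import "fmt"

// Breaker mirrors the interface documented above.
type Breaker interface {
	Break() bool
	Clear()
}

// countBreaker (hypothetical) breaks after limit iterations.
type countBreaker struct {
	limit, n int
}

// Break counts one call and reports true once the limit is exceeded.
func (b *countBreaker) Break() bool {
	b.n++
	return b.n > b.limit
}

// Clear resets the counter (assumed meaning of Clear).
func (b *countBreaker) Clear() { b.n = 0 }

// runLoop iterates until the breaker fires, then clears it.
func runLoop(b Breaker) int {
	iterations := 0
	for !b.Break() {
		iterations++
	}
	b.Clear()
	return iterations
}

func main() {
	b := &countBreaker{limit: 3}
	fmt.Println(runLoop(b), runLoop(b)) // prints 3 3
}
```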

type Error

type Error string

func (Error) Error

func (e Error) Error() string
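Because Error is a string type, EOF can be declared as a constant and compared with `==`. The sketch below redeclares the type locally so it runs without the package; the body of the Error method (returning the string itself) and the `next` and `sum` helpers are assumptions for illustration.

```go
package main

import "fmt"

// Error and EOF mirror the declarations documented above.
type Error string

// Error returns the message text (assumed implementation).
func (e Error) Error() string { return string(e) }

const EOF = Error("EOF")

// next is a hypothetical producer that reports EOF once vals is exhausted.
func next(vals []int, i int) (int, error) {
	if i >= len(vals) {
		return 0, EOF
	}
	return vals[i], nil
}

// sum adds values until the producer signals end of flow.
func sum(vals []int) int {
	total := 0
	for i := 0; ; i++ {
		v, err := next(vals, i)
		if err == EOF {
			return total
		}
		total += v
	}
}

func main() {
	fmt.Println(sum([]int{1, 2, 3}), EOF) // prints 6 EOF
}
```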

type Flowgraph

type Flowgraph interface {

	// Title returns the title of this flowgraph
	Title() string

	// Hub returns a hub by index
	Hub(n int) Hub

	// Stream returns a stream by index
	Stream(n int) Stream

	// NumHub returns the number of hubs
	NumHub() int

	// NumStream returns the number of streams
	NumStream() int

	// NewHub returns a new unconnected hub
	NewHub(name string, code HubCode, init interface{}) Hub

	// NewStream returns a new unconnected stream
	NewStream(name string) Stream

	// NewGraphHub returns a hub with a flowgraph inside
	NewGraphHub(name string, code HubCode) GraphHub

	// FindHub finds a hub by name
	FindHub(name string) Hub

	// FindStream finds a stream by name
	FindStream(name string) Stream

	// Connect connects two hubs via named (string) or indexed (int) ports
	Connect(
		upstream Hub, upstreamPort interface{},
		dnstream Hub, dnstreamPort interface{}) Stream

	// ConnectInit connects two hubs via named (string) or indexed (int) ports
	// and sets an initial value for flow
	ConnectInit(
		upstream Hub, upstreamPort interface{},
		dnstream Hub, dnstreamPort interface{},
		init interface{}) Stream

	// Run runs the flowgraph
	Run()
}

Flowgraph interface for flowgraphs assembled out of hubs and streams

func New

func New(title string) Flowgraph

New returns a titled flowgraph

type GraphHub

type GraphHub interface {
	Hub
	Flowgraph

	// Loop builds a conditional iterator for a while or during loop
	Loop()

	// Link links an internal stream to an external stream
	Link(in, ex Stream)

	// ExposeSource marks an internal stream to be used as an input source as well
	ExposeSource(s Stream)

	// ExposeResult marks an internal stream to be used as an output result as well
	ExposeResult(s Stream)
}

GraphHub is the interface for a flowgraph hub made out of a graph of hubs. The relevant code args for NewGraphHub are Graph, While, and During.

type Hub

type Hub interface {

	// Name returns the hub name
	Name() string

	// SetName sets the hub name
	SetName(name string)

	// Tracef for debug trace printing.  Uses atomic log mechanism.
	Tracef(format string, v ...interface{})

	// LogError for logging of error messages.  Uses atomic log mechanism.
	LogError(format string, v ...interface{})

	// Panicf for logging of panic messages.  Uses atomic log mechanism.
	Panicf(format string, v ...interface{})

	// Source returns source stream selected by string or int
	Source(port interface{}) Stream

	// Result returns result stream selected by string or int
	Result(port interface{}) Stream

	// SetSource sets a stream on a source port selected by string or int
	SetSource(port interface{}, s Stream) Hub

	// SetResult sets a stream on a result port selected by string or int
	SetResult(port interface{}, s Stream) Hub

	// AddSources adds a source port for each stream
	AddSources(s ...Stream) Hub

	// AddResults adds a result port for each stream
	AddResults(s ...Stream) Hub

	// NumSource returns the number of source ports
	NumSource() int

	// NumResult returns the number of result ports
	NumResult() int

	// SetNumSource sets the number of source ports
	SetNumSource(n int) Hub

	// SetNumResult sets the number of result ports
	SetNumResult(n int) Hub

	// SourceNames returns the names of the source ports
	SourceNames() []string

	// ResultNames returns the names of the result ports
	ResultNames() []string

	// SetSourceNames names the source ports
	SetSourceNames(nm ...string) Hub

	// SetResultNames names the result ports
	SetResultNames(nm ...string) Hub

	// SourceIndex returns the index of a source port selected by string or Stream
	SourceIndex(port interface{}) int

	// ResultIndex returns the index of a result port selected by string or Stream
	ResultIndex(port interface{}) int

	// ConnectSources connects a list of source Streams to this hub
	ConnectSources(source ...Stream) Hub

	// ConnectResults connects a list of result Streams to this hub
	ConnectResults(result ...Stream) Hub

	// HubCode returns code associated with hub.
	HubCode() HubCode

	// Empty returns true if the underlying implementation is nil
	Empty() bool

	// Flowgraph returns associated flowgraph
	Flowgraph() Flowgraph

	// Base returns value of underlying implementation
	Base() interface{}
}

Hub interface for flowgraph hubs that are connected by flowgraph streams
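Several methods above take a port as an `interface{}` that may hold a name or an index. One way such a parameter could be resolved is a type switch, sketched below. The `portIndex` function and its return value of -1 for an unknown port are assumptions for illustration, not the package's actual behavior.

```go
package main

import "fmt"

// portIndex (hypothetical) resolves a port given as an int index or a
// string name. It returns -1 if the port cannot be resolved.
func portIndex(names []string, port interface{}) int {
	switch p := port.(type) {
	case int:
		if p >= 0 && p < len(names) {
			return p
		}
	case string:
		for i, n := range names {
			if n == p {
				return i
			}
		}
	}
	return -1
}

func main() {
	names := []string{"a", "b"}
	fmt.Println(portIndex(names, 1), portIndex(names, "a"), portIndex(names, 2.5)) // prints 1 0 -1
}
```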

type HubCode

type HubCode int

HubCode is code arg to NewHub()

const (
	Nop HubCode = iota

	Retrieve // Retriever              0,1      retrieve one value with Retrieve method
	Transmit // Transmitter            1,0      transmit one value with Transmit method
	AllOf    // Transformer            n,m      waiting for all sources for Transform method
	OneOf    // Transformer            n,m      waiting for one source for Transform method

	Wait     // nil                    n+1,1    wait for last source to pass rest
	Select   // nil                    1+n,1    select from rest by first source
	Steer    // nil                    2|1,2    steer last source by first source
	Cross    // nil                    2*n,2*n  steer left or right rank by first source of each

	Array    // []interface{}          0,1      produce array of values then EOF
	Constant // interface{}            0,1      produce constant values forever
	Pass     // nil                    1,1      pass value
	Split    // nil                    1,n      split slice into values
	Join     // nil                    n,1      join values into slice
	Sink     // [Sinker]               1,0      consume values forever

	Graph    // nil                    n,m      hub with general purpose internals
	While    // nil                    n,n      hub with while loop around internals
	During   // nil                    n,n      hub with while loop with continuous results

	Add      // [Transformer]          2,1      add numbers, concat strings
	Subtract // [Transformer]          2,1      subtract numbers
	Multiply // [Transformer]          2,1      multiply numbers
	Divide   // [Transformer]          2,1      divide numbers
	Modulo   // [Transformer]          2,1      modulate numbers
	And      // [Transformer]          2,1      AND bool or bit-wise AND integers
	Or       // [Transformer]          2,1      OR bool or bit-wise OR integers
	Not      // [Transformer]          1,1      negate bool, invert integers
	Shift    // ShiftCode|Transformer  2,1      shift first by second, Arith,Barrel,Signed
)

HubCode constants for the NewHub() code arg. The comment fields are, in order: the init arg for NewHub, the number of sources and results, and a description. Where a count is n or m, the number of sources and results is set with SetNumSource and SetNumResult (or SetSource and SetResult).

func (HubCode) String

func (c HubCode) String() string

String method for HubCode
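The pattern of an integer code type with a String method can be sketched as follows. The `code` type and `codeNames` table are local stand-ins covering only the first five constants, and the strings returned here are assumptions; the text the real HubCode.String produces is not documented above.

```go
package main

import "fmt"

// code is a hypothetical stand-in for HubCode.
type code int

const (
	Nop code = iota
	Retrieve
	Transmit
	AllOf
	OneOf
)

// codeNames maps each code to a printable name (assumed names).
var codeNames = [...]string{"Nop", "Retrieve", "Transmit", "AllOf", "OneOf"}

// String makes code satisfy fmt.Stringer.
func (c code) String() string {
	if c < 0 || int(c) >= len(codeNames) {
		return fmt.Sprintf("code(%d)", int(c))
	}
	return codeNames[c]
}

func main() {
	fmt.Println(AllOf, code(42)) // prints AllOf code(42)
}
```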

type Retriever

type Retriever interface {
	Retrieve(h Hub) (result interface{}, err error)
}

Retriever retrieves one value using the Retrieve method. Provide as init arg to NewHub with Retrieve HubCode. Use Hub.Tracef for tracing.
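A Retriever might look like the sketch below, which hands out the elements of a slice one call at a time. To stay self-contained it declares a reduced `Hub` with only Tracef, plus local copies of Error and EOF. The `sliceRetriever` and `printHub` types are hypothetical, and returning EOF when the slice is exhausted is an assumption about how a retriever signals end of flow.

```go
package main

import "fmt"

// Hub is a local stand-in exposing only the Tracef method used here.
type Hub interface {
	Tracef(format string, v ...interface{})
}

// Error and EOF mirror the package declarations (assumed method body).
type Error string

func (e Error) Error() string { return string(e) }

const EOF = Error("EOF")

// sliceRetriever (hypothetical) hands out its values one per call.
type sliceRetriever struct {
	vals []interface{}
	next int
}

// Retrieve returns the next value, or EOF once the slice is exhausted.
func (r *sliceRetriever) Retrieve(h Hub) (result interface{}, err error) {
	if r.next >= len(r.vals) {
		return nil, EOF
	}
	v := r.vals[r.next]
	r.next++
	h.Tracef("retrieved %v\n", v)
	return v, nil
}

// printHub (hypothetical) traces to standard output.
type printHub struct{}

func (printHub) Tracef(format string, v ...interface{}) { fmt.Printf(format, v...) }

func main() {
	r := &sliceRetriever{vals: []interface{}{"a", "b"}}
	for {
		if _, err := r.Retrieve(printHub{}); err != nil {
			fmt.Println("done:", err)
			return
		}
	}
}
```

With the real package, a value like this would be passed as the init arg to NewHub together with the Retrieve HubCode, as the description above states.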

type ShiftCode

type ShiftCode int

ShiftCode is the subcode for the "Shift" HubCode, provided as the init arg to NewHub.

const (
	Arith ShiftCode = iota
	Barrel
	Signed
)

type Sinker

type Sinker interface {
	Sink(source []interface{})
}

Sinker consumes wavefronts of values one at a time forever. Optionally provide as init arg to NewHub with Sink HubCode.

type Stream

type Stream interface {

	// Name returns the stream name
	Name() string

	// SetName sets the stream name
	SetName(name string)

	// Upstream returns upstream hub by index
	Upstream(i int) Hub

	// Downstream returns downstream hub by index
	Downstream(i int) Hub

	// NumUpstream returns the number of upstream hubs
	NumUpstream() int

	// NumDownstream returns the number of downstream hubs
	NumDownstream() int

	// Init sets an initial value for flow
	Init(v interface{}) Stream

	// Const sets a value for continual flow
	Const(v interface{}) Stream

	// Sink sets a stream to be a sink
	Sink() Stream

	// IsConst returns true if stream is a constant
	IsConst() bool

	// IsSink returns true if stream is a sink
	IsSink() bool

	// Same returns true if two streams are the same underneath
	Same(Stream) bool

	// Empty returns true if the underlying implementation is nil
	Empty() bool

	// Flowgraph returns associated flowgraph
	Flowgraph() Flowgraph

	// Base returns value of underlying implementation
	Base() interface{}
}

Stream interface for flowgraph streams that connect flowgraph hubs

type Transformer

type Transformer interface {
	Transform(h Hub, source []interface{}) (
		result []interface{}, err error)
}

Transformer transforms a slice of source values into a slice of result values with the Transform method. Provide as init arg to NewHub with AllOf or OneOf HubCode or optionally with a logic or math HubCode (access HubCode from a Transform method to customize these transforms). Use Hub.Tracef for tracing.
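As an illustration, the sketch below implements Transform by summing all integer sources into a single result. It declares a reduced `Hub` with only Tracef so that it compiles without the package; the `adder` and `quietHub` types are hypothetical, and the error returned for non-integer input is a design choice of this example.

```go
package main

import "fmt"

// Hub is a local stand-in exposing only the Tracef method used here.
type Hub interface {
	Tracef(format string, v ...interface{})
}

// adder (hypothetical) sums all of its integer sources into one result.
type adder struct{}

// Transform returns a one-element result slice holding the sum.
func (adder) Transform(h Hub, source []interface{}) (result []interface{}, err error) {
	sum := 0
	for i, v := range source {
		n, ok := v.(int)
		if !ok {
			return nil, fmt.Errorf("source %d: want int, got %T", i, v)
		}
		sum += n
	}
	h.Tracef("sum of %v is %d\n", source, sum)
	return []interface{}{sum}, nil
}

// quietHub (hypothetical) discards trace output.
type quietHub struct{}

func (quietHub) Tracef(format string, v ...interface{}) {}

func main() {
	res, err := adder{}.Transform(quietHub{}, []interface{}{1, 2, 3})
	fmt.Println(res, err) // prints [6] <nil>
}
```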

type Transmitter

type Transmitter interface {
	Transmit(h Hub, source interface{}) (err error)
}

Transmitter transmits one value using a Transmit method. Provide as init arg to NewHub with Transmit HubCode. Use Hub.Tracef for tracing.
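A Transmitter could, for example, write each value as a line to an io.Writer. The sketch below uses a reduced local `Hub` so it compiles alone; `lineWriter`, `quietHub`, and `transmitAll` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Hub is a local stand-in exposing only Tracef.
type Hub interface {
	Tracef(format string, v ...interface{})
}

// lineWriter (hypothetical) transmits each value as one line of text.
type lineWriter struct {
	w io.Writer
}

// Transmit writes source followed by a newline and reports any write error.
func (t lineWriter) Transmit(h Hub, source interface{}) (err error) {
	_, err = fmt.Fprintln(t.w, source)
	return err
}

// quietHub (hypothetical) discards trace output.
type quietHub struct{}

func (quietHub) Tracef(format string, v ...interface{}) {}

// transmitAll sends every value through a lineWriter and returns the text.
func transmitAll(vals ...interface{}) (string, error) {
	var sb strings.Builder
	t := lineWriter{w: &sb}
	for _, v := range vals {
		if err := t.Transmit(quietHub{}, v); err != nil {
			return sb.String(), err
		}
	}
	return sb.String(), nil
}

func main() {
	out, err := transmitAll("a", 2)
	fmt.Printf("%q %v\n", out, err) // prints "a\n2\n" <nil>
}
```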
