Stream processing flow in Go
Stream processing in Golang with a modular notification behavior based on filters.
go get github.com/konimarti/flow
Usage
- Define a data stream source:
yourSource := flow.Func{
func() interface{} {
return rand.Float64()
},
500 * time.Millisecond,
}
- Define the stream data processing (i.e. your filters):
yourFilters := filters.AboveFloat64{0.5}
- Create your flow and subscribe to the results:
yourFlow := flow.New(
yourFilters,
yourSource,
)
results := yourFlow.Subscribe()
for {
<-results.C()
fmt.Println(results.Value())
}
Example
- Apply a low-pass filter (exponential smoothing) to a sequence of random numbers between 0 and 1:
flow := flow.New(
&filters.Lowpass{A: 0.1},
&flow.Func{
func(){ return rand.Float64() },
500 * time.Millisecond,
},
)
- Generate a stream of random numbers from a standard normal, calculate moving average, print average and check if average is above or below 0.5 and -0.5, respectively. If so, then notify the subscribers.
// define stream processor (flow) that returns an observer
yourFlow := flow.New(
filters.NewChain(
&filters.MovingAverage{Window: 10},
&filters.Print{Writer: os.Stdout, Prefix: "Moving average:"},
filters.NewSwitch(
&filters.AboveFloat64{0.5},
&filters.BelowFloat64{-0.5},
),
),
&flow.Func{
func() interface{} { return rand.NormFloat64() },
500*time.Millisecond,
},
)
// subscribe to flow and listen to events
results := yourFlow.Subscribe()
for {
<-results.C()
fmt.Println("Notified:", results.Value())
}
Description
Two types of flows are available that are suitable for different use cases:
- Channel-based flows accept new values through a
chan interface{}
channel, and
- Function-based flows collect new values in regular intervals from a
func() interface{}
function.
Channel-based flows are useful in cases where we receive specific events.
Function-based flows can be used to monitor any object or state of resources
(i.e. reading data from OPC, HTTP requests, etc.).
- To get a channel-based flow:
// define channel
ch := make(chan interface{})
// define your filter(s) for data processing
filter := filters.OnChange{}
// create your flow
yourChannelFlow := flow.New(
&filter,
&flow.Chan{ch},
)
// publish new data to channel ch
// ch <- ..
- To get a function-based flow:
// define a function that returns the values
fn := func() interface{} {
return rand.Float64()
}
// define your filter(s)
filter := filters.OnChange{}
// create your flow
yourFunctionFlow := flow.New(
&filter,
&flow.Func{
fn,
1 * time.Second,
},
)
- You can subscribe to a flow in order to receive the results from the filter(s):
results := yourFlow.Subscribe()
for {
// wait for a result
<-results.C()
// get value from the filters
results.Value()
}
Filters
The filters control the behavior of the observer, i.e. they determine when and what values should be sent to the subscribers.
Available filters out-of-the-box
The following filters are currently implemented in this package:
User-defined filters
User-defined filters can easily be created: Define your struct and embed the filters.Model
. You can then customize one or both of the interface functions.
The filters.None
is implemented by creating an empty struct and just embedding filters.Model
, for example.
The following user-defined filter expects a float64 value and multiplies it with a pre-defined factor:
type Multiply struct {
filters.Model
Factor float64
}
func (m *Multiply) Update(v interface{}) interface{} {
return v.(float64) * m.Factor
}
Logical structures
Filters can be chained together using filters.NewChain(Filter1, Filter2, ...)
.
To adjust the notification behavior, the filters.NewSwitch
function can be useful, especially in cases when you want
to monitor a value that needs to remain within a certain range ("deadband").
See this example for more information on logical structures
A stream-processing use case: Anomaly detection
An anomaly detection example for streams with an user-defined filter based on Lytics' Anomalyzer
can be found here.
More examples
Check out the examples here.
Credits
This software package has been developed for and is in production at Kalkfabrik Netstal.
The design of the observer implementation was inspired by go-observer.
Disclaimer
This package is still work-in-progress. Interfaces might still change substantially. It is also not recommended to use it in a production environment at this point.