tuna

package module
v0.0.0-...-f150c5c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2019 License: MIT Imports: 10 Imported by: 0

README

logo


tuna is a framework for computing streaming aggregates. In other words tuna is a streaming ETL. Sometimes datasets don't fit in memory and so you have to process them in chunks. One approach is to compute running statistics that provide a good approximation of their batch counterparts. The goal of tuna is to cover common use cases (e.g. a group by followed by a mean) while keeping it simple to build custom features.

Concepts

Like many such libraries, tuna involves a few simple concepts:

  • A Row is a set of (key, value) pairs (represented in Go with a map[string]string)
  • A Stream is a source of data that returns Rows one by one
  • A Metric is an object that computes one or more running statistics; it is fed float64 values and returns a map[string]float64 of features
  • An Agg takes Rows in, extracts float64s, and passes them to one or more Metrics
  • A Sink persists the output of an Agg

Quickstart

package main

import (
    "os"
    "strings"

    "github.com/MaxHalford/tuna"
)

func main() {
    // For the sake of example we inline the data, but usually it should be
    // located in a file, database, or some other source
    in := `name,£,bangers
Del Boy,-42,1
Rodney,1001,1
Rodney,1002,2
Del Boy,42,0
Grandad,0,3`

    // Define a Stream
    stream, _ := NewCSVStream(strings.NewReader(in))

    // Define an Agg
    agg := NewGroupBy(
        "name",
        func() Agg {
            return Aggs{
                NewExtractor("£", NewMean(), NewSum()),
                NewExtractor("bangers", NewSum()),
            }
        },
    )

    // Define a Sink
    sink := NewCSVSink(os.Stdout)

    // Run
    Run(stream, agg, sink, 0) // 0 means we don't display live progress
}

Running this script will produce the following output in your terminal:

bangers_sum,name,£_mean,£_sum
1,Del Boy,0,0
3,Grandad,0,0
3,Rodney,1001.5,2003

Usage

☝ In addition to the following documentation, please check out the godoc page for detailed information.

Streams

Streaming from a CSV file

A common use case you may have is processing rows located in a CSV file. You can use the NewCSVStream method to stream CSV data from an io.Reader instance.

var r io.Reader // Depends on your application
stream := tuna.NewCSVStream(r)

For convenience you can use the NewCSVStreamFromPath method to stream CSV data from a file path, it is simply a wrapper on top of NewCSVStream.

stream := tuna.NewCSVStreamFromPath("path/to/file")
Streaming Rows directly

For some reason you may want to stream a given set of Rows. Although this defeats the basic paradigm of tuna which is to process data that can't be loaded in memory, it is practical for testing purposes.

stream := tuna.NewStream(
    tuna.Row{"x0": "42.42", "x1": "24.24"},
    tuna.Row{"x0": "13.37", "x1": "31.73"},
)
Streaming from multiple sources

The ZipStreams method can be used to stream over multiple sources without having to merge them manually. Indeed large datasets are more often than not split into chunks for practical reasons. The issue is that if you're using a GroupBy and that the group keys are scattered across multiple sources, then processing each file individually won't produce the correct result.

To use ZipStreams you simply have to provide it with one or more Streams. It will then return a new Stream which will iterate over each Row of each provided Stream until they are all depleted. Naturally you can combine different types of Streams.

s1, _ := tuna.NewCSVStreamFromPath("path/to/file.csv")
s2 := tuna.NewStream(
    tuna.Row{"x0": "42.42", "x1": "24.24"},
    tuna.Row{"x0": "13.37", "x1": "31.73"},
)
stream := tuna.ZipStreams(s1, s2)
Using a custom Stream

A Stream is simply a channel that returns ErrRows, i.e.

type Stream chan ErrRow

An ErrRow has the following signature.

type ErrRow struct {
    Row
    Err error
}

A Row is nothing more than a map[string]string. The Err fields indicates if something went wrong during the retrieval of the corresponding Row.

Metrics

Overview

Metrics are the objects that do the actual computation. You're supposed to use them by providing them to an Extractor. Every time the Extractor is fed a Row, it will simply parse the values from the "banana" field and feed them to each Metric.

The current list of available metrics are:

  • Mean
  • Variance
  • Sum
  • Min
  • Max
  • Skew
  • Kurtosis
  • Diff

Although you can instantiate each struct yourself, it is recommended that you instantiate each struct with it's respective New method. For example use the NewMax method if you want to use the Max struct.

☝ A set of Metrics, which is represented in tuna by the Metrics, is also a Metric.

Writing a custom Metric

Every metric has to implement the following interface:

type Metric interface {
    Update(x float64) error
    Collect() map[string]float64
}
  • The Update method updates the running statistic that is being computed. For example the update formula for the Mean metric is mean = mean + (x - mean) / n.
  • The Collect method returns a set of one or more features. For example the Mean metric returns {"mean": some_float64}.

Aggs

Overview

Aggs are the bridge between Rows and Metrics. The simplest type of Agg is the Extractor, which extracts a float64 value from a Row and feeds to a Metric. Another example is the GroupBy struct, which maintains a set of set of Aggs and feeds them values given a Row key. Aggs can be composed to build powerful and expressive pipelines.

Extractor

As already said the Extractor is the simplest kind of Agg. It has the following signature:

type Extractor struct {
    Extract func(row Row) (float64, error)
    Metric  Metric
    Prefix  string
}

Simply put an Extractors parses a Row and extracts a float64 using it's Extract method. It then feeds the float64 to the Metric. After retrieving the results by calling the Metric's Collect method the Extractor will prepend the Prefix to each key so as to add the field name to the results.

The Extract field gives you the flexibility of parsing each Row as you wish. However often you might simply want to cast each value as float64. In this case you can use the NewExtractor method for convenience, as so:

extractor := tuna.NewExtractor("banana", tuna.NewMean(), tuna.NewMedian())
GroupBy

Computing running statistics is nice but in practice you probably want to compute conditional statistics. In other words you want to "group" the incoming values by a given attribute and compute one or more statistics inside each group. This is what the GroupBy struct is intended for.

You can use the NewGroupBy method to instantiate a GroupBy, it takes as arguments a string which tells it by what field to group the data and a func() Agg callable which returns an Agg. Every time a new key appears the callable will be used to instantiate a new Agg for the new group.

gb := tuna.NewGroupBy("name", func() tuna.Agg { return tuna.NewExtractor("£", tuna.NewSum()) })

You can nest GroupBys if you want to group the data by more than one variable. For example the following Agg will count the number of taken bikes along with the number of returned bikes by city as well as by day.

gb := tuna.NewGroupBy(
    "city",
    func() tuna.Agg {
        return tuna.NewGroupBy(
            "day",
            func() tuna.Agg {
                return tuna.Aggs{
                    tuna.NewExtractor("bike_taken", tuna.NewSum()),
                    tuna.NewExtractor("bike_returned", tuna.NewSum()),
                )
            }
        )
    }
)
SequentialGroupBy

Using a GroupBy can incur a large memory usage if you are computing many statistics on a very large dataset. Indeed the spatial complexity is O(n * k), where n is the number of group keys and k is the number of Aggs. This can potentially become quite large, especially if you're using nested GroupBys. While this is completely manageable if you have enough available RAM, it can still hinder the overall computation time.

The trick is that if your data is ordered by the group key then you only have to store the running statistics for one group at a time. This leads to an O(k) spatial complexity which is much more efficient. While having ordered data isn't always the case, you should make the most of it if it is. To do so you can use the SequentialGroupBy struct which can be initialized with the NewSequentialGroupBy method. It takes as argument a Sink in addition to the arguments used for the NewGroupBy method. Every time a new group key is encountered the current statistics are flushed to the Sink and a new Agg is initialized to handle the new group.

stream, _ := tuna.NewCSVStreamFromPath("path/to/csv/ordered/by/name")

sink, _ := tuna.NewCSVSinkFromPath("path/to/sink")

sgb := tuna.NewSequentialGroupBy(
    "name",
    func() tuna.Agg { return tuna.NewExtractor("bangers", NewMean()) }
    sink
)

tuna.Run(stream, sgb, nil, 1e6)

☝ Make sure your data is ordered by the group key before using SequentialGroupBy. There are various ways to sort a file by a given field, one of them being the Unix sort command.

Sinks

CSVSink

You can use a CSVSink struct to write the results of an Agg to a CSV file. It will write one line for each Row returned by the Agg's Collect method. Use the NewCSVSink method to instantiate a CSVSink that writes to a given io.Writer.

var w io.Reader // Depends on your application
sink := tuna.NewCSVSink(r)

For convenience you can use the NewCSVStreamFromPath method to stream CSV data from a file path, it is simply a wrapper on top of NewCSVStream.

sink := tuna.NewCSVSinkFromPath("path/to/file")
Writing a custom Sink

The Sink interface has the following signature:

type Sink interface {
    Write(rows <-chan Row) error
}

A Sink simply has to be able to write a channel of Rows "somewhere".

The Run method

Using the Run method is quite straightforward.

checkpoint := 1e5
err := Run(stream, agg, sink, checkpoint)

You simply have to provide it with a Stream, an Agg, and a Sink. It will feed the Agg with the Rows produced by the Stream one by one. Once the Stream is depleted the results of the Agg will be written to the Sink. An error will be returned if anything goes wrong along the way. The Run method will also display live progress in the console every time the number of parsed rows is a multiple of checkpoint, e.g.

00:00:02 -- 300,000 rows -- 179,317 rows/second

☝ In the future there might be a Runner interface to allow more flexibility. In the meantime you can copy/paste the content of the Run method and modify it as needed if you want to do something fancy (like monitoring progress inside a web page or whatnot)

Roadmap

  • Running median (and quantiles!)
  • DSL
  • CLI tool based on the DSL
  • Maybe handle dependencies between aggs (for example Variance could reuse Mean)
  • Benchmark and identify bottlenecks

License

The MIT License (MIT). Please see the LICENSE file for more information.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run(stream Stream, agg Agg, sink Sink, checkpoint uint) error

Run applies an against a stream. It will display the current progression at every multiple of checkpoint.

Example
// For the sake of example we inline the data, but usually it should be
// located in a file, database, or some other source
in := `name,£,bangers
Del Boy,-42,1
Rodney,1001,1
Rodney,1002,2
Del Boy,42,0
Grandad,0,3`

// Define a Stream
stream, _ := NewCSVStream(strings.NewReader(in))

// Define an Agg
agg := NewGroupBy(
	"name",
	func() Agg {
		return Aggs{
			NewExtractor("£", NewMean(), NewSum()),
			NewExtractor("bangers", NewSum()),
		}
	},
)

// Define a Sink
sink := NewCSVSink(os.Stdout)

// Run
Run(stream, agg, sink, 0)
Output:

bangers_sum,name,£_mean,£_sum
1,Del Boy,0,0
3,Grandad,0,0
3,Rodney,1001.5,2003

Types

type Agg

type Agg interface {
	Update(Row) error
	Collect() <-chan Row
}

An Agg takes Rows in and spits Rows out.

type Aggs

type Aggs []Agg

Aggs is also an Agg.

func (Aggs) Collect

func (aggs Aggs) Collect() <-chan Row

Collect merges and returns the outputs of each Agg's Collect method.

func (Aggs) Update

func (aggs Aggs) Update(row Row) error

Update calls Update on each Agg.

type CSVSink

type CSVSink struct {
	// contains filtered or unexported fields
}

CSVSink persist the output of an Agg's Collect method to a CSV file. The columns are ordered in lexical order.

func NewCSVSink

func NewCSVSink(writer io.Writer) *CSVSink

NewCSVSink returns a CSVSink which persists results to the given file.

func NewCSVSinkFromPath

func NewCSVSinkFromPath(path string) (*CSVSink, error)

NewCSVSinkFromPath returns a CSVSink which persists results to the given path.

func (*CSVSink) Write

func (cw *CSVSink) Write(rows <-chan Row) error

Write to a CSV located at Path.

type Diff

type Diff struct {
	Metric Metric
	// contains filtered or unexported fields
}

Diff runs an Agg on the (x[i+1] - x[i]) version of a stream of values. This can be used in conjunction with a GroupBy to compute rolling statistics.

func NewDiff

func NewDiff(m Metric) *Diff

NewDiff returns a Diff.

func (Diff) Collect

func (d Diff) Collect() map[string]float64

Collect returns the current value.

func (*Diff) Update

func (d *Diff) Update(x float64) error

Update Diff given a Row.

type ErrRow

type ErrRow struct {
	Row
	Err error
}

An ErrRow is a Row that has an accompanying error.

type ErrUnknownField

type ErrUnknownField struct {
	// contains filtered or unexported fields
}

An ErrUnknownField occurs trying to access an unexisting Row field.

func (ErrUnknownField) Error

func (e ErrUnknownField) Error() string

Error implements the Error interface.

type Extractor

type Extractor struct {
	Extract func(row Row) (float64, error)
	Metric  Metric
	Prefix  string
}

An Extractor is the simplest kind of Agg. It feeds the output of Extract to a Metric.

func NewExtractor

func NewExtractor(field string, metrics ...Metric) Extractor

NewExtractor returns an Extractor that parses a field as a float64.

func (Extractor) Collect

func (ex Extractor) Collect() <-chan Row

Collect converts the results from the Metric.

func (Extractor) Update

func (ex Extractor) Update(row Row) error

Update parses the Row using Extract and feeds the result to Metric.

type GroupBy

type GroupBy struct {
	By          string
	NewAgg      func() Agg
	SortResults bool
	// contains filtered or unexported fields
}

GroupBy maintains one Agg instance per group.

func NewGroupBy

func NewGroupBy(by string, newAgg func() Agg) *GroupBy

NewGroupBy returns a GroupBy that maintains a Agg for each distinct value of a given variable.

func (GroupBy) Collect

func (gb GroupBy) Collect() <-chan Row

Collect streams the Collect of each group. The groups are output in the lexical order of their keys.

func (*GroupBy) Update

func (gb *GroupBy) Update(row Row) error

Update updates the Agg of the Row's group.

type Kurtosis

type Kurtosis struct {
	// contains filtered or unexported fields
}

Kurtosis computes a running kurtosis using an extension of Welford's algorithm.

func NewKurtosis

func NewKurtosis() *Kurtosis

NewKurtosis returns a Kurtosis.

func (Kurtosis) Collect

func (k Kurtosis) Collect() map[string]float64

Collect returns the current value.

func (*Kurtosis) Update

func (k *Kurtosis) Update(x float64) error

Update Kurtosis given a Row.

type Max

type Max struct {
	// contains filtered or unexported fields
}

Max computes the maximal value of a column.

func NewMax

func NewMax() *Max

NewMax returns a Max that computes the mean of a given field.

func (Max) Collect

func (m Max) Collect() map[string]float64

Collect returns the current value.

func (*Max) Update

func (m *Max) Update(x float64) error

Update Max given a Row.

type Mean

type Mean struct {
	// contains filtered or unexported fields
}

Mean computes a running average. The result is an approximation but it is good enough for most purposes.

func NewMean

func NewMean() *Mean

NewMean returns a Mean.

func (Mean) Collect

func (m Mean) Collect() map[string]float64

Collect returns the current value.

func (*Mean) Update

func (m *Mean) Update(x float64) error

Update Mean given a Row.

type Metric

type Metric interface {
	Update(x float64) error
	Collect() map[string]float64
}

A Metric takes float64s in and spits out a map of float64s.

type Metrics

type Metrics []Metric

Metrics is also a Metric.

func (Metrics) Collect

func (ms Metrics) Collect() map[string]float64

Collect merges the results from each element.

func (Metrics) Update

func (ms Metrics) Update(x float64) error

Update calls Update on each element.

type Min

type Min struct {
	// contains filtered or unexported fields
}

Min computes the minimal value of a column.

func NewMin

func NewMin() *Min

NewMin returns a Min that computes the mean of a given field.

func (Min) Collect

func (m Min) Collect() map[string]float64

Collect returns the current value.

func (*Min) Update

func (m *Min) Update(x float64) error

Update Min given a Row.

type Row

type Row map[string]string

A Row maps a column name to a raw value.

type SequentialGroupBy

type SequentialGroupBy struct {
	By     string
	NewAgg func() Agg
	Sink   Sink
	// contains filtered or unexported fields
}

SequentialGroupBy maintains one Agg instance. Once a new group key is encoutered the Trigger is called. This has many practical use case for large but sequential data.

func NewSequentialGroupBy

func NewSequentialGroupBy(by string, newAgg func() Agg, sink Sink) *SequentialGroupBy

NewSequentialGroupBy returns a SequentialGroupBy that maintains an Agg for the given variable.

func (SequentialGroupBy) Collect

func (sgb SequentialGroupBy) Collect() <-chan Row

Collect streams the Collect of the current Agg.

func (*SequentialGroupBy) Flush

func (sgb *SequentialGroupBy) Flush() error

Flush writes the results of the Agg and resets it.

func (*SequentialGroupBy) Update

func (sgb *SequentialGroupBy) Update(row Row) error

Update updates the Agg of the Row's group.

type Sink

type Sink interface {
	Write(rows <-chan Row) error
}

A Sink can persist the output of an Agg's Collect method.

type Skew

type Skew struct {
	// contains filtered or unexported fields
}

Skew computes a running skew using an extension of Welford's algorithm.

func NewSkew

func NewSkew() *Skew

NewSkew returns a Skew that computes the mean of a given field.

func (Skew) Collect

func (s Skew) Collect() map[string]float64

Collect returns the current value.

func (*Skew) Update

func (s *Skew) Update(x float64) error

Update Skew given a Row.

type Stream

type Stream chan ErrRow

A Stream returns Rows one by one until it's source is depleted.

func NewCSVStream

func NewCSVStream(reader io.Reader) (Stream, error)

NewCSVStream returns a Stream from an io.Reader that reads strings that are assumed to CSV-parsable.

func NewCSVStreamFromPath

func NewCSVStreamFromPath(path string) (Stream, error)

NewCSVStreamFromPath returns a Stream from a CSV file.

func NewFuncStream

func NewFuncStream(f func() Row, n uint) Stream

NewFuncStream returns a Stream that calls function n times and returns the resulting Rows.

func NewStream

func NewStream(rows ...Row) Stream

NewStream returns a Stream from a slice of Rows. It is mainly here for demonstration and testing purposes.

func ZipStreams

func ZipStreams(ss ...Stream) Stream

ZipStreams returns a Stream that iterates over multiple streams one by one. This is quite convinient for going through a dataset which has been split into multiple parts.

type Sum

type Sum struct {
	// contains filtered or unexported fields
}

Sum computes a running sum.

func NewSum

func NewSum() *Sum

NewSum returns a Sum that computes the mean of a given field.

func (Sum) Collect

func (s Sum) Collect() map[string]float64

Collect returns the current value.

func (*Sum) Update

func (s *Sum) Update(x float64) error

Update Sum given a Row.

type Variance

type Variance struct {
	// contains filtered or unexported fields
}

Variance computes a running average using Welford's algorithm.

func NewVariance

func NewVariance() *Variance

NewVariance returns a Variance that computes the mean of a given field.

func (Variance) Collect

func (v Variance) Collect() map[string]float64

Collect returns the current value.

func (*Variance) Update

func (v *Variance) Update(x float64) error

Update Variance given a Row.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL