pipeline


Build your own data pipeline to gather, organize, and transform data, using protobuf as an intermediate format.

Example sources

Purpose

According to Daniel Whitenack, surveys have shown that data scientists spend 90% or more of their time collecting, organizing, and cleansing data. To cut down on that overhead, I created an extensible data pipeline that implements the following process.

Process

+----------+    +------------+    +-------------+    +------------+    +------------+
|  Gather  +---->  Organize  +---->  Transform  +---->  Evaluate  +---->  Validate  |
+----------+    +------------+    +------^------+    +------------+    +------+-----+
                                         |                                    |
                                         +------------------------------------+

We should treat data and its format as immutable, especially if the data comes from an external resource or team. So we first gather the data and organize its raw format into an interim one that is portable and can be used by many tools. For portability, I decided to use Google's protobuf.

Then you can extract features and transform your data into a new format. Finally, you evaluate and validate your model and repeat the process until you reach the final, cleaned model, which can be used for scoring.

You can also load and save the data at any step in the pipeline.

Installation

First, install the Protobuf compiler and the corresponding protobuf Go plugin manually, or use the following command:

make

Usage

On the first run, you can execute the whole process at once:

func main() {
    p := new(pipeline.Pipeline)

    p.Gather("https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data", "data/external/iris.csv").
    Organize("data/external/iris.csv", func(records [][]string) (out proto.Message, err error) {
        return nil, nil
    }).
    Save("data/interim/iris.pb").
    Transform(func(in proto.Message) (out proto.Message, err error) {
        // Extract features ...
        // Normalize ...
        // Whatever ...
        return nil, nil
    }).
    Save("data/processed/iris.pb").
    Evaluate("evaluate something ...", func(in interface{}, data proto.Message) error {
        // Classification ...
        // Regression ...
        // More ...
        // Save metrics for validation
        return nil
    }).
    Validate("validate something ...", func(in interface{}, data proto.Message) error {
        // Use the metrics ...
        // Print statistics ...
        // Plot diagrams ...
        return nil
    })
    // Handle errors - If a certain step has failed, the next steps are ignored.
    if err := p.Error(); err != nil {
        log.Fatal(err)
    }
}
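
For illustration, a concrete Organize callback could parse each CSV record into a generated protobuf type using the standard strconv package. The irispb package with its Iris and IrisList messages below is a hypothetical stand-in for whatever schema you generate with protoc:

func organizeIris(records [][]string) (out proto.Message, err error) {
    list := &irispb.IrisList{} // hypothetical generated message type
    for _, r := range records {
        if len(r) != 5 {
            continue // skip blank or malformed rows
        }
        sepalLength, err := strconv.ParseFloat(r[0], 64)
        if err != nil {
            return nil, err
        }
        list.Items = append(list.Items, &irispb.Iris{
            SepalLength: sepalLength,
            // ... parse the remaining measurement columns the same way
            Species: r[4],
        })
    }
    return list, nil
}

You would then pass organizeIris to Organize instead of the inline closure above.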

Next you can use the Load method to speedup the process and decouple your project from the external resource:

func main() {
    p := new(pipeline.Pipeline)

    p.Load("data/interim/iris.pb").
    Transform(func(in proto.Message) (out proto.Message, err error) {
        // Extract features ...
        // Normalize ...
        // Whatever ...
        return nil, nil
    }).
    Save("data/processed/iris.pb").
    Evaluate("evaluate something ...", func(in interface{}, data proto.Message) error {
        // Classification ...
        // Regression ...
        // More ...
        // Save metrics for validation
        return nil
    }).
    Validate("validate something ...", func(in interface{}, data proto.Message) error {
        // Use the metrics ...
        // Print statistics ...
        // Plot diagrams ...
        return nil
    })
    // Handle errors - If a certain step has failed, the next steps are ignored.
    if err := p.Error(); err != nil {
        log.Fatal(err)
    }
}

Documentation

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline provides an easy-to-use interface for working with data structures in the context of data science and machine learning.

func (*Pipeline) Data

func (p *Pipeline) Data() proto.Message

Data returns the underlying raw data that is currently in the pipeline.
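
For example, given a pipeline p as in the Usage section and the hypothetical generated IrisList type from above, you could inspect the current state after the pipeline has run:

if list, ok := p.Data().(*irispb.IrisList); ok {
    log.Printf("pipeline holds %d records", len(list.Items))
}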

func (*Pipeline) Error

func (p *Pipeline) Error() error

Error returns the current error state of the pipeline.

func (*Pipeline) Evaluate

func (p *Pipeline) Evaluate(in interface{}, fn func(in interface{}, data proto.Message) (err error)) *Pipeline

Evaluate tries to predict a specific output by passing the data and a given input to fn.
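
The in argument is passed through to fn unchanged, so it can carry a model or any other context. A minimal sketch, where model is a hypothetical value you provide:

p.Evaluate(model, func(in interface{}, data proto.Message) error {
    // in is the model (or other context) given above; data is the current
    // pipeline message. Classify or score here and save the metrics so a
    // later Validate step can pick them up.
    return nil
})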

func (*Pipeline) Gather

func (p *Pipeline) Gather(sourceUrl, targetFile string) *Pipeline

Gather retrieves raw data from an external resource and saves it into a single file.
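
For example, downloading the Iris dataset as in the Usage section:

p.Gather("https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
    "data/external/iris.csv")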

func (*Pipeline) Load

func (p *Pipeline) Load(protoFilename string, fn func(raw []byte) (out proto.Message, err error)) *Pipeline

Load reads a protobuf file and passes the raw bytes to a given function fn.
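
fn typically unmarshals the raw bytes into a generated type, e.g. with proto.Unmarshal (IrisList is again the hypothetical generated type):

p.Load("data/interim/iris.pb", func(raw []byte) (out proto.Message, err error) {
    list := &irispb.IrisList{}
    if err := proto.Unmarshal(raw, list); err != nil {
        return nil, err
    }
    return list, nil
})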

func (*Pipeline) Organize

func (p *Pipeline) Organize(csvFilename string, fn func(records [][]string) (out proto.Message, err error)) *Pipeline

Organize reads a CSV file and converts its records into a protobuf message via a given function fn.

func (*Pipeline) Save

func (p *Pipeline) Save(protoFilename string) *Pipeline

Save writes a protobuf structure from memory into a specific file.

func (*Pipeline) Transform

func (p *Pipeline) Transform(fn func(in proto.Message) (out proto.Message, err error)) *Pipeline

Transform translates a given protobuf message into another protobuf message by using a specific function.
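
For example, a transform that keeps only a single species, again assuming the hypothetical irispb types:

p.Transform(func(in proto.Message) (out proto.Message, err error) {
    list, ok := in.(*irispb.IrisList)
    if !ok {
        return nil, fmt.Errorf("unexpected message type %T", in)
    }
    filtered := &irispb.IrisList{}
    for _, item := range list.Items {
        if item.Species == "Iris-setosa" {
            filtered.Items = append(filtered.Items, item)
        }
    }
    return filtered, nil
})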

func (*Pipeline) Validate

func (p *Pipeline) Validate(in interface{}, fn func(in interface{}, data proto.Message) (err error)) *Pipeline

Validate calculates the quality of a model by using the data and a given input.
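
Validate mirrors Evaluate: in can carry a threshold or reference values for checking the metrics stored earlier. A minimal sketch:

p.Validate(0.9, func(in interface{}, data proto.Message) error {
    // in carries the acceptance threshold given above; compare it with the
    // metrics saved during Evaluate and return a non-nil error to fail the
    // pipeline if the model is not good enough.
    return nil
})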
