pipeline

package module
v0.2.3
Published: Oct 22, 2023 License: Apache-2.0 Imports: 6 Imported by: 31

README

Data Pipeline


Simple asynchronous data pipeline written in Go with support for concurrent tasks at each stage.

Installation

go get -v -u github.com/caffix/pipeline

Usage

The pipeline processes data provided by the input source through multiple stages, with the results finally consumed by the output sink. All steps of the pipeline can execute concurrently to maximize throughput. The pipeline can also be executed with buffering in between each step to minimize the impact of one stage taking longer than the others. Any error returned from a task being executed will terminate the pipeline. If a task returns nil data, the data is marked as processed and will not continue to the following stage.

The Pipeline Data

The pipeline Data implements the Clone and MarkAsProcessed methods, which perform a deep copy and mark the data to prevent further movement down the pipeline, respectively. Below is a simple pipeline Data implementation:

type stringData struct {
	processed bool
	val       string
}

// Clone implements the pipeline Data interface.
func (s *stringData) Clone() pipeline.Data { return &stringData{val: s.val} }

// MarkAsProcessed implements the pipeline Data interface.
func (s *stringData) MarkAsProcessed() { s.processed = true }

// String implements the Stringer interface.
func (s *stringData) String() string { return s.val }

The Input Source

The InputSource is an iterator that feeds the pipeline with data. Once the Next method returns false, the pipeline prevents the following stage from receiving data and begins an avalanche effect, stopping each stage and eventually terminating the pipeline. Below is a simple input source:

type stringSource []pipeline.Data

var source = &stringSource{
    &stringData{val: "one"},
    &stringData{val: "two"},
    &stringData{val: "three"},
}

// Next implements the pipeline InputSource interface.
func (s *stringSource) Next(context.Context) bool { return len(*s) > 0 }

// Data implements the pipeline InputSource interface.
func (s *stringSource) Data() pipeline.Data {
    defer func() { *s = (*s)[1:] }()
    return (*s)[0]
}

// Error implements the pipeline InputSource interface.
func (s *stringSource) Error() error { return nil }

The Output Sink

The OutputSink serves as a final landing spot for the data after successfully traversing the entire pipeline. All data reaching the output sink is automatically marked as processed. Below is a simple output sink:

type stringSink []string

// Consume implements the pipeline OutputSink interface.
func (s *stringSink) Consume(ctx context.Context, data pipeline.Data) error {
    sd := data.(*stringData)

    *s = append(*s, sd.String())
    return nil
}

The Stages

The pipeline steps are executed in sequential order by instances of Stage. The execution strategies implemented are FIFO, FixedPool, DynamicPool, Broadcast, and Parallel:

  • FIFO - Executes the single Task
  • FixedPool - Executes a fixed number of instances of the one specified Task
  • DynamicPool - Executes a dynamic number of instances of the one specified Task
  • Broadcast - Executes several unique Task instances concurrently, moving Data along as soon as possible
  • Parallel - Executes several unique Task instances concurrently, passing the original Data through only once all the tasks complete successfully

The stage execution strategies can be combined to form desired pipelines. A Stage requires at least one Task to be executed at the step it represents in the pipeline. Each Task returns Data and an error. If the data returned is nil, it will not be sent to the following Stage. If the error is non-nil, the entire pipeline will be terminated. This allows users of the pipeline to have complete control over how failures impact the overall pipeline execution. A Task implements the Process method.

// TaskFunc is defined as a function with a Process method that calls the function
task := pipeline.TaskFunc(func(ctx context.Context, data pipeline.Data, tp pipeline.TaskParams) (pipeline.Data, error) {
    var val int
    s := data.(*stringData)

    switch s.String() {
    case "one":
        val = 1
    case "two":
        val = 2
    case "three":
        val = 3
    }

    s.val = fmt.Sprintf("%s - %d", s.String(), val)
    return s, nil
})

stage := pipeline.FIFO("", task)
Executing the Pipeline

The Pipeline continues executing until all the Data from the input source is processed, an error takes place, or the provided Context expires. At a minimum, the pipeline requires an input source, a pass-through stage, and an output sink.

var sink stringSink

p := pipeline.NewPipeline(stage)

if err := p.Execute(context.TODO(), source, &sink); err != nil {
    fmt.Printf("Error executing the pipeline: %v\n", err)
}
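
To add buffering between the steps, ExecuteBuffered can be used in place of Execute. The buffer size of 64 below is an arbitrary choice for illustration:

if err := p.ExecuteBuffered(context.TODO(), source, &sink, 64); err != nil {
    fmt.Printf("Error executing the buffered pipeline: %v\n", err)
}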

Future Features

Some additional features would bring value to this data pipeline implementation.

Logging

No logging is built into this pipeline implementation and this could be quite useful to have.

Metrics and Monitoring

It would be helpful to have the ability to monitor stage and task performance, such as how long each takes to execute, the number of Data instances processed, the number of successes and failures, etc.

Task Implementations for Common Use Cases

This pipeline implementation is very abstract, which allows users to perform nearly any set of steps. Currently, users must implement their own tasks. Some tasks are very common and the project could build support for such activities. For example, executing a script pulled from a Git repo.

Support for Configuration Files

As a pipeline implementation becomes more complex, it could be helpful to support the use of configuration files and reduce the level of effort necessary to build a pipeline. For example, the configuration file could specify when task output should be sent to alternative stages.

Develop Additional Stage Execution Strategies

While the current execution strategies work for many use cases, there could be opportunities to develop additional stage types that ease pipeline development.

Licensing

This program is free software: you can redistribute it and/or modify it under the terms of the Apache license.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SendData

func SendData(ctx context.Context, stage string, data Data, tp TaskParams)

SendData marks the provided data as new to the pipeline and sends it to the provided named stage.
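
For example, a task could inject additional data into another named stage while still forwarding its own result. This is only a sketch; the "parse" stage identifier and the cloned data are assumptions for illustration:

task := TaskFunc(func(ctx context.Context, data Data, tp TaskParams) (Data, error) {
	// Send a copy of the data to the (hypothetical) stage named "parse",
	// in addition to forwarding the original data to the next stage.
	SendData(ctx, "parse", data.Clone(), tp)
	return data, nil
})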

Types

type Data

type Data interface {
	// Clone returns a new Data that is a deep-copy of the original.
	Clone() Data
}

Data is implemented by values that can be sent through a pipeline.

type InputSource

type InputSource interface {
	// Next fetches the next data element from the source. If no more items are
	// available or an error occurs, calls to Next return false.
	Next(context.Context) bool

	// Data returns the next data to be processed.
	Data() Data

	// Error returns the last error observed by the source.
	Error() error
}

InputSource is implemented by types that generate Data instances which can be used as inputs to a Pipeline instance.

type OutputSink

type OutputSink interface {
	// Consume processes a Data instance that has been emitted out of
	// a Pipeline instance.
	Consume(context.Context, Data) error
}

OutputSink is implemented by types that can operate as the tail of a pipeline.

type Pipeline

type Pipeline struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Pipeline is an abstract and extendable asynchronous data pipeline with concurrent tasks at each stage. Each pipeline is constructed from an InputSource, an OutputSink, and zero or more Stage instances for processing.

func NewPipeline

func NewPipeline(stages ...Stage) *Pipeline

NewPipeline returns a new data pipeline instance where input data traverses each of the provided Stage instances.

func (*Pipeline) DataItemCount added in v0.2.0

func (p *Pipeline) DataItemCount() int

DataItemCount returns the number of data items currently on the pipeline.

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context, src InputSource, sink OutputSink) error

Execute performs ExecuteBuffered with a bufsize parameter equal to 1.

func (*Pipeline) ExecuteBuffered

func (p *Pipeline) ExecuteBuffered(ctx context.Context, src InputSource, sink OutputSink, bufsize int) error

ExecuteBuffered reads data from the InputSource, sends it through each of the Stage instances, and finishes with the OutputSink. Any errors that occur during execution are returned. ExecuteBuffered will block until all data from the InputSource has been processed, an error occurs, or the context expires.

type SinkFunc

type SinkFunc func(context.Context, Data) error

SinkFunc is an adapter to allow the use of plain functions as OutputSink instances.

func (SinkFunc) Consume

func (f SinkFunc) Consume(ctx context.Context, data Data) error

Consume calls f(ctx, data)
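
For example, a plain function can serve as the pipeline sink. A minimal sketch whose print behavior is only for illustration:

sink := SinkFunc(func(ctx context.Context, data Data) error {
	// Hypothetical sink: print each item reaching the end of the pipeline.
	fmt.Println(data)
	return nil
})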

type Stage

type Stage interface {
	// ID returns the optional identifier assigned to this stage.
	ID() string

	// Run executes the processing logic for this stage by reading
	// data from the input channel, processing the data and sending
	// the results to the output channel. Run blocks until the stage
	// input channel is closed, the context expires, or an error occurs.
	Run(context.Context, StageParams)
}

Stage is designed to be executed in sequential order to form a multi-stage data pipeline.

func Broadcast

func Broadcast(id string, tasks ...Task) Stage

Broadcast returns a Stage that passes a copy of each incoming data to all specified tasks and emits their outputs to the next stage.

func DynamicPool

func DynamicPool(id string, task Task, max int) Stage

DynamicPool returns a Stage that maintains a dynamic pool that can scale up to max parallel tasks for processing incoming inputs in parallel and emitting their outputs to the next stage.

func FIFO

func FIFO(id string, task Task) Stage

FIFO returns a Stage that processes incoming data in a first-in first-out fashion. Each input is passed to the specified Task and its output is emitted to the next Stage.

func FixedPool

func FixedPool(id string, task Task, num int) Stage

FixedPool returns a Stage that spins up a pool of num workers to process incoming data in parallel and emit their outputs to the next stage.

func Parallel

func Parallel(id string, tasks ...Task) Stage

Parallel returns a Stage that passes a copy of each incoming Data to all specified tasks, waits for all the tasks to complete successfully, and then passes only the original Data through to the following stage.

type StageParams

type StageParams interface {
	// Pipeline returns the pipeline executing this stage.
	Pipeline() *Pipeline

	// Position returns the position of this stage in the pipeline.
	Position() int

	// Input returns the input channel for this stage.
	Input() <-chan Data

	// Output returns the output channel for this stage.
	Output() chan<- Data

	// DataQueue returns the alternative data queue for this stage.
	DataQueue() queue.Queue

	// Error returns the queue that reports errors encountered by the stage.
	Error() queue.Queue

	// Registry returns a map of stage names to stage input channels.
	Registry() StageRegistry
}

StageParams provides the information needed for executing a pipeline Stage. The Pipeline passes a StageParams instance to the Run method of each stage.

type StageRegistry

type StageRegistry map[string]queue.Queue

StageRegistry is a map of stage identifiers to input channels.

type Task

type Task interface {
	// Process operates on the input data and returns back a new data to be
	// forwarded to the next pipeline stage. Task instances may also opt to
	// prevent the data from reaching the rest of the pipeline by returning
	// a nil data value instead.
	Process(context.Context, Data, TaskParams) (Data, error)
}

Task is implemented by types that can process Data as part of a pipeline stage.

type TaskFunc

type TaskFunc func(context.Context, Data, TaskParams) (Data, error)

TaskFunc is an adapter to allow the use of plain functions as Task instances.

func (TaskFunc) Process

func (f TaskFunc) Process(ctx context.Context, data Data, params TaskParams) (Data, error)

Process calls f(ctx, data, params)

type TaskParams

type TaskParams interface {
	// Pipeline returns the pipeline executing this task.
	Pipeline() *Pipeline

	// Registry returns a map of stage names to stage input channels.
	Registry() StageRegistry
}

TaskParams provides access to pipeline mechanisms needed by a Task. The Stage passes a TaskParams instance to the Process method of each task.
