pipeline

package module
v0.0.0-...-56e50fe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2021 License: MIT Imports: 9 Imported by: 2

README

pipeline - create fast Go pipelines build

pipeline is a package that lets you create custom pipelines in Go.

It provides you with filters like Serial or Parallel and leaves you, the user, free to focus only on which transformations to apply to the data. No external libraries are used.

It works well wherever multi-stage processing takes place, for example in a REST API for image management. It also pairs well with message queues.

package example

import (
	"context"
	"fmt"
	"github.com/ele7ija/pipeline"
)

// LShiftWorker is a stateless example worker; it carries no fields and exists
// only so that a Work method can be attached to it.
type LShiftWorker struct{}

// Work shifts the incoming item one bit to the left (i.e. doubles it).
//
// in must hold an int. Any other dynamic type is reported as an error rather
// than triggering the panic an unchecked type assertion would cause. ctx is
// accepted to satisfy the worker signature and is not used.
func (w *LShiftWorker) Work(ctx context.Context, in pipeline.Item) (pipeline.Item, error) {
	n, ok := in.(int)
	if !ok {
		return nil, fmt.Errorf("lshift worker: expected int item, got %T", in)
	}
	return n << 1, nil
}

// main builds a pipeline with one serial filter made of two LShiftWorkers,
// feeds it the integers 0..4 and prints every filtered item.
func main() {
	f := pipeline.NewSerialFilter(&LShiftWorker{}, &LShiftWorker{})
	p := pipeline.NewPipeline("My pipeline", f)

	// Both channels are buffered for the five inputs, so the sends below
	// never block before the pipeline starts consuming.
	items := make(chan pipeline.Item, 5)
	errors := make(chan error, 5)
	for i := 0; i < 5; i++ {
		items <- i
	}
	close(items)

	filteredItems := p.Filter(context.Background(), items, errors)

	// done is closed once the error reader has drained the errors channel.
	// Without it, main could return (ending the program) before pending
	// error messages were printed.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range errors {
			fmt.Println("Oh no, an error!")
		}
	}()

	for filteredItem := range filteredItems {
		fmt.Println(filteredItem.(int))
	}

	// NOTE(review): as in the original example, this assumes the pipeline no
	// longer sends on errors once its output channel is closed — confirm.
	close(errors)
	<-done
}

// Output:
// 0
// 4
// 8
// 12
// 16

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BoundedParallelFilter

type BoundedParallelFilter struct {
	// contains filtered or unexported fields
}

BoundedParallelFilter can filter up to N items at a time

func NewBoundedParallelFilter

func NewBoundedParallelFilter(bound int, workers ...Worker) *BoundedParallelFilter

func (*BoundedParallelFilter) Filter

func (f *BoundedParallelFilter) Filter(ctx context.Context, in <-chan Item, errors chan<- error) <-chan Item

func (*BoundedParallelFilter) GetStat

type Filter

// Filter processes a bunch of items. It does not stop processing if there is
// an error.
type Filter interface {
	// Filter takes a context, a receive-only channel of input items and a
	// send-only channel for errors, and returns a receive-only channel of
	// items.
	Filter(ctx context.Context, in <-chan Item, errors chan<- error) <-chan Item
	// GetStat returns a FilterExecutionStat for this filter.
	GetStat() FilterExecutionStat

}

type FilterExecutionStat

// FilterExecutionStat keeps track of the performance of a Filter.
//
// NOTE(review): how TotalDuration, TotalWork and TotalWaiting relate to each
// other is not visible here — confirm against the implementation.
type FilterExecutionStat struct {
	FilterName    string
	FilterType    string
	TotalDuration time.Duration
	TotalWork     time.Duration
	TotalWaiting  time.Duration
	NumberOfItems uint64

}

type IndependentSerialFilter

type IndependentSerialFilter struct {
	// contains filtered or unexported fields
}

IndependentSerialFilter filters one item at a time and sends the items on only after it has filtered all of them.

func NewIndependentSerialFilter

func NewIndependentSerialFilter(workers ...Worker) *IndependentSerialFilter

func (*IndependentSerialFilter) Filter

func (f *IndependentSerialFilter) Filter(ctx context.Context, in <-chan Item, errors chan<- error) <-chan Item

func (*IndependentSerialFilter) GetStat

type Item

// Item is a value that is processed in the pipeline. It is an empty
// interface, so it can hold a value of any type.
type Item interface{}

type ParallelFilter

type ParallelFilter struct {
	// contains filtered or unexported fields
}

ParallelFilter can filter multiple items at a time

func NewParallelFilter

func NewParallelFilter(workers ...Worker) *ParallelFilter

func (*ParallelFilter) Filter

func (f *ParallelFilter) Filter(ctx context.Context, in <-chan Item, errors chan<- error) <-chan Item

func (*ParallelFilter) GetStat

func (f *ParallelFilter) GetStat() FilterExecutionStat

type Pipeline

// Pipeline processes a bunch of items. It contains filters, but it is also a
// Filter itself.
type Pipeline struct {
	FilteringDuration time.Duration
	FilteringNumber   int
	// contains filtered or unexported fields

}

func NewPipeline

func NewPipeline(name string, filters ...Filter) *Pipeline

func (*Pipeline) Filter

func (p *Pipeline) Filter(ctx context.Context, in <-chan Item, errors chan<- error) <-chan Item

func (*Pipeline) GetStat

func (p *Pipeline) GetStat() FilterExecutionStat

func (*Pipeline) SaveStats

func (p *Pipeline) SaveStats() error

SaveStats saves stats from filters to a temp file

func (*Pipeline) StartExtracting

func (p *Pipeline) StartExtracting(dur time.Duration)

type PipelineStat

// PipelineStat holds a pipeline's name, a list of per-filter
// FilterExecutionStat values, a total duration and a filtering count.
//
// NOTE(review): presumably this is what Pipeline.SaveStats writes out —
// confirm against the implementation.
type PipelineStat struct {
	PipelineName           string
	FilterStats            []FilterExecutionStat
	TotalDuration          time.Duration
	TotalNumberOfFiltering int
}

type SerialFilter

type SerialFilter struct {
	// contains filtered or unexported fields
}

SerialFilter filters a single item at a time as they come

func NewSerialFilter

func NewSerialFilter(workers ...Worker) *SerialFilter

func (*SerialFilter) Filter

func (f *SerialFilter) Filter(ctx context.Context, in <-chan Item, errors chan<- error) <-chan Item

func (*SerialFilter) GetStat

func (f *SerialFilter) GetStat() FilterExecutionStat

type Worker

// Worker processes a single Item.
type Worker interface {
	// Work takes a context and one input Item and returns an Item and an
	// error.
	Work(ctx context.Context, in Item) (Item, error)

}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL