pipeline

package
v0.40.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrPipelineAborted = errors.New("pipeline aborted")

ErrPipelidneAborted is the error returned from Pipeline.Wait when Pipeline.Abort is called.

View Source
var ErrUnknownStageName = errors.New("unknown stage name")

ErrUnknownStageName is the error returned when an unknown stage name is referenced

NoProps is an empty ImmutableProperties struct

Functions

This section is empty.

Types

type ImmutableProperties

type ImmutableProperties struct {
	// contains filtered or unexported fields
}

ImmutableProperties is a map of properties which can't be edited after creation

func NewImmutableProps

func NewImmutableProps(props map[string]interface{}) ImmutableProperties

func (ImmutableProperties) Get

func (ip ImmutableProperties) Get(propName string) (interface{}, bool)

Get retrieves an element from the map, and a bool which says if there was a property that exists with that name at all

func (ImmutableProperties) Len

func (ip ImmutableProperties) Len() int

func (ImmutableProperties) Set

func (ip ImmutableProperties) Set(updates map[string]interface{}) ImmutableProperties

Set will create a new ImmutableProperties struct whose values are the original properties combined with the provided updates

type ItemWithProps

type ItemWithProps interface {
	// GetItem retrieves the item
	GetItem() interface{}
	// GetProperties retrieves properties attached to the item
	GetProperties() ImmutableProperties
}

ItemWithProps is an interface for an item which is passed through a pipeline

func NewItemWithNoProps

func NewItemWithNoProps(item interface{}) ItemWithProps

NewItemWithNoProps creates an item with no properties

func NewItemWithProps

func NewItemWithProps(item interface{}, props ImmutableProperties) ItemWithProps

NewItemWithProps creates an item with props from an item and a map of properties

type LocalStorage

type LocalStorage map[int]interface{}

TODO: get rid of this LocalStorage provides routine local storage on go routines spawned by a pipeline

func GetLocalStorage

func GetLocalStorage(ctx context.Context) LocalStorage

GetLocalStorage retrieves the LocalStorage from the context. This only works if the context was generated by the pipeline package when starting the pipeline

func (LocalStorage) Get

func (ls LocalStorage) Get(id int) (interface{}, bool)

Get retrieves an item from localStorage

func (LocalStorage) Put

func (ls LocalStorage) Put(id int, val interface{})

Put stores an item in local storage

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a batch processor which takes data in batches and transforms it in stages

func NewPipeline

func NewPipeline(stages ...*Stage) *Pipeline

NewPipeline creates a new Pipeline from an ordered slice of stages. The first stage in the pipeline must produce data and each stage will pass data on to the next stage.

func (*Pipeline) Abort

func (p *Pipeline) Abort()

Abort aborts the pipeline. After abort is called the pipeline will continue running closing asynchronously Use Wait() if you want to wait for the pipeline to finish closing before continuing.

func (*Pipeline) GetInputChannel

func (p *Pipeline) GetInputChannel(stageName string) (chan []ItemWithProps, error)

GetInputChannel gets the input channel for a pipeline stage

func (*Pipeline) Start

func (p *Pipeline) Start(ctx context.Context)

Start the pipeline

func (*Pipeline) Wait

func (p *Pipeline) Wait() error

Wait waits for the pipeline to finish

type ReadableMap

type ReadableMap interface {
	// Get retrieves an element from the map, and a bool which says if there was a property that exists with that
	// name at all
	Get(propName string) (interface{}, bool)
}

ReadableMap is an interface that provides read only access to map properties

type Stage

type Stage struct {
	// contains filtered or unexported fields
}

Stage is a parallelizable portion of a pipeline which reads data from an input channel, transforms it, and then writes it to an output channel. The first stage of a pipeline will not read from it's input channel, and the last stage of a pipeline should only write ErrorItems

func NewStage

func NewStage(name string, initFunc StageInitFunc, stageFunc StageFunc, parallelism, inBufferSize, inBatchSize int) *Stage

NewStage creates a new pipeline

type StageFunc

type StageFunc func(ctx context.Context, inBatch []ItemWithProps) ([]ItemWithProps, error)

StageFunc takes a batch of items, and returns a new batch of items that have been transformed. The first StageFunc in a pipeline will receive nil input batches, and should produce batches to be processed by the pipeline. Other stages in the pipeline will only receive a nil input batch as a signal to flush any items that it is bufferring internally.

type StageInitFunc

type StageInitFunc func(ctx context.Context, stageRoutineIndex int) error

StageInitFunc is an initialization call made by each go routine executing the stage of the pipeline

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL