Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrPipelineAborted = errors.New("pipeline aborted")
ErrPipelidneAborted is the error returned from Pipeline.Wait when Pipeline.Abort is called.
var ErrUnknownStageName = errors.New("unknown stage name")
ErrUnknownStageName is the error returned when an unknown stage name is referenced
var NoProps = ImmutableProperties{}
NoProps is an empty ImmutableProperties struct
Functions ¶
This section is empty.
Types ¶
type ImmutableProperties ¶
type ImmutableProperties struct {
// contains filtered or unexported fields
}
ImmutableProperties is a map of properties which can't be edited after creation
func NewImmutableProps ¶
func NewImmutableProps(props map[string]interface{}) ImmutableProperties
func (ImmutableProperties) Get ¶
func (ip ImmutableProperties) Get(propName string) (interface{}, bool)
Get retrieves an element from the map, and a bool which says if there was a property that exists with that name at all
func (ImmutableProperties) Len ¶
func (ip ImmutableProperties) Len() int
func (ImmutableProperties) Set ¶
func (ip ImmutableProperties) Set(updates map[string]interface{}) ImmutableProperties
Set will create a new ImmutableProperties struct whose values are the original properties combined with the provided updates
type ItemWithProps ¶
type ItemWithProps interface { // GetItem retrieves the item GetItem() interface{} // GetProperties retrieves properties attached to the item GetProperties() ImmutableProperties }
ItemWithProps is an interface for an item which is passed through a pipeline
func NewItemWithNoProps ¶
func NewItemWithNoProps(item interface{}) ItemWithProps
NewItemWithNoProps creates an item with no properties
func NewItemWithProps ¶
func NewItemWithProps(item interface{}, props ImmutableProperties) ItemWithProps
NewItemWithProps creates an item with props from an item and a map of properties
type LocalStorage ¶
type LocalStorage map[int]interface{}
TODO: get rid of this LocalStorage provides routine local storage on go routines spawned by a pipeline
func GetLocalStorage ¶
func GetLocalStorage(ctx context.Context) LocalStorage
GetLocalStorage retrieves the LocalStorage from the context. This only works if the context was generated by the pipeline package when starting the pipeline
func (LocalStorage) Get ¶
func (ls LocalStorage) Get(id int) (interface{}, bool)
Get retrieves an item from localStorage
func (LocalStorage) Put ¶
func (ls LocalStorage) Put(id int, val interface{})
Put stores an item in local storage
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a batch processor which takes data in batches and transforms it in stages
func NewPipeline ¶
NewPipeline creates a new Pipeline from an ordered slice of stages. The first stage in the pipeline must produce data and each stage will pass data on to the next stage.
func (*Pipeline) Abort ¶
func (p *Pipeline) Abort()
Abort aborts the pipeline. After abort is called the pipeline will continue running closing asynchronously Use Wait() if you want to wait for the pipeline to finish closing before continuing.
func (*Pipeline) GetInputChannel ¶
func (p *Pipeline) GetInputChannel(stageName string) (chan []ItemWithProps, error)
GetInputChannel gets the input channel for a pipeline stage
type ReadableMap ¶
type ReadableMap interface { // Get retrieves an element from the map, and a bool which says if there was a property that exists with that // name at all Get(propName string) (interface{}, bool) }
ReadableMap is an interface that provides read only access to map properties
type Stage ¶
type Stage struct {
// contains filtered or unexported fields
}
Stage is a parallelizable portion of a pipeline which reads data from an input channel, transforms it, and then writes it to an output channel. The first stage of a pipeline will not read from it's input channel, and the last stage of a pipeline should only write ErrorItems
type StageFunc ¶
type StageFunc func(ctx context.Context, inBatch []ItemWithProps) ([]ItemWithProps, error)
StageFunc takes a batch of items, and returns a new batch of items that have been transformed. The first StageFunc in a pipeline will receive nil input batches, and should produce batches to be processed by the pipeline. Other stages in the pipeline will only receive a nil input batch as a signal to flush any items that it is bufferring internally.