Package pipeline

Documentation

Overview

Package pipeline provides tools for rapidly assembling and tuning parallel software pipelines.

Software pipelines and work decomposition

Many parallelization problems decompose via data parallelism: some data set is partitioned into independent chunks, each of which is processed by its own concurrent worker, and all such workers behave the same.

Some parallelization problems, however, are not easily data-decomposed. Most commonly, this is because no independent partitioning of the data set is available, or because creating one would be prohibitively complex.

For some problems like this, software pipelines (https://en.wikipedia.org/wiki/Pipeline_(computing)) provide an alternative decomposition. Rather than decomposing the data set into pieces handled by multiple independent and identical tasks, these instead decompose the work into multiple heterogeneous tasks which operate in sequence on a given work item; work items thus move through the pipeline from production to completion. In this way, work that requires exclusive access can be done in parallel with work that doesn't. Such systems are frequently compared to assembly lines: a single item moves through the pipeline, enjoying exclusive access to a different resource at each of its stops along the way, and emerging complete at the end.

Parallelism via data decomposition should be used wherever possible. Work decomposition is considerably more nuanced and fragile, and the maximum concurrency achievable by work decomposition is limited by the nature of the decomposed work: typically this permits far less concurrency than data decomposition would.

When data decomposition is not an option, however, this package can facilitate the construction and analysis of work-decomposing parallel pipelines. The central concept of the pipeline is the work item: an object that moves through the pipeline and is worked on or mutated in turn by each stage. Most of this package's types and functions are parameterized on the work item type, which is constrained only by `any`.

Building a software pipeline

A pipeline comprises a single producer stage, followed by one or more intermediate stages. The producer originates work items, then sends them into the remainder of the pipeline. Each stage takes work items, does some work upon or with them, then sends them to the next stage.

Producers must be able to create new work items, but may also support reusing work items that have already passed through the pipeline; this is useful when work items are large or complex and allocation costs might otherwise dominate. At minimum, a producer is a function (`ProducerFn`) accepting a `put func(T)` argument; invoking `put(i)` within the producer inserts `i` into the pipeline. Recycling producers (`RecyclingProducerFn`) also accept a `get func() (i T, ok bool)` argument: a non-blocking function which returns a work item `i` that has retired from the end of the pipeline, or, if no such retired work item is available, returns `ok == false`.
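For example, a minimal non-recycling producer of `int` work items might look like the following sketch (the names here are illustrative, not part of this package):

	// produce is a ProducerFn[int]: it feeds the integers 0 through 9 into
	// the pipeline, then returns nil to signal that production is complete.
	func produce(put func(int)) error {
		for i := 0; i < 10; i++ {
			put(i)
		}
		return nil
	}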

Subsequent stages in the pipeline implement `StageFn`, which accepts an input work item, performs work on that item, then returns it (or another instance of the same type reflecting the work done).
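Continuing the sketch, a stage that doubles each work item satisfies `StageFn[int]`:

	// double is a StageFn[int]: it performs this stage's work on the item,
	// then passes it along by returning it.
	func double(in int) (int, error) {
		return in * 2, nil
	}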

Running a software pipeline

A pipeline is executed with `Do()`. This function accepts a single producer, created with `NewProducer()` or `NewRecyclingProducer()`, and at least one stage, created with `NewStage()`. These constructors accept a function of the appropriate type (`ProducerFn`, `RecyclingProducerFn`, or `StageFn`, respectively) as well as a variadic set of stage options, such as the name of the stage. A pipeline is linear, not branchy like true stream-processing systems: each stage has at most one input and one output, and from the perspective of a single work item, the stages are executed in the order they appear in the arguments to `Do()`.

Besides `Do()`, this package defines two other executor functions. `Measure()` works like `Do()`, but also returns metrics on the time spent in each stage, which is useful for tuning a pipeline; because it introduces profiling overhead, it should be replaced with `Do()` once tuning is complete. `SequentialDo()` works like `Do()`, but with sequential, single-threaded execution and no synchronization overhead. Like `Measure()`, `SequentialDo()` should only be used for assessment and tuning.
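Putting the sketches above together into a complete program might look like this; the import path is illustrative, since it depends on where the module is hosted:

	package main

	import (
		"fmt"
		"log"

		"example.com/pipeline" // hypothetical import path for this package
	)

	// produce and double are the ProducerFn and StageFn sketched above.

	func main() {
		// report is a final stage that prints each completed work item.
		report := func(in int) (int, error) {
			fmt.Println(in)
			return in, nil
		}
		err := pipeline.Do(
			pipeline.NewProducer(produce, pipeline.Name("produce")),
			pipeline.NewStage(double, pipeline.Name("double")),
			pipeline.NewStage(report, pipeline.Name("report")),
		)
		if err != nil {
			log.Fatal(err)
		}
	}

Swapping `Do()` for `Measure()` in this sketch would additionally yield a `*Metrics` report, and swapping it for `SequentialDo()` would run the same stages on a single thread.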

Designing and tuning a software pipeline

As always when parallelizing, avoid too-fine-grained decompositions. Each unit of work should be substantial enough to overcome the goroutine and channel overhead needed to parallelize it.

The wall time of a software pipeline's execution will be proportional not to the total amount of work to be done divided by the number of stages in the pipeline, but to the duration of the longest stage in the pipeline. Suppose a task T, taking 7s, is decomposed into stages A, B, and C, with A taking 1s, B taking 2s, and C taking 4s. Then the shortest overall pipeline duration we can hope for is the duration of C: 4s, significantly longer than the 7s/3 ≈ 2.33s we might expect. Indeed, in this case there is no benefit to keeping A and B as separate stages: even combined (3s), they do not dominate the pipeline.
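Concretely, for the example above:

	total work        = 1s + 2s + 4s = 7s
	ideal 3-way split = 7s / 3 ≈ 2.33s
	pipeline floor    = max(1s, 2s, 4s) = 4s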

So, balancing work is the most important part of pipeline tuning, and the report generated by `Measure()` can help with this. This package provides two other tools that can help:

  • Buffering: Producers and stages can accept an `InputBufferSize` option. By default, the buffer between stages holds one work item; if a stage attempts to write a second, it blocks until the item already in the buffer is removed. Increasing the buffer size frees the individual stages to work at their own pace, which keeps the pipeline from operating in lockstep and can absorb per-work-item variations. Increasing the input buffer size can help when the overall pipeline execution duration is significantly larger than its longest stage's duration but overall concurrency is still below GOMAXPROCS.
  • Concurrency: Producers and stages can accept a `Concurrency` option. By default, only one instance of each stage is used. However, if the work done in a given stage or producer can be done concurrently, this package can set up multiple instances of that stage. The number of instances to use is specified with the `Concurrency` option. So, if in the example above it is safe to perform stage C concurrently on two different work items, C can be run with `Concurrency(2)`. This would increase the available concurrency in the pipeline to 4, such that if 4 threads are free, the expected overall pipeline duration would fall to 2s (the maximum of [1s, 2s, 4s/2, 4s/2]).

Examining the report from `Measure()` is the best way to understand how a pipeline is imbalanced and what might improve it.
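For example, if stage C in the pipeline above can safely run on two work items at once, both options can be applied when constructing it (the `produce`, `stageA`, `stageB`, and `stageC` functions in this sketch are illustrative):

	err := pipeline.Do(
		pipeline.NewProducer(produce),
		pipeline.NewStage(stageA, pipeline.Name("A")),
		pipeline.NewStage(stageB, pipeline.Name("B")),
		pipeline.NewStage(stageC,
			pipeline.Name("C"),
			pipeline.Concurrency(2),     // two concurrent instances of C
			pipeline.InputBufferSize(4), // let B run up to 4 items ahead of C
		),
	)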

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Do

func Do[T any](producerDef Producer[T], stageDefs ...Stage[T]) error

Do runs the parallel pipeline defined by the specified Producer and Stages. Work items of type T are produced by the Producer, then handled by each Stage in the provided order.

func SequentialDo

func SequentialDo[T any](producerDef Producer[T], stageDefs ...Stage[T]) error

SequentialDo behaves like Do(), but runs sequentially on one thread. Stage buffer lengths and concurrency options are ignored, but RecyclingProducers do recycle the (single) work item.

Types

type Metrics

type Metrics struct {
	WallDuration    time.Duration
	ProducerMetrics []*StageMetrics
	StageMetrics    [][]*StageMetrics
}

Metrics defines a set of performance metrics collected for an entire pipeline.

func Measure

func Measure[T any](producerDef Producer[T], stageDefs ...Stage[T]) (*Metrics, error)

Measure behaves like Do(), running the parallel pipeline defined by the specified Producer and Stages, but also measures the time spent in each stage.
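A sketch of typical usage, reusing the `produce` and `double` functions sketched in the overview:

	metrics, err := pipeline.Measure(
		pipeline.NewProducer(produce, pipeline.Name("produce")),
		pipeline.NewStage(double, pipeline.Name("double")),
	)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(metrics) // *Metrics implements String; see below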

func (*Metrics) String

func (pm *Metrics) String() string

type Producer

type Producer[T any] func(index uint) (*producer[T], error)

Producer defines a function building a pipeline producer.

func NewProducer

func NewProducer[T any](fn ProducerFn[T], optFns ...StageOptionFn) Producer[T]

NewProducer defines an initial stage in a pipeline, in which work items of type T are prepared for processing.

func NewRecyclingProducer

func NewRecyclingProducer[T any](fn RecyclingProducerFn[T], optFns ...StageOptionFn) Producer[T]

NewRecyclingProducer defines an initial stage in a pipeline, in which work items of type T are prepared for processing. The provided RecyclingProducerFn should invoke its `get` function to obtain a previously-allocated work item, constructing a new work item only when `get` returns false.

type ProducerFn

type ProducerFn[T any] func(put func(T)) error

ProducerFn performs the work of the initial stage in a pipeline, preparing work items of type T for processing. Work items are not reused; if work items are expensive to produce, RecyclingProducerFn should be used instead. ProducerFn implementations should prepare new work items for the pipeline, then place them into the pipeline with `put`. Production into the pipeline is complete when the implementation returns; if it returns a non-nil error, the pipeline is terminated.

type RecyclingProducerFn

type RecyclingProducerFn[T any] func(get func() (T, bool), put func(T)) error

RecyclingProducerFn performs the work of the initial stage in a pipeline, preparing work items of type T for processing. The work items that pass through the pipeline are recycled; RecyclingProducerFn implementations should use `get` to recycle old work items (which may need to be reset), and only instantiate new work items if `get` returns false. Implementations should prepare recycled or new work items for the pipeline, then place them into the pipeline with `put`. Production into the pipeline is complete when the implementation returns; if it returns a non-nil error, the pipeline is terminated.
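For example, a recycling producer for a hypothetical, expensive-to-allocate `Buffer` type might look like the following sketch (the `Buffer`, `NewBuffer`, and `fill` names are illustrative, not part of this package):

	func produceBuffers(get func() (*Buffer, bool), put func(*Buffer)) error {
		for i := 0; i < 1000; i++ {
			buf, ok := get() // reuse a retired work item when one is available
			if !ok {
				buf = NewBuffer() // otherwise, pay for a fresh allocation
			}
			buf.Reset()  // recycled items may carry state from their last trip
			fill(buf, i) // prepare the item for this trip through the pipeline
			put(buf)
		}
		return nil
	}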

type Stage

type Stage[T any] func(index uint) (*stage[T], error)

Stage defines a function building a pipeline stage.

func NewStage

func NewStage[T any](fn StageFn[T], optFns ...StageOptionFn) Stage[T]

NewStage defines an intermediate stage in a pipeline, in which work items of type T are operated upon.

type StageFn

type StageFn[T any] func(in T) (out T, err error)

StageFn performs the work of an intermediate stage in a pipeline, accepting a work item of T, performing some work on or with it, and then returning it (or another instance of T). If it returns a non-nil error, the pipeline is terminated.

type StageMetrics

type StageMetrics struct {
	StageName                   string
	StageInstance               uint
	WorkDuration, StageDuration time.Duration
	Items                       uint
}

StageMetrics defines a set of performance metrics collected for a particular pipeline stage.

type StageOptionFn

type StageOptionFn func(so *stageOptions) error

StageOptionFn defines a user-supplied option to a Producer or a Stage.

func Concurrency

func Concurrency(concurrency uint) StageOptionFn

Concurrency specifies the desired concurrency of a Stage or Producer. A Stage's concurrency is the number of worker goroutines performing that Stage.

func InputBufferSize

func InputBufferSize(inputBufferSize uint) StageOptionFn

InputBufferSize defines the size of the input buffer of a Stage. For recycling Producers, this defines the size of the input buffer of the recycling mechanism. For non-recycling Producers, this has no effect. Defaults to 1.

func Name

func Name(name string) StageOptionFn

Name specifies the name of a Stage or a Producer, for debugging. If unspecified, the Stage number (0 for the Producer) will be used.
