pipeline

package
v0.0.0-...-dd9be88 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2021 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Payload

type Payload interface {
	// Clone returns a new Payload that is a deep-copy of the original.
	Clone() Payload

	// MarkAsProcessed is invoked by the pipeline when the Payload either
	// reaches the pipeline sink or it gets discarded by one of the
	// pipeline stages.
	MarkAsProcessed()
}

Payload is implemented by values that can be sent through a pipeline.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline implements a modular, multi-stage pipeline. Each pipeline is constructed out of an input source, an output sink and zero or more processing stages.

func New

func New(stages ...StageRunner) *Pipeline

New returns a new pipeline instance where input payloads will traverse each one of the specified stages.

func (*Pipeline) Process

func (p *Pipeline) Process(ctx context.Context, source Source, sink Sink) error

Process reads the contents of the specified source, sends them through the various stages of the pipeline and directs the results to the specified sink and returns back any errors that may have occurred.

Calls to Process block until:

  • all data from the source has been processed OR
  • an error occurs OR
  • the supplied context expires

It is safe to call Process concurrently with different sources and sinks.

type Processor

type Processor interface {
	// Process operates on the input payload and returns back a new payload
	// to be forwarded to the next pipeline stage. Processors may also opt
	// to prevent the payload from reaching the rest of the pipeline by
	// returning a nil payload value instead.
	Process(context.Context, Payload) (Payload, error)
}

Processor is implemented by types that can process Payloads as part of a pipeline stage.

type ProcessorFunc

type ProcessorFunc func(context.Context, Payload) (Payload, error)

ProcessorFunc is an adapter to allow the use of plain functions as Processor instances. If f is a function with the appropriate signature, ProcessorFunc(f) is a Processor that calls f.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(ctx context.Context, p Payload) (Payload, error)

Process calls f(ctx, p).

type Sink

type Sink interface {
	// Consume processes a Payload instance that has been emitted out of
	// a Pipeline instance.
	Consume(context.Context, Payload) error
}

Sink is implemented by types that can operate as the tail of a pipeline.

type Source

type Source interface {
	// Next fetches the next payload from the source. If no more items are
	// available or an error occurs, calls to Next return false.
	Next(context.Context) bool

	// Payload returns the next payload to be processed.
	Payload() Payload

	// Error return the last error observed by the source.
	Error() error
}

Source is implemented by types that generate Payload instances which can be used as inputs to a Pipeline instance.

type StageParams

type StageParams interface {
	// StageIndex returns the position of this stage in the pipeline.
	StageIndex() int

	// Input returns a channel for reading the input payloads for a stage.
	Input() <-chan Payload

	// Output returns a channel for writing the output payloads for a stage.
	Output() chan<- Payload

	// Error returns a channel for writing errors that were encountered by
	// a stage while processing payloads.
	Error() chan<- error
}

StageParams encapsulates the information required for executing a pipeline stage. The pipeline passes a StageParams instance to the Run() method of each stage.

type StageRunner

type StageRunner interface {
	// Run implements the processing logic for this stage by reading
	// incoming Payloads from an input channel, processing them and
	// outputting the results to an output channel.
	//
	// Calls to Run are expected to block until:
	// - the stage input channel is closed OR
	// - the provided context expires OR
	// - an error occurs while processing payloads.
	Run(context.Context, StageParams)
}

StageRunner is implemented by types that can be strung together to form a multi-stage pipeline.

func Broadcast

func Broadcast(procs ...Processor) StageRunner

Broadcast returns a StageRunner that passes a copy of each incoming payload to all specified processors and emits their outputs to the next stage.

func DynamicWorkerPool

func DynamicWorkerPool(proc Processor, maxWorkers int) StageRunner

DynamicWorkerPool returns a StageRunner that maintains a dynamic worker pool that can scale up to maxWorkers for processing incoming inputs in parallel and emitting their outputs to the next stage.

func FIFO

func FIFO(proc Processor) StageRunner

FIFO returns a StageRunner that processes incoming payloads in a first-in first-out fashion. Each input is passed to the specified processor and its output is emitted to the next stage.

func FixedWorkerPool

func FixedWorkerPool(proc Processor, numWorkers int) StageRunner

FixedWorkerPool returns a StageRunner that spins up a pool containing numWorkers to process incoming payloads in parallel and emit their outputs to the next stage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL