StagedPipe - The Concurrent and Parallel Pipeline Framework


Introduction

Note: Any reference to github.com/johnsiilver/pipelines should be read as a reference to this package. That is where this package was originally developed.

Pipelining in Go can be hard to debug and can end up a little messy. StagedPipe combines the semantics of Go state machines and Go pipelining to give you an efficient, easy-to-use pipeline framework.

This package supports:

  • A concurrent pipeline that can be run in parallel
  • Multiple users can use a single set of pipelines, not a pipeline setup per user
  • No need to run your own goroutines, simply scale up parallelism
  • Generic, so it avoids runtime type checks of data objects
  • Data can be on the stack or the heap
  • Retrieve Stats on how the pipeline is running
  • Route requests to different stages depending on data
  • Routing that can loop back to a stage, or an option to make the pipeline a Directed Acyclic Graph (DAG) to avoid loops
  • Defaults to out-of-order processing, but can do in-order processing via an option
  • Cancellation of a set of requests on an error, or the option to ignore errors

Here is a brief introduction to standard Go pipelining, standard Go state machines, and a hello world for the stagedpipe framework:

Introduction Video


Just jump in

This is for those of you who simply want to hit the ground running and aren't interested in a video introduction.

To help get you started, we have a tools/ directory containing the stagedpipe-cli. This allows you to generate all the structure that is required to implement a pipeline.

Installation can be done with: go install github.com/gostdlib/concurrency/pipelines/stagedpipe/tools/stagedpipe-cli@latest

Simply enter your new package's directory and type: stagedpipe-cli -m -p "[package root]/sm" to get:

├──myPipeline
        ├── main.go
        └──sm
            ├── data.go
            └── sm.go

Run go mod init <path>, go mod tidy, and go fmt ./... to get a running program:

├──myPipeline
        ├── go.mod
        ├── go.sum
        ├── main.go
        └──sm
            ├── data.go
            └── sm.go

Type go run . to run the basic pipeline that you can change to fit your needs.

See the comments in the file to see how to modify and extend the example pipeline for your needs.

Basics and Best Practices

Here are the basics, if you've built off of the stagedpipe-cli skeleton code:

  • Each method in your state machine intended as a Stage must implement the stagedpipe.Stage type (a minimal sketch follows these lists)
  • Stage methods must be public, otherwise you will not get the expected concurrency
  • You must always drain a RequestGroup, even if you cancel it. Failing to do so is the one way to get your pipeline stuck
  • Be careful not to route in an infinite loop

Here are some best practices:

  • Use bulk objects in your pipeline. These are much more efficient and easier to manage
  • Do not use goroutines inside your stages. Instead, dial up the parallelism
  • For pipelines with lots of stages, parallelism could start at 1. Otherwise, runtime.NumCPU() is a good starting point

Something to watch out for:

  • If your data object uses the stack, remember that stack use is not visible in pprof memory traces
      • This can be a big gotcha if you get OOM messages and are expecting to see the usage in the graphs
  • Dialing up parallelism can make things slower if you are bound by disk (like talking to the filesystem or a database) or network IO
      • This works best when everything is done in memory or the data store can horizontally scale to keep up with demand
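
Below is a minimal sketch of these rules in action. The Data type and the Upper stage are hypothetical, but the signatures follow the stagedpipe.Stage and stagedpipe.StateMachine types documented further down:

```go
package sm

import (
	"strings"

	"github.com/gostdlib/concurrency/pipelines/stagedpipe"
)

// Data is a hypothetical data object carried through the pipeline.
type Data struct {
	Name string
}

// SM implements stagedpipe.StateMachine[Data].
type SM struct{}

// Start is the entry Stage. Stage methods must be public.
func (s SM) Start(req stagedpipe.Request[Data]) stagedpipe.Request[Data] {
	// Route to the next Stage; setting Next to nil exits the pipeline.
	req.Next = s.Upper
	return req
}

// Upper is a second Stage that transforms the data.
func (s SM) Upper(req stagedpipe.Request[Data]) stagedpipe.Request[Data] {
	req.Data.Name = strings.ToUpper(req.Data.Name)
	req.Next = nil // done; exit the pipeline
	return req
}

// Close stops the StateMachine.
func (s SM) Close() {}
```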

Building an ETL Pipeline from Scratch

Ardan Labs has a great tutorial on building a basic ETL pipeline in Go. With their permission, I have re-written their example using the stagedpipe framework.

This lesson takes about 30 minutes and I build the pipeline from scratch without using the stagedpipe-cli tool, so it takes longer than normal. I use a local Postgres server to store the data, so if you want to follow along you will need one too.

Using the stagedpipe-cli removes a lot of the boilerplate here, but this is a good opportunity to explain what each of the boilerplate types and functions do.

You can download the dataset used in this example here

Our modified version of the Ardan Labs code and our stagedpipe version can be found here

ETL Video

The Common Pipeline Pattern

Standard Go pipelines use the concurrency model laid out in Rob Pike's talk, Concurrency is not Parallelism.

In this version, each pipeline has stages, each stage runs in parallel, and channels pass data from one stage to the next. In a single pipeline with X stages, you can have X stages running. This is called a concurrent pipeline for the purposes of this doc.

You can run multiple pipelines in parallel. So if you run Y pipelines of X stages, you can have X * Y stages running at the same time.

        -> stage0 -> stage1 -> stage2 -> stage3 ->
        -> stage0 -> stage1 -> stage2 -> stage3 ->
in ->->                                            ->-> out
        -> stage0 -> stage1 -> stage2 -> stage3 ->
        -> stage0 -> stage1 -> stage2 -> stage3 ->

This looks similar to a standard fan-in/fan-out model, with the exception that each stage here runs concurrently with the other stages. In the above scenario, 16 workers are running at various stages and we have 4 parallel jobs.

Stages pass data through the pipeline via a series of channels. Pike also offers another concurrency model where you pass functions to workers via a channel, which is great for worker dispatch.

Note: Nothing here is a criticism of Pike or his talks/methods/etc. In a more complex pipeline, I'm sure he would alter this method to control the complexity. If anything, over the years I have learned so much by taking ideas from him and hammering them into successful packages. This package uses two of his ideas together to make something I find easy to use and fast.

The First Problem

In simple pipelines with few stages, writing a pipeline is pretty easy. In complex pipelines, you end up with a tangle of channels and goroutines. When you add to the pipeline after you haven't looked at the code in a few months, you forget to call make(chan X) in your constructor, which causes a deadlock. You fix that, but you forgot to call go p.stage(), which causes another deadlock. This tends to make the code brittle.
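
To make the problem concrete, here is a hedged sketch (hypothetical stages, plain channels) of the hand-rolled style described above; every channel and every goroutine is one more thing to forget:

```go
// pipe is a hand-rolled two-stage pipeline.
type pipe struct {
	in, mid, out chan int
}

func newPipe() *pipe {
	p := &pipe{
		in:  make(chan int), // forget one make() and you deadlock
		mid: make(chan int),
		out: make(chan int),
	}
	go p.stage1() // forget one go statement and you deadlock too
	go p.stage2()
	return p
}

func (p *pipe) stage1() {
	for v := range p.in {
		p.mid <- v * 2
	}
	close(p.mid)
}

func (p *pipe) stage2() {
	for v := range p.mid {
		p.out <- v + 1
	}
	close(p.out)
}
```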

There are certainly other methods to deal with this, but they usually lack the beauty of having all the stages in a single file that is really easy to read.

The Second Problem

The standard type of pipelining works great in two scenarios:

  • Everything that goes through the Pipeline is related
  • Each request in the Pipeline is a promise that responds to a single request

In the first scenario, no matter how many things enter the pipeline, you know they are all related to a single call. When input ends, you shut down the input channel, and the output channel shuts down when a sync.WaitGroup says all pipelines are done.

In the second scenario, you can keep the pipeline open for the life of the program as requests come in. Each request is a promise which, once it exits the output channel, is sent on the included promise channel. This is costly because you have to create a channel for every request, but it also keeps the pipelines open for future use.

But what if you want to keep your pipelines running and have multiple ingress streams that each need to be routed to their individual output streams? The pipeline models above break down, either requiring each stream to have its own pipelines (which wastes resources), using bulk requests that eat a lot of memory, or resorting to other unsavory methods.

This Solution

I hate to say "the solution", because there are many ways you can solve this. But I was looking to create a framework that was elegant in how it handled this.

What I've done here is combine a state machine with a pipeline. For each stage in the state machine, we spin off one goroutine running the state machine. We receive input on a single channel and send output on a single channel. The input gets sent to any state machine that is available to process it, and each state machine sends to the out channel.

Four Pipelines processing
        -> Pipeline ->
        -> Pipeline ->
in ->->                ->-> out
        -> Pipeline ->
        -> Pipeline ->

Each Pipeline looks like:
        -> stages ->
        -> stages ->
in ->->               ->-> out
        -> stages ->
        -> stages ->

You can then concurrently run multiple pipelines. This differs from the standard model in that a full pipeline might not have all stages running, but it will have the same number of stages running. Mathematically, we still end up with X * Y concurrent actions.

Stages are constructed inside a type that implements our StateMachine interface. Any method on that object that is public and implements Stage becomes a valid stage to be run. You pass the StateMachine to our New() constructor with the number of parallel pipelines (all running concurrently) that you wish to run. A good starting number is either 1 or runtime.NumCPU(). The more stages you have, or the more blocking on IO you have, the better 1 is as a starting point.

Accessing the pipeline happens by creating a RequestGroup. You can simply stream values in and out of the pipeline, separate from other RequestGroups, using the Submit() method and Out channel.
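
Here is a hedged sketch of that flow, reusing the hypothetical SM and Data types from the skeleton above (and assuming imports of context and runtime):

```go
func run(ctx context.Context, names []string) error {
	// One set of Pipelines, shared by any number of RequestGroups.
	p, err := stagedpipe.New[Data]("example", runtime.NumCPU(), SM{})
	if err != nil {
		return err
	}
	defer p.Close()

	g := p.NewRequestGroup()

	// Drain Out() concurrently; a RequestGroup must always be drained.
	done := make(chan error, 1)
	go func() {
		var first error
		for req := range g.Out() {
			if req.Err != nil && first == nil {
				first = req.Err
			}
		}
		done <- first
	}()

	for _, n := range names {
		req := stagedpipe.Request[Data]{Ctx: ctx, Data: Data{Name: n}}
		if err := g.Submit(req); err != nil {
			break
		}
	}
	g.Close() // input finished; Out() closes once processing completes
	return <-done
}
```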

Documentation

Overview

Package stagedpipe offers a generic, concurrent and parallel pipeline based on a statemachine to process work. For N number of Stages in the StateMachine, N Stages can concurrently be processed. You can run pipelines in parallel. So X Pipelines with N Stages will have X*N Stages processing.

This library requires working knowledge of both the specific type of Go statemachine implementation and basic Go pipelining.

Full introduction including a hello world example can be found here: https://vimeo.com/879175351?share=copy

Please view the README.md for more detailed information on how to get started.

Every pipeline will receive a Request, which contains the data to be manipulated. Each Request is designed to be stack allocated, meaning the data should not be a pointer unless absolutely necessary.

You define a StateMachine object that satisfies the StateMachine interface. These states represent the stages of the pipeline. All StateMachine methods that implement a Stage MUST BE PUBLIC.

A RequestGroup represents a set of related Request(s) that should be processed together. A new RequestGroup can be created with Pipelines.NewRequestGroup().

Requests enter the Pipelines via the RequestGroup.Submit() method. Requests are received with RequestGroup.Out(), which returns a channel of Request(s).

Multiple RequestGroup(s) can send into the Pipelines for processing, as everything is muxed into the Pipelines and demuxed out to the RequestGroup.Out() channel.

There is a provided CLI application called `stagedpipe-cli` located in the `tools/` directory that can be used to generate all the boilerplate you see below for a working example. You can install it like this:

```
go install github.com/gostdlib/concurrency/pipelines/stagedpipe/tools/stagedpipe-cli@latest
```

Simply enter your new package's directory and type: `stagedpipe-cli -m -p "[package root]/sm"` to get:

```
├──myPipeline
    ├── main.go
    └──sm
        ├── data.go
        └── sm.go
```

Run `go mod init <path>`, `go mod tidy`, and `go fmt ./...` to get a running program:

```
├──myPipeline
    ├── go.mod
    ├── go.sum
    ├── main.go
    └──sm
        ├── data.go
        └── sm.go
```

Type `go run .` to run the basic pipeline that you can change to fit your needs.

Here is an example that runs inside the playground: https://go.dev/play/p/zaiNU_kbp6_3

Here is an ETL pipeline example: https://github.com/johnsiilver/concurrency/pipelines/tree/main/stagedpipe/examples/etl/bostonFoodViolations/pipelined

A video introduction to the ETL pipeline: https://player.vimeo.com/video/879203973?h=24035c0a82

Note: This package supports OTEL spans and will record information into OTEL spans if provided.

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsErrCyclic

func IsErrCyclic(err error) bool

IsErrCyclic returns true if the error is a cyclic error. A cyclic error is when a stage is called more than once in a single Request. This is only returned if the DAG() option is set.
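
A sketch of checking for it while draining a RequestGroup g (only meaningful when the Pipelines were built with the DAG() option):

```go
for req := range g.Out() {
	if stagedpipe.IsErrCyclic(req.Err) {
		// This Request revisited a Stage; handle the routing loop.
		log.Printf("cyclic route: %v", req.Err)
	}
}
```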

Types

type Error

type Error struct {
	// Type is the type of error.
	Type string
	// Msg is the message of the error.
	Msg string
}

Error represents a typed error that this package can return. Not all errors are of this type.

func (Error) Error

func (e Error) Error() string

Error returns the Error type and message.

type IngestStats

type IngestStats struct {
	// Min is the minimum running time for a Request.
	Min time.Duration
	// Avg is the avg running time for a Request.
	Avg time.Duration
	// Max is the maximum running time for a Request.
	Max time.Duration
}

IngestStats details how long a request waits for a Pipeline to be ready.

type Option

type Option[T any] func(p *Pipelines[T]) error

Option is an option for the New() constructor.

func CountSubStages

func CountSubStages[T any](subStageObj ...any) Option[T]

CountSubStages is used when the StateMachine object does not hold all the Stage(s). This allows you to design multiple pipelines that use the same data object but will be executed as a single pipeline. CountSubStages is used to correctly calculate the concurrency. Without this, only stages in the StateMachine object will be counted toward the concurrency count.

func DAG

func DAG[T any]() Option[T]

DAG makes the StateMachine a Directed Acyclic Graph. This means that no Stage can be called more than once for a single Request. If a Stage is called more than once, the request will exit with a cyclic error that can be detected with IsErrCyclic().
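
A construction sketch, using the hypothetical types from the earlier examples:

```go
// Any Request that revisits a Stage now exits with a cyclic error.
p, err := stagedpipe.New[Data]("dagExample", runtime.NumCPU(), SM{}, stagedpipe.DAG[Data]())
```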

func DelayWarning

func DelayWarning[T any](d time.Duration) Option[T]

DelayWarning will send a log message whenever pushing entries to the out channel takes longer than the supplied time.Duration. Not setting this results in no warnings. Useful when chaining Pipelines and figuring out where something is stuck.
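
For example (a sketch; the name and duration are arbitrary):

```go
// Warn whenever pushing to the out channel blocks for more than 5 seconds.
p, err := stagedpipe.New[Data]("chained", 1, SM{}, stagedpipe.DelayWarning[Data](5*time.Second))
```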

func Ordered

func Ordered[T any]() Option[T]

Ordered makes the Pipelines output requests in the order they are received by a request group. This can slow down output as it stores finished requests until older ones finish processing and are output.

func PreProcessors

func PreProcessors[T any](p ...PreProcesor[T]) Option[T]

PreProcessors provides a set of functions that are called in order at each stage in the StateMachine. This is used to do work that is common to each stage instead of having to call the same code. Similar to http.HandleFunc wrapping techniques.
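
A hedged sketch of a PreProcesor that halts a Request whose Context has been canceled:

```go
// cancelCheck stops further Stages for a Request once its Context is done.
func cancelCheck[T any](req stagedpipe.Request[T]) stagedpipe.Request[T] {
	if err := req.Ctx.Err(); err != nil {
		req.Err = err // setting Err stops the StateMachine for this Request
	}
	return req
}

// Installed at construction time:
// p, err := stagedpipe.New[Data]("example", 1, SM{}, stagedpipe.PreProcessors[Data](cancelCheck[Data]))
```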

type Pipelines

type Pipelines[T any] struct {
	// contains filtered or unexported fields
}

Pipelines provides access to a set of Pipelines that process requests.

func New

func New[T any](name string, num int, sm StateMachine[T], options ...Option[T]) (*Pipelines[T], error)

New creates a new Pipelines object with "num" pipelines running in parallel. Each underlying pipeline runs concurrently for each stage. Execution of a Request begins at StateMachine.Start().

func (*Pipelines[T]) Close

func (p *Pipelines[T]) Close()

Close closes the ingestion of the Pipeline. No further Submit calls should be made. If called more than once, Close will panic.

func (*Pipelines[T]) NewRequestGroup

func (p *Pipelines[T]) NewRequestGroup() *RequestGroup[T]

NewRequestGroup returns a RequestGroup that can be used to process requests in this set of Pipelines.

func (*Pipelines[T]) Stats

func (p *Pipelines[T]) Stats() Stats

Stats returns stats about all the running Pipelines.
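
A sketch of sampling them, assuming a Pipelines value p as in the earlier examples:

```go
s := p.Stats()
log.Printf("running=%d completed=%d min=%v avg=%v max=%v ingest(avg)=%v",
	s.Running, s.Completed, s.Min, s.Avg, s.Max, s.IngestStats.Avg)
```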

type PreProcesor

type PreProcesor[T any] func(req Request[T]) Request[T]

PreProcesor is called before each Stage. If req.Err is set, execution of the Request in the StateMachine stops.

type Request

type Request[T any] struct {

	// Ctx is a Context scoped to this requestor's set of requests.
	Ctx context.Context

	// Data is data that is processed in this Request.
	Data T

	// Err, if set, is an error for the Request. This type of error is for unrecoverable
	// errors in processing, not errors for the data being processed. For example, if it
	// can't communicate with a database or RPC service. For errors with the data itself,
	// add the error to the underlying data type as a separate error.
	Err error

	// Next is the next stage to be executed. Must be set at each stage of a StateMachine.
	// If set to nil, exits the pipeline.
	Next Stage[T]
	// contains filtered or unexported fields
}

Request is a single unit of work to be processed by a pipeline.

func (Request[T]) Event

func (r Request[T]) Event(name string, keyValues ...any) error

Event records an OTEL event into the Request span with name and keyValues. This allows stages in your statemachine to record events inside each stage. keyValues must have an even number of entries, with every even-indexed value a string representing the key and the following value representing the value associated with that key. The following value types are supported:

- bool/[]bool
- float64/[]float64
- int/[]int
- int64/[]int64
- string/[]string
- time.Duration/[]time.Duration

Note: This is a no-op if the Request is not recording.
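
A sketch of recording an event from inside a hypothetical Stage (types from the earlier examples):

```go
func (s SM) Fetch(req stagedpipe.Request[Data]) stagedpipe.Request[Data] {
	// Record how many rows were fetched; a no-op unless recording.
	if err := req.Event("fetched rows", "count", 42, "source", "db"); err != nil {
		req.Err = err // keyValues were malformed
		return req
	}
	req.Next = nil
	return req
}
```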

type RequestGroup

type RequestGroup[T any] struct {
	// Name is the name of the RequestGroup. This is used in OTEL tracing only and is not required.
	Name string
	// contains filtered or unexported fields
}

RequestGroup provides in and out channels to send a group of related data into the Pipelines and receive the processed data. This allows multiple callers to multiplex onto the same Pipelines. A RequestGroup is created with Pipelines.NewRequestGroup().

func (*RequestGroup[T]) Close

func (r *RequestGroup[T]) Close()

Close signals that the input is done and will wait for all Request objects to finish processing, then close the output channel. The owner of the RequestGroup is still required to pull all entries out of the RequestGroup via .Out(), and until that occurs, Close() will not return.

func (*RequestGroup[T]) Out

func (r *RequestGroup[T]) Out() chan Request[T]

Out returns a channel to receive Request(s) that have been processed. It is unsafe to close the output channel. Instead, use .Close() when all input has been sent and the output channel will close once all data has been processed. You MUST get all data from Out() until it closes, even if you run into an error. Otherwise the pipelines become stuck.

func (*RequestGroup[T]) Submit

func (r *RequestGroup[T]) Submit(req Request[T]) error

Submit submits a new Request into the Pipelines. A Request with a nil Context will cause a panic.

type Stage

type Stage[T any] func(req Request[T]) Request[T]

Stage represents a function that executes at a given state.

type StateMachine

type StateMachine[T any] interface {
	// Start is the starting Stage of the StateMachine.
	Start(req Request[T]) Request[T]
	// Close stops the StateMachine.
	Close()
}

StateMachine represents a state machine where the methods that implement Stage are the States and execution starts with the Start() method.

type Stats

type Stats struct {
	// Running is the number of currently running Request(s).
	Running int64
	// Completed is the number of completed Request(s).
	Completed int64
	// Min is the minimum running time for a Request.
	Min time.Duration
	// Avg is the avg running time for a Request.
	Avg time.Duration
	// Max is the maximum running time for a Request.
	Max time.Duration

	// IngestStats contains information on Pipeline ingestion.
	IngestStats IngestStats
}

Stats are the stats for the Pipeline.

Directories

Path Synopsis
examples
etl/bostonFoodViolations/pipelined/etl
Package etl contains the ETL statemachine for translating the boston food violations file into a postgres database.
internal
queue
Package queue provides a simple queue primitive that blocks on Pop() until an entry is available and can block on Push() when a size limit is applied.
testing
client
Package client provides a fake client to a fictional "identity" service to use in testing.
tools
