pipe

package module
v0.1.0
Published: Feb 19, 2024 License: MIT Imports: 5 Imported by: 0

README

pipe

A multi-pool pipeline for heavy multiprocessing in Go


Introduction

The pipe library implements a multithreaded pipeline which runs in several stacked pools of goroutines, allowing developers to fine-tune the number of goroutines used in each part of its workload.

Features

  • Relies on panjf2000/ants for goroutine pools, inheriting its properties
  • Provides several helper functions to build a pipeline dealing with various types.

How does it work

pipe works with simple processes of three kinds:

  • func(T) T, which is a basic pipeline stage over a specific type
  • func(T) <-chan K, which allows splitting a pipeline into several sub-pipelines
  • func(chan<- K) T, which allows merging several sub-pipelines into the parent one.

Each "sub-pipeline" is handled by another goroutine pool, to avoid deadlocks.

For instance:

  • N Jobs are processed by several processors; each Job is concurrently processed in Pool 0
  • Then each Job splits into M SubJobs (total SubJobs: N*M); each SubJob is concurrently processed in Pool 1 (Pool 0 routines are still occupied by the Jobs)
  • Then each SubJob splits into R Groups (total Groups: N*M*R); each Group is concurrently processed in Pool 2 (Pool 1 routines are still occupied by the SubJobs)
  • Then Groups merge into their parent SubJob. Pool 2 routines become available to new Groups as they finish. Pool 1 routines may continue processing SubJobs with the merged results
  • Then SubJobs merge into their parent Job. Pool 1 routines become available to new SubJobs as they finish. Pool 0 routines may continue processing Jobs with the merged results
  • Then the Jobs go out of scope.

Why it is useful

This design allows monitoring memory through an estimate of how much an average Job, SubJob, or any sub-item may cost in terms of memory. Since those objects exist only while they (or some of their children) are being processed, and live in their own goroutine scoped to a Pool, we can size the goroutine Pools accordingly.

Pool sizing can be tuned to trade off between the processing latency of each object and its memory footprint. If an object requires little CPU but waits a lot (e.g. an API call), a large pool may be a good idea, within the memory budget those objects allow. If an object requires heavy CPU and never waits, sizing the pool to the CPU count may be a good idea.

However, these are general considerations. As with any performance tuning, you should measure and adjust.

Installation

go get -u github.com/ezian/pipe

How to use

Engine builder

See examples/example_test.go for how to use the pipe building blocks to create a customizable pipeline engine.

License

The source code in pipe is available under the MIT License.

Dependencies

  • panjf2000/ants, a really simple and efficient goroutine pool implementation
  • samber/lo, a nice generics library with really useful helpers

Documentation

Index

Constants

This section is empty.

Variables

var ErrInvalidDispatcher = errors.New("invalid dispatcher")

Functions

func Pipe

func Pipe[IN, OUT any](dp *Pools, in <-chan IN, do func(*Pools, IN) OUT) <-chan OUT

Pipe pipes a channel in and out at the current depth of the pools. It executes the task in the current pool and passes the next-level pool to the child task.

func Run

func Run[T any](pool *Pools, in <-chan T, proc PoolProcess[T])

Run executes a pool process on a channel and waits until the input channel is closed and the process has terminated.

func RunAll

func RunAll[T any](pool *Pools, in <-chan T, procs []PoolProcess[T])

RunAll is a convenience function to run a list of PoolProcess, in order.

Types

type AnyProcess

type AnyProcess[T any] interface {
	Process[T] | PoolProcess[T]
}

AnyProcess matches either a Process or a PoolProcess.

type Dispatch

type Dispatch[Parent, Child any] struct {
	// contains filtered or unexported fields
}

Dispatch combines Split and Merge, since they are linked. Basically, in a program flow we will do: `parent -(Split)-> [children...] -(Merge)-> parent`.

func NewDispatch

func NewDispatch[Parent, Child any](split Split[Parent, Child], merge Merge[Parent, Child]) (Dispatch[Parent, Child], error)

NewDispatch creates a Dispatch from Split and Merge functions.

func (Dispatch[Parent, Child]) Validate

func (d Dispatch[Parent, Child]) Validate() error

Validate checks that the Dispatch is well constructed, since it should have both functions initialized.

type Merge

type Merge[Parent, Child any] func(parent Parent, out <-chan Child) Parent

Merge defines an operation which allows a Parent to be updated from several children. The updated parent is returned.

type PoolProcess

type PoolProcess[T any] func(*Pools, T) T

PoolProcess defines a basic function which updates a variable and returns the updated variable, with the capability to parallelize its execution in a Pool.

func AsPoolProcess

func AsPoolProcess[T any](proc Process[T]) PoolProcess[T]

AsPoolProcess decorates a Process so it can be used as a PoolProcess.

func AsPoolProcesses

func AsPoolProcesses[T any](procs ...Process[T]) []PoolProcess[T]

AsPoolProcesses is a helper function to call AsPoolProcess on a list.

func Link

func Link[T any](procs ...PoolProcess[T]) PoolProcess[T]

Link merges several PoolProcess into one.

func Wrap

func Wrap[Parent, Child any](procs PoolProcess[Child], dispatch Dispatch[Parent, Child]) PoolProcess[Parent]

Wrap creates a parent PoolProcess from a child pool process and a dispatcher. The child PoolProcess will be called concurrently, triggered by the dispatcher's Split function, then merged back through the dispatcher's Merge function.

type Pools

type Pools struct {
	// contains filtered or unexported fields
}

Pools defines a slice of in-depth pools.

func NewPools

func NewPools(poolSizes ...int) (*Pools, error)

NewPools builds depth pools with the sizes given in parameters. If no sizes are given, no pools are created and Submit will not run in parallel.

func NewPoolsWithOptions

func NewPoolsWithOptions(poolSizes []int, opts ...ants.Option) (*Pools, error)

NewPoolsWithOptions builds depth pools with the sizes given in parameters. If no sizes are given, no pools are created and Submit will not run in parallel.

Moreover, a size of 0 means that tasks pushed at this level will run in their parent's goroutine (or alike).

func (*Pools) Release

func (p *Pools) Release()

Release releases all the pools inside the pools.

type Process

type Process[T any] func(T) T

Process defines a basic function which updates a variable and returns the updated variable.

type Split

type Split[Parent, Child any] func(parent Parent, in chan<- Child)

Split defines an operation which allows a Parent structure to be split into several children.
