parapipe

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2021 License: MIT Imports: 1 Imported by: 1

README

Parapipe - paralleling pipeline

Mentioned in Awesome Go tests linters coverage Go Report Card GoDoc

The library provides a zero-dependency non-blocking buffered FIFO-pipeline for structuring the code and vertically scaling your app. Unlike regular pipeline examples you may find on the internet - parapipe executes everything on each step concurrently, yet maintaining the output order. Although, this library does not use any locks or mutexes. Just pure channels.

When to use

  • processed data can be divided in chunks (messages), and the flow may consist of one or more stages
  • data should be processed concurrently (scaled vertically)
  • the order of processing messages must be maintained

Installation

go get github.com/nazar256/parapipe

Usage

  1. Create a pipeline
cfg := parapipe.Config{
    ProcessErrors: false,	// messages implementing "error" interface will not be passed to subsequent workers
}
pipeline := parapipe.NewPipeline(cfg)
  1. Add pipes - call Pipe() method one or more times
concurrency := 5    // how many messages to process concurrently for each pipe
pipeline.Pipe(concurrency, func(msg interface{}) interface{} {
    typedMsg := msg.(YourInputType)     // assert your type for the message
    // do something and generate a new value "someValue"
    return someValue
})
  1. Get "out" channel when all pipes are added and read results from it
for result := range pipeline.Out() {
    typedResut := result.(YourResultType)
    // do something with the result
}

It's important to read everything from "out" even when the pipeline won't produce any viable result. It will be stuck otherwise.

  1. Push values for processing into the pipeline:
pipeline.Push("something")
  1. Close pipeline to clean up its resources and close its output channel after the last message. All internal channels, goroutines, including Out() channel will be closed in a cascade. It's not recommended closing pipeline using defer because you may not want to hang output util defer is executed.
pipeline.Close()
Error handling

To handle errors just return them as a result then listen to them on Out. By default, errors will not be processed by subsequent stages.

pipeline.Pipe(4, func(msg interface{}) interface{} {
    inputValue := msg.(YourInputType)     // assert your type for the message
    someValue, err := someOperation(inputValue)
    if err != nil {
        return err      // error can also be a result and can be returned from a pipeline stage (pipe)
    }
    return someValue
})
// ...
for result := range pipeline.Out() {
    err := result.(error)
    if err != nil {
        // handle the error
        // you may want to stop sending new values to the pipeline in your own way and do close(pipeline.In())
    }   
    typedResut := result.(YourResultType)
    // do something with the result
}

Optionally you may allow passing errors to subsequent pipes. For example, if you do not wish to stop the pipeline on errors, but rather process them in subsequent pipes.

cfg := parapipe.Config{
    ProcessErrors: true,	// messages implementing "error" interface will be passed to subsequent workers as any message
}
concurrency := 5    // how many messages to process concurrently for each pipe

pipeline := parapipe.NewPipeline(cfg).
    Pipe(concurrency, func(msg interface{}) interface{} {
        inputValue := msg.(YourInputType)     // assert your type for the message
        someValue, err := someOperation(inputValue)
        if err != nil {
            return err      // error can also be a result and can be returned from a pipeline stage (pipe)
        }
        return someValue
    }).
    Pipe(concurrency, func(msg interface{}) interface{} {
        switch inputValue := msg.(type) {
            case error:
                // process error 
            case YourNormalExpectedType:
                // process message normally
        }
    })
Limitations
  • Out() method can be used only once on each pipeline. Any subsequent Pipe() call will cause panic. Though, when you need to stream values somewhere from the middle of the pipeline - just send them to your own channel.
  • do not try to Push to the pipeline before the first Pipe is defined - it will panic
  • as at the time of writing Go does not have generics, you have to assert the type for incoming messages in pipes explicitly, which means the type of the message can be checked in runtime only.
Performance

As already was mentioned, parapipe makes use of interface{} and also executes callbacks in a separate goroutine per each message. This can have a great performance impact because of heap allocation and creation of goroutines. For instance if you try to stream a slice of integers, each of them will be converted to an interface type and will likely be allocated in heap. Moreover, if an execution time of each step is relatively small, than a goroutine creation may decrease overall performance considerably.

If the performance is the priority, its recommended that you pack such messages in batches (i.e. slices) and stream that batches instead. Obviously that's your responsibility to process batch in the order you like inside step (pipe) callback.

Basically the overall recommendations for choosing batch size are in general the same as if you have to create a slice of interfaces or create a new goroutine.

Examples

AMQP middleware

Parapipe can be handful when you need to process messages in the middle concurrently, yet maintaining their order.

See the working example of using parapipe in AMQP client.

Other examples

With parapipe you can:

  • respond a JSON-feed as stream, retrieve, enrich and marshal each object concurrently, in maintained order and return them to the client
  • fetch and merge entries from different sources as one stream
  • structure your HTTP-controllers
  • processing heavy files in effective way

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config added in v0.2.0

type Config struct {
	ProcessErrors bool // if false, messages implementing "error" interface will not be passed to subsequent workers
}

Config contains pipeline parameters which influence execution or behavior

type Job

type Job func(msg interface{}) interface{}

Job is a short callback signature, used in pipes

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline executes jobs concurrently maintaining message order

func NewPipeline

func NewPipeline(cfg Config) *Pipeline

NewPipeline creates new pipeline instance, "Concurrency" sets how many jobs can be executed concurrently in each pipe

func (*Pipeline) Close added in v0.3.1

func (p *Pipeline) Close()

Close closes pipeline input channel, from that moment pipeline processes what is left and releases the resources it must not be used after Close is called

func (*Pipeline) Out

func (p *Pipeline) Out() <-chan interface{}

Out returns exit of the pipeline - channel with results of the last pipe. Call it once - it is not idempotent!

func (*Pipeline) Pipe

func (p *Pipeline) Pipe(concurrency int, job Job) *Pipeline

Pipe adds new pipe to pipeline with the callback for processing each message Concurrency indicates how many messages to process concurrently for this pipe

func (*Pipeline) Push added in v0.3.1

func (p *Pipeline) Push(v interface{})

Push adds a value to the pipeline for processing, it is immediately queued to be processed

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL