simple

package
v0.0.0-...-21ec7a0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2021 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package simple provides a simple channeled orchestrator that can you can dial up or down parrallel operations and can multiplex to have large concurrent pipelines.

The designation of simple is an indication that this orchestrator has no decision branches, splitters to multiple different processors or syncronization points. This orchestrator shines when you need to hit each stage in order and the only branching is when there is an error.

However, don't let the "simple" designation fool you. This can do complex parallel operation pipelines concurrently at low memory and high speed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(p *Pipeline)

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is an orchestration pipeline for doing doing operations based on a request flow.

func New

func New(in chan Request, options ...Option) *Pipeline

New creates a new Pipeline.

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(stages ...Stage) error

AddStage adds a new stage that will do processing in the order they will be executed. The first stage added will receive input from the "in" channel passed to New() and the last will output to the Response channel that is in the request.

func (*Pipeline) Start

func (p *Pipeline) Start() error

Start starts the pipeline.

func (*Pipeline) Wait

func (p *Pipeline) Wait()

Wait will wait for the Pipeline to die after the "in" channel has been closed. This is signfied by all Stages to have exited.

type Processor

type Processor func(ctx context.Context, in interface{}) (data interface{}, err error)

Processor processes data represented by "in". Returned data will either be used as input into the next Stage or if the last Stage, returned to the user. If err != nil, further processing of this request is stopped.

type ReqOption

type ReqOption func(r Request) Request

ReqOption is an option for constructing a new Request via NewRequest().

func SharedResponse

func SharedResponse(ch chan Response, wg *sync.WaitGroup) ReqOption

SharedResponse allows you to pass a channel to multiple request objects and have each output their result to the same channel. Without this, each Request.Response() gives a different channel.

type Request

type Request struct {
	// contains filtered or unexported fields
}

Request is the request object that enters the pipeline.

func NewRequest

func NewRequest(ctx context.Context, cancel context.CancelFunc, data interface{}, options ...ReqOption) (Request, error)

NewRequest makes a new Request object. Data is the data you are sending to the Pipeline and counter is representing a series of requests that you must increment before sending.

func (Request) Recycle

func (r Request) Recycle()

Recycle can be called once you are no longer using the Response object. The Request must not use the channel returned by Response() after this.

func (Request) Response

func (r Request) Response() <-chan Response

Response returns the channel that the Response will be sent on. Depending on how you are using the Pipeline, this may be a single channel for all responses or an individual Request's promise.

type Response

type Response struct {
	// Data is the data that was returned.
	Data interface{}
	// Err is the error, if there were any.
	Err error
}

Repsone is the response from a pipeline.

type Stage

type Stage struct {
	// contains filtered or unexported fields
}

Stage represents a stage in the pipeline. A Stage takes input from an input channel, calls a Processor on the data, and either stops processing of the request because of an error or forwards that data onto an output channel that the Processor provided. A stage can be concurrent and have multiple Processors running.

func NewStage

func NewStage(name string, proc Processor, concurrency int) Stage

NewStage spins off concurrency procs taking data from an input channel and sending it to an output channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL