pipeline

Version: v0.0.0-...-61cbcda
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2016 License: MIT Imports: 3 Imported by: 2

README

pipeline

Package pipeline allows a 'job' to be processed through a sequence of stages. The stages can be developed and unit tested independently of each other and composed together into different processing flows.

Overview

A more detailed example is provided in the /examples directory. At its core, the pipeline hides the complexity of managing the channels needed to move a job through a series of processing stages.

p := pipeline.New()

p.SetGenerator(work.Generator)

p.AddStage(fetch.Stage)
p.AddStage(parse.Stage)
p.AddStage(store.Stage)
p.AddStage(log.Stage)

p.Run()

The pipeline calls the generator's Next() function to retrieve each new job to pass through the pipeline.

type Generator interface {
	Name() string
	Next() interface{}
	Abort()
}

The pipeline calls the Stage's Process() function for each job. A job is never processed by more than one stage at a time; however, each stage may run in as many goroutines as its Concurrency() value specifies.

type Stage interface {
	Name() string
	Concurrency() int
	Process(interface{})
}

Installation

	go get github.com/jboelter/pipeline

Documentation

http://godoc.org/github.com/jboelter/pipeline

Documentation

Overview

Package pipeline allows a 'job' to be processed through a sequence of stages. The stages can be developed and unit tested independently of each other and composed together into different processing flows.

Basics

A typical pipeline may look like the following. A user-defined generator creates jobs that are moved through the pipeline. A job is a user-defined type with no restrictions.

p := pipeline.New()

p.SetGenerator(work.Generator)

p.AddStage(fetch.Stage)
p.AddStage(parse.Stage)
p.AddStage(store.Stage)
p.AddStage(log.Stage)

p.Run()

The pipeline retrieves a job by calling the Next() function on the Generator. The job is then moved sequentially through the pipeline to the terminal stage. The Run() function blocks until there is no more work to do, as signaled by the Generator returning a nil value from Next(). The Generator and each Stage may reside in their own packages and must implement the pipeline.Generator and pipeline.Stage interfaces respectively.

How it works

The pipeline creates and manages the channels used internally for moving work between stages. The pipeline does not monitor the stages or jobs for error states, panics or other failures. By design, the pipeline does not expose the channels to the developer.

A job (a user-defined structure) is returned as an interface{} from the Generator's Next() call and is then passed to each stage through its Process() call.

Types

The Generator interface requires a Next() function that is called to retrieve the next job. Returning nil from Next() shuts down the pipeline, flushing any jobs already in it.

type Generator interface {
	Name() string
	Next() interface{}
	Abort()
}
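Because Next() returns an interface{}, the nil that ends the pipeline should be an untyped nil. The sketch below assumes the pipeline compares the returned value against nil; countGen and badNext are hypothetical names used only for illustration. It shows a generator that stops after a fixed number of jobs, and the common Go pitfall of returning a nil *Job, which is wrapped in a non-nil interface{}.

```go
package main

import "fmt"

type Job struct{ ID int }

// countGen is a hypothetical generator that hands out max jobs, then stops.
type countGen struct {
	next, max int
}

func (g *countGen) Name() string { return "countGen" }
func (g *countGen) Abort()       {}

// Next returns an untyped nil once the jobs are exhausted.
func (g *countGen) Next() interface{} {
	if g.next >= g.max {
		return nil
	}
	g.next++
	return &Job{ID: g.next}
}

// badNext shows the pitfall: a nil *Job stored in an interface{} is not == nil.
func badNext() interface{} {
	var j *Job // nil pointer
	return j   // interface value holding (*Job)(nil)
}

func main() {
	g := &countGen{max: 3}
	for j := g.Next(); j != nil; j = g.Next() {
		fmt.Println("job", j.(*Job).ID) // job 1, job 2, job 3
	}
	fmt.Println(badNext() == nil) // false
}
```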

The Stage interface's Concurrency() method determines how many goroutines are launched for the stage. The Process() function is called on each job as the job moves through the pipeline. A given job (represented by the interface{}) will never be operated on concurrently by multiple stages in the pipeline.

type Stage interface {
	Name() string
	Concurrency() int
	Process(interface{})
}

Generator

A typical generator uses an internal channel to pass work to the Next() function. The user must implement the code that fills Generator.jobs. See the examples for more details and patterns, including generators that can run forever or be aborted. The Generator may also provide an Initialize() or New() function that is called to set up the generator.

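package mygenerator

import "myjobtype/job"

type MyGenerator struct {
	jobs chan *job.Job
	// other local state goes here
}

// the generator instance; passed to the pipeline
// p.SetGenerator(mygenerator.Generator)
// jobs must be filled (and eventually closed) by user code,
// e.g. from an Initialize() or New() function
var Generator = &MyGenerator{jobs: make(chan *job.Job)}

// Name() required by the Generator interface
func (g *MyGenerator) Name() string {
	return "MyGenerator"
}

// Next() required by the Generator interface
// Called by the pipeline to get the next job to process in the pipeline.
// Only called sequentially.
func (g *MyGenerator) Next() interface{} {
	j, ok := <-g.jobs
	if !ok {
		// the channel was closed: no more jobs
		return nil
	}
	// return the job if there was one
	return j
}

// Abort() required by the Generator interface
// Called by Pipeline.Abort; typically tells the code filling g.jobs to stop
// and close the channel, so that Next() returns nil and the pipeline drains.
func (g *MyGenerator) Abort() {
	// signal the producer to stop (see examples)
}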
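The examples mention generators that run forever or can be aborted. One way to build such a generator is sketched below, assuming a producer goroutine owns the jobs channel and Abort() signals it through a done channel guarded by sync.Once. AbortableGenerator and NewAbortableGenerator are hypothetical names, not part of the package or its examples.

```go
package main

import (
	"fmt"
	"sync"
)

type Job struct{ ID int }

// AbortableGenerator is a hypothetical generator fed by a background goroutine.
type AbortableGenerator struct {
	jobs chan *Job
	done chan struct{}
	once sync.Once
}

// NewAbortableGenerator emits up to limit jobs; a negative limit means forever.
func NewAbortableGenerator(limit int) *AbortableGenerator {
	g := &AbortableGenerator{
		jobs: make(chan *Job),
		done: make(chan struct{}),
	}
	go func() {
		defer close(g.jobs) // a closed channel makes Next return nil
		for id := 1; limit < 0 || id <= limit; id++ {
			// Stop promptly if Abort has already been called.
			select {
			case <-g.done:
				return
			default:
			}
			select {
			case g.jobs <- &Job{ID: id}:
			case <-g.done:
				return
			}
		}
	}()
	return g
}

func (g *AbortableGenerator) Name() string { return "AbortableGenerator" }

// Abort is safe to call more than once and from any goroutine.
func (g *AbortableGenerator) Abort() {
	g.once.Do(func() { close(g.done) })
}

// Next blocks until a job is available and returns nil once the producer stops.
func (g *AbortableGenerator) Next() interface{} {
	j, ok := <-g.jobs
	if !ok {
		return nil
	}
	return j
}

func main() {
	g := NewAbortableGenerator(-1) // would run forever if never aborted
	for j := g.Next(); j != nil; j = g.Next() {
		id := j.(*Job).ID
		fmt.Println("job", id)
		if id == 3 {
			g.Abort()
		}
	}
}
```

Only the producer goroutine closes jobs, which rules out a send on a closed channel. With this design at most one extra job can still be delivered after Abort(), because the producer may already be waiting in its second select when done is closed.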

Stage

A typical stage is implemented as its own package to allow for ease of testing and encourage isolation of stages.

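package mystage

import "myjobtype/job"

type MyStage struct{}

// the stage instance; passed to the pipeline
// p.AddStage(mystage.Stage)
var Stage = &MyStage{}

// Name() required by the Stage interface
func (s *MyStage) Name() string {
	return "MyStage"
}

// Concurrency() required by the Stage interface
// The number of goroutines the pipeline launches for this stage.
func (s *MyStage) Concurrency() int {
	return 1
}

// Process() required by the Stage interface
// Called by the pipeline once for each job that reaches this stage.
func (s *MyStage) Process(i interface{}) {
	j := i.(*job.Job)

	// do something in the stage
	// any errors would be conveyed in the job object to the next stage
	// j.Err = errors.New(...)

	_ = j // placeholder until the stage actually uses j
}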
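Because a stage is just a value with a Process() method, it can be unit tested without building a pipeline. The sketch below fills in MyStage with hypothetical behavior (recording the input length, or setting Err on empty input) and uses a local Job type in place of myjobtype/job. In a real project the checks in main would live in a mystage_test.go file using the testing package.

```go
package main

import (
	"errors"
	"fmt"
)

// Job stands in for the hypothetical myjobtype/job.Job type.
type Job struct {
	Input  string
	Length int
	Err    error
}

type MyStage struct{}

func (s *MyStage) Name() string     { return "MyStage" }
func (s *MyStage) Concurrency() int { return 1 }

// Process records the input length, or an error for empty input.
func (s *MyStage) Process(i interface{}) {
	j := i.(*Job)
	if j.Input == "" {
		j.Err = errors.New("mystage: empty input")
		return
	}
	j.Length = len(j.Input)
}

func main() {
	// The stage is exercised directly, with no pipeline involved.
	s := &MyStage{}
	good := &Job{Input: "hello"}
	s.Process(good)
	fmt.Println(good.Length, good.Err) // 5 <nil>

	empty := &Job{}
	s.Process(empty)
	fmt.Println(empty.Err) // mystage: empty input
}
```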

Index

Variables

var ErrNilGenerator = errors.New("pipeline: the generator cannot be nil")

ErrNilGenerator is returned when the pipeline has a nil generator. Set a generator by calling SetGenerator.

var ErrNoStages = errors.New("pipeline: there are no stages defined")

ErrNoStages is returned when the pipeline has no stages. Call AddStage to add one or more stages to the pipeline.

Types

type Config

type Config struct {
	Logger        *log.Logger
	Depth         int
	Buffered      bool
	NoConcurrency bool
	Verbose       bool
	// contains filtered or unexported fields
}

Config defines the configuration for a Pipeline

func DefaultConfig

func DefaultConfig() Config

DefaultConfig provides a default configuration with buffering

type Generator

type Generator interface {
	Name() string
	Next() interface{}
	Abort()
}

Generator defines an interface that creates 'jobs' to be processed by the pipeline

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline defines the container for the generator and stages

func New

func New() *Pipeline

New creates a new Pipeline with the default configuration

func NewWithConfig

func NewWithConfig(cfg Config) *Pipeline

NewWithConfig creates a new Pipeline with the provided configuration

func (*Pipeline) Abort

func (p *Pipeline) Abort() error

Abort gracefully terminates a Pipeline by calling Abort on the generator

func (*Pipeline) AddStage

func (p *Pipeline) AddStage(stages ...Stage)

AddStage adds one or more stages to the pipeline. Jobs are passed through the stages in the order in which they are added.

func (*Pipeline) Run

func (p *Pipeline) Run() error

Run will pull work from the generator and pass it through the pipeline. This call will block until the pipeline has completed.

func (*Pipeline) SetGenerator

func (p *Pipeline) SetGenerator(generator Generator)

SetGenerator sets the generator for the Pipeline

type Stage

type Stage interface {
	Name() string
	Concurrency() int
	Process(interface{})
}

Stage defines a stage in the pipeline

Directories

Path Synopsis
The MIT License (MIT) Copyright (c) 2014-2016 Joshua Boelter Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
job
