pipeline

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2021 License: MIT Imports: 9 Imported by: 0

README

pipeline

Concurrent, adaptive pipeline processing in Go

Installation

go get -u github.com/JoelWesleyReed/pipeline

Quick Start

// main demonstrates the package end to end: it builds a pipeline with two
// concurrent processes, submits the integers 1 through 50 from a background
// goroutine, prints every item that reaches the sink, and finally prints the
// pipeline metrics.
func main() {
	logger, err := zap.NewDevelopment(zap.IncreaseLevel(zap.DebugLevel))
	if err != nil {
		panic(err)
	}
	// Best-effort flush of buffered log entries on exit.
	defer logger.Sync()

	// Create the pipeline
	p := pipeline.NewPipeline(logger)

	// Set up process 1: 2 threads, each item sleeps 100ms and is passed on unchanged
	p1, err := pipeline.NewProcessConcurrent("process1", 2, pipeline.NewSimpleProcessWorker(
		func(ctx context.Context, id string, item interface{}, emit func(interface{})) error {
			time.Sleep(100 * time.Millisecond)
			emit(item)
			return nil
		}))
	if err != nil {
		panic(err)
	}
	// Add returns an error, so a rejected process must not go unnoticed.
	if err := p.Add(p1); err != nil {
		panic(err)
	}

	// Set up process 2: 4 threads, each item sleeps 200ms and is passed on unchanged
	p2, err := pipeline.NewProcessConcurrent("process2", 4, pipeline.NewSimpleProcessWorker(
		func(ctx context.Context, id string, item interface{}, emit func(interface{})) error {
			time.Sleep(200 * time.Millisecond)
			emit(item)
			return nil
		}))
	if err != nil {
		panic(err)
	}
	if err := p.Add(p2); err != nil {
		panic(err)
	}

	// Create the context; the whole run is bounded to 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Start go func to send data into pipeline and then shutdown
	go func() {
		// Shutdown also returns an error; report it instead of dropping it.
		defer func() {
			if err := p.Shutdown(); err != nil {
				fmt.Println(err)
			}
		}()
		for i := 1; i <= 50; i++ {
			if err := p.Submit(ctx, i); err != nil {
				panic(err)
			}
		}
	}()

	// Create the sink: 4 threads, each printing the sink id and the item
	s, err := pipeline.NewSinkConcurrent("sink1", 4, pipeline.NewSimpleSinkWorker(
		func(ctx context.Context, id string, item interface{}) error {
			fmt.Printf("%s: %v\n", id, item)
			return nil
		}))
	if err != nil {
		panic(err)
	}

	// Start the pipeline and wait until it has completed
	if err := p.Run(ctx, s); err != nil {
		fmt.Println(err)
	}

	fmt.Printf("Metrics: %s\n", p.Metrics())
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultProcessAdaptiveConfig is a ready-made ProcessAdaptiveConfig with a
// 5-second sampling interval, a sampling window of 70, and a 30-second scale
// interval.
// NOTE(review): the unit of SamplingWindow (presumably a sample count) is not
// visible here — confirm against the implementation.
var DefaultProcessAdaptiveConfig = &ProcessAdaptiveConfig{
	SamplingInterval: 5 * time.Second,
	SamplingWindow:   70,
	ScaleInterval:    30 * time.Second,
}

Functions

func NewProcessAdaptive

func NewProcessAdaptive(id string, minThreads, maxThreads int, config *ProcessAdaptiveConfig, worker ProcessWorker) (process, error)

func NewProcessConcurrent

func NewProcessConcurrent(id string, threads int, worker ProcessWorker) (process, error)

func NewProcessSingle

func NewProcessSingle(id string, worker ProcessWorker) process

func NewSinkConcurrent

func NewSinkConcurrent(id string, threads int, worker SinkWorker) (sink, error)

func NewSinkSingle

func NewSinkSingle(id string, worker SinkWorker) sink

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(logger *zap.Logger) *Pipeline

func (*Pipeline) Add

func (p *Pipeline) Add(proc process) error

func (*Pipeline) Metrics added in v0.0.6

func (p *Pipeline) Metrics() string

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context, s sink) error

func (*Pipeline) Shutdown

func (p *Pipeline) Shutdown() error

func (*Pipeline) Submit

func (p *Pipeline) Submit(ctx context.Context, item interface{}) error

type ProcessAdaptiveConfig

// ProcessAdaptiveConfig is the configuration type accepted by
// NewProcessAdaptive.
// NOTE(review): the field meanings are not documented here; presumably they
// control how often load is sampled, how many samples are considered, and how
// often the thread count is adjusted — confirm against the implementation.
type ProcessAdaptiveConfig struct {
	SamplingInterval time.Duration
	SamplingWindow   int
	ScaleInterval    time.Duration
}

type ProcessWorker

// ProcessWorker is the worker interface taken by NewProcessSingle,
// NewProcessConcurrent and NewProcessAdaptive.
type ProcessWorker interface {
	// Process takes the same parameter types as ProcessWorkerFunc, which
	// names them ctx, id, item and emit, and returns an error.
	Process(context.Context, string, interface{}, func(interface{})) error
}

func NewSimpleProcessWorker

func NewSimpleProcessWorker(f ProcessWorkerFunc) ProcessWorker

type ProcessWorkerFunc

type ProcessWorkerFunc func(ctx context.Context, id string, item interface{}, emit func(interface{})) error

type SimpleProcessWorker

type SimpleProcessWorker struct {
	// contains filtered or unexported fields
}

func (*SimpleProcessWorker) Process

func (spw *SimpleProcessWorker) Process(ctx context.Context, id string, item interface{}, emit func(interface{})) error

type SimpleSinkWorker

type SimpleSinkWorker struct {
	// contains filtered or unexported fields
}

func (*SimpleSinkWorker) Sink

func (ssw *SimpleSinkWorker) Sink(ctx context.Context, id string, item interface{}) error

type SinkWorker

// SinkWorker is the worker interface taken by NewSinkSingle and
// NewSinkConcurrent.
type SinkWorker interface {
	// Sink takes the same parameter types as SinkWorkerFunc, which names
	// them ctx, id and item, and returns an error.
	Sink(context.Context, string, interface{}) error
}

func NewSimpleSinkWorker

func NewSimpleSinkWorker(f SinkWorkerFunc) SinkWorker

type SinkWorkerFunc

type SinkWorkerFunc func(ctx context.Context, id string, item interface{}) error

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL