conveyor

package module
v0.0.0-...-f047d33 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

README

Go Report Card Go Reference GitHub

Conveyor

Conveyor is a data processing pipeline framework written in Go. It lets you specify the segments of a pipeline without writing any code that synchronizes threads. Communication between the segments is built entirely on buffered blocking channels, and all segments run concurrently using goroutines.

Within each segment, you can specify an init job and a dispose job. The init job is always executed once during startup; the dispose job runs once after the pipeline has terminated, allowing you to clean up resources. Each segment also has its own private cache for keeping state between jobs. Furthermore, you can supply your own circuit breaker or use the default one; with the circuit breaker you can specify a fallback policy for how and when to reprocess a failed job. The framework queues and segments logs, making them easier to read: they are flushed to stdout once the data has flowed through the whole pipeline. The logger can also be customized, or you can inject your own.

Features

  • Fanin and Fanout of segments.
  • Easy scale of each segment
  • Optional init and dispose job for each segment
  • Circuit breaker with exponential and static fallback policy
  • Smart flushing of logs. Queues logs in sequence and flushes the sequence when executed
  • Local cache for segments to maintain state
  • Configurable inbound buffer size
  • Error handler
  • Supports custom injectable logger, circuit breaker and error handler.

Installation

go get -u github.com/defendable/conveyor

Notes

  • Only the first stage can terminate the pipeline, which is done by returning conveyor.Stop as shown in the Usage section. If any of the other segments returns the stop symbol, the symbol will be received as input by the next segment(s).

  • You can skip further processing of a parcel by returning conveyor.Skip

  • Do not return errors from an injected process; instead, panic with the error. The circuit breaker recovers from all panics and handles the retries.

Usage

image

// main builds and runs a sample pipeline: a source that emits sequence
// numbers, a passthrough stage, three fanned-out arithmetic stages ("add",
// "multiply", "subtract"), a fan-in "sum" stage, and a "write" sink that
// prints the last value it received when it is disposed.
func main() {
	// Threshold above which the source stops the pipeline.
	maxNum := 100
	conveyor.New(conveyor.NewDefaultOptions()).
		AddSource(&conveyor.Stage{
			Name: "numerate",
			// Emit the parcel's sequence number as content; once the sequence
			// exceeds maxNum, return conveyor.Stop to terminate the pipeline.
			Process: func(parcel *conveyor.Parcel) interface{} {
				if parcel.Sequence > maxNum {
					return conveyor.Stop
				}
				return parcel.Sequence
			},
		}).AddStage(&conveyor.Stage{
		// No Process is set on this stage.
		// NOTE(review): presumably parcels are forwarded unchanged, as the
		// name suggests — confirm against the package implementation.
		Name: "passthrough",
	},
	).Fanout(
		&conveyor.Stage{
			Name:       "add",
			BufferSize: 5,
			// Double integer content; skip parcels whose content is not an int.
			Process: func(parcel *conveyor.Parcel) interface{} {
				switch value := parcel.Content.(type) {
				case int:
					return value + value
				}
				return conveyor.Skip
			},
		},
		&conveyor.Stage{
			Name: "multiply",
			// NOTE(review): MaxScale presumably caps how far this stage may be
			// scaled out — confirm the exact semantics in the package.
			MaxScale:   4,
			BufferSize: 5,
			// Square integer content; skip parcels whose content is not an int.
			Process: func(parcel *conveyor.Parcel) interface{} {
				switch value := parcel.Content.(type) {
				case int:
					return value * value
				}
				return conveyor.Skip
			},
		},
		&conveyor.Stage{
			Name:       "subtract",
			BufferSize: 5,
			// Negate integer content; skip parcels whose content is not an int.
			Process: func(parcel *conveyor.Parcel) interface{} {
				switch value := parcel.Content.(type) {
				case int:
					return -value
				}
				return conveyor.Skip
			},
		}).Fanin(&conveyor.Stage{
		Name:       "sum",
		BufferSize: 5,
		// Keep a running total under the cache key "result": add any
		// previously stored total to the incoming int, store the new total,
		// and pass it on. Non-int content is skipped.
		Process: func(parcel *conveyor.Parcel) interface{} {
			switch value := parcel.Content.(type) {
			case int:
				if val, ok := parcel.Cache.Get("result"); ok {
					value += val.(int)
				}
				parcel.Cache.Set("result", value)

				return value
			}
			return conveyor.Skip
		},
	}).AddSink(&conveyor.Stage{
		Name:       "write",
		BufferSize: 10,
		// Remember the most recent content under the cache key "result".
		Process: func(parcel *conveyor.Parcel) interface{} {
			parcel.Cache.Set("result", parcel.Content)
			return nil
		},
		// Print the stored "result" value, if one was set.
		Dispose: func(cache *conveyor.Cache) {
			if result, ok := cache.Get("result"); ok {
				fmt.Println(result)
			}
		},
		// Build the pipeline, dispatch it with a one-second timeout, and wait
		// on the returned runner.
	}).Build().DispatchWithTimeout(time.Second).Wait()
}

Examples

See examples folder for examples and benchmarks.

Documentation

Index

Constants

View Source
const (
	DefaultScale      = 1
	DefaultBufferSize = 10
	MaxScale          = 10000
	MaxBufferSize     = 10000
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Cache

type Cache struct {
	cmap.ConcurrentMap
}

type CircuitBreaker

type CircuitBreaker struct {
	Enabled         bool
	NumberOfRetries int
	Policy          FallbackPolicy
	Interval        time.Duration
}

func (*CircuitBreaker) Execute

func (breaker *CircuitBreaker) Execute(stage *Stage, parcel *Parcel) interface{}

func (*CircuitBreaker) NewBackoffTimer

func (breaker *CircuitBreaker) NewBackoffTimer(circuit int) *time.Timer

type Error

type Error struct {
	Data  interface{}
	Stack string
}

type ErrorHandler

type ErrorHandler struct {
	Logger ILogger
}

func (*ErrorHandler) Handle

func (handler *ErrorHandler) Handle(stage *Stage, parcel *Parcel, err *Error)

type FallbackPolicy

type FallbackPolicy int
const (
	Exponential FallbackPolicy = iota
	Static
)

type ICircuitBreaker

type ICircuitBreaker interface {
	Execute(stage *Stage, parcel *Parcel) interface{}
}

func NewDefeaultCircuitBreaker

func NewDefeaultCircuitBreaker() ICircuitBreaker

type IErrorHandler

type IErrorHandler interface {
	Handle(stage *Stage, parcel *Parcel, err *Error)
}

func NewDefaultErrorHandler

func NewDefaultErrorHandler(logger ILogger) IErrorHandler

type IFactory

type IFactory interface {
	Dispatch(ctx context.Context) *Runner
	DispatchBackground() *Runner
	DispatchWithTimeout(duration time.Duration) *Runner
}

type ILogger

type ILogger interface {
	Warning(stage *Stage, args ...interface{})
	Error(stage *Stage, args ...interface{})
	Information(stage *Stage, args ...interface{})
	Debug(stage *Stage, args ...interface{})

	EnqueueWarning(stage *Stage, parcel *Parcel, args ...interface{})
	EnqueueError(stage *Stage, parcel *Parcel, args ...interface{})
	EnqueueInformation(stage *Stage, parcel *Parcel, args ...interface{})
	EnqueueDebug(stage *Stage, parcel *Parcel, args ...interface{})
	// contains filtered or unexported methods
}

func NewDefaultLogger

func NewDefaultLogger() ILogger

func NewLogger

func NewLogger(conveyorName string) ILogger

type ISink

type ISink interface {
	Build() IFactory
}

Last segment in pipeline

type ISource

type ISource interface {
	AddSource(stage *Stage) IStage
}

First segment in pipeline

func New

func New(opts *Options) ISource

type IStage

type IStage interface {
	AddStage(stage *Stage) IStage
	AddSink(stage *Stage) ISink
	Fanout(stages ...*Stage) IStages
}

Single/Fanin segment

type IStages

type IStages interface {
	AddStages(stages ...*Stage) IStages
	AddSinks(stages ...*Stage) ISink
	Fanin(stage *Stage) IStage
}

Fanout Intermediary segment

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

func (*Logger) Append

func (logger *Logger) Append(parcel *Parcel, fn func())

func (*Logger) Debug

func (logger *Logger) Debug(stage *Stage, args ...interface{})

func (*Logger) EnqueueDebug

func (logger *Logger) EnqueueDebug(stage *Stage, parcel *Parcel, args ...interface{})

func (*Logger) EnqueueError

func (logger *Logger) EnqueueError(stage *Stage, parcel *Parcel, args ...interface{})

func (*Logger) EnqueueInformation

func (logger *Logger) EnqueueInformation(stage *Stage, parcel *Parcel, args ...interface{})

func (*Logger) EnqueueWarning

func (logger *Logger) EnqueueWarning(stage *Stage, parcel *Parcel, args ...interface{})

func (*Logger) Error

func (logger *Logger) Error(stage *Stage, args ...interface{})

func (*Logger) Information

func (logger *Logger) Information(stage *Stage, args ...interface{})

func (*Logger) Warning

func (logger *Logger) Warning(stage *Stage, args ...interface{})

type Options

type Options struct {
	Name string

	CircuitBreaker ICircuitBreaker
	Logger         ILogger
	ErrorHandler   IErrorHandler
}

func NewDefaultOptions

func NewDefaultOptions() *Options

type Parcel

type Parcel struct {
	Content  interface{}
	Cache    *Cache
	Stage    *Stage
	Logger   ILogger
	Sequence int
}

type Process

type Process func(parcel *Parcel) interface{}

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func JoinRunners

func JoinRunners(runners ...*Runner) *Runner

JoinRunners blocks and awaits all runners.

func StartFactories

func StartFactories(ctx context.Context, factories ...IFactory) []*Runner

StartFactories dispatches all factories using a single cancellation context.

func (*Runner) Wait

func (runner *Runner) Wait()

type Signal

type Signal int
const (
	Stop Signal = iota
	Skip
	Failure
)

type Stage

type Stage struct {
	Name       string
	MaxScale   uint
	BufferSize uint

	Init    func(cache *Cache)
	Process Process
	Dispose func(cache *Cache)

	CircuitBreaker ICircuitBreaker
	ErrorHandler   IErrorHandler
	// contains filtered or unexported fields
}

type Unpack

type Unpack struct {
	Data []interface{}
}

func UnpackData

func UnpackData[T any](data []T) Unpack

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL