pipeline

package
v0.0.0-...-88587b8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2023 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package pipeline provides functions to build synchronisation pipeline.

Index

Constants

This section is empty.

Variables

View Source
var Log = logrus.New()

Log implement Logrus logger for debug logging.

Functions

func IsContextCancelErr

func IsContextCancelErr(err error) bool

IsContextCancelErr check that input error is caused by a context cancellation.

Types

type Group

type Group struct {
	Source    storage.Storage
	Target    storage.Storage
	StartTime time.Time
	// contains filtered or unexported fields
}

Group store a Source and Target storage's and pipeline configuration.

func NewGroup

func NewGroup() Group

NewGroup return a new prepared Group. You should always create new Group{} with this constructor.

func (*Group) AddPipeStep

func (group *Group) AddPipeStep(step Step)

AddPipeStep add pipeline step to group. Steps will executed sequentially, in order of addition.

func (*Group) ErrChan

func (group *Group) ErrChan() <-chan error

ErrChan return a Group error chan. All pipeline errors will be sent errors to this channel.

"nil" message mean that all pipeline functions completed and pipeline was terminated. To prevent leakage of resources in the event of a context cancellation, you should read all messages from this channel. ErrChan will be closed after receiving a "nil" message.

func (*Group) GetStepInfo

func (group *Group) GetStepInfo(stepNum int) StepInfo

GetStepInfo return info about step with given sequential number.

func (*Group) GetStepsInfo

func (group *Group) GetStepsInfo() []StepInfo

GetStepsInfo return info about all pipeline steps.

func (*Group) Run

func (group *Group) Run()

Run start the pipeline execution.

For result and error handling see ErrChan() function.

func (*Group) SetSource

func (group *Group) SetSource(st storage.Storage)

SetSource configure source storage for group.

func (*Group) SetTarget

func (group *Group) SetTarget(st storage.Storage)

SetTarget configure target storage for group.

type ObjectError

type ObjectError struct {
	Object *storage.Object
	Err    error
}

ObjectError contain a pointer to an Object that failed with error

func (*ObjectError) Error

func (e *ObjectError) Error() string

func (*ObjectError) Unwrap

func (e *ObjectError) Unwrap() error

Unwrap ObjectError error and return the underlying error.

type PipelineError

type PipelineError struct {
	StepName string
	StepNum  int
	Err      error
}

PipelineError implement wrapper for pipeline errors.

func (*PipelineError) Error

func (e *PipelineError) Error() string

func (*PipelineError) Unwrap

func (e *PipelineError) Unwrap() error

Unwrap PipelineError error and return the underlying error.

type Step

type Step struct {
	Name       string
	Fn         StepFn
	AddWorkers uint
	Config     interface{}
	ChanSize   uint
	// contains filtered or unexported fields
}

Step contain configuration of pipeline step and it's internal structure. Be careful with Config interface! Check of its type should implemented in StepFn. If typing fails, you get a StepConfigurationError in runtime.

type StepConfigurationError

type StepConfigurationError struct {
	StepName string
	StepNum  int
	Err      error
}

StepConfigurationError raises when step have interface typing error.

func (*StepConfigurationError) Error

func (e *StepConfigurationError) Error() string

func (*StepConfigurationError) Unwrap

func (e *StepConfigurationError) Unwrap() error

Unwrap StepConfigurationError error and return the underlying error.

type StepFn

type StepFn func(group *Group, stepNum int, input <-chan *storage.Object, output chan<- *storage.Object, errChan chan<- error)

StepFn implement the type of pipeline Step function.

type StepInfo

type StepInfo struct {
	Stats  *StepStats
	Name   string
	Num    int
	Config interface{}
}

StepInfo is used to represent step information, statistic and the step configuration interface.

type StepStats

type StepStats struct {
	Input  atomic.Uint64
	Output atomic.Uint64
	Error  atomic.Uint64
}

StepStats to keep basic step statistics.

Directories

Path Synopsis
Package collection contains different StepFn functions to do different pipeline actions.
Package collection contains different StepFn functions to do different pipeline actions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL