stage

package
v0.1.7 Latest
Warning

This package is not in the latest version of its module.

Published: May 23, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package stage provides a framework for creating stages.


Constants

const Type = "stage"

Type of the entity.

Variables

This section is empty.

Functions

This section is empty.

Types

type Func

type Func[In, Out any] func(p IStage[In, Out]) IStage[In, Out]

Func is a functional option used to configure a stage.

func WithOnFinished

func WithOnFinished[In, Out any](onFinished OnFinished[In, Out]) Func[In, Out]

WithOnFinished sets the OnFinished function.

type IStage

type IStage[In, Out any] interface {
	shared.IMeta

	shared.IMetrics

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[In, Out]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[In, Out])

	// Run the stage function.
	Run(ctx context.Context, in []In) (out []Out, err error)
}

IStage defines what a `Stage` must do.

func New

func New[In, Out any](
	name string,
	conversor concurrentloop.MapFunc[In, Out],
	processors ...processor.IProcessor[In],
) (IStage[In, Out], error)

New returns a new stage.

type OnFinished

type OnFinished[In, Out any] func(ctx context.Context, p IStage[In, Out], in []In, processedOut []Out)

OnFinished is the function that is called when a stage finishes its execution. It receives the stage, the original input, and the processed output.

type Stage

type Stage[In, Out any] struct {
	// Description of the stage.
	Description string `json:"description"`

	// Conversor to be used in the stage.
	Conversor concurrentloop.MapFunc[In, Out] `json:"-" validate:"required"`

	// Logger is the pipeline logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a stage finishes its
	// execution.
	OnFinished OnFinished[In, Out] `json:"-"`

	// Processors to be run in the stage.
	Processors []processor.IProcessor[In] `json:"processors" validate:"required,gt=0"`

	// Progress of the stage.
	Progress int `json:"progress"`

	// Metrics.
	CounterCreated *expvar.Int    `json:"counterCreated" validate:"required,gte=0"`
	CounterRunning *expvar.Int    `json:"counterRunning" validate:"required,gte=0"`
	CounterFailed  *expvar.Int    `json:"counterFailed" validate:"required,gte=0"`
	CounterDone    *expvar.Int    `json:"counterDone" validate:"required,gte=0"`
	Status         *expvar.String `json:"status" validate:"required,gte=0"`
}

Stage definition.

func (*Stage[In, Out]) GetCounterCreated

func (s *Stage[In, Out]) GetCounterCreated() *expvar.Int

GetCounterCreated returns the `CounterCreated` of the stage.

func (*Stage[In, Out]) GetCounterDone

func (s *Stage[In, Out]) GetCounterDone() *expvar.Int

GetCounterDone returns the `CounterDone` of the stage.

func (*Stage[In, Out]) GetCounterFailed

func (s *Stage[In, Out]) GetCounterFailed() *expvar.Int

GetCounterFailed returns the `CounterFailed` of the stage.

func (*Stage[In, Out]) GetCounterRunning

func (s *Stage[In, Out]) GetCounterRunning() *expvar.Int

GetCounterRunning returns the `CounterRunning` of the stage.

func (*Stage[In, Out]) GetDescription

func (s *Stage[In, Out]) GetDescription() string

GetDescription returns the `Description` of the stage.

func (*Stage[In, Out]) GetLogger

func (s *Stage[In, Out]) GetLogger() sypl.ISypl

GetLogger returns the `Logger` of the stage.

func (*Stage[In, Out]) GetName

func (s *Stage[In, Out]) GetName() string

GetName returns the `Name` of the stage.

func (*Stage[In, Out]) GetOnFinished

func (s *Stage[In, Out]) GetOnFinished() OnFinished[In, Out]

GetOnFinished returns the `OnFinished` function.

func (*Stage[In, Out]) GetStatus

func (s *Stage[In, Out]) GetStatus() *expvar.String

GetStatus returns the `Status` metric.

func (*Stage[In, Out]) GetType added in v0.1.2

func (s *Stage[In, Out]) GetType() string

GetType returns the entity type.

func (*Stage[In, Out]) Run

func (s *Stage[In, Out]) Run(ctx context.Context, in []In) ([]Out, error)

Run runs the stage on the input `in` and returns the output.

func (*Stage[In, Out]) SetOnFinished

func (s *Stage[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])

SetOnFinished sets the `OnFinished` function.
