stage

package
v2.2.8
Published: Mar 22, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package stage provides a framework for creating stages.

Stage

## Overview

The `stage` package is a fundamental building block of the ETL project, providing a flexible and powerful mechanism for defining and executing individual stages within a pipeline. A stage represents a specific data processing step, consisting of a converter and one or more processors, which work together to transform and enhance the data flowing through the pipeline.

## What's a Stage?

A stage is a self-contained unit of data processing within a pipeline. It encapsulates a converter and a set of processors that operate on the data sequentially. The converter is responsible for transforming the data from one format to another, while the processors perform specific operations on the data, such as filtering, aggregating, or enriching.

Stages are highly modular and reusable, allowing them to be easily combined and composed to create complex data processing workflows.

## How It Works

At the core of the `stage` package is the `IStage` interface, which defines the contract for a stage. The `Stage` struct implements this interface, providing the necessary functionality for creating and running stages.

To create a stage, you instantiate a new `Stage` using the `New` factory function, specifying the stage name, description, converter, and a variadic list of processors. The converter is defined using the `converter.IConverter` interface, while the processors are defined using the `processor.IProcessor` interface.

When the `Run` method is called on a stage, it executes the following steps:

1. It iterates through the processors sequentially, passing the output of each processor as the input to the next. This preserves ordering and lets later processors depend on the results of earlier ones.

2. After all the processors have finished executing, the stage applies the converter to transform the processed data into the desired output format. The converter operates concurrently on the data using the `concurrentloop` package, enabling efficient parallel processing.

3. Finally, the stage returns the converted data as a `task.Task`, which encapsulates both the processed and converted data.

Throughout the execution, the stage maintains comprehensive observability, including metrics, logging, and tracing, to monitor and debug the stage's performance and behavior.
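The three steps above can be sketched in simplified, library-free form. Plain goroutines stand in for the `concurrentloop` package, the `task.Task` wrapper is omitted, and all names are illustrative, not the package's actual API:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Processor transforms a whole batch; processors run sequentially.
type Processor func(data []string) []string

// Converter transforms one item; conversion runs concurrently per item.
type Converter func(item string) int

// run mirrors the stage's flow: chain processors, then convert concurrently.
func run(data []string, conv Converter, procs ...Processor) []int {
	// 1. Processors run sequentially; each output feeds the next input.
	for _, p := range procs {
		data = p(data)
	}

	// 2. The converter is applied concurrently, one goroutine per item
	// (the real package delegates this to concurrentloop).
	out := make([]int, len(data))
	var wg sync.WaitGroup
	for i, item := range data {
		wg.Add(1)
		go func(i int, item string) {
			defer wg.Done()
			out[i] = conv(item)
		}(i, item)
	}
	wg.Wait()

	// 3. The converted data is returned (wrapped in a task.Task in the
	// real package).
	return out
}

func main() {
	upper := func(d []string) []string {
		for i := range d {
			d[i] = strings.ToUpper(d[i])
		}
		return d
	}

	fmt.Println(run([]string{"ab", "cde"}, func(s string) int { return len(s) }, upper))
}
```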

## Features

1. **Modularity and Reusability**: Stages are designed to be modular and reusable, allowing for easy composition and combination to create complex data processing workflows.

2. **Sequential Processing**: The stage executes processors sequentially, ensuring that data dependencies and ordering are maintained. This is particularly useful when the output of one processor depends on the output of a previous processor.

3. **Concurrent Conversion**: The stage applies the converter concurrently to the processed data, leveraging the `concurrentloop` package for efficient parallel processing. This improves the overall performance of the stage.

4. **Observability**: The stage package provides comprehensive observability features, including metrics, logging, and tracing, to monitor and debug the stage's execution.

5. **Metrics**: Stage metrics are exposed using the `expvar` package, allowing for easy integration with monitoring systems. Metrics include counters for created, running, failed, and done stages, as well as duration, progress, and progress percentage.

6. **Logging**: The package utilizes the `sypl` library for structured logging, providing rich context and consistent log levels throughout the codebase. Log messages include relevant information such as stage status, counters, duration, and progress.

7. **Tracing**: Tracing is implemented using the `customapm` package, which integrates with Elastic APM (Application Performance Monitoring) under the hood. This enables distributed tracing of the stage's execution, allowing developers to gain deep insights into the performance and behavior of their stages.

8. **Error Handling**: The stage package includes robust error handling mechanisms, with detailed error messages and proper propagation of errors throughout the stage's execution.

9. **Progress Tracking**: The package provides progress tracking capabilities, including absolute progress and percentage completion, enabling real-time monitoring of the stage's execution.

10. **Flexible Configuration**: Stages can be configured with various options, such as custom converters, processors, and on-finished callbacks, using a functional options pattern.

11. **Thorough Testing**: The codebase includes comprehensive unit tests, ensuring the reliability and correctness of the stage functionality. The tests cover various scenarios, including the usage of different processors and converters.

12. **Well-Documented**: The code is thoroughly documented, with clear comments explaining the purpose and functionality of each component. The package also includes usage examples and test cases.

13. **Idiomatic Go**: The codebase follows idiomatic Go practices, leveraging the language's features and conventions for clean and efficient code.

14. **Customizable**: The stage package provides a high level of customization through the use of interfaces and generic types. Developers can easily create custom converters and processors to meet their specific data processing requirements.

## Architectural Modularity and Flexibility

The stage package is designed with architectural modularity and flexibility in mind. It leverages Go's interfaces and generic types to provide a highly extensible and customizable stage framework.

The `IStage` interface defines the contract for a stage, allowing for easy integration of custom stage implementations. The `converter.IConverter` and `processor.IProcessor` interfaces enable the creation of custom converters and processors, respectively.

The use of generic types for `ProcessingData` and `ConvertedData` allows stages to handle various data types, making the package adaptable to different data processing scenarios.

The functional options pattern, used in the `New` factory function and various configuration methods, provides a clean and flexible way to customize stage behavior without modifying the core stage struct.
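A minimal sketch of the functional options pattern described above, using pared-down stand-ins for the package's `Stage`, `Func`, and `WithOnFinished` (the real `New` also takes a description, a converter, and processors):

```go
package main

import "fmt"

// Stage is a pared-down stand-in for the package's Stage struct.
type Stage struct {
	Name       string
	OnFinished func(name string)
}

// Func mirrors the package's option type: it receives a stage and
// returns the (possibly modified) stage.
type Func func(s *Stage) *Stage

// WithOnFinished mirrors the package's option of the same name.
func WithOnFinished(fn func(name string)) Func {
	return func(s *Stage) *Stage {
		s.OnFinished = fn
		return s
	}
}

// New applies each option to the freshly created stage, so behavior can
// be customized without changing the struct or the constructor signature.
func New(name string, opts ...Func) *Stage {
	s := &Stage{Name: name}
	for _, opt := range opts {
		s = opt(s)
	}
	return s
}

func main() {
	s := New("stage-1", WithOnFinished(func(name string) {
		fmt.Println("finished:", name)
	}))
	s.OnFinished(s.Name)
}
```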

## Applied Best Practices

The stage package adheres to best practices and idiomatic Go programming principles:

- **Interface-Driven Design**: The package heavily relies on interfaces, such as `IStage`, `converter.IConverter`, and `processor.IProcessor`, to provide abstraction and extensibility. This allows for easy integration of custom implementations and facilitates testing.

- **Functional Options**: The package utilizes the functional options pattern for stage configuration, providing a clean and flexible way to customize stage behavior.

- **Error Handling**: The package follows Go's error handling conventions, returning errors from functions and methods when necessary. Errors are propagated and handled appropriately throughout the stage's execution.

- **Testing**: The package includes comprehensive unit tests, covering various scenarios and edge cases. The tests ensure the correctness and reliability of the stage functionality.

- **Naming Conventions**: The codebase follows Go's naming conventions, using descriptive and meaningful names for variables, functions, and types.

- **Code Organization**: The package is well-organized, with separate files for different components and concerns. This promotes code readability and maintainability.

By applying these best practices, the stage package maintains a high level of code quality, reliability, and ease of use.


Constants

const Type = "stage"

Type of the entity.

Variables

This section is empty.

Functions

This section is empty.

Types

type Func

type Func[ProcessedData, ConvertedOut any] func(p IStage[ProcessedData, ConvertedOut]) IStage[ProcessedData, ConvertedOut]

Func allows specifying the stage's options.

func WithOnFinished

func WithOnFinished[ProcessedData, ConvertedOut any](onFinished OnFinished[ProcessedData, ConvertedOut]) Func[ProcessedData, ConvertedOut]

WithOnFinished sets the OnFinished function.

type IStage

type IStage[ProcessedData, ConvertedOut any] interface {
	shared.IMeta

	shared.IMetrics

	// GetProgress returns the `Progress` of the stage.
	GetProgress() *expvar.Int

	// GetProgressPercent returns the `ProgressPercent` of the stage.
	GetProgressPercent() *expvar.String

	// SetProgressPercent sets the `ProgressPercent` of the stage.
	SetProgressPercent()

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[ProcessedData, ConvertedOut]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[ProcessedData, ConvertedOut])

	// Run the stage function.
	Run(context.Context, task.Task[ProcessedData, ConvertedOut]) (task.Task[ProcessedData, ConvertedOut], error)
}

IStage defines what a `Stage` must do.

func New

func New[ProcessingData, ConvertedData any](
	name string,
	description string,
	conversor converter.IConverter[ProcessingData, ConvertedData],
	processors ...processor.IProcessor[ProcessingData],
) (IStage[ProcessingData, ConvertedData], error)

New returns a new stage.

Example (Storage_processor)

Demonstrates the usage of a stage with the storage processor (memory storage).

// Sample data.
tests := []Test{
	{
		ID:   "1",
		Name: "John",
	},
	{
		ID:   "2",
		Name: "Peter",
	},
}

// Memory storage from DAL.
memoryStorage, err := memory.New(context.Background())
if err != nil {
	log.Fatalln(err)
}

// Storage processor, concurrency set to 1.
s, err := storage.New[Test](memoryStorage, 1, "example-")
if err != nil {
	log.Fatalln(err)
}

// Stage with the storage processor.
stg1, err := New(
	"stage-1",
	"main stage",

	// Converter: pass-through, does nothing.
	passthru.Must[Test](),

	// Processors: add as many as you want.
	s,
)
if err != nil {
	log.Fatalln(err)
}

// Run the stage passing the processing data as a task.
tasksOut, err := stg1.Run(context.Background(), task.Task[Test, Test]{
	ProcessingData: tests,
})
if err != nil {
	log.Fatalln(err)
}

// String builder to contain the output of the stage and the memory storage.
var buf strings.Builder

// Iterate over tasksOut and write to the buffer.
for _, v := range tasksOut.ConvertedData {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get a list from the memory storage.
//
// NOTE: The usage of memory.ResponseList[Test] wrapper.
var fromMemory memory.ResponseList[Test]
if err := memoryStorage.List(context.Background(), "etl", &fromMemory, &list.List{}); err != nil {
	log.Fatalln(err)
}

// Iterate over fromMemory so we can add to the buffer.
for _, v := range fromMemory.Items {
	buf.WriteString(fmt.Sprintf("%s %s\n", v.ID, v.Name))
}

// Get the content of the buffer.
bufferContent := buf.String()

//////
// Check in the buffer if the name John and Peter appears 2 times.
//////

if strings.Count(bufferContent, "John") == 2 {
	fmt.Println("John appears 2 times")
}

if strings.Count(bufferContent, "Peter") == 2 {
	fmt.Println("Peter appears 2 times")
}
Output:

John appears 2 times
Peter appears 2 times

type OnFinished

type OnFinished[ProcessedData, ConvertedOut any] func(ctx context.Context, s IStage[ProcessedData, ConvertedOut], tskIn task.Task[ProcessedData, ConvertedOut], tskOut task.Task[ProcessedData, ConvertedOut])

OnFinished is the function that is called when a processor finishes its execution.

type Stage

type Stage[ProcessingData, ConvertedData any] struct {
	// Description of the stage.
	Description string `json:"description"`

	// Conversor to be used in the stage.
	Conversor converter.IConverter[ProcessingData, ConvertedData] `json:"-" validate:"required"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a processor finishes its
	// execution.
	OnFinished OnFinished[ProcessingData, ConvertedData] `json:"-"`

	// Processors to be run in the stage.
	Processors []processor.IProcessor[ProcessingData] `json:"processors" validate:"required,gt=0"`

	// Metrics.
	CounterCreated *expvar.Int `json:"counterCreated"`
	CounterDone    *expvar.Int `json:"counterDone"`
	CounterFailed  *expvar.Int `json:"counterFailed"`
	CounterRunning *expvar.Int `json:"counterRunning"`

	CreatedAt       time.Time      `json:"createdAt"`
	Duration        *expvar.Int    `json:"duration"`
	Progress        *expvar.Int    `json:"progress"`
	ProgressPercent *expvar.String `json:"progressPercent"`
	Status          *expvar.String `json:"status"`
}

Stage definition.

func (*Stage[ProcessingData, ConvertedData]) GetCounterCreated

func (s *Stage[ProcessingData, ConvertedData]) GetCounterCreated() *expvar.Int

GetCounterCreated returns the `CounterCreated` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterDone

func (s *Stage[ProcessingData, ConvertedData]) GetCounterDone() *expvar.Int

GetCounterDone returns the `CounterDone` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterFailed

func (s *Stage[ProcessingData, ConvertedData]) GetCounterFailed() *expvar.Int

GetCounterFailed returns the `CounterFailed` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCounterRunning

func (s *Stage[ProcessingData, ConvertedData]) GetCounterRunning() *expvar.Int

GetCounterRunning returns the `CounterRunning` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetCreatedAt added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetCreatedAt() time.Time

GetCreatedAt returns the created at time.

func (*Stage[ProcessingData, ConvertedData]) GetDescription

func (s *Stage[ProcessingData, ConvertedData]) GetDescription() string

GetDescription returns the `Description` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetDuration added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetDuration() *expvar.Int

GetDuration returns the `Duration` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetLogger

func (s *Stage[ProcessingData, ConvertedData]) GetLogger() sypl.ISypl

GetLogger returns the `Logger` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetMetrics added in v2.0.7

func (s *Stage[ProcessingData, ConvertedData]) GetMetrics() map[string]string

GetMetrics returns the stage's metrics.

func (*Stage[ProcessingData, ConvertedData]) GetName

func (s *Stage[ProcessingData, ConvertedData]) GetName() string

GetName returns the `Name` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetOnFinished

func (s *Stage[ProcessingData, ConvertedData]) GetOnFinished() OnFinished[ProcessingData, ConvertedData]

GetOnFinished returns the `OnFinished` function.

func (*Stage[ProcessingData, ConvertedData]) GetProgress added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetProgress() *expvar.Int

GetProgress returns the `Progress` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetProgressPercent added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) GetProgressPercent() *expvar.String

GetProgressPercent returns the `ProgressPercent` of the stage.

func (*Stage[ProcessingData, ConvertedData]) GetStatus

func (s *Stage[ProcessingData, ConvertedData]) GetStatus() *expvar.String

GetStatus returns the `Status` metric.

func (*Stage[ProcessingData, ConvertedData]) GetType

func (s *Stage[ProcessingData, ConvertedData]) GetType() string

GetType returns the entity type.

func (*Stage[ProcessingData, ConvertedData]) Run

func (s *Stage[ProcessingData, ConvertedData]) Run(ctx context.Context, tsk task.Task[ProcessingData, ConvertedData]) (task.Task[ProcessingData, ConvertedData], error)

Run executes the stage.

func (*Stage[ProcessingData, ConvertedData]) SetOnFinished

func (s *Stage[ProcessingData, ConvertedData]) SetOnFinished(onFinished OnFinished[ProcessingData, ConvertedData])

SetOnFinished sets the `OnFinished` function.

func (*Stage[ProcessingData, ConvertedData]) SetProgressPercent added in v2.0.6

func (s *Stage[ProcessingData, ConvertedData]) SetProgressPercent()

SetProgressPercent sets the `ProgressPercent` of the stage.
