Documentation ¶
Overview ¶
Package processor provides a framework for creating processors.
The `processor` package is a key component of the ETL project, providing a versatile and efficient mechanism for defining and executing data processing operations within a pipeline stage. Processors are responsible for transforming and enhancing the data flowing through the pipeline, allowing for complex data manipulations and business logic to be applied.
## What's a Processor?
A processor is a self-contained unit of data transformation within a pipeline stage. It encapsulates a specific data processing operation, defined as a function that takes a context and an input slice of data, performs the necessary transformations, and returns the processed data along with any errors that occurred during processing.
Processors are highly modular and reusable, allowing them to be easily combined and composed within stages to create sophisticated data processing workflows.
## How It Works
The core of the `processor` package revolves around the `IProcessor` interface, which defines the contract for a processor. The `Processor` struct implements this interface, providing the necessary functionality for creating and running processors.
To create a processor, you instantiate a new `Processor` using the `New` factory function, specifying the processor name, description, and the transform function. The transform function is defined using the `Transform` type, which takes a context and an input slice of data of type `ProcessingData`, and returns the processed data along with any errors.
When the `Run` method is called on a processor, it executes the following steps:
1. It checks if the pipeline is paused. If paused, the processor waits until it is resumed or the context is done. This allows for graceful handling of pipeline pauses during processor execution.
2. Once the pipeline is resumed or if it was not paused, the processor invokes the transform function, passing the input data and the context. The transform function performs the necessary data transformations and returns the processed data.
3. If the transform function returns an error, the processor handles it gracefully, updating the relevant metrics and logging the error details.
4. After successful execution of the transform function, the processor updates its metrics, such as incrementing the done counter and setting the duration.
5. If an `OnFinished` callback function is provided, the processor invokes it, passing the original input data, the processed data, and the context. This allows for custom post-processing or logging after the processor finishes its execution.
6. Finally, the processor returns the processed data to the caller.
Throughout the execution, the processor maintains comprehensive observability, including metrics, logging, and tracing, to monitor and debug the processor's performance and behavior.
## Features
1. **Modularity and Reusability**: Processors are designed to be modular and reusable, enabling easy composition and combination within pipeline stages to create complex data processing workflows.
2. **Flexibility**: Processors can encapsulate any data transformation logic, from simple arithmetic operations to complex business rules and data enrichment.
3. **Pause and Resume**: Processors support pipeline pausing and resuming. If the pipeline is paused during a processor's execution, the processor gracefully waits until it is resumed or the context is done, ensuring proper handling of pauses.
4. **Observability**: The processor package provides comprehensive observability features, including metrics, logging, and tracing, to monitor and debug the processor's execution.
5. **Metrics**: Processor metrics are exposed using the `expvar` package, allowing for easy integration with monitoring systems. Metrics include counters for created, running, failed, done, and interrupted processors, as well as duration.
6. **Logging**: The package utilizes the `sypl` library for structured logging, providing rich context and consistent log levels throughout the codebase. Log messages include relevant information such as processor status, counters, and duration.
7. **Tracing**: Tracing is implemented using the `customapm` package, which integrates with Elastic APM (Application Performance Monitoring) under the hood. This enables distributed tracing of the processor's execution, allowing developers to gain insights into the performance and behavior of their processors.
8. **Error Handling**: The processor package includes robust error handling mechanisms, with detailed error messages and proper propagation of errors during processor execution.
9. **OnFinished Callback**: Processors support an optional `OnFinished` callback function, which is invoked after the processor finishes its execution. This callback receives the original input data, the processed data, and the context, enabling custom post-processing or logging.
10. **Flexible Configuration**: Processors can be configured with various options, such as the `OnFinished` callback, using a functional options pattern. This allows for easy customization of processor behavior without modifying the core processor struct.
11. **Thorough Testing**: The codebase includes comprehensive unit tests, ensuring the reliability and correctness of the processor functionality. The tests cover various scenarios, including success cases, error handling, and pause/resume functionality.
12. **Well-Documented**: The code is thoroughly documented, with clear comments explaining the purpose and functionality of each component. The package also includes usage examples and test cases.
13. **Idiomatic Go**: The codebase follows idiomatic Go practices, leveraging the language's features and conventions for clean and efficient code.
14. **Typed Errors**: The package utilizes typed errors, providing more context and facilitating error handling and debugging.
15. **Customizable**: The processor package provides a high level of customization through the use of interfaces and generic types. Developers can easily create custom processors with specific transformation logic to meet their data processing requirements.
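To illustrate the expvar-based metrics from the list above, here is a minimal sketch of publishing per-processor counters with the standard `expvar` package (the metric key names are illustrative, not the package's actual keys):

```go
package main

import (
	"expvar"
	"fmt"
)

// procMetrics groups the counters the feature list describes.
type procMetrics struct {
	Created *expvar.Int
	Done    *expvar.Int
	Failed  *expvar.Int
	Status  *expvar.String
}

// newProcMetrics publishes one set of counters under a processor name.
// expvar exposes them automatically at /debug/vars when an HTTP server runs.
func newProcMetrics(name string) *procMetrics {
	return &procMetrics{
		Created: expvar.NewInt(name + ".counterCreated"),
		Done:    expvar.NewInt(name + ".counterDone"),
		Failed:  expvar.NewInt(name + ".counterFailed"),
		Status:  expvar.NewString(name + ".status"),
	}
}

func main() {
	m := newProcMetrics("processor.double")
	m.Created.Add(1)
	m.Status.Set("created")
	fmt.Println(m.Created.Value(), m.Status.Value()) // 1 created
}
```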
Index ¶
- Constants
- type Func
- type IProcessor
- type OnFinished
- type Processor
- func (p *Processor[ProcessingData]) GetCounterCreated() *expvar.Int
- func (p *Processor[ProcessingData]) GetCounterDone() *expvar.Int
- func (p *Processor[ProcessingData]) GetCounterFailed() *expvar.Int
- func (p *Processor[ProcessingData]) GetCounterInterrupted() *expvar.Int
- func (p *Processor[ProcessingData]) GetCounterRunning() *expvar.Int
- func (p *Processor[ProcessingData]) GetCreatedAt() time.Time
- func (p *Processor[ProcessingData]) GetDescription() string
- func (p *Processor[ProcessingData]) GetDuration() *expvar.Int
- func (p *Processor[ProcessingData]) GetLogger() sypl.ISypl
- func (p *Processor[ProcessingData]) GetMetrics() map[string]string
- func (p *Processor[ProcessingData]) GetName() string
- func (p *Processor[ProcessingData]) GetOnFinished() OnFinished[ProcessingData]
- func (p *Processor[ProcessingData]) GetStatus() *expvar.String
- func (p *Processor[ProcessingData]) GetType() string
- func (p *Processor[ProcessingData]) Run(ctx context.Context, processingData []ProcessingData) ([]ProcessingData, error)
- func (p *Processor[ProcessingData]) SetOnFinished(onFinished OnFinished[ProcessingData])
- type Transform
Constants ¶
const Type = "processor"
Type of the entity.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Func ¶
type Func[T any] func(p IProcessor[T]) IProcessor[T]
Func allows specifying a processor's options.
func WithOnFinished ¶
func WithOnFinished[T any](onFinished OnFinished[T]) Func[T]
WithOnFinished sets the OnFinished function.
type IProcessor ¶
type IProcessor[ProcessingData any] interface {
	shared.IMeta
	shared.IMetrics

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[ProcessingData]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[ProcessingData])

	// GetCounterInterrupted returns the `CounterInterrupted` metric.
	GetCounterInterrupted() *expvar.Int

	// Run the transform function.
	Run(ctx context.Context, processingData []ProcessingData) (processedOut []ProcessingData, err error)
}
IProcessor defines what a `Processor` must do.
type OnFinished ¶
type OnFinished[T any] func(ctx context.Context, p IProcessor[T], originalIn []T, processedOut []T)
OnFinished is the function that is called when a processor finishes its execution.
type Processor ¶
type Processor[ProcessingData any] struct {
	// Transform function.
	Func Transform[ProcessingData] `json:"-"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the processor.
	Name string `json:"name"`

	// Description of the processor.
	Description string `json:"description"`

	// OnFinished is the function that is called when a processor finishes its
	// execution.
	OnFinished OnFinished[ProcessingData] `json:"-"`

	// Metrics.
	CounterCreated     *expvar.Int    `json:"counterCreated"`
	CounterDone        *expvar.Int    `json:"counterDone"`
	CounterFailed      *expvar.Int    `json:"counterFailed"`
	CounterInterrupted *expvar.Int    `json:"counterInterrupted"`
	CounterRunning     *expvar.Int    `json:"counterRunning"`
	CreatedAt          time.Time      `json:"createdAt"`
	Duration           *expvar.Int    `json:"duration"`
	Status             *expvar.String `json:"status"`
}
Processor definition.
func (*Processor[ProcessingData]) GetCounterCreated ¶
GetCounterCreated returns the `CounterCreated` metric.
func (*Processor[ProcessingData]) GetCounterDone ¶
GetCounterDone returns the `CounterDone` metric.
func (*Processor[ProcessingData]) GetCounterFailed ¶
GetCounterFailed returns the `CounterFailed` metric.
func (*Processor[ProcessingData]) GetCounterInterrupted ¶ added in v2.1.0
GetCounterInterrupted returns the `CounterInterrupted` metric.
func (*Processor[ProcessingData]) GetCounterRunning ¶
GetCounterRunning returns the `CounterRunning` metric.
func (*Processor[ProcessingData]) GetCreatedAt ¶ added in v2.1.0
GetCreatedAt returns the created at time.
func (*Processor[ProcessingData]) GetDescription ¶
GetDescription returns the `Description` of the processor.
func (*Processor[ProcessingData]) GetDuration ¶ added in v2.1.0
GetDuration returns the `Duration` metric of the processor.
func (*Processor[ProcessingData]) GetMetrics ¶ added in v2.1.0
GetMetrics returns the processor's metrics.
func (*Processor[ProcessingData]) GetOnFinished ¶
func (p *Processor[ProcessingData]) GetOnFinished() OnFinished[ProcessingData]
GetOnFinished returns the `OnFinished` function.
func (*Processor[ProcessingData]) Run ¶
func (p *Processor[ProcessingData]) Run(ctx context.Context, processingData []ProcessingData) ([]ProcessingData, error)
Run the transform function.
func (*Processor[ProcessingData]) SetOnFinished ¶
func (p *Processor[ProcessingData]) SetOnFinished(onFinished OnFinished[ProcessingData])
SetOnFinished sets the `OnFinished` function.
type Transform ¶ added in v2.1.0
type Transform[ProcessedData any] func(ctx context.Context, processingData []ProcessedData) (processedOut []ProcessedData, err error)
Transform is a function that transforms the input (`processingData`) into the output (`processedOut`), returning any errors that occurred during processing.