Documentation ¶
Overview ¶
Package converter provides a framework for creating converters.
Converters ¶
## Overview
The `converter` package is a core component of the ETL project. It provides a mechanism for defining and executing data conversion tasks within a pipeline. A converter transforms data from one format to another, which keeps data compatible between the stages of the pipeline.
## What's a Converter?
A converter is a self-contained unit of data conversion within a pipeline. It encapsulates the conversion logic that transforms input data of type `In` into output data of type `Out`. Converters are reusable and can be plugged into different stages of the pipeline.
The `converter` package provides a generic interface, `IConverter`, and a concrete implementation, `Converter`, that can be instantiated with custom conversion functions.
## How It Works
The `converter` package revolves around the `IConverter` interface and the `Converter` struct. The `IConverter` interface defines the contract for a converter: methods for running the conversion, reading metrics, and handling callbacks.
To create a converter, instantiate a new `Converter` with the `New` factory function, passing the converter's name, its description, and the conversion function. The conversion function has the type `Convert[In, Out]`, where `In` is the input data type and `Out` is the output data type.
When the `Run` method is called on a converter, it executes the following steps:
1. It sets up observability, including tracing, metrics, status, and logging, to monitor and track the conveter's execution.
2. It invokes the conversion function, passing the input data and receiving the converted output data. If an error occurs during the conversion, it is propagated to the caller.
3. After the conversion is complete, it updates the observability metrics, such as counters for created, running, failed, and done converters, as well as the duration and status.
4. If an `OnFinished` callback is defined, it is invoked with the original input data and the converted output data, allowing for custom post-conversion processing.
5. Finally, the converter returns the converted output data.
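The steps above can be sketched with a small stand-in type. This is a minimal illustration, not the package's implementation: `miniConverter`, `newLengthConverter`, and `runOnce` are hypothetical names, the status strings are assumptions, and tracing and logging are left out. Only the standard library's `expvar` types are real API.

```go
package main

import (
	"context"
	"errors"
	"expvar"
	"fmt"
	"time"
)

// miniConverter is a hypothetical, trimmed-down stand-in for Converter.
type miniConverter[In, Out any] struct {
	fn         func(ctx context.Context, in In) (Out, error)
	onFinished func(ctx context.Context, originalIn In, convertedOut Out)

	// Unpublished expvar values; their zero values are ready to use.
	running, failed, done, duration expvar.Int
	status                          expvar.String
}

// Run follows the documented order: track state, convert, update metrics,
// fire the callback, return the output.
func (c *miniConverter[In, Out]) Run(ctx context.Context, in In) (Out, error) {
	start := time.Now()
	c.running.Add(1)
	c.status.Set("running") // status values are assumptions

	defer func() {
		c.running.Add(-1)
		c.duration.Set(time.Since(start).Milliseconds())
	}()

	out, err := c.fn(ctx, in)
	if err != nil {
		c.failed.Add(1)
		c.status.Set("failed")

		var zero Out
		return zero, fmt.Errorf("conversion failed: %w", err)
	}

	c.done.Add(1)
	c.status.Set("done")

	// The callback only runs after a successful conversion in this sketch.
	if c.onFinished != nil {
		c.onFinished(ctx, in, out)
	}

	return out, nil
}

// newLengthConverter builds a converter from string to its length.
func newLengthConverter() *miniConverter[string, int] {
	return &miniConverter[string, int]{
		fn: func(_ context.Context, in string) (int, error) {
			if in == "" {
				return 0, errors.New("empty input")
			}
			return len(in), nil
		},
		onFinished: func(_ context.Context, in string, out int) {
			fmt.Printf("converted %q to %d\n", in, out)
		},
	}
}

// runOnce runs the converter with a background context.
func runOnce[In, Out any](c *miniConverter[In, Out], in In) (Out, error) {
	return c.Run(context.Background(), in)
}

func main() {
	c := newLengthConverter()

	out, err := runOnce(c, "hello")
	fmt.Println(out, err, c.status.Value())

	_, err = runOnce(c, "")
	fmt.Println(err, c.failed.Value())
}
```

Whether the real `Run` decrements the running counter or calls `OnFinished` after a failure is not stated here; the sketch picks one plausible behavior for each.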
## Features
1. **Generic and Reusable**: Converters are implemented using Go's generic types, so they can handle various data types. They can be reused across different stages and pipelines.
2. **Customizable Conversion Logic**: The conversion logic is encapsulated in a custom function that is supplied when creating a converter. This allows for data transformations tailored to the project's requirements.
3. **Observability**: The `converter` package provides tracing, metrics, status tracking, and logging, which support monitoring and debugging of the conversion process.
4. **Metrics**: Converter metrics are exposed using the `expvar` package, allowing for easy integration with monitoring systems. Metrics include counters for created, running, failed, and done converters, as well as duration and status.
5. **Callbacks**: The package supports an `OnFinished` callback function that is invoked after the conversion is complete. This callback receives the original input data and the converted output data, enabling custom post-conversion processing.
6. **Error Handling**: Errors that occur during the conversion process are propagated to the caller with informative error messages.
7. **Factory Functions**: The package provides factory functions `New` and `Default` for creating converters with custom or default configurations. The `MustDefault` function is also available for creating a converter that panics on error.
8. **Thorough Testing**: The codebase includes unit tests that cover various scenarios and validate the converter's behavior and metrics.
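To make the metrics feature more concrete, the following sketch shows how `expvar` counters behave and how a `map[string]string` snapshot, similar in shape to what `GetMetrics` returns, could be assembled. The `counters` type, `newCounters`, `snapshot`, and the map keys are hypothetical; the real keys and formatting are not documented here. The values are created with `new(expvar.Int)` rather than `expvar.NewInt`, because the latter publishes the variable in a global registry and panics if the same name is registered twice.

```go
package main

import (
	"expvar"
	"fmt"
)

// counters groups unpublished expvar values. The grouping is hypothetical;
// the individual metrics mirror the ones listed in the features above.
type counters struct {
	created, running, failed, done *expvar.Int
	status                         *expvar.String
}

// newCounters allocates every metric with its zero value.
func newCounters() *counters {
	return &counters{
		created: new(expvar.Int),
		running: new(expvar.Int),
		failed:  new(expvar.Int),
		done:    new(expvar.Int),
		status:  new(expvar.String),
	}
}

// snapshot renders each metric as a string. The keys are assumptions.
func (c *counters) snapshot() map[string]string {
	return map[string]string{
		"created": c.created.String(),
		"running": c.running.String(),
		"failed":  c.failed.String(),
		"done":    c.done.String(),
		"status":  c.status.Value(), // Value avoids the JSON quoting of String
	}
}

func main() {
	c := newCounters()

	c.created.Add(1)
	c.running.Add(1)
	c.running.Add(-1)
	c.done.Add(1)
	c.status.Set("done")

	fmt.Println(c.snapshot())
}
```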
Index ¶
- Constants
- type Convert
- type Converter
- func (c *Converter[In, Out]) GetCounterCreated() *expvar.Int
- func (c *Converter[In, Out]) GetCounterDone() *expvar.Int
- func (c *Converter[In, Out]) GetCounterFailed() *expvar.Int
- func (c *Converter[In, Out]) GetCounterRunning() *expvar.Int
- func (c *Converter[In, Out]) GetCreatedAt() time.Time
- func (c *Converter[In, Out]) GetDescription() string
- func (c *Converter[In, Out]) GetDuration() *expvar.Int
- func (c *Converter[In, Out]) GetLogger() sypl.ISypl
- func (c *Converter[In, Out]) GetMetrics() map[string]string
- func (c *Converter[In, Out]) GetName() string
- func (c *Converter[In, Out]) GetOnFinished() OnFinished[In, Out]
- func (c *Converter[In, Out]) GetStatus() *expvar.String
- func (c *Converter[In, Out]) GetType() string
- func (c *Converter[In, Out]) Run(ctx context.Context, in In) (Out, error)
- func (c *Converter[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])
- type Func
- type IConverter
- func Default[In, Out any](fn Convert[In, Out], opts ...Func[In, Out]) (IConverter[In, Out], error)
- func MustDefault[In, Out any](fn Convert[In, Out], opts ...Func[In, Out]) IConverter[In, Out]
- func New[In, Out any](name, description string, fn Convert[In, Out], opts ...Func[In, Out]) (IConverter[In, Out], error)
- type OnFinished
Constants ¶
const Type = "converter"
Type of the entity.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Convert ¶ added in v2.1.0
Convert is a function that converts the data (`in`). It returns the converted data and any errors that occurred during conversion.
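The definition of `Convert` is not reproduced on this page. The signature of `Run` suggests a function that receives a context and a value of type `In` and returns `(Out, error)`. The sketch below assumes that shape and redeclares the type locally; `stringToInt` and `convertOne` are hypothetical helpers used only for illustration.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Convert is a local stand-in for the package's type. Its exact signature
// is an assumption modeled on Run(ctx, in) (Out, error).
type Convert[In, Out any] func(ctx context.Context, in In) (Out, error)

// stringToInt is a hypothetical conversion function from string to int.
func stringToInt(_ context.Context, in string) (int, error) {
	n, err := strconv.Atoi(in)
	if err != nil {
		return 0, fmt.Errorf("converting %q: %w", in, err)
	}

	return n, nil
}

// convertOne applies fn to a single value using a background context.
func convertOne[In, Out any](fn Convert[In, Out], in In) (Out, error) {
	return fn(context.Background(), in)
}

func main() {
	out, err := convertOne[string, int](stringToInt, "42")
	fmt.Println(out, err)

	_, err = convertOne[string, int](stringToInt, "not-a-number")
	fmt.Println(err != nil)
}
```

Wrapping the underlying error with `%w` keeps it available to `errors.Is` and `errors.As` further up the pipeline.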
type Converter ¶
type Converter[In, Out any] struct {
	// Description of the converter.
	Description string `json:"description"`

	// Conversion function.
	Func Convert[In, Out] `json:"-"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the converter.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a converter finishes its
	// execution.
	OnFinished OnFinished[In, Out] `json:"-"`

	// Metrics.
	CounterCreated *expvar.Int    `json:"counterCreated"`
	CounterRunning *expvar.Int    `json:"counterRunning"`
	CounterFailed  *expvar.Int    `json:"counterFailed"`
	CounterDone    *expvar.Int    `json:"counterDone"`
	CreatedAt      time.Time      `json:"createdAt"`
	Duration       *expvar.Int    `json:"duration"`
	Status         *expvar.String `json:"status"`
}
Converter definition.
func (*Converter[In, Out]) GetCounterCreated ¶
GetCounterCreated returns the `CounterCreated` of the converter.
func (*Converter[In, Out]) GetCounterDone ¶
GetCounterDone returns the `CounterDone` of the converter.
func (*Converter[In, Out]) GetCounterFailed ¶
GetCounterFailed returns the `CounterFailed` of the converter.
func (*Converter[In, Out]) GetCounterRunning ¶
GetCounterRunning returns the `CounterRunning` of the converter.
func (*Converter[In, Out]) GetCreatedAt ¶ added in v2.1.0
GetCreatedAt returns the time at which the converter was created.
func (*Converter[In, Out]) GetDescription ¶
GetDescription returns the `Description` of the converter.
func (*Converter[In, Out]) GetDuration ¶ added in v2.1.0
GetDuration returns the `Duration` of the converter.
func (*Converter[In, Out]) GetMetrics ¶ added in v2.1.0
GetMetrics returns the converter's metrics.
func (*Converter[In, Out]) GetOnFinished ¶
func (c *Converter[In, Out]) GetOnFinished() OnFinished[In, Out]
GetOnFinished returns the `OnFinished` function.
func (*Converter[In, Out]) SetOnFinished ¶
func (c *Converter[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])
SetOnFinished sets the `OnFinished` function.
type Func ¶
type Func[In, Out any] func(p IConverter[In, Out]) IConverter[In, Out]
Func allows specifying the converter's options.
func WithOnFinished ¶
func WithOnFinished[In, Out any](onFinished OnFinished[In, Out]) Func[In, Out]
WithOnFinished sets the OnFinished function.
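`Func` follows the functional options pattern: each option receives the converter and returns it, presumably after changing it. The sketch below shows how an option such as `WithOnFinished` might be written and applied. It is an illustration under assumptions, not the package's code: `settable`, `option`, `conv`, `withOnFinished`, and `build` are hypothetical names, and the callback omits the context and converter arguments for brevity.

```go
package main

import "fmt"

// settable is a hypothetical, minimal stand-in for IConverter.
type settable[In, Out any] interface {
	SetOnFinished(fn func(in In, out Out))
	GetOnFinished() func(in In, out Out)
}

// option mirrors the documented Func shape: it takes the converter and
// returns it.
type option[In, Out any] func(c settable[In, Out]) settable[In, Out]

// conv is a hypothetical concrete type that stores the callback.
type conv[In, Out any] struct {
	onFinished func(in In, out Out)
}

func (c *conv[In, Out]) SetOnFinished(fn func(in In, out Out)) { c.onFinished = fn }
func (c *conv[In, Out]) GetOnFinished() func(in In, out Out)   { return c.onFinished }

// withOnFinished sketches how WithOnFinished might be implemented: it
// delegates to the setter exposed by the interface.
func withOnFinished[In, Out any](fn func(in In, out Out)) option[In, Out] {
	return func(c settable[In, Out]) settable[In, Out] {
		c.SetOnFinished(fn)

		return c
	}
}

// build creates a converter and applies the options in order.
func build[In, Out any](opts ...option[In, Out]) settable[In, Out] {
	var c settable[In, Out] = &conv[In, Out]{}
	for _, opt := range opts {
		c = opt(c)
	}

	return c
}

func main() {
	c := build[string, int](withOnFinished(func(in string, out int) {
		fmt.Printf("finished: %q -> %d\n", in, out)
	}))

	// Invoke the stored callback directly to show that the option took effect.
	c.GetOnFinished()("abc", 3)
}
```

Because options are plain functions, callers can pass none, one, or several to a factory without the factory's signature changing.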
type IConverter ¶
type IConverter[In, Out any] interface {
	shared.IMeta
	shared.IMetrics

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[In, Out]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[In, Out])

	// Run the conversion function.
	Run(ctx context.Context, in In) (Out, error)
}
IConverter defines what a `Converter` must do.
func Default ¶ added in v2.2.0
func Default[In, Out any](
	fn Convert[In, Out],
	opts ...Func[In, Out],
) (IConverter[In, Out], error)
Default returns a new converter with name and description set to "converter".
func MustDefault ¶ added in v2.2.0
func MustDefault[In, Out any](
	fn Convert[In, Out],
	opts ...Func[In, Out],
) IConverter[In, Out]
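According to the features list, `MustDefault` creates a converter and panics on error instead of returning it. The relationship between `New`, `Default`, and `MustDefault` can be sketched as follows. Everything in this block is hypothetical (`conv`, `newConv`, `defaultConv`, `mustDefault`, `panics`), and the nil-function check is an assumption. The required-name check is modeled on the `validate:"required"` tag of the `Name` field.

```go
package main

import (
	"errors"
	"fmt"
)

// typeName mirrors the documented Type constant.
const typeName = "converter"

// conv is a hypothetical, non-generic stand-in for Converter.
type conv struct {
	name, description string
	fn                func(in string) (int, error)
}

// newConv plays the role of New: it validates its input and returns an error.
func newConv(name, description string, fn func(in string) (int, error)) (*conv, error) {
	if name == "" {
		return nil, errors.New("name is required")
	}

	if fn == nil { // assumption: the real package may validate differently
		return nil, errors.New("conversion function is required")
	}

	return &conv{name: name, description: description, fn: fn}, nil
}

// defaultConv plays the role of Default: name and description are both
// set to "converter".
func defaultConv(fn func(in string) (int, error)) (*conv, error) {
	return newConv(typeName, typeName, fn)
}

// mustDefault plays the role of MustDefault: it panics instead of
// returning the error.
func mustDefault(fn func(in string) (int, error)) *conv {
	c, err := defaultConv(fn)
	if err != nil {
		panic(err)
	}

	return c
}

// panics reports whether calling f panics.
func panics(f func()) (didPanic bool) {
	defer func() {
		if r := recover(); r != nil {
			didPanic = true
		}
	}()

	f()

	return false
}

func main() {
	c := mustDefault(func(in string) (int, error) { return len(in), nil })
	fmt.Println(c.name, c.description)

	fmt.Println(panics(func() { mustDefault(nil) }))
}
```

A `Must` variant suits package-level initialization, where a construction failure is a programming error and there is no caller to hand the error to.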
type OnFinished ¶
type OnFinished[In, Out any] func(ctx context.Context, c IConverter[In, Out], originalIn In, convertedOut Out)
OnFinished is the function that is called when a converter finishes its execution.