transform

package
v0.0.0-...-144a274 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: LGPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Processor

func Processor(
	pg PayloadGenerator,
	configuration map[string]interface{},
	exec func([]byte) error,
	input <-chan DataBlock,
	signals <-chan os.Signal,
	logger *log.Entry) error

Processor defines an stand-alone processor that execute the produced function "exec" with all the result of the transformed data coming via the "input" channel. The payload generator and the configuration will be used for the data transformation. The signals provides a mechanism to stop the processor execution and the logger trace the processor activity.

For every data received in the "input" chan, a transformation is made and the result of the transformation is provided as a parameter for the "exec" function.

If the transformation fails, the retry mechanism will be used

If the execution of the "exec" function returns an error, the retry mechanism will be used.

func ProcessorOrch

func ProcessorOrch(
	queue Queue,
	pg PayloadGenerator,
	configuration map[string]interface{},
	exec func([]byte) error,
	input chan DataBlock,
	signals <-chan os.Signal,
	logger *log.Entry,
	concurrents int) error

ProcessorOrch execute the "Processor" function with an extra level of orchestration to start more than one transformation and executor process in parallel (specified by the "concurrents" parameters). The stop mechanism also handle the stop of all the individual workers.

See "Processor" for more information.

Types

type DataBlock

type DataBlock struct {
	Data []byte
	Ack  func() error
	Nack func() error
}

DataBlock is a generic structure used as an input for the user-defined executor.

The raw data is provided and two methods to acknowledge (Ack) that the data was correctly processed or to indicate that the data was not processed correctly and allows the library to execute the retry mechanism on that date to try to process it later.

type PayloadGenerator

type PayloadGenerator struct {
	// contains filtered or unexported fields
}

PayloadGenerator hold the templates and the mapping function between data and templates names

func NewPayloadGenerator

func NewPayloadGenerator(
	templates *template.Template,
	getType func(map[string]interface{}) ([]string, error),
	templateExtension string) PayloadGenerator

NewPayloadGenerator creates an instance of a PayloadGenerator with the available templates, a function that maps a input Json to a template name and the file extension used for the template files.

The getType is a function that maps a input data object (Json structure transformed into a map) to a template name (extension not included). The output map is a list of names that the payload generator will try to map, in the provided order, to the available templates. For example, a JSON:

{"operation": {"sum": [1,2]} }

can be mapped to ["sum", "operation"].

The payload generator will look for the "sum" template, and if that one no exist in the "templates" parameter, it will try the "operation" template. If also the "operation" template is not available, a error will be raise

func (PayloadGenerator) GenEventPayload

func (p PayloadGenerator) GenEventPayload(
	raw []byte,
	configuration map[string]interface{}) ([]byte, error)

GenEventPayload generates a payload using the predefined templates and mapping functions for the input raw data. With the input data, a configuration map can be defined to fill values for the transformation not available in the raw data (application configuration like API keys, host information, versions ...) The method returns the result of the transformation.

If the template is not found for the input raw data a error will be raise. If one the parameters of the template is not possible to be filled using the input data (raw data and configuration map), an error will be raise. Conditional flows can be used for a non-mandatory data: https://golang.org/pkg/text/template/

type Queue

type Queue interface {
	// Connect start a connection with a queue, using the provided
	// context, storing the data from the queue in the stream chan
	// DataBlock, returning errors via the error chan and log the
	// process in the provided logger.
	//
	// The connection uses a default retry mechanism implemented,
	// that can depends on the library implementation. For a
	// custom retry parameters, the "ConnectCustomRetry" can be
	// used.
	Connect(context context.Context, stream chan DataBlock, done chan error, logger *log.Entry) error

	// ConnectCustomRetry start a connection with a queue, using
	// the provided context, storing the data from the queue in
	// the stream chan DataBlock, returning errors via the error
	// chan and log the process in the provided logger.
	//
	// The retry mechanism can be tuned via the specification the
	// maximum amount of retries and a function that define the
	// time (in milliseconds) between the retries, with the retry
	// counter as a parameter (starts on 0 -
	// https://www.cs.utexas.edu/users/EWD/ewd08xx/EWD831.html )
	ConnectCustomRetry(
		context context.Context,
		msgs chan DataBlock,
		done chan error,
		maxRetries int,
		retryFuncTime func(int) int,
		loggerInput *log.Entry) error
}

Queue is a general interface to connect with a library specified queue implementation

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL