ouretl

package module
v0.0.0-...-d537bbd
Published: Sep 20, 2018 License: MIT Imports: 0 Imported by: 1

README

Shared definitions for use in ouretl-core

This library contains a set of shared definitions used in ouretl-core and its plugins. Every plugin must adhere to the definitions in this library to be valid and runnable in an ouretl-core instance.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config interface {
	PluginDefinitions() []PluginDefinition
	AppendPluginDefinition(pdef PluginDefinition) error
	OnPluginDefinitionAdded(func(PluginDefinition))
	OnPluginDefinitionActivated(func(PluginDefinition))
	OnPluginDefinitionDeactivated(func(PluginDefinition))
	Activate(pdef PluginDefinition)
	Deactivate(pdef PluginDefinition)
}

Config is an abstraction for configuration settings, usually loaded from a TOML-based configuration file. It describes which plugins to use, in which order they run, and how to handle plugin settings. `inherit_settings_from_env` is a boolean that indicates whether plugin settings are available as environment variables. If `true`, each plugin setting is first looked up in its corresponding environment variable and, failing that, in the `settings_file` specified for the plugin.

Example of a config file:
-----
inherit_settings_from_env = true

[plugin]
name = "kafka-stream-reader"
path = "/usr/share/ouretl/plugins/ouretl-plugin-kafka-stream-reader.so.1.0.0"
version = "1.0.0"
priority = 1
settings_file = "/usr/share/ouretl/config-kafka-stream-reader.toml"

[plugin]
name = "elasticsearch-writer"
path = "/usr/share/ouretl/plugins/ouretl-plugin-elasticsearch-writer.so.4.1.0"
version = "4.1.0"
priority = 10
settings_file = "/usr/share/ouretl/config-elasticsearch-writer.toml"
-----

Example of a plugin settings file:
-----
string_variable = "value"
int_variable = 1
-----
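
The config file gives every plugin a `priority`. The following is a minimal sketch of how a host could order plugin entries before running them. It assumes that a lower value runs first, which the documentation does not state, and `pluginEntry` and `orderByPriority` are hypothetical names, not part of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// pluginEntry is a hypothetical stand-in for one [plugin] table in the config file.
type pluginEntry struct {
	Name     string
	Priority int
}

// orderByPriority returns a copy of defs sorted by ascending priority.
// Assumption: a lower priority value means the plugin runs earlier.
func orderByPriority(defs []pluginEntry) []pluginEntry {
	sorted := make([]pluginEntry, len(defs))
	copy(sorted, defs)
	// SliceStable keeps the config file order for entries with equal priority.
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Priority < sorted[j].Priority
	})
	return sorted
}

func main() {
	defs := []pluginEntry{
		{Name: "elasticsearch-writer", Priority: 10},
		{Name: "kafka-stream-reader", Priority: 1},
	}
	for _, d := range orderByPriority(defs) {
		fmt.Println(d.Priority, d.Name)
	}
}
```

Sorting a copy leaves the slice returned by the configuration untouched, so callers can still inspect the original file order.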

type DataHandlerPlugin

type DataHandlerPlugin interface {
	Handle(dm DataMessage, next func([]byte) error) error
}

DataHandlerPlugin defines a handler for a data message, where any type of handling can occur. A DataHandlerPlugin can be used either as a data transformation step or as an ETL `sink`. When the plugin has completed its job, it should always call the provided `next` function with the current state of the data message, unless an error occurs. The `next` function, passed as a parameter, invokes the next plugin in line.

Example:
-----
package main

import (
    "encoding/json"
    "time"
    // Also import this package under the name ouretl.
)

type pluginDef struct{}

type jsonMessage struct {
    DataMessage string `json:"data"`
    ReceivedAt  string `json:"received_at"`
}

// GetHandler constructs the handler; this plugin needs neither config nor settings.
func GetHandler(_ ouretl.Config, _ ouretl.PluginSettings) (ouretl.DataHandlerPlugin, error) {
    return &pluginDef{}, nil
}

// Handle wraps the raw payload in a JSON envelope and passes it to the next plugin.
func (def *pluginDef) Handle(data ouretl.DataMessage, next func([]byte) error) error {
    s := string(data.Data())
    m := &jsonMessage{
        DataMessage: s,
        ReceivedAt:  time.Now().Local().Format("2006-01-02 15:04:05"),
    }

    output, err := json.Marshal(m)
    if err != nil {
        return err
    }

    return next(output)
}
-----
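
To illustrate the role of `next`, here is a minimal sketch of how a host could chain handlers. It is not ouretl-core's implementation: the `message` type and the `run` function are hypothetical, and the two interfaces are redeclared locally so the sketch compiles on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// DataMessage and DataHandlerPlugin mirror the interfaces documented in this
// package; they are redeclared here only to keep the sketch self-contained.
type DataMessage interface {
	ID() string
	Data() []byte
	Origin() string
}

type DataHandlerPlugin interface {
	Handle(dm DataMessage, next func([]byte) error) error
}

// message is a hypothetical DataMessage implementation.
type message struct {
	id, origin string
	data       []byte
}

func (m message) ID() string     { return m.id }
func (m message) Data() []byte   { return m.data }
func (m message) Origin() string { return m.origin }

// upper is a transformation step: it upper-cases the payload and passes it on.
type upper struct{}

func (upper) Handle(dm DataMessage, next func([]byte) error) error {
	return next([]byte(strings.ToUpper(string(dm.Data()))))
}

// collect is a sink: it records the payload and still calls next.
type collect struct{ out *[]string }

func (c collect) Handle(dm DataMessage, next func([]byte) error) error {
	*c.out = append(*c.out, string(dm.Data()))
	return next(dm.Data())
}

// run feeds msg through handlers in order. Each handler's next wraps its
// output in a new message (same ID and origin) for the following handler.
func run(handlers []DataHandlerPlugin, msg DataMessage) error {
	if len(handlers) == 0 {
		return nil
	}
	return handlers[0].Handle(msg, func(out []byte) error {
		return run(handlers[1:], message{id: msg.ID(), origin: msg.Origin(), data: out})
	})
}

func main() {
	var seen []string
	err := run([]DataHandlerPlugin{upper{}, collect{out: &seen}},
		message{id: "1", origin: "http-worker", data: []byte("hello")})
	fmt.Println(seen, err) // prints "[HELLO] <nil>"
}
```

A handler that returns an error without calling `next` stops the chain, and the error propagates back to the caller of `run`.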

type DataMessage

type DataMessage interface {
	ID() string
	Data() []byte
	Origin() string
}

DataMessage is a wrapper abstraction for the raw message provided to the DataHandlerPlugins. Apart from the actual content, returned by `Data`, the message has an `Origin` method returning the name of the WorkerPlugin it came from and an `ID` method returning a unique message ID.

type PluginDefinition

type PluginDefinition interface {
	Name() string
	FilePath() string
	Version() string
	Priority() int
	IsActive() bool
	Settings() PluginSettings
}

PluginDefinition is an abstraction for a plugin; it describes the plugin's runtime behavior and how to load it.

type PluginSettings

type PluginSettings interface {
	Get(key string) (interface{}, bool)
}

PluginSettings is an abstraction for accessing plugin-specific settings. Pass the setting name to `Get` to retrieve its value, either from an environment variable or from the plugin's settings file. `Get` returns an `interface{}`, so a type assertion to the actual type is necessary when retrieving a setting value:
-----
myValue, ok := psettings.Get("int_variable")
if ok {
    // Panics if the stored value is not an int; use the two-value form to check.
    myIntValue := myValue.(int)
}
-----
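
The lookup order described for `inherit_settings_from_env` (environment first, then the settings file) could look roughly like the sketch below. It is an illustration under assumptions, not the actual ouretl-core code: `layeredSettings` and `intSetting` are hypothetical, the environment variable is assumed to carry the same name as the setting key, and the concrete types stored for file values depend on the decoder in use.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// PluginSettings mirrors the interface documented in this package.
type PluginSettings interface {
	Get(key string) (interface{}, bool)
}

// layeredSettings is a hypothetical implementation: the environment is
// consulted first, then the values parsed from the settings file.
type layeredSettings struct {
	inheritFromEnv bool
	fileValues     map[string]interface{}
}

func (s layeredSettings) Get(key string) (interface{}, bool) {
	if s.inheritFromEnv {
		// Assumption: the environment variable is named exactly like the key.
		if v, ok := os.LookupEnv(key); ok {
			return v, true // environment values are always strings
		}
	}
	v, ok := s.fileValues[key]
	return v, ok
}

// intSetting reads key as an int without risking a failed type assertion.
// It accepts int, int64, and numeric strings (as found in the environment).
func intSetting(s PluginSettings, key string) (int, bool) {
	v, ok := s.Get(key)
	if !ok {
		return 0, false
	}
	switch n := v.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case string:
		i, err := strconv.Atoi(n)
		return i, err == nil
	}
	return 0, false
}

func main() {
	os.Setenv("string_variable", "from-env")
	s := layeredSettings{
		inheritFromEnv: true,
		fileValues: map[string]interface{}{
			"string_variable": "value",
			"int_variable":    int64(1),
		},
	}
	v, _ := s.Get("string_variable")
	n, ok := intSetting(s, "int_variable")
	fmt.Println(v, n, ok)
}
```

A type switch avoids the panic that a single-value assertion such as `myValue.(int)` raises when the stored value has a different type.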

type WorkerPlugin

type WorkerPlugin interface {
	Start(func([]byte)) error
}

WorkerPlugin defines a worker (or ETL `source`) that runs continuously under ouretl-core. A worker plugin can publish messages into the plugin pipeline using the provided `target` function.

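Example:
-----
package main

import (
    "errors"
    "fmt"
    "io/ioutil"
    "net/http"

    // Assumed router package; it provides the New, POST and Params used below.
    "github.com/julienschmidt/httprouter"
    // Also import this package under the name ouretl.
)

type pluginDef struct {
    portNumber string
}

func GetWorker(_ ouretl.Config, settings ouretl.PluginSettings) (ouretl.WorkerPlugin, error) {
    // Get returns (interface{}, bool): check presence, then assert the type.
    value, ok := settings.Get("PortSettings")
    if !ok {
        return nil, errors.New("setting PortSettings is missing")
    }
    port, ok := value.(string)
    if !ok {
        return nil, errors.New("setting PortSettings must be a string")
    }
    return &pluginDef{portNumber: port}, nil
}

// Start serves HTTP and publishes each POST body into the plugin pipeline.
func (def *pluginDef) Start(target func([]byte)) error {
    router := httprouter.New()
    router.POST("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
        defer r.Body.Close()

        message, err := ioutil.ReadAll(r.Body)
        if err != nil {
            w.WriteHeader(http.StatusBadRequest)
            return
        }

        target(message)
        w.WriteHeader(http.StatusAccepted)
    })

    addr := fmt.Sprintf(":%s", def.portNumber)
    return http.ListenAndServe(addr, router)
}
-----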
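
For trying out the `target` callback without a network listener, a smaller worker can help. The sketch below is an assumption-laden illustration: `lineWorker`, `newLineWorker` and `collectFrom` are hypothetical names, and unlike a long-running worker this one returns once its input is exhausted so that it can run standalone.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// WorkerPlugin mirrors the interface documented in this package.
type WorkerPlugin interface {
	Start(func([]byte)) error
}

// lineWorker is a hypothetical worker that publishes each input line as one message.
type lineWorker struct{ r io.Reader }

// newLineWorker builds a worker that reads its messages from a string.
func newLineWorker(input string) WorkerPlugin {
	return lineWorker{r: strings.NewReader(input)}
}

func (w lineWorker) Start(target func([]byte)) error {
	scanner := bufio.NewScanner(w.r)
	for scanner.Scan() {
		// Copy the bytes: the scanner reuses its buffer between calls.
		line := append([]byte(nil), scanner.Bytes()...)
		target(line)
	}
	return scanner.Err()
}

// collectFrom starts w with a target that records every published message.
func collectFrom(w WorkerPlugin) ([]string, error) {
	var got []string
	err := w.Start(func(b []byte) { got = append(got, string(b)) })
	return got, err
}

func main() {
	msgs, err := collectFrom(newLineWorker("first\nsecond\n"))
	fmt.Println(msgs, err) // prints "[first second] <nil>"
}
```

In a real host the `target` function would hand each message to the first DataHandlerPlugin instead of appending it to a slice.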
