goplumber

package module
v0.1.1-0...-4580db3 Latest Latest
Published: Dec 14, 2022 License: Apache-2.0 Imports: 24 Imported by: 1

README

DISCONTINUATION OF PROJECT.

This project will no longer be maintained by Intel.

This project has been identified as having known security escapes.

Intel has ceased development and contributions including, but not limited to, maintenance, bug fixes, new releases, or updates, to this project.

Intel no longer accepts patches to this project.

Goplumber

Go plumb your data.

Goplumber is an ETL framework based around Go's templates. Its main purpose is facilitating declarative descriptions of scheduled tasks that involve downloading some data, verifying and/or reshaping it, and sending it to a receiver.

Operation

While it's possible to create Pipelines entirely via code, the point of goplumber is declarative configs. Currently, that's accomplished via JSON files. You create a Plumber, give it Clients, and ask it to build Pipelines out of config data. You can then execute the Pipeline directly, or easily schedule it to run on a regular interval.

Pipeline

A Pipeline is a blueprint for a series of Tasks, i.e. descriptions of work.

As a quick example, if you want to download a file and publish it on an MQTT topic, you can create a pipeline like this:

{
  "name": "example",
  "trigger": {"interval": {"minutes": 10}},
  "tasks": {
    "download": {
      "type": "http",
      "raw": { "method": "GET", "url": "http://example.com" }
    },
    "send": {
      "type": "mqtt",
      "raw": {"name": "some/topic"},
      "links": { "value": { "from": "download" } }
    }
  }
}
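
As a rough illustration of how such a config maps onto Go values, the sketch below decodes the example with encoding/json. The struct types are local stand-ins that copy only the JSON tags listed in the type documentation further down (PipelineConfig, Task, Link, Trigger, Interval); the real package uses its own types, and parseConfig is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins that reuse the documented JSON tags; they are not the
// library's own type definitions.
type Link struct {
	Source string  `json:"from"`
	Using  *string `json:"using,omitempty"`
}

type Task struct {
	TaskType string          `json:"type"`
	Raw      json.RawMessage `json:"raw,omitempty"`
	Links    map[string]Link `json:"links,omitempty"`
}

type Interval struct {
	Minutes int64 `json:"minutes"`
}

type Trigger struct {
	Interval Interval `json:"interval"`
}

type PipelineConfig struct {
	Name    string          `json:"name"`
	Trigger Trigger         `json:"trigger"`
	Tasks   map[string]Task `json:"tasks"`
}

const exampleConfig = `{
  "name": "example",
  "trigger": {"interval": {"minutes": 10}},
  "tasks": {
    "download": {
      "type": "http",
      "raw": { "method": "GET", "url": "http://example.com" }
    },
    "send": {
      "type": "mqtt",
      "raw": {"name": "some/topic"},
      "links": { "value": { "from": "download" } }
    }
  }
}`

// parseConfig (hypothetical) decodes a pipeline definition into the
// stand-in types above.
func parseConfig(data string) (PipelineConfig, error) {
	var pc PipelineConfig
	err := json.Unmarshal([]byte(data), &pc)
	return pc, err
}

func main() {
	pc, err := parseConfig(exampleConfig)
	if err != nil {
		panic(err)
	}
	fmt.Println(pc.Name, pc.Trigger.Interval.Minutes)
	// The "send" task takes its "value" parameter from the "download" task.
	fmt.Println(pc.Tasks["send"].Links["value"].Source)
}
```

Each task's raw section is kept as json.RawMessage here so that its interpretation can be deferred to whichever client handles that task type.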

Client

A Client handles Task execution. For example, an MQTT client describes which broker to connect to:

{
  "endpoint": "mosquitto-server:1883",
  "clientID": "goplumber"
}

Plumber

The Plumber creates Pipelines. When you create a Plumber, you give it the Clients it should use, then you give it a Pipeline config and ask it to create a Pipeline. The returned Pipeline can then be executed, scheduled to run on an interval, or be used as a Client for another Pipeline, effectively creating new Task types.

The goplumber library doesn't currently create Plumbers from config; that is instead left to the calling code. This package includes a couple of commands, and one of them will read a JSON file and use it to build a Plumber and start running Pipelines:

goplumber -config /config/plumber.json

{
  "configDir": "/config",
  "mqttConfigFile": "mqttConf.json",
  "pipelineNames": [ "mypipeline.json" ],
  "customTasks": [ ]
}

If plumber.json contains this, the above command creates a Plumber that has a single MQTT client and a single Pipeline, both loaded from /config. The included Makefile runs this example.

Task Execution

Task input is specified in the configuration as either raw values or links. A task's name can be used to link its output to other tasks' inputs. The params valid for a task depend on its type, as the type determines which client the plumber will use to satisfy the task.

goplumber builds a dependency graph of the tasks from the config. When the pipeline is run, it executes tasks in dependency order. It merges a task's raw and links parameters into a map which is then passed to the client.
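
The ordering step can be pictured with a small topological sort. This is a minimal sketch using Kahn's algorithm, not the library's actual implementation; executionOrder and the shape of the deps map are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// executionOrder (hypothetical) returns task names so that every task appears
// after the tasks it links from. deps maps a task name to its dependencies.
func executionOrder(deps map[string][]string) ([]string, error) {
	remaining := make(map[string]int, len(deps)) // unmet dependency counts
	dependents := make(map[string][]string)      // reverse edges
	for task, froms := range deps {
		remaining[task] = len(froms)
		for _, from := range froms {
			if _, ok := deps[from]; !ok {
				return nil, fmt.Errorf("task %q links from unknown task %q", task, from)
			}
			dependents[from] = append(dependents[from], task)
		}
	}

	var ready []string
	for task, n := range remaining {
		if n == 0 {
			ready = append(ready, task)
		}
	}
	sort.Strings(ready) // deterministic order among independent tasks

	var order []string
	for len(ready) > 0 {
		task := ready[0]
		ready = ready[1:]
		order = append(order, task)
		next := dependents[task]
		sort.Strings(next)
		for _, d := range next {
			remaining[d]--
			if remaining[d] == 0 {
				ready = append(ready, d)
			}
		}
	}
	// Any task left with unmet dependencies is part of a cycle.
	if len(order) != len(deps) {
		return nil, fmt.Errorf("dependency cycle detected")
	}
	return order, nil
}

func main() {
	order, err := executionOrder(map[string][]string{
		"download": nil,
		"reshape":  {"download"},
		"send":     {"reshape", "download"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(order)
}
```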

Most tasks (see info about template tasks below) allow all parameters to be either raw or links. A key should not appear in both the raw and links section. All links must reference the name of a valid task in the pipeline.
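
The merge of raw and linked parameters might look roughly like the sketch below. mergeInputs is a hypothetical helper, and rejecting a duplicated key with an error is an assumption: the text above only says a key should not appear in both sections, not how the library reacts if one does.

```go
package main

import "fmt"

// mergeInputs (hypothetical) combines literal raw parameters with values
// produced by linked tasks. A key present in both maps is treated as an error.
func mergeInputs(raw, linked map[string][]byte) (map[string][]byte, error) {
	merged := make(map[string][]byte, len(raw)+len(linked))
	for k, v := range raw {
		merged[k] = v
	}
	for k, v := range linked {
		if _, dup := merged[k]; dup {
			return nil, fmt.Errorf("parameter %q appears in both raw and links", k)
		}
		merged[k] = v
	}
	return merged, nil
}

func main() {
	raw := map[string][]byte{"name": []byte("some/topic")}
	linked := map[string][]byte{"value": []byte("downloaded body")}

	params, err := mergeInputs(raw, linked)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(params), string(params["name"]), string(params["value"]))
}
```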

While a pipeline is executing, some metadata is stored about its tasks, such as when each one started or stopped, or whether it produced an error. This data is stored as the task's status, which may be accessed in links via "using". The value of the link will be the corresponding piece of metadata.

Template Tasks

Since template tasks are the main transform in goplumber, they have some special behavior. In Go, templates are defined within a namespace of templates, within which templates may call each other by name. When Go parses template data, it creates a map of template names in the namespace to their definitions; if it encounters a new template definition with the same name as one already in the namespace, the new definition overwrites the old one. This allows a user to create reusable, composable template definitions.
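
The overwrite behavior can be seen with the standard text/template package alone. In this sketch, render is a hypothetical helper that parses each source into one shared namespace, in order; the second source redefines "greet", and "page" picks up the new definition because template calls are resolved by name at execution time. Note that the redefinition happens across separate Parse calls; defining the same non-empty template twice within a single parsed text is an error.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// render (hypothetical) parses each source into one shared namespace, in
// order, then executes the named template with the given data.
func render(name string, data interface{}, sources ...string) (string, error) {
	ns := template.New("namespace")
	for _, src := range sources {
		if _, err := ns.Parse(src); err != nil {
			return "", err
		}
	}
	var out strings.Builder
	if err := ns.ExecuteTemplate(&out, name, data); err != nil {
		return "", err
	}
	return out.String(), nil
}

// base defines a reusable "page" that calls "greet" by name.
const base = `{{define "page"}}[{{template "greet" .}}]{{end}}{{define "greet"}}hello {{.}}{{end}}`

// override is loaded later and replaces the earlier "greet" definition.
const override = `{{define "greet"}}goodbye {{.}}{{end}}`

func main() {
	first, err := render("page", "world", base)
	if err != nil {
		panic(err)
	}
	second, err := render("page", "world", base, override)
	if err != nil {
		panic(err)
	}
	fmt.Println(first)  // [hello world]
	fmt.Println(second) // [goodbye world]
}
```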

A template task's namespace is formed by loading all of the names given in its namespaces list. Each namespace defines one or more templates, loaded in order. The template that's executed is the one given by template.

The input map for the template consists of the raw.initialData merged with the data stored in links. As for all tasks, the input is map[string][]byte, and the output is []byte. This allows, for instance, the output of a template to be the raw data for an http task, but it leads to a notable gotcha: if you wish to use the output of a template in a JSON context, the template must produce valid JSON data. Additionally, if the template wishes to use a linked JSON object, it must first convert the JSON data to extract its properties.
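
To make the gotcha concrete, here is a minimal sketch built on text/template. The json and str functions are local stand-ins named after the helpers in the README's function list, and buildURL plus the input values are assumptions for illustration; the library's real functions and input encoding may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"text/template"
)

// funcs holds stand-ins for the "json" and "str" template functions.
var funcs = template.FuncMap{
	"json": func(b []byte) (interface{}, error) {
		var v interface{}
		err := json.Unmarshal(b, &v)
		return v, err
	},
	"str": func(b []byte) string { return string(b) },
}

// The linked value arrives as raw bytes, so it is decoded with json before
// its "id" property can be read; baseURL is converted with str so that it
// prints as text rather than as a list of byte values.
const urlTemplate = `{{$p := json .someParam}}{{str .baseURL}}?id={{$p.id}}`

// buildURL (hypothetical) executes the template against a task-style input map.
func buildURL(input map[string][]byte) (string, error) {
	t, err := template.New("myURLBuilderTemplate").Funcs(funcs).Parse(urlTemplate)
	if err != nil {
		return "", err
	}
	var out strings.Builder
	if err := t.Execute(&out, input); err != nil {
		return "", err
	}
	return out.String(), nil
}

func main() {
	url, err := buildURL(map[string][]byte{
		"baseURL":   []byte("http://example.com/endpoint"),
		"someParam": []byte(`{"id":"abc"}`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(url)
}
```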

Example:
{
  "myTemplateTask": {
    "type": "template",
    "raw": {
      "template": "myURLBuilderTemplate",
      "namespaces": ["urlTemplates"],
      "initialData": {
        "baseURL": "http://example.com/endpoint"
      }
    },
    "links": {
      "someParam": {"from": "anotherTask"}
    }    
  }
}

Template Functions

The following template functions are available (see code docs for more details):

  • timestamp: current Unix time in ms as an int64.
  • formatTime: convert int64 Unix ms timestamp to string.
  • formatCurrentTime: like above, but uses the current time.
  • outboundIP: returns a net.IP that the service uses for outbound requests.
  • int: converts a number, string, byte, or byte array to an int.
  • add: returns the sum of multiple integer values.
  • str: converts a []byte to a Go string.
  • bytes: converts a Go string to a []byte.
  • json: unmarshals []byte (or json.RawMessage) into a Go map[string]interface{}.
  • dec64: decodes a base-64 encoded string into a []byte.
  • enc64: encodes a []byte to a base-64 encoded string.
  • join: same as strings.Join.
  • split: same as strings.Split.
  • splitN: same as strings.SplitN.
  • strIdx: same as strings.Index.
  • trimSpace: same as strings.TrimSpace.
  • err: immediately forces a template to stop execution and return an error.

Documentation

Index

Constants

const (
	Waiting = State(iota) // not running/failed
	Running
	Success
	Failed   // failed in an unrecoverable way, or exceeded retries
	Retrying // failed, but might be able to succeed after retry
)

Variables

This section is empty.

Functions

func Add

func Add(values ...interface{}) (int, error)

Add returns the sum of integer values.

func FormatCurrentTime

func FormatCurrentTime(fmt string) string

FormatCurrentTime returns the current UTC time formatted by the given string.

func FormatTime

func FormatTime(fmt string, t interface{}) (string, error)

FormatTime formats the given time according to the given format.

The time must either be a go time.Time, or an int, which is interpreted as the number of milliseconds since Jan 1, 1970, in UTC. Other types will return an error.
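
For the integer case, the conversion from epoch milliseconds to a formatted UTC string can be sketched with the standard time package. formatMillis is a hypothetical helper showing the arithmetic only; it is not the package's FormatTime.

```go
package main

import (
	"fmt"
	"time"
)

// formatMillis (hypothetical) formats a count of milliseconds since
// Jan 1, 1970 using a Go reference-time layout, always in UTC.
func formatMillis(layout string, ms int64) string {
	// Split into whole seconds and leftover nanoseconds for time.Unix.
	t := time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond)).UTC()
	return t.Format(layout)
}

func main() {
	fmt.Println(formatMillis(time.RFC3339, 0))
	fmt.Println(formatMillis("2006-01-02 15:04:05.000", 1670976001500))
}
```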

func GetOutboundIP

func GetOutboundIP() (net.IP, error)

func LoadNamespace

func LoadNamespace(source DataSource, namespaces []string) (*template.Template, error)

LoadNamespace loads a template namespace from a given source.

If the source is a FileSystem and a namespace name does not have an extension, .gotmpl is automatically appended to it.

func RunNow

func RunNow(ctx context.Context, p *Pipeline)

func RunPipelineForever

func RunPipelineForever(ctx context.Context, p *Pipeline, d time.Duration)

RunPipelineForever repeatedly schedules the Pipeline's execution until the context is canceled.

The Pipeline will be executed once immediately. The given duration is the wait after each execution completes; it is _NOT_ the amount of time between starts, but instead the time from the end of one execution to the start of the next.

If the duration is zero or negative, the Pipeline will execute as often as possible.
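
The scheduling behavior described above can be approximated by a loop that runs, then waits, then runs again. This is a sketch of the pattern under stated assumptions, not the package's code: runForever and countRuns are hypothetical names, and the run callback stands in for executing a Pipeline.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runForever (hypothetical) calls run, waits d after it returns, and repeats
// until ctx is canceled. The wait is measured from the end of one run to the
// start of the next; a zero or negative d makes the timer fire immediately.
func runForever(ctx context.Context, d time.Duration, run func(context.Context)) {
	for ctx.Err() == nil {
		run(ctx)
		timer := time.NewTimer(d)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// countRuns cancels the context during the nth run and reports how many
// runs happened in total.
func countRuns(n int, d time.Duration) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	runs := 0
	runForever(ctx, d, func(context.Context) {
		runs++
		if runs == n {
			cancel()
		}
	})
	return runs
}

func main() {
	fmt.Println(countRuns(3, 10*time.Millisecond))
}
```

Checking ctx.Err() at the top of the loop keeps the run count deterministic when the timer and the cancellation become ready at the same moment.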

func SendTo

func SendTo(s Sink, key, value string) error

SendTo is a convenience wrapper to send a string value to a Sink without managing the context.

func TemplateError

func TemplateError(format string, args ...interface{}) (string, error)

func Timestamp

func Timestamp() int64

Timestamp returns the number of milliseconds since Jan 1, 1970 UTC.

func ToInt

func ToInt(i interface{}) (int, error)

ToInt attempts to convert the given interface to an integral type.

func ToJSON

func ToJSON(src []byte) (v interface{}, err error)

ToJSON unmarshals the byte array into an interface, which it returns.

func ToString

func ToString(x interface{}) string

ToString converts the given item to its string representation.

Types

type Client

type Client interface {
	GetPipe(task *Task) (Pipe, error)
}

A Client generates a Pipe from a Task definition.

func NewSinkClient

func NewSinkClient(sink Sink) Client

NewSinkClient returns a Client to store data in a given Sink.

func NewSourceClient

func NewSourceClient(source DataSource) Client

NewSourceClient returns a Client to load data from the DataSource.

func NewTaskType

func NewTaskType(pipeline *Pipeline) (Client, error)

NewTaskType returns a Client that can be used directly as a new task type.

The resulting generator generates Pipes that, when executed, execute the underlying Pipeline, making it possible to create complex new task types simply by composing existing pipeline definitions.

func NewTemplateClient

func NewTemplateClient(src DataSource) Client

NewTemplateClient returns a new Client that returns TemplatePipes.

type DataSource

type DataSource interface {
	Get(ctx context.Context, key string) (data []byte, wasPresent bool, err error)
}

DataSource returns data from a source, or possibly a default value if the key wasn't present. If the source returns a default, it should indicate this by returning `false` for wasPresent.

type FileSystem

type FileSystem struct {
	// contains filtered or unexported fields
}

FileSystem loads files from a base directory.

func NewFileSystem

func NewFileSystem(base string) FileSystem

NewFileSystem returns a new FileSystem using the given base directory.

The given base doesn't have to be absolute, but GetFile avoids "moving up" past the base directory path.
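
One common way to keep lookups under a base directory is to root the requested name at "/" before cleaning it, so that leading ".." elements collapse away. The sketch below shows that technique; it is an assumption about how such containment can be done, not a description of how GetFile is implemented, and resolve is a hypothetical helper. The expected paths assume a Unix-style separator.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// resolve (hypothetical) maps a requested name to a path under base. Cleaning
// the name as a rooted path removes any ".." elements that would climb above
// the root, so the joined result stays inside base.
func resolve(base, name string) string {
	return filepath.Join(base, filepath.Clean("/"+name))
}

func main() {
	fmt.Println(resolve("/config", "pipelines/a.json"))
	fmt.Println(resolve("/config", "../../etc/passwd"))
	fmt.Println(resolve("config", "../secret.json"))
}
```

This is purely lexical: it does not resolve symbolic links, so a link inside the base directory could still point elsewhere.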

func (FileSystem) Get

func (fs FileSystem) Get(ctx context.Context, key string) ([]byte, bool, error)

Get implements the DataSource interface for a FileSystem.

func (FileSystem) GetFile

func (fs FileSystem) GetFile(name string) ([]byte, error)

GetFile returns the byte data stored in a file of a given name.

func (FileSystem) Put

func (fs FileSystem) Put(ctx context.Context, key string, value []byte) error

Put implements the Sink interface for a FileSystem.

The file is created with 0666 (before masking), matching that used by Create.

type HTTPTask

type HTTPTask struct {
	MaxRetries     int                 `json:"maxRetries"`
	Method         string              `json:"method,omitempty"`
	URL            string              `json:"url,omitempty"`
	Body           json.RawMessage     `json:"body,omitempty"`
	Headers        map[string][]string `json:"headers,omitempty"`
	SkipCertVerify bool                `json:"skipCertVerify"`
}

HTTPTask executes an HTTP request.

func (*HTTPTask) Execute

func (task *HTTPTask) Execute(ctx context.Context, w io.Writer, d linkMap) error

type InputTask

type InputTask struct {
	Default json.RawMessage `json:"default"`
}

InputTask serves input to Pipeline-based task types.

Optionally, Input tasks can include default values.

func (*InputTask) Execute

func (it *InputTask) Execute(ctx context.Context, w io.Writer, input linkMap) error

type Interval

type Interval struct {
	Milliseconds int64 `json:"milliseconds"`
	Seconds      int64 `json:"seconds"`
	Minutes      int64 `json:"minutes"`
	Hours        int64 `json:"hours"`
}

func (Interval) Duration

func (i Interval) Duration() time.Duration

Duration returns the duration represented by this Interval.
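
A plausible reading is that the fields are additive, so that {"hours": 1, "seconds": 30} means one hour and thirty seconds. The sketch below implements that reading on a local copy of the struct; treating the fields as a sum is an assumption, since the documentation does not spell it out.

```go
package main

import (
	"fmt"
	"time"
)

// Interval is a local copy of the documented struct.
type Interval struct {
	Milliseconds int64 `json:"milliseconds"`
	Seconds      int64 `json:"seconds"`
	Minutes      int64 `json:"minutes"`
	Hours        int64 `json:"hours"`
}

// Duration sums all four fields (assumed behavior).
func (i Interval) Duration() time.Duration {
	return time.Duration(i.Milliseconds)*time.Millisecond +
		time.Duration(i.Seconds)*time.Second +
		time.Duration(i.Minutes)*time.Minute +
		time.Duration(i.Hours)*time.Hour
}

func main() {
	fmt.Println(Interval{Minutes: 10}.Duration())
	fmt.Println(Interval{Hours: 1, Seconds: 30, Milliseconds: 500}.Duration())
}
```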

type JSONPipe

type JSONPipe func(task *Task) (Pipe, error)

JSONPipe adapts a function into a Client that unmarshals the Task's raw data into the returned Pipe.

func (JSONPipe) GetPipe

func (f JSONPipe) GetPipe(task *Task) (Pipe, error)

type JSONValidationTask

type JSONValidationTask struct {
	Content     []byte `json:"content"`
	SchemaBytes []byte `json:"schema"`
}

func (*JSONValidationTask) Execute

func (jvt *JSONValidationTask) Execute(ctx context.Context, w io.Writer, d linkMap) error

type Link

type Link struct {
	Source string  `json:"from"`
	Using  *string `json:"using,omitempty"`
}

type LoadTask

type LoadTask struct {
	Key     string          `json:"name"`
	Default json.RawMessage `json:"default"` // default value, if not present
	// contains filtered or unexported fields
}

LoadTask loads data from a DataSource using a key. If the key isn't present, it'll return the Default, which may be nil.

func (*LoadTask) Execute

func (task *LoadTask) Execute(ctx context.Context, w io.Writer, d linkMap) error

Execute by loading data from a source.

type MQTTClient

type MQTTClient struct {
	ClientID       string
	Username       string
	Password       string
	Endpoint       string
	TimeoutSecs    int
	SkipCertVerify bool
	// contains filtered or unexported fields
}

func (*MQTTClient) Put

func (mqttClient *MQTTClient) Put(ctx context.Context, topic string, msg []byte) error

func (*MQTTClient) UnmarshalJSON

func (mqttClient *MQTTClient) UnmarshalJSON(data []byte) error

type MQTTTask

type MQTTTask struct {
	Message []json.RawMessage `json:"message"`
}

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a concurrency-safe in-memory k/v store.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore returns a new instance of a MemoryStore.

func (*MemoryStore) Get

func (ms *MemoryStore) Get(ctx context.Context, key string) ([]byte, bool, error)

Get allows MemoryStore to act as a DataSource.

func (*MemoryStore) Put

func (ms *MemoryStore) Put(ctx context.Context, key string, value []byte) error

Put allows MemoryStore to act as a Sink.
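
A type with these two method signatures can serve as both a DataSource and a Sink. The following is a minimal sketch of a concurrency-safe store with that shape, guarded by a sync.RWMutex; store, newStore, and roundTrip are hypothetical names, and the real MemoryStore's internals are not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// store is a hypothetical in-memory key/value store whose Get and Put match
// the documented DataSource and Sink method signatures.
type store struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newStore() *store { return &store{data: make(map[string][]byte)} }

// Get returns a copy of the stored value and whether the key was present.
func (s *store) Get(ctx context.Context, key string) ([]byte, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return append([]byte(nil), v...), ok, nil
}

// Put stores a copy of value so later changes by the caller are not visible.
func (s *store) Put(ctx context.Context, key string, value []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = append([]byte(nil), value...)
	return nil
}

// roundTrip writes a value and reads it back through the same store.
func roundTrip(key, value string) (string, bool) {
	s := newStore()
	ctx := context.Background()
	_ = s.Put(ctx, key, []byte(value))
	v, ok, _ := s.Get(ctx, key)
	return string(v), ok
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { // concurrent writers are serialized by the mutex
			defer wg.Done()
			_ = s.Put(context.Background(), fmt.Sprintf("task-%d", i), []byte("done"))
		}(i)
	}
	wg.Wait()
	v, ok, _ := s.Get(context.Background(), "task-2")
	fmt.Println(string(v), ok)
}
```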

type Pipe

type Pipe interface {
	// Execute executes an operation and writes its result to the given writer.
	// If the Pipe doesn't have a result, it may choose to simply not write
	// anything. An empty result isn't typically considered an error, but a Task
	// may choose to view an empty result as an error explicitly. The input map
	// represents data from Links, mapped as linkName: pipeOutput.
	Execute(ctx context.Context, w io.Writer, input map[string][]byte) error
}

Pipe implementers handle the work required to execute a Task.

type PipeFunc

type PipeFunc func(task *Task) (Pipe, error)

PipeFunc adapts a function into a Client without any modifications.

func (PipeFunc) GetPipe

func (f PipeFunc) GetPipe(task *Task) (Pipe, error)
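
The adapter works the same way http.HandlerFunc does in the standard library: a function type gains a method that calls itself, so a plain function can satisfy the interface. The sketch below reproduces the pattern with local stand-ins for Task, Pipe, and Client; upperPipe and runUpper are hypothetical and exist only to exercise it.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// Local stand-ins shaped like the documented types.
type Task struct{ TaskType string }

type Pipe interface {
	Execute(ctx context.Context, w io.Writer, input map[string][]byte) error
}

type Client interface {
	GetPipe(task *Task) (Pipe, error)
}

// PipeFunc lets a plain function satisfy Client.
type PipeFunc func(task *Task) (Pipe, error)

func (f PipeFunc) GetPipe(task *Task) (Pipe, error) { return f(task) }

// upperPipe is a hypothetical Pipe that upper-cases its "value" input.
type upperPipe struct{}

func (upperPipe) Execute(ctx context.Context, w io.Writer, input map[string][]byte) error {
	_, err := w.Write(bytes.ToUpper(input["value"]))
	return err
}

// runUpper builds a Client from a function literal and runs one Pipe from it.
func runUpper(value string) (string, error) {
	var client Client = PipeFunc(func(*Task) (Pipe, error) {
		return upperPipe{}, nil
	})
	pipe, err := client.GetPipe(&Task{TaskType: "upper"})
	if err != nil {
		return "", err
	}
	var out bytes.Buffer
	err = pipe.Execute(context.Background(), &out, map[string][]byte{
		"value": []byte(value),
	})
	return out.String(), err
}

func main() {
	s, err := runUpper("hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```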

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a series of Pipes through which data can flow.

Pipelines hold on to a reference to their underlying PipelineConfig. Multiple Pipelines can share the same PipelineConfig, and although you can execute a Pipeline concurrently multiple times, you should not modify the PipelineConfig while a Pipeline using it is running.

func (*Pipeline) Execute

func (pline *Pipeline) Execute(ctx context.Context) Status

Execute a Pipeline by running each task in dependency order until either the Pipeline is complete, or a Task indicates that the Pipeline should not continue.

Execution may also be canceled via the Context, but it is up to tasks to play nice and respect cancellation requests; at the very least, the pipeline will stop as soon as the current task completes.

type PipelineConfig

type PipelineConfig struct {
	Name          string  `json:"name"`
	Description   string  `json:"description"`
	Trigger       Trigger `json:"trigger"`
	Tasks         taskMap `json:"tasks"`
	TimeoutSecs   *int    `json:"timeoutSeconds,omitempty"`
	DefaultOutput *string `json:"defaultOutput"`
}

func (*PipelineConfig) Validate

func (pc *PipelineConfig) Validate() error

Validate a PipelineConfig by ensuring all required fields are set, all references have a known referent, and the task graph is acyclic.

type PipelineContext

type PipelineContext struct {
	// contains filtered or unexported fields
}

PipelineContext represents the current execution state of a Pipeline.

Every time a Pipeline is executed, a new PipelineContext is created for it, used to store the status and output of the various Pipes during execution.

type PipelineTask

type PipelineTask struct {
	OutputTask string                     `json:"outputTask"`
	Inputs     map[string]json.RawMessage `json:"inputs"`
	// contains filtered or unexported fields
}

PipelineTask allows you to use another Pipeline as if it were its own Task type.

The output of the Pipeline is the output of the task with name "OutputTask". Inputs may come from links, as usual, or may be given as "Defaults".

func (*PipelineTask) Execute

func (pt *PipelineTask) Execute(ctx context.Context, w io.Writer, linkedValues linkMap) error

Execute runs the attached Pipeline with data from other Pipes.

It sets up a new pipeline context and prepares it with the data from the Task's raw input values. It creates the initial job contexts, and for any Input tasks, it marks the job successfully completed and assigns it its value: either data linked from a Pipe, or its default value if nothing was piped in.

After preparing the context, it executes the underlying Pipeline. When it completes, it finds the job associated with the output Task, checks its status, and sends the job's output to the writer.

Note that while the underlying pipeline runs in a different PipelineContext, they share the same Golang context.Context, so the "calling" pipeline's context controls timeouts -- not the pipeline on which this Pipe is based.

type Plumber

type Plumber struct {
	// Clients define how the Plumber maps Tasks to Pipes.
	Clients map[string]Client
}

Plumber constructs Pipelines.

The Plumber keeps a map of task types to PipeGenerators, which interpret Task definitions to generate Pipes. The Plumber parses the config and links these pipes together.

func NewPlumber

func NewPlumber() Plumber

NewPlumber returns a new Plumber with the default task types.

You can modify the Clients map to change how tasks are constructed and which tasks are allowed.

By default, the Plumber includes the following task types:

  • http: execute an HTTPTask with Go's default HTTP client.
  • validation: validate some data against a JSON schema.
  • input: accept input from another task, as is used for Pipeline-based Clients.

func (Plumber) NewPipeline

func (plumber Plumber) NewPipeline(conf *PipelineConfig) (*Pipeline, error)

NewPipeline constructs a new Pipeline from a given PipelineConfig.

func (Plumber) SetClient

func (plumber Plumber) SetClient(taskType string, client Client)

SetClient configures a Plumber with a particular task type.

func (Plumber) SetSink

func (plumber Plumber) SetSink(taskType string, sink Sink)

SetSink configures the Plumber to send data to a given sink.

func (Plumber) SetSource

func (plumber Plumber) SetSource(taskType string, source DataSource)

SetSource configures the Plumber to supply data from a given source.

func (Plumber) SetTemplateSource

func (plumber Plumber) SetTemplateSource(taskType string, source DataSource)

SetTemplateSource adds a template client under the given name.

type Settings

type Settings struct {
	Trigger       Trigger `json:"trigger"`
	DefaultOutput *string `json:"defaultOutput"`
}

type SimpleJSONPipe

type SimpleJSONPipe func() Pipe

SimpleJSONPipe adapts functions that just return a new instance of a Pipe into PipeGenerators that unmarshal the task's Raw data into the Pipe.

func (SimpleJSONPipe) GetPipe

func (f SimpleJSONPipe) GetPipe(task *Task) (Pipe, error)

type Sink

type Sink interface {
	Put(ctx context.Context, key string, value []byte) error
}

Sink accepts a single <key, value> pair, presumably to store it.

type State

type State int

State represents the current execution state of a task or pipeline.

func (State) MarshalJSON

func (state State) MarshalJSON() ([]byte, error)

func (State) String

func (state State) String() string

type Status

type Status struct {
	State       State
	StartedAt   time.Time
	CompletedAt time.Time
	Err         error
	Attempts    int // number of times executed
}

Status tracks the status of a Pipeline or Job.

func (*Status) Duration

func (s *Status) Duration() time.Duration

Duration returns the Duration of time between CompletedAt and StartedAt.

If CompletedAt.IsZero() is true (likely implying that the State is Waiting), then this method returns a Duration of 0.
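
The rule stated above is small enough to sketch directly. The Status type here is a trimmed local copy with only the two time fields, and statusBetween is a hypothetical helper for building sample values.

```go
package main

import (
	"fmt"
	"time"
)

// Status is a trimmed local copy of the documented struct.
type Status struct {
	StartedAt   time.Time
	CompletedAt time.Time
}

// Duration returns CompletedAt minus StartedAt, or 0 if the status has not
// completed yet (CompletedAt is the zero time).
func (s *Status) Duration() time.Duration {
	if s.CompletedAt.IsZero() {
		return 0
	}
	return s.CompletedAt.Sub(s.StartedAt)
}

// statusBetween (hypothetical) builds a Status from two Unix timestamps
// given in seconds.
func statusBetween(startSec, endSec int64) *Status {
	return &Status{
		StartedAt:   time.Unix(startSec, 0),
		CompletedAt: time.Unix(endSec, 0),
	}
}

func main() {
	fmt.Println(statusBetween(100, 105).Duration())

	// Started but not yet completed: the duration is reported as zero.
	running := &Status{StartedAt: time.Unix(100, 0)}
	fmt.Println(running.Duration())
}
```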

func (Status) MarshalJSON

func (s Status) MarshalJSON() ([]byte, error)

type StoreTask

type StoreTask struct {
	Key   string `json:"name"`
	Value json.RawMessage
	// contains filtered or unexported fields
}

StoreTask sends data to a Sink.

func (*StoreTask) Execute

func (task *StoreTask) Execute(ctx context.Context, w io.Writer, d linkMap) error

type Task

type Task struct {
	TaskType         string          `json:"type"`
	Raw              json.RawMessage `json:"raw,omitempty"`
	Links            map[string]Link `json:"links,omitempty"`        // dependencies
	Successes        []string        `json:"ifSuccessful,omitempty"` // tasks that must finish, but data doesn't matter
	Failures         []string        `json:"ifFailed,omitempty"`     // not currently used
	StopIfEmpty      bool            `json:"stopIfEmpty"`
	ErrorIfEmpty     bool            `json:"errorIfEmpty"`
	DisableResultLog bool            `json:"disableResultLog"`
	ContinueOnError  bool            `json:"continueOnError"`
	// contains filtered or unexported fields
}

Task tells a Plumber how it should construct a specific Pipe.

Just as Blueprints are configuration for Pipelines, Tasks are configuration for Pipes. It's up to the Plumber to interpret how the Task should be satisfied.

Tasks can be directly unmarshaled from JSON.

func (*Task) Dependencies

func (t *Task) Dependencies(f func(taskName string))

type TemplatePipe

type TemplatePipe struct {
	// contains filtered or unexported fields
}

func (*TemplatePipe) Execute

func (ts *TemplatePipe) Execute(ctx context.Context, w io.Writer, links linkMap) error

type TemplateTask

type TemplateTask struct {
	Data         map[string]json.RawMessage `json:"initialData,omitempty"`
	Namespaces   []string                   `json:"namespaces"`
	TemplateName string                     `json:"template"`
}

type Trigger

type Trigger struct {
	Interval Interval `json:"interval,omitempty"` // run the pipeline every N intervals

}

Directories

Path Synopsis
cmd
