orchestrator

package module
v0.1.2
Published: Nov 17, 2023 License: BSD-3-Clause Imports: 7 Imported by: 9

README

orchestrator


Package orchestrator provides orchestration and supervision of data pipelines.

These pipelines are made up of inputs and processes: an input is a long-running function which listens for events (such as database triggers, Kafka topics, webhooks, or anything really), and a process is a job (such as a Docker container, a webhook dispatcher, a Kubernetes job, or anything that runs once) which does something with that event.

By building a system of inputs, processes, and persistence layers, it becomes easy to construct sophisticated data pipelines.

For instance, consider an e-commerce analytics pipeline in which:

  1. A user places an order
  2. A backend microservice of some sort places that order onto a Kafka topic for back-office processing
  3. An analytics consumer slurps these messages into a data warehouse somewhere
  4. A write to this data warehouse is picked up by an input
  5. That input triggers a process which runs a series of validation and enrichment steps
  6. The process then writes the enriched, validated data to a different table in the warehouse
  7. That write triggers a different input, which listens for enriched data
  8. That input triggers a process which does some kind of final cleansing for gold-standard reporting

In effect, this gives us:

[svc] -> (topic) -> [consumer] -> {data warehouse} -> [input -> [enrichment process]] -> {data warehouse} -> [input -> [reporting process]] -> {reporting system}

Or, if you like, a way of building a lightweight medallion architecture.
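As a very rough sketch of how that maps onto this package (the warehouse input and the enrichment/reporting process constructors below are hypothetical stand-ins; only New, AddInput, AddProcess, and AddLink come from this package, and error handling is elided):

ctx := context.Background()
o := orchestrator.New()

// Hypothetical components: any Input/Process implementations would do here
rawWrites := warehouse.NewInput(rawTableCfg)       // fires on writes to the raw table
enrichedWrites := warehouse.NewInput(enrichedCfg)  // fires on writes to the enriched table
enrich := jobs.NewEnrichmentProcess(enrichCfg)
report := jobs.NewReportingProcess(reportCfg)

o.AddInput(ctx, rawWrites)
o.AddInput(ctx, enrichedWrites)
o.AddProcess(enrich)
o.AddProcess(report)

o.AddLink(rawWrites, enrich)       // raw write -> enrichment process
o.AddLink(enrichedWrites, report)  // enriched write -> reporting process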

When should you use this package?

This package is useful for building customised data pipeline orchestrators, and for building customised components (where off-the-shelf tools, such as Databricks, are fiddly or cannot be customised to the same level).

This package is also useful for running pipelines cheaply, or locally: it requires no outside service (unless you write one into your own components), and doesn't need complicated master/worker configurations or anything else, really.

When should you not use this package?

This package will not give you the same things that off-the-shelf tools, such as Databricks, will give you. There's no easy way to see DAGs, and no simple API for updating configuration (unless you write your own).

This package won't do a lot of what you might need; it exists to serve as the engine of a pipeline tool, and you must build the rest yourself.

Sub Packages


Readme created from Go doc with goreadme

Documentation

Overview

Package orchestrator provides orchestration and supervision of data pipelines.

These pipelines are made up of inputs and processes: an input is a long-running function which listens for events (such as database triggers, Kafka topics, webhooks, or anything really), and a process is a job (such as a Docker container, a webhook dispatcher, a Kubernetes job, or anything that runs once) which does something with that event.

By building a system of inputs, processes, and persistence layers, it becomes easy to construct sophisticated data pipelines.

For instance, consider an e-commerce analytics pipeline in which:

  1. A user places an order
  2. A backend microservice of some sort places that order onto a Kafka topic for back-office processing
  3. An analytics consumer slurps these messages into a data warehouse somewhere
  4. A write to this data warehouse is picked up by an input
  5. That input triggers a process which runs a series of validation and enrichment steps
  6. The process then writes the enriched, validated data to a different table in the warehouse
  7. That write triggers a different input, which listens for enriched data
  8. That input triggers a process which does some kind of final cleansing for gold-standard reporting

In effect, this gives us:

[svc] -> (topic) -> [consumer] -> {data warehouse} -> [input -> [enrichment process]] -> {data warehouse} -> [input -> [reporting process]] -> {reporting system}

Or, if you like, a way of building a lightweight [medallion architecture](https://www.databricks.com/glossary/medallion-architecture).

When should you use this package?

This package is useful for building customised data pipeline orchestrators, and for building customised components (where off-the-shelf tools, such as Databricks, are fiddly or cannot be customised to the same level).

This package is also useful for running pipelines cheaply, or locally: it requires no outside service (unless you write one into your own components), and doesn't need complicated master/worker configurations or anything else, really.

When should you _not_ use this package?

This package will not give you the same things that off-the-shelf tools, such as Databricks, will give you. There's no easy way to see DAGs, and no simple API for updating configuration (unless you write your own).

This package won't do a lot of what you might need; it exists to serve as the engine of a pipeline tool, and you must build the rest yourself.

Index

Constants

This section is empty.

Variables

var ConcurrentProcessors int64 = 8

ConcurrentProcessors limits the number of processes which can be kicked off at once
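Because it is an exported package-level variable, it can be overridden; a minimal sketch, assuming you set it before events start flowing:

// Allow up to 16 processes to run at once instead of the default 8
orchestrator.ConcurrentProcessors = 16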

Functions

This section is empty.

Types

type Event

type Event struct {
	// Location could be a table name, a topic, an event from some store,
	// or anything really- it is up to both the Input and the Process to
	// agree on what this means
	Location  string    `json:"location"`
	Operation Operation `json:"operation"`
	ID        string    `json:"id"`

	// Trigger is the name or ID of the Input which triggers this
	// process, which can be useful for routing/ flow control in
	// triggers
	Trigger string `json:"trigger"`
}

Event represents basic metadata that each Input provides

func (Event) JSON

func (e Event) JSON() (string, error)

JSON returns the json representation for an event, in a way that our processes can later work with
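A small usage sketch (field values are illustrative; the orchestrator import is assumed):

e := orchestrator.Event{
	Location:  "warehouse.raw_orders",
	Operation: orchestrator.OperationCreate,
	ID:        "order-1234",
	Trigger:   "warehouse-input",
}

payload, err := e.JSON()
if err != nil {
	// handle the error
}
// payload now holds the event as a json string, ready to hand to a Process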

type Input

type Input interface {
	// Handle inputs from this input source, creating Events and
	// streaming down the Event channel
	Handle(context.Context, chan Event) error
	ID() string
}

Input is a simple interface, exposing a long-running function called Handle which is expected to stream Events.

It is the job of the Orchestrator to understand which channel is assigned to which input and to route messages accordingly.
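A minimal, hypothetical Input implementation might look like the following (imports of context and time, plus the orchestrator package itself, are assumed); it emits one Event per tick and stops when its context is cancelled:

// TickerInput is a hypothetical Input which emits an Event every Interval
type TickerInput struct {
	Interval time.Duration
}

func (t TickerInput) ID() string { return "ticker-input" }

func (t TickerInput) Handle(ctx context.Context, c chan orchestrator.Event) error {
	ticker := time.NewTicker(t.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-ticker.C:
			c <- orchestrator.Event{
				Location:  "ticker",
				Operation: orchestrator.OperationCreate,
				ID:        time.Now().Format(time.RFC3339Nano),
				Trigger:   t.ID(),
			}
		}
	}
}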

type InputConfig

type InputConfig struct {
	Name             string      `toml:"name"`
	Type             string      `toml:"type"`
	ConnectionString string      `toml:"connection_string"`
	Operations       []Operation `toml:"operation"`
}

InputConfig contains the necessary values for configuring an Input, such as how to connect to the input source, and the operations the input supports

func (InputConfig) ID

func (ic InputConfig) ID() string

ID returns a (hopefully) unique value for this InputConfig
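A sketch of building one in code (field values are illustrative; in practice these often come from a TOML file via the struct tags above):

cfg := orchestrator.InputConfig{
	Name:             "warehouse-raw",
	Type:             "postgres",
	ConnectionString: "postgres://user:pass@localhost:5432/warehouse",
	Operations:       []orchestrator.Operation{orchestrator.OperationCreate, orchestrator.OperationUpdate},
}

// ID() gives a (hopefully) unique handle for this config, e.g. for use as a registry key
id := cfg.ID()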

type NewInputFunc added in v0.1.1

type NewInputFunc func(InputConfig) (Input, error)

NewInputFunc is the suggested signature for instantiating an Input and, as such, can be used to build a registry of the Inputs an orchestrator supports when creating Inputs dynamically, say from a config file or from an API.

For instance:

var inputs = map[string]orchestrator.NewInputFunc{
	"postgres": orchestrator.NewPostgresInput,
	"webhook":  webhooks.NewWebhookInput,
}

func createInput(cfg orchestrator.InputConfig) (orchestrator.Input, error) {
	return inputs[cfg.Type](cfg)
}

type NewProcessFunc added in v0.1.1

type NewProcessFunc func(ProcessConfig) (Process, error)

NewProcessFunc is the suggested signature for instantiating a Process and, as such, can be used to build a registry of the Processes an orchestrator supports when creating Processes dynamically, say from a config file or from an API.

For instance:

var processes = map[string]orchestrator.NewProcessFunc{
	"docker":  orchestrator.NewContainerProcess,
	"webhook": webhooks.NewWebhookProcess,
}

func createProcess(cfg orchestrator.ProcessConfig) (orchestrator.Process, error) {
	return processes[cfg.Type](cfg)
}

type Operation

type Operation uint8

Operation represents one of the basic CRUD operations on a piece of data, and can be used by Inputs to do clever things such as ignoring certain events

const (
	OperationUnknown Operation = iota
	OperationCreate
	OperationRead
	OperationUpdate
	OperationDelete
)

Supported set of operations
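For instance, an Input might use the Operations listed in its InputConfig to decide whether to emit an event at all (shouldEmit is a hypothetical helper, not part of this package):

// shouldEmit reports whether op is one of the operations this input is configured to care about
func shouldEmit(cfg orchestrator.InputConfig, op orchestrator.Operation) bool {
	for _, allowed := range cfg.Operations {
		if allowed == op {
			return true
		}
	}

	return false
}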

func (Operation) MarshalJSON

func (o Operation) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface, allowing an Operation to be represented in json (in practice, as a json string)

func (Operation) MarshalText

func (o Operation) MarshalText() (b []byte, err error)

MarshalText implements the encoding.TextMarshaler interface in order to get a textual representation of an Operation

func (Operation) String

func (o Operation) String() string

String returns the string representation of an Operation, or "unknown" for any Operation value it doesn't know about

func (*Operation) UnmarshalJSON

func (o *Operation) UnmarshalJSON(b []byte) (err error)

UnmarshalJSON implements the json.Unmarshaler interface, allowing for the operation type to be represented in json properly

func (*Operation) UnmarshalText

func (o *Operation) UnmarshalText(b []byte) error

UnmarshalText implements the encoding.TextUnmarshaler interface, allowing a byte slice containing certain CRUD operations to be cast to Operations
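A round-trip sketch (the exact textual form an Operation takes in json is not asserted here; the encoding/json and orchestrator imports are assumed):

op := orchestrator.OperationCreate

// MarshalJSON renders the Operation as a json string
b, err := json.Marshal(op)
if err != nil {
	// handle the error
}

// UnmarshalJSON turns that json string back into an Operation
var decoded orchestrator.Operation
if err = json.Unmarshal(b, &decoded); err != nil {
	// handle the error
}

// decoded now equals orchestrator.OperationCreate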

type Orchestrator

type Orchestrator struct {
	*dag.DAG

	ErrorChan chan error
	// contains filtered or unexported fields
}

Orchestrator is the workhorse of this package. It:

  1. Supervises inputs
  2. Manages the lifecycle of processes, which run on events
  3. Syncs events from inputs across multiple processes in a DAG

Multiple Orchestrators _can_ be run, such as in a cluster, but out of the box this package doesn't contain any logic to synchronise inputs and/or processes that won't cluster natively (such as the postgres sample input)

func New

func New() *Orchestrator

New returns an Orchestrator ready for use

func (*Orchestrator) AddInput

func (d *Orchestrator) AddInput(ctx context.Context, i Input) (err error)

AddInput takes an Input, adds it to the Orchestrator's DAG, and runs it ready for events to flow through

AddInput will error when duplicate input IDs are specified. Any other error from the running of an Input comes via the Orchestrator's ErrorChan; this is because Inputs are run in separate goroutines

func (Orchestrator) AddLink

func (d Orchestrator) AddLink(input Input, process Process) (err error)

AddLink accepts an Input and a Process, and links them so that when the input triggers an event, the specified process is called

func (Orchestrator) AddProcess

func (d Orchestrator) AddProcess(p Process) error

AddProcess adds a Process to the Orchestrator's DAG, ready to be triggered by Inputs.

Processes are not run until an Event is generated by an Input, and that Input is linked to the specified Process.

This means long-running processes with state should either be re-architected to use some kind of persistence layer, or should be a separate service which exposes (say) a webhook or similar trigger
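Putting the lifecycle together, a minimal wiring sketch (myInput and myProcess stand in for any Input and Process implementations; the context, log, and orchestrator imports are assumed):

ctx := context.Background()
o := orchestrator.New()

// Inputs run in their own goroutines, so their failures arrive on
// ErrorChan rather than as return values
go func() {
	for err := range o.ErrorChan {
		log.Println("pipeline error:", err)
	}
}()

if err := o.AddInput(ctx, myInput); err != nil {
	log.Fatal(err)
}

if err := o.AddProcess(myProcess); err != nil {
	log.Fatal(err)
}

// When myInput emits an Event, myProcess runs with that Event
if err := o.AddLink(myInput, myProcess); err != nil {
	log.Fatal(err)
}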

type Process

type Process interface {
	Run(context.Context, Event) (ProcessStatus, error)
	ID() string
}

Process is an interface which processes must implement

The interface is pretty simple: given an Event, Run executes the process and returns a ProcessStatus (along with an error, should anything fail).
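A minimal, hypothetical Process implementation which simply records the event it was given (the context and orchestrator imports are assumed):

// LogProcess is a hypothetical Process which logs each Event it receives
type LogProcess struct{}

func (l LogProcess) ID() string { return "log-process" }

func (l LogProcess) Run(ctx context.Context, e orchestrator.Event) (orchestrator.ProcessStatus, error) {
	payload, err := e.JSON()
	if err != nil {
		return orchestrator.ProcessStatus{
			Name:   l.ID(),
			Status: orchestrator.ProcessFail,
		}, err
	}

	return orchestrator.ProcessStatus{
		Name:   l.ID(),
		Logs:   []string{"received event: " + payload},
		Status: orchestrator.ProcessSuccess,
	}, nil
}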

type ProcessConfig

type ProcessConfig struct {
	Name             string            `toml:"name"`
	Type             string            `toml:"type"`
	ExecutionContext map[string]string `toml:"execution_context"`
}

ProcessConfig contains configuration options for processes, including a free-form map[string]string (ExecutionContext) for arbitrary values
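A sketch of a ProcessConfig literal (the keys inside ExecutionContext are whatever the concrete Process expects; the ones below are purely illustrative):

cfg := orchestrator.ProcessConfig{
	Name: "enrich-orders",
	Type: "docker",
	ExecutionContext: map[string]string{
		"image":   "example.org/enrichment:latest",
		"timeout": "300s",
	},
}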

func (ProcessConfig) ID

func (pc ProcessConfig) ID() string

ID returns a (hopefully) unique value for this ProcessConfig

type ProcessExitStatus

type ProcessExitStatus uint8

ProcessExitStatus represents the final status of a Process

const (
	ProcessUnknown ProcessExitStatus = iota
	ProcessUnstarted
	ProcessSuccess
	ProcessFail
)

Provided set of ExitStatuses
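A caller inspecting the outcome of a Process might switch on this status (a sketch; status is assumed to be a ProcessStatus returned by some Process):

switch status.Status {
case orchestrator.ProcessSuccess:
	// carry on
case orchestrator.ProcessFail, orchestrator.ProcessUnstarted:
	// retry, alert, or stop the pipeline
default:
	// ProcessUnknown (or anything new): treat as a failure until proven otherwise
}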

type ProcessInterfaceConversionError

type ProcessInterfaceConversionError struct {
	// contains filtered or unexported fields
}

ProcessInterfaceConversionError is returned when loading a process from our internal process store yields completely unexpected data

This error represents a huge failure somewhere and should cause a stop-the-world event

func NewTestProcessInterfaceConversionError

func NewTestProcessInterfaceConversionError(input, process string, iface any) ProcessInterfaceConversionError

NewTestProcessInterfaceConversionError can be used to return a testable error (in tests)

func (ProcessInterfaceConversionError) Error

func (e ProcessInterfaceConversionError) Error() string

Error returns a descriptive error message

type ProcessStatus

type ProcessStatus struct {
	Name   string
	Logs   []string
	Status ProcessExitStatus
}

ProcessStatus contains various bits and pieces a process might return, such as logs, status codes, and so on

type UnknownProcessError

type UnknownProcessError struct {
	// contains filtered or unexported fields
}

UnknownProcessError is returned when an input tries to trigger a process which doesn't exist

func NewTestUnknownProcessError

func NewTestUnknownProcessError(input, process string) UnknownProcessError

NewTestUnknownProcessError can be used to return a testable error (in tests)

func (UnknownProcessError) Error

func (e UnknownProcessError) Error() string

Error returns a descriptive error message

Directories

Path Synopsis
