goflow

Version: v1.0.5
Published: Jun 10, 2023 License: MIT Imports: 14 Imported by: 0

README

Goflow

A workflow/DAG orchestrator written in Go for rapid prototyping of ETL/ML/AI pipelines. Goflow comes complete with a web UI for inspecting and triggering jobs.

Contents

  1. Quick start
  2. Use case
  3. Concepts and features
    1. Jobs and tasks
    2. Custom operators
    3. Retries
    4. Task dependencies
    5. Trigger rules
    6. The Goflow engine

Quick start

With Docker
docker run -p 8181:8181 ghcr.io/fieldryand/goflow-example:latest

Browse to localhost:8181 to explore the UI.

Without Docker

In a fresh project directory:

go mod init example.com/myflow # create a new module; example.com/myflow is a placeholder module path
go get github.com/fieldryand/goflow # install dependencies

Create a file main.go with contents:

package main

import "github.com/fieldryand/goflow"

func main() {
        options := goflow.Options{
                AssetBasePath: "assets/",
                StreamJobRuns: true,
                ShowExamples:  true,
        }
        gf := goflow.New(options)
        gf.Use(goflow.DefaultLogger())
        gf.Run(":8181")
}

Download the front-end from the release page, untar it, and move it to the location specified in goflow.Options.AssetBasePath. Now run the application with go run main.go and see it in the browser at localhost:8181.

Use case

Goflow was built as a simple replacement for Apache Airflow to manage some small data pipeline projects. Airflow started to feel too heavyweight for these projects, where all the computation was offloaded to independent services but there was still a need for basic orchestration, concurrency, retries, and visibility.

Goflow prioritizes ease of deployment over features and scalability. If you need distributed workers, backfilling over time slices, a durable database of job runs, etc., then Goflow is not for you. On the other hand, if you want to rapidly prototype some pipelines, then Goflow might be a good fit.

Concepts and features

  • Job: A Goflow workflow is called a Job. Jobs can be scheduled using cron syntax.
  • Task: Each job consists of one or more tasks organized into a dependency graph. A task can be run under certain conditions; by default, a task runs when all of its dependencies finish successfully.
  • Concurrency: Jobs and tasks execute concurrently.
  • Operator: An Operator defines the work done by a Task. Goflow comes with a handful of basic operators, and implementing your own Operator is straightforward.
  • Retries: You can allow a Task a given number of retry attempts. Goflow comes with two retry strategies, ConstantDelay and ExponentialBackoff.
  • Database: Goflow supports two database types, in-memory and BoltDB. BoltDB will persist your history of job runs, whereas in-memory means the history will be lost each time the Goflow server is stopped. The default is BoltDB.
  • Streaming: Goflow uses server-sent events to stream the status of jobs and tasks to the UI in real time.
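
To illustrate the streaming bullet above, here is a minimal standard-library sketch of a server-sent events handler. It is not Goflow's actual handler: the function names, the /stream path, and the payload strings are assumptions made for the example. Only the SSE wire format (a text/event-stream response made of "data:" lines, each event ending with a blank line) is standard.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// streamStatuses is a hypothetical handler that writes each status
// string as one server-sent event and flushes it immediately.
func streamStatuses(statuses []string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		for _, s := range statuses {
			// One event: a "data:" line followed by a blank line.
			fmt.Fprintf(w, "data: %s\n\n", s)
			flusher.Flush()
		}
	}
}

// demo calls the handler against an in-memory recorder, so no
// network access or open port is needed.
func demo() (contentType, body string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/stream", nil)
	streamStatuses([]string{"running", "successful"})(rec, req)
	return rec.Header().Get("Content-Type"), rec.Body.String()
}

func main() {
	contentType, body := demo()
	fmt.Println(contentType)
	fmt.Printf("%q\n", body)
}
```

In a browser, a stream like this would typically be consumed with the EventSource API, which delivers each event to the page as it arrives.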

Jobs and tasks

Let's start by creating a function that returns a job called myJob. There is a single task in this job that sleeps for one second.

package main

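import (
	"errors"   // used by the PositiveAddition operator defined below
	"net/http" // used by the Get operator in the task dependencies example

	"github.com/fieldryand/goflow"
)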

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *", Active: true}
	j.Add(&goflow.Task{
		Name:     "sleepForOneSecond",
		Operator: goflow.Command{Cmd: "sleep", Args: []string{"1"}},
	})
	return j
}

By setting Active: true, we are telling Goflow to apply the provided cron schedule for this job when the application starts. Job scheduling can be activated and deactivated from the UI.

Custom operators

A custom Operator needs to implement the Run method. Here's an example of an operator that adds two nonnegative numbers.

// PositiveAddition adds two nonnegative integers.
type PositiveAddition struct{ a, b int }

// Run returns the sum of a and b, or an error if either is negative.
func (o PositiveAddition) Run() (interface{}, error) {
	if o.a < 0 || o.b < 0 {
		return 0, errors.New("can't add negative numbers")
	}
	result := o.a + o.b
	return result, nil
}
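
As a usage sketch, the operator can be exercised on its own, outside of a job. So that the example compiles without the Goflow module, the Operator interface is redeclared locally with the same Run signature as in the documentation. The runAsInt helper is hypothetical and only shows how a caller might recover a typed value from the interface{} result.

```go
package main

import (
	"errors"
	"fmt"
)

// Operator mirrors the single-method interface documented by Goflow.
// It is redeclared here only to keep the sketch self-contained.
type Operator interface {
	Run() (interface{}, error)
}

// PositiveAddition is the operator from the text above.
type PositiveAddition struct{ a, b int }

func (o PositiveAddition) Run() (interface{}, error) {
	if o.a < 0 || o.b < 0 {
		return 0, errors.New("can't add negative numbers")
	}
	return o.a + o.b, nil
}

// runAsInt is a hypothetical helper: it runs any Operator and
// type-asserts the result to int.
func runAsInt(op Operator) (int, error) {
	out, err := op.Run()
	if err != nil {
		return 0, err
	}
	n, ok := out.(int)
	if !ok {
		return 0, fmt.Errorf("unexpected result type %T", out)
	}
	return n, nil
}

func main() {
	sum, err := runAsInt(PositiveAddition{a: 2, b: 3})
	fmt.Println(sum, err) // 5 <nil>

	_, err = runAsInt(PositiveAddition{a: -1, b: 3})
	fmt.Println(err) // can't add negative numbers
}
```

Because Run returns interface{}, callers that need the concrete value have to type-assert it, as runAsInt does.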

Retries

Let's add a retry strategy to the sleepForOneSecond task:

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
	j.Add(&goflow.Task{
		Name:       "sleepForOneSecond",
		Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
		Retries:    5,
		RetryDelay: goflow.ConstantDelay{Period: 1},
	})
	return j
}

Instead of ConstantDelay, we could also use ExponentialBackoff (see https://en.wikipedia.org/wiki/Exponential_backoff).
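
To make the difference between the two strategies concrete, here is a standalone sketch of a retry loop with a pluggable delay. It does not use Goflow's internals: delayFn, constantDelay, exponentialBackoff, runWithRetries, and the doubling schedule are all assumptions chosen for illustration. The sleep function is injected so the example can record the waits instead of actually sleeping.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// delayFn returns how long to wait before the given retry (1-based).
type delayFn func(attempt int) time.Duration

// constantDelay waits the same period before every retry.
func constantDelay(period time.Duration) delayFn {
	return func(int) time.Duration { return period }
}

// exponentialBackoff doubles the wait each time: base, 2*base, 4*base, ...
func exponentialBackoff(base time.Duration) delayFn {
	return func(attempt int) time.Duration { return base << (attempt - 1) }
}

// runWithRetries calls op once and then up to `retries` more times,
// waiting between attempts. It returns the number of calls made.
func runWithRetries(op func() error, retries int, delay delayFn, sleep func(time.Duration)) (int, error) {
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			sleep(delay(attempt))
		}
		if err = op(); err == nil {
			return attempt + 1, nil
		}
	}
	return retries + 1, err
}

// demo runs an operation that fails twice and then succeeds, recording
// the waits rather than sleeping so that it finishes instantly.
func demo() (calls int, waits []time.Duration, err error) {
	attempts := 0
	flaky := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	record := func(d time.Duration) { waits = append(waits, d) }
	calls, err = runWithRetries(flaky, 5, exponentialBackoff(time.Second), record)
	return calls, waits, err
}

func main() {
	calls, waits, err := demo()
	fmt.Println(calls, waits, err) // 3 [1s 2s] <nil>
}
```

Passing constantDelay(time.Second) instead would record two waits of one second each for the same flaky operation.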

Task dependencies

A job can define a directed acyclic graph (DAG) of independent and dependent tasks. Let's use the SetDownstream method to define two tasks that are dependent on sleepForOneSecond. The tasks will use the PositiveAddition operator we defined earlier, as well as a new operator provided by Goflow, Get.

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
	j.Add(&goflow.Task{
		Name:       "sleepForOneSecond",
		Operator:   goflow.Command{Cmd: "sleep", Args: []string{"1"}},
		Retries:    5,
		RetryDelay: goflow.ConstantDelay{Period: 1},
	})
	j.Add(&goflow.Task{
		Name:     "getGoogle",
		Operator: goflow.Get{Client: &http.Client{}, URL: "https://www.google.com"},
	})
	j.Add(&goflow.Task{
		Name:     "AddTwoPlusThree",
		Operator: PositiveAddition{a: 2, b: 3},
	})
	j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("getGoogle"))
	j.SetDownstream(j.Task("sleepForOneSecond"), j.Task("AddTwoPlusThree"))
	return j
}
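
The dependency graph above has one root task and two tasks downstream of it. The following sketch shows one way such a graph can be resolved into rounds of tasks that are ready at the same time. It is an illustration only, not Goflow's scheduler: the graph type and the waves function are hypothetical, and every task is assumed to appear as a key in the map.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// graph maps each task name to the tasks directly upstream of it.
type graph map[string][]string

// waves groups tasks into rounds. Every task in a round has all of its
// upstream tasks in earlier rounds, so the tasks within one round could
// run concurrently. It returns an error if the graph has a cycle.
func waves(g graph) ([][]string, error) {
	done := map[string]bool{}
	var rounds [][]string
	for len(done) < len(g) {
		var ready []string
		for task, upstream := range g {
			if done[task] {
				continue
			}
			ok := true
			for _, u := range upstream {
				if !done[u] {
					ok = false
					break
				}
			}
			if ok {
				ready = append(ready, task)
			}
		}
		if len(ready) == 0 {
			return nil, errors.New("cycle detected among remaining tasks")
		}
		sort.Strings(ready) // deterministic order for printing
		for _, t := range ready {
			done[t] = true
		}
		rounds = append(rounds, ready)
	}
	return rounds, nil
}

func main() {
	g := graph{
		"sleepForOneSecond": nil,
		"getGoogle":         {"sleepForOneSecond"},
		"AddTwoPlusThree":   {"sleepForOneSecond"},
	}
	rounds, err := waves(g)
	fmt.Println(rounds, err)
	// [[sleepForOneSecond] [AddTwoPlusThree getGoogle]] <nil>
}
```

The second round contains both downstream tasks, which matches the description of getGoogle and AddTwoPlusThree as independent of each other but dependent on sleepForOneSecond.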

Trigger rules

By default, a task has the trigger rule allSuccessful, meaning the task starts executing when all the tasks directly upstream exit successfully. If any dependency exits with an error, all downstream tasks are skipped, and the job exits with an error.

Sometimes you want a downstream task to execute even if there are upstream failures. Often these are situations where you want to perform some cleanup task, such as shutting down a server. In such cases, you can give a task the trigger rule allDone.

Let's modify sleepForOneSecond to have the trigger rule allDone.

func myJob() *goflow.Job {
	j := &goflow.Job{Name: "myJob", Schedule: "* * * * *"}
	j.Add(&goflow.Task{
		Name:        "sleepForOneSecond",
		Operator:    goflow.Command{Cmd: "sleep", Args: []string{"1"}},
		Retries:     5,
		RetryDelay:  goflow.ConstantDelay{Period: 1},
		TriggerRule: "allDone",
	})
	// Add the remaining tasks and dependencies as before.
	return j
}
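
The two trigger rules can be summarized as a decision over the states of a task's direct upstream tasks. The sketch below is a hypothetical model of that decision, not Goflow's implementation. The state names and the decide function are assumptions, as is the choice to treat a skipped upstream task like a failed one under allSuccessful.

```go
package main

import "fmt"

// state is a hypothetical task state used only in this sketch.
type state string

const (
	successful state = "successful"
	failed     state = "failed"
	skipped    state = "skipped"
	running    state = "running"
)

// decide returns "run", "skip", or "wait" for a task, given its trigger
// rule and the states of the tasks directly upstream of it.
func decide(rule string, upstream []state) string {
	allFinished, anyUnsuccessful := true, false
	for _, s := range upstream {
		switch s {
		case successful:
			// finished successfully, nothing to record
		case failed, skipped:
			anyUnsuccessful = true
		default:
			allFinished = false
		}
	}
	switch {
	case rule == "allDone" && allFinished:
		return "run" // upstream outcome does not matter
	case rule == "allDone":
		return "wait"
	case anyUnsuccessful:
		return "skip" // allSuccessful can no longer be satisfied
	case allFinished:
		return "run"
	default:
		return "wait"
	}
}

func main() {
	fmt.Println(decide("allSuccessful", []state{successful, failed})) // skip
	fmt.Println(decide("allDone", []state{successful, failed}))       // run
	fmt.Println(decide("allDone", []state{successful, running}))      // wait
}
```

Under this model a cleanup task marked allDone still runs after an upstream failure, which is the situation the text describes.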

The Goflow engine

Finally, let's create a Goflow engine, register our job, attach a logger, and run the application.

func main() {
	gf := goflow.New(goflow.Options{StreamJobRuns: true})
	gf.AddJob(myJob)
	gf.Use(goflow.DefaultLogger())
	gf.Run(":8181")
}

You can pass different options to the engine. Options currently supported:

  • AssetBasePath: The path containing the UI assets, usually assets/.
  • DBType: The database type, either boltdb (default) or memory.
  • BoltDBPath: The file path of the Bolt database on disk.
  • StreamJobRuns: Whether to stream updates to the UI.
  • ShowExamples: Whether to show the example jobs.

Goflow is built on the Gin framework, so you can pass any Gin handler to Use.

Documentation

Overview

Package goflow implements a web UI-based workflow orchestrator inspired by Apache Airflow.

Functions

func DefaultLogger added in v0.2.0

func DefaultLogger() gin.HandlerFunc

DefaultLogger returns the default logging middleware.

Types

type Command added in v0.8.0

type Command struct {
	Cmd  string
	Args []string
}

Command executes a shell command.

func (Command) Run added in v0.8.0

func (o Command) Run() (interface{}, error)

Run passes the command and arguments to exec.Command and captures the output.
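
For readers unfamiliar with os/exec, here is a rough standalone sketch of running a command and capturing its output. The runCommand helper is hypothetical. Whether Goflow captures standard output only or also standard error is not stated here, so using Output (standard output only) is an assumption. Note that os/exec starts the program directly rather than through a system shell, so shell features such as pipes and globbing are not interpreted.

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// runCommand is a hypothetical helper: it runs cmd with args, waits for
// it to exit, and returns its standard output with surrounding
// whitespace trimmed.
func runCommand(cmd string, args ...string) (string, error) {
	out, err := exec.Command(cmd, args...).Output()
	if err != nil {
		// Covers both a missing executable and a nonzero exit status.
		return "", err
	}
	return strings.TrimSpace(string(out)), nil
}

func main() {
	// Assumes an echo executable is on the PATH, as on most Unix-like systems.
	out, err := runCommand("echo", "hello")
	fmt.Println(out, err)
}
```

The Cmd and Args fields of Command correspond to the first and the variadic arguments of exec.Command.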

type ConstantDelay added in v0.3.0

type ConstantDelay struct{ Period int }

ConstantDelay waits a constant number of seconds between task retries.

type ExponentialBackoff added in v0.3.0

type ExponentialBackoff struct{}

ExponentialBackoff waits exponentially longer between each retry attempt.

type Get added in v0.4.0

type Get struct {
	Client *http.Client
	URL    string
}

Get makes a GET request.

func (Get) Run added in v0.4.0

func (o Get) Run() (interface{}, error)

Run sends the request and returns an error if the status code is outside the 2xx range.
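
The 2xx check can be sketched with the standard library alone. The get function below is a hypothetical stand-in, not Goflow's code. Returning the response body as a string is an assumption, since the documentation does not say what the operator returns on success. The demo uses a local httptest server, so it needs no external network access.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// get is a hypothetical stand-in for a GET operator: it returns the
// response body, or an error if the status is outside the 2xx range.
func get(client *http.Client, url string) (string, error) {
	resp, err := client.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	return string(body), nil
}

// demo starts a local test server that answers 200 on "/" and 500 on
// "/fail", then calls get against both paths.
func demo() (body string, okErr, failErr error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/fail" {
			http.Error(w, "boom", http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "ok")
	}))
	defer srv.Close()

	body, okErr = get(srv.Client(), srv.URL+"/")
	_, failErr = get(srv.Client(), srv.URL+"/fail")
	return body, okErr, failErr
}

func main() {
	body, okErr, failErr := demo()
	fmt.Println(body, okErr) // ok <nil>
	fmt.Println(failErr)     // unexpected status 500
}
```

Treating a non-2xx status as an error is what lets a retry strategy or a trigger rule react to a failed HTTP call.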

type Goflow

type Goflow struct {
	Options Options
	Jobs    map[string](func() *Job)
	// contains filtered or unexported fields
}

Goflow contains job data and a router.

func New added in v0.4.0

func New(opts Options) *Goflow

New returns a Goflow engine.

func (*Goflow) AddJob added in v0.4.0

func (g *Goflow) AddJob(jobFn func() *Job) *Goflow

AddJob takes a job-emitting function and registers it with the engine.

func (*Goflow) Run added in v0.4.0

func (g *Goflow) Run(port string)

Run runs the webserver.

func (*Goflow) Use added in v0.4.0

func (g *Goflow) Use(middleware gin.HandlerFunc) *Goflow

Use registers a middleware in the Gin router.

type Job

type Job struct {
	Name     string
	Tasks    map[string]*Task
	Schedule string
	Dag      dag
	Active   bool
	// contains filtered or unexported fields
}

A Job is a workflow consisting of independent and dependent tasks organized into a graph.

func (*Job) Add added in v0.4.0

func (j *Job) Add(t *Task) *Job

Add a task to a job.

func (*Job) SetDownstream

func (j *Job) SetDownstream(ind, dep *Task) *Job

SetDownstream sets a dependency relationship between two tasks in the job. The dependent task is downstream of the independent task and waits for the independent task to finish before starting execution.

func (*Job) Task added in v0.2.0

func (j *Job) Task(name string) *Task

Task returns the task with the given name.

type Operator added in v0.2.0

type Operator interface {
	Run() (interface{}, error)
}

An Operator implements a Run() method. When a job executes a task that uses the operator, the Run() method is called.

type Options added in v0.6.0

type Options struct {
	DBType        string
	BoltDBPath    string
	AssetBasePath string
	StreamJobRuns bool
	ShowExamples  bool
}

Options to control various Goflow behavior.

type PositiveAddition added in v0.9.1

type PositiveAddition struct {
	// contains filtered or unexported fields
}

PositiveAddition adds two nonnegative numbers.

func (PositiveAddition) Run added in v0.9.1

func (o PositiveAddition) Run() (interface{}, error)

Run implements the custom operation.

type Post added in v0.8.0

type Post struct {
	Client *http.Client
	URL    string
	Body   io.Reader
}

Post makes a POST request.

func (Post) Run added in v0.8.0

func (o Post) Run() (interface{}, error)

Run sends the request and returns an error if the status code is outside the 2xx range.

type RetryDelay added in v0.3.0

type RetryDelay interface {
	// contains filtered or unexported methods
}

RetryDelay is a type that implements a Wait() method, which is called in between task retry attempts.

type Task

type Task struct {
	Name        string
	Operator    Operator
	TriggerRule triggerRule
	Retries     int
	RetryDelay  RetryDelay
	// contains filtered or unexported fields
}

A Task is the unit of work that makes up a job. Whenever a task is executed, it calls its associated operator.
