pipeline

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2022 License: GPL-3.0 Imports: 11 Imported by: 15

README



Pipeline

Build Status Coverage Go Report Card GoDoc Release

Pipeline is a GPL3-licensed Go package for building, executing and representing pipelines (aka workflows / templates).

Getting started

  • API documentation and examples are available via godoc.
  • The examples directory contains more elaborate example applications.
  • No specific mocks are needed for testing, every element is completely decoupled and atomic. You can create your own ones however you deem fit.

API stability

Pipeline follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/saantiaguilera/go-pipeline.v1

Documentation

Overview

The pipeline API allows us to create pipelines/workflows/templates were we can link different units of works (we will call them steps) between them to produce a graph of work.

Creating a step

A step is a contract that allows us to represent or run a unit of work. A step requires an input and may return an output or an error depending on whether it failed or not. A step is declared as

pipeline.Step[Input, Output]

Steps are considered the backbone of the API. The API already provides a set of steps that should suffice to create any type of pipeline, but there may be specific scenarios were the given API gets too verbose or its not enough. In these type of scenarios we can create our own custom steps to match our needs.

The steps provided by the API are:

UnitStep

The most simple and atomic step. This step lets us run a single unit of work.

var step pipeline.Step[InputData, OutputData] = pipeline.NewUnitStep[InputData, OutputData](
  "name_of_the_step",
  func(ctx context.Context, in InputData) (OutputData, error) {
    // do stuff with the InputData, returning Outputdata or error
  },
)

SequentialStep

A sequential step allows us to "link" two steps together sequentially.

var firstStep pipeline.Step[int, string]
var secondStep pipeline.Step[string, bool]

// in:  int
// out: bool
var sequentialStep pipeline.Step[int, bool] = pipeline.NewSequentialStep[int, string, bool](firstStep, secondStep)

ConcurrentStep

A concurrent step allows us to "link" multiple steps concurrently and once they're done reduce them to a single output.

var concurrentSteps []pipeline.Step[int, string]
var reducer func(context.Context, a, b string) (string, error)

// in: int
// out: string
var concurrentStep pipeline.Step[int, string] = pipeline.NewConcurrentStep[int, string](concurrentSteps, reducer)

ConditionalStep

A conditional step allows us to evaluate a condition and depending on its result branch to specific step. This step allows us to branch the graph in two different branches.

var trueWayStep pipeline.Step[InputData, OutputData]
var falseWayStep pipeline.Step[InputData, OutputData]

var statement pipeline.Statement[InputData] = pipeline.NewStatement(
  "name_of_the_statement",
  func(ctx context.Context, in InputData) bool {
    // evaluate statement and return branching mode
  }
)
var cond pipeline.Step[InputData, OutputData] = pipeline.NewConditionalStep(statement, trueWayStep, falseWayStep)

OptionalStep

An optional step is similar to a conditional one, although it only has a single branch. It either runs the given Step or it skips it (returning the initial input), depending on the result of the statement evaluation.

var step pipeline.Step[InputData, InputData]

var statement pipeline.Statement[InputData] = pipeline.NewStatement(
  "name_of_the_statement",
  func(ctx context.Context, in InputData) bool {
    // evaluate statement and return true to run / false to skip
  }
)
var opt pipeline.Step[InputData, InputData] = pipeline.NewOptionalStep(statement, step)

It also supports altering the output, but when doing so you need to provide how to default to it when the step is skipped

var step pipeline.Step[InputData, OutputData]

var statement pipeline.Statement[InputData] = pipeline.NewStatement(
  "name_of_the_statement",
  func(ctx context.Context, in InputData) bool {
    // evaluate statement and return true to run / false to skip
  }
)
var def pipeline.Unit[InputData, OutputData] = func(ctx context.Context, in InputData) (OutputData, error) {
  // create default output data for when the step is skipped because the statement evaluation was false
}
var opt pipeline.Step[InputData, OutputData] = pipeline.NewOptionalStepWithDefault(statement, step, def)

Creating a custom step

Steps need to comply to an extremely simple interface.

  type Step[I, O any] interface {
    Draw(pipeline.Graph) // lets us represent a step in a graph
	   Run(context.Context, I) (O, error) // lets us evaluate the step
  }

Hence, we can create our own custom steps by simply creating a struct that matches the given contract. There are no restrictions besides these two so it's highly flexible when wanting to create custom behaviors or logics.

For example, a step that always succeeds and doesn't mutate the result might be:

type ImmutableStepThatAlwaysSucceeds[I any] struct {
  name string
  fn   func(ctx context.Context, in I)
}

func (s ImmutableStepThatAlwaysSucceeds[I]) Draw(g pipeline.Graph) {
  g.AddActivity(s.name)
}

func (s ImmutableStepThatAlwaysSucceeds[I]) Run(ctx context.Context, in I) (I, error) {
  s.fn(ctx, in)
  return in, nil
}

func main() {
  var s pipeline.Step[int, int] = ImmutableStepThatAlwaysSucceeds[int]{
    name: "example",
    fn: func(ctx context.Context, in int) {
      // do something.
    }
  }
}

Run a pipeline

Running a pipeline is as simple as running the final step. You will need a context of your own (steps are context aware) and an initial input so the graph can be traversed with it and mutate it to yield a final output.

var step  pipeline.Step[InputStruct, OutputStruct]
var input InputStruct
var ctx   context.Context

res, err := step.Run(ctx, input) // res is of type OutputStruct

Rendering a graph

You can render a graph by simply creating a graph and drawing the steps on it Eg. for rendering an UML you should do

var step pipeline.Step[InputStruct, OutputStruct]

graph := pipeline.NewUMLGraph()
renderer := pipeline.NewUMLRenderer(pipeline.UMLOptions{
  Type: pipeline.UMLFormatSVG,
})
file, _ := os.Create("output_file.svg")

step.Draw(graph)

err := renderer.Render(graph, file)
Example (Basic)

Example basic showcases a simple graph that uses the basic API steps to produce a simple result based on a given input.

The input will be mutated across different steps (incrementing or doubling it) and finally, print if it's a 3 digit number or not

For showing purposes, all steps and pipeline building are in the same function and use basic parameter types and logics (we don't showcase a real life usecase with infrastructure / http calls / etc), just note that it's quite similar.

In the examples directory you can find more elaborate samples on how to do this better.

inc := pipeline.NewUnitStep( // int -> int
	"increase_number",
	func(ctx context.Context, i int) (int, error) {
		return i + 20, nil
	},
)
double := pipeline.NewUnitStep( // int -> int
	"double_number",
	func(ctx context.Context, i int) (int, error) {
		return i * 2, nil
	},
)
toString := pipeline.NewUnitStep( // int -> string
	"to_string",
	func(ctx context.Context, i int) (string, error) {
		return fmt.Sprintf("%d", i), nil
	},
)
threeDigit := pipeline.NewUnitStep( // string -> bool
	"number_is_three_digit",
	func(ctx context.Context, s string) (bool, error) {
		return len(s) == 3, nil
	},
)
print := pipeline.NewUnitStep( // bool -> bool
	"print",
	func(ctx context.Context, b bool) (bool, error) {
		fmt.Println(b)
		return b, nil
	},
)

// built from end to start
printThreeDigit := pipeline.NewSequentialStep[string, bool, bool](threeDigit, print)
stringAndEnd := pipeline.NewSequentialStep[int, string, bool](toString, printThreeDigit)
doubleAndEnd := pipeline.NewSequentialStep[int, int, bool](double, stringAndEnd)
graph := pipeline.NewSequentialStep[int, int, bool](inc, doubleAndEnd)

graph.Run(context.Background(), 30)
graph.Run(context.Background(), 20)
Output:

true
false
Example (Complex)

Example complex showcases a complex graph that uses most of the API steps to produce a simple result based on a given input.

The input will be mutated across different steps (incrementing or doubling it) and finally, print if it's a 3 digit number or not

For showing purposes, all steps and pipeline building are in the same function and use basic parameter types and logics (we don't showcase a real life usecase with infrastructure / http calls / etc), just note that it's quite similar.

In the examples directory you can find more elaborate samples on how to do this better.

inc := pipeline.NewUnitStep( // int -> int
	"increase_number",
	func(ctx context.Context, i int) (int, error) {
		return i + 1, nil
	},
)
double := pipeline.NewUnitStep( // int -> int
	"double_number",
	func(ctx context.Context, i int) (int, error) {
		return i * 2, nil
	},
)
toString := pipeline.NewUnitStep( // int -> string
	"to_string",
	func(ctx context.Context, i int) (string, error) {
		return fmt.Sprintf("%d", i), nil
	},
)
threeDigit := pipeline.NewUnitStep( // string -> bool
	"number_is_three_digit",
	func(ctx context.Context, s string) (bool, error) {
		return len(s) == 3, nil
	},
)
cond := pipeline.NewOptionalStep[int](
	pipeline.NewStatement(
		"multiply_if_even",
		func(ctx context.Context, i int) bool {
			return i%2 == 0
		},
	),
	double,
)
concurrentInc := pipeline.NewConcurrentStep( // int -> int
	[]pipeline.Step[int, int]{inc, inc, inc, inc, inc, inc, inc, inc, inc, inc},
	func(ctx context.Context, i1, i2 int) (int, error) {
		return i1 + i2, nil
	},
)
print := pipeline.NewUnitStep( // bool -> bool
	"print",
	func(ctx context.Context, b bool) (bool, error) {
		fmt.Println(b)
		return b, nil
	},
)

// built from end to start
threeDigitAndPrint := pipeline.NewSequentialStep[string, bool, bool](threeDigit, print)
toSringAndEnd := pipeline.NewSequentialStep[int, string, bool](toString, threeDigitAndPrint)
doubleAndEnd := pipeline.NewSequentialStep[int, int, bool](double, toSringAndEnd)
conditionAndEnd := pipeline.NewSequentialStep[int, int, bool](cond, doubleAndEnd)
graph := pipeline.NewSequentialStep[int, int, bool](concurrentInc, conditionAndEnd)

graph.Run(context.Background(), 2)
graph.Run(context.Background(), 1)
Output:

true
false

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrentStep added in v0.5.0

type ConcurrentStep[I, O any] struct {
	// contains filtered or unexported fields
}

ConcurrentStep wraps multiple steps of a given Input/Output and runs them concurrently, later reducing them into a single output of the same type.

Example (Different)

The example uses dummy data and simulates a specific (random) scenario were we need to get a resource that is created from two different data sources (and we can obtain them concurrently to improve the performance) and finally reduce it to a single output

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

type DriverID int
type Driver struct {
	Person  any
	Vehicle any
}

gp := pipeline.NewUnitStep(
	"get_person",
	func(ctx context.Context, i DriverID) (Driver, error) {
		// do something with input
		return Driver{
			Person: true,
		}, nil
	},
)
gv := pipeline.NewUnitStep(
	"get_vehicle",
	func(ctx context.Context, i DriverID) (Driver, error) {
		// do something with input
		return Driver{
			Vehicle: true,
		}, nil
	},
)
reduce := func(ctx context.Context, a, b Driver) (Driver, error) {
	if b.Person != nil {
		a.Person = b.Person
	}
	if b.Vehicle != nil {
		a.Vehicle = b.Vehicle
	}
	return a, nil
}
ctx := context.Background()
in := DriverID(1)

pipe := pipeline.NewConcurrentStep(
	[]pipeline.Step[DriverID, Driver]{gp, gv},
	reduce,
)

out, err := pipe.Run(ctx, in)

fmt.Println(out, err)
Output:

{true true} <nil>
Example (Same)

This example shows a same step that is run many times concurrently.

The example uses dummy data to better showcase the immutability of the graph and step since we can produce different values and later reduce them without needing to take into account goroutines, mutexes, waitgroups to synchronize data at the end

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

step := pipeline.NewUnitStep(
	"half_increase",
	func(ctx context.Context, i int) (float32, error) {
		return float32(i) * 1.5, nil
	},
)
reduce := func(ctx context.Context, a, b float32) (float32, error) {
	return a + b, nil
}
ctx := context.Background()
in := 1

pipe := pipeline.NewConcurrentStep(
	[]pipeline.Step[int, float32]{
		step, step, step, step, step, step,
		step, step, step, step, step, step,
	},
	reduce,
)

out, err := pipe.Run(ctx, in)

fmt.Println(out, err)
Output:

18 <nil>

func NewConcurrentStep added in v0.5.0

func NewConcurrentStep[I, O any](steps []Step[I, O], reduce reducer[O]) ConcurrentStep[I, O]

NewConcurrentStep creates a step that will run each of the inner steps concurrently. The step will wait for all of the steps to finish before returning.

If one of them fails, the step will wait until everyone finishes and after that return the first encountered error.

This step (as all the others) doesn't handle panics. Be careful since this step creates goroutines and the panics not necessarilly will be signaled in the same goroutine as the origin call. Make sure to handle panics on your own if your code is unsafe (through decorations / deferrals in steps / etc)

func (ConcurrentStep[I, O]) Draw added in v0.5.0

func (c ConcurrentStep[I, O]) Draw(graph Graph)

func (ConcurrentStep[I, O]) Run added in v0.5.0

func (c ConcurrentStep[I, O]) Run(ctx context.Context, in I) (O, error)

Run the step concurrently, if one of them fails an error will be returned.

This step waits for all of the concurrent ones to finish.

Note that this step may use goroutines and (as all other steps) doesn't handle panics, hence it is advise to handle them on your own if you can't guarantee a panic-safe environment.

type ConditionalStep added in v0.5.0

type ConditionalStep[I, O any] struct {
	// contains filtered or unexported fields
}

ConditionalStep allows a step to branch into two inner steps depending on the result of a statement's evaluation.

This conditional allows us to branch into two "different pipelines" depending on the result

Example

The following example evaluates a dummy condition and depending on the result it branches to one step or another.

This example uses dummy data to showcase as simple as possible this scenario.

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

type User any
type Data any
stmt := pipeline.NewStatement(
	"check_something",
	func(ctx context.Context, in User) bool {
		// check and return were to branch
		return true
	},
)
tf := pipeline.NewUnitStep(
	"true_case",
	func(ctx context.Context, in User) (Data, error) {
		// do something with input
		return Data(true), nil
	},
)
ff := pipeline.NewUnitStep(
	"false_step",
	func(ctx context.Context, u User) (Data, error) {
		// do something with input
		return Data(false), nil
	},
)
ctx := context.Background()
in := User(nil)

pipe := pipeline.NewConditionalStep[User, Data](stmt, tf, ff)

out, err := pipe.Run(ctx, in)

fmt.Println(out, err)
Output:

true <nil>

func NewConditionalStep added in v0.5.0

func NewConditionalStep[I, O any](statement conditionalStatement[I], t, f Step[I, O]) ConditionalStep[I, O]

NewConditionalStep creates a conditional step that will run a statement. If it holds true, then the "true" step will be run. Else, the "false" step will be called. If a statement is nil, then it will be considered to hold false (thus, the "false" step is called) If one of the steps is nil and the statement is such, then an error will be triggered (you probably want an OptionalStep if one of the branches can be nil).

func (ConditionalStep[I, O]) Draw added in v0.5.0

func (c ConditionalStep[I, O]) Draw(graph Graph)

func (ConditionalStep[I, O]) Run added in v0.5.0

func (c ConditionalStep[I, O]) Run(ctx context.Context, in I) (O, error)

Run one of the provided steps depending on the statement's evaluation.

type DrawableGraph added in v0.5.0

type DrawableGraph interface {
	Draw(graph Graph)
}

DrawableGraph is a contract for drawing in graphs

type Graph added in v0.5.0

type Graph interface {
	// AddConcurrency branching as many times as needed (each branch is a concurrent/fork 'node')
	AddConcurrency(branches ...GraphDrawer)
	// AddDecision from a given statement, allowing inner graphs for each branch of the decision
	AddDecision(statement string, yes GraphDrawer, no GraphDrawer)
	// Create an action entry
	AddActivity(label string)
}

Graph interface allowing to create a representation/drawing of a graph

type GraphDrawer added in v0.5.0

type GraphDrawer = func(Graph)

GraphDrawer alias for Draw(Graph) functions

type OptionalStep added in v0.5.0

type OptionalStep[I, O any] struct {
	// contains filtered or unexported fields
}

OptionalStep is a step that may or may not run depending on a statement

Example

The following example shows an optional step were it will be run if the evaluated statement yields true, otherwise it will return the same input it was provided this type of constructed step doesn't allow output mutation as we don't know how to default the mutated output if the step is skipped

This example uses dummy data to showcase as simple as possible this scenario.

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

type User any
stmt := pipeline.NewStatement(
	"check_something",
	func(ctx context.Context, in User) bool {
		// check and return if we should run optional or not
		return in == User(1)
	},
)
of := pipeline.NewUnitStep(
	"optional_case",
	func(ctx context.Context, in User) (User, error) {
		// do something with input
		return User(true), nil
	},
)
ctx := context.Background()

pipe := pipeline.NewOptionalStep[User](stmt, of)

// Skips optional step (returns same input)
out, err := pipe.Run(ctx, User(12))
fmt.Println(out, err)

// Runs optional step (returns step output)
out, err = pipe.Run(ctx, User(1))
fmt.Println(out, err)
Output:

12 <nil>
true <nil>
Example (Default)

The following example allows us to define an optional step with different output from its input, and in case the optional step is skipped it will call a provided default function for it to return the default output you want for that case

This example uses dummy data to showcase as simple as possible this scenario.

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

type User any
type Data any
stmt := pipeline.NewStatement(
	"check_something",
	func(ctx context.Context, in User) bool {
		// check and return if we should run optional or not
		return in == User(1)
	},
)
of := pipeline.NewUnitStep(
	"optional_case",
	func(ctx context.Context, in User) (Data, error) {
		// do something with input
		return Data(true), nil
	},
)
def := func(ctx context.Context, in User) (Data, error) {
	// create default value of type Data (gets run if we
	// skip the step as we don't know how to default Data type)
	return Data(false), nil
}
ctx := context.Background()

pipe := pipeline.NewOptionalStepWithDefault[User, Data](stmt, of, def)

// Skips optional step (returns default)
out, err := pipe.Run(ctx, User(12))
fmt.Println(out, err)

// Runs optional step (returns step output)
out, err = pipe.Run(ctx, User(1))
fmt.Println(out, err)
Output:

false <nil>
true <nil>

func NewOptionalStep added in v0.5.0

func NewOptionalStep[T any](stmt Statement[T], s Step[T, T]) OptionalStep[T, T]

NewOptionalStep creates a step that may run the provided step if the statement evaluates correctly if the statement yields false, then the same input will be forwarded as output.

func NewOptionalStepWithDefault added in v0.5.0

func NewOptionalStepWithDefault[I, O any](stmt Statement[I], s Step[I, O], def Unit[I, O]) OptionalStep[I, O]

NewOptionalStepWithDefault creates a step that may run the provided step if the statement evaluates correctly if the statement yields false, then a default unit will be run to forward an output O

func (OptionalStep[I, O]) Draw added in v0.5.0

func (c OptionalStep[I, O]) Draw(graph Graph)

func (OptionalStep[I, O]) Run added in v0.5.0

func (c OptionalStep[I, O]) Run(ctx context.Context, in I) (O, error)

Run a step or skip it depending on the result of a statement evaluation

type SequentialStep added in v0.5.0

type SequentialStep[I, M, O any] struct {
	// contains filtered or unexported fields
}

SequentialStep runs two steps sequentially.

A sequential step allows partial mutation between the two steps.

Eg.
The first step can have an input of type 'A' and an output of type 'B'
The second step can have an input of type 'B' and an output of type 'C'.
  Hence, the sequential step is a step that goes from input 'A', to output 'C'
    (mutating patially into 'B' in the middle)

If one of the steps fails, the step is halted and the error is returned

Example

The following example shows a sequence between two steps were each of them is run sequentially with the output of the previous one as input

This example uses dummy data to showcase as simple as possible this scenario.

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

type DriverID int
type Driver any
type NotificationReceipt any
gd := pipeline.NewUnitStep(
	"get_driver",
	func(ctx context.Context, id DriverID) (Driver, error) {
		// do something with input
		return Driver(id), nil
	},
)
sn := pipeline.NewUnitStep(
	"send_notification",
	func(ctx context.Context, d Driver) (NotificationReceipt, error) {
		// do something with input
		return NotificationReceipt(25), nil
	},
)

pipe := pipeline.NewSequentialStep[DriverID, Driver, NotificationReceipt](gd, sn)

out, err := pipe.Run(context.Background(), DriverID(1234))

fmt.Println(out, err)
Output:

25 <nil>

func NewSequentialStep added in v0.5.0

func NewSequentialStep[I, M, O any](s Step[I, M], e Step[M, O]) SequentialStep[I, M, O]

NewSequentialStep creates step that will run each of the steps sequentially. If one of them fails, the operation will abort immediately

func (SequentialStep[I, M, O]) Draw added in v0.5.0

func (s SequentialStep[I, M, O]) Draw(graph Graph)

func (SequentialStep[I, M, O]) Run added in v0.5.0

func (s SequentialStep[I, M, O]) Run(ctx context.Context, in I) (O, error)

Run both steps sequentially. If one of them fails, the step is halted and the error is returned.

type Statement

type Statement[T any] struct {
	// contains filtered or unexported fields
}

Statement is a structure that can be evaluated to yield a boolean result

Example

This examples shows a simple statement that lets us evaluate it with a given input to yield a boolean result

This example uses dummy data to showcase as simple as possible this scenario.

stmt := pipeline.NewStatement(
	"is_number_odd",
	func(ctx context.Context, i int) bool {
		return i%2 != 0
	},
)

out := stmt.Evaluate(context.Background(), 25)

fmt.Println(out)
Output:

true

func NewAnonymousStatement added in v0.5.0

func NewAnonymousStatement[T any](eval func(context.Context, T) bool) Statement[T]

NewAnonymousStatement creates an anonymous statement with no representation, that will evaluate to the given evaluation

func NewStatement added in v0.5.0

func NewStatement[T any](name string, eval func(context.Context, T) bool) Statement[T]

NewStatement creates a statement represented by the given name, that will evaluate to the given evaluation

func (Statement[T]) Evaluate

func (s Statement[T]) Evaluate(ctx context.Context, v T) bool

func (Statement[T]) Name added in v0.5.0

func (s Statement[T]) Name() string

type Step

type Step[I, O any] interface {
	DrawableGraph

	// Run a step. Returns an error if this step fails to complete.
	// An input I is provided as a mean of communication between different units of work
	Run(context.Context, I) (O, error)
}

Step is runnable element that yields a result or error from a given input A step can be drawn into a graph to represent it.

type TracedStep added in v0.5.0

type TracedStep[I, O any] struct {
	// contains filtered or unexported fields
}

TracedStep decorates a step with tracing capabilities, logging the duration of the step execution, time of execution and its result.

func NewTracedStep added in v0.5.0

func NewTracedStep[I, O any](name string, step Step[I, O]) TracedStep[I, O]

NewTracedStep creates traced step that will log the execution time of the step to the stdout

func NewTracedStepWithWriter added in v0.5.0

func NewTracedStepWithWriter[I, O any](name string, step Step[I, O], writer io.Writer) TracedStep[I, O]

NewTracedStepWithWriter creates traced step that will log the execution time of the step to the writer

func (TracedStep[I, O]) Draw added in v0.5.0

func (t TracedStep[I, O]) Draw(graph Graph)

func (TracedStep[I, O]) Run added in v0.5.0

func (t TracedStep[I, O]) Run(ctx context.Context, in I) (O, error)

type UMLGraph added in v0.5.0

type UMLGraph struct {
	// contains filtered or unexported fields
}

UMLGraph represents a graph that can render itself into UML

func NewUMLGraph added in v0.5.0

func NewUMLGraph() *UMLGraph

NewUMLGraph createsn UML Activity graph diagram that represents one

func (*UMLGraph) AddActivity added in v0.5.0

func (p *UMLGraph) AddActivity(label string)

func (*UMLGraph) AddConcurrency added in v0.5.0

func (p *UMLGraph) AddConcurrency(forks ...GraphDrawer)

func (*UMLGraph) AddDecision added in v0.5.0

func (p *UMLGraph) AddDecision(statement string, yes GraphDrawer, no GraphDrawer)

func (*UMLGraph) String added in v0.5.0

func (p *UMLGraph) String() string

type UMLOptions

type UMLOptions struct {
	// Type of the drawing, by default we will use UMLFormatSVG
	Type UMLOutputFormat
	// Base URL to use for retrieving Plant UML graphs, by default we will use http://www.plantuml.com/plantuml/
	BaseURL string
}

UMLOptions available when drawing a graph

type UMLOutputFormat

type UMLOutputFormat string

UMLOutputFormat for graph renderings

const (

	// UMLFormatPNG OutputFormat for graph renderings (a UMLFormatPNG image will be created)
	UMLFormatPNG UMLOutputFormat = "png"
	// UMLFormatSVG OutputFormat for graph renderings (an UMLFormatSVG image will be created)
	UMLFormatSVG UMLOutputFormat = "svg"
	// UMLFormatRaw OutputFormat for graph renderings (a file with the raw contents will be created)
	UMLFormatRaw UMLOutputFormat = "raw"
	// UMLFormatTXT OutputFormat for graph renderings (an ASCII Art will be created)
	UMLFormatTXT UMLOutputFormat = "txt"
)

type UMLRenderer added in v0.5.0

type UMLRenderer struct {
	Options UMLOptions
}

UMLRenderer allows us to render graphs into an UML diagram output

func NewUMLRenderer added in v0.5.0

func NewUMLRenderer(options UMLOptions) *UMLRenderer

NewUMLRenderer creates an UML renderer for drawing graphs as specified

func (*UMLRenderer) Render added in v0.5.0

func (u *UMLRenderer) Render(graphDiagram umlRendererGraph, output io.WriteCloser) error

Render draws in UML activity the given step, and writes it to the given file

type Unit added in v0.5.0

type Unit[I, O any] func(context.Context, I) (O, error)

Unit of work to yield a result of type O (or an error in case of a failure) from a given input I

type UnitStep added in v0.5.0

type UnitStep[I, O any] struct {
	// contains filtered or unexported fields
}

UnitStep for making a unit of work.

Example

The following example shows a simple unit step that runs a unit of work with a given input and yields a result of a different type or an error depending on the execution

This example uses dummy data to showcase as simple as possible this scenario.

Note: we use several UnitStep to showcase as it allows us to easily run dummy code, but it could use any type of step you want as long as it implements pipeline.Step[I, O]

type (
	InData  any
	OutData any
)
step := pipeline.NewUnitStep(
	"do_something",
	func(ctx context.Context, in InData) (OutData, error) {
		// do something with input
		return OutData(in), nil
	},
)

out, err := step.Run(context.Background(), InData("example"))

fmt.Println(out, err)
Output:

example <nil>

func NewUnitStep added in v0.5.0

func NewUnitStep[I, O any](name string, run Unit[I, O]) UnitStep[I, O]

NewUnitStep creates an immutable stateless unit of work based on a Unit function You can use this implementation when your use-cases will be completely stateless (they don't rely on a service or anything that can be injected at the start and stay immutable for the lifetime of the process)

func (UnitStep[I, O]) Draw added in v0.5.0

func (s UnitStep[I, O]) Draw(graph Graph)

Draw this step in a graph

func (UnitStep[I, O]) ID added in v0.5.0

func (s UnitStep[I, O]) ID() string

ID is a unique identifier of this step. You can safely assume it wont be repeated and use it in any custom steps to enrich logic (eg. a circuit breaker / cache for IDs)

func (UnitStep[I, O]) Name added in v0.5.0

func (s UnitStep[I, O]) Name() string

Name to identify a step. You shouldn't assume this name is unique per step but rather use it to understand what this is / does / represent

func (UnitStep[I, O]) Run added in v0.5.0

func (s UnitStep[I, O]) Run(ctx context.Context, in I) (O, error)

Run a step and yield a result of type O or an error if it failed. This operation is context-aware.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL