Documentation ¶
Overview ¶
The pipeline API lets us create pipelines, workflows, or templates in which different units of work (we will call them steps) are linked together to produce a graph of work.
Creating a step ¶
A step is a contract that allows us to represent or run a unit of work. A step requires an input and may return an output or an error depending on whether it failed or not. A step is declared as
pipeline.Step[Input, Output]
Steps are the backbone of the API. The API already provides a set of steps that should suffice to create any type of pipeline, but there may be specific scenarios where the given API gets too verbose or is not enough. In these scenarios we can create our own custom steps to match our needs.
The steps provided by the API are:
UnitStep ¶
The simplest and most atomic step. This step lets us run a single unit of work.
var step pipeline.Step[InputData, OutputData] = pipeline.NewUnitStep[InputData, OutputData](
    "name_of_the_step",
    func(ctx context.Context, in InputData) (OutputData, error) {
        // do stuff with the InputData, returning OutputData or an error
    },
)
SequentialStep ¶
A sequential step allows us to "link" two steps together sequentially.
var firstStep pipeline.Step[int, string]
var secondStep pipeline.Step[string, bool]

// in: int
// out: bool
var sequentialStep pipeline.Step[int, bool] = pipeline.NewSequentialStep[int, string, bool](firstStep, secondStep)
ConcurrentStep ¶
A concurrent step allows us to "link" multiple steps concurrently and, once they're done, reduce their outputs to a single output.
var concurrentSteps []pipeline.Step[int, string]
var reducer func(ctx context.Context, a, b string) (string, error)

// in: int
// out: string
var concurrentStep pipeline.Step[int, string] = pipeline.NewConcurrentStep[int, string](concurrentSteps, reducer)
ConditionalStep ¶
A conditional step allows us to evaluate a condition and, depending on its result, branch to a specific step. This lets us split the graph into two different branches.
var trueWayStep pipeline.Step[InputData, OutputData]
var falseWayStep pipeline.Step[InputData, OutputData]

var statement pipeline.Statement[InputData] = pipeline.NewStatement(
    "name_of_the_statement",
    func(ctx context.Context, in InputData) bool {
        // evaluate statement and return branching mode
    },
)

var cond pipeline.Step[InputData, OutputData] = pipeline.NewConditionalStep(statement, trueWayStep, falseWayStep)
OptionalStep ¶
An optional step is similar to a conditional one, although it only has a single branch. It either runs the given Step or it skips it (returning the initial input), depending on the result of the statement evaluation.
var step pipeline.Step[InputData, InputData]

var statement pipeline.Statement[InputData] = pipeline.NewStatement(
    "name_of_the_statement",
    func(ctx context.Context, in InputData) bool {
        // evaluate statement and return true to run / false to skip
    },
)

var opt pipeline.Step[InputData, InputData] = pipeline.NewOptionalStep(statement, step)
It also supports altering the output type, but when doing so you need to provide a way to produce a default output for when the step is skipped:
var step pipeline.Step[InputData, OutputData]

var statement pipeline.Statement[InputData] = pipeline.NewStatement(
    "name_of_the_statement",
    func(ctx context.Context, in InputData) bool {
        // evaluate statement and return true to run / false to skip
    },
)

var def pipeline.Unit[InputData, OutputData] = func(ctx context.Context, in InputData) (OutputData, error) {
    // create default output data for when the step is skipped because the statement evaluation was false
}

var opt pipeline.Step[InputData, OutputData] = pipeline.NewOptionalStepWithDefault(statement, step, def)
Creating a custom step ¶
Steps need to comply with an extremely simple interface.
type Step[I, O any] interface {
    Draw(pipeline.Graph)               // lets us represent a step in a graph
    Run(context.Context, I) (O, error) // lets us evaluate the step
}
Hence, we can create our own custom steps by simply creating a struct that satisfies the given contract. There are no restrictions besides these two methods, so it's highly flexible when we want to create custom behavior or logic.
For example, a step that always succeeds and doesn't mutate the result might be:
type ImmutableStepThatAlwaysSucceeds[I any] struct {
    name string
    fn   func(ctx context.Context, in I)
}

func (s ImmutableStepThatAlwaysSucceeds[I]) Draw(g pipeline.Graph) {
    g.AddActivity(s.name)
}

func (s ImmutableStepThatAlwaysSucceeds[I]) Run(ctx context.Context, in I) (I, error) {
    s.fn(ctx, in)
    return in, nil
}

func main() {
    var s pipeline.Step[int, int] = ImmutableStepThatAlwaysSucceeds[int]{
        name: "example",
        fn: func(ctx context.Context, in int) {
            // do something.
        },
    }
    _ = s // use the step, e.g. s.Run(ctx, 1)
}
Run a pipeline ¶
Running a pipeline is as simple as running the final step. You will need a context of your own (steps are context aware) and an initial input, which is passed through the graph and transformed along the way to yield the final output.
var step pipeline.Step[InputStruct, OutputStruct]
var input InputStruct
var ctx context.Context

res, err := step.Run(ctx, input)
// res is of type OutputStruct
Rendering a graph ¶
You can render a graph by creating a graph and drawing the steps on it. E.g. to render a UML diagram:
var step pipeline.Step[InputStruct, OutputStruct]

graph := pipeline.NewUMLGraph()
renderer := pipeline.NewUMLRenderer(pipeline.UMLOptions{
    Type: pipeline.UMLFormatSVG,
})
file, _ := os.Create("output_file.svg")

step.Draw(graph)
err := renderer.Render(graph, file)
Example (Basic) ¶
Example basic showcases a simple graph that uses the basic API steps to produce a simple result based on a given input.
The input will be mutated across different steps (incrementing or doubling it) and, finally, the pipeline prints whether it's a 3-digit number or not.
For demonstration purposes, all steps and the pipeline construction live in the same function and use basic parameter types and logic (we don't showcase a real-life use case with infrastructure, HTTP calls, etc.), but a real pipeline is built in much the same way.
In the examples directory you can find more elaborate samples on how to do this better.
inc := pipeline.NewUnitStep( // int -> int
    "increase_number",
    func(ctx context.Context, i int) (int, error) {
        return i + 20, nil
    },
)
double := pipeline.NewUnitStep( // int -> int
    "double_number",
    func(ctx context.Context, i int) (int, error) {
        return i * 2, nil
    },
)
toString := pipeline.NewUnitStep( // int -> string
    "to_string",
    func(ctx context.Context, i int) (string, error) {
        return fmt.Sprintf("%d", i), nil
    },
)
threeDigit := pipeline.NewUnitStep( // string -> bool
    "number_is_three_digit",
    func(ctx context.Context, s string) (bool, error) {
        return len(s) == 3, nil
    },
)
print := pipeline.NewUnitStep( // bool -> bool
    "print",
    func(ctx context.Context, b bool) (bool, error) {
        fmt.Println(b)
        return b, nil
    },
)

// built from end to start
printThreeDigit := pipeline.NewSequentialStep[string, bool, bool](threeDigit, print)
stringAndEnd := pipeline.NewSequentialStep[int, string, bool](toString, printThreeDigit)
doubleAndEnd := pipeline.NewSequentialStep[int, int, bool](double, stringAndEnd)
graph := pipeline.NewSequentialStep[int, int, bool](inc, doubleAndEnd)

graph.Run(context.Background(), 30)
graph.Run(context.Background(), 20)

Output:

true
false
Example (Complex) ¶
Example complex showcases a complex graph that uses most of the API steps to produce a simple result based on a given input.
The input will be mutated across different steps (incrementing or doubling it) and, finally, the pipeline prints whether it's a 3-digit number or not.
For demonstration purposes, all steps and the pipeline construction live in the same function and use basic parameter types and logic (we don't showcase a real-life use case with infrastructure, HTTP calls, etc.), but a real pipeline is built in much the same way.
In the examples directory you can find more elaborate samples on how to do this better.
inc := pipeline.NewUnitStep( // int -> int
    "increase_number",
    func(ctx context.Context, i int) (int, error) {
        return i + 1, nil
    },
)
double := pipeline.NewUnitStep( // int -> int
    "double_number",
    func(ctx context.Context, i int) (int, error) {
        return i * 2, nil
    },
)
toString := pipeline.NewUnitStep( // int -> string
    "to_string",
    func(ctx context.Context, i int) (string, error) {
        return fmt.Sprintf("%d", i), nil
    },
)
threeDigit := pipeline.NewUnitStep( // string -> bool
    "number_is_three_digit",
    func(ctx context.Context, s string) (bool, error) {
        return len(s) == 3, nil
    },
)
cond := pipeline.NewOptionalStep[int](
    pipeline.NewStatement(
        "multiply_if_even",
        func(ctx context.Context, i int) bool {
            return i%2 == 0
        },
    ),
    double,
)
concurrentInc := pipeline.NewConcurrentStep( // int -> int
    []pipeline.Step[int, int]{inc, inc, inc, inc, inc, inc, inc, inc, inc, inc},
    func(ctx context.Context, i1, i2 int) (int, error) {
        return i1 + i2, nil
    },
)
print := pipeline.NewUnitStep( // bool -> bool
    "print",
    func(ctx context.Context, b bool) (bool, error) {
        fmt.Println(b)
        return b, nil
    },
)

// built from end to start
threeDigitAndPrint := pipeline.NewSequentialStep[string, bool, bool](threeDigit, print)
toStringAndEnd := pipeline.NewSequentialStep[int, string, bool](toString, threeDigitAndPrint)
doubleAndEnd := pipeline.NewSequentialStep[int, int, bool](double, toStringAndEnd)
conditionAndEnd := pipeline.NewSequentialStep[int, int, bool](cond, doubleAndEnd)
graph := pipeline.NewSequentialStep[int, int, bool](concurrentInc, conditionAndEnd)

graph.Run(context.Background(), 2)
graph.Run(context.Background(), 1)

Output:

true
false
Types ¶
type ConcurrentStep ¶ added in v0.5.0
type ConcurrentStep[I, O any] struct {
    // contains filtered or unexported fields
}
ConcurrentStep wraps multiple steps of a given Input/Output and runs them concurrently, later reducing them into a single output of the same type.
Example (Different) ¶
The example uses dummy data and simulates a specific (arbitrary) scenario where we need to build a resource from two different data sources. We can fetch them concurrently to improve performance and then reduce the results to a single output.
Note: we use UnitStep throughout because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
type DriverID int
type Driver struct {
    Person  any
    Vehicle any
}

gp := pipeline.NewUnitStep(
    "get_person",
    func(ctx context.Context, i DriverID) (Driver, error) {
        // do something with input
        return Driver{
            Person: true,
        }, nil
    },
)
gv := pipeline.NewUnitStep(
    "get_vehicle",
    func(ctx context.Context, i DriverID) (Driver, error) {
        // do something with input
        return Driver{
            Vehicle: true,
        }, nil
    },
)
reduce := func(ctx context.Context, a, b Driver) (Driver, error) {
    if b.Person != nil {
        a.Person = b.Person
    }
    if b.Vehicle != nil {
        a.Vehicle = b.Vehicle
    }
    return a, nil
}

ctx := context.Background()
in := DriverID(1)
pipe := pipeline.NewConcurrentStep(
    []pipeline.Step[DriverID, Driver]{gp, gv},
    reduce,
)

out, err := pipe.Run(ctx, in)
fmt.Println(out, err)
Output: {true true} <nil>
Example (Same) ¶
This example shows the same step being run many times concurrently.
The example uses dummy data to better showcase the immutability of the graph and its steps: we can produce different values and later reduce them without having to deal with goroutines, mutexes, or wait groups to synchronize data at the end.
Note: we use UnitStep throughout because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
step := pipeline.NewUnitStep(
    "half_increase",
    func(ctx context.Context, i int) (float32, error) {
        return float32(i) * 1.5, nil
    },
)
reduce := func(ctx context.Context, a, b float32) (float32, error) {
    return a + b, nil
}

ctx := context.Background()
in := 1
pipe := pipeline.NewConcurrentStep(
    []pipeline.Step[int, float32]{
        step, step, step, step,
        step, step, step, step,
        step, step, step, step,
    },
    reduce,
)

out, err := pipe.Run(ctx, in)
fmt.Println(out, err)
Output: 18 <nil>
func NewConcurrentStep ¶ added in v0.5.0
func NewConcurrentStep[I, O any](steps []Step[I, O], reduce reducer[O]) ConcurrentStep[I, O]
NewConcurrentStep creates a step that will run each of the inner steps concurrently. The step will wait for all of the steps to finish before returning.
If one of them fails, the step will wait until all of them finish and then return the first encountered error.
This step (like all the others) doesn't handle panics. Be careful: this step creates goroutines, so a panic will not necessarily be raised in the same goroutine as the original call. Make sure to handle panics on your own if your code is unsafe (through decorators, deferred calls inside steps, etc.).
func (ConcurrentStep[I, O]) Draw ¶ added in v0.5.0
func (c ConcurrentStep[I, O]) Draw(graph Graph)
func (ConcurrentStep[I, O]) Run ¶ added in v0.5.0
func (c ConcurrentStep[I, O]) Run(ctx context.Context, in I) (O, error)
Run runs the inner steps concurrently. If one of them fails, an error will be returned.
This step waits for all of the concurrent steps to finish.
Note that this step may use goroutines and (like all other steps) doesn't handle panics, hence it is advised to handle them on your own if you can't guarantee a panic-safe environment.
type ConditionalStep ¶ added in v0.5.0
type ConditionalStep[I, O any] struct {
    // contains filtered or unexported fields
}
ConditionalStep allows a step to branch into two inner steps depending on the result of a statement's evaluation.
This conditional allows us to branch into two "different pipelines" depending on the result
Example ¶
The following example evaluates a dummy condition and, depending on the result, branches to one step or another.
This example uses dummy data to keep the scenario as simple as possible.
Note: we use UnitStep throughout because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
type User any
type Data any

stmt := pipeline.NewStatement(
    "check_something",
    func(ctx context.Context, in User) bool {
        // check and return where to branch
        return true
    },
)
tf := pipeline.NewUnitStep(
    "true_case",
    func(ctx context.Context, in User) (Data, error) {
        // do something with input
        return Data(true), nil
    },
)
ff := pipeline.NewUnitStep(
    "false_step",
    func(ctx context.Context, u User) (Data, error) {
        // do something with input
        return Data(false), nil
    },
)

ctx := context.Background()
in := User(nil)
pipe := pipeline.NewConditionalStep[User, Data](stmt, tf, ff)

out, err := pipe.Run(ctx, in)
fmt.Println(out, err)
Output: true <nil>
func NewConditionalStep ¶ added in v0.5.0
func NewConditionalStep[I, O any](statement conditionalStatement[I], t, f Step[I, O]) ConditionalStep[I, O]
NewConditionalStep creates a conditional step that will evaluate a statement. If it holds true, the "true" step is run; otherwise, the "false" step is run. If the statement is nil, it is considered to hold false (thus, the "false" step is run). If the step selected by the statement is nil, an error is returned (you probably want an OptionalStep if one of the branches can be nil).
func (ConditionalStep[I, O]) Draw ¶ added in v0.5.0
func (c ConditionalStep[I, O]) Draw(graph Graph)
type DrawableGraph ¶ added in v0.5.0
type DrawableGraph interface {
Draw(graph Graph)
}
DrawableGraph is a contract for drawing in graphs
type Graph ¶ added in v0.5.0
type Graph interface {
    // AddConcurrency branching as many times as needed (each branch is a concurrent/fork 'node')
    AddConcurrency(branches ...GraphDrawer)

    // AddDecision from a given statement, allowing inner graphs for each branch of the decision
    AddDecision(statement string, yes GraphDrawer, no GraphDrawer)

    // Create an action entry
    AddActivity(label string)
}
Graph interface allowing to create a representation/drawing of a graph
type GraphDrawer ¶ added in v0.5.0
type GraphDrawer = func(Graph)
GraphDrawer alias for Draw(Graph) functions
type OptionalStep ¶ added in v0.5.0
type OptionalStep[I, O any] struct {
    // contains filtered or unexported fields
}
OptionalStep is a step that may or may not run depending on a statement
Example ¶
The following example shows an optional step that is run if the evaluated statement yields true; otherwise the step returns the same input it was given. A step constructed this way doesn't allow changing the output type, as we wouldn't know how to produce a default output if the step is skipped.
This example uses dummy data to keep the scenario as simple as possible.
Note: we use UnitStep throughout because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
type User any

stmt := pipeline.NewStatement(
    "check_something",
    func(ctx context.Context, in User) bool {
        // check and return if we should run optional or not
        return in == User(1)
    },
)
of := pipeline.NewUnitStep(
    "optional_case",
    func(ctx context.Context, in User) (User, error) {
        // do something with input
        return User(true), nil
    },
)

ctx := context.Background()
pipe := pipeline.NewOptionalStep[User](stmt, of)

// Skips optional step (returns same input)
out, err := pipe.Run(ctx, User(12))
fmt.Println(out, err)

// Runs optional step (returns step output)
out, err = pipe.Run(ctx, User(1))
fmt.Println(out, err)

Output:

12 <nil>
true <nil>
Example (Default) ¶
The following example defines an optional step whose output type differs from its input type. If the optional step is skipped, the provided default function is called to produce the output for that case.
This example uses dummy data to keep the scenario as simple as possible.
Note: we use UnitStep throughout because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
type User any
type Data any

stmt := pipeline.NewStatement(
    "check_something",
    func(ctx context.Context, in User) bool {
        // check and return if we should run optional or not
        return in == User(1)
    },
)
of := pipeline.NewUnitStep(
    "optional_case",
    func(ctx context.Context, in User) (Data, error) {
        // do something with input
        return Data(true), nil
    },
)
def := func(ctx context.Context, in User) (Data, error) {
    // create default value of type Data (gets run if we
    // skip the step as we don't know how to default Data type)
    return Data(false), nil
}

ctx := context.Background()
pipe := pipeline.NewOptionalStepWithDefault[User, Data](stmt, of, def)

// Skips optional step (returns default)
out, err := pipe.Run(ctx, User(12))
fmt.Println(out, err)

// Runs optional step (returns step output)
out, err = pipe.Run(ctx, User(1))
fmt.Println(out, err)

Output:

false <nil>
true <nil>
func NewOptionalStep ¶ added in v0.5.0
func NewOptionalStep[T any](stmt Statement[T], s Step[T, T]) OptionalStep[T, T]
NewOptionalStep creates a step that runs the provided step if the statement evaluates to true. If the statement yields false, the same input is forwarded as output.
func NewOptionalStepWithDefault ¶ added in v0.5.0
func NewOptionalStepWithDefault[I, O any](stmt Statement[I], s Step[I, O], def Unit[I, O]) OptionalStep[I, O]
NewOptionalStepWithDefault creates a step that runs the provided step if the statement evaluates to true. If the statement yields false, the default unit is run instead to produce an output O.
func (OptionalStep[I, O]) Draw ¶ added in v0.5.0
func (c OptionalStep[I, O]) Draw(graph Graph)
type SequentialStep ¶ added in v0.5.0
type SequentialStep[I, M, O any] struct {
    // contains filtered or unexported fields
}
SequentialStep runs two steps sequentially.
A sequential step allows partial mutation between the two steps.
E.g. the first step can have an input of type 'A' and an output of type 'B', and the second step can have an input of type 'B' and an output of type 'C'. Hence, the sequential step is a step that goes from input 'A' to output 'C' (mutating partially into 'B' in the middle).
If one of the steps fails, the step is halted and the error is returned.
Example ¶
The following example shows a sequence of two steps, where the second is run with the output of the first as its input.
This example uses dummy data to keep the scenario as simple as possible.
Note: we use UnitStep throughout because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
type DriverID int
type Driver any
type NotificationReceipt any

gd := pipeline.NewUnitStep(
    "get_driver",
    func(ctx context.Context, id DriverID) (Driver, error) {
        // do something with input
        return Driver(id), nil
    },
)
sn := pipeline.NewUnitStep(
    "send_notification",
    func(ctx context.Context, d Driver) (NotificationReceipt, error) {
        // do something with input
        return NotificationReceipt(25), nil
    },
)

pipe := pipeline.NewSequentialStep[DriverID, Driver, NotificationReceipt](gd, sn)

out, err := pipe.Run(context.Background(), DriverID(1234))
fmt.Println(out, err)
Output: 25 <nil>
func NewSequentialStep ¶ added in v0.5.0
func NewSequentialStep[I, M, O any](s Step[I, M], e Step[M, O]) SequentialStep[I, M, O]
NewSequentialStep creates a step that will run each of the steps sequentially. If one of them fails, the operation will abort immediately.
func (SequentialStep[I, M, O]) Draw ¶ added in v0.5.0
func (s SequentialStep[I, M, O]) Draw(graph Graph)
type Statement ¶
type Statement[T any] struct {
    // contains filtered or unexported fields
}
Statement is a structure that can be evaluated to yield a boolean result
Example ¶
This example shows a simple statement that is evaluated with a given input to yield a boolean result.
This example uses dummy data to keep the scenario as simple as possible.
stmt := pipeline.NewStatement(
    "is_number_odd",
    func(ctx context.Context, i int) bool {
        return i%2 != 0
    },
)

out := stmt.Evaluate(context.Background(), 25)
fmt.Println(out)
Output: true
func NewAnonymousStatement ¶ added in v0.5.0
NewAnonymousStatement creates an anonymous statement with no representation, that will evaluate to the given evaluation
func NewStatement ¶ added in v0.5.0
NewStatement creates a statement represented by the given name, that will evaluate to the given evaluation
type Step ¶
type Step[I, O any] interface {
    DrawableGraph

    // Run a step. Returns an error if this step fails to complete.
    // An input I is provided as a means of communication between different units of work
    Run(context.Context, I) (O, error)
}
Step is a runnable element that yields a result or an error from a given input. A step can be drawn into a graph to represent it.
type TracedStep ¶ added in v0.5.0
type TracedStep[I, O any] struct {
    // contains filtered or unexported fields
}
TracedStep decorates a step with tracing capabilities, logging the duration of the step execution, time of execution and its result.
func NewTracedStep ¶ added in v0.5.0
func NewTracedStep[I, O any](name string, step Step[I, O]) TracedStep[I, O]
NewTracedStep creates a traced step that will log the execution time of the step to stdout.
func NewTracedStepWithWriter ¶ added in v0.5.0
func NewTracedStepWithWriter[I, O any](name string, step Step[I, O], writer io.Writer) TracedStep[I, O]
NewTracedStepWithWriter creates a traced step that will log the execution time of the step to the given writer.
func (TracedStep[I, O]) Draw ¶ added in v0.5.0
func (t TracedStep[I, O]) Draw(graph Graph)
type UMLGraph ¶ added in v0.5.0
type UMLGraph struct {
// contains filtered or unexported fields
}
UMLGraph represents a graph that can render itself into UML
func NewUMLGraph ¶ added in v0.5.0
func NewUMLGraph() *UMLGraph
NewUMLGraph creates a UML Activity graph diagram that represents one
func (*UMLGraph) AddActivity ¶ added in v0.5.0
func (*UMLGraph) AddConcurrency ¶ added in v0.5.0
func (p *UMLGraph) AddConcurrency(forks ...GraphDrawer)
func (*UMLGraph) AddDecision ¶ added in v0.5.0
func (p *UMLGraph) AddDecision(statement string, yes GraphDrawer, no GraphDrawer)
type UMLOptions ¶
type UMLOptions struct {
    // Type of the drawing, by default we will use UMLFormatSVG
    Type UMLOutputFormat

    // Base URL to use for retrieving Plant UML graphs, by default we will use http://www.plantuml.com/plantuml/
    BaseURL string
}
UMLOptions available when drawing a graph
type UMLOutputFormat ¶
type UMLOutputFormat string
UMLOutputFormat for graph renderings
const (
    // UMLFormatPNG OutputFormat for graph renderings (a UMLFormatPNG image will be created)
    UMLFormatPNG UMLOutputFormat = "png"

    // UMLFormatSVG OutputFormat for graph renderings (an UMLFormatSVG image will be created)
    UMLFormatSVG UMLOutputFormat = "svg"

    // UMLFormatRaw OutputFormat for graph renderings (a file with the raw contents will be created)
    UMLFormatRaw UMLOutputFormat = "raw"

    // UMLFormatTXT OutputFormat for graph renderings (an ASCII Art will be created)
    UMLFormatTXT UMLOutputFormat = "txt"
)
type UMLRenderer ¶ added in v0.5.0
type UMLRenderer struct {
Options UMLOptions
}
UMLRenderer allows us to render graphs into a UML diagram output.
func NewUMLRenderer ¶ added in v0.5.0
func NewUMLRenderer(options UMLOptions) *UMLRenderer
NewUMLRenderer creates a UML renderer for drawing graphs as specified by the given options.
func (*UMLRenderer) Render ¶ added in v0.5.0
func (u *UMLRenderer) Render(graphDiagram umlRendererGraph, output io.WriteCloser) error
Render draws the given graph as a UML activity diagram and writes it to the given output.
type Unit ¶ added in v0.5.0
Unit of work to yield a result of type O (or an error in case of a failure) from a given input I
type UnitStep ¶ added in v0.5.0
type UnitStep[I, O any] struct {
    // contains filtered or unexported fields
}
UnitStep for making a unit of work.
Example ¶
The following example shows a simple unit step that runs a unit of work with a given input and yields a result of a different type or an error, depending on the execution.
This example uses dummy data to keep the scenario as simple as possible.
Note: we use UnitStep because it makes it easy to run dummy code, but any type that implements pipeline.Step[I, O] would work.
type (
    InData  any
    OutData any
)

step := pipeline.NewUnitStep(
    "do_something",
    func(ctx context.Context, in InData) (OutData, error) {
        // do something with input
        return OutData(in), nil
    },
)

out, err := step.Run(context.Background(), InData("example"))
fmt.Println(out, err)
Output: example <nil>
func NewUnitStep ¶ added in v0.5.0
NewUnitStep creates an immutable, stateless unit of work based on a Unit function. You can use this implementation when your use cases are completely stateless (they don't rely on a service or anything that can be injected at the start and stay immutable for the lifetime of the process).
func (UnitStep[I, O]) Draw ¶ added in v0.5.0
func (s UnitStep[I, O]) Draw(graph Graph)
Draw this step in a graph
func (UnitStep[I, O]) ID ¶ added in v0.5.0
func (s UnitStep[I, O]) ID() string
ID is a unique identifier of this step. You can safely assume it won't be repeated and use it in any custom steps to enrich their logic (e.g. a circuit breaker or a cache keyed by ID).