lambdag

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2022 License: MIT Imports: 30 Imported by: 0

README

lambdag

Documentation GitHub go.mod Go version (branch) GitHub tag (latest SemVer) Github Actions test License

DAG Execution for AWS StepFunctions with Lambda Function

Motivation :

  • Amazon State Language (ASL) data flow (InputPath, ResultPath,ResultSelector,etc...) was too difficult.
  • AWS StepFunctions machine retry and error catching mechanisms are great!
  • In the end, for difficult processes, it is tempting to rely on Lambda functions, but it is hard to create a Lambda function for each processing unit.
  • Airflow's DAG was good, I want to write something like DAG in Golang

Ideas :

Let the AWS StepFunctions state machine concentrate on things outside of the pipeline logic, such as retries, SNS notifications, etc., and contain the pipeline logic in a single Lambda function. Then, the state machine should focus on invoking a single Lambda function.

Usage

So here is the simplest StateMachine example

simplest

definition.asl.json

and, The Lambda function that this state machine invokes is written as follows

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/mashiike/lambdag"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
	defer cancel()
	dag, err := lambdag.NewDAG("SampleDAG")
	if err != nil {
		log.Fatal(err)
	}
	task1, err := dag.NewTask("task1", lambdag.TaskHandlerFunc(func(ctx context.Context, tr *lambdag.TaskRequest) (interface{}, error) {
		return `"task1 success"`, nil
	}))
	if err != nil {
		log.Fatal(err)
	}
	task2, err := dag.NewTask("task2", lambdag.TaskHandlerFunc(func(ctx context.Context, tr *lambdag.TaskRequest) (interface{}, error) {
		return "task2 success", nil
	}))
	if err != nil {
		log.Fatal(err)
	}
	err = task1.SetDownstream(task2)
	if err != nil {
		log.Fatal(err)
	}
	lambdag.RunWithContext(ctx, os.Args[1:], dag)
}
graph LR
    task1("task1")
    task2("task2")

    task1-->task2

It is assumed to work with the runtime of the provided.al2

The build should look like this

$ GOOS=linux GOARCH=arm64 go build -o bootstrap _examples/src/main.go

To run the example in a Local environment using StepFunctions Local

$ cd _examples
$ make prepare
$ make build
$ make run/<execution_name>
$ make history/<execution_name>

Usage (for local development)

lambdag.RunWithContext(ctx, os.Args[1:], dag)

if local execution run as CLI

$ go run _examples/src/main.go 
Usage: SampleDAG <flags> <subcommand> <subcommand args>

Subcommands:
        commands         list all command names
        flags            describe all known top-level flags
        help             describe subcommands and their syntax
        render           rendering DAG
        serve            start a stub server for the lambda Invoke API

CLI has stub server for the lambda Invoke API

$ go run _examples/src/main.go serve
2022/06/17 13:51:55 [info] starting up with Stub lambda API http://:3001
aws lambda --endpoint http://localhost:3001 invoke --function-name SampleDAG --cli-binary-format raw-in-base64-out --payload '{"Comment":"this is dag run config"}' output.txt --log-type Tail --qualifier current

LICENSE

MIT License

Copyright (c) 2022 IKEDA Masashi

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewLambdaHandler

func NewLambdaHandler(dag *DAG) lambda.Handler

func Run

func Run(args []string, dag *DAG) error

func RunWithContext

func RunWithContext(ctx context.Context, args []string, dag *DAG) error

func WithCircuitBreaker

func WithCircuitBreaker(num int) func(opts *DAGOptions) error

func WithDAGLogger

func WithDAGLogger(fn func(context.Context, *DAGRunContext) (*log.Logger, error)) func(opts *DAGOptions) error

func WithNumOfTasksInSingleInvoke

func WithNumOfTasksInSingleInvoke(num int) func(opts *DAGOptions) error

func WithTaskLocker

func WithTaskLocker(fn func(context.Context, *DAGRunContext) (LockerWithError, error)) func(opts *TaskOptions) error

func WithTaskLogger

func WithTaskLogger(fn func(context.Context, *DAGRunContext) (*log.Logger, error)) func(opts *TaskOptions) error

func WrapTaskRetryable

func WrapTaskRetryable(err error) error

Types

type AncestorDescendantSameError

type AncestorDescendantSameError struct {
	Ancestor   *Task
	Descendant *Task
}

func (*AncestorDescendantSameError) Error

func (err *AncestorDescendantSameError) Error() string

type CycleDetectedInDAGError

type CycleDetectedInDAGError struct {
	Start *Task
	End   *Task
}

func (*CycleDetectedInDAGError) Error

func (err *CycleDetectedInDAGError) Error() string

type DAG

type DAG struct {
	// contains filtered or unexported fields
}

func NewDAG

func NewDAG(id string, optFns ...func(opts *DAGOptions) error) (*DAG, error)

func (*DAG) AddDependency

func (dag *DAG) AddDependency(ancestor *Task, descendant *Task) error

func (*DAG) CircuitBreaker

func (dag *DAG) CircuitBreaker() int

func (*DAG) Execute

func (dag *DAG) Execute(ctx context.Context, dagRunCtx *DAGRunContext) (*DAGRunContext, error)

func (*DAG) GetAllTasks

func (dag *DAG) GetAllTasks() []*Task

func (*DAG) GetAncestorTasks

func (dag *DAG) GetAncestorTasks(taskID string) []*Task

func (*DAG) GetDescendantTasks

func (dag *DAG) GetDescendantTasks(taskID string) []*Task

func (*DAG) GetDownstreamTasks

func (dag *DAG) GetDownstreamTasks(taskID string) []*Task

func (*DAG) GetExecutableTasks

func (dag *DAG) GetExecutableTasks(finishedTaskIDs []string) []*Task

func (*DAG) GetStartTasks

func (dag *DAG) GetStartTasks() []*Task

func (*DAG) GetTask

func (dag *DAG) GetTask(taskID string) (*Task, bool)

func (*DAG) GetUpstreamTasks

func (dag *DAG) GetUpstreamTasks(taskID string) []*Task

func (*DAG) ID

func (dag *DAG) ID() string

func (*DAG) IsExecutableTask

func (dag *DAG) IsExecutableTask(taskID string, finishedTaskIDs []string) bool

func (*DAG) NewLogger

func (dag *DAG) NewLogger(ctx context.Context, dagRunCtx *DAGRunContext) (*log.Logger, error)

func (*DAG) NewTask

func (dag *DAG) NewTask(taskID string, handler TaskHandler, optFns ...func(opts *TaskOptions) error) (*Task, error)

func (*DAG) NumOfTasksInSingleInvoke

func (dag *DAG) NumOfTasksInSingleInvoke() int

func (*DAG) WarkAllDependencies

func (dag *DAG) WarkAllDependencies(fn func(ancestor *Task, descendant *Task) error) error

type DAGOptions

type DAGOptions struct {
	// contains filtered or unexported fields
}

type DAGRunContext

type DAGRunContext struct {
	DAGRunID        string                     `json:"DAGRunId"`
	DAGRunStartAt   time.Time                  `json:"DAGRunStartAt"`
	DAGRunConfig    json.RawMessage            `json:"DAGRunConfig"`
	TaskResponses   map[string]json.RawMessage `json:"TaskResponses,omitempty"`
	LambdaCallCount int                        `json:"LambdaCallCount"`
	Continue        bool                       `json:"Continue"`
	IsCircuitBreak  bool                       `json:"IsCircuitBreak"`
}

type LambdaAPIStubMux

type LambdaAPIStubMux struct {
	// contains filtered or unexported fields
}

func NewLambdaAPIStubMux

func NewLambdaAPIStubMux(functionName string, handler lambda.Handler) *LambdaAPIStubMux

func (*LambdaAPIStubMux) ServeHTTP

func (mux *LambdaAPIStubMux) ServeHTTP(w http.ResponseWriter, r *http.Request)

type LambdaHandler

type LambdaHandler struct {
	// contains filtered or unexported fields
}

func (*LambdaHandler) Invoke

func (h *LambdaHandler) Invoke(ctx context.Context, payload json.RawMessage) (interface{}, error)

type LockerWithError

type LockerWithError interface {
	LockWithErr(ctx context.Context) (lockGranted bool, err error)
	UnlockWithErr(ctx context.Context) (err error)
}

type LockerWithoutError

type LockerWithoutError struct {
	sync.Locker
}

func NewLockerWithoutError

func NewLockerWithoutError(l sync.Locker) LockerWithoutError

func (LockerWithoutError) LockWithErr

func (l LockerWithoutError) LockWithErr(_ context.Context) (bool, error)

func (LockerWithoutError) UnlockWithErr

func (l LockerWithoutError) UnlockWithErr(_ context.Context) error

type NopLocker

type NopLocker struct{}

func (NopLocker) LockWithErr

func (l NopLocker) LockWithErr(_ context.Context) (bool, error)

func (NopLocker) UnlockWithErr

func (l NopLocker) UnlockWithErr(_ context.Context) error

type Task

type Task struct {
	// contains filtered or unexported fields
}

func (*Task) Execute

func (task *Task) Execute(ctx context.Context, dagRunCtx *DAGRunContext) (json.RawMessage, error)

func (*Task) GoString

func (task *Task) GoString() string

func (*Task) ID

func (task *Task) ID() string

func (*Task) NewLocker

func (task *Task) NewLocker(ctx context.Context, dagRunCtx *DAGRunContext) (LockerWithError, error)

func (*Task) NewLogger

func (task *Task) NewLogger(ctx context.Context, dagRunCtx *DAGRunContext) (*log.Logger, error)

func (*Task) SetDownstream

func (task *Task) SetDownstream(descendants ...*Task) error

func (*Task) SetUpstream

func (task *Task) SetUpstream(ancestors ...*Task) error

func (*Task) String

func (task *Task) String() string

func (*Task) TaskHandler

func (task *Task) TaskHandler() TaskHandler

type TaskDependencyDuplicateError

type TaskDependencyDuplicateError struct {
	Ancestor   *Task
	Descendant *Task
}

func (*TaskDependencyDuplicateError) Error

func (err *TaskDependencyDuplicateError) Error() string

type TaskHandler

type TaskHandler interface {
	Invoke(context.Context, *TaskRequest) (interface{}, error)
}

type TaskHandlerFunc

type TaskHandlerFunc func(context.Context, *TaskRequest) (interface{}, error)

func (TaskHandlerFunc) Invoke

func (h TaskHandlerFunc) Invoke(ctx context.Context, req *TaskRequest) (interface{}, error)

type TaskIDDuplicateError

type TaskIDDuplicateError struct {
	TaskID string
}

func (*TaskIDDuplicateError) Error

func (err *TaskIDDuplicateError) Error() string

type TaskOptions

type TaskOptions struct {
	// contains filtered or unexported fields
}

type TaskRequest

type TaskRequest struct {
	DAGRunID      string
	DAGRunConfig  json.RawMessage
	TaskResponses map[string]json.RawMessage
	Logger        *log.Logger
}

type TaskRetryableError

type TaskRetryableError struct {
	// contains filtered or unexported fields
}

func (TaskRetryableError) Error

func (err TaskRetryableError) Error() string

func (TaskRetryableError) Unwrap

func (err TaskRetryableError) Unwrap() error

type UnknownError

type UnknownError struct {
	// contains filtered or unexported fields
}

func (UnknownError) Error

func (err UnknownError) Error() string

func (UnknownError) Unwrap

func (err UnknownError) Unwrap() error

Directories

Path Synopsis
_examples
src

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL