asyncjob

package module
v2.0.0-...-f530737 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2022 License: MIT Imports: 8 Imported by: 0

README

AsyncJob

AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a sequential chain.

Concepts

JobDefinition is a graph describe code blocks and their connections.

  • jobDefinition can be visualized using graphviz, easier for human to understand.

JobInstance is an instance of JobDefinition, after calling .Start() method from JobDefinition

  • all Steps on the definition will be copied to JobInstance.
  • each step will be executed once it's precedent step is done.
  • jobInstance can be visualized as well, instance visualize contains detailed info(startTime, duration) on each step.

StepDefinition is a individual code block which can be executed and have inputs, output.

  • StepDefinition describe it's preceding steps.
  • StepDefinition contains generic Params
  • output of a step can be feed into next step as input, type is checked by go generics.

StepInstance is instance of StepDefinition

  • step is wrapped in AsyncTask
  • a step would be started once all it's dependency is finished.
  • executionPolicy can be applied {Retry, ContextEnrichment}

Usage

Build and run a asyncjob
	job := NewJob("sqlSummaryJob")
	jobLib := &SqlSummaryJobLib{}

	# connection
	connTask, _ := AddStep(bCtx, job, "getConnection", jobLib.GetConnection, []string{})

	# query1
	table1ParamTask := InputParam(job, "param_table1", "table1")
	table1ClientTask, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTask, table1ParamTask, jobLib.GetTableClient)
	query1ParamTask := InputParam(job, "param_query1", "select x,y,z from table1")
	qery1ResultTask, _ := StepAfterBoth(bCtx, job, "queryTable1", table1ClientTask, query1ParamTask, jobLib.ExecuteQuery)

	# query2
	table2ParamTask := InputParam(job, "param_table2", "table2")
	table2ClientTask, _ := StepAfterBoth(bCtx, job, "getTableClient2", connTask, table2ParamTask, jobLib.GetTableClient)
	query2ParamTask := InputParam(job, "param_query2", &sjb.Query2)
	qery2ResultTask, _ := StepAfterBoth(bCtx, job, "queryTable2", table2ClientTask, query2ParamTask, jobLib.ExecuteQuery)

	# summarize
	StepAfterBoth(bCtx, job, "summarize", qery1ResultTask, qery2ResultTask, jobLib.SummarizeQueryResult)

	# execute job
	job.Start(context.Background())
	job.Wait(context.WithTimeout(context.Background(), 10*time.Second))
visualize of a job
	# visualize the job
	dotGraph := job.Visualize()
	fmt.Println(dotGraph)

visualize job graph

digraph {
	newrank = "true"
		param_table1 [label="table1" shape=hexagon style=filled tooltip="Type: param\nName: table1\nState: completed\nStartAt: 2022-11-03T00:56:30.006196-07:00\nDuration: 12.657µs" fillcolor=green] 
		param_query1 [label="query1" shape=hexagon style=filled tooltip="Type: param\nName: query1\nState: completed\nStartAt: 2022-11-03T00:56:30.0062-07:00\nDuration: 17.013µs" fillcolor=green] 
		root_job [label="job" shape=triangle style=filled tooltip="Type: root\nName: job\nState: completed\nStartAt: 2022-11-03T00:56:30.006183-07:00\nDuration: 3.695µs" fillcolor=green] 
		param_query2 [label="query2" shape=hexagon style=filled tooltip="Type: param\nName: query2\nState: completed\nStartAt: 2022-11-03T00:56:30.006197-07:00\nDuration: 13.781µs" fillcolor=green] 
		task_getTableClient1 [label="getTableClient1" shape=box style=filled tooltip="Type: task\nName: getTableClient1\nState: completed\nStartAt: 2022-11-03T00:56:30.006304-07:00\nDuration: 34.652µs" fillcolor=green] 
		task_queryTable1 [label="queryTable1" shape=box style=filled tooltip="Type: task\nName: queryTable1\nState: completed\nStartAt: 2022-11-03T00:56:30.006349-07:00\nDuration: 3.217443247s" fillcolor=green] 
		param_table2 [label="table2" shape=hexagon style=filled tooltip="Type: param\nName: table2\nState: completed\nStartAt: 2022-11-03T00:56:30.006199-07:00\nDuration: 15.632µs" fillcolor=green] 
		task_getTableClient2 [label="getTableClient2" shape=box style=filled tooltip="Type: task\nName: getTableClient2\nState: completed\nStartAt: 2022-11-03T00:56:30.00631-07:00\nDuration: 51.872µs" fillcolor=green] 
		task_queryTable2 [label="queryTable2" shape=box style=filled tooltip="Type: task\nName: queryTable2\nState: completed\nStartAt: 2022-11-03T00:56:30.006377-07:00\nDuration: 67.814µs" fillcolor=green] 
		task_emailNotification [label="emailNotification" shape=box style=filled tooltip="Type: task\nName: emailNotification\nState: completed\nStartAt: 2022-11-03T00:56:33.223952-07:00\nDuration: 3.92µs" fillcolor=green] 
		param_serverName [label="serverName" shape=hexagon style=filled tooltip="Type: param\nName: serverName\nState: completed\nStartAt: 2022-11-03T00:56:30.006198-07:00\nDuration: 14.638µs" fillcolor=green] 
		task_getConnection [label="getConnection" shape=box style=filled tooltip="Type: task\nName: getConnection\nState: completed\nStartAt: 2022-11-03T00:56:30.006231-07:00\nDuration: 62.234µs" fillcolor=green] 
		task_checkAuth [label="checkAuth" shape=box style=filled tooltip="Type: task\nName: checkAuth\nState: completed\nStartAt: 2022-11-03T00:56:30.006212-07:00\nDuration: 650ns" fillcolor=green] 
		task_summarize [label="summarize" shape=box style=filled tooltip="Type: task\nName: summarize\nState: completed\nStartAt: 2022-11-03T00:56:33.22392-07:00\nDuration: 4.325µs" fillcolor=green] 
        
		param_table1 -> task_getTableClient1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006304-07:00" color=green] 
		param_query1 -> task_queryTable1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006349-07:00" color=green] 
		param_table2 -> task_getTableClient2 [style=bold tooltip="Time: 2022-11-03T00:56:30.00631-07:00" color=green] 
		task_getTableClient2 -> task_queryTable2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006377-07:00" color=green] 
		param_query2 -> task_queryTable2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006377-07:00" color=green] 
		task_queryTable2 -> task_summarize [style=bold tooltip="Time: 2022-11-03T00:56:33.22392-07:00" color=green] 
		root_job -> param_serverName [style=bold tooltip="Time: 2022-11-03T00:56:30.006198-07:00" color=green] 
		root_job -> task_checkAuth [style=bold tooltip="Time: 2022-11-03T00:56:30.006212-07:00" color=green] 
		root_job -> param_table1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006196-07:00" color=green] 
		root_job -> param_query1 [style=bold tooltip="Time: 2022-11-03T00:56:30.0062-07:00" color=green] 
		root_job -> param_table2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006199-07:00" color=green] 
		root_job -> param_query2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006197-07:00" color=green] 
		param_serverName -> task_getConnection [style=bold tooltip="Time: 2022-11-03T00:56:30.006231-07:00" color=green] 
		task_getTableClient1 -> task_queryTable1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006349-07:00" color=green] 
		task_queryTable1 -> task_summarize [style=bold tooltip="Time: 2022-11-03T00:56:33.22392-07:00" color=green] 
		task_summarize -> task_emailNotification [style=bold tooltip="Time: 2022-11-03T00:56:33.223952-07:00" color=green] 
		task_getConnection -> task_getTableClient1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006304-07:00" color=green] 
		task_getConnection -> task_getTableClient2 [style=bold tooltip="Time: 2022-11-03T00:56:30.00631-07:00" color=green] 
		task_checkAuth -> task_queryTable1 [style=bold tooltip="Time: 2022-11-03T00:56:30.006349-07:00" color=green] 
		task_checkAuth -> task_queryTable2 [style=bold tooltip="Time: 2022-11-03T00:56:30.006377-07:00" color=green] 

}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ExecutionOptionPreparer

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

func ExecuteAfter

Add precedence to a step.

without taking input from it(use StepAfter/StepAfterBoth otherwise)

func WithContextEnrichment

func WithContextEnrichment(contextPolicy StepContextPolicy) ExecutionOptionPreparer

func WithRetry

func WithRetry(retryPolicy RetryPolicy) ExecutionOptionPreparer

Allow retry of a step on error.

func WithTimeout

func WithTimeout(timeout time.Duration) ExecutionOptionPreparer

Limit time spend on a step.

type JobDefinition

type JobDefinition[T any] struct {
	Name string
	// contains filtered or unexported fields
}

JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).

func NewJobDefinition

func NewJobDefinition[T any](name string) *JobDefinition[T]

Create new JobDefinition

it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.

func (*JobDefinition[T]) GetName

func (jd *JobDefinition[T]) GetName() string

func (*JobDefinition[T]) GetStep

func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool)

GetStep returns the stepDefinition by name

func (*JobDefinition[T]) Start

func (jd *JobDefinition[T]) Start(ctx context.Context, input *T, jobOptions ...JobOptionPreparer) *JobInstance[T]

Start execution of the job definition.

this will create and return new instance of the job
caller will then be able to wait for the job instance

func (*JobDefinition[T]) Visualize

func (jd *JobDefinition[T]) Visualize() (string, error)

Visualize the job definition in graphviz dot format

type JobDefinitionMeta

type JobDefinitionMeta interface {
	GetName() string
	GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
	// contains filtered or unexported methods
}

Interface for a job definition

type JobDefinitionWithResult

type JobDefinitionWithResult[Tin, Tout any] struct {
	*JobDefinition[Tin]
	// contains filtered or unexported fields
}

func JobWithResult

func JobWithResult[Tin, Tout any](jd *JobDefinition[Tin], resultStep *StepDefinition[Tout]) (*JobDefinitionWithResult[Tin, Tout], error)

func (*JobDefinitionWithResult[Tin, Tout]) Start

func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input *Tin) *JobInstanceWithResult[Tin, Tout]

type JobError

type JobError struct {
	Code         JobErrorCode
	StepError    error
	StepInstance StepInstanceMeta
	Message      string
}

func (*JobError) Error

func (je *JobError) Error() string

func (*JobError) RootCause

func (je *JobError) RootCause() error

RootCause track precendent chain and return the first step raised this error.

func (*JobError) Unwrap

func (je *JobError) Unwrap() error

type JobErrorCode

type JobErrorCode string
const (
	ErrPrecedentStepFailure JobErrorCode = "precedent step failed"
	ErrStepFailed           JobErrorCode = "step failed"
	ErrStepNotInJob         JobErrorCode = "trying to reference to a step not registered in job"
)

func (JobErrorCode) Error

func (code JobErrorCode) Error() string

type JobExecutionOptions

type JobExecutionOptions struct {
	Id              string
	RunSequentially bool
}

type JobInstance

type JobInstance[T any] struct {
	Definition *JobDefinition[T]
	// contains filtered or unexported fields
}

JobInstance is the instance of a jobDefinition

func (*JobInstance[T]) GetJobDefinition

func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta

func (*JobInstance[T]) GetJobInstanceId

func (ji *JobInstance[T]) GetJobInstanceId() string

func (*JobInstance[T]) GetStepInstance

func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool)

GetStepInstance returns the stepInstance by name

func (*JobInstance[T]) Visualize

func (jd *JobInstance[T]) Visualize() (string, error)

Visualize the job instance in graphviz dot format

func (*JobInstance[T]) Wait

func (ji *JobInstance[T]) Wait(ctx context.Context) error

Wait for all steps in the job to finish.

type JobInstanceMeta

type JobInstanceMeta interface {
	GetJobDefinition() JobDefinitionMeta
	GetStepInstance(stepName string) (StepInstanceMeta, bool)
	Wait(context.Context) error
	// contains filtered or unexported methods
}

type JobInstanceWithResult

type JobInstanceWithResult[Tin, Tout any] struct {
	*JobInstance[Tin]
	// contains filtered or unexported fields
}

func (*JobInstanceWithResult[Tin, Tout]) Result

func (ji *JobInstanceWithResult[Tin, Tout]) Result(ctx context.Context) (*Tout, error)

Result returns the result of the job from result step.

it doesn't wait for all steps to finish, you can use Result() after Wait() if desired.

type JobOptionPreparer

type JobOptionPreparer func(*JobExecutionOptions) *JobExecutionOptions

func WithJobId

func WithJobId(jobId string) JobOptionPreparer

func WithSequentialExecution

func WithSequentialExecution() JobOptionPreparer

type RetryPolicy

type RetryPolicy interface {
	ShouldRetry(error) (bool, time.Duration)
}

type RetryReport

type RetryReport struct {
	Count int
}

RetryReport would record the retry count (could extend to include each retry duration, ...)

type StepContextPolicy

type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context

StepContextPolicy allows context enrichment before passing to step.

With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.

type StepDefinition

type StepDefinition[T any] struct {
	// contains filtered or unexported fields
}

StepDefinition defines a step and it's dependencies in a job definition.

func AddStep

func AddStep[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFuncCreator func(input *JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

AddStep adds a step to the job definition.

func AddStepWithStaticFunc

func AddStepWithStaticFunc[JT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)

func StepAfter

func StepAfter[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input *JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfter add a step after a preceding step, also take input from that preceding step

func StepAfterBoth

func StepAfterBoth[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input *JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfterBoth add a step after both preceding steps, also take input from both preceding steps

func StepAfterBothWithStaticFunc

func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)

func StepAfterWithStaticFunc

func StepAfterWithStaticFunc[JT, PT, ST any](bCtx context.Context, j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)

func (*StepDefinition[T]) DependsOn

func (sd *StepDefinition[T]) DependsOn() []string

func (*StepDefinition[T]) DotSpec

func (sd *StepDefinition[T]) DotSpec() *graph.DotNodeSpec

func (*StepDefinition[T]) ExecutionPolicy

func (sd *StepDefinition[T]) ExecutionPolicy() *StepExecutionOptions

func (*StepDefinition[T]) GetName

func (sd *StepDefinition[T]) GetName() string

type StepDefinitionMeta

type StepDefinitionMeta interface {

	// GetName return name of the step
	GetName() string

	// DependsOn return the list of step names that this step depends on
	DependsOn() []string

	// ExecutionPolicy return the execution policy of the step
	ExecutionPolicy() *StepExecutionOptions

	// DotSpec used for generating graphviz graph
	DotSpec() *graph.DotNodeSpec
	// contains filtered or unexported methods
}

StepDefinitionMeta is the interface for a step definition

type StepErrorPolicy

type StepErrorPolicy struct{}

type StepExecutionData

type StepExecutionData struct {
	StartTime time.Time
	Duration  time.Duration
	Retried   *RetryReport
}

StepExecutionData would measure the step execution time and retry report.

type StepExecutionOptions

type StepExecutionOptions struct {
	Timeout       time.Duration
	ErrorPolicy   StepErrorPolicy
	RetryPolicy   RetryPolicy
	ContextPolicy StepContextPolicy

	// dependencies that are not input.
	DependOn []string
}

type StepInstance

type StepInstance[T any] struct {
	Definition  *StepDefinition[T]
	JobInstance JobInstanceMeta
	// contains filtered or unexported fields
}

StepInstance is the instance of a step, within a job instance.

func (*StepInstance[T]) DotSpec

func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec

func (*StepInstance[T]) EnrichContext

func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context)

func (*StepInstance[T]) ExecutionData

func (si *StepInstance[T]) ExecutionData() *StepExecutionData

func (*StepInstance[T]) GetJobInstance

func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta

func (*StepInstance[T]) GetName

func (si *StepInstance[T]) GetName() string

func (*StepInstance[T]) GetState

func (si *StepInstance[T]) GetState() StepState

func (*StepInstance[T]) GetStepDefinition

func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta

func (*StepInstance[T]) Waitable

func (si *StepInstance[T]) Waitable() asynctask.Waitable

type StepInstanceMeta

type StepInstanceMeta interface {
	GetName() string
	ExecutionData() *StepExecutionData
	GetState() StepState
	GetJobInstance() JobInstanceMeta
	GetStepDefinition() StepDefinitionMeta
	Waitable() asynctask.Waitable

	DotSpec() *graph.DotNodeSpec
}

StepInstanceMeta is the interface for a step instance

type StepState

type StepState string
const StepStateCompleted StepState = "completed"
const StepStateFailed StepState = "failed"
const StepStatePending StepState = "pending"
const StepStateRunning StepState = "running"

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL