Documentation ¶
Index ¶
- func NewLambdaHandler(dag *DAG) lambda.Handler
- func Run(args []string, dag *DAG) error
- func RunWithContext(ctx context.Context, args []string, dag *DAG) error
- func WithCircuitBreaker(num int) func(opts *DAGOptions) error
- func WithDAGLogger(fn func(context.Context, *DAGRunContext) (*log.Logger, error)) func(opts *DAGOptions) error
- func WithNumOfTasksInSingleInvoke(num int) func(opts *DAGOptions) error
- func WithTaskLocker(fn func(context.Context, *DAGRunContext) (LockerWithError, error)) func(opts *TaskOptions) error
- func WithTaskLogger(fn func(context.Context, *DAGRunContext) (*log.Logger, error)) func(opts *TaskOptions) error
- func WrapTaskRetryable(err error) error
- type AncestorDescendantSameError
- type CycleDetectedInDAGError
- type DAG
- func (dag *DAG) AddDependency(ancestor *Task, descendant *Task) error
- func (dag *DAG) CircuitBreaker() int
- func (dag *DAG) Execute(ctx context.Context, dagRunCtx *DAGRunContext) (*DAGRunContext, error)
- func (dag *DAG) GetAllTasks() []*Task
- func (dag *DAG) GetAncestorTasks(taskID string) []*Task
- func (dag *DAG) GetDescendantTasks(taskID string) []*Task
- func (dag *DAG) GetDownstreamTasks(taskID string) []*Task
- func (dag *DAG) GetExecutableTasks(finishedTaskIDs []string) []*Task
- func (dag *DAG) GetStartTasks() []*Task
- func (dag *DAG) GetTask(taskID string) (*Task, bool)
- func (dag *DAG) GetUpstreamTasks(taskID string) []*Task
- func (dag *DAG) ID() string
- func (dag *DAG) IsExecutableTask(taskID string, finishedTaskIDs []string) bool
- func (dag *DAG) NewLogger(ctx context.Context, dagRunCtx *DAGRunContext) (*log.Logger, error)
- func (dag *DAG) NewTask(taskID string, handler TaskHandler, optFns ...func(opts *TaskOptions) error) (*Task, error)
- func (dag *DAG) NumOfTasksInSingleInvoke() int
- func (dag *DAG) WarkAllDependencies(fn func(ancestor *Task, descendant *Task) error) error
- type DAGOptions
- type DAGRunContext
- type LambdaAPIStubMux
- type LambdaHandler
- type LockerWithError
- type LockerWithoutError
- type NopLocker
- type Task
- func (task *Task) Execute(ctx context.Context, dagRunCtx *DAGRunContext) (json.RawMessage, error)
- func (task *Task) GoString() string
- func (task *Task) ID() string
- func (task *Task) NewLocker(ctx context.Context, dagRunCtx *DAGRunContext) (LockerWithError, error)
- func (task *Task) NewLogger(ctx context.Context, dagRunCtx *DAGRunContext) (*log.Logger, error)
- func (task *Task) SetDownstream(descendants ...*Task) error
- func (task *Task) SetUpstream(ancestors ...*Task) error
- func (task *Task) String() string
- func (task *Task) TaskHandler() TaskHandler
- type TaskDependencyDuplicateError
- type TaskHandler
- type TaskHandlerFunc
- type TaskIDDuplicateError
- type TaskOptions
- type TaskRequest
- type TaskRetryableError
- type UnknownError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewLambdaHandler ¶
func WithCircuitBreaker ¶
func WithCircuitBreaker(num int) func(opts *DAGOptions) error
func WithDAGLogger ¶
func WithDAGLogger(fn func(context.Context, *DAGRunContext) (*log.Logger, error)) func(opts *DAGOptions) error
func WithNumOfTasksInSingleInvoke ¶
func WithNumOfTasksInSingleInvoke(num int) func(opts *DAGOptions) error
func WithTaskLocker ¶
func WithTaskLocker(fn func(context.Context, *DAGRunContext) (LockerWithError, error)) func(opts *TaskOptions) error
func WithTaskLogger ¶
func WithTaskLogger(fn func(context.Context, *DAGRunContext) (*log.Logger, error)) func(opts *TaskOptions) error
func WrapTaskRetryable ¶
Types ¶
type AncestorDescendantSameError ¶
func (*AncestorDescendantSameError) Error ¶
func (err *AncestorDescendantSameError) Error() string
type CycleDetectedInDAGError ¶
func (*CycleDetectedInDAGError) Error ¶
func (err *CycleDetectedInDAGError) Error() string
type DAG ¶
type DAG struct {
// contains filtered or unexported fields
}
func (*DAG) CircuitBreaker ¶
func (*DAG) Execute ¶
func (dag *DAG) Execute(ctx context.Context, dagRunCtx *DAGRunContext) (*DAGRunContext, error)
func (*DAG) GetAllTasks ¶
func (*DAG) GetAncestorTasks ¶
func (*DAG) GetDescendantTasks ¶
func (*DAG) GetDownstreamTasks ¶
func (*DAG) GetExecutableTasks ¶
func (*DAG) GetStartTasks ¶
func (*DAG) GetUpstreamTasks ¶
func (*DAG) IsExecutableTask ¶
func (*DAG) NewTask ¶
func (dag *DAG) NewTask(taskID string, handler TaskHandler, optFns ...func(opts *TaskOptions) error) (*Task, error)
func (*DAG) NumOfTasksInSingleInvoke ¶
type DAGOptions ¶
type DAGOptions struct {
// contains filtered or unexported fields
}
type DAGRunContext ¶
type DAGRunContext struct {
DAGRunID string `json:"DAGRunId"`
DAGRunStartAt time.Time `json:"DAGRunStartAt"`
DAGRunConfig json.RawMessage `json:"DAGRunConfig"`
TaskResponses map[string]json.RawMessage `json:"TaskResponses,omitempty"`
LambdaCallCount int `json:"LambdaCallCount"`
Continue bool `json:"Continue"`
IsCircuitBreak bool `json:"IsCircuitBreak"`
}
type LambdaAPIStubMux ¶
type LambdaAPIStubMux struct {
// contains filtered or unexported fields
}
func NewLambdaAPIStubMux ¶
func NewLambdaAPIStubMux(functionName string, handler lambda.Handler) *LambdaAPIStubMux
func (*LambdaAPIStubMux) ServeHTTP ¶
func (mux *LambdaAPIStubMux) ServeHTTP(w http.ResponseWriter, r *http.Request)
type LambdaHandler ¶
type LambdaHandler struct {
// contains filtered or unexported fields
}
func (*LambdaHandler) Invoke ¶
func (h *LambdaHandler) Invoke(ctx context.Context, payload json.RawMessage) (interface{}, error)
type LockerWithError ¶
type LockerWithoutError ¶
func NewLockerWithoutError ¶
func NewLockerWithoutError(l sync.Locker) LockerWithoutError
func (LockerWithoutError) LockWithErr ¶
func (l LockerWithoutError) LockWithErr(_ context.Context) (bool, error)
func (LockerWithoutError) UnlockWithErr ¶
func (l LockerWithoutError) UnlockWithErr(_ context.Context) error
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func (*Task) Execute ¶
func (task *Task) Execute(ctx context.Context, dagRunCtx *DAGRunContext) (json.RawMessage, error)
func (*Task) NewLocker ¶
func (task *Task) NewLocker(ctx context.Context, dagRunCtx *DAGRunContext) (LockerWithError, error)
func (*Task) SetDownstream ¶
func (*Task) SetUpstream ¶
func (*Task) TaskHandler ¶
func (task *Task) TaskHandler() TaskHandler
type TaskDependencyDuplicateError ¶
func (*TaskDependencyDuplicateError) Error ¶
func (err *TaskDependencyDuplicateError) Error() string
type TaskHandler ¶
type TaskHandler interface {
Invoke(context.Context, *TaskRequest) (interface{}, error)
}
type TaskHandlerFunc ¶
type TaskHandlerFunc func(context.Context, *TaskRequest) (interface{}, error)
func (TaskHandlerFunc) Invoke ¶
func (h TaskHandlerFunc) Invoke(ctx context.Context, req *TaskRequest) (interface{}, error)
type TaskIDDuplicateError ¶
type TaskIDDuplicateError struct {
TaskID string
}
func (*TaskIDDuplicateError) Error ¶
func (err *TaskIDDuplicateError) Error() string
type TaskOptions ¶
type TaskOptions struct {
// contains filtered or unexported fields
}
type TaskRequest ¶
type TaskRequest struct {
DAGRunID string
DAGRunConfig json.RawMessage
TaskResponses map[string]json.RawMessage
Logger *log.Logger
}
type TaskRetryableError ¶
type TaskRetryableError struct {
// contains filtered or unexported fields
}
func (TaskRetryableError) Error ¶
func (err TaskRetryableError) Error() string
func (TaskRetryableError) Unwrap ¶
func (err TaskRetryableError) Unwrap() error
type UnknownError ¶
type UnknownError struct {
// contains filtered or unexported fields
}
func (UnknownError) Error ¶
func (err UnknownError) Error() string
func (UnknownError) Unwrap ¶
func (err UnknownError) Unwrap() error
Click to show internal directories.
Click to hide internal directories.