pipeline

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2021 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaybeBoolTrue  = MaybeBool("true")
	MaybeBoolFalse = MaybeBool("false")
	MaybeBoolNull  = MaybeBool("")
)
View Source
const (
	DotStr = `` /* 624-byte string literal not displayed */

)

Variables

View Source
var (
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	ErrBadInput              = errors.New("bad input for task")
)
View Source
var (
	ErrNoSuchBridge = errors.New("no such bridge exists")
)
View Source
var (
	ErrRunPanicked = errors.New("pipeline run panicked")
)

Functions

func FindBridge

func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)

FindBridge finds the bridge with the given name using the given database.

func NewORM

func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm

func NewRunner

func NewRunner(orm ORM, config Config) *runner

Types

type AnyTask

type AnyTask struct {
	BaseTask `mapstructure:",squash"`
}

AnyTask picks a value at random from the set of non-errored inputs. If there are zero non-errored inputs then it returns an error.

func (*AnyTask) Run

func (t *AnyTask) Run(_ context.Context, _ JSONSerializable, inputs []Result) (result Result)

func (*AnyTask) SetDefaults

func (t *AnyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*AnyTask) Type

func (t *AnyTask) Type() TaskType

type BaseTask

type BaseTask struct {
	Index   int32         `mapstructure:"index" json:"-" `
	Timeout time.Duration `mapstructure:"timeout"`
	// contains filtered or unexported fields
}

func NewBaseTask

func NewBaseTask(dotID string, t Task, index int32, nPreds int) BaseTask

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) NPreds

func (t BaseTask) NPreds() int

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) OutputTask

func (t BaseTask) OutputTask() Task

func (*BaseTask) SetOutputTask

func (t *BaseTask) SetOutputTask(outputTask Task)

func (BaseTask) TaskTimeout

func (t BaseTask) TaskTimeout() (time.Duration, bool)

type BridgeTask

type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	Name        string          `json:"name"`
	RequestData HttpRequestData `json:"requestData"`
	// contains filtered or unexported fields
}

func (BridgeTask) ExportedEquals

func (t BridgeTask) ExportedEquals(otherTask Task) bool

func (*BridgeTask) HelperSetConfigAndTxDB

func (t *BridgeTask) HelperSetConfigAndTxDB(config Config, txdb *gorm.DB)

func (*BridgeTask) Run

func (t *BridgeTask) Run(ctx context.Context, meta JSONSerializable, inputs []Result) (result Result)

func (*BridgeTask) SetDefaults

func (t *BridgeTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*BridgeTask) Type

func (t *BridgeTask) Type() TaskType

type Config

type Config interface {
	BridgeResponseURL() *url.URL
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() url.URL
	DefaultHTTPLimit() int64
	DefaultHTTPTimeout() models.Duration
	DefaultMaxHTTPAttempts() uint
	DefaultHTTPAllowUnrestrictedNetworkAccess() bool
	TriggerFallbackDBPollInterval() time.Duration
	JobPipelineMaxRunDuration() time.Duration
	JobPipelineReaperInterval() time.Duration
	JobPipelineReaperThreshold() time.Duration
}

type FinalResult

type FinalResult struct {
	Values []interface{}
	Errors []error
}

FinalResult is the result of a Run

func (FinalResult) ErrorsDB

func (result FinalResult) ErrorsDB() RunErrors

ErrorsDB dumps the result errors for a pipeline_run.

func (FinalResult) HasErrors

func (result FinalResult) HasErrors() bool

HasErrors returns true if the final result has any errors

func (FinalResult) OutputsDB

func (result FinalResult) OutputsDB() JSONSerializable

OutputsDB dumps the result outputs for a pipeline_run.

func (FinalResult) SingularResult

func (result FinalResult) SingularResult() (Result, error)

SingularResult returns a single result if the FinalResult only has one set of outputs/errors

type HTTPTask

type HTTPTask struct {
	BaseTask                       `mapstructure:",squash"`
	Method                         string
	URL                            models.WebURL
	RequestData                    HttpRequestData `json:"requestData"`
	AllowUnrestrictedNetworkAccess MaybeBool
	// contains filtered or unexported fields
}

func (HTTPTask) ExportedEquals

func (t HTTPTask) ExportedEquals(otherTask Task) bool

func (*HTTPTask) HelperSetConfig

func (t *HTTPTask) HelperSetConfig(config Config)

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, _ JSONSerializable, inputs []Result) Result

func (*HTTPTask) SetDefaults

func (t *HTTPTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type HttpRequestData

type HttpRequestData map[string]interface{}

func (HttpRequestData) AsMap

func (h HttpRequestData) AsMap() map[string]interface{}

func (*HttpRequestData) Scan

func (h *HttpRequestData) Scan(value interface{}) error

func (HttpRequestData) Value

func (h HttpRequestData) Value() (driver.Value, error)

type JSONParseTask

type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`
	Path     JSONPath `json:"path"`
	// Lax when disabled will return an error if the path does not exist
	// Lax when enabled will return nil with no error if the path does not exist
	Lax bool
}

func (JSONParseTask) ExportedEquals

func (t JSONParseTask) ExportedEquals(otherTask Task) bool

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, _ JSONSerializable, inputs []Result) (result Result)

func (*JSONParseTask) SetDefaults

func (t *JSONParseTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONPath

type JSONPath []string

func (*JSONPath) Scan

func (p *JSONPath) Scan(value interface{}) error

func (*JSONPath) UnmarshalText

func (p *JSONPath) UnmarshalText(bs []byte) error

func (JSONPath) Value

func (p JSONPath) Value() (driver.Value, error)

type JSONSerializable

type JSONSerializable struct {
	Val  interface{}
	Null bool
}

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type MaybeBool

type MaybeBool string

func MaybeBoolFromString

func MaybeBoolFromString(s string) (MaybeBool, error)

func (MaybeBool) Bool

func (m MaybeBool) Bool() (b bool, isSet bool)

type MedianTask

type MedianTask struct {
	BaseTask      `mapstructure:",squash"`
	AllowedFaults uint64 `json:"allowedFaults"`
}

func (MedianTask) ExportedEquals

func (t MedianTask) ExportedEquals(otherTask Task) bool

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, _ JSONSerializable, inputs []Result) (result Result)

func (*MedianTask) SetDefaults

func (t *MedianTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type MultiplyTask

type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	Times    decimal.Decimal `json:"times"`
}

func (MultiplyTask) ExportedEquals

func (t MultiplyTask) ExportedEquals(otherTask Task) bool

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, _ JSONSerializable, inputs []Result) (result Result)

func (*MultiplyTask) SetDefaults

func (t *MultiplyTask) SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ORM

type ORM interface {
	CreateSpec(ctx context.Context, db *gorm.DB, taskDAG TaskDAG, maxTaskTimeout models.Interval) (int32, error)
	InsertFinishedRun(ctx context.Context, run Run, trrs []TaskRunResult, saveSuccessfulTaskRuns bool) (runID int64, err error)
	DeleteRunsOlderThan(threshold time.Duration) error
	FindBridge(name models.TaskType) (models.BridgeType, error)
	FindRun(id int64) (Run, error)
	DB() *gorm.DB
}

type PanicTask

type PanicTask struct {
	BaseTask `mapstructure:",squash"`
	Msg      string
}

func (*PanicTask) Run

func (t *PanicTask) Run(_ context.Context, _ JSONSerializable, _ []Result) (result Result)

func (*PanicTask) SetDefaults

func (t *PanicTask) SetDefaults(_ map[string]string, _ TaskDAG, _ taskDAGNode) error

func (*PanicTask) Type

func (t *PanicTask) Type() TaskType

type PossibleErrorResponses

type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type Result

type Result struct {
	Value interface{}
	Error error
}

Result is the result of a TaskRun

func (Result) ErrorDB

func (result Result) ErrorDB() null.String

ErrorDB dumps a single result error for a pipeline_task_run

func (Result) OutputDB

func (result Result) OutputDB() JSONSerializable

OutputDB dumps a single result output for a pipeline_run or pipeline_task_run

type Run

type Run struct {
	ID             int64            `json:"-" gorm:"primary_key"`
	PipelineSpecID int32            `json:"-"`
	PipelineSpec   Spec             `json:"pipelineSpec"`
	Meta           JSONSerializable `json:"meta"`
	// The errors are only ever strings
	// DB example: [null, null, "my error"]
	Errors RunErrors `json:"errors" gorm:"type:jsonb"`
	// The outputs can be anything.
	// DB example: [1234, {"a": 10}, null]
	Outputs          JSONSerializable `json:"outputs" gorm:"type:jsonb"`
	CreatedAt        time.Time        `json:"createdAt"`
	FinishedAt       *time.Time       `json:"finishedAt"`
	PipelineTaskRuns []TaskRun        `json:"taskRuns" gorm:"foreignkey:PipelineRunID;->"`
}

func (Run) GetID

func (r Run) GetID() string

func (Run) HasErrors

func (r Run) HasErrors() bool

func (*Run) SetID

func (r *Run) SetID(value string) error

func (*Run) Status

func (r *Run) Status() RunStatus

Status determines the status of the run.

func (Run) TableName

func (Run) TableName() string

type RunErrors

type RunErrors []null.String

func (RunErrors) HasError

func (re RunErrors) HasError() bool

func (*RunErrors) Scan

func (re *RunErrors) Scan(value interface{}) error

func (RunErrors) Value

func (re RunErrors) Value() (driver.Value, error)

type RunStatus

type RunStatus int

RunStatus represents the status of a run

const (
	// RunStatusUnknown is used when the run status cannot be determined.
	RunStatusUnknown RunStatus = iota
	// RunStatusInProgress is used for when a run is actively being executed.
	RunStatusInProgress
	// RunStatusErrored is used for when a run has errored and will not complete.
	RunStatusErrored
	// RunStatusCompleted is used for when a run has successfully completed execution.
	RunStatusCompleted
)

func (RunStatus) Completed

func (s RunStatus) Completed() bool

Completed returns true if the status is RunStatusCompleted.

func (RunStatus) Errored

func (s RunStatus) Errored() bool

Errored returns true if the status is RunStatusErrored.

func (RunStatus) Finished

func (s RunStatus) Finished() bool

Finished returns true if the status is final and can't be changed.

type RunWithResults

type RunWithResults struct {
	Run            Run
	TaskRunResults TaskRunResults
}

type Runner

type Runner interface {
	// Start spawns a background routine to delete old pipeline runs.
	Start() error
	Close() error

	// We expect spec.JobID and spec.JobName to be set for logging/prometheus.
	// ExecuteRun executes a new run in-memory according to a spec and returns the results.
	ExecuteRun(ctx context.Context, spec Spec, meta JSONSerializable, l logger.Logger) (trrs TaskRunResults, err error)
	// InsertFinishedRun saves the run results in the database.
	InsertFinishedRun(ctx context.Context, run Run, trrs TaskRunResults, saveSuccessfulTaskRuns bool) (int64, error)

	// ExecuteAndInsertFinishedRun executes a new run in-memory according to a spec, persists and saves the results.
	// It is a combination of ExecuteRun and InsertFinishedRun.
	// Note that the spec MUST have a DOT graph for this to work.
	ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, meta JSONSerializable, l logger.Logger, saveSuccessfulTaskRuns bool) (runID int64, finalResult FinalResult, err error)
}

type SafeTx

type SafeTx struct {
	// contains filtered or unexported fields
}

SafeTx bundles a tx and its txmutex so that multiple goroutines can share the same transaction. The mutex is necessary to prevent concurrent database calls inside the same transaction from failing: with the pq driver they fail with `pq: unexpected Parse response 'C'`, and with the pgx driver they fail with `conn busy`.

type Spec

type Spec struct {
	ID              int32           `gorm:"primary_key"`
	DotDagSource    string          `json:"dotDagSource"`
	CreatedAt       time.Time       `json:"-"`
	MaxTaskDuration models.Interval `json:"-"`

	JobID   int32  `gorm:"-" json:"-"`
	JobName string `gorm:"-" json:"-"`
}

func (Spec) TableName

func (Spec) TableName() string

func (Spec) TasksInDependencyOrder

func (s Spec) TasksInDependencyOrder() ([]Task, error)

type Task

type Task interface {
	Type() TaskType
	DotID() string
	Run(ctx context.Context, meta JSONSerializable, inputs []Result) Result
	OutputTask() Task
	SetOutputTask(task Task)
	OutputIndex() int32
	TaskTimeout() (time.Duration, bool)
	SetDefaults(inputValues map[string]string, g TaskDAG, self taskDAGNode) error
	NPreds() int
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, dotID string, config Config, txdb *gorm.DB, txdbMutex *sync.Mutex, nPreds int) (_ Task, err error)

type TaskDAG

type TaskDAG struct {
	*simple.DirectedGraph
	DOTSource string
}

TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshaled, calling `TaskDAG.TasksInDependencyOrder()` will return the unmarshaled tasks. NOTE: We only permit one child per node (each Task has a single output task).

func NewTaskDAG

func NewTaskDAG() *TaskDAG

func (*TaskDAG) HasCycles

func (g *TaskDAG) HasCycles() bool

func (TaskDAG) MinTimeout

func (g TaskDAG) MinTimeout() (time.Duration, bool, error)

func (*TaskDAG) NewNode

func (g *TaskDAG) NewNode() graph.Node

func (TaskDAG) TasksInDependencyOrder

func (g TaskDAG) TasksInDependencyOrder() ([]Task, error)

TasksInDependencyOrder returns a slice of Tasks starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.

func (*TaskDAG) UnmarshalText

func (g *TaskDAG) UnmarshalText(bs []byte) (err error)

type TaskRun

type TaskRun struct {
	ID            int64             `json:"-" gorm:"primary_key"`
	Type          TaskType          `json:"type"`
	PipelineRun   Run               `json:"-"`
	PipelineRunID int64             `json:"-"`
	Output        *JSONSerializable `json:"output" gorm:"type:jsonb"`
	Error         null.String       `json:"error"`
	CreatedAt     time.Time         `json:"createdAt"`
	FinishedAt    *time.Time        `json:"finishedAt"`
	Index         int32             `json:"index"`
	DotID         string            `json:"dotId"`
}

func (TaskRun) GetDotID

func (tr TaskRun) GetDotID() string

func (TaskRun) GetID

func (tr TaskRun) GetID() string

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID

func (tr *TaskRun) SetID(value string) error

func (TaskRun) TableName

func (TaskRun) TableName() string

type TaskRunResult

type TaskRunResult struct {
	ID         int64
	Task       Task
	TaskRun    TaskRun
	Result     Result
	CreatedAt  time.Time
	FinishedAt time.Time
	IsTerminal bool
}

TaskRunResult describes the result of a task run, suitable for database update or insert. ID might be zero if the TaskRun has not been inserted yet. TaskSpecID will always be non-zero. (NOTE: the struct shown has no TaskSpecID field; this sentence may be stale.)

type TaskRunResults

type TaskRunResults []TaskRunResult

TaskRunResults represents a collection of results for all task runs for one pipeline run

func (TaskRunResults) FinalResult

func (trrs TaskRunResults) FinalResult() FinalResult

FinalResult pulls the FinalResult for the pipeline_run from the task runs. It needs to respect the output index of each task.

type TaskType

type TaskType string
const (
	TaskTypeHTTP      TaskType = "http"
	TaskTypeBridge    TaskType = "bridge"
	TaskTypeMedian    TaskType = "median"
	TaskTypeMultiply  TaskType = "multiply"
	TaskTypeJSONParse TaskType = "jsonparse"
	TaskTypeAny       TaskType = "any"

	// Testing only.
	TaskTypePanic TaskType = "panic"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL