pipeline

package
v0.0.0-...-e348512 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2021 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// ResultTaskDotID is the fixed DOT identifier "__result__".
// NOTE(review): presumably reserved for the automatically appended ResultTask — confirm against the DAG-building code.
const ResultTaskDotID = "__result__"

Variables

View Source
// Sentinel errors describing invalid task inputs.
var (
	// ErrWrongInputCardinality reports that a task received the wrong number of inputs.
	ErrWrongInputCardinality = errors.New("wrong number of task inputs")
	// ErrBadInput reports that an input handed to a task was not acceptable.
	ErrBadInput              = errors.New("bad input for task")
)

Functions

func FindBridge

func FindBridge(db *gorm.DB, name models.TaskType) (models.BridgeType, error)

FindBridge finds the bridge with the given name using the given database.

func NewORM

func NewORM(db *gorm.DB, config Config, eventBroadcaster postgres.EventBroadcaster) *orm

func NewRunner

func NewRunner(orm ORM, config Config) *runner

func WrapResultIfError

func WrapResultIfError(result *Result, msg string, args ...interface{})

Types

type BaseTask

// BaseTask holds state shared by the concrete task types in this package,
// which embed it with `mapstructure:",squash"`.
type BaseTask struct {
	// Index is decoded from the "index" key by mapstructure and is excluded
	// from JSON output (`json:"-"`).
	// NOTE(review): presumably the value returned by OutputIndex — confirm.
	Index int32 `mapstructure:"index" json:"-" `
	// contains filtered or unexported fields
}

func (BaseTask) DotID

func (t BaseTask) DotID() string

func (BaseTask) OutputIndex

func (t BaseTask) OutputIndex() int32

func (BaseTask) OutputTask

func (t BaseTask) OutputTask() Task

func (*BaseTask) SetOutputTask

func (t *BaseTask) SetOutputTask(outputTask Task)

type BridgeTask

// BridgeTask is a task type that embeds BaseTask and carries a name plus
// request data.
// NOTE(review): Name presumably identifies the bridge to look up (see
// FindBridge) — confirm in Run.
type BridgeTask struct {
	BaseTask `mapstructure:",squash"`

	// Name is serialized under the JSON key "name".
	Name        string          `json:"name"`
	// RequestData is serialized under the JSON key "requestData".
	RequestData HttpRequestData `json:"requestData"`
	// contains filtered or unexported fields
}

func (*BridgeTask) Run

func (t *BridgeTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*BridgeTask) Type

func (t *BridgeTask) Type() TaskType

type Config

// Config lists the configuration accessors this package depends on; it is
// the type accepted by NewORM, NewRunner and UnmarshalTaskFromMap.
// NOTE(review): the exact meaning of each setting is defined by the
// implementation, not visible here. The method names suggest bridge, database,
// HTTP-default and job-pipeline scheduling settings — confirm there.
type Config interface {
	BridgeResponseURL() *url.URL
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() string
	DefaultHTTPLimit() int64
	DefaultHTTPTimeout() models.Duration
	DefaultMaxHTTPAttempts() uint
	DefaultHTTPAllowUnrestrictedNetworkAccess() bool
	JobPipelineDBPollInterval() time.Duration
	JobPipelineMaxTaskDuration() time.Duration
	JobPipelineParallelism() uint8
	JobPipelineReaperInterval() time.Duration
	JobPipelineReaperThreshold() time.Duration
}

type FinalErrors

// FinalErrors is a list of nullable error strings. Its Error method lets it
// be used as an error value, and its Scan and Value methods have the shapes
// used for reading from and writing to a database column.
type FinalErrors []null.String

func (FinalErrors) Error

func (fe FinalErrors) Error() string

func (*FinalErrors) Scan

func (fe *FinalErrors) Scan(value interface{}) error

func (FinalErrors) Value

func (fe FinalErrors) Value() (driver.Value, error)

type HTTPTask

// HTTPTask is a task type that embeds BaseTask and is configured with a
// method, a URL and request data.
type HTTPTask struct {
	BaseTask    `mapstructure:",squash"`
	// Method is presumably the HTTP verb (e.g. "GET") — confirm accepted values in Run.
	Method      string
	// URL is presumably the request target — confirm in Run.
	URL         models.WebURL
	// RequestData is serialized under the JSON key "requestData".
	RequestData HttpRequestData `json:"requestData"`
	// contains filtered or unexported fields
}

func (*HTTPTask) Run

func (t *HTTPTask) Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result

func (*HTTPTask) Type

func (t *HTTPTask) Type() TaskType

type HttpRequestData

// HttpRequestData is a free-form, string-keyed map used as the RequestData
// field of HTTPTask and BridgeTask. It has AsMap, Scan and Value methods.
type HttpRequestData map[string]interface{}

func (HttpRequestData) AsMap

func (h HttpRequestData) AsMap() map[string]interface{}

func (*HttpRequestData) Scan

func (h *HttpRequestData) Scan(value interface{}) error

func (HttpRequestData) Value

func (h HttpRequestData) Value() (driver.Value, error)

type JSONParseTask

// JSONParseTask is a task type that embeds BaseTask and is configured with a
// JSON path and a Lax flag.
// NOTE(review): presumably extracts the value found at Path from its input —
// confirm in Run.
type JSONParseTask struct {
	BaseTask `mapstructure:",squash"`
	// Path is serialized under the JSON key "path".
	Path     JSONPath `json:"path"`
	// Lax when disabled will return an error if the path does not exist
	// Lax when enabled will return nil with no error if the path does not exist
	Lax bool
}

func (*JSONParseTask) Run

func (t *JSONParseTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*JSONParseTask) Type

func (t *JSONParseTask) Type() TaskType

type JSONPath

// JSONPath is a list of strings — presumably the successive keys to follow
// into a JSON document; confirm in JSONParseTask.Run. It has UnmarshalText,
// Scan and Value methods.
type JSONPath []string

func (*JSONPath) Scan

func (p *JSONPath) Scan(value interface{}) error

func (*JSONPath) UnmarshalText

func (p *JSONPath) UnmarshalText(bs []byte) error

func (JSONPath) Value

func (p JSONPath) Value() (driver.Value, error)

type JSONSerializable

// JSONSerializable wraps an arbitrary value. It has MarshalJSON and
// UnmarshalJSON methods as well as Scan and Value methods; elsewhere in this
// package it is used for fields tagged `gorm:"type:jsonb"`.
type JSONSerializable struct {
	// Val is the wrapped value.
	Val interface{}
}

func (JSONSerializable) MarshalJSON

func (js JSONSerializable) MarshalJSON() ([]byte, error)

func (*JSONSerializable) Scan

func (js *JSONSerializable) Scan(value interface{}) error

func (*JSONSerializable) UnmarshalJSON

func (js *JSONSerializable) UnmarshalJSON(bs []byte) error

func (JSONSerializable) Value

func (js JSONSerializable) Value() (driver.Value, error)

type MedianTask

// MedianTask is a task type with no configuration beyond the embedded BaseTask.
// NOTE(review): presumably yields the median of its inputs — confirm in Run.
type MedianTask struct {
	BaseTask `mapstructure:",squash"`
}

func (*MedianTask) Run

func (t *MedianTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*MedianTask) Type

func (t *MedianTask) Type() TaskType

type MultiplyTask

// MultiplyTask is a task type that embeds BaseTask and is configured with a
// decimal factor.
// NOTE(review): presumably multiplies its input by Times — confirm in Run.
type MultiplyTask struct {
	BaseTask `mapstructure:",squash"`
	// Times is serialized under the JSON key "times".
	Times    decimal.Decimal `json:"times"`
}

func (*MultiplyTask) Run

func (t *MultiplyTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) (result Result)

func (*MultiplyTask) Type

func (t *MultiplyTask) Type() TaskType

type ORM

// ORM is the persistence interface consumed by NewRunner. It covers creating
// specs and runs, processing task runs, waiting on and inspecting runs,
// deleting old runs, and looking up bridges.
// NOTE(review): per-method semantics below are inferred from names and
// signatures only — confirm against the implementation returned by NewORM.
type ORM interface {
	// CreateSpec takes a TaskDAG and returns an int32 — presumably the new spec's ID.
	CreateSpec(ctx context.Context, taskDAG TaskDAG) (int32, error)
	// CreateRun takes a job ID and metadata and returns an int64 — presumably the new run's ID.
	CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error)
	// ProcessNextUnclaimedTaskRun invokes fn for a task run; the meaning of the bool result is not visible here.
	ProcessNextUnclaimedTaskRun(ctx context.Context, fn ProcessTaskRunFunc) (bool, error)
	ListenForNewRuns() (postgres.Subscription, error)
	AwaitRun(ctx context.Context, runID int64) error
	RunFinished(runID int64) (bool, error)
	ResultsForRun(ctx context.Context, runID int64) ([]Result, error)
	DeleteRunsOlderThan(threshold time.Duration) error

	FindBridge(name models.TaskType) (models.BridgeType, error)
}

type PossibleErrorResponses

// PossibleErrorResponses maps the JSON keys "error" and "errorMessage" onto
// string fields.
// NOTE(review): presumably used to detect error payloads in HTTP or bridge
// responses — confirm at the call sites.
type PossibleErrorResponses struct {
	Error        string `json:"error"`
	ErrorMessage string `json:"errorMessage"`
}

type ProcessTaskRunFunc

// ProcessTaskRunFunc is the callback type accepted by
// ORM.ProcessNextUnclaimedTaskRun. It receives a context, a *gorm.DB handle
// (named txdb, so presumably transaction-scoped — confirm), the job ID, the
// task run to process and its predecessor task runs, and returns a Result.
type ProcessTaskRunFunc func(ctx context.Context, txdb *gorm.DB, jobID int32, ptRun TaskRun, predecessors []TaskRun) Result

type Result

// Result pairs a value with an error. It is what Task.Run returns and what
// tasks receive as inputs.
type Result struct {
	// Value is the produced value, of any type.
	Value interface{}
	// Error is the error associated with this result, if any.
	Error error
}

type ResultTask

// ResultTask is a task type with no configuration beyond the embedded BaseTask.
type ResultTask struct {
	BaseTask `mapstructure:",squash"`
}

ResultTask exists solely as a Postgres performance optimization. It's added automatically to the end of every pipeline, and it receives the outputs of all tasks that have no successor tasks. This allows the pipeline runner to detect when it has reached the end of a given pipeline simply by checking the `successor_id` field, rather than having to try to SELECT all of the pipeline run's task runs (which must be done from inside of a transaction, and causes lock contention and serialization anomaly issues).

func (*ResultTask) Run

func (t *ResultTask) Run(_ context.Context, taskRun TaskRun, inputs []Result) Result

func (*ResultTask) Type

func (t *ResultTask) Type() TaskType

type Run

// Run is the gorm model for a pipeline run (it has a TableName method).
// Fields tagged `json:"-"` are excluded from JSON output.
type Run struct {
	// ID is the primary key.
	ID               int64             `json:"-" gorm:"primary_key"`
	PipelineSpecID   int32             `json:"-"`
	PipelineSpec     Spec              `json:"pipelineSpec"`
	Meta             JSONSerializable  `json:"meta"`
	// Errors and Outputs are stored as jsonb columns.
	Errors           *JSONSerializable `json:"errors" gorm:"type:jsonb"`
	Outputs          *JSONSerializable `json:"outputs" gorm:"type:jsonb"`
	CreatedAt        time.Time         `json:"createdAt"`
	// FinishedAt is a pointer, so it can be nil — presumably while the run is unfinished; confirm.
	FinishedAt       *time.Time        `json:"finishedAt"`
	// PipelineTaskRuns is associated via the PipelineRunID foreign key; gorm's
	// automatic create/update of the association is disabled.
	PipelineTaskRuns []TaskRun         `json:"taskRuns" gorm:"foreignkey:PipelineRunID;association_autoupdate:false;association_autocreate:false"`
}

func (Run) GetID

func (r Run) GetID() string

func (*Run) SetID

func (r *Run) SetID(value string) error

func (Run) TableName

func (Run) TableName() string

type Runner

// Runner is the interface for starting and stopping pipeline execution and
// for creating runs, waiting on them and fetching their results. CreateRun,
// AwaitRun and ResultsForRun have the same signatures as the ORM methods of
// the same names.
type Runner interface {
	Start()
	Stop()
	CreateRun(ctx context.Context, jobID int32, meta map[string]interface{}) (int64, error)
	AwaitRun(ctx context.Context, runID int64) error
	ResultsForRun(ctx context.Context, runID int64) ([]Result, error)
}

Runner checks the DB for incomplete TaskRuns and runs them. For a TaskRun to be eligible to be run, its parent/input tasks must already all be complete.

type Spec

// Spec is the gorm model for a pipeline spec (it has a TableName method).
type Spec struct {
	// ID is the primary key.
	ID           int32 `gorm:"primary_key"`
	// DotDagSource is presumably the DOT source text the pipeline was built from — confirm.
	DotDagSource string
	CreatedAt    time.Time
}

func (Spec) TableName

func (Spec) TableName() string

type Task

// Task is the interface shared by the task types in this package. BaseTask
// supplies DotID, OutputTask, SetOutputTask and OutputIndex; each concrete
// task type supplies Type and Run.
type Task interface {
	// Type returns the task's TaskType.
	Type() TaskType
	// DotID returns the task's DOT identifier.
	DotID() string
	// Run executes the task for taskRun with the given input results and returns its Result.
	Run(ctx context.Context, taskRun TaskRun, inputs []Result) Result
	OutputTask() Task
	SetOutputTask(task Task)
	OutputIndex() int32
}

func UnmarshalTaskFromMap

func UnmarshalTaskFromMap(taskType TaskType, taskMap interface{}, dotID string, config Config, txdb *gorm.DB) (_ Task, err error)

type TaskDAG

// TaskDAG embeds a *simple.DirectedGraph and keeps a DOT source string
// alongside it.
type TaskDAG struct {
	*simple.DirectedGraph
	// DOTSource is presumably the DOT text the graph was unmarshaled from — confirm in UnmarshalText.
	DOTSource string
}

TaskDAG fulfills the graph.DirectedGraph interface, which makes it possible for us to `dot.Unmarshal(...)` a DOT string directly into it. Once unmarshaled, calling `TaskDAG#TasksInDependencyOrder()` will return the unmarshaled tasks.

func NewTaskDAG

func NewTaskDAG() *TaskDAG

func (*TaskDAG) HasCycles

func (g *TaskDAG) HasCycles() bool

func (*TaskDAG) NewNode

func (g *TaskDAG) NewNode() graph.Node

func (TaskDAG) TasksInDependencyOrder

func (g TaskDAG) TasksInDependencyOrder() ([]Task, error)

TasksInDependencyOrder returns a slice of Tasks starting at the outputs of the DAG and ending at the inputs. As you iterate through this slice, you can expect that any individual Task's outputs will already have been traversed.

func (*TaskDAG) UnmarshalText

func (g *TaskDAG) UnmarshalText(bs []byte) (err error)

type TaskRun

// TaskRun is the gorm model for one task's execution within a pipeline run
// (it has a TableName method). Fields tagged `json:"-"` are excluded from
// JSON output.
type TaskRun struct {
	// ID is the primary key.
	ID                 int64             `json:"-" gorm:"primary_key"`
	Type               TaskType          `json:"type"`
	PipelineRun        Run               `json:"-"`
	PipelineRunID      int64             `json:"-"`
	// Output is stored as a jsonb column.
	Output             *JSONSerializable `json:"output" gorm:"type:jsonb"`
	// Error is a nullable string.
	Error              null.String       `json:"error"`
	PipelineTaskSpecID int32             `json:"-"`
	// PipelineTaskSpec is associated via the PipelineTaskSpecID foreign key;
	// gorm's automatic create/update of the association is disabled.
	PipelineTaskSpec   TaskSpec          `json:"taskSpec" gorm:"foreignkey:PipelineTaskSpecID;association_autoupdate:false;association_autocreate:false"`
	CreatedAt          time.Time         `json:"createdAt"`
	// FinishedAt is a pointer, so it can be nil — presumably while the task run is unfinished; confirm.
	FinishedAt         *time.Time        `json:"finishedAt"`
}

func (TaskRun) DotID

func (tr TaskRun) DotID() string

func (TaskRun) GetID

func (tr TaskRun) GetID() string

func (TaskRun) Result

func (tr TaskRun) Result() Result

func (*TaskRun) SetID

func (tr *TaskRun) SetID(value string) error

func (TaskRun) TableName

func (TaskRun) TableName() string

type TaskSpec

// TaskSpec is the gorm model for a single task within a pipeline spec (it has
// a TableName method). Only DotID is exposed in JSON output, as "dotId".
type TaskSpec struct {
	// ID is the primary key.
	ID             int32            `json:"-" gorm:"primary_key"`
	DotID          string           `json:"dotId"`
	PipelineSpecID int32            `json:"-"`
	Type           TaskType         `json:"-"`
	// JSON is stored as a jsonb column.
	JSON           JSONSerializable `json:"-" gorm:"type:jsonb"`
	Index          int32            `json:"-"`
	// SuccessorID is nullable.
	// NOTE(review): presumably a null value marks the final task of the
	// pipeline (see IsFinalPipelineOutput) — confirm.
	SuccessorID    null.Int         `json:"-"`
	CreatedAt      time.Time        `json:"-"`
}

func (TaskSpec) IsFinalPipelineOutput

func (s TaskSpec) IsFinalPipelineOutput() bool

func (TaskSpec) TableName

func (TaskSpec) TableName() string

type TaskType

// TaskType is the string identifier of a kind of task.
type TaskType string
// Task type identifiers declared by this package.
// NOTE(review): each presumably corresponds to the like-named task struct
// (HTTPTask, BridgeTask, MedianTask, MultiplyTask, JSONParseTask, ResultTask)
// — confirm against the Type methods.
const (
	TaskTypeHTTP      TaskType = "http"
	TaskTypeBridge    TaskType = "bridge"
	TaskTypeMedian    TaskType = "median"
	TaskTypeMultiply  TaskType = "multiply"
	TaskTypeJSONParse TaskType = "jsonparse"
	TaskTypeResult    TaskType = "result"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL