Documentation ¶
Overview ¶
Execution tasks and an executor for a DAG of plan tasks. They can be embedded and used as-is, or extended via the Executor interface.
Index ¶
- Constants
- Variables
- func DisableRecover()
- func RegisterSqlDriver()
- type AggFunc
- type AggPartial
- type Aggregator
- type Command
- type DeletionScanner
- type DeletionTask
- type ErrChan
- type Executor
- type ExecutorSource
- type GroupBy
- type GroupByFinal
- type JobExecutor
- func (m *JobExecutor) Close() error
- func (m *JobExecutor) DrainChan() MessageChan
- func (m *JobExecutor) NewTask(p plan.Task) Task
- func (m *JobExecutor) Run() error
- func (m *JobExecutor) Setup() error
- func (m *JobExecutor) WalkChildren(p plan.Task, root Task) error
- func (m *JobExecutor) WalkCommand(p *plan.Command) (Task, error)
- func (m *JobExecutor) WalkDelete(p *plan.Delete) (Task, error)
- func (m *JobExecutor) WalkGroupBy(p *plan.GroupBy) (Task, error)
- func (m *JobExecutor) WalkHaving(p *plan.Having) (Task, error)
- func (m *JobExecutor) WalkInsert(p *plan.Insert) (Task, error)
- func (m *JobExecutor) WalkJoin(p *plan.JoinMerge) (Task, error)
- func (m *JobExecutor) WalkJoinKey(p *plan.JoinKey) (Task, error)
- func (m *JobExecutor) WalkPlan(p plan.Task) (Task, error)
- func (m *JobExecutor) WalkPlanAll(p plan.Task) (Task, error)
- func (m *JobExecutor) WalkPlanTask(p plan.Task) (Task, error)
- func (m *JobExecutor) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)
- func (m *JobExecutor) WalkProjection(p *plan.Projection) (Task, error)
- func (m *JobExecutor) WalkSelect(p *plan.Select) (Task, error)
- func (m *JobExecutor) WalkSource(p *plan.Source) (Task, error)
- func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error)
- func (m *JobExecutor) WalkUpdate(p *plan.Update) (Task, error)
- func (m *JobExecutor) WalkUpsert(p *plan.Upsert) (Task, error)
- func (m *JobExecutor) WalkWhere(p *plan.Where) (Task, error)
- type JobMaker
- type JobRunner
- type JoinKey
- type JoinMerge
- type KeyEvaluator
- type MessageChan
- type MessageHandler
- type Projection
- type RequiresContext
- type ResultBuffer
- type ResultExecWriter
- type ResultWriter
- type SigChan
- type Source
- type Task
- type TaskBase
- func (m *TaskBase) Add(task Task) error
- func (m *TaskBase) AddPlan(task plan.Task) error
- func (m *TaskBase) Children() []Task
- func (m *TaskBase) Close() error
- func (m *TaskBase) CloseFinal() error
- func (m *TaskBase) ErrChan() ErrChan
- func (m *TaskBase) MessageIn() MessageChan
- func (m *TaskBase) MessageInSet(ch MessageChan)
- func (m *TaskBase) MessageOut() MessageChan
- func (m *TaskBase) MessageOutSet(ch MessageChan)
- func (m *TaskBase) Quit()
- func (m *TaskBase) Run() error
- func (m *TaskBase) Setup(depth int) error
- func (m *TaskBase) SigChan() SigChan
- type TaskParallel
- type TaskPrinter
- type TaskRunner
- type TaskSequential
- type TaskStepper
- type Upsert
- type Where
Constants ¶
const (
ItemDefaultChannelSize = 50
)
const (
MaxAllowedPacket = 1024 * 1024
)
const (
MysqlTimeFormat = "2006-01-02 15:04:05.000000000"
)
Variables ¶
var ( // Standard errors ErrShuttingDown = fmt.Errorf("Received Shutdown Signal") ErrNotSupported = fmt.Errorf("QLBridge: Not supported") ErrNotImplemented = fmt.Errorf("QLBridge: Not implemented") ErrUnknownCommand = fmt.Errorf("QLBridge: Unknown Command") ErrInternalError = fmt.Errorf("QLBridge: Internal Error") ErrNoSchemaSelected = fmt.Errorf("No Schema Selected") )
Functions ¶
func DisableRecover ¶
func DisableRecover()
func RegisterSqlDriver ¶
func RegisterSqlDriver()
Types ¶
type AggPartial ¶
type Aggregator ¶
type Aggregator interface { Do(v value.Value) Result() interface{} Reset() Merge(*AggPartial) }
func NewCount ¶
func NewCount(col *rel.Column) Aggregator
func NewGroupByValue ¶
func NewGroupByValue(col *rel.Column) Aggregator
type Command ¶
type Command struct { *TaskBase // contains filtered or unexported fields }
Command is an executable task for SET SQL commands.
func NewCommand ¶
NewCommand creates new command exec task
type DeletionScanner ¶
type DeletionScanner struct {
*DeletionTask
}
DeletionScanner is the delete scanner used when the source does not have a seek operation.
func (*DeletionScanner) Run ¶
func (m *DeletionScanner) Run() error
type DeletionTask ¶
type DeletionTask struct { *TaskBase // contains filtered or unexported fields }
Delete task for sources that natively support delete
func NewDelete ¶
func NewDelete(ctx *plan.Context, p *plan.Delete) *DeletionTask
NewDelete creates a DeletionTask that deletes from a data source.
func (*DeletionTask) Close ¶
func (m *DeletionTask) Close() error
func (*DeletionTask) Run ¶
func (m *DeletionTask) Run() error
type Executor ¶
type Executor interface { NewTask(p plan.Task) Task WalkPlan(p plan.Task) (Task, error) WalkSelect(p *plan.Select) (Task, error) WalkInsert(p *plan.Insert) (Task, error) WalkUpsert(p *plan.Upsert) (Task, error) WalkUpdate(p *plan.Update) (Task, error) WalkDelete(p *plan.Delete) (Task, error) WalkCommand(p *plan.Command) (Task, error) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error) // Child Tasks WalkSource(p *plan.Source) (Task, error) WalkJoin(p *plan.JoinMerge) (Task, error) WalkJoinKey(p *plan.JoinKey) (Task, error) WalkWhere(p *plan.Where) (Task, error) WalkHaving(p *plan.Having) (Task, error) WalkGroupBy(p *plan.GroupBy) (Task, error) WalkProjection(p *plan.Projection) (Task, error) }
Executor defines the standard Walk() pattern for creating an executable task DAG from a plan DAG.
An implementation of WalkPlan() will be able to execute/run a Statement.
- inproc: ie, in process. qlbridge/exec package implements a non-distributed query-planner
- distributed: ie, run this job across multiple servers dataux/planner implements a distributed query-planner that distributes/runs tasks across multiple nodes
type ExecutorSource ¶
type ExecutorSource interface { // given our plan, turn that into a Task. WalkExecSource(p *plan.Source) (Task, error) }
Sources can often do their own execution planning for sub-select statements;
e.g. MySQL can do its own (select, projection), and Mongo and Elasticsearch can as well.
- ExecutorSource provides an interface for passing select planning down to the source.
type GroupBy ¶
type GroupBy struct { *TaskBase // contains filtered or unexported fields }
Group by: SQL Group By operator.
Creates a hashable key composed of key = {each,value,of,column,in,groupby}.
A very simple, naive parallel group-by that holds values in memory.
task -> groupby -->
type GroupByFinal ¶
type GroupByFinal struct { *TaskBase // contains filtered or unexported fields }
Group by: Sql Group By Operator finalizer for partials
func NewGroupByFinal ¶
func NewGroupByFinal(ctx *plan.Context, p *plan.GroupBy) *GroupByFinal
func (*GroupByFinal) Close ¶
func (m *GroupByFinal) Close() error
func (*GroupByFinal) Run ¶
func (m *GroupByFinal) Run() error
type JobExecutor ¶
type JobExecutor struct { Planner plan.Planner Executor Executor RootTask TaskRunner Ctx *plan.Context // contains filtered or unexported fields }
JobExecutor translates a SQL Statement into an execution DAG of tasks using the supplied Planner and Executor. This package implements a default executor and uses the default Planner from plan. This creates a single-node DAG of Tasks.
func BuildSqlJob ¶
func BuildSqlJob(ctx *plan.Context) (*JobExecutor, error)
func NewExecutor ¶
func NewExecutor(ctx *plan.Context, planner plan.Planner) *JobExecutor
func (*JobExecutor) DrainChan ¶
func (m *JobExecutor) DrainChan() MessageChan
DrainChan returns the drain, which is the output channel of the last task.
func (*JobExecutor) WalkChildren ¶
func (m *JobExecutor) WalkChildren(p plan.Task, root Task) error
WalkChildren walks the DAG of plan tasks, creating execution tasks.
func (*JobExecutor) WalkCommand ¶
func (m *JobExecutor) WalkCommand(p *plan.Command) (Task, error)
func (*JobExecutor) WalkDelete ¶
func (m *JobExecutor) WalkDelete(p *plan.Delete) (Task, error)
func (*JobExecutor) WalkGroupBy ¶
func (m *JobExecutor) WalkGroupBy(p *plan.GroupBy) (Task, error)
func (*JobExecutor) WalkHaving ¶
func (m *JobExecutor) WalkHaving(p *plan.Having) (Task, error)
func (*JobExecutor) WalkInsert ¶
func (m *JobExecutor) WalkInsert(p *plan.Insert) (Task, error)
func (*JobExecutor) WalkJoinKey ¶
func (m *JobExecutor) WalkJoinKey(p *plan.JoinKey) (Task, error)
func (*JobExecutor) WalkPlan ¶
func (m *JobExecutor) WalkPlan(p plan.Task) (Task, error)
Main Entry point to take a Plan, and convert into Execution DAG
func (*JobExecutor) WalkPlanAll ¶
func (m *JobExecutor) WalkPlanAll(p plan.Task) (Task, error)
func (*JobExecutor) WalkPlanTask ¶
func (m *JobExecutor) WalkPlanTask(p plan.Task) (Task, error)
func (*JobExecutor) WalkPreparedStatement ¶
func (m *JobExecutor) WalkPreparedStatement(p *plan.PreparedStatement) (Task, error)
func (*JobExecutor) WalkProjection ¶
func (m *JobExecutor) WalkProjection(p *plan.Projection) (Task, error)
func (*JobExecutor) WalkSelect ¶
func (m *JobExecutor) WalkSelect(p *plan.Select) (Task, error)
func (*JobExecutor) WalkSource ¶
func (m *JobExecutor) WalkSource(p *plan.Source) (Task, error)
func (*JobExecutor) WalkSourceExec ¶
func (m *JobExecutor) WalkSourceExec(p *plan.Source) (Task, error)
func (*JobExecutor) WalkUpdate ¶
func (m *JobExecutor) WalkUpdate(p *plan.Update) (Task, error)
func (*JobExecutor) WalkUpsert ¶
func (m *JobExecutor) WalkUpsert(p *plan.Upsert) (Task, error)
type JoinKey ¶
type JoinKey struct { *TaskBase // contains filtered or unexported fields }
JoinKey evaluates messages to create JoinKey-based messages, where the
join key (a composite of each value in the join expression) hashes consistently.
func NewJoinKey ¶
A JoinKey task that evaluates the compound JoinKey to allow
for parallelized joins: source1 -> JoinKey -> hash-route \ -- join --> / source2 -> JoinKey -> hash-route
type JoinMerge ¶
type JoinMerge struct { *TaskBase // contains filtered or unexported fields }
Scans 2 source tasks for rows, evaluates keys, and uses them for the join.
func NewJoinNaiveMerge ¶
A very simple, naive parallel join merge that uses Key() as the value to merge
two different input channels: source1 -> \ -- join --> / source2 ->
Distributed:
source1a -> |-> -- join --> source1b -> key-hash-route |-> -- join --> reduce -> source1n -> |-> -- join --> |-> -- join --> source2a -> |-> -- join --> source2b -> key-hash-route |-> -- join --> source2n -> |-> -- join -->
type MessageChan ¶
type MessageHandler ¶
Handle/Forward a message for this Task
func MakeHandler ¶
func MakeHandler(task TaskRunner) MessageHandler
type Projection ¶
type Projection struct { *TaskBase // contains filtered or unexported fields }
Projection Execution Task
func NewProjection ¶
func NewProjection(ctx *plan.Context, p *plan.Projection) *Projection
In Process projections are used when mapping multiple sources together
and additional columns such as those used in Where, GroupBy etc are used even if they will not be used in Final projection
func NewProjectionFinal ¶
func NewProjectionFinal(ctx *plan.Context, p *plan.Projection) *Projection
Final Projections project final select columns for result-writing
func NewProjectionInProcess ¶
func NewProjectionInProcess(ctx *plan.Context, p *plan.Projection) *Projection
In Process projections are used when mapping multiple sources together
and additional columns such as those used in Where, GroupBy etc are used even if they will not be used in Final projection
func NewProjectionLimit ¶
func NewProjectionLimit(ctx *plan.Context, p *plan.Projection) *Projection
NewProjectionLimit only provides a counting/limit projection.
func (*Projection) CloseFinal ¶
func (m *Projection) CloseFinal() error
CloseFinal performs additional cleanup after exit.
type RequiresContext ¶
RequiresContext is for source data sources that require a context.
type ResultBuffer ¶
type ResultBuffer struct { *TaskBase // contains filtered or unexported fields }
func NewResultBuffer ¶
func NewResultBuffer(ctx *plan.Context, writeTo *[]schema.Message) *ResultBuffer
func (*ResultBuffer) Close ¶
func (m *ResultBuffer) Close() error
func (*ResultBuffer) Copy ¶
func (m *ResultBuffer) Copy() *ResultBuffer
type ResultExecWriter ¶
type ResultExecWriter struct { *TaskBase // contains filtered or unexported fields }
func NewResultExecWriter ¶
func NewResultExecWriter(ctx *plan.Context) *ResultExecWriter
func (*ResultExecWriter) Close ¶
func (m *ResultExecWriter) Close() error
func (*ResultExecWriter) Copy ¶
func (m *ResultExecWriter) Copy() *ResultExecWriter
func (*ResultExecWriter) Result ¶
func (m *ResultExecWriter) Result() driver.Result
type ResultWriter ¶
type ResultWriter struct { *TaskBase // contains filtered or unexported fields }
func NewResultRows ¶
func NewResultRows(ctx *plan.Context, cols []string) *ResultWriter
func NewResultWriter ¶
func NewResultWriter(ctx *plan.Context) *ResultWriter
func (*ResultWriter) Close ¶
func (m *ResultWriter) Close() error
func (*ResultWriter) Columns ¶
func (m *ResultWriter) Columns() []string
func (*ResultWriter) Copy ¶
func (m *ResultWriter) Copy() *ResultWriter
func (*ResultWriter) Next ¶
func (m *ResultWriter) Next(dest []driver.Value) error
Note: this is an implementation of the sql/driver Rows Next() interface.
func (*ResultWriter) Run ¶
func (m *ResultWriter) Run() error
For ResultWriter, we are not paging through messages
using this message channel; instead we use Next() as defined by sql/driver. So we don't read the input channel and just watch the stop channels.
type Source ¶
type Source struct { *TaskBase Scanner schema.ConnScanner ExecSource ExecutorSource JoinKey KeyEvaluator // contains filtered or unexported fields }
Source scans a data source for rows and feeds them into the runner. Because the source scanner
is itself the source, it iterates messages via iter.Next() instead of receiving them on an input channel. Sources include: 1) table -- FROM table 2) channels -- FROM stream 3) join -- SELECT t1.name, t2.salary FROM employee AS t1 INNER JOIN info AS t2 ON t1.name = t2.name; 4) sub-select -- SELECT * FROM (SELECT 1, 2, 3) AS t1;
func NewSourceScanner ¶
A scanner to read from sub-query data source (join, sub-query, static)
type Task ¶
type Task interface { Run() error Close() error CloseFinal() error Children() []Task // children sub-tasks Add(Task) error // Add a child to this dag }
Exec Tasks are inherently DAGs of tasks implementing Run(), Close(), etc.,
which makes them executable.
type TaskBase ¶
type TaskBase struct { Ctx *plan.Context Name string Handler MessageHandler // contains filtered or unexported fields }
TaskBase is the base executable task that implements the Task interface; it is embedded into other channel-based task runners.
func NewTaskBase ¶
func (*TaskBase) CloseFinal ¶
func (*TaskBase) MessageIn ¶
func (m *TaskBase) MessageIn() MessageChan
func (*TaskBase) MessageInSet ¶
func (m *TaskBase) MessageInSet(ch MessageChan)
func (*TaskBase) MessageOut ¶
func (m *TaskBase) MessageOut() MessageChan
func (*TaskBase) MessageOutSet ¶
func (m *TaskBase) MessageOutSet(ch MessageChan)
type TaskParallel ¶
type TaskParallel struct { *TaskBase // contains filtered or unexported fields }
A parallel set of tasks. It starts each child task and offers
an output channel that merges the output of every child: --> \ --> - -> --> /
func NewTaskParallel ¶
func NewTaskParallel(ctx *plan.Context) *TaskParallel
func (*TaskParallel) Add ¶
func (m *TaskParallel) Add(task Task) error
func (*TaskParallel) Children ¶
func (m *TaskParallel) Children() []Task
func (*TaskParallel) Close ¶
func (m *TaskParallel) Close() error
func (*TaskParallel) PrintDag ¶
func (m *TaskParallel) PrintDag(depth int)
func (*TaskParallel) Run ¶
func (m *TaskParallel) Run() error
func (*TaskParallel) Setup ¶
func (m *TaskParallel) Setup(depth int) error
type TaskPrinter ¶
type TaskPrinter interface {
PrintDag(depth int)
}
type TaskRunner ¶
type TaskRunner interface { Task Setup(depth int) error MessageIn() MessageChan MessageOut() MessageChan MessageInSet(MessageChan) MessageOutSet(MessageChan) ErrChan() ErrChan SigChan() SigChan Quit() }
TaskRunner is an interface for a single task in Dag of Tasks necessary to execute a Job - it may have children tasks - it may be parallel, distributed, etc
type TaskSequential ¶
type TaskSequential struct { *TaskBase // contains filtered or unexported fields }
func NewTaskSequential ¶
func NewTaskSequential(ctx *plan.Context) *TaskSequential
func (*TaskSequential) Add ¶
func (m *TaskSequential) Add(task Task) error
func (*TaskSequential) Children ¶
func (m *TaskSequential) Children() []Task
func (*TaskSequential) Close ¶
func (m *TaskSequential) Close() error
func (*TaskSequential) PrintDag ¶
func (m *TaskSequential) PrintDag(depth int)
func (*TaskSequential) Run ¶
func (m *TaskSequential) Run() (err error)
func (*TaskSequential) Setup ¶
func (m *TaskSequential) Setup(depth int) error
type TaskStepper ¶
type TaskStepper struct {
*TaskBase
}
A TaskStepper is not Run; instead, the end user drives it
with explicit Next() calls.
func NewTaskStepper ¶
func NewTaskStepper(ctx *plan.Context) *TaskStepper
func (*TaskStepper) Run ¶
func (m *TaskStepper) Run() error
type Upsert ¶
type Upsert struct { *TaskBase // contains filtered or unexported fields }
Upsert task for insert, update, upsert
type Where ¶
type Where struct { *TaskBase // contains filtered or unexported fields }
A filter that implements the WHERE clause.