physicalplan

package
v0.0.0-...-d0d91b6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrUnsupportedAndType = errors.New("unsupported type for is and aggregation, expected bool")
View Source
var ErrUnsupportedBinaryOperation = errors.New("unsupported binary operation")
View Source
var ErrUnsupportedBooleanExpression = errors.New("unsupported boolean expression")
View Source
var ErrUnsupportedIsUniqueType = errors.New("unsupported type for is unique aggregation, expected int64")
View Source
var ErrUnsupportedMaxType = errors.New("unsupported type for max aggregation, expected int64 or float64")
View Source
var ErrUnsupportedMinType = errors.New("unsupported type for max aggregation, expected int64 or float64")
View Source
var ErrUnsupportedSumType = errors.New("unsupported type for sum aggregation, expected int64 or float64")

Functions

func AddFloat64s

func AddFloat64s(mem memory.Allocator, left, right *array.Float64) *array.Float64

func AddInt32s

func AddInt32s(mem memory.Allocator, left, right *array.Int32) *array.Int32

func AddInt64s

func AddInt64s(mem memory.Allocator, left, right *array.Int64) *array.Int64

func AddUint64s

func AddUint64s(mem memory.Allocator, left, right *array.Uint64) *array.Uint64

func AndArrays

func AndArrays(pool memory.Allocator, arrs []arrow.Array) arrow.Array

func DivFloat64s

func DivFloat64s(mem memory.Allocator, left, right *array.Float64) *array.Float64

func DivInt32s

func DivInt32s(mem memory.Allocator, left, right *array.Int32) *array.Int32

func DivInt64s

func DivInt64s(mem memory.Allocator, left, right *array.Int64) *array.Int64

func DivUint64s

func DivUint64s(mem memory.Allocator, left, right *array.Uint64) *array.Uint64

func MulFloat64s

func MulFloat64s(mem memory.Allocator, left, right *array.Float64) *array.Float64

func MulInt32s

func MulInt32s(mem memory.Allocator, left, right *array.Int32) *array.Int32

func MulInt64s

func MulInt64s(mem memory.Allocator, left, right *array.Int64) *array.Int64

func MulUint64s

func MulUint64s(mem memory.Allocator, left, right *array.Uint64) *array.Uint64

func SubFloat64s

func SubFloat64s(mem memory.Allocator, left, right *array.Float64) *array.Float64

func SubInt32s

func SubInt32s(mem memory.Allocator, left, right *array.Int32) *array.Int32

func SubInt64s

func SubInt64s(mem memory.Allocator, left, right *array.Int64) *array.Int64

func SubUint64s

func SubUint64s(mem memory.Allocator, left, right *array.Uint64) *array.Uint64

Types

type Aggregation

type Aggregation struct {
	// contains filtered or unexported fields
}

Aggregation groups together some lower-level primitives for the column to be aggregated by its function.

type AggregationFunction

type AggregationFunction interface {
	Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)
}

type AndAggregation

type AndAggregation struct{}

func (*AndAggregation) Aggregate

func (a *AndAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)

type AndExpr

type AndExpr struct {
	Left  BooleanExpression
	Right BooleanExpression
}

func (*AndExpr) Eval

func (a *AndExpr) Eval(r arrow.Record) (*Bitmap, error)

func (*AndExpr) String

func (a *AndExpr) String() string

type ArrayRef

type ArrayRef struct {
	ColumnName string
}

func (*ArrayRef) ArrowArray

func (a *ArrayRef) ArrowArray(r arrow.Record) (arrow.Array, bool, error)

func (*ArrayRef) String

func (a *ArrayRef) String() string

type ArrayReference

type ArrayReference struct{}

type BinaryScalarExpr

type BinaryScalarExpr struct {
	Left  *ArrayRef
	Op    logicalplan.Op
	Right scalar.Scalar
}

func (BinaryScalarExpr) Eval

func (e BinaryScalarExpr) Eval(r arrow.Record) (*Bitmap, error)

func (BinaryScalarExpr) String

func (e BinaryScalarExpr) String() string

type Bitmap

type Bitmap = roaring.Bitmap

func ArrayScalarCompute

func ArrayScalarCompute(funcName string, left arrow.Array, right scalar.Scalar) (*Bitmap, error)

func ArrayScalarContains

func ArrayScalarContains(arr arrow.Array, right scalar.Scalar, not bool) (*Bitmap, error)

func ArrayScalarRegexMatch

func ArrayScalarRegexMatch(left arrow.Array, right *regexp.Regexp) (*Bitmap, error)

func ArrayScalarRegexNotMatch

func ArrayScalarRegexNotMatch(left arrow.Array, right *regexp.Regexp) (*Bitmap, error)

func BinaryArrayScalarRegexMatch

func BinaryArrayScalarRegexMatch(left *array.Binary, right *regexp.Regexp) (*Bitmap, error)

func BinaryArrayScalarRegexNotMatch

func BinaryArrayScalarRegexNotMatch(left *array.Binary, right *regexp.Regexp) (*Bitmap, error)

func BinaryDictionaryArrayScalarRegexMatch

func BinaryDictionaryArrayScalarRegexMatch(dict *array.Dictionary, left *array.Binary, right *regexp.Regexp) (*Bitmap, error)

func BinaryDictionaryArrayScalarRegexNotMatch

func BinaryDictionaryArrayScalarRegexNotMatch(dict *array.Dictionary, left *array.Binary, right *regexp.Regexp) (*Bitmap, error)

func BinaryScalarOperation

func BinaryScalarOperation(left arrow.Array, right scalar.Scalar, operator logicalplan.Op) (*Bitmap, error)

func DictionaryArrayScalarContains

func DictionaryArrayScalarContains(left *array.Dictionary, right scalar.Scalar, not bool) (*Bitmap, error)

func DictionaryArrayScalarEqual

func DictionaryArrayScalarEqual(left *array.Dictionary, right scalar.Scalar) (*Bitmap, error)

func DictionaryArrayScalarNotEqual

func DictionaryArrayScalarNotEqual(left *array.Dictionary, right scalar.Scalar) (*Bitmap, error)

func NewBitmap

func NewBitmap() *Bitmap

func StringArrayScalarRegexMatch

func StringArrayScalarRegexMatch(left *array.String, right *regexp.Regexp) (*Bitmap, error)

func StringArrayScalarRegexNotMatch

func StringArrayScalarRegexNotMatch(left *array.String, right *regexp.Regexp) (*Bitmap, error)

type BooleanExpression

type BooleanExpression interface {
	Eval(r arrow.Record) (*Bitmap, error)
	String() string
}

type CountAggregation

type CountAggregation struct{}

func (*CountAggregation) Aggregate

func (a *CountAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)

type Diagram

type Diagram struct {
	Details string
	Child   *Diagram
}

func (*Diagram) String

func (d *Diagram) String() string

type Distinction

type Distinction struct {
	// contains filtered or unexported fields
}

func Distinct

func Distinct(pool memory.Allocator, tracer trace.Tracer, columns []logicalplan.Expr) *Distinction

func (*Distinction) Callback

func (d *Distinction) Callback(ctx context.Context, r arrow.Record) error

func (*Distinction) Close

func (d *Distinction) Close()

func (*Distinction) Draw

func (d *Distinction) Draw() *Diagram

func (*Distinction) Finish

func (d *Distinction) Finish(ctx context.Context) error

func (*Distinction) SetNext

func (d *Distinction) SetNext(plan PhysicalPlan)

type HashAggregate

type HashAggregate struct {
	// contains filtered or unexported fields
}

func NewHashAggregate

func NewHashAggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	aggregations []Aggregation,
	groupByColumnMatchers []logicalplan.Expr,
	seed maphash.Seed,
	finalStage bool,
) *HashAggregate

func (*HashAggregate) Callback

func (a *HashAggregate) Callback(_ context.Context, r arrow.Record) error

func (*HashAggregate) Close

func (a *HashAggregate) Close()

func (*HashAggregate) Draw

func (a *HashAggregate) Draw() *Diagram

func (*HashAggregate) Finish

func (a *HashAggregate) Finish(ctx context.Context) error

func (*HashAggregate) SetNext

func (a *HashAggregate) SetNext(next PhysicalPlan)

type IndexRange

type IndexRange struct {
	Start uint32
	End   uint32
}

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

func Limit

func Limit(pool memory.Allocator, tracer trace.Tracer, expr logicalplan.Expr) (*Limiter, error)

func (*Limiter) Callback

func (l *Limiter) Callback(ctx context.Context, r arrow.Record) error

func (*Limiter) Close

func (l *Limiter) Close()

func (*Limiter) Draw

func (l *Limiter) Draw() *Diagram

func (*Limiter) Finish

func (l *Limiter) Finish(ctx context.Context) error

func (*Limiter) SetNext

func (l *Limiter) SetNext(next PhysicalPlan)

type MaxAggregation

type MaxAggregation struct{}

func (*MaxAggregation) Aggregate

func (a *MaxAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)

type MinAggregation

type MinAggregation struct{}

func (*MinAggregation) Aggregate

func (a *MinAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)

type Option

type Option func(o *execOptions)

func WithOrderedAggregations

func WithOrderedAggregations() Option

func WithOverrideInput

func WithOverrideInput(input []PhysicalPlan) Option

WithOverrideInput can be used to provide an input stage on top of which the Build function can build the physical plan.

func WithReadMode

func WithReadMode(m logicalplan.ReadMode) Option

type OrExpr

type OrExpr struct {
	Left  BooleanExpression
	Right BooleanExpression
}

func (*OrExpr) Eval

func (a *OrExpr) Eval(r arrow.Record) (*Bitmap, error)

func (*OrExpr) String

func (a *OrExpr) String() string

type OrderedAggregate

type OrderedAggregate struct {
	// contains filtered or unexported fields
}

OrderedAggregate is an aggregation operator that supports aggregations on streams of data ordered by the group by columns. This is a more efficient aggregation than aggregating by hash since a group can be determined as completed once a different aggregation key is found in the ordered stream. OrderedAggregate also supports partially ordered aggregations. This means aggregating on keys that arrive in ordered sets of data that are not mutually exclusive. For example, consider the group by columns: a, a, b, c, a, b, c. The OrderedAggregate will perform the aggregation on the first ordered set a, a, b, c and another one on the second a, b, c. The result of both aggregations is merged. Specifically, if the example is pushed to Callback in two records (a, a, b, c) followed by (a, b, c), and assuming that the aggregation values for each row are 1 for simplicity and we're using a sum aggregation, after the first call to Callback the OrderedAggregate will store [a, b, c], [2, 1, 1] but not emit anything. When the second record is pushed, the OrderedAggregate will realize that the first value in the new record (a) sorts before the "current group" (c), so will store the aggregation results of the second record as another ordered group [a, b, c], [1, 1, 1]. Only when Finish is called will the OrderedAggregate be able to emit the merged aggregation results. The merged results should be: [a, b, c], [3, 2, 2].

func NewOrderedAggregate

func NewOrderedAggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	aggregation Aggregation,
	groupByColumnMatchers []logicalplan.Expr,
	finalStage bool,
) *OrderedAggregate

func (*OrderedAggregate) Callback

func (a *OrderedAggregate) Callback(_ context.Context, r arrow.Record) error

func (*OrderedAggregate) Close

func (a *OrderedAggregate) Close()

func (*OrderedAggregate) Draw

func (a *OrderedAggregate) Draw() *Diagram

func (*OrderedAggregate) Finish

func (a *OrderedAggregate) Finish(ctx context.Context) error

func (*OrderedAggregate) SetNext

func (a *OrderedAggregate) SetNext(next PhysicalPlan)

type OrderedSynchronizer

type OrderedSynchronizer struct {
	// contains filtered or unexported fields
}

OrderedSynchronizer implements synchronizing ordered input from multiple goroutines. The strategy used is that any input that calls Callback must wait for all the other inputs to call Callback, since an ordered result cannot be produced until all inputs have pushed data. Another strategy would be to store the pushed records, but that requires fully copying all the data for safety.

func NewOrderedSynchronizer

func NewOrderedSynchronizer(pool memory.Allocator, inputs int, orderByExprs []logicalplan.Expr) *OrderedSynchronizer

func (*OrderedSynchronizer) Callback

func (o *OrderedSynchronizer) Callback(ctx context.Context, r arrow.Record) error

func (*OrderedSynchronizer) Close

func (o *OrderedSynchronizer) Close()

func (*OrderedSynchronizer) Draw

func (o *OrderedSynchronizer) Draw() *Diagram

func (*OrderedSynchronizer) Finish

func (o *OrderedSynchronizer) Finish(ctx context.Context) error

func (*OrderedSynchronizer) SetNext

func (o *OrderedSynchronizer) SetNext(next PhysicalPlan)

type OutputPlan

type OutputPlan struct {
	// contains filtered or unexported fields
}

func Build

func Build(
	ctx context.Context,
	pool memory.Allocator,
	tracer trace.Tracer,
	s *dynparquet.Schema,
	plan *logicalplan.LogicalPlan,
	options ...Option,
) (*OutputPlan, error)

func (*OutputPlan) Callback

func (e *OutputPlan) Callback(ctx context.Context, r arrow.Record) error

func (*OutputPlan) Close

func (e *OutputPlan) Close()

func (*OutputPlan) Draw

func (e *OutputPlan) Draw() *Diagram

func (*OutputPlan) DrawString

func (e *OutputPlan) DrawString() string

func (*OutputPlan) Execute

func (e *OutputPlan) Execute(ctx context.Context, pool memory.Allocator, callback func(ctx context.Context, r arrow.Record) error) error

func (*OutputPlan) Finish

func (e *OutputPlan) Finish(_ context.Context) error

func (*OutputPlan) SetNext

func (e *OutputPlan) SetNext(_ PhysicalPlan)

func (*OutputPlan) SetNextCallback

func (e *OutputPlan) SetNextCallback(next func(ctx context.Context, r arrow.Record) error)

type PhysicalPlan

type PhysicalPlan interface {
	Callback(ctx context.Context, r arrow.Record) error
	Finish(ctx context.Context) error
	SetNext(next PhysicalPlan)
	Draw() *Diagram
	Close()
}

func Aggregate

func Aggregate(
	pool memory.Allocator,
	tracer trace.Tracer,
	agg *logicalplan.Aggregation,
	final bool,
	ordered bool,
	seed maphash.Seed,
) (PhysicalPlan, error)

type PostPlanVisitorFunc

type PostPlanVisitorFunc func(plan *logicalplan.LogicalPlan) bool

func (PostPlanVisitorFunc) PostVisit

func (f PostPlanVisitorFunc) PostVisit(plan *logicalplan.LogicalPlan) bool

func (PostPlanVisitorFunc) PreVisit

type PreExprVisitorFunc

type PreExprVisitorFunc func(expr logicalplan.Expr) bool

func (PreExprVisitorFunc) PostVisit

func (f PreExprVisitorFunc) PostVisit(_ logicalplan.Expr) bool

func (PreExprVisitorFunc) PreVisit

func (f PreExprVisitorFunc) PreVisit(expr logicalplan.Expr) bool

func (PreExprVisitorFunc) Visit

type PrePlanVisitorFunc

type PrePlanVisitorFunc func(plan *logicalplan.LogicalPlan) bool

func (PrePlanVisitorFunc) PostVisit

func (PrePlanVisitorFunc) PreVisit

func (f PrePlanVisitorFunc) PreVisit(plan *logicalplan.LogicalPlan) bool

type PredicateFilter

type PredicateFilter struct {
	// contains filtered or unexported fields
}

func Filter

func Filter(pool memory.Allocator, tracer trace.Tracer, filterExpr logicalplan.Expr) (*PredicateFilter, error)

func (*PredicateFilter) Callback

func (f *PredicateFilter) Callback(ctx context.Context, r arrow.Record) error

func (*PredicateFilter) Close

func (f *PredicateFilter) Close()

func (*PredicateFilter) Draw

func (f *PredicateFilter) Draw() *Diagram

func (*PredicateFilter) Finish

func (f *PredicateFilter) Finish(ctx context.Context) error

func (*PredicateFilter) SetNext

func (f *PredicateFilter) SetNext(next PhysicalPlan)

type Projection

type Projection struct {
	// contains filtered or unexported fields
}

func Project

func Project(mem memory.Allocator, tracer trace.Tracer, exprs []logicalplan.Expr) (*Projection, error)

func (*Projection) Callback

func (p *Projection) Callback(ctx context.Context, r arrow.Record) error

func (*Projection) Close

func (p *Projection) Close()

func (*Projection) Draw

func (p *Projection) Draw() *Diagram

func (*Projection) Finish

func (p *Projection) Finish(ctx context.Context) error

func (*Projection) Project

func (p *Projection) Project(_ context.Context, r arrow.Record) (arrow.Record, error)

func (*Projection) SetNext

func (p *Projection) SetNext(next PhysicalPlan)

type RegExpFilter

type RegExpFilter struct {
	// contains filtered or unexported fields
}

func (*RegExpFilter) Eval

func (f *RegExpFilter) Eval(r arrow.Record) (*Bitmap, error)

func (*RegExpFilter) String

func (f *RegExpFilter) String() string

type ScanPhysicalPlan

type ScanPhysicalPlan interface {
	Execute(ctx context.Context, pool memory.Allocator) error
	Draw() *Diagram
}

type SchemaScan

type SchemaScan struct {
	// contains filtered or unexported fields
}

func (*SchemaScan) Draw

func (s *SchemaScan) Draw() *Diagram

func (*SchemaScan) Execute

func (s *SchemaScan) Execute(ctx context.Context, pool memory.Allocator) error

type SumAggregation

type SumAggregation struct{}

func (*SumAggregation) Aggregate

func (a *SumAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)

type Synchronizer

type Synchronizer struct {
	// contains filtered or unexported fields
}

Synchronizer is used to combine the results of multiple parallel streams into a single concurrent stream. It also forms a barrier on the finishers, by waiting to call the next plan's Finish until all previous parallel stages have finished.

func Synchronize

func Synchronize(concurrency int) *Synchronizer

func (*Synchronizer) Callback

func (m *Synchronizer) Callback(ctx context.Context, r arrow.Record) error

func (*Synchronizer) Close

func (m *Synchronizer) Close()

func (*Synchronizer) Draw

func (m *Synchronizer) Draw() *Diagram

func (*Synchronizer) Finish

func (m *Synchronizer) Finish(ctx context.Context) error

func (*Synchronizer) SetNext

func (m *Synchronizer) SetNext(next PhysicalPlan)

func (*Synchronizer) SetNextPlan

func (m *Synchronizer) SetNextPlan(nextPlan PhysicalPlan)

type TableScan

type TableScan struct {
	// contains filtered or unexported fields
}

func (*TableScan) Draw

func (s *TableScan) Draw() *Diagram

func (*TableScan) Execute

func (s *TableScan) Execute(ctx context.Context, pool memory.Allocator) error

type UniqueAggregation

type UniqueAggregation struct{}

func (*UniqueAggregation) Aggregate

func (a *UniqueAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL