xpipeline

package
v0.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2014 License: Apache-2.0 Imports: 13 Imported by: 6

Documentation

Index

Constants

View Source
const CHANNEL = "PIPELINE"
View Source
const DEBUG_DUP_CHANNEL = "OP_DUP"
View Source
const DEBUG_PROJECT_CHANNEL = "OP_PROJECT"
View Source
const FETCH_BATCH_SIZE = 1000

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseOperator

type BaseOperator struct {
	Source Operator
	// contains filtered or unexported fields
}

func NewBaseOperator

func NewBaseOperator() *BaseOperator

func (*BaseOperator) Evaluate added in v0.6.0

func (this *BaseOperator) Evaluate(e ast.Expression, item *dparval.Value) (*dparval.Value, error)

func (*BaseOperator) GetChannels

func (*BaseOperator) RecoverPanic

func (this *BaseOperator) RecoverPanic()

func (*BaseOperator) RunOperator

func (this *BaseOperator) RunOperator(oper Operator, stopChannel misc.StopChannel)

func (*BaseOperator) SendError

func (this *BaseOperator) SendError(err query.Error) bool

func (*BaseOperator) SendItem

func (this *BaseOperator) SendItem(item *dparval.Value) bool

func (*BaseOperator) SendOther

func (this *BaseOperator) SendOther(obj interface{}) bool

func (*BaseOperator) SetQuery added in v0.6.0

func (this *BaseOperator) SetQuery(q network.Query)

func (*BaseOperator) SetSource

func (this *BaseOperator) SetSource(source Operator)

type CreateIndex

type CreateIndex struct {
	// contains filtered or unexported fields
}

func NewCreateIndex

func NewCreateIndex(bucket catalog.Bucket, name string, index_type string, primary bool, on ast.ExpressionList) *CreateIndex

func (*CreateIndex) GetChannels

func (this *CreateIndex) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*CreateIndex) RecoverPanic

func (this *CreateIndex) RecoverPanic()

func (*CreateIndex) Run

func (this *CreateIndex) Run(stopChannel misc.StopChannel)

func (*CreateIndex) SendError

func (this *CreateIndex) SendError(err query.Error) bool

func (*CreateIndex) SendItem

func (this *CreateIndex) SendItem(item *dparval.Value) bool

func (*CreateIndex) SetQuery added in v0.6.0

func (this *CreateIndex) SetQuery(q network.Query)

func (*CreateIndex) SetSource

func (this *CreateIndex) SetSource(source Operator)

type DropIndex

type DropIndex struct {
	// contains filtered or unexported fields
}

func NewDropIndex

func NewDropIndex(index catalog.Index) *DropIndex

func (*DropIndex) GetChannels

func (this *DropIndex) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*DropIndex) RecoverPanic

func (this *DropIndex) RecoverPanic()

func (*DropIndex) Run

func (this *DropIndex) Run(stopChannel misc.StopChannel)

func (*DropIndex) SendError

func (this *DropIndex) SendError(err query.Error) bool

func (*DropIndex) SendItem

func (this *DropIndex) SendItem(item *dparval.Value) bool

func (*DropIndex) SetQuery added in v0.6.0

func (this *DropIndex) SetQuery(q network.Query)

func (*DropIndex) SetSource

func (this *DropIndex) SetSource(source Operator)

type EliminateDuplicates

type EliminateDuplicates struct {
	Base *BaseOperator
	// contains filtered or unexported fields
}

func NewEliminateDuplicates

func NewEliminateDuplicates() *EliminateDuplicates

func (*EliminateDuplicates) GetChannels

func (*EliminateDuplicates) Run

func (this *EliminateDuplicates) Run(stopChannel misc.StopChannel)

func (*EliminateDuplicates) SetQuery added in v0.6.0

func (this *EliminateDuplicates) SetQuery(q network.Query)

func (*EliminateDuplicates) SetSource

func (this *EliminateDuplicates) SetSource(source Operator)

type ExecutablePipeline

type ExecutablePipeline struct {
	Root Operator
}

type Explain

type Explain struct {
	Plan plan.PlanElement
	// contains filtered or unexported fields
}

func NewExplain

func NewExplain(plan plan.PlanElement) *Explain

func (*Explain) GetChannels

func (this *Explain) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Explain) RecoverPanic

func (this *Explain) RecoverPanic()

func (*Explain) Run

func (this *Explain) Run(stopChannel misc.StopChannel)

func (*Explain) SendError

func (this *Explain) SendError(err query.Error) bool

func (*Explain) SendItem

func (this *Explain) SendItem(item *dparval.Value) bool

func (*Explain) SetQuery added in v0.6.0

func (this *Explain) SetQuery(q network.Query)

func (*Explain) SetSource

func (this *Explain) SetSource(source Operator)

type FastCount added in v0.6.0

type FastCount struct {
	// contains filtered or unexported fields
}

func NewFastCount added in v0.6.0

func NewFastCount(bucket catalog.Bucket, index catalog.CountIndex, expr ast.Expression, ranges plan.ScanRanges) *FastCount

func (*FastCount) GetChannels added in v0.6.0

func (this *FastCount) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*FastCount) RecoverPanic added in v0.6.0

func (this *FastCount) RecoverPanic()

func (*FastCount) Run added in v0.6.0

func (this *FastCount) Run(stopChannel misc.StopChannel)

func (*FastCount) SendError added in v0.6.0

func (this *FastCount) SendError(err query.Error) bool

func (*FastCount) SendItem added in v0.6.0

func (this *FastCount) SendItem(item *dparval.Value) bool

func (*FastCount) SetQuery added in v0.6.0

func (this *FastCount) SetQuery(q network.Query)

func (*FastCount) SetSource added in v0.6.0

func (this *FastCount) SetSource(source Operator)

type Fetch

type Fetch struct {
	Base *BaseOperator
	// contains filtered or unexported fields
}

func NewFetch

func NewFetch(bucket catalog.Bucket, projection ast.Expression, as string) *Fetch

func (*Fetch) GetChannels

func (this *Fetch) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Fetch) Run

func (this *Fetch) Run(stopChannel misc.StopChannel)

func (*Fetch) SetIds added in v0.6.0

func (this *Fetch) SetIds(ids []string)

func (*Fetch) SetQuery added in v0.6.0

func (this *Fetch) SetQuery(q network.Query)

func (*Fetch) SetSource

func (this *Fetch) SetSource(source Operator)

type Filter

type Filter struct {
	Base *BaseOperator
	Expr ast.Expression
}

func NewFilter

func NewFilter(expr ast.Expression) *Filter

func (*Filter) GetChannels

func (this *Filter) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Filter) Run

func (this *Filter) Run(stopChannel misc.StopChannel)

func (*Filter) SetQuery added in v0.6.0

func (this *Filter) SetQuery(q network.Query)

func (*Filter) SetSource

func (this *Filter) SetSource(source Operator)

type Grouper

type Grouper struct {
	Base    *BaseOperator
	GroupBy ast.ExpressionList

	Aggregates ast.ExpressionList
	// contains filtered or unexported fields
}

func NewGrouper

func NewGrouper(groupBy ast.ExpressionList, aggs ast.ExpressionList) *Grouper

func (*Grouper) GetChannels

func (this *Grouper) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Grouper) Run

func (this *Grouper) Run(stopChannel misc.StopChannel)

func (*Grouper) SetQuery added in v0.6.0

func (this *Grouper) SetQuery(q network.Query)

func (*Grouper) SetSource

func (this *Grouper) SetSource(source Operator)

type KeyJoin added in v0.7.0

type KeyJoin struct {
	Base *BaseOperator

	Projection ast.Expression
	Keys       ast.KeyExpression
	Type       string
	As         string
	// contains filtered or unexported fields
}

func NewKeyJoin added in v0.7.0

func NewKeyJoin(bucket catalog.Bucket, projection ast.Expression, Type string, keys ast.KeyExpression, as string) *KeyJoin

func (*KeyJoin) GetChannels added in v0.7.0

func (this *KeyJoin) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*KeyJoin) Run added in v0.7.0

func (this *KeyJoin) Run(stopChannel misc.StopChannel)

func (*KeyJoin) SetQuery added in v0.7.0

func (this *KeyJoin) SetQuery(q network.Query)

func (*KeyJoin) SetSource added in v0.7.0

func (this *KeyJoin) SetSource(source Operator)

type KeyNest added in v0.7.0

type KeyNest struct {
	Base *BaseOperator

	Projection ast.Expression
	Keys       ast.KeyExpression
	Right      []interface{}
	Type       string
	As         string
	// contains filtered or unexported fields
}

func NewKeyNest added in v0.7.0

func NewKeyNest(bucket catalog.Bucket, projection ast.Expression, Type string, keys ast.KeyExpression, as string) *KeyNest

func (*KeyNest) GetChannels added in v0.7.0

func (this *KeyNest) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*KeyNest) Run added in v0.7.0

func (this *KeyNest) Run(stopChannel misc.StopChannel)

func (*KeyNest) SetQuery added in v0.7.0

func (this *KeyNest) SetQuery(q network.Query)

func (*KeyNest) SetSource added in v0.7.0

func (this *KeyNest) SetSource(source Operator)

type KeyScan added in v0.7.0

type KeyScan struct {
	// contains filtered or unexported fields
}

func NewKeyScan added in v0.7.0

func NewKeyScan(keylist []string) *KeyScan

func (*KeyScan) GetChannels added in v0.7.0

func (this *KeyScan) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*KeyScan) RecoverPanic added in v0.7.0

func (this *KeyScan) RecoverPanic()

func (*KeyScan) Run added in v0.7.0

func (this *KeyScan) Run(stopChannel misc.StopChannel)

func (*KeyScan) SendError added in v0.7.0

func (this *KeyScan) SendError(err query.Error) bool

func (*KeyScan) SendItem added in v0.7.0

func (this *KeyScan) SendItem(item *dparval.Value) bool

func (*KeyScan) SetQuery added in v0.7.0

func (this *KeyScan) SetQuery(q network.Query)

func (*KeyScan) SetSource added in v0.7.0

func (this *KeyScan) SetSource(source Operator)

type Limit

type Limit struct {
	Base  *BaseOperator
	Limit int
	// contains filtered or unexported fields
}

func NewLimit

func NewLimit(limit int) *Limit

func (*Limit) GetChannels

func (this *Limit) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Limit) Run

func (this *Limit) Run(stopChannel misc.StopChannel)

func (*Limit) SetQuery added in v0.6.0

func (this *Limit) SetQuery(q network.Query)

func (*Limit) SetSource

func (this *Limit) SetSource(source Operator)

type Offset

type Offset struct {
	Base   *BaseOperator
	Offset int
	// contains filtered or unexported fields
}

func NewOffset

func NewOffset(offset int) *Offset

func (*Offset) GetChannels

func (this *Offset) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Offset) Run

func (this *Offset) Run(stopChannel misc.StopChannel)

func (*Offset) SetQuery added in v0.6.0

func (this *Offset) SetQuery(q network.Query)

func (*Offset) SetSource

func (this *Offset) SetSource(source Operator)

type Operator

type Operator interface {
	SetSource(Operator)
	GetChannels() (dparval.ValueChannel, PipelineSupportChannel)
	Run(misc.StopChannel)

	SetQuery(q network.Query)
	// contains filtered or unexported methods
}

type Order

type Order struct {
	Base    *BaseOperator
	OrderBy []*ast.SortExpression
	// contains filtered or unexported fields
}

func NewOrder

func NewOrder(orderBy []*ast.SortExpression, explicitAliases []string) *Order

func (*Order) GetChannels

func (this *Order) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Order) Len

func (this *Order) Len() int

func (*Order) Less

func (this *Order) Less(i, j int) bool

func (*Order) Run

func (this *Order) Run(stopChannel misc.StopChannel)

func (*Order) SetQuery added in v0.6.0

func (this *Order) SetQuery(q network.Query)

func (*Order) SetSource

func (this *Order) SetSource(source Operator)

func (*Order) Swap

func (this *Order) Swap(i, j int)

type PipelineSupportChannel

type PipelineSupportChannel chan interface{}

type Project

type Project struct {
	Base   *BaseOperator
	Result ast.ResultExpressionList
	// contains filtered or unexported fields
}

func NewProject

func NewProject(result ast.ResultExpressionList, projectEmpty bool) *Project

func (*Project) GetChannels

func (this *Project) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Project) Run

func (this *Project) Run(stopChannel misc.StopChannel)

func (*Project) SetQuery added in v0.6.0

func (this *Project) SetQuery(q network.Query)

func (*Project) SetSource

func (this *Project) SetSource(source Operator)

type Scan

type Scan struct {
	// contains filtered or unexported fields
}

func NewScan

func NewScan(bucket catalog.Bucket, index catalog.ScanIndex, ranges plan.ScanRanges, as string) *Scan

func (*Scan) GetChannels

func (this *Scan) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Scan) RecoverPanic

func (this *Scan) RecoverPanic()

func (*Scan) Run

func (this *Scan) Run(stopChannel misc.StopChannel)

func (*Scan) SendError

func (this *Scan) SendError(err query.Error) bool

func (*Scan) SendItem

func (this *Scan) SendItem(item *dparval.Value) bool

func (*Scan) SetQuery added in v0.6.0

func (this *Scan) SetQuery(q network.Query)

func (*Scan) SetSource

func (this *Scan) SetSource(source Operator)

type Unnest added in v0.7.0

type Unnest struct {
	Base *BaseOperator
	Over ast.Expression
	Type string
	As   string
}

func NewUnnest added in v0.7.0

func NewUnnest(over ast.Expression, jointype string, as string) *Unnest

func (*Unnest) GetChannels added in v0.7.0

func (this *Unnest) GetChannels() (dparval.ValueChannel, PipelineSupportChannel)

func (*Unnest) Run added in v0.7.0

func (this *Unnest) Run(stopChannel misc.StopChannel)

func (*Unnest) SetQuery added in v0.7.0

func (this *Unnest) SetQuery(q network.Query)

func (*Unnest) SetSource added in v0.7.0

func (this *Unnest) SetSource(source Operator)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL