Documentation ¶
Overview ¶
Package flow contains data structures for computation, mostly Dataset operations such as Map/Reduce/Join/Sort etc.
Index ¶
- Variables
- type DasetsetHint
- type DasetsetMetadata
- type DasetsetShardMetadata
- type Dataset
- func (d *Dataset) Broadcast(shardCount int) *Dataset
- func (d *Dataset) CoGroup(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (this *Dataset) CoGroupPartitionedSorted(that *Dataset, indexes []int) (ret *Dataset)
- func (d *Dataset) Distinct(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Do(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) DoJoin(other *Dataset, leftOuter, rightOuter bool, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Filter(code string) *Dataset
- func (d *Dataset) FlatMap(code string) *Dataset
- func (d *Dataset) ForEach(code string) *Dataset
- func (d *Dataset) Fprintf(writer io.Writer, format string) *Dataset
- func (d *Dataset) Fprintlnf(writer io.Writer, format string) *Dataset
- func (d *Dataset) GetIsOnDiskIO() bool
- func (d *Dataset) GetPartitionSize() int64
- func (d *Dataset) GetShards() []*DatasetShard
- func (d *Dataset) GetTotalSize() int64
- func (d *Dataset) GroupBy(sortOptions ...*SortOption) *Dataset
- func (bigger *Dataset) HashJoin(smaller *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
- func (d *Dataset) Init(scriptPart string) *Dataset
- func (d *Dataset) Join(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (this *Dataset) JoinPartitionedSorted(that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
- func (d *Dataset) LeftOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalDistinct(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalGroupBy(sortOptions ...*SortOption) *Dataset
- func (this *Dataset) LocalHashAndJoinWith(that *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalLimit(n int, offset int) *Dataset
- func (d *Dataset) LocalReduce(code string) *Dataset
- func (d *Dataset) LocalReduceBy(code string, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalSort(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) LocalTop(n int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Map(code string) *Dataset
- func (d *Dataset) Mapper(mapperId gio.MapperId) *Dataset
- func (d *Dataset) MergeSortedTo(partitionCount int, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) MergeTo(partitionCount int) (ret *Dataset)
- func (d *Dataset) OnDisk(fn func(*Dataset) *Dataset) *Dataset
- func (d *Dataset) Output(f func(io.Reader) error) *Dataset
- func (d *Dataset) Partition(shard int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Pipe(code string) *Dataset
- func (d *Dataset) PipeAsArgs(code string) *Dataset
- func (d *Dataset) PipeOut(writer io.Writer) *Dataset
- func (d *Dataset) Printf(format string) *Dataset
- func (d *Dataset) Printlnf(format string) *Dataset
- func (d *Dataset) Reduce(code string) (ret *Dataset)
- func (d *Dataset) ReduceBy(code string, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) ReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) (ret *Dataset)
- func (d *Dataset) RightOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) RoundRobin(shard int) *Dataset
- func (d *Dataset) Run(option ...FlowOption)
- func (d *Dataset) RunContext(ctx context.Context, option ...FlowOption)
- func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset
- func (d *Dataset) Script(scriptType string) *Dataset
- func (d *Dataset) Select(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Sort(sortOptions ...*SortOption) *Dataset
- func (d *Dataset) Top(k int, sortOptions ...*SortOption) *Dataset
- func (d *Dataset) TreeMergeSortedTo(partitionCount int, factor int, sortOptions ...*SortOption) (ret *Dataset)
- type DatasetShard
- type DatasetShardStatus
- type Flow
- func (f *Flow) AddAllToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (f *Flow) AddLinkedNToOneStep(input *Dataset, m int, output *Dataset) (step *Step)
- func (f *Flow) AddOneToAllStep(input *Dataset, output *Dataset) (step *Step)
- func (f *Flow) AddOneToEveryNStep(input *Dataset, n int, output *Dataset) (step *Step)
- func (f *Flow) AddOneToOneStep(input *Dataset, output *Dataset) (step *Step)
- func (fc *Flow) Bytes(slice [][]byte) (ret *Dataset)
- func (fc *Flow) Channel(ch chan interface{}) (ret *Dataset)
- func (d *Flow) Hint(options ...FlowHintOption)
- func (fc *Flow) Init(scriptPart string) *Flow
- func (fc *Flow) Ints(numbers []int) (ret *Dataset)
- func (fc *Flow) Listen(network, address string) (ret *Dataset)
- func (f *Flow) MergeDatasets1ShardTo1Step(inputs []*Dataset, output *Dataset) (step *Step)
- func (fc *Flow) NewStep() (step *Step)
- func (fc *Flow) OnInterrupt()
- func (fc *Flow) Read(s Sourcer) (ret *Dataset)
- func (fc *Flow) ReadTsv(reader io.Reader) (ret *Dataset)
- func (fc *Flow) Run(options ...FlowOption)
- func (fc *Flow) RunContext(ctx context.Context, options ...FlowOption)
- func (fc *Flow) Script(scriptType string) *Flow
- func (fc *Flow) Slices(slices [][]interface{}) (ret *Dataset)
- func (fc *Flow) Source(f func(io.Writer) error) (ret *Dataset)
- func (fc *Flow) Strings(lines []string) (ret *Dataset)
- func (fc *Flow) TextFile(fname string) (ret *Dataset)
- type FlowConfig
- type FlowHintOption
- type FlowOption
- type FlowRunner
- type ModeIO
- type NetworkType
- type RunLocked
- type SortOption
- type Sourcer
- type Step
- type StepMetadata
- type Task
Constants ¶
This section is empty.
Variables ¶
var (
Local *localDriver
)
Functions ¶
This section is empty.
Types ¶
type DasetsetHint ¶
type DasetsetHint func(d *Dataset)
func PartitionSize ¶
func PartitionSize(n int64) DasetsetHint
PartitionSize hints the partition size in MB. This is usually used when sorting is needed.
func TotalSize ¶
func TotalSize(n int64) DasetsetHint
TotalSize hints the total size in MB for all the partitions. This is usually used when sorting is needed.
type DasetsetMetadata ¶
type DasetsetShardMetadata ¶
type Dataset ¶
type Dataset struct { Flow *Flow Id int Shards []*DatasetShard Step *Step ReadingSteps []*Step IsPartitionedBy []int IsLocalSorted []instruction.OrderBy Meta *DasetsetMetadata RunLocked }
func (*Dataset) CoGroup ¶
func (d *Dataset) CoGroup(other *Dataset, sortOptions ...*SortOption) *Dataset
CoGroup joins two datasets by the key. Each result row has this format:
(key, []left_rows, []right_rows)
func (*Dataset) CoGroupPartitionedSorted ¶
CoGroupPartitionedSorted joins 2 datasets that are sharded by the same key and already locally sorted within each shard.
func (*Dataset) Distinct ¶
func (d *Dataset) Distinct(sortOptions ...*SortOption) *Dataset
Distinct sorts on specific fields and picks the unique ones. Required Memory: about same size as each partition. Example usage: Distinct(Field(1,2)) means distinct on field 1 and 2. TODO: optimize for low cardinality case.
func (*Dataset) Do ¶
Do accepts a function to transform a dataset into a new dataset. This allows custom complicated pre-built logic.
func (*Dataset) DoJoin ¶
func (d *Dataset) DoJoin(other *Dataset, leftOuter, rightOuter bool, sortOptions ...*SortOption) *Dataset
func (*Dataset) Filter ¶
Filter conditionally passes some rows into the next dataset. The code should be a function just returning a boolean result.
func (*Dataset) ForEach ¶
ForEach operates on each row, but the results are not collected. This is used to create some side effects.
func (*Dataset) GetIsOnDiskIO ¶
GetIsOnDiskIO returns true if the dataset is persisted to disk in distributed mode.
func (*Dataset) GetPartitionSize ¶
GetPartitionSize returns the size in MB for each partition of the dataset. This is based on the hinted total size divided by the number of partitions.
func (*Dataset) GetShards ¶
func (d *Dataset) GetShards() []*DatasetShard
func (*Dataset) GetTotalSize ¶
GetTotalSize returns the total size in MB for the dataset. This is based on the given hint.
func (*Dataset) GroupBy ¶
func (d *Dataset) GroupBy(sortOptions ...*SortOption) *Dataset
GroupBy e.g. GroupBy(Field(1,2,3)) group data by field 1,2,3
func (*Dataset) HashJoin ¶
func (bigger *Dataset) HashJoin(smaller *Dataset, sortOptions ...*SortOption) *Dataset
HashJoin joins two datasets by putting the smaller dataset in memory on all executors and streams through the bigger dataset.
func (*Dataset) Hint ¶
func (d *Dataset) Hint(options ...DasetsetHint) *Dataset
Hint adds options for previous dataset.
func (*Dataset) Join ¶
func (d *Dataset) Join(other *Dataset, sortOptions ...*SortOption) *Dataset
Join joins two datasets by the key.
func (*Dataset) JoinPartitionedSorted ¶
func (this *Dataset) JoinPartitionedSorted(that *Dataset, sortOption *SortOption, isLeftOuterJoin, isRightOuterJoin bool) *Dataset
JoinPartitionedSorted joins multiple datasets that are sharded by the same key and locally sorted within each shard.
func (*Dataset) LeftOuterJoin ¶
func (d *Dataset) LeftOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalDistinct ¶
func (d *Dataset) LocalDistinct(sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalGroupBy ¶
func (d *Dataset) LocalGroupBy(sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalHashAndJoinWith ¶
func (this *Dataset) LocalHashAndJoinWith(that *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalLimit ¶
LocalLimit takes the local first n rows and skips all other rows.
func (*Dataset) LocalReduce ¶
func (*Dataset) LocalReduceBy ¶
func (d *Dataset) LocalReduceBy(code string, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalReducerBy ¶
func (d *Dataset) LocalReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) *Dataset
func (*Dataset) LocalSort ¶
func (d *Dataset) LocalSort(sortOptions ...*SortOption) *Dataset
func (*Dataset) Map ¶
Map operates on each row, and the returned results are passed to next dataset.
func (*Dataset) Mapper ¶
Mapper runs the mapper registered to the mapperId. This is used to execute pure Go code.
func (*Dataset) MergeSortedTo ¶
func (d *Dataset) MergeSortedTo(partitionCount int, sortOptions ...*SortOption) (ret *Dataset)
func (*Dataset) OnDisk ¶
OnDisk ensures the intermediate datasets are persisted to disk. This allows executors to not run in parallel if executors are limited.
func (*Dataset) Partition ¶
func (d *Dataset) Partition(shard int, sortOptions ...*SortOption) *Dataset
hash data or by data key, return a new dataset. This is divided into 2 steps: 1. Each record is sharded to a local shard. 2. The destination shard will collect its child shards and merge them into one.
func (*Dataset) Pipe ¶
Pipe runs the code as an external program, which processes the tab-separated input from the program's stdin, and outputs to stdout also in tab-separated lines.
func (*Dataset) PipeAsArgs ¶
PipeAsArgs takes each row of input, bind to variables in parameter code. The variables are specified via $1, $2, etc. The code is run as the command for an external program for each row of input.
Watch for performance impact since it starts one OS process for each line of input.
func (*Dataset) PipeOut ¶
PipeOut writes to writer. If previous step is a Pipe() or PipeAsArgs(), the output is written as is. Otherwise, each row of output is written in tab-separated lines.
func (*Dataset) Printlnf ¶
Printlnf prints to os.Stdout in the specified format, adding an "\n" at the end of each format
func (*Dataset) ReduceBy ¶
func (d *Dataset) ReduceBy(code string, sortOptions ...*SortOption) (ret *Dataset)
func (*Dataset) ReducerBy ¶
func (d *Dataset) ReducerBy(reducerId gio.ReducerId, sortOptions ...*SortOption) (ret *Dataset)
ReducerBy runs the reducer registered to the reducerId. This is used to execute pure Go code.
func (*Dataset) RightOuterJoin ¶
func (d *Dataset) RightOuterJoin(other *Dataset, sortOptions ...*SortOption) *Dataset
func (*Dataset) RoundRobin ¶
func (*Dataset) Run ¶
func (d *Dataset) Run(option ...FlowOption)
Run starts the whole flow. This is a convenience method, same as *Flow.Run().
func (*Dataset) RunContext ¶
func (d *Dataset) RunContext(ctx context.Context, option ...FlowOption)
RunContext starts the whole flow. This is a convenience method, same as *Flow.RunContext().
func (*Dataset) SaveFirstRowTo ¶
SaveFirstRowTo saves the first row's values into the operands.
func (*Dataset) Select ¶
func (d *Dataset) Select(sortOptions ...*SortOption) *Dataset
Select selects multiple fields into the next dataset. The index starts from 1.
func (*Dataset) Sort ¶
func (d *Dataset) Sort(sortOptions ...*SortOption) *Dataset
Sort sorts on specific fields, defaulting to the first field. Required Memory: about same size as each partition. Example usage: Sort(Field(1,2)) means sorting on field 1 and 2.
func (*Dataset) Top ¶
func (d *Dataset) Top(k int, sortOptions ...*SortOption) *Dataset
Top streams through total n items, picking reverse ordered k items with O(n*log(k)) complexity. Required Memory: about same size as n items in memory
func (*Dataset) TreeMergeSortedTo ¶
func (d *Dataset) TreeMergeSortedTo(partitionCount int, factor int, sortOptions ...*SortOption) (ret *Dataset)
type DatasetShard ¶
type DatasetShard struct { Id int Dataset *Dataset ReadingTasks []*Task IncomingChan *util.Piper OutgoingChans []*util.Piper Counter int64 ReadyTime time.Time CloseTime time.Time Meta *DasetsetShardMetadata }
func (*DatasetShard) Closed ¶
func (s *DatasetShard) Closed() bool
func (*DatasetShard) Name ¶
func (s *DatasetShard) Name() string
func (*DatasetShard) TimeTaken ¶
func (s *DatasetShard) TimeTaken() time.Duration
type DatasetShardStatus ¶
type DatasetShardStatus int
const ( Untouched DatasetShardStatus = iota LocationAssigned InProgress InRetry Failed Successful )
type Flow ¶
type Flow struct { PrevScriptType string PrevScriptPart string Scripts map[string]func() script.Script Steps []*Step Datasets []*Dataset HashCode uint32 // contains filtered or unexported fields }
func (*Flow) AddAllToOneStep ¶
the task should run on the destination dataset shard
func (*Flow) AddLinkedNToOneStep ¶
func (*Flow) AddOneToAllStep ¶
The task should run on the source dataset shard. The input is nil for the initial source dataset.
func (*Flow) AddOneToEveryNStep ¶
func (*Flow) AddOneToOneStep ¶
the tasks should run on the source dataset shard
func (*Flow) Init ¶
Init defines or declares variables or functions for the script. This piece of code is executed first, before each function that invokes a script.
func (*Flow) Listen ¶
Listen receives textual inputs via a socket. Multiple parameters are separated via tab.
func (*Flow) MergeDatasets1ShardTo1Step ¶
All datasets should have the same number of shards.
func (*Flow) OnInterrupt ¶
func (fc *Flow) OnInterrupt()
func (*Flow) Read ¶
Read accepts a function to read data into the flow, creating a new dataset. This allows custom complicated pre-built logic for new data sources.
func (*Flow) Run ¶
func (fc *Flow) Run(options ...FlowOption)
func (*Flow) RunContext ¶
func (fc *Flow) RunContext(ctx context.Context, options ...FlowOption)
func (*Flow) Source ¶
Source produces data feeding into the flow. Function f writes to this writer. The written bytes should be MsgPack encoded []byte. Use util.EncodeRow(...) to encode the data before writing to this writer.
type FlowConfig ¶
type FlowConfig struct {
OnDisk bool
}
type FlowHintOption ¶
type FlowHintOption func(c *FlowConfig)
type FlowOption ¶
type FlowOption interface {
GetFlowRunner() FlowRunner
}
type FlowRunner ¶
type NetworkType ¶
type NetworkType int
const ( OneShardToOneShard NetworkType = iota OneShardToAllShard AllShardToOneShard OneShardToEveryNShard LinkedNShardToOneShard MergeTwoShardToOneShard )
type SortOption ¶
type SortOption struct {
// contains filtered or unexported fields
}
func Field ¶
func Field(indexes ...int) *SortOption
Field groups the indexes, usually starting from 1, into a []int.
func OrderBy ¶
func OrderBy(index int, ascending bool) *SortOption
func (*SortOption) By ¶
func (o *SortOption) By(index int, ascending bool) *SortOption
By chains a list of sorting order-by clauses.
type Step ¶
type Step struct { Id int Flow *Flow InputDatasets []*Dataset OutputDataset *Dataset Function func([]io.Reader, []io.Writer, *pb.InstructionStat) error Instruction instruction.Instruction Tasks []*Task Name string NetworkType NetworkType IsOnDriverSide bool IsPipe bool IsGoCode bool Script script.Script Command *script.Command // used in Pipe() Meta *StepMetadata Params map[string]interface{} RunLocked }
func (*Step) GetScriptCommand ¶
func (*Step) RunFunction ¶
func (*Step) SetInstruction ¶
func (step *Step) SetInstruction(ins instruction.Instruction)
type StepMetadata ¶
type Task ¶
type Task struct { Id int Step *Step InputShards []*DatasetShard InputChans []*util.Piper // task specific input chans. InputShard may have multiple reading tasks OutputShards []*DatasetShard Stat *pb.InstructionStat }
Source Files ¶
- context.go
- context_hint.go
- context_script.go
- dataset.go
- dataset_cogroup.go
- dataset_do.go
- dataset_group.go
- dataset_hint.go
- dataset_join.go
- dataset_join_hash.go
- dataset_map.go
- dataset_merge.go
- dataset_output.go
- dataset_partition.go
- dataset_pipe.go
- dataset_reduce.go
- dataset_sort.go
- dataset_sort_option.go
- dataset_source.go
- runner.go
- runner_on_interrupt.go
- step.go
- structure.go