gopark


README

GoPark

GoPark is a naive local port of Spark/DPark, a MapReduce(R)-like computing framework that supports iterative computation.

GoPark is implemented in Go and provides concurrent MapReduce(R) data operations using goroutines. It can only run in local mode, but you can specify the number of concurrent workers.

Examples

An example that computes Pi:

package main

import (
    "fmt"
    "github.com/mijia/gopark"
    "math/rand"
)

func main() {
    gopark.ParseOptions()
    c := gopark.NewContext("ComputePI")
    defer c.Stop()

    N := 100000
    iters := c.Data(make([]interface{}, N))
    count := iters.Map(func(_ interface{}) interface{} {
        x := rand.Float32()
        y := rand.Float32()
        if x*x+y*y < 1 {
            return 1
        } else {
            return 0
        }
    }).Reduce(func(x, y interface{}) interface{} {
        return x.(int) + y.(int)
    }).(int)
    fmt.Println("Pi =", (4.0 * float64(count) / float64(N)))
}

The above code can be run as follows (using 4 goroutines concurrently):

$ go run hello.go -p=4

Check out examples/ for more cases.

interface{}

As the example shows, Go only provides interface{} as the root type for everything, and its type checking/assertion is strict, so all the APIs take interface{} parameters. You have to do the type assertions inside the closure functions. The same applies to []interface{}.

The basic closure function types are:

type MapperFunc func(interface{}) interface{}
type PartitionMapperFunc func(Yielder) Yielder
type FlatMapperFunc func(interface{}) []interface{}
type ReducerFunc func(interface{}, interface{}) interface{}
type FilterFunc func(interface{}) bool
type LoopFunc func(interface{})
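
For instance, a MapperFunc, a FilterFunc and a ReducerFunc over ints might look like the following. This is only a minimal sketch; the names double, odd and sum are illustrative and not part of GoPark.

var double gopark.MapperFunc = func(x interface{}) interface{} {
    return x.(int) * 2 // assert the concrete type before using it
}

var odd gopark.FilterFunc = func(x interface{}) bool {
    return x.(int)%2 == 1
}

var sum gopark.ReducerFunc = func(x, y interface{}) interface{} {
    return x.(int) + y.(int)
}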

Shuffle and Shuffle_N-style funcs

Functions that do a shuffle job, like GroupByKey(), also come with a GroupByKey_N() variant, which lets the user specify the numPartitions the job should run on. Please check rdd.go for reference.
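
For example (a hedged sketch; pairs is assumed to be an RDD of key/value records built elsewhere):

grouped := pairs.GroupByKey()     // shuffle with the default number of partitions
grouped8 := pairs.GroupByKey_N(8) // run the same shuffle on 8 partitions
summed := pairs.ReduceByKey_N(func(x, y interface{}) interface{} {
    return x.(int) + y.(int)
}, 8)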

Encode / Gob

For shuffle jobs like GroupByKey() and for Persist(), GoPark uses encoding/gob to encode/decode the data into local files. Since GoPark passes everything as interface{}, gob needs to know what the interface{} actually holds when decoding. This can be done as the kmeans.go example shows:

type CenterCounter struct {
    X     gopark.Vector
    Count int
}

gob.Register(new(CenterCounter))

Also note that you cannot use structs with unexported fields. Be careful with this; if you get runtime panics, check:

  • whether you are using a complicated struct
  • whether you forgot to register the type with gob, even a simple one like type T int
  • whether you are using slices of slices of slices... Just make sure gob knows the concrete objects behind the interface{} and []interface{} (see the sketch below).
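
As a hedged sketch of the registration step (the Score and WordCount types are illustrative, not part of GoPark):

import "encoding/gob"

// Anything that travels through interface{} in a shuffle or Persist()
// must be registered with gob before decoding happens.
type Score int

type WordCount struct {
    Word  string // only exported fields survive gob encoding
    Count int
}

func init() {
    gob.Register(Score(0))      // even a simple named type like `type T int`
    gob.Register(WordCount{})
    gob.Register([]WordCount{}) // register slice types you shuffle as well
}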

Things not included

So far, Broadcast is still not implemented. I just capture the variables in closures.
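
For example, a lookup table can simply be captured by the mapper closure (a hedged sketch; lookup and the string RDD words are assumptions):

lookup := map[string]int{"a": 1, "b": 2}
ranks := words.Map(func(w interface{}) interface{} {
    return lookup[w.(string)] // the captured map plays the role of a broadcast variable
})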

And GoPark currently cannot run in distributed mode.

Have fun~

Originally, I had only two goals in writing this:

  • Write some real stuff in Go, since I am learning the language
  • I do data mining jobs and need a better concurrency framework for performance; running locally is OK for me

Spark/DPark

These projects are really awesome, and the RDD is a fantastic data structure and design pattern. I learned a lot from them.

I really want to thank these two projects.

Documentation

Index

Constants

const DEFAULT_FILE_SPLIT_SIZE = 64 * 1024 * 1024 // 64MB split size

const ENCODE_BUFFER_SIZE = 10000

const SHUFFLE_MAGIC_ID = 9999

Variables

This section is empty.

Functions

func HashCode

func HashCode(value interface{}) int64

func MaxInt64

func MaxInt64(a, b int64) int64

func MinInt64

func MinInt64(a, b int64) int64

func ParseOptions

func ParseOptions()

func Range

func Range(start, end int) []interface{}

Types

type AccumulateFunc

type AccumulateFunc func(x, y interface{}) interface{}

type Accumulator

type Accumulator interface {
	Add(interface{})
	Value() interface{}
}

type AccumulatorParam

type AccumulatorParam interface {
	AddFunc() AccumulateFunc
}
var IntAccumulatorParam AccumulatorParam
var ListAccumulatorParam AccumulatorParam
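
A hedged usage sketch of the accumulator API above, counting elements with the default int accumulator; the job name and the assumption that Range(0, 100) yields 100 elements are illustrative:

c := gopark.NewContext("AccumulatorDemo")
defer c.Stop()

acc := c.Accumulator(0) // int accumulator starting at 0
c.Data(gopark.Range(0, 100)).Foreach(func(_ interface{}) {
    acc.Add(1)
})
fmt.Println(acc.Value()) // presumably prints 100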

type AtomicInt

type AtomicInt int64

func (*AtomicInt) Add

func (i *AtomicInt) Add(n int64)

func (*AtomicInt) Get

func (i *AtomicInt) Get() int64

func (*AtomicInt) String

func (i *AtomicInt) String() string

type BufferEncoder

type BufferEncoder struct {
	// contains filtered or unexported fields
}

func NewBufferEncoder

func NewBufferEncoder(size int) *BufferEncoder

func (*BufferEncoder) Decode

func (e *BufferEncoder) Decode(f *os.File) ([]interface{}, error)

func (*BufferEncoder) Encode

func (e *BufferEncoder) Encode(f *os.File, value interface{}) error

func (*BufferEncoder) Flush

func (e *BufferEncoder) Flush(f *os.File) error

type CombinerCreator

type CombinerCreator func(interface{}) []interface{}

We need the []interface{} type for gob to work

type CombinerMerger

type CombinerMerger func([]interface{}, []interface{}) []interface{}
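
A minimal sketch grounded only in the two signatures above: a creator that wraps a single value into a combiner slice and a merger that concatenates two combiners. How GoPark invokes these internally is not shown here.

var create gopark.CombinerCreator = func(v interface{}) []interface{} {
    return []interface{}{v} // start a combiner with a single value
}

var merge gopark.CombinerMerger = func(a, b []interface{}) []interface{} {
    return append(a, b...) // concatenate two combiners
}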

type Context

type Context struct {
	// contains filtered or unexported fields
}

func NewContext

func NewContext(jobName string) *Context

func (*Context) Accumulator

func (c *Context) Accumulator(initValue int) Accumulator

func (*Context) AccumulatorWithParam

func (c *Context) AccumulatorWithParam(initValue interface{}, param AccumulatorParam) Accumulator

func (*Context) Data

func (c *Context) Data(d []interface{}) RDD

func (*Context) Data_N

func (c *Context) Data_N(d []interface{}, numPartitions int) RDD

func (*Context) Stop

func (c *Context) Stop()

func (*Context) String

func (c *Context) String() string

func (*Context) TextFile

func (c *Context) TextFile(pathname string) RDD

func (*Context) Union

func (c *Context) Union(rdds []RDD) RDD

type EncodeBox

type EncodeBox struct {
	Object interface{}
}

type FilterFunc

type FilterFunc func(interface{}) bool

type FlatMapperFunc

type FlatMapperFunc func(interface{}) []interface{}

type IndexedVector

type IndexedVector map[interface{}]float64

func NewIndexedVector

func NewIndexedVector() IndexedVector

IndexedVector methods

func (IndexedVector) Copy

func (v IndexedVector) Copy() IndexedVector

func (IndexedVector) Cosine

func (v IndexedVector) Cosine(o IndexedVector) float64

func (IndexedVector) Divide

func (v IndexedVector) Divide(d float64) IndexedVector

func (IndexedVector) Dot

func (IndexedVector) EulaDistance

func (v IndexedVector) EulaDistance(o IndexedVector) float64

func (IndexedVector) Keys

func (v IndexedVector) Keys() []interface{}

func (IndexedVector) Magnitude

func (v IndexedVector) Magnitude() float64

func (IndexedVector) Minus

func (IndexedVector) Multiply

func (v IndexedVector) Multiply(m float64) IndexedVector

func (IndexedVector) NormL2

func (v IndexedVector) NormL2() float64

func (IndexedVector) Plus

func (IndexedVector) RandomFeature

func (v IndexedVector) RandomFeature(feature interface{})

func (IndexedVector) RandomLimitedFeature

func (v IndexedVector) RandomLimitedFeature(feature interface{}, minValue, maxValue float64)

func (IndexedVector) RandomNormFeature

func (v IndexedVector) RandomNormFeature(feature interface{}, dev, mean float64)

func (IndexedVector) String

func (v IndexedVector) String() string

func (IndexedVector) Sum

func (v IndexedVector) Sum() float64

type KeyGroups

type KeyGroups struct {
	Key    interface{}
	Groups [][]interface{}
}

type KeyLessFunc

type KeyLessFunc func(x, y interface{}) bool

type KeyValue

type KeyValue struct {
	Key   interface{}
	Value interface{}
}

func (*KeyValue) String

func (kv *KeyValue) String() string

type LoopFunc

type LoopFunc func(interface{})

type MapperFunc

type MapperFunc func(interface{}) interface{}

type ParkSorter

type ParkSorter struct {
	// contains filtered or unexported fields
}

func NewParkSorter

func NewParkSorter(values []interface{}, fn KeyLessFunc) *ParkSorter

func (*ParkSorter) Len

func (s *ParkSorter) Len() int

func (*ParkSorter) Less

func (s *ParkSorter) Less(i, j int) bool

func (*ParkSorter) Swap

func (s *ParkSorter) Swap(i, j int)

type PartitionMapperFunc

type PartitionMapperFunc func(Yielder) Yielder

type Partitioner

type Partitioner interface {
	// contains filtered or unexported methods
}

type RDD

type RDD interface {
	Map(f MapperFunc) RDD
	MapPartition(f PartitionMapperFunc) RDD
	FlatMap(f FlatMapperFunc) RDD
	Filter(f FilterFunc) RDD
	Sample(fraction float64, seed int64, withReplacement bool) RDD
	GroupByKey() RDD
	SortByKey(fn KeyLessFunc, reverse bool) RDD
	SortByValue(fn KeyLessFunc, reverse bool) RDD
	PartitionByKey() RDD
	ReduceByKey(fn ReducerFunc) RDD
	Distinct() RDD
	Union(other RDD) RDD
	Join(other RDD) RDD
	LeftOuterJoin(other RDD) RDD
	RightOuterJoin(other RDD) RDD
	GroupWith(other RDD) RDD
	Cartesian(other RDD) RDD

	GroupByKey_N(numPartitions int) RDD
	SortByKey_N(fn KeyLessFunc, reverse bool, numPartitions int) RDD
	SortByValue_N(fn KeyLessFunc, reverse bool, numPartitions int) RDD
	PartitionByKey_N(numPartitions int) RDD
	ReduceByKey_N(fn ReducerFunc, numPartitions int) RDD
	Distinct_N(numPartitions int) RDD
	Join_N(other RDD, numPartitions int) RDD
	LeftOuterJoin_N(other RDD, numPartitions int) RDD
	RightOuterJoin_N(other RDD, numPartitions int) RDD
	GroupWith_N(other RDD, numPartitions int) RDD

	Reduce(fn ReducerFunc) interface{}
	CountByKey() map[interface{}]int64
	CountByValue() map[interface{}]int64
	Take(n int64) []interface{}
	Collect() []interface{}
	CollectAsMap() map[interface{}]interface{}
	Count() int64
	Foreach(fn LoopFunc)
	Foreach0(fn LoopFunc)
	SaveAsTextFile(pathname string)

	Cache() RDD
	Persist() RDD
	// contains filtered or unexported methods
}
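
As a hedged sketch of chaining these methods into a small word-count job (assuming fmt, strings and gopark are imported; the file name, the use of *KeyValue for pairs, and the assumption that the reducer sees the Value fields as ints are illustrative, not confirmed by these docs):

c := gopark.NewContext("WordCount")
defer c.Stop()

counts := c.TextFile("words.txt").FlatMap(func(line interface{}) []interface{} {
    words := strings.Fields(line.(string))
    out := make([]interface{}, len(words))
    for i, w := range words {
        out[i] = w
    }
    return out
}).Map(func(w interface{}) interface{} {
    return &gopark.KeyValue{Key: w, Value: 1}
}).ReduceByKey(func(x, y interface{}) interface{} {
    return x.(int) + y.(int) // assumes the reducer receives the Value fields
}).CollectAsMap()

fmt.Println(counts)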

type ReducerFn

type ReducerFn func(yield Yielder, partition int) interface{}

type ReducerFunc

type ReducerFunc func(interface{}, interface{}) interface{}

type Scheduler

type Scheduler interface {
	// contains filtered or unexported methods
}


type Split

type Split interface {
	// contains filtered or unexported methods
}

type ValueMerger

type ValueMerger func([]interface{}, interface{}) []interface{}

type Vector

type Vector []float64

func NewRandomLimitedVector

func NewRandomLimitedVector(size int, minValue, maxValue float64) Vector

func NewRandomNormVector

func NewRandomNormVector(size int, dev, mean float64) Vector

func NewRandomVector

func NewRandomVector(size int) Vector

func NewSameValueVector

func NewSameValueVector(size int, value float64) Vector

func NewZeroVector

func NewZeroVector(size int) Vector

Vector methods

func (Vector) Cosine

func (v Vector) Cosine(o Vector) float64

func (Vector) Divide

func (v Vector) Divide(d float64) Vector

func (Vector) Dot

func (v Vector) Dot(o Vector) float64

func (Vector) EulaDistance

func (v Vector) EulaDistance(o Vector) float64

func (Vector) Magnitude

func (v Vector) Magnitude() float64

func (Vector) Minus

func (v Vector) Minus(o Vector) Vector

func (Vector) Multiply

func (v Vector) Multiply(m float64) Vector

func (Vector) NormL2

func (v Vector) NormL2() float64

func (Vector) Plus

func (v Vector) Plus(o Vector) Vector

func (Vector) String

func (v Vector) String() string

func (Vector) Sum

func (v Vector) Sum() float64
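
A small usage sketch of the Vector helpers listed above (the literal values are illustrative):

a := gopark.NewSameValueVector(3, 1.0) // presumably [1 1 1]
b := gopark.Vector{1, 2, 3}
sum := a.Plus(b)          // element-wise addition
scaled := sum.Multiply(2) // scalar multiplication
fmt.Println(scaled.Dot(b), scaled.NormL2())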

type Yielder

type Yielder chan interface{}
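
Since a Yielder is just a channel of interface{}, a PartitionMapperFunc presumably streams one partition into another. The following is a hedged sketch that assumes the mapper is expected to close its output channel once the input is drained:

var timesTwo gopark.PartitionMapperFunc = func(in gopark.Yielder) gopark.Yielder {
    out := make(gopark.Yielder)
    go func() {
        defer close(out) // assumption: the mapper closes its own output
        for item := range in {
            out <- item.(int) * 2
        }
    }()
    return out
}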

