sophie: github.com/daviddengcn/sophie/mr Index | Files

package mr

import "github.com/daviddengcn/sophie/mr"

Package mr provides a local concurrent computing model (MapReduce) using Sophie serialization.

A simple word-count example looks like this:

job := MrJob {
	Source: []Input{...},

	NewMapperF: func(src, part int) Mapper {
		return &MapperStruct{
			NewKeyF: sophie.NewRawString,
			NewValF: sophie.ReturnNULL,
			MapF: func(key, val sophie.SophieWriter, c PartCollector) error {
				line := key.(*sophie.RawString).String()
				words := strings.Split(line, " ")
				for _, word := range words {
					if err := c.CollectTo(0, sophie.RawString(word), sophie.VInt(1)); err != nil {
						return err
					}
				}
				return nil
			},
		}
	},

	NewReducerF: func(part int) Reducer {
		return &ReducerStruct{
			NewKeyF: sophie.NewRawString,
			NewValF: sophie.NewVInt,
			ReduceF: func(key sophie.SophieWriter, nextVal SophierIterator,
				c []sophie.Collector) error {
				var count sophie.VInt
				for {
					val, err := nextVal()
					if err == io.EOF {
						break
					}
					if err != nil {
						return err
					}
					count += val.(*sophie.VInt).Val()
				}
				return c[0].Collect(key, count)
			},
		}
	},

	Dest: []Output{...},
}

if err := job.Run(); err != nil {
	log.Fatalf("job.Run failed: %v", err)
}

One can also use MapOnlyJob for simple jobs.

Index

Package Files

io.go mo.go mr.go sorters.go utils.go

Variables

var (
    // EOM (end of map) is an error returned by Mapper.Map or OnlyMapper.Map
    // to signal that mapping should stop.
    EOM = errors.New("EOM")
)
var NullOutput = &OutputStruct{}

NullOutput is a helper Output that returns the NullCollectCloser.

type FileSorter Uses

type FileSorter struct {
    sync.RWMutex
    TmpFolder sophie.FsPath
    // contains filtered or unexported fields
}

FileSorter is a Sorter that stores mapped kv pairs in TmpFolder and later reads them into memory to sort and reduce them.

func NewFileSorter Uses

func NewFileSorter(TmpFolder sophie.FsPath) *FileSorter

func (*FileSorter) ClosePartCollectors Uses

func (fs *FileSorter) ClosePartCollectors() (err error)

Sorter interface

func (*FileSorter) CollectTo Uses

func (fs *FileSorter) CollectTo(part int, key, val sophie.SophieWriter) error

PartCollector interface

func (*FileSorter) NewPartCollector Uses

func (fs *FileSorter) NewPartCollector(int) (PartCollector, error)

Sorter interface

func (*FileSorter) NewReduceIterator Uses

func (fs *FileSorter) NewReduceIterator(part int) (ReduceIterator, error)

Sorter interface

func (*FileSorter) ReduceParts Uses

func (fs *FileSorter) ReduceParts() []int

Sorter interface

type Input Uses

type Input interface {
    // PartCount returns the number of partitions.
    PartCount() (int, error)
    // Iterator returns an IterateCloser for the partition at index, which
    // must be in the range [0, PartCount()).
    Iterator(index int) (sophie.IterateCloser, error)
}

Input represents a specified input source for a mr job.

type InputStruct Uses

type InputStruct struct {
    PartCountF func() (int, error)
    IteratorF  func(index int) (sophie.IterateCloser, error)
}

InputStruct is a struct implementing the Input interface with funcs.

func (*InputStruct) Iterator Uses

func (is *InputStruct) Iterator(index int) (sophie.IterateCloser, error)

Input interface

func (*InputStruct) PartCount Uses

func (is *InputStruct) PartCount() (int, error)

Input interface

type MapOnlyJob Uses

type MapOnlyJob struct {
    // The slice of Inputs
    Source []Input

    // The factory for OnlyMappers
    NewMapperF func(src, part int) OnlyMapper

    // The slice of Outputs
    Dest []Output
}

MapOnlyJob is a job with a mapping step only.

func (*MapOnlyJob) Run Uses

func (job *MapOnlyJob) Run() error

Run runs the job. If any of the mappers fail, one of the errors is returned.

type Mapper Uses

type Mapper interface {
    // NewKey returns a new instance of the key Sophier object.
    NewKey() sophie.Sophier
    // NewVal returns a new instance of the value Sophier object.
    NewVal() sophie.Sophier
    // Map converts the input kv pair into what the Reducer expects and sends
    // the result to the PartCollector.
    Map(key, val sophie.SophieWriter, c PartCollector) error
    // MapEnd is invoked after a partition of the Input is mapped.
    MapEnd(c PartCollector) error
}

Mapper defines the mapping stage in MrJob.

type MapperStruct Uses

type MapperStruct struct {
    // Func for Mapper.NewKey
    NewKeyF func() sophie.Sophier
    // Func for Mapper.NewVal
    NewValF func() sophie.Sophier
    // Func for Mapper.Map
    MapF func(key, val sophie.SophieWriter, c PartCollector) error
    // Func for Mapper.MapEnd
    MapEndF func(c PartCollector) error
}

MapperStruct is a struct implementing Mapper with funcs.

func (*MapperStruct) Map Uses

func (ms *MapperStruct) Map(key, val sophie.SophieWriter,
    c PartCollector) error

Mapper interface

func (*MapperStruct) MapEnd Uses

func (ms *MapperStruct) MapEnd(c PartCollector) error

Mapper interface

func (*MapperStruct) NewKey Uses

func (ms *MapperStruct) NewKey() sophie.Sophier

Mapper interface

func (*MapperStruct) NewVal Uses

func (ms *MapperStruct) NewVal() sophie.Sophier

Mapper interface

type MemSorters Uses

type MemSorters struct {
    sync.RWMutex
    // contains filtered or unexported fields
}

MemSorters is a Sorter that stores all kv pairs in memory.

func NewMemSorters Uses

func NewMemSorters() *MemSorters

NewMemSorters creates a new *MemSorters.

func (*MemSorters) ClosePartCollectors Uses

func (*MemSorters) ClosePartCollectors() error

Sorter interface

func (*MemSorters) CollectTo Uses

func (ms *MemSorters) CollectTo(part int, key, val sophie.SophieWriter) error

PartCollector interface

func (*MemSorters) NewPartCollector Uses

func (ms *MemSorters) NewPartCollector(int) (PartCollector, error)

Sorter interface

func (*MemSorters) NewReduceIterator Uses

func (ms *MemSorters) NewReduceIterator(part int) (ReduceIterator, error)

Sorter interface

func (*MemSorters) ReduceParts Uses

func (ms *MemSorters) ReduceParts() []int

Sorter interface

type MrJob Uses

type MrJob struct {
    // The factory for Mappers
    NewMapperF func(src, part int) Mapper
    // The factory for Reducers
    NewReducerF func(part int) Reducer

    // The Sorter that sorts kv pairs mapped by Mappers and provides
    // SophierIterator for Reducers.
    Sorter Sorter

    // The source Inputs
    Source []Input
    // The destination Outputs
    Dest []Output
}

An MrJob contains a mapping step and a reducing step. In the reducing step, kv pairs are sorted by key, and the values of each key are reduced by the Reducer.

func (*MrJob) Run Uses

func (job *MrJob) Run() error

Run runs the MrJob. If Sorter is not specified, MemSorters is used.

type OnlyMapper Uses

type OnlyMapper interface {
    // NewKey returns a new instance of key for reading from Source
    NewKey() sophie.Sophier
    // NewVal returns a new instance of value for reading from Source
    NewVal() sophie.Sophier
    // Map performs the map action for a key/val pair, collecting results to c.
    // NOTE: the key and val objects are reused on the next call to Map, so
    // make a deep copy if you want to keep their contents.
    // If EOM is returned, mapping stops and is treated as a success.
    // If any other non-nil error is returned, the job is aborted as a failure.
    // @param c  the slice of Collectors. Same length as Dest.
    Map(key, val sophie.SophieWriter, c []sophie.Collector) error
    // MapEnd performs the map action at the final stage, collecting results
    // to c.
    // @param c  the slice of Collectors. Same length as Dest.
    MapEnd(c []sophie.Collector) error
}

OnlyMapper is an interface defining the map actions for MapOnlyJob.

type OnlyMapperStruct Uses

type OnlyMapperStruct struct {
    NewKeyF func() sophie.Sophier
    NewValF func() sophie.Sophier
    MapF    func(key, val sophie.SophieWriter, c []sophie.Collector) error
    MapEndF func(c []sophie.Collector) error
}

OnlyMapperStruct is a struct implementing OnlyMapper with funcs.

func (*OnlyMapperStruct) Map Uses

func (oms *OnlyMapperStruct) Map(key, val sophie.SophieWriter,
    c []sophie.Collector) error

OnlyMapper interface

func (*OnlyMapperStruct) MapEnd Uses

func (oms *OnlyMapperStruct) MapEnd(c []sophie.Collector) error

OnlyMapper interface

func (*OnlyMapperStruct) NewKey Uses

func (oms *OnlyMapperStruct) NewKey() sophie.Sophier

OnlyMapper interface

func (*OnlyMapperStruct) NewVal Uses

func (oms *OnlyMapperStruct) NewVal() sophie.Sophier

OnlyMapper interface

type Output Uses

type Output interface {
    // Collector generates a sophie.CollectCloser for collecting kv pairs.
    // index is an integer identifying a partition.
    Collector(index int) (sophie.CollectCloser, error)
}

Output represents a specified output destination for a mr job.

type OutputStruct Uses

type OutputStruct struct {
    CollectorF func(int) (sophie.CollectCloser, error)
}

OutputStruct is a struct whose pointer implements Output interface.

func (*OutputStruct) Collector Uses

func (o *OutputStruct) Collector(i int) (sophie.CollectCloser, error)

type PartCollector Uses

type PartCollector interface {
    CollectTo(part int, key, val sophie.SophieWriter) error
}

PartCollector collects kv pairs into a specified partition.

type ReduceIterator Uses

type ReduceIterator interface {
    // Iterate calls Reducer.Reduce for each key.
    Iterate(c []sophie.Collector, r Reducer) error
}

ReduceIterator is an object created by a Sorter to call a Reducer.

type Reducer Uses

type Reducer interface {
    // NewKey returns a new instance of the key Sophier object for reducing.
    NewKey() sophie.Sophier
    // NewVal returns a new instance of the value Sophier object for reducing.
    NewVal() sophie.Sophier
    // Reduce reduces all values of key, collecting results to c. To get all
    // values:
    //   for {
    //       val, err := nextVal()
    //       if errorsp.Cause(err) == io.EOF {
    //           break
    //       }
    //       if err != nil {
    //           return err
    //       }
    //       ...
    //   }
    Reduce(key sophie.SophieWriter, nextVal SophierIterator,
        c []sophie.Collector) error
    // ReduceEnd is invoked after a partition of the kv pairs has been reduced.
    ReduceEnd(c []sophie.Collector) error
}

Reducer defines the reducing stage in MrJob.

type ReducerStruct Uses

type ReducerStruct struct {
    NewKeyF func() sophie.Sophier
    NewValF func() sophie.Sophier
    ReduceF func(key sophie.SophieWriter, nextVal SophierIterator,
        c []sophie.Collector) error
    ReduceEndF func(c []sophie.Collector) error
}

ReducerStruct is a struct implementing the Reducer interface with funcs.

func (*ReducerStruct) NewKey Uses

func (rs *ReducerStruct) NewKey() sophie.Sophier

Reducer interface

func (*ReducerStruct) NewVal Uses

func (rs *ReducerStruct) NewVal() sophie.Sophier

Reducer interface

func (*ReducerStruct) Reduce Uses

func (rs *ReducerStruct) Reduce(key sophie.SophieWriter,
    nextVal SophierIterator, c []sophie.Collector) error

Reducer interface

func (*ReducerStruct) ReduceEnd Uses

func (rs *ReducerStruct) ReduceEnd(c []sophie.Collector) error

Reducer interface

type SophierIterator Uses

type SophierIterator func() (sophie.Sophier, error)

SophierIterator is an iterator for fetching a list of Sophiers. If io.EOF is returned as the error, no further Sophiers are available. Typical usage:

var next SophierIterator
for {
    vl, err := next()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }
    ... consume vl ...
}

type Sorter Uses

type Sorter interface {
    // NewPartCollector returns a PartCollector for receiving kv pairs from
    // Mappers.
    NewPartCollector(inPart int) (PartCollector, error)
    // ClosePartCollectors closes all PartCollectors opened. This should be
    // called when all kv pairs have been collected.
    ClosePartCollectors() error
    // ReduceParts returns a slice of the indexes of all partitions.
    ReduceParts() []int
    // NewReduceIterator creates and returns a ReduceIterator for a partition.
    NewReduceIterator(part int) (ReduceIterator, error)
}

A Sorter is responsible for receiving all kv pairs from Mappers, sorting them, and sending them to Reducers.

Package mr imports 11 packages (graph) and is imported by 23 packages. Updated 2016-07-16. Refresh now. Tools for package owners.