Documentation ¶
Overview ¶
Package mr has a simple MapReduce implementation, one that does everything inside the task manager (no outside files). This limits what it is good for, but makes for a lovely stress test, and shows off some useful task manager interaction patterns.
Index ¶
- func Fingerprint64(key []byte) uint64
- func FirstValueReducer(ctx context.Context, input ReducerInput) ([]byte, error)
- func IdentityMapper(ctx context.Context, key, value []byte, emit MapEmitFunc) error
- func NilReducer(ctx context.Context, input ReducerInput) ([]byte, error)
- func ShardForKey(key []byte, n int) int
- func SliceReducer(ctx context.Context, input ReducerInput) ([]byte, error)
- func SumReducer(ctx context.Context, input ReducerInput) ([]byte, error)
- func WordCountMapper(ctx context.Context, key, value []byte, emit MapEmitFunc) error
- type CollectingMapEmitter
- type KV
- type MapEmitFunc
- type MapEmitter
- type MapReduce
- type MapReduceOption
- type MapWorker
- type MapWorkerOption
- type Mapper
- type ReduceWorker
- type ReduceWorkerOption
- type Reducer
- type ReducerInput
Functions ¶
func Fingerprint64 ¶
func Fingerprint64(key []byte) uint64
Fingerprint64 produces a 64-bit unsigned integer from a byte string.
func FirstValueReducer ¶
func FirstValueReducer(ctx context.Context, input ReducerInput) ([]byte, error)
FirstValueReducer outputs its first value and quits.
func IdentityMapper ¶
func IdentityMapper(ctx context.Context, key, value []byte, emit MapEmitFunc) error
IdentityMapper produces the same output as its input.
func NilReducer ¶
func NilReducer(ctx context.Context, input ReducerInput) ([]byte, error)
NilReducer produces a single nil value for the provided key. This can be useful for sorting keys, for example, where the values are not useful or important.
func ShardForKey ¶
func ShardForKey(key []byte, n int) int
ShardForKey produces the shard for a given byte slice and number of shards.
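As a rough illustration of how a key might be mapped to a shard, the sketch below hashes the key and takes the result modulo the shard count. The lowercase names are hypothetical stand-ins, and FNV-1a is an assumption made for this example; the hash that Fingerprint64 actually uses is not stated here.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fingerprint64 is a stand-in for the package's Fingerprint64. FNV-1a is an
// assumption made for this sketch; the real hash function may differ.
func fingerprint64(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key) // Write on a hash.Hash never returns an error.
	return h.Sum64()
}

// shardForKey maps a key to a shard index in [0, n). It assumes n > 0.
func shardForKey(key []byte, n int) int {
	return int(fingerprint64(key) % uint64(n))
}

func main() {
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%q -> shard %d\n", k, shardForKey([]byte(k), 4))
	}
}
```

Because the shard depends only on the key bytes and the shard count, every mapper sends a given key to the same shard, which is what lets one reducer see all values for that key.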
func SliceReducer ¶
func SliceReducer(ctx context.Context, input ReducerInput) ([]byte, error)
SliceReducer produces a JSON-serialized slice of all values in its input.
func SumReducer ¶
func SumReducer(ctx context.Context, input ReducerInput) ([]byte, error)
SumReducer produces a sum over (int) values for each key.
func WordCountMapper ¶
func WordCountMapper(ctx context.Context, key, value []byte, emit MapEmitFunc) error
WordCountMapper produces word:1 for each word in the value. The input key is ignored. Splitting is purely based on whitespace, and is quite naive.
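A mapper with this behavior could look roughly like the following. This is a sketch, not the package's implementation: emitFunc is an assumed shape for MapEmitFunc (modeled on MapEmitter.Emit), and encoding the count as the text "1" is also an assumption.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// emitFunc is an assumed shape for MapEmitFunc, modeled on MapEmitter.Emit.
type emitFunc func(ctx context.Context, key, value []byte) error

// wordCountMapper emits word:1 for every whitespace-separated word in value.
// The input key is ignored.
func wordCountMapper(ctx context.Context, _, value []byte, emit emitFunc) error {
	for _, word := range strings.Fields(string(value)) {
		if err := emit(ctx, []byte(word), []byte("1")); err != nil {
			return fmt.Errorf("emit %q: %w", word, err)
		}
	}
	return nil
}

// mapWords runs the mapper over text and returns the emitted keys in order.
func mapWords(text string) ([]string, error) {
	var words []string
	emit := func(_ context.Context, key, _ []byte) error {
		words = append(words, string(key))
		return nil
	}
	if err := wordCountMapper(context.Background(), nil, []byte(text), emit); err != nil {
		return nil, err
	}
	return words, nil
}

func main() {
	words, err := mapWords("the quick  brown\tthe")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(words)
}
```

Splitting with strings.Fields keeps punctuation attached to words, which matches the "quite naive" caveat above.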
Types ¶
type CollectingMapEmitter ¶
type CollectingMapEmitter struct {
	NumShards int
	// contains filtered or unexported fields
}
CollectingMapEmitter collects all of its output into a slice of shards, each member of which contains a slice of key/value pairs.
func NewCollectingMapEmitter ¶
func NewCollectingMapEmitter(numShards int) *CollectingMapEmitter
NewCollectingMapEmitter creates a shard map emitter for use by a mapper. When mapping is done, the data is collected into sorted slices of key/value pairs, one per shard.
func (*CollectingMapEmitter) AsModifyArgs ¶
func (e *CollectingMapEmitter) AsModifyArgs(qPrefix string, additional ...entroq.ModifyArg) ([]entroq.ModifyArg, error)
AsModifyArgs returns a slice of arguments to be sent to insert new shuffle tasks after emissions are complete. Additional modifications can be passed in to make, e.g., simultaneous task deletion easier to specify.
type KV ¶
type KV struct {
	Key   []byte `json:"key"`
	Key2  []byte `json:"key2"` // secondary key for sorting
	Value []byte `json:"value"`
}
KV contains instructions for a mapper. It is a key and value, plus an optional secondary key (Key2) used for sorting.
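Since map task values are expected to be JSON-serialized KV structs, it may help to see what that encoding looks like. The sketch below redeclares KV locally with the same tags; encodeKV and decodeKV are hypothetical helpers, not part of the package. Note that encoding/json writes []byte fields as base64 strings and a nil slice as null.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KV mirrors the struct shown above, redeclared so the example stands alone.
type KV struct {
	Key   []byte `json:"key"`
	Key2  []byte `json:"key2"`
	Value []byte `json:"value"`
}

// encodeKV serializes a KV the way a task value might carry it.
func encodeKV(kv *KV) ([]byte, error) {
	return json.Marshal(kv)
}

// decodeKV parses a JSON-serialized KV.
func decodeKV(data []byte) (*KV, error) {
	kv := new(KV)
	if err := json.Unmarshal(data, kv); err != nil {
		return nil, fmt.Errorf("decode KV: %w", err)
	}
	return kv, nil
}

func main() {
	data, err := encodeKV(&KV{Key: []byte("doc1"), Value: []byte("hello world")})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(data)) // keys and values appear base64-encoded

	kv, err := decodeKV(data)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s => %s\n", kv.Key, kv.Value)
}
```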
type MapEmitFunc ¶
MapEmitFunc is the emit function passed to mappers.
type MapEmitter ¶
type MapEmitter interface {
	// Emit is called to output data from a mapper.
	Emit(ctx context.Context, key, value []byte) error

	// AsModifyArgs takes a *completed* map output and produces shuffle task
	// insertions from that output. Adds optional additional arguments as
	// needed by the caller (for example, the caller may desire to delete the
	// map task at the same time). Tasks are added to the queue <prefix>/<shard>.
	AsModifyArgs(prefix string, additional ...entroq.ModifyArg) ([]entroq.ModifyArg, error)
}
MapEmitter is passed to a map input processor so that the processor can emit multiple outputs for a single input by calling Emit once per output.
type MapReduce ¶
type MapReduce struct {
	QueuePrefix string
	NumMappers  int
	NumReducers int
	Map         Mapper
	Reduce      Reducer
	EarlyReduce Reducer
	Data        []*KV
	// contains filtered or unexported fields
}
MapReduce creates and runs a full mapreduce pipeline, using EntroQ as its state storage under the given queue prefix. It spawns in-memory workers as goroutines, using the provided worker counts.
func NewMapReduce ¶
func NewMapReduce(eq *entroq.EntroQ, qPrefix string, opts ...MapReduceOption) *MapReduce
NewMapReduce creates a mapreduce pipeline config, from which workers can be started to carry out the desired data manipulations.
type MapReduceOption ¶
type MapReduceOption func(*MapReduce)
MapReduceOption modifies how a MapReduce is created.
func WithEarlyReduce ¶
func WithEarlyReduce(r Reducer) MapReduceOption
WithEarlyReduce sets the early reducer for map operations.
func WithMap ¶
func WithMap(m Mapper) MapReduceOption
WithMap instructs the mapreduce to use the given mapper. Defaults to IdentityMapper.
func WithNumMappers ¶
func WithNumMappers(n int) MapReduceOption
WithNumMappers sets the number of map workers.
func WithNumReducers ¶
func WithNumReducers(n int) MapReduceOption
WithNumReducers sets the number of reduce workers.
func WithReduce ¶
func WithReduce(r Reducer) MapReduceOption
WithReduce instructs the mapreduce to use the given reducer. Defaults to NilReducer.
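The With... functions above follow Go's functional options pattern: each option is a function that mutates the struct under construction. A minimal sketch of that pattern, using a hypothetical trimmed-down pipeline type rather than the real MapReduce, might look like this. The default worker counts of 1 are an assumption of the sketch.

```go
package main

import "fmt"

// pipeline is a hypothetical, trimmed-down stand-in for MapReduce.
type pipeline struct {
	QueuePrefix string
	NumMappers  int
	NumReducers int
}

// option mirrors the shape of MapReduceOption: a function that edits the
// value being built.
type option func(*pipeline)

func withNumMappers(n int) option {
	return func(p *pipeline) { p.NumMappers = n }
}

func withNumReducers(n int) option {
	return func(p *pipeline) { p.NumReducers = n }
}

// newPipeline sets defaults first, then applies options in order, so later
// options win. The defaults of 1 are assumptions made for this sketch.
func newPipeline(prefix string, opts ...option) *pipeline {
	p := &pipeline{QueuePrefix: prefix, NumMappers: 1, NumReducers: 1}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPipeline("/mr/wordcount", withNumMappers(4), withNumReducers(2))
	fmt.Printf("%+v\n", *p)
}
```

The pattern keeps the constructor signature stable as options are added, and callers only name the settings they want to change.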
type MapWorker ¶
type MapWorker struct {
	Name         string
	InputQueue   string
	OutputPrefix string
	Map          Mapper
	EarlyReduce  Reducer
	// contains filtered or unexported fields
}
MapWorker claims map input tasks, processes them, and produces output tasks.
func NewMapWorker ¶
func NewMapWorker(eq *entroq.EntroQ, inQueue string, newEmitter func() MapEmitter, opts ...MapWorkerOption) *MapWorker
NewMapWorker creates a new MapWorker, which loops until told to stop, claiming tasks and processing them, placing them into an appropriate output queue calculated from the output key.
func (*MapWorker) Run ¶
Run runs the map worker. It blocks until the map queue is empty, it encounters an unrecoverable error, or its context is canceled, whichever comes first. Whether to run it in a goroutine is up to the caller. The task value is expected to be a JSON-serialized KV struct.
type MapWorkerOption ¶
type MapWorkerOption func(*MapWorker)
MapWorkerOption is passed to NewMapWorker to change what it does.
func MapAsName ¶
func MapAsName(name string) MapWorkerOption
MapAsName sets the name for this worker. Worker names are empty by default.
func MapToOutputPrefix ¶
func MapToOutputPrefix(p string) MapWorkerOption
MapToOutputPrefix provides an output queue prefix separate from the input queue for mappers to place output tasks into. If not provided, mappers append "done" to the input queue to form the output queue prefix.
func WithEarlyReducer ¶
func WithEarlyReducer(r Reducer) MapWorkerOption
WithEarlyReducer provides a reducer that can accept map output and produce reduce input, ideally in a "reduced" way. This works only if the input value is the same type as the output value, which is not always the case.
The inputs and outputs are the same as for a Reducer, but *this reducer must be able to operate on its own output*.
This would be useful, for example, when summing over words to produce a count of each unique word. The mapper may output "1" for each word, then the intermediate reducer sums up all like words, producing a count. The final reduction and intermediate shuffling then have much less work to do because much of the reducing is happening in the map phase.
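The requirement that an early reducer can operate on its own output is easiest to see with counts. In the sketch below, combine is a hypothetical helper (not the package's API) that sums counts per key; because its output has the same shape as its input, it can be applied once in the map phase and again later. Encoding counts as decimal text is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// pair is a simplified key/count record. Counts are decimal text, which is an
// assumed encoding for this sketch.
type pair struct {
	Key   string
	Count string
}

// combine sums counts per key and returns the result sorted by key. Its
// output has the same shape as its input, so it can be fed back into itself.
func combine(in []pair) ([]pair, error) {
	sums := make(map[string]int)
	for _, p := range in {
		n, err := strconv.Atoi(p.Count)
		if err != nil {
			return nil, fmt.Errorf("parse count for %q: %w", p.Key, err)
		}
		sums[p.Key] += n
	}
	out := make([]pair, 0, len(sums))
	for k, n := range sums {
		out = append(out, pair{Key: k, Count: strconv.Itoa(n)})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key })
	return out, nil
}

func main() {
	mapped := []pair{{"the", "1"}, {"cat", "1"}, {"the", "1"}}
	early, err := combine(mapped)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// A later step can merge this with another mapper's combined output.
	final, err := combine(append(early, pair{"the", "4"}))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(early, final)
}
```

A reducer such as one that JSON-encodes a slice of values would not qualify, since its output type differs from its input type.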
func WithMapper ¶
func WithMapper(m Mapper) MapWorkerOption
WithMapper provides a mapper process to a map worker. The default is IdentityMapper if not specified as an option.
type Mapper ¶
type Mapper func(ctx context.Context, key, value []byte, emit MapEmitFunc) error
Mapper is a function that accepts a key/value pair and emits zero or more key/value pairs for reducing.
type ReduceWorker ¶
type ReduceWorker struct {
	Name          string
	MapEmptyQueue string
	InputQueue    string
	OutputQueue   string
	Reduce        Reducer
	// contains filtered or unexported fields
}
ReduceWorker consumes shuffle output and combines all values for a particular key into a single key/value pair, which is then JSON-serialized, one item per line, and written to a provided emitter.
func NewReduceWorker ¶
func NewReduceWorker(eq *entroq.EntroQ, mapEmptyQueue, inQueue string, opts ...ReduceWorkerOption) *ReduceWorker
NewReduceWorker creates a reduce worker for the given task client and input queue, running the reducer over every unique key.
func (*ReduceWorker) Run ¶
func (w *ReduceWorker) Run(ctx context.Context) error
Run starts a worker that consumes all reduce tasks in a particular queue, and quits when it is empty. It actually does map output merging, too, since there are exactly as many mergers as reducers, and reduce cannot proceed until merging is finished.
The worker watches for
- more than one task in its input queue, and
- an empty map queue.
This worker has no need to claim tasks to do shuffling. It is assigned a specific queue representing its shard, and no other worker will be assigned the same queue.
Shuffle logic is as follows. In a watch/sleep loop,
- When more than one task is in the input queue, merge them into a single sorted task and replace.
- If only one task exists and there are no more map tasks, proceed to reduce.
Reduce logic:
- pull (singleton) task from input queue
- run reduce over it and place the resulting sorted key/value pairs into the output queue.
- quit
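The merge step in the shuffle logic above amounts to combining several key-sorted lists into one. The sketch below shows that step in isolation, with no queue interaction; kv, mk, mergeSorted, mergeAll, and keysOf are hypothetical names for this example, and the real worker's merge may be organized differently.

```go
package main

import (
	"bytes"
	"fmt"
)

// kv is a local stand-in for the package's KV type.
type kv struct {
	Key, Value []byte
}

// mk builds a key-only slice of pairs; the caller supplies keys in order.
func mk(keys ...string) []kv {
	out := make([]kv, 0, len(keys))
	for _, k := range keys {
		out = append(out, kv{Key: []byte(k)})
	}
	return out
}

// mergeSorted merges two key-sorted slices into one key-sorted slice.
func mergeSorted(a, b []kv) []kv {
	out := make([]kv, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		if bytes.Compare(a[i].Key, b[j].Key) <= 0 {
			out = append(out, a[i])
			i++
		} else {
			out = append(out, b[j])
			j++
		}
	}
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

// mergeAll folds any number of sorted task payloads into a single sorted
// payload, as happens when more than one task sits in the input queue.
func mergeAll(tasks ...[]kv) []kv {
	var merged []kv
	for _, t := range tasks {
		merged = mergeSorted(merged, t)
	}
	return merged
}

// keysOf extracts the keys as strings for printing.
func keysOf(kvs []kv) []string {
	out := make([]string, 0, len(kvs))
	for _, p := range kvs {
		out = append(out, string(p.Key))
	}
	return out
}

func main() {
	merged := mergeAll(mk("a", "d"), mk("b", "c", "e"), mk("a"))
	fmt.Println(keysOf(merged))
}
```

Keeping each task sorted means the reduce step can find all values for a key in one contiguous run.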
type ReduceWorkerOption ¶
type ReduceWorkerOption func(*ReduceWorker)
ReduceWorkerOption is passed to NewReduceWorker to specify non-default options.
func ReduceAsName ¶
func ReduceAsName(name string) ReduceWorkerOption
ReduceAsName sets the name of this reduce worker. The default is blank.
func ReduceToOutput ¶
func ReduceToOutput(q string) ReduceWorkerOption
ReduceToOutput specifies the output queue name for finished reduce shards.
func WithReducer ¶
func WithReducer(reduce Reducer) ReduceWorkerOption
WithReducer specifies the reducer. If it is not given, NilReducer is used.
type Reducer ¶
type Reducer func(ctx context.Context, input ReducerInput) ([]byte, error)
Reducer is called once per unique map-output key. It is expected to output a single value for all inputs.
type ReducerInput ¶
type ReducerInput interface {
	// Key produces the key for this reduce operation.
	Key() []byte

	// Value outputs the current value in the input.
	Value() []byte

	// Err returns any errors encountered while iterating over input.
	Err() error

	// Next must be called before Value() (but Key() is always available).
	//
	// Example:
	//
	//	for input.Next() {
	//		process(input.Value())
	//	}
	//	if err := input.Err(); err != nil {
	//		return fmt.Errorf("error getting input: %w", err)
	//	}
	Next() bool
}
ReducerInput provides a streaming interface for getting values during reduction.