mr

package mr
v0.8.8 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 28, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package mr has a simple MapReduce implementation that does everything inside the task manager (no outside files). This limits what it is good for, but it makes for a lovely stress test and shows off some useful task manager interaction patterns.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Fingerprint64

func Fingerprint64(key []byte) uint64

Fingerprint64 produces a 64-bit unsigned integer from a byte string.

func FirstValueReducer

func FirstValueReducer(ctx context.Context, input ReducerInput) ([]byte, error)

FirstValueReducer outputs its first value and quits.

func IdentityMapper

func IdentityMapper(ctx context.Context, key, value []byte, emit MapEmitFunc) error

IdentityMapper emits its input key/value pair unchanged.

func NilReducer

func NilReducer(ctx context.Context, input ReducerInput) ([]byte, error)

NilReducer produces a single nil value for the provided key. This can be useful for sorting keys, for example, where the values are not useful or important.

func ShardForKey

func ShardForKey(key []byte, n int) int

ShardForKey produces the shard for a given byte slice and number of shards.
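This page does not say which hash Fingerprint64 uses or how ShardForKey turns it into a shard number. A common approach, assumed here, is to hash the key to 64 bits and take the result modulo the shard count, so equal keys always land in the same shard. The sketch uses FNV-1a from Go's hash/fnv purely as a stand-in; fingerprint64 and shardForKey are hypothetical local names, not the package's code.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fingerprint64 is a hypothetical stand-in for Fingerprint64.
// ASSUMPTION: the real hash is undocumented; FNV-1a (64-bit) is used here.
func fingerprint64(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key) // hash.Hash writes never return an error.
	return h.Sum64()
}

// shardForKey is a hypothetical stand-in for ShardForKey. It reduces the
// fingerprint modulo n, so the result is always in [0, n) for n > 0.
func shardForKey(key []byte, n int) int {
	return int(fingerprint64(key) % uint64(n))
}

func main() {
	for _, w := range []string{"apple", "banana", "apple"} {
		// Both occurrences of "apple" print the same shard number.
		fmt.Printf("%s -> shard %d\n", w, shardForKey([]byte(w), 4))
	}
}
```

The modulo step is what guarantees that every value for a given key reaches the same reducer; the choice of hash only affects how evenly keys spread across shards.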

func SliceReducer

func SliceReducer(ctx context.Context, input ReducerInput) ([]byte, error)

SliceReducer produces a JSON-serialized slice of all values in its input.

func SumReducer

func SumReducer(ctx context.Context, input ReducerInput) ([]byte, error)

SumReducer produces the sum of the integer values for each key.
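A reducer consumes its values through the ReducerInput iteration pattern documented under Types (Next, then Value, then a final Err check). The sketch below pairs a sumReducer written in that style with a minimal slice-backed input. It assumes values are decimal integer strings such as "1" and encodes the sum the same way; sliceInput, sumReducer, and sumValues are hypothetical names, and ReducerInput is redeclared locally so the file compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// ReducerInput copies the documented interface.
type ReducerInput interface {
	Key() []byte
	Value() []byte
	Err() error
	Next() bool
}

// sliceInput is a hypothetical in-memory ReducerInput over one key's values.
type sliceInput struct {
	key    []byte
	values [][]byte
	pos    int // number of values consumed; Value returns values[pos-1]
}

func (s *sliceInput) Key() []byte   { return s.key }
func (s *sliceInput) Value() []byte { return s.values[s.pos-1] }
func (s *sliceInput) Err() error    { return nil } // nothing can fail in memory

// Next advances to the next value, reporting false once values run out.
func (s *sliceInput) Next() bool {
	if s.pos >= len(s.values) {
		return false
	}
	s.pos++
	return true
}

// sumReducer sketches SumReducer using the documented iteration pattern.
// ASSUMPTION: values and the result are decimal integer strings.
func sumReducer(_ context.Context, input ReducerInput) ([]byte, error) {
	sum := 0
	for input.Next() {
		n, err := strconv.Atoi(string(input.Value()))
		if err != nil {
			return nil, fmt.Errorf("key %q: %w", input.Key(), err)
		}
		sum += n
	}
	if err := input.Err(); err != nil {
		return nil, fmt.Errorf("error getting input: %w", err)
	}
	return []byte(strconv.Itoa(sum)), nil
}

// sumValues reduces the given values for a single key.
func sumValues(key string, values ...string) (string, error) {
	in := &sliceInput{key: []byte(key)}
	for _, v := range values {
		in.values = append(in.values, []byte(v))
	}
	out, err := sumReducer(context.Background(), in)
	return string(out), err
}

func main() {
	total, err := sumValues("the", "1", "1", "3")
	if err != nil {
		panic(err)
	}
	fmt.Println(total) // 5
}
```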

func WordCountMapper

func WordCountMapper(ctx context.Context, key, value []byte, emit MapEmitFunc) error

WordCountMapper emits word:1 for each word in the value and ignores the input key. Words are split purely on whitespace, which is quite naive.
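The mapper described above can be sketched with strings.Fields, which splits on any run of whitespace. Emitting the count as the string "1" is an assumption; wordCountMapper and wordPairs are hypothetical names, and MapEmitFunc is redeclared locally so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// MapEmitFunc copies the documented signature so this file is self-contained.
type MapEmitFunc func(ctx context.Context, key, value []byte) error

// wordCountMapper is a hypothetical sketch of WordCountMapper: it ignores the
// input key and emits (word, "1") for every whitespace-separated word.
// ASSUMPTION: the count is encoded as the decimal string "1".
func wordCountMapper(ctx context.Context, _, value []byte, emit MapEmitFunc) error {
	for _, word := range strings.Fields(string(value)) {
		if err := emit(ctx, []byte(word), []byte("1")); err != nil {
			return err
		}
	}
	return nil
}

// wordPairs runs wordCountMapper over one value and collects "key:value" strings.
func wordPairs(text string) ([]string, error) {
	var out []string
	collect := func(_ context.Context, k, v []byte) error {
		out = append(out, string(k)+":"+string(v))
		return nil
	}
	err := wordCountMapper(context.Background(), nil, []byte(text), collect)
	return out, err
}

func main() {
	pairs, err := wordPairs("the cat  the\nhat")
	if err != nil {
		panic(err)
	}
	fmt.Println(pairs) // [the:1 cat:1 the:1 hat:1]
}
```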

Types

type CollectingMapEmitter

type CollectingMapEmitter struct {
	NumShards int
	// contains filtered or unexported fields
}

CollectingMapEmitter collects all of its output into a slice of shards, each member of which contains a slice of key/value pairs.

func NewCollectingMapEmitter

func NewCollectingMapEmitter(numShards int) *CollectingMapEmitter

NewCollectingMapEmitter creates a shard map emitter for use by a mapper. When mapping is done, the data is collected into sorted slices of key/value pairs, one per shard.
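A minimal sketch of the collect-then-sort behavior follows, assuming keys are routed to shards by an FNV-1a hash modulo the shard count (the real sharding function is not documented here). collector, newCollector, and collectWords are hypothetical names; the real type's unexported fields and its AsModifyArgs conversion into EntroQ task insertions are not reproduced.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"hash/fnv"
	"sort"
)

// kv is a trimmed-down key/value pair for this sketch.
type kv struct {
	Key, Value []byte
}

// collector is a hypothetical analogue of CollectingMapEmitter. Emit routes
// each pair to a shard; Shards returns every shard sorted by key.
// ASSUMPTION: sharding uses FNV-1a modulo the shard count.
type collector struct {
	shards [][]kv
}

func newCollector(numShards int) *collector {
	return &collector{shards: make([][]kv, numShards)}
}

// Emit has the same shape as MapEmitFunc, so it can be handed to a mapper.
func (c *collector) Emit(_ context.Context, key, value []byte) error {
	h := fnv.New64a()
	h.Write(key)
	s := int(h.Sum64() % uint64(len(c.shards)))
	c.shards[s] = append(c.shards[s], kv{Key: key, Value: value})
	return nil
}

// Shards sorts each shard by key. The sort is stable, so pairs with equal
// keys keep the order in which they were emitted.
func (c *collector) Shards() [][]kv {
	for _, shard := range c.shards {
		sort.SliceStable(shard, func(i, j int) bool {
			return bytes.Compare(shard[i].Key, shard[j].Key) < 0
		})
	}
	return c.shards
}

// collectWords emits (word, "1") for each word and returns the sorted shards.
func collectWords(numShards int, words ...string) [][]kv {
	c := newCollector(numShards)
	for _, w := range words {
		_ = c.Emit(context.Background(), []byte(w), []byte("1")) // never fails here
	}
	return c.Shards()
}

func main() {
	for i, shard := range collectWords(3, "b", "a", "c", "a", "d") {
		fmt.Printf("shard %d:", i)
		for _, p := range shard {
			fmt.Printf(" %s", p.Key)
		}
		fmt.Println()
	}
}
```

Sorting each shard up front means any later merge only ever combines runs that are already sorted.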

func (*CollectingMapEmitter) AsModifyArgs

func (e *CollectingMapEmitter) AsModifyArgs(qPrefix string, additional ...entroq.ModifyArg) ([]entroq.ModifyArg, error)

AsModifyArgs returns a slice of arguments that insert new shuffle tasks once emission is complete. Additional modifications can be passed in, for example to delete the map task in the same operation.

func (*CollectingMapEmitter) Emit

func (e *CollectingMapEmitter) Emit(_ context.Context, key, value []byte) error

Emit adds a new key/value pair to the emitter.

type KV

type KV struct {
	Key   []byte `json:"key"`
	Key2  []byte `json:"key2"` // secondary key for sorting
	Value []byte `json:"value"`
}

KV contains input for a mapper: a key and a value, plus an optional secondary key (Key2) used for sorting.
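MapWorker.Run expects each task value to be a JSON-serialized KV. Assuming the standard encoding/json package does the encoding, the []byte fields are written as base64 strings and a nil Key2 as null, as this sketch shows. The KV type is copied from above; encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KV is copied from the package documentation so this example compiles alone.
type KV struct {
	Key   []byte `json:"key"`
	Key2  []byte `json:"key2"` // secondary key for sorting
	Value []byte `json:"value"`
}

// encode serializes a KV with encoding/json; []byte fields become base64.
func encode(kv KV) (string, error) {
	b, err := json.Marshal(kv)
	return string(b), err
}

func main() {
	s, err := encode(KV{Key: []byte("a"), Value: []byte("1")})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"key":"YQ==","key2":null,"value":"MQ=="}

	// Decoding reverses the base64 step and restores the original bytes.
	var back KV
	if err := json.Unmarshal([]byte(s), &back); err != nil {
		panic(err)
	}
	fmt.Println(string(back.Key), string(back.Value)) // a 1
}
```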

func NewKV

func NewKV(key, value []byte) *KV

NewKV creates a new key/value struct.

func (*KV) String

func (kv *KV) String() string

String converts this key/value pair into a readable string.

type MapEmitFunc

type MapEmitFunc func(ctx context.Context, key, value []byte) error

MapEmitFunc is the emit function passed to mappers.

type MapEmitter

type MapEmitter interface {
	// Emit is called to output data from a mapper.
	Emit(ctx context.Context, key, value []byte) error

	// AsModifyArgs takes a *completed* map output and produces shuffle task
	// insertions from that output. Adds optional additional arguments as
	// needed by the caller (for example, the caller may desire to delete the
	// map task at the same time). Tasks are added to the queue <prefix>/<shard>.
	AsModifyArgs(prefix string, additional ...entroq.ModifyArg) ([]entroq.ModifyArg, error)
}

MapEmitter is passed to a map input processor so that it can emit multiple outputs for a single input by calling Emit.

type MapReduce

type MapReduce struct {
	QueuePrefix string
	NumMappers  int
	NumReducers int

	Map         Mapper
	Reduce      Reducer
	EarlyReduce Reducer

	Data []*KV
	// contains filtered or unexported fields
}

MapReduce creates and runs a full mapreduce pipeline, using EntroQ as its state storage under the given queue prefix. It spawns in-memory workers as goroutines, using the provided worker counts.

func NewMapReduce

func NewMapReduce(eq *entroq.EntroQ, qPrefix string, opts ...MapReduceOption) *MapReduce

NewMapReduce creates a mapreduce pipeline config, from which workers can be started to carry out the desired data manipulations.

func (*MapReduce) Run

func (mr *MapReduce) Run(ctx context.Context) (string, error)

Run starts the mapreduce pipeline, adding data to EntroQ and starting workers. It returns the name of the output queue that holds the finished tasks.

type MapReduceOption

type MapReduceOption func(*MapReduce)

MapReduceOption modifies how a MapReduce is created.

func AddInput

func AddInput(kvs ...*KV) MapReduceOption

AddInput adds one or more input KVs to a mapreduce.

func WithEarlyReduce

func WithEarlyReduce(r Reducer) MapReduceOption

WithEarlyReduce sets the early reducer for map operations.

func WithMap

func WithMap(m Mapper) MapReduceOption

WithMap instructs the mapreduce to use the given mapper. Defaults to IdentityMapper.

func WithNumMappers

func WithNumMappers(n int) MapReduceOption

WithNumMappers sets the number of map workers.

func WithNumReducers

func WithNumReducers(n int) MapReduceOption

WithNumReducers sets the number of reduce workers.

func WithReduce

func WithReduce(r Reducer) MapReduceOption

WithReduce instructs the mapreduce to use the given reducer. Defaults to NilReducer.
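MapReduceOption follows Go's functional-options pattern: each With... function returns a closure that edits the MapReduce being built. The sketch below shows that pattern on a hypothetical config type, with mapper and reducer names held as strings so it stays self-contained. The IdentityMapper and NilReducer defaults come from the descriptions above; the worker-count default of 1 is an assumption.

```go
package main

import "fmt"

// config is a hypothetical, trimmed-down stand-in for MapReduce.
type config struct {
	NumMappers  int
	NumReducers int
	Map         string // name of the mapper, for display only
	Reduce      string // name of the reducer, for display only
}

// option has the same shape as MapReduceOption: a function that edits the config.
type option func(*config)

func withNumMappers(n int) option { return func(c *config) { c.NumMappers = n } }

func withNumReducers(n int) option { return func(c *config) { c.NumReducers = n } }

func withMap(name string) option { return func(c *config) { c.Map = name } }

func withReduce(name string) option { return func(c *config) { c.Reduce = name } }

// newConfig fills in defaults, then applies options in order, so a later
// option overrides an earlier one.
// ASSUMPTION: both worker counts default to 1.
func newConfig(opts ...option) *config {
	c := &config{NumMappers: 1, NumReducers: 1, Map: "IdentityMapper", Reduce: "NilReducer"}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(withMap("WordCountMapper"), withReduce("SumReducer"), withNumMappers(4))
	fmt.Printf("%+v\n", *c)
	// {NumMappers:4 NumReducers:1 Map:WordCountMapper Reduce:SumReducer}
}
```

Against the documented API, a word-count pipeline would presumably be configured along the lines of NewMapReduce(eq, "wc", WithMap(WordCountMapper), WithReduce(SumReducer), WithNumMappers(4), AddInput(kvs...)), where eq is an existing *entroq.EntroQ, kvs is a []*KV, and the "wc" prefix is an arbitrary example; that call has not been tested here.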

type MapWorker

type MapWorker struct {
	Name string

	InputQueue   string
	OutputPrefix string
	Map          Mapper
	EarlyReduce  Reducer
	// contains filtered or unexported fields
}

MapWorker claims map input tasks, processes them, and produces output tasks.

func NewMapWorker

func NewMapWorker(eq *entroq.EntroQ, inQueue string, newEmitter func() MapEmitter, opts ...MapWorkerOption) *MapWorker

NewMapWorker creates a new MapWorker, which loops until told to stop, claiming tasks, processing them, and placing the output into the appropriate output queue, calculated from each output key.

func (*MapWorker) Run

func (w *MapWorker) Run(ctx context.Context) error

Run runs the map worker. It blocks until the map queue is empty, an unrecoverable error is encountered, or its context is canceled, whichever comes first. Running it in a goroutine is up to the caller. Each task value is expected to be a JSON-serialized KV struct.

type MapWorkerOption

type MapWorkerOption func(*MapWorker)

MapWorkerOption is passed to NewMapWorker to change what it does.

func MapAsName

func MapAsName(name string) MapWorkerOption

MapAsName sets the name for this worker. Worker names are empty by default.

func MapToOutputPrefix

func MapToOutputPrefix(p string) MapWorkerOption

MapToOutputPrefix provides an output queue prefix separate from the input queue for mappers to place output tasks into. If not provided, mappers append "done" to the input queue to form the output queue prefix.

func WithEarlyReducer

func WithEarlyReducer(r Reducer) MapWorkerOption

WithEarlyReducer provides a reducer that accepts map output and produces reduce input, ideally in a "reduced" (smaller) form. This only works if the input value has the same type as the output value, which is not always the case.

The inputs and outputs are the same as for a Reducer, but *this reducer must be able to operate on its own output*.

This is useful, for example, when summing over words to produce a count of each unique word. The mapper may output "1" for each word, then the intermediate reducer sums up all like words, producing a count. The final reduction and intermediate shuffling then have much less work to do, because much of the reducing happens in the map phase.
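The requirement that an early reducer work on its own output is what lets it run on each map task's output before the shuffle and then again on the combined partial results. The sketch below checks that property for word counting: summing two map outputs separately and then summing the partial sums gives the same totals as a single pass. earlySum, wordOnes, and must are hypothetical names, and counts are assumed to be decimal strings.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// earlySum is a hypothetical early reducer for word counts. It sums the
// counts for each word and returns (word, total) pairs sorted by word.
// Its output has the same form as its input, so it can be reapplied.
// ASSUMPTION: counts are decimal integer strings.
func earlySum(pairs [][2]string) ([][2]string, error) {
	sums := map[string]int{}
	for _, p := range pairs {
		n, err := strconv.Atoi(p[1])
		if err != nil {
			return nil, fmt.Errorf("word %q: %w", p[0], err)
		}
		sums[p[0]] += n
	}
	words := make([]string, 0, len(sums))
	for w := range sums {
		words = append(words, w)
	}
	sort.Strings(words)
	out := make([][2]string, 0, len(words))
	for _, w := range words {
		out = append(out, [2]string{w, strconv.Itoa(sums[w])})
	}
	return out, nil
}

// wordOnes plays the mapper: it emits (word, "1") for each word.
func wordOnes(text string) [][2]string {
	var out [][2]string
	for _, w := range strings.Fields(text) {
		out = append(out, [2]string{w, "1"})
	}
	return out
}

// must panics on error; it keeps the demo short.
func must(p [][2]string, err error) [][2]string {
	if err != nil {
		panic(err)
	}
	return p
}

func main() {
	// Each map task reduces its own output before the shuffle.
	a := must(earlySum(wordOnes("x y x")))
	b := must(earlySum(wordOnes("y y z")))
	fmt.Println(a, b) // [[x 2] [y 1]] [[y 2] [z 1]]

	// The same function then combines the partial sums.
	fmt.Println(must(earlySum(append(a, b...)))) // [[x 2] [y 3] [z 1]]
}
```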

func WithMapper

func WithMapper(m Mapper) MapWorkerOption

WithMapper provides a mapper process to a map worker. The default is IdentityMapper if not specified as an option.

type Mapper

type Mapper func(ctx context.Context, key, value []byte, emit MapEmitFunc) error

Mapper is a function that accepts a key/value pair and emits zero or more key/value pairs for reducing.

type ReduceWorker

type ReduceWorker struct {
	Name string

	MapEmptyQueue string
	InputQueue    string
	OutputQueue   string
	Reduce        Reducer
	// contains filtered or unexported fields
}

ReduceWorker consumes shuffle output and combines all values for a particular key into a single key/value pair, which is then JSON-serialized, one item per line, and written to a provided emitter.

func NewReduceWorker

func NewReduceWorker(eq *entroq.EntroQ, mapEmptyQueue, inQueue string, opts ...ReduceWorkerOption) *ReduceWorker

NewReduceWorker creates a reduce worker for the given task client and input queue, running the reducer over every unique key.

func (*ReduceWorker) Run

func (w *ReduceWorker) Run(ctx context.Context) error

Run starts a worker that consumes all reduce tasks in a particular queue, and quits when it is empty. It actually does map output merging, too, since there are exactly as many mergers as reducers, and reduce cannot proceed until merging is finished.

The worker watches for:

- more than one task in its input queue, and
- an empty map queue.

This worker has no need to claim tasks to do shuffling. It is assigned a specific queue representing its shard, and no other worker will be assigned the same queue.

Shuffle logic is as follows. In a watch/sleep loop:

- When more than one task is in the input queue, merge them into a single sorted task and replace.
- If only one task exists and there are no more map tasks, proceed to reduce.

Reduce logic:

- Pull the (singleton) task from the input queue.
- Run reduce over it and place the resulting sorted key/value pairs into the output queue.
- Quit.
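The shuffle step above repeatedly merges sorted tasks into one sorted task. Whatever the task encoding, the core of that step is a standard two-way merge of key-sorted runs, sketched below on a hypothetical kv type with hypothetical helpers pairs and render. This is not the worker's actual code; ties are taken from the first slice so the merge is stable.

```go
package main

import (
	"bytes"
	"fmt"
)

// kv is a trimmed-down key/value pair for this sketch.
type kv struct {
	Key, Value []byte
}

// mergeSorted merges two slices that are each sorted by Key into a single
// sorted slice. On equal keys it takes from a first, keeping the merge stable.
func mergeSorted(a, b []kv) []kv {
	out := make([]kv, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		if bytes.Compare(b[j].Key, a[i].Key) < 0 {
			out = append(out, b[j])
			j++
		} else {
			out = append(out, a[i])
			i++
		}
	}
	// At most one of these tails is non-empty.
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

// pairs builds kvs from alternating key, value strings.
func pairs(kvs ...string) []kv {
	var out []kv
	for i := 0; i+1 < len(kvs); i += 2 {
		out = append(out, kv{Key: []byte(kvs[i]), Value: []byte(kvs[i+1])})
	}
	return out
}

// render formats kvs as "key=value" strings for printing and comparison.
func render(kvs []kv) []string {
	var out []string
	for _, p := range kvs {
		out = append(out, string(p.Key)+"="+string(p.Value))
	}
	return out
}

func main() {
	a := pairs("apple", "1", "cat", "1")
	b := pairs("apple", "2", "bee", "1", "dog", "1")
	fmt.Println(render(mergeSorted(a, b))) // [apple=1 apple=2 bee=1 cat=1 dog=1]
}
```

Because equal keys end up adjacent after the final merge, the reduce phase can presumably walk the single sorted task and call the reducer once per run of equal keys.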

type ReduceWorkerOption

type ReduceWorkerOption func(*ReduceWorker)

ReduceWorkerOption is passed to NewReduceWorker to specify non-default options.

func ReduceAsName

func ReduceAsName(name string) ReduceWorkerOption

ReduceAsName sets the name of this reduce worker. The name is blank by default.

func ReduceToOutput

func ReduceToOutput(q string) ReduceWorkerOption

ReduceToOutput specifies the output queue name for finished reduce shards.

func WithReducer

func WithReducer(reduce Reducer) ReduceWorkerOption

WithReducer specifies the reducer. If it is not given, NilReducer is used.

type Reducer

type Reducer func(ctx context.Context, input ReducerInput) ([]byte, error)

Reducer is called once per unique map-output key. It is expected to output a single value for all inputs.

type ReducerInput

type ReducerInput interface {
	// Key produces the key for this reduce operation.
	Key() []byte

	// Value outputs the current value in the input.
	Value() []byte

	// Err returns any errors encountered while iterating over input.
	Err() error

	// Next must be called before Value() (but Key() is always available).
	//
	// Example:
	//
	// 	for input.Next() {
	// 		process(input.Value())
	// 	}
	// 	if err := input.Err(); err != nil {
	// 		return fmt.Errorf("error getting input: %w", err)
	// 	}
	Next() bool
}

ReducerInput provides a streaming interface for getting values during reduction.
