mapreduce

package
v0.0.0-...-42a8f8d Latest
Warning

This package is not in the latest version of its module.

Published: Feb 22, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package mapreduce provides a simple mapreduce library with a sequential implementation. Applications should normally call Distributed() [located in master.go] to start a job, but may instead call Sequential() [also in master.go] to get a sequential execution for debugging purposes.

The flow of the mapreduce implementation is as follows:

  1. The application provides a number of input files, a map function, a reduce function, and the number of reduce tasks (nReduce).

  2. A master is created with this knowledge. It spins up an RPC server (see master_rpc.go), and waits for workers to register (using the RPC call Register() [defined in master.go]). As tasks become available (in steps 3 and 4), schedule() [schedule.go] decides how to assign those tasks to workers, and how to handle worker failures.

  3. The master treats each input file as one map task, and calls doMap() [common_map.go] at least once for each task. It does so either directly (when using Sequential()) or by issuing the DoTask RPC on a worker [worker.go]. Each call to doMap() reads the appropriate file, calls the map function on that file's contents, and produces nReduce intermediate files for that map task. Thus, there will be #files x nReduce files after all map tasks are done:

    f0-0, ..., f0-<nReduce-1>, ..., f<#files-1>-0, ..., f<#files-1>-<nReduce-1>.

  4. The master next calls doReduce() [common_reduce.go] at least once for each reduce task. As with doMap(), it does so either directly or through a worker. doReduce() collects the intermediate file for its reduce task from each map task (f*-<reduce>, #files files in total), and runs the reduce function on their contents. Across all reduce tasks, this produces nReduce result files.

  5. The master calls mr.merge() [master_splitmerge.go], which merges all the nReduce files produced by the previous step into a single output.

  6. The master sends a Shutdown RPC to each of its workers, and then shuts down its own RPC server.
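
Steps 3 to 5 above can be sketched in memory, without files or RPC. This is only an illustration under stated assumptions: intermediateName, bin, runInMemory, wordMap, and countReduce are hypothetical names, and hashing keys with FNV-1a to choose a reduce task is an assumption, not something this documentation specifies.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
)

// KeyValue mirrors the package's KeyValue type.
type KeyValue struct {
	Key   string
	Value string
}

// intermediateName is a hypothetical helper that follows the
// f<map task>-<reduce task> naming pattern from step 3.
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("f%d-%d", mapTask, reduceTask)
}

// bin picks the reduce task for a key. FNV-1a is an assumption; any
// deterministic function of the key would do.
func bin(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(nReduce))
}

// runInMemory maps every input into nReduce bins (step 3), reduces each bin
// key by key (step 4), and merges the results (step 5). It also returns the
// number of intermediate "files" that were created.
func runInMemory(inputs []string, nReduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (map[string]string, int) {
	files := make(map[string][]KeyValue)
	for m, contents := range inputs {
		// Every map task owns nReduce files, even if some stay empty.
		for r := 0; r < nReduce; r++ {
			files[intermediateName(m, r)] = nil
		}
		for _, kv := range mapF(fmt.Sprintf("input-%d", m), contents) {
			name := intermediateName(m, bin(kv.Key, nReduce))
			files[name] = append(files[name], kv)
		}
	}
	merged := make(map[string]string)
	for r := 0; r < nReduce; r++ {
		// Reduce task r reads one file from each map task.
		grouped := make(map[string][]string)
		for m := range inputs {
			for _, kv := range files[intermediateName(m, r)] {
				grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
			}
		}
		for key, values := range grouped {
			merged[key] = reduceF(key, values)
		}
	}
	return merged, len(files)
}

// wordMap emits one ("word", "1") pair per whitespace-separated word.
func wordMap(_ string, contents string) []KeyValue {
	var kvs []KeyValue
	for _, w := range strings.Fields(contents) {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// countReduce returns how many values were collected for a key.
func countReduce(_ string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	merged, nFiles := runInMemory([]string{"a b a", "b c"}, 3, wordMap, countReduce)
	keys := make([]string, 0, len(merged))
	for k := range merged {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	fmt.Println("intermediate files:", nFiles)
	for _, k := range keys {
		fmt.Printf("%s: %s\n", k, merged[k])
	}
}
```

Because every occurrence of a key hashes to the same bin, each key is reduced by exactly one reduce task, which is why the merge step can simply combine the per-task results.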

TODO: You will have to write/modify doMap, doReduce, and schedule yourself. These are located in common_map.go, common_reduce.go, and schedule.go respectively. You will also have to write the map and reduce functions in ../main/wc.go.

You should not need to modify any other files, but reading them might be useful in order to understand how the other methods fit into the overall architecture of the system.

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunWorker

func RunWorker(MasterAddress string, me string,
	MapFunc func(string, string) []KeyValue,
	ReduceFunc func(string, []string) string,
	nRPC int,
)

RunWorker sets up a connection with the master, registers its address, and waits for tasks to be scheduled.

Types

type DoTaskArgs

type DoTaskArgs struct {
	JobName    string
	File       string   // input file, only used in map tasks
	Phase      jobPhase // are we in mapPhase or reducePhase?
	TaskNumber int      // this task's index in the current phase

	// NumOtherPhase is the total number of tasks in the other phase;
	// mappers need this to compute the number of output bins, and reducers
	// need this to know how many input files to collect.
	NumOtherPhase int
}

DoTaskArgs holds the arguments that are passed to a worker when a job is scheduled on it.
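
The role of NumOtherPhase is easiest to see by building the arguments for both phases. The following is a sketch, not the package's scheduler: buildTasks is a hypothetical helper, and the jobPhase type with its "Map" and "Reduce" values is a local stand-in for the package's unexported type.

```go
package main

import "fmt"

// jobPhase and its values are local stand-ins for the package's unexported
// type; the string values are assumptions.
type jobPhase string

const (
	mapPhase    jobPhase = "Map"
	reducePhase jobPhase = "Reduce"
)

// DoTaskArgs mirrors the exported struct documented above.
type DoTaskArgs struct {
	JobName       string
	File          string
	Phase         jobPhase
	TaskNumber    int
	NumOtherPhase int
}

// buildTasks is a hypothetical helper that lists the arguments for every
// task in one phase.
func buildTasks(jobName string, files []string, nReduce int, phase jobPhase) []DoTaskArgs {
	var tasks []DoTaskArgs
	switch phase {
	case mapPhase:
		// One map task per input file; each needs nReduce output bins.
		for i, f := range files {
			tasks = append(tasks, DoTaskArgs{
				JobName: jobName, File: f, Phase: phase,
				TaskNumber: i, NumOtherPhase: nReduce,
			})
		}
	case reducePhase:
		// File stays empty; each reducer collects len(files) inputs.
		for i := 0; i < nReduce; i++ {
			tasks = append(tasks, DoTaskArgs{
				JobName: jobName, Phase: phase,
				TaskNumber: i, NumOtherPhase: len(files),
			})
		}
	}
	return tasks
}

func main() {
	files := []string{"a.txt", "b.txt"}
	for _, t := range buildTasks("wc", files, 3, mapPhase) {
		fmt.Printf("%+v\n", t)
	}
	for _, t := range buildTasks("wc", files, 3, reducePhase) {
		fmt.Printf("%+v\n", t)
	}
}
```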

type KeyValue

type KeyValue struct {
	Key   string
	Value string
}

KeyValue is a type used to hold the key/value pairs passed to the map and reduce functions.

type Master

type Master struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Master holds all the state that the master needs to keep track of. Of particular importance is registerChannel, the channel that notifies the master of workers that have gone idle and are in need of new work.
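
The idea of a channel that announces idle workers can be sketched as follows. This is one possible scheduling loop, not the package's schedule(): runTasks, demo, the idle channel, and the doTask callback (which stands in for the DoTask RPC and reports success or failure) are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runTasks hands nTasks tasks to workers announced on idle and returns once
// every task has succeeded. A worker whose call fails is dropped, and the
// task is retried on the next idle worker.
func runTasks(nTasks int, idle chan string, doTask func(worker string, task int) bool) {
	var wg sync.WaitGroup
	for t := 0; t < nTasks; t++ {
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			for {
				worker := <-idle
				if doTask(worker, task) {
					// Hand the worker back from a separate goroutine so an
					// unbuffered channel cannot delay this task's completion.
					go func() { idle <- worker }()
					return
				}
			}
		}(t)
	}
	wg.Wait()
}

// demo runs five tasks on three workers, one of which always fails, and
// reports which worker completed each task.
func demo() map[int]string {
	idle := make(chan string, 3)
	for _, w := range []string{"w0", "w1", "w2"} {
		idle <- w
	}
	var mu sync.Mutex
	done := make(map[int]string)
	runTasks(5, idle, func(worker string, task int) bool {
		if worker == "w1" {
			return false // simulated worker failure
		}
		mu.Lock()
		done[task] = worker
		mu.Unlock()
		return true
	})
	return done
}

func main() {
	done := demo()
	fmt.Println("tasks completed:", len(done))
}
```

Dropping a failed worker is a simplification; it assumes at least one healthy worker remains, otherwise the loop would wait forever.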

func Distributed

func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master)

Distributed schedules map and reduce tasks on workers that register with the master over RPC.

func Sequential

func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master)

Sequential runs map and reduce tasks sequentially, waiting for each task to complete before scheduling the next.

func (*Master) CleanupFiles

func (mr *Master) CleanupFiles()

CleanupFiles removes all intermediate files produced by running mapreduce.

func (*Master) Register

func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error

Register is an RPC method that is called by workers after they have started up to report that they are ready to receive tasks.
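
The shape of such a registration call can be tried out with the standard net/rpc package over an in-memory pipe. This is a sketch under assumptions: the Master below is a stripped-down stand-in that records workers in a slice, and registerOverPipe is a hypothetical helper; the real master's bookkeeping and transport are not shown in this documentation.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"sync"
)

// RegisterArgs mirrors the exported struct of the package.
type RegisterArgs struct {
	Worker string
}

// Master is a minimal stand-in; the workers slice is an assumption.
type Master struct {
	sync.Mutex
	workers []string
}

// Register has the same signature as the documented RPC method.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
	mr.Lock()
	defer mr.Unlock()
	mr.workers = append(mr.workers, args.Worker)
	return nil
}

// registerOverPipe serves mr on one end of an in-memory connection and
// issues a single Register call from the other end.
func registerOverPipe(mr *Master, worker string) error {
	srv := rpc.NewServer()
	if err := srv.Register(mr); err != nil {
		return err
	}
	serverConn, clientConn := net.Pipe()
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()
	return client.Call("Master.Register", &RegisterArgs{Worker: worker}, new(struct{}))
}

func main() {
	mr := &Master{}
	if err := registerOverPipe(mr, "worker-0"); err != nil {
		panic(err)
	}
	mr.Lock()
	fmt.Println("registered workers:", mr.workers)
	mr.Unlock()
}
```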

func (*Master) Shutdown

func (mr *Master) Shutdown(_, _ *struct{}) error

Shutdown is an RPC method that shuts down the Master's RPC server.

func (*Master) Wait

func (mr *Master) Wait()

Wait blocks until the currently scheduled work has completed. This happens when all tasks have been scheduled and completed, the final output has been computed, and all workers have been shut down.
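
A blocking Wait of this kind is commonly built on a channel that the job goroutine signals when it finishes. The sketch below assumes that design; the doneChannel field, the run method, and runJob are hypothetical and only illustrate the pattern.

```go
package main

import "fmt"

// Master is a minimal stand-in with a hypothetical completion channel.
type Master struct {
	doneChannel chan bool
}

// run executes the given phases in order on a separate goroutine and
// signals completion afterwards.
func (mr *Master) run(phases ...func()) {
	go func() {
		for _, phase := range phases {
			phase()
		}
		mr.doneChannel <- true
	}()
}

// Wait blocks until run has finished every phase.
func (mr *Master) Wait() {
	<-mr.doneChannel
}

// runJob records the order in which the phases ran.
func runJob() []string {
	mr := &Master{doneChannel: make(chan bool)}
	var steps []string
	record := func(name string) func() {
		return func() { steps = append(steps, name) }
	}
	mr.run(record("map"), record("reduce"), record("merge"), record("shutdown workers"))
	mr.Wait()
	return steps
}

func main() {
	fmt.Println(runJob())
}
```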

type RegisterArgs

type RegisterArgs struct {
	Worker string
}

RegisterArgs is the argument passed when a worker registers with the master.

type ShutdownReply

type ShutdownReply struct {
	Ntasks int
}

ShutdownReply is the response to a Worker.Shutdown RPC. It holds the number of tasks this worker has processed since it was started.

type Worker

type Worker struct {
	sync.Mutex

	Map    func(string, string) []KeyValue
	Reduce func(string, []string) string
	// contains filtered or unexported fields
}

Worker holds the state for a server waiting for DoTask or Shutdown RPCs.

func (*Worker) DoTask

func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error

DoTask is called by the master when a new task is being scheduled on this worker.

func (*Worker) Shutdown

func (wk *Worker) Shutdown(_ *struct{}, res *ShutdownReply) error

Shutdown is called by the master when all work has been completed. We should respond with the number of tasks we have processed.
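
Reporting the task count requires the worker to count tasks safely while DoTask calls may run concurrently. A minimal sketch, assuming a mutex-protected counter: the nTasks field and runAndShutdown are hypothetical, DoTaskArgs is trimmed to one field, and the methods are called directly rather than over RPC.

```go
package main

import (
	"fmt"
	"sync"
)

// DoTaskArgs is trimmed to a single field for this sketch.
type DoTaskArgs struct {
	TaskNumber int
}

// ShutdownReply mirrors the exported struct of the package.
type ShutdownReply struct {
	Ntasks int
}

// Worker is a minimal stand-in; nTasks is a hypothetical counter.
type Worker struct {
	sync.Mutex
	nTasks int
}

// DoTask would run the map or reduce work; here it only counts the call.
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
	wk.Lock()
	wk.nTasks++
	wk.Unlock()
	return nil
}

// Shutdown reports how many tasks this worker has processed.
func (wk *Worker) Shutdown(_ *struct{}, res *ShutdownReply) error {
	wk.Lock()
	defer wk.Unlock()
	res.Ntasks = wk.nTasks
	return nil
}

// runAndShutdown issues n concurrent DoTask calls, then asks for the count.
func runAndShutdown(n int) int {
	wk := &Worker{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			wk.DoTask(&DoTaskArgs{TaskNumber: task}, nil)
		}(i)
	}
	wg.Wait()
	var reply ShutdownReply
	wk.Shutdown(nil, &reply)
	return reply.Ntasks
}

func main() {
	fmt.Println("tasks processed:", runAndShutdown(10))
}
```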
