mapreduce

package
v0.0.0-...-ea8e5dc
Published: Jul 4, 2020 License: MIT Imports: 12 Imported by: 0

README

A Veteran Driver's Guide to Implementing a MapReduce Framework in Go

  MapReduce is a software architecture proposed by Google for parallel computation over large data sets (larger than 1 TB). In short: split a job into many small tasks, run them one by one, and finally combine the results, just as our teachers used to tell us as kids, break big problems into small ones and make the small ones go away (in hindsight, how concise those teachers were!). That's the whole idea. So, in today's software-defined world, how do we apply that idea to processing massive amounts of data? This article walks you through a simple implementation in Go.

Boarding

  A quick introduction to a few concepts:

  The concepts "Map" and "Reduce", and their main ideas, are borrowed from functional programming languages, along with features borrowed from vector programming languages. Current software implementations specify a Map function that turns one set of key/value pairs into a new set of key/value pairs, and a concurrent Reduce function that ensures all of the mapped key/value pairs sharing the same key are grouped together.

  Let's start with a simple example:

  Word frequency counting (WordCount): a realistic requirement might be to count how many times each word appears in an article. In everyday terms this is the Top-N problem, for example, when the whole school holds an awards ceremony and needs to pick out the 10 best students. Top-N cases like this are everywhere, and WordCount is one implementation of the pattern; the final result simply keeps the top-ranked entries.

  Given the task of finding those 10 best students, how do we implement it? The requirement presumably came from the principal in a meeting, so concretely: each grade leader picks out the top 10 students of their grade, and the grade leaders' supervisor then aggregates those lists and takes the overall top 10. And how does each grade produce its list? The same way: every class picks out its own top 10 and reports them up to the grade.
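  Before the framework itself, it helps to see the two functions the user supplies. Here is a minimal WordCount sketch (the lab has you write these in ../main/wc.go); splitting on non-letter runes is an assumption here, not the lab's required rule:

import (
	"strconv"
	"strings"
	"unicode"
)

// mapF emits a ("word", "1") pair for every word occurrence.
// Splitting on non-letter runes is an assumption, not a requirement.
func mapF(file string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// reduceF counts one word's occurrences: every value is "1",
// so the total is simply the number of values.
func reduceF(key string, values []string) string {
	return strconv.Itoa(len(values))
}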

Departure

  With the basic picture and approach clear, we can start building the MapReduce framework. First let's fix one idea: divide the job into tasks of a suitable size, compute over each task, and then merge the results of each step. We define these two phases as the Map phase and the Reduce phase.

  Still using WordCount as the example:

  The Map step reads the given file and emits every word in it with an initial count of 1.

  The Reduce step accumulates the counts of identical words. The framework's job, then, is to invoke these Map and Reduce steps at the right time. The doMap function in common_map.go takes a file, reads its data, and drives the Map step; the code below is commented, and in brief there are three steps:

  1. Read the input file;
  2. Pass the file contents to the user's Map function, producing the corresponding KeyValue pairs;
  3. Partition the pairs by Key and write them out to intermediate files for the later Reduce step.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run
	mapF func(file string, contents string) []KeyValue,
) {
	// step 1: read the input file
	contents, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatal("doMap: read ", inFile, ": ", err)
	}

	// step 2: call the user's map function to get the key/value pairs
	kvResult := mapF(inFile, string(contents))

	// step 3: partition the pairs by key into nReduce intermediate files:
	//   a. create the temporary files
	//   b. create a JSON encoder for each temporary file
	//   c. hash each key to pick a partition, then encode the pair into it
	tmpFiles := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)

	for i := 0; i < nReduce; i++ {
		tmpFileName := reduceName(jobName, mapTaskNumber, i)
		tmpFiles[i], err = os.Create(tmpFileName)
		if err != nil {
			log.Fatal("doMap: create ", tmpFileName, ": ", err)
		}
		defer tmpFiles[i].Close()
		encoders[i] = json.NewEncoder(tmpFiles[i])
	}

	for _, kv := range kvResult {
		hashKey := int(ihash(kv.Key)) % nReduce
		if err := encoders[hashKey].Encode(&kv); err != nil {
			log.Fatal("doMap: encode ", err)
		}
	}
}
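  doMap relies on two helpers defined elsewhere in the package (common.go in the lab skeleton) that the post does not show. A plausible sketch, treating the FNV hash and the "mrtmp." naming convention as assumptions:

import (
	"hash/fnv"
	"strconv"
)

// ihash maps a key to a partition-friendly integer. An FNV-1a hash is
// a common choice; treat the exact function as an assumption.
func ihash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// reduceName builds the intermediate file name for a (map task, reduce task)
// pair; the "mrtmp." prefix follows the lab's convention.
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}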

  The doReduce function lives in common_reduce.go; its main steps are:

  1. Read the intermediate files produced by the doMap phase;
  2. Group the values by Key and sort the Keys in dictionary order;
  3. Walk the sorted Keys, call the user's Reduce function on each one, and write the results out to this task's output file.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// step 1: read the files produced by the map phase, merging values
	// that share a key into map[string][]string
	kvs := make(map[string][]string)

	for i := 0; i < nMap; i++ {
		fileName := reduceName(jobName, i, reduceTaskNumber)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatal("doReduce: open ", fileName, ": ", err)
		}

		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF: this intermediate file is exhausted
			}
			kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
		}
		file.Close()
	}

	// step 2: sort the keys in dictionary order
	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// step 3: create the result file for this reduce task
	p := mergeName(jobName, reduceTaskNumber)
	file, err := os.Create(p)
	if err != nil {
		log.Fatal("doReduce: create ", p, ": ", err)
	}
	defer file.Close()
	enc := json.NewEncoder(file)

	// step 4: call the user's reduce function on each key and encode the result
	for _, k := range keys {
		if err := enc.Encode(KeyValue{k, reduceF(k, kvs[k])}); err != nil {
			log.Fatal("doReduce: encode ", err)
		}
	}
}

  The Merge step

  Finally, the results produced by all the Reduce tasks are merged. The merge step likewise sorts by Key in dictionary order before writing to the final output file. The code is similar to the reduce step, so the post does not repeat it.
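  For completeness, here is a rough sketch of what that merge might look like; the real version lives in master_splitmerge.go, and both mergeName(jobName, i) naming the i-th reduce output and "mrtmp."+jobName as the final file name are assumptions:

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sort"
)

// merge combines the nReduce result files into one sorted output file.
func merge(jobName string, nReduce int) {
	kvs := make(map[string]string)
	for i := 0; i < nReduce; i++ {
		f, err := os.Open(mergeName(jobName, i))
		if err != nil {
			log.Fatal("merge: ", err)
		}
		dec := json.NewDecoder(f)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF: this reduce file is exhausted
			}
			kvs[kv.Key] = kv.Value
		}
		f.Close()
	}

	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys) // dictionary order, as in doReduce

	out, err := os.Create("mrtmp." + jobName)
	if err != nil {
		log.Fatal("merge: ", err)
	}
	defer out.Close()
	for _, k := range keys {
		fmt.Fprintf(out, "%s: %s\n", k, kvs[k])
	}
}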

  Goroutines are used to fan the tasks out to distributed workers; the relevant code is the schedule method in schedule.go. Its main steps are:

  1. From the current phase (Map or Reduce), work out how many map (or reduce) tasks must run, and invoke the remote DoTask method in worker.go for each of them;
  2. Wait for all tasks to finish before returning. This leans on a few Go features; see the Go RPC documentation and Concurrency in Go.
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// one goroutine per task: grab an idle worker and run the task over RPC
	done := make(chan bool)
	for i := 0; i < ntasks; i++ {
		go func(number int) {
			// the file argument only matters for map tasks
			var file string
			if phase == mapPhase {
				file = mr.files[number]
			}
			args := DoTaskArgs{mr.jobName, file, phase, number, nios}
			var worker string
			reply := new(struct{})
			ok := false
			for !ok {
				// take an idle worker; if the call fails, retry on another
				worker = <-mr.registerChannel
				ok = call(worker, "Worker.DoTask", args, reply)
			}
			done <- true
			mr.registerChannel <- worker // hand the worker back for reuse
		}(i)
	}

	// wait for all tasks to complete
	for i := 0; i < ntasks; i++ {
		<-done
	}
	fmt.Printf("Schedule: %v phase done\n", phase)
}
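  The call helper used above is not shown in the post; it lives in common_rpc.go. Roughly, it dials the worker and issues a synchronous RPC, returning false on any failure so schedule can simply retry with the next idle worker. A minimal sketch, assuming the lab's Unix-domain socket transport:

import "net/rpc"

// call dials the worker's RPC server and issues a synchronous call,
// returning false on any failure so the caller can retry elsewhere.
// The Unix-domain transport is an assumption for illustration.
func call(srv string, rpcname string, args interface{}, reply interface{}) bool {
	c, err := rpc.Dial("unix", srv)
	if err != nil {
		return false
	}
	defer c.Close()
	return c.Call(rpcname, args, reply) == nil
}

  Note the ordering inside the task goroutine: done <- true is sent before the worker is pushed back onto registerChannel, so schedule can drain done even if nothing ever reads registerChannel again after the final task.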

Arrival

  • Run the tests (screenshot: test run)

  • Test results (screenshot: test output)

  • Inverted index test results (screenshot: inverted index output)

Documentation

Overview

Package mapreduce provides a simple mapreduce library with a sequential implementation. Applications should normally call Distributed() [located in master.go] to start a job, but may instead call Sequential() [also in master.go] to get a sequential execution for debugging purposes.
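For instance, a sequential debugging run might be started like this; the import path, the input file, and the word-count mapF/reduceF (sketched in the README above) are placeholders:

package main

import (
	"fmt"

	"mapreduce" // import path is an assumption; adjust to your layout
)

func main() {
	// "input.txt" and nReduce = 3 are placeholder choices.
	mr := mapreduce.Sequential("wcseq", []string{"input.txt"}, 3, mapF, reduceF)
	mr.Wait() // block until all tasks, the merge, and cleanup have finished
	fmt.Println("wcseq done")
}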

The flow of the mapreduce implementation is as follows:

  1. The application provides a number of input files, a map function, a reduce function, and the number of reduce tasks (nReduce).

  2. A master is created with this knowledge. It spins up an RPC server (see master_rpc.go), and waits for workers to register (using the RPC call Register() [defined in master.go]). As tasks become available (in steps 4 and 5), schedule() [schedule.go] decides how to assign those tasks to workers, and how to handle worker failures.

  3. The master considers each input file one map task, and makes a call to doMap() [common_map.go] at least once for each task. It does so either directly (when using Sequential()) or by issuing the DoJob RPC on a worker [worker.go]. Each call to doMap() reads the appropriate file, calls the map function on that file's contents, and produces nReduce files for each map file. Thus, there will be #files x nReduce files after all map tasks are done:

    f0-0, ..., f0-<nReduce-1>, ..., f<#files-1>-0, ..., f<#files-1>-<nReduce-1>.

  4. The master next makes a call to doReduce() [common_reduce.go] at least once for each reduce task. As with doMap(), it does so either directly or through a worker. doReduce() collects its corresponding file from each map task (f-*-<reduce>), and runs the reduce function on those files. This produces nReduce result files.

  5. The master calls mr.merge() [master_splitmerge.go], which merges all the nReduce files produced by the previous step into a single output.

  6. The master sends a Shutdown RPC to each of its workers, and then shuts down its own RPC server.

TODO: You will have to write/modify doMap, doReduce, and schedule yourself. These are located in common_map.go, common_reduce.go, and schedule.go respectively. You will also have to write the map and reduce functions in ../main/wc.go.

You should not need to modify any other files, but reading them might be useful in order to understand how the other methods fit into the overall architecture of the system.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunWorker

func RunWorker(MasterAddress string, me string,
	MapFunc func(string, string) []KeyValue,
	ReduceFunc func(string, []string) string,
	nRPC int,
)

RunWorker sets up a connection with the master, registers its address, and waits for tasks to be scheduled.
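For a distributed run, the application starts a master with Distributed and registers one or more workers with RunWorker. A sketch, where the socket paths, nRPC = -1 (taken here to mean "serve indefinitely"), and the word-count mapF/reduceF are assumptions:

// Start a master and two workers in one process for a word-count job
// (requires the fmt import; values below are placeholders).
mr := mapreduce.Distributed("wcdist", []string{"input.txt"}, 3, "/var/tmp/mr-master.sock")
for i := 0; i < 2; i++ {
	go mapreduce.RunWorker("/var/tmp/mr-master.sock",
		fmt.Sprintf("/var/tmp/mr-worker-%d.sock", i),
		mapF, reduceF, -1)
}
mr.Wait()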

Types

type DoTaskArgs

type DoTaskArgs struct {
	JobName    string
	File       string   // the file to process
	Phase      jobPhase // are we in mapPhase or reducePhase?
	TaskNumber int      // this task's index in the current phase

	// NumOtherPhase is the total number of tasks in other phase; mappers
	// need this to compute the number of output bins, and reducers needs
	// this to know how many input files to collect.
	NumOtherPhase int
}

DoTaskArgs holds the arguments that are passed to a worker when a job is scheduled on it.

type KeyValue

type KeyValue struct {
	Key   string
	Value string
}

KeyValue is a type used to hold the key/value pairs passed to the map and reduce functions.

type Master

type Master struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Master holds all the state that the master needs to keep track of. Of particular importance is registerChannel, the channel that notifies the master of workers that have gone idle and are in need of new work.

func Distributed

func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master)

Distributed schedules map and reduce tasks on workers that register with the master over RPC.

func Sequential

func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master)

Sequential runs map and reduce tasks sequentially, waiting for each task to complete before scheduling the next.

func (*Master) CleanupFiles

func (mr *Master) CleanupFiles()

CleanupFiles removes all intermediate files produced by running mapreduce.

func (*Master) Register

func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error

Register is an RPC method that is called by workers after they have started up to report that they are ready to receive tasks.

func (*Master) Shutdown

func (mr *Master) Shutdown(_, _ *struct{}) error

Shutdown is an RPC method that shuts down the Master's RPC server.

func (*Master) Wait

func (mr *Master) Wait()

Wait blocks until the currently scheduled work has completed. This happens when all tasks have been scheduled and completed, the final output has been computed, and all workers have been shut down.

type RegisterArgs

type RegisterArgs struct {
	Worker string
}

RegisterArgs is the argument passed when a worker registers with the master.

type ShutdownReply

type ShutdownReply struct {
	Ntasks int
}

ShutdownReply is the response to a WorkerShutdown. It holds the number of tasks this worker has processed since it was started.

type Worker

type Worker struct {
	sync.Mutex

	Map    func(string, string) []KeyValue
	Reduce func(string, []string) string
	// contains filtered or unexported fields
}

Worker holds the state for a server waiting for DoTask or Shutdown RPCs.

func (*Worker) DoTask

func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error

DoTask is called by the master when a new task is being scheduled on this worker.

func (*Worker) Shutdown

func (wk *Worker) Shutdown(_ *struct{}, res *ShutdownReply) error

Shutdown is called by the master when all work has been completed. We should respond with the number of tasks we have processed.
