什么是Map-Reduce呢?

Map指的是一个形如下面定义的函数。

def Map(k, v): //return [(k1, v1), (k2, v2), (k3, v3), ...]
pass

它接受一个key和一个value,返回一组所谓的中间值。注意,返回的不是一个dict,所以k1可能等于k2。

Reduce指的是一个形如下面定义的函数。

def Reduce(k, [v1, v2, v3, ....])://return v
pass

它接受一个key和该key对应的所有在Map函数中返回的value,返回k值的最终结果。所谓Reduce,就是把多个值“消减”为一个。

那么Map-Reduce作为一个框架,通常是这样运行的。

  • 多个节点在分别,独立地执行Map函数。

Map1 ----> (k1, v1), (k2, v2), (k4, v4), (k8, v8)

Map2 ----> (k1, v9), (k4, v10)

Map3 ----> (k8, v11)

  • 然后,多个节点分别,独立地执行Reduce函数。

Reduce1 (k1, [v1, v9])     ----> (k1, <some value>)

Reduce2 (k2, [v2])           ----> (k2, <some value>)

Reduce3 (k4, [v4, v10])   ---->  (k4, <some value>)

Reduce4 (k8, [v8, v11])   ---->  (k8, <some value>)

框架保证了Map们独立地跑,并且每个Reduce接收到的参数,都是经过“总结“不同map的返回值得到的。程序员只需要实现自己的Map和Reduce函数就行了。一个典型的应用就是word count。

Map-Reduce有很多优点,主要就是节点之间在执行函数的过程中并不要通信,而且有很好的扩展。

Google的Map-Reduce实现

有几个要点。

1. 输入和输出以文件的形式放在GFS上。这个文件系统有个特点,虽然是分布式系统,但是写文件和创建新文件是原子操作。

2. 输入的所有文件被分成M份,存放在GFS上。

3. 一个Master节点控制M个worker执行Map操作。v = 其中的一个文件内容。

4. Map操作完成的结果被存放在本地磁盘上,并将存放位置告诉Master

5. 完成所有Map操作以后,Master并行地执行R个独立的Reduce操作。

6. Map生成的中间值的k传给了,第 hash(k) % R个Reduce worker操作。

容错

1. Master周期性ping 所有worker,如果失败,新的闲置worker将重新执行失败的worker上的Map/Reduce操作。

2. Map/Reduce函数都是可以重入的,他们的中间结果,不会被其他节点看见,除非完成。

3. Worker的数目小于需要执行的M或者R

Lab1

这个课程所有Lab都是用Go语言实现的。答案用的Go 1.7,我用了1.8,发现有一些不一样,待会说。

首先git clone这个项目。

git clone git://g.csail.mit.edu/6.824-golabs-2017 6.824

你会看到一个src 文件目录,里面包含各个Lab的子目录。

在MapReduce这个实验,几乎忠实地按照论文 MapReduce: Simplified Data Processing on Large Clusters 实现了Google的MapReduce框架。完成实验对理解论文内容帮助蛮大。

理解这个Go语言实现的框架不是太难,就不笔记了。代码都有。有一点在Go 1.8里要注意。

在common_map.go和common_reduce.go中,需要写文件存放中间结果,comment推荐使用json格式,并使用如下代码。

// enc := json.NewEncoder(file)
// for key := ... {
// enc.Encode(KeyValue{key, reduceF(...)})
// }
// file.Close()

如果你和我一样使用的1.8,并且在doReduce的实现中按照上面的方法去写文件,第一个task是怎么都过不了的。为啥呢?

在Go1.8中,如果按照上面的Encode json,生成的文件是如下的格式

[{key:value}, {key: value}, ....]

打开master_splitmerge.go文件,看一下merge函数,发现他在一个一个decode这些KeyValue,类似下面这样

// deco := json.NewDecoder(file)
// for key := ... {
// deco.Decode(&kv) <---这个就不行
// }
// file.Close()

而在merge函数中的Decode 居然在遇到"["的时候报错=_=b。不知道是不是encoder和decoder在1.8被deprecate,否则以同样方式写却没办法以同样方式读。

建议改用json.marshal/unmarshal

Part I: Map/Reduce input and output

这个就是让你实现doMap和doReduce。注意不是Map和Reduce函数,而是call 客户实现的Map和Reduce函数的逻辑。

首先是doMap。

该函数读取某一个输入文件的内容,然后调用客户实现的Map函数,再把返回值写到文件中。下面是我实现的代码部分。

bytes, err := ioutil.ReadFile(inFile)
if err != nil {
fmt.Errorf("doMap failed to read file %s due to %s", inFile, err)
return
}
contents := string(bytes)
kvs := mapF(inFile, contents) if len(kvs) == 0 {
return
} sort.Slice(kvs, func(i, j int) bool {
return ihash(kvs[i].Key) % nReduce < ihash(kvs[j].Key) % nReduce
}) var pr int = -1
var pkvs []KeyValue
for _, kv := range kvs {
r := ihash(kv.Key) % nReduce
if r != pr {
writeOutKeyValueArrays(jobName, mapTaskNumber, pr, pkvs)
pr = r
pkvs = nil
}
pkvs = append(pkvs, kv)
}
//write last one
writeOutKeyValueArrays(jobName, mapTaskNumber, pr, pkvs)

mapF就是客户的业务逻辑,注意到它返回一个(k, v)的list。这个中间值需要放到文件中。不过放文件,按照规则,需要根据k的hash值,分别放到R个(nReduce个)文件中。由于一共有M个doMap执行,实际上生成的中间文件有M x R个。这个框架生成的中间文件类似这样的格式。

mrtmp.xxx-0-0
mrtmp.xxx-0-1
mrtmp.xxx-0-2
mrtmp.xxx-1-0
mrtmp.xxx-1-1
mrtmp.xxx-1-2

这表示2 (M)X 3(R)

为了节省时间,避免频繁打开关闭文件,我们可以先按照k的hash值排序一下,然后依次打开文件存放它们。

再看一下doReduce函数。一共R个doReduce函数的执行,第r个会读取mrtmp.xxx-*-r 文件。例如,第1个doReduce要读的文件就是mrtmp.xxx-0-1和mrtmp.xxx-1-1。它会读取这些文件中产生的中间值,然后合并同样的key所对应的value们,并传入到客户定义的Reduce函数中。将返回值写到结果文件中。

var aOfKV []KeyValue
for i := 0; i < nMap; i++ {
var kvs []KeyValue
inFilename := reduceName(jobName, i, reduceTaskNumber)
inFile, e := ioutil.ReadFile(inFilename)
if e != nil {
fmt.Errorf("Failed to read file %s due to %v", inFilename, e)
continue
}
json.Unmarshal(inFile, &kvs)
aOfKV = append(aOfKV, kvs...)
}
sort.Slice(aOfKV, func(i, j int) bool {
return aOfKV[i].Key < aOfKV[j].Key
}) //end
aOfKV = append(aOfKV, KeyValue{"", ""}) var curKey string
var curValues []string
var outputKVS []KeyValue
for _, kv := range aOfKV {
if kv.Key != curKey {
if len(curKey) > 0 {
outputStr := reduceF(curKey, curValues)
outputKVS = append(outputKVS, KeyValue{curKey, outputStr})
}
curKey = kv.Key
curValues = nil
}
curValues = append(curValues, kv.Value)
}
marshalledKVs, _ := json.Marshal(outputKVS)
err := ioutil.WriteFile(outFile, marshalledKVs, 0644)
if err != nil {
log.Fatal("Failed to write reduce file:", err)
}

注意这里使用了类似的doMap的逻辑:先排序,再依次合并。所以在所有doReduce执行完成以后,应该有R个最后输出文件,然后框架会调用merge函数做合并。

Part II: Single-worker word count

让你实现word count,没啥说的。就是看一下strings里面的FieldsFunc。

在Map函数中,文件内容都传给你了,要做的就是份成一个词一个词。

func mapWF(filename string, contents string) []mapreduce.KeyValue {
f := func(c rune) bool {
return !unicode.IsLetter(c) && !unicode.IsNumber(c)
}
var kvs []mapreduce.KeyValue
for _, w := range strings.FieldsFunc(contents, f) {
kvs = append(kvs, mapreduce.KeyValue{w, ""})
}
return kvs
}

在Reduce函数中,传进来已经是所有的词的出现了。看一下value 的len好了。

func mapWF(filename string, contents string) []mapreduce.KeyValue {
f := func(c rune) bool {
return !unicode.IsLetter(c) && !unicode.IsNumber(c)
}
var kvs []mapreduce.KeyValue
for _, w := range strings.FieldsFunc(contents, f) {
kvs = append(kvs, mapreduce.KeyValue{w, ""})
}
return kvs
}

Part III: Distributing MapReduce tasks/Part IV: Handling worker failures

我们之前实现的doMap和doReduce都是串行地执行的,也就是,每个Map依次执行,完成以后,再依次执行Reduce。这个从源代码里面的master.go的Sequential函数很容易看出来。

在该函数里面,对每一个input file,都依次调用doMap,在reducePhase,则是把中间结果(就是上面的 mrtmp.xxx-*-*)依次传给doReduce。

// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}

而这个任务,就是要把串行执行的Map和Reduce,变成并行的。当然,整个Map-Reduce还是运行在单机上的,只不过用了go里面的RPC来模拟论文中master和worker之间的通信。我们可以看看框架已经为我们做了什么。

// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
mr = newMaster(master)
mr.startRPCServer()
go mr.run(jobName, files, nreduce,
func(phase jobPhase) {
ch := make(chan string)
go mr.forwardRegistrations(ch)
schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
},
func() {
mr.stats = mr.killWorkers()
mr.stopRPCServer()
})
return
}

首先,master启动一个RPC server。在这个master server中,它会暴露一个叫做”Register“的函数。这个Register函数的主要作用就是接收worker的Register(好废话),把worker(它其实也是一个RPC server)的RPC地址,通过调用RPC 调用master上的Register,放到master的一个list里面。

// Register is an RPC method that is called by workers after they have started
// up to report that they are ready to receive tasks.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
mr.Lock()
defer mr.Unlock()
debug("Register: worker %s\n", args.Worker)
mr.workers = append(mr.workers, args.Worker) // tell forwardRegistrations() that there's a new workers[] entry.
mr.newCond.Broadcast() return nil
}

接下来,forwardRegistrations把master中的registered worker从list中提取出来,依次发送到channel。

而我们要实现的具体代码在schedule.go里面。其中schedule函数需要完成指定数目的task。指定数目指的是,如果是做的Map 阶段,那指定数目就是输入文件的数目M;如果是在Reduce阶段,就是R。那么schedule应该有下面的步骤:

  1. 如果指定数目的task还没有完成那么 从channel中提取一个注册的worker RPC的地址;否则3。
  2. 异步调用该worker上的DoTask方法。 在DoTask 方法中,已经有对doMap和doReduce的调用的逻辑,DoTask成功返回以后,重新把worker放回到channel,以待重用。goto 1。
  3. 等待所有task完成后退出。

在2中,有一个限制,我们不能给同一个worker RPC 地址发送多次DoTask请求。要实现这个,我们可以在把worker从channel提取出来以后,不放回去,然后待到DoTask成功返回后,才放入待重用。

另外,在Task 4中,当发送一个DoTask请求以后,该请求可能超时或者失败。这时候需要重试。我们的想法是在在第二中的异步调用逻辑里面,如果DoTask 返回失败,不把worker重新放入channel,而是直接丢弃。再从channel中取一个worker。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
} fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other) // All ntasks tasks have to be scheduled on workers, and only once all of
// them have been completed successfully should the function return.
// Remember that workers may fail, and that any given worker may finish
// multiple tasks.
//
doneCh := make(chan int)
doneCnt := 0
var mux sync.Mutex
for nt := 0; nt < ntasks; nt++ {
go func(fileIndex int) {
for {
workerName := <-registerChan
ok := call(workerName,
"Worker.DoTask",
DoTaskArgs{jobName, mapFiles[fileIndex], phase, fileIndex, n_other},
nil)
if ok == false {
fmt.Printf("Failed to RPC call %v at %d, retrying\n", workerName, nt)
continue
}
mux.Lock()
doneCnt += 1
if doneCnt == ntasks {
doneCh <- 1
}
mux.Unlock()
registerChan <- workerName
break
} } (nt)
}
<-doneCh
fmt.Printf("Schedule: %v phase done\n", phase)
}

注意这个nt一定要传到goroutine里面去,而不是作为一个闭包变量,因为它在循环中会变。

Part V: Inverted index generation (optional for extra credit)

这又是一个Map-Reduce应用。是让你看某个单词在哪些文档里面出现过。

同样的,在Map函数中分词,词作为key,而value是文档名字。

而Reduce函数中,仅仅是把文档列出来,配上文档数目。代码比较简单就不贴了。

第一次做课程,觉得作业设计上很用心。

最新文章

  1. oracle生成单据号
  2. Socket实现仿QQ聊天(可部署于广域网)附源码(2)-服务器搭建
  3. IMS Global Learning Tools Interoperability™ Implementation Guide
  4. How Tomcat Works(十一)
  5. 使用node的http模块实现爬虫功能,并把爬到的数据存入mongondb
  6. Android AlarmManager的取消
  7. oralce11 过程
  8. 在Ubuntu12.0.4下搭建TFTP服务器
  9. (转)HiddenField控件的使用
  10. Rainmeter 雨滴桌面 主题分享
  11. 初学swift笔记 方法(九)
  12. js实现无限级树形导航列表
  13. ReactNative学习之css样式使用
  14. 【WebGL入门】画一个旋转的cube
  15. Windows 10创意者更新ISO发布!官方下载
  16. 应对WannaCry勒索危机之关闭445端口等危险端口——以本人Windows7系统为例
  17. 多线程编程(六)-Executor与ThreadPoolExecutor的使用
  18. (纪录片)光的故事 BBC Light Fantastic (2004)
  19. sicily 1046. Plane Spotting(排序求topN)
  20. 在activity之间传递数据

热门文章

  1. selenium+phantomjs渲染网页
  2. Python中的类(中)
  3. grid网格的流动定位
  4. linux共享内存的查看与删除
  5. NFS 简介
  6. C++ template —— 模板与继承(八)
  7. web移动前端页面,jquery判断页面滑动方向
  8. iOS - UITextView放在自定义cell里面-自适应高度
  9. Stratix内嵌存储器测试报告
  10. CASE:DB shutdown/open 过程中发生异常导致JOB不能自动执行