The Distributed Systems course consists of four labs, Lab 1 through Lab 4. Lab 1 implements a MapReduce prototype; Labs 2 through 4 implement Raft and then a distributed KV store on top of Raft.
This post covers the MapReduce lab; the skeleton code comes from git://g.csail.mit.edu/6.824-golabs-2020 (the 6.824 repo).
MapReduce: Simplified Data Processing on Large Clusters is the publication that talks about MapReduce and what it means for computing in general. It is in the required reading. I will read it after video one.
map handles the fan-out. Each map task normally processes one file, so there are as many map tasks as input files, while the number of files each map task outputs is determined by the number of reduces. Note that a given key always lands in the same reduce's file; a hash decides which intermediate file a key is routed to: targetReduce = ihash(key) % nReduce.
The total number of intermediate files is nMap * nReduce.
reduce processes the files that map wrote: each reduce handles the intermediate files assigned to it, and the names of those files can be derived from the number of map tasks and the reduce number, as the sketch below shows.
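Concretely, assuming the mr-X-Y naming convention the lab hints at (X = map task number, Y = reduce task number), routing a key looks roughly like this; ihash mirrors the helper the skeleton already provides:

```go
import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the hash helper the lab skeleton provides.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// Map task X spreads its output across nReduce files named
// mr-X-0 ... mr-X-(nReduce-1); reduce task Y later reads the
// nMap files mr-0-Y ... mr-(nMap-1)-Y.
func intermediateName(mapIdx int, key string, nReduce int) string {
	return fmt.Sprintf("mr-%d-%d", mapIdx, ihash(key)%nReduce)
}
```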
MapReduce is essentially splitting a problem into smaller sets based on some criteria, and then assembling the solution from partial solutions. Map a function onto a list of inputs, and then reduce the results into solutions.
The example he shows is word-count functionality. Given a set of files, how would you produce a word count across them?
```
# this isn't executable code btw. Just me thinking out loud.
# map(filename, contents): emit (word, 1) for every word in contents
# reduce(word, ones): emit (word, len(ones))
```
The way this would work is: each file is fed to a map call that emits a (word, 1) pair per word, the pairs get grouped by word, and one reduce call per word collapses its list of ones into that word's count.
So this could be scaled by splitting the problem across CPUs or machines themselves.
Makes sense. I guess that is why it is used with Hadoop. Hadoop comes with a distributed filesystem that enables such a mechanism, especially when you’d need to process actual files.
@1h9m Aha! So Hadoop came out of a necessity to facilitate MapReduce. Google File System, huh? That makes sense. Having a network file system that splits a huge file and saves it across servers makes so much sense if you operate like this. And this also comes with data-safety guarantees: Hadoop has built-in backup and replication (the default replication factor is 3, I think).
@1h12m The scheduler would place a map task on the machine that already holds its GFS data chunks, for network efficiency? That makes sense, because the code for a map job must be far smaller than the data it operates on. This is good. Helps me understand what the heck is going on with Hadoop.
Map stores its output on the disk of the machine it executes on. But to group together all the values associated with a key and then reduce them on a separate machine, the data has to be moved later. So the row-wise data, say the per-file word counts in a word-count job, has to be pivoted into a columnar dataset: one entry per word gathering that word’s counts from every file, instead of a collection of separate per-file count lists.
Say:
```
# again, don't try to run this.
# two map outputs (one per input file), as (word, count) rows:
map_output_1 = [("cat", 2), ("dog", 1)]
map_output_2 = [("cat", 1), ("dog", 2), ("elephant", 1)]
```
This would then need to be turned into 3 reduce jobs. One for the key “cat”, one for “dog” and one for “elephant” across these two outputs.
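Here is a runnable Go toy of that regrouping step, mirroring the made-up outputs above (none of this is lab code):

```go
package main

import (
	"fmt"
	"sort"
)

type kv struct {
	Key   string
	Count int
}

func main() {
	mapOutput1 := []kv{{"cat", 2}, {"dog", 1}}
	mapOutput2 := []kv{{"cat", 1}, {"dog", 2}, {"elephant", 1}}

	// the "shuffle": gather both row-wise outputs into one slice,
	// then sort by key so equal keys sit next to each other
	all := append(mapOutput1, mapOutput2...)
	sort.Slice(all, func(i, j int) bool { return all[i].Key < all[j].Key })

	// one "reduce" per run of equal keys: sum that key's counts
	for i := 0; i < len(all); {
		j, total := i, 0
		for j < len(all) && all[j].Key == all[i].Key {
			total += all[j].Count
			j++
		}
		fmt.Println(all[i].Key, total) // cat 3, dog 3, elephant 1
		i = j
	}
}
```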
Huh. How would you do sorting in MapReduce? Wouldn’t you need to know where something would appear? Perhaps split large arrays and sort sections and then sort those again? There was an algorithm for that, I think. I had watched some animation that showed this.
Chaining together MapReduce seems to be a normal procedure. I suppose sorting could operate like that.
The original code has two directories, main and mapreduce. Under main you only need to care about wc.go, ii.go, and a set of txt files. Under mapreduce you have to modify the schedule.go, common_map.go, and common_reduce.go files.
The first two parts are fairly simple and single-machine. Part 1 implements the common functions in common_map.go and common_reduce.go, and Part 2 is just counting words. The thing to watch in Part 2 is that you must split words with unicode.IsLetter(), the way the lab document says, or the tests won't pass.
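A minimal mapF along those lines; mapreduce.KeyValue is the skeleton's pair type, and emitting "1" as the value is just one workable convention:

```go
import (
	"strings"
	"unicode"
)

func mapF(filename string, contents string) []mapreduce.KeyValue {
	// split on every rune that is not a letter, per the lab doc
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]mapreduce.KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, mapreduce.KeyValue{Key: w, Value: "1"})
	}
	return kvs
}
```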
Task execution is distributed. The master starts an RPC server and calls the schedule function to run the tasks. Note that a worker, once started, has to call the master's Register RPC to register itself; when the master sees the new worker, it notifies the goroutines waiting on a channel so they can hand out work. The same worker handles many tasks over its lifetime, but must never be assigned more than one task at a time.
Note that in run, the map tasks go first and reduce only starts afterwards, so while several workers are running map tasks, schedule must wait for all of them to finish before moving on to reduce; a WaitGroup takes care of that.
Also mind the worker-allocation logic: a newly registered worker can be handed work at any moment, and a worker that finishes a task has to be put back on the channel so it can be reused.
Handling worker failure is also simple: wrap each task hand-off in a for loop that exits on success and otherwise grabs another worker and retries, as in the sketch below.
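Roughly, my schedule ended up shaped like this sketch. The DoTaskArgs field names and the call helper are the skeleton's, written from memory, so treat those details as assumptions:

```go
import "sync"

func schedule(jobName string, mapFiles []string, nReduce int,
	phase jobPhase, registerChan chan string) {
	var ntasks, nOther int
	switch phase {
	case mapPhase:
		ntasks, nOther = len(mapFiles), nReduce
	case reducePhase:
		ntasks, nOther = nReduce, len(mapFiles)
	}

	var wg sync.WaitGroup
	for i := 0; i < ntasks; i++ {
		wg.Add(1)
		go func(taskNum int) {
			defer wg.Done()
			args := DoTaskArgs{
				JobName:       jobName,
				Phase:         phase,
				TaskNumber:    taskNum,
				NumOtherPhase: nOther,
			}
			if phase == mapPhase {
				args.File = mapFiles[taskNum]
			}
			for {
				// take an idle worker; fresh registrations and freed
				// workers both arrive on this channel
				worker := <-registerChan
				if call(worker, "Worker.DoTask", args, nil) {
					// success: hand the worker back for reuse; the send
					// blocks until somebody reads, hence the goroutine
					go func() { registerChan <- worker }()
					return
				}
				// failure: drop this worker, loop, retry on another one
			}
		}(i)
	}
	wg.Wait() // all tasks of this phase must finish before returning
}
```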
Part 5 asks for an inverted index: complete ii.go, which is very close to wc.go. The main extra work is deduplicating the document names and sorting each posting list; with that the tests pass.
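A hedged sketch of the ii.go reduceF, assuming the grader wants the document count followed by a sorted, comma-joined posting list:

```go
import (
	"fmt"
	"sort"
	"strings"
)

func reduceF(key string, values []string) string {
	// values holds one document name per occurrence; dedupe first
	seen := make(map[string]bool)
	docs := []string{}
	for _, doc := range values {
		if !seen[doc] {
			seen[doc] = true
			docs = append(docs, doc)
		}
	}
	sort.Strings(docs) // the posting list must come out sorted
	return fmt.Sprintf("%d %s", len(docs), strings.Join(docs, ","))
}
```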
The Master is implemented in mr/master.go and driven by main/mrmaster.go, which passes the input files in as command-line arguments:

```go
// Task handed out to a worker
type Task struct {
	Phase // marks whether this is a Map or a Reduce task
	Misson
	R int // number of reduce tasks
}

type Master struct {
	mMisson    []Misson    // queue of map missions waiting to be assigned
	rMisson    []Misson    // queue of reduce missions waiting to be assigned
	mTasks     []taskState // state of each task handed to a worker
	rTasks     []taskState
	mCompleted int // count of completed tasks
	rCompleted int
}

func (m *Master) Assign(_ *Task, t *Task) error {
	// ...
	if len(m.mMisson) > 0 {
		// ...
		// t.Id == t.Misson.Id
		// guarantees a task's index in the array matches its mission id
		m.mTasks[t.Id] = taskState{false, timer}
		go m.mapCrush(t)
	} else if m.mapDone() && len(m.rMisson) > 0 {
		// ...
		// t.Id == t.Misson.Id
		// guarantees a task's index in the array matches its mission id
		m.rTasks[t.Id] = taskState{false, timer}
		go m.reduceCrush(t)
	} else {
		// tell the worker to sleep for a while
		t.Phase = Idle
	}
	// ...
}

func MakeMaster(files []string, nReduce int) *Master {
	// ...
	m.mTasks = make([]taskState, len(files)) // pre-sized, indexed directly later
	for i := range files {
		// initialize Misson.Id so it lines up with the task array index
		m.mMisson = append(m.mMisson, Misson{i, files[i : i+1]})
	}
	m.rTasks = make([]taskState, nReduce) // pre-sized, indexed directly later
	for i := 0; i < nReduce; i++ {
		// initialize Misson.Id so it lines up with the task array index
		m.rMisson = append(m.rMisson, Misson{i, nil})
	}
	// ...
}
```
Because Misson.Id corresponds exactly to the index into the taskState array, reassigning a timed-out task to another worker is safe: even if the master receives completion reports for the same task from different workers at the same time, the task is only confirmed once.
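The mapCrush/reduceCrush goroutines are essentially timeout watchdogs. A hypothetical sketch, assuming the bool in taskState is a done flag and the Master carries a mutex mu, both of which are elided from the snippet above:

```go
import "time"

// mapCrush waits out the 10-second grace period the lab suggests; if the
// task still isn't confirmed done, its mission goes back on the queue so
// Assign can hand it to a different worker.
func (m *Master) mapCrush(t *Task) {
	time.Sleep(10 * time.Second)
	m.mu.Lock() // m.mu sync.Mutex is assumed, not shown above
	defer m.mu.Unlock()
	if !m.mTasks[t.Id].done { // assumes taskState's bool is a done flag
		m.mMisson = append(m.mMisson, t.Misson)
	}
}
```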
Ugh, the CSS in the Labs pages is so horrible for accessibility. I cannot read the code snippets either. I love the course, but whoever made the webpages did not care a bit about accessibility or design of an interface.
```
git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
```
I’ve cloned this repo. Apparently this one lecture is enough to get started. The course does recommend golang, but I am going to try some rudimentary stuff with Python, and I will get around to golang later, once I learn it.