Introduction
在本實(shí)驗(yàn)中,將用Go編程構(gòu)建一個(gè)MapReduce庫(kù)焦匈。在第一部分中,將編寫(xiě)一個(gè)簡(jiǎn)單的MapReduce程序昵仅。在第二部分中,將編寫(xiě)一個(gè)Master累魔,將任務(wù)分發(fā)給MapReduce的worker摔笤,并處理worker的失敗。庫(kù)的接口和容錯(cuò)方法類(lèi)似于MapReduce論文 中的描述垦写。
Software
代碼倉(cāng)庫(kù)的URL是 git://g.csail.mit.edu/6.824-golabs-2018
$ git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824
$ cd 6.824
$ ls
Makefile src
Preamble
mapreduce包提供了1個(gè)簡(jiǎn)單的Map/Reduce庫(kù)的串行實(shí)現(xiàn)吕世。正常應(yīng)用應(yīng)該調(diào)用Distributed函數(shù)[master.go]來(lái)啟動(dòng)1個(gè)任務(wù),但是可以通過(guò)調(diào)用Sequential函數(shù)[master.go]來(lái)進(jìn)行debug梯投。
$ go test -run Sequential
- mapreduce實(shí)現(xiàn)流程:
- 應(yīng)用提供一些輸入文件命辖,1個(gè)map函數(shù),1個(gè)reduce函數(shù)分蓖,reduce worker的數(shù)目(nReduce)尔艇。
- 建立1個(gè)master節(jié)點(diǎn),它啟動(dòng)1個(gè)RPC server(master_rpc.go)么鹤,然后等待worker來(lái)注冊(cè)(使用RPC 調(diào)用 Register函數(shù)[master.go]). 當(dāng)worker可用時(shí)(在第4终娃、5部分),schedule函數(shù)[schedule.go]決定如何分配任務(wù)到worker以及如何處理worker的failures蒸甜。
- master節(jié)點(diǎn)認(rèn)為每個(gè)輸入文件對(duì)應(yīng)1個(gè)map任務(wù)棠耕,為每個(gè)任務(wù)至少調(diào)用1次doMap函數(shù)[common_map.go]。每次調(diào)用doMap函數(shù)會(huì)讀取合適的文件柠新,并調(diào)用map函數(shù)來(lái)處理文件內(nèi)容窍荧,為每個(gè)map文件生成nReduce個(gè)文件。
- master節(jié)點(diǎn)接下去為每個(gè)reduce任務(wù)至少調(diào)用1次doReduce函數(shù)[common_reduce.go]恨憎。doReduce函數(shù)收集nReduce個(gè)reduce文件蕊退,然后調(diào)用reduce函數(shù)處理這些文件,產(chǎn)生nReduce個(gè)結(jié)果文件框咙。
- master節(jié)點(diǎn)調(diào)用mr.merge函數(shù)[master_splitmerge.go]咕痛,來(lái)整合nReduce個(gè)結(jié)果文件成1個(gè)最終文件
- master節(jié)點(diǎn)發(fā)送1個(gè)Shutdown的RPC調(diào)用到每個(gè)worker,來(lái)關(guān)閉它們的RPC server喇嘱。
Part I: Map/Reduce input and output
給出的Map / Reduce實(shí)現(xiàn)缺少一些部分茉贡。在編寫(xiě)第一個(gè)Map / Reduce函數(shù)對(duì)之前,需要修復(fù)順序?qū)崿F(xiàn)者铜。特別是腔丧,給出的代碼缺少兩個(gè)關(guān)鍵部分:分割map任務(wù)輸出的函數(shù)放椰,以及收集reduce任務(wù)的所有輸入的函數(shù)。這些任務(wù)分別由common_map.go中的doMap()
函數(shù)和common_reduce.go中的doReduce()
函數(shù)執(zhí)行愉粤。
- 測(cè)試
$ cd 6.824
$ export "GOPATH=$PWD" # go needs $GOPATH to be set to the project's working directory
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
ok mapreduce 2.694s
- 實(shí)現(xiàn)
在common_map.go文件中有關(guān)于doMap函數(shù)功能的描述注釋?zhuān)饕僮魇谴蜷_(kāi)文件名為inFile的輸入文件砾医,讀取文件內(nèi)容,然后調(diào)用mapF函數(shù)來(lái)處理內(nèi)容衣厘,返回值為KeyVaule結(jié)構(gòu)體[common.go]實(shí)例如蚜,然后生成nReduce個(gè)中間文件,提示使用json格式寫(xiě)入影暴。
doMap實(shí)現(xiàn):
file, err := os.Open(inFile)
if err != nil {
log.Fatal("ERROR[doMap]: Open file error ", err)
}
defer file.Close()
// 獲取文件狀態(tài)信息
fileInfo, err := file.Stat()
if err != nil {
log.Fatal("ERROR[doMap]: Get file state error ", err)
}
// 讀文件
fileSize := fileInfo.Size()
buffer := make([]byte, fileSize)
_, err = file.Read(buffer)
if err != nil {
log.Fatal("ERROR[doMap]:Read error ", err)
}
// 處理文件內(nèi)容
middleRes := mapF(inFile, string(buffer))
rSize := len(middleRes)
// 生成中間文件
for i := 0; i < nReduce; i++ {
fileName := reduceName(jobName, mapTask, i)
midFile, err := os.Create(fileName)
if err != nil {
log.Fatal("ERROR[doMap]: Create intermediate file fail ", err)
}
enc := json.NewEncoder(midFile)
for r := 0; r < rSize; r++ {
kv := middleRes[r]
if ihash(kv.Key)%nReduce == i {
err := enc.Encode(&kv)
if err != nil {
log.Fatal("ERROR[doMap]: Encode error: ", err)
}
}
}
midFile.Close()
}
在common_reduce.go文件中有關(guān)于doReduce函數(shù)功能的描述注釋?zhuān)饕僮魇窍葟拿總€(gè)map函數(shù)的輸出文件中獲取該reduce任務(wù)相應(yīng)的中間文件错邦,然后根據(jù)key值進(jìn)行排序,最后調(diào)用reduce函數(shù)來(lái)生成最終的結(jié)果并寫(xiě)入文件型宙。
doReduce實(shí)現(xiàn):
keyValues := make(map[string][]string)
for i := 0; i < nMap; i++ {
fileName := reduceName(jobName, i, reduceTask)
file, err := os.Open(fileName)
if err != nil {
log.Fatal("ERROR[doReduce]: Open error: ", err)
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break
}
_, ok := keyValues[kv.Key]
if !ok {
keyValues[kv.Key] = make([]string, 0)
}
keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
}
file.Close()
}
var keys []string
for k := range keyValues {
keys = append(keys, k)
}
sort.Strings(keys)
mergeFileName := mergeName(jobName, reduceTask)
mergeFile, err := os.Create(mergeFileName)
if err != nil {
log.Fatal("ERROR[doReduce]: Create file error: ", err)
}
enc := json.NewEncoder(mergeFile)
for _, k := range keys {
res := reduceF(k, keyValues[k])
enc.Encode(&KeyValue{k, res})
}
mergeFile.Close()
Part II: Single-worker word count
現(xiàn)在撬呢,你將實(shí)現(xiàn)字?jǐn)?shù)統(tǒng)計(jì) - 一個(gè)簡(jiǎn)單的Map / Reduce示例余佃》峤椋看看main / wc.go
;你會(huì)發(fā)現(xiàn)空的mapF()
和reduceF()
函數(shù)。你的工作是插入代碼做瞪,以便wc.go報(bào)告其輸入中每個(gè)單詞的出現(xiàn)次數(shù)搁嗓。一個(gè)單詞是任何連續(xù)的字母序列芯勘,由unicode.IsLetter
確定。 有些輸入文件的路徑名為pg - * .txt
谱姓,位于?/ 6.824 / src / main借尿。以下是如何使用輸入文件運(yùn)行wc:
$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function
運(yùn)行結(jié)果是編譯失敗,因?yàn)?code>mapF()和reduceF()
未完成屉来。
更簡(jiǎn)單的運(yùn)行方法是使用源代碼提供的測(cè)試腳本:
$ bash ./test-wc.sh
- 實(shí)現(xiàn)
mapF函數(shù)的參數(shù)filename為輸入文件的文件名路翻,contents為文件內(nèi)容,需要實(shí)現(xiàn)生成[word, “1”]這樣的中間結(jié)果茄靠。在main/wc.go中有關(guān)于mapF函數(shù)實(shí)現(xiàn)的注釋茂契。先對(duì)于文件內(nèi)容contents進(jìn)行分割,用strings.FieldsFunc函數(shù)來(lái)分割成單詞慨绳。然后對(duì)于每個(gè)單詞掉冶,將[word,”1”]加入到中間結(jié)果中。
mapF實(shí)現(xiàn):
values := strings.FieldsFunc(contents, func(c rune) bool {
return !unicode.IsLetter(c)
})
res := make([]mapreduce.KeyValue, 0)
for _, v := range values {
res = append(res, mapreduce.KeyValue{v, "1"})
}
return res
對(duì)于reduceF函數(shù)脐雪,參數(shù)key為word厌小,參數(shù)values就是[“1”,”1”, …]形式的字符串切片,主要操作就是統(tǒng)計(jì)該單詞的出現(xiàn)次數(shù)战秋,即累加values中的元素即可璧亚,使用strconv庫(kù)提供的函數(shù)將字符串轉(zhuǎn)換為數(shù)值,最后將統(tǒng)計(jì)和結(jié)果轉(zhuǎn)換為字符串返回脂信。
var sum int
for _, v := range values {
count, err := strconv.Atoi(v)
if err != nil {
log.Fatal("ERROR[reduceF]: atoi failed ", err)
}
sum += count
}
return strconv.Itoa(sum)
Part III: Distributing MapReduce tasks
你當(dāng)前的實(shí)現(xiàn)運(yùn)行map
并一次減少一個(gè)任務(wù)癣蟋。 Map / Reduce最大的賣(mài)點(diǎn)之一是它可以自動(dòng)并行化普通的順序代碼而無(wú)需開(kāi)發(fā)人員的任何額外工作透硝。在本練習(xí)的這一部分中,你將完成一個(gè)MapReduce的版本疯搅,該版本將工作拆分為在多核上并行運(yùn)行的一組工作線程濒生。雖然不像在實(shí)際的Map / Reduce部署中那樣分布在多臺(tái)機(jī)器上,但您的實(shí)現(xiàn)將使用RPC來(lái)模擬分布式計(jì)算幔欧。
為了協(xié)同任務(wù)的并行執(zhí)行罪治,我們將使用1個(gè)特殊的master線程,來(lái)分發(fā)任務(wù)到worker線程并等待它們完成礁蔗。實(shí)驗(yàn)中提供了worker的實(shí)現(xiàn)代碼和啟動(dòng)代碼(mapreduce/worker.go)以及RPC消息處理的代碼(mapreduce/common_rpc.go)规阀。
我們的任務(wù)實(shí)現(xiàn)mapreduce包中的schedule.go
文件,尤其是其中的schedule函數(shù)來(lái)分發(fā)map和reduce任務(wù)到worker瘦麸,并當(dāng)它們完成后才返回。
mr.run函數(shù)[master.go]里面會(huì)調(diào)用schedule函數(shù)來(lái)運(yùn)行map和reduce任務(wù)歧胁,然后調(diào)用merge函數(shù)來(lái)將每個(gè)reduce任務(wù)的結(jié)果文件整合成1個(gè)最終文件滋饲。schedule函數(shù)只需要告訴worker輸入文件的文件名(mr.files[task])和任務(wù)號(hào)。master節(jié)點(diǎn)通過(guò)RPC調(diào)用Worker.DoTask喊巍,傳遞1個(gè)DoTaskArgs對(duì)象作為RPC的參數(shù)來(lái)告訴worker新的任務(wù)屠缭。
當(dāng)1個(gè)worker啟動(dòng)時(shí),它會(huì)發(fā)送1個(gè)注冊(cè)RPC給master崭参,傳遞新worker的信息到mr.registerChannel呵曹。我們的schedule函數(shù)通過(guò)讀取mr.registerChannel來(lái)獲得可用的worker。
- 測(cè)試方法
$ cd 6.824/src/mapreduce
$ go test -run TestParallel
- 實(shí)現(xiàn)
主要過(guò)程是先區(qū)分一下這是map任務(wù)還是reduce任務(wù)何暮,對(duì)于map任務(wù)奄喂,任務(wù)數(shù)ntask為輸入文件的個(gè)數(shù),n_other為reduce worker的數(shù)目nReduce海洼,對(duì)于reduce任務(wù)跨新,任務(wù)數(shù)ntask為reduce worker的數(shù)目nReduce,n_other為map worker的數(shù)目即輸入文件的個(gè)數(shù)坏逢。然后創(chuàng)建1個(gè)同步包sync中的等待組WaitGroup域帐,對(duì)于每個(gè)任務(wù),將其加入到等待組中是整,并運(yùn)行1個(gè)goroutine來(lái)運(yùn)行進(jìn)行分發(fā)任務(wù)肖揣。首先從mr.registerChannel中獲得1個(gè)可用的worker,構(gòu)建DoTaskArgs對(duì)象浮入,作為參數(shù)調(diào)用worker的Worker.DoTask
來(lái)執(zhí)行任務(wù)龙优,當(dāng)其完成任務(wù)后將其重新加入到mr.registerChannel表示可用。最后使用WaitGroup的wait函數(shù)等待所有任務(wù)完成舵盈。因?yàn)橹挥挟?dāng)map任務(wù)都完成后才能執(zhí)行reduce任務(wù)陋率。
schedule()實(shí)現(xiàn):
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
wg.Add(1)
go func(taskNum int, n_other int, phase jobPhase) {
defer wg.Done()
worker := <-registerChan
var args DoTaskArgs
args.JobName = jobName
args.File = mapFiles[taskNum]
args.Phase = phase
args.TaskNumber = taskNum
args.NumOtherPhase = n_other
ok := call(worker, "Worker.DoTask", &args, new(struct{}))
if ok {
go func() {
registerChan <- worker
}()
}
}(i, n_other, phase)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
Part IV: Handling worker failures
在這部分中球化,你需要讓master處理失敗的worker。 MapReduce使這相對(duì)容易瓦糟,因?yàn)閣orker沒(méi)有持久狀態(tài)筒愚。如果工作程序在從master處理RPC時(shí)失敗,則master的call()最終會(huì)因超時(shí)而返回false
菩浙。在這種情況下巢掺,master應(yīng)該將失敗worker的任務(wù)重新分配給另一個(gè)worker。 RPC故障并不一定意味著worker沒(méi)有執(zhí)行任務(wù);可能是worker已經(jīng)執(zhí)行了但是回復(fù)丟失了劲蜻,或者worker可能仍在執(zhí)行但master的RPC超時(shí)陆淀。因此,可能會(huì)發(fā)生兩個(gè)worker收到相同的任務(wù)先嬉,計(jì)算它并生成輸出轧苫。MapReduce框架確保map和reduce函數(shù)輸出以原子方式顯示:輸出文件不存在,或者將包含map或reduce函數(shù)的單個(gè)執(zhí)行的整個(gè)輸出疫蔓。
我們的任務(wù)是修改mapreduce包中的schedule.go
文件含懊,使其具有簡(jiǎn)單的容錯(cuò)性。使master節(jié)點(diǎn)能處理worker的宕機(jī)衅胀。當(dāng)1個(gè)worker宕機(jī)時(shí)岔乔,master發(fā)送的RPC都會(huì)失敗,那么久需要重新安排任務(wù)滚躯,將宕機(jī)worker的任務(wù)分配給其它worker雏门。
RPC的失敗并不是表示worker的宕機(jī),worker可能只是網(wǎng)絡(luò)不可達(dá)掸掏,仍然在工作計(jì)算茁影。所以如果重新分配任務(wù)可能造成2個(gè)worker接受相同的任務(wù)并計(jì)算。但是這沒(méi)關(guān)系阅束,因?yàn)橄嗤娜蝿?wù)生成相同的結(jié)果呼胚。我們只要實(shí)現(xiàn)重新分配任務(wù)即可。
- 測(cè)試方法
$ cd 6.824/src/mapreduce
$ go test -run Failure
- 實(shí)現(xiàn)
使用無(wú)限for循環(huán)中息裸,當(dāng)RPC的call失敗時(shí)蝇更,僅僅就是重新選取1個(gè)worker,只有當(dāng)成功時(shí)呼盆,才會(huì)break年扩。
schedule()實(shí)現(xiàn):
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
wg.Add(1)
go func(taskNum int, n_other int, phase jobPhase) {
defer wg.Done()
for {
worker := <-registerChan
var args DoTaskArgs
args.JobName = jobName
args.File = mapFiles[taskNum]
args.Phase = phase
args.TaskNumber = taskNum
args.NumOtherPhase = n_other
ok := call(worker, "Worker.DoTask", &args, new(struct{}))
if ok {
go func() {
registerChan <- worker
}()
break
}
}
}(i, n_other, phase)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
Part V: Inverted index generation
在這個(gè)部分,你將構(gòu)建用于生成倒排索引的Map和Reduce函數(shù)访圃。 在main包中有一個(gè)ii.go
文件厨幻,與之前任務(wù)修改的wc.go非常相似。你應(yīng)該在main / ii.go中修改mapF和reduceF,以便它們一起生成倒排索引况脆。
- 測(cè)試方法
$ go run ii.go master sequential pg-*.txt
- 實(shí)現(xiàn)
在mapF函數(shù)中操作與原先的word count類(lèi)似饭宾,只是生成的中間結(jié)果形式變?yōu)閇word, document]。
values := strings.FieldsFunc(value, func(c rune) bool {
return !unicode.IsLetter(c)
})
for _, v := range values {
res = append(res, mapreduce.KeyValue{v, document})
}
return res
在reduceF函數(shù)中格了,此時(shí)values為document的字符串切片看铆,需要先去冗余,即實(shí)現(xiàn)set盛末,由于go語(yǔ)言不提供set弹惦,可以用map來(lái)模擬實(shí)現(xiàn),然后根據(jù)輸出構(gòu)造結(jié)果字符串悄但。
valuesNoRepeat := make([]string, 0)
set := make(map[string]int)
for _, v := range values {
_, ok := set[v]
if !ok {
set[v] = 1
valuesNoRepeat = append(valuesNoRepeat, v)
}
}
sort.Strings(valuesNoRepeat)
valuesLen := len(valuesNoRepeat)
res := strconv.Itoa(valuesLen) + " "
for i, v := range valuesNoRepeat {
if i == valuesLen-1 {
res += v
} else {
res += v + ","
}
}
return res
運(yùn)行l(wèi)ab 1所有Part的測(cè)試
$ cd src/main
$ bash ./test-mr.sh