介紹
通過 分布式系統系列文章爬骤,我們了解了分布式的一些基本概念充石,若是寫點代碼實踐一下,那就更好了霞玄。先做個簡單的實驗練練手骤铃,還記得 MapReduce 嗎?溃列,本次實驗中會構建一個 MapReduce 庫劲厌,主要能熟悉 Go 語言外加了解分布式系統中的容錯機制。首先寫個一個簡單的 MapReduce 程序听隐,再寫一個 Master,它不僅能分配任務給 worker 而且能處理 worker 執(zhí)行錯誤哄啄。接口參考論文描述雅任。
實驗環(huán)境
不會讓你從零開始擼代碼啦,還不快 git clone ?
$ git clone git://g.csail.mit.edu/6.824-golabs-2016 6.824
$ cd 6.824
$ ls
Makefile src
MapReduce 代碼支持順序執(zhí)行和分布式執(zhí)行咨跌。順序執(zhí)行意味著 Map 先執(zhí)行沪么,當所有 Map 任務都完成了再執(zhí)行 Reduce,這種模式可能效率比較低锌半,但是比較便于調試禽车,畢竟串行。分布式執(zhí)行啟動了很多 worker 線程,他們并行執(zhí)行 Map 任務殉摔,然后執(zhí)行 Reduce 任務州胳,這種模式效率更高,當然更難實現和調試逸月。
準備:熟悉代碼
mapreduce
包提供了一個簡單的 MapReduce 順序執(zhí)行實現栓撞。應用只要調用 Distributed()
方法就可以啟動一個任務,但是要調試的時候可能需要調用 Sequential()
.
mapreduce 的運行流程如下:
應用層需要提供輸入文件碗硬,一個 map 函數瓤湘,一個 reduce 函數,要啟動 reduce 任務的數量恩尾。
用這些參數創(chuàng)建一個 master弛说。它會啟動一個 RPC 服務器(master_rpc.go),然后等待 worker 注冊(
Register()
)翰意。當有待完成的任務時木人,schedule()
就會將任務分配給 worker,同時也會進行 worker 的錯誤處理猎物。master 認為每個輸入文件應當交給一個 map 任務處理虎囚,然后調用
doMap()
,無論直接調用Sequential()
還是通過 RPC 給 worker 發(fā)送 DoTask 消息都會觸發(fā)這個操作蔫磨。每當調用 doMap() 時淘讥,它都會去讀取相應的文件,以文件內容調用 map 函數并且為每個輸入文件產生 nReduce 個文件堤如。因此蒲列,每個 map 任務最終會產生#files x nReduce
個文件。master 接下來會對每個 reduce 任務至少調用一次
doReduce()
搀罢。doReduce()
首先會收集 nReduce 個 map 任務產生的文件蝗岖,然后在每個文件上執(zhí)行 reduce 函數,最后產生一個結果文件榔至。master 會調用
mr.merge()
方法將上一步產生所有結果文件聚合到一個文件中抵赢。
所以本次實驗就是到填空題,空是:doMap, doReduce唧取,schedule 和 reduce铅鲤。
其他的方法基本不需要改動,有時間的研究研究有助于理解整體架構枫弟。
Part I: Map/Reduce 輸入和輸出
第一個空 doMap()
函數的功能是讀取指定文件的內容邢享,執(zhí)行 mapF 函數,將結果保存在新的文件中淡诗;而 doReuce()
讀取 doMap
的輸出文件骇塘,執(zhí)行 reduceF 函數伊履,將結果存在磁盤中。
寫完了就測試測試款违,測試文件(test_test.go)已經寫好了唐瀑。串行模式測試可執(zhí)行:
$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/mapreduce"
$ setup ggo_v1.5
$ go test -run Sequential mapreduce/...
ok mapreduce 2.694s
如果你看到的不是 ok,說明還有 bug 哦奠货。在 common.go 將 debugEnbale 設置成 true介褥,然后運行 go test -run Sequential mapreduce/... -v
,可以看到更詳細的輸出:
$ env "GOPATH=$PWD/../../" go test -v -run Sequential mapreduce/...
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok mapreduce 2.672s
Part II: 單機詞頻統計
完成了第一部分递惋,我們可以開始構建自己第一個 MapReduce 系統:詞頻統計器柔滔。沒錯還是填空題:mapF 和 reduceF,讓 wc.go 可以統計出每個單詞出現的次數萍虽。我們的測試文件里面只有英文睛廊,所以一個單詞就是連續(xù)出現字母,判斷一個字母參考標準庫 unicode.IsLetter
杉编。
測試文件是 6.824/src/main/pg-*.txt超全,不妨先編譯試試:
$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function
當然通過不了,畢竟空還沒填呢邓馒。mapF 的參數是測試文件名和其內容嘶朱,分割成單詞,返回 []mapreduce.KeyValue光酣,KeyValue:單詞-頻次疏遏。輪到 reduceF 函數了,它會針對每個 key(單詞) 調用一次救军,參數是某個單詞以及這個單詞在所有測試文件中的 mapF 結果财异。
寫好了嚼吞,便可測試:
$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
14.59user 3.78system 0:14.81elapsed
最終的結果保存在 mrtmp.wcseq 文件中沙庐。運行 $ rm mrtmp.*
刪除所有的中間數據文件锦积。
運行 sort -n -k2 mrtmp.wcseq | tail -10
炸站,如果看到的和下面的一樣,說明你寫對了敢会。
$
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024
亦可直接運行 $sh ./test-wc.sh
小提示:
strings.FieldFunc
可以將一個 string 分割成多個部分杉武,strconv
包中有函數可將 string 轉換成 int伐蒂。
Part III: 分布式 MapReduce
MapReduce 讓開發(fā)者最爽的地方是不需要關心代碼是在多臺機器并行執(zhí)行的司致。但我們現在的實現是 master 把 map 和 reduce 任務一個一個執(zhí)行订晌。雖然這種實現模式概念上很簡單,但是性能并不是很高蚌吸。接下來我們來實現一個并發(fā)的 MapReduce,它會調用多個 worker 線程去執(zhí)行任務砌庄,這樣可以更好地利用多核CPU羹唠。當然我們的實驗不是真署在多臺機器上而是用 channel 去模擬分布式計算奕枢。
由于是并發(fā),所以需要調度者 master 線程佩微,它負責給 worker 分發(fā)任務缝彬,而且一直等待直到所有 worker 完成任務。為了讓我們的實驗更加真實哺眯,master 只能通過 RPC 的方式與 worker 通訊谷浅。worker 代碼(mapreduce/worker.go)已經準備好了,它用于啟動 worker奶卓。
下一個空是 schedule.go 中的 schedule()
一疯,這個方法負責給 worker 分發(fā) map 和 reduce 任務,當所有任務完成后返回夺姑。
master.go 中的 run()
方法會先調用 schedule()
墩邀,然后調用 merge()
把每個 reduce 任務的輸出文件整合到一個文件里面。schedule 只需要告訴 worker 輸入文件的名字 (mr.files[task]
) 和任務 task盏浙,worker 自己知道從哪里讀取也知道把結果寫到哪個文件里面眉睹。master 通過 RPC 調用 Worker.DoTask
通知 worker 開始新任務,同時還會在 RPC 參數中包含一個 DoTaskArgs
對象废膘。
當一個 worker 準備完畢可以工作時竹海,它會向 master 發(fā)送一個 Register RPC,注冊的同時還會把這個 worker 的相關信息放入 mr.registerChannel
丐黄。所以 schedule
應該通過讀取這個 channel 處理新 worker 的注冊斋配。
當前正在運行的 job 信息都在 Master 中定義。注意孵稽,master 不需要知道 Map 或 Reduce 具體執(zhí)行的是什么代碼许起;當一個 worker 被 wc.go 創(chuàng)建時就已經攜帶了 Map 和 Reduce 函數的信息。
運行 $ go test -run TestBasic mapreduce/...
可進行基礎測試菩鲜。
小提示: master 應該并行的發(fā)送 RPC 給 worker园细,這樣 worker 可以并發(fā)執(zhí)行任務〗有#可參考 Go RPC 文檔猛频。
小提示: master 應該等一個 worker 完成當前任務后馬上為它分配一個新任務。等待 master 響應的線程可以用 channel 作為同步工具蛛勉。Concurrency in Go 有詳細的 channel 用法鹿寻。
小提示: 跟蹤 bug 最簡單的方法就是在代碼加入 debug(),然后執(zhí)行
go test -run TestBasic mapreduce/... > out
诽凌,out 就會包含調試信息毡熏。最重要的思考你原以為的輸出和真正的輸出為何不一樣。
注:當前的代碼試運行在一個 Unix 進程中侣诵,而且它能夠利用一臺機器的多核痢法。如果是要部署在多臺機器上狱窘,則要修改代碼讓 worker 通過 TCP 而不是 Unix-domain sockets 通訊。此外還需要一個網絡文件系統共享存儲财搁。
Part IV: 處理 worker 執(zhí)行錯誤
本小節(jié)要讓你的 master 能夠處理任務執(zhí)行失敗的 worker蘸炸。由于 MapReduce 中 worker 并沒有持久狀態(tài),所以處理起來相對容易尖奔。如果一個 worker 執(zhí)行失敗了搭儒,master 向 worker 發(fā)送的任何一個 RPC 都可能失敗,例如超時提茁。因此淹禾,如果失敗,master 應該把這個任務指派給另為一個worker甘凭。
一個 RPC 失敗并不一定代表 worker 失敗稀拐,有可能是某個 worker 正常運行但 master 無法獲取到它的信息。所以可能會出兩個 worker 同時執(zhí)行同一個任務丹弱。不過因為每個任務都是冪等的德撬,一個任務被執(zhí)行兩次是沒啥影響。
我們假設它不會失敗躲胳,所以不需要處理 master 失敗的情況蜓洪。讓 master 可以容錯是相對困難的,因為它保持著持久的狀態(tài)坯苹,當它失敗后我們需要恢復它的狀態(tài)以保證它可以繼續(xù)工作隆檀。
test_test.go 還剩最后兩個測試。測有一個 worker 失敗的情況和有很多 worker 失敗的情況粹湃。運行可測試:$ go test -run Failure mapreduce/...
Part V: 反向索引(可選)
挑戰(zhàn)性:
詞頻統計雖然是 MapReduce 最經典的一個應用恐仑,但是在大規(guī)模數據應用不經常用。試試寫個反向索引應用为鳄。
反向索引在計算機科學中使用廣泛裳仆,尤其在文檔搜索領域中非常有用。一般來說孤钦,一個反向索引就是一個從數據到數據特征的映射歧斟。例如,在文檔搜索中偏形,這個映射可能就是關鍵詞與文檔名稱的映射静袖。
main/ii.go 的整體結構跟 wc.go 相似。修改 mapF 和 reduceF 讓它們創(chuàng)建反向索引俊扭。運行 ii.go 應該輸出一個元組列表队橙,每一行的格式如下:
$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt
你的代碼應該通過 test-ii.sh 的測試:
$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
通過全部測試
運行 src/main/test-mr.sh 可測試本次實驗的所有內容。如果全部通過,可以看到:
$ sh ./test-mr.sh
==> Part I
ok mapreduce 3.053s
==> Part II
Passed test
==> Part III
ok mapreduce 1.851s
==> Part IV
ok mapreduce 10.650s
==> Part V (challenge)
Passed test