分布式系統-實驗-MapReduce

介紹

通過 分布式系統系列文章爬骤,我們了解了分布式的一些基本概念充石,若是寫點代碼實踐一下,那就更好了霞玄。先做個簡單的實驗練練手骤铃,還記得 MapReduce 嗎?溃列,本次實驗中會構建一個 MapReduce 庫劲厌,主要能熟悉 Go 語言外加了解分布式系統中的容錯機制。首先寫個一個簡單的 MapReduce 程序听隐,再寫一個 Master,它不僅能分配任務給 worker 而且能處理 worker 執(zhí)行錯誤哄啄。接口參考論文描述雅任。

實驗環(huán)境

不會讓你從零開始擼代碼啦,還不快 git clone ?

$ git clone git://g.csail.mit.edu/6.824-golabs-2016 6.824
$ cd 6.824
$ ls
Makefile src

MapReduce 代碼支持順序執(zhí)行和分布式執(zhí)行咨跌。順序執(zhí)行意味著 Map 先執(zhí)行沪么,當所有 Map 任務都完成了再執(zhí)行 Reduce,這種模式可能效率比較低锌半,但是比較便于調試禽车,畢竟串行。分布式執(zhí)行啟動了很多 worker 線程,他們并行執(zhí)行 Map 任務殉摔,然后執(zhí)行 Reduce 任務州胳,這種模式效率更高,當然更難實現和調試逸月。

準備:熟悉代碼

mapreduce 包提供了一個簡單的 MapReduce 順序執(zhí)行實現栓撞。應用只要調用 Distributed() 方法就可以啟動一個任務,但是要調試的時候可能需要調用 Sequential().

mapreduce 的運行流程如下:

  1. 應用層需要提供輸入文件碗硬,一個 map 函數瓤湘,一個 reduce 函數,要啟動 reduce 任務的數量恩尾。

  2. 用這些參數創(chuàng)建一個 master弛说。它會啟動一個 RPC 服務器(master_rpc.go),然后等待 worker 注冊(Register())翰意。當有待完成的任務時木人,schedule() 就會將任務分配給 worker,同時也會進行 worker 的錯誤處理猎物。

  3. master 認為每個輸入文件應當交給一個 map 任務處理虎囚,然后調用 doMap(),無論直接調用 Sequential() 還是通過 RPC 給 worker 發(fā)送 DoTask 消息都會觸發(fā)這個操作蔫磨。每當調用 doMap() 時淘讥,它都會去讀取相應的文件,以文件內容調用 map 函數并且為每個輸入文件產生 nReduce 個文件堤如。因此蒲列,每個 map 任務最終會產生 #files x nReduce 個文件。

  4. master 接下來會對每個 reduce 任務至少調用一次 doReduce()搀罢。doReduce() 首先會收集 nReduce 個 map 任務產生的文件蝗岖,然后在每個文件上執(zhí)行 reduce 函數,最后產生一個結果文件榔至。

  5. master 會調用 mr.merge() 方法將上一步產生所有結果文件聚合到一個文件中抵赢。

所以本次實驗就是到填空題,空是:doMap, doReduce唧取,schedule 和 reduce铅鲤。

其他的方法基本不需要改動,有時間的研究研究有助于理解整體架構枫弟。

Part I: Map/Reduce 輸入和輸出

第一個空 doMap() 函數的功能是讀取指定文件的內容邢享,執(zhí)行 mapF 函數,將結果保存在新的文件中淡诗;而 doReuce() 讀取 doMap 的輸出文件骇塘,執(zhí)行 reduceF 函數伊履,將結果存在磁盤中。

寫完了就測試測試款违,測試文件(test_test.go)已經寫好了唐瀑。串行模式測試可執(zhí)行:

$ cd 6.824
$ export "GOPATH=$PWD"  
$ cd "$GOPATH/src/mapreduce"
$ setup ggo_v1.5
$ go test -run Sequential mapreduce/...
ok      mapreduce   2.694s

如果你看到的不是 ok,說明還有 bug 哦奠货。在 common.go 將 debugEnbale 設置成 true介褥,然后運行 go test -run Sequential mapreduce/... -v,可以看到更詳細的輸出:

$ env "GOPATH=$PWD/../../" go test -v -run Sequential mapreduce/...
=== RUN   TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN   TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok      mapreduce   2.672s

Part II: 單機詞頻統計

完成了第一部分递惋,我們可以開始構建自己第一個 MapReduce 系統:詞頻統計器柔滔。沒錯還是填空題:mapF 和 reduceF,讓 wc.go 可以統計出每個單詞出現的次數萍虽。我們的測試文件里面只有英文睛廊,所以一個單詞就是連續(xù)出現字母,判斷一個字母參考標準庫 unicode.IsLetter杉编。

測試文件是 6.824/src/main/pg-*.txt超全,不妨先編譯試試:

$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function

當然通過不了,畢竟空還沒填呢邓馒。mapF 的參數是測試文件名和其內容嘶朱,分割成單詞,返回 []mapreduce.KeyValue光酣,KeyValue:單詞-頻次疏遏。輪到 reduceF 函數了,它會針對每個 key(單詞) 調用一次救军,參數是某個單詞以及這個單詞在所有測試文件中的 mapF 結果财异。

寫好了嚼吞,便可測試:

$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
14.59user 3.78system 0:14.81elapsed

最終的結果保存在 mrtmp.wcseq 文件中沙庐。運行 $ rm mrtmp.* 刪除所有的中間數據文件锦积。

運行 sort -n -k2 mrtmp.wcseq | tail -10炸站,如果看到的和下面的一樣,說明你寫對了敢会。

$ 
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024

亦可直接運行 $sh ./test-wc.sh

小提示: strings.FieldFunc 可以將一個 string 分割成多個部分杉武,strconv 包中有函數可將 string 轉換成 int伐蒂。

Part III: 分布式 MapReduce

MapReduce 讓開發(fā)者最爽的地方是不需要關心代碼是在多臺機器并行執(zhí)行的司致。但我們現在的實現是 master 把 map 和 reduce 任務一個一個執(zhí)行订晌。雖然這種實現模式概念上很簡單,但是性能并不是很高蚌吸。接下來我們來實現一個并發(fā)的 MapReduce,它會調用多個 worker 線程去執(zhí)行任務砌庄,這樣可以更好地利用多核CPU羹唠。當然我們的實驗不是真署在多臺機器上而是用 channel 去模擬分布式計算奕枢。

由于是并發(fā),所以需要調度者 master 線程佩微,它負責給 worker 分發(fā)任務缝彬,而且一直等待直到所有 worker 完成任務。為了讓我們的實驗更加真實哺眯,master 只能通過 RPC 的方式與 worker 通訊谷浅。worker 代碼(mapreduce/worker.go)已經準備好了,它用于啟動 worker奶卓。

下一個空是 schedule.go 中的 schedule()一疯,這個方法負責給 worker 分發(fā) map 和 reduce 任務,當所有任務完成后返回夺姑。

master.go 中的 run() 方法會先調用 schedule()墩邀,然后調用 merge() 把每個 reduce 任務的輸出文件整合到一個文件里面。schedule 只需要告訴 worker 輸入文件的名字 (mr.files[task]) 和任務 task盏浙,worker 自己知道從哪里讀取也知道把結果寫到哪個文件里面眉睹。master 通過 RPC 調用 Worker.DoTask 通知 worker 開始新任務,同時還會在 RPC 參數中包含一個 DoTaskArgs 對象废膘。

當一個 worker 準備完畢可以工作時竹海,它會向 master 發(fā)送一個 Register RPC,注冊的同時還會把這個 worker 的相關信息放入 mr.registerChannel丐黄。所以 schedule 應該通過讀取這個 channel 處理新 worker 的注冊斋配。

當前正在運行的 job 信息都在 Master 中定義。注意孵稽,master 不需要知道 Map 或 Reduce 具體執(zhí)行的是什么代碼许起;當一個 worker 被 wc.go 創(chuàng)建時就已經攜帶了 Map 和 Reduce 函數的信息。

運行 $ go test -run TestBasic mapreduce/... 可進行基礎測試菩鲜。

小提示: master 應該并行的發(fā)送 RPC 給 worker园细,這樣 worker 可以并發(fā)執(zhí)行任務〗有#可參考 Go RPC 文檔猛频。

小提示: master 應該等一個 worker 完成當前任務后馬上為它分配一個新任務。等待 master 響應的線程可以用 channel 作為同步工具蛛勉。Concurrency in Go 有詳細的 channel 用法鹿寻。

小提示: 跟蹤 bug 最簡單的方法就是在代碼加入 debug(),然后執(zhí)行 go test -run TestBasic mapreduce/... > out诽凌,out 就會包含調試信息毡熏。最重要的思考你原以為的輸出和真正的輸出為何不一樣。

注:當前的代碼試運行在一個 Unix 進程中侣诵,而且它能夠利用一臺機器的多核痢法。如果是要部署在多臺機器上狱窘,則要修改代碼讓 worker 通過 TCP 而不是 Unix-domain sockets 通訊。此外還需要一個網絡文件系統共享存儲财搁。

Part IV: 處理 worker 執(zhí)行錯誤

本小節(jié)要讓你的 master 能夠處理任務執(zhí)行失敗的 worker蘸炸。由于 MapReduce 中 worker 并沒有持久狀態(tài),所以處理起來相對容易尖奔。如果一個 worker 執(zhí)行失敗了搭儒,master 向 worker 發(fā)送的任何一個 RPC 都可能失敗,例如超時提茁。因此淹禾,如果失敗,master 應該把這個任務指派給另為一個worker甘凭。

一個 RPC 失敗并不一定代表 worker 失敗稀拐,有可能是某個 worker 正常運行但 master 無法獲取到它的信息。所以可能會出兩個 worker 同時執(zhí)行同一個任務丹弱。不過因為每個任務都是冪等的德撬,一個任務被執(zhí)行兩次是沒啥影響。

我們假設它不會失敗躲胳,所以不需要處理 master 失敗的情況蜓洪。讓 master 可以容錯是相對困難的,因為它保持著持久的狀態(tài)坯苹,當它失敗后我們需要恢復它的狀態(tài)以保證它可以繼續(xù)工作隆檀。

test_test.go 還剩最后兩個測試。測有一個 worker 失敗的情況和有很多 worker 失敗的情況粹湃。運行可測試:$ go test -run Failure mapreduce/...

Part V: 反向索引(可選)

挑戰(zhàn)性:

詞頻統計雖然是 MapReduce 最經典的一個應用恐仑,但是在大規(guī)模數據應用不經常用。試試寫個反向索引應用为鳄。

反向索引在計算機科學中使用廣泛裳仆,尤其在文檔搜索領域中非常有用。一般來說孤钦,一個反向索引就是一個從數據到數據特征的映射歧斟。例如,在文檔搜索中偏形,這個映射可能就是關鍵詞與文檔名稱的映射静袖。

main/ii.go 的整體結構跟 wc.go 相似。修改 mapF 和 reduceF 讓它們創(chuàng)建反向索引俊扭。運行 ii.go 應該輸出一個元組列表队橙,每一行的格式如下:

$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt

你的代碼應該通過 test-ii.sh 的測試:

$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10
women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt

通過全部測試

運行 src/main/test-mr.sh 可測試本次實驗的所有內容。如果全部通過,可以看到:

$ sh ./test-mr.sh
==> Part I
ok      mapreduce   3.053s

==> Part II
Passed test

==> Part III
ok      mapreduce   1.851s

==> Part IV
ok      mapreduce   10.650s

==> Part V (challenge)
Passed test
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末喘帚,一起剝皮案震驚了整個濱河市畅姊,隨后出現的幾起案子,更是在濱河造成了極大的恐慌吹由,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朱嘴,死亡現場離奇詭異倾鲫,居然都是意外死亡,警方通過查閱死者的電腦和手機萍嬉,發(fā)現死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門乌昔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人壤追,你說我怎么就攤上這事磕道。” “怎么了行冰?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵溺蕉,是天一觀的道長。 經常有香客問我悼做,道長疯特,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任肛走,我火速辦了婚禮漓雅,結果婚禮上,老公的妹妹穿的比我還像新娘朽色。我一直安慰自己邻吞,他們只是感情好,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布葫男。 她就那樣靜靜地躺著抱冷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪腾誉。 梳的紋絲不亂的頭發(fā)上徘层,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音利职,去河邊找鬼趣效。 笑死,一個胖子當著我的面吹牛猪贪,可吹牛的內容都是我干的跷敬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼热押,長吁一口氣:“原來是場噩夢啊……” “哼西傀!你這毒婦竟也來了斤寇?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤拥褂,失蹤者是張志新(化名)和其女友劉穎娘锁,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體饺鹃,經...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡莫秆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了悔详。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片镊屎。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖茄螃,靈堂內的尸體忽然破棺而出缝驳,到底是詐尸還是另有隱情,我是刑警寧澤归苍,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布用狱,位于F島的核電站,受9級特大地震影響霜医,放射性物質發(fā)生泄漏齿拂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一肴敛、第九天 我趴在偏房一處隱蔽的房頂上張望署海。 院中可真熱鬧,春花似錦医男、人聲如沸砸狞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽刀森。三九已至,卻和暖如春报账,著一層夾襖步出監(jiān)牢的瞬間研底,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工透罢, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留榜晦,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓羽圃,卻偏偏與公主長得像乾胶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內容