摘要
MapReduce 是一個編程模型,也是一個處理和生成超大數(shù)據(jù)集的算法模型的相關(guān)實現(xiàn)。用戶首先創(chuàng)建一個 Map 函數(shù)處理一個基于 key/value pair 的數(shù)據(jù)集合钳垮,輸出中間的基于 key/value pair 的數(shù)據(jù)集合焙糟;然后再創(chuàng)建一個 Reduce 函數(shù)用來合并所有的具有相同中間 key 值的中間 value 值。
MapReduce 架構(gòu)的程序能夠在大量的普通配置的計算機上實現(xiàn)并行化處理晾捏。這個系統(tǒng)在運行時只關(guān)心:如何分割輸入數(shù)據(jù)蒿涎,在大量計算機組成的集群上的調(diào)度,集群中計算機的錯誤處理惦辛,管理集群中計算機之間必要的通信劳秋。采用 MapReduce 架構(gòu)可以使那些沒有并行計算和分布式處理系統(tǒng)開發(fā)經(jīng)驗的程序員有效利用分布式系統(tǒng)的豐富資源。
點評: 當(dāng)時設(shè)計MAP REDUCE的框架時胖齐,是為了解決大數(shù)據(jù)集單機無法HANDLE的問題玻淑。那么就要利用大量的機器協(xié)同工作。如何協(xié)同工作以及拆分任務(wù)是第一個難點呀伙。其次是容錯岁忘。最后我們希望可以屏蔽分布式的細(xì)節(jié),可以讓一般的程序員來編寫程序区匠,把分布式的問題交給底層干像。
1. 介紹
在過去的 5 年里帅腌,包括本文作者在內(nèi)的 Google 的很多程序員,為了處理海量的原始數(shù)據(jù)麻汰,已經(jīng)實現(xiàn)了數(shù)以百計的速客、專用的計算方法。這些計算方法用來處理大量的原始數(shù)據(jù)五鲫,比如溺职,文檔抓取(類似網(wǎng)絡(luò)爬蟲的程序)位喂、Web 請求日志等等浪耘;也為了計算處理各種類型的衍生數(shù)據(jù),比如倒排索引塑崖、Web 文檔的圖結(jié)構(gòu)的各種表示形勢七冲、每臺主機上網(wǎng)絡(luò)爬蟲抓取的頁面數(shù)量的匯總、每天被請求的最多的查詢的集合等等规婆。大多數(shù)這樣的數(shù)據(jù)處理運算在概念上很容易理解澜躺。然而由于輸入的數(shù)據(jù)量巨大,因此要想在可接受的時間內(nèi)完成運算抒蚜,只有將這些計算分布在成百上千的主機上掘鄙。如何處理并行計算、如何分發(fā)數(shù)據(jù)嗡髓、如何處理錯誤操漠?所有這些問題綜合在一起,需要大量的代碼處理饿这,因此也使得原本簡單的運算變得難以處理颅夺。
為了解決上述復(fù)雜的問題,我們設(shè)計一個新的抽象模型蛹稍,使用這個抽象模型吧黄,我們只要表述我們想要執(zhí)行的簡單運算即可,而不必關(guān)心并行計算唆姐、容錯拗慨、數(shù)據(jù)分布、負(fù)載均衡等復(fù)雜的細(xì)節(jié)奉芦,這些問題都被封裝在了一個庫里面赵抢。設(shè)計這個抽象模型的靈感來自 Lisp 和許多其他函數(shù)式語言的 Map 和 Reduce 的原語。我們意識到我們大多數(shù)的運算都包含這樣的操作:在輸入數(shù)據(jù)的“邏輯”記錄上應(yīng)用 Map 操作得出一個中間 key/value pair 集合声功,然后在所有具有相同 key 值的 value 值上應(yīng)Reduce 操作烦却,從而達(dá)到合并中間的數(shù)據(jù),得到一個想要的結(jié)果的目的先巴。使用 MapReduce 模型其爵,再結(jié)合用戶實現(xiàn)的 Map 和 Reduce 函數(shù)冒冬,我們就可以非常容易的實現(xiàn)大規(guī)模并行化計算;通過 MapReduce 模型自帶的“再次執(zhí)行”(re-execution)功能摩渺,也提供了初級的容災(zāi)實現(xiàn)方案简烤。
點評:這次借鑒了函數(shù)式語言的靈感,來設(shè)計出一個MAP方法和一個REDUCE方法摇幻,并且這種方法可以再次執(zhí)行來達(dá)到容錯横侦。
2.編程模型
MapReduce 編程模型的原理是:利用一個輸入 key/value pair 集合來產(chǎn)生一個輸出的 key/value pair 集合。
MapReduce 庫的用戶用兩個函數(shù)表達(dá)這個計算:Map 和 Reduce绰姻。
用戶自定義的 Map 函數(shù)接受一個輸入的 key/value pair 值枉侧,然后產(chǎn)生一個中間 key/value pair 值的集合。
MapReduce 庫把所有具有相同中間 key 值 I 的中間 value 值集合在一起后傳遞給 reduce 函數(shù)狂芋。
用戶自定義的 Reduce 函數(shù)接受一個中間 key 的值 I 和相關(guān)的一個 value 值的集合榨馁。Reduce 函數(shù)合并這些value 值,形成一個較小的 value 值的集合银酗。一般的,每次 Reduce 函數(shù)調(diào)用只產(chǎn)生 0 或 1 個輸出 value 值徒像。通常我們通過一個迭代器把中間 value 值提供給 Reduce 函數(shù)黍特,這樣我們就可以處理無法全部放入內(nèi)存中的大量的 value 值的集合。
Map 函數(shù)輸出文檔中的每個詞锯蛀、以及這個詞的出現(xiàn)次數(shù)(在這個簡單的例子里就是 1)灭衷。Reduce 函數(shù)把 Map函數(shù)產(chǎn)生的每一個特定的詞的計數(shù)累加起來。
3.實現(xiàn)
MapReduce 模型可以有多種不同的實現(xiàn)方式旁涤。如何正確選擇取決于具體的環(huán)境翔曲。例如,一種實現(xiàn)方式適用于小型的共享內(nèi)存方式的機器劈愚,另外一種實現(xiàn)方式則適用于大型 NUMA 架構(gòu)的多處理器的主機瞳遍,而有的實現(xiàn)方式更適合大型的網(wǎng)絡(luò)連接集群。
本章節(jié)描述一個適用于 Google 內(nèi)部廣泛使用的運算環(huán)境的實現(xiàn):用以太網(wǎng)交換機連接菌羽、由普通 PC 機組成的大型集群掠械。在我們的環(huán)境里包括:
- x86 架構(gòu)、運行 Linux 操作系統(tǒng)注祖、雙處理器猾蒂、2-4GB 內(nèi)存的機器。
- 普通的網(wǎng)絡(luò)硬件設(shè)備是晨,每個機器的帶寬為百兆或者千兆肚菠,但是遠(yuǎn)小于網(wǎng)絡(luò)的平均帶寬的一半。
- 集群中包含成百上千的機器罩缴,因此蚊逢,機器故障是常態(tài)层扶。
- 存儲為廉價的內(nèi)置 IDE 硬盤。一個內(nèi)部分布式文件系統(tǒng)用來管理存儲在這些磁盤上的數(shù)據(jù)时捌。文件系統(tǒng)通過數(shù)據(jù)復(fù)制來在不可靠的硬件上保證數(shù)據(jù)的可靠性和有效性怒医。
- 用戶提交工作(job)給調(diào)度系統(tǒng)。每個工作(job)都包含一系列的任務(wù)(task)奢讨,調(diào)度系統(tǒng)將這些任務(wù)調(diào)度到集群中多臺可用的機器上稚叹。
3.1 執(zhí)行概括
通過將 Map 調(diào)用的輸入數(shù)據(jù)自動分割為 M 個數(shù)據(jù)片段的集合,Map 調(diào)用被分布到多臺機器上執(zhí)行拿诸。輸入的數(shù)據(jù)片段能夠在不同的機器上并行處理扒袖。使用分區(qū)函數(shù)將 Map 調(diào)用產(chǎn)生的中間 key 值分成 R 個不同分區(qū)(例如,hash(key) mod R)亩码,Reduce 調(diào)用也被分布到多臺機器上執(zhí)行季率。分區(qū)數(shù)量(R)和分區(qū)函數(shù)由用戶來指
定。
圖 1 展示了我們的 MapReduce 實現(xiàn)中操作的全部流程描沟。當(dāng)用戶調(diào)用 MapReduce 函數(shù)時飒泻,將發(fā)生下面的一系列動作(下面的序號和圖 1 中的序號一一對應(yīng)):
- 用戶程序首先調(diào)用的 MapReduce 庫將輸入文件分成 M 個數(shù)據(jù)片度,每個數(shù)據(jù)片段的大小一般從16MB 到 64MB(可以通過可選的參數(shù)來控制每個數(shù)據(jù)片段的大小)吏廉。然后用戶程序在機群中創(chuàng)建大量的程序副本泞遗。
- 這些程序副本中的有一個特殊的程序–master。副本中其它的程序都是 worker 程序席覆,由 master 分配任務(wù)史辙。有 M 個 Map 任務(wù)和 R 個 Reduce 任務(wù)將被分配,master 將一個 Map 任務(wù)或 Reduce 任務(wù)分配給一個空閑的 worker佩伤。
- 被分配了 map 任務(wù)的 worker 程序讀取相關(guān)的輸入數(shù)據(jù)片段聊倔,從輸入的數(shù)據(jù)片段中解析出 key/value pair,然后把 key/value pair 傳遞給用戶自定義的 Map 函數(shù)生巡,由 Map 函數(shù)生成并輸出的中間 key/value pair耙蔑,并緩存在內(nèi)存中。
- 緩存中的 key/value pair 通過分區(qū)函數(shù)分成 R 個區(qū)域孤荣,之后周期性的寫入到本地磁盤上纵潦。緩存的 key/value pair 在本地磁盤上的存儲位置將被回傳給 master,由 master 負(fù)責(zé)把這些存儲位置再傳送給Reduce worker垃环。
- 當(dāng) Reduce worker 程序接收到 master 程序發(fā)來的數(shù)據(jù)存儲位置信息后邀层,使用 RPC 從 Map worker 所在主機的磁盤上讀取這些緩存數(shù)據(jù)。當(dāng) Reduce worker 讀取了所有的中間數(shù)據(jù)后遂庄,通過對 key 進(jìn)行排序后使得具有相同 key 值的數(shù)據(jù)聚合在一起寥院。由于許多不同的 key 值會映射到相同的 Reduce 任務(wù)上,因此必須進(jìn)行排序涛目。如果中間數(shù)據(jù)太大無法在內(nèi)存中完成排序秸谢,那么就要在外部進(jìn)行排序凛澎。
- Reduce worker 程序遍歷排序后的中間數(shù)據(jù),對于每一個唯一的中間 key 值估蹄,Reduce worker 程序?qū)⑦@個 key 值和它相關(guān)的中間 value 值的集合傳遞給用戶自定義的 Reduce 函數(shù)塑煎。Reduce 函數(shù)的輸出被追加到所屬分區(qū)的輸出文件。
- 當(dāng)所有的 Map 和 Reduce 任務(wù)都完成之后臭蚁,master 喚醒用戶程序最铁。在這個時候,在用戶程序里的對MapReduce 調(diào)用才返回垮兑。
在成功完成任務(wù)之后冷尉,MapReduce 的輸出存放在 R 個輸出文件中(對應(yīng)每個 Reduce 任務(wù)產(chǎn)生一個輸出文件,文件名由用戶指定)系枪。一般情況下雀哨,用戶不需要將這 R 個輸出文件合并成一個文件–他們經(jīng)常把這些文件作為另外一個MapReduce 的輸入,或者在另外一個可以處理多個分割文件的分布式應(yīng)用中使用私爷。
3.2 Master 數(shù)據(jù)結(jié)構(gòu)
Master 持有一些數(shù)據(jù)結(jié)構(gòu)雾棺,它存儲每一個 Map 和 Reduce 任務(wù)的狀態(tài)(空閑、工作中或完成)衬浑,以及 Worker機器(非空閑任務(wù)的機器)的標(biāo)識捌浩。
Master 就像一個數(shù)據(jù)管道,中間文件存儲區(qū)域的位置信息通過這個管道從 Map 傳遞到 Reduce嚎卫。因此嘉栓,對于每個已經(jīng)完成的 Map 任務(wù)宏榕,master 存儲了 Map 任務(wù)產(chǎn)生的 R 個中間文件存儲區(qū)域的大小和位置拓诸。當(dāng) Map任務(wù)完成時,Master 接收到位置和大小的更新信息麻昼,這些信息被逐步遞增的推送給那些正在工作的 Reduce 任務(wù)奠支。
3.3 容錯
3.3.1 worker 故障
master 周期性的 ping 每個 worker。如果在一個約定的時間范圍內(nèi)沒有收到 worker 返回的信息抚芦,master 將把這個 worker 標(biāo)記為失效倍谜。所有由這個失效的 worker 完成的 Map 任務(wù)被重設(shè)為初始的空閑狀態(tài),之后這些任務(wù)就可以被安排給其他的 worker叉抡。同樣的尔崔,worker 失效時正在運行的 Map 或 Reduce 任務(wù)也將被重新置為空閑狀態(tài),等待重新調(diào)度褥民。
當(dāng) worker 故障時季春,由于已經(jīng)完成的 Map 任務(wù)的輸出存儲在這臺機器上,Map 任務(wù)的輸出已不可訪問了消返,因此必須重新執(zhí)行载弄。而已經(jīng)完成的 Reduce 任務(wù)的輸出存儲在全局文件系統(tǒng)上耘拇,因此不需要再次執(zhí)行。
當(dāng)一個 Map 任務(wù)首先被 worker A 執(zhí)行宇攻,之后由于 worker A 失效了又被調(diào)度到 worker B 執(zhí)行惫叛,這個“重新執(zhí)行”的動作會被通知給所有執(zhí)行 Reduce 任務(wù)的 worker。任何還沒有從 worker A 讀取數(shù)據(jù)的 Reduce 任務(wù)將從 worker B 讀取數(shù)據(jù)逞刷。
MapReduce 可以處理大規(guī)模 worker 失效的情況嘉涌。比如洛心,在一個 MapReduce 操作執(zhí)行期間题篷,在正在運行的集群上進(jìn)行網(wǎng)絡(luò)維護引起 80 臺機器在幾分鐘內(nèi)不可訪問了词身,MapReduce master 只需要簡單的再次執(zhí)行那些不可訪問的 worker 完成的工作番枚,之后繼續(xù)執(zhí)行未完成的任務(wù),直到最終完成這個 MapReduce 操作葫笼。
3.3.2 master 失敗
一個簡單的解決辦法是讓 master 周期性的將上面描述的數(shù)據(jù)結(jié)構(gòu)(alex 注:指 3.2 節(jié))的寫入磁盤深啤,即檢查點(checkpoint)路星。如果這個 master 任務(wù)失效了洋丐,可以從最后一個檢查點(checkpoint)開始啟動另一個master 進(jìn)程友绝。然而,由于只有一個 master 進(jìn)程郭宝,master 失效后再恢復(fù)是比較麻煩的粘室,因此我們現(xiàn)在的實現(xiàn)是
如果 master 失效卜范,就中止 MapReduce 運算∪В客戶可以檢查到這個狀態(tài)怀薛,并且可以根據(jù)需要重新執(zhí)行 MapReduce操作迷郑。
3.3.3 在失效方面的處理機制
當(dāng)用戶提供的 Map 和 Reduce 操作是輸入確定性函數(shù)(即相同的輸入產(chǎn)生相同的輸出)時嗡害,我們的分布式實現(xiàn)在任何情況下的輸出都和所有程序沒有出現(xiàn)任何錯誤、順序的執(zhí)行產(chǎn)生的輸出是一樣的十电。
我們依賴對 Map 和 Reduce 任務(wù)的輸出是原子提交的來完成這個特性鹃骂。每個工作中的任務(wù)把它的輸出寫到私有的臨時文件中畏线。每個 Reduce 任務(wù)生成一個這樣的文件寝殴,而每個 Map 任務(wù)則生成 R 個這樣的文件(一個 Reduce 任務(wù)對應(yīng)一個文件)蚣常。當(dāng)一個 Map 任務(wù)完成的時袖外,worker 發(fā)送一個包含 R 個臨時文件名的完成消息給 master曼验。如果 master 從一個已經(jīng)完成的 Map 任務(wù)再次接收到到一個完成消息鬓照,master 將忽略這個消息豺裆;否則臭猜,master 將這 R 個文件的名字記錄在數(shù)據(jù)結(jié)構(gòu)里轩勘。當(dāng) Reduce 任務(wù)完成時散休,Reduce worker 進(jìn)程以原子的方式把臨時文件重命名為最終的輸出文件。如果同一個 Reduce 任務(wù)在多臺機器上執(zhí)行裸违,針對同一個最終的輸出文件將有多個重命名操作執(zhí)行供汛。我們依賴底層文件系統(tǒng)提供的重命名操作的原子性來保證最終的文件系統(tǒng)狀態(tài)僅僅包含一個 Reduce 任務(wù)產(chǎn)生的數(shù)據(jù)紊馏。
使用 MapReduce 模型的程序員可以很容易的理解他們程序的行為朱监,因為我們絕大多數(shù)的 Map 和 Reduce操作是確定性的赫编,而且存在這樣的一個事實:我們的失效處理機制等價于一個順序的執(zhí)行的操作擂送。當(dāng) Map 或/和 Reduce 操作是不確定性的時候嘹吨,我們提供雖然較弱但是依然合理的處理機制蟀拷。當(dāng)使用非確定操作的時候萍聊,
一個 Reduce 任務(wù) R1 的輸出等價于一個非確定性程序順序執(zhí)行產(chǎn)生時的輸出寿桨。但是,另一個 Reduce 任務(wù) R2的輸出也許符合一個不同的非確定順序程序執(zhí)行產(chǎn)生的 R2 的輸出骑歹。
考慮 Map 任務(wù) M 和 Reduce 任務(wù) R1陵刹、R2 的情況衰琐。我們設(shè)定 e(Ri)是 Ri 已經(jīng)提交的執(zhí)行過程(有且僅有一個這樣的執(zhí)行過程)羡宙。當(dāng) e(R1)讀取了由 M 一次執(zhí)行產(chǎn)生的輸出狗热,而 e(R2)讀取了由 M 的另一次執(zhí)行產(chǎn)生的輸出匿刮,導(dǎo)致了較弱的失效處理熟丸。
我的點評: Map reduce的容錯不難處理光羞。因為這2個函數(shù)在很多情況是冪等啊怀大。如果一個WORKER失敗了化借,那么我們重啟另外一個WORKER去做原來那個WORKER的事情就可以蓖康。而MASTER失敗 是通過CHECK POINT來恢復(fù)钓瞭,這里會比較麻煩山涡。一般我們會返回客戶端失敗了鸭丛,然后重啟另一個MASTER來讓客戶端決定是不是要重啟整個MAP REDUCE任務(wù)鳞溉。
3.4 存儲位置
在我們的計算運行環(huán)境中熟菲,網(wǎng)絡(luò)帶寬是一個相當(dāng)匱乏的資源抄罕。我們通過盡量把輸入數(shù)據(jù)(由 GFS 管理)存儲在集群中機器的本地磁盤上來節(jié)省網(wǎng)絡(luò)帶寬呆贿。GFS 把每個文件按 64MB 一個 Block 分隔做入,每個 Block 保存在多臺機器上竟块,環(huán)境中就存放了多份拷貝(一般是 3 個拷貝)浪秘。MapReduce 的 master 在調(diào)度 Map 任務(wù)時會考慮
輸入文件的位置信息秫逝,盡量將一個 Map 任務(wù)調(diào)度在包含相關(guān)輸入數(shù)據(jù)拷貝的機器上執(zhí)行违帆;如果上述努力失敗了刷后,master 將嘗試在保存有輸入數(shù)據(jù)拷貝的機器附近的機器上執(zhí)行 Map 任務(wù)(例如尝胆,分配到一個和包含輸入數(shù)據(jù)的機器在一個 switch 里的 worker 機器上執(zhí)行)含衔。當(dāng)在一個足夠大的 cluster 集群上運行大型 MapReduce 操作的時候,大部分的輸入數(shù)據(jù)都能從本地機器讀取催享,因此消耗非常少的網(wǎng)絡(luò)帶寬因妙。
3.5 任務(wù)粒度
如前所述攀涵,我們把 Map 拆分成了 M 個片段汁果、把 Reduce 拆分成 R 個片段執(zhí)行据德。理想情況下棘利,M 和 R 應(yīng)當(dāng)比集群中 worker 的機器數(shù)量要多得多善玫。在每臺 worker 機器都執(zhí)行大量的不同任務(wù)能夠提高集群的動態(tài)的負(fù)載均衡能力茅郎,并且能夠加快故障恢復(fù)的速度:失效機器上執(zhí)行的大量 Map 任務(wù)都可以分布到所有其他的 worker
機器上去執(zhí)行系冗。
但是實際上掌敬,在我們的具體實現(xiàn)中對 M 和 R 的取值都有一定的客觀限制奔害,因為 master 必須執(zhí)行 O(M+R)次調(diào)度华临,并且在內(nèi)存中保存 O(MR)個狀態(tài)(對影響內(nèi)存使用的因素還是比較小的:O(MR)塊狀態(tài)雅潭,大概每對 Map 任務(wù)/Reduce 任務(wù) 1 個字節(jié)就可以了)寻馏。
更進(jìn)一步,R 值通常是由用戶指定的轰绵,因為每個 Reduce 任務(wù)最終都會生成一個獨立的輸出文件左腔。實際使用時我們也傾向于選擇合適的 M 值液样,以使得每一個獨立任務(wù)都是處理大約 16M 到 64M 的輸入數(shù)據(jù)(這樣鞭莽,上面描寫的輸入數(shù)據(jù)本地存儲優(yōu)化策略才最有效)澎怒,另外喷面,我們把 R 值設(shè)置為我們想使用的 worker 機器數(shù)量
的小的倍數(shù)惧辈。我們通常會用這樣的比例來執(zhí)行 MapReduce:M=200000咬像,R=5000县昂,使用 2000 臺 worker 機器
3.6 備用任務(wù)
影響一個 MapReduce 的總執(zhí)行時間最通常的因素是“落伍者”:在運算過程中,如果有一臺機器花了很長的時間才完成最后幾個 Map 或 Reduce 任務(wù)待讳,導(dǎo)致 MapReduce 操作總的執(zhí)行時間超過預(yù)期创淡。出現(xiàn)“落伍者”的原因非常多琳彩。比如:如果一個機器的硬盤出了問題露乏,在讀取的時候要經(jīng)常的進(jìn)行讀取糾錯操作瘟仿,導(dǎo)致讀取數(shù)據(jù)的速度從 30M/s 降低到 1M/s劳较。如果 cluster 的調(diào)度系統(tǒng)在這臺機器上又調(diào)度了其他的任務(wù)观蜗,由于 CPU嫂便、內(nèi)存岸售、本地硬盤和網(wǎng)絡(luò)帶寬等競爭因素的存在凸丸,導(dǎo)致執(zhí)行 MapReduce 代碼的執(zhí)行效率更加緩慢屎慢。我們最近遇到的一個問題是由于機器的初始化代碼有 bug腻惠,導(dǎo)致關(guān)閉了的處理器的緩存:在這些機器上執(zhí)行任務(wù)的性能和正常情況相差上百倍集灌。
我們有一個通用的機制來減少“落伍者”出現(xiàn)的情況腌零。當(dāng)一個 MapReduce 操作接近完成的時候益涧,master調(diào)度備用(backup)任務(wù)進(jìn)程來執(zhí)行剩下的闲询、處于處理中狀態(tài)(in-progress)的任務(wù)嘹裂。無論是最初的執(zhí)行進(jìn)程丁寄、還是備用(backup)任務(wù)進(jìn)程完成了任務(wù)伊磺,我們都把這個任務(wù)標(biāo)記成為已經(jīng)完成屑埋。我們調(diào)優(yōu)了這個機制摘能,通
常只會占用比正常操作多幾個百分點的計算資源严望。我們發(fā)現(xiàn)采用這樣的機制對于減少超大 MapReduce 操作的總處理時間效果顯著像吻。例如拨匆,在 5.3 節(jié)描述的排序任務(wù),在關(guān)閉掉備用任務(wù)的情況下要多花 44%的時間完成排序任務(wù)洪鸭。
我的點評: 這個優(yōu)化技巧非常值得借鑒览爵。有一些WORKER處理事情很慢蜓竹,如果我們把一個任務(wù)交給它嘶是,可能他會拖慢整個處理進(jìn)程聂喇,因為其他快的不得不等他做完希太。與其等待不如讓快的也去試試看那個慢的在做的任務(wù)誊辉,誰先完成就算完成即可。
4. 技巧
雖然簡單的 Map 和 Reduce 函數(shù)提供的基本功能已經(jīng)能夠滿足大部分的計算需要蛙紫,我們還是發(fā)掘出了一些有價值的擴展功能惊来。本節(jié)將描述這些擴展功能棺滞。
4.1 分區(qū)函數(shù)
MapReduce 的使用者通常會指定 Reduce 任務(wù)和 Reduce 任務(wù)輸出文件的數(shù)量(R)裁蚁。我們在中間 key 上使用分區(qū)函數(shù)來對數(shù)據(jù)進(jìn)行分區(qū),之后再輸入到后續(xù)任務(wù)執(zhí)行進(jìn)程继准。一個缺省的分區(qū)函數(shù)是使用 hash 方法(比如枉证,hash(key) mod R)進(jìn)行分區(qū)。hash 方法能產(chǎn)生非常平衡的分區(qū)移必。然而,有的時候,其它的一些分區(qū)函數(shù)對 key值進(jìn)行的分區(qū)將非常有用秒赤。比如猪瞬,輸出的 key 值是 URLs,我們希望每個主機的所有條目保持在同一個輸出文件中入篮。為了支持類似的情況陈瘦,MapReduce庫的用戶需要提供專門的分區(qū)函數(shù)。例如潮售,使用“hash(Hostname(urlkey))mod R”作為分區(qū)函數(shù)就可以把所有來自同一個主機的 URLs 保存在同一個輸出文件中痊项。
4.2 順序保證
我們確保在給定的分區(qū)中,中間 key/value pair 數(shù)據(jù)的處理順序是按照 key 值增量順序處理的酥诽。這樣的順序保證對每個分成生成一個有序的輸出文件鞍泉,這對于需要對輸出文件按 key 值隨機存取的應(yīng)用非常有意義,對在排序輸出的數(shù)據(jù)集也很有幫助肮帐。
4.3 Combiner 函數(shù)
在某些情況下咖驮,Map 函數(shù)產(chǎn)生的中間 key 值的重復(fù)數(shù)據(jù)會占很大的比重,并且泪姨,用戶自定義的 Reduce 函數(shù)滿足結(jié)合律和交換律游沿。在 2.1 節(jié)的詞數(shù)統(tǒng)計程序是個很好的例子饰抒。由于詞頻率傾向于一個 zipf 分布(齊夫分布)肮砾,每個 Map 任務(wù)將產(chǎn)生成千上萬個這樣的記錄<the,1>。所有的這些記錄將通過網(wǎng)絡(luò)被發(fā)送到一個單獨的
Reduce 任務(wù)袋坑,然后由這個 Reduce 任務(wù)把所有這些記錄累加起來產(chǎn)生一個數(shù)字仗处。我們允許用戶指定一個可選的 combiner 函數(shù),combiner 函數(shù)首先在本地將這些記錄進(jìn)行一次合并枣宫,然后將合并的結(jié)果再通過網(wǎng)絡(luò)發(fā)送出去婆誓。
Combiner 函數(shù)在每臺執(zhí)行 Map 任務(wù)的機器上都會被執(zhí)行一次。一般情況下也颤,Combiner 和 Reduce 函數(shù)是一樣的洋幻。Combiner 函數(shù)和 Reduce 函數(shù)之間唯一的區(qū)別是 MapReduce 庫怎樣控制函數(shù)的輸出。Reduce 函數(shù)的輸出被保存在最終的輸出文件里翅娶,而 Combiner 函數(shù)的輸出被寫到中間文件里文留,然后被發(fā)送給 Reduce 任務(wù)。
部分的合并中間結(jié)果可以顯著的提高一些 MapReduce 操作的速度竭沫。附錄 A 包含一個使用 combiner 函數(shù)的例子燥翅。
4.4 輸入和輸出的類型
MapReduce 庫支持幾種不同的格式的輸入數(shù)據(jù)。比如蜕提,文本模式的輸入數(shù)據(jù)的每一行被視為是一個key/value pair森书。key 是文件的偏移量,value 是那一行的內(nèi)容。另外一種常見的格式是以 key 進(jìn)行排序來存儲的 key/value pair 的序列凛膏。每種輸入類型的實現(xiàn)都必須能夠把輸入數(shù)據(jù)分割成數(shù)據(jù)片段杨名,該數(shù)據(jù)片段能夠由單獨的 Map 任務(wù)來進(jìn)行后續(xù)處理(例如,文本模式的范圍分割必須確保僅僅在每行的邊界進(jìn)行范圍分割)猖毫。雖然大多數(shù) MapReduce 的使用者僅僅使用很少的預(yù)定義輸入類型就滿足要求了镣煮,但是使用者依然可以通過提供一個簡單的 Reader 接口實現(xiàn)就能夠支持一個新的輸入類型。
Reader 并非一定要從文件中讀取數(shù)據(jù)鄙麦,比如典唇,我們可以很容易的實現(xiàn)一個從數(shù)據(jù)庫里讀記錄的 Reader,或者從內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)讀取數(shù)據(jù)的 Reader胯府。
類似的介衔,我們提供了一些預(yù)定義的輸出數(shù)據(jù)的類型,通過這些預(yù)定義類型能夠產(chǎn)生不同格式的數(shù)據(jù)骂因。用戶采用類似添加新的輸入數(shù)據(jù)類型的方式增加新的輸出類型炎咖。
4.5 副作用
在某些情況下,MapReduce 的使用者發(fā)現(xiàn)寒波,如果在 Map 和/或 Reduce 操作過程中增加輔助的輸出文件會比較省事乘盼。我們依靠程序 writer 把這種“副作用”變成原子的和冪等的。通常應(yīng)用程序首先把輸出結(jié)果寫到一個臨時文件中俄烁,在輸出全部數(shù)據(jù)之后绸栅,在使用系統(tǒng)級的原子操作 rename 重新命名這個臨時文件。
如果一個任務(wù)產(chǎn)生了多個輸出文件页屠,我們沒有提供類似兩階段提交的原子操作支持這種情況粹胯。因此,對于會產(chǎn)生多個輸出文件辰企、并且對于跨文件有一致性要求的任務(wù)风纠,都必須是確定性的任務(wù)。但是在實際應(yīng)用過程中牢贸,這個限制還沒有給我們帶來過麻煩竹观。
4.6 跳過損壞的記錄
有時候,用戶程序中的 bug 導(dǎo)致 Map 或者 Reduce 函數(shù)在處理某些記錄的時候 crash 掉潜索,MapReduce 操作無法順利完成臭增。慣常的做法是修復(fù) bug 后再次執(zhí)行 MapReduce 操作,但是帮辟,有時候找出這些 bug 并修復(fù)它們不是一件容易的事情速址;這些 bug 也許是在第三方庫里邊,而我們手頭沒有這些庫的源代碼由驹。而且在很多時候芍锚,忽略一些有問題的記錄也是可以接受的昔园,比如在一個巨大的數(shù)據(jù)集上進(jìn)行統(tǒng)計分析的時候。我們提供了一種執(zhí)行模式并炮,在這種模式下默刚,為了保證保證整個處理能繼續(xù)進(jìn)行,MapReduce會檢測哪些記錄導(dǎo)致確定性的crash逃魄,并且跳過這些記錄不處理荤西。
每個 worker 進(jìn)程都設(shè)置了信號處理函數(shù)捕獲內(nèi)存段異常(segmentation violation)和總線錯誤(bus error)。
在執(zhí)行 Map 或者 Reduce 操作之前伍俘,MapReduce 庫通過全局變量保存記錄序號邪锌。如果用戶程序觸發(fā)了一個系統(tǒng)信號,消息處理函數(shù)將用“最后一口氣”通過 UDP 包向 master 發(fā)送處理的最后一條記錄的序號癌瘾。當(dāng) master看到在處理某條特定記錄不止失敗一次時觅丰,master 就標(biāo)志著條記錄需要被跳過,并且在下次重新執(zhí)行相關(guān)的
Map 或者 Reduce 任務(wù)的時候跳過這條記錄妨退。
4.7 本地執(zhí)行
調(diào)試 Map 和 Reduce 函數(shù)的 bug 是非常困難的妇萄,因為實際執(zhí)行操作時不但是分布在系統(tǒng)中執(zhí)行的,而且通常是在好幾千臺計算機上執(zhí)行咬荷,具體的執(zhí)行位置是由 master 進(jìn)行動態(tài)調(diào)度的冠句,這又大大增加了調(diào)試的難度。
為了簡化調(diào)試幸乒、profile 和小規(guī)模測試懦底,我們開發(fā)了一套 MapReduce 庫的本地實現(xiàn)版本,通過使用本地版本的MapReduce 庫逝变,MapReduce 操作在本地計算機上順序的執(zhí)行基茵。用戶可以控制 MapReduce 操作的執(zhí)行,可以把操作限制到特定的 Map 任務(wù)上壳影。用戶通過設(shè)定特別的標(biāo)志來在本地執(zhí)行他們的程序,之后就可以很容易的使用本地調(diào)試和測試工具(比如 gdb)弥臼。
4.8 狀態(tài)信息
master 使用嵌入式的 HTTP 服務(wù)器(如 Jetty)顯示一組狀態(tài)信息頁面宴咧,用戶可以監(jiān)控各種執(zhí)行狀態(tài)。狀態(tài)信息頁面顯示了包括計算執(zhí)行的進(jìn)度径缅,比如已經(jīng)完成了多少任務(wù)掺栅、有多少任務(wù)正在處理、輸入的字節(jié)數(shù)纳猪、中間數(shù)據(jù)的字節(jié)數(shù)氧卧、輸出的字節(jié)數(shù)、處理百分比等等氏堤。頁面還包含了指向每個任務(wù)的 stderr 和 stdout 文件的鏈
接沙绝。用戶根據(jù)這些數(shù)據(jù)預(yù)測計算需要執(zhí)行大約多長時間、是否需要增加額外的計算資源。這些頁面也可以用來分析什么時候計算執(zhí)行的比預(yù)期的要慢闪檬。
另外星著,處于最頂層的狀態(tài)頁面顯示了哪些 worker 失效了,以及他們失效的時候正在運行的 Map 和 Reduce任務(wù)粗悯。這些信息對于調(diào)試用戶代碼中的 bug 很有幫助虚循。
4.9 計數(shù)器
MapReduce 庫使用計數(shù)器統(tǒng)計不同事件發(fā)生次數(shù)。比如样傍,用戶可能想統(tǒng)計已經(jīng)處理了多少個單詞横缔、已經(jīng)索引的多少篇 German 文檔等等。
為了使用這個特性衫哥,用戶在程序中創(chuàng)建一個命名的計數(shù)器對象剪廉,在 Map 和 Reduce 函數(shù)中相應(yīng)的增加計數(shù)器的值。例如:
這些計數(shù)器的值周期性的從各個單獨的worker機器上傳遞給master(附加在ping的應(yīng)答包中傳遞)炕檩。master把執(zhí)行成功的 Map 和 Reduce 任務(wù)的計數(shù)器值進(jìn)行累計斗蒋,當(dāng) MapReduce 操作完成之后,返回給用戶代碼笛质。
計數(shù)器當(dāng)前的值也會顯示在 master 的狀態(tài)頁面上泉沾,這樣用戶就可以看到當(dāng)前計算的進(jìn)度。當(dāng)累加計數(shù)器的值的時候妇押,master 要檢查重復(fù)運行的 Map 或者 Reduce 任務(wù)跷究,避免重復(fù)累加(之前提到的備用任務(wù)和失效后重新執(zhí)行任務(wù)這兩種情況會導(dǎo)致相同的任務(wù)被多次執(zhí)行)。
有些計數(shù)器的值是由 MapReduce 庫自動維持的敲霍,比如已經(jīng)處理的輸入的key/value pair 的數(shù)量俊马、輸出的key/value pair 的數(shù)量等等。
計數(shù)器機制對于 MapReduce 操作的完整性檢查非常有用肩杈。比如柴我,在某些MapReduce 操作中,用戶需要確保輸出的 key value pair 精確的等于輸入的 key value pair扩然,或者處理的 German 文檔數(shù)量在處理的整個文檔數(shù)量中屬于合理范圍艘儒。