Shuffle的正常意思是洗牌或弄亂氏义,Hadoop官網提供了下圖來描述該過程:
但是實際過程比上圖描述的復雜得多惯悠。Shuffle的大致范圍就是: 怎樣把map task的輸出結果有效地傳送到reduce端克婶。也可以這樣理解情萤, Shuffle描述著數據從map task輸出到reduce task輸入的這段過程筋岛。
假設以WordCount為例睁宰,并假設它有8個map task和3個reduce task寝凌。從上圖看出较木,Shuffle過程橫跨map與reduce兩端,所以下面我也會分兩部分來展開刹前。
一喇喉、Map端
整個流程分了四步。簡單些可以這樣說耍目,每個map task都有一個內存緩沖區(qū),存儲著map的輸出結果邪驮,當緩沖區(qū)快滿的時候需要將緩沖區(qū)的數據以一個臨時文件的方式存放到磁盤毅访,當整個map task結束后再對磁盤中這個map task產生的所有臨時文件做合并沮榜,生成最終的正式輸出文件,然后等待reduce task來拉數據喻粹。
當然這里的每一步都可能包含著多個步驟與細節(jié)蟆融,下面我對細節(jié)來一一說明:
1、在map task執(zhí)行時守呜,它的輸入數據來源于HDFS的block型酥,當然在MapReduce概念中,map task只讀取split查乒。Split與block的對應關系可能是多對一弥喉,默認是一對一。在WordCount例子里侣颂,假設map的輸入數據都是像“aaa”這樣的字符串档桃。
2、在經過mapper的運行后,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”嘹屯, value是數值1钧栖。因為當前map端只做加1的操作,在reduce task里才去合并結果集。前面我們知道這個job有3個reduce task依溯,到底當前的“aaa”應該交由哪個reduce去做呢,是需要現在決定的。
MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。默認對key hash后再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力辕羽,如果用戶自己對Partitioner有需求,可以訂制并設置到job上。
在我們的例子中,“aaa”經過Partitioner后返回0叔遂,也就是這對值應當交由第一個reducer來處理。接下來,需要將數據寫入內存緩沖區(qū)中幔嫂,緩沖區(qū)的作用是批量收集map結果呢蔫,減少磁盤IO的影響绽昏。我們的key/value對以及Partition的結果都會被寫入緩沖區(qū)。當然寫入之前,key與value值都會被序列化成字節(jié)數組卷员。
3岩瘦、這個內存緩沖區(qū)是有大小限制的箫津,默認是100MB饼拍。當map task的輸出結果很多時,就可能會撐爆內存,所以需要在一定條件下將緩沖區(qū)中的數據臨時寫入磁盤,然后重新利用這塊緩沖區(qū)惭蹂。這個從內存往磁盤寫數據的過程被稱為Spill廷雅,中文可譯為溢寫,字面意思很直觀甸私。這個溢寫是由單獨線程來完成弃鸦,不影響往緩沖區(qū)寫map結果的線程颜说。溢寫線程啟動時不應該阻止map的結果輸出玄妈,所以整個緩沖區(qū)有個溢寫的比例spill.percent拟蜻。這個比例默認是0.8,也就是當緩沖區(qū)的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB)袱巨,溢寫線程啟動焰盗,鎖定這80MB的內存,執(zhí)行溢寫過程活烙。Map task的輸出結果還可以往剩下的20MB內存中寫气笙,互不影響。
當溢寫線程啟動后客燕,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為嗦玖,這里的排序也是對序列化的字節(jié)做的排序。
在這里我們可以想想娱局,因為map task的輸出是需要發(fā)送到不同的reduce端去废酷,而內存緩沖區(qū)沒有對將發(fā)送到相同reduce端的數據做合并澈蟆,那么這種合并應該是體現是磁盤文件中的疲憋。從官方圖上也可以看到寫到磁盤中的溢寫文件是對不同的reduce端的數值做過合并缚柳。所以溢寫過程一個很重要的細節(jié)在于,如果有很多個key/value對需要發(fā)送到某個reduce端去,那么需要將這些key/value值拼接到一塊裁奇,減少與partition相關的索引記錄。
在針對每個reduce端而合并數據時,有些數據可能像這樣:“aaa”/1苍蔬, “aaa”/1。對于WordCount例子抓狭,就是簡單地統計單詞出現的次數伯病,如果在同一個map task的結果中有很多個像“aaa”一樣出現多次的key,我們就應該把它們的值合并到一塊否过,這個過程叫reduce也叫combine午笛。但MapReduce的術語中,reduce只指reduce端執(zhí)行從多個map task取數據做計算的過程苗桂。除reduce外药磺,非正式地合并數據只能算做combine了。其實大家知道的煤伟,MapReduce中將Combiner等同于Reducer癌佩。
如果client設置過Combiner,那么現在就是使用Combiner的時候了便锨。將有相同key的key/value對的value加起來围辙,減少溢寫到磁盤的數據量。
Combiner會優(yōu)化MapReduce的中間結果放案,所以它在整個模型中會多次使用姚建。那哪些場景才能使用Combiner呢?從這里分析吱殉,Combiner的輸出是Reducer的輸入掸冤,Combiner絕不能改變最終的計算結果。所以從我的想法來看友雳,Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致稿湿,且不影響最終結果的場景。比如累加押赊,最大值等缎罢。Combiner的使用一定得慎重,如果用好,它對job執(zhí)行效率有幫助策精,反之會影響reduce的最終結果。
4崇棠、每次溢寫會在磁盤上生成一個溢寫文件咽袜,如果map的輸出結果真的很大,有多次這樣的溢寫發(fā)生枕稀,磁盤上相應的就會有多個溢寫文件存在询刹。當map task真正完成時,內存緩沖區(qū)中的數據也全部溢寫到磁盤中形成一個溢寫文件萎坷。最終磁盤中會至少有一個這樣的溢寫文件存在(如果map的輸出結果很少凹联,當map執(zhí)行完成時,只會產生一個溢寫文件)哆档,因為最終的文件只有一個蔽挠,所以需要將這些溢寫文件歸并到一起,這個過程就叫做Merge瓜浸。Merge是怎樣的澳淑?如前面的例子,“aaa”從某個map task讀取過來時值是5插佛,從另外一個map 讀取時值是8杠巡,因為它們有相同的key,所以得merge成group雇寇。什么是group氢拥。對于“aaa”就是像這樣的:{“aaa”, [5, 8, 2,]},數組中的值就是從不同溢寫文件中讀取出來的锨侯,然后再把這些值加起來嫩海。請注意,因為merge是將多個溢寫文件合并到一個文件识腿,所以可能也有相同的key存在出革,在這個過程中如果client設置過Combiner,也會使用Combiner來合并相同的key渡讼。
至此骂束,map端的所有工作都已結束,最終生成的這個文件也存放在TaskTracker夠得著的某個本地目錄內成箫。每個reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息展箱,如果reduce task得到通知,獲知某臺TaskTracker上的map task執(zhí)行完成蹬昌,Shuffle的后半段過程開始啟動混驰。
二、reduce端
簡單地說,reduce task在執(zhí)行之前的工作就是不斷地拉取當前job里每個map task的最終結果栖榨,然后對從不同地方拉取過來的數據不斷地做merge昆汹,也最終形成一個文件作為reduce task的輸入文件。見下圖:
如map 端的細節(jié)圖婴栽,Shuffle在reduce端的過程也能用圖上標明的三點來概括满粗。當前reduce copy數據的前提是它要從JobTracker獲得有哪些map task已執(zhí)行結束,這段過程不表愚争,有興趣的朋友可以關注下映皆。Reducer真正運行之前,所有的時間都是在拉取數據轰枝,做merge捅彻,且不斷重復地在做。如前面的方式一樣鞍陨,下面我也分段地描述reduce 端的Shuffle細節(jié):
1步淹、Copy過程
簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fetcher)湾戳,通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件贤旷。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中砾脑。
2幼驶、Merge階段
這里的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值韧衣。Copy過來的數據會先放入內存緩沖區(qū)中盅藻,這里的緩沖區(qū)大小要比map端的更為靈活,它基于JVM的heap size設置畅铭,因為Shuffle階段Reducer不運行氏淑,所以應該把絕大部分的內存都給Shuffle用。這里需要強調的是硕噩,merge有三種形式:
1)內存到內存
2)內存到磁盤
3)磁盤到磁盤假残。
默認情況下第一種形式不啟用,讓人比較困惑炉擅,是吧辉懒。當內存中的數據量到達一定閾值,就啟動內存到磁盤的merge谍失。與map 端類似眶俩,這也是溢寫的過程,這個過程中如果你設置有Combiner快鱼,也是會啟用的颠印,然后在磁盤中生成了眾多的溢寫文件纲岭。第二種merge方式一直在運行,直到沒有map端的數據時才結束线罕,然后啟動第三種磁盤到磁盤的merge方式生成最終的那個文件止潮。
3、Reducer的輸入文件
不斷地merge后钞楼,最后會生成一個“最終文件”沽翔。為什么加引號?因為這個文件可能存在于磁盤上窿凤,也可能存在于內存中。對我們來說跨蟹,當然希望它存放于內存中雳殊,直接作為Reducer的輸入,但默認情況下窗轩,這個文件是存放于磁盤中的夯秃。當Reducer的輸入文件已定,整個Shuffle才最終結束痢艺。然后就是Reducer執(zhí)行仓洼,把結果放到HDFS上。