MapReduce的Shuffle過程介紹
Shuffle的本義是洗牌甸箱、混洗糯而,把一組有一定規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組無(wú)規(guī)則的數(shù)據(jù)账嚎,越隨機(jī)越好涂身。MapReduce中的Shuffle更像是洗牌的逆過程雄卷,把一組無(wú)規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組具有一定規(guī)則的數(shù)據(jù)。
為什么MapReduce計(jì)算模型需要Shuffle過程蛤售?我們都知道MapReduce計(jì)算模型一般包括兩個(gè)重要的階段:Map是映射丁鹉,負(fù)責(zé)數(shù)據(jù)的過濾分發(fā);Reduce是規(guī)約悴能,負(fù)責(zé)數(shù)據(jù)的計(jì)算歸并揣钦。Reduce的數(shù)據(jù)來源于Map,Map的輸出即是Reduce的輸入搜骡,Reduce需要通過Shuffle來獲取數(shù)據(jù)拂盯。
從Map輸出到Reduce輸入的整個(gè)過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端记靡,在Map端包括Spill過程谈竿,在Reduce端包括copy和sort過程,如圖所示:
Spill過程
Spill過程包括輸出摸吠、排序空凸、溢寫、合并等步驟寸痢,如圖所示:
Collect
每個(gè)Map任務(wù)不斷地以對(duì)的形式把數(shù)據(jù)輸出到在內(nèi)存中構(gòu)造的一個(gè)環(huán)形數(shù)據(jù)結(jié)構(gòu)中呀洲。使用環(huán)形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間,在內(nèi)存中放置盡可能多的數(shù)據(jù)。
這個(gè)數(shù)據(jù)結(jié)構(gòu)其實(shí)就是個(gè)字節(jié)數(shù)組道逗,叫Kvbuffer兵罢,名如其義,但是這里面不光放置了
value>數(shù)據(jù)滓窍,還放置了一些索引數(shù)據(jù)卖词,給放置索引數(shù)據(jù)的區(qū)域起了一個(gè)Kvmeta的別名,在Kvbuffer的一塊區(qū)域上穿了一個(gè)IntBuffer(字節(jié)序采用的是平臺(tái)自身的字節(jié)序)的馬甲吏夯。
value>數(shù)據(jù)區(qū)域和索引數(shù)據(jù)區(qū)域在Kvbuffer中是相鄰不重疊的兩個(gè)區(qū)域此蜈,用一個(gè)分界點(diǎn)來劃分兩者,分界點(diǎn)不是亙古不變的噪生,而是每次Spill之后都會(huì)更新一次裆赵。初始的分界點(diǎn)是0,
value>數(shù)據(jù)的存儲(chǔ)方向是向上增長(zhǎng)跺嗽,索引數(shù)據(jù)的存儲(chǔ)方向是向下增長(zhǎng)战授,如圖所示:
Kvbuffer的存放指針bufindex是一直悶著頭地向上增長(zhǎng),比如bufindex初始值為0抛蚁,一個(gè)Int型的key寫完之后陈醒,bufindex增長(zhǎng)為4惕橙,一個(gè)Int型的value寫完之后瞧甩,bufindex增長(zhǎng)為8。
索引是對(duì)
value>在kvbuffer中的索引弥鹦,是個(gè)四元組肚逸,包括:value的起始位置、key的起始位置彬坏、partition值朦促、value的長(zhǎng)度,占用四個(gè)Int長(zhǎng)度栓始,Kvmeta的存放指針Kvindex每次都是向下跳四個(gè)“格子”务冕,然后再向上一個(gè)格子一個(gè)格子地填充四元組的數(shù)據(jù)。比如Kvindex初始位置是-4幻赚,當(dāng)?shù)谝粋€(gè)
value>寫完之后禀忆,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置落恼、(Kvindex+2)的位置存放partition的值箩退、(Kvindex+3)的位置存放value的長(zhǎng)度,然后Kvindex跳到-8位置佳谦,等第二個(gè)
value>和索引寫完之后戴涝,Kvindex跳到-32位置。
Kvbuffer的大小雖然可以通過參數(shù)設(shè)置,但是總共就那么大啥刻,
value>和索引不斷地增加奸鸯,加著加著,Kvbuffer總有不夠用的那天可帽,那怎么辦府喳?把數(shù)據(jù)從內(nèi)存刷到磁盤上再接著往內(nèi)存寫數(shù)據(jù),把Kvbuffer中的數(shù)據(jù)刷到磁盤上的過程就叫Spill蘑拯,多么明了的叫法钝满,內(nèi)存中的數(shù)據(jù)滿了就自動(dòng)地spill到具有更大空間的磁盤。
關(guān)于Spill觸發(fā)的條件申窘,也就是Kvbuffer用到什么程度開始Spill弯蚜,還是要講究一下的。如果把Kvbuffer用得死死得剃法,一點(diǎn)縫都不剩的時(shí)候再開始Spill碎捺,那Map任務(wù)就需要等Spill完成騰出空間之后才能繼續(xù)寫數(shù)據(jù);如果Kvbuffer只是滿到一定程度贷洲,比如80%的時(shí)候就開始Spill收厨,那在Spill的同時(shí),Map任務(wù)還能繼續(xù)寫數(shù)據(jù)优构,如果Spill夠快诵叁,Map可能都不需要為空閑空間而發(fā)愁。兩利相衡取其大钦椭,一般選擇后者拧额。
Spill這個(gè)重要的過程是由Spill線程承擔(dān),Spill線程從Map任務(wù)接到“命令”之后就開始正式干活彪腔,干的活叫SortAndSpill侥锦,原來不僅僅是Spill,在Spill之前還有個(gè)頗具爭(zhēng)議性的Sort德挣。
Sort
先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個(gè)關(guān)鍵字升序排序恭垦,移動(dòng)的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)按照partition為單位聚集在一起格嗅,同一partition內(nèi)的按照key有序番挺。
Spill
Spill線程為這次Spill過程創(chuàng)建一個(gè)磁盤文件:從所有的本地目錄中輪訓(xùn)查找能存儲(chǔ)這么大空間的目錄,找到之后在其中創(chuàng)建一個(gè)類似于“spill12.out”的文件吗浩。Spill線程根據(jù)排過序的Kvmeta挨個(gè)partition的把
value>數(shù)據(jù)吐到這個(gè)文件中建芙,一個(gè)partition對(duì)應(yīng)的數(shù)據(jù)吐完之后順序地吐下個(gè)partition,直到把所有的partition遍歷完懂扼。一個(gè)partition在文件中對(duì)應(yīng)的數(shù)據(jù)也叫段(segment)禁荸。
所有的partition對(duì)應(yīng)的數(shù)據(jù)都放在這個(gè)文件里右蒲,雖然是順序存放的,但是怎么直接知道某個(gè)partition在這個(gè)文件中存放的起始位置呢赶熟?強(qiáng)大的索引又出場(chǎng)了瑰妄。有一個(gè)三元組記錄某個(gè)partition對(duì)應(yīng)的數(shù)據(jù)在這個(gè)文件中的索引:起始位置、原始數(shù)據(jù)長(zhǎng)度映砖、壓縮之后的數(shù)據(jù)長(zhǎng)度间坐,一個(gè)partition對(duì)應(yīng)一個(gè)三元組。然后把這些索引信息存放在內(nèi)存中邑退,如果內(nèi)存中放不下了竹宋,后續(xù)的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪訓(xùn)查找能存儲(chǔ)這么大空間的目錄,找到之后在其中創(chuàng)建一個(gè)類似于“spill12.out.index”的文件地技,文件中不光存儲(chǔ)了索引數(shù)據(jù)蜈七,還存儲(chǔ)了crc32的校驗(yàn)數(shù)據(jù)。(spill12.out.index不一定在磁盤上創(chuàng)建莫矗,如果內(nèi)存(默認(rèn)1M空間)中能放得下就放在內(nèi)存中飒硅,即使在磁盤上創(chuàng)建了,和spill12.out文件也不一定在同一個(gè)目錄下作谚。)
每一次Spill過程就會(huì)最少生成一個(gè)out文件三娩,有時(shí)還會(huì)生成index文件,Spill的次數(shù)也烙印在文件名中妹懒。索引文件和數(shù)據(jù)文件的對(duì)應(yīng)關(guān)系如下圖所示:
話分兩端雀监,在Spill線程如火如荼的進(jìn)行SortAndSpill工作的同時(shí),Map任務(wù)不會(huì)因此而停歇彬伦,而是一無(wú)既往地進(jìn)行著數(shù)據(jù)輸出滔悉。Map還是把數(shù)據(jù)寫到kvbuffer中,那問題就來了:只顧著悶頭按照bufindex指針向上增長(zhǎng)单绑,kvmeta只顧著按照Kvindex向下增長(zhǎng),是保持指針起始位置不變繼續(xù)跑呢曹宴,還是另謀它路搂橙?如果保持指針起始位置不變斋日,很快bufindex和Kvindex就碰頭了止喷,碰頭之后再重新開始或者移動(dòng)內(nèi)存都比較麻煩吠各,不可取叭首。Map取kvbuffer中剩余空間的中間位置惜互,用這個(gè)位置設(shè)置為新的分界點(diǎn)甲葬,bufindex指針移動(dòng)到這個(gè)分界點(diǎn)撇寞,Kvindex移動(dòng)到這個(gè)分界點(diǎn)的-16位置绍撞,然后兩者就可以和諧地按照自己既定的軌跡放置數(shù)據(jù)了礁芦,當(dāng)Spill完成蜻韭,空間騰出之后悼尾,不需要做任何改動(dòng)繼續(xù)前進(jìn)。分界點(diǎn)的轉(zhuǎn)換如下圖所示:
Map任務(wù)總要把輸出的數(shù)據(jù)寫到磁盤上肖方,即使輸出數(shù)據(jù)量很小在內(nèi)存中全部能裝得下闺魏,在最后也會(huì)把數(shù)據(jù)刷到磁盤上。
Merge
Map任務(wù)如果輸出數(shù)據(jù)量很大俯画,可能會(huì)進(jìn)行好幾次Spill析桥,out文件和Index文件會(huì)產(chǎn)生很多,分布在不同的磁盤上艰垂。最后把這些文件進(jìn)行合并的merge過程閃亮登場(chǎng)泡仗。
Merge過程怎么知道產(chǎn)生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產(chǎn)生的Spill文件猜憎,然后把路徑存儲(chǔ)在一個(gè)數(shù)組里沮焕。Merge過程又怎么知道Spill的索引信息呢?沒錯(cuò)拉宗,也是從所有的本地目錄上掃描得到Index文件峦树,然后把索引信息存儲(chǔ)在一個(gè)列表里。到這里旦事,又遇到了一個(gè)值得納悶的地方魁巩。在之前Spill過程中的時(shí)候?yàn)槭裁床恢苯影堰@些信息存儲(chǔ)在內(nèi)存中呢,何必又多了這步掃描的操作姐浮?特別是Spill的索引數(shù)據(jù)谷遂,之前當(dāng)內(nèi)存超限之后就把數(shù)據(jù)寫到磁盤,現(xiàn)在又要從磁盤把這些數(shù)據(jù)讀出來卖鲤,還是需要裝到更多的內(nèi)存中肾扰。之所以多此一舉,是因?yàn)檫@時(shí)kvbuffer這個(gè)內(nèi)存大戶已經(jīng)不再使用可以回收蛋逾,有內(nèi)存空間來裝這些數(shù)據(jù)了集晚。(對(duì)于內(nèi)存空間較大的土豪來說,用內(nèi)存來省卻這兩個(gè)io步驟還是值得考慮的区匣。)
然后為merge過程創(chuàng)建一個(gè)叫file.out的文件和一個(gè)叫file.out.Index的文件用來存儲(chǔ)最終的輸出和索引偷拔。
一個(gè)partition一個(gè)partition的進(jìn)行合并輸出。對(duì)于某個(gè)partition來說亏钩,從索引列表中查詢這個(gè)partition對(duì)應(yīng)的所有索引信息莲绰,每個(gè)對(duì)應(yīng)一個(gè)段插入到段列表中。也就是這個(gè)partition對(duì)應(yīng)一個(gè)段列表姑丑,記錄所有的Spill文件中對(duì)應(yīng)的這個(gè)partition那段數(shù)據(jù)的文件名蛤签、起始位置、長(zhǎng)度等等栅哀。
然后對(duì)這個(gè)partition對(duì)應(yīng)的所有的segment進(jìn)行合并震肮,目標(biāo)是合并成一個(gè)segment称龙。當(dāng)這個(gè)partition對(duì)應(yīng)很多個(gè)segment時(shí),會(huì)分批地進(jìn)行合并:先從segment列表中把第一批取出來钙蒙,以key為關(guān)鍵字放置成最小堆茵瀑,然后從最小堆中每次取出最小的<key,value>輸出到一個(gè)臨時(shí)文件中,這樣就把這一批段合并成一個(gè)臨時(shí)的段躬厌,把它加回到segment列表中马昨;再?gòu)膕egment列表中把第二批取出來合并輸出到一個(gè)臨時(shí)segment,把其加入到列表中扛施;這樣往復(fù)執(zhí)行鸿捧,直到剩下的段是一批,輸出到最終的文件中疙渣。
最終的索引數(shù)據(jù)仍然輸出到Index文件中匙奴。
Map端的Shuffle過程到此結(jié)束。
Copy
Reduce任務(wù)通過HTTP向各個(gè)Map任務(wù)拖取它所需要的數(shù)據(jù)妄荔。每個(gè)節(jié)點(diǎn)都會(huì)啟動(dòng)一個(gè)常駐的HTTP
server泼菌,其中一項(xiàng)服務(wù)就是響應(yīng)Reduce拖取Map數(shù)據(jù)。當(dāng)有MapOutput的HTTP請(qǐng)求過來的時(shí)候啦租,HTTP
server就讀取相應(yīng)的Map輸出文件中對(duì)應(yīng)這個(gè)Reduce部分的數(shù)據(jù)通過網(wǎng)絡(luò)流輸出給Reduce哗伯。
Reduce任務(wù)拖取某個(gè)Map對(duì)應(yīng)的數(shù)據(jù),如果在內(nèi)存中能放得下這次數(shù)據(jù)的話就直接把數(shù)據(jù)寫到內(nèi)存中篷角。Reduce要向每個(gè)Map去拖取數(shù)據(jù)焊刹,在內(nèi)存中每個(gè)Map對(duì)應(yīng)一塊數(shù)據(jù),當(dāng)內(nèi)存中存儲(chǔ)的Map數(shù)據(jù)占用空間達(dá)到一定程度的時(shí)候恳蹲,開始啟動(dòng)內(nèi)存中merge虐块,把內(nèi)存中的數(shù)據(jù)merge輸出到磁盤上一個(gè)文件中。
如果在內(nèi)存中不能放得下這個(gè)Map的數(shù)據(jù)的話嘉蕾,直接把Map數(shù)據(jù)寫到磁盤上贺奠,在本地目錄創(chuàng)建一個(gè)文件,從HTTP流中讀取數(shù)據(jù)然后寫到磁盤荆针,使用的緩存區(qū)大小是64K敞嗡。拖一個(gè)Map數(shù)據(jù)過來就會(huì)創(chuàng)建一個(gè)文件,當(dāng)文件數(shù)量達(dá)到一定閾值時(shí)航背,開始啟動(dòng)磁盤文件merge,把這些文件合并輸出到一個(gè)文件棱貌。
有些Map的數(shù)據(jù)較小是可以放在內(nèi)存中的玖媚,有些Map的數(shù)據(jù)較大需要放在磁盤上,這樣最后Reduce任務(wù)拖過來的數(shù)據(jù)有些放在內(nèi)存中了有些放在磁盤上婚脱,最后會(huì)對(duì)這些來一個(gè)全局合并今魔。
Merge Sort
這里使用的Merge和Map端使用的Merge過程一樣勺像。Map的輸出數(shù)據(jù)已經(jīng)是有序的,Merge進(jìn)行一次合并排序错森,所謂Reduce端的sort過程就是這個(gè)合并的過程吟宦。一般Reduce是一邊copy一邊sort,即copy和sort兩個(gè)階段是重疊而不是完全分開的涩维。
Reduce端的Shuffle過程至此結(jié)束殃姓。
Spark的Shuffle過程介紹
Shuffle Writer
Spark豐富了任務(wù)類型,有些任務(wù)之間數(shù)據(jù)流轉(zhuǎn)不需要通過Shuffle瓦阐,但是有些任務(wù)之間還是需要通過Shuffle來傳遞數(shù)據(jù)蜗侈,比如wide dependency的group by key。
Spark中需要Shuffle輸出的Map任務(wù)會(huì)為每個(gè)Reduce創(chuàng)建對(duì)應(yīng)的bucket睡蟋,Map產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partitioner得到對(duì)應(yīng)的bucketId踏幻,然后填充到相應(yīng)的bucket中去。每個(gè)Map的輸出結(jié)果可能包含所有的Reduce所需要的數(shù)據(jù)戳杀,所以每個(gè)Map會(huì)創(chuàng)建R個(gè)bucket(R是reduce的個(gè)數(shù))该面,M個(gè)Map總共會(huì)創(chuàng)建M*R個(gè)bucket。
Map創(chuàng)建的bucket其實(shí)對(duì)應(yīng)磁盤上的一個(gè)文件信卡,Map的結(jié)果寫到每個(gè)bucket中其實(shí)就是寫到那個(gè)磁盤文件中隔缀,這個(gè)文件也被稱為blockFile,是Disk Block Manager管理器通過文件名的Hash值對(duì)應(yīng)到本地目錄的子目錄中創(chuàng)建的坐求。每個(gè)Map要在節(jié)點(diǎn)上創(chuàng)建R個(gè)磁盤文件用于結(jié)果輸出蚕泽,Map的結(jié)果是直接輸出到磁盤文件上的,100KB的內(nèi)存緩沖是用來創(chuàng)建Fast Buffered OutputStream輸出流桥嗤。這種方式一個(gè)問題就是Shuffle文件過多须妻。
針對(duì)上述Shuffle過程產(chǎn)生的文件過多問題,Spark有另外一種改進(jìn)的Shuffle過程:consolidation Shuffle泛领,以期顯著減少Shuffle文件的數(shù)量荒吏。在consolidation Shuffle中每個(gè)bucket并非對(duì)應(yīng)一個(gè)文件,而是對(duì)應(yīng)文件中的一個(gè)segment部分渊鞋。Job的map在某個(gè)節(jié)點(diǎn)上第一次執(zhí)行绰更,為每個(gè)reduce創(chuàng)建bucket對(duì)應(yīng)的輸出文件,把這些文件組織成ShuffleFileGroup锡宋,當(dāng)這次map執(zhí)行完之后儡湾,這個(gè)ShuffleFileGroup可以釋放為下次循環(huán)利用;當(dāng)又有map在這個(gè)節(jié)點(diǎn)上執(zhí)行時(shí)执俩,不需要?jiǎng)?chuàng)建新的bucket文件徐钠,而是在上次的ShuffleFileGroup中取得已經(jīng)創(chuàng)建的文件繼續(xù)追加寫一個(gè)segment;當(dāng)前次map還沒執(zhí)行完役首,ShuffleFileGroup還沒有釋放尝丐,這時(shí)如果有新的map在這個(gè)節(jié)點(diǎn)上執(zhí)行显拜,無(wú)法循環(huán)利用這個(gè)ShuffleFileGroup,而是只能創(chuàng)建新的bucket文件組成新的ShuffleFileGroup來寫輸出爹袁。
比如一個(gè)Job有3個(gè)Map和2個(gè)reduce:(1) 如果此時(shí)集群有3個(gè)節(jié)點(diǎn)有空槽远荠,每個(gè)節(jié)點(diǎn)空閑了一個(gè)core,則3個(gè)Map會(huì)調(diào)度到這3個(gè)節(jié)點(diǎn)上執(zhí)行失息,每個(gè)Map都會(huì)創(chuàng)建2個(gè)Shuffle文件譬淳,總共創(chuàng)建6個(gè)Shuffle文件;(2) 如果此時(shí)集群有2個(gè)節(jié)點(diǎn)有空槽根时,每個(gè)節(jié)點(diǎn)空閑了一個(gè)core瘦赫,則2個(gè)Map先調(diào)度到這2個(gè)節(jié)點(diǎn)上執(zhí)行,每個(gè)Map都會(huì)創(chuàng)建2個(gè)Shuffle文件蛤迎,然后其中一個(gè)節(jié)點(diǎn)執(zhí)行完Map之后又調(diào)度執(zhí)行另一個(gè)Map确虱,則這個(gè)Map不會(huì)創(chuàng)建新的Shuffle文件,而是把結(jié)果輸出追加到之前Map創(chuàng)建的Shuffle文件中替裆;總共創(chuàng)建4個(gè)Shuffle文件校辩;(3) 如果此時(shí)集群有2個(gè)節(jié)點(diǎn)有空槽,一個(gè)節(jié)點(diǎn)有2個(gè)空core一個(gè)節(jié)點(diǎn)有1個(gè)空core辆童,則一個(gè)節(jié)點(diǎn)調(diào)度2個(gè)Map一個(gè)節(jié)點(diǎn)調(diào)度1個(gè)Map宜咒,調(diào)度2個(gè)Map的節(jié)點(diǎn)上,一個(gè)Map創(chuàng)建了Shuffle文件把鉴,后面的Map還是會(huì)創(chuàng)建新的Shuffle文件故黑,因?yàn)樯弦粋€(gè)Map還正在寫,它創(chuàng)建的ShuffleFileGroup還沒有釋放庭砍;總共創(chuàng)建6個(gè)Shuffle文件场晶。
Shuffle Fetcher
Reduce去拖Map的輸出數(shù)據(jù),Spark提供了兩套不同的拉取數(shù)據(jù)框架:通過socket連接去取數(shù)據(jù)怠缸;使用netty框架去取數(shù)據(jù)诗轻。
每個(gè)節(jié)點(diǎn)的Executor會(huì)創(chuàng)建一個(gè)BlockManager,其中會(huì)創(chuàng)建一個(gè)BlockManagerWorker用于響應(yīng)請(qǐng)求揭北。當(dāng)Reduce的GET_BLOCK的請(qǐng)求過來時(shí)扳炬,讀取本地文件將這個(gè)blockId的數(shù)據(jù)返回給Reduce。如果使用的是Netty框架搔体,BlockManager會(huì)創(chuàng)建ShuffleSender用于發(fā)送Shuffle數(shù)據(jù)恨樟。
并不是所有的數(shù)據(jù)都是通過網(wǎng)絡(luò)讀取,對(duì)于在本節(jié)點(diǎn)的Map數(shù)據(jù)疚俱,Reduce直接去磁盤上讀取而不再通過網(wǎng)絡(luò)框架厌杜。
Reduce拖過來數(shù)據(jù)之后以什么方式存儲(chǔ)呢?Spark
Map輸出的數(shù)據(jù)沒有經(jīng)過排序计螺,Spark Shuffle過來的數(shù)據(jù)也不會(huì)進(jìn)行排序夯尽,Spark認(rèn)為Shuffle過程中的排序不是必須的,并不是所有類型的Reduce需要的數(shù)據(jù)都需要排序登馒,強(qiáng)制地進(jìn)行排序只會(huì)增加Shuffle的負(fù)擔(dān)匙握。Reduce拖過來的數(shù)據(jù)會(huì)放在一個(gè)HashMap中,HashMap中存儲(chǔ)的也是<key,value>對(duì)陈轿,key是Map輸出的key圈纺,Map輸出對(duì)應(yīng)這個(gè)key的所有value組成HashMap的value。Spark將Shuffle取過來的每一個(gè)<key,value>對(duì)插入或者更新到HashMap中麦射,來一個(gè)處理一個(gè)蛾娶。HashMap全部放在內(nèi)存中。
Shuffle取過來的數(shù)據(jù)全部存放在內(nèi)存中潜秋,對(duì)于數(shù)據(jù)量比較小或者已經(jīng)在Map端做過合并處理的Shuffle數(shù)據(jù)蛔琅,占用內(nèi)存空間不會(huì)太大,但是對(duì)于比如groupbykey這樣的操作峻呛,Reduce需要得到key對(duì)應(yīng)的所有value罗售,并將這些value組一個(gè)數(shù)組放在內(nèi)存中,這樣當(dāng)數(shù)據(jù)量較大時(shí)钩述,就需要較多內(nèi)存寨躁。
當(dāng)內(nèi)存不夠時(shí),要不就失敗牙勘,要不就用老辦法把內(nèi)存中的數(shù)據(jù)移到磁盤上放著职恳。Spark意識(shí)到在處理數(shù)據(jù)規(guī)模遠(yuǎn)遠(yuǎn)大于內(nèi)存空間時(shí)所帶來的不足,引入了一個(gè)具有外部排序的方案方面。Shuffle過來的數(shù)據(jù)先放在內(nèi)存中放钦,當(dāng)內(nèi)存中存儲(chǔ)的<key,value>對(duì)超過1000并且內(nèi)存使用超過70%時(shí),判斷節(jié)點(diǎn)上可用內(nèi)存如果還足夠葡幸,則把內(nèi)存緩沖區(qū)大小翻倍最筒,如果可用內(nèi)存不再夠了,則把內(nèi)存中的<key,value>對(duì)排序然后寫到磁盤文件中蔚叨。最后把內(nèi)存緩沖區(qū)中的數(shù)據(jù)排序之后和那些磁盤文件組成一個(gè)最小堆床蜘,每次從最小堆中讀取最小的數(shù)據(jù),這個(gè)和MapReduce中的merge過程類似蔑水。
MapReduce和Spark的Shuffle過程對(duì)比
Shuffle后續(xù)優(yōu)化方向
通過上面的介紹邢锯,我們了解到,Shuffle過程的主要存儲(chǔ)介質(zhì)是磁盤搀别,盡量的減少IO是Shuffle的主要優(yōu)化方向丹擎。我們腦海中都有那個(gè)經(jīng)典的存儲(chǔ)金字塔體系,Shuffle過程為什么把結(jié)果都放在磁盤上,那是因?yàn)楝F(xiàn)在內(nèi)存再大也大不過磁盤蒂培,內(nèi)存就那么大再愈,還這么多張嘴吃,當(dāng)然是分配給最需要的了护戳。如果具有“土豪”內(nèi)存節(jié)點(diǎn)翎冲,減少Shuffle
IO的最有效方式無(wú)疑是盡量把數(shù)據(jù)放在內(nèi)存中。下面列舉一些現(xiàn)在看可以優(yōu)化的方面媳荒,期待經(jīng)過我們不斷的努力抗悍,TDW計(jì)算引擎運(yùn)行地更好。
MapReduce Shuffle后續(xù)優(yōu)化方向
壓縮:對(duì)數(shù)據(jù)進(jìn)行壓縮钳枕,減少寫讀數(shù)據(jù)量缴渊;
減少不必要的排序:并不是所有類型的Reduce需要的數(shù)據(jù)都是需要排序的,排序這個(gè)nb的過程如果不需要最好還是不要的好鱼炒;
內(nèi)存化:Shuffle的數(shù)據(jù)不放在磁盤而是盡量放在內(nèi)存中衔沼,除非逼不得已往磁盤上放;當(dāng)然了如果有性能和內(nèi)存相當(dāng)?shù)牡谌酱鎯?chǔ)系統(tǒng)田柔,那放在第三方存儲(chǔ)系統(tǒng)上也是很好的俐巴;這個(gè)是個(gè)大招;
網(wǎng)絡(luò)框架:netty的性能據(jù)說要占優(yōu)了硬爆;
本節(jié)點(diǎn)上的數(shù)據(jù)不走網(wǎng)絡(luò)框架:對(duì)于本節(jié)點(diǎn)上的Map輸出欣舵,Reduce直接去讀吧,不需要繞道網(wǎng)絡(luò)框架缀磕。
Spark Shuffle后續(xù)優(yōu)化方向
Spark作為MapReduce的進(jìn)階架構(gòu)缘圈,對(duì)于Shuffle過程已經(jīng)是優(yōu)化了的,特別是對(duì)于那些具有爭(zhēng)議的步驟已經(jīng)做了優(yōu)化袜蚕,但是Spark的Shuffle對(duì)于我們來說在一些方面還是需要優(yōu)化的糟把。
壓縮:對(duì)數(shù)據(jù)進(jìn)行壓縮,減少寫讀數(shù)據(jù)量牲剃;
內(nèi)存化:Spark歷史版本中是有這樣設(shè)計(jì)的:Map寫數(shù)據(jù)先把數(shù)據(jù)全部寫到內(nèi)存中遣疯,寫完之后再把數(shù)據(jù)刷到磁盤上;考慮內(nèi)存是緊缺資源凿傅,后來修改成把數(shù)據(jù)直接寫到磁盤了缠犀;對(duì)于具有較大內(nèi)存的集群來講,還是盡量地往內(nèi)存上寫吧聪舒,內(nèi)存放不下了再放磁盤辨液。