總結(jié)
前提:
每一個(gè)job提交后都會(huì)生成一個(gè)ResultStage和若干個(gè)ShuffleMapStage
其中ResultStage表示生成作業(yè)的最終結(jié)果所在的Stage宛瞄;
ResultStage的task分別對(duì)應(yīng)著ResultTask
ShuffleMapStage的task分別對(duì)應(yīng)著ShuffleMapTask。
HashShuffle:1.6版本前
SortShuffle:之后借鑒MR的Shuffle機(jī)制亲桥,Sort-Based Shuffle有幾種不同的策略:BypassMergeSortShuffleWriter含滴、SortShuffleWriter祠汇、UnasfeSortShuffleWriter贞远。
1悦即、HashShuffle:
不需要在fetch數(shù)據(jù)時(shí),在Reduce端做merge sort逼裆,只進(jìn)行基于桶的聚合郁稍,每個(gè)桶對(duì)應(yīng)一個(gè)塊文件(block file)
缺點(diǎn)是:數(shù)據(jù)量很大的情況下,內(nèi)存中會(huì)產(chǎn)生大量的Bucket緩沖區(qū)胜宇,內(nèi)存中還要管理文件句柄耀怜,所以就會(huì)有很多內(nèi)存消耗,頻繁IO以及GC頻繁或者出現(xiàn)內(nèi)存溢出桐愉。
2财破、SortShuffle:
該機(jī)制每一個(gè)ShuffleMapTask不會(huì)為后續(xù)的任務(wù)創(chuàng)建單獨(dú)的文件,而是會(huì)將所有的Task結(jié)果寫入同一個(gè)文件从诲,并且對(duì)應(yīng)生成一個(gè)索引文件左痢。以前的數(shù)據(jù)是放在內(nèi)存緩存中,等到數(shù)據(jù)完了再刷到磁盤系洛,現(xiàn)在為了減少內(nèi)存的使用俊性,在內(nèi)存不夠用的時(shí)候,可以將輸出溢寫到磁盤描扯,結(jié)束的時(shí)候定页,再將這些不同的文件聯(lián)合內(nèi)存的數(shù)據(jù)一起進(jìn)行歸并,從而減少內(nèi)存的使用量绽诚。
:一方面文件數(shù)量顯著減少
:另一方面減少Writer緩存所占用的內(nèi)存大小典徊,而且同時(shí)避免GC的風(fēng)險(xiǎn)和頻率。
三種策略:
1)BypassMergeSortShuffleWriter:
適合在Reducer數(shù)量不大恩够,又不需要在map端聚合和排序卒落,則將數(shù)據(jù)是直接寫入文件,
缺點(diǎn):數(shù)據(jù)量較大的時(shí)候玫鸟,網(wǎng)絡(luò)I/O和內(nèi)存負(fù)擔(dān)較重
2)SortShuffleWriter
適合數(shù)據(jù)量很大的場景或者集群規(guī)模很大
引入了外部外部排序器,可以支持在Map端進(jìn)行本地聚合或者不聚合
如果外部排序器enable了spill功能犀勒,如果內(nèi)存不夠屎飘,可以先將輸出溢寫到本地磁盤,最后將內(nèi)存結(jié)果和本地磁盤的溢寫文件進(jìn)行合并
3)UnsafeShuffleWriter
也即就是 tungsten-sort
Spark Shuffle之Tungsten-Sort
詳情
原文:spark基礎(chǔ)之shuffle機(jī)制和原理分析
一 概述
Shuffle就是對(duì)數(shù)據(jù)進(jìn)行重組贾费,由于分布式計(jì)算的特性和要求钦购,在實(shí)現(xiàn)細(xì)節(jié)上更加繁瑣和復(fù)雜
在MapReduce框架,Shuffle是連接Map和Reduce之間的橋梁褂萧,Map階段通過shuffle讀取數(shù)據(jù)并輸出到對(duì)應(yīng)的Reduce押桃;而Reduce階段負(fù)責(zé)從Map端拉取數(shù)據(jù)并進(jìn)行計(jì)算。在整個(gè)shuffle過程中导犹,往往伴隨著大量的磁盤和網(wǎng)絡(luò)I/O唱凯。所以shuffle性能的高低也直接決定了整個(gè)程序的性能高低羡忘。Spark也會(huì)有自己的shuffle實(shí)現(xiàn)過程
在DAG調(diào)度的過程中,Stage階段的劃分是根據(jù)是否有shuffle過程磕昼,也就是存在ShuffleDependency寬依賴的時(shí)候卷雕,需要進(jìn)行shuffle,這時(shí)候會(huì)將作業(yè)job劃分成多個(gè)Stage票从;并且在劃分Stage的時(shí)候漫雕,構(gòu)建ShuffleDependency的時(shí)候進(jìn)行shuffle注冊(cè),獲取后續(xù)數(shù)據(jù)讀取所需要的ShuffleHandle峰鄙,最終每一個(gè)job提交后都會(huì)生成一個(gè)ResultStage和若干個(gè)ShuffleMapStage浸间,其中ResultStage表示生成作業(yè)的最終結(jié)果所在的Stage。ResultStage與ShuffleMapStage中的task分別對(duì)應(yīng)著ResultTask與ShuffleMapTask吟榴。一個(gè)作業(yè)魁蒜,除了最終的ResultStage外,其他若干ShuffleMapStage中各個(gè)ShuffleMapTask都需要將最終的數(shù)據(jù)根據(jù)相應(yīng)的Partitioner對(duì)數(shù)據(jù)進(jìn)行分組煤墙,然后持久化分區(qū)的數(shù)據(jù)梅惯。
一 HashShuffle機(jī)制
1.1 HashShuffle概述
在spark-1.6版本之前,一直使用HashShuffle仿野,在spark-1.6版本之后使用Sort-Base Shuffle铣减,因?yàn)镠ashShuffle存在的不足所以就替換了HashShuffle.
我們知道,Spark的運(yùn)行主要分為2部分:一部分是驅(qū)動(dòng)程序脚作,其核心是SparkContext葫哗;另一部分是Worker節(jié)點(diǎn)上Task,它是運(yùn)行實(shí)際任務(wù)的。程序運(yùn)行的時(shí)候球涛,Driver和Executor進(jìn)程相互交互:運(yùn)行什么任務(wù)劣针,即Driver會(huì)分配Task到Executor,Driver 跟 Executor 進(jìn)行網(wǎng)絡(luò)傳輸; 任務(wù)數(shù)據(jù)從哪兒獲取亿扁,即Task要從 Driver 抓取其他上游的 Task 的數(shù)據(jù)結(jié)果捺典,所以有這個(gè)過程中就不斷的產(chǎn)生網(wǎng)絡(luò)結(jié)果。其中从祝,下一個(gè) Stage 向上一個(gè) Stage 要數(shù)據(jù)這個(gè)過程襟己,我們就稱之為 Shuffle。
1.2 沒有優(yōu)化之前的HashShuffle機(jī)制
在HashShuffle沒有優(yōu)化之前牍陌,每一個(gè)ShufflleMapTask會(huì)為每一個(gè)ReduceTask創(chuàng)建一個(gè)bucket緩存擎浴,并且會(huì)為每一個(gè)bucket創(chuàng)建一個(gè)文件。這個(gè)bucket存放的數(shù)據(jù)就是經(jīng)過Partitioner操作(默認(rèn)是HashPartitioner)之后找到對(duì)應(yīng)的bucket然后放進(jìn)去毒涧,最后將數(shù)據(jù)
刷新bucket緩存的數(shù)據(jù)到磁盤上贮预,即對(duì)應(yīng)的block file.
然后ShuffleMapTask將輸出作為MapStatus發(fā)送到DAGScheduler的MapOutputTrackerMaster,每一個(gè)MapStatus包含了每一個(gè)ResultTask要拉取的數(shù)據(jù)的位置和大小
ResultTask然后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster獲取MapStatus,看哪一份數(shù)據(jù)是屬于自己的仿吞,然后底層通過BlockManager將數(shù)據(jù)拉取過來
拉取過來的數(shù)據(jù)會(huì)組成一個(gè)內(nèi)部的ShuffleRDD滑频,優(yōu)先放入內(nèi)存,內(nèi)存不夠用則放入磁盤茫藏,然后ResulTask開始進(jìn)行聚合误趴,最后生成我們希望獲取的那個(gè)MapPartitionRDD
缺點(diǎn):
如上圖所示:在這里有1個(gè)worker,2個(gè)executor务傲,每一個(gè)executor運(yùn)行2個(gè)ShuffleMapTask凉当,有三個(gè)ReduceTask,所以總共就有4 * 3=12個(gè)bucket和12個(gè)block file售葡。
如果數(shù)據(jù)量較大看杭,將會(huì)生成M*R個(gè)小文件,比如ShuffleMapTask有100個(gè)挟伙,ResultTask有100個(gè)楼雹,這就會(huì)產(chǎn)生100*100=10000個(gè)小文件
bucket緩存很重要,需要將ShuffleMapTask所有數(shù)據(jù)都寫入bucket尖阔,才會(huì)刷到磁盤贮缅,那么如果Map端數(shù)據(jù)過多,這就很容易造成內(nèi)存溢出介却,盡管后面有優(yōu)化谴供,bucket寫入的數(shù)據(jù)達(dá)到刷新到磁盤的閥值之后,就會(huì)將數(shù)據(jù)一點(diǎn)一點(diǎn)的刷新到磁盤齿坷,但是這樣磁盤I/O就多了
1.3 優(yōu)化后的HashShuffle (Consolidation Shuffle)
每一個(gè)Executor進(jìn)程根據(jù)核數(shù)桂肌,決定Task的并發(fā)數(shù)量,比如executor核數(shù)是2永淌,就是可以并發(fā)運(yùn)行兩個(gè)task崎场,如果是一個(gè)則只能運(yùn)行一個(gè)task
假設(shè)executor核數(shù)是1,ShuffleMapTask數(shù)量是M,那么它依然會(huì)根據(jù)ResultTask的數(shù)量R遂蛀,創(chuàng)建R個(gè)bucket緩存谭跨,然后對(duì)key進(jìn)行hash,數(shù)據(jù)進(jìn)入不同的bucket中李滴,每一個(gè)bucket對(duì)應(yīng)著一個(gè)block file,用于刷新bucket緩存里的數(shù)據(jù)
然后下一個(gè)task運(yùn)行的時(shí)候螃宙,那么不會(huì)再創(chuàng)建新的bucket和block file,而是復(fù)用之前的task已經(jīng)創(chuàng)建好的bucket和block file悬嗓。即所謂同一個(gè)Executor進(jìn)程里所有Task都會(huì)把相同的key放入相同的bucket緩沖區(qū)中
這樣的話污呼,生成文件的數(shù)量就是(本地worker的executor數(shù)量*executor的cores*ResultTask數(shù)量)如上圖所示裕坊,即2*1*3 = 6個(gè)文件包竹,每一個(gè)Executor的shuffleMapTask數(shù)量100,ReduceTask數(shù)量為100,那么
未優(yōu)化的HashShuffle的文件數(shù)是2*1*100*100 =20000,優(yōu)化之后的數(shù)量是2*1*100 = 200文件周瞎,相當(dāng)于少了100倍
缺點(diǎn):如果 Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大苗缩,也會(huì)產(chǎn)生很多小文件。
二 Sort-Based Shuffle
2.1 Sort-Based Shuffle概述
HashShuffle回顧
HashShuffle寫數(shù)據(jù)的時(shí)候声诸,內(nèi)存有一個(gè)bucket緩沖區(qū)酱讶,同時(shí)在本地磁盤有對(duì)應(yīng)的本地文件,如果本地有文件彼乌,那么在內(nèi)存應(yīng)該也有文件句柄也是需要耗費(fèi)內(nèi)存的泻肯。也就是說,從內(nèi)存的角度考慮慰照,即有一部分存儲(chǔ)數(shù)據(jù)食绿,一部分管理文件句柄氮块。如果Mapper分片數(shù)量為1000,Reduce分片數(shù)量為1000,那么總共就需要1000000個(gè)小文件。所以就會(huì)有很多內(nèi)存消耗,頻繁IO以及GC頻繁或者出現(xiàn)內(nèi)存溢出狰住。
而且Reducer端讀取Map端數(shù)據(jù)時(shí),Mapper有這么多小文件民假,就需要打開很多網(wǎng)絡(luò)通道讀取意鲸,很容易造成Reducer(下一個(gè)stage)通過driver去拉取上一個(gè)stage數(shù)據(jù)的時(shí)候,說文件找不到算色,其實(shí)不是文件找不到而是程序不響應(yīng)抬伺,因?yàn)檎贕C.
2.2 Sorted-Based Shuffle介紹
為了緩解Shuffle過程產(chǎn)生文件數(shù)過多和Writer緩存開銷過大的問題,spark引入了類似于hadoop Map-Reduce的shuffle機(jī)制剃允。該機(jī)制每一個(gè)ShuffleMapTask不會(huì)為后續(xù)的任務(wù)創(chuàng)建單獨(dú)的文件沛简,而是會(huì)將所有的Task結(jié)果寫入同一個(gè)文件,并且對(duì)應(yīng)生成一個(gè)索引文件斥废。以前的數(shù)據(jù)是放在內(nèi)存緩存中椒楣,等到數(shù)據(jù)完了再刷到磁盤,現(xiàn)在為了減少內(nèi)存的使用牡肉,在內(nèi)存不夠用的時(shí)候捧灰,可以將輸出溢寫到磁盤,結(jié)束的時(shí)候统锤,再將這些不同的文件聯(lián)合內(nèi)存的數(shù)據(jù)一起進(jìn)行歸并毛俏,從而減少內(nèi)存的使用量。一方面文件數(shù)量顯著減少饲窿,另一方面減少Writer緩存所占用的內(nèi)存大小煌寇,而且同時(shí)避免GC的風(fēng)險(xiǎn)和頻率。
為了內(nèi)存的壓力逾雄,將數(shù)據(jù)刷到磁盤上阀溶,后續(xù)進(jìn)行歸并腻脏,減少寫緩存內(nèi)存壓力,避免GC银锻。
Sort-Based Shuffle有幾種不同的策略:
BypassMergeSortShuffleWriter
SortShuffleWriter
UnasfeSortShuffleWriter永品。
BypassMergeSortShuffleWriter使用這個(gè)模式特點(diǎn):
主要用于處理不需要排序和聚合的Shuffle操作,所以數(shù)據(jù)是直接寫入文件击纬,數(shù)據(jù)量較大的時(shí)候鼎姐,網(wǎng)絡(luò)I/O和內(nèi)存負(fù)擔(dān)較重
主要適合處理Reducer任務(wù)數(shù)量比較少的情況下
將每一個(gè)分區(qū)寫入一個(gè)單獨(dú)的文件,最后將這些文件合并,減少文件數(shù)量更振;但是這種方式需要并發(fā)打開多個(gè)文件炕桨,對(duì)內(nèi)存消耗比較大
因?yàn)锽ypassMergeSortShuffleWriter這種方式比SortShuffleWriter更快,所以如果在Reducer數(shù)量不大肯腕,又不需要在map端聚合和排序谋作,而且
Reducer的數(shù)目 < spark.shuffle.sort.bypassMergeThrshold指定的閥值,就是用的是這種方式乎芳。
SortShuffleWriter使用這個(gè)模式特點(diǎn):
比較適合數(shù)據(jù)量很大的場景或者集群規(guī)模很大
引入了外部外部排序器遵蚜,可以支持在Map端進(jìn)行本地聚合或者不聚合
如果外部排序器enable了spill功能,如果內(nèi)存不夠奈惑,可以先將輸出溢寫到本地磁盤吭净,最后將內(nèi)存結(jié)果和本地磁盤的溢寫文件進(jìn)行合并
對(duì)于UnsafeShuffleWriter由于需要謹(jǐn)慎使用,我們暫不做分析肴甸。
另外這個(gè)Sort-Based Shuffle跟Executor核數(shù)沒有關(guān)系寂殉,即跟并發(fā)度沒有關(guān)系,它是每一個(gè)ShuffleMapTask都會(huì)產(chǎn)生一個(gè)data文件和index文件原在,所謂合并也只是將該ShuffleMapTask的各個(gè)partition對(duì)應(yīng)的分區(qū)文件合并到data文件而已友扰。所以這個(gè)就需要個(gè)Hash-BasedShuffle的consolidation機(jī)制區(qū)別開來。