Spark基礎(chǔ)之shuffle機(jī)制和原理分析

總結(jié)

前提:
每一個(gè)job提交后都會(huì)生成一個(gè)ResultStage和若干個(gè)ShuffleMapStage
其中ResultStage表示生成作業(yè)的最終結(jié)果所在的Stage宛瞄;
ResultStage的task分別對(duì)應(yīng)著ResultTask
ShuffleMapStage的task分別對(duì)應(yīng)著ShuffleMapTask。

HashShuffle:1.6版本前
SortShuffle:之后借鑒MR的Shuffle機(jī)制亲桥,Sort-Based Shuffle有幾種不同的策略:BypassMergeSortShuffleWriter含滴、SortShuffleWriter祠汇、UnasfeSortShuffleWriter贞远。

1悦即、HashShuffle:

不需要在fetch數(shù)據(jù)時(shí),在Reduce端做merge sort逼裆,只進(jìn)行基于桶的聚合郁稍,每個(gè)桶對(duì)應(yīng)一個(gè)塊文件(block file)
缺點(diǎn)是:數(shù)據(jù)量很大的情況下,內(nèi)存中會(huì)產(chǎn)生大量的Bucket緩沖區(qū)胜宇,內(nèi)存中還要管理文件句柄耀怜,所以就會(huì)有很多內(nèi)存消耗,頻繁IO以及GC頻繁或者出現(xiàn)內(nèi)存溢出桐愉。

2财破、SortShuffle:

該機(jī)制每一個(gè)ShuffleMapTask不會(huì)為后續(xù)的任務(wù)創(chuàng)建單獨(dú)的文件,而是會(huì)將所有的Task結(jié)果寫入同一個(gè)文件从诲,并且對(duì)應(yīng)生成一個(gè)索引文件左痢。以前的數(shù)據(jù)是放在內(nèi)存緩存中,等到數(shù)據(jù)完了再刷到磁盤系洛,現(xiàn)在為了減少內(nèi)存的使用俊性,在內(nèi)存不夠用的時(shí)候,可以將輸出溢寫到磁盤描扯,結(jié)束的時(shí)候定页,再將這些不同的文件聯(lián)合內(nèi)存的數(shù)據(jù)一起進(jìn)行歸并,從而減少內(nèi)存的使用量绽诚。
:一方面文件數(shù)量顯著減少
:另一方面減少Writer緩存所占用的內(nèi)存大小典徊,而且同時(shí)避免GC的風(fēng)險(xiǎn)和頻率。

三種策略:
1)BypassMergeSortShuffleWriter:

適合在Reducer數(shù)量不大恩够,又不需要在map端聚合和排序卒落,則將數(shù)據(jù)是直接寫入文件,
缺點(diǎn):數(shù)據(jù)量較大的時(shí)候玫鸟,網(wǎng)絡(luò)I/O和內(nèi)存負(fù)擔(dān)較重

2)SortShuffleWriter

適合數(shù)據(jù)量很大的場景或者集群規(guī)模很大
引入了外部外部排序器,可以支持在Map端進(jìn)行本地聚合或者不聚合
如果外部排序器enable了spill功能犀勒,如果內(nèi)存不夠屎飘,可以先將輸出溢寫到本地磁盤,最后將內(nèi)存結(jié)果和本地磁盤的溢寫文件進(jìn)行合并

3)UnsafeShuffleWriter

也即就是 tungsten-sort
Spark Shuffle之Tungsten-Sort

詳情

原文:spark基礎(chǔ)之shuffle機(jī)制和原理分析

一 概述

Shuffle就是對(duì)數(shù)據(jù)進(jìn)行重組贾费,由于分布式計(jì)算的特性和要求钦购,在實(shí)現(xiàn)細(xì)節(jié)上更加繁瑣和復(fù)雜

在MapReduce框架,Shuffle是連接Map和Reduce之間的橋梁褂萧,Map階段通過shuffle讀取數(shù)據(jù)并輸出到對(duì)應(yīng)的Reduce押桃;而Reduce階段負(fù)責(zé)從Map端拉取數(shù)據(jù)并進(jìn)行計(jì)算。在整個(gè)shuffle過程中导犹,往往伴隨著大量的磁盤和網(wǎng)絡(luò)I/O唱凯。所以shuffle性能的高低也直接決定了整個(gè)程序的性能高低羡忘。Spark也會(huì)有自己的shuffle實(shí)現(xiàn)過程

Hadoop Map-Reduce Shuffle流程
ShuffleManager

在DAG調(diào)度的過程中,Stage階段的劃分是根據(jù)是否有shuffle過程磕昼,也就是存在ShuffleDependency寬依賴的時(shí)候卷雕,需要進(jìn)行shuffle,這時(shí)候會(huì)將作業(yè)job劃分成多個(gè)Stage票从;并且在劃分Stage的時(shí)候漫雕,構(gòu)建ShuffleDependency的時(shí)候進(jìn)行shuffle注冊(cè),獲取后續(xù)數(shù)據(jù)讀取所需要的ShuffleHandle峰鄙,最終每一個(gè)job提交后都會(huì)生成一個(gè)ResultStage和若干個(gè)ShuffleMapStage浸间,其中ResultStage表示生成作業(yè)的最終結(jié)果所在的Stage。ResultStage與ShuffleMapStage中的task分別對(duì)應(yīng)著ResultTask與ShuffleMapTask吟榴。一個(gè)作業(yè)魁蒜,除了最終的ResultStage外,其他若干ShuffleMapStage中各個(gè)ShuffleMapTask都需要將最終的數(shù)據(jù)根據(jù)相應(yīng)的Partitioner對(duì)數(shù)據(jù)進(jìn)行分組煤墙,然后持久化分區(qū)的數(shù)據(jù)梅惯。

一 HashShuffle機(jī)制

1.1 HashShuffle概述

在spark-1.6版本之前,一直使用HashShuffle仿野,在spark-1.6版本之后使用Sort-Base Shuffle铣减,因?yàn)镠ashShuffle存在的不足所以就替換了HashShuffle.

我們知道,Spark的運(yùn)行主要分為2部分:一部分是驅(qū)動(dòng)程序脚作,其核心是SparkContext葫哗;另一部分是Worker節(jié)點(diǎn)上Task,它是運(yùn)行實(shí)際任務(wù)的。程序運(yùn)行的時(shí)候球涛,Driver和Executor進(jìn)程相互交互:運(yùn)行什么任務(wù)劣针,即Driver會(huì)分配Task到Executor,Driver 跟 Executor 進(jìn)行網(wǎng)絡(luò)傳輸; 任務(wù)數(shù)據(jù)從哪兒獲取亿扁,即Task要從 Driver 抓取其他上游的 Task 的數(shù)據(jù)結(jié)果捺典,所以有這個(gè)過程中就不斷的產(chǎn)生網(wǎng)絡(luò)結(jié)果。其中从祝,下一個(gè) Stage 向上一個(gè) Stage 要數(shù)據(jù)這個(gè)過程襟己,我們就稱之為 Shuffle。

1.2 沒有優(yōu)化之前的HashShuffle機(jī)制

沒有優(yōu)化之前的HashShuffle機(jī)制

在HashShuffle沒有優(yōu)化之前牍陌,每一個(gè)ShufflleMapTask會(huì)為每一個(gè)ReduceTask創(chuàng)建一個(gè)bucket緩存擎浴,并且會(huì)為每一個(gè)bucket創(chuàng)建一個(gè)文件。這個(gè)bucket存放的數(shù)據(jù)就是經(jīng)過Partitioner操作(默認(rèn)是HashPartitioner)之后找到對(duì)應(yīng)的bucket然后放進(jìn)去毒涧,最后將數(shù)據(jù)

刷新bucket緩存的數(shù)據(jù)到磁盤上贮预,即對(duì)應(yīng)的block file.

然后ShuffleMapTask將輸出作為MapStatus發(fā)送到DAGScheduler的MapOutputTrackerMaster,每一個(gè)MapStatus包含了每一個(gè)ResultTask要拉取的數(shù)據(jù)的位置和大小

ResultTask然后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster獲取MapStatus,看哪一份數(shù)據(jù)是屬于自己的仿吞,然后底層通過BlockManager將數(shù)據(jù)拉取過來

拉取過來的數(shù)據(jù)會(huì)組成一個(gè)內(nèi)部的ShuffleRDD滑频,優(yōu)先放入內(nèi)存,內(nèi)存不夠用則放入磁盤茫藏,然后ResulTask開始進(jìn)行聚合误趴,最后生成我們希望獲取的那個(gè)MapPartitionRDD

缺點(diǎn):

如上圖所示:在這里有1個(gè)worker,2個(gè)executor务傲,每一個(gè)executor運(yùn)行2個(gè)ShuffleMapTask凉当,有三個(gè)ReduceTask,所以總共就有4 * 3=12個(gè)bucket和12個(gè)block file售葡。

  • 如果數(shù)據(jù)量較大看杭,將會(huì)生成M*R個(gè)小文件,比如ShuffleMapTask有100個(gè)挟伙,ResultTask有100個(gè)楼雹,這就會(huì)產(chǎn)生100*100=10000個(gè)小文件

  • bucket緩存很重要,需要將ShuffleMapTask所有數(shù)據(jù)都寫入bucket尖阔,才會(huì)刷到磁盤贮缅,那么如果Map端數(shù)據(jù)過多,這就很容易造成內(nèi)存溢出介却,盡管后面有優(yōu)化谴供,bucket寫入的數(shù)據(jù)達(dá)到刷新到磁盤的閥值之后,就會(huì)將數(shù)據(jù)一點(diǎn)一點(diǎn)的刷新到磁盤齿坷,但是這樣磁盤I/O就多了

1.3 優(yōu)化后的HashShuffle (Consolidation Shuffle)

優(yōu)化后的HashShuffle

每一個(gè)Executor進(jìn)程根據(jù)核數(shù)桂肌,決定Task的并發(fā)數(shù)量,比如executor核數(shù)是2永淌,就是可以并發(fā)運(yùn)行兩個(gè)task崎场,如果是一個(gè)則只能運(yùn)行一個(gè)task

假設(shè)executor核數(shù)是1,ShuffleMapTask數(shù)量是M,那么它依然會(huì)根據(jù)ResultTask的數(shù)量R遂蛀,創(chuàng)建R個(gè)bucket緩存谭跨,然后對(duì)key進(jìn)行hash,數(shù)據(jù)進(jìn)入不同的bucket中李滴,每一個(gè)bucket對(duì)應(yīng)著一個(gè)block file,用于刷新bucket緩存里的數(shù)據(jù)

然后下一個(gè)task運(yùn)行的時(shí)候螃宙,那么不會(huì)再創(chuàng)建新的bucket和block file,而是復(fù)用之前的task已經(jīng)創(chuàng)建好的bucket和block file悬嗓。即所謂同一個(gè)Executor進(jìn)程里所有Task都會(huì)把相同的key放入相同的bucket緩沖區(qū)中

這樣的話污呼,生成文件的數(shù)量就是(本地worker的executor數(shù)量*executor的cores*ResultTask數(shù)量)如上圖所示裕坊,即2*1*3 = 6個(gè)文件包竹,每一個(gè)Executor的shuffleMapTask數(shù)量100,ReduceTask數(shù)量為100,那么

未優(yōu)化的HashShuffle的文件數(shù)是2*1*100*100 =20000,優(yōu)化之后的數(shù)量是2*1*100 = 200文件周瞎,相當(dāng)于少了100倍

缺點(diǎn):如果 Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大苗缩,也會(huì)產(chǎn)生很多小文件。

二 Sort-Based Shuffle

2.1 Sort-Based Shuffle概述

HashShuffle回顧

HashShuffle寫數(shù)據(jù)的時(shí)候声诸,內(nèi)存有一個(gè)bucket緩沖區(qū)酱讶,同時(shí)在本地磁盤有對(duì)應(yīng)的本地文件,如果本地有文件彼乌,那么在內(nèi)存應(yīng)該也有文件句柄也是需要耗費(fèi)內(nèi)存的泻肯。也就是說,從內(nèi)存的角度考慮慰照,即有一部分存儲(chǔ)數(shù)據(jù)食绿,一部分管理文件句柄氮块。如果Mapper分片數(shù)量為1000,Reduce分片數(shù)量為1000,那么總共就需要1000000個(gè)小文件。所以就會(huì)有很多內(nèi)存消耗,頻繁IO以及GC頻繁或者出現(xiàn)內(nèi)存溢出狰住。

而且Reducer端讀取Map端數(shù)據(jù)時(shí),Mapper有這么多小文件民假,就需要打開很多網(wǎng)絡(luò)通道讀取意鲸,很容易造成Reducer(下一個(gè)stage)通過driver去拉取上一個(gè)stage數(shù)據(jù)的時(shí)候,說文件找不到算色,其實(shí)不是文件找不到而是程序不響應(yīng)抬伺,因?yàn)檎贕C.

2.2 Sorted-Based Shuffle介紹

為了緩解Shuffle過程產(chǎn)生文件數(shù)過多和Writer緩存開銷過大的問題,spark引入了類似于hadoop Map-Reduce的shuffle機(jī)制剃允。該機(jī)制每一個(gè)ShuffleMapTask不會(huì)為后續(xù)的任務(wù)創(chuàng)建單獨(dú)的文件沛简,而是會(huì)將所有的Task結(jié)果寫入同一個(gè)文件,并且對(duì)應(yīng)生成一個(gè)索引文件斥废。以前的數(shù)據(jù)是放在內(nèi)存緩存中椒楣,等到數(shù)據(jù)完了再刷到磁盤,現(xiàn)在為了減少內(nèi)存的使用牡肉,在內(nèi)存不夠用的時(shí)候捧灰,可以將輸出溢寫到磁盤,結(jié)束的時(shí)候统锤,再將這些不同的文件聯(lián)合內(nèi)存的數(shù)據(jù)一起進(jìn)行歸并毛俏,從而減少內(nèi)存的使用量。一方面文件數(shù)量顯著減少饲窿,另一方面減少Writer緩存所占用的內(nèi)存大小煌寇,而且同時(shí)避免GC的風(fēng)險(xiǎn)和頻率。

為了內(nèi)存的壓力逾雄,將數(shù)據(jù)刷到磁盤上阀溶,后續(xù)進(jìn)行歸并腻脏,減少寫緩存內(nèi)存壓力,避免GC银锻。

Sorted-Based Shuffle

Sort-Based Shuffle有幾種不同的策略:

BypassMergeSortShuffleWriter
SortShuffleWriter
UnasfeSortShuffleWriter永品。

BypassMergeSortShuffleWriter使用這個(gè)模式特點(diǎn):

主要用于處理不需要排序和聚合的Shuffle操作,所以數(shù)據(jù)是直接寫入文件击纬,數(shù)據(jù)量較大的時(shí)候鼎姐,網(wǎng)絡(luò)I/O和內(nèi)存負(fù)擔(dān)較重

  • 主要適合處理Reducer任務(wù)數(shù)量比較少的情況下

  • 將每一個(gè)分區(qū)寫入一個(gè)單獨(dú)的文件,最后將這些文件合并,減少文件數(shù)量更振;但是這種方式需要并發(fā)打開多個(gè)文件炕桨,對(duì)內(nèi)存消耗比較大

因?yàn)锽ypassMergeSortShuffleWriter這種方式比SortShuffleWriter更快,所以如果在Reducer數(shù)量不大肯腕,又不需要在map端聚合和排序谋作,而且

Reducer的數(shù)目 < spark.shuffle.sort.bypassMergeThrshold指定的閥值,就是用的是這種方式乎芳。

SortShuffleWriter使用這個(gè)模式特點(diǎn):

  • 比較適合數(shù)據(jù)量很大的場景或者集群規(guī)模很大

  • 引入了外部外部排序器遵蚜,可以支持在Map端進(jìn)行本地聚合或者不聚合

  • 如果外部排序器enable了spill功能,如果內(nèi)存不夠奈惑,可以先將輸出溢寫到本地磁盤吭净,最后將內(nèi)存結(jié)果和本地磁盤的溢寫文件進(jìn)行合并

對(duì)于UnsafeShuffleWriter由于需要謹(jǐn)慎使用,我們暫不做分析肴甸。

另外這個(gè)Sort-Based Shuffle跟Executor核數(shù)沒有關(guān)系寂殉,即跟并發(fā)度沒有關(guān)系,它是每一個(gè)ShuffleMapTask都會(huì)產(chǎn)生一個(gè)data文件和index文件原在,所謂合并也只是將該ShuffleMapTask的各個(gè)partition對(duì)應(yīng)的分區(qū)文件合并到data文件而已友扰。所以這個(gè)就需要個(gè)Hash-BasedShuffle的consolidation機(jī)制區(qū)別開來。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末庶柿,一起剝皮案震驚了整個(gè)濱河市村怪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浮庐,老刑警劉巖甚负,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異审残,居然都是意外死亡梭域,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門搅轿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來病涨,“玉大人,你說我怎么就攤上這事璧坟〖饶拢” “怎么了凌彬?”我有些...
    開封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長循衰。 經(jīng)常有香客問我,道長褐澎,這世上最難降的妖魔是什么会钝? 我笑而不...
    開封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮工三,結(jié)果婚禮上迁酸,老公的妹妹穿的比我還像新娘。我一直安慰自己俭正,他們只是感情好奸鬓,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著掸读,像睡著了一般串远。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上儿惫,一...
    開封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天澡罚,我揣著相機(jī)與錄音,去河邊找鬼肾请。 笑死留搔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的铛铁。 我是一名探鬼主播隔显,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼饵逐!你這毒婦竟也來了括眠?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤倍权,失蹤者是張志新(化名)和其女友劉穎哺窄,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體账锹,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡萌业,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了奸柬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片生年。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖廓奕,靈堂內(nèi)的尸體忽然破棺而出抱婉,到底是詐尸還是另有隱情档叔,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布蒸绩,位于F島的核電站衙四,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏患亿。R本人自食惡果不足惜传蹈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望步藕。 院中可真熱鬧惦界,春花似錦、人聲如沸咙冗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽雾消。三九已至灾搏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間立润,已是汗流浹背确镊。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留范删,地道東北人蕾域。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像到旦,于是被迫代替她去往敵國和親旨巷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345