Spark Sort Based Shuffle內(nèi)存分析

分布式系統(tǒng)里的Shuffle 階段往往是非常復(fù)雜的惦费,而且分支條件也多,我只能按著我關(guān)注的線去描述泰讽±埽肯定會(huì)有不少謬誤之處昔期,我會(huì)根據(jù)自己理解的深入,不斷更新這篇文章佛玄。

前言

借用和董神的一段對(duì)話說(shuō)下背景:

shuffle共有三種硼一,別人討論的是hash shuffle,這是最原始的實(shí)現(xiàn)梦抢,曾經(jīng)有兩個(gè)版本般贼,第一版是每個(gè)map產(chǎn)生r個(gè)文件,一共產(chǎn)生mr個(gè)文件奥吩,由于產(chǎn)生的中間文件太大影響擴(kuò)展性哼蛆,社區(qū)提出了第二個(gè)優(yōu)化版本,讓一個(gè)core上map共用文件霞赫,減少文件數(shù)目腮介,這樣共產(chǎn)生corer個(gè)文件,好多了端衰,但中間文件數(shù)目仍隨任務(wù)數(shù)線性增加叠洗,仍難以應(yīng)對(duì)大作業(yè),但hash shuffle已經(jīng)優(yōu)化到頭了靴迫。為了解決hash shuffle性能差的問(wèn)題惕味,又引入sort shuffle,完全借鑒mapreduce實(shí)現(xiàn)玉锌,每個(gè)map產(chǎn)生一個(gè)文件名挥,徹底解決了擴(kuò)展性問(wèn)題

目前Sort Based Shuffle 是作為默認(rèn)Shuffle類型的。Shuffle 是一個(gè)很復(fù)雜的過(guò)程主守,任何一個(gè)環(huán)節(jié)都足夠?qū)懸黄恼沦骶蟆K赃@里,我嘗試換個(gè)方式参淫,從實(shí)用的角度出發(fā)救湖,讓讀者有兩方面的收獲:

  1. 剖析哪些環(huán)節(jié),哪些代碼可能會(huì)讓內(nèi)存產(chǎn)生問(wèn)題
  2. 控制相關(guān)內(nèi)存的參數(shù)

有時(shí)候涎才,我們寧可程序慢點(diǎn)鞋既,也不要OOM,至少要先跑步起來(lái)耍铜,希望這篇文章能夠讓你達(dá)成這個(gè)目標(biāo)邑闺。

同時(shí)我們會(huì)提及一些類名,這些類方便你自己想更深入了解時(shí)棕兼,可以方便的找到他們陡舅,自己去探個(gè)究竟。

Shuffle 概覽

Spark 的Shuffle 分為 Write,Read 兩階段伴挚。我們預(yù)先建立三個(gè)概念:

  • Write 對(duì)應(yīng)的是ShuffleMapTask,具體的寫操作ExternalSorter來(lái)負(fù)責(zé)

  • Read 階段由ShuffleRDD里的HashShuffleReader來(lái)完成靶衍。如果拉來(lái)的數(shù)據(jù)如果過(guò)大灾炭,需要落地,則也由ExternalSorter來(lái)完成的

  • 所有Write 寫完后颅眶,才會(huì)執(zhí)行Read蜈出。 他們被分成了兩個(gè)不同的Stage階段。

也就是說(shuō)帚呼,Shuffle Write ,Shuffle Read 兩階段都可能需要落磁盤掏缎,并且通過(guò)Disk Merge 來(lái)完成最后的Sort歸并排序。

Shuffle Write 內(nèi)存消耗分析

Shuffle Write 的入口鏈路為:

org.apache.spark.scheduler.ShuffleMapTask
---> org.apache.spark.shuffle.sort.SortShuffleWriter 
   ---> org.apache.spark.util.collection.ExternalSorter

會(huì)產(chǎn)生內(nèi)存瓶頸的其實(shí)就是 org.apache.spark.util.collection.ExternalSorter煤杀。我們看看這個(gè)復(fù)雜的ExternalSorter都有哪些地方在占用內(nèi)存:

第一個(gè)地:

private var map = new PartitionedAppendOnlyMap[K, C]

我們知道眷蜈,數(shù)據(jù)都是先寫內(nèi)存,內(nèi)存不夠了沈自,才寫磁盤酌儒。這里的map就是那個(gè)放數(shù)據(jù)的內(nèi)存了。

這個(gè)PartitionedAppendOnlyMap內(nèi)部維持了一個(gè)數(shù)組枯途,是這樣的:

private var data = new Array[AnyRef](2 * capacity)

也就是他消耗的并不是Storage的內(nèi)存忌怎,所謂Storage內(nèi)存,指的是由blockManager管理起來(lái)的內(nèi)存酪夷。

PartitionedAppendOnlyMap 放不下榴啸,要落地,那么不能硬生生的寫磁盤晚岭,所以需要個(gè)buffer,然后把buffer再一次性寫入磁盤文件鸥印。這個(gè)buffer是由參數(shù)

spark.shuffle.file.buffer=32k

控制的。數(shù)據(jù)獲取的過(guò)程中坦报,序列化反序列化库说,也是需要空間的,所以Spark 對(duì)數(shù)量做了限制片择,通過(guò)如下參數(shù)控制:

 spark.shuffle.spill.batchSize=10000

假設(shè)一個(gè)Executor的可使用的Core為 C個(gè)潜的,那么對(duì)應(yīng)需要的內(nèi)存消耗為:

 C * 32k + C * 10000個(gè)Record + C * PartitionedAppendOnlyMap

這么看來(lái),寫文件的buffer不是問(wèn)題字管,而序列化的batchSize也不是問(wèn)題啰挪,幾萬(wàn)或者十幾萬(wàn)個(gè)Record 而已。那C * PartitionedAppendOnlyMap 到底會(huì)有多大呢嘲叔?我先給個(gè)結(jié)論:

   C * PartitionedAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8 

怎么得到上面的結(jié)論呢脐供?核心店就是要判定PartitionedAppendOnlyMap 需要占用多少內(nèi)存,而它到底能占用內(nèi)存借跪,則由觸發(fā)寫磁盤動(dòng)作決定,因?yàn)橐坏懘疟P酌壕,PartitionedAppendOnlyMap所占有的內(nèi)存就會(huì)被釋放掏愁。下面是判斷是否寫磁盤的邏輯代碼:

 estimatedSize = map.estimateSize()
 if (maybeSpill(map, estimatedSize)) { 
          map = new PartitionedAppendOnlyMap[K, C]
 }

每放一條記錄歇由,就會(huì)做一次內(nèi)存的檢查,看PartitionedAppendOnlyMap 到底占用了多少內(nèi)存果港。如果真是這樣沦泌,假設(shè)檢查一次內(nèi)存1ms, 1kw 就不得了的時(shí)間了。所以肯定是不行的,所以 estimateSize其實(shí)是使用采樣算法來(lái)做的。

第二個(gè)迄汛,我們也不希望mayBeSpill太耗時(shí),所以 maybeSpill 方法里就搞了很多東西盒蟆,減少耗時(shí)。我們看看都設(shè)置了哪些防線

首先會(huì)判定要不要執(zhí)行內(nèi)部邏輯:

   elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold

每隔32次會(huì)進(jìn)行一次檢查驼鞭,并且要當(dāng)前PartitionedAppendOnlyMap currentMemory > myMemoryThreshold 才會(huì)進(jìn)一步判定是不是要spill.

其中 myMemoryThreshold可通過(guò)如下配置獲得初始值

spark.shuffle.spill.initialMemoryThreshold =  5 * 1024 * 1024

接著會(huì)向 shuffleMemoryManager 要 2 * currentMemory - myMemoryThreshold 的內(nèi)存,shuffleMemoryManager 是被Executor 所有正在運(yùn)行的Task(Core) 共享的,能夠分配出去的內(nèi)存是:

ExecutorHeapMemeory * 0.2 * 0.8 

上面的數(shù)字可通過(guò)下面兩個(gè)配置來(lái)更改:

spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8

如果無(wú)法獲取到足夠的內(nèi)存千劈,就會(huì)觸發(fā)真的spill操作了。

看到這里牌捷,上面的結(jié)論就顯而易見了墙牌。

然而,這里我們忽略了一個(gè)很大的問(wèn)題暗甥,就是

 estimatedSize = map.estimateSize()

為什么說(shuō)它是大問(wèn)題喜滨,前面我們說(shuō)了,estimateSize 是近似估計(jì)撤防,所以有可能估的不準(zhǔn)虽风,也就是實(shí)際內(nèi)存會(huì)遠(yuǎn)遠(yuǎn)超過(guò)預(yù)期。

具體的大家可以看看 org.apache.spark.util.collection.SizeTracker

我這里給出一個(gè)結(jié)論:

如果你內(nèi)存開的比較大即碗,其實(shí)反倒風(fēng)險(xiǎn)更高焰情,因?yàn)閑stimateSize 并不是每次都去真實(shí)的算緩存。它是通過(guò)采樣來(lái)完成的剥懒,而采樣的周期不是固定的内舟,而是指數(shù)增長(zhǎng)的,比如第一次采樣完后初橘,PartitionedAppendOnlyMap 要經(jīng)過(guò)1.1次的update/insert操作之后才進(jìn)行第二次采樣验游,然后經(jīng)過(guò)1.1*.1.1次之后進(jìn)行第三次采樣,以此遞推保檐,假設(shè)你內(nèi)存開的大耕蝉,那PartitionedAppendOnlyMap可能要經(jīng)過(guò)幾十萬(wàn)次更新之后之后才會(huì)進(jìn)行一次采樣,然后才能計(jì)算出新的大小夜只,這個(gè)時(shí)候幾十萬(wàn)次更新帶來(lái)的新的內(nèi)存壓力垒在,可能已經(jīng)讓你的GC不堪重負(fù)了。

當(dāng)然扔亥,這是一種折中场躯,因?yàn)榇_實(shí)不能頻繁采樣谈为。

如果你不想出現(xiàn)這種問(wèn)題,要么自己替換實(shí)現(xiàn)這個(gè)類踢关,要么將

spark.shuffle.safetyFraction=0.8 

設(shè)置的更小一些伞鲫。

Shuffle Read 內(nèi)存消耗分析

Shuffle Read 的入口鏈路為:

org.apache.spark.rdd.ShuffledRDD
---> org.apache.spark.shuffle.sort.HashShuffleReader
   --->  org.apache.spark.util.collection.ExternalAppendOnlyMap
   --->  org.apache.spark.util.collection.ExternalSorter

Shuffle Read 會(huì)更復(fù)雜些,尤其是從各個(gè)節(jié)點(diǎn)拉取數(shù)據(jù)签舞。但這塊不是不是我們的重點(diǎn)秕脓。按流程,主要有:

  1. 獲取待拉取數(shù)據(jù)的迭代器
  2. 使用AppendOnlyMap/ExternalAppendOnlyMap 做combine
  3. 如果需要對(duì)key排序儒搭,則使用ExternalSorter

其中1后續(xù)會(huì)單獨(dú)列出文章吠架。3我們?cè)趙rite階段已經(jīng)討論過(guò)。所以這里重點(diǎn)是第二個(gè)步驟师妙,combine階段诵肛。

如果你開啟了

spark.shuffle.spill=true

則使用ExternalAppendOnlyMap,否則使用AppendOnlyMap默穴。兩者的區(qū)別是怔檩,前者如果內(nèi)存不夠,則落磁盤蓄诽,會(huì)發(fā)生spill操作薛训,后者如果內(nèi)存不夠,直接OOM了仑氛。

這里我們會(huì)重點(diǎn)分析ExternalAppendOnlyMap乙埃。

ExternalAppendOnlyMap 作為內(nèi)存緩沖數(shù)據(jù)的對(duì)象如下:

 private var currentMap = new SizeTrackingAppendOnlyMap[K, C]

如果currentMap 對(duì)象向申請(qǐng)不到內(nèi)存,就會(huì)觸發(fā)spill動(dòng)作锯岖。判定內(nèi)存是否充足的邏輯和Shuffle Write 完全一致介袜。

Combine做完之后,ExternalAppendOnlyMap 會(huì)返回一個(gè)Iterator出吹,叫做ExternalIterator,這個(gè)Iterator背后的數(shù)據(jù)源是所有spill文件以及當(dāng)前currentMap里的數(shù)據(jù)遇伞。

我們進(jìn)去 ExternalIterator 看看,唯一的一個(gè)占用內(nèi)存的對(duì)象是這個(gè)優(yōu)先隊(duì)列:

   private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

mergeHeap 里元素?cái)?shù)量等于所有spill文件個(gè)數(shù)加一捶牢。StreamBuffer 的結(jié)構(gòu):

 private class StreamBuffer(    
                    val iterator: BufferedIterator[(K, C)],    
                    val pairs: ArrayBuffer[(K, C)])

其中iterator 只是一個(gè)對(duì)象引用鸠珠,pairs 應(yīng)該保存的是iterator里的第一個(gè)元素(如果hash有沖突的話,則為多個(gè))

所以mergeHeap 應(yīng)該不占用什么內(nèi)存秋麸。到這里我們看看應(yīng)該占用多少內(nèi)存渐排。依然假設(shè) CoreNum 為 C,則

  C * 32k + C  * mergeHeap  + C * SizeTrackingAppendOnlyMap  

所以這一段占用內(nèi)存較大的依然是 SizeTrackingAppendOnlyMap ,一樣的灸蟆,他的值也符合如下公式

 C * SizeTrackingAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8

ExternalAppendOnlyMap 的目的是做Combine,然后如果你還設(shè)置了Order,那么接著會(huì)啟用 ExternalSorter 來(lái)完成排序驯耻。

經(jīng)過(guò)上文對(duì)Shuffle Write的使用,相比大家也對(duì)ExternalSorter有一定的了解了,此時(shí)應(yīng)該占用內(nèi)存的地方最大不超過(guò)下面的這個(gè)值:

 C * SizeTrackingAppendOnlyMap  + C * PartitionedAppendOnlyMap

不過(guò)即使如此可缚,因?yàn)樗麄児蚕硪粋€(gè)shuffleMemoryManager孽水,則理論上只有這么大:

 C * SizeTrackingAppendOnlyMap <  ExecutorHeapMemeory * 0.2 * 0.8

分析到這里,我們可以做個(gè)總結(jié):

  1. Shuffle Read階段如果內(nèi)存不足城看,有兩個(gè)階段會(huì)落磁盤,分別是Combine 和 Sort 階段杏慰。對(duì)應(yīng)的都會(huì)spill小文件测柠,并且產(chǎn)生讀。
  2. Shuffle Read 階段如果開啟了spill功能缘滥,則基本能保證內(nèi)存控制在 ExecutorHeapMemeory * 0.2 * 0.8 之內(nèi)轰胁。

后話

如果大家對(duì)Sort Shuffle 落磁盤文件這塊感興趣,還可以看看這篇文章 Spark Shuffle Write階段磁盤文件分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末朝扼,一起剝皮案震驚了整個(gè)濱河市赃阀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌擎颖,老刑警劉巖榛斯,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異搂捧,居然都是意外死亡驮俗,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門允跑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)王凑,“玉大人,你說(shuō)我怎么就攤上這事聋丝∷髋耄” “怎么了?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵弱睦,是天一觀的道長(zhǎng)百姓。 經(jīng)常有香客問(wèn)我,道長(zhǎng)每篷,這世上最難降的妖魔是什么瓣戚? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮焦读,結(jié)果婚禮上子库,老公的妹妹穿的比我還像新娘。我一直安慰自己矗晃,他們只是感情好仑嗅,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般仓技。 火紅的嫁衣襯著肌膚如雪鸵贬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天脖捻,我揣著相機(jī)與錄音阔逼,去河邊找鬼。 笑死地沮,一個(gè)胖子當(dāng)著我的面吹牛嗜浮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播摩疑,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼危融,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了雷袋?” 一聲冷哼從身側(cè)響起吉殃,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎楷怒,沒(méi)想到半個(gè)月后蛋勺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡率寡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年迫卢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冶共。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡乾蛤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出捅僵,到底是詐尸還是另有隱情家卖,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布庙楚,位于F島的核電站上荡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏馒闷。R本人自食惡果不足惜酪捡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纳账。 院中可真熱鬧逛薇,春花似錦、人聲如沸疏虫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至呢袱,卻和暖如春官扣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背羞福。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工惕蹄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人治专。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓焊唬,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親看靠。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容