分布式系統(tǒng)里的Shuffle 階段往往是非常復(fù)雜的惦费,而且分支條件也多,我只能按著我關(guān)注的線去描述泰讽±埽肯定會(huì)有不少謬誤之處昔期,我會(huì)根據(jù)自己理解的深入,不斷更新這篇文章佛玄。
前言
借用和董神的一段對(duì)話說(shuō)下背景:
shuffle共有三種硼一,別人討論的是hash shuffle,這是最原始的實(shí)現(xiàn)梦抢,曾經(jīng)有兩個(gè)版本般贼,第一版是每個(gè)map產(chǎn)生r個(gè)文件,一共產(chǎn)生mr個(gè)文件奥吩,由于產(chǎn)生的中間文件太大影響擴(kuò)展性哼蛆,社區(qū)提出了第二個(gè)優(yōu)化版本,讓一個(gè)core上map共用文件霞赫,減少文件數(shù)目腮介,這樣共產(chǎn)生corer個(gè)文件,好多了端衰,但中間文件數(shù)目仍隨任務(wù)數(shù)線性增加叠洗,仍難以應(yīng)對(duì)大作業(yè),但hash shuffle已經(jīng)優(yōu)化到頭了靴迫。為了解決hash shuffle性能差的問(wèn)題惕味,又引入sort shuffle,完全借鑒mapreduce實(shí)現(xiàn)玉锌,每個(gè)map產(chǎn)生一個(gè)文件名挥,徹底解決了擴(kuò)展性問(wèn)題
目前Sort Based Shuffle 是作為默認(rèn)Shuffle類型的。Shuffle 是一個(gè)很復(fù)雜的過(guò)程主守,任何一個(gè)環(huán)節(jié)都足夠?qū)懸黄恼沦骶蟆K赃@里,我嘗試換個(gè)方式参淫,從實(shí)用的角度出發(fā)救湖,讓讀者有兩方面的收獲:
- 剖析哪些環(huán)節(jié),哪些代碼可能會(huì)讓內(nèi)存產(chǎn)生問(wèn)題
- 控制相關(guān)內(nèi)存的參數(shù)
有時(shí)候涎才,我們寧可程序慢點(diǎn)鞋既,也不要OOM,至少要先跑步起來(lái)耍铜,希望這篇文章能夠讓你達(dá)成這個(gè)目標(biāo)邑闺。
同時(shí)我們會(huì)提及一些類名,這些類方便你自己想更深入了解時(shí)棕兼,可以方便的找到他們陡舅,自己去探個(gè)究竟。
Shuffle 概覽
Spark 的Shuffle 分為 Write,Read 兩階段伴挚。我們預(yù)先建立三個(gè)概念:
Write 對(duì)應(yīng)的是ShuffleMapTask,具體的寫操作ExternalSorter來(lái)負(fù)責(zé)
Read 階段由ShuffleRDD里的HashShuffleReader來(lái)完成靶衍。如果拉來(lái)的數(shù)據(jù)如果過(guò)大灾炭,需要落地,則也由ExternalSorter來(lái)完成的
所有Write 寫完后颅眶,才會(huì)執(zhí)行Read蜈出。 他們被分成了兩個(gè)不同的Stage階段。
也就是說(shuō)帚呼,Shuffle Write ,Shuffle Read 兩階段都可能需要落磁盤掏缎,并且通過(guò)Disk Merge 來(lái)完成最后的Sort歸并排序。
Shuffle Write 內(nèi)存消耗分析
Shuffle Write 的入口鏈路為:
org.apache.spark.scheduler.ShuffleMapTask
---> org.apache.spark.shuffle.sort.SortShuffleWriter
---> org.apache.spark.util.collection.ExternalSorter
會(huì)產(chǎn)生內(nèi)存瓶頸的其實(shí)就是 org.apache.spark.util.collection.ExternalSorter
煤杀。我們看看這個(gè)復(fù)雜的ExternalSorter都有哪些地方在占用內(nèi)存:
第一個(gè)地:
private var map = new PartitionedAppendOnlyMap[K, C]
我們知道眷蜈,數(shù)據(jù)都是先寫內(nèi)存,內(nèi)存不夠了沈自,才寫磁盤酌儒。這里的map就是那個(gè)放數(shù)據(jù)的內(nèi)存了。
這個(gè)PartitionedAppendOnlyMap
內(nèi)部維持了一個(gè)數(shù)組枯途,是這樣的:
private var data = new Array[AnyRef](2 * capacity)
也就是他消耗的并不是Storage的內(nèi)存忌怎,所謂Storage內(nèi)存,指的是由blockManager管理起來(lái)的內(nèi)存酪夷。
PartitionedAppendOnlyMap 放不下榴啸,要落地,那么不能硬生生的寫磁盤晚岭,所以需要個(gè)buffer,然后把buffer再一次性寫入磁盤文件鸥印。這個(gè)buffer是由參數(shù)
spark.shuffle.file.buffer=32k
控制的。數(shù)據(jù)獲取的過(guò)程中坦报,序列化反序列化库说,也是需要空間的,所以Spark 對(duì)數(shù)量做了限制片择,通過(guò)如下參數(shù)控制:
spark.shuffle.spill.batchSize=10000
假設(shè)一個(gè)Executor的可使用的Core為 C個(gè)潜的,那么對(duì)應(yīng)需要的內(nèi)存消耗為:
C * 32k + C * 10000個(gè)Record + C * PartitionedAppendOnlyMap
這么看來(lái),寫文件的buffer不是問(wèn)題字管,而序列化的batchSize也不是問(wèn)題啰挪,幾萬(wàn)或者十幾萬(wàn)個(gè)Record 而已。那C * PartitionedAppendOnlyMap 到底會(huì)有多大呢嘲叔?我先給個(gè)結(jié)論:
C * PartitionedAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8
怎么得到上面的結(jié)論呢脐供?核心店就是要判定PartitionedAppendOnlyMap
需要占用多少內(nèi)存,而它到底能占用內(nèi)存借跪,則由觸發(fā)寫磁盤動(dòng)作決定,因?yàn)橐坏懘疟P酌壕,PartitionedAppendOnlyMap所占有的內(nèi)存就會(huì)被釋放掏愁。下面是判斷是否寫磁盤的邏輯代碼:
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
每放一條記錄歇由,就會(huì)做一次內(nèi)存的檢查,看PartitionedAppendOnlyMap
到底占用了多少內(nèi)存果港。如果真是這樣沦泌,假設(shè)檢查一次內(nèi)存1ms, 1kw 就不得了的時(shí)間了。所以肯定是不行的,所以 estimateSize
其實(shí)是使用采樣算法來(lái)做的。
第二個(gè)迄汛,我們也不希望mayBeSpill
太耗時(shí),所以 maybeSpill
方法里就搞了很多東西盒蟆,減少耗時(shí)。我們看看都設(shè)置了哪些防線
首先會(huì)判定要不要執(zhí)行內(nèi)部邏輯:
elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold
每隔32次會(huì)進(jìn)行一次檢查驼鞭,并且要當(dāng)前PartitionedAppendOnlyMap
currentMemory > myMemoryThreshold 才會(huì)進(jìn)一步判定是不是要spill.
其中 myMemoryThreshold可通過(guò)如下配置獲得初始值
spark.shuffle.spill.initialMemoryThreshold = 5 * 1024 * 1024
接著會(huì)向 shuffleMemoryManager 要 2 * currentMemory - myMemoryThreshold
的內(nèi)存,shuffleMemoryManager 是被Executor 所有正在運(yùn)行的Task(Core) 共享的,能夠分配出去的內(nèi)存是:
ExecutorHeapMemeory * 0.2 * 0.8
上面的數(shù)字可通過(guò)下面兩個(gè)配置來(lái)更改:
spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8
如果無(wú)法獲取到足夠的內(nèi)存千劈,就會(huì)觸發(fā)真的spill操作了。
看到這里牌捷,上面的結(jié)論就顯而易見了墙牌。
然而,這里我們忽略了一個(gè)很大的問(wèn)題暗甥,就是
estimatedSize = map.estimateSize()
為什么說(shuō)它是大問(wèn)題喜滨,前面我們說(shuō)了,estimateSize 是近似估計(jì)撤防,所以有可能估的不準(zhǔn)虽风,也就是實(shí)際內(nèi)存會(huì)遠(yuǎn)遠(yuǎn)超過(guò)預(yù)期。
具體的大家可以看看 org.apache.spark.util.collection.SizeTracker
我這里給出一個(gè)結(jié)論:
如果你內(nèi)存開的比較大即碗,其實(shí)反倒風(fēng)險(xiǎn)更高焰情,因?yàn)閑stimateSize 并不是每次都去真實(shí)的算緩存。它是通過(guò)采樣來(lái)完成的剥懒,而采樣的周期不是固定的内舟,而是指數(shù)增長(zhǎng)的,比如第一次采樣完后初橘,PartitionedAppendOnlyMap 要經(jīng)過(guò)1.1次的update/insert操作之后才進(jìn)行第二次采樣验游,然后經(jīng)過(guò)1.1*.1.1次之后進(jìn)行第三次采樣,以此遞推保檐,假設(shè)你內(nèi)存開的大耕蝉,那PartitionedAppendOnlyMap可能要經(jīng)過(guò)幾十萬(wàn)次更新之后之后才會(huì)進(jìn)行一次采樣,然后才能計(jì)算出新的大小夜只,這個(gè)時(shí)候幾十萬(wàn)次更新帶來(lái)的新的內(nèi)存壓力垒在,可能已經(jīng)讓你的GC不堪重負(fù)了。
當(dāng)然扔亥,這是一種折中场躯,因?yàn)榇_實(shí)不能頻繁采樣谈为。
如果你不想出現(xiàn)這種問(wèn)題,要么自己替換實(shí)現(xiàn)這個(gè)類踢关,要么將
spark.shuffle.safetyFraction=0.8
設(shè)置的更小一些伞鲫。
Shuffle Read 內(nèi)存消耗分析
Shuffle Read 的入口鏈路為:
org.apache.spark.rdd.ShuffledRDD
---> org.apache.spark.shuffle.sort.HashShuffleReader
---> org.apache.spark.util.collection.ExternalAppendOnlyMap
---> org.apache.spark.util.collection.ExternalSorter
Shuffle Read 會(huì)更復(fù)雜些,尤其是從各個(gè)節(jié)點(diǎn)拉取數(shù)據(jù)签舞。但這塊不是不是我們的重點(diǎn)秕脓。按流程,主要有:
- 獲取待拉取數(shù)據(jù)的迭代器
- 使用AppendOnlyMap/ExternalAppendOnlyMap 做combine
- 如果需要對(duì)key排序儒搭,則使用ExternalSorter
其中1后續(xù)會(huì)單獨(dú)列出文章吠架。3我們?cè)趙rite階段已經(jīng)討論過(guò)。所以這里重點(diǎn)是第二個(gè)步驟师妙,combine階段诵肛。
如果你開啟了
spark.shuffle.spill=true
則使用ExternalAppendOnlyMap,否則使用AppendOnlyMap默穴。兩者的區(qū)別是怔檩,前者如果內(nèi)存不夠,則落磁盤蓄诽,會(huì)發(fā)生spill操作薛训,后者如果內(nèi)存不夠,直接OOM了仑氛。
這里我們會(huì)重點(diǎn)分析ExternalAppendOnlyMap乙埃。
ExternalAppendOnlyMap 作為內(nèi)存緩沖數(shù)據(jù)的對(duì)象如下:
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
如果currentMap 對(duì)象向申請(qǐng)不到內(nèi)存,就會(huì)觸發(fā)spill動(dòng)作锯岖。判定內(nèi)存是否充足的邏輯和Shuffle Write 完全一致介袜。
Combine做完之后,ExternalAppendOnlyMap 會(huì)返回一個(gè)Iterator出吹,叫做ExternalIterator
,這個(gè)Iterator背后的數(shù)據(jù)源是所有spill文件以及當(dāng)前currentMap里的數(shù)據(jù)遇伞。
我們進(jìn)去 ExternalIterator 看看,唯一的一個(gè)占用內(nèi)存的對(duì)象是這個(gè)優(yōu)先隊(duì)列:
private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
mergeHeap 里元素?cái)?shù)量等于所有spill文件個(gè)數(shù)加一捶牢。StreamBuffer 的結(jié)構(gòu):
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)],
val pairs: ArrayBuffer[(K, C)])
其中iterator 只是一個(gè)對(duì)象引用鸠珠,pairs 應(yīng)該保存的是iterator里的第一個(gè)元素(如果hash有沖突的話,則為多個(gè))
所以mergeHeap 應(yīng)該不占用什么內(nèi)存秋麸。到這里我們看看應(yīng)該占用多少內(nèi)存渐排。依然假設(shè) CoreNum 為 C,則
C * 32k + C * mergeHeap + C * SizeTrackingAppendOnlyMap
所以這一段占用內(nèi)存較大的依然是 SizeTrackingAppendOnlyMap ,一樣的灸蟆,他的值也符合如下公式
C * SizeTrackingAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8
ExternalAppendOnlyMap 的目的是做Combine,然后如果你還設(shè)置了Order,那么接著會(huì)啟用 ExternalSorter 來(lái)完成排序驯耻。
經(jīng)過(guò)上文對(duì)Shuffle Write的使用,相比大家也對(duì)ExternalSorter有一定的了解了,此時(shí)應(yīng)該占用內(nèi)存的地方最大不超過(guò)下面的這個(gè)值:
C * SizeTrackingAppendOnlyMap + C * PartitionedAppendOnlyMap
不過(guò)即使如此可缚,因?yàn)樗麄児蚕硪粋€(gè)shuffleMemoryManager孽水,則理論上只有這么大:
C * SizeTrackingAppendOnlyMap < ExecutorHeapMemeory * 0.2 * 0.8
分析到這里,我們可以做個(gè)總結(jié):
- Shuffle Read階段如果內(nèi)存不足城看,有兩個(gè)階段會(huì)落磁盤,分別是Combine 和 Sort 階段杏慰。對(duì)應(yīng)的都會(huì)spill小文件测柠,并且產(chǎn)生讀。
- Shuffle Read 階段如果開啟了spill功能缘滥,則基本能保證內(nèi)存控制在
ExecutorHeapMemeory * 0.2 * 0.8
之內(nèi)轰胁。
后話
如果大家對(duì)Sort Shuffle 落磁盤文件這塊感興趣,還可以看看這篇文章 Spark Shuffle Write階段磁盤文件分析