本文1脖镀、2、3節(jié)介紹了Spark 內(nèi)存相關(guān)之識,第4節(jié)描述了常見錯(cuò)誤類型及產(chǎn)生原因并給出了解決方案蜒灰。
1 堆內(nèi)和堆外內(nèi)存規(guī)劃
Executor 的內(nèi)存管理建立在 JVM 的內(nèi)存管理之上弦蹂,Spark 對 JVM 的空間(Heap+Off-heap)進(jìn)行了更為詳細(xì)的分配,以充分利用內(nèi)存强窖。同時(shí)凸椿,Spark 引入了Off-heap(TungSten)內(nèi)存模式,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開辟空間翅溺,進(jìn)一步優(yōu)化了內(nèi)存的使用(可以理解為是獨(dú)立于JVM托管的Heap之外利用c-style的malloc從os分配到的memory脑漫。由于不再由JVM托管,通過高效的內(nèi)存管理咙崎,可以避免JVM object overhead和Garbage collection的開銷)优幸。
運(yùn)行于Executor中的Task同時(shí)可使用JVM和Off-heap兩種模式的內(nèi)存。
- JVM OnHeap內(nèi)存: 大小由”--executor-memory”(即 spark.executor.memory)參數(shù)指定褪猛。Executor中運(yùn)行的并發(fā)任務(wù)共享JVM堆內(nèi)內(nèi)存网杆。
- JVM OffHeap內(nèi)存:大小由“spark.yarn.executor.memoryOverhead”參數(shù)指定,主要用于JVM自身伊滋,字符串, NIO Buffer等開銷碳却。
- Off-heap模式:默認(rèn)情況下Off-heap模式的內(nèi)存并不啟用,可以通過“spark.memory.offHeap.enabled”參數(shù)開啟笑旺,并由spark.memory.offHeap.size指定堆外內(nèi)存的大兄缙帧(占用的空間劃歸JVM OffHeap內(nèi)存)。
==備注==:我們現(xiàn)在未啟用Off-heap模式的內(nèi)存筒主,因此关噪,只介紹JVM模式的Executor內(nèi)存管理。以下出現(xiàn)有Off-heap均為JVM中區(qū)別于Heap的內(nèi)存乌妙。
2 Executor內(nèi)存劃分
2.1 Executor可用內(nèi)存總量
如上圖所示色洞,Yarn集群管理模式中,Spark 以Executor Container的形式在NodeManager中運(yùn)行冠胯,其可使用的內(nèi)存上限由“yarn.scheduler.maximum-allocation-mb” 指定, ====我們可以稱其為MonitorMemory ====火诸。
如前所述,Executor的內(nèi)存由Heap內(nèi)存和設(shè)定的Off-heap內(nèi)存組成荠察。
Heap: 由“spark.executor.memory” 指定, 以下稱為ExecutorMemory
Off-heap: 由 “spark.yarn.executor.memoryOverhead” 指定置蜀, 以下稱為MemoryOverhead
因此, 對現(xiàn)有Yarn集群,存在:
ExecutorMemory + MemoryOverhead <= MonitorMemory
若應(yīng)用提交之時(shí)悉盆,指定的 ExecutorMemory與MemoryOverhead 之和大于 MonitorMemory盯荤,則會導(dǎo)致Executor申請失敗焕盟;若運(yùn)行過程中秋秤,實(shí)際使用內(nèi)存超過上限閾值,Executor進(jìn)程會被Yarn終止掉(kill)。
2.2 Heap
Spark 對Heap內(nèi)存的管理是邏輯上的劃分管理(限制各有邏輯區(qū)域內(nèi)存量及記錄使用狀態(tài))灼卢,對象實(shí)例真正占用內(nèi)存的管理(申請和釋放)都由JVM完成绍哎。
“spark.executor.memory”指定的內(nèi)存為JVM最大分配的堆內(nèi)存(“-xmx”),Spark為了更高效的使用這部分內(nèi)存鞋真,對這部分內(nèi)存進(jìn)行了細(xì)分崇堰,下圖(備注:此圖源于互聯(lián)網(wǎng))對基于spark 2 (1.6)對堆內(nèi)存分配比例進(jìn)行了描述:
其中:
- Reserved Memory保留內(nèi)存,系統(tǒng)默認(rèn)值為300涩咖,一般無需改動海诲,不用關(guān)心此部分內(nèi)存。 但如果Executor分配的內(nèi)存小于 1.5 * 300 = 450M時(shí)檩互,Executor將無法執(zhí)行特幔。
- Storage Memory 存儲內(nèi)存
用于存放廣播數(shù)據(jù)及RDD緩存數(shù)據(jù)。由上圖可知闸昨,Spark 2+中敬辣,初始狀態(tài)下,Storage及Execution Memory均約占系統(tǒng)總內(nèi)存的30%(1 * 0.6 * 0.5 = 0.3)零院。在UnifiedMemory管理中,這兩部分內(nèi)存可以相互借用村刨,為了方便描述,我們使用storageRegionSize來表示“spark.storage.storageFraction”告抄。當(dāng)計(jì)算內(nèi)存不足時(shí),可以改造storageRegionSize中未使用部分嵌牺,且StorageMemory需要存儲內(nèi)存時(shí)也不可被搶占打洼; 若實(shí)際StorageMemory使用量超過storageRegionSize,那么當(dāng)計(jì)算內(nèi)存不足時(shí)逆粹,可以改造(StorageMemory – storageRegionSize)部分募疮,而storageRegionSize部分不可被搶占。
==備注==: Unified Memory中僻弹,spark.shuffle.memoryFraction, spark.storage.unrollFraction等參數(shù)無需在指定阿浓。
2.3 Java Off-heap (Memory Overhead)
Executor 中,另一塊內(nèi)存為由“spark.yarn.executor.memoryOverhead”指定的Java Off-heap內(nèi)存蹋绽,此部分內(nèi)存主要是創(chuàng)建Java Object時(shí)的額外開銷芭毙,Native方法調(diào)用,線程棧卸耘, NIO Buffer等開銷(Driect Buffer)退敦。此部分為用戶代碼及Spark 不可操作的內(nèi)存,不足時(shí)可通過調(diào)整參數(shù)解決, 無需過多關(guān)注蚣抗。 具體需要調(diào)整的場景參見本文第4節(jié)侈百。
3 任務(wù)內(nèi)存管理(Task Memory Manager)
Executor中任務(wù)以線程的方式執(zhí)行,各線程共享JVM的資源,任務(wù)之間的內(nèi)存資源沒有強(qiáng)隔離(任務(wù)沒有專用的Heap區(qū)域)钝域。因此讽坏,可能會出現(xiàn)這樣的情況:先到達(dá)的任務(wù)可能占用較大的內(nèi)存,而后到的任務(wù)因得不到足夠的內(nèi)存而掛起网梢。
在Spark任務(wù)內(nèi)存管理中震缭,使用HashMap存儲任務(wù)與其消耗內(nèi)存的映射關(guān)系。每個(gè)任務(wù)可占用的內(nèi)存大小為潛在可使用計(jì)算內(nèi)存的1/2n – 1/n , 當(dāng)剩余內(nèi)存為小于1/2n時(shí)战虏,任務(wù)將被掛起拣宰,直至有其他任務(wù)釋放執(zhí)行內(nèi)存,而滿足內(nèi)存下限1/2n烦感,任務(wù)被喚醒巡社,其中n為當(dāng)前Executor中活躍的任務(wù)數(shù)。
任務(wù)執(zhí)行過程中手趣,如果需要更多的內(nèi)存晌该,則會進(jìn)行申請,如果绿渣,存在空閑內(nèi)存朝群,則自動擴(kuò)容成功,否則中符,將拋出OutOffMemroyError姜胖。
==備注==:潛在可使用計(jì)算內(nèi)存為:初始計(jì)算內(nèi)存+可搶占存儲內(nèi)存
4 內(nèi)存調(diào)整方案
Executor中可同時(shí)運(yùn)行的任務(wù)數(shù)由Executor分配的CPU的核數(shù)N 和每個(gè)任務(wù)需要的CPU核心數(shù)C決定。其中:
- N = spark.executor.cores
- C = spark.task.cpus
Executor的最大任務(wù)并行度可表示為 ==TP = N / C==. 其中,C值與應(yīng)用類型有關(guān)淀散,大部分應(yīng)用使用默認(rèn)值1即可右莱,因此,影響Executor中最大任務(wù)并行度的主要因素是N.
依據(jù)Task的內(nèi)存使用特征档插,前文所述的Executor內(nèi)存模型可以簡單抽象為下圖所示模型:
其中慢蜓,Executor 向yarn申請的總內(nèi)存可表示為: M = M1 + M2 .
4.1 錯(cuò)誤類型及調(diào)整方案
4.1.1 Executor OOM類錯(cuò)誤 (錯(cuò)誤代碼 137、143等)
該類錯(cuò)誤一般是由于Heap(M2)已達(dá)上限郭膛,Task需要更多的內(nèi)存晨抡,而又得不到足夠的內(nèi)存而導(dǎo)致。因此则剃,解決方案要從增加每個(gè)Task的內(nèi)存使用量凄诞,滿足任務(wù)需求 或 降低單個(gè)Task的內(nèi)存消耗量,從而使現(xiàn)有內(nèi)存可以滿足任務(wù)運(yùn)行需求兩個(gè)角度出發(fā)忍级。因此:
4.1.1.1 增加單個(gè)task的內(nèi)存使用量
- 增加最大Heap值帆谍, 即 上圖中M2 的值,使每個(gè)Task可使用內(nèi)存增加轴咱。
- 降低Executor的可用Core的數(shù)量 N , 使Executor中同時(shí)運(yùn)行的任務(wù)數(shù)減少汛蝙,在總資源不變的情況下烈涮,使每個(gè)Task獲得的內(nèi)存相對增加。
4.1.1.2 降低單個(gè)Task的內(nèi)存消耗量
降低單個(gè)Task的內(nèi)存消耗量可從配制方式和調(diào)整應(yīng)用邏輯兩個(gè)層面進(jìn)行優(yōu)化:
-
配制方式
減少每個(gè)Task處理的數(shù)據(jù)量窖剑,可降低Task的內(nèi)存開銷坚洽,在Spark中,每個(gè)partition對應(yīng)一個(gè)處理任務(wù)Task, 因此西土,在數(shù)據(jù)總量一定的前提下讶舰,可以通過增加partition數(shù)量的方式來減少每個(gè)Task處理的數(shù)據(jù)量,從而降低Task的內(nèi)存開銷。針對不同的Spark應(yīng)用類型需了,存在不同的partition調(diào)整參數(shù)如下:
- P = spark.default.parallism (非SQL應(yīng)用)
- P = spark.sql.shuffle.partition (SQL 應(yīng)用)
通過增加P的值跳昼,可在一定程度上使Task現(xiàn)有內(nèi)存滿足任務(wù)運(yùn)行
注: 當(dāng)調(diào)整一個(gè)參數(shù)不能解決問題時(shí),上述方案應(yīng)進(jìn)行協(xié)同調(diào)整==備注:若應(yīng)用shuffle階段 spill嚴(yán)重肋乍,則可以通過調(diào)整“spark.shuffle.spill.numElementsForceSpillThreshold”的值鹅颊,來限制spill使用的內(nèi)存大小 ,比如設(shè)置(2000000)墓造,該值太大不足以解決OOM問題堪伍,若太小,則spill會太頻繁觅闽,影響集群性能帝雇,因此,要依據(jù)負(fù)載類型進(jìn)行合理伸縮(此處蛉拙,可設(shè)法引入動態(tài)伸縮機(jī)制尸闸,待后續(xù)處理)。==
-
調(diào)整應(yīng)用邏輯
Executor OOM 一般發(fā)生Shuffle階段刘离,該階段需求計(jì)算內(nèi)存較大,且應(yīng)用邏輯對內(nèi)存需求有較大影響睹栖,下面舉例就行說明:
- groupByKey 轉(zhuǎn)換為 reduceByKey
一般情況下硫惕,groupByKey能實(shí)現(xiàn)的功能使用reduceByKey均可實(shí)現(xiàn),而ReduceByKey存在Map端的合并野来,可以有效減少傳輸帶寬占用及Reduce端內(nèi)存消耗恼除。
- groupByKey 轉(zhuǎn)換為 reduceByKey
-
data skew 預(yù)處理
Data Skew是指任務(wù)間處理的數(shù)據(jù)量存大較大的差異。
如左圖所示曼氛,key 為010的數(shù)據(jù)較多豁辉,當(dāng)發(fā)生shuffle時(shí),010所在分區(qū)存在大量數(shù)據(jù)舀患,不僅拖慢Job執(zhí)行(Job的執(zhí)行時(shí)間由最后完成的任務(wù)決定)徽级。 而且導(dǎo)致010對應(yīng)Task內(nèi)存消耗過多,可能導(dǎo)致OOM. 而右圖聊浅,經(jīng)過預(yù)處理(加鹽餐抢,此處僅為舉例說明問題现使,解決方法不限于此)可以有效減少Data Skew導(dǎo)致 的問題
==注:上述舉例僅為說明調(diào)整應(yīng)用邏輯可以在一定程序上解決OOM問題,解決方法不限于上述舉例==
4.1.2 Beyond…… memory, killed by yarn.
出現(xiàn)該問題原因是由于實(shí)際使用內(nèi)存上限超過申請的內(nèi)存上限而被Yarn終止掉了, 首先說明Yarn中Container內(nèi)存監(jiān)控機(jī)制:
- Container進(jìn)程的內(nèi)存使用量:以Container進(jìn)程為根的進(jìn)程樹中所有進(jìn)程的內(nèi)存使用總量旷痕。
- Container被殺死的判斷依據(jù):進(jìn)程樹總內(nèi)存(物理內(nèi)存或虛擬內(nèi)存)使用量超過向Yarn申請的內(nèi)存上限值碳锈,則認(rèn)為該Container使用內(nèi)存超量,可以被“殺死”欺抗。
因此售碳,對該異常的分析要從是否存在子進(jìn)程兩個(gè)角度出發(fā)。
1) 不存在子進(jìn)程
根據(jù)Container進(jìn)程殺死的條件可知绞呈,在不存在子進(jìn)程時(shí)贸人,出現(xiàn)killed by yarn問題是于由Executor(JVM)進(jìn)程自身內(nèi)存超過向Yarn申請的內(nèi)存總量M 所致。由于未出現(xiàn)4.1.1節(jié)所述的OOM異常报强,因此可判定其為 M1 (Overhead)不足, 依據(jù)Yarn內(nèi)存使用情況有如下兩種方案:
- 如果灸姊,M未達(dá)到Y(jié)arn單個(gè)Container允許的上限時(shí),可僅增加M1 秉溉,從而增加M力惯;如果,M達(dá)到Y(jié)arn單個(gè)Container允許的上限時(shí)召嘶,增加 M1父晶, 降低 M2.
操作方法:在提交腳本中添加 --conf spark.yarn.executor.memoryOverhead=3072(或更大的值,比如4096等) --conf spark.executor.memory = 10g 或 更小的值弄跌,注意二者之各要小于Container監(jiān)控內(nèi)存量,否則伸請資源將被yarn拒絕甲喝。 - 減少可用的Core的數(shù)量 N, 使并行任務(wù)數(shù)減少,從而減少Overhead開銷
操作方法:在提交腳本中添加 --executor-cores=3 <比原來小的值> 或 --conf spark.executor.cores=3 <比原來小的值>
2)存在子進(jìn)程
Spark 應(yīng)用中Container以Executor(JVM進(jìn)程)的形式存在铛只,因此根進(jìn)程為Executor對應(yīng)的進(jìn)程, 而Spark 應(yīng)用向Yarn申請的總資源M = M1 + M 2 , 都是以Executor(JVM)進(jìn)程(非進(jìn)程樹)可用資源的名義申請的埠胖。申請的資源并非一次性全量分配給JVM使用,而是先為JVM分配初始值淳玩,隨后內(nèi)存不足時(shí)再按比率不斷進(jìn)行擴(kuò)容直撤,直致達(dá)到Container監(jiān)控的最大內(nèi)存使用量M 。當(dāng)Executor中啟動了子進(jìn)程(調(diào)用shell等)時(shí)蜕着,子進(jìn)程占用的內(nèi)存(記為 S) 就被加入Container進(jìn)程樹谋竖,此時(shí)就會影響Executor實(shí)際可使用內(nèi)存資源(Executor進(jìn)程實(shí)際可使用資源為:M - S),然而啟動JVM時(shí)設(shè)置的可用最大資源為M承匣, 且JVM進(jìn)程并不會感知Container中留給自己的使用量已被子進(jìn)程占用蓖乘,因此,當(dāng)JVM使用量達(dá)到 M - S韧骗,還會繼續(xù)開劈內(nèi)存空間嘉抒,這就會導(dǎo)致Executor進(jìn)程樹使用的總內(nèi)存量大于M 而被Yarn 殺死。
典形場景有:PySpark(Spark已做內(nèi)存限制袍暴,一般不會占用過大內(nèi)存)众眨、自定義Shell調(diào)用握牧。其解決方案:
- PySpark場景:
- 如果,M未達(dá)到Y(jié)arn單個(gè)Container允許的上限時(shí)娩梨,可僅增加M1 沿腰,從而增加M;如果狈定,M達(dá)到Y(jié)arn單個(gè)Container允許的上限時(shí)颂龙,增加 M1, 降低 M2.
- 減少可用的Core的數(shù)量 N, 使并行任務(wù)數(shù)減少纽什,從而減少Overhead開銷
- 自定義Shell 場景:(OverHead不足為假象)
- 調(diào)整子進(jìn)程可用內(nèi)存量措嵌,(通過單機(jī)測試,內(nèi)存控制在Container監(jiān)控內(nèi)存以內(nèi)芦缰,且為Spark保留內(nèi)存等留有空間)企巢。
操作方法同4.1.2<1>中所述
- 調(diào)整子進(jìn)程可用內(nèi)存量措嵌,(通過單機(jī)測試,內(nèi)存控制在Container監(jiān)控內(nèi)存以內(nèi)芦缰,且為Spark保留內(nèi)存等留有空間)企巢。
特別鳴謝:海華師兄
轉(zhuǎn)載請注明:http://www.reibang.com/p/10e91ace3378
參考文獻(xiàn):
https://spark.apache.org/docs/latest/tuning.html
https://0x0fff.com/spark-memory-management/ https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html?ca=drs-&utm_source=tuicool&utm_medium=referral