前言
Spark作為一個(gè)基于內(nèi)存的分布式計(jì)算引擎,程序在運(yùn)行時(shí)可能會(huì)被集群中的任何資源阻塞:CPU | 網(wǎng)絡(luò)帶寬 | 內(nèi)存。然而Spark的內(nèi)存管理模塊在整個(gè)系統(tǒng)中扮演著非常重要的角色。理解Spark內(nèi)存管理的基本原理鸭叙,有助于更好地開(kāi)發(fā)Spark應(yīng)用程序和進(jìn)行性能調(diào)優(yōu)
在提交一個(gè)Spark Application時(shí)韧骗,Spark集群會(huì)啟動(dòng)Driver和Executor兩種JVM進(jìn)程驼侠。Driver為主控進(jìn)程兔院,負(fù)責(zé)創(chuàng)建Context殖卑,提交Job,并將Job轉(zhuǎn)化成Task秆乳,協(xié)調(diào)Executor間的Task執(zhí)行懦鼠。而Executor主要負(fù)責(zé)執(zhí)行具體的計(jì)算任務(wù),將結(jié)果返回Driver屹堰。由于Driver的內(nèi)存管理比較簡(jiǎn)單,和一般的JVM程序區(qū)別不大街氢,本文基于Spark2.12重點(diǎn)分析Executor的內(nèi)存管理扯键。
??? 既然說(shuō)到內(nèi)存管理,接下來(lái)不得不說(shuō)Spark任務(wù)開(kāi)啟后涉及的有哪些內(nèi)存I核唷H傩獭!
堆內(nèi)存(on-heap)和堆外內(nèi)存(off-heap)
-
堆內(nèi)存(on-heap):
描述:在JVM堆上分配的內(nèi)存伦乔,在GC范圍內(nèi)
①:Driver堆內(nèi)存:通過(guò)--driver-memory 或者spark.driver.memory指定厉亏,默認(rèn)大小1G;
②:Executor堆內(nèi)存:通過(guò)--executor-memory 或者spark.executor.memory指定烈和,默認(rèn)大小1G
-
堆外內(nèi)存(off-heap):
描述:在JVM之外分配的內(nèi)存爱只,不在GC范圍內(nèi)
①:Driver堆外內(nèi)存:通過(guò)spark.driver.memoryOverhead指定,默認(rèn)是Driver堆內(nèi)存的0.1倍招刹,最小是384MB恬试;
Driver堆外內(nèi)存 = max(Driver堆內(nèi)存 * MEMORY_OVERHEAD_FACTOR ,MEMORY_OVERHEAD_MIN )
注:MEMORY_OVERHEAD_FACTOR = 0.10 和 MEMORY_OVERHEAD_MIN = 384 如有需要的同學(xué)可以查看源碼
②:Executor堆外內(nèi)存 :通過(guò)spark.executor.memoryOverhead指定疯暑,默認(rèn)是Executor堆內(nèi)存的0.1倍训柴,最小是384MB;
Executor堆外內(nèi)存 = max(Executor堆內(nèi)存 * MEMORY_OVERHEAD_FACTOR 妇拯,MEMORY_OVERHEAD_MIN )
注:MEMORY_OVERHEAD_FACTOR = 0.10 和 MEMORY_OVERHEAD_MIN = 384 如有需要的同學(xué)可以查看源碼
-
Driver和Executor內(nèi)存:
①:Driver內(nèi)存大谢媚佟:Driver的堆內(nèi)存 + Driver的堆外內(nèi)存
②:Executor內(nèi)存大小:Executor的堆內(nèi)存 + Executor的堆外內(nèi)存
??? 走到這,大家應(yīng)該對(duì)Spark中涉及的內(nèi)存概念有一個(gè)簡(jiǎn)單的印象了仗嗦,那在開(kāi)發(fā)中我們給Spark任務(wù)一些資源后预麸,它是怎么管理這些內(nèi)存的呢?儒将?吏祸?
內(nèi)存空間分配
-
靜態(tài)內(nèi)存管理
在靜態(tài)內(nèi)存管理機(jī)制下,存儲(chǔ)內(nèi)存钩蚊,執(zhí)行內(nèi)存和其它內(nèi)存三部分的大小在Spark應(yīng)用運(yùn)行期間是固定的贡翘,但是用戶可以在提交Spark應(yīng)用之前進(jìn)行配置。如果開(kāi)發(fā)者不熟悉Spark的存儲(chǔ)機(jī)制砰逻,或沒(méi)有根據(jù)具體的數(shù)據(jù)規(guī)模和計(jì)算任務(wù)做相應(yīng)的配置鸣驱,很容易會(huì)造成資源沒(méi)有得到合理的分配導(dǎo)致Spark任務(wù)失敗。由于新的內(nèi)存管理機(jī)制的出現(xiàn)蝠咆,靜態(tài)內(nèi)存管理不在本文做詳細(xì)介紹踊东,有興趣的同學(xué)可以參考網(wǎng)上的其它博客。
-
統(tǒng)一內(nèi)存管理
Spark 1.6之后引入了統(tǒng)一內(nèi)存管理機(jī)制刚操,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲(chǔ)和執(zhí)行內(nèi)存共享同一塊空間闸翅,可以動(dòng)態(tài)占用對(duì)方的空閑區(qū)域。
堆內(nèi)模型
描述:Executor內(nèi)運(yùn)行的并發(fā)任務(wù)共享JVM對(duì)內(nèi)內(nèi)存菊霜。
- Execution內(nèi)存:主要用于存放Shuffle坚冀,Join,Sort鉴逞,Aggregation等計(jì)算過(guò)程中的臨時(shí)數(shù)據(jù)记某;
- Storage內(nèi)存:主要用于存放Spark的cache數(shù)據(jù),例如:RDD的緩存构捡,unroll數(shù)據(jù)液南;
- 用戶內(nèi)存(User Memory):主要用于儲(chǔ)存RDD轉(zhuǎn)換操作所需要的數(shù)據(jù),例如RDD依賴等信息勾徽;
-
預(yù)留內(nèi)存(Reserved Memory):系統(tǒng)預(yù)留內(nèi)存滑凉,會(huì)用來(lái)存儲(chǔ)Spark內(nèi)部對(duì)象。
- systemMemory:其實(shí)就是通過(guò)參數(shù)spark.executor.memory 或 --executor-memory配置的捂蕴,在源碼中可以發(fā)現(xiàn)譬涡,systemMemory也有其最小值:minSystemMemory = (reservedMemory * 1.5),也就是最小是reservedMemory的1.5倍啥辨;
/**
* Return the total amount of memory shared between execution and storage, in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.get(TEST_MEMORY)
val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
//SystemMemory最小值
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
if (conf.contains(config.EXECUTOR_MEMORY)) {
val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
}
}
//usableMemory計(jì)算邏輯
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.get(config.MEMORY_FRACTION)
(usableMemory * memoryFraction).toLong
}
- reservedMemory:其值等于300MB涡匀,這個(gè)值平常線上開(kāi)發(fā)是不做修改的;
// Set aside a fixed amount of memory for non-storage, non-execution purposes.
// This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
// sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
// the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
- usableMemory = systemMemory - reservedMemory溉知,這個(gè)就是Spark的可用內(nèi)存陨瘩,具體可以在上面介紹systemMemory中找到這個(gè)值怎么計(jì)算的腕够。
堆外模型
描述:相比對(duì)內(nèi)內(nèi)存,堆外內(nèi)存的模型比較簡(jiǎn)單舌劳,只包括Storage內(nèi)存和Execution內(nèi)存帚湘,其分布如下圖:
??? 上面大概說(shuō)了說(shuō)Spark中的堆內(nèi)存和堆外內(nèi)存怎么管理以及相關(guān)參數(shù)怎么得到和設(shè)置的,其中涉及了Execution和Storage內(nèi)存動(dòng)態(tài)占用的情景甚淡,那接下來(lái)聊一聊動(dòng)態(tài)占用機(jī)制是怎么樣的大诸??
動(dòng)態(tài)占用機(jī)制
- Execution內(nèi)存不夠用的時(shí)候贯卦,可以去Storage內(nèi)存中申請(qǐng)使用资柔;
- Storage內(nèi)存不夠用的時(shí)候也可以去Execution區(qū)域中聲明使用;
- 但是Storage內(nèi)存在使用完占用的Execution內(nèi)存之后撵割,需要?dú)w還對(duì)方的內(nèi)存贿堰,而Execution內(nèi)存在使用完Storage內(nèi)存是默認(rèn)是不歸還的。