上一篇文章【源碼剖析】- Spark 新舊內(nèi)存管理方案(上)介紹了舊的內(nèi)存管理方案以及其實現(xiàn)類 StaticMemoryManager 是如何工作的朱盐,本文將通過介紹 UnifiedMemoryManager 來介紹新內(nèi)存管理方案(以下統(tǒng)稱為新方案)姻锁。
內(nèi)存總體分布
系統(tǒng)預(yù)留
在新方案中矿筝,內(nèi)存依然分為三塊,分別是系統(tǒng)預(yù)留渗蟹、用于 storage吟榴、用于 execution塘幅。其中系統(tǒng)預(yù)留大小如下:
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
生產(chǎn)環(huán)境中使用一般不會設(shè)置 spark.testing.reservedMemory
和 spark.testing
骤竹,所以我們認為系統(tǒng)預(yù)留空間大小置為 RESERVED_SYSTEM_MEMORY_BYTES
,即 300M俄认。
execution 和 storage 部分總大小
上一小節(jié)這段代碼是 UnifiedMemoryManager#getMaxMemory 的一個片段个少,該方法返回 execution 和 storage 可以共用的總空間,讓我們來看看這個方法的具體實現(xiàn):
private def getMaxMemory(conf: SparkConf): Long = {
//< 生產(chǎn)環(huán)境中一般不會設(shè)置 spark.testing.memory眯杏,所以這里認為 systemMemory 大小為 Jvm 最大可用內(nèi)存
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
//< 系統(tǒng)預(yù)留 300M
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = reservedMemory * 1.5
//< 如果 systemMemory 小于450M夜焦,則拋異常
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please use a larger heap size.")
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
//< 最終 execution 和 storage 的可用內(nèi)存之和為 (JVM最大可用內(nèi)存 - 系統(tǒng)預(yù)留內(nèi)存) * spark.memory.fraction
(usableMemory * memoryFraction).toLong
}
從以上代碼及注釋我們可以看出,最終 execution 和 storage 的可用內(nèi)存之和為 (JVM最大可用內(nèi)存 - 系統(tǒng)預(yù)留內(nèi)存) * spark.memory.fraction
岂贩,默認為(JVM 最大可用內(nèi)存 - 300M)* 0.75
茫经。舉個例子,如果你為 execution 設(shè)置了2G 內(nèi)存河闰,那么 execution 和 storage 可用的總內(nèi)存為 (2048-300)*0.75=1311
execution 和 storage 部分默認大小
上一小節(jié)搞清了用于 execution 和 storage 的內(nèi)存之和 maxMemory科平,那么用于 execution 和 storage 的內(nèi)存分別為多少呢?看下面三段代碼:
object UnifiedMemoryManager 的 apply 方法用來構(gòu)造類 UnifiedMemoryManager 的實例
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
maxMemory = maxMemory,
storageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
這段代碼確定在構(gòu)造 UnifiedMemoryManager
時:
- maxMemory 即 execution 和 storage 能共用的內(nèi)存總和為
getMaxMemory(conf)
姜性,即(JVM最大可用內(nèi)存 - 系統(tǒng)預(yù)留內(nèi)存) * spark.memory.fraction
- storageRegionSize 為
maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)
,在沒有設(shè)置spark.memory.storageFraction
的情況下為一半的 maxMemory
那么 storageRegionSize 是干嘛用的呢髓考?繼續(xù)看 UnifiedMemoryManager
和 MemoryManager
構(gòu)造函數(shù):
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
val maxMemory: Long,
storageRegionSize: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
storageRegionSize,
maxMemory - storageRegionSize)
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
storageMemory: Long,
onHeapExecutionMemory: Long) extends Logging
我們不難發(fā)現(xiàn):
- storageRegionSize 就是 storageMemory部念,大小為
maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)
,默認為maxMemory * 0.5
- execution 的大小為
maxMemory - storageRegionSize
,默認為maxMemory * 0.5
儡炼,即默認情況下 storageMemory 和 execution 能用的內(nèi)存相同妓湘,各占一半
互相借用內(nèi)存
新方案與舊方案最大的不同是:舊方案中 execution 和 storage 可用的內(nèi)存是固定死的,即使一方內(nèi)存不夠用而另一方有大把空閑內(nèi)存乌询,空閑一方也無法將結(jié)存借給不足一方榜贴,這樣降造成嚴重的內(nèi)存浪費。而新方案解決了這一點妹田,execution 和 storage 之間的內(nèi)存可以互相借用唬党,大大提供內(nèi)存利用率,也更好的滿足了不同資源側(cè)重的計算的需求
下面便來介紹新方案中內(nèi)存是如何互相借用的
acquireStorageMemory
先來看看 storage 從 execution 借用內(nèi)存是如何在分配 storage 內(nèi)存中發(fā)揮作用的
這一過程對應(yīng)的實現(xiàn)是 UnifiedMemoryManager#acquireStorageMemory
鬼佣,上面的流程圖應(yīng)該說明了是如何 storage 內(nèi)存及在 storage 內(nèi)存不足時是如何向 execution 借用內(nèi)存的
acquireExecutionMemory
該方法是給 execution 給指定 task 分配內(nèi)存的實現(xiàn)驶拱,當 execution pool 內(nèi)存不足時,會從 storage pool 中借晶衷。該方法在某些情況下可能會阻塞直到有足夠空閑內(nèi)存蓝纲。
在該方法內(nèi)部定義了兩個函數(shù):
- maybeGrowExecutionPool:會釋放storage中保存的數(shù)據(jù),減小storage部分內(nèi)存大小晌纫,從而增大Execution部分
- computeMaxExecutionPoolSize:計算在 storage 釋放內(nèi)存借給 execution 后税迷,execution 部分的內(nèi)存大小
在定義了這兩個方法后,直接調(diào)用 ExecutionMemoryPool#acquireMemory
方法锹漱,acquireMemory方法會一直處理該 task 的請求翁狐,直到分配到足夠內(nèi)存或系統(tǒng)判斷無法滿足該請求為止。acquireMemory 方法內(nèi)部有一個死循環(huán)凌蔬,循環(huán)內(nèi)部邏輯如下:
從上面的流程圖中露懒,我們可以知道當 execution pool 要為某個 task 分配內(nèi)存并且內(nèi)存不足時,會從 storage pool 中借用內(nèi)存砂心,能借用的最大 size 為 storage 的空閑內(nèi)存+之前 storage 從 execution 借走的內(nèi)存
懈词。這與 storage 從 execution 借用內(nèi)存不同,storage 只能從 execution 借走空閑的內(nèi)存辩诞,不能借走 execution 中已在使用的從 storage 借來的內(nèi)存坎弯,源碼中的解釋是如果要這么做實現(xiàn)太過復(fù)雜,暫時不支持译暂。
以上過程分析的是memoryMode 為 ON_HEAP 的情況抠忘,如果是 OFF_HEAP,則直接從 offHeapExecution 內(nèi)存池中分配外永,本文重點不在此崎脉,故不展開分析。