SPARK的內(nèi)存管理器
StaticMemoryManager,UnifiedMemoryManager
1.6以后默認是UnifiedMemoryManager.
這個內(nèi)存管理器在sparkContext中通過SparnEnv.create函數(shù)來創(chuàng)建SparkEnv的實例時,會生成.
通過spark.memory.useLegacyMode配置,可以控制選擇的內(nèi)存管理器實例.
如果設(shè)置為true時,選擇的實例為StaticMemoryManager實例,否則選擇UnifiedMemoryManager實例.默認情況下這個值為false.
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
UnifiedMemoryManager
這個實例生成時,最大內(nèi)存的得到方法:
1,根據(jù)當前JVM的啟動內(nèi)存,減去300MB,
這個300MB可以通過spark.testing.reservedMemory配置得到.
2,最大內(nèi)存值,通過1計算出來的內(nèi)存值,與spark.memory.fraction配置的系數(shù)進行相乘.默認是0.75.
示例:如果JVM配置的內(nèi)存為1GB,那么可使用的最大內(nèi)存為(1GB-300MB)*0.75
需要的配置項:
配置項spark.memory.fraction,默認值0.75,這個配置用于配置當前的內(nèi)存管理器的最大內(nèi)存使用比例.
配置項spark.memory.storageFraction,默認值0.5,這個配置用于配置rdd的storage與cache的默認分配的內(nèi)存池大小.
配置項spark.memory.offHeap.size,默認值0,這個配置用于配置非堆內(nèi)存的大小,默認不啟用.這個不做分析.
在實例生成后,默認會根據(jù)storage的內(nèi)存權(quán)重,總內(nèi)存減去storage的內(nèi)存權(quán)重,生成兩個內(nèi)存池storageMemoryPool與onHeapExecutionMemoryPool.
onHeapExecutionMemoryPool用于在執(zhí)行executor的shuffle操作時,使用的內(nèi)存,
storageMemoryPool用于在執(zhí)行rdd的cache操作時,使用的內(nèi)存.
在Executor執(zhí)行時的內(nèi)存分配
這個操作通常是在task執(zhí)行shuffle操作時,計算spill時,在內(nèi)存中的CACHE時使用的內(nèi)存.通過調(diào)用實例中的acquireExecutionMemory函數(shù)來申請內(nèi)存.
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
這個函數(shù)傳入的memoryMode可選擇是使用堆內(nèi)存還是直接使用本地內(nèi)存,默認是使用堆內(nèi)存.
assert(onHeapExecutionMemoryPool.poolSize +
storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
這里定義的這個函數(shù),用于判斷numBytes(需要申請的內(nèi)存大小)減去當前內(nèi)存池中可用的內(nèi)存大小是否夠用,如果不夠用,這個函數(shù)的傳入值是一個正數(shù)
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
這里根據(jù)當前的rdd的cache中的內(nèi)存池的大小,減去配置的storage的存儲大小,與當前storage的內(nèi)存池中的可用大小,取最大值出來,這個值表示是一個可用于回收的內(nèi)存資源.
val memoryReclaimableFromStorage =
math.max(storageMemoryPool.memoryFree,
storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
首先根據(jù)計算出來的storage中可以進行回收的資源,通過StorageMemoryPool進行資源的釋放.得到一個完成釋放的資源大小.這里根據(jù)executor中task需要的內(nèi)存與storage可回收的資源取最小值進行資源的回收.把得到的可用資源添加到executor的內(nèi)存池中.
// Only reclaim as much space as is necessary and available:
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
}
}
}
這個函數(shù)用于計算executor的內(nèi)存池可以使用的最大內(nèi)存大小.最小可以使用總內(nèi)存減去storage的配置權(quán)重,也就是默認情況下,shuffle的executor的內(nèi)存最小可以使用0.5的權(quán)重的內(nèi)存.
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
執(zhí)行內(nèi)存的分配操作.
onHeapExecutionMemoryPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool,
computeMaxExecutionPoolSize)
case MemoryMode.OFF_HEAP =>
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
給executor中的task分配需要的內(nèi)存:
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
// TODO: clean up this clunky method signature
if (!memoryForTask.contains(taskAttemptId)) {
這里首先檢查這個task是否在memoryForTask中存在,如果不存在時,表示這個task是第一次申請內(nèi)存,在這個集合中設(shè)置此task的當前使用內(nèi)存為0,并喚醒所有的當前的executor的等待的task.
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}
// TODO: simplify this to limit each task to its own slot
while (true) {
執(zhí)行內(nèi)存的分配操作,這個操作會一直進行迭代,直到滿足一定的條件.
首先得到當前的executor中有申請內(nèi)存的task的個數(shù),并得到當前的task的使用內(nèi)存量.
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
計算出需要申請的內(nèi)存與當前內(nèi)存池中的內(nèi)存,是否需要對storage中的內(nèi)存進行回收.如果需要申請的內(nèi)存大于了當前內(nèi)存池的內(nèi)存,這個參數(shù)傳入為一個大于0的數(shù),這個時候會對storage的內(nèi)存進行回收.
maybeGrowPool(numBytes - memoryFree)
這里計算出executor的內(nèi)存值可以使用的最大內(nèi)存,默認情況下,最小可使用內(nèi)存為總內(nèi)存減去storage的配置內(nèi)存.也就是默認可使用50%的內(nèi)存.
val maxPoolSize = computeMaxPoolSize()
這里計算出每個task平均可使用的最大內(nèi)存大小,與最小內(nèi)存大小.
如:有5個task,可使用100MB的內(nèi)存,那么最大可使用的內(nèi)存為20MB,最小可使用的內(nèi)存為10MB.
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
這里計算出當前可申請的內(nèi)存.能夠申請的內(nèi)存總量不能超過平均每個task使用內(nèi)存的平均大小.
// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
val toGrant = math.min(maxToGrant, memoryFree)
這里控制迭代是否可以跳出的條件.如果可申請的內(nèi)存小于需要申請的內(nèi)存,同時當前task使用的內(nèi)存加上可申請的內(nèi)存小于每個task平均使用的內(nèi)存時,這個申請操作會wait住.等待其它的task資源回收時進行喚醒.否則跳出迭代,返回可申請的內(nèi)存.
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of
$poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}
在BLOCK的CACHE時的內(nèi)存分配
在執(zhí)行rdd的iterator操作時,如果對rdd執(zhí)行過cache或者persist的操作時,也就是storage的級別不是none時,會對數(shù)據(jù)進行cache操作.在cache后的block中有一個超時時間,這個超時時間在blockManager中通過一個定時器,會定時去刪除cache的block與的Broadcast數(shù)據(jù).
如果是BLOCK的CACHE的超時,可通過spark.cleaner.ttl.BLOCK_MANAGER配置.
在對RDD執(zhí)行cache操作時,最終會調(diào)用內(nèi)存管理器中的acquireStorageMemory函數(shù)來進行操作.
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = synchronized {
這個傳入的參數(shù)中,evictedBlocks是一個用于返回的傳入?yún)?shù),這個集合中表示進行這次申請后,被淘汰掉的block的信息.
assert(onHeapExecutionMemoryPool.poolSize
+ storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
如果當前的BLOCK的的CACHE的大小已經(jīng)超過了當前可用的內(nèi)存總量(總內(nèi)存減去executor的使用內(nèi)存)時,直接返回false,表示不做內(nèi)存分配.申請的內(nèi)存太多,不處理了.
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space
($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
return false
}
如果當前申請的block需要的內(nèi)存大小超過了當前storage的內(nèi)存池可用的內(nèi)存大小時,在executor的內(nèi)存池中回收部分資源,原則是如果申請的內(nèi)存小于executor內(nèi)存池可用的內(nèi)存,回收申請的大小,否則回收executor所有的可用內(nèi)存.并執(zhí)行內(nèi)存的分配操作.
if (numBytes > storageMemoryPool.memoryFree) {
// There is not enough free memory in the storage pool,
// so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution =
Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
在StorageMemoryPool中執(zhí)行block的內(nèi)存分配:
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = lock.synchronized {
第二個參數(shù)是需要申請的內(nèi)存,第三個參數(shù)如果是0表示可用內(nèi)存大于申請內(nèi)存,大于0表示可用內(nèi)存不夠用.
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
這里,如果可用的內(nèi)存不夠用時,通過MemoryStore中的evictBlocksToFreeSpace函數(shù)來對當前的cache進行淘汰.
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree,
evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq(BlockId,
BlockStatus))
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}
如果完成資源的回收后,當前可用的內(nèi)存大于要申請的內(nèi)存,表示申請成功,返回的值為true,否則為false.
// NOTE: If the memory store evicts blocks,
// then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore,
// these variables
// should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}
Block的cache的淘汰
在executor的內(nèi)存池不夠使用時,或者總的內(nèi)存不夠使用時,會執(zhí)行storage的內(nèi)存池的資源回收操作.由shrinkPoolToFreeSpace函數(shù),這個函數(shù)通過調(diào)用MemoryStorage中的evictBlocksToFreeSpace函數(shù)來進行block的淘汰(如果是對block的cache時,申請內(nèi)存不夠時會直接調(diào)用這個函數(shù)來淘汰老的block)
shrinkPoolToFreeSpace函數(shù)用于在executor的內(nèi)存不夠時,需要storage的內(nèi)存池釋放資源給executor使用時調(diào)用.
這個過程中,可給executor提供的內(nèi)存分為五種可能:
1,storage默認的內(nèi)存空間還沒有使用完成,同時executor需要的空間小于等于storage的內(nèi)存池的可用空間,直接在storage的內(nèi)存池中釋放需要的大小.
2,storage默認的內(nèi)存空間還沒有使用完成,同時executor需要的空間大于storage的內(nèi)存池的可用空間,這個時候storage的可用空間全部進行釋放,但這個時候不會做block的淘汰操作.
3,storage的默認的內(nèi)存空間使用完成,這個時候storage的內(nèi)存池比默認的storage的配置權(quán)重要多,同時executor需要申請的內(nèi)存小于多出的部分,對storage內(nèi)存池中的block進行淘汰直到夠executor的申請內(nèi)存結(jié)束,這個時候storage的使用內(nèi)存還是大于storage的默認配置權(quán)重大小.
4,storage的默認的內(nèi)存空間使用完成,這個時候storage的內(nèi)存池比默認的storage的配置權(quán)重要多,同時executor需要申請的內(nèi)存大于或等于多出的部分,對storage內(nèi)存池中的block進行淘汰直到但最多只淘汰到storage的配置權(quán)重大小就結(jié)束淘汰.
5,storage剛好使用到了配置的權(quán)重,無法進行分配.
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
首先根據(jù)需要釋放的內(nèi)存,1=executor申請的內(nèi)存,2-1=storage內(nèi)存池可用的內(nèi)存,2-2=storage中占用的內(nèi)存大于默認給storage分配的權(quán)重.
這里根據(jù)這個要釋放的資源與內(nèi)存池可用的資源取最小值進行釋放,如果申請的小于可用的,不會對block進行淘汰操作,否則對block進行淘汰操作,直接淘汰到可用的內(nèi)存空間結(jié)束.
// First, shrink the pool by reclaiming free memory:
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool,
// begin evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
// When a block is released, BlockManager.dropFromMemory()
// calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the
// pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}
evictBlocksToFreeSpace函數(shù)這個函數(shù)用于對storage的內(nèi)存空間中釋放掉部分block的存儲空間的函數(shù),由MemoryStorage進行實現(xiàn).
這個函數(shù)的三個傳入?yún)?shù)中:
第一個在block的cache時,會傳入blockid,如果是executor要求釋放時,傳入為None,這個參數(shù)用于控制釋放的資源,如果傳入了blockid,那么這個block對應(yīng)的rdd的所有的所有的CACHE都會被保留,只釋放其它的RDD對應(yīng)的BLOCK的CACHE,如果傳入為None時,不區(qū)分BLOCK,從頭開始迭代,直接釋放到需要的內(nèi)存大小結(jié)束.
第二個是需要釋放的內(nèi)存大小.
第三個參數(shù)是釋放后的block的集合,這個集合內(nèi)容就是從內(nèi)存中淘汰出去的block.
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
這里得到傳入的blockid對應(yīng)的rdd.如果傳入的blockId是None時,這個rdd也就不存在.
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
這里從所有的cache的block中進行迭代,如果迭代的block的rdd不是現(xiàn)在需要cache的block對應(yīng)的rdd時(傳入的blockId對應(yīng)的RDD),就選擇這個block.并釋放內(nèi)存大小.
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
這里對選擇的block,通過blockManager釋放掉block的cache占用的內(nèi)存.如果這個block的cache的級別中包含有disk的級別時,釋放掉內(nèi)存的同時會把這個cache的數(shù)據(jù)寫入到磁盤中.
把執(zhí)行釋放后的block的集合添加到傳入?yún)?shù)的droppedBlocks的集合參數(shù)中,用于數(shù)據(jù)的返回.
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks +=
((blockId, status)) }
}
}
true
} else {
blockId.foreach { id =>
logInfo(s"Will not store $id as it would require dropping another block " +
"from the same RDD")
}
false
}
}
}
StaticMemoryManager
這個實例在1.6的環(huán)境下,需要把配置項spark.memory.useLegacyMode設(shè)置為true時,才會被啟用.下面首先先看看這個實例生成時的處理:
需要的配置項:
1,配置項spark.shuffle.memoryFraction,用于設(shè)置executor的shuffle操作可使用的內(nèi)存,默認占總內(nèi)存的0.2.
2,配置項spark.shuffle.safetyFraction,用于設(shè)置executor的shuffle的安全操作內(nèi)存,默認占1配置內(nèi)存的0.8.
3,配置項spark.storage.memoryFraction,用于設(shè)置block cache的使用內(nèi)存,默認占總內(nèi)存的0.6;
4,配置項spark.storage.safetyFraction,用于設(shè)置block cache的安全使用內(nèi)存,默認占3配置內(nèi)存的0.9;
5,配置項spark.storage.unrollFraction,默認值是storage內(nèi)存總大于的0.2;這個有于在storage中cache的block的數(shù)據(jù)的反序列化時數(shù)據(jù)的展開使用空間.
def this(conf: SparkConf, numCores: Int) {
this(
conf,
StaticMemoryManager.getMaxExecutionMemory(conf),
StaticMemoryManager.getMaxStorageMemory(conf),
numCores)
}
在Executor執(zhí)行時的內(nèi)存分配
在StaticMemoryManager中,對executor中的shuffle的內(nèi)存執(zhí)行分配這塊其實并沒有統(tǒng)一內(nèi)存管理中那么麻煩,只是在分配的固定大小的存儲空間中進行分配,如果無法再進行分配時,這個分配函數(shù)返回的分配量就是0.
private[memory] override def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
OFF_HEAP的模式這里就不分析了,我在代碼里沒發(fā)現(xiàn)有地方去調(diào)用,好像申請內(nèi)存時,是直接寫死的ON_HEAP的模式.在這個地方,不會考慮executor的內(nèi)存池中的內(nèi)存是否夠用,直接通過ExecutionMemoryPool內(nèi)存池實例中的分配內(nèi)存函數(shù)進行內(nèi)存的分配 .
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes,
taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes,
taskAttemptId)
}
}
內(nèi)存分配部分的代碼實現(xiàn):這個部分與統(tǒng)一內(nèi)存管理部分是一樣的,
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
// TODO: clean up this clunky method signature
首先如果說task是第一次申請內(nèi)存,添加這個task到內(nèi)存池的集合屬性中,并把這個task的使用內(nèi)存設(shè)置為0.
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}
下面開始迭代進行內(nèi)存的分配.加上while的目的是為了保持如果task的分配內(nèi)存達不到指定的大小時,就一直等待分配,直到達到指定的大小.
// TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
在這里,這個函數(shù)是一個空的實現(xiàn),什么都不會做.
maybeGrowPool(numBytes - memoryFree)
這個函數(shù)得到的值就是當前的executor的內(nèi)存池的poolsize的大小.
val maxPoolSize = computeMaxPoolSize()
根據(jù)當前的活動的task的個數(shù)計算出每個task可使用的最大內(nèi)存,每個task使用的最小內(nèi)存為最大內(nèi)存除以2(如果申請的內(nèi)存本身小于這個最小內(nèi)存除外).
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
計算出這次需要分配的內(nèi)存,如果申請的內(nèi)存小于可用的內(nèi)存時,取申請內(nèi)存,否則取這個task可申請的最大內(nèi)存
// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
這里計算出來的值根據(jù)當前原則上可以申請的內(nèi)存與當前內(nèi)存池中的可用內(nèi)存取最小值.
val toGrant = math.min(maxToGrant, memoryFree)
這里有一個線程wait的條件,如果這一次申請的內(nèi)存小于需要申請的內(nèi)存,同時當前的task的使用內(nèi)存小于最小的使用內(nèi)存時,線程wait,等待其它的task釋放內(nèi)存或者有新的task加入來喚醒此wait.
// We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
// if we can't give it this much now, wait for other tasks to free up memory
// (this happens if older tasks allocated lots of memory before N grew)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of
$poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}
Storage的展開內(nèi)存分配
這里說明下,在UnifiedMemoryManager中展開內(nèi)存的分配與stroage中block cache的內(nèi)存分配共用相同的內(nèi)存空間,因此申請方法與storage的block cache的內(nèi)存分配相同,而在static的分配中,不同的區(qū)塊,所使用的內(nèi)存空間都是固定的,因此這里需要獨立說明一下.
在對MemoryStorage中執(zhí)行block的cache操作時,會執(zhí)行pubInterator等操作,會先根據(jù)block中的數(shù)據(jù)申請對應(yīng)數(shù)據(jù)大小的展開內(nèi)存空間,把數(shù)據(jù)進行提取,然后才會執(zhí)行storage的cache操作的內(nèi)存分配.
執(zhí)行流程,在MemoryStorage中:
1,putIterator函數(shù)執(zhí)行,對block進行cache
2,unrollSafely函數(shù)執(zhí)行,申請展開內(nèi)存,根據(jù)block的內(nèi)容大小.
3,釋放申請的展開內(nèi)存,并申請block cache內(nèi)存,執(zhí)行putArray->tryToPut函數(shù).
看看unrollSafely函數(shù)如何處理展開內(nèi)存的申請:
這里先配置項spark.storage.unrollMemoryThreshold,默認值是1MB,先申請固定大小的展開內(nèi)存,這個函數(shù)返回的值是一個true/false的值,true表示申請成功.這個函數(shù)調(diào)用內(nèi)存管理器中的acquireUnrollMemory函數(shù).這里申請到的內(nèi)存大小會先記錄到unrollMemoryMap集合中根據(jù)對應(yīng)的taskid.
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold,
droppedBlocks)
接下來,迭代block中的數(shù)據(jù),把數(shù)據(jù)添加到vector的展開臨時變量中.
while (values.hasNext && keepUnrolling) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
// If our vector's size has exceeded the threshold, request more memory
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
這里每次申請當前的使用內(nèi)存的一半做為展開內(nèi)存,這個展開內(nèi)存伴隨著block的數(shù)據(jù)越多,申請的量也會越大.
val amountToRequest = (currentSize * memoryGrowthFactor -
memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(
blockId, amountToRequest, droppedBlocks)
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
}
elementsUnrolled += 1
}
這里的判斷比較關(guān)鍵,如果keepUnrolling的值為true,表示內(nèi)存能夠安全展開這個block的數(shù)據(jù),否則表示不能展開這個block的內(nèi)容.
if (keepUnrolling) {
// We successfully unrolled the entirety of this block
Left(vector.toArray)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Right(vector.iterator ++ values)
}
if (keepUnrolling) {
val taskAttemptId = currentTaskAttemptId()
memoryManager.synchronized {
這里如果內(nèi)存能夠安全展開當前的block,把這個block的展開內(nèi)存存儲到pendingUnrollMemoryMap的集合中對應(yīng)此task的位置.
// Since we continue to hold onto the array until we actually cache it, we cannot
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
// so tryToPut
can further transfer it to normal storage memory later.
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
val amountToTransferToPending = currentUnrollMemoryForThisTask -
previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) +
amountToTransferToPending
}
}
提示:關(guān)于展開內(nèi)存的釋放部分,如果block的內(nèi)容能夠被安全展開存儲到內(nèi)存中時,這個時候,在做block的storage的操作時,會釋放掉展開內(nèi)存的空間(在pendingUnrollMemoryMap集合中),如果內(nèi)存不能夠安全展開block的內(nèi)容時,這個時候無法進行block的cache操作(可能會寫磁盤),這時申請的內(nèi)容大小存儲在unrollMemoryMap集合中,這時由于不會執(zhí)行block的memory的cache操作,因此這個集合中占用的內(nèi)存大小暫時不會被回收,只有等到這個task結(jié)束時,占用的unrollMemoryMap集合中的內(nèi)存才會被回收.
...
接下來看看在StaticMemoryManager中如何處理展開內(nèi)存的分配:
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = storageMemoryPool.memoryFree
這里根據(jù)可用的最大展開內(nèi)存與當前正在使用中的展開內(nèi)存,計算出可以申請的最大展開內(nèi)存,如果這里得到的值是一個0時,表示不需要釋放block cache的內(nèi)存,如果是一個大于0的值,就表示需要釋放BLOCK CACHE的內(nèi)存.
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory -
freeMemory)
這里計算出需要釋放的內(nèi)存,取申請的資源與可以使用的unroll內(nèi)存資源的最小值,如果這個一個大于0的值,表示需要從storage的內(nèi)存池中釋放這么多的內(nèi)存出來.
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree,
numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree,
evictedBlocks)
}
Storage中block cache的內(nèi)存分配
在block使用了memory的storage時,同時block的內(nèi)容能夠被展開內(nèi)存存儲起來時,會通過MemoryStorage中對應(yīng)的函數(shù)來向StaticMemoryManager中的acquireStorageMemory函數(shù)申請內(nèi)存資源.
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = synchronized {
if (numBytes > maxStorageMemory) {
如果一個block的內(nèi)容太大,已經(jīng)超過了配置的storage的存儲空間大小,這個block不做cache.
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space
($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
false
} else {
否則通過storage的內(nèi)存池執(zhí)行block的cache的內(nèi)存申請,這個過程中如果內(nèi)存不夠用時,會釋放老的block的cache對應(yīng)的內(nèi)存空間,也就是會淘汰掉老的block cache,
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
}
在storage的內(nèi)存池中處理block cache的內(nèi)存申請:
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = lock.synchronized {
這個函數(shù)的傳入?yún)?shù)中,numBytesToAcquire表示需要申請的內(nèi)存大小,numBytesToFree如果是一個大于0的值,表示現(xiàn)在內(nèi)存池中的內(nèi)存空間不夠,需要淘汰現(xiàn)有的block的cache.
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
這里先判斷,如果申請的內(nèi)存大于還在可用的內(nèi)存,需要先淘汰掉部分block cache來釋放空間.
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree,
evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq(BlockId,
BlockStatus))
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}
分配內(nèi)存是否成功,也就是要申請的內(nèi)存小于或等于可用的內(nèi)存空間,最后把分配的內(nèi)存添加到使用的內(nèi)存空間中.表示這部分內(nèi)存已經(jīng)被look住.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}
Storage處理block cache的淘汰
在storage中內(nèi)存不夠使用時,通過memoryStorage去執(zhí)行block的淘汰,并把淘汰后的block返回通知上層的調(diào)用端.
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq(BlockId,
BlockStatus))
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}
MemoryStore中處理對cache的淘汰:
對block的cache進行淘汰的處理函數(shù),傳入?yún)?shù)中,第二個參數(shù)是需要釋放的空間,第三個參數(shù)是被淘汰后的block的集合用于返回.
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
這里首先先得到傳入的block對應(yīng)的rdd的id.得到這個rdd_id的目的是淘汰block時,如果發(fā)現(xiàn)是這個rdd的block時,不進行淘汰.
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
這里開始對storage的內(nèi)存中所有的cache進行迭代,這個迭代從最先進行cache的block開始,如果迭代到的block對應(yīng)的RDD不是傳入的BLOCK對應(yīng)的RDD時,把這個BLOCK添加到選擇的BLOCK的集合中,并計算當前內(nèi)存池中的內(nèi)存是否達到需要的內(nèi)存空間,如果達到,停止選擇BLOCK的操作.
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}
if (freedMemory >= space) {
如果流程執(zhí)行到這里,說明內(nèi)存空間釋放成功,現(xiàn)在可用的內(nèi)存空間已經(jīng)達到需要的內(nèi)存空間的大小,把選擇的BLOCK對應(yīng)的CACHE通過BLOCKMANAGER從內(nèi)存中進行釋放.并把釋放后的BLOCK添加到droppedBlocks的集合中,這個集合用于返回結(jié)果,表示這次空間的釋放時,這些BLOCK已經(jīng)從CACHE中稱出.
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId,
status)) }
}
}
true
} else {
流程執(zhí)行到這里,表示STORAGE的內(nèi)存空間中無法釋放出更多的內(nèi)存,也就相當于是釋放空間失敗求.
blockId.foreach { id =>
logInfo(s"Will not store $id as it would require dropping another block " +
"from the same RDD")
}
false
}
}
}