【轉(zhuǎn)載】spark的內(nèi)存分配管理

SPARK的內(nèi)存管理器
StaticMemoryManager,UnifiedMemoryManager
1.6以后默認是UnifiedMemoryManager.
這個內(nèi)存管理器在sparkContext中通過SparnEnv.create函數(shù)來創(chuàng)建SparkEnv的實例時,會生成.
通過spark.memory.useLegacyMode配置,可以控制選擇的內(nèi)存管理器實例.
如果設(shè)置為true時,選擇的實例為StaticMemoryManager實例,否則選擇UnifiedMemoryManager實例.默認情況下這個值為false.
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}

UnifiedMemoryManager
這個實例生成時,最大內(nèi)存的得到方法:
1,根據(jù)當前JVM的啟動內(nèi)存,減去300MB,
這個300MB可以通過spark.testing.reservedMemory配置得到.
2,最大內(nèi)存值,通過1計算出來的內(nèi)存值,與spark.memory.fraction配置的系數(shù)進行相乘.默認是0.75.
示例:如果JVM配置的內(nèi)存為1GB,那么可使用的最大內(nèi)存為(1GB-300MB)*0.75
需要的配置項:
配置項spark.memory.fraction,默認值0.75,這個配置用于配置當前的內(nèi)存管理器的最大內(nèi)存使用比例.
配置項spark.memory.storageFraction,默認值0.5,這個配置用于配置rdd的storage與cache的默認分配的內(nèi)存池大小.
配置項spark.memory.offHeap.size,默認值0,這個配置用于配置非堆內(nèi)存的大小,默認不啟用.這個不做分析.

在實例生成后,默認會根據(jù)storage的內(nèi)存權(quán)重,總內(nèi)存減去storage的內(nèi)存權(quán)重,生成兩個內(nèi)存池storageMemoryPool與onHeapExecutionMemoryPool.

onHeapExecutionMemoryPool用于在執(zhí)行executor的shuffle操作時,使用的內(nèi)存,
storageMemoryPool用于在執(zhí)行rdd的cache操作時,使用的內(nèi)存.

在Executor執(zhí)行時的內(nèi)存分配
這個操作通常是在task執(zhí)行shuffle操作時,計算spill時,在內(nèi)存中的CACHE時使用的內(nèi)存.通過調(diào)用實例中的acquireExecutionMemory函數(shù)來申請內(nèi)存.
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
這個函數(shù)傳入的memoryMode可選擇是使用堆內(nèi)存還是直接使用本地內(nèi)存,默認是使用堆內(nèi)存.
assert(onHeapExecutionMemoryPool.poolSize +
storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
這里定義的這個函數(shù),用于判斷numBytes(需要申請的內(nèi)存大小)減去當前內(nèi)存池中可用的內(nèi)存大小是否夠用,如果不夠用,這個函數(shù)的傳入值是一個正數(shù)
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
這里根據(jù)當前的rdd的cache中的內(nèi)存池的大小,減去配置的storage的存儲大小,與當前storage的內(nèi)存池中的可用大小,取最大值出來,這個值表示是一個可用于回收的內(nèi)存資源.
val memoryReclaimableFromStorage =
math.max(storageMemoryPool.memoryFree,
storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
首先根據(jù)計算出來的storage中可以進行回收的資源,通過StorageMemoryPool進行資源的釋放.得到一個完成釋放的資源大小.這里根據(jù)executor中task需要的內(nèi)存與storage可回收的資源取最小值進行資源的回收.把得到的可用資源添加到executor的內(nèi)存池中.
// Only reclaim as much space as is necessary and available:
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
}
}
}
這個函數(shù)用于計算executor的內(nèi)存池可以使用的最大內(nèi)存大小.最小可以使用總內(nèi)存減去storage的配置權(quán)重,也就是默認情況下,shuffle的executor的內(nèi)存最小可以使用0.5的權(quán)重的內(nèi)存.
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
執(zhí)行內(nèi)存的分配操作.
onHeapExecutionMemoryPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool,
computeMaxExecutionPoolSize)

case MemoryMode.OFF_HEAP =>
  offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)

}
}

給executor中的task分配需要的內(nèi)存:
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

// TODO: clean up this clunky method signature

if (!memoryForTask.contains(taskAttemptId)) {
這里首先檢查這個task是否在memoryForTask中存在,如果不存在時,表示這個task是第一次申請內(nèi)存,在這個集合中設(shè)置此task的當前使用內(nèi)存為0,并喚醒所有的當前的executor的等待的task.
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}

// TODO: simplify this to limit each task to its own slot
while (true) {
執(zhí)行內(nèi)存的分配操作,這個操作會一直進行迭代,直到滿足一定的條件.
首先得到當前的executor中有申請內(nèi)存的task的個數(shù),并得到當前的task的使用內(nèi)存量.
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
計算出需要申請的內(nèi)存與當前內(nèi)存池中的內(nèi)存,是否需要對storage中的內(nèi)存進行回收.如果需要申請的內(nèi)存大于了當前內(nèi)存池的內(nèi)存,這個參數(shù)傳入為一個大于0的數(shù),這個時候會對storage的內(nèi)存進行回收.
maybeGrowPool(numBytes - memoryFree)
這里計算出executor的內(nèi)存值可以使用的最大內(nèi)存,默認情況下,最小可使用內(nèi)存為總內(nèi)存減去storage的配置內(nèi)存.也就是默認可使用50%的內(nèi)存.
val maxPoolSize = computeMaxPoolSize()
這里計算出每個task平均可使用的最大內(nèi)存大小,與最小內(nèi)存大小.
如:有5個task,可使用100MB的內(nèi)存,那么最大可使用的內(nèi)存為20MB,最小可使用的內(nèi)存為10MB.
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
這里計算出當前可申請的內(nèi)存.能夠申請的內(nèi)存總量不能超過平均每個task使用內(nèi)存的平均大小.
// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))

val toGrant = math.min(maxToGrant, memoryFree)

這里控制迭代是否可以跳出的條件.如果可申請的內(nèi)存小于需要申請的內(nèi)存,同時當前task使用的內(nèi)存加上可申請的內(nèi)存小于每個task平均使用的內(nèi)存時,這個申請操作會wait住.等待其它的task資源回收時進行喚醒.否則跳出迭代,返回可申請的內(nèi)存.
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of
$poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}

在BLOCK的CACHE時的內(nèi)存分配
在執(zhí)行rdd的iterator操作時,如果對rdd執(zhí)行過cache或者persist的操作時,也就是storage的級別不是none時,會對數(shù)據(jù)進行cache操作.在cache后的block中有一個超時時間,這個超時時間在blockManager中通過一個定時器,會定時去刪除cache的block與的Broadcast數(shù)據(jù).
如果是BLOCK的CACHE的超時,可通過spark.cleaner.ttl.BLOCK_MANAGER配置.
在對RDD執(zhí)行cache操作時,最終會調(diào)用內(nèi)存管理器中的acquireStorageMemory函數(shù)來進行操作.
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = synchronized {
這個傳入的參數(shù)中,evictedBlocks是一個用于返回的傳入?yún)?shù),這個集合中表示進行這次申請后,被淘汰掉的block的信息.
assert(onHeapExecutionMemoryPool.poolSize
+ storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)

如果當前的BLOCK的的CACHE的大小已經(jīng)超過了當前可用的內(nèi)存總量(總內(nèi)存減去executor的使用內(nèi)存)時,直接返回false,表示不做內(nèi)存分配.申請的內(nèi)存太多,不處理了.
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space
($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
return false
}
如果當前申請的block需要的內(nèi)存大小超過了當前storage的內(nèi)存池可用的內(nèi)存大小時,在executor的內(nèi)存池中回收部分資源,原則是如果申請的內(nèi)存小于executor內(nèi)存池可用的內(nèi)存,回收申請的大小,否則回收executor所有的可用內(nèi)存.并執(zhí)行內(nèi)存的分配操作.
if (numBytes > storageMemoryPool.memoryFree) {
// There is not enough free memory in the storage pool,
// so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution =
Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}

在StorageMemoryPool中執(zhí)行block的內(nèi)存分配:
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = lock.synchronized {
第二個參數(shù)是需要申請的內(nèi)存,第三個參數(shù)如果是0表示可用內(nèi)存大于申請內(nèi)存,大于0表示可用內(nèi)存不夠用.
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)

這里,如果可用的內(nèi)存不夠用時,通過MemoryStore中的evictBlocksToFreeSpace函數(shù)來對當前的cache進行淘汰.
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree,
evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq(BlockId,
BlockStatus)
)
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}

如果完成資源的回收后,當前可用的內(nèi)存大于要申請的內(nèi)存,表示申請成功,返回的值為true,否則為false.
// NOTE: If the memory store evicts blocks,
// then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore,
// these variables
// should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}

Block的cache的淘汰
在executor的內(nèi)存池不夠使用時,或者總的內(nèi)存不夠使用時,會執(zhí)行storage的內(nèi)存池的資源回收操作.由shrinkPoolToFreeSpace函數(shù),這個函數(shù)通過調(diào)用MemoryStorage中的evictBlocksToFreeSpace函數(shù)來進行block的淘汰(如果是對block的cache時,申請內(nèi)存不夠時會直接調(diào)用這個函數(shù)來淘汰老的block)

shrinkPoolToFreeSpace函數(shù)用于在executor的內(nèi)存不夠時,需要storage的內(nèi)存池釋放資源給executor使用時調(diào)用.
這個過程中,可給executor提供的內(nèi)存分為五種可能:
1,storage默認的內(nèi)存空間還沒有使用完成,同時executor需要的空間小于等于storage的內(nèi)存池的可用空間,直接在storage的內(nèi)存池中釋放需要的大小.
2,storage默認的內(nèi)存空間還沒有使用完成,同時executor需要的空間大于storage的內(nèi)存池的可用空間,這個時候storage的可用空間全部進行釋放,但這個時候不會做block的淘汰操作.
3,storage的默認的內(nèi)存空間使用完成,這個時候storage的內(nèi)存池比默認的storage的配置權(quán)重要多,同時executor需要申請的內(nèi)存小于多出的部分,對storage內(nèi)存池中的block進行淘汰直到夠executor的申請內(nèi)存結(jié)束,這個時候storage的使用內(nèi)存還是大于storage的默認配置權(quán)重大小.
4,storage的默認的內(nèi)存空間使用完成,這個時候storage的內(nèi)存池比默認的storage的配置權(quán)重要多,同時executor需要申請的內(nèi)存大于或等于多出的部分,對storage內(nèi)存池中的block進行淘汰直到但最多只淘汰到storage的配置權(quán)重大小就結(jié)束淘汰.
5,storage剛好使用到了配置的權(quán)重,無法進行分配.

def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
首先根據(jù)需要釋放的內(nèi)存,1=executor申請的內(nèi)存,2-1=storage內(nèi)存池可用的內(nèi)存,2-2=storage中占用的內(nèi)存大于默認給storage分配的權(quán)重.
這里根據(jù)這個要釋放的資源與內(nèi)存池可用的資源取最小值進行釋放,如果申請的小于可用的,不會對block進行淘汰操作,否則對block進行淘汰操作,直接淘汰到可用的內(nèi)存空間結(jié)束.
// First, shrink the pool by reclaiming free memory:
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool,
// begin evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
// When a block is released, BlockManager.dropFromMemory()
// calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the
// pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
}
}

evictBlocksToFreeSpace函數(shù)這個函數(shù)用于對storage的內(nèi)存空間中釋放掉部分block的存儲空間的函數(shù),由MemoryStorage進行實現(xiàn).
這個函數(shù)的三個傳入?yún)?shù)中:
第一個在block的cache時,會傳入blockid,如果是executor要求釋放時,傳入為None,這個參數(shù)用于控制釋放的資源,如果傳入了blockid,那么這個block對應(yīng)的rdd的所有的所有的CACHE都會被保留,只釋放其它的RDD對應(yīng)的BLOCK的CACHE,如果傳入為None時,不區(qū)分BLOCK,從頭開始迭代,直接釋放到需要的內(nèi)存大小結(jié)束.
第二個是需要釋放的內(nèi)存大小.
第三個參數(shù)是釋放后的block的集合,這個集合內(nèi)容就是從內(nèi)存中淘汰出去的block.
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
這里得到傳入的blockid對應(yīng)的rdd.如果傳入的blockId是None時,這個rdd也就不存在.
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
這里從所有的cache的block中進行迭代,如果迭代的block的rdd不是現(xiàn)在需要cache的block對應(yīng)的rdd時(傳入的blockId對應(yīng)的RDD),就選擇這個block.并釋放內(nèi)存大小.
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}

if (freedMemory >= space) {
  logInfo(s"${selectedBlocks.size} blocks selected for dropping")
  for (blockId <- selectedBlocks) {

這里對選擇的block,通過blockManager釋放掉block的cache占用的內(nèi)存.如果這個block的cache的級別中包含有disk的級別時,釋放掉內(nèi)存的同時會把這個cache的數(shù)據(jù)寫入到磁盤中.
把執(zhí)行釋放后的block的集合添加到傳入?yún)?shù)的droppedBlocks的集合參數(shù)中,用于數(shù)據(jù)的返回.

    val entry = entries.synchronized { entries.get(blockId) }
    // This should never be null as only one task should be dropping
    // blocks and removing entries. However the check is still here for
    // future safety.
    if (entry != null) {
      val data = if (entry.deserialized) {
        Left(entry.value.asInstanceOf[Array[Any]])
      } else {
        Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
      droppedBlockStatus.foreach { status => droppedBlocks += 
               ((blockId, status)) }
    }
  }
  true
} else {
  blockId.foreach { id =>
    logInfo(s"Will not store $id as it would require dropping another block " +
      "from the same RDD")
  }
  false
}

}
}
StaticMemoryManager
這個實例在1.6的環(huán)境下,需要把配置項spark.memory.useLegacyMode設(shè)置為true時,才會被啟用.下面首先先看看這個實例生成時的處理:
需要的配置項:
1,配置項spark.shuffle.memoryFraction,用于設(shè)置executor的shuffle操作可使用的內(nèi)存,默認占總內(nèi)存的0.2.
2,配置項spark.shuffle.safetyFraction,用于設(shè)置executor的shuffle的安全操作內(nèi)存,默認占1配置內(nèi)存的0.8.
3,配置項spark.storage.memoryFraction,用于設(shè)置block cache的使用內(nèi)存,默認占總內(nèi)存的0.6;
4,配置項spark.storage.safetyFraction,用于設(shè)置block cache的安全使用內(nèi)存,默認占3配置內(nèi)存的0.9;
5,配置項spark.storage.unrollFraction,默認值是storage內(nèi)存總大于的0.2;這個有于在storage中cache的block的數(shù)據(jù)的反序列化時數(shù)據(jù)的展開使用空間.

def this(conf: SparkConf, numCores: Int) {
this(
conf,
StaticMemoryManager.getMaxExecutionMemory(conf),
StaticMemoryManager.getMaxStorageMemory(conf),
numCores)
}

在Executor執(zhí)行時的內(nèi)存分配
在StaticMemoryManager中,對executor中的shuffle的內(nèi)存執(zhí)行分配這塊其實并沒有統(tǒng)一內(nèi)存管理中那么麻煩,只是在分配的固定大小的存儲空間中進行分配,如果無法再進行分配時,這個分配函數(shù)返回的分配量就是0.
private[memory] override def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {

OFF_HEAP的模式這里就不分析了,我在代碼里沒發(fā)現(xiàn)有地方去調(diào)用,好像申請內(nèi)存時,是直接寫死的ON_HEAP的模式.在這個地方,不會考慮executor的內(nèi)存池中的內(nèi)存是否夠用,直接通過ExecutionMemoryPool內(nèi)存池實例中的分配內(nèi)存函數(shù)進行內(nèi)存的分配 .

memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes,
taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes,
taskAttemptId)
}
}

內(nèi)存分配部分的代碼實現(xiàn):這個部分與統(tǒng)一內(nèi)存管理部分是一樣的,
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

// TODO: clean up this clunky method signature

首先如果說task是第一次申請內(nèi)存,添加這個task到內(nèi)存池的集合屬性中,并把這個task的使用內(nèi)存設(shè)置為0.
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
// This will later cause waiting tasks to wake up and check numTasks again
lock.notifyAll()
}

下面開始迭代進行內(nèi)存的分配.加上while的目的是為了保持如果task的分配內(nèi)存達不到指定的大小時,就一直等待分配,直到達到指定的大小.
// TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

在這里,這個函數(shù)是一個空的實現(xiàn),什么都不會做.
maybeGrowPool(numBytes - memoryFree)

這個函數(shù)得到的值就是當前的executor的內(nèi)存池的poolsize的大小.
val maxPoolSize = computeMaxPoolSize()
根據(jù)當前的活動的task的個數(shù)計算出每個task可使用的最大內(nèi)存,每個task使用的最小內(nèi)存為最大內(nèi)存除以2(如果申請的內(nèi)存本身小于這個最小內(nèi)存除外).
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
計算出這次需要分配的內(nèi)存,如果申請的內(nèi)存小于可用的內(nèi)存時,取申請內(nèi)存,否則取這個task可申請的最大內(nèi)存
// How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))

這里計算出來的值根據(jù)當前原則上可以申請的內(nèi)存與當前內(nèi)存池中的可用內(nèi)存取最小值.
val toGrant = math.min(maxToGrant, memoryFree)
這里有一個線程wait的條件,如果這一次申請的內(nèi)存小于需要申請的內(nèi)存,同時當前的task的使用內(nèi)存小于最小的使用內(nèi)存時,線程wait,等待其它的task釋放內(nèi)存或者有新的task加入來喚醒此wait.
// We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
// if we can't give it this much now, wait for other tasks to free up memory
// (this happens if older tasks allocated lots of memory before N grew)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of
$poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L // Never reached
}

Storage的展開內(nèi)存分配
這里說明下,在UnifiedMemoryManager中展開內(nèi)存的分配與stroage中block cache的內(nèi)存分配共用相同的內(nèi)存空間,因此申請方法與storage的block cache的內(nèi)存分配相同,而在static的分配中,不同的區(qū)塊,所使用的內(nèi)存空間都是固定的,因此這里需要獨立說明一下.
在對MemoryStorage中執(zhí)行block的cache操作時,會執(zhí)行pubInterator等操作,會先根據(jù)block中的數(shù)據(jù)申請對應(yīng)數(shù)據(jù)大小的展開內(nèi)存空間,把數(shù)據(jù)進行提取,然后才會執(zhí)行storage的cache操作的內(nèi)存分配.
執(zhí)行流程,在MemoryStorage中:
1,putIterator函數(shù)執(zhí)行,對block進行cache
2,unrollSafely函數(shù)執(zhí)行,申請展開內(nèi)存,根據(jù)block的內(nèi)容大小.
3,釋放申請的展開內(nèi)存,并申請block cache內(nèi)存,執(zhí)行putArray->tryToPut函數(shù).

看看unrollSafely函數(shù)如何處理展開內(nèi)存的申請:
這里先配置項spark.storage.unrollMemoryThreshold,默認值是1MB,先申請固定大小的展開內(nèi)存,這個函數(shù)返回的值是一個true/false的值,true表示申請成功.這個函數(shù)調(diào)用內(nèi)存管理器中的acquireUnrollMemory函數(shù).這里申請到的內(nèi)存大小會先記錄到unrollMemoryMap集合中根據(jù)對應(yīng)的taskid.
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold,
droppedBlocks)

接下來,迭代block中的數(shù)據(jù),把數(shù)據(jù)添加到vector的展開臨時變量中.
while (values.hasNext && keepUnrolling) {
vector += values.next()
if (elementsUnrolled % memoryCheckPeriod == 0) {
// If our vector's size has exceeded the threshold, request more memory
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
這里每次申請當前的使用內(nèi)存的一半做為展開內(nèi)存,這個展開內(nèi)存伴隨著block的數(shù)據(jù)越多,申請的量也會越大.
val amountToRequest = (currentSize * memoryGrowthFactor -
memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(
blockId, amountToRequest, droppedBlocks)
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
}
elementsUnrolled += 1
}
這里的判斷比較關(guān)鍵,如果keepUnrolling的值為true,表示內(nèi)存能夠安全展開這個block的數(shù)據(jù),否則表示不能展開這個block的內(nèi)容.
if (keepUnrolling) {
// We successfully unrolled the entirety of this block
Left(vector.toArray)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Right(vector.iterator ++ values)
}

if (keepUnrolling) {
val taskAttemptId = currentTaskAttemptId()
memoryManager.synchronized {
這里如果內(nèi)存能夠安全展開當前的block,把這個block的展開內(nèi)存存儲到pendingUnrollMemoryMap的集合中對應(yīng)此task的位置.
// Since we continue to hold onto the array until we actually cache it, we cannot
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
// so tryToPut can further transfer it to normal storage memory later.
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
val amountToTransferToPending = currentUnrollMemoryForThisTask -
previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) +
amountToTransferToPending
}
}

提示:關(guān)于展開內(nèi)存的釋放部分,如果block的內(nèi)容能夠被安全展開存儲到內(nèi)存中時,這個時候,在做block的storage的操作時,會釋放掉展開內(nèi)存的空間(在pendingUnrollMemoryMap集合中),如果內(nèi)存不能夠安全展開block的內(nèi)容時,這個時候無法進行block的cache操作(可能會寫磁盤),這時申請的內(nèi)容大小存儲在unrollMemoryMap集合中,這時由于不會執(zhí)行block的memory的cache操作,因此這個集合中占用的內(nèi)存大小暫時不會被回收,只有等到這個task結(jié)束時,占用的unrollMemoryMap集合中的內(nèi)存才會被回收.
...

接下來看看在StaticMemoryManager中如何處理展開內(nèi)存的分配:
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = storageMemoryPool.memoryFree
這里根據(jù)可用的最大展開內(nèi)存與當前正在使用中的展開內(nèi)存,計算出可以申請的最大展開內(nèi)存,如果這里得到的值是一個0時,表示不需要釋放block cache的內(nèi)存,如果是一個大于0的值,就表示需要釋放BLOCK CACHE的內(nèi)存.
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory -
freeMemory)
這里計算出需要釋放的內(nèi)存,取申請的資源與可以使用的unroll內(nèi)存資源的最小值,如果這個一個大于0的值,表示需要從storage的內(nèi)存池中釋放這么多的內(nèi)存出來.
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree,
numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree,
evictedBlocks)
}

Storage中block cache的內(nèi)存分配
在block使用了memory的storage時,同時block的內(nèi)容能夠被展開內(nèi)存存儲起來時,會通過MemoryStorage中對應(yīng)的函數(shù)來向StaticMemoryManager中的acquireStorageMemory函數(shù)申請內(nèi)存資源.
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = synchronized {
if (numBytes > maxStorageMemory) {
如果一個block的內(nèi)容太大,已經(jīng)超過了配置的storage的存儲空間大小,這個block不做cache.
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space
($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
false
} else {
否則通過storage的內(nèi)存池執(zhí)行block的cache的內(nèi)存申請,這個過程中如果內(nèi)存不夠用時,會釋放老的block的cache對應(yīng)的內(nèi)存空間,也就是會淘汰掉老的block cache,
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
}

在storage的內(nèi)存池中處理block cache的內(nèi)存申請:
def acquireMemory(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)])
: Boolean = lock.synchronized {
這個函數(shù)的傳入?yún)?shù)中,numBytesToAcquire表示需要申請的內(nèi)存大小,numBytesToFree如果是一個大于0的值,表示現(xiàn)在內(nèi)存池中的內(nèi)存空間不夠,需要淘汰現(xiàn)有的block的cache.
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)

這里先判斷,如果申請的內(nèi)存大于還在可用的內(nèi)存,需要先淘汰掉部分block cache來釋放空間.
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree,
evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq(BlockId,
BlockStatus)
)
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}

分配內(nèi)存是否成功,也就是要申請的內(nèi)存小于或等于可用的內(nèi)存空間,最后把分配的內(nèi)存添加到使用的內(nèi)存空間中.表示這部分內(nèi)存已經(jīng)被look住.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
}
enoughMemory
}

Storage處理block cache的淘汰
在storage中內(nèi)存不夠使用時,通過memoryStorage去執(zhí)行block的淘汰,并把淘汰后的block返回通知上層的調(diào)用端.
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
// Register evicted blocks, if any, with the active task metrics
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq(BlockId,
BlockStatus)
)
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
}
}

MemoryStore中處理對cache的淘汰:
對block的cache進行淘汰的處理函數(shù),傳入?yún)?shù)中,第二個參數(shù)是需要釋放的空間,第三個參數(shù)是被淘汰后的block的集合用于返回.
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
這里首先先得到傳入的block對應(yīng)的rdd的id.得到這個rdd_id的目的是淘汰block時,如果發(fā)現(xiàn)是這個rdd的block時,不進行淘汰.
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
這里開始對storage的內(nèi)存中所有的cache進行迭代,這個迭代從最先進行cache的block開始,如果迭代到的block對應(yīng)的RDD不是傳入的BLOCK對應(yīng)的RDD時,把這個BLOCK添加到選擇的BLOCK的集合中,并計算當前內(nèi)存池中的內(nèi)存是否達到需要的內(nèi)存空間,如果達到,停止選擇BLOCK的操作.
val iterator = entries.entrySet().iterator()
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
freedMemory += pair.getValue.size
}
}
}

if (freedMemory >= space) {

如果流程執(zhí)行到這里,說明內(nèi)存空間釋放成功,現(xiàn)在可用的內(nèi)存空間已經(jīng)達到需要的內(nèi)存空間的大小,把選擇的BLOCK對應(yīng)的CACHE通過BLOCKMANAGER從內(nèi)存中進行釋放.并把釋放后的BLOCK添加到droppedBlocks的集合中,這個集合用于返回結(jié)果,表示這次空間的釋放時,這些BLOCK已經(jīng)從CACHE中稱出.
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId,
status)) }
}
}
true
} else {
流程執(zhí)行到這里,表示STORAGE的內(nèi)存空間中無法釋放出更多的內(nèi)存,也就相當于是釋放空間失敗求.
blockId.foreach { id =>
logInfo(s"Will not store $id as it would require dropping another block " +
"from the same RDD")
}
false
}
}
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末汞窗,一起剝皮案震驚了整個濱河市池凄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌致稀,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異温自,居然都是意外死亡玄货,警方通過查閱死者的電腦和手機皇钞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來松捉,“玉大人夹界,你說我怎么就攤上這事“溃” “怎么了可柿?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長丙者。 經(jīng)常有香客問我复斥,道長,這世上最難降的妖魔是什么械媒? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任目锭,我火速辦了婚禮,結(jié)果婚禮上纷捞,老公的妹妹穿的比我還像新娘痢虹。我一直安慰自己,他們只是感情好主儡,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布奖唯。 她就那樣靜靜地躺著,像睡著了一般糜值。 火紅的嫁衣襯著肌膚如雪丰捷。 梳的紋絲不亂的頭發(fā)上坯墨,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天,我揣著相機與錄音病往,去河邊找鬼畅蹂。 笑死,一個胖子當著我的面吹牛荣恐,可吹牛的內(nèi)容都是我干的液斜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼叠穆,長吁一口氣:“原來是場噩夢啊……” “哼少漆!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起硼被,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤示损,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后嚷硫,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體检访,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年仔掸,在試婚紗的時候發(fā)現(xiàn)自己被綠了脆贵。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡起暮,死狀恐怖卖氨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情负懦,我是刑警寧澤筒捺,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站纸厉,受9級特大地震影響系吭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜颗品,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一肯尺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抛猫,春花似錦蟆盹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春寨昙,著一層夾襖步出監(jiān)牢的瞬間讥巡,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工舔哪, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留欢顷,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓捉蚤,卻偏偏與公主長得像抬驴,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子缆巧,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容