spark源碼閱讀之storage模塊②

spark源碼閱讀之storage模塊①中烹笔,描繪了Storage模塊的整體框架是標準的master-slave框架:master用來管理slave的元數(shù)據(jù)信息尿孔,slave則是具體存儲數(shù)據(jù)祠汇,分析了作為master節(jié)點的BlockManagerMasterEndpoint和作為slave節(jié)點的BlockManagerSlaveEndpoint之間如何傳遞消息。
這篇文章中將分析數(shù)據(jù)Block存儲的具體過程谢床,分析它是如何實現(xiàn)的
本篇文章源碼基于spark 1.6.3

存儲級別

緩存RDD有兩個方法作谭,cache()和persist()源哩,而cache方法底層調(diào)用的還是persist方法,只不過cache方法傳入了默認的參數(shù)鸦做,算是persist的一個快捷操作励烦。

persist的構(gòu)造方法中可以傳入存儲級別,如下所示:

def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}

這個StorageLevel參數(shù)就是spark中的存儲級別泼诱,代表storage的數(shù)據(jù)以什么方式存入什么媒介中

StorageLevel的構(gòu)造方法如下:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {...}

五個參數(shù)分別代表:

_useDisk:是否使用磁盤
_useMemory:是否使用內(nèi)存
_useOffHeap:是否使用堆外存儲
_deserialized:是否序列化
_replication:副本個數(shù)

除了_useOffHeap外坛掠,其他參數(shù)可以隨意配合使用,使用方法如下:

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  /**
   * :: DeveloperApi ::
   * Return the StorageLevel object with the specified name.
   */
  @DeveloperApi
  def fromString(s: String): StorageLevel = s match {
    case "NONE" => NONE   //不保存任何數(shù)據(jù)
    case "DISK_ONLY" => DISK_ONLY   //僅保存在磁盤
    case "DISK_ONLY_2" => DISK_ONLY_2   //僅保存在磁盤,備份一份
    case "MEMORY_ONLY" => MEMORY_ONLY   //僅保存在內(nèi)存
    case "MEMORY_ONLY_2" => MEMORY_ONLY_2   //僅保存在內(nèi)存,備份一份
    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER   //僅保存在內(nèi)存执桌,保存序列化后的對象
    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2   //僅保存在內(nèi)存爪喘,保存序列化后的對象,備份一份
    case "MEMORY_AND_DISK" => MEMORY_AND_DISK   //優(yōu)先保存在內(nèi)存,溢出部分保存在磁盤
    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2   //同上,備份一份
    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER   //優(yōu)先保存在內(nèi)存,溢出部分保存在磁盤域滥,保存序列化后的結(jié)果
    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2   //同上,備份一份
    case "OFF_HEAP" => OFF_HEAP   //保存在堆外存儲
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
  }

即在persist方法中傳入對應(yīng)的字符串即可指定存儲級別

存儲級別的選擇:

一般來說

  1. 優(yōu)先使用內(nèi)存MEMORY_ONLY蜈抓,如果內(nèi)存不夠可以加上序列化MEMORY_ONLY_SER骗绕,當然也需要衡量序列化帶來的cpu消耗
  2. 盡量不要使用磁盤,因為磁盤IO消耗的時間遠大于內(nèi)存资昧,迫不得已重算partition數(shù)據(jù)可能都要更優(yōu)酬土,除非計算邏輯復雜,且內(nèi)存放不下數(shù)據(jù)集格带,或者你安裝的是SSD盤撤缴,可以考慮采用MEMORY_AND_DISK
  3. 副本機制的作用相較于容錯其實更偏向于效率,因為在spark中丟失的數(shù)據(jù)可以重算叽唱,且數(shù)據(jù)源一般都有副本機制(如HDFS)屈呕,那么增加一個副本的理由可能就是避免重算,提高效率

存儲細節(jié)

RDD的persist只有在觸發(fā)一個action操作(比如count)的時候才會真正實施棺亭,然后通過一系列操作虎眨,最后會在Task中調(diào)用RDD的iterator()方法來執(zhí)行計算,以下是iterator方法的代碼:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {    // 如果存儲級別不是NONE,就從cacheManager中獲取數(shù)據(jù)
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {    // 否則就讀取checkpoint镶摘,再否則就重新計算
    computeOrReadCheckpoint(split, context)
  }
}

如果存儲級別不為NONE嗽桩,那么會調(diào)用CacheManager的getOrCompute方法,如果有緩存則讀取凄敢,如果沒有則計算并按照存儲級別將數(shù)據(jù)寫入緩存碌冶,CacheManager相當于BlockManager的包裝類,用來管理緩存內(nèi)容涝缝,繼續(xù)看CacheManager的getOrCompute方法:

def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {
  val key = RDDBlockId(rdd.id, partition.index) //獲取blockid
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {   //向BlockManager查詢是否有緩存
    case Some(blockResult) => //緩存命中
      // Partition is already materialized, so just return its values
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)   //更新統(tǒng)計信息
      existingMetrics.incBytesRead(blockResult.bytes)
      //將緩存作為結(jié)果返回
      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None =>    //沒有命中緩存扑庞,需要計算
      // Acquire a lock for loading this partition
      // If another thread already holds the lock, wait for it to finish return its results
      val storedValues = acquireLockForPartition[T](key)    //申請一個鎖來加載這個分區(qū)的數(shù)據(jù)
      if (storedValues.isDefined) {   //如果這部分數(shù)據(jù)已經(jīng)被計算過直接返回結(jié)果
        return new InterruptibleIterator[T](context, storedValues.get)
      }
      // Otherwise, we have to load the partition ourselves
      //如果沒有被計算過譬重,我們需要重新計算這部分數(shù)據(jù)
      try {
        logInfo(s"Partition $key not found, computing it")
        //如果被checkpoint過則讀取checkpoint的數(shù)據(jù),否則就計算
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)
        // If the task is running locally, do not persist the result
        //如果這個task是在driver端執(zhí)行的話就直接返回結(jié)果
        if (context.isRunningLocally) {
          return computedValues
        }
        // Otherwise, cache the values and keep track of any updates in block statuses
        //如果是在executor端執(zhí)行的話就需要更新緩存信息
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]   //將計算結(jié)果寫入BlockManager中
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics   //更新任務(wù)的統(tǒng)計信息
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)
      } finally {
        loading.synchronized {
          //如果有其他的線程在等待改分區(qū)的處理結(jié)果罐氨,那么通知它們已經(jīng)計算完成
          //結(jié)果已經(jīng)儲存到BlockManager中
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}

以上代碼中臀规,數(shù)據(jù)在計算之前會反復確認是否存在緩存中,最后也會調(diào)用RDD的computeOrReadCheckpoint方法來計算這部分數(shù)據(jù):
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
computeOrReadCheckpoint方法中會遞歸的調(diào)用當前RDD的parentRDD的iterator方法栅隐,最后會調(diào)用不同RDD類別的compute方法來計算數(shù)據(jù):

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

拿到計算數(shù)據(jù)后會調(diào)用putInBlockManager方法將計算結(jié)果寫入到BlockManager中

private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
  val putLevel = effectiveStorageLevel.getOrElse(level)   //獲取存儲級別
  if (!putLevel.useMemory) {  //如果沒有使用內(nèi)存的存儲級別塔嬉,可以直接寫入BlockManager
    /*
     * This RDD is not to be cached in memory, so we can just pass the computed values as an
     * iterator directly to the BlockManager rather than first fully unrolling it in memory.
     */
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {  //否則就在內(nèi)存中展開數(shù)據(jù)
    /*
     * This RDD is to be cached in memory. In this case we cannot pass the computed values
     * to the BlockManager as an iterator and expect to read it back later. This is because
     * we may end up dropping a partition from memory store before getting it back.
     *
     * In addition, we must be careful to not unroll the entire partition in memory at once.
     * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
     * single partition. Instead, we unroll the values cautiously, potentially aborting and
     * dropping the partition to disk if applicable.
     */
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}

代碼中,如果獲取的存儲級別沒有memory约啊,那么就調(diào)用BlockManager的putIterator方法將計算結(jié)果直接寫入磁盤邑遏,否則就調(diào)用BlockManager的putArray方法將計算結(jié)果在內(nèi)存中展開佣赖。

def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}

def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}

def putBytes(
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(bytes != null, "Bytes is null")
  doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}

兩者最終都會調(diào)用doPut方法恰矩,只不過一個數(shù)據(jù)封裝為IteratorValues另一個為ArrayValues,一個對應(yīng)磁盤一個對應(yīng)內(nèi)存憎蛤,還有一個方法也會調(diào)用doPut方法外傅,就是BlockManager的putBytes方法,對應(yīng)的是外部存儲俩檬。

doPut方法篇幅略長萎胰,分為以下三個部分來說明,

  1. 其中分類緩存數(shù)據(jù)的部分代碼如下所示:
try {
  // returnValues - Whether to return the values put
  // blockStore - The type of storage to put these values into
  val (returnValues, blockStore: BlockStore) = {
    if (putLevel.useMemory) {
      // Put it in memory first, even if it also has useDisk set to true;
      // We will drop it to disk later if the memory store can't hold it.
      (true, memoryStore)
    } else if (putLevel.useOffHeap) {
      // Use external block store
      (false, externalBlockStore)
    } else if (putLevel.useDisk) {
      // Don't get back the bytes from put unless we replicate them
      (putLevel.replication > 1, diskStore)
    } else {
      assert(putLevel == StorageLevel.NONE)
      throw new BlockException(
        blockId, s"Attempted to put block $blockId without specifying storage level!")
    }
  }
  // Actually put the values
  val result: PutResult = data match {
    case IteratorValues(iterator) =>
      blockStore.putIterator(blockId, iterator, putLevel, returnValues)
    case ArrayValues(array) =>
      blockStore.putArray(blockId, array, putLevel, returnValues)
    case ByteBufferValues(bytes) =>
      bytes.rewind()
      blockStore.putBytes(blockId, bytes, putLevel)
  }

這里存入的數(shù)據(jù)結(jié)構(gòu)為memoryStore棚辽、diskStore技竟、externalBlockStore分別對應(yīng)著存儲級別中的內(nèi)存、磁盤和外部存儲屈藐,他們緩存數(shù)據(jù)的邏輯后面單獨說明榔组,這里分別調(diào)用了它們的putArray、putIterator联逻、putBytes方法搓扯。
其中需要注意的是,如果存儲級別是MEMORY_AND_DISK包归,代碼中體現(xiàn)了優(yōu)先存儲在內(nèi)存memoryStore中锨推,等到內(nèi)存滿了才會寫到diskStore中。

2.其次是副本邏輯公壤,代碼體現(xiàn)如下:

// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
  case b: ByteBufferValues if putLevel.replication > 1 =>
    // Duplicate doesn't copy the bytes, but just creates a wrapper
    val bufferView = b.buffer.duplicate()
    Future {
      // This is a blocking action and should run in futureExecutionContext which is a cached
      // thread pool
      replicate(blockId, bufferView, putLevel)
    }(futureExecutionContext)
  case _ => null
}

這里啟動一個Future的線程優(yōu)先去處理ByteBufferValues類的數(shù)據(jù)(也就是外部存儲類的數(shù)據(jù))的復制换可,其中的核心方法是replicate方法,感興趣的話可以深入了解一下厦幅。

那么IteratorValues類型的數(shù)據(jù)和ArrayValues類型的副本邏輯怎么處理呢锦担?請看以下代碼:

if (putLevel.replication > 1) {
  data match {
    case ByteBufferValues(bytes) =>
      if (replicationFuture != null) {
        Await.ready(replicationFuture, Duration.Inf)
      }
    case _ =>
      val remoteStartTime = System.currentTimeMillis
      // Serialize the block if not already done
      if (bytesAfterPut == null) {
        if (valuesAfterPut == null) {
          throw new SparkException(
            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
        }
        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
      }
      replicate(blockId, bytesAfterPut, putLevel)
      logDebug("Put block %s remotely took %s"
        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
  }
}

如果是ByteBufferValues類型的數(shù)據(jù),那么會對應(yīng)上面的代碼慨削,去等待那個Future線程的回傳值洞渔。
如果是另外兩種類型套媚,則首先進行序列化,然后調(diào)用replicate方法去進行復制操作磁椒。

  1. tellMaster將更新上報Master
if (tellMaster) {
  reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}

調(diào)用reportBlockStatus方法向master匯報更新堤瘤,最后會向BlockManagerMasterEndpoint發(fā)送UpdateBlockInfo消息,而Master會在收到消息后更新Block的元數(shù)據(jù)


存儲Block的類
BlockStore

Block存儲的抽象類浆熔,定義了接口的一些基本功能和方法:

/**
 * Abstract class to store blocks.
 */
private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
  // 根據(jù)StorageLevel將blockId標識的Block的內(nèi)容bytes寫入系統(tǒng)
  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
  /**
   * Put in a block and, possibly, also return its content as either bytes or another Iterator.
   * This is used to efficiently write the values to multiple locations (e.g. for replication).
   *
   * @return a PutResult that contains the size of the data, as well as the values put if
   *         returnValues is true (if not, the result's data field can be null)
   */
  // 將values寫入系統(tǒng)本辐,如果returnValues為true,需要將結(jié)果寫入PutResult
  def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult
  //同上医增,只不過由Iterator變成Array
  def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult
  /**
   * Return the size of a block in bytes.
   */
  // 獲得Block的大小
  def getSize(blockId: BlockId): Long
  // 獲得Block的數(shù)據(jù)慎皱,返回類型ByteBuffer
  def getBytes(blockId: BlockId): Option[ByteBuffer]
  // 獲取Block的數(shù)據(jù),返回類型Iterator[Any]
  def getValues(blockId: BlockId): Option[Iterator[Any]]
  /**
   * Remove a block, if it exists.
   * @param blockId the block to remove.
   * @return True if the block was found and removed, False otherwise.
   */
  // 刪除Block叶骨,成功返回true茫多, 否則返回false
  def remove(blockId: BlockId): Boolean
  // 查詢是否包含某個Block
  def contains(blockId: BlockId): Boolean
  // 退出時清理回收資源
  def clear() { }
}

BlockStore的實現(xiàn)類有三個MemoryStore、DiskStore忽刽、ExternalBlockStore天揖,分別對應(yīng)了存儲級別的內(nèi)存、磁盤和外部存儲跪帝。

MemoryStore維護了一個數(shù)據(jù)結(jié)構(gòu)今膊,是一個HashMap
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
所有需要緩存在內(nèi)存中的數(shù)據(jù)都是通過tryToPut方法維護到這個數(shù)據(jù)結(jié)構(gòu)中,如果內(nèi)存不夠的話會釋放一些老的緩存伞剑,如果存儲級別中還有磁盤斑唬,就會調(diào)用DiskStore的putIterator寫入Disk,如果沒有黎泣,那么就不緩存這部分數(shù)據(jù)恕刘,下次需要就重新計算。

DiskStore將數(shù)據(jù)持久化到磁盤中聘裁,會以什么樣的形式存儲呢雪营?我們來看DiskStore的putIterator方法:

override def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  logDebug(s"Attempting to write values for block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)   //獲取文件句柄
  val outputStream = new FileOutputStream(file)   //創(chuàng)建流
  try {
    Utils.tryWithSafeFinally {
      blockManager.dataSerializeStream(blockId, outputStream, values) //序列化流
    } {
      // Close outputStream here because it should be closed before file is deleted.
      outputStream.close()
    }
  } catch {
    case e: Throwable =>
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
      throw e
  }
  val length = file.length
  val timeTaken = System.currentTimeMillis - startTime
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(length), timeTaken))
  if (returnValues) {
    // Return a byte buffer for the contents of the file
    val buffer = getBytes(blockId).get
    PutResult(length, Right(buffer))
  } else {
    PutResult(length, null)
  }

以上代碼中創(chuàng)建了一個文件流,序列化之后就寫入了本地的物理文件衡便,獲取文件句柄的方法為getFile献起,接著看其實現(xiàn):

def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)  //根據(jù)文件名hash值獲取文件應(yīng)該存放的層級位置
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  // Create the subdirectory if it doesn't already exist
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }
  new File(subDir, filename)  //創(chuàng)建文件句柄
}

根據(jù)BlockId的name的hash值取得文件的存放路徑,然后創(chuàng)建一個文件句柄將數(shù)據(jù)寫入物理文件镣陕,而這個物理文件的路徑可以通過spark.local.dir來進行配置谴餐,在yarn-cluster模式下,這個路徑會被yarn.nodemanager.local-dirs替換呆抑。


Storage模塊調(diào)優(yōu)
  1. 首先就是本篇文章開始時說的選擇存儲級別的注意事項岂嗓,盡量使用內(nèi)存,少用磁盤鹊碍,序列化和副本根據(jù)情況選擇使用厌殉。
  2. spark.local.dir:
    磁盤存儲級別物理文件的路徑設(shè)置項食绿,盡量配置多個路徑(用逗號隔開),如有條件最好選擇SSD盤公罕。
  3. spark.memory.storageFraction:
    可用內(nèi)存中器紧,用于Storage模塊緩存數(shù)據(jù)的占比,默認為0.5楼眷,也就是和Shuffle模塊占用內(nèi)存五五開铲汪,但是1.5版本之后,spark有一個動態(tài)內(nèi)存分配模型的功能罐柳,簡單來說就是在使用內(nèi)存的時候掌腰,Shuffle是親兒子,它可以占用分給Storage的內(nèi)存拒不歸還张吉,而Storage卻不行齿梁。
    這里調(diào)優(yōu)的策略就是,根據(jù)實際情況芦拿,如果程序RDD的緩存數(shù)據(jù)集量較大士飒,而期間很少產(chǎn)生shuffle數(shù)據(jù)的話查邢,可以適當把這個參數(shù)提高蔗崎。
  4. 堆外內(nèi)存(off-head memory)
    堆外內(nèi)存也是一種外部存儲,是spark通過調(diào)用java的unsafe相關(guān)API直接向操作系統(tǒng)要內(nèi)存扰藕,這種方式的優(yōu)點是跳過JVM的管理可以避免GC影響缓苛,缺點是需要自己來編寫內(nèi)存申請和釋放的邏輯
    spark.memory.offHeap.enabled
    默認為false,設(shè)置為true打開堆外內(nèi)存功能
    spark.memory.offHeap.size
    默認為0邓深,,打開堆外內(nèi)存功能后未桥,方可設(shè)置內(nèi)存大小,但在配置的時候需要小心內(nèi)存溢出的問題

總結(jié)

關(guān)于Storage模塊的源碼閱讀就分析到這兒芥备,閱讀Storage模塊的源碼有助于了解RDD之下冬耿,系統(tǒng)又做了哪些操作,RDD實現(xiàn)了邏輯萌壳,而Storage管理著數(shù)據(jù)亦镶。通過閱讀源碼,對于今后的問題定位和性能調(diào)優(yōu)提供了理論依據(jù)袱瓮。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末缤骨,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子尺借,更是在濱河造成了極大的恐慌绊起,老刑警劉巖,帶你破解...
    沈念sama閱讀 223,002評論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件燎斩,死亡現(xiàn)場離奇詭異虱歪,居然都是意外死亡蜂绎,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評論 3 400
  • 文/潘曉璐 我一進店門笋鄙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來荡碾,“玉大人,你說我怎么就攤上這事局装√秤酰” “怎么了?”我有些...
    開封第一講書人閱讀 169,787評論 0 365
  • 文/不壞的土叔 我叫張陵铐尚,是天一觀的道長拨脉。 經(jīng)常有香客問我,道長宣增,這世上最難降的妖魔是什么玫膀? 我笑而不...
    開封第一講書人閱讀 60,237評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮爹脾,結(jié)果婚禮上帖旨,老公的妹妹穿的比我還像新娘。我一直安慰自己灵妨,他們只是感情好解阅,可當我...
    茶點故事閱讀 69,237評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著泌霍,像睡著了一般货抄。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上朱转,一...
    開封第一講書人閱讀 52,821評論 1 314
  • 那天蟹地,我揣著相機與錄音,去河邊找鬼藤为。 笑死怪与,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的缅疟。 我是一名探鬼主播分别,決...
    沈念sama閱讀 41,236評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼窿吩!你這毒婦竟也來了茎杂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,196評論 0 277
  • 序言:老撾萬榮一對情侶失蹤纫雁,失蹤者是張志新(化名)和其女友劉穎煌往,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,716評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡刽脖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,794評論 3 343
  • 正文 我和宋清朗相戀三年羞海,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片曲管。...
    茶點故事閱讀 40,928評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡却邓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出院水,到底是詐尸還是另有隱情腊徙,我是刑警寧澤,帶...
    沈念sama閱讀 36,583評論 5 351
  • 正文 年R本政府宣布檬某,位于F島的核電站撬腾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏恢恼。R本人自食惡果不足惜民傻,卻給世界環(huán)境...
    茶點故事閱讀 42,264評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望场斑。 院中可真熱鬧漓踢,春花似錦、人聲如沸漏隐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽锁保。三九已至薯酝,卻和暖如春半沽,著一層夾襖步出監(jiān)牢的瞬間爽柒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評論 1 274
  • 我被黑心中介騙來泰國打工者填, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留浩村,地道東北人。 一個月前我還...
    沈念sama閱讀 49,378評論 3 379
  • 正文 我出身青樓占哟,卻偏偏與公主長得像心墅,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子榨乎,可洞房花燭夜當晚...
    茶點故事閱讀 45,937評論 2 361

推薦閱讀更多精彩內(nèi)容