[Spark] Spark Storage Internals: Communication Layer Architecture

Outline:

  • Concepts
  • Initialization
  • Communication

Spark's storage management module implements its communication layer with a Master/Slave structure. The master and slave nodes exchange control and status information, and the underlying transport relies on the Spark RPC framework.

The Master manages and maintains block metadata for the whole application while it runs. Each Slave reports the status of its local blocks to the Master and receives instructions from the Master, such as fetching a block's status or removing the blocks of an RDD.

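I. Concepts

Before reading the Spark storage source code, it helps to get to know the main storage-related classes.

A Block is the smallest unit of data that Spark processes. It is a physical storage unit and corresponds one-to-one with a partition of an RDD at the logical level.

BlockManager, BlockManagerMaster, and BlockManagerMasterEndpoint have similar names and are easy to confuse, so each is introduced briefly below.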

1.1 BlockManager

As the name suggests, Spark uses a BlockManager to manage the blocks held in memory and on disk on the node where that BlockManager runs.

Both the Driver and the Executors create a BlockManager. The Driver side is mainly concerned with metadata: it tracks the state of every Executor's BlockManager rather than serving as the main data store. The BlockManager on an Executor handles read, write, and removal requests for data, registers itself with the Driver, and reports the metadata of the blocks it manages.

1.2 BlockManagerMaster

Like BlockManager, BlockManagerMaster is created in SparkEnv.

A BlockManager manages the blocks on its own node, whereas BlockManagerMaster manages and maintains block metadata for the whole application while it runs and sends commands to the slave nodes.

1.3 BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint

BlockManagerMaster cannot communicate on its own. The actual communication is done by BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint, which manage the BlockManagers on all nodes through remote messages.

BlockManagerMasterEndpoint is the Driver's communication endpoint and BlockManagerSlaveEndpoint is the Executor's. The Driver manages all blocks through BlockManagerMaster, and requests to operate on blocks reach the Executors through messages exchanged between BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint. When either endpoint receives a request, it calls the BlockManager to perform the actual operation on the block data.
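To make this division of labor concrete, here is a minimal sketch that does not depend on Spark. Every name in it (`LocalBlockStore`, `SlaveEndpointSketch`, `RemoveBlockMsg`, `GetBlockSizeMsg`) is a hypothetical stand-in rather than one of Spark's classes; it only illustrates an endpoint that decodes a message and delegates the real work to the node-local block manager.

```scala
import scala.collection.mutable

object EndpointDelegationSketch {
  // Hypothetical control messages (stand-ins, not Spark's message classes).
  sealed trait ToSlave
  final case class RemoveBlockMsg(blockId: String) extends ToSlave
  final case class GetBlockSizeMsg(blockId: String) extends ToSlave

  // Stand-in for a node-local BlockManager: it owns the actual block data.
  final class LocalBlockStore {
    private val blocks = mutable.HashMap.empty[String, Array[Byte]]
    def put(blockId: String, bytes: Array[Byte]): Unit = blocks.update(blockId, bytes)
    def remove(blockId: String): Boolean = blocks.remove(blockId).isDefined
    def size(blockId: String): Option[Int] = blocks.get(blockId).map(_.length)
  }

  // Stand-in for the slave endpoint: it only decodes messages and delegates.
  final class SlaveEndpointSketch(store: LocalBlockStore) {
    def receiveAndReply(msg: ToSlave): Any = msg match {
      case RemoveBlockMsg(id)  => store.remove(id)
      case GetBlockSizeMsg(id) => store.size(id)
    }
  }
}
```

The endpoint holds no data of its own, which mirrors the point above: the endpoints carry messages, and the BlockManager does the work.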

II. Initialization

After an application is launched with spark-submit, it runs on the submitting machine in client mode, or on a Worker chosen by the Master in cluster mode. In both cases the process in which the application runs is called the driver.

On startup, every application builds a SparkContext, which creates SparkEnv, the Driver-side foundation for communication. While SparkEnv initializes, it instantiates BlockManager and BlockManagerMaster, and the messaging endpoint BlockManagerMasterEndpoint is created as part of instantiating BlockManagerMaster.

SparkEnv initialization also creates a BlockTransferService, which handles network data transfer.

After starting, the application contacts the Master, which uses the application's parameters to ask Workers to launch the required number of Executors. Each Executor also creates a SparkEnv on startup and initializes the same block-management classes. Some details of these classes, and how they are used, differ between the Driver and the Executors.

SparkEnv:

// Remote data transfer service, implemented with Netty
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)

// Create the BlockManagerMaster. On the Driver, this creates the BlockManagerMasterEndpoint;
// on an Executor, it obtains a reference to the BlockManagerMasterEndpoint instead.
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
// Create the BlockManager. It runs on every node (Driver and Executors) and
// provides the interface for managing that node's local blocks.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)

1. First, look at how BlockManagerMaster is created. This calls the registerOrLookupEndpoint method.

SparkEnv # registerOrLookupEndpoint:

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

RpcUtils # makeDriverRef:

/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

As the code shows, the Driver creates the BlockManagerMasterEndpoint itself, while an Executor only creates a reference (RpcEndpointRef) to it. This lets the Executor initiate communication with the Driver.
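Note that `endpointCreator` is declared as a by-name parameter (`=> RpcEndpoint`), so the endpoint object is only constructed when the Driver branch actually evaluates it. The following Spark-free sketch illustrates that behavior. `LocalEndpoint`, `RemoteRef`, `registerOrLookup`, and the `"driver-host"` address are hypothetical placeholders, not Spark APIs.

```scala
object ByNameEndpointSketch {
  // Hypothetical stand-ins: a real endpoint vs. a remote reference to one.
  sealed trait Handle
  final case class LocalEndpoint(name: String) extends Handle
  final case class RemoteRef(name: String, host: String, port: Int) extends Handle

  // Counts how many times the creator block actually ran.
  var created = 0

  // `creator` is by-name: it is evaluated only if the driver branch uses it.
  def registerOrLookup(isDriver: Boolean, name: String)(creator: => LocalEndpoint): Handle =
    if (isDriver) creator                      // endpoint is constructed here
    else RemoteRef(name, "driver-host", 7077)  // placeholder address; creator never runs

  def demo(isDriver: Boolean): Handle =
    registerOrLookup(isDriver, "BlockManagerMaster") {
      created += 1
      LocalEndpoint("BlockManagerMaster")
    }
}
```

Calling `demo(isDriver = false)` returns a `RemoteRef` and leaves `created` at 0, so a non-driver process never pays for building an endpoint it does not host.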

2. Next, look at how BlockManager is created. Its initialization creates the BlockManagerSlaveEndpoint.

BlockManager:

private val slaveEndpoint = rpcEnv.setupEndpoint(
  "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
  new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

After the BlockManager is instantiated, its initialize method registers the BlockManagerSlaveEndpoint with the Driver, so the Driver holds a reference to that endpoint. The Driver therefore hosts both a BlockManagerMasterEndpoint and a BlockManagerSlaveEndpoint, whereas an Executor hosts only a BlockManagerSlaveEndpoint.

BlockManager # initialize:

val idFromMaster = master.registerBlockManager(
  id,
  maxOnHeapMemory,
  maxOffHeapMemory,
  slaveEndpoint)

BlockManagerMaster # registerBlockManager:

/**
 * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
 * topology information. This information is obtained from the master and we respond with an
 * updated BlockManagerId fleshed out with this information.
 */
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}

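The Driver holds references to the Executors' BlockManagerSlaveEndpoints, and each Executor holds a reference to the Driver's BlockManagerMasterEndpoint. Because each side holds a reference to the other, they can communicate in both directions while the application runs.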
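The registration handshake can be modeled in a few lines. This is a simplified sketch under stated assumptions: `MasterSketch`, `ExecutorSketch`, and `SlaveRef` are hypothetical names, direct method calls stand in for RPC, and none of it is Spark's implementation.

```scala
import scala.collection.mutable

object RegistrationSketch {
  // Hypothetical reference to an executor-side endpoint.
  final case class SlaveRef(executorId: String)

  // Driver side: remembers one slave reference per registered executor.
  final class MasterSketch {
    private val slaves = mutable.LinkedHashMap.empty[String, SlaveRef]
    def register(ref: SlaveRef): Unit = slaves.update(ref.executorId, ref)
    def registeredExecutors: Seq[String] = slaves.keys.toSeq
  }

  // Executor side: holds a reference to the master and registers itself with it.
  final class ExecutorSketch(executorId: String, master: MasterSketch) {
    val self: SlaveRef = SlaveRef(executorId)
    def initialize(): Unit = master.register(self)
  }
}
```

After `initialize()` runs on each executor, the master can reach every slave through its stored reference and every executor can reach the master through the reference it was constructed with, which is the two-way relationship described above.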

III. Communication

BlockManagerMasterEndpoint keeps block metadata in three hash maps.

// Mapping from block manager id to the block manager's information.
// Maps each BlockManagerId to its BlockManagerInfo. BlockManagerInfo holds the Executor's
// memory usage, the status of its blocks, the blocks it has cached, and a reference to the
// Executor's endpoint, through which messages can be sent to that Executor.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

// Mapping from executor ID to block manager ID.
// Maps each executor ID to its BlockManagerId.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// Mapping from block id to the set of block managers that have the block.
// Maps each BlockId to the set of BlockManagerIds that hold it. A set is needed because
// a block may have several replicas stored on different Executors.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

3.1 Updating block metadata

After data has been written, updated, or removed, the latest block status is sent as an UpdateBlockInfo message to BlockManagerMasterEndpoint, which updates the block metadata, mainly the blockManagerInfo and blockLocations maps.

BlockManagerMasterEndpoint # receiveAndReply:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  ...
  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
    
  ...

BlockManagerMasterEndpoint # updateBlockInfo:

private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {

  if (!blockManagerInfo.contains(blockManagerId)) {
    if (blockManagerId.isDriver && !isLocal) {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure.
      return true
    } else {
      return false
    }
  }

  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }

  blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

  var locations: mutable.HashSet[BlockManagerId] = null
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }

  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }

  // Remove the block from master tracking if it has been removed on all slaves.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}

  • When handling BlockManagerInfo, the BlockManagerId, blockId, StorageLevel, and size arguments are passed in, and from them the method determines whether the operation is an insert, an update, or a removal.

BlockManagerInfo # updateBlockInfo:

def updateBlockInfo(
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long) {

  updateLastSeenMs()

  val blockExists = _blocks.containsKey(blockId)
  var originalMemSize: Long = 0
  var originalDiskSize: Long = 0
  var originalLevel: StorageLevel = StorageLevel.NONE

  if (blockExists) {
    // The block exists on the slave already.
    val blockStatus: BlockStatus = _blocks.get(blockId)
    originalLevel = blockStatus.storageLevel
    originalMemSize = blockStatus.memSize
    originalDiskSize = blockStatus.diskSize

    if (originalLevel.useMemory) {
      _remainingMem += originalMemSize
    }
  }

  if (storageLevel.isValid) {
    /* isValid means it is either stored in-memory or on-disk.
     * The memSize here indicates the data size in or dropped from memory,
     * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
     * and the diskSize here indicates the data size in or dropped to disk.
     * They can be both larger than 0, when a block is dropped from memory to disk.
     * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
    var blockStatus: BlockStatus = null
    if (storageLevel.useMemory) {
      blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
      _blocks.put(blockId, blockStatus)
      _remainingMem -= memSize
      if (blockExists) {
        logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
          s" (current size: ${Utils.bytesToString(memSize)}," +
          s" original size: ${Utils.bytesToString(originalMemSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      } else {
        logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
          s" (size: ${Utils.bytesToString(memSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      }
    }
    if (storageLevel.useDisk) {
      blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
      _blocks.put(blockId, blockStatus)
      if (blockExists) {
        logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
          s" (current size: ${Utils.bytesToString(diskSize)}," +
          s" original size: ${Utils.bytesToString(originalDiskSize)})")
      } else {
        logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
          s" (size: ${Utils.bytesToString(diskSize)})")
      }
    }
    if (!blockId.isBroadcast && blockStatus.isCached) {
      _cachedBlocks += blockId
    }
  } else if (blockExists) {
    // If isValid is not true, drop the block.
    _blocks.remove(blockId)
    _cachedBlocks -= blockId
    if (originalLevel.useMemory) {
      logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
        s" (size: ${Utils.bytesToString(originalMemSize)}," +
        s" free: ${Utils.bytesToString(_remainingMem)})")
    }
    if (originalLevel.useDisk) {
      logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
        s" (size: ${Utils.bytesToString(originalDiskSize)})")
    }
  }
}

  • When handling blockLocations, the method checks by blockId whether blockLocations already contains the block, and creates an empty entry if it does not. On an insert or update, the reporting BlockManagerId is added to the block's location set; on a removal, that BlockManagerId is taken out. After a removal the method checks whether the location set is empty. If it is, the block no longer exists anywhere in the cluster, so its entry is removed from blockLocations.
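The blockLocations bookkeeping described above can be reproduced in isolation. This is a simplified sketch, not Spark's code: plain strings replace BlockId and BlockManagerId, and a Boolean `valid` flag plays the role of `storageLevel.isValid`. The class and method names are hypothetical.

```scala
import scala.collection.mutable

object BlockLocationsSketch {
  final class Tracker {
    // Simplified: block id -> ids of the block managers that hold the block.
    private val blockLocations = mutable.HashMap.empty[String, mutable.HashSet[String]]

    // valid = true means the block is stored on that manager; false means it was dropped.
    def update(blockId: String, blockManagerId: String, valid: Boolean): Unit = {
      val locations = blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[String])
      if (valid) locations += blockManagerId else locations -= blockManagerId
      // Forget the block entirely once no block manager holds it any more.
      if (locations.isEmpty) blockLocations.remove(blockId)
    }

    def locationsOf(blockId: String): Set[String] =
      blockLocations.get(blockId).map(_.toSet).getOrElse(Set.empty[String])

    def isTracked(blockId: String): Boolean = blockLocations.contains(blockId)
  }
}
```

Removing the key when its set becomes empty keeps the map from accumulating entries for blocks that no longer exist anywhere.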

3.2 Fetching remote blocks

Once an application's data has been stored, operations such as fetching data from a remote node or computing an RDD's preferred locations need to look up where a block resides by its block id. They send GetLocations or GetLocationsMultipleBlockIds to BlockManagerMasterEndpoint, which answers by querying its metadata.

BlockManagerMasterEndpoint # receiveAndReply:

case GetLocations(blockId) =>
  context.reply(getLocations(blockId))

BlockManagerMasterEndpoint # getLocations:

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
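The handler for GetLocationsMultipleBlockIds is not shown here. A natural reading is that it performs the single-block lookup once per requested id; the sketch below illustrates that idea only. It is an assumption rather than Spark's code: it uses strings for ids, an immutable `Map` for the metadata, and sorts each result purely to make the output deterministic.

```scala
object MultiLocationSketch {
  // Simplified metadata: block id -> ids of the block managers holding it.
  type Locations = Map[String, Set[String]]

  // Single lookup: unknown blocks yield an empty Seq (sorted only for determinism).
  def getLocations(meta: Locations, blockId: String): Seq[String] =
    meta.get(blockId).map(_.toSeq.sorted).getOrElse(Seq.empty[String])

  // Batch lookup: one result per requested id, in request order.
  def getLocationsMultiple(meta: Locations, blockIds: Seq[String]): Seq[Seq[String]] =
    blockIds.map(id => getLocations(meta, id))
}
```

Returning the results positionally, including an empty entry for an unknown block, lets the caller pair each answer with the id it asked about.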

3.3 Removing blocks

Spark provides ways to remove RDDs, individual blocks, and broadcast variables. When data needs to be removed, a removal message is submitted to BlockManagerMasterEndpoint, which drives the removal: it removes the metadata kept on the Driver and sends messages to the Executors' BlockManagerSlaveEndpoints telling them to delete the corresponding physical data.

For example, unpersistRDD in SparkContext:

/**
 * Unpersist an RDD from memory and/or disk storage
 */
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
  env.blockManager.master.removeRdd(rddId, blocking)
  persistentRdds.remove(rddId)
  listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

BlockManagerMaster # removeRdd:

/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
  val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
  future.failed.foreach(e =>
    logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  )(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}

The message is then delivered to BlockManagerMasterEndpoint.

BlockManagerMasterEndpoint # receiveAndReply:

case RemoveRdd(rddId) =>
  context.reply(removeRdd(rddId))

BlockManagerMasterEndpoint # removeRdd:

private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // First remove the metadata for the given RDD, and then asynchronously remove the blocks
  // from the slaves.

  // Find all blocks for the given RDD, remove the block from both blockLocations and
  // the blockManagerInfo that is tracking the blocks.
  // First, use the RDD id to find the blocks that belong to this RDD
  val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocks.foreach { blockId =>
    // Then look up the BlockManagerIds that hold this block and remove the block from each one's info
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    // Also remove the block's entry from blockLocations
    blockLocations.remove(blockId)
  }

  // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
  // The dispatcher is used as an implicit argument into the Future sequence construction.
  // Finally, send a RemoveRdd message to the Executors telling them to remove the RDD
  val removeMsg = RemoveRdd(rddId)

  val futures = blockManagerInfo.values.map { bm =>
    bm.slaveEndpoint.ask[Int](removeMsg).recover {
      case e: IOException =>
        logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
          e)
        0 // zero blocks were removed
    }
  }.toSeq

  Future.sequence(futures)
}
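The fan-out at the end of removeRdd combines `recover` on each per-slave future with `Future.sequence`. The following standalone sketch shows that pattern with the standard library only. `Slave`, `askRemove`, and `removeEverywhere` are hypothetical names, and the slaves reply from memory instead of over RPC.

```scala
import java.io.IOException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FanOutRemoveSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical slave: replies with the number of blocks it removed, or fails.
  final case class Slave(id: String, blocksRemoved: Int, unreachable: Boolean = false) {
    def askRemove(): Future[Int] =
      if (unreachable) Future.failed(new IOException(s"$id unreachable"))
      else Future.successful(blocksRemoved)
  }

  def removeEverywhere(slaves: Seq[Slave]): Future[Seq[Int]] = {
    val futures = slaves.map { s =>
      // An IOException from one slave is turned into "0 blocks removed".
      s.askRemove().recover { case _: IOException => 0 }
    }
    // Completes once every per-slave future has completed; order follows `slaves`.
    Future.sequence(futures)
  }

  // Blocking variant, analogous to passing blocking = true.
  def removeBlocking(slaves: Seq[Slave]): Seq[Int] =
    Await.result(removeEverywhere(slaves), 5.seconds)
}
```

Because each future is recovered individually before sequencing, one unreachable slave does not fail the combined result. Without `recover`, `Future.sequence` would fail as soon as any element failed.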

3.4 Summary

A BlockManager exists on the Driver and on every Executor. The Driver side keeps the block metadata, while each Executor's BlockManager acts on the messages it receives:

  • When an Executor's BlockManager receives a read request, it handles it differently depending on whether the block is local. If the block is local, it reads it directly through the MemoryStore and DiskStore getters getValues/getBytes. If the block is remote, it uses BlockTransferService to fetch the data from the remote node.

  • When an Executor's BlockManager receives a write request and no replica needs to be created, it calls the BlockStore interface methods, choosing the write method according to the storage level of the data being written.

BlockManager # get:

/**
 * Get a block from the block manager (either local or remote).
 *
 * This acquires a read lock on the block if the block was stored locally and does not acquire
 * any locks if the block was fetched from a remote block manager. The read lock will
 * automatically be freed once the result's `data` iterator is fully consumed.
 */
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
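The local-first, remote-second lookup in `get` can also be expressed with `Option.orElse`, whose argument is evaluated only when the first lookup misses. This is an illustrative sketch, not a proposed change to Spark: the two lookup functions are hypothetical parameters, and the result is tagged with where the value was found.

```scala
object LocalThenRemoteSketch {
  // Hypothetical lookups passed in as functions; each returns None on a miss.
  def get[T](
      blockId: String,
      local: String => Option[T],
      remote: String => Option[T]): Option[(String, T)] =
    local(blockId).map(v => ("local", v))
      // orElse takes its argument by name, so the remote lookup runs only on a local miss.
      .orElse(remote(blockId).map(v => ("remote", v)))
}
```

Trying the local store first avoids a network round trip whenever the block is already on the node.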