Spark Source Code Reading: The Storage Module ①

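The Storage module manages the data that Spark produces during computation. From the user's point of view, Spark programming targets the RDD, an abstract logical dataset, and the processing logic is expressed through RDD transformations and actions. Beneath the RDD's elegant surface, the Storage module diligently manages that data; it is the unsung hero behind the scenes.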

Architecture of the storage module
[Figure: storage-frame1.png]

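As the figure above shows, the Storage module mirrors the Driver/Executor split and follows the standard Master-Slave pattern. A Block corresponds to a Partition of an RDD and is the unit in which data is stored, and a BlockManager manages the Blocks on each node. The master role of BlockManager is created on the Driver, and the slave role is created on the Executors. To understand this architecture diagram, we first need to look at the constructor and fields of the BlockManager class: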

```scala
/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that #initialize() must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    defaultSerializer: Serializer,
    val conf: SparkConf,
    val memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with Logging
```

A BlockManagerMaster instance is passed in through BlockManager's parameter list.

```scala
private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
```

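Among its fields, BlockManager creates a BlockManagerSlaveEndpoint and registers it with the RpcEnv through setupEndpoint; the returned endpoint reference is kept in slaveEndpoint.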
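The endpoint name combines the fixed prefix "BlockManagerEndpoint" with ID_GENERATOR.next, so every BlockManager created in the same JVM gets a distinct endpoint name. A minimal sketch of that idea, assuming the generator is simply a thread-safe counter; `IdGenerator` and `EndpointNames` below are hypothetical stand-ins, not Spark's own classes:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an id generator: a thread-safe, increasing counter.
class IdGenerator {
  private val id = new AtomicInteger(0)
  def next: Int = id.incrementAndGet()
}

object EndpointNames {
  // Hypothetical: one shared generator, so names never repeat within this JVM.
  private val generator = new IdGenerator

  // Mirrors the "BlockManagerEndpoint" + ID_GENERATOR.next expression above.
  def nextSlaveEndpointName(): String = "BlockManagerEndpoint" + generator.next
}
```

Because the counter is atomic, two BlockManagers constructed concurrently still receive different names.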

BlockManager runs on every node (the Driver and the Executors), and its initialize() method must be called before it can be used. The initialize method:

```scala
def initialize(appId: String): Unit = {
  // Initialize the BlockTransferService and ShuffleClient used during shuffle
  blockTransferService.init(this)
  shuffleClient.init(appId)
  blockManagerId = BlockManagerId(
    executorId, blockTransferService.hostName, blockTransferService.port)
  // External shuffle service configuration
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  // Register with the master
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}
```

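As you can see, initialize() registers this BlockManager's information with the master, passing its slaveEndpoint along with it. Since every node does this, the master ends up holding the slaveEndpoint of every BlockManager.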
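To make the registration flow concrete, here is a minimal sketch under heavy simplifying assumptions: `BmId`, `SlaveRef`, `MiniMaster` and `MiniBlockManager` are hypothetical stand-ins for BlockManagerId, RpcEndpointRef, BlockManagerMasterEndpoint and BlockManager, and the RPC layer is replaced by a direct method call.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for BlockManagerId and RpcEndpointRef.
final case class BmId(executorId: String, host: String, port: Int)
final case class SlaveRef(name: String)

// Hypothetical master: keeps only the two maps that registration touches.
final class MiniMaster {
  val blockManagerInfo = mutable.HashMap.empty[BmId, (Long, SlaveRef)]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BmId]

  // Remember the node's id, its max memory and its slave endpoint reference.
  def register(id: BmId, maxMem: Long, slave: SlaveRef): Boolean = {
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.put(id.executorId, id)
      blockManagerInfo.put(id, (maxMem, slave))
    }
    true
  }
}

// Hypothetical node-side manager: its slave endpoint exists from construction,
// and initialize() sends it to the master together with the node's id.
final class MiniBlockManager(executorId: String, host: String, port: Int, master: MiniMaster) {
  val slaveEndpoint: SlaveRef = SlaveRef(s"BlockManagerEndpoint-$executorId")
  var blockManagerId: Option[BmId] = None

  def initialize(maxMem: Long): Unit = {
    val id = BmId(executorId, host, port)
    blockManagerId = Some(id)
    master.register(id, maxMem, slaveEndpoint)
  }
}
```

After the driver and one executor call initialize(), the master holds two entries, each carrying the slave endpoint it can later send commands to.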

In summary:

  1. On the Driver side, a BlockManagerMaster is instantiated and the BlockManagerMasterEndpoint is registered; through that endpoint the Driver holds references to every BlockManagerSlaveEndpoint.
  2. On the Executor side, the BlockManager holds a reference to the BlockManagerMaster and registers the current node's BlockManagerSlaveEndpoint with the BlockManagerMasterEndpoint.

So the actual storage framework looks like storage-frame2 below; for simplicity, it was reduced to storage-frame1 above.


[Figure: storage-frame2.png]

BlockManagerMasterEndpoint maintains several data structures that are worth pulling out before reading further:

  1. private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

Mapping from block manager id to the block manager's information.
Maps each BlockManagerId to its BlockManagerInfo.
BlockManagerInfo holds the slave node's memory usage, the status of its Blocks, and the reference to its BlockManagerSlaveEndpoint.

  2. private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

Mapping from executor ID to block manager ID.
Maps each executor ID to its BlockManagerId, so the Master can quickly find the BlockManagerId for a given executor ID.

  3. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

Mapping from block id to the set of block managers that have the block.
Records which BlockManagers hold each Block, since a Block may be replicated on several slaves.
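The three maps work together: an executor ID leads to a BlockManagerId, which leads to that BlockManager's information, while blockLocations answers "where are the copies of this block?". A simplified sketch, assuming BlockManagerInfo is reduced to the set of blocks it holds; all types and names here are hypothetical:

```scala
import scala.collection.mutable

// Hypothetical, simplified ids.
final case class BmId(executorId: String, host: String)
final case class BlockId(name: String)

final class MasterMetadata {
  // 1. BlockManagerId -> info (here reduced to the blocks on that BlockManager)
  val blockManagerInfo = mutable.HashMap.empty[BmId, mutable.Set[BlockId]]
  // 2. executor ID -> BlockManagerId
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BmId]
  // 3. BlockId -> every BlockManager holding a copy
  val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BmId]]

  // Record that `bm` now holds `block`, keeping all three maps consistent.
  def addBlock(bm: BmId, block: BlockId): Unit = {
    blockManagerIdByExecutor.put(bm.executorId, bm)
    blockManagerInfo.getOrElseUpdate(bm, mutable.Set.empty[BlockId]) += block
    blockLocations.getOrElseUpdate(block, mutable.HashSet.empty[BmId]) += bm
  }

  // All replicas of a block (possibly more than one).
  def getLocations(block: BlockId): Seq[BmId] =
    blockLocations.get(block).map(_.toSeq).getOrElse(Seq.empty)

  // executor ID -> BlockManagerId -> blocks on that BlockManager.
  def blocksOnExecutor(executorId: String): Set[BlockId] =
    blockManagerIdByExecutor.get(executorId)
      .flatMap(id => blockManagerInfo.get(id))
      .map(_.toSet)
      .getOrElse(Set.empty)
}
```

Keeping both directions (block to managers, manager to blocks) means either question can be answered without scanning the other map.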

Message passing between Master and Slave

From the architecture above, the BlockManagerMaster on the Driver manages the metadata of all Blocks, while the Executors store the Block data itself. So how do the Master and the Slaves communicate? Let's follow the process through RDD's unpersist() method.

RDD's unpersist() releases cached data:

```scala
def unpersist(blocking: Boolean = true): this.type = {
  logInfo("Removing RDD " + id + " from persistence list")
  sc.unpersistRDD(id, blocking)
  storageLevel = StorageLevel.NONE
  this
}
```

It calls SparkContext's unpersistRDD():

```scala
/**
 * Unpersist an RDD from memory and/or disk storage
 */
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
  env.blockManager.master.removeRdd(rddId, blocking)
  persistentRdds.remove(rddId)
  listenerBus.post(SparkListenerUnpersistRDD(rddId))
}
```

Here blockManager is the Driver's BlockManager, and the call goes to removeRdd() on its master (the BlockManagerMaster):

```scala
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
  val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
  future.onFailure {
    case e: Exception =>
      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}
```

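BlockManagerMaster sends a RemoveRdd message to the driver-side BlockManagerMasterEndpoint (driverEndpoint) and receives a Future of the per-slave results. A failure is only logged as a warning, and if blocking is true the call waits for the result.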
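The ask-then-optionally-wait pattern can be sketched with plain Scala Futures. This is an illustration only: `askMaster` is a hypothetical function standing in for driverEndpoint.askWithRetry, and failures are logged with `future.failed.foreach`, because the `onFailure` callback used above is deprecated in newer Scala versions.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object RemoveRddClient {
  // `askMaster` is a hypothetical stand-in for driverEndpoint.askWithRetry(RemoveRdd(rddId)).
  def removeRdd(
      rddId: Int,
      blocking: Boolean,
      askMaster: Int => Future[Seq[Int]],
      timeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Seq[Int]] = {
    val future = askMaster(rddId)
    // Failures are logged asynchronously; a non-blocking caller is never interrupted.
    future.failed.foreach(e => System.err.println(s"Failed to remove RDD $rddId - ${e.getMessage}"))
    // Only a blocking call waits; Await.result rethrows the failure (or a TimeoutException).
    if (blocking) Await.result(future, timeout)
    future
  }
}
```

Returning the Future (the original returns Unit) is only there so the behaviour can be observed.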

On the master side, the work is done by BlockManagerMasterEndpoint's removeRdd() method:

```scala
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // First remove the metadata for the given RDD, and then asynchronously remove the blocks
  // from the slaves.
  // Find all blocks for the given RDD, remove the block from both blockLocations and
  // the blockManagerInfo that is tracking the blocks.
  // Step 1: remove the metadata the Master keeps about this RDD
  val blocks: Iterable[RDDBlockId] = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocks.foreach { blockId =>
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    blockLocations.remove(blockId)
  }
  // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
  // The dispatcher is used as an implicit argument into the Future sequence construction.
  // Step 2: remove this RDD's data on the slaves
  val removeMsg = RemoveRdd(rddId)
  Future.sequence(
    blockManagerInfo.values.map { bm =>
      bm.slaveEndpoint.ask[Int](removeMsg)
    }.toSeq
  )
}
```

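It first handles the Master's metadata for the RDD being removed: each of the RDD's Blocks is deleted from blockLocations and from the BlockManagerInfo of every BlockManager that tracked it. It then sends a RemoveRdd message to every slave node.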
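The two phases (drop the master's metadata first, then fan the request out to every slave and gather the replies with Future.sequence) can be modelled in a few lines. A sketch under stated assumptions: `RddBlock`, `FakeSlave` and `RddRemovalMaster` are hypothetical, and slaves are called directly instead of over RPC.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, simplified block id: (rddId, partition index).
final case class RddBlock(rddId: Int, split: Int)

// Hypothetical slave: answers a remove request with the number of blocks it dropped.
final class FakeSlave(val blocks: mutable.Set[RddBlock]) {
  def removeRdd(rddId: Int)(implicit ec: ExecutionContext): Future[Int] = Future {
    val toRemove = blocks.filter(_.rddId == rddId).toList
    blocks --= toRemove
    toRemove.size
  }
}

final class RddRemovalMaster(val slaves: Seq[FakeSlave]) {
  // BlockId -> indices of the slaves holding a copy (the blockLocations idea).
  val blockLocations = mutable.HashMap.empty[RddBlock, mutable.HashSet[Int]]

  def removeRdd(rddId: Int)(implicit ec: ExecutionContext): Future[Seq[Int]] = {
    // 1. Drop the master's own metadata for this RDD first (keys copied before removal).
    val blocks = blockLocations.keys.filter(_.rddId == rddId).toList
    blocks.foreach(b => blockLocations.remove(b))
    // 2. Ask every slave, not only the known holders, and combine the replies in order.
    Future.sequence(slaves.map(_.removeRdd(rddId)))
  }
}
```

Like the original, the master does not wait for the slaves before forgetting the metadata; the combined Future is what a blocking caller waits on.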

What does BlockManagerSlaveEndpoint do when it receives this message from the Master?

```scala
case RemoveRdd(rddId) =>
  doAsync[Int]("removing RDD " + rddId, context) {
    blockManager.removeRdd(rddId)
  }
```
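doAsync keeps the slave endpoint responsive: the removal runs on a separate thread pool, and the reply is sent once it finishes. A minimal sketch of that pattern, assuming a hypothetical `ReplyContext` trait in place of Spark's RpcCallContext:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical stand-in for RpcCallContext: where the answer to the caller goes.
trait ReplyContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

object AsyncHandler {
  // Run `body` on the execution context instead of the message-handling thread,
  // then reply with its result, or report the exception to the caller.
  def doAsync[T](actionMessage: String, context: ReplyContext)(body: => T)(
      implicit ec: ExecutionContext): Future[T] = {
    val future = Future(body)
    future.onComplete {
      case Success(response) => context.reply(response)
      case Failure(e) =>
        System.err.println(s"Error in $actionMessage: ${e.getMessage}")
        context.sendFailure(e)
    }
    future
  }
}
```

The caller always gets exactly one answer, either the value of `body` or the exception it threw.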

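On the slave, BlockManager.removeRdd() finds the blocks belonging to this RDD and passes each one to BlockManager's removeBlock() method, which does the actual work of deleting the RDD's cache on the slave node: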

```scala
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logDebug(s"Removing block $blockId")
  val info: BlockInfo = blockInfo.get(blockId).orNull
  if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
    try {
      info.synchronized {
        val level = info.level
        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
        val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
        val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
        val removedFromExternalBlockStore =
          if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
        if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
          logWarning(s"Block $blockId could not be removed as it was not found in either " +
            "the disk, memory, or external block store")
        }
        blockInfo.remove(blockId)
        val status = getCurrentBlockStatus(blockId, info)
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, info, status)
        }
        Option(TaskContext.get()).foreach { tc =>
          val metrics = tc.taskMetrics()
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
        }
      }
    } finally {
      pendingToRemove.remove(blockId)
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}
```

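Storage data may live in memory, on disk, or in an external block store, so all three must be considered when an Executor removes RDD data. After the deletion, the method removes the entry from blockInfo, reports the new block status to the master when tellMaster is set, and records the change in the TaskContext's task metrics.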
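Putting the slave-side steps together: find every block of the RDD, then remove each one from all stores it might be in. A simplified sketch in which three maps stand in for the memory, disk and external block stores and the block's StorageLevel is ignored; `RddBlockId` and `MiniExecutorStorage` are hypothetical:

```scala
import scala.collection.mutable

// Hypothetical, simplified block id.
final case class RddBlockId(rddId: Int, splitIndex: Int)

// Hypothetical executor-side storage: three plain maps stand in for the memory,
// disk and external block stores; the block's StorageLevel is ignored here.
final class MiniExecutorStorage {
  val memoryStore = mutable.Map.empty[RddBlockId, String]
  val diskStore = mutable.Map.empty[RddBlockId, String]
  val externalStore = mutable.Map.empty[RddBlockId, String]

  // Try every store (removal is idempotent); warn only if the block was in none of them.
  def removeBlock(blockId: RddBlockId): Boolean = {
    val fromMemory = memoryStore.remove(blockId).isDefined
    val fromDisk = diskStore.remove(blockId).isDefined
    val fromExternal = externalStore.remove(blockId).isDefined
    val removed = fromMemory || fromDisk || fromExternal
    if (!removed) System.err.println(s"Block $blockId was not found in any store")
    removed
  }

  // Like BlockManager.removeRdd: find this RDD's blocks, remove each one, return the count.
  def removeRdd(rddId: Int): Int = {
    val all = memoryStore.keySet.toSet ++ diskStore.keySet ++ externalStore.keySet
    val toRemove = all.filter(_.rddId == rddId)
    toRemove.foreach(removeBlock)
    toRemove.size
  }
}
```

A block stored both in memory and on disk (for example with a MEMORY_AND_DISK level) is counted once but removed from both places.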

After this exchange of messages between master and slave, the unpersist operation is complete and the RDD's cache has been removed from Storage.

The messages exchanged between BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint are listed below:

Messages handled by the master
```scala
// Registration sent by BlockManagerMaster to BlockManagerMasterEndpoint; on registration the master endpoint stores this BlockManager's information
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
  register(blockManagerId, maxMemSize, slaveEndpoint)
  context.reply(true)
// Reports Block information to the Master, which records it and serves later queries from slaves
case _updateBlockInfo @ UpdateBlockInfo(
  blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
  context.reply(updateBlockInfo(
    blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
  listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
// Gets the locations of a Block as a list of BlockManagerIds; a Block may be replicated on several nodes
case GetLocations(blockId) =>
  context.reply(getLocations(blockId))
// Gets the locations of several Blocks at once
case GetLocationsMultipleBlockIds(blockIds) =>
  context.reply(getLocationsMultipleBlockIds(blockIds))
// Gets the other BlockManagers (peers); used when replicating Blocks
case GetPeers(blockManagerId) =>
  context.reply(getPeers(blockManagerId))
// Gets the endpoint reference of the BlockManagerSlaveEndpoint on the given executor
case GetExecutorEndpointRef(executorId) =>
  context.reply(getExecutorEndpointRef(executorId))
// Gets the memory status of all Executors: maximum memory and remaining memory
case GetMemoryStatus =>
  context.reply(memoryStatus)
// Gets the Storage status of each Executor, including maximum available memory and Block information
case GetStorageStatus =>
  context.reply(storageStatus)
// Gets a Block's status by blockId; mostly used for testing
case GetBlockStatus(blockId, askSlaves) =>
  context.reply(blockStatus(blockId, askSlaves))
// Selects Blocks with the given filter and returns their BlockIds
case GetMatchingBlockIds(filter, askSlaves) =>
  context.reply(getMatchingBlockIds(filter, askSlaves))
// Removes an RDD's cache by rddId
case RemoveRdd(rddId) =>
  context.reply(removeRdd(rddId))
// Removes the data of a shuffle by shuffleId
case RemoveShuffle(shuffleId) =>
  context.reply(removeShuffle(shuffleId))
// Removes a broadcast variable
case RemoveBroadcast(broadcastId, removeFromDriver) =>
  context.reply(removeBroadcast(broadcastId, removeFromDriver))
// Removes a Block
case RemoveBlock(blockId) =>
  removeBlockFromWorkers(blockId)
  context.reply(true)
// Removes the information the Master keeps about the BlockManager of the given executor
case RemoveExecutor(execId) =>
  removeExecutor(execId)
  context.reply(true)
// Called when BlockManagerMaster is stopped; stops the Master endpoint
case StopBlockManagerMaster =>
  context.reply(true)
  stop()
// Heartbeat between Master and Slave, driven by the heartbeat between Executor and Driver
case BlockManagerHeartbeat(blockManagerId) =>
  context.reply(heartbeatReceived(blockManagerId))
// Checks whether the given executor has cached Blocks
case HasCachedBlocks(executorId) =>
  blockManagerIdByExecutor.get(executorId) match {
    case Some(bm) =>
      if (blockManagerInfo.contains(bm)) {
        val bmInfo = blockManagerInfo(bm)
        context.reply(bmInfo.cachedBlocks.nonEmpty)
      } else {
        context.reply(false)
      }
    case None => context.reply(false)
  }
```
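All of these cases share one shape: a message is matched by type and answered exactly once. A toy version of that dispatch, assuming a hypothetical, heavily reduced message set whose names echo the cases above but whose fields do not match Spark's:

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified message protocol.
sealed trait MasterMessage
final case class RegisterBlockManager(executorId: String, maxMem: Long) extends MasterMessage
final case class UpdateBlockInfo(executorId: String, blockId: String, cached: Boolean) extends MasterMessage
final case class HasCachedBlocks(executorId: String) extends MasterMessage
final case class RemoveExecutor(executorId: String) extends MasterMessage

final class MiniMasterEndpoint {
  // executor ID -> ids of the blocks cached on it
  private val cachedBlocks = mutable.HashMap.empty[String, mutable.Set[String]]

  // Every message produces exactly one reply value, like context.reply(...).
  def receiveAndReply(msg: MasterMessage): Any = msg match {
    case RegisterBlockManager(execId, _) =>
      cachedBlocks.getOrElseUpdate(execId, mutable.Set.empty[String])
      true
    case UpdateBlockInfo(execId, blockId, cached) =>
      cachedBlocks.get(execId) match {
        case Some(blocks) =>
          if (cached) blocks += blockId else blocks -= blockId
          true
        case None => false // unknown BlockManager: reply false
      }
    case HasCachedBlocks(execId) =>
      cachedBlocks.get(execId).exists(_.nonEmpty)
    case RemoveExecutor(execId) =>
      cachedBlocks.remove(execId)
      true
  }
}
```

Using a sealed trait lets the compiler warn when a message type has no matching case.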
Messages handled by the slave
```scala
// Removes the Block with the given blockId from this Executor
case RemoveBlock(blockId) =>
  doAsync[Boolean]("removing block " + blockId, context) {
    blockManager.removeBlock(blockId)
    true
  }
// Removes the RDD with the given rddId from this Executor
case RemoveRdd(rddId) =>
  doAsync[Int]("removing RDD " + rddId, context) {
    blockManager.removeRdd(rddId)
  }
// Removes the Blocks of the given shuffle from this Executor; the shuffle is first unregistered from mapOutputTracker
case RemoveShuffle(shuffleId) =>
  doAsync[Boolean]("removing shuffle " + shuffleId, context) {
    if (mapOutputTracker != null) {
      mapOutputTracker.unregisterShuffle(shuffleId)
    }
    SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
  }

// Removes a broadcast variable's Blocks from this Executor
case RemoveBroadcast(broadcastId, _) =>
  doAsync[Int]("removing broadcast " + broadcastId, context) {
    blockManager.removeBroadcast(broadcastId, tellMaster = true)
  }
// Returns the status of a Block to the Master by blockId; mostly used for testing. Note that this operation is very expensive
case GetBlockStatus(blockId, _) =>
  context.reply(blockManager.getStatus(blockId))
// Returns to the Master all BlockIds matching the filter; mostly used for testing
case GetMatchingBlockIds(filter, _) =>
  context.reply(blockManager.getMatchingBlockIds(filter))

// Returns a thread dump of this Executor
case TriggerThreadDump =>
  context.reply(Utils.getThreadDump())
```
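GetMatchingBlockIds is unusual in that the message carries a function, BlockId => Boolean, which the slave applies to the block ids it knows about. A small sketch of that filtering, assuming simplified, hypothetical BlockId classes (the rdd_/broadcast_ naming follows Spark's convention):

```scala
// Hypothetical, simplified block ids; Spark's BlockId hierarchy has more kinds.
sealed trait BlockId { def name: String }
final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  def name: String = s"rdd_${rddId}_$splitIndex"
}
final case class BroadcastBlockId(broadcastId: Long) extends BlockId {
  def name: String = s"broadcast_$broadcastId"
}

object MatchingBlocks {
  // The filter arrives inside the message; the slave just applies it to every id it knows.
  def getMatchingBlockIds(known: Seq[BlockId], filter: BlockId => Boolean): Seq[BlockId] =
    known.filter(filter)
}
```

For example, a filter written as a pattern-matching function such as `{ case RDDBlockId(1, _) => true; case _ => false }` selects every cached partition of RDD 1.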
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.