本文為 Spark 2.0 源碼分析筆記厘托,某些實(shí)現(xiàn)可能與其他版本有所出入
再次重申標(biāo)題中的 Master 是指 Spark Storage 模塊的 Master友雳,是運(yùn)行在 driver 上的 BlockManager 及其包含的 BlockManagerMaster、RpcEnv 及 RpcEndpoint 等铅匹;而 Slave 則是指 Spark Storage 模塊的 Slave押赊,是運(yùn)行在 executor 上的 BlockManager 及其包含的 BlockManagerMaster、RpcEnv 及 RpcEndpoint 等包斑。下文也將沿用 Master 和 Slave 簡(jiǎn)稱流礁。
Master 與 Slaves 之間是通過消息進(jìn)行通信的,本文將分析 Master 與 Slaves 之間重要的消息以及這些消息是在什么時(shí)機(jī)被觸發(fā)發(fā)送的罗丰。
Master -> Slave
先來看看 Master 都會(huì)發(fā)哪些消息給 Slave
case class RemoveBlock(blockId: BlockId)
用于移除 slave 上的 block神帅。在以下兩個(gè)時(shí)機(jī)會(huì)觸發(fā):
- task 結(jié)束時(shí)
- Spark Streaming 中,清理過期的 batch 對(duì)應(yīng)的 blocks
case class RemoveRdd(rddId: Int)
用于移除歸屬于某個(gè) RDD 的所有 blocks萌抵,觸發(fā)時(shí)機(jī):
- 釋放緩存的 RDD
case class RemoveShuffle(shuffleId: Int)
用于移除歸屬于某次 shuffle 所有的 blocks找御,觸發(fā)時(shí)機(jī):
- 做 shuffle 清理的時(shí)候
case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
用于移除歸屬于特定 Broadcast 的所有 blocks。觸發(fā)時(shí)機(jī):
- 調(diào)用
Broadcast#destroy
銷毀廣播變量 - 調(diào)用
Broadcast#unpersist
刪除 executors 上的廣播變量拷貝
接下來看看 Slaves 發(fā)送給 Master 的消息
Slave -> Master
case class RegisterBlockManager(blockManagerId: BlockManagerId ...)
用于 Slave(executor 端 BlockManager) 向 Master(driver 端 BlockManager) 注冊(cè)绍填,觸發(fā)時(shí)機(jī):
- executor 端 BlockManager 在初始化時(shí)
case class UpdateBlockInfo(var blockManagerId: BlockManagerId, var blockId: BlockId ...)
用于向 Master 匯報(bào)指定 block 的信息霎桅,包括:storageLevel、存儲(chǔ)在內(nèi)存中的 size讨永、存儲(chǔ)在磁盤上的 size滔驶、是否 cached 等。觸發(fā)時(shí)機(jī):
- BlockManager 注冊(cè)時(shí)
- block 被移除時(shí)
- 原本存儲(chǔ)在內(nèi)存中的 block 因內(nèi)存不足而轉(zhuǎn)移到磁盤上時(shí)
- 生成新的 block 時(shí)
case class GetLocations(blockId: BlockId)
用于獲取指定 blockId 的 block 所在的 BlockManagerId 列表卿闹,觸發(fā)時(shí)機(jī):
- 檢查是否包含某個(gè) block
- 以序列化形式讀取本地或遠(yuǎn)程 BlockManagers 上的數(shù)據(jù)時(shí)
- 讀取以 blocks 形式存儲(chǔ)的 task result 時(shí)
- 讀取 Broadcast blocks 數(shù)據(jù)時(shí)
- 獲取指定 block id 對(duì)應(yīng)的 block 數(shù)據(jù)(比如獲取 RDD partition 對(duì)應(yīng)的 block)
case class RemoveExecutor(execId: String)
用于移除已 lost 的 executor 上的 BlockManager(只在 driver 端進(jìn)行操作)揭糕,觸發(fā)時(shí)機(jī):
- executor lost(一般由于 task 連續(xù)失敗導(dǎo)致)
case object StopBlockManagerMaster
用于停止 driver 或 executor 端的 BlockManager,觸發(fā)時(shí)機(jī):
- SparkContext#stop 被調(diào)用時(shí)比原,也即 driver 停止時(shí)
case object GetMemoryStatus
用于獲取各個(gè) BlockManager 的內(nèi)存使用情況插佛,包括最大可用內(nèi)存以及當(dāng)前可用內(nèi)存(當(dāng)前可用內(nèi)存=最大可用內(nèi)存-已用內(nèi)存)
case object GetStorageStatus
用于獲取各個(gè) BlockManager 的存儲(chǔ)狀態(tài),包括每個(gè) BlockManager 中都存儲(chǔ)了哪些 RDD 的哪些 block(對(duì)應(yīng) partition)以及各個(gè) block 的信息
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId)
用于 Slave 向 Master 發(fā)心跳信息量窘,以通知 Master 其上的某個(gè) BlockManager 還存活著
case class HasCachedBlocks(executorId: String)
用于檢查 executor 是否有緩存 blocks(廣播變量的 blocks 不作考慮雇寇,因?yàn)閺V播變量的 block 不會(huì)匯報(bào)給 Master),觸發(fā)時(shí)機(jī):
- 檢驗(yàn)?zāi)硞€(gè) executor 是否閑置了一段時(shí)間蚌铜,即一段時(shí)間內(nèi)沒有運(yùn)行任何 tasks(這樣的 executor 會(huì)慢慢被移除)
歡迎關(guān)注我的微信公眾號(hào):FunnyBigData