Structure of this article:
- Concept overview
- Initialization process
- Communication between endpoints
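Spark's storage management module uses a master/slave architecture for its communication layer. The master and slave nodes exchange control and status information, and the underlying communication still relies on the Spark RPC framework.
The master is mainly responsible for managing and maintaining block metadata for the whole application while it runs. A slave is mainly responsible for reporting the status of its local blocks to the master and for receiving commands from the master, such as getting the status of a block or removing the blocks of an RDD.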
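I. Concept overview
Before reading the source code behind Spark's storage internals, it helps to get familiar with the storage-related classes.
A block is the smallest unit of data that Spark processes. It is a physical storage unit and corresponds one-to-one with a partition of an RDD at the logical level.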
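The one-to-one mapping between partitions and blocks can be sketched as follows. This is a simplified stand-in written for this article, not Spark's own class; the type `RddBlockId` and the helper `blockIdsFor` are hypothetical names, and the `rdd_<rddId>_<partition index>` naming follows the convention Spark uses for cached RDD blocks.

```scala
// Simplified stand-in for the id of a cached RDD block; not the real Spark class.
final case class RddBlockId(rddId: Int, splitIndex: Int) {
  // Cached RDD blocks are named "rdd_<rddId>_<partition index>".
  def name: String = s"rdd_${rddId}_$splitIndex"
}

object BlockNaming {
  // Hypothetical helper: one block id per partition of the given RDD.
  def blockIdsFor(rddId: Int, numPartitions: Int): Seq[RddBlockId] =
    (0 until numPartitions).map(i => RddBlockId(rddId, i))

  def main(args: Array[String]): Unit =
    blockIdsFor(rddId = 3, numPartitions = 4).foreach(id => println(id.name))
}
```

Each partition index yields exactly one block id, which is what lets the storage layer locate the physical data for a logical partition.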
BlockManager, BlockManagerMaster, and BlockManagerMasterEndpoint have very similar names and are easy to confuse, so each is introduced briefly below.
1.1 BlockManager
As the name suggests, Spark uses a BlockManager to manage the blocks held in memory and on disk on the node where that BlockManager runs.
Both the Driver and the Executor nodes create a BlockManager. The Driver side is mostly concerned with bookkeeping: it records the status of each Executor's BlockManager rather than acting as the main store for RDD data. The BlockManager on an Executor handles read and write requests, deletions, and similar operations; it also registers itself with the Driver and reports the metadata of the blocks it manages.
1.2 BlockManagerMaster
Like BlockManager, BlockManagerMaster is created in SparkEnv.
A BlockManager manages the blocks on its own node, whereas BlockManagerMaster is mainly responsible for managing and maintaining block metadata for the whole application while it runs, and for sending commands to the slave nodes.
1.3 BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint
BlockManagerMaster cannot communicate on its own. The actual communication is done by BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint, which manage the BlockManagers on all nodes through remote messaging.
BlockManagerMasterEndpoint is the Driver's communication endpoint and BlockManagerSlaveEndpoint is the Executor's. The Driver manages all blocks through BlockManagerMaster, and requests to operate on blocks reach the Executors through messages between BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint. When either endpoint receives a request, it calls BlockManager to actually operate on the block data.
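The division of labor described above (the master endpoint forwards commands, the slave endpoint acts on local data) can be illustrated with a small in-process toy. Everything here is an assumption made for illustration: `ToyMasterEndpoint`, `ToySlaveEndpoint`, and `RemoveBlock` are hypothetical names, and plain method calls stand in for Spark's RPC messages.

```scala
import scala.collection.mutable

// Toy model only: hypothetical stand-ins, not Spark's RPC classes.
sealed trait Command
final case class RemoveBlock(blockId: String) extends Command

// Executor side: receives a command and applies it to the locally held blocks.
final class ToySlaveEndpoint(val executorId: String) {
  val localBlocks: mutable.Set[String] = mutable.Set.empty[String]

  // Returns true if the block was present and has been removed.
  def receive(cmd: Command): Boolean = cmd match {
    case RemoveBlock(id) => localBlocks.remove(id)
  }
}

// Driver side: holds references to the slaves and forwards commands to them.
final class ToyMasterEndpoint {
  private val slaves = mutable.LinkedHashMap.empty[String, ToySlaveEndpoint]

  def register(slave: ToySlaveEndpoint): Unit = slaves(slave.executorId) = slave

  // Forwards the command to every slave and returns the executors that held the block.
  def removeBlock(blockId: String): Seq[String] =
    slaves.values.toList.filter(_.receive(RemoveBlock(blockId))).map(_.executorId)
}

object EndpointDemo {
  def main(args: Array[String]): Unit = {
    val master = new ToyMasterEndpoint
    val exec1 = new ToySlaveEndpoint("exec-1")
    exec1.localBlocks += "rdd_0_0"
    master.register(exec1)
    master.register(new ToySlaveEndpoint("exec-2"))
    println(master.removeBlock("rdd_0_0"))
  }
}
```

The master never touches block data itself; it only knows which endpoints exist and what to tell them.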
II. Initialization process
After an application is launched with spark-submit, where it runs depends on the deploy mode: in client mode it runs on the machine that submitted it, and in cluster mode the Master assigns it to a Worker. The process in which the application runs is called the driver.
When the application starts, it builds a SparkContext. The SparkContext creates the Driver-side SparkEnv, which hosts the underlying communication framework. When this SparkEnv is initialized it instantiates BlockManager and BlockManagerMaster, and while BlockManagerMaster is being instantiated the messaging endpoint BlockManagerMasterEndpoint is created.
SparkEnv initialization also creates a BlockTransferService, which handles network data transfer.
After starting, the application communicates with the Master, which, based on the application's parameters, asks Workers to launch the requested number of Executors. Each Executor also creates a SparkEnv as it starts and initializes the same block-management classes. Depending on whether the process is the Driver or an Executor, these classes differ in some details and in how they are used.
SparkEnv:
// Remote data transfer service, implemented with Netty
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)
// Create the BlockManagerMaster. On the Driver, this creates the BlockManagerMasterEndpoint;
// on an Executor, it holds a reference to the BlockManagerMasterEndpoint.
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
// Create the BlockManager. It runs on every node (Driver and Executors) and provides
// the interface for managing the blocks on the local node.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
1. First, look at how BlockManagerMaster is created. This step calls the registerOrLookupEndpoint method.
SparkEnv # registerOrLookupEndpoint:
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
RpcUtils # makeDriverRef:
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}
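As the code shows, the Driver creates the BlockManagerMasterEndpoint itself, while an Executor creates an RpcEndpointRef that refers to the Driver's BlockManagerMasterEndpoint. This is what allows an Executor to initiate communication with the Driver.
2. Next, look at how BlockManager is created. During construction it creates the BlockManagerSlaveEndpoint messaging endpoint.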
BlockManager:
private val slaveEndpoint = rpcEnv.setupEndpoint(
  "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
  new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
After the BlockManager is instantiated, its initialize method registers the BlockManagerSlaveEndpoint with the Driver, so the Driver ends up holding a reference to that endpoint. It follows that the Driver hosts both a BlockManagerMasterEndpoint and a BlockManagerSlaveEndpoint, while an Executor hosts only a BlockManagerSlaveEndpoint.
BlockManager # initialize:
val idFromMaster = master.registerBlockManager(
  id,
  maxOnHeapMemory,
  maxOffHeapMemory,
  slaveEndpoint)
BlockManagerMaster # registerBlockManager:
/**
 * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
 * topology information. This information is obtained from the master and we respond with an
 * updated BlockManagerId fleshed out with this information.
 */
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxOnHeapMemSize: Long,
    maxOffHeapMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  val updatedId = driverEndpoint.askSync[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}
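The Driver holds references to the Executors' BlockManagerSlaveEndpoints, and each Executor holds a reference to the Driver's BlockManagerMasterEndpoint. Because both sides hold a reference to the other, they can exchange messages in either direction while the application runs.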
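The handshake described in this section can be condensed into a toy sketch. It is only an approximation under stated assumptions: `ToyRpcEnv` is a hypothetical in-process name registry shared by both sides (a real deployment resolves the driver over the network), and `ToyMaster` and `ToySlave` are illustrative stand-ins for the two endpoints.

```scala
import scala.collection.mutable

// Hypothetical in-process stand-in for an RPC environment: a name -> endpoint registry.
final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, Any]

  def setupEndpoint[A](name: String, endpoint: A): A = {
    endpoints(name) = endpoint
    endpoint
  }

  def lookup(name: String): Option[Any] = endpoints.get(name)
}

final class ToySlave(val executorId: String)

final class ToyMaster {
  // Driver-side view of every registered slave endpoint.
  val registered = mutable.LinkedHashMap.empty[String, ToySlave]
  def registerBlockManager(slave: ToySlave): Unit = registered(slave.executorId) = slave
}

object Handshake {
  val MasterName = "BlockManagerMaster"

  // Mirrors the isDriver branch: the driver creates the endpoint, everyone else looks it up.
  // `create` is by-name, so no master is constructed on the lookup path.
  def registerOrLookup(env: ToyRpcEnv, isDriver: Boolean, create: => ToyMaster): ToyMaster =
    if (isDriver) env.setupEndpoint(MasterName, create)
    else env.lookup(MasterName) match {
      case Some(m: ToyMaster) => m
      case _ => throw new IllegalStateException(s"$MasterName is not registered")
    }

  def main(args: Array[String]): Unit = {
    val env = new ToyRpcEnv
    val onDriver = registerOrLookup(env, isDriver = true, new ToyMaster)
    val onExecutor = registerOrLookup(env, isDriver = false, new ToyMaster)
    onExecutor.registerBlockManager(new ToySlave("exec-1"))
    println(onDriver.registered.keys.mkString(", "))
  }
}
```

After the executor registers, the driver-side master can reach the slave and the executor can reach the master, which is the mutual reference the text describes.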
III. Communication between endpoints
BlockManagerMasterEndpoint keeps the block metadata in three hash maps.
// Mapping from block manager id to the block manager's information.
// Maps each BlockManagerId to its BlockManagerInfo, which holds the Executor's memory usage,
// block usage, cached blocks, and a reference to the Executor's endpoint. Messages can be
// sent to the Executor through that reference.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
// Mapping from executor ID to block manager ID.
// Maps each executor ID to its BlockManagerId.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
// Maps each BlockId to the set of BlockManagerIds that hold it, because a block may have
// several replicas stored on different Executors.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
3.1 Updating block metadata
After data has been written, updated, or deleted, the latest block status is sent as an UpdateBlockInfo message to the BlockManagerMasterEndpoint, which updates the block metadata, mainly the blockManagerInfo and blockLocations maps.
BlockManagerMasterEndpoint # receiveAndReply:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  ...
  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
  ...
BlockManagerMasterEndpoint # updateBlockInfo:
private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    if (blockManagerId.isDriver && !isLocal) {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure.
      return true
    } else {
      return false
    }
  }
  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }
  blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
  var locations: mutable.HashSet[BlockManagerId] = null
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }
  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }
  // Remove the block from master tracking if it has been removed on all slaves.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}
- When BlockManagerInfo is updated, the blockId, StorageLevel, and size parameters are passed in, and they determine whether the operation is an insert, an update, or a delete.
BlockManagerInfo # updateBlockInfo:
def updateBlockInfo(
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long) {
  updateLastSeenMs()
  val blockExists = _blocks.containsKey(blockId)
  var originalMemSize: Long = 0
  var originalDiskSize: Long = 0
  var originalLevel: StorageLevel = StorageLevel.NONE
  if (blockExists) {
    // The block exists on the slave already.
    val blockStatus: BlockStatus = _blocks.get(blockId)
    originalLevel = blockStatus.storageLevel
    originalMemSize = blockStatus.memSize
    originalDiskSize = blockStatus.diskSize
    if (originalLevel.useMemory) {
      _remainingMem += originalMemSize
    }
  }
  if (storageLevel.isValid) {
    /* isValid means it is either stored in-memory or on-disk.
     * The memSize here indicates the data size in or dropped from memory,
     * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
     * and the diskSize here indicates the data size in or dropped to disk.
     * They can be both larger than 0, when a block is dropped from memory to disk.
     * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
    var blockStatus: BlockStatus = null
    if (storageLevel.useMemory) {
      blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
      _blocks.put(blockId, blockStatus)
      _remainingMem -= memSize
      if (blockExists) {
        logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
          s" (current size: ${Utils.bytesToString(memSize)}," +
          s" original size: ${Utils.bytesToString(originalMemSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      } else {
        logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
          s" (size: ${Utils.bytesToString(memSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      }
    }
    if (storageLevel.useDisk) {
      blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
      _blocks.put(blockId, blockStatus)
      if (blockExists) {
        logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
          s" (current size: ${Utils.bytesToString(diskSize)}," +
          s" original size: ${Utils.bytesToString(originalDiskSize)})")
      } else {
        logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
          s" (size: ${Utils.bytesToString(diskSize)})")
      }
    }
    if (!blockId.isBroadcast && blockStatus.isCached) {
      _cachedBlocks += blockId
    }
  } else if (blockExists) {
    // If isValid is not true, drop the block.
    _blocks.remove(blockId)
    _cachedBlocks -= blockId
    if (originalLevel.useMemory) {
      logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
        s" (size: ${Utils.bytesToString(originalMemSize)}," +
        s" free: ${Utils.bytesToString(_remainingMem)})")
    }
    if (originalLevel.useDisk) {
      logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
        s" (size: ${Utils.bytesToString(originalDiskSize)})")
    }
  }
}
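The memory accounting in the per-block-manager updateBlockInfo method above boils down to "give back the old size, then charge the new size". A minimal sketch of just that bookkeeping, assuming memory-only blocks and using the hypothetical class `ToyBlockManagerInfo` (not Spark's own class), might look like this:

```scala
import scala.collection.mutable

// Simplified stand-in for the per-executor bookkeeping; all names are illustrative.
final class ToyBlockManagerInfo(maxMem: Long) {
  private val memSizes = mutable.Map.empty[String, Long]
  private var remaining: Long = maxMem

  def remainingMem: Long = remaining

  // memSize > 0 stores or updates the block in memory; memSize == 0 drops it.
  def update(blockId: String, memSize: Long): Unit = {
    // Give back whatever the previous version of the block occupied.
    memSizes.remove(blockId).foreach(old => remaining += old)
    if (memSize > 0) {
      // Charge the new size against the remaining memory.
      memSizes(blockId) = memSize
      remaining -= memSize
    }
  }
}

object MemoryAccountingDemo {
  def main(args: Array[String]): Unit = {
    val info = new ToyBlockManagerInfo(maxMem = 100L)
    info.update("rdd_0_0", 30L) // insert
    info.update("rdd_0_0", 50L) // update
    info.update("rdd_0_0", 0L)  // delete
    println(info.remainingMem)
  }
}
```

Refunding the old size before charging the new one is what keeps an update from double-counting a block.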
- When blockLocations is updated, the blockId is used to check whether blockLocations already contains the block. If it does, the next step depends on the operation: on an update, the BlockManagerId holding the block is recorded; on a delete, that BlockManagerId is removed. During a delete the code also checks whether the set of Executors holding the block has become empty. If it has, the block no longer exists anywhere in the cluster, so its entry is removed from blockLocations.
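The location-tracking rule can be sketched on its own, under the assumption that string ids stand in for BlockId and BlockManagerId. `ToyLocationTable` is a hypothetical name used only for this illustration.

```scala
import scala.collection.mutable

// Illustrative stand-in for the block -> holders map; not Spark's own data structure.
final class ToyLocationTable {
  private val locations = mutable.Map.empty[String, mutable.Set[String]]

  // `valid` plays the role of storageLevel.isValid: true adds a holder, false removes it.
  def update(blockId: String, executorId: String, valid: Boolean): Unit = {
    val holders = locations.getOrElseUpdate(blockId, mutable.Set.empty[String])
    if (valid) holders += executorId else holders -= executorId
    // Stop tracking the block once no executor holds it any more.
    if (holders.isEmpty) locations.remove(blockId)
  }

  def holdersOf(blockId: String): Set[String] =
    locations.get(blockId).fold(Set.empty[String])(_.toSet)

  def isTracked(blockId: String): Boolean = locations.contains(blockId)
}

object LocationTableDemo {
  def main(args: Array[String]): Unit = {
    val table = new ToyLocationTable
    table.update("rdd_0_0", "exec-1", valid = true)
    table.update("rdd_0_0", "exec-2", valid = true)
    table.update("rdd_0_0", "exec-1", valid = false)
    println(table.holdersOf("rdd_0_0"))
  }
}
```

Removing the empty entry keeps the map from accumulating ids of blocks that no longer exist anywhere.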
3.2 Fetching blocks from remote nodes
Once the application's data has been stored, operations such as fetching data from a remote node or computing the preferred locations for an RDD need to look up where a block lives by its id. To do so, a GetLocations or GetLocationsMultipleBlockIds message is sent to the BlockManagerMasterEndpoint, which answers by querying its metadata for the block's locations.
BlockManagerMasterEndpoint # receiveAndReply:
case GetLocations(blockId) =>
  context.reply(getLocations(blockId))
BlockManagerMasterEndpoint # getLocations:
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
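The same lookup can also be written without the explicit containsKey check. The sketch below is an alternative formulation, not the code Spark uses; it relies on the fact that `java.util.HashMap.get` returns null for a missing key, and the object `LocationLookup` with its String-typed ids is a hypothetical simplification.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object LocationLookup {
  // Option(...) turns the null returned for a missing key into None.
  def getLocations(
      table: JHashMap[String, mutable.HashSet[String]],
      blockId: String): Seq[String] =
    Option(table.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    val table = new JHashMap[String, mutable.HashSet[String]]()
    table.put("rdd_0_0", mutable.HashSet("exec-1", "exec-2"))
    println(getLocations(table, "rdd_0_0").sorted)
    println(getLocations(table, "rdd_9_9"))
  }
}
```

Both forms return an empty sequence for an unknown block, so callers never have to handle null.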
3.3 Removing blocks
Spark provides ways to remove RDDs, blocks, and broadcast variables. As the code below shows, a removal request is sent to the BlockManagerMasterEndpoint, which does two things: it removes the corresponding metadata on the Driver, and it sends a message to each Executor's BlockManagerSlaveEndpoint so that the Executor deletes the physical data.
For example, unpersistRDD in SparkContext:
/**
 * Unpersist an RDD from memory and/or disk storage
 */
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
  env.blockManager.master.removeRdd(rddId, blocking)
  persistentRdds.remove(rddId)
  listenerBus.post(SparkListenerUnpersistRDD(rddId))
}
BlockManagerMaster # removeRdd:
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
  val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
  future.failed.foreach(e =>
    logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  )(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}
The message then arrives at the BlockManagerMasterEndpoint.
BlockManagerMasterEndpoint # receiveAndReply:
case RemoveRdd(rddId) =>
  context.reply(removeRdd(rddId))
BlockManagerMasterEndpoint # removeRdd:
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // First remove the metadata for the given RDD, and then asynchronously remove the blocks
  // from the slaves.
  // Find all blocks for the given RDD, remove the block from both blockLocations and
  // the blockManagerInfo that is tracking the blocks.
  // First, use the RDD id to find the blocks that belong to this RDD
  val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
  blocks.foreach { blockId =>
    // Then, for each blockId, look up the BlockManagers that hold the block
    // and remove the block from each of them
    val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
    bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
    // Also remove the block's metadata from blockLocations
    blockLocations.remove(blockId)
  }
  // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
  // The dispatcher is used as an implicit argument into the Future sequence construction.
  // Finally, send a RemoveRdd message to the Executors so they delete the RDD's data
  val removeMsg = RemoveRdd(rddId)
  val futures = blockManagerInfo.values.map { bm =>
    bm.slaveEndpoint.ask[Int](removeMsg).recover {
      case e: IOException =>
        logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
          e)
        0 // zero blocks were removed
    }
  }.toSeq
  Future.sequence(futures)
}
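The last part of removeRdd is a fan-out: ask every slave, treat an unreachable slave as "zero blocks removed", and combine the answers. A stripped-down sketch of that pattern with plain Scala futures follows. The `Slave` function type and the object `RemoveFanOut` are assumptions made for this illustration, and the global execution context stands in for whatever thread pool the real endpoint uses.

```scala
import java.io.IOException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RemoveFanOut {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical slave: given an RDD id, reports how many blocks it removed,
  // or fails with an IOException when it cannot be reached.
  type Slave = Int => Future[Int]

  def removeEverywhere(rddId: Int, slaves: Seq[Slave]): Future[Seq[Int]] = {
    val futures = slaves.map { slave =>
      // An unreachable slave counts as zero removed blocks instead of failing the whole call.
      slave(rddId).recover { case _: IOException => 0 }
    }
    // Combine the per-slave futures into one future of all results, in order.
    Future.sequence(futures)
  }

  // Small driver for the sketch: one healthy slave and one that is unreachable.
  def demo(): Seq[Int] = {
    val healthy: Slave = _ => Future.successful(2)
    val broken: Slave = _ => Future.failed(new IOException("connection lost"))
    Await.result(removeEverywhere(7, Seq(healthy, broken)), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Recovering per slave, before sequencing, is the design choice that matters: one lost Executor does not turn the whole removal into a failure.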
3.4 Summary
A BlockManager exists on the Driver and on every Executor. The Driver side keeps the metadata about the data, while the BlockManager on each Executor acts on the messages it receives:
When an Executor's BlockManager receives a read request, it chooses a method depending on whether the block is on the local node. If the block is local, it reads it directly through the getValues/getBytes methods of MemoryStore and DiskStore. If the block is remote, it uses the BlockTransferService to fetch the data from the remote node.
When an Executor's BlockManager receives a write request and no replica needs to be created, it handles the request through the BlockStore interface methods and picks the write method that matches the storage level of the data.
BlockManager # get:
/**
 * Get a block from the block manager (either local or remote).
 *
 * This acquires a read lock on the block if the block was stored locally and does not acquire
 * any locks if the block was fetched from a remote block manager. The read lock will
 * automatically be freed once the result's `data` iterator is fully consumed.
 */
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
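The local-first, remote-second order of get can be expressed compactly with `Option.orElse`. This is a sketch of the control flow only: the two lookups are passed in as plain functions, and `BlockLookup` is a hypothetical name, so nothing here touches MemoryStore, DiskStore, or BlockTransferService.

```scala
object BlockLookup {
  // The lookups are passed in as functions so the sketch stays self-contained.
  def get[T](
      blockId: String,
      local: String => Option[T],
      remote: String => Option[T]): Option[T] =
    // orElse takes its argument by name, so the remote lookup runs only on a local miss.
    local(blockId).orElse(remote(blockId))

  def main(args: Array[String]): Unit = {
    val localStore = Map("rdd_0_0" -> "local bytes")
    val remoteStore = Map("rdd_0_1" -> "remote bytes")
    val fromLocal = (id: String) => localStore.get(id)
    val fromRemote = (id: String) => remoteStore.get(id)
    println(get[String]("rdd_0_0", fromLocal, fromRemote))
    println(get[String]("rdd_0_1", fromLocal, fromRemote))
    println(get[String]("rdd_9_9", fromLocal, fromRemote))
  }
}
```

Skipping the remote call on a local hit is the point of the ordering, since a remote fetch costs a network round trip.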