The Storage module is responsible for managing the data Spark produces during computation. From the user's point of view, Spark programming is oriented around the RDD, an abstract logical dataset, and data-processing logic is expressed through transformations and actions on RDDs. Beneath the RDD's elegant surface, however, it is the Storage module that diligently manages the data; it is the unsung hero working behind the scenes.
The architecture of the Storage module
As shown in the figure above, the Storage module mirrors the Driver/Executor split and follows the standard Master-Slave pattern. A Block corresponds to a Partition of an RDD and is the unit in which data is stored, while a BlockManager manages the Blocks on each node. The Master role of the BlockManager is created on the Driver and the Slave roles are created on the Executors. To understand this architecture diagram, first look at the constructor and fields of the BlockManager class:
/**
* Manager running on every node (driver and executors) which provides interfaces for putting and
* retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
*
* Note that #initialize() must be called before the BlockManager is usable.
*/
private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
defaultSerializer: Serializer,
val conf: SparkConf,
val memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with Logging
A BlockManagerMaster instance is passed in through the BlockManager's parameter list.
private val slaveEndpoint = rpcEnv.setupEndpoint(
"BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
And among the BlockManager's fields, a BlockManagerSlaveEndpoint object is created.
A BlockManager runs on every node (the Driver and the Executors) and must be initialized by calling its initialize() method before it can be used. The initialize method:
def initialize(appId: String): Unit = {
// Initialize BlockTransferService and ShuffleClient for the shuffle process
blockTransferService.init(this)
shuffleClient.init(appId)
blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
// External shuffle service configuration
shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}
// Register with the master
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
}
As you can see, the initialize method registers the BlockManager's information with the master, and in doing so also registers its slaveEndpoint, so the master ends up holding references to all of the slave endpoints.
The above can be summarized in two points:
- The BlockManager on the Driver side instantiates the BlockManagerMaster and registers the BlockManagerMasterEndpoint, which holds references to all BlockManagerSlaveEndpoint instances
- The BlockManager on the Executor side holds a reference to the BlockManagerMaster during instantiation and registers the current node's BlockManagerSlaveEndpoint with the BlockManagerMasterEndpoint
So the actual Storage framework is as shown in storage-frame2, but for simplicity it is drawn as storage-frame1 above.
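Where does this asymmetry come from? It is set up in SparkEnv when each node's environment is created. The sketch below is a paraphrase, not a verbatim copy of the source; the classes involved (RpcEnv, RpcUtils, BlockManagerMasterEndpoint) are Spark-internal and names can differ slightly between versions. The point is simply that the Driver registers the master endpoint, an Executor only looks up a reference to it, and both then build their BlockManager around the same BlockManagerMaster handle.
```scala
// Paraphrased sketch of the wiring in SparkEnv (Spark-internal classes, shown
// for illustration only; not runnable outside the org.apache.spark package).
def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    // Driver: actually create and register the master endpoint
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    // Executor: only obtain a reference to the driver-side endpoint
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint("BlockManagerMaster",
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf,
  isDriver)

// Both driver and executors then construct their own BlockManager, whose
// constructor creates the local BlockManagerSlaveEndpoint (see above).
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
```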
BlockManagerMasterEndpoint maintains a few data structures. To make the source reading below easier, they are worth calling out separately (a small lookup sketch follows the list):
- private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
Mapping from block manager id to the block manager's information.
Stores the mapping from BlockManagerId to BlockManagerInfo.
BlockManagerInfo holds the slave node's memory usage, the status of its Blocks, and a reference to its BlockManagerSlaveEndpoint.
- private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
Mapping from executor ID to block manager ID.
Stores the mapping from executor ID to BlockManagerId, so the Master can quickly look up the corresponding BlockManagerId from an executor ID.
- private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
Mapping from block id to the set of block managers that have the block.
Records which BlockManagers hold each Block, since a Block may be replicated on multiple slaves.
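To see how these three maps work together, here is a simplified sketch of two master-side lookups (close to, but not necessarily identical with, the actual source): a block's locations come straight out of blockLocations, while resolving an executor's slave endpoint goes through blockManagerIdByExecutor and then blockManagerInfo.
```scala
// Simplified sketch of master-side lookups over the three maps above.
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}

private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
  for (
    blockManagerId <- blockManagerIdByExecutor.get(executorId);
    info <- blockManagerInfo.get(blockManagerId)
  ) yield info.slaveEndpoint
}
```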
Message passing between Master and Slave
From the architecture described above, we know that the BlockManagerMaster on the Driver manages the metadata of all Blocks, while the actual Block data is stored on the Executors. So how do the Master and the Slaves communicate? Let's walk through the process using RDD's unpersist() as an example.
RDD's unpersist is used to release cached data:
def unpersist(blocking: Boolean = true): this.type = {
logInfo("Removing RDD " + id + " from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
this
}
It calls SparkContext's unpersistRDD:
/**
* Unpersist an RDD from memory and/or disk storage
*/
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
env.blockManager.master.removeRdd(rddId, blocking)
persistentRdds.remove(rddId)
listenerBus.post(SparkListenerUnpersistRDD(rddId))
}
The blockManager here is the one on the Driver node; it calls the master's removeRdd():
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}
BlockManagerMaster packs the request into a RemoveRdd message, sends it to the master endpoint with askWithRetry, attaches a failure handler to the returned Future, and optionally blocks on the result.
The actual Master-side work is then carried out in BlockManagerMasterEndpoint's removeRdd() method:
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
// First remove the metadata for the given RDD, and then asynchronously remove the blocks
// from the slaves.
// Find all blocks for the given RDD, remove the block from both blockLocations and
// the blockManagerInfo that is tracking the blocks.
// First remove the metadata about this RDD that the Master holds
val blocks: Iterable[RDDBlockId] = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
blockLocations.remove(blockId)
}
// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
// The dispatcher is used as an implicit argument into the Future sequence construction.
// Then remove this RDD's data on the slaves
val removeMsg = RemoveRdd(rddId)
Future.sequence(
blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg)
}.toSeq
)
}
The Master-side metadata about the RDD is handled first: every Block belonging to this RDD is removed from blockLocations and from the BlockManagerInfo tracking it, and then a RemoveRdd message is sent to each slave node; the per-slave replies are collected with Future.sequence, as the small example below illustrates.
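For readers less familiar with Scala futures: Future.sequence is what folds the per-slave replies into a single result, turning a Seq[Future[Int]] into a Future[Seq[Int]] that completes once every slave has answered. A minimal, self-contained illustration (unrelated to Spark itself):
```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// One future per "slave" reply...
val replies: Seq[Future[Int]] = Seq(Future(1), Future(2), Future(3))
// ...folded into a single future of all replies, preserving order.
val all: Future[Seq[Int]] = Future.sequence(replies)
println(Await.result(all, 5.seconds)) // List(1, 2, 3)
```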
What does the BlockManagerSlaveEndpoint do when it receives this message from the Master?
case RemoveRdd(rddId) =>
doAsync[Int]("removing RDD " + rddId, context) {
blockManager.removeRdd(rddId)
}
blockManager.removeRdd ultimately calls BlockManager's removeBlock for each of the RDD's blocks to finish removing the RDD cache on the slave node:
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
val info: BlockInfo = blockInfo.get(blockId).orNull
if (info != null && pendingToRemove.putIfAbsent(blockId, currentTaskAttemptId) == 0L) {
try {
info.synchronized {
val level = info.level
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = if (level.useMemory) memoryStore.remove(blockId) else false
val removedFromDisk = if (level.useDisk) diskStore.remove(blockId) else false
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfo.remove(blockId)
val status = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, status)
}
Option(TaskContext.get()).foreach { tc =>
val metrics = tc.taskMetrics()
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ Seq((blockId, status)))
}
}
} finally {
pendingToRemove.remove(blockId)
}
} else {
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
}
}
Storage data may live in memory, on disk, or in an external block store, and all three places have to be considered when removing the RDD's data on an Executor. After the deletion, the bookkeeping structures and the block metadata in the TaskContext need to be updated.
After the master-slave message exchange and the series of operations above, the unpersist operation is complete and the RDD's cached data has been removed from Storage.
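From the application's point of view, all of this machinery sits behind two simple calls. A minimal usage example (assuming an existing SparkContext named sc):
```scala
import org.apache.spark.storage.StorageLevel

// Cache the RDD: each cached partition becomes a Block managed by the
// BlockManager of the executor that computed it.
val rdd = sc.parallelize(1 to 1000000).map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK)
rdd.count()  // materializes the cache

// Release the cache: this triggers the RemoveRdd message flow described above.
// blocking = true waits until every slave has confirmed the removal.
rdd.unpersist(blocking = true)
```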
The following lists the various messages passed between BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint:
Messages handled by the Master (BlockManagerMasterEndpoint)
// Registration initiated by BlockManagerMaster to BlockManagerMasterEndpoint; through registration the master endpoint keeps track of the Block information held by that BlockManager
case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)
// Report Block information to the Master; the Master records it and serves queries from the slaves
case _updateBlockInfo @ UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
// Get the locations of a Block, returned as a list of BlockManagerIds; a Block may be replicated on multiple nodes
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
// Get the locations of multiple Blocks in one call
case GetLocationsMultipleBlockIds(blockIds) =>
context.reply(getLocationsMultipleBlockIds(blockIds))
// Get the other BlockManagers; used when replicating a Block
case GetPeers(blockManagerId) =>
context.reply(getPeers(blockManagerId))
case GetExecutorEndpointRef(executorId) =>
context.reply(getExecutorEndpointRef(executorId))
// Get the memory status of all Executors, including the maximum memory available and the remaining memory
case GetMemoryStatus =>
context.reply(memoryStatus)
// Get the storage status of every Executor, including the maximum available memory and Block information
case GetStorageStatus =>
context.reply(storageStatus)
// Get a Block's status by blockId; mainly used in tests
case GetBlockStatus(blockId, askSlaves) =>
context.reply(blockStatus(blockId, askSlaves))
// Filter Blocks with the given filter and return the matching BlockIds
case GetMatchingBlockIds(filter, askSlaves) =>
context.reply(getMatchingBlockIds(filter, askSlaves))
// Remove an RDD's cache by its RDD id
case RemoveRdd(rddId) =>
context.reply(removeRdd(rddId))
// Remove the shuffle data associated with the given shuffleId
case RemoveShuffle(shuffleId) =>
context.reply(removeShuffle(shuffleId))
// Remove a broadcast variable
case RemoveBroadcast(broadcastId, removeFromDriver) =>
context.reply(removeBroadcast(broadcastId, removeFromDriver))
// Remove a Block
case RemoveBlock(blockId) =>
removeBlockFromWorkers(blockId)
context.reply(true)
// Remove the Master-side information about the BlockManager on the Executor with the given executorId
case RemoveExecutor(execId) =>
removeExecutor(execId)
context.reply(true)
// Called when the BlockManagerMaster is being stopped; it stops the master endpoint
case StopBlockManagerMaster =>
context.reply(true)
stop()
// Heartbeat between Master and Slave, carried by the heartbeat between Executor and Driver
case BlockManagerHeartbeat(blockManagerId) =>
context.reply(heartbeatReceived(blockManagerId))
// Check whether the Executor with the given executorId has cached any Blocks
case HasCachedBlocks(executorId) =>
blockManagerIdByExecutor.get(executorId) match {
case Some(bm) =>
if (blockManagerInfo.contains(bm)) {
val bmInfo = blockManagerInfo(bm)
context.reply(bmInfo.cachedBlocks.nonEmpty)
} else {
context.reply(false)
}
case None => context.reply(false)
}
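On the sending side, BlockManagerMaster wraps each of these messages in a small method that asks the master endpoint and unwraps the typed reply. Two representative wrappers, sketched from the Spark 1.x source (bodies may differ slightly from the actual code):
```scala
// Sketch: BlockManagerMaster methods are thin wrappers that send a message to
// the master endpoint and block until the typed reply arrives.
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}

def removeBlock(blockId: BlockId): Unit = {
  driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
}
```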
Messages handled by the Slave (BlockManagerSlaveEndpoint)
// Remove the Block with the given blockId on this Executor
case RemoveBlock(blockId) =>
doAsync[Boolean]("removing block " + blockId, context) {
blockManager.removeBlock(blockId)
true
}
// Remove the given RDD's blocks on this Executor
case RemoveRdd(rddId) =>
doAsync[Int]("removing RDD " + rddId, context) {
blockManager.removeRdd(rddId)
}
// Remove the Blocks related to the given shuffle on this Executor; the shuffle must first be unregistered from the mapOutputTracker
case RemoveShuffle(shuffleId) =>
doAsync[Boolean]("removing shuffle " + shuffleId, context) {
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
}
case RemoveBroadcast(broadcastId, _) =>
doAsync[Int]("removing broadcast " + broadcastId, context) {
blockManager.removeBroadcast(broadcastId, tellMaster = true)
}
// Return the status of the Block with the given blockId to the Master; mainly used in tests. Note that this operation can be quite expensive
case GetBlockStatus(blockId, _) =>
context.reply(blockManager.getStatus(blockId))
// Return to the Master all BlockIds matching the filter; mainly used in tests
case GetMatchingBlockIds(filter, _) =>
context.reply(blockManager.getMatchingBlockIds(filter))
case TriggerThreadDump =>
context.reply(Utils.getThreadDump())
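Most of the slave-side handlers above are wrapped in doAsync, which runs the potentially slow block operation on a dedicated thread pool and answers the RPC caller when the future completes, so the endpoint's message loop is never blocked. A paraphrased sketch of that helper (simplified from the Spark 1.x source; it assumes an implicit ExecutionContext backed by the endpoint's own thread pool is in scope):
```scala
import scala.concurrent.Future

// Paraphrased sketch of BlockManagerSlaveEndpoint.doAsync: run the body
// asynchronously and reply to (or fail) the RPC context when it finishes.
private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T): Unit = {
  val future = Future {
    logDebug(actionMessage)
    body
  }
  future.onSuccess { case response =>
    logDebug(s"Done $actionMessage, response is $response")
    context.reply(response)
  }
  future.onFailure { case t: Throwable =>
    logError(s"Error in $actionMessage", t)
    context.sendFailure(t)
  }
}
```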