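Preface
In the previous article, we gained a fairly thorough understanding of BlockId, BlockData, and BlockInfo, the classes that describe blocks. As mentioned there, blocks are protected by a locking mechanism during reads and writes, and that mechanism is delegated to BlockInfoManager. Although the name BlockInfoManager literally means "block info manager", managing block metadata is not its most visible purpose; managing block locks is its real job. This article examines how BlockInfoManager is implemented.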
Member fields and constructor of BlockInfoManager
Code #22.1 - Member fields and constructor of o.a.s.storage.BlockInfoManager
private[storage] class BlockInfoManager extends Logging {
  private type TaskAttemptId = Long
  @GuardedBy("this")
  private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
  @GuardedBy("this")
  private[this] val writeLocksByTask =
    new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
      with mutable.MultiMap[TaskAttemptId, BlockId]
  @GuardedBy("this")
  private[this] val readLocksByTask =
    new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
  // Runs as part of the constructor body.
  registerTask(BlockInfo.NON_TASK_WRITER)
  def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
    require(!readLocksByTask.contains(taskAttemptId),
      s"Task attempt $taskAttemptId is already registered")
    readLocksByTask(taskAttemptId) = ConcurrentHashMultiset.create()
  }
  // ... (the remaining methods are discussed below)
}
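- TaskAttemptId: simply a type alias for Long, used to represent the ID of one task attempt.
- infos: stores the mapping from BlockId to BlockInfo, which is why the BlockInfo structure has no field for its own BlockId.
- writeLocksByTask: stores the mapping from a TaskAttemptId to the blocks on which that task attempt holds write locks. Note that the BlockIds are kept in a set, so one task attempt can hold write locks on several blocks.
- readLocksByTask: stores the mapping from a TaskAttemptId to the blocks on which that task attempt holds read locks. One task attempt can also hold read locks on several blocks, and because the values are multisets, the same block can be counted more than once for the same task attempt.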
When a BlockInfoManager is constructed, it calls registerTask() to register a task, which simply initializes the (empty) read-lock multiset for the TaskAttemptId NON_TASK_WRITER. NON_TASK_WRITER is defined in the BlockInfo companion object; it is a special marker value (-1024) meaning that the write lock is held not by a concrete task but by some other thread. registerTask() is also called by BlockManager, which we will come back to later.
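To make the two bookkeeping maps more concrete, here is a minimal sketch that uses only the Scala standard library. It is not Spark's code: the class name TaskLockRegistry, the helper methods recordRead/recordWrite/readCount/writeLocks, and the use of String block IDs are all assumptions made for illustration, and a plain counting map stands in for Guava's ConcurrentHashMultiset.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the per-task bookkeeping.
// Block IDs are plain strings here; Spark uses BlockId.
final class TaskLockRegistry {
  // task attempt ID -> set of blocks it holds write locks on (each block at most once)
  private val writeLocksByTask = mutable.HashMap.empty[Long, mutable.Set[String]]
  // task attempt ID -> (block -> number of read locks held), a hand-rolled multiset
  private val readLocksByTask = mutable.HashMap.empty[Long, mutable.Map[String, Int]]

  def registerTask(taskAttemptId: Long): Unit = synchronized {
    require(!readLocksByTask.contains(taskAttemptId),
      s"Task attempt $taskAttemptId is already registered")
    readLocksByTask(taskAttemptId) = mutable.HashMap.empty[String, Int]
  }

  // Record one more read lock; the same block may be counted several times.
  def recordRead(taskAttemptId: Long, blockId: String): Unit = synchronized {
    val counts = readLocksByTask(taskAttemptId)
    counts(blockId) = counts.getOrElse(blockId, 0) + 1
  }

  // Record a write lock; adding the same block twice leaves the set unchanged.
  def recordWrite(taskAttemptId: Long, blockId: String): Unit = synchronized {
    writeLocksByTask.getOrElseUpdate(taskAttemptId, mutable.HashSet.empty[String]) += blockId
  }

  def readCount(taskAttemptId: Long, blockId: String): Int = synchronized {
    readLocksByTask.get(taskAttemptId).flatMap(_.get(blockId)).getOrElse(0)
  }

  def writeLocks(taskAttemptId: Long): Set[String] = synchronized {
    writeLocksByTask.get(taskAttemptId).map(_.toSet).getOrElse(Set.empty[String])
  }
}
```

The asymmetry is the point of the sketch: a set is enough for write locks because a block has at most one writer, while read locks need per-block counts so that repeated acquisitions by the same task attempt can each be released separately.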
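Next, let's look at the lock-related operations that BlockInfoManager provides.
Lock methods provided by BlockInfoManager
Note that all of these methods are synchronized: each method body is wrapped in a synchronized block on the BlockInfoManager instance.
Acquiring a read lock
The lockForReading() method acquires a read lock on a block. Its code is as follows.
Code #21.2 - The o.a.s.storage.BlockInfoManager.lockForReading() method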
def lockForReading(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        if (info.writerTask == BlockInfo.NO_WRITER) {
          info.readerCount += 1
          readLocksByTask(currentTaskAttemptId).add(blockId)
          logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
          return Some(info)
        }
    }
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
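Note the blocking parameter, which controls whether acquiring the read lock blocks (it does by default). In non-blocking mode, a failed attempt returns immediately.
The method works as follows. It looks up the BlockInfo for the block ID and returns None if the block does not exist. Otherwise it checks whether the BlockInfo's writerTask is NO_WRITER (value -1, meaning the write lock is not held). If so, it increments the readerCount counter in the BlockInfo, adds the block ID to the readLocksByTask mapping, and the lock is considered acquired. If the write lock is held and blocking is true, the method calls Object.wait() and sleeps until it is woken by notify()/notifyAll() when a lock is released, then checks again. So if the write lock on the block is never released, lockForReading() may wait forever.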
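The difference between the blocking and non-blocking paths can be sketched with a toy lock for a single block. This is an illustration under stated assumptions, not Spark's implementation: SingleBlockLock and its methods (including unlockWrite and readers) are hypothetical names, and the sketch omits the per-task bookkeeping and the "block does not exist" case.

```scala
// Hypothetical single-block lock that mirrors the wait()/notifyAll() pattern
// used by lockForReading(); not part of Spark.
final class SingleBlockLock {
  private val NoWriter = -1L
  private var writerTask: Long = NoWriter
  private var readerCount: Int = 0

  def lockForWriting(taskId: Long): Boolean = synchronized {
    if (writerTask == NoWriter && readerCount == 0) { writerTask = taskId; true } else false
  }

  def lockForReading(blocking: Boolean = true): Boolean = synchronized {
    if (blocking) {
      // Sleep on this object's monitor until no writer holds the block.
      // The loop re-checks the condition after every wake-up.
      while (writerTask != NoWriter) wait()
    }
    // In non-blocking mode we get here immediately and simply report failure
    // if a writer is still present.
    if (writerTask == NoWriter) { readerCount += 1; true } else false
  }

  def unlockWrite(): Unit = synchronized {
    writerTask = NoWriter
    notifyAll() // wake every thread waiting in lockForReading()
  }

  def readers: Int = synchronized(readerCount)
}
```

A non-blocking call made while a writer holds the lock returns false at once, whereas a blocking call made on another thread returns only after unlockWrite() has run. Re-checking the condition in a loop after wait() matters because notifyAll() wakes all waiters, not only those whose condition has become true.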
Acquiring a write lock
As the counterpart of lockForReading(), the lockForWriting() method acquires a write lock on a block. Its code is as follows.
Code #21.3 - The o.a.s.storage.BlockInfoManager.lockForWriting() method
def lockForWriting(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
          info.writerTask = currentTaskAttemptId
          writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
          logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
          return Some(info)
        }
    }
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
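This method follows the same flow as lockForReading(), except that it sets the writerTask field of the BlockInfo to the task attempt ID, adds the block ID to the writeLocksByTask mapping, and requires that the block has neither a write lock nor any read locks. In other words, a read lock and a write lock on the same block are mutually exclusive, as are two write locks, while read locks can be shared. Read locks are reentrant; write locks are not.
Likewise, if the other locks on the block are never released, lockForWriting() may also wait forever.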
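The compatibility rules above can be written down as a tiny immutable state model. This is only a sketch for reasoning about the rules: LockState, tryRead and tryWrite are hypothetical names, and the model covers just the two BlockInfo fields involved in locking (writerTask and readerCount).

```scala
// Hypothetical immutable model of the two BlockInfo fields involved in locking.
final case class LockState(writerTask: Long = LockState.NoWriter, readerCount: Int = 0)

object LockState {
  val NoWriter: Long = -1L

  // A read lock is granted whenever no writer holds the block,
  // regardless of how many readers already exist.
  def tryRead(s: LockState): Option[LockState] =
    if (s.writerTask == NoWriter) Some(s.copy(readerCount = s.readerCount + 1)) else None

  // A write lock is granted only when there is no writer and no reader at all,
  // so even the current writer cannot acquire it a second time.
  def tryWrite(s: LockState, taskId: Long): Option[LockState] =
    if (s.writerTask == NoWriter && s.readerCount == 0) Some(s.copy(writerTask = taskId)) else None
}
```

Starting from LockState(), two successive tryRead calls both succeed (shared, reentrant reads), tryWrite fails once any reader exists, and a second tryWrite by the same task fails as well, which is the non-reentrancy mentioned above.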
In addition, there is a lockNewBlockForWriting() method for acquiring the write lock on a new block.
Code #21.4 - The o.a.s.storage.BlockInfoManager.lockNewBlockForWriting() method
def lockNewBlockForWriting(
    blockId: BlockId,
    newBlockInfo: BlockInfo): Boolean = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
  lockForReading(blockId) match {
    case Some(info) =>
      false
    case None =>
      infos(blockId) = newBlockInfo
      lockForWriting(blockId)
      true
  }
}
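The method first tries to acquire a read lock on the block identified by blockId. If that succeeds, the block already exists, meaning another thread has won the race and written it. There is no need to write it again, so the method returns false, and the caller is left holding a read lock on the existing block. Otherwise, the method puts the new BlockInfo into the infos mapping, acquires its write lock, and returns true.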
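The "first writer wins" behaviour described above can be sketched in isolation as follows. The class FirstWriterWins, the nested Info class, the String block IDs, and the helpers unlockWrite and readerCount are assumptions made for this example; only the return-value convention (true for a new block with a write lock, false for an existing block with a read lock) is taken from the method above.

```scala
import scala.collection.mutable

// Hypothetical sketch of the put-if-absent semantics of lockNewBlockForWriting().
final class FirstWriterWins {
  private val NoWriter = -1L
  private final class Info { var writerTask: Long = NoWriter; var readerCount: Int = 0 }
  private val infos = mutable.HashMap.empty[String, Info]

  // true:  the caller created the block and now holds its write lock.
  // false: the block already existed and the caller now holds a read lock on it.
  def lockNewBlockForWriting(blockId: String, taskId: Long): Boolean = synchronized {
    // If the block exists but is being written, wait for that writer to finish.
    while (infos.get(blockId).exists(_.writerTask != NoWriter)) wait()
    infos.get(blockId) match {
      case Some(info) =>
        info.readerCount += 1
        false
      case None =>
        val info = new Info
        info.writerTask = taskId
        infos(blockId) = info
        true
    }
  }

  def unlockWrite(blockId: String): Unit = synchronized {
    infos.get(blockId).foreach(info => info.writerTask = NoWriter)
    notifyAll()
  }

  def readerCount(blockId: String): Int = synchronized {
    infos.get(blockId).map(_.readerCount).getOrElse(0)
  }
}
```

Because the existence check and the insertion happen inside one synchronized method, two tasks racing to create the same block cannot both see it as missing; exactly one of them gets true.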
Releasing locks
Releasing the lock on a single block is implemented by the unlock() method.
Code #21.5 - The o.a.s.storage.BlockInfoManager.unlock() method
def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $taskId releasing lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  if (info.writerTask != BlockInfo.NO_WRITER) {
    info.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(taskId, blockId)
  } else {
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    info.readerCount -= 1
    val countsForTask = readLocksByTask(taskId)
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $taskId release lock on block $blockId more times than it acquired it")
  }
  notifyAll()
}
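The method first determines the task attempt ID and fetches the corresponding block info (the get() method simply looks the block up in the infos mapping). It then checks whether the block is currently write-locked. If it is, writerTask is reset to NO_WRITER and the binding is removed from writeLocksByTask, which releases the write lock. Note that the code as shown only tests writerTask against NO_WRITER; it does not verify that the writer is the calling task. If the block is not write-locked, readerCount is decremented and one occurrence of the block is removed from the task's read-lock multiset, which releases one read lock. Finally, notifyAll() is called to wake up all threads waiting on block locks.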
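The two branches of unlock() and the check against releasing more often than acquiring can be sketched for a single block as below. UnlockSketch and its members are hypothetical names, and the sketch deliberately differs from the code above in one respect: it throws IllegalStateException instead of using assert, so that the over-release case is observable even when assertions are elided.

```scala
import scala.collection.mutable

// Hypothetical single-block model of unlock(); names are illustrative, not Spark's API.
final class UnlockSketch {
  private val NoWriter = -1L
  private var writerTask: Long = NoWriter
  private var readerCount: Int = 0
  // task attempt ID -> number of read locks that task still holds on this block
  private val readPins = mutable.HashMap.empty[Long, Int]

  def lockForReading(taskId: Long): Boolean = synchronized {
    if (writerTask == NoWriter) {
      readerCount += 1
      readPins(taskId) = readPins.getOrElse(taskId, 0) + 1
      true
    } else false
  }

  def lockForWriting(taskId: Long): Boolean = synchronized {
    if (writerTask == NoWriter && readerCount == 0) { writerTask = taskId; true } else false
  }

  def unlock(taskId: Long): Unit = synchronized {
    if (writerTask != NoWriter) {
      // Write-locked: releasing means clearing the writer.
      writerTask = NoWriter
    } else {
      // Otherwise release exactly one read lock held by this task.
      val held = readPins.getOrElse(taskId, 0)
      if (held <= 0) {
        throw new IllegalStateException(s"Task $taskId holds no read lock on this block")
      }
      readPins(taskId) = held - 1
      readerCount -= 1
    }
    notifyAll() // waiters re-check their own conditions after waking up
  }

  def readers: Int = synchronized(readerCount)
}
```

A task that acquired the read lock twice has to call unlock() twice before a writer can get in, and a third call fails.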
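In addition, there is a releaseAllLocksForTask() method, which releases every lock held by the given TaskAttemptId and returns the sequence of affected block IDs. Its implementation is shown below; there is nothing special about it, so readers can study it on their own.
Code #21.6 - The o.a.s.storage.BlockInfoManager.releaseAllLocksForTask() method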
def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
  val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
  val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
  val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
  for (blockId <- writeLocks) {
    infos.get(blockId).foreach { info =>
      assert(info.writerTask == taskAttemptId)
      info.writerTask = BlockInfo.NO_WRITER
    }
    blocksWithReleasedLocks += blockId
  }
  readLocks.entrySet().iterator().asScala.foreach { entry =>
    val blockId = entry.getElement
    val lockCount = entry.getCount
    blocksWithReleasedLocks += blockId
    get(blockId).foreach { info =>
      info.readerCount -= lockCount
      assert(info.readerCount >= 0)
    }
  }
  notifyAll()
  blocksWithReleasedLocks
}
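Lock downgrading
The standard definition of lock downgrading is that a writer thread, while still holding the write lock, acquires the read lock and then releases the write lock. The block lock downgrade in BlockInfoManager is implemented as follows.
Code #21.7 - The o.a.s.storage.BlockInfoManager.downgradeLock() method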
def downgradeLock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
  val info = get(blockId).get
  require(info.writerTask == currentTaskAttemptId,
    s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
      s" block $blockId")
  unlock(blockId)
  val lockOutcome = lockForReading(blockId, blocking = false)
  assert(lockOutcome.isDefined)
}
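As we can see, this downgrade differs from the standard definition above: it actually releases the write lock first and then reacquires a read lock. The result is the same, though, because the whole method runs inside a single synchronized block on the BlockInfoManager, so no other thread can acquire a lock on the block between the two steps.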
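Why "release, then reacquire" is safe here can be seen in a small single-block sketch. DowngradableLock, tryWrite, tryRead and downgrade are hypothetical names; the only property borrowed from the code above is that all methods synchronize on the same object and that the monitor is reentrant, so the nested call to tryRead() does not deadlock.

```scala
// Hypothetical single-block lock showing a downgrade done as
// "release write, then acquire read" under one monitor.
final class DowngradableLock {
  private val NoWriter = -1L
  private var writerTask: Long = NoWriter
  private var readerCount: Int = 0

  def tryWrite(taskId: Long): Boolean = synchronized {
    if (writerTask == NoWriter && readerCount == 0) { writerTask = taskId; true } else false
  }

  def tryRead(): Boolean = synchronized {
    if (writerTask == NoWriter) { readerCount += 1; true } else false
  }

  def downgrade(taskId: Long): Unit = synchronized {
    require(writerTask == taskId, s"Task $taskId does not hold the write lock")
    writerTask = NoWriter // step 1: release the write lock
    // Step 2: take a read lock. We still own the monitor, so no other thread
    // could have run tryWrite() or tryRead() between the two steps.
    val acquired = tryRead()
    assert(acquired)
    notifyAll() // other readers may now proceed
  }

  def readers: Int = synchronized(readerCount)
}
```

After downgrade(), other readers can share the block, while writers stay locked out until the read lock is released.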
Removing a BlockInfo
The removeBlock() method removes the corresponding BlockInfo from the infos mapping and releases all locks on it. The code is as follows.
Code #21.8 - The o.a.s.storage.BlockInfoManager.removeBlock() method
def removeBlock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
  infos.get(blockId) match {
    case Some(blockInfo) =>
      if (blockInfo.writerTask != currentTaskAttemptId) {
        throw new IllegalStateException(
          s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
      } else {
        infos.remove(blockId)
        blockInfo.readerCount = 0
        blockInfo.writerTask = BlockInfo.NO_WRITER
        writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
      }
    case None =>
      throw new IllegalArgumentException(
        s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
  }
  notifyAll()
}
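As we can see, the block is removed and its locks released only if the task holding the BlockInfo's write lock is the current task; otherwise an IllegalStateException is thrown. Releasing here means resetting readerCount to zero, resetting writerTask to NO_WRITER, and removing the binding from writeLocksByTask. At the end, notifyAll() is again called to wake up all threads waiting on block locks.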
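Summary
Using the source code of the block info manager BlockInfoManager, this article explained Spark's block locking mechanism in detail, covering how read locks and write locks are acquired, how locks are released, and how a write lock is downgraded.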