Spark Core源碼精讀計(jì)劃#22:BlockInfoManager與其實(shí)現(xiàn)的塊鎖機(jī)制

目錄

前言

在上一篇文章中,我們對(duì)與塊相關(guān)的BlockId、BlockData和BlockInfo有了比較全面的理解鲤竹。前面已經(jīng)提到過,塊在讀寫時(shí)有鎖機(jī)制,并且委托給BlockInfoManager來管理定罢。雖然BlockInfoManager的字面意思是“塊信息管理器”族购,但管理塊信息的意圖并不明顯,管理塊的鎖才是真正主要的任務(wù)抵拘。本文就來研究BlockInfoManager的具體實(shí)現(xiàn)哎榴。

BlockInfoManager的成員屬性及構(gòu)造方法

代碼#22.1 - o.a.s.storage.BlockInfoManager的成員屬性及構(gòu)造方法

private[storage] class BlockInfoManager extends Logging {
  private type TaskAttemptId = Long

  @GuardedBy("this")
  private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

  @GuardedBy("this")
  private[this] val writeLocksByTask =
    new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
      with mutable.MultiMap[TaskAttemptId, BlockId]

  @GuardedBy("this")
  private[this] val readLocksByTask =
    new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]

  registerTask(BlockInfo.NON_TASK_WRITER)

  def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
    require(!readLocksByTask.contains(taskAttemptId),
      s"Task attempt $taskAttemptId is already registered")
    readLocksByTask(taskAttemptId) = ConcurrentHashMultiset.create()
  }
  • TaskAttemptId:實(shí)際上就是對(duì)Long型的重命名,用來表示一次Task嘗試的ID僵蛛。
  • infos:存儲(chǔ)BlockId與BlockInfo的映射關(guān)系尚蝌,這就是為什么BlockInfo結(jié)構(gòu)中并沒有包含BlockId對(duì)應(yīng)的字段。
  • writeLocksByTask:存儲(chǔ)TaskAttemptId與該Task獲取寫鎖的塊之間的映射關(guān)系充尉。注意BlockId存儲(chǔ)在集合中飘言,也就是說一次Task嘗試可以獲取多個(gè)塊的寫鎖。
  • readLocksByTask:存儲(chǔ)TaskAttemptId與該Task獲取讀鎖的塊之間的映射關(guān)系驼侠。一次Task嘗試也可以獲取多個(gè)塊的讀鎖姿鸿。

在BlockInfoManager構(gòu)造時(shí),會(huì)調(diào)用registerTask()方法注冊(cè)任務(wù)倒源,其實(shí)就是將NON_TASK_WRITER這個(gè)TaskAttemptId對(duì)應(yīng)的BlockId集合初始化好苛预。NON_TASK_WRITER在BlockInfo伴生對(duì)象里定義,是一個(gè)特殊的標(biāo)記(-1024)笋熬,表示當(dāng)前持有寫鎖的并非一個(gè)具體的Task热某,而是其他線程。registerTask()也會(huì)被BlockManager調(diào)用,這是后話昔馋。

下面我們來看看BlockInfoManager提供的與鎖相關(guān)的操作筹吐。

BlockInfoManager提供的鎖方法

注意這些方法都是同步方法(被synchronized關(guān)鍵字修飾的)。

獲取讀鎖

lockForReading()方法為一個(gè)塊加讀鎖秘遏,其代碼如下丘薛。

代碼#21.2 - o.a.s.storage.BlockInfoManager.lockForReading()方法

  def lockForReading(
      blockId: BlockId,
      blocking: Boolean = true): Option[BlockInfo] = synchronized {
    logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
    do {
      infos.get(blockId) match {
        case None => return None
        case Some(info) =>
          if (info.writerTask == BlockInfo.NO_WRITER) {
            info.readerCount += 1
            readLocksByTask(currentTaskAttemptId).add(blockId)
            logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
            return Some(info)
          }
      }
      if (blocking) {
        wait()
      }
    } while (blocking)
    None
  }

注意blocking參數(shù),它表示加讀鎖的過程是否阻塞(默認(rèn)阻塞)垄提。如果不阻塞的話榔袋,獲取讀鎖失敗就會(huì)立即返回。

該方法的執(zhí)行流程是:根據(jù)塊ID獲取它對(duì)應(yīng)的BlockInfo铡俐,檢查它的writerTask是否為NO_WRITER(值為-1凰兑,表示該BlockInfo的寫鎖沒有被占用)。如果是审丘,就自增BlockInfo結(jié)構(gòu)中的readerCount計(jì)數(shù)吏够,并將塊ID加入readLocksByTask映射,視為加鎖成功滩报。若blocking為true的話锅知,就會(huì)調(diào)用Object.wait()方法等待,直到該塊的寫鎖釋放后被notify()/notifyAll()方法喚醒脓钾∈鄱茫可見,如果該塊的寫鎖一直不釋放可训,那么lockForReading()方法可能會(huì)無限等待下去昌妹。

獲取寫鎖

與lockForReading()方法相對(duì)地,lockForWriting()方法為一個(gè)塊加寫鎖握截,其代碼如下飞崖。

代碼#21.3 - o.a.s.storage.BlockInfoManager.lockForWriting()方法

  def lockForWriting(
      blockId: BlockId,
      blocking: Boolean = true): Option[BlockInfo] = synchronized {
    logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
    do {
      infos.get(blockId) match {
        case None => return None
        case Some(info) =>
          if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
            info.writerTask = currentTaskAttemptId
            writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
            logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
            return Some(info)
          }
      }
      if (blocking) {
        wait()
      }
    } while (blocking)
    None
  }

這個(gè)方法的執(zhí)行流程與lockForReading()方法相似,不過會(huì)將BlockInfo中的writerTask字段設(shè)為Task嘗試ID谨胞,將塊ID加入writeLocksByTask映射固歪,并且判斷條件是沒有讀鎖也沒有寫鎖。也就是說胯努,塊的讀鎖和寫鎖牢裳、寫鎖和寫鎖之間是互斥的,而讀鎖和讀鎖之間是可以共享的叶沛,并且讀鎖可重入蒲讯,寫鎖不可重入。

同樣地恬汁,如果該塊的其他寫鎖一直不釋放伶椿,那么lockForWriting()方法也有可能會(huì)無限等待下去。

另外氓侧,還有一個(gè)lockNewBlockForWriting()方法用來獲取一個(gè)新塊的寫鎖脊另。

代碼#21.4 - o.a.s.storage.BlockInfoManager.lockNewBlockForWriting()方法

  def lockNewBlockForWriting(
      blockId: BlockId,
      newBlockInfo: BlockInfo): Boolean = synchronized {
    logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
    lockForReading(blockId) match {
      case Some(info) =>
        false
      case None =>
        infos(blockId) = newBlockInfo
        lockForWriting(blockId)
        true
    }
  }

該方法先試圖持有blockId對(duì)應(yīng)的塊的讀鎖。如果能獲取到约巷,說明該塊已經(jīng)存在了偎痛,亦即已經(jīng)有其他線程贏得競(jìng)爭(zhēng)并寫了這個(gè)塊,沒有必要再寫独郎,直接返回false(表示返回讀鎖)踩麦。反之,就將這個(gè)新的塊放入infos映射氓癌,然后獲取其對(duì)應(yīng)的寫鎖谓谦,并返回true。

釋放鎖

釋放單個(gè)塊的鎖的邏輯由unlock()方法實(shí)現(xiàn)贪婉。

代碼#21.5 - o.a.s.storage.BlockInfoManager.unlock()方法

  def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
    val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
    logTrace(s"Task $taskId releasing lock for $blockId")
    val info = get(blockId).getOrElse {
      throw new IllegalStateException(s"Block $blockId not found")
    }
    if (info.writerTask != BlockInfo.NO_WRITER) {
      info.writerTask = BlockInfo.NO_WRITER
      writeLocksByTask.removeBinding(taskId, blockId)
    } else {
      assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
      info.readerCount -= 1
      val countsForTask = readLocksByTask(taskId)
      val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
      assert(newPinCountForTask >= 0,
        s"Task $taskId release lock on block $blockId more times than it acquired it")
    }
    notifyAll()
  }

該方法首先獲取Task嘗試ID與對(duì)應(yīng)的塊信息(get()方法就負(fù)責(zé)從infos映射中取得塊信息)反粥,然后檢查當(dāng)前Task如果已經(jīng)持有塊的寫鎖,就將writerTask置為NO_WRITER疲迂,即釋放寫鎖才顿。如果未持有寫鎖,就將readerCount自減尤蒿,即釋放讀鎖郑气。最后,調(diào)用notifyAll()方法喚醒所有塊上等待的線程腰池。

另外尾组,還有一個(gè)releaseAllLocksForTask()方法,它會(huì)釋放當(dāng)前TaskAttemptId對(duì)應(yīng)的所有鎖巩螃,并返回所有塊ID的序列演怎。它的實(shí)現(xiàn)如下,沒有什么特殊的點(diǎn)避乏,看官可以自行參考爷耀。

代碼#21.6 - o.a.s.storage.BlockInfoManager.releaseAllLocksForTask()方法

  def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
    val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
    val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
    val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)

    for (blockId <- writeLocks) {
      infos.get(blockId).foreach { info =>
        assert(info.writerTask == taskAttemptId)
        info.writerTask = BlockInfo.NO_WRITER
      }
      blocksWithReleasedLocks += blockId
    }

    readLocks.entrySet().iterator().asScala.foreach { entry =>
      val blockId = entry.getElement
      val lockCount = entry.getCount
      blocksWithReleasedLocks += blockId
      get(blockId).foreach { info =>
        info.readerCount -= lockCount
        assert(info.readerCount >= 0)
      }
    }

    notifyAll()
    blocksWithReleasedLocks
  }

鎖降級(jí)

鎖降級(jí)的標(biāo)準(zhǔn)定義就是寫線程在持有寫鎖的情況下去獲取讀鎖,然后釋放寫鎖拍皮。BlockInfoManager中的塊鎖降級(jí)實(shí)現(xiàn)如下歹叮。

代碼#21.7 - o.a.s.storage.BlockInfoManager.downgradeLock()方法

  def downgradeLock(blockId: BlockId): Unit = synchronized {
    logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
    val info = get(blockId).get
    require(info.writerTask == currentTaskAttemptId,
      s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
        s" block $blockId")
    unlock(blockId)
    val lockOutcome = lockForReading(blockId, blocking = false)
    assert(lockOutcome.isDefined)
  }

可見,這個(gè)降級(jí)的過程與上面的標(biāo)準(zhǔn)定義有所出入铆帽,實(shí)際上是先釋放了寫鎖咆耿,然后重新獲取了讀鎖,但結(jié)果是相同的爹橱。

刪除BlockInfo

removeBlock()方法從infos映射中刪掉對(duì)應(yīng)的BlockInfo萨螺,同時(shí)釋放它對(duì)應(yīng)的所有鎖。代碼如下。

代碼#21.8 - o.a.s.storage.BlockInfoManager.removeBlock()方法

  def removeBlock(blockId: BlockId): Unit = synchronized {
    logTrace(s"Task $currentTaskAttemptId trying to remove block $blockId")
    infos.get(blockId) match {
      case Some(blockInfo) =>
        if (blockInfo.writerTask != currentTaskAttemptId) {
          throw new IllegalStateException(
            s"Task $currentTaskAttemptId called remove() on block $blockId without a write lock")
        } else {
          infos.remove(blockId)
          blockInfo.readerCount = 0
          blockInfo.writerTask = BlockInfo.NO_WRITER
          writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
        }
      case None =>
        throw new IllegalArgumentException(
          s"Task $currentTaskAttemptId called remove() on non-existent block $blockId")
    }
    notifyAll()
  }

可見慰技,只有在持有BlockInfo寫鎖的Task是當(dāng)前Task的情況下椭盏,才可以真正釋放鎖,包括將readerCount清零吻商,將writerTask置為NO_WRITER掏颊。最后仍然要調(diào)用notifyAll()方法喚醒所有塊上等待的線程。

總結(jié)

本文通過塊信息管理器BlockInfoManager的源碼艾帐,詳細(xì)解釋了Spark塊的鎖機(jī)制乌叶,包含獲取讀鎖、獲取寫鎖柒爸、釋放鎖和鎖降級(jí)的細(xì)節(jié)准浴。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市捎稚,隨后出現(xiàn)的幾起案子兄裂,更是在濱河造成了極大的恐慌,老刑警劉巖阳藻,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件晰奖,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡腥泥,警方通過查閱死者的電腦和手機(jī)匾南,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蛔外,“玉大人蛆楞,你說我怎么就攤上這事〖醒幔” “怎么了豹爹?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)矛纹。 經(jīng)常有香客問我臂聋,道長(zhǎng),這世上最難降的妖魔是什么或南? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任孩等,我火速辦了婚禮,結(jié)果婚禮上采够,老公的妹妹穿的比我還像新娘肄方。我一直安慰自己,他們只是感情好蹬癌,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布权她。 她就那樣靜靜地躺著虹茶,像睡著了一般。 火紅的嫁衣襯著肌膚如雪隅要。 梳的紋絲不亂的頭發(fā)上香嗓,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天帆疟,我揣著相機(jī)與錄音汽绢,去河邊找鬼混聊。 笑死拟淮,一個(gè)胖子當(dāng)著我的面吹牛环疼,可吹牛的內(nèi)容都是我干的蹋嵌。 我是一名探鬼主播芽隆,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼询微,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼崖瞭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起撑毛,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤书聚,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后藻雌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體雌续,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年胯杭,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了驯杜。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡做个,死狀恐怖鸽心,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情居暖,我是刑警寧澤顽频,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站太闺,受9級(jí)特大地震影響糯景,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜省骂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一莺奸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧冀宴,春花似錦灭贷、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)仗岖。三九已至,卻和暖如春览妖,著一層夾襖步出監(jiān)牢的瞬間轧拄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工讽膏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留檩电,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓府树,卻偏偏與公主長(zhǎng)得像俐末,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子奄侠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容