DelayedOperationPurgatory--謎之煉獄

  • 在kafka中有很多操作需要延遲等待, 比如客戶端發(fā)送數(shù)據(jù)到達leader后, 根據(jù)設(shè)置ack方式不同,需要等待其replicas返回ack, 那這個ack就需要延遲等待;對于一個拉取操作, 需要延遲等待期望拉取的字節(jié)數(shù)準備好;
  • 有延遲操作, 那必然會存在操作的超時處理. 還記得我們上一篇Kafka中的時間輪中對Timer的分析吧, 這里的延遲操作需要使用它來實現(xiàn);

DelayedOperation
  • 所在文件: core/src/main/scala/kafka/server/DelayedOperation.scala
  • 這是個抽象類, 所有具體的延遲任務都需要繼承這個類;
  • 同時每個延遲任務必然存在操作的超時, 那么其超時操作是通過將對象放到Kafka中的時間輪中的Timer中處理, 因此這個類又繼承自TimerTask;
  • private val completed = new AtomicBoolean(false): 原子變量, 標識當前operation是否已完成;
  • def forceComplete(): Boolean: 強制完成操作;
if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }

分兩種情況:

  1. 當前操作已經(jīng)完成,則不再需要強制完成,返回false;
  2. 當前操作未完成, 則首先在Timer中取消這個定時任務, 然后回調(diào)onComplete
  • override def run(): Unit: 實現(xiàn)的是TimerTask的方法, 當超時時會執(zhí)行此操作:
if (forceComplete())
      onExpiration()

里面的操作比較簡單, 調(diào)用forceComplete, 如果成功,表明是真的超時了,回調(diào)onExpiration;

  • 需要由子類實現(xiàn)的方法:
  1. def onExpiration(): Unit: 超時后的回調(diào)處理;
  2. def onComplete(): Unit: 操作完成后的回調(diào)處理;
  3. def tryComplete(): Boolean: 在放入到Timer前, 先嘗試著執(zhí)行一下這個操作, 看是否可以完成, 如果可以就不用放到Timer里了, 這是為了確保任務都盡快完成作的一個優(yōu)化;
Watchers
  • 所在文件: core/src/main/scala/kafka/server/DelayedOperation.scala
  • 對于一個延遲任務, 一般會有兩個操作加持在身:
  1. 上面說的作為超時任務放在Timer中;
  2. 與某些事件關(guān)聯(lián)在一起, 可以關(guān)聯(lián)多個事件, 當這些事件中的某一個發(fā)生時, 這個任務即可認為是完成;這個就是 Watchers類要完成的工作;
  • class Watchers(val key: Any): 構(gòu)造時需要一個參數(shù)key, 你可以理解成是一個事件;
  • private[this] val operations = new LinkedList[T](): 用于存放和這個key關(guān)聯(lián)的所有操作,一個key可以關(guān)聯(lián)多個操作, 同時一個操作也可以被多個key關(guān)聯(lián)(即位于多個Watchers對象中)
  • def purgeCompleted(): Int: 刪除鏈表中所有已經(jīng)完成的操作
      var purged = 0
      operations synchronized {
        val iter = operations.iterator()
        while (iter.hasNext) {
          val curr = iter.next()
          if (curr.isCompleted) {
            iter.remove()
            purged += 1
          }
        }
      }
      if (operations.size == 0)
        removeKeyIfEmpty(key, this)

      purged
    }
  • def tryCompleteWatched(): Int:
     var completed = 0
      operations synchronized {
        val iter = operations.iterator()
        while (iter.hasNext) {
          val curr = iter.next()
          if (curr.isCompleted) {
            // another thread has completed this operation, just remove it
            iter.remove()
          } else if (curr synchronized curr.tryComplete()) {
            completed += 1
            iter.remove()
          }
        }
      }

      if (operations.size == 0)
        removeKeyIfEmpty(key, this)

      completed

掃描整個鏈表:

  1. 如果任務已完成,則移除;
  2. 如果任務未完成, 調(diào)用tryComplete嘗試立即完成, 如果可以完成, 則移除;
  • 添加任務:
def watch(t: T) {
      operations synchronized operations.add(t)
    }
DelayedOperationPurgatory
  • 所在文件: core/src/main/scala/kafka/server/DelayedOperation.scala
  • 終于要揭開我們的謎之煉獄啦, 源碼里的注釋如下:

A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations

實際上就是用來通過TimerWatchers來管理一批延遲任務;

  • private[this] val timeoutTimer = new Timer(executor): 用來處理加入的作務的超時行為;
  • private val expirationReaper = new ExpiredOperationReaper():
private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d".format(brokerId),
    false) {

    override def doWork() {
      timeoutTimer.advanceClock(200L)

      if (estimatedTotalOperations.get - delayed > purgeInterval) {
        estimatedTotalOperations.getAndSet(delayed)
        debug("Begin purging watch lists")
        val purged = allWatchers.map(_.purgeCompleted()).sum
        debug("Purged %d elements from watch lists.".format(purged))
      }
    }
  }
  1. timeoutTimer.advanceClock(200L): 驅(qū)動Timer向前走, pop出超時的延遲任務;
  2. val purged = allWatchers.map(_.purgeCompleted()).sum: 利用閾值(purgeInterval)來周期性地從Watchers中清理掉已經(jīng)完成的任務;
  • def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean: 將operation和一系列的事件(key)關(guān)聯(lián)起來, 然后調(diào)用tryComplete嘗試立即完成該操作,如果不能完成,加入到Timer中;

  • def checkAndComplete(key: Any): Int: 按key找到相應的Watchers對象, 然后調(diào)用其tryCompleteWatched(), 說明上面用;

簡圖:
DelayedOperation.png
基本上就是這些了

Kafka源碼分析-匯總

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末洪灯,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子秕噪,更是在濱河造成了極大的恐慌关霸,老刑警劉巖酱吝,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件澳化,死亡現(xiàn)場離奇詭異屈雄,居然都是意外死亡橄碾,警方通過查閱死者的電腦和手機蒋得,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門级及,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人额衙,你說我怎么就攤上這事饮焦∨挛猓” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵县踢,是天一觀的道長转绷。 經(jīng)常有香客問我,道長硼啤,這世上最難降的妖魔是什么议经? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮谴返,結(jié)果婚禮上煞肾,老公的妹妹穿的比我還像新娘。我一直安慰自己嗓袱,他們只是感情好籍救,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著渠抹,像睡著了一般蝙昙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上梧却,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天奇颠,我揣著相機與錄音,去河邊找鬼放航。 笑死烈拒,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的三椿。 我是一名探鬼主播缺菌,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼搜锰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起耿战,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤蛋叼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后剂陡,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狈涮,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年鸭栖,在試婚紗的時候發(fā)現(xiàn)自己被綠了歌馍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡晕鹊,死狀恐怖松却,靈堂內(nèi)的尸體忽然破棺而出暴浦,到底是詐尸還是另有隱情,我是刑警寧澤晓锻,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布歌焦,位于F島的核電站,受9級特大地震影響砚哆,放射性物質(zhì)發(fā)生泄漏独撇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一躁锁、第九天 我趴在偏房一處隱蔽的房頂上張望纷铣。 院中可真熱鬧,春花似錦战转、人聲如沸关炼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽儒拂。三九已至,卻和暖如春色鸳,著一層夾襖步出監(jiān)牢的瞬間社痛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工命雀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蒜哀,地道東北人。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓吏砂,卻偏偏與公主長得像撵儿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子狐血,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容