ReplicaManager源碼解析1-消息同步線程管理

  • 現(xiàn)在的Kafka增加了高可用的特性薯演,即增加了復本的特性,同時必然會引入選主,同步等復雜性;
  • ReplicaManager負責消息的寫入兑牡,消費在多復本間同步, 節(jié)點成為主或從的轉換等等相關的操作;
  • 這篇我們先集中介紹下ReplicaManager里用到的各種基礎類庫

OffsetCheckPoint類
  • 文件:/core/src/main/scala/kafka/server/OffsetCheckPoint.scala
  • 在kafka的log dir目錄下有一文件:replication-offset-checkpoint, 以Topic+Partition為key, 記錄其高水位的值。那么何為高水位税灌?簡單說就是已經(jīng)復制到所有replica的最后提交的offset, 即所有ISR中的logEndOffset的最小值與leader的目前的高水位值的之間的大者.
  • replication-offset-checkpoint文件結構很簡單:
    第一行:版本號均函,當前是0
    第二行:當前寫入的Topic+Partition的記錄個數(shù)
    其他每行: topic 空格 partition 空格 offset
  • OffsetCheckPoint類實現(xiàn)了對這個文件的讀寫
    每次寫入的時修會先寫到 replication-offset-checkpoint.tmp 的臨時文件,讀入后再作rename操作;
  • recovery-point-offset-checkpoint文件格式與replication-offset-checkpointg一樣垄琐,同樣使用OffsetCheckPoint類來讀寫.
AbstractFetcherManager類
  • 文件:/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
  • 是個基類, 用于管理當前broker上的所有從leader的數(shù)據(jù)同步;
  • 主要成員變量:private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread], 實現(xiàn)的拉取消息由AbstractFetcherThread來負責, 每個brokerId+fetcherId對應一個AbstractFetcherThread;
  • addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]): 創(chuàng)建并開始消息同步線程;
    其中最主要的操作是調(diào)用AbstractFetcherThreadaddPartitions方法來告訴同步線程具體需要同步哪些partition;
  • def removeFetcherForPartitions(partitions: Set[TopicAndPartition]): 移出對某些partition的同步;
  • def shutdownIdleFetcherThreads(): 如果某些同步線程負責同步的partition數(shù)量為0,則停掉該線程;
  • def closeAllFetchers(): 停掉所有的同步線程
  • def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread: 抽象方法, 由子類實現(xiàn), 用于創(chuàng)建具體的同步線程;
ReplicaFetcherManager類
  • 文件:/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
  • 繼承自AbstractFetcherManager類
  • 僅實現(xiàn)了def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread: 創(chuàng)建了ReplicaFetcherThread
AbstractFetcherThread類
  • 文件: /core/src/main/scala/kafka/server/AbstractFetcherThread.scala
  • 本身是個抽象基類, 實現(xiàn)了從partition的leader來同步數(shù)據(jù)的具體操作;
  • def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]): 添加需要同步的partition信息, 包換topic, partition和初始開始同步的offset;
    如果提供的初始offset無效, 則通過handleOffsetOutOfRange(topicAndPartition)方法來獲取有效的初始offset, 這個方法的說明參見下面ReplicaFetcherThread類的分析;
  • def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long): 延遲同步某些partition, 通過DelayItem來實現(xiàn);
  • def removePartitions(topicAndPartitions: Set[TopicAndPartition]): 移除某些partition的同步;
  • 此線程的執(zhí)行體:
override def doWork() {
     val fetchRequest = inLock(partitionMapLock) {
      val fetchRequest = buildFetchRequest(partitionMap)
      if (fetchRequest.isEmpty) {
        trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }
      fetchRequest
    }

    if (!fetchRequest.isEmpty)
      processFetchRequest(fetchRequest)
  }

基本上就是作三件事: 構造FetchRequest, 同步發(fā)送FetchRequest并接收FetchResponse, 處理FetchResponse, 這三件事的實現(xiàn)調(diào)用了下列方法:

def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)

  // handle a partition whose offset is out of range and return a new fetch offset
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long

  // deal with partitions with errors, potentially due to leadership changes
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])

  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ

  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]

它們都是在具體的子類中實現(xiàn), 我們在下面的 ReplicaFetcherThread類 中作說明.

ReplicaFetcherThread類
  • 文件: /core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
  • handleOffsetOutOfRange:處理從leader同步數(shù)據(jù)時,當前replica的的初始offset為-1的情況
  1. earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId) 先通過向Leader發(fā)送OffsetRequest來獲取leader當前的LogEndOffset;
  2. 如果Leader的LogEndOffset小于當前replica的logEndOffset, 這原則上不可能啊,除非是出現(xiàn)了Unclean leader election:即ISR里的broker都掛了,然后ISR之外的一個replica作了主;
  3. 如果broker的配置不允許Unclean leader election, 則Runtime.getRuntime.halt(1);
  4. 如果broker的配置允許Unclean leader election, 則當前replica本地的log要作truncate, truncate到Leader的LogEndOffset;
  5. 如果Leader的LogEndOffset大于當前replica的logEndOffset, 說明Leader有有效的數(shù)據(jù)供當前的replica來同步,那么剩下的問題就是看從哪里開始同步了;
  6. earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, brokerConfig.brokerId) 通過向Leader發(fā)送OffsetRequest來獲取leader當前有效的最舊Offset: StartOffset;
  7. 作一次truncate, 從startOffset開始追加:replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
  • def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest: 構造FetchRequest
  val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]

    partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
      if (partitionFetchState.isActive)
        requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
    }

    new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))

這個沒什么好說的,就是按照FetchRequest的協(xié)議來;

  • def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]: 發(fā)送FetchRequest請求,同步等待FetchResponse的返回
    val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
    new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
      TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
    }

使用NetworkClient來實現(xiàn)到leader broker的連接,請求的發(fā)送和接收,
使用kafka.utils.NetworkClientBlockingOps._實現(xiàn)了這個網(wǎng)絡操作的同步阻塞方式.
這個實現(xiàn)可參見KafkaController分析2-NetworkClient分析

  • def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData): 處理拉取過來的消息
  try {
      val TopicAndPartition(topic, partitionId) = topicAndPartition
      val replica = replicaMgr.getReplica(topic, partitionId).get
      val messageSet = partitionData.toByteBufferMessageSet
      warnIfMessageOversized(messageSet)

      if (fetchOffset != replica.logEndOffset.messageOffset)
        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
      replica.log.get.append(messageSet, assignOffsets = false)
      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
      replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
      trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
            .format(replica.brokerId, topic, partitionId, followerHighWatermark))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error while replicating data.", e)
        Runtime.getRuntime.halt(1)
    }

干三件事:

  1. 消息寫入以相應的replica;
  2. 更新replica的highWatermark
  3. 如果有KafkaStorageException異常,就退出啦~~
  • def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): 對于在同步過程中發(fā)生錯誤的partition,會調(diào)用此方法處理:
delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)

目前的作法是將此partition的同步操作延遲一段時間.

Kafka源碼分析-匯總

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末边酒,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子狸窘,更是在濱河造成了極大的恐慌墩朦,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件翻擒,死亡現(xiàn)場離奇詭異氓涣,居然都是意外死亡,警方通過查閱死者的電腦和手機陋气,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門劳吠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人巩趁,你說我怎么就攤上這事痒玩。” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵蠢古,是天一觀的道長奴曙。 經(jīng)常有香客問我,道長草讶,這世上最難降的妖魔是什么洽糟? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮堕战,結果婚禮上坤溃,老公的妹妹穿的比我還像新娘。我一直安慰自己嘱丢,他們只是感情好薪介,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著屿讽,像睡著了一般昭灵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伐谈,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天烂完,我揣著相機與錄音,去河邊找鬼诵棵。 笑死抠蚣,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的履澳。 我是一名探鬼主播嘶窄,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼距贷!你這毒婦竟也來了柄冲?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤忠蝗,失蹤者是張志新(化名)和其女友劉穎现横,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體阁最,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡戒祠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了速种。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片姜盈。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖配阵,靈堂內(nèi)的尸體忽然破棺而出馏颂,到底是詐尸還是另有隱情示血,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布饱亮,位于F島的核電站矾芙,受9級特大地震影響舍沙,放射性物質(zhì)發(fā)生泄漏近上。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一拂铡、第九天 我趴在偏房一處隱蔽的房頂上張望壹无。 院中可真熱鬧,春花似錦感帅、人聲如沸斗锭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽岖是。三九已至,卻和暖如春实苞,著一層夾襖步出監(jiān)牢的瞬間豺撑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工黔牵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留聪轿,地道東北人。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓猾浦,卻偏偏與公主長得像陆错,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子金赦,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容