- Recent versions of Kafka add high availability: each partition can have multiple replicas, which inevitably introduces the complexity of leader election, replica synchronization, and so on;
- ReplicaManager is responsible for message writes and reads, for keeping data in sync across replicas, and for switching a node between the leader and follower roles, among other related operations;
- In this post we first go through the basic supporting classes used inside ReplicaManager.
OffsetCheckPoint class
- File: /core/src/main/scala/kafka/server/OffsetCheckPoint.scala
- Under each Kafka log dir there is a file named replication-offset-checkpoint, which records, keyed by Topic+Partition, the high watermark of each partition. What is the high watermark? Simply put, it is the last committed offset that has already been replicated to all replicas: the larger of the minimum logEndOffset across all ISR members and the leader's current high watermark.
- The structure of replication-offset-checkpoint is very simple:
  - first line: version number, currently 0
  - second line: the number of Topic+Partition entries that follow
  - every other line: topic, space, partition, space, offset
- The OffsetCheckPoint class implements reading and writing of this file. Each write first goes to a temporary file named replication-offset-checkpoint.tmp, which is then renamed over the real file; a small sketch of this format follows.
- The recovery-point-offset-checkpoint file has exactly the same format as replication-offset-checkpoint and is read and written with the same OffsetCheckPoint class.
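To make the format concrete, here is a minimal, self-contained sketch of reading and writing a checkpoint file in this layout. It is not the real OffsetCheckPoint code: the object name SimpleOffsetCheckpoint and its helpers are made up for illustration, and the locking and error handling of the real class are omitted.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardCopyOption}
import scala.collection.JavaConverters._

// Hypothetical re-implementation of the checkpoint file layout described above:
// line 1 = version (0), line 2 = entry count, then one "topic partition offset" per line.
object SimpleOffsetCheckpoint {

  def write(file: File, offsets: Map[(String, Int), Long]): Unit = {
    val lines: Seq[String] =
      Seq("0", offsets.size.toString) ++
        offsets.map { case ((topic, partition), offset) => s"$topic $partition $offset" }
    // Write to a .tmp file first, then rename it over the real file,
    // mirroring the replication-offset-checkpoint.tmp behaviour described above.
    val tmp = new File(file.getAbsolutePath + ".tmp")
    Files.write(tmp.toPath, lines.asJava, StandardCharsets.UTF_8)
    Files.move(tmp.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
  }

  def read(file: File): Map[(String, Int), Long] = {
    val lines = Files.readAllLines(file.toPath, StandardCharsets.UTF_8).asScala
    require(lines.head.trim == "0", s"unsupported checkpoint version: ${lines.head}")
    val expectedCount = lines(1).trim.toInt
    val entries = lines.drop(2).filter(_.nonEmpty).map { line =>
      val Array(topic, partition, offset) = line.trim.split("\\s+")
      (topic, partition.toInt) -> offset.toLong
    }
    require(entries.size == expectedCount, "entry count does not match header")
    entries.toMap
  }
}
```

Writing to a temp file and then renaming keeps the checkpoint readable even if the broker crashes in the middle of a write.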
AbstractFetcherManager class
- File: /core/src/main/scala/kafka/server/AbstractFetcherManager.scala
- A base class that manages all the fetchers on the current broker that sync data from partition leaders;
- Main member variable:
private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
The actual fetching is delegated to AbstractFetcherThread; each brokerId + fetcherId pair corresponds to one AbstractFetcherThread (see the sketch after this list for how partitions are spread over fetcher ids);
- addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]): creates and starts the fetcher threads; the key step is calling the thread's addPartitions method to tell it exactly which partitions to sync;
- def removeFetcherForPartitions(partitions: Set[TopicAndPartition]): stops syncing the given partitions;
- def shutdownIdleFetcherThreads(): stops any fetcher thread whose number of owned partitions has dropped to 0;
- def closeAllFetchers(): stops all fetcher threads;
- def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread: an abstract method, implemented by subclasses, that creates the concrete fetcher thread.
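How partitions end up distributed over fetcher threads can be pictured with the small sketch below. It is not the real AbstractFetcherManager code: FetcherGroupingSketch and the hash-based getFetcherId are illustrative assumptions standing in for the manager's internal grouping by BrokerAndFetcherId.

```scala
// Illustrative stand-ins for the real kafka.server / kafka.common types.
case class TopicAndPartition(topic: String, partition: Int)
case class BrokerAndFetcherId(brokerId: Int, fetcherId: Int)

// Hypothetical sketch: partitions are grouped by (leader broker, fetcher id),
// and each resulting group would be owned by one AbstractFetcherThread.
class FetcherGroupingSketch(numFetchersPerBroker: Int) {

  // Derive a stable fetcher id from topic + partition so the same partition
  // always lands on the same fetcher thread for a given leader broker.
  private def getFetcherId(tp: TopicAndPartition): Int =
    math.abs(31 * tp.topic.hashCode + tp.partition) % numFetchersPerBroker

  // Input: partition -> id of its current leader broker.
  // Output: one partition set per (leader broker, fetcher id) pair.
  def groupByFetcher(leaders: Map[TopicAndPartition, Int]): Map[BrokerAndFetcherId, Set[TopicAndPartition]] =
    leaders.groupBy { case (tp, leaderId) => BrokerAndFetcherId(leaderId, getFetcherId(tp)) }
      .map { case (key, group) => key -> group.keySet }
}
```

With more than one fetcher per broker this spreads the replication load towards a single leader broker over several threads.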
ReplicaFetcherManager class
- File: /core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
- Inherits from AbstractFetcherManager;
- It only implements def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread, which creates a ReplicaFetcherThread.
AbstractFetcherThread class
- File: /core/src/main/scala/kafka/server/AbstractFetcherThread.scala
- An abstract base class that implements the actual work of syncing data from a partition's leader;
- def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]): registers the partitions to sync, including topic, partition and the initial offset to start syncing from; if the given initial offset is invalid, a valid one is obtained through handleOffsetOutOfRange(topicAndPartition), which is explained below in the ReplicaFetcherThread section;
- def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long): postpones syncing of the given partitions for a while, implemented with DelayedItem;
- def removePartitions(topicAndPartitions: Set[TopicAndPartition]): stops syncing the given partitions;
- The thread's main loop:
override def doWork() {
  val fetchRequest = inLock(partitionMapLock) {
    val fetchRequest = buildFetchRequest(partitionMap)
    if (fetchRequest.isEmpty) {
      trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    fetchRequest
  }
  if (!fetchRequest.isEmpty)
    processFetchRequest(fetchRequest)
}
This boils down to three things: build the FetchRequest, send it synchronously and receive the FetchResponse, and process the FetchResponse. These steps are carried out through the following methods:
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)
// handle a partition whose offset is out of range and return a new fetch offset
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
// deal with partitions with errors, potentially due to leadership changes
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ
protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
They are all implemented in the concrete subclass; we go through them below in the ReplicaFetcherThread class.
ReplicaFetcherThread class
- File: /core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
- handleOffsetOutOfRange: handles the case where, when syncing from the leader, the current replica's initial offset is out of range (-1):
  - earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId) first sends an OffsetRequest to the leader to obtain the leader's current LogEndOffset;
  - If the leader's LogEndOffset is smaller than the current replica's logEndOffset: in principle this should be impossible, unless an Unclean leader election occurred, i.e. all brokers in the ISR went down and a replica outside the ISR became leader;
    - if the broker's configuration does not allow Unclean leader election, the broker calls Runtime.getRuntime.halt(1);
    - if Unclean leader election is allowed, the replica's local log is truncated down to the leader's LogEndOffset;
  - If the leader's LogEndOffset is greater than the current replica's logEndOffset, the leader has valid data for this replica to sync, and the remaining question is where to start:
    - earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, brokerConfig.brokerId) sends an OffsetRequest to the leader to obtain the leader's earliest valid offset, StartOffset;
    - the replica's log is then truncated completely and appending restarts from that startOffset: replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
  - The overall decision flow is condensed in the sketch right after this list.
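The following simplified sketch shows the two branches above in one place. It is not the real ReplicaFetcherThread.handleOffsetOutOfRange implementation: the parameters (fetchLatestOffsetFromLeader, fetchEarliestOffsetFromLeader, truncateTo, truncateFullyAndStartAt, allowUncleanLeaderElection) are assumed stand-ins for the real broker calls and config, and corner cases are left out.

```scala
object OffsetOutOfRangeSketch {

  // Returns the offset from which the follower should resume fetching,
  // following the two cases described above.
  def newFetchOffset(replicaLogEndOffset: Long,
                     allowUncleanLeaderElection: Boolean,
                     fetchLatestOffsetFromLeader: () => Long,    // leader's LogEndOffset
                     fetchEarliestOffsetFromLeader: () => Long,  // leader's earliest valid offset
                     truncateTo: Long => Unit,
                     truncateFullyAndStartAt: Long => Unit): Long = {
    val leaderEndOffset = fetchLatestOffsetFromLeader()
    if (leaderEndOffset < replicaLogEndOffset) {
      // The follower is ahead of the leader: only possible after an unclean leader election.
      if (!allowUncleanLeaderElection)
        Runtime.getRuntime.halt(1)
      truncateTo(leaderEndOffset)   // drop the follower's extra, now-invalid data
      leaderEndOffset
    } else {
      // The leader holds data this follower has never seen: wipe the local log
      // and start again from the leader's earliest available offset.
      val leaderStartOffset = fetchEarliestOffsetFromLeader()
      truncateFullyAndStartAt(leaderStartOffset)
      leaderStartOffset
    }
  }
}
```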
- def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest: builds the FetchRequest:
val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]
partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
  if (partitionFetchState.isActive)
    requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
}
new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))
Nothing special here: it simply follows the FetchRequest wire protocol;
- def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]: sends the FetchRequest and waits synchronously for the FetchResponse:
val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
  TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
}
NetworkClient is used for the connection to the leader broker and for sending and receiving the request; kafka.utils.NetworkClientBlockingOps._ wraps these network operations into synchronous, blocking calls. This mechanism is covered in the earlier post "KafkaController分析2-NetworkClient分析".
- def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData): processes the fetched messages:
try {
  val TopicAndPartition(topic, partitionId) = topicAndPartition
  val replica = replicaMgr.getReplica(topic, partitionId).get
  val messageSet = partitionData.toByteBufferMessageSet
  warnIfMessageOversized(messageSet)
  if (fetchOffset != replica.logEndOffset.messageOffset)
    throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
  replica.log.get.append(messageSet, assignOffsets = false)
  val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
  replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
  trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
    .format(replica.brokerId, topic, partitionId, followerHighWatermark))
} catch {
  case e: KafkaStorageException =>
    fatal("Disk error while replicating data.", e)
    Runtime.getRuntime.halt(1)
}
It does three things:
- appends the fetched messages to the corresponding replica's log;
- updates the replica's highWatermark;
- on a KafkaStorageException it halts the process.
- def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): called for partitions that hit an error while syncing (e.g. because of a leadership change):
delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
The current strategy is simply to postpone syncing of those partitions for a while; the idea is sketched below.
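A minimal sketch of the backoff idea, assuming (as a simplification of the real PartitionFetchState/DelayedItem pair) that a partition's fetch state carries a due time and only counts as active once that time has passed; buildFetchRequest above only includes partitions whose state isActive.

```scala
object BackoffSketch {

  // Simplified stand-in for kafka.server.PartitionFetchState: the state is
  // "active" (eligible for fetching) only once its due time has passed.
  case class PartitionFetchState(offset: Long, dueTimeMs: Long = 0L) {
    def isActive: Boolean = System.currentTimeMillis() >= dueTimeMs
  }

  // What delayPartitions amounts to for one partition: push the due time
  // into the future by the configured backoff (e.g. replica.fetch.backoff.ms).
  def delayed(state: PartitionFetchState, backoffMs: Long): PartitionFetchState =
    state.copy(dueTimeMs = System.currentTimeMillis() + backoffMs)
}
```

Errored partitions thus stay registered with the fetcher thread but are skipped by the next fetch rounds until the backoff expires.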