Kafka Source Code Analysis: How the Server Handles Fetch Requests

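Log reads and writes are the most important part of Kafka's storage layer. This article starts from the server-side handling of Fetch requests and works step by step down to the underlying Log instance. Unlike Produce requests, Fetch requests come from two different sources: consumers and followers. Consumers reading data and followers replicating data both do so by sending Fetch requests to the leader. The underlying implementation is shared by both cases, and only the arguments passed to it differ. This article covers how each case is handled in detail.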

Overall flow of Fetch request handling

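The overall handling of Fetch requests (reads) is very similar to that of Produce requests (writes). Both enter through the topmost abstraction layer, and the actual read or write is ultimately performed on a Log instance in the storage layer. Kafka encapsulates these layers very cleanly. This design is well worth studying and could even serve as a model for designing distributed systems.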

The figure below shows the overall flow of Fetch request handling. It closely mirrors the flow for Produce requests.


[Figure: overall flow of Fetch request handling]

Sources of Fetch requests
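How many kinds of Fetch requests does the server handle, and where do they come from? The first source is obviously the consumer, which sends Fetch requests to the server when it consumes data. Is there another kind? Readers familiar with Kafka can probably guess: replica synchronization. When a follower replicates data from the leader, it also sends Fetch requests. The two cases are implemented as follows. The code has been simplified for readability and does not exactly match the source.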

Consumer Fetch requests
The consumer issues Fetch requests from its poll() method. The Fetcher builds and sends them as follows:

/**
 * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 * @return number of fetches sent
 */
//note: send a fetch request to the leader of every subscribed partition, provided that leader has no fetch request in flight
public int sendFetches() {
    //note: 1. create the Fetch requests
    Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
    for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
        final FetchRequest.Builder request = fetchEntry.getValue();
        final Node fetchTarget = fetchEntry.getKey();

        log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
        //note: 2. send the Fetch request
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        ...
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        ...
                    }
                });
    }
    return fetchRequestMap.size();
}

/**
 * Create fetch requests for all nodes for which we have assigned partitions
 * that have no existing requests in flight.
 */
//note: create a fetch request for each node
private Map<Node, FetchRequest.Builder> createFetchRequests() {
    // create the fetch info
    Cluster cluster = metadata.fetch();
    Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
    for (TopicPartition partition : fetchablePartitions()) {
        Node node = cluster.leaderFor(partition);
        if (node == null) {
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) {
            // if there is a leader and no in-flight requests, issue a new fetch
            LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new LinkedHashMap<>();
                fetchable.put(node, fetch);
            }

            long position = this.subscriptions.position(partition);
            //note: the position to fetch from, and the fetch size
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
        } else {
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
        }
    }

    // create the fetches
    Map<Node, FetchRequest.Builder> requests = new HashMap<>();
    for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
        Node node = entry.getKey();
        // build the Fetch request
        FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
                setMaxBytes(this.maxBytes);//note: build the Fetch Request
        requests.put(node, fetch);
    }
    return requests;
}

As the code above shows, the consumer builds its Fetch request like this:

FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
                setMaxBytes(this.maxBytes);//note: build the Fetch Request
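Here is a minimal plain-Java sketch of the grouping step in createFetchRequests(). It is not the real Fetcher code. Partitions are modeled as strings and nodes as integer ids, and the class FetchGroupingSketch and its parameters are hypothetical. It covers the three branches above:

- An unknown leader triggers a metadata update.
- A leader with a request already in flight is skipped.
- Otherwise, the partition and its fetch position are added to that node's request.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of Fetcher.createFetchRequests():
// partitions are plain strings and broker nodes are integer ids.
class FetchGroupingSketch {

    // Per-node map of partition -> fetch position, plus a flag telling the
    // caller that at least one partition had no known leader.
    static final class Result {
        final Map<Integer, Map<String, Long>> perNode = new LinkedHashMap<>();
        boolean metadataUpdateNeeded = false;
    }

    static Result group(List<String> fetchablePartitions,
                        Map<String, Integer> leaderOf,
                        Map<String, Long> positionOf,
                        Set<Integer> nodesWithInFlightRequests) {
        Result result = new Result();
        for (String partition : fetchablePartitions) {
            Integer leader = leaderOf.get(partition);
            if (leader == null) {
                // Mirrors metadata.requestUpdate() when no leader is known.
                result.metadataUpdateNeeded = true;
            } else if (!nodesWithInFlightRequests.contains(leader)) {
                // Mirrors pendingRequestCount(node) == 0: only then add the partition.
                result.perNode
                      .computeIfAbsent(leader, n -> new LinkedHashMap<>())
                      .put(partition, positionOf.get(partition));
            }
            // Otherwise the node already has a fetch in flight, so the partition is skipped.
        }
        return result;
    }
}
```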

Replica-sync Fetch requests
For replica synchronization, the Fetch request is built as follows. The replica sync flow itself will be covered in detail in the next article.

//note: build the Fetch request
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
  val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]

  partitionMap.foreach { case (topicPartition, partitionFetchState) =>
    // We will not include a replica in the fetch request if it should be throttled.
    if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
      requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
  }
  //note: the key is setReplicaId(), which sets replicaId; for a consumer this value is CONSUMER_REPLICA_ID (-1)
  val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap).
      setReplicaId(replicaId).setMaxBytes(maxBytes)
  requestBuilder.setVersion(fetchRequestVersion)
  new FetchRequest(requestBuilder)
}

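Compared with the consumer's Fetch request, the only difference is that setReplicaId() is called when the FetchRequest is built, which sets the corresponding replicaId. The consumer does not set it, so it defaults to CONSUMER_REPLICA_ID, i.e. -1. This value is what distinguishes a consumer Fetch request from a replica-sync Fetch request.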
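The sketch below (hypothetical class ReplicaIdSketch) shows how a broker could classify a Fetch request from its replicaId alone. It makes three assumptions:

- -1 means an ordinary consumer, as stated above.
- -2 means the debugging consumer. To our knowledge this is the value of Request.DebuggingConsumerId, which fetchMessages() checks.
- Any non-negative id is a broker, the same rule as Request.isValidBrokerId.

The three derived flags mirror the ones computed at the start of ReplicaManager.fetchMessages().

```java
// Hypothetical sketch of interpreting the replicaId field of a Fetch request.
// -1: ordinary consumer; -2: assumed debugging-consumer id; >= 0: broker (follower).
class ReplicaIdSketch {
    static final int CONSUMER_REPLICA_ID = -1;
    static final int DEBUGGING_CONSUMER_ID = -2;

    enum Source { CONSUMER, DEBUGGING_CONSUMER, FOLLOWER, OTHER }

    // Same rule as Request.isValidBrokerId: broker ids are non-negative.
    static boolean isValidBrokerId(int replicaId) {
        return replicaId >= 0;
    }

    static Source classify(int replicaId) {
        if (isValidBrokerId(replicaId)) return Source.FOLLOWER;
        if (replicaId == CONSUMER_REPLICA_ID) return Source.CONSUMER;
        if (replicaId == DEBUGGING_CONSUMER_ID) return Source.DEBUGGING_CONSUMER;
        return Source.OTHER;
    }

    // The three flags fetchMessages() derives from the same id.
    static boolean isFromFollower(int replicaId) { return replicaId >= 0; }
    static boolean fetchOnlyFromLeader(int replicaId) { return replicaId != DEBUGGING_CONSUMER_ID; }
    static boolean fetchOnlyCommitted(int replicaId) { return !isValidBrokerId(replicaId); }
}
```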

Server-side handling

From here on we look at how a Fetch request is actually processed, following the flow in the earlier figure. This section covers the server's abstraction layer.

How KafkaApis handles Fetch requests
Fetch requests are handled as follows:

/**
 * Handle a fetch request
 */
def handleFetchRequest(request: RequestChannel.Request) {
  val fetchRequest = request.body.asInstanceOf[FetchRequest]
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId

  //note: check that the tp exists and that the client has Describe permission on it
  val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
    case (tp, _) => authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
  }

  //note: check whether the client has Read permission on the tp
  val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
    case (tp, _) => authorize(request.session, Read, new Resource(auth.Topic, tp.topic))
  }

  //note: topics that do not exist or lack Describe permission get an UNKNOWN_TOPIC_OR_PARTITION error
  val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
    case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MemoryRecords.EMPTY))
  }

  //note: topics without Read permission get a TOPIC_AUTHORIZATION_FAILED error
  val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
    case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MemoryRecords.EMPTY))
  }

  // the callback for sending a fetch response
  def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
    ....
    def fetchResponseCallback(delayTimeMs: Int) {
      trace(s"Sending fetch response to client $clientId of " +
        s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")
      val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response
      requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeMs = time.milliseconds

    //note: quota handling
    if (fetchRequest.isFromFollower) {
      // We've already evaluated against the quota and are good to go. Just need to record it now.
      val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
      quotas.leader.record(responseSize)
      fetchResponseCallback(0)
    } else {
      quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, response.sizeOf, fetchResponseCallback)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Seq.empty)
  else {
    // call the replica manager to fetch messages from the local replica
    //note: fetch data from the replica and, once the conditions are met, return it through the callback
    replicaManager.fetchMessages(
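      fetchRequest.maxWait.toLong, //note: maximum time the fetch may wait
      fetchRequest.replicaId, //note: replica id; -1 for consumers
      fetchRequest.minBytes, //note: minimum number of bytes the fetch asks for
      fetchRequest.maxBytes, //note: maximum number of bytes the fetch asks for
      versionId <= 2, //note: passed as hardMaxBytesLimit (true for request versions <= 2)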
      authorizedRequestInfo,
      replicationQuota(fetchRequest),
      sendResponseCallback)
  }
}
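The authorization filtering at the start of handleFetchRequest() sorts each requested partition into one of three groups. This minimal sketch assumes topics stand in for partitions and plain sets stand in for authorize(...) and metadataCache.contains(...). FetchAuthSketch and Outcome are hypothetical names, although the two error names match the Errors values used above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the two-stage filtering at the top of handleFetchRequest().
class FetchAuthSketch {
    enum Outcome { FETCH, UNKNOWN_TOPIC_OR_PARTITION, TOPIC_AUTHORIZATION_FAILED }

    static Map<String, Outcome> classify(List<String> requestedTopics,
                                         Set<String> existingTopics,
                                         Set<String> describable,
                                         Set<String> readable) {
        Map<String, Outcome> outcome = new LinkedHashMap<>();
        for (String topic : requestedTopics) {
            if (!(describable.contains(topic) && existingTopics.contains(topic))) {
                // Stage 1: a missing topic and a topic without Describe permission
                // produce the same error, so the client cannot tell them apart.
                outcome.put(topic, Outcome.UNKNOWN_TOPIC_OR_PARTITION);
            } else if (!readable.contains(topic)) {
                // Stage 2: the topic is visible, but Read permission is missing.
                outcome.put(topic, Outcome.TOPIC_AUTHORIZATION_FAILED);
            } else {
                // Only these partitions are passed on to replicaManager.fetchMessages().
                outcome.put(topic, Outcome.FETCH);
            }
        }
        return outcome;
    }
}
```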

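The real work of handling a Fetch request happens in ReplicaManager's fetchMessages() method. Both Fetch and Produce requests are served through the replica manager:

- ReplicaManager manages Partition instances.
- Each partition corresponds to its Replica instances.
- The replica on this broker corresponds to exactly one Log instance.

As the upper half of the flow chart shows, the server manages the actual storage-layer content through these abstractions.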

How ReplicaManager handles Fetch requests
ReplicaManager's entry point for Fetch requests is the fetchMessages() method.

fetchMessages
fetchMessages() is implemented as follows:

/**
 * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
 * the callback function will be triggered either when timeout or required fetch info is satisfied
 */
//note: fetch data from the leader; return once enough data has been fetched or the timeout expires
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota = UnboundedQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
  val isFromFollower = replicaId >= 0 //note: true for replica sync; false for a consumer (replicaId is -1)
  //note: data is always fetched from the leader unless this is a debugging consumer; presumably the flag exists so that consuming from followers could be supported later
  val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  //note: consumer fetches (true) only read data below the HW; replica-sync fetches have no such restriction (false).
  val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)

  // read from local logs
  //note: read from the local log
  val logReadResults = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota)

  // if the fetch comes from the follower,
  // update its corresponding log end offset
  //note: if the fetch comes from a follower (replica sync), update that replica's log end offset
  if(Request.isValidBrokerId(replicaId))
    updateFollowerLogReadResults(replicaId, logReadResults)

  // check if this fetch request can be satisfied right away
  val logReadResultValues = logReadResults.map { case (_, v) => v }
  val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
  val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
    errorIncurred || (readResult.error != Errors.NONE))

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
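  //note: return the result immediately if any of the following holds:
  //note: 1. the request does not want to wait (timeout <= 0); 2. the request asks for no partitions; 3. enough data has been read; 4. an error occurred while reading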
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.hw, result.info.records)
    }
    responseCallback(fetchPartitionData)
  } else {
    //note: otherwise, delay the response
    // construct the fetch results from the read results
    val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
      val fetchInfo = fetchInfos.collectFirst {
        case (tp, v) if tp == topicPartition => v
      }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
      (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
    }
    val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
      fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}

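Overall, the method works in three steps:

  1. readFromLocalLog(): read the requested data from the local log;
  2. Check where the Fetch request came from. If it comes from replica sync, update that replica's log end offset (LEO); if the replica is not in the ISR, also check whether the ISR needs to be updated;
  3. Return the result: immediately if one of the conditions is met, otherwise later, through a delayed operation (DelayedFetch).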
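Step 3's "respond now or delay" decision comes down to four checks. The sketch below restates them in Java. FetchDecisionSketch and PartitionRead are hypothetical, and each PartitionRead stands in for one LogReadResult: the bytes read, plus whether an error occurred. A request that fails all four checks would instead become a DelayedFetch in the purgatory.

```java
import java.util.List;

// Hypothetical sketch of the "respond now or delay" decision in fetchMessages().
class FetchDecisionSketch {

    // Stand-in for one LogReadResult: bytes read and whether reading failed.
    static final class PartitionRead {
        final int bytesRead;
        final boolean error;

        PartitionRead(int bytesRead, boolean error) {
            this.bytesRead = bytesRead;
            this.error = error;
        }
    }

    // `reads` holds one entry per requested partition, like logReadResults.
    static boolean respondImmediately(long timeoutMs, int fetchMinBytes, List<PartitionRead> reads) {
        int bytesReadable = 0;
        boolean errorReadingData = false;
        for (PartitionRead r : reads) {
            bytesReadable += r.bytesRead;
            errorReadingData |= r.error;
        }
        return timeoutMs <= 0                  // 1) the request does not want to wait
            || reads.isEmpty()                 // 2) the request asks for no partitions
            || bytesReadable >= fetchMinBytes  // 3) enough data is already available
            || errorReadingData;               // 4) an error occurred while reading
    }
}
```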

readFromLocalLog
readFromLocalLog() is implemented as follows:

/**
 * Read from multiple topic partitions at the given offset up to maxSize bytes
 */
//note: for each tp in the list, read data starting at the requested offset
def readFromLocalLog(replicaId: Int,
                     fetchOnlyFromLeader: Boolean,
                     readOnlyCommitted: Boolean,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {

  def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    val offset = fetchInfo.offset
    val partitionFetchSize = fetchInfo.maxBytes

    BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
    BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()

    try {
      trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
        s"remaining response limit $limitBytes" +
        (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      // decide whether to only fetch from leader
      //note: pick the replica depending on whether reads must come from the leader
      //note: look up the Partition object by tp, then get its Replica object
      val localReplica = if (fetchOnlyFromLeader)
        getLeaderReplicaIfLocal(tp)
      else
        getReplicaOrException(tp)

      // decide whether to only fetch committed data (i.e. messages below high watermark)
      //note: the HW position; not set for replica-sync fetches
      val maxOffsetOpt = if (readOnlyCommitted)
        Some(localReplica.highWatermark.messageOffset)
      else
        None

      /* Read the LogOffsetMetadata prior to performing the read from the log.
       * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
       * Using the log end offset after performing the read can lead to a race condition
       * where data gets appended to the log immediately after the replica has consumed from it
       * This can cause a replica to always be out of sync.
       */
      val initialLogEndOffset = localReplica.logEndOffset.messageOffset //note: log end offset (LEO)
      val initialHighWatermark = localReplica.highWatermark.messageOffset //note: HW (high watermark)
      val fetchTimeMs = time.milliseconds
      val logReadInfo = localReplica.log match {
        case Some(log) =>
          val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)

          // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
          //note: read starting at the given offset; replica-sync fetches pass no maxOffsetOpt
          val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)

          // If the partition is being throttled, simply return an empty set.
          if (shouldLeaderThrottle(quota, tp, replicaId)) //note: if throttled, return an empty set
            FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)
            FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
          else fetch

        case None =>
          error(s"Leader for partition $tp does not have a local log")
          FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
      }

      //note: return the final result as a LogReadResult object
      LogReadResult(info = logReadInfo,
                    hw = initialHighWatermark,
                    leaderLogEndOffset = initialLogEndOffset,
                    fetchTimeMs = fetchTimeMs,
                    readSize = partitionFetchSize,
                    exception = None)
    } catch {
      // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
      // is supposed to indicate un-expected failure of a broker in handling a fetch request
      case e@ (_: UnknownTopicOrPartitionException |
               _: NotLeaderForPartitionException |
               _: ReplicaNotAvailableException |
               _: OffsetOutOfRangeException) =>
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                      hw = -1L,
                      leaderLogEndOffset = -1L,
                      fetchTimeMs = -1L,
                      readSize = partitionFetchSize,
                      exception = Some(e))
      case e: Throwable =>
        BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
        BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
        error(s"Error processing fetch operation on partition $tp, offset $offset", e)
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
                      hw = -1L,
                      leaderLogEndOffset = -1L,
                      fetchTimeMs = -1L,
                      readSize = partitionFetchSize,
                      exception = Some(e))
    }
  }

  var limitBytes = fetchMaxBytes
  val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
  var minOneMessage = !hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) //note: read this tp's data
    val messageSetSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (messageSetSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - messageSetSize)
    result += (tp -> readResult)
  }
  result
}

readFromLocalLog() works as follows:

  1. For each topic-partition to be fetched, look up the corresponding Partition object, then get the corresponding Replica object from it;
  2. Get the Log object from the Replica and call its read() method to read data starting at the requested position.
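The loop at the end of readFromLocalLog() shares one response budget (fetchMaxBytes) across partitions. minOneMessage lets the first non-empty partition return at least one whole batch, even if that batch exceeds the limits. The simplified model below assumes each partition can be reduced to two numbers: its available bytes and the size of its first batch. ReadBudgetSketch and PartitionLog are hypothetical names, and the HW and maxPosition limits are ignored.

```java
import java.util.List;

// Hypothetical model of the byte budget in readFromLocalLog() plus the
// minOneMessage / firstEntryIncomplete handling in read() and LogSegment.read().
class ReadBudgetSketch {

    static final class PartitionLog {
        final int availableBytes;  // bytes readable from the fetch offset onwards
        final int firstBatchBytes; // size of the first batch at the fetch offset

        PartitionLog(int availableBytes, int firstBatchBytes) {
            this.availableBytes = availableBytes;
            this.firstBatchBytes = firstBatchBytes;
        }
    }

    // Returns the number of bytes returned for each partition, in request order.
    static int[] readAll(List<PartitionLog> partitions, int partitionMaxBytes,
                         int fetchMaxBytes, boolean hardMaxBytesLimit) {
        int[] bytesReturned = new int[partitions.size()];
        int limitBytes = fetchMaxBytes;
        boolean minOneMessage = !hardMaxBytesLimit;
        for (int i = 0; i < partitions.size(); i++) {
            PartitionLog p = partitions.get(i);
            int adjustedFetchSize = Math.min(partitionMaxBytes, limitBytes);
            // With minOneMessage, the first batch is always allowed to fit.
            int adjustedMaxSize = minOneMessage
                    ? Math.max(adjustedFetchSize, p.firstBatchBytes)
                    : adjustedFetchSize;
            boolean firstEntryIncomplete = adjustedMaxSize < p.firstBatchBytes;
            int read;
            if (adjustedMaxSize == 0 || (!hardMaxBytesLimit && firstEntryIncomplete)) {
                read = 0; // empty records instead of a partial first batch
            } else {
                read = Math.min(p.availableBytes, adjustedMaxSize);
            }
            if (read > 0) minOneMessage = false; // stop ignoring the size limits
            limitBytes = Math.max(0, limitBytes - read);
            bytesReturned[i] = read;
        }
        return bytesReturned;
    }
}
```

For example, take partitionMaxBytes = 100, fetchMaxBytes = 250 and hardMaxBytesLimit = false. A first partition whose first batch is 300 bytes still returns all 300 bytes. That exhausts the budget, so the partitions after it return nothing.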

Storage-layer handling of Fetch requests
Each Replica corresponds to one Log object, and each Log object manages its LogSegment instances.

read()
The Log object's read() method is implemented as follows:

//note: read data starting at the given offset
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
  trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))

  // Because we don't use lock for reading, the synchronization is a little bit tricky.
  // We create the local variables to avoid race conditions with updates to the log.
  val currentNextOffsetMetadata = nextOffsetMetadata
  val next = currentNextOffsetMetadata.messageOffset
  if(startOffset == next)
    return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)

  //note: first find the matching log segment
  var entry = segments.floorEntry(startOffset)

  // attempt to read beyond the log end offset is an error
  if(startOffset > next || entry == null)
    throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))

  // Do the read on the segment with a base offset less than the target offset
  // but if that segment doesn't contain any messages with an offset greater than that
  // continue to read from successive segments until we get some messages or we reach the end of the log
  while(entry != null) {
    // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
    // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
    // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
    // end of the active segment.
    //note: if the Fetch lands on the active segment while several Fetch requests are processed concurrently, a stale nextOffsetMetadata
    //note: could cause an OffsetOutOfRangeException; to avoid this, reading is capped at the exposed physical position (exposedPos)
    //note: rather than the log end of the active segment.
    val maxPosition = {
      if (entry == segments.lastEntry) {
        //note: the physical position corresponding to nextOffsetMetadata
        val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
        // Check the segment again in case a new segment has just rolled out.
        if (entry != segments.lastEntry) //note: a new segment may have just been rolled, so check again
          // New log segment has rolled out, we can read up to the file end.
          entry.getValue.size
        else
          exposedPos
      } else {
        entry.getValue.size
      }
    }
    //note: read data from the segment
    val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
    if(fetchInfo == null) { //note: no data in this segment, so move on to the next higher segment
      entry = segments.higherEntry(entry.getKey)
    } else {
      return fetchInfo
    }
  }

  // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
  // this can happen when all messages with offset larger than start offsets have been deleted.
  // In this case, we will return the empty set with log end offset metadata
  FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}

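As the implementation shows, the method first finds the matching Segment object (log segment), then loops until it has read some data. If the current segment yields no data, it moves on to the next segment and recomputes the corresponding maximum position.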
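The segment walk can be modeled with java.util.TreeMap. Its floorEntry() and higherEntry() mean the same as the calls on segments above. This hypothetical sketch (SegmentWalkSketch) simplifies in three ways:

- Each segment is reduced to the sorted offsets it still holds.
- IllegalArgumentException stands in for OffsetOutOfRangeException.
- The maxPosition handling for the active segment is left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the segment walk in Log.read(). Segments are keyed by
// base offset; each value is the sorted array of offsets still in that segment.
class SegmentWalkSketch {

    // Returns the first offset >= startOffset found by walking the segments,
    // or -1 when the read would return an empty result.
    static long firstReadableOffset(TreeMap<Long, long[]> segments, long startOffset, long nextOffset) {
        if (startOffset == nextOffset) return -1; // reading exactly at the log end: empty
        Map.Entry<Long, long[]> entry = segments.floorEntry(startOffset);
        if (startOffset > nextOffset || entry == null)
            throw new IllegalArgumentException("offset out of range: " + startOffset);
        while (entry != null) {
            for (long offset : entry.getValue()) {
                if (offset >= startOffset) return offset; // data found in this segment
            }
            // Nothing at or after startOffset here: try the next higher segment.
            entry = segments.higherEntry(entry.getKey());
        }
        return -1; // past the last segment (e.g. later messages were deleted): empty
    }
}
```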

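A log segment is a logical concept. It manages three physical files: a data file, a time index file and an offset index file. To read a segment, Kafka consults the offset index first and then the data file:

  1. Use the start offset to be read (startOffset) to look up the corresponding physical position in the offset index;
  2. The offset index lookup yields the nearest indexed physical position, from which the position of the start offset (startPosition) is found;
  3. Seek directly to startPosition in the data file and read its contents;
  4. Read no further than the maximum position allowed in the data file (maxPosition).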

LogSegment
All real operations on the data file, the offset index file and the time index file happen in the LogSegment object. Reading the log relies heavily on its read() method.

read()
read() is implemented as follows:

//note: read from the log segment (replica-sync fetches do not set maxOffset)
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  //note: physical length of the log file
  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  //note: translate the start offset into the actual start physical position
  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position.toInt
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  //note: compute the length to read
  val length = maxOffset match {
    //note: replica sync: no maxOffset
    case None =>
      // no max offset, just read until the max position
      min((maxPosition - startPosition).toInt, adjustedMaxSize) //note: read up to the max position
    //note: consumer fetch: maxOffset is set (the HW)
    case Some(offset) =>
      // there is a max offset, translate it to a file position and use that to calculate the max read size;
      // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
      // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
      // offset between new leader's high watermark and the log end offset, we want to return an empty response.
      if (offset < startOffset)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null)
          logSize // the max offset is off the end of the log, use the end of the file
        else
          mapping.position
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }

  //note: read the data file using the start physical position and the read length
  FetchDataInfo(offsetMetadata, log.read(startPosition, length),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}

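From the implementation above, the process has three parts:

  1. Translate startOffset into the actual physical position (translateOffset());
  2. Compute the actual physical length to read;
  3. Read the data file using the actual start position and length.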
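The length calculation in step 2 can be isolated as a small function. This sketch assumes the caller has already translated maxOffset into a file position, passed as endPositionForMaxOffset. That argument is null when there is no maxOffset, as in replica sync. ReadLengthSketch is a hypothetical name.

```java
// Hypothetical sketch of how LogSegment.read() chooses the number of bytes to read.
class ReadLengthSketch {

    // endPositionForMaxOffset: null when no maxOffset was given (replica sync);
    // otherwise the file position translated from maxOffset, or the log size
    // when maxOffset lies beyond the end of this segment.
    static int readLength(int startPosition, long maxPosition, int maxSize,
                          int firstBatchSize, boolean minOneMessage,
                          Long endPositionForMaxOffset) {
        // minOneMessage guarantees the first batch fits even if maxSize is smaller.
        int adjustedMaxSize = minOneMessage ? Math.max(maxSize, firstBatchSize) : maxSize;
        if (endPositionForMaxOffset == null) {
            // No max offset: read until maxPosition, bounded by the size limit.
            return (int) Math.min(maxPosition - startPosition, adjustedMaxSize);
        }
        // With a max offset (consumer reading up to the HW): stop at whichever comes first.
        long end = Math.min(maxPosition, endPositionForMaxOffset);
        return (int) Math.min(end - startPosition, adjustedMaxSize);
    }
}
```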

translateOffset()
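translateOffset() works in two parts:

  1. Search the offset index: call the offset index's lookup() method to get the physical position closest to startOffset;
  2. Call the data file's search method (searchForOffsetWithSize() in the code below). It reads entries one by one from that physical position until it finds the physical position of the requested offset.
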
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
  //note: get the physical position of the nearest indexed offset; returns an offset and a physical position (approximate)
  val mapping = index.lookup(offset)
  //note: scan forward from that position until the physical position of offset is found; returns the offset and physical position (exact)
  log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
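Taken together, the two steps of translateOffset() behave roughly like the sketch below:

- A sparse index, here a TreeMap from offset to file position, gives a starting point at or before the target.
- A forward scan then finds the first entry whose offset is at least the target.

TranslateOffsetSketch and Entry are hypothetical. The scan over a List stands in for reading the data file from a given position.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of translateOffset(): sparse index lookup, then forward scan.
class TranslateOffsetSketch {

    static final class Entry {
        final long offset;
        final int position; // byte position of the entry in the data file
        final int size;     // entry size in bytes

        Entry(long offset, int position, int size) {
            this.offset = offset;
            this.position = position;
            this.size = size;
        }
    }

    // index: sparse map from offset to file position; entries: all entries in file order.
    static Entry translateOffset(TreeMap<Long, Integer> index, List<Entry> entries,
                                 long targetOffset, int startingFilePosition) {
        Map.Entry<Long, Integer> floor = index.floorEntry(targetOffset);
        // No indexed offset <= target: start from the beginning, like OffsetPosition(baseOffset, 0).
        int indexPosition = floor == null ? 0 : floor.getValue();
        int from = Math.max(indexPosition, startingFilePosition);
        for (Entry e : entries) {
            if (e.position < from) continue;        // bytes before the scan start are skipped
            if (e.offset >= targetOffset) return e; // first entry at or past the target
        }
        return null; // the target lies beyond the end of this segment
    }
}
```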

Searching the offset index
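The offset index file is loaded into memory through memory mapping. The state of the mapped buffer can change during a lookup, so lookup() first takes a duplicate of it (idx) and searches that copy. The duplicate shares the content but has its own position. The implementation: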

//note: find the largest offset less than or equal to the given offset, and return that offset with its physical position
def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    val idx = mmap.duplicate //note: the mmap may change during the lookup, so work on a duplicate
    val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY) //note: binary search
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      //note: compute the absolute offset first, then the physical position
      parseEntry(idx, slot).asInstanceOf[OffsetPosition]
  }
}

override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
  OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)

private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
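The index entries that relativeOffset() and physical() decode are 8 bytes each: a 4-byte offset relative to baseOffset, then a 4-byte physical position. That layout, and the binary search behind lookup(), can be sketched with java.nio.ByteBuffer. OffsetIndexSketch is hypothetical and uses a heap buffer instead of a real memory-mapped file. duplicate() is called for the same reason as in lookup(): to get an independent position over shared content.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an offset index with 8-byte entries:
// 4-byte offset relative to baseOffset, then 4-byte physical position.
class OffsetIndexSketch {
    static final int ENTRY_SIZE = 8;

    static ByteBuffer build(long baseOffset, long[] offsets, int[] positions) {
        ByteBuffer buf = ByteBuffer.allocate(offsets.length * ENTRY_SIZE);
        for (int i = 0; i < offsets.length; i++) {
            buf.putInt((int) (offsets[i] - baseOffset)); // relative offset
            buf.putInt(positions[i]);                    // physical position
        }
        return buf; // the absolute gets below do not depend on the buffer's position
    }

    static int relativeOffset(ByteBuffer buf, int n) { return buf.getInt(n * ENTRY_SIZE); }

    static int physical(ByteBuffer buf, int n) { return buf.getInt(n * ENTRY_SIZE + 4); }

    // Binary search for the largest slot whose offset <= target; -1 if the target precedes all entries.
    static int slotFor(ByteBuffer buf, int entries, long baseOffset, long targetOffset) {
        if (entries == 0 || baseOffset + relativeOffset(buf, 0) > targetOffset) return -1;
        int lo = 0, hi = entries - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // upper middle, so lo always advances
            if (baseOffset + relativeOffset(buf, mid) <= targetOffset) lo = mid;
            else hi = mid - 1;
        }
        return lo;
    }

    // Returns {absoluteOffset, position}, or {baseOffset, 0} when no slot matches.
    static long[] lookup(ByteBuffer mmapLike, int entries, long baseOffset, long targetOffset) {
        ByteBuffer idx = mmapLike.duplicate(); // shared content, independent position
        int slot = slotFor(idx, entries, baseOffset, targetOffset);
        if (slot == -1) return new long[]{baseOffset, 0};
        return new long[]{baseOffset + relativeOffset(idx, slot), physical(idx, slot)};
    }
}
```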

The figure below shows how relativeOffset and physical are computed. It is taken from the book 《Kafka 技术内幕》.

[Figure: looking up the offset value and the physical position by index entry number]

Searching the data file for the exact physical position
The physical position obtained from the offset index is only an approximation. Reading the data file itself yields the exact value, and this is done by iterating over the entries in the data file:

/**
 * Search forward for the file position of the last offset that is greater than or equal to the target offset
 * and return its physical position and the size of the message (including log overhead) at the returned offset. If
 * no such offsets are found, return null.
 *
 * @param targetOffset The offset to search for.
 * @param startingPosition The starting position in the file to begin searching from.
 */
public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
    for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
        long offset = entry.offset();
        if (offset >= targetOffset)
            return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
    }
    return null;
}
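The sketch below shows what the forward scan does at the byte level, using a ByteBuffer. It assumes the pre-0.11 shallow entry header used by this code path: an 8-byte offset followed by a 4-byte size, for a 12-byte log overhead. LogScanSketch is hypothetical. The returned array holds the offset, the position, and the entry size including the overhead, which is the same information LogEntryPosition carries.

```java
import java.nio.ByteBuffer;

// Hypothetical byte-level sketch of searchForOffsetWithSize().
// Assumed entry layout: 8-byte offset, 4-byte size, then `size` payload bytes.
class LogScanSketch {
    static final int LOG_OVERHEAD = 8 + 4; // offset field + size field

    // Appends one entry with a zero-filled payload of the given length.
    static void append(ByteBuffer log, long offset, int payloadSize) {
        log.putLong(offset);
        log.putInt(payloadSize);
        log.put(new byte[payloadSize]);
    }

    // Returns {offset, position, sizeInBytes} of the first entry with offset >= target, or null.
    static long[] searchForOffsetWithSize(ByteBuffer log, long targetOffset, int startingPosition) {
        int position = startingPosition;
        while (position + LOG_OVERHEAD <= log.limit()) {
            long offset = log.getLong(position);
            int size = log.getInt(position + 8);
            int sizeInBytes = LOG_OVERHEAD + size;
            if (position + sizeInBytes > log.limit()) break; // truncated trailing entry
            if (offset >= targetOffset) return new long[]{offset, position, sizeInBytes};
            position += sizeInBytes; // jump to the next entry header
        }
        return null;
    }
}
```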

This completes the handling of a Fetch request.
