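Reading and writing logs is the core job of Kafka's storage layer. This article starts from how the server handles a Fetch request and works down, step by step, to the underlying Log instance. Unlike a Produce request, a Fetch request can come from two different sources: a consumer or a follower. Consumers reading data and replicas synchronizing data both do so by sending Fetch requests to the leader. The two cases share the same underlying implementation and differ only in the parameters they pass; this article covers both in detail.
Overall flow of Fetch request handling
Handling a Fetch request (a read) follows much the same overall flow as handling a Produce request (a write). Both enter through the top abstraction layer, and the actual read or write is ultimately carried out on a Log instance in the storage layer. Kafka's layering here is very clean; it is a design well worth studying, and arguably a model for distributed systems.
The overall flow of Fetch request handling is shown in the figure below; it closely resembles the flow for Produce requests.
Sources of Fetch requests
So how many kinds of Fetch request does the server handle, and where do they come from? The first source is obviously the consumer, which sends Fetch requests to the server when it consumes data. Is there any other kind? Readers familiar with Kafka will have guessed it: replica synchronization. A follower also sends Fetch requests when it synchronizes data from the leader. The two cases are implemented as follows (the code is simplified for readability and does not match the source exactly).
Consumer Fetch requests
The consumer issues its Fetch requests from within the poll method. The Fetcher builds and sends them as follows: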
/**
* Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
* an in-flight fetch or pending fetch data.
* @return number of fetches sent
*/
//note: send a fetch request to the leader of every subscribed partition, as long as that leader has no fetch in flight
public int sendFetches() {
//note: 1 create the Fetch requests
Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget);
//note: 2 send the Fetch request
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
...
}
@Override
public void onFailure(RuntimeException e) {
...
}
});
}
return fetchRequestMap.size();
}
/**
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
*/
//note: create a fetch request for every node
private Map<Node, FetchRequest.Builder> createFetchRequests() {
// create the fetch info
Cluster cluster = metadata.fetch();
Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
for (TopicPartition partition : fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
} else if (this.client.pendingRequestCount(node) == 0) {
// if there is a leader and no in-flight requests, issue a new fetch
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new LinkedHashMap<>();
fetchable.put(node, fetch);
}
long position = this.subscriptions.position(partition);
//note: the position to fetch from and the fetch size
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
} else {
log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
}
}
// create the fetches
Map<Node, FetchRequest.Builder> requests = new HashMap<>();
for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
//note: build the Fetch request
FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
setMaxBytes(this.maxBytes);
requests.put(node, fetch);
}
return requests;
}
As the code shows, the consumer builds its Fetch request like this:
FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
setMaxBytes(this.maxBytes);//note: build the Fetch request
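The grouping step inside createFetchRequests() can be hard to see among the Kafka types. Below is a minimal sketch of just that step, under simplifying assumptions: partitions are plain strings, leaders are broker ids, and the class `FetchGrouping`, the method `groupByLeader` and all of its parameters are hypothetical names, not part of the Kafka client.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the grouping done in createFetchRequests().
final class FetchGrouping {
    static Map<Integer, Map<String, Long>> groupByLeader(
            List<String> partitions,
            Map<String, Integer> leaderOf,     // partition -> leader id (absent = leader unknown)
            Map<String, Long> positionOf,      // partition -> next offset to fetch
            Set<Integer> nodesWithInFlight) {  // leaders that already have a pending request
        Map<Integer, Map<String, Long>> fetchable = new LinkedHashMap<>();
        for (String tp : partitions) {
            Integer leader = leaderOf.get(tp);
            if (leader == null) {
                continue; // the real client requests a metadata update here
            }
            if (nodesWithInFlight.contains(leader)) {
                continue; // skip nodes that still have a request in flight
            }
            // one map of partition -> fetch position per leader node
            fetchable.computeIfAbsent(leader, k -> new LinkedHashMap<>())
                     .put(tp, positionOf.get(tp));
        }
        return fetchable;
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Long>> plan = groupByLeader(
                List.of("t-0", "t-1", "t-2"),
                Map.of("t-0", 1, "t-1", 2, "t-2", 1),
                Map.of("t-0", 10L, "t-1", 20L, "t-2", 30L),
                Set.of(2));
        // Broker 2 is busy, so only broker 1 gets a request (for t-0 and t-2).
        System.out.println(plan);
    }
}
```

The result has one entry per node, which is why sendFetches() can send one request per leader rather than one per partition.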
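Replica synchronization Fetch requests
For replica synchronization (the synchronization flow itself is covered in detail in the next article), the Fetch request is built as follows:
//note: build the Fetch request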
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
}
//note: the key is setReplicaId(), which sets the replicaId; a consumer leaves this value at CONSUMER_REPLICA_ID (-1)
val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap).
setReplicaId(replicaId).setMaxBytes(maxBytes)
requestBuilder.setVersion(fetchRequestVersion)
new FetchRequest(requestBuilder)
}
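Compared with the consumer's Fetch request, the only difference is that setReplicaId() is called when building the FetchRequest to set the replica's id. The consumer never sets it, so it keeps the default CONSUMER_REPLICA_ID, i.e. -1. This value is what distinguishes a consumer Fetch request from a replica synchronization Fetch request.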
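To make the role of replicaId concrete, here is a small sketch of how a broker could derive its behaviour from that single field. It mirrors the checks that appear in fetchMessages() further down; the class `FetchSource` and its constant names are hypothetical, and the value -2 for the debugging consumer is an assumption taken from Kafka's `Request.DebuggingConsumerId` rather than from the code shown in this article.

```java
// Hypothetical helper; it only mirrors the replicaId checks discussed in this article.
final class FetchSource {
    static final int CONSUMER_REPLICA_ID = -1;   // ordinary consumer
    static final int DEBUGGING_CONSUMER_ID = -2; // assumed value of Request.DebuggingConsumerId

    // A fetch comes from a follower when it carries a real (non-negative) broker id.
    static boolean isFromFollower(int replicaId) {
        return replicaId >= 0;
    }

    // Consumers may only read committed data, i.e. messages below the high watermark.
    static boolean fetchOnlyCommitted(int replicaId) {
        return !isFromFollower(replicaId);
    }

    // Every requester except the debugging consumer must read from the leader.
    static boolean fetchOnlyFromLeader(int replicaId) {
        return replicaId != DEBUGGING_CONSUMER_ID;
    }

    public static void main(String[] args) {
        for (int id : new int[] {CONSUMER_REPLICA_ID, DEBUGGING_CONSUMER_ID, 3}) {
            System.out.println(id + ": follower=" + isFromFollower(id)
                    + " onlyCommitted=" + fetchOnlyCommitted(id)
                    + " onlyFromLeader=" + fetchOnlyFromLeader(id));
        }
    }
}
```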
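Server-side handling
This is where the actual handling of the Fetch request begins. We follow the flow in the figure above; this section covers the server's abstraction layers.
How KafkaApis handles a Fetch request
KafkaApis handles a Fetch request as follows: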
/**
* Handle a fetch request
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.body.asInstanceOf[FetchRequest]
val versionId = request.header.apiVersion
val clientId = request.header.clientId
//note: check that the topic exists and that the session has Describe permission on it
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
case (tp, _) => authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
}
//note: check that the session has Read permission on the topic
val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (tp, _) => authorize(request.session, Read, new Resource(auth.Topic, tp.topic))
}
//note: topics that do not exist or lack Describe permission get an UNKNOWN_TOPIC_OR_PARTITION error
val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MemoryRecords.EMPTY))
}
//note: topics without Read permission get a TOPIC_AUTHORIZATION_FAILED error
val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MemoryRecords.EMPTY))
}
// the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
....
def fetchResponseCallback(delayTimeMs: Int) {
trace(s"Sending fetch response to client $clientId of " +
s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")
val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response
requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))
}
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeMs = time.milliseconds
//note: quota handling
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
val responseSize = sizeOfThrottledPartitions(versionId, fetchRequest, mergedPartitionData, quotas.leader)
quotas.leader.record(responseSize)
fetchResponseCallback(0)
} else {
quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, clientId, response.sizeOf, fetchResponseCallback)
}
}
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Seq.empty)
else {
// call the replica manager to fetch messages from the local replica
//note: fetch data from the replica; once the conditions are met, the callback sends the response
replicaManager.fetchMessages(
fetchRequest.maxWait.toLong, //note: maximum time the request may wait
fetchRequest.replicaId, //note: replica id; -1 for a consumer
fetchRequest.minBytes, //note: minimum number of bytes the request wants
fetchRequest.maxBytes, //note: maximum number of bytes the request accepts
versionId <= 2,
authorizedRequestInfo,
replicationQuota(fetchRequest),
sendResponseCallback)
}
}
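The real work of handling a Fetch request happens in ReplicaManager's fetchMessages() method. Both Fetch and Produce requests go through the replica manager. The ReplicaManager manages Partition instances; each Partition has its corresponding Replica instances, and the replica hosted on this node maps to exactly one Log instance. As in the upper half of the flow diagram, the server uses these layers of abstraction to manage what the storage layer actually holds.
How ReplicaManager handles a Fetch request
ReplicaManager's entry point for Fetch requests is the fetchMessages() method.
fetchMessages
fetchMessages() is implemented as follows: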
/**
* Fetch messages from the leader replica, and wait until enough data can be fetched and return;
* the callback function will be triggered either when timeout or required fetch info is satisfied
*/
//note: read from the leader replica; return the result once enough data is fetched or the timeout expires
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota = UnboundedQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
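val isFromFollower = replicaId >= 0 //note: is the request from a consumer (replicaId is -1) or from replica synchronization
//note: reads go to the leader unless the request comes from a debugging consumer; presumably this flag only exists so that data can later be consumed from a follower
val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
//note: a consumer request (true) may only read data below the HW; a replica synchronization request (false) has no such limit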
val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
// read from local logs
//note: read from the local log
val logReadResults = readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = fetchOnlyFromLeader,
readOnlyCommitted = fetchOnlyCommitted,
fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos,
quota = quota)
// if the fetch comes from the follower,
// update its corresponding log end offset
//note: if the fetch comes from a broker's replica synchronization, update that follower's log end offset
if(Request.isValidBrokerId(replicaId))
updateFollowerLogReadResults(replicaId, logReadResults)
// check if this fetch request can be satisfied right away
val logReadResultValues = logReadResults.map { case (_, v) => v }
val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
errorIncurred || (readResult.error != Errors.NONE))
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
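//note: respond immediately if any one of the following holds:
//note: 1. the request does not want to wait (timeout <= 0); 2. the request asks for no partitions; 3. enough data was read; 4. an error occurred while reading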
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.hw, result.info.records)
}
responseCallback(fetchPartitionData)
} else {
//note: otherwise, delay the response
// construct the fetch results from the read results
val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
val fetchInfo = fetchInfos.collectFirst {
case (tp, v) if tp == topicPartition => v
}.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
(topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
}
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}
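Overall, the method does three things:
- readFromLocalLog(): read the requested data from the local log;
- check where the Fetch request came from: if it is replica synchronization, update the recorded log end offset of that follower and, if the follower is not in the ISR, check whether the ISR needs to be updated;
- return the result: respond immediately if the conditions are met, otherwise hand the request to a delayed operation that responds later.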
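The third step, deciding between an immediate and a delayed response, comes down to one boolean expression. The sketch below isolates it; `FetchDecision`, `canRespondNow` and the array-based parameters are hypothetical stand-ins for the Kafka types, and an empty `bytesRead` array plays the role of an empty `fetchInfos`.

```java
// Hypothetical, simplified version of the "respond now or delay" check in fetchMessages().
final class FetchDecision {
    static boolean canRespondNow(long timeoutMs, int fetchMinBytes,
                                 int[] bytesRead, boolean errorReadingData) {
        long bytesReadable = 0;
        for (int b : bytesRead) {
            bytesReadable += b; // total bytes read across all requested partitions
        }
        return timeoutMs <= 0                   // 1) the request does not want to wait
            || bytesRead.length == 0            // 2) the request asks for no partitions
            || bytesReadable >= fetchMinBytes   // 3) enough data is already available
            || errorReadingData;                // 4) some partition failed to read
    }

    public static void main(String[] args) {
        // 10 bytes read, 100 required, 500 ms allowed: the request would be delayed.
        System.out.println(canRespondNow(500, 100, new int[] {10}, false)); // false
        // Two partitions together reach the minimum: respond immediately.
        System.out.println(canRespondNow(500, 100, new int[] {60, 50}, false)); // true
    }
}
```

When the check fails, the real code wraps the request in a DelayedFetch and parks it in the purgatory until more data arrives or the timeout expires.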
readFromLocalLog
readFromLocalLog() is implemented as follows:
/**
* Read from multiple topic partitions at the given offset up to maxSize bytes
*/
//note: read data from each listed topic-partition at its requested offset
def readFromLocalLog(replicaId: Int,
fetchOnlyFromLeader: Boolean,
readOnlyCommitted: Boolean,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.offset
val partitionFetchSize = fetchInfo.maxBytes
BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
try {
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
// decide whether to only fetch from leader
//note: pick the replica depending on whether reads are restricted to the leader
//note: look up the Partition object by tp, then get the corresponding Replica object
val localReplica = if (fetchOnlyFromLeader)
getLeaderReplicaIfLocal(tp)
else
getReplicaOrException(tp)
// decide whether to only fetch committed data (i.e. messages below high watermark)
//note: use the HW as the upper bound; replica synchronization does not set this value
val maxOffsetOpt = if (readOnlyCommitted)
Some(localReplica.highWatermark.messageOffset)
else
None
/* Read the LogOffsetMetadata prior to performing the read from the log.
* We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
* Using the log end offset after performing the read can lead to a race condition
* where data gets appended to the log immediately after the replica has consumed from it
* This can cause a replica to always be out of sync.
*/
val initialLogEndOffset = localReplica.logEndOffset.messageOffset //note: log end offset (LEO)
val initialHighWatermark = localReplica.highWatermark.messageOffset //note: high watermark (HW)
val fetchTimeMs = time.milliseconds
val logReadInfo = localReplica.log match {
case Some(log) =>
val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
//note: read starting at the given offset; replica synchronization passes no maxOffsetOpt
val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)
// If the partition is being throttled, simply return an empty set.
if (shouldLeaderThrottle(quota, tp, replicaId)) //note: if throttled, return an empty set
FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)
FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
else fetch
case None =>
error(s"Leader for partition $tp does not have a local log")
FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
}
//note: build the final result; every path returns a LogReadResult
LogReadResult(info = logReadInfo,
hw = initialHighWatermark,
leaderLogEndOffset = initialLogEndOffset,
fetchTimeMs = fetchTimeMs,
readSize = partitionFetchSize,
exception = None)
} catch {
// NOTE: Failed fetch requests metric is not incremented for known exceptions since it
// is supposed to indicate un-expected failure of a broker in handling a fetch request
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: ReplicaNotAvailableException |
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
leaderLogEndOffset = -1L,
fetchTimeMs = -1L,
readSize = partitionFetchSize,
exception = Some(e))
case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
error(s"Error processing fetch operation on partition $tp, offset $offset", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
leaderLogEndOffset = -1L,
fetchTimeMs = -1L,
readSize = partitionFetchSize,
exception = Some(e))
}
}
var limitBytes = fetchMaxBytes
val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
var minOneMessage = !hardMaxBytesLimit
readPartitionInfo.foreach { case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) //note: read this tp's data
val messageSetSize = readResult.info.records.sizeInBytes
// Once we read from a non-empty partition, we stop ignoring request and partition level size limits
if (messageSetSize > 0)
minOneMessage = false
limitBytes = math.max(0, limitBytes - messageSetSize)
result += (tp -> readResult)
}
result
}
readFromLocalLog() works as follows:
- look up the Partition object for the requested topic-partition, then get the corresponding Replica object from it;
- find the Log object belonging to that Replica and call its read() method to read from the requested position.
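The loop at the end of readFromLocalLog() also enforces the request-level byte limit: every partition read shrinks the remaining budget `limitBytes`. The sketch below models only that bookkeeping. It is a deliberate simplification: the class `FetchBudget` and the inputs `available` and `partitionMax` are hypothetical, and the `minOneMessage` exception (which lets the first non-empty partition exceed the limits) is left out.

```java
import java.util.Arrays;

// Hypothetical model of the limitBytes bookkeeping in readFromLocalLog().
final class FetchBudget {
    // available[i]:    bytes the log could return for partition i
    // partitionMax[i]: per-partition max bytes from the request
    static int[] allocate(int fetchMaxBytes, int[] available, int[] partitionMax) {
        int[] read = new int[available.length];
        int limitBytes = fetchMaxBytes; // remaining response budget
        for (int i = 0; i < available.length; i++) {
            // a partition may use at most its own limit and at most what is left overall
            int adjustedFetchSize = Math.min(partitionMax[i], limitBytes);
            read[i] = Math.min(available[i], adjustedFetchSize);
            limitBytes = Math.max(0, limitBytes - read[i]);
        }
        return read;
    }

    public static void main(String[] args) {
        int[] read = allocate(100, new int[] {80, 50, 10}, new int[] {60, 60, 60});
        System.out.println(Arrays.toString(read)); // [60, 40, 0]
    }
}
```

Because partitions are served in request order, later partitions can be starved once the budget is used up, as the third partition is in the example.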
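How the storage layer handles a Fetch request
Each Replica corresponds to one Log object, and each Log object manages its LogSegment instances.
read()
The Log object's read() method is implemented as follows:
//note: read data starting at the given offset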
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// Because we don't use lock for reading, the synchronization is a little bit tricky.
// We create the local variables to avoid race conditions with updates to the log.
val currentNextOffsetMetadata = nextOffsetMetadata
val next = currentNextOffsetMetadata.messageOffset
if(startOffset == next)
return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
//note: first find the log segment that should contain startOffset
var entry = segments.floorEntry(startOffset)
// attempt to read beyond the log end offset is an error
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
// If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
// the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
// cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
// end of the active segment.
//note: if the fetch hits the active segment while several fetches are in progress and nextOffsetMetadata has not been updated yet,
//note: an OffsetOutOfRangeException may be thrown; to avoid this, the read is capped at the exposed physical position (exposedPos)
//note: rather than at the log end of the active segment.
val maxPosition = {
if (entry == segments.lastEntry) {
//note: the physical position that nextOffsetMetadata points to
val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
// Check the segment again in case a new segment has just rolled out.
if (entry != segments.lastEntry) //note: a new segment may have been rolled in the meantime, so check again
// New log segment has rolled out, we can read up to the file end.
entry.getValue.size
else
exposedPos
} else {
entry.getValue.size
}
}
//note: read the data from the segment
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
if(fetchInfo == null) { //note: nothing was read from this segment, so move on to the next higher segment
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
}
}
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
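As the implementation shows, the method first finds the matching segment and then loops until it reads some data; if the current segment yields nothing, it moves on to the next segment and recomputes the maximum readable position.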
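The segment search relies on the segments being kept in a sorted map keyed by base offset, so that floorEntry() finds the segment covering an offset and higherEntry() steps to the next one. A minimal sketch of that pattern follows, assuming a toy model in which each segment is just an array of the offsets it holds; `SegmentLookup` and `firstOffsetAtOrAfter` are hypothetical names. Unlike Log.read(), the sketch returns null instead of throwing when the offset lies before the first segment.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model: key = base offset of a segment, value = offsets stored in it.
final class SegmentLookup {
    static Long firstOffsetAtOrAfter(NavigableMap<Long, long[]> segments, long startOffset) {
        // segment with the largest base offset <= startOffset
        Map.Entry<Long, long[]> entry = segments.floorEntry(startOffset);
        while (entry != null) {
            for (long offset : entry.getValue()) {
                if (offset >= startOffset) {
                    return offset; // this segment has data at or after startOffset
                }
            }
            // nothing usable here, continue with the next higher segment
            entry = segments.higherEntry(entry.getKey());
        }
        return null; // ran past the last segment without finding data
    }

    public static void main(String[] args) {
        NavigableMap<Long, long[]> segments = new TreeMap<>();
        segments.put(0L, new long[] {0, 1, 2});
        segments.put(100L, new long[] {100, 101});
        segments.put(200L, new long[] {200, 201});
        // Offset 150 falls into the segment based at 100, which has no such data,
        // so the search moves on to the segment based at 200.
        System.out.println(firstOffsetAtOrAfter(segments, 150)); // 200
    }
}
```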
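A log segment is really a logical concept: it manages three physical files, namely a data file, a time index file and an offset index file. Reading a segment consults the offset index file first and then the data file:
- use the starting offset (startOffset) to look up a physical position in the offset index file;
- the lookup yields the nearest indexed physical position for the starting offset, from which startPosition is determined;
- jump straight to startPosition in the data file and read its contents from there;
- reading can go at most to the maximum allowed position in the data file (maxPosition).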
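LogSegment
All real operations on the data file, the offset index file and the time index file happen in the LogSegment object, so reading the log depends heavily on this class.
read()
read() is implemented as follows:
//note: read from the log segment (replica synchronization does not set maxOffset)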
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
//note: physical size of the log file
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
//note: translate the starting offset into its actual physical position
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position.toInt
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
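//note: compute the length to read
val length = maxOffset match {
//note: how the length is computed for replica synchronization
case None =>
// no max offset, just read until the max position
min((maxPosition - startPosition).toInt, adjustedMaxSize) //note: read straight up to the max position
//note: how the length is computed for a consumer fetch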
case Some(offset) =>
// there is a max offset, translate it to a file position and use that to calculate the max read size;
// when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
// true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
// offset between new leader's high watermark and the log end offset, we want to return an empty response.
if (offset < startOffset)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
val mapping = translateOffset(offset, startPosition)
val endPosition =
if (mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
}
//note: read the data file from the starting physical position for the computed length
FetchDataInfo(offsetMetadata, log.read(startPosition, length),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
The implementation breaks down into three parts:
- translate startOffset into an actual physical position (translateOffset());
- compute the physical length to read;
- read the data file from that physical position for that length.
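The second part, the length calculation, differs between the two request sources. A small sketch with plain numbers may help; `ReadLength`, `forFollower`, `forConsumer` and the parameter `hwPosition` (the file position that the max offset translates to) are hypothetical names for illustration.

```java
// Hypothetical helper mirroring the two branches of the length calculation in LogSegment.read().
final class ReadLength {
    // Replica synchronization: no max offset, read up to maxPosition or the size limit.
    static int forFollower(int startPosition, long maxPosition, int adjustedMaxSize) {
        return (int) Math.min(maxPosition - startPosition, adjustedMaxSize);
    }

    // Consumer fetch: additionally bounded by the file position of the max offset (the HW).
    static int forConsumer(int startPosition, long maxPosition, long hwPosition, int adjustedMaxSize) {
        return (int) Math.min(Math.min(maxPosition, hwPosition) - startPosition, adjustedMaxSize);
    }

    public static void main(String[] args) {
        // Start at byte 1000; the file is readable up to byte 5000; the HW sits at byte 3000.
        System.out.println(forFollower(1000, 5000, 4096));       // 4000
        System.out.println(forConsumer(1000, 5000, 3000, 4096)); // 2000
    }
}
```

With the same inputs the follower gets twice as many bytes, because it is allowed to read the uncommitted data between the high watermark and the log end.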
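translateOffset()
translateOffset() works in two steps:
- search the offset index file: call the index's lookup() method to get the indexed physical position closest to startOffset;
- call the data file's searchForOffsetWithSize() method to read entries one by one from that position until the physical position of the requested offset is found.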
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
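//note: get the indexed physical position nearest to offset; returns the offset and position (approximate)
val mapping = index.lookup(offset)
//note: scan forward from that position until the actual physical position of offset is found; returns the offset and position (exact)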
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
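Searching the offset index file
The offset index file is loaded into memory through memory mapping. The mapping can change while a query is in progress, so lookup() first takes a duplicate (idx) and then searches that. The implementation is as follows:
//note: find the largest offset less than or equal to the given offset, and return it together with its physical position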
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
val idx = mmap.duplicate //note: the mmap can change during the lookup, so work on a duplicate
val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY) //note: binary search
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
//note: compute the absolute offset first, then the physical position
parseEntry(idx, slot).asInstanceOf[OffsetPosition]
}
}
override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
For how relativeOffset and physical are computed, see the figure below (from the book 《Kafka 技术内幕》):
Figure: looking up the offset value and physical position by index entry number
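Since the figure is not reproduced here, the entry layout and the lookup can be sketched in code instead. Each index entry is 8 bytes: a 4-byte offset relative to the segment's base offset, followed by a 4-byte file position, which is what the `n * entrySize` and `n * entrySize + 4` reads above rely on. The class `OffsetIndexSketch`, the `build` helper and the hand-written binary search are illustrative assumptions, not Kafka's indexSlotFor().

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical in-memory model of a sparse offset index.
final class OffsetIndexSketch {
    static final int ENTRY_SIZE = 8; // 4-byte relative offset + 4-byte file position

    static int relativeOffset(ByteBuffer buf, int n) { return buf.getInt(n * ENTRY_SIZE); }
    static int physical(ByteBuffer buf, int n) { return buf.getInt(n * ENTRY_SIZE + 4); }

    // Returns {absoluteOffset, position} of the largest entry with offset <= target,
    // or {baseOffset, 0} when every entry is larger than the target.
    static long[] lookup(ByteBuffer buf, int entries, long baseOffset, long target) {
        int lo = 0, hi = entries - 1, slot = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (baseOffset + relativeOffset(buf, mid) <= target) {
                slot = mid;   // candidate; look for a larger one to the right
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        if (slot == -1) return new long[] {baseOffset, 0};
        return new long[] {baseOffset + relativeOffset(buf, slot), physical(buf, slot)};
    }

    // Packs {relativeOffset, position} pairs into a buffer, sorted by offset.
    static ByteBuffer build(int[][] entries) {
        ByteBuffer buf = ByteBuffer.allocate(entries.length * ENTRY_SIZE);
        for (int[] e : entries) buf.putInt(e[0]).putInt(e[1]);
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer idx = build(new int[][] {{0, 0}, {50, 4096}, {120, 8192}});
        // Base offset 1000: offset 1075 lies between the entries for 1050 and 1120.
        System.out.println(Arrays.toString(lookup(idx, 3, 1000, 1075))); // [1050, 4096]
    }
}
```

The index is sparse, so the returned position is only a starting point at or before the target; the exact position still has to be found in the data file.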
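Searching the data file for the exact physical position
The physical position obtained from the offset index file is only an approximation. Reading the data file itself yields the exact value, which is found by iterating over the data file.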
/**
* Search forward for the file position of the first offset that is greater than or equal to the target offset
* and return its physical position and the size of the message (including log overhead) at the returned offset. If
* no such offsets are found, return null.
*
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
long offset = entry.offset();
if (offset >= targetOffset)
return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
}
return null;
}
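The same scan can be tried out without a real data file. In the sketch below each entry is a hypothetical `{offset, filePosition, sizeInBytes}` triple, and `DataFileScan` stands in for the file records class; only the control flow matches searchForOffsetWithSize().

```java
import java.util.Arrays;

// Hypothetical stand-in for the data file: rows of {offset, filePosition, sizeInBytes}.
final class DataFileScan {
    static long[] searchForOffsetWithSize(long[][] entries, long targetOffset, int startingPosition) {
        for (long[] e : entries) {
            if (e[1] < startingPosition) {
                continue; // begin scanning at the position hinted by the index
            }
            if (e[0] >= targetOffset) {
                return e; // first entry at or past the target offset
            }
        }
        return null; // the target is beyond the end of this file
    }

    public static void main(String[] args) {
        long[][] entries = {
            {1050, 4096, 100},
            {1051, 4196, 120},
            {1060, 4316, 90},
        };
        // The index pointed us at byte 4096; offset 1055 is first covered by the entry at 1060.
        System.out.println(Arrays.toString(searchForOffsetWithSize(entries, 1055, 4096))); // [1060, 4316, 90]
    }
}
```

The returned size is what LogSegment.read() later uses as startOffsetAndSize.size when minOneMessage forces at least one complete entry into the response.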
That completes the handling of one Fetch request.