轉(zhuǎn)載: http://matt33.com/2018/03/18/kafka-server-handle-produce-request/
[TOC]
produce 請(qǐng)求處理整體流程
在 Producer Client 端滋早,Producer 會(huì)維護(hù)一個(gè) ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches 的變量适荣,然后會(huì)根據(jù) topic-partition 的 leader 信息漂问,將 leader 在同一臺(tái)機(jī)器上的 batch 放在一個(gè) request 中,發(fā)送到 server村刨,這樣可以節(jié)省很多網(wǎng)絡(luò)開銷,提高發(fā)送效率钝诚。
Producer Client 發(fā)送請(qǐng)求的方法實(shí)現(xiàn)如下:
//NOTE: 發(fā)送 produce 請(qǐng)求
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
for (RecordBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
produceRecordsByPartition.put(tp, batch.records());
recordsByPartition.put(tp, batch);
}
ProduceRequest.Builder requestBuilder =
new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
在發(fā)送 Produce 的請(qǐng)求里,Client 是把一個(gè) Map<TopicPartition, MemoryRecords> 類型的 produceRecordsByPartition 作為內(nèi)容發(fā)送給了 Server 端歇万,那么 Server 端是如何處理這個(gè)請(qǐng)求的呢?這就是本篇文章要講述的內(nèi)容勋陪,Server 處理這個(gè)請(qǐng)求的總體邏輯如下圖所示:
Server 端處理 produce 請(qǐng)求的總體過(guò)程
Broker 在收到 Produce 請(qǐng)求后贪磺,會(huì)有一個(gè) KafkaApis 進(jìn)行處理,KafkaApis 是 Server 端處理所有請(qǐng)求的入口诅愚,它會(huì)負(fù)責(zé)將請(qǐng)求的具體處理交給相應(yīng)的組件進(jìn)行處理寒锚,從上圖可以看到 Produce 請(qǐng)求是交給了 ReplicaManager 對(duì)象進(jìn)行處理了。
Server 端處理
Server 端的處理過(guò)程會(huì)按照上圖的流程一塊一塊去介紹违孝。
KafkaApis 處理 Produce 請(qǐng)求
KafkaApis 處理 produce 請(qǐng)求是在 handleProducerRequest() 方法中完成刹前,具體實(shí)現(xiàn)如下:
/**
* Handle a produce request
*/
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = request.body.asInstanceOf[ProduceRequest]
val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
//note: 按 exist 和有 Describe 權(quán)限進(jìn)行篩選
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
}
//note: 判斷有沒(méi)有 Write 權(quán)限
val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
}
// the callback for sending a produce response
//note: 回調(diào)函數(shù)
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
val mergedResponseStatus = responseStatus ++
unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
var errorInResponse = false
mergedResponseStatus.foreach { case (topicPartition, status) =>
if (status.error != Errors.NONE) {
errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
request.header.correlationId,
request.header.clientId,
topicPartition,
status.error.exceptionName))
}
}
def produceResponseCallback(delayTimeMs: Int) {
if (produceRequest.acks == 0) {
// no operation needed if producer request.required.acks = 0; however, if there is any error in handling
// the request, since no response is expected by the producer, the server will close socket server so that
// the producer client will know that some error has happened and will refresh its metadata
//note: 因?yàn)樵O(shè)置的 ack=0, 相當(dāng)于 client 會(huì)默認(rèn)發(fā)送成功了,如果 server 在處理的過(guò)程出現(xiàn)了錯(cuò)誤,那么就會(huì)關(guān)閉 socket 連接來(lái)間接地通知 client
//note: client 會(huì)重新刷新 meta,重新建立相應(yīng)的連接
if (errorInResponse) {
val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
topicPartition -> status.error.exceptionName
}.mkString(", ")
info(
s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
s"from client id ${request.header.clientId} with ack=0\n" +
s"Topic and partition to exceptions: $exceptionsSummary"
)
requestChannel.closeConnection(request.processor, request)
} else {
requestChannel.noOperation(request.processor, request)
}
} else {
val respBody = request.header.apiVersion match {
case 0 => new ProduceResponse(mergedResponseStatus.asJava)
case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
// This case shouldn't happen unless a new version of ProducerRequest is added without
// updating this part of the code to handle it properly.
case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
}
requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
}
}
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeMs = time.milliseconds
quotas.produce.recordAndMaybeThrottle(
request.session.sanitizedUser,
request.header.clientId,
numBytesAppended,
produceResponseCallback)
}
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
// call the replica manager to append messages to the replicas
//note: 追加 Record
replicaManager.appendRecords(
produceRequest.timeout.toLong,
produceRequest.acks,
internalTopicsAllowed,
authorizedRequestInfo,
sendResponseCallback)
// if the request is put into the purgatory, it will have a held reference
// and hence cannot be garbage collected; hence we clear its data here in
// order to let GC re-claim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
}
}
總體來(lái)說(shuō),處理過(guò)程是(在權(quán)限系統(tǒng)的情況下):
- 查看 topic 是否存在雌桑,以及 client 是否有相應(yīng)的 Desribe 權(quán)限喇喉;
- 對(duì)于已經(jīng)有 Describe 權(quán)限的 topic 查看是否有 Write 權(quán)限;
- 調(diào)用 replicaManager.appendRecords() 方法向有 Write 權(quán)限的 topic-partition 追加相應(yīng)的 record校坑。
ReplicaManager
ReplicaManager 顧名思義拣技,它就是副本管理器,副本管理器的作用是管理這臺(tái) broker 上的所有副本(replica)撒踪。在 Kafka 中,每個(gè)副本(replica)都會(huì)跟日志實(shí)例(Log 對(duì)象)一一對(duì)應(yīng)大渤,一個(gè)副本會(huì)對(duì)應(yīng)一個(gè) Log 對(duì)象制妄。
Kafka Server 在啟動(dòng)的時(shí)候,會(huì)創(chuàng)建 ReplicaManager 對(duì)象泵三,如下所示耕捞。在 ReplicaManager 的構(gòu)造方法中,它需要 LogManager 作為成員變量烫幕。
//kafka.server.KafkaServer
def startup() {
try {
info("starting")
/* start replica manager */
replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
replicaManager.startup()
}catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
ReplicaManager 的并不負(fù)責(zé)具體的日志創(chuàng)建俺抽,它只是管理 Broker 上的所有分區(qū)(也就是圖中下一步的那個(gè) Partition 對(duì)象)。在創(chuàng)建 Partition 對(duì)象時(shí)较曼,它需要 ReplicaManager 的 logManager 對(duì)象磷斧,Partition 會(huì)通過(guò)這個(gè) logManager 對(duì)象為每個(gè) replica 創(chuàng)建對(duì)應(yīng)的日志。
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
*/
class Partition(val topic: String,
val partitionId: Int,
time: Time,
replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
val topicPartition = new TopicPartition(topic, partitionId)
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager //note: 日志管理器
}
ReplicaManager 與 LogManger 對(duì)比如下:
管理對(duì)象 | 組成部分 | |
---|---|---|
日志管理器(LogManager) | 日志(Log) | 日志分段(LogSegment) |
副本管理器(ReplicaManager) | 分區(qū)(Partition) | 副本(Replica) |
appendRecords() 實(shí)現(xiàn)
下面我們來(lái)看 appendRecords() 方法的具體實(shí)現(xiàn)捷犹。
//note: 向 partition 的 leader 寫入數(shù)據(jù)
def appendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
if (isValidRequiredAcks(requiredAcks)) { //note: acks 設(shè)置有效
val sTime = time.milliseconds
//note: 向本地的副本 log 追加數(shù)據(jù)
val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
}
if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
//note: 處理 ack=-1 的情況,需要等到 isr 的 follower 都寫入成功的話,才能返回最后結(jié)果
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
//note: 延遲 produce 請(qǐng)求
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
// we can respond immediately
//note: 通過(guò)回調(diào)函數(shù)直接返回結(jié)果
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
responseCallback(produceResponseStatus)
}
} else {
// If required.acks is outside accepted range, something is wrong with the client
// Just return an error and don't handle the request at all
//note: 返回 INVALID_REQUIRED_ACKS 錯(cuò)誤
val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
}
responseCallback(responseStatus)
}
}
從上面的實(shí)現(xiàn)來(lái)看弛饭,appendRecords() 的實(shí)現(xiàn)主要分為以下幾步:
- 首先判斷 acks 設(shè)置是否有效(-1,0萍歉,1三個(gè)值有效)侣颂,無(wú)效的話直接返回異常,不再處理枪孩;
- acks 設(shè)置有效的話憔晒,調(diào)用 appendToLocalLog() 方法將 records 追加到本地對(duì)應(yīng)的 log 對(duì)象中藻肄;
- appendToLocalLog() 處理完后,如果發(fā)現(xiàn) clients 設(shè)置的 acks=-1拒担,即需要 isr 的其他的副本同步完成才能返回 response嘹屯,那么就會(huì)創(chuàng)建一個(gè) DelayedProduce 對(duì)象,等待 isr 的其他副本進(jìn)行同步澎蛛,否則的話直接返回追加的結(jié)果抚垄。
appendToLocalLog() 的實(shí)現(xiàn)
追加日志的實(shí)際操作是在 appendToLocalLog() 中完成的,這里看下它的具體實(shí)現(xiàn):
/**
* Append the messages to the local replica logs
*/
//note: 向本地的 replica 寫入數(shù)據(jù)
private def appendToLocalLog(internalTopicsAllowed: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
trace("Append [%s] to local log ".format(entriesPerPartition))
entriesPerPartition.map { case (topicPartition, records) => //note: 遍歷要寫的所有 topic-partition
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
// reject appending to internal topics if it is not allowed
//note: 不能向 kafka 內(nèi)部使用的 topic 追加數(shù)據(jù)
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else {
try {
//note: 查找對(duì)應(yīng)的 Partition,并向分區(qū)對(duì)應(yīng)的副本寫入數(shù)據(jù)文件
val partitionOpt = getPartition(topicPartition) //note: 獲取 topic-partition 的 Partition 對(duì)象
val info = partitionOpt match {
case Some(partition) =>
partition.appendRecordsToLeader(records, requiredAcks) //note: 如果找到了這個(gè)對(duì)象,就開始追加日志
case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
.format(topicPartition, localBrokerId)) //note: 沒(méi)有找到的話,返回異常
}
//note: 追加的 msg 數(shù)
val numAppendedMessages =
if (info.firstOffset == -1L || info.lastOffset == -1L)
0
else
info.lastOffset - info.firstOffset + 1
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate
//note: 更新 metrics
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
.format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
(topicPartition, LogAppendResult(info))
} catch { //note: 處理追加過(guò)程中出現(xiàn)的異常
// NOTE: Failed produce requests metric is not incremented for known exceptions
// it is supposed to indicate un-expected failures of a broker in handling a produce request
case e: KafkaStorageException =>
fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
Runtime.getRuntime.halt(1)
(topicPartition, null)
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: RecordTooLargeException |
_: RecordBatchTooLargeException |
_: CorruptRecordException |
_: InvalidTimestampException) =>
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
error("Error processing append operation on partition %s".format(topicPartition), t)
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
}
}
}
}
從上面可以看到 appendToLocalLog() 的實(shí)現(xiàn)如下:
- 首先判斷要寫的 topic 是不是 Kafka 內(nèi)置的 topic谋逻,內(nèi)置的 topic 是不允許 Producer 寫入的呆馁;
- 先查找 topic-partition 對(duì)應(yīng)的 Partition 對(duì)象,如果在 allPartitions 中查找到了對(duì)應(yīng)的 partition毁兆,那么直接調(diào)用 partition.appendRecordsToLeader() 方法追加相應(yīng)的 records浙滤,否則會(huì)向 client 拋出異常。
Partition.appendRecordsToLeader() 方法
ReplicaManager 在追加 records 時(shí)气堕,調(diào)用的是 Partition 的 appendRecordsToLeader() 方法纺腊,其具體的實(shí)現(xiàn)如下:
def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
val log = leaderReplica.log.get //note: 獲取對(duì)應(yīng)的 Log 對(duì)象
val minIsr = log.config.minInSyncReplicas
val inSyncSize = inSyncReplicas.size
// Avoid writing to leader if there are not enough insync replicas to make it safe
//note: 如果 ack 設(shè)置為-1, isr 數(shù)小于設(shè)置的 min.isr 時(shí),就會(huì)向 producer 拋出相應(yīng)的異常
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
.format(topicPartition, inSyncSize, minIsr))
}
//note: 向副本對(duì)應(yīng)的 log 追加響應(yīng)的數(shù)據(jù)
val info = log.append(records, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
//note: 判斷是否需要增加 HHW(追加日志后會(huì)進(jìn)行一次判斷)
(info, maybeIncrementLeaderHW(leaderReplica))
case None =>
//note: leader 不在本臺(tái)機(jī)器上
throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
}
// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
tryCompleteDelayedRequests()
info
}
在這個(gè)方法里,會(huì)根據(jù) topic 的 min.isrs 配置以及當(dāng)前這個(gè) partition 的 isr 情況判斷是否可以寫入茎芭,如果不滿足條件揖膜,就會(huì)拋出 NotEnoughReplicasException 的異常,如果滿足條件梅桩,就會(huì)調(diào)用 log.append() 向 replica 追加日志壹粟。
存儲(chǔ)層
跟著最開始圖中的流程及代碼分析,走到這里宿百,才算是到了 Kafka 的存儲(chǔ)層部分趁仙,在這里會(huì)詳細(xì)講述在存儲(chǔ)層 Kafka 如何寫入日志。
Log 對(duì)象
在上面有過(guò)一些介紹垦页,每個(gè) replica 會(huì)對(duì)應(yīng)一個(gè) log 對(duì)象雀费,log 對(duì)象是管理當(dāng)前分區(qū)的一個(gè)單位,它會(huì)包含這個(gè)分區(qū)的所有 segment 文件(包括對(duì)應(yīng)的 offset 索引和時(shí)間戳索引文件)痊焊,它會(huì)提供一些增刪查的方法盏袄。
在 Log 對(duì)象的初始化時(shí),有三個(gè)變量是比較重要的:
- nextOffsetMetadata:可以叫做下一個(gè)偏移量元數(shù)據(jù)薄啥,它包括 activeSegment 的下一條消息的偏移量貌矿,該 activeSegment 的基準(zhǔn)偏移量及日志分段的大小罪佳;
- activeSegment:指的是該 Log 管理的 segments 中那個(gè)最新的 segment(這里叫做活躍的 segment)逛漫,一個(gè) Log 中只會(huì)有一個(gè)活躍的 segment,其他的 segment 都已經(jīng)被持久化到磁盤了赘艳;
- logEndOffset:表示下一條消息的 offset酌毡,它取自 nextOffsetMetadata 的 offset克握,實(shí)際上就是活動(dòng)日志分段的下一個(gè)偏移量。
//note: nextOffsetMetadata 聲明為 volatile枷踏,如果該值被修改菩暗,其他使用此變量的線程就可以立刻見到變化后的值,在生產(chǎn)和消費(fèi)都會(huì)使用到這個(gè)值
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
/* Calculate the offset of the next message */
//note: 下一個(gè)偏移量元數(shù)據(jù)
//note: 第一個(gè)參數(shù):下一條消息的偏移量旭蠕;第二個(gè)參數(shù):日志分段的基準(zhǔn)偏移量停团;第三個(gè)參數(shù):日志分段大小
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
/**
* The active segment that is currently taking appends
*/
//note: 任何時(shí)刻,只會(huì)有一個(gè)活動(dòng)的日志分段
def activeSegment = segments.lastEntry.getValue
/**
* The offset of the next message that will be appended to the log
*/
//note: 下一條消息的 offset掏熬,從 nextOffsetMetadata 中獲取的
def logEndOffset: Long = nextOffsetMetadata.messageOffset
日志寫入
在 Log 中一個(gè)重要的方法就是日志的寫入方法佑稠,下面來(lái)看下這個(gè)方法的實(shí)現(xiàn)。
/**
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
*
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
*
* @param records The log records to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
//note: 向 active segment 追加 log,必要的情況下,滾動(dòng)創(chuàng)建新的 segment
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateRecords(records) //note: 返回這批消息的該要信息,并對(duì)這批 msg 進(jìn)行校驗(yàn)
// if we have any valid messages, append them to the log
if (appendInfo.shallowCount == 0)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
//note: 刪除這批消息中無(wú)效的消息
var validRecords = trimInvalidBytes(records, appendInfo)
try {
// they are valid, insert them in the log
lock synchronized {
if (assignOffsets) {
// assign offsets to the message set
//note: 計(jì)算這個(gè)消息集起始 offset旗芬,對(duì) offset 的操作是一個(gè)原子操作
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = offset.value //note: 作為消息集的第一個(gè) offset
val now = time.milliseconds //note: 設(shè)置的時(shí)間錯(cuò)以 server 收到的時(shí)間戳為準(zhǔn)
//note: 驗(yàn)證消息,并為沒(méi)條 record 設(shè)置相應(yīng)的 offset 和 timestrap
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
offset,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.messageFormatVersion,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
//note: 返回已經(jīng)計(jì)算好 offset 和 timestrap 的 MemoryRecords
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1 //note: 最后一條消息的 offset
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
//note: 更新 metrics 的記錄
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (logEntry <- validRecords.shallowEntries.asScala) {
if (logEntry.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(logEntry.sizeInBytes, config.maxMessageSize))
}
}
}
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
}
// check messages set size may be exceed config.segmentSize
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
.format(validRecords.sizeInBytes, config.segmentSize))
}
// maybe roll the log if this segment is full
//note: 如果當(dāng)前 segment 滿了舌胶,就需要重新新建一個(gè) segment
val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
maxTimestampInMessages = appendInfo.maxTimestamp,
maxOffsetInMessages = appendInfo.lastOffset)
// now append to the log
//note: 追加消息到當(dāng)前 segment
segment.append(firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// increment the log end offset
//note: 修改最新的 next_offset
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
if (unflushedMessages >= config.flushInterval)//note: 滿足條件的話,刷新磁盤
flush()
appendInfo
}
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
Server 將每個(gè)分區(qū)的消息追加到日志中時(shí)疮丛,是以 segment 為單位的幔嫂,當(dāng) segment 的大小到達(dá)閾值大小之后,會(huì)滾動(dòng)新建一個(gè)日志分段(segment)保存新的消息誊薄,而分區(qū)的消息總是追加到最新的日志分段(也就是 activeSegment)中履恩。每個(gè)日志分段都會(huì)有一個(gè)基準(zhǔn)偏移量(segmentBaseOffset,或者叫做 baseOffset)呢蔫,這個(gè)基準(zhǔn)偏移量就是分區(qū)級(jí)別的絕對(duì)偏移量切心,而且這個(gè)值在日志分段是固定的。有了這個(gè)基準(zhǔn)偏移量咐刨,就可以計(jì)算出來(lái)每條消息在分區(qū)中的絕對(duì)偏移量昙衅,最后把數(shù)據(jù)以及對(duì)應(yīng)的絕對(duì)偏移量寫到日志文件中扬霜。append() 方法的過(guò)程可以總結(jié)如下:
- analyzeAndValidateRecords():對(duì)這批要寫入的消息進(jìn)行檢測(cè)定鸟,主要是檢查消息的大小及 crc 校驗(yàn);
- trimInvalidBytes():會(huì)將這批消息中無(wú)效的消息刪除著瓶,返回一個(gè)都是有效消息的 MemoryRecords联予;
- LogValidator.validateMessagesAndAssignOffsets():為每條消息設(shè)置相應(yīng)的 offset(絕對(duì)偏移量) 和 timestrap;
- maybeRoll():判斷是否需要新建一個(gè) segment 的材原,如果當(dāng)前的 segment 放不下這批消息的話沸久,需要新建一個(gè) segment;
- segment.append():向 segment 中添加消息余蟹;
- 更新 logEndOffset 和判斷是否需要刷新磁盤(如果需要的話卷胯,調(diào)用 flush() 方法刷到磁盤)。
關(guān)于 timestrap 的設(shè)置威酒,這里也順便介紹一下窑睁,在新版的 Kafka 中挺峡,每條 msg 都會(huì)有一個(gè)對(duì)應(yīng)的時(shí)間戳記錄,producer 端可以設(shè)置這個(gè)字段 message.timestamp.type 來(lái)選擇 timestrap 的類型担钮,默認(rèn)是按照創(chuàng)建時(shí)間橱赠,只能選擇從下面的選擇中二選一:
- CreateTime,默認(rèn)值箫津;
- LogAppendTime狭姨。
日志分段
在 Log 的 append() 方法中,會(huì)調(diào)用 maybeRoll() 方法來(lái)判斷是否需要進(jìn)行相應(yīng)日志分段操作苏遥,其具體實(shí)現(xiàn)如下:
/**
* Roll the log over to a new empty log segment if necessary.
*
* @param messagesSize The messages set size in bytes
* @param maxTimestampInMessages The maximum timestamp in the messages.
* logSegment will be rolled if one of the following conditions met
* <ol>
* <li> The logSegment is full
* <li> The maxTime has elapsed since the timestamp of first message in the segment (or since the create time if
* the first message does not have a timestamp)
* <li> The index is full
* </ol>
* @return The currently active segment after (perhaps) rolling to a new segment
*/
//note: 判斷是否需要?jiǎng)?chuàng)建日志分段,如果不需要返回當(dāng)前分段,需要的話,返回新創(chuàng)建的日志分段
private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
val segment = activeSegment //note: 對(duì)活躍的日志分段進(jìn)行判斷,它也是最新的一個(gè)日志分段
val now = time.milliseconds
//note: 距離上次日志分段的時(shí)間是否達(dá)到了設(shè)置的閾值(log.roll.hours)
val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
//note: 這是五個(gè)條件: 1. 文件滿了,不足以放心這么大的 messageSet; 2. 文件有數(shù)據(jù),并且到分段的時(shí)間閾值; 3. 索引文件滿了;
//note: 4. 時(shí)間索引文件滿了; 5. 最大的 offset饼拍,其相對(duì)偏移量超過(guò)了正整數(shù)的閾值
if (segment.size > config.segmentSize - messagesSize ||
(segment.size > 0 && reachedRollMs) ||
segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
roll(maxOffsetInMessages - Integer.MAX_VALUE) //note: 創(chuàng)建新的日志分段
} else {
segment //note: 使用當(dāng)前的日志分段
}
}
從 maybeRoll() 的實(shí)現(xiàn)可以看到,是否需要?jiǎng)?chuàng)建新的日志分段暖眼,有下面幾種情況:
- 當(dāng)前日志分段的大小加上消息的大小超過(guò)了日志分段的閾值(log.segment.bytes)惕耕;
- 距離上次創(chuàng)建日志分段的時(shí)間達(dá)到了一定的閾值(log.roll.hours),并且數(shù)據(jù)文件有數(shù)據(jù)诫肠;
- 索引文件滿了司澎;
- 時(shí)間索引文件滿了;
- 最大的 offset栋豫,其相對(duì)偏移量超過(guò)了正整數(shù)的閾值懦鼠。
如果上面的其中一個(gè)條件,就會(huì)創(chuàng)建新的 segment 文件恬砂,見 roll() 方法實(shí)現(xiàn):
/**
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
*
* @return The newly rolled segment
*/
//note: 滾動(dòng)創(chuàng)建日志,并添加到日志管理的映射表中
def roll(expectedNextOffset: Long = 0): LogSegment = {
val start = time.nanoseconds
lock synchronized {
val newOffset = Math.max(expectedNextOffset, logEndOffset) //note: 選擇最新的 offset 作為基準(zhǔn)偏移量
val logFile = logFilename(dir, newOffset) //note: 創(chuàng)建數(shù)據(jù)文件
val indexFile = indexFilename(dir, newOffset) //note: 創(chuàng)建 offset 索引文件
val timeIndexFile = timeIndexFilename(dir, newOffset) //note: 創(chuàng)建 time 索引文件
for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
segments.lastEntry() match {
case null =>
case entry => {
val seg = entry.getValue
seg.onBecomeInactiveSegment()
seg.index.trimToValidSize()
seg.timeIndex.trimToValidSize()
seg.log.trim()
}
}
//note: 創(chuàng)建一個(gè) segment 對(duì)象
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = initFileSize,
preallocate = config.preallocate)
val prev = addSegment(segment) //note: 添加到日志管理中
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
// We need to update the segment base offset and append position data of the metadata when log rolls.
// The next offset should not change.
updateLogEndOffset(nextOffsetMetadata.messageOffset) //note: 更新 offset
// schedule an asynchronous flush of the old segment
scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
segment
}
}
創(chuàng)建一個(gè) segment 對(duì)象嘹叫,真正的實(shí)現(xiàn)是在 Log 的 roll() 方法中,也就是上面的方法中丛肢,創(chuàng)建 segment 對(duì)象围肥,主要包括三部分:數(shù)據(jù)文件、offset 索引文件和 time 索引文件蜂怎。
offset 索引文件
這里順便講述一下 offset 索引文件穆刻,Kafka 的索引文件有下面一個(gè)特點(diǎn):
- 采用 絕對(duì)偏移量+相對(duì)偏移量 的方式進(jìn)行存儲(chǔ)的,每個(gè) segment 最開始絕對(duì)偏移量也是其基準(zhǔn)偏移量杠步;
- 數(shù)據(jù)文件每隔一定的大小創(chuàng)建一個(gè)索引條目氢伟,而不是每條消息會(huì)創(chuàng)建索引條目,通過(guò) index.interval.bytes 來(lái)配置幽歼,默認(rèn)是 4096朵锣,也就是4KB;
這樣做的好處也非常明顯:
- 因?yàn)椴皇敲織l消息都創(chuàng)建相應(yīng)的索引條目甸私,所以索引條目是稀疏的诚些;
- 索引的相對(duì)偏移量占據(jù)4個(gè)字節(jié),而絕對(duì)偏移量占據(jù)8個(gè)字節(jié)皇型,加上物理位置的4個(gè)字節(jié)诬烹,使用相對(duì)索引可以將每條索引條目的大小從12字節(jié)減少到8個(gè)字節(jié)助析;
- 因?yàn)槠屏坑行虻模僮x取數(shù)據(jù)時(shí)椅您,可以按照二分查找的方式去快速定位偏移量的位置外冀;
- 這樣的稀疏索引是可以完全放到內(nèi)存中,加快偏移量的查找掀泳。
LogSegment 寫入
真正的日志寫入雪隧,還是在 LogSegment 的 append() 方法中完成的,LogSegment 會(huì)跟 Kafka 最底層的文件通道员舵、mmap 打交道脑沿。
/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock.
*
* @param firstOffset The first offset in the message set.
* @param largestTimestamp The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param records The log entries to append.
*/
//note: 在指定的 offset 處追加指定的 msgs, 需要的情況下追加相應(yīng)的索引
@nonthreadsafe
def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
if (records.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
.format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
// append the messages
require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
val appendedBytes = log.append(records) //note: 追加到數(shù)據(jù)文件中
trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
//note: 判斷是否需要追加索引(數(shù)據(jù)每次都會(huì)添加到數(shù)據(jù)文件中,但不是每次都會(huì)添加索引的,間隔 indexIntervalBytes 大小才會(huì)寫入一個(gè)索引文件)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(firstOffset, physicalPosition) //note: 添加索引
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0 //note: 重置為0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
經(jīng)過(guò)上面的分析,一個(gè)消息集(MemoryRecords)在 Kafka 存儲(chǔ)層的調(diào)用情況如下圖所示:
最后還是利用底層的 Java NIO 實(shí)現(xiàn)马僻。
FAQ
有一點(diǎn)不是很明白庄拇,kafka server在接收到producer的請(qǐng)求以后是不是實(shí)時(shí)持久化的呢?
看到文章的最后說(shuō),
LogSegment 會(huì)跟 Kafka 最底層的文件通道韭邓、mmap 打交道措近。
是不是是說(shuō)kafka的消息是實(shí)時(shí)的進(jìn)行了持久化呢?
但是也看到上面在Log.append() 方法中有這樣的介紹
if (unflushedMessages >= config.flushInterval)//note: 滿足條件的話女淑,刷新磁盤
flush()
如果是實(shí)時(shí)持久化的為什么還需要這個(gè)判斷操作呢
明白了瞭郑,使用mmap只是寫入了頁(yè)緩存,并沒(méi)有flush進(jìn)磁盤鸭你,