The next several articles on reliability will all revolve around one classic question:
How do we make sure Kafka does not lose messages?
What exactly counts as a lost message? If the client calls future = send(msg, callback)
and an error is reported along the way, that is not message loss. The real loss scenario is: the client calls future = send(msg, callback),
the Broker explicitly tells the client that the message was sent successfully (future.isDone is true, or the callback reports success), and yet consumers can never, ever consume that message.
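To make this distinction concrete, here is a minimal sketch using the Kafka Java client from Scala (the topic name and bootstrap address are placeholders): an error surfaced through the future or the callback is a visible failure that the client can retry, not message loss; loss only becomes possible after the callback has reported success.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SendDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val future = producer.send(new ProducerRecord("test-topic", "key", "value"), new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null)
        println(s"send failed, caller can retry: $exception") // visible failure: not "message loss"
      else
        println(s"broker acked offset ${metadata.offset}")    // from here on, a vanished message is real loss
  })
  future.get() // blocks until the broker's acknowledgment (or an error) arrives
  producer.close()
}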
The producer has a configuration parameter called acks:
If acks=0, a message counts as successfully sent the moment it reaches the socket buffer. This is obviously unsafe, so we will not discuss it further.
If acks=1, the Broker may tell the producer that the message was sent successfully as soon as it has been persisted on a single ISR replica, namely the leader.
If acks=all, the Broker may only tell the producer that the message was sent successfully after it has been persisted on every replica in the ISR.
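In code, the three modes are just different values of one producer property; continuing the props object from the sketch above:

// pick exactly one of the following:
props.put("acks", "0")   // ack once the message reaches the socket buffer (unsafe)
props.put("acks", "1")   // ack once the leader alone has persisted the message
props.put("acks", "all") // ack only after every ISR replica has it ("-1" is a synonym)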
Suppose the replication factor of our test topic is set to 3 on the Broker side, so under normal conditions the ISR contains 3 replicas.
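A topic like that could be created through the Admin API; a sketch (the bootstrap address is a placeholder, and min.insync.replicas = 2 is an arbitrary choice whose relevance will show up below with NOT_ENOUGH_REPLICAS_AFTER_APPEND):

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "localhost:9092") // placeholder address
val admin = AdminClient.create(adminProps)

// one partition, replication factor 3; demand at least 2 in-sync replicas for writes
val topic = new NewTopic("test-topic", 1, 3.toShort)
  .configs(Map("min.insync.replicas" -> "2").asJava)
admin.createTopics(Collections.singleton(topic)).all().get()
admin.close()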
First, configure the producer with acks=1.
After a message is sent to the Broker, it is handled by the leader of the target topic-partition. The leader calls appendToLocalLog to persist the message locally.
// kafka.server.ReplicaManager#appendRecords
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
  isFromClient = isFromClient, entriesPerPartition, requiredAcks)
If, once that local persistence succeeded, the leader immediately responded to the producer that the message was sent successfully, and the leader then crashed before any follower had replicated the message, the message would be lost: the follower elected as the new leader never received it, so consumers would have no way to consume it.
Now configure the producer with acks=all (acks=-1/all).
After the leader calls appendToLocalLog to persist the message locally, it does not reply to the producer right away. Instead, it creates a DelayedProduce, a delayed-response task.
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
  // create delayed produce operation
  val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
  val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

  // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
  val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

  // try to complete the request immediately, otherwise put it into the purgatory
  // this is because while the delayed produce operation is being created, new
  // requests may arrive and hence make this operation completable.
  delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
  // we can respond immediately
  val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
  responseCallback(produceResponseStatus)
}
// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}
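The tryCompleteElseWatch call above is the heart of the purgatory. As a toy distillation of the pattern (DelayedOp and ToyPurgatory are made-up names, not Kafka's real classes): try to complete the operation on the spot; if that is not yet possible, park it and re-check it whenever relevant state changes.

import scala.collection.mutable

trait DelayedOp {
  def tryComplete(): Boolean // true once the completion condition holds
  def onComplete(): Unit     // e.g. send the produce response
}

class ToyPurgatory {
  private val watching = mutable.ListBuffer.empty[DelayedOp]

  // On creation: complete right away if already satisfiable, otherwise watch it.
  // (The real purgatory also arms a timer so a stuck operation eventually
  // completes with a timeout error instead of waiting forever.)
  def tryCompleteElseWatch(op: DelayedOp): Unit =
    if (op.tryComplete()) op.onComplete() else watching += op

  // Called when state changes, e.g. a follower fetch advances the high watermark.
  def checkAndComplete(): Unit = {
    val done = watching.filter(_.tryComplete()).toList
    watching --= done
    done.foreach(_.onComplete())
  }
}

The real implementation watches operations per topic-partition key (the producerRequestKeys above), so a follower fetch only triggers re-checks for the DelayedProduce operations of the partitions it actually advanced.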
At this point the producer has not yet been told that the send succeeded. Even if the leader crashes right now, it does not count as message loss; the producer simply gets an error and resends the message.
The problem is not over yet, though. For that leader, we just said it has only created a DelayedProduce, so when will it actually reply to the producer? The question now falls on the DelayedProduce itself: the delay cannot go on forever. Looking at DelayedProduce's tryComplete method, as soon as the condition below is met, this delayed task is ready to complete.
// kafka.server.DelayedProduce#tryComplete
override def tryComplete(): Boolean = {
  // check for each partition if it still has pending acks
  produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
    if (status.acksPending) {
      val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
        // code
        // Many branches here return false, but for now we only care about the
        // path that can return true, so we step into this call:
        partition.checkEnoughReplicasReachOffset(status.requiredOffset)
        // code
      }
      // code return false
    }
  }
  // ..
}
// kafka.cluster.Partition#checkEnoughReplicasReachOffset
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderReplicaIfLocal match {
    case Some(leaderReplica) =>
      // keep a reference to the current ISR
      val curInSyncReplicas = inSyncReplicas
      // code
      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
      // enough ISR replicas have caught up to the message waiting to be acknowledged
      if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        /*
         * The topic may be configured not to accept messages if there are not enough replicas in ISR
         * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
         */
        if (minIsr <= curInSyncReplicas.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_FOR_PARTITION)
  }
}
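To make the branching concrete, here is a hypothetical, simplified distillation of that decision (hard-coded numbers and illustrative names; not the real method):

// Simplified model of checkEnoughReplicasReachOffset's outcome (illustrative only)
def ackDecision(highWatermark: Long, requiredOffset: Long, isrSize: Int, minIsr: Int): (Boolean, String) =
  if (highWatermark >= requiredOffset) {
    if (minIsr <= isrSize) (true, "NONE")           // normal success
    else (true, "NOT_ENOUGH_REPLICAS_AFTER_APPEND") // replicated, but ISR shrank below min.insync.replicas
  } else (false, "NONE")                            // not replicated widely enough yet: keep waiting

// With replication factor 3 and min.insync.replicas = 2:
ackDecision(highWatermark = 100, requiredOffset = 100, isrSize = 3, minIsr = 2) // (true, "NONE")
ackDecision(highWatermark =  99, requiredOffset = 100, isrSize = 3, minIsr = 2) // (false, "NONE"): still waiting
ackDecision(highWatermark = 100, requiredOffset = 100, isrSize = 1, minIsr = 2) // (true, "NOT_ENOUGH_REPLICAS_AFTER_APPEND")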
In short, once enough ISR replicas have caught up to the message in question (the leader's high watermark has reached the required offset), the DelayedProduce completes and proactively sends the producer its success response, which is the logic below.
// kafka.server.DelayedProduce#onComplete
override def onComplete() {
  val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
  responseCallback(responseStatus)
}
Once the producer has safely received the Broker's response, the message has been sent successfully. At this point, even if the leader crashes, there is nothing to fear: one of the surviving ISR replicas will be elected as the new leader (we will cover that logic another time), and consumers can still consume the message.
Is the problem solved, then? Not yet; see the next installment.