Kafka源碼分析(六)消息發(fā)送可靠性——acks

接下去幾篇關(guān)于可靠性的文章全部只討論一個經(jīng)典問題:
Kafka怎么樣才能不丟消息暴心?

怎么樣的情況叫做丟消息咳短?客戶端調(diào)用future = send(msg, callback)嗅战,但是中途報錯了评也,這種情況不叫丟消息攒磨。真正丟消息的場景是逐样,客戶端調(diào)用了future = send(msg, callback)后蜗字,Broker已經(jīng)明確告知客戶端,這條消息已經(jīng)發(fā)送成功了(future.isDone為true脂新,或者callback的onSuccess被調(diào)用)挪捕,但是消費者缺永久性消費不到這條數(shù)據(jù)。

在生產(chǎn)者上争便,有一個參數(shù)叫做acks级零,
如果acks=0,代表消息一旦被發(fā)送到Socket buffer中滞乙,就已經(jīng)可以考慮消息發(fā)送成功奏纪,這個顯然是不安全的,不做討論斩启;
如果acks=1序调,代表消息只要在1ISR中被持久化成功后,Broker就可以告訴生產(chǎn)者兔簇,消息已經(jīng)發(fā)送成功了发绢。
如果acks=all硬耍,代表消息需要在所有ISR都被持久化成功后,Broker才可以告訴生產(chǎn)者边酒,消息已經(jīng)發(fā)送成功了经柴。

假如Broker關(guān)于測試Topic的Replic設(shè)置為3,也就是說正常情況下ISR為3墩朦。
首先將生產(chǎn)者的acks配置為1(acks=1)
消息被發(fā)送到Broker后坯认,是由該TopicPartition的Leader處理。Leader會調(diào)用appendToLocalLog將消息持久化在本地氓涣。

val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        isFromClient = isFromClient, entriesPerPartition, requiredAcks)

持久化成功后牛哺,如果Leader立刻用reponse通知生產(chǎn)者,說春哨,消息已經(jīng)發(fā)送成功了荆隘,萬一這時Leader掛了恩伺,那么消息就丟失了赴背,消費者將沒有辦法消費到這條數(shù)據(jù)。

將生產(chǎn)者的acks配置為all(acks=-1/all)
Leader調(diào)用appendToLocalLog將消息持久化在本地后晶渠,不會立馬給生產(chǎn)者返回凰荚,而是啟動一個DelayedProduce(延時發(fā)送任務(wù))。

if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
    // create delayed produce operation
    val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
    val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

    // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
    val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

    // try to complete the request immediately, otherwise put it into the purgatory
    // this is because while the delayed produce operation is being created, new
    // requests may arrive and hence make this operation completable.
    delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
    // we can respond immediately
    val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
    responseCallback(produceResponseStatus)
}

// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
private def delayedProduceRequestRequired(requiredAcks: Short,
                                        entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                        localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
    requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}

這時對于生產(chǎn)者而言褒脯,它還沒有被通知消息已經(jīng)發(fā)送成功了便瑟。即使這個時候這個Leader掛了,也不能算是消息丟失番川,只是生產(chǎn)者需要重新發(fā)送下就好到涂。

問題還沒有結(jié)束,對于那個Leader而言颁督,剛剛說到它只是創(chuàng)建了一個DelayedProduce践啄,它什么時候才會給生產(chǎn)者回復(fù)呢。問題就到了這個DelayedProduce身上沉御,延時是不可能無休止的屿讽,查看到DelayedProduce的tryComplete方法,只要滿足了下面的這個條件吠裆,DelayedProduce這個延時任務(wù)就需要開始執(zhí)行伐谈。

// kafka.server.DelayedProduce#tryComplete
override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
        if (status.acksPending) {
        val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
            // code 
            // 返回false的情況很多,但是我們目前只關(guān)注返回true的情況试疙,所以這里需要跟進(jìn)去
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
            // code
        }
        // code return false
    }

    // ..
}

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
    leaderReplicaIfLocal match {
        // code

        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
        // 足夠數(shù)量的ISR同步到了待發(fā)送的這條消息
        if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
            /*
            * The topic may be configured not to accept messages if there are not enough replicas in ISR
            * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
            */
            if (minIsr <= curInSyncReplicas.size)
            (true, Errors.NONE)
            else
            (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
        } else
            (false, Errors.NONE)
        case None =>
        (false, Errors.NOT_LEADER_FOR_PARTITION)
    }
}

簡述這段邏輯就是诵棵,當(dāng)足夠數(shù)量的ISR同步到了待發(fā)送的這條消息,DelayedProduce會主動給生產(chǎn)者發(fā)送成功的響應(yīng)祝旷,也就是下面這段邏輯履澳。

// kafka.server.DelayedProduce#onComplete
override def onComplete() {
    val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
    responseCallback(responseStatus)
}

生產(chǎn)者順利收到Broker的響應(yīng)后柱徙,消息就成功發(fā)送。這時奇昙,萬一Leader掛了护侮,就不怕了,剩下存活著的ISR中的某一個會被選為新的Leader(這個邏輯之后再聊)储耐,消費者照樣還是能消費到這條消息的羊初。

問題解決了么?還沒有什湘,請看下篇分解长赞。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市闽撤,隨后出現(xiàn)的幾起案子得哆,更是在濱河造成了極大的恐慌,老刑警劉巖哟旗,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贩据,死亡現(xiàn)場離奇詭異,居然都是意外死亡闸餐,警方通過查閱死者的電腦和手機(jī)饱亮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舍沙,“玉大人近上,你說我怎么就攤上這事》髡。” “怎么了壹无?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長感帅。 經(jīng)常有香客問我斗锭,道長,這世上最難降的妖魔是什么留瞳? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任拒迅,我火速辦了婚禮,結(jié)果婚禮上她倘,老公的妹妹穿的比我還像新娘璧微。我一直安慰自己,他們只是感情好硬梁,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布前硫。 她就那樣靜靜地躺著,像睡著了一般荧止。 火紅的嫁衣襯著肌膚如雪屹电。 梳的紋絲不亂的頭發(fā)上阶剑,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天,我揣著相機(jī)與錄音危号,去河邊找鬼牧愁。 笑死,一個胖子當(dāng)著我的面吹牛外莲,可吹牛的內(nèi)容都是我干的猪半。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼偷线,長吁一口氣:“原來是場噩夢啊……” “哼磨确!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起声邦,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤乏奥,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后亥曹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體邓了,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年歇式,在試婚紗的時候發(fā)現(xiàn)自己被綠了驶悟。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片胡野。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡材失,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出硫豆,到底是詐尸還是另有隱情龙巨,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布熊响,位于F島的核電站旨别,受9級特大地震影響撮奏,放射性物質(zhì)發(fā)生泄漏牢酵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一搪搏、第九天 我趴在偏房一處隱蔽的房頂上張望洪碳。 院中可真熱鬧递览,春花似錦、人聲如沸瞳腌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嫂侍。三九已至儿捧,卻和暖如春荚坞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背菲盾。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工颓影, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人懒鉴。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓瞭空,卻偏偏與公主長得像,于是被迫代替她去往敵國和親疗我。 傳聞我的和親對象是個殘疾皇子咆畏,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355