KafkaConsumer(消費(fèi)者)每次調(diào)用 poll()方法涨醋,它總是返回由生產(chǎn)者寫入 Kafka但還沒(méi)有被消費(fèi)者讀取過(guò)的記錄盆繁, 我們因 此可以追蹤到哪些記錄是被群組里的哪個(gè)消費(fèi)者讀取的细诸。之前已經(jīng)討論過(guò)倒庵, Kafka 不會(huì)像其他 JMS 隊(duì)列那樣需要得到消費(fèi)者的確認(rèn)绿贞,這是 Kafka 的一個(gè)獨(dú)特之處因块。相反,消 費(fèi)者可以使用 Kafka來(lái)追蹤消息在分區(qū)里的位置(偏移量)籍铁。
我們把更新分區(qū)當(dāng)前位置的操作叫作提交涡上。
那么消費(fèi)者是如何提交偏移量的呢?消費(fèi)者往一個(gè) 叫作 _consumer_offset 的特殊主題發(fā)送 消息趾断,消息里包含每個(gè)分區(qū)的偏移量。 如果消費(fèi)者一直處于運(yùn)行狀態(tài)吩愧,那么偏移量就沒(méi)有 什么用處芋酌。不過(guò),如果悄費(fèi)者發(fā)生崩潰或者有新 的消費(fèi)者加入群組耻警,就會(huì)觸發(fā)再均衡隔嫡,完 成再均衡之后,每個(gè)消費(fèi)者可能分配到新 的分區(qū)甘穿,而不是之前處理的那個(gè)腮恩。為了能夠繼續(xù) 之前的工作,消費(fèi)者需要讀取每個(gè)分區(qū)最后一次提交 的偏移量温兼,然后從偏移量指定的地方 繼續(xù)處理秸滴。
如果提交的偏移量小于客戶端處理 的最后一個(gè)消息的偏移量 ,那么處于兩個(gè)偏移量之間的 消息就會(huì)被重復(fù)處理募判,如圖 4-6所示荡含。
如果提交的偏移量大于客戶端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的 消息將會(huì)丟失届垫,如圖 4-7 所示释液。
所以,處理偏移量的方式對(duì)客戶端會(huì)有很大的影響装处。 KafkaConsumer API提供了很多種方式來(lái)提交偏移量误债。
自動(dòng)提交
最簡(jiǎn)單的提交方式是讓悄費(fèi)者自動(dòng)提交偏移量。如果enable.auto.commit被設(shè)為 true
妄迁,那么每過(guò)5s寝蹈,消費(fèi)者會(huì)自動(dòng)把從 poll() 方法接收到的最大偏移量提交上去。提交時(shí)間間隔由 auto.commit.interval.ms 控制登淘,默認(rèn)值是 5s箫老。與消費(fèi)者里的其他東西 一樣,自動(dòng)提交也是在輪詢(poll() )里進(jìn)行的黔州。消費(fèi)者每次在進(jìn)行輪詢時(shí)會(huì)檢查是否該提交偏移量了耍鬓,如果是,那 么就會(huì)提交從上一次輪詢返回的偏移量辩撑。
不過(guò)界斜,在使用這種簡(jiǎn)便的方式之前,需要知道它將會(huì)帶來(lái)怎樣的結(jié)果合冀。
假設(shè)我們?nèi)匀皇褂媚J(rèn)的 5s提交時(shí)間間隔,在最近一次提交之后的 3s發(fā)生了再均衡项贺,再 均衡之后君躺,消費(fèi)者從最后一次提交的偏移量位置開(kāi)始讀取消息峭判。這個(gè)時(shí)候偏移量已經(jīng)落后 了 3s,所以在這 3s 內(nèi)到達(dá)的消息會(huì)被重復(fù)處理棕叫×煮Γ可以通過(guò)修改提交時(shí)間間隔來(lái)更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時(shí)間窗俺泣,不過(guò)這種情況是無(wú)也完全避免的 疗认。
在使用自動(dòng)提交時(shí) ,每次調(diào)用輪詢方怯都會(huì)把上一次調(diào)用返 回的偏移量提交上去伏钠,它并不 知道具體哪些消息已經(jīng)被處理了横漏,所以在再次調(diào)用之前最好確保所有當(dāng)前調(diào)用返回 的消息 都已經(jīng)處理完畢(在調(diào)用 close() 方法之前也會(huì)進(jìn)行自動(dòng)提交)。 一般情況下不會(huì)有什么問(wèn) 題熟掂,不過(guò)在處理異扯薪剑或提前退出輪詢時(shí)要格外小心 。
自動(dòng)提交雖然方便 赴肚, 不過(guò)并沒(méi)有為開(kāi)發(fā)者留有余地來(lái)避免重復(fù)處理消息素跺。
提交當(dāng)前偏移量
大部分開(kāi)發(fā)者通過(guò)控制偏移量提交時(shí)間來(lái)消除丟失消息的可能性,井在發(fā)生再均衡時(shí)減少 重復(fù)消息的數(shù)量誉券。消費(fèi)者 API提供了另一種提交偏移量的方式 指厌, 開(kāi)發(fā)者可以在必要的時(shí)候 提交當(dāng)前偏移盤,而不是基于時(shí)間間隔踊跟。
取消自動(dòng)提交踩验,把 auto.commit.offset 設(shè)為 false
,讓應(yīng)用程序決定何時(shí)提交 偏 移量琴锭。使用 commitSync() 提交偏移量最簡(jiǎn)單也最可靠晰甚。這個(gè) API會(huì)提交由 poll() 方法返回 的最新偏移量,提交成 功后馬上返回决帖,如果提交失敗就拋出異常厕九。
要記住, commitSync() 將會(huì)提交由 poll() 返回的最新偏移量 地回, 所以在處理完所有記錄后要 確保調(diào)用了 commitSync()扁远,否則還是會(huì)有丟失消息的風(fēng)險(xiǎn)。如果發(fā)生了再均衡刻像,從最近一 批消息到發(fā)生再均衡之間的所有消息都將被重復(fù)處理畅买。
下面是我們?cè)谔幚硗曜罱慌⒑笫褂?commitSync() 方法提交偏移量的例子。
異步提交
同步提交有一個(gè)不足之處细睡,在 broker對(duì)提交請(qǐng)求作出回應(yīng)之前谷羞,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量。我們可以通過(guò)降低提交頻率來(lái)提升吞吐量湃缎,但如果發(fā)生了再均衡犀填, 會(huì)增加重復(fù)消息的數(shù)量。
這個(gè)時(shí)候可以使用異步提交 API嗓违。我們只管發(fā)送提交請(qǐng)求九巡,無(wú)需等待 broker的響應(yīng)。
在成功提交或碰到無(wú)怯恢復(fù)的錯(cuò)誤之前蹂季, commitSync() 會(huì)一直重試(應(yīng)用程序也一直阻塞)冕广,但是 commitAsync() 不會(huì),這也是 commitAsync() 不好的 一個(gè)地方偿洁。它之所以不進(jìn)行重試撒汉,是因?yàn)樵谒盏?服務(wù)器響應(yīng)的時(shí)候,可能有一個(gè)更大的偏移量已經(jīng)提交成功父能。假設(shè)我們發(fā)出一個(gè)請(qǐng)求用于提交偏移量 2000神凑,這個(gè)時(shí)候發(fā)生了短暫的通信問(wèn)題 ,服務(wù)器收不到請(qǐng)求何吝,自然也不會(huì) 作出任何響應(yīng)溉委。與此同時(shí),我們處理了另外一批消息爱榕,并成功提交了偏移量 3000瓣喊。如果 commitAsync() 重新嘗試提交偏移量 2000,它有可能在偏移量 3000之后提交成功黔酥。這個(gè)時(shí) 候如果發(fā)生再均衡藻三,就會(huì)出現(xiàn)重復(fù)消息。
我們之所以提到這個(gè)問(wèn)題的復(fù)雜性和提交順序的重要性跪者,是因?yàn)?commitAsync()也支持回 調(diào)棵帽,在 broker 作出響應(yīng)時(shí)會(huì)執(zhí)行回調(diào)≡幔回調(diào)經(jīng)常被用于記錄提交錯(cuò)誤或生成度量指標(biāo)逗概, 不 過(guò)如果你要用它來(lái)進(jìn)行重試, 一定要注意提交的順序忘衍。
重試異步提交
我們可以使用一個(gè)單調(diào)遞增的序列號(hào)來(lái)維護(hù)異步提交的順序逾苫。在每次提交偏 移量之后或在回調(diào)里提交偏移量時(shí)遞增序列號(hào)。在進(jìn)行重試前枚钓,先檢查回調(diào) 的序列號(hào)和即將提交的偏移量是否相等铅搓,如果相等,說(shuō)明沒(méi)有新的提交搀捷,那么可以安全地進(jìn)行重試星掰。如果序列號(hào)比較大,說(shuō)明有一個(gè)新的提交已經(jīng)發(fā)送出去了,應(yīng)該停止重試蹋偏。
同步和異步組合提交
一般情況下便斥,針對(duì)偶爾出現(xiàn)的提交失敗至壤,不進(jìn)行重試不會(huì)有太大問(wèn)題威始,因?yàn)槿绻峤皇?是 因?yàn)榕R時(shí)問(wèn)題導(dǎo)致的,那么后續(xù)的提交總會(huì)有成功的像街。但如果這是發(fā)生在關(guān)閉消費(fèi)者或 再均衡前的最后一次提交黎棠,就要確保能夠提交成功。
因此镰绎,在消費(fèi)者關(guān)閉前一般會(huì)組合使用 commitAsync()和 commitSync()脓斩。它們的工作原理如下(后面講到再均衡監(jiān)聽(tīng)器時(shí),我們會(huì)討論如何在發(fā)生再均衡前提交偏移量):
提交特定的偏移量
提交偏移量的頻率與處理消息批次的頻率是一樣的畴栖。但如果想要更頻繁地提交出怎么辦随静?如果 poll() 方法返回一大批數(shù)據(jù),為了避免因再均衡引起的重復(fù)處理整批消息吗讶,想要在批次中間提交偏移量該怎么辦燎猛?這種情況無(wú)法通過(guò)調(diào)用 commitSync()或 commitAsync() 來(lái)實(shí)現(xiàn),因?yàn)樗鼈冎粫?huì)提交最后一個(gè)偏移量照皆,而此時(shí)該批次里的消息還沒(méi)有處理完重绷。
幸運(yùn)的是,消費(fèi)者 API 允許在調(diào)用 commitSync()和 commitAsync()方法時(shí)傳進(jìn)去希望提交 的分區(qū)和偏移量的 map膜毁。假設(shè)你處理了半個(gè)批次的消息昭卓, 最后一個(gè)來(lái)自主題“customers” 分區(qū) 3 的消息的偏移量是 5000, 你可以調(diào)用 commitSync() 方法來(lái)提交它瘟滨。不過(guò)候醒,因?yàn)橄M(fèi)者可能不只讀取一個(gè)分區(qū), 你需要跟蹤所有分區(qū)的偏移量杂瘸,所以在這個(gè)層面上控制偏移 量 的提交會(huì)讓代碼變復(fù)雜倒淫。
下面是提交特定偏移量的例子 :
再均衡監(jiān)聽(tīng)器
在提交偏移量一節(jié)中提到過(guò),消費(fèi)者在退出和進(jìn)行分區(qū)再均衡之前胧沫,會(huì)做一些清理工作昌简。
你會(huì)在消費(fèi)者失去對(duì)一個(gè)分區(qū)的所有權(quán)之前提交最后一個(gè)已處理記錄的偏移量。如果消費(fèi) 者準(zhǔn)備了 一 個(gè)緩沖區(qū)用于處理偶發(fā)的事件绒怨,那么在失去分區(qū)所有權(quán)之前纯赎, 需要處理在緩沖 區(qū)累積下來(lái)的記錄。你可能還需要關(guān)閉文件句柄南蹂、數(shù)據(jù)庫(kù)連接等犬金。
在為消費(fèi)者分配新分區(qū)或移除舊分區(qū)時(shí),可以通過(guò)消費(fèi)者 API執(zhí)行 一 些應(yīng)用程序代碼,在調(diào)用 subscribe()方法時(shí)傳進(jìn)去一個(gè)ConsumerRebalancelistener實(shí)例就可以了晚顷。 ConsumerRebalancelistener有兩個(gè)需要實(shí)現(xiàn)的方法峰伙。
(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions)方法會(huì)在 再均衡開(kāi)始之前和消費(fèi)者停止讀取消息之后被調(diào)用。如果在這里提交偏移量该默,下一個(gè)接 管分區(qū) 的消費(fèi)者就知道該從哪里開(kāi)始讀取了瞳氓。
(2) public void onPartitionsAssigned(Collection<TopicPartition> partitions)方法會(huì)在 重新分配分區(qū)之后和消費(fèi)者開(kāi)始讀取消息之前被調(diào)用。
下面的例子將演示如何在失去分區(qū)所有權(quán)之前通過(guò) onPartitionsRevoked()方法來(lái)提交偏移量栓袖。在下一節(jié)匣摘,我們會(huì)演示另一個(gè)同時(shí)使用了 onPartitionsAssigned()方法的例子。
從特定偏移量處開(kāi)始處理記錄
到目前為止裹刮,我們知道了如何使用 poll() 方法從各個(gè)分區(qū)的最新偏移量處開(kāi)始處理消息音榜。 不過(guò),有時(shí)候我們也需要從特定的偏移量處開(kāi)始讀取消息捧弃。
如果你想從分區(qū)的起始位置開(kāi)始讀取消息赠叼,或者直接跳到分區(qū)的末尾開(kāi)始讀取消息封恰, 可以使 用 seekToBeginning(Collection<TopicPartition> tp) 和 seekToEnd(Collection<TopicPartition> tp) 這兩個(gè)方法彪杉。
不過(guò), Kafka也為我們提供了用 于查找特定偏移量的 API蹦掐。 它有很多用途葛家,比如向 后回退 幾個(gè)消息或者向前跳過(guò)幾個(gè)消息(對(duì)時(shí)間比較敏感的應(yīng)用程序在處理滯后的情況下希望能 夠向前跳過(guò)若干個(gè)消息)户辞。在使用 Kafka 以外的系統(tǒng)來(lái)存儲(chǔ)偏移量時(shí),它將給我們 帶來(lái)更 大的驚喜癞谒。
試想一下這樣的場(chǎng)景:應(yīng)用程序從 Kafka讀取事件(可能是網(wǎng)站的用戶點(diǎn)擊事件流 )底燎,對(duì) 它們進(jìn)行處理(可能是使用自動(dòng)程序清理點(diǎn)擊操作井添加會(huì)話信息),然后把結(jié)果保 存到 數(shù)據(jù)庫(kù)弹砚、 NoSQL 存儲(chǔ)引擎或 Hadoop双仍。假設(shè)我們真的不想丟失任何數(shù)據(jù),也不想在數(shù)據(jù)庫(kù) 里多次保存相同的結(jié)果桌吃。
這種情況下朱沃,消費(fèi)者的代碼可能是這樣的 :
在這個(gè)例子里,每處理一條記錄就提交一次偏移量茅诱。盡管如此逗物, 在記錄被保存到數(shù)據(jù)庫(kù)之后以及偏移量被提交之前 ,應(yīng)用程序仍然有可能發(fā)生崩潰瑟俭,導(dǎo)致重復(fù)處理數(shù)據(jù)翎卓,數(shù)據(jù)庫(kù)里就會(huì)出現(xiàn)重復(fù)記錄。
如果保存記錄和偏移量可以在一個(gè)原子操作里完成摆寄,就可以避免出現(xiàn)上述情況失暴。記錄和偏 移量要么 都被成功提交坯门,要么都不提交。如果記錄是保存在數(shù)據(jù)庫(kù)里而偏移量是提交到 Kafka 上逗扒,那么就無(wú)法實(shí)現(xiàn)原子操作古戴。
不過(guò) ,如果在同一個(gè)事務(wù)里把記錄和偏移量都寫到數(shù)據(jù)庫(kù)里會(huì)怎樣呢矩肩?那么我們就會(huì)知道 記錄和偏移量要么都成功提交现恼,要么都沒(méi)有,然后重新處理記錄蛮拔。
現(xiàn)在的問(wèn)題是:如果偏移量是保存在數(shù)據(jù)庫(kù)里而不是 Kafka里述暂,那么消費(fèi)者在得到新分區(qū) 時(shí)怎么知道該從哪里開(kāi)始讀取?這個(gè)時(shí)候可以使用 seek() 方法。在消費(fèi)者啟動(dòng)或分配到新 分區(qū)時(shí) 建炫,可以使用 seek()方法查找保存在數(shù)據(jù)庫(kù)里的偏移量。
下面的例子大致說(shuō)明了如何使用這個(gè) API疼蛾。 使用 ConsumerRebalancelistener和 seek() 方 戰(zhàn)確保我們是從數(shù)據(jù)庫(kù)里保存的偏移量所指定的位置開(kāi)始處理消息的肛跌。
通過(guò)把偏移量和記錄保存到同 一個(gè)外部系統(tǒng)來(lái)實(shí)現(xiàn)單次語(yǔ)義可以有很多種方式,不過(guò)它們 都需要結(jié)合使用 ConsumerRebalancelistener和 seek() 方法來(lái)確保能夠及時(shí)保存偏移量察郁, 井保證消費(fèi)者總是能夠從正確的位置開(kāi)始讀取消息衍慎。
如何退出
在之前討論輪詢時(shí)就說(shuō)過(guò),不需要擔(dān)心消費(fèi)者會(huì)在一個(gè)無(wú)限循環(huán)里輪詢消息皮钠,我們會(huì)告訴消費(fèi)者如何優(yōu)雅地退出循環(huán)稳捆。
如果確定要退出循環(huán),需要通過(guò)另一個(gè)線程調(diào)用 consumer.wakeup()方法麦轰。如果循環(huán)運(yùn)行 在主線程里乔夯,可以在 ShutdownHook里調(diào)用該方法。要記住款侵, consumer.wakeup() 是消費(fèi)者 唯一一個(gè)可以從其他線程里安全調(diào)用的方法末荐。調(diào)用 consumer.wakeup()可以退出 poll(), 并拋出 WakeupException異常,或者如果調(diào)用 cconsumer.wakeup() 時(shí)線程沒(méi)有等待輪詢新锈, 那 么異常將在下一輪調(diào)用 poll()時(shí)拋出甲脏。我們不需要處理 WakeupException,因?yàn)樗皇怯糜谔鲅h(huán)的一種方式妹笆。不過(guò)块请, 在退出線程之前調(diào)用 consumer.close()是很有必要的, 它 會(huì)提交任何還沒(méi)有提交的東西 拳缠, 并向群組協(xié)調(diào)器(broker)發(fā)送消息墩新,告知自己要離開(kāi)群組,接下來(lái) 就會(huì)觸發(fā)再均衡 脊凰,而不需要等待會(huì)話超時(shí)抖棘。
下面是運(yùn)行在主線程上的消費(fèi)者退出線程的代碼 茂腥。 這些代碼經(jīng)過(guò)了簡(jiǎn)化,你可以在這里查 看完整的代碼: http://bit.ly/2u47e9A切省。