簡介
由于項目中需要使用kafka作為消息隊列愉择,并且項目是基于spring-boot來進(jìn)行構(gòu)建的,所以項目采用了spring-kafka作為原生kafka的一個擴展庫進(jìn)行使用壳鹤。先說明一下版本:
- spring-boot 的版本是1.4.0.RELEASE
- kafka 的版本是0.9.0.x 版本
- spring-kafka 的版本是1.0.3.RELEASE
用過kafka的人都知道,對于使用kafka來說,producer的使用相對簡單一些,只需要把數(shù)據(jù)按照指定的格式發(fā)送給kafka中某一個topic就可以了砖茸。本文主要是針對spring-kafka的consumer端上的使用進(jìn)行簡單一些分析和總結(jié)。
kafka的速度是很快脯倚,所以一般來說producer的生產(chǎn)消息的邏輯速度都會比consumer的消費消息的邏輯速度快。
具體案例
之前在項目中遇到了一個案例是嵌屎,consumer消費一條數(shù)據(jù)平均需要200ms的時間推正,并且在某個時刻,producer會在短時間內(nèi)產(chǎn)生大量的數(shù)據(jù)丟進(jìn)kafka的broker里面(假設(shè)平均1s中內(nèi)丟入了5w條需要消費的消息宝惰,這個情況會持續(xù)幾分鐘)植榕。
對于這種情況,kafka的consumer的行為會是:
- kafka的consumer會從broker里面取出一批數(shù)據(jù)尼夺,?給消費線程進(jìn)行消費尊残。
- 由于取出的一批消息數(shù)量太大炒瘸,consumer在session.timeout.ms時間之內(nèi)沒有消費完成
- consumer coordinator 會由于沒有接受到心跳而掛掉,并且出現(xiàn)一些日志
日志的意思大概是coordinator掛掉了寝衫,然后自動提交offset失敗顷扩,然后重新分配partition給客戶端 - 由于自動提交offset失敗,導(dǎo)致重新分配了partition的客戶端又重新消費之前的一批數(shù)據(jù)
- 接著consumer重新消費慰毅,又出現(xiàn)了消費超時隘截,無限循環(huán)下去。
解決方案
遇到了這個問題之后汹胃, 我們做了一些步驟:
- 提高了partition的數(shù)量婶芭,從而提高了consumer的并行能力,從而提高數(shù)據(jù)的消費能力
- 對于單partition的消費線程着饥,增加了一個固定長度的阻塞隊列和工作線程池進(jìn)一步提高并行消費的能力
- 由于使用了spring-kafka犀农,則把kafka-client的enable.auto.commit設(shè)置成了false,表示禁止kafka-client自動提交offset宰掉,因為就是之前的自動提交失敗呵哨,導(dǎo)致offset永遠(yuǎn)沒更新,從而轉(zhuǎn)向使用spring-kafka的offset提交機制贵扰。并且spring-kafka提供了多種提交策略:
這些策略保證了在一批消息沒有完成消費的情況下仇穗,也能提交offset,從而避免了完全提交不上而導(dǎo)致永遠(yuǎn)重復(fù)消費的問題戚绕。
分析
那么問題來了纹坐,為什么spring-kafka的提交offset的策略能夠解決spring-kafka的auto-commit的帶來的重復(fù)消費的問題呢?下面通過分析spring-kafka的關(guān)鍵源碼來解析這個問題舞丛。
首先來看看spring-kafka的消費線程邏輯
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
// we start the invoker here as there will be no rebalance calls to
// trigger it, but only if the container is not set to autocommit
// otherwise we will process records on a separate thread
if (!this.autoCommit) {
startInvoker();
}
}
上面可以看到耘子,如果auto.commit關(guān)掉的話,spring-kafka會啟動一個invoker球切,這個invoker的目的就是啟動一個線程去消費數(shù)據(jù)谷誓,他消費的數(shù)據(jù)不是直接從kafka里面直接取的,那么他消費的數(shù)據(jù)從哪里來呢吨凑?他是從一個spring-kafka自己創(chuàng)建的阻塞隊列里面取的捍歪。
然后會進(jìn)入一個循環(huán),從源代碼中可以看到如果auto.commit被關(guān)掉的話鸵钝, 他會先把之前處理過的數(shù)據(jù)先進(jìn)行提交offset糙臼,然后再去從kafka里面取數(shù)據(jù)。
然后把取到的數(shù)據(jù)丟給上面提到的阻塞列隊恩商,由上面創(chuàng)建的線程去消費变逃,并且如果阻塞隊列滿了導(dǎo)致取到的數(shù)據(jù)塞不進(jìn)去的話,spring-kafka會調(diào)用kafka的pause方法怠堪,則consumer會停止從kafka里面繼續(xù)再拿數(shù)據(jù)揽乱。
接著spring-kafka還會處理一些異常的情況名眉,比如失敗之后是不是需要commit offset這樣的邏輯。
方法二
- 可以根據(jù)消費者的消費速度對session.timeout.ms的時間進(jìn)行設(shè)置凰棉,適當(dāng)延長
- 或者減少每次從partition里面撈取的數(shù)據(jù)分片的大小损拢,提高消費者的消費速度。