今天我司線上kafka消息代理出現(xiàn)錯誤日志,異常rebalance安吁,而且平均間隔2到3分鐘就會rebalance一次未辆,分析日志發(fā)現(xiàn)比較嚴重芹壕。錯誤日志如下
08-09 11:01:11 131 pool-7-thread-3 ERROR [] -
commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
這個錯誤的意思是汇四,消費者在處理完一批poll的消息后,在同步提交偏移量給broker時報的錯踢涌。初步分析日志是由于當前消費者線程消費的分區(qū)已經(jīng)被broker
給回收了通孽,因為kafka認為這個消費者死了,那么為什么呢睁壁?
分析問題
這里就涉及到問題是消費者在創(chuàng)建時會有一個屬性max.poll.interval.ms
背苦,
該屬性意思為kafka消費者在每一輪poll()
調(diào)用之間的最大延遲,消費者在獲取更多記錄之前可以空閑的時間量的上限互捌。如果此超時時間期滿之前poll()
沒有被再次調(diào)用,則消費者被視為失敗行剂,并且分組將重新平衡秕噪,以便將分區(qū)重新分配給別的成員。
如上圖厚宰,在while循環(huán)里腌巾,我們會循環(huán)調(diào)用poll拉取broker中的最新消息。每次拉取后固阁,會有一段處理時長壤躲,處理完成后,會進行下一輪poll备燃。引入該配置的用途是,限制兩次poll之間的間隔凌唬,消息處理邏輯太重并齐,每一條消息處理時間較長,但是在這次poll()到下一輪poll()時間不能超過該配置間隔客税,協(xié)調(diào)器會明確地讓使用者離開組况褪,并觸發(fā)新一輪的再平衡。
max.poll.interval.ms
默認間隔時間為300s
分析日志
從日志中我們能看到poll量有時能夠達到250多條
一次性拉取250多條消息進行消費更耻,而由于每一條消息都有一定的處理邏輯测垛,根據(jù)以往的日志分析,每條消息平均在500ms內(nèi)就能處理完成秧均。然而食侮,我們今天查到有兩條消息處理時間超過了1分鐘。
消息處理日志1
08-09 08:50:05 430 pool-7-thread-3 INFO [] - [RestKafkaConsumer] receive message (收到消息目胡,準備過濾锯七,然后處理), topic: member_1.0.0_event ,partition: 0 ,offset: 1504617
08-09 08:50:05 431 pool-7-thread-3 INFO [] - [RestKafkaConsumer]:解析消息成功,準備請求調(diào)用!
08-09 08:51:05 801 pool-7-thread-3 INFO [] - [HttpClient]:response code: {"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送沒有可用的營銷活動--老pos機"},"fullAmountSendRes":{"status":400,"info":"滿額送沒有可用的營銷活動--老pos機"}},"info":"發(fā)券流程執(zhí)
行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event內(nèi)
容:{"id":36305914,"score":16,"orderPrice":15.9,"payTime":1533775401000,"thirdTransId":"4200000160201808
消息處理日志2
08-09 08:51:32 450 pool-7-thread-3 INFO [] - [RestKafkaConsumer] receive message (收到消息,準備過濾誉己,然后處理), topic: member_1.0.0_event ,partition: 0 ,offset: 1504674
08-09 08:51:32 450 pool-7-thread-3 INFO [] - [RestKafkaConsumer]:解析消息成功,準備請求調(diào)用!
08-09 08:52:32 843 pool-7-thread-3 INFO [] - [HttpClient]:response code: {"status":200,"data":{"goodsSendRes":{"status":400,"info":"指>定商品送沒有可用的營銷活動--老pos機"},"fullAmountSendRes":{"status":400,"info":"滿額送沒有可用的營銷活動--老pos機"}},"info":"發(fā)券流程執(zhí)
行成功"}, event:com.today.api.member.events.ConsumeFullEvent, url:https://wechat-lite.today36524.com/api/dapeng/subscribe/index,event內(nèi)
容:{"id":36306061,"score":3,"orderPrice":3.0,"payTime":1533775482000,"thirdTransId":"420000016320180809
我們看到消息消費時間都超過了1分鐘眉尸。
分析原因
如下是我們消費者處理邏輯(省略部分代碼)
while (isRunning) {
ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
if (records != null && records.count() > 0) {
for (ConsumerRecord<KEY, VALUE> record : records) {
dealMessage(bizConsumer, record.value());
try {
//records記錄全部完成后,才提交
consumer.commitSync();
} catch (CommitFailedException e) {
logger.error("commit failed,will break this for loop", e);
break;
}
}
}
poll()
方法該方法輪詢返回消息集巨双,調(diào)用一次可以獲取一批消息噪猾。
kafkaConsumer
調(diào)用一次輪詢方法只是拉取一次消息≈郏客戶端為了不斷拉取消息袱蜡,會用一個外部循環(huán)不斷調(diào)用消費者的輪詢方法。每次輪詢到消息疼阔,在處理完這一批消息后戒劫,才會繼續(xù)下一次輪詢半夷。但如果一次輪詢返回的結(jié)構(gòu)沒辦法及時處理完成,會有什么后果呢迅细?服務(wù)端約定了和客戶端max.poll.interval.ms
巫橄,兩次poll
最大間隔。如果客戶端處理一批消息花費的時間超過了這個限制時間茵典,服務(wù)端可能就會把消費者客戶端移除掉湘换,并觸發(fā)rebalance
。
拉取偏移量與提交偏移量
kafka
的偏移量(offset
)是由消費者進行管理的统阿,偏移量有兩種彩倚,拉取偏移量
(position)與提交偏移量
(committed)。拉取偏移量代表當前消費者分區(qū)消費進度扶平。每次消息消費后帆离,需要提交偏移量。在提交偏移量時结澄,kafka
會使用拉取偏移量
的值作為分區(qū)的提交偏移量
發(fā)送給協(xié)調(diào)者哥谷。
如果沒有提交偏移量,下一次消費者重新與broker連接后麻献,會從當前消費者group已提交到broker的偏移量處開始消費们妥。
所以,問題就在這里勉吻,當我們處理消息時間太長時,已經(jīng)被broker剔除监婶,提交偏移量又會報錯。所以拉取偏移量沒有提交到broker齿桃,分區(qū)又rebalance惑惶。下一次重新分配分區(qū)時,消費者會從最新的已提交偏移量處開始消費源譬。這里就出現(xiàn)了重復消費的問題集惋。
解決方案
1.增加max.poll.interval.ms
處理時長
kafka消費者 默認此間隔時長為300s,本次故障是300s都沒處理完成踩娘,于是改成500s刮刑。
max.poll.interval.ms=500000
2.設(shè)置分區(qū)拉取閾值
kafkaConsumer調(diào)用一次輪詢方法只是拉取一次消息⊙剩客戶端為了不斷拉取消息雷绢,會用一個外部循環(huán)不斷調(diào)用輪詢方法poll()。每次輪詢后理卑,在處理完這一批消息后翘紊,才會繼續(xù)下一次的輪詢。
max.poll.records = 50
3.poll到的消息藐唠,處理完一條就提交一條帆疟,當出現(xiàn)提交失敗時鹉究,馬上跳出循環(huán),這時候kafka就會進行rebalance
,下一次會繼續(xù)從當前offset
進行消費踪宠。
while (isRunning) {
ConsumerRecords<KEY, VALUE> records = consumer.poll(100);
if (records != null && records.count() > 0) {
for (ConsumerRecord<KEY, VALUE> record : records) {
dealMessage(bizConsumer, record.value());
try {
//records記錄全部完成后自赔,才提交
consumer.commitSync();
} catch (CommitFailedException e) {
logger.error("commit failed,will break this for loop", e);
break;
}
}
}
附錄 查詢?nèi)罩?某個topic的 partition
的rebalance過程
member_1分區(qū)
時間 | revoked position | revoked committed | 時間 | assigned |
---|---|---|---|---|
08:53:21 | 1508667 | 1508509 | 08:57:17 | 1508509 |
09:16:31 | 1509187 | 1508509 | 09:21:02 | 1508509 |
09:23:18 | 1509323 | 1508509 | 09:26:02 | 1508509 |
09:35:16 | 1508509 | 1508509 | 09:36:03 | 1508509 |
09:36:21 | 1508509 | 1508509 | 09:41:03 | 1508509 |
09:42:15 | 1509323 | 1508509 | 09:46:03 | 1508509 |
09:47:19 | 1508509 | 1508509 | 09:51:03 | 1508509 |
09:55:04 | 1509323 | 1509323 | 09:56:03 | 1509323 |
多余消費 | 被回滾 | 重復消費 | 10:01:03 | 1509323 |
10:02:20 | 1510205 | 1509323 | 10:06:03 | 1509323 |
10:07:29 | 1509323 | 1509323 | 10:08:35 | 1509323 |
10:24:43 | 1509693 | 1509693 | 10:25:18 | 1509693 |
10:28:38 | 1510604 | 1510604 | 10:35:18 | 1510604 |
10:36:37 | 1511556 | 1510604 | 10:40:18 | 1510604 |
10:54:26 | 1511592 | 1511592 | 10:54:32 | 1511592 |
- | - | - | 10:59:32 | 1511979 |
11:01:11 | 1512178 | 1512178 | 11:03:40 | 1512178 |
11:04:35 | 1512245 | 1512245 | 11:08:49 | 1512245 |
11:12:47 | 1512407 | 1512407 | 11:12:49 | 1512407 |