坑s
關(guān)于消費(fèi)終止
- 對(duì)于消息流來(lái)說(shuō)魂莫,由于其中的一個(gè)消息處理異常導(dǎo)致該條消息失敗,同時(shí)無(wú)法繼續(xù)消費(fèi)后續(xù)消息流耙考;
-
kafka
有liveLock
的概念,如果消費(fèi)端在sessionTimeout
時(shí)間段內(nèi)沒(méi)有poll
消息倦始,kafka
會(huì)認(rèn)為其掛掉了,然后釋放其持有的partition
鞋邑?然而其他消費(fèi)端拿到patition后依然會(huì)消費(fèi)失敗枚碗? - 消費(fèi)失敗的原因是什么铸本?(邏輯問(wèn)題還是非邏輯問(wèn)題),
如果是非邏輯問(wèn)題那么肯定需要支持重試
箱玷,如果是邏輯問(wèn)題那么重試也沒(méi)用; - 消費(fèi)失敗后如果把該條消息存起來(lái)锡足,然后略過(guò)該消息去消費(fèi)后面的消息,那么可能會(huì)面臨消費(fèi)順序的問(wèn)題(可能有)以及手動(dòng)補(bǔ)償?shù)膯?wèn)題舶得;
關(guān)于重復(fù)消費(fèi)
- 目前
kafka-client
的偏移量提交都是批量提交本次poll
的maxPollRecord
條消息; - 不論是
commitSync
和commitAsync
扩灯,都會(huì)存在從kafka
拉取一批數(shù)據(jù)maxPollRecord
,只消費(fèi)成功了一部分?jǐn)?shù)據(jù)珠插,之后崩掉沒(méi)commit
的情況,然后下次又會(huì)重復(fù)消費(fèi)上一次消費(fèi)成功的消息捻撑; - 要避免重復(fù)消費(fèi)缤底,可以考慮拉一批回來(lái)番捂,消費(fèi)時(shí)在同一事務(wù)(
maxPollRecord
不可太大,否則大事務(wù))设预;或者maxPollRecord=1
,事務(wù)性的消費(fèi)(提交偏移量需要在事務(wù)內(nèi))鳖枕; - 或者避免不了重復(fù)消費(fèi),需要保證重復(fù)消費(fèi)時(shí)的冪等性(記錄消費(fèi)成功的消息【最好有一個(gè)唯一主鍵宾符,關(guān)于唯一主鍵的生成...】);
關(guān)于重復(fù)消費(fèi)
和消費(fèi)終止
- 如果
消費(fèi)終止
魏烫,并且maxPollRecord>1
,那么很大可能性就會(huì)存在重復(fù)消費(fèi)
的問(wèn)題哄褒; - 如果
不消費(fèi)終止
,那么就需要有補(bǔ)償機(jī)制读处,并且需要記下日志唱矛,方便后續(xù)的補(bǔ)償;
關(guān)于消費(fèi)場(chǎng)景
的問(wèn)題
- 對(duì)于
消費(fèi)順序和消息依賴(lài)
沒(méi)有要求的消費(fèi)端:- 對(duì)于消息丟失可以容忍的場(chǎng)景(后續(xù)查詢(xún)時(shí)強(qiáng)制刷新)绎谦,可以忽略該條消息;
- 對(duì)于消息丟失無(wú)法容忍的場(chǎng)景窃肠,且不希望
消息終止
,那么需要吞掉消費(fèi)時(shí)的異常冤留,不過(guò)需要注意的是,這種情況需要記錄消費(fèi)成功的消息狀態(tài)纤怒、記錄消費(fèi)失敗的消息詳情,同時(shí)針對(duì)消費(fèi)失敗的消息詳情泊窘,需要有補(bǔ)償措施像寒;
- 對(duì)于
消費(fèi)順序和消息依賴(lài)
有要求的消費(fèi)端:消息順序分全局有序和分區(qū)有序瓜贾,或者需要等待前面的消息消費(fèi)完畢后才能消費(fèi)后面的消息。這兩類(lèi)情況只能等待當(dāng)前消息消費(fèi)成功祭芦,如果消費(fèi)異常了,那么需要等待其消費(fèi)成功(對(duì)于非邏輯錯(cuò)誤可以考慮重試实束,如果為邏輯錯(cuò)誤那么只能等待修復(fù)該bug);