一抚垄,kafka的java客戶端--消費者的實現(xiàn)
1. 消費者的基本實現(xiàn)
2. 關(guān)于消費者自動提交和手動提交offset
1)提交的內(nèi)容
消費者無論是自動提交還是手動提交,都需要把所屬的消費組(Consumer Group)+消費的某個主題(topic)+消費的某個分區(qū)(partition)及消費的偏移量(offset)氮采,這樣的信息提交給集群的_consumer_offsets主題里面殷绍。
2)自動提交
消費者poll消息下來以后就會自動提交offset
注意:自動提交會丟失消息。因為消費者在消費前提交offset鹊漠,有可能提交完后還沒消費時主到,消費者掛了。
3)手動提交
需要把自動提交的配置改成false
手動又分成了兩種
?手動同步提交
在消費完消息后調(diào)用同步提交的方法躯概,當(dāng)集群返回ack前一直阻塞登钥,返回ack后表示提交成功,執(zhí)行之后的邏輯
?手動異步提交
在消息消費完后提交娶靡,不需要等到集群ack牧牢,直接執(zhí)行之后的邏輯,可以設(shè)置一個回調(diào)方法,供集群調(diào)用塔鳍。
3. 長輪詢poll消息
?默認(rèn)情況下伯铣,消費者一次會poll500條消息
?代碼中設(shè)置了長輪詢的時間是1000毫秒
意味著:
? ???如果一次poll到500條,就直接執(zhí)行for循環(huán)
? ???如果這一次沒有poll到500條轮纫,且時間在1s內(nèi)腔寡,那么長輪詢繼續(xù)poll,要么到500條掌唾,要么到1s
? ???如果多次poll都沒達(dá)到500條放前,且1s時間到了,那么直接執(zhí)行for循環(huán)
如果兩次poll的間隔超過30s糯彬,集群會認(rèn)為消費者的消費能力過弱犀斋,該消費者被踢出消費組,觸發(fā)rebalance機制情连。rebalance機制會造成性能開銷,可以通過設(shè)置這個 參數(shù)览效,讓一次poll的消息條數(shù)少一點却舀。
4. 消費者的健康狀態(tài)檢查
消費者每隔1s向kafka集群發(fā)送心跳,集群發(fā)現(xiàn)如果超過10s沒有續(xù)約的消費者锤灿,將被踢出消費組挽拔,觸發(fā)該消費組的rebalance機制,將該分區(qū)交給消費組里的其他消費者進(jìn)行消費但校。
5. 指定分區(qū)和偏移量螃诅,時間消費
?指定分區(qū)消費
?從頭消費(消息回溯消費)
?指定offset消費(需要指定分區(qū),然后指定offset)
指定時間消費
根據(jù)時間状囱,去所有的partition中確定該時間的offset术裸,然后去所有的partition中找到該offset之后的消息開始消費。
6. 新消費組的消費offset規(guī)則
新消費組中的消費者在啟動以后亭枷,默認(rèn)會從當(dāng)前分區(qū)的最后一條消息的offset+1開始消費(消費新消息)袭艺。可以通過以下的設(shè)置叨粘,讓新的消費者第一次從頭開始消費猾编。之后開始消費新消息(最后消息的位置的偏移量+1)。
?Latest : 默認(rèn)的升敲,消費新消息
?earliest :?第一次從頭開始消費答倡。之后開始消費新消息(最后消息的位置的偏移量+1)
二,Springboot中使用kafka
1. 引入依賴
2. 編寫配置文件
3. 編寫生產(chǎn)者
4. 編寫消費者