1.消費(fèi)者和消費(fèi)者組的關(guān)系
Kafka并不刪除已消費(fèi)的消息,為了實(shí)現(xiàn)傳統(tǒng)Message Queue消息只被消費(fèi)一次的語義张咳,Kafka保證每條消息在同一個Consumer Group里只會被某一個Consumer消費(fèi)。與傳統(tǒng)Message Queue不同的是矢洲,Kafka還允許不同Consumer Group同時消費(fèi)同一條消息炊甲,這一特性可以為消息的多元化處理提供支持。
消費(fèi)者組
測試:創(chuàng)建一個Topic (名為topic1)琴拧,再創(chuàng)建一個屬于group1的Consumer實(shí)例,并創(chuàng)建三個屬于group2的Consumer實(shí)例嘱支,然后通過Producer向topic1發(fā)送Key分別為1蚓胸,2,3的消息除师。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息沛膳,同時group2中的3個Consumer分別收到了Key為1,2汛聚,3的消息锹安,如下圖所示。
消費(fèi)者組測試
2.kafka消費(fèi)者Consumer Rebalance
(1)Kafka的Rebalance(再均衡:在同一個消費(fèi)者組當(dāng)中,分區(qū)的所有權(quán)從一個消費(fèi)者轉(zhuǎn)移到另外一個消費(fèi)者)機(jī)制叹哭,Rebalance顧名思義就是重新均衡消費(fèi)者消費(fèi)忍宋。Rebalance的過程如下:
漸進(jìn)式rebalance
第一步:所有成員都向coordinator發(fā)送請求,請求入組风罩。一旦所有成員都發(fā)送了請求糠排,coordinator會從中選擇一個consumer擔(dān)任leader的角色,并把組成員信息以及訂閱信息發(fā)給leader超升。
第二步:leader開始分配消費(fèi)方案入宦,指明具體哪個consumer負(fù)責(zé)消費(fèi)哪些topic的哪些partition。一旦完成分配室琢,leader會將這個方案發(fā)給coordinator乾闰。coordinator接收到分配方案之后會把方案發(fā)給各個consumer,這樣組內(nèi)的所有成員就都知道自己應(yīng)該消費(fèi)哪些分區(qū)了盈滴。
(2)以下的場景會觸發(fā)Consumer Rebalance操作:
①新的消費(fèi)者加入Consumer Group涯肩;②有消費(fèi)者主動退出Consumer Group;③Consumer Group訂閱的任何一個Topic出現(xiàn)分區(qū)數(shù)量的變化雹熬。
(3)默認(rèn)情況下宽菜,Kafka提供了兩種分配策略:Range和RoundRobin。
Range策略:①對一個topic中的partition進(jìn)行排序竿报;②對消費(fèi)者按字典進(jìn)行排序铅乡;③遍歷排序后的partition的方式分配給消費(fèi)者。
例子:有兩個消費(fèi)者C0和C1烈菌,兩個topic(t0,t1)阵幸,每個topic有三個分區(qū)p(0-2),那么采用Range策略芽世,分配出的結(jié)果為:
C0: [t0-p0, t0-p1, t1-p0, t1-p1]
C1: [t0-p2, t1-p2]
RoundRobin策略:RoundRobin策略和Range策略類型挚赊,唯一的區(qū)別就是Range策略分配partition時,是按照topic逐次劃分的济瓢。RoundRobin策略則是將所有topic的所有分區(qū)一起排序荠割,然后遍歷partition分配給消費(fèi)者。
例子:有兩個消費(fèi)者C0和C1旺矾,兩個topic(t0,t1)蔑鹦,每個topic有三個分區(qū)p(0-2),那么采用RoundRobin策略箕宙,分配出的結(jié)果為:
C0: [t0-p0, t0-p2, t1-p1]
C1: [t0-p1, t1-p0, t1-p2]
(4)Kafka對消息的分配是以Partition為單位分配的嚎朽,而非以每一條消息作為分配單元。這樣設(shè)計的劣勢:無法保證同一個Consumer Group里的Consumer均勻消費(fèi)數(shù)據(jù)柬帕。優(yōu)勢:①每個Consumer不用都跟大量的Broker通信哟忍,減少通信開銷狡门;②降低了分配難度,實(shí)現(xiàn)也更簡單锅很;③因?yàn)橥粋€Partition里的數(shù)據(jù)是有序的其馏,這種設(shè)計可以保證每個Partition里的數(shù)據(jù)可以被有序消費(fèi)。
如果某Consumer Group中Consumer實(shí)例數(shù)量少于Partition數(shù)量粗蔚,則至少有一個Consumer會消費(fèi)多個Partition的數(shù)據(jù)尝偎,如果Consumer的數(shù)量與Partition數(shù)量相同,則正好一個Consumer消費(fèi)一個Partition的數(shù)據(jù)鹏控。而如果Consumer的數(shù)量多于Partition的數(shù)量時,會有部分Consumer無法消費(fèi)該Topic下任何一條消息肤寝。
3.非線程安全的Kafka Consumer實(shí)現(xiàn)多線程消費(fèi)
(1)線程封閉当辐,即為每個線程實(shí)例化一個Consumer對象。說明:一個線程對應(yīng)一個Consumer 實(shí)例鲤看,稱之為消費(fèi)線程缘揪。一個消費(fèi)線程可以消費(fèi)一個或多個分區(qū)中的消息,所有的消費(fèi)線程都隸屬于同一個消費(fèi)組义桂。
封閉線程消費(fèi)
(2)消費(fèi)者程序使用單或多線程獲取消息找筝,同時創(chuàng)建多個消費(fèi)線程執(zhí)行消息處理邏輯。獲取消息的線程可以是一個慷吊,也可以是多個袖裕,每個線程維護(hù)專屬的Consumer實(shí)例,處理消息則交由特定的線程池來做溉瓶,從而實(shí)現(xiàn)消息獲取與消息處理的真正解耦急鳄。
線程池消費(fèi)
4.指定offset或者timestamp消費(fèi)數(shù)據(jù)
指定offset:Kafka是通過seek() 方法來指定消費(fèi)的,在執(zhí)行seek() 方法之前要去執(zhí)行一次poll()方法堰酿,等到分配到分區(qū)之后會去對應(yīng)partition的指定offset開始消費(fèi)疾宏。
指定timestamp:Kafka的offsetsForTimes() 方法,通過timestamp 來查詢與此對應(yīng)的分區(qū)位置触创。offsetsForTimes() 方法的參數(shù) timestampsToSearch 是一個Map類型坎藐,key為待查詢的partition,而 value為待查詢的時間戳哼绑,該方法會返回時間戳大于等于待查詢時間的第一條消息對應(yīng)的位置和時間戳岩馍。
5.消費(fèi)者提交消費(fèi)位移時提交的是當(dāng)前消費(fèi)到的最新消息的offset還是offset+1
在舊消費(fèi)者客戶端中,消費(fèi)位移是存儲在 ZooKeeper 中的凌那。而在新消費(fèi)者客戶端中兼雄,消費(fèi)位移存儲在 Kafka 內(nèi)部的主題__consumer_offsets 中。
當(dāng)前消費(fèi)者需要提交的消費(fèi)位移是offset+1帽蝶。
6.造成重復(fù)消費(fèi)的場景
(1)Rebalance:一個consumer正在消費(fèi)一個partition的消息A赦肋,還沒有消費(fèi)完块攒,發(fā)生了rebalance(加入了一個consumer),從而導(dǎo)致消息A沒有消費(fèi)成功佃乘,rebalance后囱井,另一個consumer又把這條消息消費(fèi)一遍。
(2)消費(fèi)者端手動提交:如果先消費(fèi)消息趣避,再更新offset位置庞呕,導(dǎo)致消息重復(fù)消費(fèi)。
(3)消費(fèi)者端自動提交:設(shè)置offset為自動提交程帕,關(guān)閉kafka時住练,如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交愁拭,下次重啟會重復(fù)消費(fèi)讲逛。
(4)生產(chǎn)者端:生產(chǎn)者因?yàn)闃I(yè)務(wù)問題導(dǎo)致的宕機(jī),在重啟之后可能數(shù)據(jù)會重發(fā)岭埠。
7.造成消息漏消費(fèi)的場景
(1)自動提交:設(shè)置offset為自動定時提交盏混,當(dāng)offset被自動定時提交時,數(shù)據(jù)還在內(nèi)存中未處理惜论,此時剛好把線程kill掉许赃,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理馆类,導(dǎo)致這部分內(nèi)存中的數(shù)據(jù)丟失混聊。
(2)生產(chǎn)者端:發(fā)送消息設(shè)置的是fire-and-forget(發(fā)后即忘),它只管往 Kafka 中發(fā)送消息而并不關(guān)心消息是否正確到達(dá)蹦掐。不過在某些時候(比如發(fā)生不可重試異常時)會造成消息的丟失技羔。這種發(fā)送方式的性能最高,可靠性也最差卧抗。
(3)消費(fèi)者端:先提交位移藤滥,但是消息還沒消費(fèi)完就宕機(jī)了,造成了消息沒有被消費(fèi)社裆。自動位移提交同理acks沒有設(shè)置為all拙绊。如果在broker還沒把消息同步到其他broker的時候宕機(jī)了,那么消息將會丟失泳秀。