Kafka集群中的controller, rebalance, HW
1. controller
集群中誰(shuí)來(lái)充當(dāng)controller
每個(gè)broker啟動(dòng)時(shí)會(huì)向zk創(chuàng)建一個(gè)臨時(shí)序號(hào)節(jié)點(diǎn),獲得的序號(hào)最小的那個(gè)broker將會(huì)作為集群中的controller濒旦,負(fù)責(zé)幾件事:
?當(dāng)集群中有一個(gè)副本的leader掛掉痹兜,需要在集群中選舉出一個(gè)新的leader,選舉的規(guī)則是從isr集合中最左邊獲得耕陷。
?當(dāng)集群中有broker新增或減少,controller會(huì)同步信息給其他broker
?當(dāng)集群中有分區(qū)新增或減少据沈,controller會(huì)同步信息給其他broker
2. rebalance機(jī)制
?前提:消費(fèi)組中的消費(fèi)者沒有指明分區(qū)來(lái)消費(fèi)
?觸發(fā)的條件:當(dāng)消費(fèi)組中的消費(fèi)者和分區(qū)的關(guān)系發(fā)生變化的時(shí)候
?分區(qū)分配的策略:在rebalance之前哟沫,分區(qū)怎么分配會(huì)有三種策略
? ???????range:根據(jù)公式計(jì)算得到每個(gè)消費(fèi)者消費(fèi)哪幾個(gè)區(qū)域:
????????????????前面的消費(fèi)者是:(分區(qū)總數(shù) / 消費(fèi)者數(shù)量)+1
? ? ? ? ? ? ????之后的消費(fèi)者是:分區(qū)總數(shù) / 消費(fèi)者數(shù)量
? ???????輪詢:大家輪著來(lái)
????? ???sticky:粘合策略,如果需要rebalance锌介,會(huì)在之前已分配的基礎(chǔ)上進(jìn)行調(diào)整嗜诀,不會(huì)改變之前的分配情況。如果這個(gè)策略沒有開孔祸,那么就要進(jìn)行全部的重新分配隆敢,建議開始。
3. HW和LEO
LEO是某個(gè)副本最后消息的消息位置(log-end-offset)
HW是已完成同步的位置崔慧。消息在寫入broker時(shí)拂蝎,且每個(gè)broker完成這條消息的同步后,hw才會(huì)變化惶室。在這之前消費(fèi)者是消費(fèi)不到這條消息的温自。在同步完成之后玄货,HW更新之后,消費(fèi)者才能消費(fèi)到這條消息悼泌,這樣的目的是防止消息的丟失松捉。
還有一種情況,如果broker0在更新完最新消息馆里,且同步給了broker1惩坑,消費(fèi)者已經(jīng)讀完最新消息,如果這時(shí)broker0掛掉也拜,那么broker1會(huì)成為新的leader以舒,這是最新消息會(huì)被消費(fèi)者再次讀取,造成了消費(fèi)的重復(fù)讀慢哈。
Kafka中的優(yōu)化問(wèn)題
1. 如何防止消息丟失
?生產(chǎn)者:1) 使用同步發(fā)送? 2) 把a(bǔ)ck設(shè)成1或者all蔓钟,并且設(shè)置同步的分區(qū)數(shù)>=2
?消費(fèi)者:把自動(dòng)提交改成手動(dòng)提交
2. 如何防止重復(fù)消費(fèi)
在防止消息丟失的方案中,如果生產(chǎn)者發(fā)送完消息后卵贱,因?yàn)榫W(wǎng)絡(luò)抖動(dòng)滥沫,沒有收到ack,但實(shí)際上broker已經(jīng)收到了键俱。此時(shí)生產(chǎn)者會(huì)進(jìn)行重試兰绣,于是broker就會(huì)收到多條相同的消息,而造成消費(fèi)者的重復(fù)消費(fèi)编振。
怎么解決:
?生產(chǎn)者關(guān)閉重試:會(huì)造成丟失消息(不建議)
?消費(fèi)者解決非冪等性消費(fèi)問(wèn)題:
所謂冪等性:多次訪問(wèn)的結(jié)果是一樣的缀辩。對(duì)于restful的請(qǐng)求(get(冪等), post(非冪等), put(冪等), delete(冪等))
解決方案:
? ???在數(shù)據(jù)庫(kù)中創(chuàng)建聯(lián)合索引,防止相同的主鍵創(chuàng)建出多條記錄
? ???使用分布式鎖踪央,以業(yè)務(wù)id為鎖臀玄,保證只有一條記錄能夠創(chuàng)建成功
3. 如何做到消息的順序消費(fèi)
?生產(chǎn)者:保證消息按順序消費(fèi),且消息不丟失 -- 使用同步的發(fā)送畅蹂,ack設(shè)置成非0的值
?消費(fèi)者:主題只能設(shè)置一個(gè)分區(qū)健无,消費(fèi)組中只能有一個(gè)消費(fèi)者
kafka的順序消費(fèi)使用場(chǎng)景不多,因?yàn)闋奚袅诵阅芤盒保潜热鏡ocketMQ在這一塊有專門的功能已設(shè)計(jì)好累贤。
4. 如何解決消息積壓?jiǎn)栴}
1) 消息積壓?jiǎn)栴}的出現(xiàn)
消息的消費(fèi)者的消費(fèi)速度,遠(yuǎn)趕不上生產(chǎn)者的生產(chǎn)消息的速度少漆,導(dǎo)致kafka中有大量的數(shù)據(jù)沒有被消費(fèi)臼膏。隨著沒有被消費(fèi)的數(shù)據(jù)堆積越多,消費(fèi)者尋址的性能會(huì)越來(lái)越差检疫,最后導(dǎo)致整個(gè)kafka對(duì)外提供的服務(wù)的性能很差讶请,從而造成其他服務(wù)也訪問(wèn)速度變慢祷嘶,造成服務(wù)雪崩屎媳。
2) 消息積壓的解決方案
?在這個(gè)消費(fèi)者中夺溢,使用多線程,充分利用機(jī)器的性能進(jìn)行消息消費(fèi)
?通過(guò)業(yè)務(wù)的架構(gòu)設(shè)計(jì)烛谊,提升業(yè)務(wù)層面消費(fèi)的性能
?創(chuàng)建多個(gè)消費(fèi)組风响,多個(gè)消費(fèi)者,部署到其他機(jī)器上丹禀,一起消費(fèi)状勤,提高消費(fèi)者的消費(fèi)速度
?創(chuàng)建一個(gè)消費(fèi)者,該消費(fèi)者在kafka另建一個(gè)主題双泪,配上多個(gè)分區(qū)持搜,多個(gè)分區(qū)再配上多個(gè)消費(fèi)者。該消費(fèi)者將poll下來(lái)的消息焙矛,不進(jìn)行消費(fèi)葫盼,直接轉(zhuǎn)發(fā)到新建的主題上。此時(shí)村斟,新的主題的多個(gè)分區(qū)的多個(gè)消費(fèi)者就開始一起消費(fèi)了贫导。--不常用
5. 實(shí)現(xiàn)延時(shí)隊(duì)列的效果
1) 應(yīng)用場(chǎng)景
訂單創(chuàng)建后,超過(guò)30分鐘沒有支付蟆盹,則需要取消訂單孩灯,這種場(chǎng)景可以通過(guò)延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)
2) 具體方案
?kafka中創(chuàng)建相應(yīng)的主題
?消費(fèi)者消費(fèi)該主題的消息(輪詢)
?消費(fèi)者消費(fèi)消息時(shí)判斷消息的創(chuàng)建時(shí)間和當(dāng)前時(shí)間是否超過(guò)30分鐘(前提是訂單沒支付)
? ???如果是:去數(shù)據(jù)庫(kù)中修改訂單狀態(tài)為已取消
? ???如果否:記錄當(dāng)前消息的offset,并不再繼續(xù)消費(fèi)之后的消息逾滥,等待1分鐘之后峰档,再次向kafka拉取該offset及之后的消息,繼續(xù)進(jìn)行判斷寨昙,以此反復(fù)面哥。
6. 搭建kafka-wagle監(jiān)控平臺(tái)
?去kafka-eagle官網(wǎng)下載壓縮包
Download - EFAK (kafka-eagle.org)
?分配一臺(tái)虛擬機(jī)
?虛擬機(jī)中安裝jdk
?解壓縮kafka-eagle的壓縮包
?給kafka-eagle配置環(huán)境變量
?需要修改kafka-eagle內(nèi)部的配置文件:vim system-config.properties
修改里面的zk的地址和mysql的地址
?進(jìn)入到bin中,通過(guò)命令來(lái)啟動(dòng)