1. Kafka簡(jiǎn)介
Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)芥挣。主要設(shè)計(jì)目標(biāo)如下:
- 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力篇恒,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問(wèn)性能
- 高吞吐率道伟。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100K條以上消息的傳輸
- 支持Kafka Server間的消息分區(qū)媒咳,及分布式消費(fèi)粹排,同時(shí)保證每個(gè)Partition內(nèi)的消息順序傳輸
- 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理
- Scale out:支持在線水平擴(kuò)展
2. Kafka架構(gòu)
Broker:Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱(chēng)為broker
Topic:每條發(fā)布到Kafka集群的消息都有一個(gè)類(lèi)別涩澡,這個(gè)類(lèi)別被稱(chēng)為T(mén)opic顽耳。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶(hù)只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
Partition:Parition是物理上的概念妙同,每個(gè)Topic包含一個(gè)或多個(gè)Partition.
Producer:負(fù)責(zé)發(fā)布消息到Kafka broker
Consumer:消息消費(fèi)者斧抱,向Kafka broker讀取消息的客戶(hù)端。
Consumer Group:每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name渐溶,若不指定group name則屬于默認(rèn)的group)。
3. Kafka拓?fù)浣Y(jié)構(gòu)
如上圖所示弄抬,一個(gè)典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View茎辐,或者是服務(wù)器日志,系統(tǒng)CPU掂恕、Memory等)拖陆,若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多懊亡,集群吞吐率越高)依啰,若干Consumer Group,以及一個(gè)Zookeeper集群店枣。Kafka通過(guò)Zookeeper管理集群配置速警,選舉leader,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance鸯两。Producer使用push模式將消息發(fā)布到broker闷旧,Consumer使用pull模式從broker訂閱并消費(fèi)消息。
4. Delivery Guarantee
有這么幾種可能的delivery guarantee:
-
At most once
消息可能會(huì)丟钧唐,但絕不會(huì)重復(fù)傳輸 -
At least one
消息絕不會(huì)丟忙灼,但可能會(huì)重復(fù)傳輸 -
Exactly once
每條消息肯定會(huì)被傳輸一次且僅傳輸一次,很多時(shí)候這是用戶(hù)所想要的。
5. Kafka HA設(shè)計(jì)解析
5.1 Replica復(fù)制算法
? 為了更好的做負(fù)載均衡该园,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上酸舍。一個(gè)典型的部署方式是一個(gè)Topic的Partition數(shù)量大于Broker的數(shù)量。同時(shí)為了提高Kafka的容錯(cuò)能力里初,也需要將同一個(gè)Partition的Replica盡量分散到不同的機(jī)器啃勉。實(shí)際上,如果所有的Replica都在同一個(gè)Broker上青瀑,那一旦該Broker宕機(jī)璧亮,該P(yáng)artition的所有Replica都無(wú)法工作,也就達(dá)不到HA的效果斥难。同時(shí)枝嘶,如果某個(gè)Broker宕機(jī)了,需要保證它上面的負(fù)載可以被均勻的分配到其它幸存的所有Broker上哑诊。
- 將所有Broker(假設(shè)共n個(gè)Broker)和待分配的Partition排序
- 將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上
- 將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上
5.2 Broker活著的判定
kafka判定broker是否活著群扶,通過(guò)以下2個(gè)方式:
- 和zk的session沒(méi)有斷(通過(guò)心跳來(lái)維系)
- follower能及時(shí)將leader消息復(fù)制過(guò)來(lái),不能落后太多(例如默認(rèn)lag超過(guò)4000就會(huì)踢出ISR)
5.3 所有Replica都不工作的情況
如果所有副本都出問(wèn)題镀裤,一般有兩種選擇:
- 等待ISR中的任一個(gè)Replica“活”過(guò)來(lái)竞阐,并且選它作為L(zhǎng)eader(一致性好,但是可用性差)
- 選擇第一個(gè)“活”過(guò)來(lái)的Replica(不一定是ISR中的)作為L(zhǎng)eader(一致性差暑劝,但是可用性相比第一種方式好)
5.4 Propagate消息
? Producer在發(fā)布消息到某個(gè)Partition時(shí)骆莹,先通過(guò) Metadata (通過(guò) Broker 獲取并且緩存在 Producer 內(nèi)) 找到該 Partition 的Leader,然后無(wú)論該Topic的Replication Factor為多少(也即該P(yáng)artition有多少個(gè)Replica)担猛,Producer只將該消息發(fā)送到該P(yáng)artition的Leader幕垦。Leader會(huì)將該消息寫(xiě)入其本地Log。每個(gè)Follower都從Leader pull數(shù)據(jù)傅联。這種方式上先改,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。Follower在收到該消息并寫(xiě)入其Log后蒸走,向Leader發(fā)送ACK仇奶。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了比驻,Leader將增加HW并且向Producer發(fā)送ACK该溯。
為了提高性能,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK嫁艇,而非等到數(shù)據(jù)寫(xiě)入Log中朗伶。因此,對(duì)于已經(jīng)commit的消息步咪,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中论皆,而不能保證它們被持久化到磁盤(pán)中,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)。但考慮到這種場(chǎng)景非常少見(jiàn)点晴,可以認(rèn)為這種方式在性能和數(shù)據(jù)持久化上做了一個(gè)比較好的平衡感凤。
5.5 ACK前需要保證有多少個(gè)備份
? 和大部分分布式系統(tǒng)一樣,Kafka處理失敗需要明確定義一個(gè)Broker是否“活著”粒督。對(duì)于Kafka而言陪竿,Kafka存活包含兩個(gè)條件,一是它必須維護(hù)與Zookeeper的session(這個(gè)通過(guò)Zookeeper的Heartbeat機(jī)制來(lái)實(shí)現(xiàn))屠橄。二是Follower必須能夠及時(shí)將Leader的消息復(fù)制過(guò)來(lái)族跛,不能“落后太多”。
Leader會(huì)跟蹤與其保持同步的Replica列表锐墙,該列表稱(chēng)為ISR(即in-sync Replica)礁哄。如果一個(gè)Follower宕機(jī),或者落后太多溪北,Leader將把它從ISR中移除桐绒。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過(guò)預(yù)定值(該值可在KAFKA_HOME/config/server.properties中通過(guò)
replica.lag.time.max.ms
來(lái)配置之拨,其默認(rèn)值是10000)未向Leader發(fā)送fetch請(qǐng)求茉继。。
Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制蚀乔,也不是單純的異步復(fù)制烁竭。事實(shí)上,同步復(fù)制要求所有能工作的Follower都復(fù)制完吉挣,這條消息才會(huì)被認(rèn)為commit颖变,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個(gè)特性)。而異步復(fù)制方式下听想,F(xiàn)ollower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫(xiě)入log就被認(rèn)為已經(jīng)commit马胧,這種情況下如果Follower都復(fù)制完都落后于Leader汉买,而如果Leader突然宕機(jī),則會(huì)丟失數(shù)據(jù)佩脊。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率蛙粘。Follower可以批量的從Leader復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫(xiě)磁盤(pán))威彰,極大減少了Follower與Leader的差距出牧。
6. 分區(qū)Leader選舉方法
一般比較容易想到的一個(gè)方法是:所有Follower都在Zookeeper上設(shè)置一個(gè)Watch,一旦Leader宕機(jī)歇盼,其對(duì)應(yīng)的ephemeral znode會(huì)自動(dòng)刪除舔痕,此時(shí)所有Follower都嘗試創(chuàng)建該節(jié)點(diǎn),而創(chuàng)建成功者(Zookeeper保證只有一個(gè)能創(chuàng)建成功)即是新的Leader,其它Replica即為Follower伯复。
該方法會(huì)存在3個(gè)問(wèn)題:
- split-brain 這是由Zookeeper的特性引起的慨代,雖然Zookeeper能保證所有Watch按順序觸發(fā),但并不能保證同一時(shí)刻所有Replica“看”到的狀態(tài)是一樣的啸如,這就可能造成不同Replica的響應(yīng)不一致
- herd effect 如果宕機(jī)的那個(gè)Broker上的Partition比較多侍匙,會(huì)造成多個(gè)Watch被觸發(fā),造成集群內(nèi)大量的調(diào)整
- Zookeeper負(fù)載過(guò)重 每個(gè)Replica都要為此在Zookeeper上注冊(cè)一個(gè)Watch叮雳,當(dāng)集群規(guī)模增加到幾千個(gè)Partition時(shí)Zookeeper負(fù)載會(huì)過(guò)重想暗。
改進(jìn)的方法——所有broker中選出一個(gè)controller,所有Partition的Leader選舉都由controller決定帘不。controller會(huì)將Leader的改變直接通過(guò)RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應(yīng)的Broker说莫。同時(shí)controller也負(fù)責(zé)增刪Topic以及Replica的重新分配。
7. 各組件Failover過(guò)程
Broker failover過(guò)程
- Controller在Zookeeper注冊(cè)Watch厌均,一旦有Broker宕機(jī)(這是用宕機(jī)代表任何讓系統(tǒng)認(rèn)為其die的情景唬滑,包括但不限于機(jī)器斷電,網(wǎng)絡(luò)不可用棺弊,GC導(dǎo)致的Stop The World晶密,進(jìn)程crash等),其在Zookeeper對(duì)應(yīng)的znode會(huì)自動(dòng)被刪除模她,Zookeeper會(huì)fire Controller注冊(cè)的watch稻艰,Controller讀取最新的幸存的Broker
- Controller決定set_p,該集合包含了宕機(jī)的所有Broker上的所有Partition
- 對(duì)set_p中的每一個(gè)Partition
3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該P(yáng)artition當(dāng)前的ISR
? 3.2 決定該P(yáng)artition的新Leader侈净。如果當(dāng)前ISR中有至少一個(gè)Replica還幸存尊勿,則選擇其中一個(gè)作為新Leader,新的ISR則包含當(dāng)前ISR中所有幸存的Replica畜侦。否則選擇該P(yáng)artition中任意一個(gè)幸存的Replica作為新的Leader以及ISR(該場(chǎng)景下可能會(huì)有潛在的數(shù)據(jù)丟失)元扔。如果該P(yáng)artition的所有Replica都宕機(jī)了,則將新的Leader設(shè)置為-1旋膳。
3.3 將新的Leader澎语,ISR和新的leader_epoch及controller_epoch寫(xiě)入/brokers/topics/[topic]/partitions/[partition]/state。注意验懊,該操作只有其version在3.1至3.3的過(guò)程中無(wú)變化時(shí)才會(huì)執(zhí)行擅羞,否則跳轉(zhuǎn)到3.1
- 直接通過(guò)RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令。Controller可以在一個(gè)RPC操作中發(fā)送多個(gè)命令從而提高效率义图。
Broker failover順序圖如下所示减俏。
Controller failure過(guò)程
- Controller在Zookeeper的
/brokers/ids
節(jié)點(diǎn)上注冊(cè)Watch。一旦有Broker宕機(jī)(本文用宕機(jī)代表任何讓Kafka認(rèn)為其Broker die的情景碱工,包括但不限于機(jī)器斷電娃承,網(wǎng)絡(luò)不可用奏夫,GC導(dǎo)致的Stop The World,進(jìn)程crash等)草慧,其在Zookeeper對(duì)應(yīng)的Znode會(huì)自動(dòng)被刪除桶蛔,Zookeeper會(huì)fire Controller注冊(cè)的Watch,Controller即可獲取最新的幸存的Broker列表漫谷。 - Controller決定set_p仔雷,該集合包含了宕機(jī)的所有Broker上的所有Partition。
- 對(duì)set_p中的每一個(gè)Partition:
3.1 從/brokers/topics/[topic]/partitions/[partition]/state
讀取該P(yáng)artition當(dāng)前的ISR舔示。
3.2 決定該P(yáng)artition的新Leader碟婆。如果當(dāng)前ISR中有至少一個(gè)Replica還幸存,則選擇其中一個(gè)作為新Leader惕稻,新的ISR則包含當(dāng)前ISR中所有幸存的Replica竖共。否則選擇該P(yáng)artition中任意一個(gè)幸存的Replica作為新的Leader以及ISR(該場(chǎng)景下可能會(huì)有潛在的數(shù)據(jù)丟失)。如果該P(yáng)artition的所有Replica都宕機(jī)了俺祠,則將新的Leader設(shè)置為-1公给。
3.3 將新的Leader,ISR和新的leader_epoch
及controller_epoch
寫(xiě)入/brokers/topics/[topic]/partitions/[partition]/state
蜘渣。注意淌铐,該操作只有Controller版本在3.1至3.3的過(guò)程中無(wú)變化時(shí)才會(huì)執(zhí)行,否則跳轉(zhuǎn)到3.1蔫缸。 - 直接通過(guò)RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令腿准。Controller可以在一個(gè)RPC操作中發(fā)送多個(gè)命令從而提高效率。
Broker failover順序圖如下所示拾碌。
Partition重新分配
管理工具發(fā)出重新分配Partition請(qǐng)求后吐葱,會(huì)將相應(yīng)信息寫(xiě)到/admin/reassign_partitions
上,而該操作會(huì)觸發(fā)ReassignedPartitionsIsrChangeListener校翔,從而通過(guò)執(zhí)行回調(diào)函數(shù)KafkaController.onPartitionReassignment來(lái)完成以下操作:
- 將Zookeeper中的AR(Current Assigned Replicas)更新為OAR(Original list of replicas for partition) + RAR(Reassigned replicas)弟跑。
- 強(qiáng)制更新Zookeeper中的leader epoch,向AR中的每個(gè)Replica發(fā)送LeaderAndIsrRequest防症。
- 將RAR - OAR中的Replica設(shè)置為NewReplica狀態(tài)窖认。
- 等待直到RAR中所有的Replica都與其Leader同步。
- 將RAR中所有的Replica都設(shè)置為OnlineReplica狀態(tài)告希。
- 將Cache中的AR設(shè)置為RAR。
- 若Leader不在RAR中烧给,則從RAR中重新選舉出一個(gè)新的Leader并發(fā)送LeaderAndIsrRequest燕偶。若新的Leader不是從RAR中選舉而出,則還要增加Zookeeper中的leader epoch础嫡。
- 將OAR - RAR中的所有Replica設(shè)置為OfflineReplica狀態(tài)指么,該過(guò)程包含兩部分酝惧。第一,將Zookeeper上ISR中的OAR - RAR移除并向Leader發(fā)送LeaderAndIsrRequest從而通知這些Replica已經(jīng)從ISR中移除伯诬;第二晚唇,向OAR - RAR中的Replica發(fā)送StopReplicaRequest從而停止不再分配給該P(yáng)artition的Replica。
- 將OAR - RAR中的所有Replica設(shè)置為NonExistentReplica狀態(tài)從而將其從磁盤(pán)上刪除盗似。
- 將Zookeeper中的AR設(shè)置為RAR哩陕。
- 刪除
/admin/reassign_partition
。
參考文章
如果大家喜歡我的文章赫舒,可以關(guān)注個(gè)人訂閱號(hào)悍及。歡迎隨時(shí)留言、交流接癌。