1 概述
Kakfa起初是由LinkedIn公司開發(fā)的一個分布式的消息系統(tǒng)杀饵,后成為Apache的一部分油讯,它使用Scala編寫塔拳,以可水平擴展和高吞吐率而被廣泛使用欠橘。目前越來越多的開源分布式處理系統(tǒng)如Cloudera、Apache Storm赞季、Spark等都支持與Kafka集成愧捕。
Kafka憑借著自身的優(yōu)勢,越來越受到互聯(lián)網(wǎng)企業(yè)的青睞申钩,唯品會也采用Kafka作為其內(nèi)部核心消息引擎之一次绘。Kafka作為一個商業(yè)級消息中間件,消息可靠性的重要性可想而知撒遣。如何確保消息的精確傳輸邮偎?如何確保消息的準確存儲?如何確保消息的正確消費义黎?這些都是需要考慮的問題钢猛。本文首先從Kafka的架構(gòu)著手,先了解下Kafka的基本原理轩缤,然后通過對kakfa的存儲機制、復制原理贩绕、同步原理火的、可靠性和持久性保證等等一步步對其可靠性進行分析,最后通過benchmark來增強對Kafka高可靠性的認知淑倾。
2 Kafka體系架構(gòu)
如上圖所示馏鹤,一個典型的Kafka體系架構(gòu)包括若干Producer(可以是服務器日志,業(yè)務數(shù)據(jù)娇哆,頁面前端產(chǎn)生的page view等等)湃累,若干broker(Kafka支持水平擴展勃救,一般broker數(shù)量越多,集群吞吐率越高)治力,若干Consumer (Group)蒙秒,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置宵统,選舉leader晕讲,以及在consumer group發(fā)生變化時進行rebalance。Producer使用push(推)模式將消息發(fā)布到broker马澈,Consumer使用pull(拉)模式從broker訂閱并消費消息瓢省。
名詞解釋:
2.1 Topic & Partition
一個topic可以認為一個一類消息,每個topic將被分成多個partition痊班,每個partition在存儲層面是append log文件勤婚。任何發(fā)布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量)涤伐,offset為一個long型的數(shù)字馒胆,它唯一標記一條消息。每條消息都被append到partition中废亭,是順序?qū)懘疟P国章,因此效率非常高(經(jīng)驗證,順序?qū)懘疟P效率比隨機寫內(nèi)存還要高豆村,這是Kafka高吞吐率的一個很重要的保證)液兽。
每一條消息被發(fā)送到broker中,會根據(jù)partition規(guī)則選擇被存儲到哪一個partition掌动。如果partition規(guī)則設置的合理四啰,所有消息可以均勻分布到不同的partition里,這樣就實現(xiàn)了水平擴展粗恢。(如果一個topic對應一個文件柑晒,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸,而partition解決了這個問題)眷射。在創(chuàng)建topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數(shù)量(如下所示)匙赞,當然可以在topic創(chuàng)建之后去修改partition的數(shù)量。
1
2
3
4
# The?default?number of log partitions per topic. More partitions allow greater
# parallelism?for?consumption, but?this?will also result in more files across
# the brokers.
num.partitions=3
在發(fā)送一條消息時妖碉,可以指定這個消息的key涌庭,producer根據(jù)這個key和partition機制來判斷這個消息發(fā)送到哪個partition。partition機制可以通過指定producer的partition.class這一參數(shù)來指定,該class必須實現(xiàn)kafka.producer.Partitioner接口。
有關Topic與Partition的更多細節(jié)拗馒,可以參考下面的“Kafka文件存儲機制”這一節(jié)。
3 高可靠性存儲分析
Kafka的高可靠性的保障來源于其健壯的副本(replication)策略席镀。通過調(diào)節(jié)其副本相關參數(shù)匹中,可以使得Kafka在性能和可靠性之間運轉(zhuǎn)的游刃有余。Kafka從0.8.x版本開始提供partition級別的復制,replication的數(shù)量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)豪诲。
這里先從Kafka文件存儲機制入手顶捷,從最底層了解Kafka的存儲細節(jié),進而對其的存儲有個微觀的認知跛溉。之后通過Kafka復制原理和同步方式來闡述宏觀層面的概念焊切。最后從ISR,HW芳室,leader選舉以及數(shù)據(jù)可靠性和持久性保證等等各個維度來豐富對Kafka相關知識點的認知专肪。
3.1 Kafka文件存儲機制
Kafka中消息是以topic進行分類的,生產(chǎn)者通過topic向Kafka broker發(fā)送消息堪侯,消費者通過topic讀取數(shù)據(jù)嚎尤。然而topic在物理層面又能以partition為分組,一個topic可以分成若干個partition伍宦,那么topic以及partition又是怎么存儲的呢芽死?partition還可以細分為segment,一個partition物理上由多個segment組成次洼,那么這些segment又是什么呢关贵?下面我們來一一揭曉。
為了便于說明問題卖毁,假設這里只有一個Kafka集群揖曾,且這個集群只有一個Kafka broker,即只有一臺物理機亥啦。在這個Kafka broker中配置($KAFKA_HOME/config/server.properties中)log.dirs=/tmp/kafka-logs炭剪,以此來設置Kafka消息文件存儲目錄,與此同時創(chuàng)建一個topic:topic_zzh_test翔脱,partition的數(shù)量為4($KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_vms_test –replication-factor 4)奴拦。那么我們此時可以在/tmp/kafka-logs目錄中可以看到生成了4個目錄:
1
2
3
4
drwxr-xr-x?2?root root?4096?Apr?10?16:10?topic_zzh_test-0
drwxr-xr-x?2?root root?4096?Apr?10?16:10?topic_zzh_test-1
drwxr-xr-x?2?root root?4096?Apr?10?16:10?topic_zzh_test-2
drwxr-xr-x?2?root root?4096?Apr?10?16:10?topic_zzh_test-3
在Kafka文件存儲中,同一個topic下有多個不同的partition届吁,每個partiton為一個目錄错妖,partition的名稱規(guī)則為:topic名稱+有序序號,第一個序號從0開始計疚沐,最大的序號為partition數(shù)量減1暂氯,partition是實際物理上的概念,而topic是邏輯上的概念濒旦。
上面提到partition還可以細分為segment,這個segment又是什么再登?如果就以partition為最小存儲單位尔邓,我們可以想象當Kafka producer不斷發(fā)送消息晾剖,必然會引起partition文件的無限擴張,這樣對于消息文件的維護以及已經(jīng)被消費的消息的清理帶來嚴重的影響梯嗽,所以這里以segment為單位又將partition細分齿尽。每個partition(目錄)相當于一個巨型文件被平均分配到多個大小相等的segment(段)數(shù)據(jù)文件中(每個segment 文件中消息數(shù)量不一定相等)這種特性也方便old segment的刪除,即方便已被消費的消息的清理灯节,提高磁盤的利用率循头。每個partition只需要支持順序讀寫就行,segment的文件生命周期由服務端配置參數(shù)(log.segment.bytes炎疆,log.roll.{ms,hours}等若干參數(shù))決定卡骂。
segment文件由兩部分組成,分別為“.index”文件和“.log”文件形入,分別表示為segment索引文件和數(shù)據(jù)文件全跨。這兩個文件的命令規(guī)則為:partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值亿遂,數(shù)值大小為64位浓若,20位數(shù)字字符長度,沒有數(shù)字用0填充蛇数,如下:
1
2
3
4
5
6
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
以上面的segment文件為例挪钓,展示出segment:00000000000000170410的“.index”文件和“.log”文件的對應的關系,如下圖:
如上圖耳舅,“.index”索引文件存儲大量的元數(shù)據(jù)碌上,“.log”數(shù)據(jù)文件存儲大量的消息,索引文件中的元數(shù)據(jù)指向?qū)獢?shù)據(jù)文件中message的物理偏移地址挽放。其中以“.index”索引文件中的元數(shù)據(jù)[3, 348]為例绍赛,在“.log”數(shù)據(jù)文件表示第3個消息,即在全局partition中表示170410+3=170413個消息辑畦,該消息的物理偏移地址為348吗蚌。
那么如何從partition中通過offset查找message呢?
以上圖為例纯出,讀取offset=170418的消息蚯妇,首先查找segment文件,其中00000000000000000000.index為最開始的文件暂筝,第二個文件為00000000000000170410.index(起始偏移為170410+1=170411)箩言,而第三個文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了第二個文件之中焕襟。其他后續(xù)文件可以依次類推陨收,以其實偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置。其次根據(jù)00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進行讀取务漩。
要是讀取offset=170418的消息拄衰,從00000000000000170410.log文件中的1325的位置進行讀取,那么怎么知道何時讀完本條消息饵骨,否則就讀到下一條消息的內(nèi)容了翘悉?
這個就需要聯(lián)系到消息的物理結(jié)構(gòu)了,消息都具有固定的物理結(jié)構(gòu)居触,包括:offset(8 Bytes)妖混、消息體的大小(4 Bytes)轮洋、crc32(4 Bytes)制市、magic(1 Byte)、attributes(1 Byte)砖瞧、key length(4 Bytes)息堂、key(K Bytes)、payload(N Bytes)等等字段块促,可以確定一條消息的大小荣堰,即讀取到哪里截止。
3.2 復制原理和同步方式
Kafka中topic的每個partition有一個預寫式的日志文件竭翠,雖然partition可以繼續(xù)細分為若干個segment文件振坚,但是對于上層應用來說可以將partition看成最小的存儲單元(一個有多個segment文件拼接的“巨型”文件),每個partition都由一些列有序的斋扰、不可變的消息組成渡八,這些消息被連續(xù)的追加到partition中。
上圖中有兩個新名詞:HW和LEO传货。這里先介紹下LEO屎鳍,LogEndOffset的縮寫,表示每個partition的log最后一條Message的位置问裕。HW是HighWatermark的縮寫逮壁,是指consumer能夠看到的此partition的位置,這個涉及到多副本的概念粮宛,這里先提及一下窥淆,下節(jié)再詳表。
言歸正傳巍杈,為了提高消息的可靠性忧饭,Kafka每個topic的partition有N個副本(replicas),其中N(大于等于1)是topic的復制因子(replica fator)的個數(shù)筷畦。Kafka通過多副本機制實現(xiàn)故障自動轉(zhuǎn)移词裤,當Kafka集群中一個broker失效情況下仍然保證服務可用。在Kafka中發(fā)生復制時確保partition的日志能有序地寫到其他節(jié)點上,N個replicas中吼砂,其中一個replica為leader作媚,其他都為follower, leader處理partition的所有讀寫請求,與此同時帅刊,follower會被動定期地去復制leader上的數(shù)據(jù)。
如下圖所示漂问,Kafka集群中有4個broker, 某topic有3個partition,且復制因子即副本個數(shù)也為3:
Kafka提供了數(shù)據(jù)復制算法保證赖瞒,如果leader發(fā)生故障或掛掉,一個新leader被選舉并被接受客戶端的消息成功寫入蚤假。Kafka確保從同步副本列表中選舉一個副本為leader栏饮,或者說follower追趕leader數(shù)據(jù)。leader負責維護和跟蹤ISR(In-Sync Replicas的縮寫磷仰,表示副本同步隊列袍嬉,具體可參考下節(jié))中所有follower滯后的狀態(tài)。當producer發(fā)送一條消息到broker后灶平,leader寫入消息并復制到所有follower伺通。消息提交之后才被成功復制到所有的同步副本。消息復制延遲受最慢的follower限制逢享,重要的是快速檢測慢副本罐监,如果follower“落后”太多或者失效,leader將會把它從ISR中刪除瞒爬。
3.3 ISR
上節(jié)我們涉及到ISR (In-Sync Replicas)弓柱,這個是指副本同步隊列。副本數(shù)對Kafka的吞吐率是有一定的影響侧但,但極大的增強了可用性矢空。默認情況下Kafka的replica數(shù)量為1,即每個partition都有一個唯一的leader禀横,為了確保消息的可靠性屁药,通常應用中將其值(由broker的參數(shù)offsets.topic.replication.factor指定)大小設置為大于1,比如3燕侠。 所有的副本(replicas)統(tǒng)稱為Assigned Replicas者祖,即AR。ISR是AR中的一個子集绢彤,由leader維護ISR列表七问,follower從leader同步數(shù)據(jù)有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數(shù)replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表茫舶,新加入的follower也會先存放在OSR中械巡。AR=ISR+OSR。
Kafka 0.10.x版本后移除了replica.lag.max.messages參數(shù),只保留了replica.lag.time.max.ms作為ISR中副本管理的參數(shù)讥耗。為什么這樣做呢有勾?replica.lag.max.messages表示當前某個副本落后leaeder的消息數(shù)量超過了這個參數(shù)的值,那么leader就會把follower從ISR中刪除古程。假設設置replica.lag.max.messages=4蔼卡,那么如果producer一次傳送至broker的消息數(shù)量都小于4條時,因為在leader接受到producer發(fā)送的消息之后而follower副本開始拉取這些消息之前挣磨,follower落后leader的消息數(shù)不會超過4條消息雇逞,故此沒有follower移出ISR,所以這時候replica.lag.max.message的設置似乎是合理的茁裙。但是producer發(fā)起瞬時高峰流量塘砸,producer一次發(fā)送的消息超過4條時,也就是超過replica.lag.max.messages晤锥,此時follower都會被認為是與leader副本不同步了掉蔬,從而被踢出了ISR。但實際上這些follower都是存活狀態(tài)的且沒有性能問題矾瘾。那么在之后追上leader,并被重新加入了ISR女轿。于是就會出現(xiàn)它們不斷地剔出ISR然后重新回歸ISR,這無疑增加了無謂的性能損耗壕翩。而且這個參數(shù)是broker全局的谈喳。設置太大了,影響真正“落后”follower的移除戈泼;設置的太小了婿禽,導致follower的頻繁進出。無法給定一個合適的replica.lag.max.messages的值大猛,故此扭倾,新版本的Kafka移除了這個參數(shù)。
注:ISR中包括:leader和follower挽绩。
上面一節(jié)還涉及到一個概念膛壹,即HW。HW俗稱高水位唉堪,HighWatermark的縮寫模聋,取一個partition對應的ISR中最小的LEO作為HW,consumer最多只能消費到HW所在的位置唠亚。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態(tài)链方。對于leader新寫入的消息,consumer不能立刻消費灶搜,leader會等待該消息被所有ISR中的replicas同步后更新HW祟蚀,此時消息才能被consumer消費工窍。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取前酿。對于來自內(nèi)部broKer的讀取請求患雏,沒有HW的限制。
下圖詳細的說明了當producer生產(chǎn)消息至broker后罢维,ISR以及HW和LEO的流轉(zhuǎn)過程:
由此可見淹仑,Kafka的復制機制既不是完全的同步復制,也不是單純的異步復制肺孵。事實上攻人,同步復制要求所有能工作的follower都復制完,這條消息才會被commit悬槽,這種復制方式極大的影響了吞吐率。而異步復制方式下瞬浓,follower異步的從leader復制數(shù)據(jù)初婆,數(shù)據(jù)只要被leader寫入log就被認為已經(jīng)commit,這種情況下如果follower都還沒有復制完猿棉,落后于leader時磅叛,突然leader宕機,則會丟失數(shù)據(jù)萨赁。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率弊琴。
Kafka的ISR的管理最終都會反饋到Zookeeper節(jié)點上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state杖爽。目前有兩個地方會對這個Zookeeper的節(jié)點進行維護:
Controller來維護:Kafka集群中的其中一個Broker會被選舉為Controller敲董,主要負責Partition管理和副本狀態(tài)管理,也會執(zhí)行類似于重分配partition之類的管理任務慰安。在符合某些特定條件下腋寨,Controller下的LeaderSelector會選舉新的leader,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關節(jié)點中化焕。同時發(fā)起LeaderAndIsrRequest通知所有的replicas萄窜。
leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發(fā)現(xiàn)ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節(jié)點中撒桨。
3.4 數(shù)據(jù)可靠性和持久性保證
當producer向leader發(fā)送數(shù)據(jù)時查刻,可以通過request.required.acks參數(shù)來設置數(shù)據(jù)可靠性的級別:
1(默認):這意味著producer在ISR中的leader已成功收到的數(shù)據(jù)并得到確認后發(fā)送下一條message。如果leader宕機了凤类,則會丟失數(shù)據(jù)穗泵。
0:這意味著producer無需等待來自broker的確認而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高谜疤,但是數(shù)據(jù)可靠性確是最低的火欧。
-1:producer需要等待ISR中的所有follower都確認接收到數(shù)據(jù)后才算一次發(fā)送完成棋电,可靠性最高。但是這樣也不能保證數(shù)據(jù)不丟失苇侵,比如當ISR中只有l(wèi)eader時(前面ISR那一節(jié)講到赶盔,ISR中的成員由于某些情況會增加也會減少,最少就只剩一個leader)榆浓,這樣就變成了acks=1的情況于未。
如果要提高數(shù)據(jù)的可靠性,在設置request.required.acks=-1的同時陡鹃,也要min.insync.replicas這個參數(shù)(可以在broker或者topic層面進行設置)的配合烘浦,這樣才能發(fā)揮最大的功效。min.insync.replicas這個參數(shù)設定ISR中的最小副本數(shù)是多少萍鲸,默認值為1闷叉,當且僅當request.required.acks參數(shù)設置為-1時,此參數(shù)才生效脊阴。如果ISR中的副本數(shù)少于min.insync.replicas配置的數(shù)量時握侧,客戶端會返回異常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
接下來對acks=1和-1的兩種情況進行詳細分析:
1. request.required.acks=1
producer發(fā)送數(shù)據(jù)到leader嘿期,leader寫本地日志成功品擎,返回客戶端成功;此時ISR中的副本還沒有來得及拉取該消息备徐,leader就宕機了萄传,那么此次發(fā)送的消息就會丟失。
2. request.required.acks=-1
同步(Kafka默認為同步蜜猾,即producer.type=sync)的發(fā)送模式秀菱,replication.factor>=2且min.insync.replicas>=2的情況下,不會丟失數(shù)據(jù)蹭睡。
有兩種典型情況答朋。acks=-1的情況下(如無特殊說明,以下acks都表示為參數(shù)request.required.acks)棠笑,數(shù)據(jù)發(fā)送到leader, ISR的follower全部完成數(shù)據(jù)同步后梦碗,leader此時掛掉,那么會選舉出新的leader蓖救,數(shù)據(jù)不會丟失洪规。
acks=-1的情況下,數(shù)據(jù)發(fā)送到leader后 循捺,部分ISR的副本同步斩例,leader此時掛掉。比如follower1h和follower2都有可能變成新的leader, producer端會得到返回異常从橘,producer端會重新發(fā)送數(shù)據(jù)念赶,數(shù)據(jù)可能會重復础钠。
當然上圖中如果在leader crash的時候,follower2還沒有同步到任何數(shù)據(jù)叉谜,而且follower2被選舉為新的leader的話旗吁,這樣消息就不會重復。
注:Kafka只處理fail/recover問題,不處理Byzantine問題停局。
3.5 關于HW的進一步探討
考慮上圖(即acks=-1,部分ISR副本同步)中的另一種情況很钓,如果在Leader掛掉的時候,follower1同步了消息4,5董栽,follower2同步了消息4码倦,與此同時follower2被選舉為leader,那么此時follower1中的多出的消息5該做如何處理呢锭碳?
這里就需要HW的協(xié)同配合了袁稽。如前所述,一個partition中的ISR列表中擒抛,leader的HW是所有ISR列表里副本中最小的那個的LEO幢竹。類似于木桶原理拌蜘,水位取決于最低那塊短板绝页。
如上圖迄汛,某個topic的某partition有三個副本癣丧,分別為A槽畔、B、C胁编。A作為leader肯定是LEO最高厢钧,B緊隨其后,C機器由于配置比較低嬉橙,網(wǎng)絡比較差早直,故而同步最慢。這個時候A機器宕機市框,這時候如果B成為leader霞扬,假如沒有HW,在A重新恢復之后會做同步(makeFollower)操作枫振,在宕機時log文件之后直接做追加操作喻圃,而假如B的LEO已經(jīng)達到了A的LEO,會產(chǎn)生數(shù)據(jù)不一致的情況粪滤,所以使用HW來避免這種情況斧拍。
A在做同步操作的時候,先將log文件截斷到之前自己的HW的位置杖小,即3肆汹,之后再從B中拉取消息進行同步愚墓。
如果失敗的follower恢復過來,它首先將自己的log文件截斷到上次checkpointed時刻的HW的位置昂勉,之后再從leader中同步消息浪册。leader掛掉會重新選舉,新的leader會發(fā)送“指令”讓其余的follower截斷至自身的HW的位置然后再拉取新的消息硼啤。
當ISR中的個副本的LEO不一致時议经,如果此時leader掛掉,選舉新的leader時并不是按照LEO的高低進行選舉谴返,而是按照ISR中的順序選舉煞肾。
在此我向大家推薦一個架構(gòu)學習交流群。交流學習群號: 744642380嗓袱, 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring籍救,MyBatis,Netty源碼分析渠抹,高并發(fā)蝙昙、高性能、分布式梧却、微服務架構(gòu)的原理奇颠,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系放航。還能領取免費的學習資源
一條消息只有被ISR中的所有follower都從leader復制過去才會被認為已提交烈拒。這樣就避免了部分數(shù)據(jù)被寫進了leader,還沒來得及被任何follower復制就宕機了广鳍,而造成數(shù)據(jù)丟失荆几。而對于producer而言,它可以選擇是否等待消息commit赊时,這可以通過request.required.acks來設置吨铸。這種機制確保了只要ISR中有一個或者以上的follower,一條被commit的消息就不會丟失祖秒。
有一個很重要的問題是當leader宕機了诞吱,怎樣在follower中選舉出新的leader,因為follower可能落后很多或者直接crash了竭缝,所以必須確保選擇“最新”的follower作為新的leader狐胎。一個基本的原則就是,如果leader不在了歌馍,新的leader必須擁有原來的leader commit的所有消息握巢。這就需要做一個折中,如果leader在表名一個消息被commit前等待更多的follower確認松却,那么在它掛掉之后就有更多的follower可以成為新的leader暴浦,但這也會造成吞吐率的下降溅话。
一種非常常用的選舉leader的方式是“少數(shù)服從多數(shù)”,Kafka并不是采用這種方式歌焦。這種模式下飞几,如果我們有2f+1個副本,那么在commit之前必須保證有f+1個replica復制完消息独撇,同時為了保證能正確選舉出新的leader屑墨,失敗的副本數(shù)不能超過f個。這種方式有個很大的優(yōu)勢纷铣,系統(tǒng)的延遲取決于最快的幾臺機器卵史,也就是說比如副本數(shù)為3,那么延遲就取決于最快的那個follower而不是最慢的那個搜立∫郧“少數(shù)服從多數(shù)”的方式也有一些劣勢,為了保證leader選舉的正常進行啄踊,它所能容忍的失敗的follower數(shù)比較少忧设,如果要容忍1個follower掛掉,那么至少要3個以上的副本颠通,如果要容忍2個follower掛掉址晕,必須要有5個以上的副本。也就是說顿锰,在生產(chǎn)環(huán)境下為了保證較高的容錯率谨垃,必須要有大量的副本,而大量的副本又會在大數(shù)據(jù)量下導致性能的急劇下降撵儿。這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要大量數(shù)據(jù)的系統(tǒng)中使用的原因乘客。HDFS的HA功能也是基于“少數(shù)服從多數(shù)”的方式狐血,但是其數(shù)據(jù)存儲并不是采用這樣的方式淀歇。
實際上,leader選舉的算法非常多匈织,比如Zookeeper的Zab浪默、Raft以及Viewstamped Replication。而Kafka所使用的leader選舉算法更像是微軟的PacificA算法缀匕。
Kafka在Zookeeper中為每一個partition動態(tài)的維護了一個ISR纳决,這個ISR里的所有replica都跟上了leader,只有ISR里的成員才能有被選為leader的可能(unclean.leader.election.enable=false)乡小。在這種模式下阔加,對于f+1個副本,一個Kafka topic能在保證不丟失已經(jīng)commit消息的前提下容忍f個副本的失敗满钟,在大多數(shù)使用場景下胜榔,這種模式是十分有利的胳喷。事實上,為了容忍f個副本的失敗夭织,“少數(shù)服從多數(shù)”的方式和ISR在commit前需要等待的副本的數(shù)量是一樣的吭露,但是ISR需要的總的副本的個數(shù)幾乎是“少數(shù)服從多數(shù)”的方式的一半。
上文提到尊惰,在ISR中至少有一個follower時讲竿,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某一個partition的所有replica都掛了弄屡,就無法保證數(shù)據(jù)不丟失了题禀。這種情況下有兩種可行的方案:
等待ISR中任意一個replica“活”過來,并且選它作為leader
選擇第一個“活”過來的replica(并不一定是在ISR中)作為leader
這就需要在可用性和一致性當中作出一個簡單的抉擇琢岩。如果一定要等待ISR中的replica“活”過來投剥,那不可用的時間就可能會相對較長。而且如果ISR中所有的replica都無法“活”過來了担孔,或者數(shù)據(jù)丟失了江锨,這個partition將永遠不可用。選擇第一個“活”過來的replica作為leader,而這個replica不是ISR中的replica,那即使它并不保障已經(jīng)包含了所有已commit的消息糕篇,它也會成為leader而作為consumer的數(shù)據(jù)源啄育。默認情況下,Kafka采用第二種策略拌消,即unclean.leader.election.enable=true挑豌,也可以將此參數(shù)設置為false來啟用第一種策略。
unclean.leader.election.enable這個參數(shù)對于leader的選舉墩崩、系統(tǒng)的可用性以及數(shù)據(jù)的可靠性都有至關重要的影響氓英。下面我們來分析下幾種典型的場景。
如果上圖所示鹦筹,假設某個partition中的副本數(shù)為3铝阐,replica-0, replica-1, replica-2分別存放在broker0, broker1和broker2中。AR=(0,1,2)铐拐,ISR=(0,1)徘键。
設置request.required.acks=-1, min.insync.replicas=2,unclean.leader.election.enable=false遍蟋。這里講broker0中的副本也稱之為broker0起初broker0為leader吹害,broker1為follower。
當ISR中的replica-0出現(xiàn)crash的情況時虚青,broker1選舉為新的leader[ISR=(1)]它呀,因為受min.insync.replicas=2影響,write不能服務,但是read能繼續(xù)正常服務纵穿。此種情況恢復方案:
嘗試恢復(重啟)replica-0烟号,如果能起來,系統(tǒng)正常政恍;
如果replica-0不能恢復汪拥,需要將min.insync.replicas設置為1,恢復write功能篙耗。
當ISR中的replica-0出現(xiàn)crash迫筑,緊接著replica-1也出現(xiàn)了crash, 此時[ISR=(1),leader=-1],不能對外提供服務,此種情況恢復方案:
嘗試恢復replica-0和replica-1宗弯,如果都能起來脯燃,則系統(tǒng)恢復正常;
如果replica-0起來蒙保,而replica-1不能起來辕棚,這時候仍然不能選出leader,因為當設置unclean.leader.election.enable=false時邓厕,leader只能從ISR中選舉逝嚎,當ISR中所有副本都失效之后,需要ISR中最后失效的那個副本能恢復之后才能選舉leader, 即replica-0先失效详恼,replica-1后失效补君,需要replica-1恢復后才能選舉leader。保守的方案建議把unclean.leader.election.enable設置為true,但是這樣會有丟失數(shù)據(jù)的情況發(fā)生昧互,這樣可以恢復read服務挽铁。同樣需要將min.insync.replicas設置為1,恢復write功能敞掘;
replica-1恢復叽掘,replica-0不能恢復,這個情況上面遇到過玖雁,read服務可用更扁,需要將min.insync.replicas設置為1,恢復write功能茄菊;
replica-0和replica-1都不能恢復疯潭,這種情況可以參考情形2.
當ISR中的replica-0, replica-1同時宕機,此時[ISR=(0,1)],不能對外提供服務赊堪,此種情況恢復方案:嘗試恢復replica-0和replica-1面殖,當其中任意一個副本恢復正常時,對外可以提供read服務哭廉。直到2個副本恢復正常脊僚,write功能才能恢復,或者將將min.insync.replicas設置為1。
3.7 Kafka的發(fā)送模式
Kafka的發(fā)送模式由producer端的配置參數(shù)producer.type來設置辽幌,這個參數(shù)指定了在后臺線程中消息的發(fā)送方式是同步的還是異步的增淹,默認是同步的方式,即producer.type=sync乌企。如果設置成異步的模式虑润,即producer.type=async,可以是producer以batch的形式push數(shù)據(jù)加酵,這樣會極大的提高broker的性能拳喻,但是這樣會增加丟失數(shù)據(jù)的風險。如果需要確保消息的可靠性猪腕,必須要將producer.type設置為sync冗澈。
對于異步模式,還有4個配套的參數(shù)陋葡,如下:
以batch的方式推送數(shù)據(jù)可以極大的提高處理效率亚亲,kafka producer可以將消息在內(nèi)存中累計到一定數(shù)量后作為一個batch發(fā)送請求。batch的數(shù)量大小可以通過producer的參數(shù)(batch.num.messages)控制腐缤。通過增加batch的大小捌归,可以減少網(wǎng)絡請求和磁盤IO的次數(shù),當然具體參數(shù)設置需要在效率和時效性方面做一個權(quán)衡岭粤。在比較新的版本中還有batch.size這個參數(shù)陨溅。
4 高可靠性使用分析
4.1 消息傳輸保障
前面已經(jīng)介紹了Kafka如何進行有效的存儲,以及了解了producer和consumer如何工作绍在。接下來討論的是Kafka如何確保消息在producer和consumer之間傳輸门扇。有以下三種可能的傳輸保障(delivery guarantee):
At most once: 消息可能會丟,但絕不會重復傳輸
At least once:消息絕不會丟偿渡,但可能會重復傳輸
Exactly once:每條消息肯定會被傳輸一次且僅傳輸一次
Kafka的消息傳輸保障機制非常直觀臼寄。當producer向broker發(fā)送消息時,一旦這條消息被commit溜宽,由于副本機制(replication)的存在吉拳,它就不會丟失。但是如果producer發(fā)送數(shù)據(jù)給broker后适揉,遇到的網(wǎng)絡問題而造成通信中斷留攒,那producer就無法判斷該條消息是否已經(jīng)提交(commit)。雖然Kafka無法確定網(wǎng)絡故障期間發(fā)生了什么嫉嘀,但是producer可以retry多次炼邀,確保消息已經(jīng)正確傳輸?shù)絙roker中,所以目前Kafka實現(xiàn)的是at least once剪侮。
consumer從broker中讀取消息后拭宁,可以選擇commit,該操作會在Zookeeper中存下該consumer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開始讀取杰标。如未commit兵怯,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。當然也可以將consumer設置為autocommit腔剂,即consumer一旦讀取到數(shù)據(jù)立即自動commit媒区。如果只討論這一讀取消息的過程,那Kafka是確保了exactly once, 但是如果由于前面producer與broker之間的某種原因?qū)е孪⒌闹貜偷敲催@里就是at least once驻仅。
考慮這樣一種情況,當consumer讀完消息之后先commit再處理消息登渣,在這種模式下噪服,如果consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息胜茧,這就對應于at most once了粘优。
讀完消息先處理再commit。這種模式下呻顽,如果處理完了消息在commit之前consumer crash了雹顺,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經(jīng)被處理過了廊遍,這就對應于at least once嬉愧。
要做到exactly once就需要引入消息去重機制。
4.2 消息去重
如上一節(jié)所述喉前,Kafka在producer端和consumer端都會出現(xiàn)消息的重復没酣,這就需要去重處理。
Kafka文檔中提及GUID(Globally Unique Identifier)的概念卵迂,通過客戶端生成算法得到每個消息的unique id裕便,同時可映射至broker上存儲的地址,即通過GUID便可查詢提取消息內(nèi)容见咒,也便于發(fā)送方的冪等性保證偿衰,需要在broker上提供此去重處理模塊,目前版本尚不支持改览。
針對GUID, 如果從客戶端的角度去重下翎,那么需要引入集中式緩存,必然會增加依賴復雜度宝当,另外緩存的大小難以界定视事。
不只是Kafka, 類似RabbitMQ以及RocketMQ這類商業(yè)級中間件也只保障at least once, 且也無法從自身去進行消息去重。所以我們建議業(yè)務方根據(jù)自身的業(yè)務特點進行去重今妄,比如業(yè)務消息本身具備冪等性郑口,或者借助Redis等其他產(chǎn)品進行去重處理。
4.3 高可靠性配置
Kafka提供了很高的數(shù)據(jù)冗余彈性盾鳞,對于需要數(shù)據(jù)高可靠性的場景犬性,我們可以增加數(shù)據(jù)冗余備份數(shù)(replication.factor),調(diào)高最小寫入副本數(shù)的個數(shù)(min.insync.replicas)等等腾仅,但是這樣會影響性能乒裆。反之,性能提高而可靠性則降低推励,用戶需要自身業(yè)務特性在彼此之間做一些權(quán)衡性選擇鹤耍。
要保證數(shù)據(jù)寫入到Kafka是安全的,高可靠的验辞,需要如下的配置:
topic的配置:replication.factor>=3,即副本數(shù)至少是3個稿黄;2<=min.insync.replicas<=replication.factor
broker的配置:leader的選舉條件unclean.leader.election.enable=false
producer的配置:request.required.acks=-1(all),producer.type=sync
5 BenchMark
Kafka在唯品會有著很深的歷史淵源跌造,根據(jù)唯品會消息中間件團隊(VMS團隊)所掌握的資料顯示杆怕,在VMS團隊運轉(zhuǎn)的Kafka集群中所支撐的topic數(shù)已接近2000,每天的請求量也已達千億級壳贪。這里就以Kafka的高可靠性為基準點來探究幾種不同場景下的行為表現(xiàn)陵珍,以此來加深對Kafka的認知,為大家在以后高效的使用Kafka時提供一份依據(jù)违施。
5.1 測試環(huán)境
Kafka broker用到了4臺機器互纯,分別為broker[0/1/2/3]配置如下:
CPU: 24core/2.6GHZ
Memory: 62G
Network: 4000Mb
OS/kernel: CentOs release 6.6 (Final)
Disk: 1089G
Kafka版本:0.10.1.0
broker端JVM參數(shù)設置:
-Xmx8G -Xms8G -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/apps/service/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999
客戶端機器配置:
CPU: 24core/2.6GHZ
Memory: 3G
Network: 1000Mb
OS/kernel: CentOs release 6.3 (Final)
Disk: 240G
5.2 不同場景測試
場景1:測試不同的副本數(shù)、min.insync.replicas策略以及request.required.acks策略(以下簡稱acks策略)對于發(fā)送速度(TPS)的影響磕蒲。
具體配置:一個producer留潦;發(fā)送方式為sync;消息體大小為1kB辣往;partition數(shù)為12愤兵。副本數(shù)為:1/2/4;min.insync.replicas分別為1/2/4排吴;acks分別為-1(all)/1/0秆乳。
具體測試數(shù)據(jù)如下表(min.insync.replicas只在acks=-1時有效):
測試結(jié)果分析:
客戶端的acks策略對發(fā)送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1;
副本數(shù)越高钻哩,TPS越低屹堰;副本數(shù)一致時,min.insync.replicas不影響TPS街氢;
acks=0/1時扯键,TPS與min.insync.replicas參數(shù)以及副本數(shù)無關,僅受acks策略的影響珊肃。
下面將partition的個數(shù)設置為1荣刑,來進一步確認下不同的acks策略馅笙、不同的min.insync.replicas策略以及不同的副本數(shù)對于發(fā)送速度的影響,詳細請看情景2和情景3厉亏。
場景2:在partition個數(shù)固定為1董习,測試不同的副本數(shù)和min.insync.replicas策略對發(fā)送速度的影響。
具體配置:一個producer爱只;發(fā)送方式為sync皿淋;消息體大小為1kB;producer端acks=-1(all)恬试。變換副本數(shù):2/3/4窝趣; min.insync.replicas設置為:1/2/4。
測試結(jié)果如下:
測試結(jié)果分析:副本數(shù)越高训柴,TPS越低(這點與場景1的測試結(jié)論吻合)哑舒,但是當partition數(shù)為1時差距甚微。min.insync.replicas不影響TPS幻馁。
場景3:在partition個數(shù)固定為1散址,測試不同的acks策略和副本數(shù)對發(fā)送速度的影響。
具體配置:一個producer宣赔;發(fā)送方式為sync预麸;消息體大小為1kB;min.insync.replicas=1儒将。topic副本數(shù)為:1/2/4吏祸;acks: 0/1/-1。
測試結(jié)果如下:
測試結(jié)果分析(與情景1一致):
副本數(shù)越多钩蚊,TPS越低贡翘;
客戶端的acks策略對發(fā)送的TPS有較大的影響,TPS:acks_0 > acks_1 > ack_-1砰逻。
場景4:測試不同partition數(shù)對發(fā)送速率的影響
具體配置:一個producer鸣驱;消息體大小為1KB;發(fā)送方式為sync蝠咆;topic副本數(shù)為2踊东;min.insync.replicas=2;acks=-1刚操。partition數(shù)量設置為1/2/4/8/12韭邓。
測試結(jié)果:
測試結(jié)果分析:partition的不同會影響TPS咐柜,隨著partition的個數(shù)的增長TPS會有所增長,但并不是一直成正比關系制恍,到達一定臨界值時阁谆,partition數(shù)量的增加反而會使TPS略微降低惊畏。
場景5:通過將集群中部分broker設置成不可服務狀態(tài),測試對客戶端以及消息落盤的影響。
具體配置:一個producer司训;消息體大小1KB;發(fā)送方式為sync;topic副本數(shù)為4液南;min.insync.replicas設置為2壳猜;acks=-1;retries=0/100000000贺拣;partition數(shù)為12蓖谢。
具體測試數(shù)據(jù)如下表:
出錯信息:
錯誤1:客戶端返回異常捂蕴,部分數(shù)據(jù)可落盤譬涡,部分失敗:org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
錯誤2:[WARN]internals.Sender – Got error produce response with correlation id 19369 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NETWORK_EXCEPTION
錯誤3: [WARN]internals.Sender – Got error produce response with correlation id 77890 on topic-partition default_channel_replicas_4_1-8, retrying (999999859 attempts left). Error: NOT_ENOUGH_REPLICAS
錯誤4: [WARN]internals.Sender – Got error produce response with correlation id 77705 on topic-partition default_channel_replicas_4_1-3, retrying (999999999 attempts left). Error: NOT_ENOUGH_REPLICAS_AFTER_APPEND
測試結(jié)果分析:
kill兩臺broker后啥辨,客戶端可以繼續(xù)發(fā)送涡匀。broker減少后,partition的leader分布在剩余的兩臺broker上溉知,造成了TPS的減性纱瘛;
kill三臺broker后级乍,客戶端無法繼續(xù)發(fā)送舌劳。Kafka的自動重試功能開始起作用,當大于等于min.insync.replicas數(shù)量的broker恢復后玫荣,可以繼續(xù)發(fā)送甚淡;
當retries不為0時,消息有重復落盤捅厂;客戶端成功返回的消息都成功落盤贯卦,異常時部分消息可以落盤。
場景6:測試單個producer的發(fā)送延遲焙贷,以及端到端的延遲撵割。
具體配置::一個producer;消息體大小1KB辙芍;發(fā)送方式為sync啡彬;topic副本數(shù)為4;min.insync.replicas設置為2故硅;acks=-1外遇;partition數(shù)為12。
測試數(shù)據(jù)及結(jié)果(單位為ms):
各場景測試總結(jié):
當acks=-1時契吉,Kafka發(fā)送端的TPS受限于topic的副本數(shù)量(ISR中)跳仿,副本越多TPS越低;
acks=0時捐晶,TPS最高菲语,其次為1妄辩,最差為-1,即TPS:acks_0 > acks_1 > ack_-1山上;
min.insync.replicas參數(shù)不影響TPS眼耀;
partition的不同會影響TPS,隨著partition的個數(shù)的增長TPS會有所增長佩憾,但并不是一直成正比關系哮伟,到達一定臨界值時,partition數(shù)量的增加反而會使TPS略微降低妄帘;
Kafka在acks=-1,min.insync.replicas>=1時楞黄,具有高可靠性,所有成功返回的消息都可以落盤抡驼。
?在此我向大家推薦一個架構(gòu)學習交流群鬼廓。交流學習群號: 744642380, 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring致盟,MyBatis碎税,Netty源碼分析,高并發(fā)馏锡、高性能雷蹂、分布式、微服務架構(gòu)的原理杯道,JVM性能優(yōu)化匪煌、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。還能領取免費的學習資源