1. Kafka簡(jiǎn)介
Kafka 是一種分布式的扁掸,基于發(fā)布/訂閱的消息系統(tǒng)策泣,主要設(shè)計(jì)目標(biāo)如下:
- 以時(shí)間復(fù)雜度為 O(1) 的方式提供消息持久化能力,即使對(duì) TB 級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪(fǎng)問(wèn)性能乔煞。
- 高吞吐率褂傀。即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒 100K 條以上消息的傳輸。
- 支持 Kafka Server 間的消息分區(qū)编检,及分布式消費(fèi)无蜂,同時(shí)保證每個(gè) Partition 內(nèi)的消息順序傳輸。
- 同時(shí)支持離線(xiàn)數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理蒙谓。
- Scale out:支持在線(xiàn)水平擴(kuò)展斥季。
1.1 為什么使用消息系統(tǒng)
解耦
在項(xiàng)目啟動(dòng)之初來(lái)預(yù)測(cè)將來(lái)項(xiàng)目會(huì)碰到什么需求,是極其困難的累驮。消息系統(tǒng)在處理過(guò)程中間插入了一個(gè)隱含的酣倾、基于數(shù)據(jù)的接口層,兩邊的處理過(guò)程都要實(shí)現(xiàn)這一接口谤专。這允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程躁锡,只要確保它們遵守同樣的接口約束。冗余
有些情況下置侍,處理數(shù)據(jù)的過(guò)程會(huì)失敗映之。除非數(shù)據(jù)被持久化,否則將造成丟失蜡坊。消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理杠输,通過(guò)這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的”插入-獲取-刪除”范式中秕衙,在把一個(gè)消息從隊(duì)列中刪除之前蠢甲,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢据忘。擴(kuò)展性
因?yàn)橄㈥?duì)列解耦了你的處理過(guò)程鹦牛,所以增大消息入隊(duì)和處理的頻率是很容易的搞糕,只要另外增加處理過(guò)程即可。不需要改變代碼曼追、不需要調(diào)節(jié)參數(shù)窍仰。擴(kuò)展就像調(diào)大電力按鈕一樣簡(jiǎn)單。靈活性 & 峰值處理能力
在訪(fǎng)問(wèn)量劇增的情況下礼殊,應(yīng)用仍然需要繼續(xù)發(fā)揮作用辈赋,但是這樣的突發(fā)流量并不常見(jiàn);如果為以能處理這類(lèi)峰值訪(fǎng)問(wèn)為標(biāo)準(zhǔn)來(lái)投入資源隨時(shí)待命無(wú)疑是巨大的浪費(fèi)膏燕。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪(fǎng)問(wèn)壓力钥屈,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。可恢復(fù)性
系統(tǒng)的一部分組件失效時(shí)坝辫,不會(huì)影響到整個(gè)系統(tǒng)篷就。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉近忙,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理竭业。順序保證
在大多使用場(chǎng)景下,數(shù)據(jù)處理的順序都很重要及舍。大部分消息隊(duì)列本來(lái)就是排序的未辆,并且能保證數(shù)據(jù)會(huì)按照特定的順序來(lái)處理。Kafka保證一個(gè)Partition內(nèi)的消息的有序性锯玛。緩沖
在任何重要的系統(tǒng)中咐柜,都會(huì)有需要不同的處理時(shí)間的元素。例如攘残,加載一張圖片比應(yīng)用過(guò)濾器花費(fèi)更少的時(shí)間拙友。消息隊(duì)列通過(guò)一個(gè)緩沖層來(lái)幫助任務(wù)最高效率的執(zhí)行———寫(xiě)入隊(duì)列的處理會(huì)盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度歼郭。異步通信
很多時(shí)候遗契,用戶(hù)不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制病曾,允許用戶(hù)把一個(gè)消息放入隊(duì)列牍蜂,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少泰涂,然后在需要的時(shí)候再去處理它們鲫竞。
1.2 常用Message Queue對(duì)比
RabbitMQ
RabbitMQ是使用Erlang編寫(xiě)的一個(gè)開(kāi)源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP负敏,XMPP, SMTP, STOMP贡茅,也正因如此,它非常重量級(jí)其做,更適合于企業(yè)級(jí)的開(kāi)發(fā)顶考。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶(hù)端時(shí)先在中心隊(duì)列排隊(duì)妖泄。對(duì)路由驹沿,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。Redis
Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫(kù)蹈胡,開(kāi)發(fā)維護(hù)很活躍渊季。雖然它是一個(gè)Key-Value數(shù)據(jù)庫(kù)存儲(chǔ)系統(tǒng),但它本身支持MQ功能罚渐,所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來(lái)使用却汉。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬(wàn)次荷并,每10萬(wàn)次記錄一次執(zhí)行時(shí)間合砂。測(cè)試數(shù)據(jù)分為128Bytes、512Bytes源织、1K和10K四個(gè)不同大小的數(shù)據(jù)翩伪。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于RabbitMQ谈息,而如果數(shù)據(jù)大小超過(guò)了10K缘屹,Redis則慢的無(wú)法忍受;出隊(duì)時(shí)侠仇,無(wú)論數(shù)據(jù)大小轻姿,Redis都表現(xiàn)出非常好的性能,而RabbitMQ的出隊(duì)性能則遠(yuǎn)低于Redis逻炊。ZeroMQ
ZeroMQ號(hào)稱(chēng)最快的消息隊(duì)列系統(tǒng)踢代,尤其針對(duì)大吞吐量的需求場(chǎng)景。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列嗅骄,但是開(kāi)發(fā)人員需要自己組合多種技術(shù)框架胳挎,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn)。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式溺森,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件慕爬,因?yàn)槟愕膽?yīng)用程序?qū)缪葸@個(gè)服務(wù)器角色。你只需要簡(jiǎn)單的引用ZeroMQ程序庫(kù)屏积,可以使用NuGet安裝医窿,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但是ZeroMQ僅提供非持久性的隊(duì)列炊林,也就是說(shuō)如果宕機(jī)姥卢,數(shù)據(jù)將會(huì)丟失。其中,Twitter的Storm 0.9.0以前的版本中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳輸(Storm從0.9版本開(kāi)始同時(shí)支持ZeroMQ和Netty作為傳輸模塊)独榴。ActiveMQ
ActiveMQ是Apache下的一個(gè)子項(xiàng)目僧叉。 類(lèi)似于ZeroMQ,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列棺榔。同時(shí)類(lèi)似于RabbitMQ瓶堕,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場(chǎng)景。Kafka/Jafka
Kafka是Apache下的一個(gè)子項(xiàng)目症歇,是一個(gè)高性能跨語(yǔ)言分布式發(fā)布/訂閱消息隊(duì)列系統(tǒng)郎笆,而Jafka是在Kafka之上孵化而來(lái)的,即Kafka的一個(gè)升級(jí)版忘晤。具有以下特性:快速持久化宛蚓,可以在O(1)的系統(tǒng)開(kāi)銷(xiāo)下進(jìn)行消息持久化;高吞吐设塔,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率凄吏;完全的分布式系統(tǒng),Broker壹置、Producer竞思、Consumer都原生自動(dòng)支持分布式,自動(dòng)實(shí)現(xiàn)負(fù)載均衡钞护;支持Hadoop數(shù)據(jù)并行加載盖喷,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線(xiàn)分析系統(tǒng),但又要求實(shí)時(shí)處理的限制难咕,這是一個(gè)可行的解決方案课梳。Kafka通過(guò)Hadoop的并行加載機(jī)制統(tǒng)一了在線(xiàn)和離線(xiàn)的消息處理。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng)余佃,除了性能非常好之外暮刃,還是一個(gè)工作良好的分布式系統(tǒng)。
2. Kafka架構(gòu)介紹
2.1. 基礎(chǔ)概念
-
Producer
負(fù)責(zé)發(fā)布消息到Kafka broker -
Consumer
消息消費(fèi)者爆土,向Kafka broker讀取消息的客戶(hù)端椭懊。 -
Topic
每條發(fā)布到Kafka集群的消息都有一個(gè)類(lèi)別,這個(gè)類(lèi)別被稱(chēng)為T(mén)opic步势。在 Kafka 中氧猬,消息以主題(Topic)來(lái)分類(lèi),每一個(gè)主題都對(duì)應(yīng)一個(gè)「消息隊(duì)列」坏瘩,這有點(diǎn)兒類(lèi)似于數(shù)據(jù)庫(kù)中的表盅抚。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶(hù)只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
但是如果我們把所有同類(lèi)的消息都塞入到一個(gè)“中心”隊(duì)列中倔矾,勢(shì)必缺少可伸縮性妄均,無(wú)論是生產(chǎn)者/消費(fèi)者數(shù)目的增加柱锹,還是消息數(shù)量的增加,都可能耗盡系統(tǒng)的性能或存儲(chǔ)丰包。 -
Partition
Parition是物理上的概念禁熏,每個(gè)Topic包含一個(gè)或多個(gè)Partition。 -
Broker
Kafka集群包含一個(gè)或多個(gè)服務(wù)器烫沙,這種服務(wù)器被稱(chēng)為broker匹层。 -
Consumer Group
每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name隙笆,若不指定group name則屬于默認(rèn)的group)锌蓄。
2.2. Kafka拓?fù)浣Y(jié)構(gòu)
如上圖所示,一個(gè)典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View撑柔,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等)审编,若干broker(Kafka支持水平擴(kuò)展拗胜,一般broker數(shù)量越多,集群吞吐率越高)檀训,若干Consumer Group柑潦,以及一個(gè)Zookeeper集群。Kafka通過(guò)Zookeeper管理集群配置峻凫,選舉leader渗鬼,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker荧琼,Consumer使用pull模式從broker訂閱并消費(fèi)消息譬胎。
2.3. Topic & Partition
Topic 在邏輯上可以被認(rèn)為是一個(gè) Queue,每條消費(fèi)都必須指定它的 Topic命锄,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè) Queue 里堰乔。我們把一類(lèi)消息按照主題來(lái)分類(lèi),有點(diǎn)類(lèi)似于數(shù)據(jù)庫(kù)中的表脐恩。
為了使得 Kafka 的吞吐率可以線(xiàn)性提高镐侯,物理上把 Topic 分成一個(gè)或多個(gè) Partition。對(duì)應(yīng)到系統(tǒng)上就是一個(gè)或若干個(gè)目錄驶冒。
如果一個(gè)Topic對(duì)應(yīng)一個(gè)文件苟翻,那這個(gè)文件所在的機(jī)器I/O將會(huì)成為這個(gè)Topic的性能瓶頸,而有了Partition后只怎,不同的消息可以并行寫(xiě)入不同broker的不同Partition里袜瞬,極大的提高了吞吐率。
可以在$KAFKA_HOME/config/server.properties中通過(guò)配置項(xiàng)num.partitions來(lái)指定新建Topic的默認(rèn)Partition數(shù)量身堡,也可在創(chuàng)建Topic時(shí)通過(guò)參數(shù)指定邓尤,同時(shí)也可以在Topic創(chuàng)建之后通過(guò)Kafka提供的工具修改。
假設(shè)我們現(xiàn)在 Kafka 集群只有一個(gè) Broker,我們創(chuàng)建 2 個(gè) Topic 名稱(chēng)分別為:「Topic1」和「Topic2」汞扎,Partition 數(shù)量分別為 1季稳、2。
那么我們的根目錄下就會(huì)創(chuàng)建如下三個(gè)文件夾:
| --topic1-0
| --topic2-0
| --topic2-1
在 Kafka 的文件存儲(chǔ)中澈魄,同一個(gè) Topic 下有多個(gè)不同的 Partition景鼠,每個(gè) Partition 都為一個(gè)目錄。
而每一個(gè)目錄又被平均分配成多個(gè)大小相等的 Segment File 中痹扇,Segment File 又由 index file 和 data file 組成铛漓,他們總是成對(duì)出現(xiàn),后綴 ".index" 和 ".log" 分表表示 Segment 索引文件和數(shù)據(jù)文件鲫构。
現(xiàn)在假設(shè)我們?cè)O(shè)置每個(gè) Segment 大小為 500 MB浓恶,并啟動(dòng)生產(chǎn)者向 topic1 中寫(xiě)入大量數(shù)據(jù),topic1-0 文件夾中就會(huì)產(chǎn)生類(lèi)似如下的一些文件:
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.log
| --topic2-0
| --topic2-1
Segment 是 Kafka 文件存儲(chǔ)的最小單位结笨。Segment 文件命名規(guī)則:Partition 全局的第一個(gè) Segment 從 0 開(kāi)始包晰,后續(xù)每個(gè) Segment 文件名為上一個(gè) Segment 文件最后一條消息的 offset 值。
數(shù)值最大為 64 位 long 大小炕吸,19 位數(shù)字字符長(zhǎng)度伐憾,沒(méi)有數(shù)字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log赫模。
以上面的一對(duì) Segment File 為例树肃,說(shuō)明一下索引文件和數(shù)據(jù)文件對(duì)應(yīng)關(guān)系:
其中以索引文件中元數(shù)據(jù)
<3, 497>
為例,依次在數(shù)據(jù)文件中表示第 3
個(gè) Message(在全局 Partition 表示第 368769 + 3 = 368772
個(gè) message)以及該消息的物理偏移地址為 497
嘴瓤。
注意該 Index
文件并不是從0開(kāi)始扫外,也不是每次遞增 1 的,這是因?yàn)?Kafka 采取稀疏索引存儲(chǔ)的方式廓脆,每隔一定字節(jié)的數(shù)據(jù)建立一條索引筛谚。
它減少了索引文件大小,使得能夠把 Index
映射到內(nèi)存停忿,降低了查詢(xún)時(shí)的磁盤(pán) IO 開(kāi)銷(xiāo)驾讲,同時(shí)也并沒(méi)有給查詢(xún)帶來(lái)太多的時(shí)間消耗。
因?yàn)槠湮募麨樯弦粋€(gè) Segment
最后一條消息的 Offset
席赂,所以當(dāng)需要查找一個(gè)指定 Offset
的 Message
時(shí)吮铭,通過(guò)在所有 Segment
的文件名中進(jìn)行二分查找就能找到它歸屬的 Segment
。
再在其 Index 文件中找到其對(duì)應(yīng)到文件上的物理位置颅停,就能拿出該 Message谓晌。
由于消息在 Partition
的 Segment
數(shù)據(jù)文件中是順序讀寫(xiě)的,且消息消費(fèi)后不會(huì)刪除(刪除策略是針對(duì)過(guò)期的 Segment
文件)癞揉,這是順序磁盤(pán) IO 存儲(chǔ)設(shè)計(jì)師 Kafka 高性能很重要的原因纸肉。
Kafka
是如何準(zhǔn)確的知道 Message 的偏移的呢溺欧?這是因?yàn)樵?Kafka 定義了標(biāo)準(zhǔn)的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu),在 Partition 中的每一條 Message 都包含了以下三個(gè)屬性:
Offset
:表示 Message 在當(dāng)前 Partition 中的偏移量柏肪,是一個(gè)邏輯上的值姐刁,唯一確定了 Partition 中的一條 Message,可以簡(jiǎn)單的認(rèn)為是一個(gè) ID烦味。
MessageSize
:表示 Message 內(nèi)容 Data 的大小聂使。
Data
:Message 的具體內(nèi)容。
因?yàn)槊織l消息都被append到該P(yáng)artition中谬俄,屬于順序?qū)懘疟P(pán)柏靶,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P(pán)效率比隨機(jī)寫(xiě)內(nèi)存還要高凤瘦,這是Kafka高吞吐率的一個(gè)很重要的保證)宿礁。
如何根據(jù)offset查找message
例如讀取 offset=368776的 message案铺,需要通過(guò)下面2個(gè)步驟查找:
-
第一步查找 segment file 上述圖為例蔬芥,其中00000000000000000000.index 表示最開(kāi)始的文件,起始偏移量(offset)為 0控汉。第二個(gè)文件00000000000000368769.index 的消息量起始偏移量為368770 = 368769 + 1笔诵,其他后續(xù)文件依次類(lèi)推,以起始偏移量命名并排序這些文件姑子,只要根據(jù) offset 二分查找文件列表乎婿,就可以快速定位到具體文件。 當(dāng)offset=368776時(shí)定位到00000000000000368769.index | log街佑。
- 第二步通過(guò) segment file 查找 message 通過(guò)第一步定位到 segment file谢翎,當(dāng) offset=368776時(shí),依次定位到00000000000000368769.index 的元數(shù)據(jù)物理位置和
00000000000000368769.log 的物理偏移地址沐旨,然后再通過(guò)00000000000000368769.log 順序查找直到offset=368776 為止森逮。
如何根據(jù)timeindex查找message
Kafka 從0.10.0.0版本起,為分片日志文件中新增了一個(gè) .timeindex 的索引文件磁携,可以根據(jù)時(shí)間戳定位消息褒侧。同樣我們可以通過(guò)腳本 kafka-dump-log.sh 查看時(shí)間索引的文件內(nèi)容。
- 首先定位分片谊迄,將 1570793423501 與每個(gè)分片的最大時(shí)間戳進(jìn)行對(duì)比(最大時(shí)間戳取時(shí)間索引文件的最后一條記錄時(shí)間闷供,如果時(shí)間為 0 則取該日志分段的最近修改時(shí)間),直到找到大于或等于 1570793423501 的日志分段统诺,因此會(huì)定位到時(shí)間索引文件00000000000003257573.timeindex歪脏,其最大時(shí)間戳為 1570793423505。
- 重復(fù) Offset 找到 log 文件的步驟粮呢。
分區(qū)分配策略
Kafka提供了三個(gè)分區(qū)分配策略:RangeAssignor婿失、RoundRobinAssignor以及StickyAssignor怠硼,下面簡(jiǎn)單介紹下各個(gè)算法的實(shí)現(xiàn)。
-
RangeAssignor:kafka默認(rèn)會(huì)采用此策略進(jìn)行分區(qū)分配移怯,主要流程如下:
假設(shè)一個(gè)消費(fèi)組中存在兩個(gè)消費(fèi)者{C0,C1}香璃,該消費(fèi)組訂閱了三個(gè)主題{T1,T2,T3},每個(gè)主題分別存在三個(gè)分區(qū)舟误,一共就有9個(gè)分區(qū){TP1,TP2,...,TP9}葡秒。通過(guò)以上算法我們可以得到D=4,R=1嵌溢,那么消費(fèi)組C0將消費(fèi)的分區(qū)為{TP1,TP2,TP3,TP4,TP5}眯牧,C1將消費(fèi)分區(qū){TP6,TP7,TP8,TP9}。這里存在一個(gè)問(wèn)題赖草,如果不能均分学少,那么前面的幾個(gè)消費(fèi)者將會(huì)多消費(fèi)一個(gè)分區(qū)。
- 將所有訂閱主題下的分區(qū)進(jìn)行排序得到集合TP={TP0,Tp1,...,TPN+1}秧骑。
- 對(duì)消費(fèi)組中的所有消費(fèi)者根據(jù)名字進(jìn)行字典排序得到集合CG={C0,C1,...,CM+1}版确。
- 計(jì)算D=N/M,R=N%M乎折。
-
消費(fèi)者Ci獲取消費(fèi)分區(qū)起始位置=D*i+min(i,R)
绒疗,Ci獲取的分區(qū)總數(shù)=D+(if (i+1>R)0 else 1)
。
-
RoundRobinAssignor:使用該策略需要滿(mǎn)足以下兩個(gè)條件:1) 消費(fèi)組中的所有消費(fèi)者應(yīng)該訂閱主題相同骂澄;2) 同一個(gè)消費(fèi)組的所有消費(fèi)者在實(shí)例化時(shí)給每個(gè)主題指定相同的流數(shù)吓蘑。
- 對(duì)所有主題的所有分區(qū)根據(jù)主題+分區(qū)得到的哈希值進(jìn)行排序。
- 對(duì)所有消費(fèi)者按字典排序坟冲。
- 通過(guò)輪詢(xún)的方式將分區(qū)分配給消費(fèi)者磨镶。
-
StickyAssignor:該分配方式在0.11版本開(kāi)始引入,主要是保證以下特性:
- 盡可能的保證分配均衡健提;
- 當(dāng)重新分配時(shí)琳猫,保留盡可能多的現(xiàn)有分配。
其中第一條的優(yōu)先級(jí)要大于第二條矩桂。
2.4. Broker 和集群(Cluster)
一個(gè) Kafka 服務(wù)器也稱(chēng)為 Broker沸移,它接受生產(chǎn)者發(fā)送的消息并存入磁盤(pán);Broker 同時(shí)服務(wù)消費(fèi)者拉取分區(qū)消息的請(qǐng)求侄榴,返回目前已經(jīng)提交的消息雹锣。使用特定的機(jī)器硬件,一個(gè) Broker 每秒可以處理成千上萬(wàn)的分區(qū)和百萬(wàn)量級(jí)的消息癞蚕。
若干個(gè) Broker 組成一個(gè)集群(Cluster)蕊爵,其中集群內(nèi)某個(gè) Broker 會(huì)成為集群控制器(Cluster Controller),它負(fù)責(zé)管理集群桦山,包括分配分區(qū)到 Broker攒射、監(jiān)控 Broker 故障等醋旦。
在集群內(nèi),一個(gè)分區(qū)由一個(gè) Broker 負(fù)責(zé)会放,這個(gè) Broker 也稱(chēng)為這個(gè)分區(qū)的 Leader饲齐。
對(duì)于傳統(tǒng)的message queue而言捂人,一般會(huì)刪除已經(jīng)被消費(fèi)的消息,而Kafka集群會(huì)保留所有的消息矢沿,無(wú)論其被消費(fèi)與否滥搭。當(dāng)然,因?yàn)榇疟P(pán)限制捣鲸,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒(méi)必要)瑟匆,因此Kafka提供兩種策略刪除舊數(shù)據(jù):一是基于時(shí)間,二是基于Partition文件大小栽惶。例如可以通過(guò)配置$KAFKA_HOME/config/server.properties愁溜,讓Kafka刪除一周前的數(shù)據(jù),也可在Partition文件超過(guò)1GB時(shí)刪除舊數(shù)據(jù)媒役,配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
這里要注意祝谚,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無(wú)關(guān)酣衷,所以這里刪除過(guò)期文件與提高Kafka性能無(wú)關(guān)。選擇怎樣的刪除策略只與磁盤(pán)以及具體的需求有關(guān)次泽。另外穿仪,Kafka會(huì)為每一個(gè)Consumer Group保留一些metadata信息——當(dāng)前消費(fèi)的消息的position,也即offset意荤。
這個(gè)offset由Consumer控制啊片。正常情況下Consumer會(huì)在消費(fèi)完一條消息后遞增該offset。當(dāng)然玖像,Consumer也可將offset設(shè)成一個(gè)較小的值紫谷,重新消費(fèi)一些消息。因?yàn)閛ffet由Consumer控制捐寥,所以Kafka broker是無(wú)狀態(tài)的笤昨,它不需要標(biāo)記哪些消息被哪些消費(fèi)過(guò),也不需要通過(guò)broker去保證同一個(gè)Consumer Group只有一個(gè)Consumer能消費(fèi)某一條消息握恳,因此也就不需要鎖機(jī)制瞒窒,這也為Kafka的高吞吐率提供了有力保障。
2.5. Producer
Producer發(fā)送消息到broker時(shí)乡洼,會(huì)根據(jù)Paritition機(jī)制選擇將其存儲(chǔ)到哪一個(gè)Partition崇裁。如果Partition機(jī)制設(shè)置合理匕坯,所有消息可以均勻分布到不同的Partition里,這樣就實(shí)現(xiàn)了負(fù)載均衡拔稳。
- 指明 Partition 的情況下葛峻,直接將給定的Value 作為 Partition 的值
- 沒(méi)有指明 Partition 但有 Key 的情況下,將Key 的 Hash 值與分區(qū)數(shù)取余得到Partition值巴比。
- 既沒(méi)有 Partition 又沒(méi)有 Key 的情況下泞歉,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用都在這個(gè)整數(shù)上自增),將這個(gè)值與可用的分區(qū)數(shù)取余匿辩,得到 Partition 值腰耙,也就是常說(shuō)的 Round-Robin 輪詢(xún)算法。
為保證 Producer 發(fā)送的數(shù)據(jù)铲球,能可靠地發(fā)送到指定的 Topic挺庞,Topic 的每個(gè)Partition 收到 Producer 發(fā)送的數(shù)據(jù)后,都需要向 Producer 發(fā)送 ACK稼病。如果Producer 收到 ACK选侨,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)然走。
ack參數(shù)設(shè)置及意義
生產(chǎn)端往kafka集群發(fā)送消息時(shí)援制,可以通過(guò)request.required.acks參數(shù)來(lái)設(shè)置數(shù)據(jù)的可靠性級(jí)別
1:默認(rèn)為1,表示在ISR中的leader副本成功接收到數(shù)據(jù)并確認(rèn)后再發(fā)送下一條消息芍瑞,如果主節(jié)點(diǎn)宕機(jī)則可能出現(xiàn)數(shù)據(jù)丟失場(chǎng)景晨仑,詳細(xì)分析可參考前面提到的副本章節(jié)。
0:表示生產(chǎn)端不需要等待節(jié)點(diǎn)的確認(rèn)就可以繼續(xù)發(fā)送下一批數(shù)據(jù)拆檬,這種情況下數(shù)據(jù)傳輸效率最高洪己,但是數(shù)據(jù)的可靠性最低。
-1:表示生產(chǎn)端需要等待ISR中的所有副本節(jié)點(diǎn)都收到數(shù)據(jù)之后才算消息寫(xiě)入成功竟贯,可靠性最高答捕,但是性能最低,如果服務(wù)端的min.insync.replicas值設(shè)置為1屑那,那么在這種情況下允許ISR集合只有一個(gè)副本拱镐,因此也會(huì)存在數(shù)據(jù)丟失的情況。
冪等特性
冪等性:同一個(gè)操作任意執(zhí)行多次產(chǎn)生的影響或效果與一次執(zhí)行影響相同持际。
冪等的關(guān)鍵在于服務(wù)端能否識(shí)別出請(qǐng)求是否重復(fù)沃琅,然后過(guò)濾掉這些重復(fù)請(qǐng)求,通常情況下需要以下信息來(lái)實(shí)現(xiàn)冪等特性:
- 唯一標(biāo)識(shí):判斷某個(gè)請(qǐng)求是否重復(fù)选酗,需要有一個(gè)唯一性標(biāo)識(shí)阵难,然后服務(wù)端就能根據(jù)這個(gè)唯一標(biāo)識(shí)來(lái)判斷是否為重復(fù)請(qǐng)求。
-
記錄已經(jīng)處理過(guò)的請(qǐng)求:服務(wù)端需要記錄已經(jīng)處理過(guò)的請(qǐng)求芒填,然后根據(jù)唯一標(biāo)識(shí)來(lái)判斷是否是重復(fù)請(qǐng)求呜叫,如果已經(jīng)處理過(guò)空繁,則直接拒絕或者不做任何操作返回成功。
kafka中Producer端的冪等性是指當(dāng)發(fā)送同一條消息時(shí)朱庆,消息在集群中只會(huì)被持久化一次盛泡,其冪等是在以下條件中才成立: - 只能保證生產(chǎn)端在單個(gè)會(huì)話(huà)內(nèi)的冪等,如果生產(chǎn)端因?yàn)槟承┰蛞馔鈷斓羧缓笾貑⒂榧眨藭r(shí)是沒(méi)辦法保證冪等的傲诵,因?yàn)檫@時(shí)沒(méi)辦法獲取到之前的狀態(tài)信息,即無(wú)法做到垮會(huì)話(huà)級(jí)別的冪等箱硕。
- 冪等性不能垮多個(gè)主題分區(qū)拴竹,只能保證單個(gè)分區(qū)內(nèi)的冪等,涉及到多個(gè)消息分區(qū)時(shí)剧罩,中間的狀態(tài)并沒(méi)有同步栓拜。
如果要支持垮會(huì)話(huà)或者垮多個(gè)消息分區(qū)的情況,則需要使用kafka的事務(wù)性
來(lái)實(shí)現(xiàn)惠昔。
為了實(shí)現(xiàn)生成端的冪等語(yǔ)義幕与,引入了Producer ID(PID)
與Sequence Number
的概念:
-
Producer ID(PID)
:每個(gè)生產(chǎn)者在初始化時(shí)都會(huì)分配一個(gè)唯一的PID,PID的分配對(duì)于用戶(hù)來(lái)說(shuō)是透明的镇防。 -
Sequence Number(序列號(hào))
:對(duì)于給定的PID而言啦鸣,序列號(hào)從0開(kāi)始單調(diào)遞增,每個(gè)主題分區(qū)均會(huì)產(chǎn)生一個(gè)獨(dú)立序列號(hào)来氧,生產(chǎn)者在發(fā)送消息時(shí)會(huì)給每條消息添加一個(gè)序列號(hào)诫给。broker端緩存了已經(jīng)提交消息的序列號(hào),只有比緩存分區(qū)中最后提交消息的序列號(hào)大1的消息才會(huì)被接受饲漾,其他會(huì)被拒絕蝙搔。
生產(chǎn)端消息發(fā)送流程的冪等處理
下面簡(jiǎn)單介紹下支持冪等的消息發(fā)送端工作流程
生產(chǎn)端通過(guò)Kafkaproducer會(huì)將數(shù)據(jù)添加到RecordAccumulator中,數(shù)據(jù)添加時(shí)會(huì)判斷是否需要新建一個(gè)ProducerBatch考传。
生產(chǎn)端后臺(tái)啟動(dòng)發(fā)送線(xiàn)程,會(huì)判斷當(dāng)前的PID是否需要重置证鸥,重置的原因是因?yàn)槟承┫⒎謪^(qū)的batch重試多次仍然失敗最后因?yàn)槌瑫r(shí)而被移除僚楞,這個(gè)時(shí)候序列號(hào)無(wú)法連續(xù),導(dǎo)致后續(xù)消息無(wú)法發(fā)送枉层,因此會(huì)重置PID泉褐,并將相關(guān)緩存信息清空,這個(gè)時(shí)候消息會(huì)丟失鸟蜡。
發(fā)送線(xiàn)程判斷是否需要新申請(qǐng)PID膜赃,如果需要?jiǎng)t會(huì)阻塞直到獲取到PID信息。
發(fā)送線(xiàn)程在調(diào)用sendProducerData()方法發(fā)送數(shù)據(jù)時(shí)揉忘,會(huì)進(jìn)行以下判斷:
判斷主題分區(qū)是否可以繼續(xù)發(fā)送跳座、PID是否有效端铛、如果是重試batch需要判斷之前的batch是否發(fā)送完成,如果沒(méi)有發(fā)送完成則會(huì)跳過(guò)當(dāng)前主題分區(qū)的消息發(fā)送疲眷,直到前面的batch發(fā)送完成禾蚕。
如果對(duì)應(yīng)ProducerBatch沒(méi)有分配對(duì)應(yīng)的PID與序列號(hào)信息,則會(huì)在這里進(jìn)行設(shè)置狂丝。
服務(wù)端消息接受流程的冪等處理
服務(wù)端(broker)在收到生產(chǎn)端發(fā)送的數(shù)據(jù)寫(xiě)請(qǐng)求之后换淆,會(huì)進(jìn)行一些判斷來(lái)決定是否可以寫(xiě)入數(shù)據(jù),這里也主要介紹關(guān)于冪等相關(guān)的操作流程几颜。
- 如果請(qǐng)求設(shè)置了冪等特性倍试,則會(huì)檢查是否對(duì)ClusterResource有IdempotentWrite權(quán)限,如果沒(méi)有蛋哭,則會(huì)返回錯(cuò)誤CLUSTER_AUTHORIZATION_FAILED县习。
- 檢查是否有PID信息
- 根據(jù)batch的序列號(hào)檢查該batch是否重復(fù),服務(wù)端會(huì)緩存每個(gè)PID對(duì)應(yīng)主題分區(qū)的最近5個(gè)batch信息具壮,如果有重復(fù)准颓,則直接返回寫(xiě)入成功,但是不會(huì)執(zhí)行真正的數(shù)據(jù)寫(xiě)入操作棺妓。
- 如果有PID且非重復(fù)batch攘已,則進(jìn)行以下操作:
- 判斷該P(yáng)ID是否已經(jīng)存在緩存中。
- 如果不存在則判斷序列號(hào)是否是從0開(kāi)始怜跑,如果是則表示為新的PID样勃,在緩存中記錄PID的信息(包括PID、epoch以及序列號(hào)信息)性芬,然后執(zhí)行數(shù)據(jù)寫(xiě)入操作峡眶;如果不存在但是序列號(hào)不是從0開(kāi)始,則直接返回錯(cuò)誤植锉,表示PID在服務(wù)端以及過(guò)期或者PID寫(xiě)的數(shù)據(jù)已經(jīng)過(guò)期辫樱。
- 如果PID存在,則會(huì)檢查PID的epoch版本是否與服務(wù)端一致俊庇,如果不一致且序列號(hào)不是從0開(kāi)始狮暑,則返回錯(cuò)誤。如果epoch不一致但是序列號(hào)是從0開(kāi)始辉饱,則可以正常寫(xiě)入搬男。
- 如果epoch版本一致,則會(huì)查詢(xún)緩存中最近一次序列號(hào)是否連續(xù)彭沼,不連續(xù)則會(huì)返回錯(cuò)誤缔逛,否則正常寫(xiě)入。
2.6. Consumer
假設(shè)這么個(gè)場(chǎng)景:我們從 Kafka 中讀取消息,并且進(jìn)行檢查褐奴,最后產(chǎn)生結(jié)果數(shù)據(jù)按脚。
我們可以創(chuàng)建一個(gè)消費(fèi)者實(shí)例去做這件事情,但如果生產(chǎn)者寫(xiě)入消息的速度比消費(fèi)者讀取的速度快怎么辦呢歉糜?
這樣隨著時(shí)間增長(zhǎng)乘寒,消息堆積越來(lái)越嚴(yán)重。對(duì)于這種場(chǎng)景匪补,我們需要增加多個(gè)消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展伞辛。
Kafka 消費(fèi)者是消費(fèi)組的一部分,當(dāng)多個(gè)消費(fèi)者形成一個(gè)消費(fèi)組來(lái)消費(fèi)主題時(shí)夯缺,每個(gè)消費(fèi)者會(huì)收到不同分區(qū)的消息蚤氏。
假設(shè)有一個(gè) T1 主題,該主題有 4 個(gè)分區(qū)踊兜;同時(shí)我們有一個(gè)消費(fèi)組 G1竿滨,這個(gè)消費(fèi)組只有一個(gè)消費(fèi)者 C1。那么消費(fèi)者 C1 將會(huì)收到這 4 個(gè)分區(qū)的消息捏境。如果我們?cè)黾有碌南M(fèi)者 C2 到消費(fèi)組 G1于游,那么每個(gè)消費(fèi)者將會(huì)分別收到兩個(gè)分區(qū)的消息。相當(dāng)于 T1 Topic 內(nèi)的 Partition 均分給了 G1 消費(fèi)的所有消費(fèi)者垫言,在這里 C1 消費(fèi) P0 和 P2贰剥,C2 消費(fèi)P1 和 P3。
如果增加到 4 個(gè)消費(fèi)者筷频,那么每個(gè)消費(fèi)者將會(huì)分別收到一個(gè)分區(qū)的消息蚌成。這時(shí)候每個(gè)消費(fèi)者都處理其中一個(gè)分區(qū),滿(mǎn)負(fù)載運(yùn)行凛捏。
但如果我們繼續(xù)增加消費(fèi)者到這個(gè)消費(fèi)組担忧,剩余的消費(fèi)者將會(huì)空閑,不會(huì)收到任何消息坯癣。
總而言之瓶盛,我們可以通過(guò)增加消費(fèi)組的消費(fèi)者來(lái)進(jìn)行水平擴(kuò)展提升消費(fèi)能力。
這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來(lái)提升性能。
另外诞丽,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多磅轻,因?yàn)槎喑鰜?lái)的消費(fèi)者是空閑的,沒(méi)有任何幫助吵取。
如果我們的 C1 處理消息仍然還有瓶頸禽额,我們?nèi)绾蝺?yōu)化和處理?
把 C1 內(nèi)部的消息進(jìn)行二次 Sharding,開(kāi)啟多個(gè)Goroutine Worker 進(jìn)行消費(fèi)脯倒,為了保障 Offset 提交的正確性实辑,需要使用 WaterMark 機(jī)制,保障最小的 Offset 保存藻丢,才能往 Broker 提交剪撬。
2.7. Consumer Group
Kafka 一個(gè)很重要的特性就是,只需寫(xiě)入一次消息悠反,可以支持任意多的應(yīng)用讀取這
個(gè)消息残黑。
使用Consumer high level API時(shí),同一Topic的一條消息只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi)斋否,但多個(gè)Consumer Group可同時(shí)消費(fèi)這一消息梨水。
這是Kafka用來(lái)實(shí)現(xiàn)一個(gè)Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個(gè)Consumer)的手段。一個(gè)Topic可以對(duì)應(yīng)多個(gè)Consumer Group茵臭。如果需要實(shí)現(xiàn)廣播疫诽,只要每個(gè)Consumer有一個(gè)獨(dú)立的Group就可以了。要實(shí)現(xiàn)單播只要所有的Consumer在同一個(gè)Group里旦委。用Consumer Group還可以將Consumer進(jìn)行自由的分組而不需要多次發(fā)送消息到不同的Topic奇徒。 下面這個(gè)例子更清晰地展示了Kafka Consumer Group的特性。首先創(chuàng)建一個(gè)Topic (名為topic1缨硝,包含3個(gè)Partition)摩钙,然后創(chuàng)建一個(gè)屬于group1的Consumer實(shí)例,并創(chuàng)建三個(gè)屬于group2的Consumer實(shí)例追葡,最后通過(guò)Producer向topic1發(fā)送key分別為1腺律,2,3的消息宜肉。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息匀钧,同時(shí)group2中的3個(gè)Consumer分別收到了key為1,2谬返,3的消息之斯。如下圖所示。
2.7.1 Rebalance
可以看到遣铝,當(dāng)新的消費(fèi)者加入消費(fèi)組佑刷,它會(huì)消費(fèi)一個(gè)或多個(gè)分區(qū),而這些分區(qū)之前是由其他消費(fèi)者負(fù)責(zé)的酿炸。另外瘫絮,當(dāng)消費(fèi)者離開(kāi)消費(fèi)組(比如重啟、宕機(jī)等)時(shí)填硕,它所消費(fèi)的分區(qū)會(huì)分配給其他分區(qū)麦萤。這種現(xiàn)象稱(chēng)為重平衡(Rebalance)鹿鳖。
重平衡是 Kafka 一個(gè)很重要的性質(zhì),這個(gè)性質(zhì)保證了高可用和水平擴(kuò)展壮莹。不過(guò)也需要注意到翅帜,在重平衡期間,所有消費(fèi)者都不能消費(fèi)消息命满,因此會(huì)造成整個(gè)消費(fèi)組短暫的不可用涝滴。
而且,將分區(qū)進(jìn)行重平衡也會(huì)導(dǎo)致原來(lái)的消費(fèi)者狀態(tài)過(guò)期胶台,從而導(dǎo)致消費(fèi)者需要重新更新?tīng)顟B(tài)歼疮,這段期間也會(huì)降低消費(fèi)性能。
消費(fèi)者通過(guò)定期發(fā)送心跳(Hearbeat)到一個(gè)作為組協(xié)調(diào)者(Group Coordinator)的 Broker 來(lái)保持在消費(fèi)組內(nèi)存活概作。這個(gè) Broker 不是固定的腋妙,每個(gè)消費(fèi)組都可能不同。當(dāng)消費(fèi)者拉取消息或者提交時(shí)讯榕,便會(huì)發(fā)送心跳骤素。如果消費(fèi)者超過(guò)一定時(shí)間沒(méi)有發(fā)送心跳,那么它的會(huì)話(huà)(Session)就會(huì)過(guò)期愚屁,組協(xié)調(diào)者會(huì)認(rèn)為該消費(fèi)者已經(jīng)宕機(jī)济竹,然后觸發(fā)重平衡。
可以看到霎槐,從消費(fèi)者宕機(jī)到會(huì)話(huà)過(guò)期是有一定時(shí)間的送浊,這段時(shí)間內(nèi)該消費(fèi)者的分區(qū)都不能進(jìn)行消息消費(fèi)。通常情況下丘跌,我們可以進(jìn)行優(yōu)雅關(guān)閉袭景,這樣消費(fèi)者會(huì)發(fā)送離開(kāi)的消息到組協(xié)調(diào)者,這樣組協(xié)調(diào)者可以立即進(jìn)行重平衡而不需要等待會(huì)話(huà)過(guò)期闭树。
在 0.10.1 版本耸棒,Kafka 對(duì)心跳機(jī)制進(jìn)行了修改,將發(fā)送心跳與拉取消息進(jìn)行分離报辱,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響与殃。
另外更高版本的 Kafka 支持配置一個(gè)消費(fèi)者多長(zhǎng)時(shí)間不拉取消息但仍然保持存活,這個(gè)配置可以避免活鎖(livelock)碍现》郏活鎖,是指應(yīng)用沒(méi)有故障但是由于某些原因不能進(jìn)一步消費(fèi)昼接。
但是活鎖也很容易導(dǎo)致連鎖故障爽篷,當(dāng)消費(fèi)端下游的組件性能退化,那么消息消費(fèi)會(huì)變的很慢慢睡,會(huì)很容易出發(fā)livelock 的重新均衡機(jī)制狼忱,反而影響吞吐膨疏。
2.8. Push vs. Pull
作為一個(gè)消息系統(tǒng),Kafka遵循了傳統(tǒng)的方式钻弄,選擇由Producer向broker push消息并由Consumer從broker pull消息。一些logging-centric system者吁,比如Facebook的Scribe和Cloudera的Flume窘俺,采用push模式。事實(shí)上复凳,push模式和pull模式各有優(yōu)劣瘤泪。
push模式
很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由broker決定的育八。push模式的目標(biāo)是盡可能以最快速度傳遞消息对途,但是這樣很容易造成Consumer來(lái)不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞髓棋。而pull模式
則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息实檀。
對(duì)于Kafka而言,pull模式更合適按声。pull模式可簡(jiǎn)化broker的設(shè)計(jì)膳犹,Consumer可自主控制消費(fèi)消息的速率,同時(shí)Consumer可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi)签则,同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語(yǔ)義须床。
2.9. Kafka消息交付的保證性
有這么幾種可能的消息交付的保證性(delivery guarantee):
-
At most once
消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸 -
At least one
消息絕不會(huì)丟渐裂,但可能會(huì)重復(fù)傳輸 -
Exactly once
每條消息肯定會(huì)被傳輸一次且僅傳輸一次豺旬,很多時(shí)候這是用戶(hù)所想要的
2.9.1 At most once
-
讀完消息先commit再處理消息。這種模式下柒凉,如果Consumer在commit后還沒(méi)來(lái)得及處理消息就crash了族阅,下次重新開(kāi)始工作后就無(wú)法讀到剛剛已提交而未處理的消息,這就對(duì)應(yīng)于
at most once
(消息會(huì)丟扛拨,但不重復(fù))耘分。
2.9.2 At least one
- 當(dāng)
Producer
向Broker
發(fā)送數(shù)據(jù)后,會(huì)進(jìn)行 commit绑警,如果commit成功求泰,由于Replica
副本機(jī)制的存在,則意味著消息不會(huì)丟失计盒。但是Producer
發(fā)送數(shù)據(jù)給Broker
后渴频,遇到網(wǎng)絡(luò)問(wèn)題而造成通信中斷,那么Producer
就無(wú)法準(zhǔn)確判斷該消息是否已經(jīng)被提交(commit)北启,這就可能造成at least once
(消息絕不會(huì)丟卜朗,但可能會(huì)重復(fù)傳輸)拔第。 -
讀完消息先處理再commit。這種模式下场钉,如果在處理完消息之后commit之前Consumer crash了蚊俺,下次重新開(kāi)始工作時(shí)還會(huì)處理剛剛未commit的消息,實(shí)際上該消息已經(jīng)被處理過(guò)了逛万。這就對(duì)應(yīng)于
at least once
(消息不丟泳猬,但被多次重復(fù)處理)。
2.9.3 Exactly once
-
在 Kafka 0.11.0.0 之前宇植, 如果
Producer
沒(méi)有收到消息 commit 的響應(yīng)結(jié)果得封,它只能重新發(fā)送消息,確保消息已經(jīng)被正確的傳輸?shù)?Broker
指郁,重新發(fā)送的時(shí)候會(huì)將消息再次寫(xiě)入日志中忙上;而在 0.11.0.0 版本之后,**Producer
支持冪等傳遞選項(xiàng)闲坎,保證重新發(fā)送不會(huì)導(dǎo)致消息在日志出現(xiàn)重復(fù)**疫粥。為了實(shí)現(xiàn)這個(gè),Broker
為Producer
分配了一個(gè)ID,發(fā)往同一 Partition 的消息會(huì)附帶Sequence Number箫柳。并通過(guò)每條消息的序列號(hào)進(jìn)行去重手形。也支持了類(lèi)似事務(wù)語(yǔ)義來(lái)保證將消息發(fā)送到多個(gè)Topic
分區(qū)中,保證所有消息要么都寫(xiě)入成功悯恍,要么都失敗库糠,這個(gè)主要用在Topic
之間的exactly once
(每條消息肯定會(huì)被傳輸一次且僅傳輸一次)。其中啟用冪等傳遞的方法配置:
enable.idempotence = true
涮毫。啟用事務(wù)支持的方法配置:設(shè)置屬性
transcational.id = "指定值"
瞬欧。
3. Kafka高可用設(shè)計(jì)
3.1 Replication
Kafka 在0.8以前的版本中,并不提供 HA 機(jī)制罢防,一旦一個(gè)或多個(gè) Broker 宕機(jī)艘虎,則宕機(jī)期間其上所有 Partition 都無(wú)法繼續(xù)提供服務(wù)。若該 Broker 永遠(yuǎn)不能再恢復(fù)咒吐,亦或磁盤(pán)故障野建,則其上數(shù)據(jù)將丟失。
在沒(méi)有 Replication 的情況下恬叹,一旦某機(jī)器宕機(jī)或者某個(gè) Broker 停止工作則會(huì)造成整個(gè)系統(tǒng)的可用性降低候生。隨著集群規(guī)模的增加,整個(gè)集群中出現(xiàn)該類(lèi)異常的幾率大大增加绽昼,因此對(duì)于生產(chǎn)系統(tǒng)而言 Replication 機(jī)制的引入非常重要唯鸭。
為了更好的做負(fù)載均衡,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上硅确。一個(gè)典型的部署方式是一個(gè)Topic的Partition數(shù)量大于Broker的數(shù)量目溉。同時(shí)為了提高Kafka的容錯(cuò)能力明肮,也需要將同一個(gè)Partition的Replica盡量分散到不同的機(jī)器。實(shí)際上缭付,如果所有的Replica都在同一個(gè)Broker上柿估,那一旦該Broker宕機(jī),該P(yáng)artition的所有Replica都無(wú)法工作蛉腌,也就達(dá)不到HA的效果官份。同時(shí),如果某個(gè)Broker宕機(jī)了烙丛,需要保證它上面的負(fù)載可以被均勻的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
- 將所有Broker(假設(shè)共n個(gè)Broker)和待分配的Partition排序
- 將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上
- 將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上
Kafka的Data Replication 需要解決如下問(wèn)題:
- 怎樣傳播消息
- 在向Producer發(fā)送ACK前需要保證有多少個(gè)Replica已經(jīng)收到該消息
- 怎樣處理某個(gè)Replica不工作的情況
- 怎樣處理Failed Replica恢復(fù)回來(lái)的情況
3.2 怎樣傳播消息
- Producer在發(fā)布消息到某個(gè)Partition時(shí)羔味,先通過(guò) Metadata (通過(guò) Broker 獲取并且緩存在 Producer 內(nèi)) 找到該 Partition 的Leader河咽,Producer只將該消息發(fā)送到該P(yáng)artition的Leader。 Leader會(huì)將該消息寫(xiě)入其本地Log赋元。
- 每個(gè)Follower都從Leader pull數(shù)據(jù)忘蟹。Follower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。Follower在收到該消息后搁凸,立即向Leader發(fā)送ACK媚值, 而后將數(shù)據(jù)寫(xiě)入其Log。
- 一旦Leader收到了ISR中的所有Replica的ACK护糖,該消息就被認(rèn)為已經(jīng)commit了褥芒,Leader將增加HW并且向Producer發(fā)送ACK。
為了提高性能嫡良,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK锰扶,而非等到數(shù)據(jù)寫(xiě)入Log中。因此寝受,對(duì)于已經(jīng)commit的消息坷牛,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤(pán)中很澄,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)京闰。但考慮到這種場(chǎng)景非常少見(jiàn),可以認(rèn)為這種方式在性能和數(shù)據(jù)持久化上做了一個(gè)比較好的平衡甩苛。在將來(lái)的版本中蹂楣,Kafka會(huì)考慮提供更高的持久性。
Consumer讀消息也是從Leader讀取浪藻,只有被commit過(guò)的消息(offset低于HW的消息)才會(huì)暴露給Consumer捐迫。
Kafka Replication的數(shù)據(jù)流如下圖所示:3.3 向Producer發(fā)送ACK前需要保證有多少個(gè)Replica已經(jīng)收到該消息
Kafka處理失敗需要明確定義一個(gè)Broker是否“活著”。對(duì)于Kafka而言爱葵,Kafka存活包含兩個(gè)條件:
- 它必須維護(hù)與Zookeeper的session(這個(gè)通過(guò)Zookeeper的Heartbeat機(jī)制來(lái)實(shí)現(xiàn))
- 從副本的最后一條消息的 Offset 需要與主副本的最后一條消息 Offset 差值不超過(guò)設(shè)定閾值(replica.lag.max.messages)或者副本的 LEO 落后于主副本的 LEO 時(shí)長(zhǎng)不大于設(shè)定閾值(replica.lag.time.max.ms)施戴,官方推薦使用后者判斷反浓,并在新版本 Kafka0.10.0 移除了replica.lag.max.messages 參數(shù)。
Leader會(huì)跟蹤與其保持同步的Replica列表赞哗,該列表稱(chēng)為ISR(即in-sync Replica)
雷则。如果一個(gè)Follower宕機(jī),或者落后太多肪笋,Leader將把它從ISR中移除月劈。當(dāng)其再次滿(mǎn)足以上條件之后又會(huì)被重新加入集合中。
ISR 的引入主要是解決同步副本與異步復(fù)制兩種方案各自的缺陷:
- 同步副本中如果有個(gè)副本宕機(jī)或者超時(shí)就會(huì)拖慢該副本組的整體性能藤乙。
- 如果僅僅使用異步副本猜揪,當(dāng)所有的副本消息均遠(yuǎn)落后于主副本時(shí),一旦主副本宕機(jī)重新選舉坛梁,那么就會(huì)存在消息丟失情況而姐。
Follower可以批量的從Leader復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫(xiě)磁盤(pán))划咐,極大減少了Follower與Leader的差距
一條消息只有被ISR里的所有Follower都從Leader復(fù)制過(guò)去才會(huì)被認(rèn)為已提交拴念。這樣就避免了部分?jǐn)?shù)據(jù)被寫(xiě)進(jìn)了Leader,還沒(méi)來(lái)得及被任何Follower復(fù)制就宕機(jī)了褐缠,而造成數(shù)據(jù)丟失(Consumer無(wú)法消費(fèi)這些數(shù)據(jù))政鼠。而對(duì)于Producer而言,它可以選擇是否等待消息commit队魏,這可以通過(guò)request.required.acks
來(lái)設(shè)置公般。這種機(jī)制確保了只要ISR有一個(gè)或以上的Follower,一條被commit的消息就不會(huì)丟失器躏。
3.4 主從數(shù)據(jù)同步流程詳解
初始時(shí) Leader 和 Follower 的 HW(High Watermark)
和 LEO(Log End Offset)
都是0俐载。Leader 中的 remote LEO 指的就是Leader 端保存的 follower LEO,也被初始化成 0登失。
Follower 發(fā)送 Fetch 請(qǐng)求在 Leader 處理完 Producer 請(qǐng)求之后。Producer 給該 Topic 分區(qū)發(fā)送了一條消息。
- 把消息寫(xiě)入寫(xiě)底層 Log(同時(shí)也就自動(dòng)地更新了 Leader 的 LEO)账月。
- 嘗試更新 Leader HW 值综膀。我們已經(jīng)假設(shè)此時(shí) Follower 尚未發(fā)送 Fetch 請(qǐng)求,那么 Leader 端保存的 remote LEO 依然是0局齿,因此 Leader 會(huì)比較它自己的 LEO 值和 remote LEO 值剧劝,發(fā)現(xiàn)最小值是 0,與當(dāng)前 HW 值相同抓歼,故不會(huì)更新分區(qū) HW 值讥此。
本例中當(dāng) Follower 發(fā)送 Fetch 請(qǐng)求時(shí)取胎,Leader 端的處理依次是:
? 讀取底層 Log 數(shù)據(jù)。
? 更新 remote LEO = 0(為什么是 0湃窍? 因?yàn)榇藭r(shí) Follower 還沒(méi)有寫(xiě)入這條消息。Leader 如何
確認(rèn) Follower 還未寫(xiě)入呢匪傍?這是通過(guò) Follower 發(fā)來(lái)的 Fetch 請(qǐng)求中的 Fetch Offset 來(lái)確定
的)您市。
? 嘗試更新分區(qū) HW —— 此時(shí) Leader LEO = 1,remote LEO = 0役衡,故分區(qū) HW 值= min(leader
LEO, follower remote LEO) = 0茵休。
? 把數(shù)據(jù)和當(dāng)前分區(qū) HW 值(依然是0)發(fā)送給 Follower 副本。
而 Follower 副本接收到 Fetch Response 后依次執(zhí)行下列操作:
? 寫(xiě)入本地 Log(同時(shí)更新 Follower LEO)手蝎。
? 更新 Follower HW —— 比較本地 LEO 和當(dāng)前 Leader HW 取小者榕莺,故 Follower HW = 0。
Follower 發(fā)來(lái)了第二輪 Fetch 請(qǐng)求吨述,Leader 端接收到后仍然會(huì)依次執(zhí)行下列操作:
? 讀取底層 Log 數(shù)據(jù)岩睁。
? 更新 remote LEO = 1(這次為什么是1了? 因?yàn)檫@輪 FETCH RPC 攜帶的 Fetch Offset 是1揣云,那么為什么這輪攜帶的就是1了呢捕儒,因?yàn)樯弦惠喗Y(jié)束后 Follower LEO 被更新為1了)。
? 嘗試更新分區(qū) HW —— 此時(shí) Leader LEO = 1邓夕,remote LEO = 1刘莹,故分區(qū) HW 值= min(leader LEO, follower remote LEO) = 1阎毅。
? 把數(shù)據(jù)(實(shí)際上沒(méi)有數(shù)據(jù))和當(dāng)前分區(qū) HW 值(已更新為1)發(fā)送給 Follower 副本。
同樣地栋猖,F(xiàn)ollower 副本接收到 Fetch Response 后依次執(zhí)行下列操作:
? 寫(xiě)入本地 Log净薛,當(dāng)然沒(méi)東西可寫(xiě),故 Follower LEO 也不會(huì)變化蒲拉,依然是1肃拜。
? 更新 Follower HW —— 比較本地 LEO 和當(dāng)前 Leader HW 取小者。由于此時(shí)兩者都是1雌团,故更新 Follower HW = 1 燃领。
? Producer 端發(fā)送消息后 Broker 端完整的處理流程就講完了。此時(shí)消息已經(jīng)成功地被復(fù)制到Leader 和 Follower 的 Log 中且分區(qū) HW 是1锦援,表明 Consumer 能夠消費(fèi) offset = 0 的這條消息猛蔽。下面我們來(lái)分析下 Produce 和 Fetch 請(qǐng)求交互的第二種情況。
第二種情況:Fetch 請(qǐng)求保存在 purgatory 中 Produce 請(qǐng)求到來(lái)灵寺。
這種情況實(shí)際上和第一種情況差不多曼库。前面說(shuō)過(guò)了,當(dāng) Leader 無(wú)法立即滿(mǎn)足 Fetch 返回要求的時(shí)候(比如沒(méi)有數(shù)據(jù))略板,那么該 Fetch 請(qǐng)求會(huì)被暫存到 Leader 端的purgatory 中毁枯,待時(shí)機(jī)成熟時(shí)會(huì)嘗試再次處理它。不過(guò) Kafka 不會(huì)無(wú)限期地將其緩存著叮称,默認(rèn)有個(gè)超時(shí)時(shí)間(500ms)种玛,一旦超時(shí)時(shí)間已過(guò),則這個(gè)請(qǐng)求會(huì)被強(qiáng)制完成瓤檐。不過(guò)我們要討論的場(chǎng)景是在寄存期間赂韵,Producer 發(fā)送 Produce 請(qǐng)求從而使之滿(mǎn)足了條件從而被喚醒。
此時(shí)挠蛉,Leader 端處理流程如下:
- Leader 寫(xiě)入本地 Log(同時(shí)自動(dòng)更新 Leader LEO)祭示。
- 嘗試喚醒在 purgatory 中寄存的 Fetch 請(qǐng)求。
- 嘗試更新分區(qū) HW碌秸。
數(shù)據(jù)丟失場(chǎng)景(更新了LEO绍移,但未更新HW時(shí),主從先后故障)
初始情況為主副本 A 已經(jīng)寫(xiě)入了兩條消息讥电,對(duì)應(yīng) HW=1蹂窖,LEO=2,LEOB=1恩敌,從副本 B 寫(xiě)入了一條消息瞬测,對(duì)應(yīng)HW=1,LEO=1。- 此時(shí)從副本 B 向主副本 A 發(fā)起 fetchOffset=1 請(qǐng)求月趟,主副本收到請(qǐng)求之后更新LEOB=1灯蝴,表示副本 B 已經(jīng)收到了消息0,然后嘗試更新 HW 值孝宗,in(LEO,LEOB)=1穷躁,即不需要更新,然后將消息1以及當(dāng)前分區(qū) HW=1 返回給從副本 B因妇,從副本 B 收到響應(yīng)之后寫(xiě)入日志并更新LEO=2问潭,然后更新其 HW=1,雖然已經(jīng)寫(xiě)入了兩條消息婚被,但是 HW 值需要在下一輪的請(qǐng)求才會(huì)更新為2狡忙。
- 此時(shí)從副本 B 重啟,重啟之后會(huì)根據(jù) HW 值進(jìn)行日志截?cái)嘀沸荆聪?會(huì)被刪除灾茁。
- 從副本 B 向主副本 A 發(fā)送 fetchOffset=1 請(qǐng)求,如果此時(shí)主副本 A 沒(méi)有什么異常谷炸,則跟第二步一樣沒(méi)有什么問(wèn)題北专,假設(shè)此時(shí)主副本也宕機(jī)了,那么從副本 B 會(huì)變成主副本旬陡。
- 當(dāng)副本 A 恢復(fù)之后會(huì)變成從副本并根據(jù) HW 值進(jìn)行日志截?cái)喽河啵窗严?1 丟失,此時(shí)消息 1 就永久丟失了季惩。
數(shù)據(jù)不一致場(chǎng)景(更新了LEO,但未更新HW時(shí)腻格,舊主故障画拾,從成為主并寫(xiě)入了新數(shù)據(jù),舊主恢復(fù)后成為從菜职,主從HW一致但數(shù)據(jù)不一致)
- 初始狀態(tài)為主副本 A 已經(jīng)寫(xiě)入了兩條消息對(duì)應(yīng)HW=1青抛,LEO=2,LEOB=1酬核,從副本 B 也同步了兩條消息蜜另,對(duì)應(yīng) HW=1,LEO=2嫡意。
- 此時(shí)從副本 B 向主副本發(fā)送 fetchOffset=2 請(qǐng)求举瑰,主副本 A 在收到請(qǐng)求后更新分區(qū) HW=2 并將該值返回給從副本 B,如果此時(shí)從副本 B 宕機(jī)則會(huì)導(dǎo)致HW 值寫(xiě)入失敗蔬螟。
- 我們假設(shè)此時(shí)主副本 A 也宕機(jī)了此迅,從副本 B 先恢復(fù)并成為主副本,此時(shí)會(huì)發(fā)生日志截?cái)啵槐A粝?0耸序,然后對(duì)外提供服務(wù)忍些,假設(shè)外部寫(xiě)入了一個(gè)消息 1(這個(gè)消息與之前的消息 1不一樣,用不同的顏色標(biāo)識(shí)不同消息)坎怪。
- 等副本 A 起來(lái)之后會(huì)變成從副本罢坝,不會(huì)發(fā)生日志截?cái)啵驗(yàn)?HW=2搅窿,但是對(duì)應(yīng)位移 1 的消息其實(shí)是不一致的嘁酿。
Leader Eepoch
為了解決數(shù)據(jù)丟失及數(shù)據(jù)不一致的問(wèn)題,在新版的 Kafka(0.11.0.0)引入了Leader Epoch 概念戈钢。
Leader Epoch 表示一個(gè)鍵值對(duì) <epoch, offset>痹仙,其中 Eepoch 表示 Leader 主副本的版本號(hào),從 0 開(kāi)始編碼殉了,當(dāng) Leader 每變更一次就會(huì)+1开仰,Offset 表示該 Epoch 版本的主副本寫(xiě)入第一條消息的位置。
比如 <0,0> 表示第一個(gè)主副本從位移 0 開(kāi)始寫(xiě)入消息薪铜,<1,100> 表示第二個(gè)主副本版本號(hào)為1并從位移 100 開(kāi)始寫(xiě)入消息众弓,主副本會(huì)將該信息保存在緩存中并定期寫(xiě)入到 CheckPoint 文件中,每次發(fā)生主副本切換都會(huì)去從緩存中查詢(xún)?cè)撔畔ⅰ?/p>
引入了Leader Eepoch后的數(shù)據(jù)丟失場(chǎng)景:
如圖所示隔箍,當(dāng)從副本 B 重啟之后向主副本 A 發(fā)送offsetsForLeaderEpochRequest谓娃,Epoch 主從副本相等,則 A 返回當(dāng)前的 LEO=2蜒滩,從副本 B 中沒(méi)有任何大于2 的位移滨达,因此不需要截?cái)唷?p>
- 當(dāng)從副本 B 向主副本 A 發(fā)送 fetchoffset=2 請(qǐng)求時(shí),A宕機(jī)俯艰,所以從副本 B 成為主副本捡遍,并更新 Epoch 值為<epoch=1, offset=2>逊桦,HW 值更新為 2锅移。
- 當(dāng) A 恢復(fù)之后成為從副本娶靡,并向 B 發(fā)送 fetcheOffset=2請(qǐng)求起胰,B 返回 HW=2汁果,則從副本 A 更新 HW=2雹拄。
- 主副本 B 接受外界的寫(xiě)請(qǐng)求艳狐,從副本 A 向主副本 A 不斷
發(fā)起數(shù)據(jù)同步請(qǐng)求聘殖。
從上可以看出引入 Leader Epoch 值之后避免了前面提到的數(shù)據(jù)丟失情況芹关,但是這里需要注意的是如果在上面的第一步续挟,從副本 B 起來(lái)之后向主副本 A 發(fā)送offsetsForLeaderEpochRequest 請(qǐng)求失敗,即主副本 A同時(shí)也宕機(jī)了侥衬,那么消息 1 就會(huì)丟失庸推,具體可見(jiàn)下面數(shù)據(jù)不一致場(chǎng)景中有提到常侦。
引入了Leader Eepoch后的數(shù)據(jù)不一致場(chǎng)景:
- 從副本 B 恢復(fù)之后向主副本 A 發(fā)送offsetsForLeaderEpochRequest 請(qǐng)求,由于主
副本也宕機(jī)了贬媒,因此副本 B 將變成主副本并將消息1 截?cái)嗔觯藭r(shí)接受到新消息 1 的寫(xiě)入。 - 副本 A 恢復(fù)之后變成從副本并向主副本 A 發(fā)送offsetsForLeaderEpochRequest 請(qǐng)求际乘,請(qǐng)求的Epoch 值小于主副本 B坡倔,因此主副本 B 會(huì)返回epoch=1 時(shí)的開(kāi)始位移,即 lastoffset=1脖含,因此從副本 A 會(huì)截?cái)嘞?1罪塔。
- 從副本 A 從主副本 B 拉取消息,并更新 Epoch 值<epoch=1, offset=1>养葵。
可以看出 Epoch 的引入可以避免數(shù)據(jù)不一致征堪,但是兩個(gè)副本均宕機(jī),則還是存在數(shù)據(jù)丟失的場(chǎng)景关拒。
3.5 Leader Election
引入 Replication 之后佃蚜,同一個(gè) Partition 可能會(huì)有多個(gè) Replica,而這時(shí)需要在這些Replication 之間選出一個(gè) Leader着绊,Producer 和 Consumer 只與這個(gè) Leader 交互谐算,其它 Replica 作為 Follower 從 Leader 中復(fù)制數(shù)據(jù)。
因?yàn)樾枰WC同一個(gè) Partition 的多個(gè) Replica 之間的數(shù)據(jù)一致性(其中一個(gè)宕機(jī)后其它 Replica必須要能繼續(xù)服務(wù)并且即不能造成數(shù)據(jù)重復(fù)也不能造成數(shù)據(jù)丟失)归露。
如果沒(méi)有一個(gè) Leader洲脂,所有 Replica 都可同時(shí)讀/寫(xiě)數(shù)據(jù),那就需要保證多個(gè) Replica 之間互相(N×N 條通路)同步數(shù)據(jù)剧包,數(shù)據(jù)的一致性和有序性非常難保證恐锦,大大增加了 Replication 實(shí)現(xiàn)的復(fù)雜性,同時(shí)也增加了出現(xiàn)異常的幾率疆液。而引入 Leader 后踩蔚,只有 Leader 負(fù)責(zé)數(shù)據(jù)讀寫(xiě),F(xiàn)ollower 只向 Leader 順序 Fetch 數(shù)據(jù)(N 條通路)枚粘,系統(tǒng)更加簡(jiǎn)單且高效。
由于 Kafka 集群依賴(lài) ZooKeeper 集群飘蚯,所以最簡(jiǎn)單最直觀的方案是馍迄,所有 Follower都在 ZooKeeper 上設(shè)置一個(gè) Watch,一旦 Leader 宕機(jī)局骤,其對(duì)應(yīng)的 Ephemeral Znode 會(huì)自動(dòng)刪除攀圈,此時(shí)所有 Follower 都嘗試創(chuàng)建該節(jié)點(diǎn),而創(chuàng)建成功者(ZooKeeper 保證只有一個(gè)能創(chuàng)建成功)即是新的 Leader峦甩,其它 Replica 即為Follower赘来。
前面的方案有以下缺點(diǎn):
- Split-Brain (腦裂): 這是由 ZooKeeper 的特性引起的现喳,雖然 ZooKeeper 能保證所有Watch 按順序觸發(fā),但并不能保證同一時(shí)刻所有 Replica “看”到的狀態(tài)是一樣的犬辰,這就可能造成不同 Replica 的響應(yīng)不一致 嗦篱。
- Herd Effect (羊群效應(yīng)): 如果宕機(jī)的那個(gè) Broker 上的 Partition 比較多,會(huì)造成多個(gè)Watch 被觸發(fā)幌缝,造成集群內(nèi)大量的調(diào)整灸促。
- ZooKeeper( 負(fù)載過(guò)重) : 每個(gè) Replica 都要為此在 ZooKeeper 上注冊(cè)一個(gè) Watch,當(dāng)集群規(guī)模增加到幾千個(gè) Partition 時(shí) ZooKeeper 負(fù)載會(huì)過(guò)重涵卵。
Controller
Kafka 的 Leader Election 方案解決了上述問(wèn)題浴栽,它在所有 Broker 中選出一個(gè)Controller,所有 Partition 的 Leader 選舉都由 Controller 決定轿偎。Controller 會(huì)將Leader 的改變直接通過(guò) RPC 的方式(比 ZooKeeper Queue 的方式更高效)通知需為此作為響應(yīng)的 Broker典鸡。
Kafka 集群 Controller 的選舉過(guò)程如下 :
- 每個(gè) Broker 都會(huì)在 Controller Path (/controller)上注冊(cè)一個(gè) Watch。
- 當(dāng)前 Controller 失敗時(shí)坏晦,對(duì)應(yīng)的 Controller Path 會(huì)自動(dòng)消失(因?yàn)樗?Ephemeral Node)萝玷,此時(shí)該 Watch 被 fire英遭,所有“活”著的 Broker 都會(huì)去競(jìng)選成為新的 Controller(創(chuàng)建新的Controller Path)汁尺,但是只會(huì)有一個(gè)競(jìng)選成功(這點(diǎn)由 ZooKeeper 保證)。
- 競(jìng)選成功者即為新的 Leader辽装,競(jìng)選失敗者則重新在新的 Controller Path 上注冊(cè) Watch。因?yàn)閆ooKeeper 的 Watch 是一次性的,被 fire 一次之后即失效肛度,所以需要重新注冊(cè)冠骄。
Kafka Partition Leader 的選舉過(guò)程如下 (由 Controller 執(zhí)行): - 從 ZooKeeper 中讀取當(dāng)前分區(qū)的所有 ISR(in-sync replicas)集合。
- 調(diào)用配置的分區(qū)選擇算法選擇分區(qū)的 Leader蟀给。
Kafka在Zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in-sync replicas),這個(gè)ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為L(zhǎng)eader的可能拭卿。在這種模式下,對(duì)于f+1個(gè)Replica惠桃,一個(gè)Partition能在保證不丟失已經(jīng)commit的消息的前提下容忍f個(gè)Replica的失敗罐孝。在大多數(shù)使用場(chǎng)景中汹来,這種模式是非常有利的。事實(shí)上,為了容忍f個(gè)Replica的失敗酣溃,Majority Vote和ISR在commit前需要等待的Replica數(shù)量是一樣的,但是ISR需要的總的Replica的個(gè)數(shù)幾乎是Majority Vote的一半碘饼。
3.6. 如何處理所有Replica都不工作
上文提到,在ISR中至少有一個(gè)follower時(shí)钠绍,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某個(gè)Partition的所有Replica都宕機(jī)了,就無(wú)法保證數(shù)據(jù)不丟失了。這種情況下有兩種可行的方案:
- 等待ISR中的任一個(gè)Replica“活”過(guò)來(lái)架曹,并且選它作為L(zhǎng)eader(強(qiáng)一致性奥邮,不可用時(shí)間相對(duì)較長(zhǎng))
- 選擇第一個(gè)“活”過(guò)來(lái)的Replica(不一定是ISR中的)作為L(zhǎng)eader(高可用性)
Kafka0.8.*使用了第二種方式脚粟。根據(jù)Kafka的文檔团南,在以后的版本中正歼,Kafka支持用戶(hù)通過(guò)配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性萄唇。
3.7 broker故障恢復(fù)過(guò)程
- Controller在Zookeeper注冊(cè)Watch,一旦有Broker宕機(jī)(這是用宕機(jī)代表任何讓系統(tǒng)認(rèn)為其die的情景仲墨,包括但不限于機(jī)器斷電毒嫡,網(wǎng)絡(luò)不可用,GC導(dǎo)致的Stop The World兜畸,進(jìn)程crash等)努释,其在Zookeeper對(duì)應(yīng)的znode會(huì)自動(dòng)被刪除,Zookeeper會(huì)fire Controller注冊(cè)的watch咬摇,Controller讀取最新的幸存的Broker
- Controller決定set_p伐蒂,該集合包含了宕機(jī)的所有Broker上的所有Partition
- 對(duì)set_p中的每一個(gè)Partition執(zhí)行以下操作:
3.1. 從/brokers/topics/[topic]/partitions/[partition]/state
讀取該Partition
當(dāng)前的ISR
3.2. 決定該P(yáng)artition的新Leader。如果當(dāng)前ISR中有至少一個(gè)Replica還幸存,則選擇其中一個(gè)作為新Leader桥狡,新的ISR則包含當(dāng)前ISR中所有幸存的Replica。否則選擇該P(yáng)artition中任意一個(gè)幸存的Replica作為新的Leader以及ISR(該場(chǎng)景下可能會(huì)有潛在的數(shù)據(jù)丟失)。如果該P(yáng)artition的所有Replica都宕機(jī)了寄悯,則將新的Leader設(shè)置為-1。
3.3. 將新的Leader,ISR和新的leader_epoch及controller_epoch寫(xiě)入/brokers/topics/[topic]/partitions/[partition]/state慕购。注意涉馁,該操作只有其version在3.1至3.3的過(guò)程中無(wú)變化時(shí)才會(huì)執(zhí)行,否則跳轉(zhuǎn)到3.1 -
直接通過(guò)RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令肴茄。Controller可以在一個(gè)RPC操作中發(fā)送多個(gè)命令從而提高效率。
4.kafka為什么高性能
架構(gòu)層面:
? Partition 級(jí)別并行:Broker、Disk、Consumer 端
? ISR:避免同步個(gè)別副本時(shí)拖慢整體副本組性能奏属,同時(shí)還能避免主從節(jié)點(diǎn)間數(shù)據(jù)落后過(guò)多導(dǎo)致的消息丟失
I/O 層面:
? Batch 讀寫(xiě):減少I(mǎi)/O次數(shù),增加吞吐量
? 磁盤(pán)順序 I/O:在某些情況下,順序磁盤(pán)訪(fǎng)問(wèn)比隨機(jī)內(nèi)存訪(fǎng)問(wèn)更快
? Page Cache:將Index及消息緩存到Page Cache中,提升處理效率
? Zero Copy:減少內(nèi)核態(tài)與用戶(hù)態(tài)之間的I/O次數(shù)
? 壓縮:log壓縮及消息壓縮宰睡,節(jié)省磁盤(pán)空間蒲凶,節(jié)省字節(jié)大小
References:
https://kafka.apache.org/documentation/#design
http://www.reibang.com/p/bde902c57e80
https://mp.weixin.qq.com/s/fX26tCdYSMgwM54_2CpVrw
https://zhuanlan.zhihu.com/p/27587872
https://mp.weixin.qq.com/s/X301soSDWRfOemQhk9AuPw
http://www.jasongj.com/2015/08/09/KafkaColumn1/
http://www.jasongj.com/2015/08/09/KafkaColumn2/
http://www.jasongj.com/2015/08/09/KafkaColumn3/
http://www.jasongj.com/2015/08/09/KafkaColumn4/
http://www.jasongj.com/2015/08/09/KafkaColumn5/
http://www.jasongj.com/2015/08/09/KafkaColumn6/
http://www.jasongj.com/2015/08/09/KafkaColumn7/
https://www.cnblogs.com/wxd0108/p/6519973.html
https://cloud.tencent.com/developer/article/1589157
https://zhuanlan.zhihu.com/p/459610418