中間件:Kafka
關(guān)鍵字:Kafka文件機(jī)制,Kafka分區(qū),Kafka數(shù)據(jù)可靠性劳坑,Kafka Ack等
注:本文是作者學(xué)習(xí)Kafka時(shí)得筆記和經(jīng)驗(yàn)總結(jié)
Kafka特性總覽:
- 消息隊(duì)列的性能好壞言津,其文件存儲(chǔ)機(jī)制設(shè)計(jì)是衡量一個(gè)消息隊(duì)列服務(wù)技術(shù)水平和最關(guān)鍵指標(biāo)之一。
-
Kafka的特性:
- 高吞吐量进副、低延遲 :kafka每秒可以處理幾十萬條消息这揣,它的延遲最低只有幾毫秒悔常,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行consume操作。
- 可擴(kuò)展性:kafka 分區(qū)擴(kuò)展给赞,consumer 擴(kuò)展机打。
- 持久性、可靠性:消息被持久化到本地磁盤片迅,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失残邀。
- 容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敻躺摺)芥挣。
- 高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫。
-
Kafka得使用場(chǎng)景:
- 日志收集:一個(gè)公司用Kafka可以收集各種服務(wù)的log耻台,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer空免,例如hadoop、Solr,ES等盆耽。
- 消息系統(tǒng):解耦生產(chǎn)者和消費(fèi)者蹋砚,降低系統(tǒng)壓力、緩存消息等摄杂。
- 流式處理:比如Spark streaming和Storm
-
Kafka得設(shè)計(jì)思想:
-
Kafka的集群拓?fù)浣Y(jié)構(gòu) :
-
Kakfa Broker Leader的選舉:
① ??Kakfa Broker集群受Zookeeper管理析恢。所有的Kafka Broker節(jié)點(diǎn)一起去Zookeeper上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn)墨坚,因?yàn)橹挥幸粋€(gè)Kafka Broker會(huì)注冊(cè)成功,其他的都會(huì)失敗映挂,所以這個(gè)成功在Zookeeper上注冊(cè)臨時(shí)節(jié)點(diǎn)的這個(gè)Kafka Broker會(huì)成為Kafka Broker Controller框杜,其他的Kafka broker叫Kafka Broker follower。(這個(gè)過程叫Controller在ZooKeeper注冊(cè)Watch)
② ??這個(gè)Controller會(huì)監(jiān)聽其他的Kafka Broker的所有信息袖肥,如果這個(gè)kafka broker controller宕機(jī)了咪辱,在zookeeper上面的那個(gè)臨時(shí)節(jié)點(diǎn)就會(huì)消失,所有的broker又會(huì)重復(fù)去注冊(cè)臨時(shí)節(jié)點(diǎn)。
③ ??leader會(huì)追蹤和維護(hù)ISR中所有follower的滯后狀態(tài)椎组。如果滯后太多(數(shù)量滯后和時(shí)間滯后兩個(gè)維度油狂,replica.lag.time.max.ms和replica.lag.max.message可配置),leader會(huì)把該replica從ISR中移除 ( follower 在 replica.lag.time.max.ms 時(shí)間內(nèi)一直落后 leader replica.lag.max.messages 條消息的時(shí)候才會(huì)被踢出)寸癌。被移除ISR的replica一直在追趕leader专筷。leader寫入數(shù)據(jù)后并不會(huì)commit,只有ISR列表中的所有folower同步之后才會(huì)commit蒸苇,把滯后的follower移除ISR主要是避免寫消息延遲磷蛹。
④ ??設(shè)置ISR主要是為了broker宕掉之后,重新選舉partition的leader從ISR列表中選擇溪烤。
⑤ ???? 所以,如果一個(gè)broker掛了槽驶,controller會(huì)讀取該broker上所有得分區(qū)在zookeeper上狀態(tài)责嚷,并選取ISR列表中得一個(gè)replica作為leader(如果ISR列表中的replica全掛,選一個(gè)不在ISR列表中的replica作為leader; 如果該partition的所有的replica都宕機(jī)了掂铐,則將新的leader設(shè)置為-1罕拂,等待恢復(fù),等待ISR中的任一個(gè)Replica“活”過來全陨,并且選它作為L(zhǎng)eader爆班;或選擇第一個(gè)“活”過來的Replica(不一定是ISR中)作為L(zhǎng)eader)。另外辱姨,controller還會(huì)通知 zookeeper 這個(gè)broker 宕機(jī)了柿菩,zookeeper會(huì)通知其他得broker。 - ??Consumer Rebalance的觸發(fā)條件 :
① Consumer增加或刪除會(huì)觸發(fā) Consumer Group的Rebalance
② Broker的增加或者減少會(huì)觸發(fā) Consumer Rebalance(分區(qū)副本發(fā)生改變) - ??Kafka工作流程-高級(jí)消費(fèi)者和低級(jí)消費(fèi)者:
① 高級(jí)消費(fèi):
Ⅰ 自動(dòng)負(fù)載均衡炮叶,高階消費(fèi)者為了簡(jiǎn)化編程碗旅,封裝了一系列 API渡处,這套 API 會(huì)均勻地將分區(qū)分配給消費(fèi)者 線程镜悉,消費(fèi)者消費(fèi)哪個(gè)分區(qū)不由消費(fèi)者決定,而是由高階 API 決定医瘫,如果有消費(fèi)者線程掛 掉了侣肄,高階 API 會(huì)檢測(cè)到,進(jìn)而進(jìn)行重新分配醇份。高階消費(fèi)者 API 將大部分功能已經(jīng)實(shí)現(xiàn)稼锅, 因此,編程者編寫高階消費(fèi)者的難度也隨之降低僚纷,不需要關(guān)注分區(qū)的分配矩距,只需要關(guān)注業(yè)務(wù)邏輯就行
Ⅱ 自動(dòng)提交offset:自動(dòng)提交時(shí),假設(shè) 1s 提交一次 offset 的更新怖竭,設(shè)當(dāng)前 offset=10锥债,當(dāng)消費(fèi)者消費(fèi)了 0.5s 的數(shù)據(jù),offset 移動(dòng)了 15痊臭,由于提交間隔為 1s哮肚,因此這一 offset 的更新并不會(huì)被提交,這時(shí)候我們寫的消費(fèi)者掛掉广匙,重啟后允趟,消費(fèi)者會(huì)去 ZooKeeper 上獲取讀取位置,獲取到的 offset 仍為 10鸦致,它就會(huì)重復(fù)消費(fèi)潮剪,這就是一個(gè)典型的重復(fù)消費(fèi)問題涣楷。
② 低級(jí)消費(fèi):對(duì)于低階消費(fèi)者就不再有分區(qū)到消費(fèi)者之間的 API 中間層了,由消費(fèi)者直接找到分區(qū)進(jìn)行消費(fèi)鲁纠,即消費(fèi)者通過 ZooKeeper 找到指定分區(qū)的 Leader 在哪個(gè) broker 上总棵。首先,在 ZooKeeper 中能夠找到 Kafka 所有 topic 的分區(qū)列表改含,并且可以找到指定分區(qū)的 Leader 在哪個(gè) broker 上情龄。 - ??Topic & Partition:
① Topic相當(dāng)于傳統(tǒng)消息系統(tǒng)MQ中的一個(gè)隊(duì)列queue,producer端發(fā)送的message必須指定是發(fā)送到哪個(gè)topic捍壤,但是不需要指定topic下的哪個(gè)partition骤视,因?yàn)閗afka會(huì)把收到的message進(jìn)行l(wèi)oad balance,均勻的分布在這個(gè)topic下的不同的partition上( hash(message) % [broker數(shù)量] )鹃觉。
② 物理存儲(chǔ)上专酗,這個(gè)topic會(huì)分成一個(gè)或多個(gè)partition,每個(gè)partiton相當(dāng)于是一個(gè)子queue盗扇。在物理結(jié)構(gòu)上祷肯,每個(gè)partition對(duì)應(yīng)一個(gè)物理的目錄(文件夾),文件夾命名是[topicname][partition][序號(hào)]疗隶,一個(gè)topic可以有無數(shù)多的partition佑笋,根據(jù)業(yè)務(wù)需求和數(shù)據(jù)量來設(shè)置。
③ 在kafka配置文件中可隨時(shí)更改num.partitions參數(shù)來配置更改topic的partition數(shù)量斑鼻,也可以通過命令修改分區(qū)數(shù)蒋纬。
④ 當(dāng)add a new partition的時(shí)候,partition里面的message不會(huì)重新進(jìn)行分配坚弱,原來的partition里面的message數(shù)據(jù)不會(huì)變,新加的這個(gè)partition剛開始是空的蜀备,隨后進(jìn)入這個(gè)topic的message就會(huì)重新參與所有partition的load balance。 - ??Partition Replica :
① 每個(gè)partition可以在其他的kafka broker節(jié)點(diǎn)上存副本荒叶,以便某個(gè)kafka broker節(jié)點(diǎn)宕機(jī)不會(huì)影響這個(gè)kafka集群碾阁。存replica副本的方式是按照kafka broker的順序存。
② replica副本數(shù)目不能大于kafka broker節(jié)點(diǎn)的數(shù)目些楣,否則報(bào)錯(cuò)脂凶。這里的replica數(shù)其實(shí)就是partition的副本總數(shù),其中包括一個(gè)leader戈毒,其他的就是copy副本 - ??Topic分配partition和partition replica的算法 :
① 將Broker(size=n)和待分配的Partition排序艰猬。
② 將第 i 個(gè)Partition分配到第(i%n)個(gè)Broker上。
③ 將第 i 個(gè)Partition的第 j 個(gè)Replica分配到第((i + j) % n)個(gè)Broker上埋市。
-
Kafka的集群拓?fù)浣Y(jié)構(gòu) :
- ??消息投遞可靠性 :
-
Partition ack :producer 生產(chǎn)消息被認(rèn)為完成時(shí)得確認(rèn)值:
① 0:不等待broker得確認(rèn)信息冠桃,延遲最小(消費(fèi)可靠性不能保證)
② 1:leader收到了確認(rèn)信息
③ -1:ISR列表中得所有replica都返回確認(rèn)消息
-
Partition ack :producer 生產(chǎn)消息被認(rèn)為完成時(shí)得確認(rèn)值:
-
Kafka高吞吐量 :
- Kafka的高吞吐量體現(xiàn)在讀寫上道宅,分布式并發(fā)的讀和寫都非呈程快胸蛛,寫的性能體現(xiàn)在以O(shè)(1)的時(shí)間復(fù)雜度進(jìn)行順序?qū)懭?/strong>。讀的性能體現(xiàn)在以O(shè)(1)的時(shí)間復(fù)雜度進(jìn)行順序讀取樱报,對(duì)topic進(jìn)行partition分區(qū)葬项,consume group中的consume線程可以以很高能性能進(jìn)行順序讀。
-
批量發(fā)送 :
- Kafka支持以消息集合為單位進(jìn)行批量發(fā)送迹蛤,以提高push效率民珍。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息盗飒,consumer只管從broker pull消息嚷量,兩者對(duì)消息的生產(chǎn)和消費(fèi)是異步的。
- Kafka集群中broker之間的關(guān)系 : 不是主從關(guān)系逆趣,各個(gè)broker在集群中地位一樣蝶溶,我們可以隨意的增加或刪除任何一個(gè)broker節(jié)點(diǎn)。
- Producer采用異步push方式宣渗,極大提高Kafka系統(tǒng)的吞吐率(可以通過參數(shù)控制是采用同步還是異步方式)蕉朵。
-
順序保證性 :
- Kafka只能保證同一分區(qū)得消息順序性捣郊,并且需要消費(fèi)端單線程消費(fèi)究反。正常情況下舟奠,消費(fèi)端會(huì)多線程處理業(yè)務(wù)邏輯來提供系統(tǒng)性能,若是多線程消費(fèi)還想保證消息得順序性咐蝇,可以使用spring-kafka來實(shí)現(xiàn)涯鲁,讀者具體可自行查閱 spring-kafka得用法巷查。
kafka文件機(jī)制:
-
Partition Owner registry :
- 用來標(biāo)記partition正在被哪個(gè)consumer消費(fèi).臨時(shí)znode有序。此節(jié)點(diǎn)表達(dá)了"一個(gè)partition"只能被group下一個(gè)consumer消費(fèi),同時(shí)當(dāng)group下某個(gè)consumer失效,那么將會(huì)觸發(fā)負(fù)載均衡(即:讓partitions在多個(gè)consumer間均衡消費(fèi),接管那些"游離"的partitions)。
- ??持久化 :
- kafka使用文件存儲(chǔ)消息(append only log),這就直接決定kafka在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性.且無論任何OS下,對(duì)文件系統(tǒng)本身的優(yōu)化是非常艱難的.文件緩存/直接內(nèi)存映射等是常用的手段.
- 因?yàn)閗afka是對(duì)日志文件進(jìn)行append操作,因此磁盤檢索的開支是較小的;同時(shí)為了減少磁盤寫入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來,當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤,這樣減少了磁盤IO調(diào)用的次數(shù).
- 對(duì)于kafka而言,較高性能的磁盤,將會(huì)帶來更加直接的性能提升岛请。
-
性能 :
- 除磁盤IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問題.kafka并沒有提供太多高超的技巧;對(duì)于producer端,可以將消息buffer起來,當(dāng)消息的條數(shù)達(dá)到一定閥值時(shí),批量發(fā)送給broker;對(duì)于consumer端也是一樣,批量fetch多條消息.不過消息量的大小可以通過配置文件來指定. 對(duì)于kafka broker端,sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能 : 將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無需進(jìn)程再次copy和交換( 這里涉及到"磁盤IO數(shù)據(jù)"/"內(nèi)核內(nèi)存"/"進(jìn)程內(nèi)存"/"網(wǎng)絡(luò)緩沖區(qū)",多者之間的數(shù)據(jù)copy)旭寿。
- 其實(shí)對(duì)于producer/consumer/broker三者而言,CPU的開支應(yīng)該都不大, 因此啟用消息壓縮機(jī)制是一個(gè)良好的策略;壓縮需要消耗少量的CPU資源,不過對(duì)于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過壓縮. kafka支持gzip/snappy 等多種壓縮方式.
- ??負(fù)載均衡 :
- kafka集群中的任何一個(gè)broker,都可以向producer提供metadata信息, 這些metadata中包含"集群中存活的servers列表"、"partitions leader列表" 等信息(請(qǐng)參看zookeeper中的節(jié)點(diǎn)信息)崇败。
- 當(dāng)producer獲取到metadata信息之后, producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過socket發(fā)送到broker,中間不會(huì)經(jīng)過任何"路由層"盅称。
- 異步發(fā)送,將多條消息暫且在客戶端buffer起來,并將他們批量發(fā)送到broker;小數(shù)據(jù)IO太多,會(huì)拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率;不過這也有一定的隱患,比如當(dāng)producer失效時(shí),那些尚未發(fā)送的消息將會(huì)丟失后室。
-
Topic模型 :
- 在kafka中,partition中的消息只有一個(gè)consumer在消費(fèi),且不存在消息狀態(tài)的控制,也沒有復(fù)雜的消息確認(rèn)機(jī)制,可見kafka broker端是相當(dāng)輕量級(jí)的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper提交offset.由此可見,consumer客戶端也很輕量級(jí)缩膝。松弛得控制就會(huì)引起重復(fù)消費(fèi)與消息丟失等問題。
- kafka中consumer負(fù)責(zé)維護(hù)消息的消費(fèi)記錄,而broker則不關(guān)心這些,這種設(shè)計(jì)不僅提高了consumer端的靈活性,也適度的減輕了broker端設(shè)計(jì)的復(fù)雜度;這是和眾多JMS prodiver的區(qū)別.
- ??log :
- 每個(gè)log entry格式為"4個(gè)字節(jié)的數(shù)字N表示消息的長(zhǎng)度" + "N個(gè)字節(jié)的消息內(nèi)容"; 每個(gè)日志都有一個(gè)offset來唯一的標(biāo)記一條消息, offset的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置.
- 每個(gè)partition在物理存儲(chǔ)層面由多個(gè)log file組成(稱為segment) .segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
- 獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)). 根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.
-
Partition Owner registry :
- 用來標(biāo)記partition正在被哪個(gè)consumer消費(fèi).臨時(shí)znode岸霹。此節(jié)點(diǎn)表達(dá)了"一個(gè)partition"只能被group下一個(gè)consumer消費(fèi),同時(shí)當(dāng)group下某個(gè)consumer失效,那么將會(huì)觸發(fā)負(fù)載均衡(即:讓partitions在多個(gè)consumer間均衡消費(fèi),接管那些"游離"的partitions)疾层。例如:當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作 :
A) 首先進(jìn)行"Consumer id Registry";
B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來監(jiān)聽當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來監(jiān)聽broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.
??總結(jié)?? :
① Producer端使用zookeeper用來"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.
② Broker端使用zookeeper用來注冊(cè)broker信息,以及監(jiān)測(cè)partition leader存活性.
③ Consumer端使用zookeeper用來注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息。
-
Leader的選擇 :
- 必須保證贡避,一旦一個(gè)消息被提交了痛黎,但是leader down掉了予弧,新選出的leader必須可以提供這條消息。大部分的分布式系統(tǒng)采用了多數(shù)投票法則選擇新的leader,對(duì)于多數(shù)投票法則湖饱,就是根據(jù)所有副本節(jié)點(diǎn)的狀況動(dòng)態(tài)的選擇最適合的作為leader.Kafka并不是使用這種方法掖蛤。
- ??Kafka動(dòng)態(tài)維護(hù)了一個(gè)同步狀態(tài)的副本的集合(a set of in-sync replicas),簡(jiǎn)稱ISR井厌,在這個(gè)集合中的節(jié)點(diǎn)都是和leader保持高度一致的蚓庭,任何一條消息必須被這個(gè)集合中的每個(gè)節(jié)點(diǎn)讀取并追加到日志中了,才會(huì)通知外部這個(gè)消息已經(jīng)被提交了仅仆。因此這個(gè)集合中的任何一個(gè)節(jié)點(diǎn)隨時(shí)都可以被選為leader.ISR在ZooKeeper中維護(hù)彪置。ISR中有f+1個(gè)節(jié)點(diǎn),就可以允許在f個(gè)節(jié)點(diǎn)down掉的情況下不會(huì)丟失消息并正常提供服蝇恶。ISR的成員是動(dòng)態(tài)的拳魁,如果一個(gè)節(jié)點(diǎn)被淘汰了,當(dāng)它重新達(dá)到“同步中”的狀態(tài)時(shí)撮弧,他可以重新加入ISR.這種leader的選擇方式是非撑税茫快速的,適合kafka的應(yīng)用場(chǎng)景贿衍。
-
副本管理 :
- ??Kafka盡量的使所有分區(qū)均勻的分布到集群所有的節(jié)點(diǎn)上而不是集中在某些節(jié)點(diǎn)上授舟,另外主從關(guān)系也盡量均衡,這樣每個(gè)節(jié)點(diǎn)都會(huì)擔(dān)任一定比例的分區(qū)的leader.
- 優(yōu)化leader的選擇過程也是很重要的贸辈,它決定了系統(tǒng)發(fā)生故障時(shí)的空窗期有多久释树。Kafka選擇一個(gè)節(jié)點(diǎn)作為“controller”,當(dāng)發(fā)現(xiàn)有節(jié)點(diǎn)down掉的時(shí)候它負(fù)責(zé)在有用分區(qū)的所有節(jié)點(diǎn)中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區(qū)節(jié)點(diǎn)的主從關(guān)系。如果controller down掉了擎淤,活著的節(jié)點(diǎn)中的一個(gè)會(huì)備切換為新的controller.
-
Leader與副本同步 :
- ??對(duì)于某個(gè)分區(qū)來說奢啥,保存正分區(qū)的"broker"為該分區(qū)的"leader",保存?zhèn)浞莘謪^(qū)的"broker"為該分區(qū)的"follower"嘴拢。備份分區(qū)會(huì)完全復(fù)制正分區(qū)的消息桩盲,包括消息的編號(hào)等附加屬性值。為了保持正分區(qū)和備份分區(qū)的內(nèi)容一致席吴,Kafka采取的方案是在保存?zhèn)浞莘謪^(qū)的"broker"上開啟一個(gè)消費(fèi)者進(jìn)程進(jìn)行消費(fèi)赌结,從而使得正分區(qū)的內(nèi)容與備份分區(qū)的內(nèi)容保持一致。一般情況下孝冒,一個(gè)分區(qū)有一個(gè)“正分區(qū)”和零到多個(gè)“備份分區(qū)”柬姚。可以配置“正分區(qū)+備份分區(qū)”的總數(shù)量庄涡,關(guān)于這個(gè)配置量承,不同主題可以有不同的配置值。注意,生產(chǎn)者宴合,消費(fèi)者只與保存正分區(qū)的"leader"進(jìn)行通信焕梅。
- Kafka允許topic的分區(qū)擁有若干副本,這個(gè)數(shù)量是可以配置的卦洽,你可以為每個(gè)topic配置副本的數(shù)量贞言。Kafka會(huì)自動(dòng)在每個(gè)副本上備份數(shù)據(jù),所以當(dāng)一個(gè)節(jié)點(diǎn)down掉時(shí)數(shù)據(jù)依然是可用的阀蒂。
- ??創(chuàng)建副本的單位是topic的分區(qū)该窗,每個(gè)分區(qū)都有一個(gè)leader和零或多個(gè)followers.所有的讀寫操作都由leader處理,一般分區(qū)的數(shù)量都比broker的數(shù)量多的多蚤霞,各分區(qū)的leader均勻的分布在brokers中酗失。所有的followers都復(fù)制leader的日志,日志中的消息和順序都和leader中的一致昧绣。followers向普通的consumer那樣從leader那里拉取消息并保存在自己的日志文件中规肴。
- ??Kafka判斷一個(gè)節(jié)點(diǎn)是否活著有兩個(gè)條件 :
① 節(jié)點(diǎn)必須可以維護(hù)和ZooKeeper的連接,Zookeeper通過心跳機(jī)制檢查每個(gè)節(jié)點(diǎn)的連接夜畴。
② 如果節(jié)點(diǎn)是個(gè)follower,他必須能及時(shí)的同步leader的寫操作拖刃,延時(shí)不能太久。 - ??只有當(dāng)消息被所有的副本加入到日志中時(shí)贪绘,才算是“committed”兑牡,只有committed的消息才會(huì)發(fā)送給consumer,這樣就不用擔(dān)心一旦leader down掉了消息會(huì)丟失税灌。Producer也可以選擇是否等待消息被提交的通知均函,這個(gè)是由參數(shù)acks決定的。
- ????partiton中文件存儲(chǔ)方式 :
- 每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中菱涤。但每個(gè)段segment file消息數(shù)量不一定相等苞也,這種特性方便old segment file快速被刪除。
- 每個(gè)partiton只需要支持順序讀寫就行了狸窘,segment文件生命周期由服務(wù)端配置參數(shù)決定墩朦。
- 這樣做的好處就是能快速刪除無用文件坯认,有效提高磁盤利用率翻擒。
- ????partiton中segment文件存儲(chǔ)結(jié)構(gòu) :
- producer發(fā)message到某個(gè)topic,message會(huì)被均勻的分布到多個(gè)partition上(隨機(jī)或根據(jù)用戶指定的回調(diào)函數(shù)進(jìn)行分布)牛哺,kafka broker收到message往對(duì)應(yīng)partition leader的最后一個(gè)segment上添加該消息陋气,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過閾值時(shí),segment上的消息會(huì)被flush到磁盤引润,只有flush到磁盤上的消息consumer才能消費(fèi)巩趁,segment達(dá)到一定的大小后將不會(huì)再往該segment寫數(shù)據(jù),broker會(huì)創(chuàng)建新的segment。
- 每個(gè)part在內(nèi)存中對(duì)應(yīng)一個(gè)index议慰,記錄每個(gè)segment中的第一條消息偏移蠢古。
- segment file組成:由2大部分組成,分別為index file和data file别凹,此2個(gè)文件一 一對(duì)應(yīng)草讶,成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件炉菲、數(shù)據(jù)文件.
- segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始堕战,后續(xù)每個(gè)segment文件名為上一個(gè)全局partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小拍霜,19位數(shù)字字符長(zhǎng)度嘱丢,沒有數(shù)字用0填充。
- 每個(gè)segment中存儲(chǔ)很多條消息祠饺,消息id由其邏輯位置決定越驻,即從消息id可直接定位到消息的存儲(chǔ)位置,避免id到位置的額外映射道偷。
-
下圖所示segment文件列表形象說明了上述2個(gè)規(guī)則:
關(guān)鍵字解釋說明 :
① 8 byte offset在parition(分區(qū))內(nèi)的每條消息都有一個(gè)有序的id號(hào)履澳,這個(gè)id號(hào)被稱為偏移(offset),它可以唯一確定每條消息在parition(分區(qū))內(nèi)的位置。即offset表示partiion的第多少message
② 4 byte message sizemessage大小
③ 4 byte CRC32用crc32校驗(yàn)message
④ 1 byte “magic"表示本次發(fā)布Kafka服務(wù)程序協(xié)議版本號(hào)
⑤ 1 byte “attributes"表示為獨(dú)立版本怀跛、或標(biāo)識(shí)壓縮類型距贷、或編碼類型。
⑥ 4 byte key length表示key的長(zhǎng)度,當(dāng)key為-1時(shí)吻谋,K byte key字段不填
⑦ K byte key可選
⑧ value bytes payload表示實(shí)際消息數(shù)據(jù)忠蝗。 - ??segment index file采取稀疏索引存儲(chǔ)方式,它減少索引文件大小漓拾,通過mmap可以直接內(nèi)存操作阁最,稀疏索引為數(shù)據(jù)文件的每個(gè)對(duì)應(yīng)message設(shè)置一個(gè)元數(shù)據(jù)指針,它 比稠密索引節(jié)省了更多的存儲(chǔ)空間,但查找起來需要消耗更多的時(shí)間骇两。
8.??kafka會(huì)記錄offset到zk中速种。但是,zk client api對(duì)zk的頻繁寫入是一個(gè)低效的操作低千。0.8.2 kafka引入了native offset storage配阵,將offset管理從zk移出,并且可以做到水平擴(kuò)展。其原理就是利用了kafka的compacted topic棋傍,offset以consumer group,topic與partion的組合作為key直接提交到compacted topic中救拉。同時(shí)Kafka又在內(nèi)存中維護(hù)了三元組來保存最新的offset信息,consumer來取最新offset信息的時(shí)候直接內(nèi)存里拿即可瘫拣。當(dāng)然近上,kafka允許你快速的checkpoint最新的offset信息到磁盤上。
-
Partition Replication原則 :
-
Kafka高效文件存儲(chǔ)設(shè)計(jì)特點(diǎn) :
① Kafka把topic中一個(gè)parition大文件分成多個(gè)小文件段拂铡,通過多個(gè)小文件段壹无,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用感帅。
② 通過索引信息可以快速定位message和確定response的最大大小斗锭。
③ 通過index元數(shù)據(jù)全部映射到memory,可以避免segment file的IO磁盤操作失球。
④ 通過索引文件稀疏存儲(chǔ)岖是,可以大幅降低index文件元數(shù)據(jù)占用空間大小。
-
Kafka高效文件存儲(chǔ)設(shè)計(jì)特點(diǎn) :
- ??高效的數(shù)據(jù)傳輸 :
- 發(fā)布者每次可發(fā)布多條消息(將消息加到一個(gè)消息集合中發(fā)布)实苞,consumer每次迭代消費(fèi)一條消息豺撑。
- 不創(chuàng)建單獨(dú)的cache,使用系統(tǒng)的page cache黔牵。發(fā)布者順序發(fā)布聪轿,訂閱者通常比發(fā)布者滯后一點(diǎn)點(diǎn),直接使用Linux的page cache效果比較好猾浦,同時(shí)減少了cache管理及垃圾收集的開銷陆错。
- page cache中的每個(gè)文件都是一棵基數(shù)樹(radix tree,本質(zhì)上是多叉搜索樹)金赦,樹的每個(gè)節(jié)點(diǎn)都是一個(gè)頁(yè)音瓷。根據(jù)文件內(nèi)的偏移量就可以快速定位到所在的頁(yè)。
-
Kafka為什么不自己管理緩存夹抗,而非要用page cache绳慎?原因有如下三點(diǎn):
① JVM中一切皆對(duì)象,數(shù)據(jù)的對(duì)象存儲(chǔ)會(huì)帶來所謂object overhead漠烧,浪費(fèi)空間杏愤;
② 如果由JVM來管理緩存,會(huì)受到GC的影響沽甥,并且過大的堆也會(huì)拖累GC的效率声邦,降低吞吐量;
③ 一旦程序崩潰摆舟,自己管理的緩存數(shù)據(jù)會(huì)全部丟失。 - producer生產(chǎn)消息時(shí),會(huì)使用pwrite()系統(tǒng)調(diào)用【對(duì)應(yīng)到Java NIO中是FileChannel.write() API】按偏移量寫入數(shù)據(jù)恨诱,并且都會(huì)先寫入page cache里媳瞪。
- consumer消費(fèi)消息時(shí),會(huì)使用sendfile()系統(tǒng)調(diào)用【對(duì)應(yīng)FileChannel.transferTo() API】照宝,零拷貝地將數(shù)據(jù)從page cache傳輸?shù)絙roker的Socket buffer蛇受,再通過網(wǎng)絡(luò)傳輸。
- 同時(shí)厕鹃,page cache中的數(shù)據(jù)會(huì)隨著內(nèi)核中flusher線程的調(diào)度以及對(duì)sync()/fsync()的調(diào)用寫回到磁盤兢仰,就算進(jìn)程崩潰,也不用擔(dān)心數(shù)據(jù)丟失剂碴。
kafka分區(qū)的原因:
- 指明partition的情況下把将,直接將指明的值直接作為partition的值。
- 沒有指明partition值但有key的值忆矛,將key的hash值與toptic的partition數(shù)進(jìn)行取余得到partition值察蹲。
- 既沒有partition值又沒有key值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)催训,將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值洽议,也就是常說的 round-robin 算法。
kafka高吞吐量的原因:
-
kafka的消息是不斷追加到文件中的漫拭,這個(gè)特性使kafka可以充分利用磁盤的順序讀寫性能亚兄,順序讀寫不需要硬盤磁頭的尋道時(shí)間,只需很少的扇區(qū)旋轉(zhuǎn)時(shí)間采驻,所以速度遠(yuǎn)快于隨機(jī)讀寫儿捧。生產(chǎn)者負(fù)責(zé)寫入數(shù)據(jù),Kafka會(huì)將消息持久化到磁盤挑宠,保證不會(huì)丟失數(shù)據(jù)菲盾,Kafka采用了兩個(gè)技術(shù)提高寫入的速度:
- 順序?qū)懭?/strong>:硬盤是機(jī)械結(jié)構(gòu),需要指針尋址找到存儲(chǔ)數(shù)據(jù)的位置各淀,所以懒鉴,如果是隨機(jī)IO,磁盤會(huì)進(jìn)行頻繁的尋址碎浇,導(dǎo)致寫入速度下降临谱。Kafka使用了順序IO提高了磁盤的寫入速度,Kafka會(huì)將數(shù)據(jù)順序插入到文件末尾奴璃,消費(fèi)者端通過控制偏移量來讀取消息悉默,這樣做會(huì)導(dǎo)致數(shù)據(jù)無法刪除,時(shí)間一長(zhǎng)苟穆,磁盤空間會(huì)滿抄课,kafka提供了2種策略來刪除數(shù)據(jù):基于時(shí)間刪除和基于partition文件的大小刪除唱星。
- Memory Mapped Files:這個(gè)和Java NIO中的內(nèi)存映射基本相同,跟磨,mmf直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)文件到物理內(nèi)存的映射间聊,完成之后對(duì)物理內(nèi)存的操作會(huì)直接同步到硬盤。mmf通過內(nèi)存映射的方式大大提高了IO速率抵拘,省去了用戶空間到內(nèi)核空間的復(fù)制哎榴。它的缺點(diǎn)顯而易見--不可靠,當(dāng)發(fā)生宕機(jī)而數(shù)據(jù)未同步到硬盤時(shí)僵蛛,數(shù)據(jù)會(huì)丟失尚蝌,Kafka提供了produce.type參數(shù)來控制是否主動(dòng)的進(jìn)行刷新,如果kafka寫入到mmp后立即flush再返回給生產(chǎn)者則為同步模式充尉,反之為異步模式飘言。
-
零拷貝:平時(shí)從服務(wù)器讀取靜態(tài)文件時(shí),服務(wù)器先將文件從磁盤復(fù)制到內(nèi)核空間喉酌,再?gòu)?fù)制到用戶空間热凹,最后再?gòu)?fù)制到內(nèi)核空間并通過網(wǎng)卡發(fā)送出去,而零拷貝則是直接從內(nèi)核到內(nèi)核再到網(wǎng)卡泪电,省去了用戶空間的復(fù)制般妙。
- Kafka把所有的消息存放到一個(gè)文件中,當(dāng)消費(fèi)者需要數(shù)據(jù)的時(shí)候直接將文件發(fā)送給消費(fèi)者相速,比如10W的消息共10M碟渺,全部發(fā)送給消費(fèi)者,10M的消息在內(nèi)網(wǎng)中傳輸是非惩晃埽快的苫拍,假如需要1s,那么kafka的tps就是10w旺隙。Zero copy對(duì)應(yīng)的是Linux中sendfile函數(shù)绒极,這個(gè)函數(shù)會(huì)接受一個(gè)offsize來確定從哪里開始讀取。現(xiàn)實(shí)中蔬捷,不可能將整個(gè)文件全部發(fā)給消費(fèi)者垄提,他通過消費(fèi)者傳遞過來的偏移量來使用零拷貝讀取指定內(nèi)容的數(shù)據(jù)返回給消費(fèi)者。
kafka消息丟失的原因:
- 由于Producer端設(shè)置消息發(fā)送ack=0周拐,表示發(fā)送完成即可铡俐,不會(huì)關(guān)心broker的確認(rèn)回復(fù)。
- 高階消費(fèi)的時(shí)候妥粟,由于消息消費(fèi)是自動(dòng)提交的方法审丘,由于自動(dòng)提交時(shí)間默認(rèn)是1秒,消費(fèi)端拉取的消息還沒有處理完勾给,但是消費(fèi)進(jìn)度offset已經(jīng)更新了滩报,此時(shí)如果消費(fèi)端掛了锅知,可能存在1秒內(nèi)的數(shù)據(jù)丟失。
- ack=all的時(shí)候并不能保證數(shù)據(jù)不會(huì)丟失露泊,acks=all必須跟ISR列表里至少有2個(gè)以上的副本配合使用喉镰,起碼是有一個(gè)Leader和一個(gè)Follower才可以旅择。
kafka消息重復(fù)消費(fèi)的原因:
- 由于使用高階消費(fèi)惭笑,自動(dòng)提交過程,拉取的消息1秒內(nèi)已經(jīng)處理完生真,但是還沒有觸發(fā)提交的時(shí)間沉噩,此時(shí)消費(fèi)進(jìn)度offset沒有更新,下次拉取時(shí)就會(huì)存在重復(fù)消費(fèi)的請(qǐng)客柱蟀。
??kafka故障處理機(jī)制:
- Replica 消息圖示:
- follower故障 : follower發(fā)生故障后會(huì)被臨時(shí)踢出ISR长已,待該follower恢復(fù)后畜眨,follower會(huì)讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉术瓮,從HW開始向leader進(jìn)行同步康聂。等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后胞四,就可以重新加入ISR了恬汁。
- leader故障 : leader發(fā)生故障之后,會(huì)從ISR中選出一個(gè)新的leader辜伟,之后氓侧,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的follower會(huì)先將各自的log文件高于HW的部分截掉导狡,然后從新的leader同步數(shù)據(jù)约巷。
- 注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)旱捧。
kafka broker 宕機(jī) :
- 只需要新啟動(dòng)一個(gè)broker, 把broker.id設(shè)置為 損壞的那個(gè)broker的id, 就會(huì)自動(dòng)復(fù)制過去丟失的數(shù)據(jù)独郎。
kafka常用命令:
查詢 :
## 查詢集群描述
bin/kafka-topics.sh --describe --zookeeper
## topic列表查詢
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
## topic列表查詢(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## 新消費(fèi)者列表查詢(支持0.9版本+)
bin/kafka-consumer-groups.sh --list --new-consumer --bootstrap-server localhost:9092
## 新消費(fèi)者列表查詢(支持0.10版本+)
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
## 顯示某個(gè)消費(fèi)組的消費(fèi)詳情(僅支持offset存儲(chǔ)在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
## 顯示某個(gè)消費(fèi)組的消費(fèi)詳情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
## 顯示某個(gè)消費(fèi)組的消費(fèi)詳情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
發(fā)送和消費(fèi)
## 生產(chǎn)者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## 消費(fèi)者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
## 新生產(chǎn)者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## 新消費(fèi)者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
## 高級(jí)點(diǎn)的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
平衡Leader:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
kafka自帶壓測(cè)工具:
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
分區(qū)擴(kuò)容 :
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
? 本文部分內(nèi)容參見 美團(tuán)技術(shù)團(tuán)隊(duì)
- ? 文章要是勘誤或者知識(shí)點(diǎn)說的不正確,歡迎評(píng)論廊佩,畢竟這也是作者通過閱讀源碼獲得的知識(shí)囚聚,難免會(huì)有疏忽!
- ? 要是感覺文章對(duì)你有所幫助标锄,不妨點(diǎn)個(gè)關(guān)注顽铸,或者移駕看一下作者的其他文集,也都是干活多多哦料皇,文章也在全力更新中谓松。
- ? 著作權(quán)歸作者所有星压。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處鬼譬!