3.Partition Replication原則
Kafka高效文件存儲設(shè)計特點(diǎn)
- Kafka把topic中一個parition大文件分成多個小文件段胎署,通過多個小文件段,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用。
- 通過索引信息可以快速定位message和確定response的最大大小婴洼。
- 通過index元數(shù)據(jù)全部映射到memory,可以避免segment file的IO磁盤操作共屈。
- 通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小党窜。
3.1 Kafka集群partition replication默認(rèn)自動分配分析
下面以一個Kafka集群中4個Broker舉例拗引,創(chuàng)建1個topic包含4個Partition,2 Replication幌衣;數(shù)據(jù)Producer流動如圖所示:
(1)
(2)當(dāng)集群中新增2節(jié)點(diǎn)矾削,Partition增加到6個時分布情況如下:
副本分配邏輯規(guī)則如下:
- 在Kafka集群中,每個Broker都有均等分配Partition的Leader機(jī)會。
- 上述圖Broker Partition中哼凯,箭頭指向?yàn)楦北居洌訮artition-0為例:broker1中parition-0為Leader,Broker2中Partition-0為副本断部。
- 上述圖中每個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker為副本猎贴,如此循環(huán)迭代分配,多副本都遵循此規(guī)則家坎。
副本分配算法如下:
- 將所有N Broker和待分配的i個Partition排序.
- 將第i個Partition分配到第(i mod n)個Broker上.
- 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.
4.Kafka Broker一些特性
4.1 無狀態(tài)的Kafka Broker :
1. Broker沒有副本機(jī)制,一旦broker宕機(jī)吝梅,該broker的消息將都不可用虱疏。
2. Broker不保存訂閱者的狀態(tài),由訂閱者自己保存苏携。
3. 無狀態(tài)導(dǎo)致消息的刪除成為難題(可能刪除的消息正在被訂閱)做瞪,kafka采用基于時間的SLA(服務(wù)水平保證),消息保存一定時間(通常為7天)后會被刪除右冻。
4. 消息訂閱者可以rewind back到任意位置重新進(jìn)行消費(fèi)装蓬,當(dāng)訂閱者故障時,可以選擇最小的offset進(jìn)行重新讀取消費(fèi)消息纱扭。
4.2 message的交付與生命周期 :
1. 不是嚴(yán)格的JMS牍帚, 因此kafka對消息的重復(fù)、丟失乳蛾、錯誤以及順序型沒有嚴(yán)格的要求暗赶。(這是與AMQ最大的區(qū)別)
2. kafka提供at-least-once delivery,即當(dāng)consumer宕機(jī)后,有些消息可能會被重復(fù)delivery肃叶。
3. 因每個partition只會被consumer group內(nèi)的一個consumer消費(fèi)蹂随,故kafka保證每個partition內(nèi)的消息會被順序的訂閱。
4. Kafka為每條消息為每條消息計算CRC校驗(yàn)因惭,用于錯誤檢測岳锁,crc校驗(yàn)不通過的消息會直接被丟棄掉。
4.3 壓縮
Kafka支持以集合(batch)為單位發(fā)送消息蹦魔,在此基礎(chǔ)上激率,Kafka還支持對消息集合進(jìn)行壓縮,Producer端可以通過GZIP或Snappy格式對消息集合進(jìn)行壓縮勿决。Producer端進(jìn)行壓縮之后柱搜,在Consumer端需進(jìn)行解壓。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量剥险,減輕對網(wǎng)絡(luò)傳輸?shù)膲毫Υ险海趯Υ髷?shù)據(jù)處理上,瓶頸往往體現(xiàn)在網(wǎng)絡(luò)上而不是CPU。
那么如何區(qū)分消息是壓縮的還是未壓縮的呢健爬,Kafka在消息頭部添加了一個描述壓縮屬性字節(jié)控乾,這個字節(jié)的后兩位表示消息的壓縮采用的編碼,如果后兩位為0娜遵,則表示消息未被壓縮蜕衡。
4.4 消息可靠性
在消息系統(tǒng)中,保證消息在生產(chǎn)和消費(fèi)過程中的可靠性是十分重要的设拟,在實(shí)際消息傳遞過程中慨仿,可能會出現(xiàn)如下三中情況:
一個消息發(fā)送失敗
一個消息被發(fā)送多次
最理想的情況:exactly-once ,一個消息發(fā)送成功且僅發(fā)送了一次
有許多系統(tǒng)聲稱它們實(shí)現(xiàn)了exactly-once,但是它們其實(shí)忽略了生產(chǎn)者或消費(fèi)者在生產(chǎn)和消費(fèi)過程中有可能失敗的情況纳胧。比如雖然一個Producer成功發(fā)送一個消息镰吆,但是消息在發(fā)送途中丟失,或者成功發(fā)送到broker跑慕,也被consumer成功取走万皿,但是這個consumer在處理取過來的消息時失敗了。
從Producer端看:Kafka是這么處理的核行,當(dāng)一個消息被發(fā)送后牢硅,Producer會等待broker成功接收到消息的反饋(可通過參數(shù)控制等待時間),如果消息在途中丟失或是其中一個broker掛掉芝雪,Producer會重新發(fā)送(我們知道Kafka有備份機(jī)制减余,可以通過參數(shù)控制是否等待所有備份節(jié)點(diǎn)都收到消息)。
從Consumer端看:前面講到過partition惩系,broker端記錄了partition中的一個offset值佳励,這個值指向Consumer下一個即將消費(fèi)message。當(dāng)Consumer收到了消息蛆挫,但卻在處理過程中掛掉赃承,此時Consumer可以通過這個offset值重新找到上一個消息再進(jìn)行處理。Consumer還有權(quán)限控制這個offset值悴侵,對持久化到broker端的消息做任意處理瞧剖。
4.5 備份機(jī)制
備份機(jī)制是Kafka0.8版本的新特性,備份機(jī)制的出現(xiàn)大大提高了Kafka集群的可靠性可免、穩(wěn)定性抓于。有了備份機(jī)制后,Kafka允許集群中的節(jié)點(diǎn)掛掉后而不影響整個集群工作浇借。一個備份數(shù)量為n的集群允許n-1個節(jié)點(diǎn)失敗捉撮。在所有備份節(jié)點(diǎn)中,有一個節(jié)點(diǎn)作為lead節(jié)點(diǎn)妇垢,這個節(jié)點(diǎn)保存了其它備份節(jié)點(diǎn)列表巾遭,并維持各個備份間的狀體同步肉康。下面這幅圖解釋了Kafka的備份機(jī)制:
4.6 Kafka高效性相關(guān)設(shè)計
4.6.1 消息的持久化
Kafka高度依賴文件系統(tǒng)來存儲和緩存消息(AMQ的nessage是持久化到mysql數(shù)據(jù)庫中的),因?yàn)橐话愕娜苏J(rèn)為磁盤是緩慢的灼舍,這導(dǎo)致人們對持久化結(jié)構(gòu)具有競爭性持懷疑態(tài)度吼和。其實(shí),磁盤的快或者慢骑素,這決定于我們?nèi)绾问褂么疟P炫乓。因?yàn)榇疟P線性寫的速度遠(yuǎn)遠(yuǎn)大于隨機(jī)寫。線性讀寫在大多數(shù)應(yīng)用場景下是可以預(yù)測的献丑。
4.6.2 常數(shù)時間性能保證
每個Topic的Partition的是一個大文件夾末捣,里面有無數(shù)個小文件夾segment,但partition是一個隊(duì)列创橄,隊(duì)列中的元素是segment,消費(fèi)的時候先從第0個segment開始消費(fèi)箩做,新來message存在最后一個消息隊(duì)列中。對于segment也是對隊(duì)列筐摘,隊(duì)列元素是message,有對應(yīng)的offsite標(biāo)識是哪個message卒茬。消費(fèi)的時候先從這個segment的第一個message開始消費(fèi)船老,新來的message存在segment的最后咖熟。
消息系統(tǒng)的持久化隊(duì)列可以構(gòu)建在對一個文件的讀和追加上,就像一般情況下的日志解決方案柳畔。它有一個優(yōu)點(diǎn)馍管,所有的操作都是常數(shù)時間,并且讀寫之間不會相互阻塞薪韩。這種設(shè)計具有極大的性能優(yōu)勢:最終系統(tǒng)性能和數(shù)據(jù)大小完全無關(guān)确沸,服務(wù)器可以充分利用廉價的硬盤來提供高效的消息服務(wù)。
事實(shí)上還有一點(diǎn)俘陷,磁盤空間的無限增大而不影響性能這點(diǎn)罗捎,意味著我們可以提供一般消息系統(tǒng)無法提供的特性。比如說拉盾,消息被消費(fèi)后不是立馬被刪除桨菜,我們可以將這些消息保留一段相對比較長的時間(比如一個星期)。
5.Kafka 生產(chǎn)者-消費(fèi)者
消息系統(tǒng)通常都會由生產(chǎn)者捉偏,消費(fèi)者倒得,Broker三大部分組成,生產(chǎn)者會將消息寫入到Broker夭禽,消費(fèi)者會從Broker中讀取出消息霞掺,不同的MQ實(shí)現(xiàn)的Broker實(shí)現(xiàn)會有所不同,不過Broker的本質(zhì)都是要負(fù)責(zé)將消息落地到服務(wù)端的存儲系統(tǒng)中讹躯。具體步驟如下:
- 生產(chǎn)者客戶端應(yīng)用程序產(chǎn)生消息:
1. 客戶端連接對象將消息包裝到請求中發(fā)送到服務(wù)端
2. 服務(wù)端的入口也有一個連接對象負(fù)責(zé)接收請求菩彬,并將消息以文件的形式存儲起來
3. 服務(wù)端返回響應(yīng)結(jié)果給生產(chǎn)者客戶端
- 消費(fèi)者客戶端應(yīng)用程序消費(fèi)消息:
1. 客戶端連接對象將消費(fèi)信息也包裝到請求中發(fā)送給服務(wù)端
2. 服務(wù)端從文件存儲系統(tǒng)中取出消息
3. 服務(wù)端返回響應(yīng)結(jié)果給消費(fèi)者客戶端
4. 客戶端將響應(yīng)結(jié)果還原成消息并開始處理消息
5.1 Producers
Producers直接發(fā)送消息到broker上的leader partition缠劝,不需要經(jīng)過任何中介或其他路由轉(zhuǎn)發(fā)。為了實(shí)現(xiàn)這個特性挤巡,kafka集群中的每個broker都可以響應(yīng)producer的請求剩彬,并返回topic的一些元信息,這些元信息包括哪些機(jī)器是存活的矿卑,topic的leader partition都在哪喉恋,現(xiàn)階段哪些leader partition是可以直接被訪問的。
Producer客戶端自己控制著消息被推送到哪些partition母廷。實(shí)現(xiàn)的方式可以是隨機(jī)分配轻黑、實(shí)現(xiàn)一類隨機(jī)負(fù)載均衡算法,或者指定一些分區(qū)算法琴昆。Kafka提供了接口供用戶實(shí)現(xiàn)自定義的partition氓鄙,用戶可以為每個消息指定一個partitionKey,通過這個key來實(shí)現(xiàn)一些hash分區(qū)算法业舍。比如抖拦,把userid作為partitionkey的話,相同userid的消息將會被推送到同一個partition舷暮。
以Batch的方式推送數(shù)據(jù)可以極大的提高處理效率态罪,kafka Producer 可以將消息在內(nèi)存中累計到一定數(shù)量后作為一個batch發(fā)送請求。Batch的數(shù)量大小可以通過Producer的參數(shù)控制下面,參數(shù)值可以設(shè)置為累計的消息的數(shù)量(如500條)复颈、累計的時間間隔(如100ms)或者累計的數(shù)據(jù)大小(64KB)。通過增加batch的大小沥割,可以減少網(wǎng)絡(luò)請求和磁盤IO的次數(shù)耗啦,當(dāng)然具體參數(shù)設(shè)置需要在效率和時效性方面做一個權(quán)衡。
Producers可以異步的并行的向kafka發(fā)送消息机杜,但是通常producer在發(fā)送完消息之后會得到一個future響應(yīng)帜讲,返回的是offset值或者發(fā)送過程中遇到的錯誤。這其中有個非常重要的參數(shù)“acks”,這個參數(shù)決定了producer要求leader partition 收到確認(rèn)的副本個數(shù)椒拗,如果acks設(shè)置數(shù)量為0似将,表示producer不會等待broker的響應(yīng),所以陡叠,producer無法知道消息是否發(fā)送成功玩郊,這樣有可能會導(dǎo)致數(shù)據(jù)丟失,但同時枉阵,acks值為0會得到最大的系統(tǒng)吞吐量译红。
若acks設(shè)置為1,表示producer會在leader partition收到消息時得到broker的一個確認(rèn)兴溜,這樣會有更好的可靠性侦厚,因?yàn)榭蛻舳藭却钡絙roker確認(rèn)收到消息耻陕。若設(shè)置為-1,producer會在所有備份的partition收到消息時得到broker的確認(rèn)刨沦,這個設(shè)置可以得到最高的可靠性保證诗宣。
Kafka 消息有一個定長的header和變長的字節(jié)數(shù)組組成。因?yàn)閗afka消息支持字節(jié)數(shù)組想诅,也就使得kafka可以支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro召庞、protobuf等。Kafka沒有限定單個消息的大小来破,但我們推薦消息大小不要超過1MB,通常一般消息大小都在1~10kB之前篮灼。
發(fā)布消息時,kafka client先構(gòu)造一條消息徘禁,將消息加入到消息集set中(kafka支持批量發(fā)布诅诱,可以往消息集合中添加多條消息,一次行發(fā)布)送朱,send消息時娘荡,producer client需指定消息所屬的topic。
5.2 Consumers
Kafka提供了兩套consumer api驶沼,分為high-level api和sample-api炮沐。Sample-api 是一個底層的API,它維持了一個和單一broker的連接商乎,并且這個API是完全無狀態(tài)的央拖,每次請求都需要指定offset值祭阀,因此鹉戚,這套API也是最靈活的。
在kafka中专控,當(dāng)前讀到哪條消息的offset值是由consumer來維護(hù)的抹凳,因此,consumer可以自己決定如何讀取kafka中的數(shù)據(jù)伦腐。比如赢底,consumer可以通過重設(shè)offset值來重新消費(fèi)已消費(fèi)過的數(shù)據(jù)。不管有沒有被消費(fèi)柏蘑,kafka會保存數(shù)據(jù)一段時間幸冻,這個時間周期是可配置的,只有到了過期時間咳焚,kafka才會刪除這些數(shù)據(jù)洽损。(這一點(diǎn)與AMQ不一樣,AMQ的message一般來說都是持久化到mysql中的革半,消費(fèi)完的message會被delete掉)
High-level API封裝了對集群中一系列broker的訪問碑定,可以透明的消費(fèi)一個topic流码。它自己維持了已消費(fèi)消息的狀態(tài),即每次消費(fèi)的都是下一個消息延刘。
High-level API還支持以組的形式消費(fèi)topic漫试,如果consumers有同一個組名,那么kafka就相當(dāng)于一個隊(duì)列消息服務(wù)碘赖,而各個consumer均衡的消費(fèi)相應(yīng)partition中的數(shù)據(jù)驾荣。若consumers有不同的組名,那么此時kafka就相當(dāng)與一個廣播服務(wù)普泡,會把topic中的所有消息廣播到每個consumer秘车。
High level api和Low level api是針對consumer而言的,和producer無關(guān)劫哼。
High level api是consumer讀的partition的offsite是存在zookeeper上叮趴。High level api 會啟動另外一個線程去每隔一段時間,offsite自動同步到zookeeper上权烧。換句話說眯亦,如果使用了High level api, 每個message只能被讀一次般码,一旦讀了這條message之后妻率,無論我consumer的處理是否ok。High level api的另外一個線程會自動的把offiste+1同步到zookeeper上板祝。如果consumer讀取數(shù)據(jù)出了問題宫静,offsite也會在zookeeper上同步。因此券时,如果consumer處理失敗了孤里,會繼續(xù)執(zhí)行下一條。這往往是不對的行為橘洞。因此捌袜,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止炸枣,但是最后讀的這一條數(shù)據(jù)是丟失了虏等,因?yàn)樵趜ookeeper里面的offsite已經(jīng)+1了。等再次啟動conusmer group的時候适肠,已經(jīng)從下一條開始讀取處理了霍衫。
Low level api是consumer讀的partition的offsite在consumer自己的程序中維護(hù)。不會同步到zookeeper上侯养。但是為了kafka manager能夠方便的監(jiān)控敦跌,一般也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了沸毁,這條message的offsite我們自己維護(hù)峰髓,我們不會+1傻寂。下次再啟動的時候,還會從這個offsite開始讀携兵。這樣可以做到exactly once對于數(shù)據(jù)的準(zhǔn)確性有保證疾掰。
對于Consumer group:
1. 允許consumer group(包含多個consumer,如一個集群同時消費(fèi))對一個topic進(jìn)行消費(fèi)徐紧,不同的consumer group之間獨(dú)立消費(fèi)静檬。
2. 為了對減小一個consumer group中不同consumer之間的分布式協(xié)調(diào)開銷,指定partition為最小的并行消費(fèi)單位并级,即一個group內(nèi)的consumer只能消費(fèi)不同的partition拂檩。
Consumer與Partition的關(guān)系:
- 如果consumer比partition多,是浪費(fèi)嘲碧,因?yàn)閗afka的設(shè)計是在一個partition上是不允許并發(fā)的稻励,所以consumer數(shù)不要大于partition數(shù)
- 如果consumer比partition少,一個consumer會對應(yīng)于多個partitions愈涩,這里主要合理分配consumer數(shù)和partition數(shù)望抽,否則會導(dǎo)致partition里面的數(shù)據(jù)被取的不均勻
- 如果consumer從多個partition讀到數(shù)據(jù),不保證數(shù)據(jù)間的順序性履婉,kafka只保證在一個partition上數(shù)據(jù)是有序的煤篙,但多個partition,根據(jù)你讀的順序會有不同
- 增減consumer毁腿,broker辑奈,partition會導(dǎo)致rebalance,所以rebalance后consumer對應(yīng)的partition會發(fā)生變化
- High-level接口中獲取不到數(shù)據(jù)的時候是會block的
負(fù)載低的情況下可以每個線程消費(fèi)多個partition已烤。但負(fù)載高的情況下鸠窗,Consumer 線程數(shù)最好和Partition數(shù)量保持一致。如果還是消費(fèi)不過來草戈,應(yīng)該再開 Consumer 進(jìn)程塌鸯,進(jìn)程內(nèi)線程數(shù)同樣和分區(qū)數(shù)一致侍瑟。
消費(fèi)消息時唐片,kafka client需指定topic以及partition number(每個partition對應(yīng)一個邏輯日志流,如topic代表某個產(chǎn)品線涨颜,partition代表產(chǎn)品線的日志按天切分的結(jié)果)费韭,consumer client訂閱后,就可迭代讀取消息庭瑰,如果沒有消息星持,consumer client會阻塞直到有新的消息發(fā)布。consumer可以累積確認(rèn)接收到的消息弹灭,當(dāng)其確認(rèn)了某個offset的消息督暂,意味著之前的消息也都已成功接收到揪垄,此時broker會更新zookeeper上地offset registry。
5.3 高效的數(shù)據(jù)傳輸
- 發(fā)布者每次可發(fā)布多條消息(將消息加到一個消息集合中發(fā)布)逻翁, consumer每次迭代消費(fèi)一條消息饥努。
- 不創(chuàng)建單獨(dú)的cache,使用系統(tǒng)的page cache八回。發(fā)布者順序發(fā)布酷愧,訂閱者通常比發(fā)布者滯后一點(diǎn)點(diǎn),直接使用Linux的page cache效果也比較后缠诅,同時減少了cache管理及垃圾收集的開銷溶浴。
- 使用sendfile優(yōu)化網(wǎng)絡(luò)傳輸,減少一次內(nèi)存拷貝管引。
6.Kafka 與 Zookeeper
6.1 Zookeeper 協(xié)調(diào)控制
- 管理broker與consumer的動態(tài)加入與離開士败。(Producer不需要管理,隨便一臺計算機(jī)都可以作為Producer向Kakfa Broker發(fā)消息)
- 觸發(fā)負(fù)載均衡褥伴,當(dāng)broker或consumer加入或離開時會觸發(fā)負(fù)載均衡算法拱烁,使得一個consumer group內(nèi)的多個consumer的消費(fèi)負(fù)載平衡。(因?yàn)橐粋€comsumer消費(fèi)一個或多個partition噩翠,一個partition只能被一個consumer消費(fèi))
- 維護(hù)消費(fèi)關(guān)系及每個partition的消費(fèi)信息戏自。
6.2 Zookeeper上的細(xì)節(jié):
- 每個broker啟動后會在zookeeper上注冊一個臨時的broker registry,包含broker的ip地址和端口號伤锚,所存儲的topics和partitions信息擅笔。
- 每個consumer啟動后會在zookeeper上注冊一個臨時的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。
- 每個consumer group關(guān)聯(lián)一個臨時的owner registry和一個持久的offset registry屯援。對于被訂閱的每個partition包含一個owner registry猛们,內(nèi)容為訂閱這個partition的consumer id;同時包含一個offset registry狞洋,內(nèi)容為上一次訂閱的offset弯淘。