最近在研究kafka肮韧,原版論文見(jiàn)我的另一篇文章饱普,本文對(duì)kafka2.7最新版的設(shè)計(jì)文檔做一個(gè)概括性解讀孝常,原文見(jiàn)官網(wǎng)的設(shè)計(jì)文檔。
1. Motivation
kafka被設(shè)計(jì)成一個(gè)通用的數(shù)據(jù)流處理平臺(tái)。(早已超出2011年出版的日志收集器和消息隊(duì)列)蔓涧。
- 高吞吐(日志聚合)
- 大型數(shù)據(jù)塊的整合(offline 平臺(tái)的拉取)
- 低延遲(消息隊(duì)列)
- 分區(qū)笋额、分布式的實(shí)時(shí)流處理元暴,從一個(gè)流派生新的流;
- 容錯(cuò)(當(dāng)被投喂非日志數(shù)據(jù)時(shí))
為了支持這些特性兄猩,一系列組件被開發(fā)茉盏,使得kafka更像是一個(gè)DB的日志,而不是消息隊(duì)列枢冤。
2. 持久性
不要害怕文件系統(tǒng)鸠姨!
選擇用文件系統(tǒng),而不是把日志存進(jìn)進(jìn)程的緩存再統(tǒng)一flush淹真,有以下理由:
- 順序讀寫是隨機(jī)讀寫的3000x(機(jī)械硬盤)
- OS disk page cache 收益很大讶迁,有時(shí)緩存進(jìn)進(jìn)程反而是double cache了
- JVM的堆內(nèi)存管理比較雞肋,GC開銷也大
- 經(jīng)過(guò)原子化的訪問(wèn)和高效壓縮數(shù)據(jù)成字節(jié)序列核蘸,可以更高效使用內(nèi)存
- 為什么不用樹結(jié)構(gòu)巍糯?
B-Tree的結(jié)構(gòu)通用性很強(qiáng),O(logN)的操作復(fù)雜度對(duì)于很多系統(tǒng)都足夠了客扎。但是祟峦,樹結(jié)構(gòu)本身不具備擴(kuò)展性,尤其是其需要隨機(jī)磁盤讀寫徙鱼。在固定cache的情況下宅楞,樹形結(jié)構(gòu)的性能最好也是超線性的,隨著數(shù)據(jù)量的增加袱吆。
對(duì)于磁盤的順序?qū)懯莑og的普遍選擇厌衙。這里寫不會(huì)阻塞讀,寫操作都是O(1)的復(fù)雜度杆故。最重要的是迅箩,這里和數(shù)據(jù)量的大小不再有關(guān)系。因此处铛,kafka可以將消息持久化保持7天饲趋,以供重復(fù)讀。
3. 效率
由于消費(fèi)是規(guī)模最大的操作撤蟆,所以我們要盡可能把消費(fèi)做的“輕”奕塑。
disk efficiency:類似系統(tǒng)問(wèn)題主要在兩方面:I/O操作太多+Bytes過(guò)度拷貝。
之前已經(jīng)討論過(guò)disk方面的選擇家肯。I/O操作太多龄砰,kafka的解決方案是利用消息組的抽象概念,用大塊的消息讀寫(生產(chǎn)/消費(fèi))來(lái)均攤網(wǎng)絡(luò)代價(jià)。
Bytes過(guò)度拷貝問(wèn)題换棚,kafka讓producer, comsumer, broker采用同樣的序列化協(xié)議式镐,開辟了優(yōu)化空間,然后利用Sendfile系統(tǒng)調(diào)用減少Copy固蚤。(具體見(jiàn)我的另一篇文章)End-to-end Batch Compression
保證傳輸效率的另一點(diǎn)就是壓縮娘汞。kafka可以支持批壓縮,主要是因?yàn)椴煌琹og之間經(jīng)常會(huì)產(chǎn)生大量的重復(fù)夕玩。相比于端上的單條日志壓縮你弦,可以有更好的壓縮比。壓縮的消息會(huì)被寫入磁盤燎孟,會(huì)被發(fā)送給Consumer禽作,最終由消費(fèi)者解壓縮。支持的壓縮方案有GZIP, Snappy, LZ4 and ZStandard等揩页。
4. Producer
4.1 負(fù)載均衡
producer選好了partition旷偿,broker會(huì)直接answer給producer這個(gè)partition的leader所在的broker,然后直接傳輸碍沐,沒(méi)有中間的路由層狸捅。
producer如何選partition呢?可以隨機(jī)尘喝,也可以自選分區(qū)key和分區(qū)函數(shù)。以滿足一些本地性斋陪。
4.2 異步發(fā)送
批量發(fā)送∥扌椋可配置的定時(shí)/定量進(jìn)行buffer batch send。
5. Consumer
5.1 Push vs. pull
- push-fashion的系統(tǒng)友题,比如flume,難點(diǎn)在于對(duì)于多個(gè)消費(fèi)者度宦,沒(méi)有辦法根據(jù)消費(fèi)者的接收能力控制消費(fèi)速度。pull-fashion在這點(diǎn)要靈活的多戈抄。
- 可以由consumer主動(dòng)去進(jìn)行批量拉取(用戶配置)划鸽,而不是靠broker猜測(cè)戚哎。
- 如果broker暫時(shí)沒(méi)數(shù)據(jù),consumer不會(huì)忙等嫂用,會(huì)把自己阻塞掉型凳,定期輪詢。
5.2 Consumer Position
消息的消費(fèi)狀態(tài)由消費(fèi)者保存(offset)尸折。
如果由broker保存啰脚,會(huì)有一系列問(wèn)題。吞吐量肯定會(huì)低实夹,其次,如果消費(fèi)者消費(fèi)了粒梦,但是沒(méi)有Ack亮航,那么將來(lái)會(huì)重復(fù)消費(fèi),以及broker一系列tricky的問(wèn)題匀们。
5.3 Offline Data Load
由于kafka的持久存儲(chǔ)系統(tǒng)缴淋,數(shù)據(jù)倉(cāng)庫(kù)/HDFS,會(huì)選擇周期性的批量bulk load數(shù)據(jù)到數(shù)倉(cāng)中泄朴,這就對(duì)kafka對(duì)大量數(shù)據(jù)的吞吐效率有保證重抖。
對(duì)于此,kafka將數(shù)據(jù)負(fù)載平均分割祖灰,然后并行化钟沛,每一個(gè)split都是一個(gè)map task,也可以做combination局扶。某個(gè)map task fail掉了也不要緊恨统,可以直接從最初的位置重新開始。
5.4 Static Memebership
為了避免頻繁觸發(fā)rebalance導(dǎo)致Stop-The-World三妈,kafka 2.3之后設(shè)置了靜態(tài)成員畜埋,由用戶設(shè)置其consumer-id,好處是當(dāng)短暫離線退組畴蒲,再上線進(jìn)組時(shí)悠鞍,使用同一個(gè)id,不會(huì)導(dǎo)致rebalance模燥。但是其他broker和consumer變更的情況下咖祭,仍然必須要rebalance保證負(fù)載均衡。
(rebalance具體算法見(jiàn)我另一篇文章)
6. 消息傳遞語(yǔ)義
在kafka中涧窒,日志是有提交的概念的(具體的見(jiàn)下一節(jié))心肪,如果日志提交了,只要復(fù)制了這個(gè)分區(qū)的broker有一個(gè)活著的纠吴,日志就還在。在本節(jié)我們假設(shè)broker本身不會(huì)丟失數(shù)據(jù)固该,以便理解對(duì)producer/consumer的消息傳遞保證。
6.1 Producer Delivery
對(duì)于producer來(lái)說(shuō)怔匣,如果出現(xiàn)網(wǎng)絡(luò)錯(cuò)誤每瞒,是沒(méi)法知道傳輸?shù)娜罩臼欠褚呀?jīng)提交了纯露。在0.11.0版本以前,如果沒(méi)有收到ack浓利,那么沒(méi)有別的辦法贷掖,只能重傳渴语,這實(shí)際上就是至少一次的語(yǔ)義遵班。
然而在0.11.0版本之后狭郑,kafka為每個(gè)消息提供了Sequence number,為每個(gè)producer分發(fā)id脏答,這樣broker的接收操作殖告,可以設(shè)置為冪等的黄绩,就完成了對(duì)producer的確切一次的語(yǔ)義玷过。
而且也是從0.11.0開始,producer對(duì)多個(gè)topic partitions發(fā)送數(shù)據(jù)也可以保證事務(wù)性粤蝎,要么全部接收初澎,要么全都沒(méi)接收。
具體到使用的時(shí)候软啼,producer可以根據(jù)消息類型自主選擇持久化級(jí)別焰宣。log信息可以完全異步發(fā)送,當(dāng)有重要數(shù)據(jù)時(shí)也可以選擇有回調(diào)函數(shù)的Send盈罐,等待commit時(shí)block掉盅粪,commit的級(jí)別也可以設(shè)置,是leader收到即可或者需要多少個(gè)follower副本础浮。一般來(lái)說(shuō)豆同,同步的Send在10ms這個(gè)級(jí)別影锈。
6.2 Consumer Delivery
由于上面我們說(shuō)蝉绷,producer可以對(duì)多個(gè)topic partitions進(jìn)行事務(wù)性的寫(同時(shí)寫成功或同時(shí)不成功)熔吗。這給kafka的一個(gè)場(chǎng)景帶來(lái)了極大的便利:流處理桅狠。流處理就是通過(guò)一個(gè)topic經(jīng)過(guò)一些變換產(chǎn)出到另一個(gè)topic中去轿秧,整個(gè)過(guò)程都在kafka集群中完成砸逊。我們把兩條消息組成一個(gè)事務(wù):轉(zhuǎn)換后的消息+消費(fèi)的offset。利用producer的事務(wù)寫铝侵,要么offset和數(shù)據(jù)同時(shí)寫入触徐,要么同時(shí)沒(méi)有被寫入撞鹉,這就達(dá)成了消費(fèi)端的確切一次語(yǔ)義。
如果事務(wù)中途abort掉了享郊,對(duì)于consumer有兩種可見(jiàn)性炊琉,取決于consumer的隔離性級(jí)別
- read_uncommited:可以看到?jīng)]提交的寫
- read_commited:看不到?jīng)]提交的寫
上面說(shuō)的是流處理的過(guò)程是可以達(dá)成確切一次語(yǔ)義苔咪。對(duì)于consumer來(lái)自外部系統(tǒng)呢团赏?麻煩在于要把消費(fèi)者的位置(broker知道)和實(shí)際消費(fèi)的日志(consumer知道)同步起來(lái)夹界,一個(gè)通用的做法是進(jìn)行兩階段提交(編者注:kafka集群作為coordinator可柿,每一個(gè)consumer作為一個(gè)worker)。然而很多外部系統(tǒng)(比如HDFS)并不支持兩階段提交营密。因此只能用一個(gè)更輕型也更通用的方案评汰,讓每一個(gè)consumer把自己的offset和實(shí)際數(shù)據(jù)放在同一個(gè)位置。有一點(diǎn)不妙的是主儡,由于此時(shí)的消息沒(méi)有主鍵糜值,因此也無(wú)法進(jìn)行去重(編者注:offset不可以作為消息的主鍵么寂汇?)捣染。最終支持的是至少一次語(yǔ)義耍攘。
7. Replication
手工配置副本個(gè)數(shù)。副本個(gè)數(shù)為1就是不復(fù)制臼膏。
所有的讀寫都走leader,只在leader掛了嚷硫,follower才用于自動(dòng)故障轉(zhuǎn)移仔掸。
kafka在復(fù)制容錯(cuò)方面起暮,只考慮宕機(jī)/恢復(fù)模型,不會(huì)考慮分布式系統(tǒng)領(lǐng)域的拜占庭故障筒捺,即故意發(fā)錯(cuò)誤信息的特殊情況系吭。
kafka判定節(jié)點(diǎn)是否alive有兩個(gè)條件:
- 是否和zookeeper的session心跳保持聯(lián)系颗品;
- 是否和leader落后在一定范圍內(nèi)(用戶參數(shù))。
對(duì)于一個(gè)partition则吟, follower和leader共同構(gòu)成副本集, follower像是consumer一樣去拉取leader的日志。leader和alive follower共同構(gòu)成ISR水慨,leader時(shí)刻通過(guò)zookeeper跟蹤ISR集合讥巡,剔除死掉的follower舔哪。
producer可以在持久性和吞吐率之間做權(quán)衡捉蚤。可以設(shè)置mininum replica must write布持。producer有幾個(gè)選擇:
- 完全不需要ack
- 需要ack陕悬,但只要leader的就可以
- 需要ack捉超,要至少mininum副本寫入(minimum ISR)
對(duì)于1.2.這兩種選擇拼岳,可能只有l(wèi)eader寫了日志,然后就被消費(fèi)了叶撒。
因此kafka的保障是:對(duì)于提交的消息祠够,只要有一個(gè)副本活著粪牲,就不會(huì)丟失虑瀑。
kafka對(duì)于節(jié)點(diǎn)短時(shí)間宕機(jī)恢復(fù)有容錯(cuò)保障滴须,但是對(duì)網(wǎng)絡(luò)分區(qū)就不再保證可用了扔水。
7.1 Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)
kafka采用的是replicated log模型魔市,即消息由leader定序赵哲,F(xiàn)ollower無(wú)腦copy即可枫夺。
如果leader宕機(jī)掉了橡庞,就要在ISR中啟動(dòng)多數(shù)選舉扒最。(Raft, Paxos等),最接近kafka的是MS的PacificA法竞。
對(duì)于宕機(jī)恢復(fù)的節(jié)點(diǎn)爪喘,kafka不要求它的數(shù)據(jù)完全一致纠拔,但是在加入ISR之前稠诲,它的數(shù)據(jù)必須得到全量的恢復(fù)诡曙。
7.2 trade-off between availability and duribility
上面說(shuō)過(guò)producer有三種選擇,對(duì)于第三種選擇劝萤,即最小ISR基數(shù)床嫌,存在一個(gè)trade-off。過(guò)于大的minimum容易導(dǎo)致分區(qū)不可用鳖谈,必須阻塞等待有足夠多的ISR缆娃;過(guò)于小的minimum容易導(dǎo)致數(shù)據(jù)丟失瑰排,比如minimum=1椭住,那么實(shí)際上只有l(wèi)eader寫入了函荣。這里存在一個(gè)權(quán)衡,要用戶把握乘碑。
7.3 Replica Management
kafka用round-rubin的方式保證某個(gè)topic的partitions不會(huì)聚集在少量的節(jié)點(diǎn)中兽肤。同樣资铡,也會(huì)用同樣的方式保證leaders不會(huì)聚集在少量節(jié)點(diǎn)中笤休。
另一方面店雅,一般kafka由節(jié)點(diǎn)掛掉闹啦,是broker直接掛掉辕坝,不會(huì)是某個(gè)partition掛掉,那么一個(gè)broker掛掉江场,可能會(huì)觸發(fā)幾個(gè)甚至幾十個(gè)partition的重新選舉/rebalance扛稽。此時(shí)kafka的策略是選擇另外一個(gè)broker在张,在更高的級(jí)別上領(lǐng)導(dǎo)這些partition的leader選舉,這樣使得選舉過(guò)程可以批量化矮慕,更為高效瘟斜。
8. Log Compaction
kafka log壓縮保證在一個(gè)topic partition內(nèi)螺句,在消息內(nèi)部每個(gè)key的最新值都會(huì)被保留下來(lái)橡类。這意味著在任意時(shí)刻取劫,我們能拿到當(dāng)前各個(gè)key的最新快照。這在一些事務(wù)型的日志中非常重要谱邪,可以用于下游的數(shù)據(jù)恢復(fù)惦银。
比如下圖這三次更改中扯俱,只有最后一條記錄不會(huì)被壓縮。
這樣的話其實(shí)就產(chǎn)生了兩種保留策略基显,一種是默認(rèn)的按照時(shí)間(7天)或者大小來(lái)保留;另一種是按照壓縮來(lái)保留善炫。
8.1 Log Compaction Basics
這是一個(gè)kafka log的邏輯視圖撩幽。
真實(shí)的log compaction大概是這個(gè)樣子的。offset即使被壓縮也永遠(yuǎn)不會(huì)變,以免含義混淆窜醉。
對(duì)于log compaction宪萄,kafka給出了一些保證:
- 消息會(huì)在一個(gè)可配置的時(shí)間之后才會(huì)進(jìn)入log尾,可壓縮榨惰;也就是說(shuō)拜英,如果一直在監(jiān)聽(tīng)消費(fèi)的consumer可以收到連續(xù)offset的消息琅催,不會(huì)立即被壓縮;
- 消息的順序不會(huì)被打亂弄兜,只是有些消息就被刪除了;
- 消息的offset不變;
(編者注:本節(jié)中有關(guān)消息刪除的暫時(shí)略過(guò))
8.2 Log Compaction Details
log compaction是由一個(gè)后臺(tái)的線程池log cleaner來(lái)做的,不會(huì)block前臺(tái)的produce/consume。同時(shí)也有一個(gè)用戶參數(shù)來(lái)限制compaction的I/O帶寬占用榨呆。一次log clean包含以下四步:
- 選擇最大的比例:log head/log tail
- 用一個(gè)哈希表對(duì)log head中的每個(gè)key進(jìn)行存儲(chǔ)
- 從頭到尾重新copy數(shù)據(jù)到一個(gè)新的位置彻消,那些老keys會(huì)被直接刪除谢澈,新的位置寫滿了1個(gè)segment file就會(huì)copy回去怠肋,所以只會(huì)有1個(gè)Segment file的額外空間占用。
9. Quota
這個(gè)是kafka在消費(fèi)組/消費(fèi)者之間的調(diào)度系統(tǒng),放止某些消費(fèi)者故意搗亂頻繁拉取數(shù)據(jù),占據(jù)了大量broker的資源而產(chǎn)生的〖紫祝可以按照帶寬/請(qǐng)求量進(jìn)行分配朦乏,這里偏運(yùn)維不細(xì)說(shuō),遇到再補(bǔ)充镊尺。