消息持久化和緩存
Kafka高度依賴文件系統(tǒng)來(lái)存儲(chǔ)和緩存消息。一般的人都認(rèn)為“磁盤是緩慢的”,這使得人們對(duì)“持久化結(jié)構(gòu)提供具有競(jìng)爭(zhēng)性的性能”這樣的結(jié)論持有懷疑態(tài)度。實(shí)際上,磁盤比人們預(yù)想的快很多也慢很多芜辕,這取決于它們?nèi)绾伪皇褂茫灰粋€(gè)好的磁盤結(jié)構(gòu)設(shè)計(jì)可以使之跟網(wǎng)絡(luò)速度一樣快活孩。
一個(gè)有關(guān)磁盤性能的關(guān)鍵事實(shí)是:磁盤驅(qū)動(dòng)器的吞吐量跟尋道延遲是相背離的物遇。結(jié)果就是:在一個(gè)6 7200rpm SATA RAID-5 的磁盤陣列上線性寫(xiě)的速度大概是300M/秒乖仇,但是隨機(jī)寫(xiě)的速度只有50K/秒,兩者相差將近10000倍询兴。線性讀寫(xiě)在大多數(shù)應(yīng)用場(chǎng)景下是可以預(yù)測(cè)的乃沙,因此,操作系統(tǒng)利用read-ahead和write-behind技術(shù)來(lái)從大的數(shù)據(jù)塊中預(yù)取數(shù)據(jù)诗舰,或者將多個(gè)邏輯上的寫(xiě)操作組合成一個(gè)大寫(xiě)物理寫(xiě)操作中警儒,對(duì)磁盤的線性讀在有些情況下可以比內(nèi)存的隨機(jī)訪問(wèn)要快一些。消息讀取
Kafka在讀方面使用了sendfile這個(gè)高級(jí)系統(tǒng)函數(shù)眶根,也即zero-copy技術(shù)蜀铲,感興趣的同學(xué)可以去閱讀IBM的文章。 這項(xiàng)技術(shù)通過(guò)減少系統(tǒng)拷貝次數(shù)属百,極大地提高了數(shù)據(jù)傳輸?shù)男始侨啊榱死斫鈙endfile的影響,需要理解一般的將數(shù)據(jù)從文件傳到套接字的路徑:
1)操作系統(tǒng)將數(shù)據(jù)從磁盤讀到內(nèi)核空間的頁(yè)緩存中
2)應(yīng)用將數(shù)據(jù)從內(nèi)核空間讀到用戶空間的緩存中
3)應(yīng)用將數(shù)據(jù)寫(xiě)回內(nèi)存空間的套接字緩存中
4)操作系統(tǒng)將數(shù)據(jù)從套接字緩存寫(xiě)到網(wǎng)卡緩存中族扰,以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這樣做明顯是低效的厌丑,這里有四次拷貝,兩次系統(tǒng)調(diào)用渔呵。如果使用sendfile怒竿,再次拷貝可以被避免:允許操作系統(tǒng)將數(shù)據(jù)直接從頁(yè)緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個(gè)優(yōu)化的路徑中扩氢,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的耕驰。
我們期望一個(gè)主題上有多個(gè)消費(fèi)者是一種常見(jiàn)的應(yīng)用場(chǎng)景。利用上述的零拷貝录豺,數(shù)據(jù)只被拷貝到頁(yè)緩存一次朦肘,然后就可以在每次消費(fèi)時(shí)被重得利用,而不需要將數(shù)據(jù)存在內(nèi)存中巩检,然后在每次讀的時(shí)候拷貝到內(nèi)核空間中厚骗。這使得消息消費(fèi)速度可以達(dá)到網(wǎng)絡(luò)連接的速度。端到端的批量壓縮
在許多場(chǎng)景下兢哭,瓶頸實(shí)際上不是CPU而是網(wǎng)絡(luò)。這在需要在多個(gè)數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)流水線的情況下更是如此夫嗓。當(dāng)然迟螺,用戶可以不需要Kafka的支持而發(fā)送壓縮后的消息,但是這會(huì)導(dǎo)致非常差的壓縮率舍咖。高效的壓縮需要將多個(gè)消息一塊兒壓縮而不是對(duì)每一個(gè)消息進(jìn)行壓縮矩父。理想情況下,這可以在端到端的情況下實(shí)現(xiàn)排霉,數(shù)據(jù)會(huì)先被壓縮窍株,然后被生產(chǎn)者發(fā)送,并且在服務(wù)端也是保持壓縮狀態(tài),只有在最終的消費(fèi)者端才會(huì)被解壓縮球订。
Kafka通過(guò)遞歸消息集合來(lái)支持這一點(diǎn)后裸。一批消息可以放在一起被壓縮,然后以這種形式發(fā)給服務(wù)器冒滩。這批消息會(huì)被遞送到相同的消費(fèi)者那里微驶,并且保持壓縮的形式,直到它到達(dá)目的地,Kafka支持GZIP和Snappy壓縮協(xié)議开睡。消費(fèi)狀態(tài)由消費(fèi)者自己維護(hù)
在Kafka中因苹,消費(fèi)者負(fù)責(zé)記錄狀態(tài)信息(偏移量),也就是已經(jīng)消費(fèi)到哪個(gè)位置了篇恒。準(zhǔn)確地說(shuō)扶檐,消費(fèi)者庫(kù)將他們的狀態(tài)信息寫(xiě)到zookeeper中。但是胁艰,將狀態(tài)數(shù)據(jù)寫(xiě)到另一個(gè)地方——處理結(jié)果所存放的數(shù)據(jù)中心——可能會(huì)更好款筑。打個(gè)比方,消費(fèi)者可能只需要簡(jiǎn)單地將一些合計(jì)值寫(xiě)到中心化的事務(wù)型OLTP數(shù)據(jù)庫(kù)中蝗茁。在這種情況下醋虏,消費(fèi)者可以將狀態(tài)信息寫(xiě)到同一個(gè)事務(wù)中。這解決了分布式一致性問(wèn)題——通過(guò)去除分布式部分哮翘。類似的技巧可以用在一些非事務(wù)型的系統(tǒng)中颈嚼。一個(gè)搜索系統(tǒng)可以將消費(fèi)者狀態(tài)存放在索引塊中。盡管這不提供持久性保證饭寺,但這意味著索引可以和消費(fèi)者狀態(tài)保持同步:如果一個(gè)沒(méi)有刷新的索引塊在一次故障中丟失了阻课,那么這些索引可以從最近的檢查點(diǎn)偏移處開(kāi)始重新消費(fèi)。同樣的艰匙,在并行加載數(shù)據(jù)到Hadoop時(shí)限煞,可以利用類似的技巧。每個(gè)mapper在map 任務(wù)的最后將偏移量寫(xiě)到HDFS中员凝。這樣的話署驻,如果一個(gè)加載任務(wù)失敗了,每個(gè)mapper可以簡(jiǎn)單地從存儲(chǔ)在HDFS中的偏移量處重啟消費(fèi)健霹。
這個(gè)決定有另外一個(gè)好處旺上。消費(fèi)者可以重新消費(fèi)已經(jīng)消費(fèi)過(guò)的數(shù)據(jù)。這違反了隊(duì)列的性質(zhì)糖埋,但是這樣可以使多個(gè)消費(fèi)者一起來(lái)消費(fèi)宣吱。打個(gè)比方,如果一段消費(fèi)者代碼出bug了瞳别,在發(fā)現(xiàn)bug之間這個(gè)消費(fèi)者又消費(fèi)了一堆數(shù)據(jù)征候,那個(gè)在bug修復(fù)之后杭攻,消費(fèi)者可以從指定的位置重新消費(fèi)。自動(dòng)的生產(chǎn)者負(fù)載均衡
Kafka支持消息生產(chǎn)者在客戶端的負(fù)載均衡疤坝,或者利用專有的負(fù)載均衡器來(lái)均衡TCP連接兆解。一個(gè)專用的四層均衡器通過(guò)將TCP連接均衡到Kafka的broker上來(lái)工作。在這種配置下卒煞,所有的來(lái)自同一個(gè)生產(chǎn)者的消息被發(fā)送到一個(gè)borker上痪宰,這種做法的優(yōu)點(diǎn)是,一個(gè)生產(chǎn)者只需要一個(gè)TCP連接畔裕,而不需要與zookeeper的連接衣撬。缺點(diǎn)是負(fù)載均衡只能在TCP連接的層面上來(lái)做,因此扮饶,它有可能不是均衡得非常好(如果一些生產(chǎn)者比其他生產(chǎn)者生產(chǎn)更多的消息具练,給每個(gè)broker分配相同的TCP連接不一定會(huì)使每個(gè)broker得到相同的消息)。
基于zookeeper的客戶端的負(fù)載均衡可以解決這個(gè)問(wèn)題甜无。它允許生產(chǎn)者動(dòng)態(tài)地發(fā)現(xiàn)新的broker扛点,并且在每個(gè)請(qǐng)求上進(jìn)行負(fù)載均衡。同樣的岂丘,它允許生產(chǎn)者根據(jù)一些鍵將數(shù)據(jù)分開(kāi)陵究,而不是隨機(jī)分,這可以增加與消費(fèi)者的粘性(比如奥帘,根據(jù)用用戶id來(lái)化分?jǐn)?shù)據(jù)的消費(fèi))铜邮。這個(gè)特性被稱為“語(yǔ)義化分”,下文會(huì)詳述寨蹋。
這種基于zookeeper的負(fù)載均衡如下所述松蒜。zookeeper watchers注冊(cè)以下一些事件:
1)一個(gè)新的broker啟動(dòng)
2)一個(gè)broker關(guān)閉
3)一個(gè)新的主題注冊(cè)進(jìn)來(lái)
4)一個(gè)borker注冊(cè)一個(gè)已經(jīng)存在的主題
在內(nèi)部,生產(chǎn)者維護(hù)一個(gè)與borker的彈性連接池已旧。這個(gè)連接池通過(guò)zookeeper watchers的回調(diào)函數(shù)來(lái)保持更新以便與所有存活的broker建立或保持連接秸苗。當(dāng)一個(gè)生產(chǎn)者對(duì)某一個(gè)主題的請(qǐng)求上來(lái)時(shí),一個(gè)主題的分區(qū)被分區(qū)器提取到运褪。連接池中的一個(gè)連接被用來(lái)將數(shù)據(jù)發(fā)送到前面所選的那個(gè)broker分區(qū)中惊楼。拉還是推?
Kafka采用的策略是:生產(chǎn)者把數(shù)據(jù)推到borker上,而消費(fèi)者主動(dòng)去broker上拉數(shù)據(jù)秸讹。最近的一些系統(tǒng)包括flume和scribe胁后,都是broker將數(shù)據(jù)推給消費(fèi)者,這有可能會(huì)存在一個(gè)問(wèn)題嗦枢,如果推的速度過(guò)快,消費(fèi)者會(huì)被淹沒(méi)屯断。而在Kafka中不會(huì)出現(xiàn)這樣的問(wèn)題文虏,因?yàn)橄M(fèi)者是主動(dòng)去borker上拉數(shù)據(jù)的侣诺。異步發(fā)送
異步的非阻塞發(fā)送對(duì)于擴(kuò)展消息系統(tǒng)是基本的。在Kafka中氧秘,生產(chǎn)者提供一個(gè)選項(xiàng)用來(lái)使用生產(chǎn)請(qǐng)求的異步分派(producer.type=async)年鸳。這允許將生產(chǎn)請(qǐng)求緩存在一個(gè)內(nèi)存隊(duì)列中,然后在被一個(gè)時(shí)間間隔或者預(yù)先設(shè)定的batch大小觸發(fā)時(shí)發(fā)送出去丸相。由于數(shù)據(jù)是從異構(gòu)的機(jī)器上以不同的速率發(fā)布的搔确,這種異步的緩存機(jī)制可以生成統(tǒng)一的通往broker的traffic, 從而使得網(wǎng)絡(luò)資源得到充分利用灭忠,同時(shí)也提高吞吐量膳算。Replication & Leader election
Kafka從0.8開(kāi)始提供partition級(jí)別的replication,replication的數(shù)量可在$KAFKA_HOME/config/server.properties
中配置弛作。
default.replication.factor = 1
該 Replication與leader election配合提供了自動(dòng)的failover機(jī)制涕蜂。replication對(duì)Kafka的吞吐率是有一定影響的,但極大的增強(qiáng)了可用性映琳。默認(rèn)情況下机隙,Kafka的replication數(shù)量為1。 每個(gè)partition都有一個(gè)唯一的leader萨西,所有的讀寫(xiě)操作都在leader上完成有鹿,leader批量從leader上pull數(shù)據(jù)。一般情況下partition的數(shù)量大于等于broker的數(shù)量谎脯,并且所有partition的leader均勻分布在broker上葱跋。follower上的日志和其leader上的完全一樣。
和大部分分布式系統(tǒng)一樣穿肄,Kakfa處理失敗需要明確定義一個(gè)broker是否alive年局。對(duì)于Kafka而言,Kafka存活包含兩個(gè)條件咸产,一是它必須維護(hù)與Zookeeper的session(這個(gè)通過(guò)Zookeeper的heartbeat機(jī)制來(lái)實(shí)現(xiàn))矢否。二是follower必須能夠及時(shí)將leader的writing復(fù)制過(guò)來(lái),不能“落后太多”脑溢。
leader會(huì)track“in sync”的node list僵朗。如果一個(gè)follower宕機(jī),或者落后太多屑彻,leader將把它從”in sync” list中移除验庙。這里所描述的“落后太多”指follower復(fù)制的消息落后于leader后的條數(shù)超過(guò)預(yù)定值,該值可在 $KAFKA_HOME/config/server.properties
中配置
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as deadreplica.lag.max.messages=4000#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as deadreplica.lag.time.max.ms=10000
需要說(shuō)明的是社牲,Kafka只解決”fail/recover”粪薛,不處理“Byzantine”(“拜占庭”)問(wèn)題。
一條消息只有被“in sync” list里的所有follower都從leader復(fù)制過(guò)去才會(huì)被認(rèn)為已提交搏恤。這樣就避免了部分?jǐn)?shù)據(jù)被寫(xiě)進(jìn)了leader违寿,還沒(méi)來(lái)得及被任何follower復(fù)制就宕機(jī)了湃交,而造成數(shù)據(jù)丟失(consumer無(wú)法消費(fèi)這些數(shù)據(jù))。而對(duì)于producer而言藤巢,它可以選擇是否等待消息commit搞莺,這可以通過(guò) request.required.acks
來(lái)設(shè)置。這種機(jī)制確保了只要“in sync” list有一個(gè)或以上的flollower掂咒,一條被commit的消息就不會(huì)丟失才沧。
這里的復(fù)制機(jī)制即不是同步復(fù)制,也不是單純的異步復(fù)制绍刮。事實(shí)上温圆,同步復(fù)制要求“活著的”follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit录淡,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個(gè)特性)捌木。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù)嫉戚,數(shù)據(jù)只要被leader寫(xiě)入log就被認(rèn)為已經(jīng)commit刨裆,這種情況下如果follwer都落后于leader,而leader突然宕機(jī)彬檀,則會(huì)丟失數(shù)據(jù)帆啃。而Kafka的這種使用“in sync” list的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。follower可以批量的從leader復(fù)制數(shù)據(jù)窍帝,這樣極大的提高復(fù)制性能(批量寫(xiě)磁盤)努潘,極大減少了follower與leader的差距(前文有說(shuō)到,只要follower落后leader不太遠(yuǎn)坤学,則被認(rèn)為在“in sync” list里)疯坤。
上文說(shuō)明了Kafka是如何做replication的,另外一個(gè)很重要的問(wèn)題是當(dāng)leader宕機(jī)了深浮,怎樣在follower中選舉出新的leader压怠。因?yàn)閒ollower可能落后許多或者crash了,所以必須確保選擇“最新”的follower作為新的leader飞苇。一個(gè)基本的原則就是菌瘫,如果leader不在了,新的leader必須擁有原來(lái)的leader commit的所有消息布卡。這就需要作一個(gè)折衷雨让,如果leader在標(biāo)明一條消息被commit前等待更多的follower確認(rèn),那在它die之后就有更多的follower可以作為新的leader忿等,但這也會(huì)造成吞吐率的下降栖忠。
一種非常常用的選舉leader的方式是“majority 靈秀”(“少數(shù)服從多數(shù)”),但Kafka并未采用這種方式。這種模式下娃闲,如果我們有2f+1個(gè)replica(包含leader和follower)虚汛,那在commit之前必須保證有f+1個(gè)replica復(fù)制完消息,為了保證正確選出新的leader皇帮,fail的replica不能超過(guò)f個(gè)。因?yàn)樵谑O碌娜我鈌+1個(gè)replica里蛋辈,至少有一個(gè)replica包含有最新的所有消息属拾。這種方式有個(gè)很大的優(yōu)勢(shì),系統(tǒng)的latency只取決于最快的幾臺(tái)server冷溶,也就是說(shuō)渐白,如果replication factor是3,那latency就取決于最快的那個(gè)follower而非最慢那個(gè)逞频。majority vote也有一些劣勢(shì)纯衍,為了保證leader election的正常進(jìn)行,它所能容忍的fail的follower個(gè)數(shù)比較少苗胀。如果要容忍1個(gè)follower掛掉襟诸,必須要有3個(gè)以上的replica,如果要容忍2個(gè)follower掛掉基协,必須要有5個(gè)以上的replica歌亲。也就是說(shuō),在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)程度澜驮,必須要有大量的replica陷揪,而大量的replica又會(huì)在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降。這就是這種算法更多用在 Zookeeper 這種共享集群配置的系統(tǒng)中而很少在需要存儲(chǔ)大量數(shù)據(jù)的系統(tǒng)中使用的原因杂穷。例如HDFS的HA feature是基于 majority-vote-based journal 悍缠,但是它的數(shù)據(jù)存儲(chǔ)并沒(méi)有使用這種expensive的方式。
實(shí)際上耐量,leader election算法非常多飞蚓,比如Zookeper的 Zab , Raft 和 Viewstamped Replication 。而Kafka所使用的leader election算法更像微軟的 PacificA 算法拴鸵。
Kafka在Zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in-sync replicas) set玷坠,這個(gè)set里的所有replica都跟上了leader,只有ISR里的成員才有被選為leader的可能劲藐。在這種模式下八堡,對(duì)于f+1個(gè)replica,一個(gè)Kafka topic能在保證不丟失已經(jīng)ommit的消息的前提下容忍f個(gè)replica的失敗聘芜。在大多數(shù)使用場(chǎng)景中兄渺,這種模式是非常有利的。事實(shí)上汰现,為了容忍f個(gè)replica的失敗挂谍,majority vote和ISR在commit前需要等待的replica數(shù)量是一樣的叔壤,但是ISR需要的總的replica的個(gè)數(shù)幾乎是majority vote的一半。
雖然majority vote與ISR相比有不需等待最慢的server這一優(yōu)勢(shì)口叙,但是Kafka作者認(rèn)為Kafka可以通過(guò)producer選擇是否被commit阻塞來(lái)改善這一問(wèn)題炼绘,并且節(jié)省下來(lái)的replica和磁盤使得ISR模式仍然值得。
上文提到妄田,在ISR中至少有一個(gè)follower時(shí)咙好,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失该窗,但如果某一個(gè)partition的所有replica都掛了赫舒,就無(wú)法保證數(shù)據(jù)不丟失了翼馆。這種情況下有兩種可行的方案:
等待ISR中的任一個(gè)replica“活”過(guò)來(lái),并且選它作為leader
選擇第一個(gè)“活”過(guò)來(lái)的replica(不一定是ISR中的)作為leader
這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的平衡启具。如果一定要等待ISR中的replica“活”過(guò)來(lái)本讥,那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中的所有replica都無(wú)法“活”過(guò)來(lái)了鲁冯,或者數(shù)據(jù)都丟失了拷沸,這個(gè)partition將永遠(yuǎn)不可用。選擇第一個(gè)“活”過(guò)來(lái)的replica作為leader晓褪,而這個(gè)replica不是ISR中的replica堵漱,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會(huì)成為leader而作為consumer的數(shù)據(jù)源(前文有說(shuō)明涣仿,所有讀寫(xiě)都由leader完成)勤庐。Kafka0.8.*使用了第二種方式。根據(jù)Kafka的文檔好港,在以后的版本中愉镰,Kafka支持用戶通過(guò)配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性钧汹。
上文說(shuō)明了一個(gè)parition的replication過(guò)程丈探,然爾Kafka集群需要管理成百上千個(gè)partition,Kafka通過(guò)round-robin的方式來(lái)平衡partition從而避免大量partition集中在了少數(shù)幾個(gè)節(jié)點(diǎn)上拔莱。同時(shí)Kafka也需要平衡leader的分布碗降,盡可能的讓所有partition的leader均勻分布在不同broker上。另一方面塘秦,優(yōu)化leadership election的過(guò)程也是很重要的讼渊,畢竟這段時(shí)間相應(yīng)的partition處于不可用狀態(tài)。一種簡(jiǎn)單的實(shí)現(xiàn)是暫停宕機(jī)的broker上的所有partition尊剔,并為之選舉leader爪幻。實(shí)際上,Kafka選舉一個(gè)broker作為controller,這個(gè)controller通過(guò)watch Zookeeper檢測(cè)所有的broker failure挨稿,并負(fù)責(zé)為所有受影響的parition選舉leader仇轻,再將相應(yīng)的leader調(diào)整命令發(fā)送至受影響的broker,過(guò)程如下圖所示奶甘。
這樣做的好處是篷店,可以批量的通知leadership的變化,從而使得選舉過(guò)程成本更低甩十,尤其對(duì)大量的partition而言船庇。如果controller失敗了,幸存的所有broker都會(huì)嘗試在Zookeeper中創(chuàng)建/controller->{this broker id}侣监,如果創(chuàng)建成功(只可能有一個(gè)創(chuàng)建成功),則該broker會(huì)成為controller臣淤,若創(chuàng)建不成功橄霉,則該broker會(huì)等待新controller的命令。