zookeeper簡介
Zookeeper是一種在分布式系統(tǒng)中被廣泛用來作為:分布式狀態(tài)管理艰赞、分布式協(xié)調(diào)管理、分布式配置管理、和分布式鎖服務(wù)的集群。kafka增加和減少服務(wù)器都會在Zookeeper節(jié)點(diǎn)上觸發(fā)相應(yīng)的事件kafka系統(tǒng)會捕獲這些事件阿宅,進(jìn)行新一輪的負(fù)載均衡,客戶端也會捕獲這些事件來進(jìn)行新一輪的處理笼蛛。
kafka簡介
生產(chǎn)者生產(chǎn)消息洒放、kafka集群、消費(fèi)者獲取消息這樣一種架構(gòu)滨砍,如下圖:
一些基本的概念:
Broker:Kafka集群包含一個或多個服務(wù)器往湿,這種服務(wù)器被稱為broker
Topic :每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為topic惋戏。(物理上不同topic的消息分開存儲领追,邏輯上一個topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
Partition :parition是物理上的概念,每個topic包含一個或多個partition日川,創(chuàng)建topic時可指定parition數(shù)量蔓腐。每個partition對應(yīng)于一個文件夾矩乐,該文件夾下存儲該partition的數(shù)據(jù)和索引文件龄句。一個分區(qū)可以看作是一個FIFO的隊(duì)列回论。
Tips:kafka只保證同一個Partition中的消息的順序性的。所以如果需要順序消費(fèi)數(shù)據(jù)分歇,可以根據(jù)key來消費(fèi)傀蓉。根據(jù)官方介紹:If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
Producer : 負(fù)責(zé)發(fā)布消息到Kafka broker
Consumer :消費(fèi)消息。每個consumer屬于一個特定的consuer group(可為每個consumer指定group name职抡,若不指定group name則屬于默認(rèn)的group)葬燎。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內(nèi)的一個consumer消費(fèi)缚甩,但多個consumer group可同時消費(fèi)這一消息谱净。
工作圖:
一個典型的kafka集群中包含若干producer(可以是web前端產(chǎn)生的page view,或者是服務(wù)器日志擅威,系統(tǒng)CPU壕探、memory等),若干broker(Kafka支持水平擴(kuò)展郊丛,一般broker數(shù)量越多李请,集群吞吐率越高),若干consumer group厉熟,以及一個Zookeeper集群导盅。Kafka通過Zookeeper管理集群配置,選舉leader揍瑟,以及在consumer group發(fā)生變化時進(jìn)行rebalance白翻。producer使用push模式將消息發(fā)布到broker,consumer使pull模式從broker訂閱并消費(fèi)消息月培。
Topic & Partition
Topic在邏輯上可以被認(rèn)為是一個在的queue嘁字,每條消費(fèi)都必須指定它的topic,可以簡單理解為必須指明把這條消息放進(jìn)哪個queue里杉畜。為了使得Kafka的吞吐率可以水平擴(kuò)展纪蜒,物理上把topic分成一個或多個partition,每個partition在物理上對應(yīng)一個文件夾此叠,該文件夾下存儲這個partition的所有消息和索引文件纯续。
如果partition規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的partition里灭袁,這樣就實(shí)現(xiàn)了水平擴(kuò)展猬错。(如果一個topic對應(yīng)一個文件,那這個文件所在的機(jī)器I/O將會成為這個topic的性能瓶頸茸歧,而partition解決了這個問題)倦炒。在創(chuàng)建topic時可以在$KAFKA_HOME/config/server.properties中指定這個partition的數(shù)量(如下所示),當(dāng)然也可以在topic創(chuàng)建之后去修改parition數(shù)量软瞎。
# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=3
tips:對于傳統(tǒng)的message queue而言逢唤,一般會刪除已經(jīng)被消費(fèi)的消息拉讯,而Kafka集群會保留所有的消息,無論其被消費(fèi)與否鳖藕。當(dāng)然魔慷,因?yàn)榇疟P限制,不可能永久保留所有數(shù)據(jù)(實(shí)際上也沒必要)著恩,因此Kafka提供兩種策略去刪除舊數(shù)據(jù)院尔。一是基于時間,二是基于partition文件大小喉誊。例如可以通過配置$KAFKA_HOME/config/server.properties邀摆,讓Kafka刪除一周前的數(shù)據(jù),也可通過配置讓Kafka在partition文件超過1GB時刪除舊數(shù)據(jù)伍茄。這里要注意隧熙,因?yàn)镵afka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān)幻林,所以這里刪除文件與Kafka性能無關(guān)贞盯,選擇怎樣的刪除策略只與磁盤以及具體的需求有關(guān)。另外沪饺,Kafka會為每一個consumer group保留一些metadata信息—當(dāng)前消費(fèi)的消息的position躏敢,也即offset。這個offset由consumer控制整葡。正常情況下consumer會在消費(fèi)完一條消息后線性增加這個offset件余。當(dāng)然,consumer也可將offset設(shè)成一個較小的值遭居,重新消費(fèi)一些消息啼器。因?yàn)閛ffet由consumer控制,所以Kafka broker是無狀態(tài)的俱萍,它不需要標(biāo)記哪些消息被哪些consumer過端壳,不需要通過broker去保證同一個consumer group只有一個consumer能消費(fèi)某一條消息,因此也就不需要鎖機(jī)制枪蘑,這也為Kafka的高吞吐率提供了有力保障损谦。
Replication & Leader election
該 Replication與leader election配合提供了自動的failover機(jī)制。replication對Kafka的吞吐率是有一定影響的岳颇,但極大的增強(qiáng)了可用性照捡。默認(rèn)情況下,Kafka的replication數(shù)量為1话侧。 每個partition都有一個唯一的leader栗精,所有的讀寫操作都在leader上完成,follower批量從leader上pull數(shù)據(jù)瞻鹏。一般情況下partition的數(shù)量大于等于broker的數(shù)量悲立,并且所有partition的leader均勻分布在broker上赢赊。follower上的日志和其leader上的完全一樣。
和大部分分布式系統(tǒng)一樣级历,Kakfa處理失敗需要明確定義一個broker是否alive。對于Kafka而言叭披,Kafka存活包含兩個條件寥殖,一是它必須維護(hù)與Zookeeper的session(這個通過Zookeeper的heartbeat機(jī)制來實(shí)現(xiàn))。二是follower必須能夠及時將leader的writing復(fù)制過來涩蜘,不能“落后太多”嚼贡。
leader會track“in sync”的node list。如果一個follower宕機(jī)同诫,或者落后太多粤策,leader將把它從”in sync” list中移除。這里所描述的“落后太多”指follower復(fù)制的消息落后于leader后的條數(shù)超過預(yù)定值误窖。該值是server.properties文件中的replica.lag.max.messages=4000
tips:Kafka只解決”fail/recover”叮盘,不處理“Byzantine”(“拜占庭”)問題。這里應(yīng)該就是CAP理論中的AP吧霹俺。一條消息只有被“in sync” list里的所有follower都從leader復(fù)制過去才會被認(rèn)為已提交柔吼。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了leader,還沒來得及被任何follower復(fù)制就宕機(jī)了丙唧,而造成數(shù)據(jù)丟失(consumer無法消費(fèi)這些數(shù)據(jù))愈魏。而對于producer而言,它可以選擇是否等待消息commit想际,這可以通過request.required.acks來設(shè)置培漏。這種機(jī)制確保了只要“in sync” list有一個或以上的flollower,一條被commit的消息就不會丟失胡本。
這里的復(fù)制機(jī)制即不是同步復(fù)制牌柄,也不是單純的異步復(fù)制。事實(shí)上侧甫,同步復(fù)制要求“活著的”follower都復(fù)制完友鼻,這條消息才會被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率闺骚。而異步復(fù)制方式下彩扔,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit僻爽,這種情況下如果follwer都落后于leader虫碉,而leader突然宕機(jī),則會丟失數(shù)據(jù)胸梆。而Kafka的這種使用“in sync” list的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率敦捧。follower可以批量的從leader復(fù)制數(shù)據(jù)须板,這樣極大的提高復(fù)制性能(批量寫磁盤),極大減少了follower與leader的差距(前文有說到兢卵,只要follower落后leader不太遠(yuǎn)习瑰,則被認(rèn)為在“in sync” list里)。
上文說明了Kafka是如何做replication的秽荤,另外一個很重要的問題是當(dāng)leader宕機(jī)了甜奄,怎樣在follower中選舉出新的leader。因?yàn)閒ollower可能落后許多或者crash了窃款,所以必須確保選擇“最新”的follower作為新的leader课兄。一個基本的原則就是,如果leader不在了晨继,新的leader必須擁有原來的leader commit的所有消息烟阐。這就需要作一個折衷,如果leader在標(biāo)明一條消息被commit前等待更多的follower確認(rèn)紊扬,那在它die之后就有更多的follower可以作為新的leader蜒茄,但這也會造成吞吐率的下降。
一種非常常用的選舉leader的方式是“majority 靈秀”(“少數(shù)服從多數(shù)”)餐屎,這種模式下扩淀,如果我們有2f+1個replica(包含leader和follower),那在commit之前必須保證有f+1個replica復(fù)制完消息啤挎,為了保證正確選出新的leader驻谆,fail的replica不能超過f個。因?yàn)樵谑O碌娜我鈌+1個replica里庆聘,至少有一個replica包含有最新的所有消息胜臊。這種方式有個很大的優(yōu)勢,系統(tǒng)的latency只取決于最快的幾臺server伙判,也就是說象对,如果replication factor是3,那latency就取決于最快的那個follower而非最慢那個宴抚。majority vote也有一些劣勢勒魔,為了保證leader election的正常進(jìn)行,它所能容忍的fail的follower個數(shù)比較少菇曲。如果要容忍1個follower掛掉冠绢,必須要有3個以上的replica,如果要容忍2個follower掛掉常潮,必須要有5個以上的replica弟胀。也就是說,在生產(chǎn)環(huán)境下為了保證較高的容錯程度,必須要有大量的replica孵户,而大量的replica又會在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降萧朝。這就是這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要存儲大量數(shù)據(jù)的系統(tǒng)中使用的原因夏哭。
而kafka的方式是:Kafka在Zookeeper中動態(tài)維護(hù)了一個ISR(in-sync replicas) set竖配,這個set里的所有replica都跟上了leader,只有ISR里的成員才有被選為leader的可能龄减。在這種模式下,對于f+1個replica宠能,一個Kafka topic能在保證不丟失已經(jīng)commit的消息的前提下容忍f個replica的失敗。