Kafka史上最詳細(xì)原理總結(jié)

姓名:周小蓬 16019110037

轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/article/details/74980531

[嵌牛導(dǎo)讀]

Kafka

Kafka是最初由Linkedin公司開(kāi)發(fā),是一個(gè)分布式、支持分區(qū)的(partition)眯娱、多副本的(replica)普办,基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿(mǎn)足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)亩钟、低延遲的實(shí)時(shí)系統(tǒng)、storm/Spark流式處理引擎,web/nginx日志痕寓、訪問(wèn)日志傲醉,消息服務(wù)等等,用scala語(yǔ)言編寫(xiě)呻率,Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開(kāi)源 項(xiàng)目硬毕。

[嵌牛鼻子]

kafka

[嵌牛提問(wèn)]

如何學(xué)習(xí)kafka以及步驟

[嵌牛正文]

1.前言

消息隊(duì)列的性能好壞,其文件存儲(chǔ)機(jī)制設(shè)計(jì)是衡量一個(gè)消息隊(duì)列服務(wù)技術(shù)水平和最關(guān)鍵指標(biāo)之一礼仗。下面將從Kafka文件存儲(chǔ)機(jī)制和物理結(jié)構(gòu)角度吐咳,分析Kafka是如何實(shí)現(xiàn)高效文件存儲(chǔ),及實(shí)際應(yīng)用效果元践。

1.1 ?Kafka的特性:

- 高吞吐量韭脊、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒单旁,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行consume操作沪羔。

- 可擴(kuò)展性:kafka集群支持熱擴(kuò)展

- 持久性、可靠性:消息被持久化到本地磁盤(pán)象浑,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失

- 容錯(cuò)性:允許集群中節(jié)點(diǎn)失斎文凇(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敗)

- 高并發(fā):支持?jǐn)?shù)千個(gè)客戶(hù)端同時(shí)讀寫(xiě)

1.2 ? Kafka的使用場(chǎng)景:

- 日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log融柬,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開(kāi)放給各種consumer死嗦,例如hadoop、Hbase粒氧、Solr等越除。

- 消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者、緩存消息等外盯。

- 用戶(hù)活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶(hù)或者app用戶(hù)的各種活動(dòng)摘盆,如瀏覽網(wǎng)頁(yè)、搜索饱苟、點(diǎn)擊等活動(dòng)孩擂,這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析箱熬,或者裝載到hadoop类垦、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘。

- 運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)城须。包括收集各種分布式應(yīng)用的數(shù)據(jù)蚤认,生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告糕伐。

- 流式處理:比如spark streaming和storm

- 事件源

1.3 ?Kakfa的設(shè)計(jì)思想

-Kakfa Broker Leader的選舉:Kakfa Broker集群受Zookeeper管理砰琢。所有的Kafka Broker節(jié)點(diǎn)一起去Zookeeper上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),因?yàn)橹挥幸粋€(gè)Kafka Broker會(huì)注冊(cè)成功,其他的都會(huì)失敗陪汽,所以這個(gè)成功在Zookeeper上注冊(cè)臨時(shí)節(jié)點(diǎn)的這個(gè)Kafka Broker會(huì)成為Kafka Broker Controller训唱,其他的Kafka broker叫Kafka Broker follower。(這個(gè)過(guò)程叫Controller在ZooKeeper注冊(cè)Watch)挚冤。這個(gè)Controller會(huì)監(jiān)聽(tīng)其他的Kafka Broker的所有信息雪情,如果這個(gè)kafka broker controller宕機(jī)了,在zookeeper上面的那個(gè)臨時(shí)節(jié)點(diǎn)就會(huì)消失你辣,此時(shí)所有的kafka broker又會(huì)一起去Zookeeper上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),因?yàn)橹挥幸粋€(gè)Kafka Broker會(huì)注冊(cè)成功尘执,其他的都會(huì)失敗舍哄,所以這個(gè)成功在Zookeeper上注冊(cè)臨時(shí)節(jié)點(diǎn)的這個(gè)Kafka Broker會(huì)成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower誊锭。例如:一旦有一個(gè)broker宕機(jī)了表悬,這個(gè)kafka broker controller會(huì)讀取該宕機(jī)broker上所有的partition在zookeeper上的狀態(tài),并選取ISR列表中的一個(gè)replica作為partition leader(如果ISR列表中的replica全掛丧靡,選一個(gè)幸存的replica作為leader; 如果該partition的所有的replica都宕機(jī)了蟆沫,則將新的leader設(shè)置為-1,等待恢復(fù)温治,等待ISR中的任一個(gè)Replica“活”過(guò)來(lái)饭庞,并且選它作為L(zhǎng)eader;或選擇第一個(gè)“活”過(guò)來(lái)的Replica(不一定是ISR中的)作為L(zhǎng)eader)熬荆,這個(gè)broker宕機(jī)的事情舟山,kafka controller也會(huì)通知zookeeper,zookeeper就會(huì)通知其他的kafka broker卤恳。

這里曾經(jīng)發(fā)生過(guò)一個(gè)bug累盗,TalkingData使用Kafka0.8.1的時(shí)候,kafka controller在Zookeeper上注冊(cè)成功后突琳,它和Zookeeper通信的timeout時(shí)間是6s若债,也就是如果kafka controller如果有6s中沒(méi)有和Zookeeper做心跳,那么Zookeeper就認(rèn)為這個(gè)kafka controller已經(jīng)死了拆融,就會(huì)在Zookeeper上把這個(gè)臨時(shí)節(jié)點(diǎn)刪掉蠢琳,那么其他Kafka就會(huì)認(rèn)為controller已經(jīng)沒(méi)了,就會(huì)再次搶著注冊(cè)臨時(shí)節(jié)點(diǎn),注冊(cè)成功的那個(gè)kafka broker成為controller,然后,之前的那個(gè)kafka controller就需要各種shut down去關(guān)閉各種節(jié)點(diǎn)和事件的監(jiān)聽(tīng)芬沉。但是當(dāng)kafka的讀寫(xiě)流量都非常巨大的時(shí)候,TalkingData的一個(gè)bug是阁猜,由于網(wǎng)絡(luò)等原因丸逸,kafka controller和Zookeeper有6s中沒(méi)有通信,于是重新選舉出了一個(gè)新的kafka controller剃袍,但是原來(lái)的controller在shut down的時(shí)候總是不成功黄刚,這個(gè)時(shí)候producer進(jìn)來(lái)的message由于Kafka集群中存在兩個(gè)kafka controller而無(wú)法落地。導(dǎo)致數(shù)據(jù)淤積民效。

這里曾經(jīng)還有一個(gè)bug憔维,TalkingData使用Kafka0.8.1的時(shí)候,當(dāng)ack=0的時(shí)候畏邢,表示producer發(fā)送出去message业扒,只要對(duì)應(yīng)的kafka broker topic partition leader接收到的這條message,producer就返回成功舒萎,不管partition leader 是否真的成功把message真正存到kafka程储。當(dāng)ack=1的時(shí)候,表示producer發(fā)送出去message,同步的把message存到對(duì)應(yīng)topic的partition的leader上章鲤,然后producer就返回成功,partition leader異步的把message同步到其他partition replica上败徊。當(dāng)ack=all或-1帚呼,表示producer發(fā)送出去message,同步的把message存到對(duì)應(yīng)topic的partition的leader和對(duì)應(yīng)的replica上之后皱蹦,才返回成功煤杀。但是如果某個(gè)kafka controller切換的時(shí)候,會(huì)導(dǎo)致partition leader的切換(老的kafka controller上面的partition leader會(huì)選舉到其他的kafka broker上),但是這樣就會(huì)導(dǎo)致丟數(shù)據(jù)沪哺。

-Consumergroup:各個(gè)consumer(consumer 線程)可以組成一個(gè)組(Consumer group)沈自,partition中的每個(gè)message只能被組(Consumer group)中的一個(gè)consumer(consumer 線程)消費(fèi),如果一個(gè)message可以被多個(gè)consumer(consumer 線程)消費(fèi)的話凤粗,那么這些consumer必須在不同的組。Kafka不支持一個(gè)partition中的message由兩個(gè)或兩個(gè)以上的同一個(gè)consumer group下的consumer thread來(lái)處理今豆,除非再啟動(dòng)一個(gè)新的consumer group嫌拣。所以如果想同時(shí)對(duì)一個(gè)topic做消費(fèi)的話,啟動(dòng)多個(gè)consumer group就可以了呆躲,但是要注意的是异逐,這里的多個(gè)consumer的消費(fèi)都必須是順序讀取partition里面的message,新啟動(dòng)的consumer默認(rèn)從partition隊(duì)列最頭端最新的地方開(kāi)始阻塞的讀message插掂。它不能像AMQ那樣可以多個(gè)BET作為consumer去互斥的(for update悲觀鎖)并發(fā)處理message灰瞻,這是因?yàn)槎鄠€(gè)BET去消費(fèi)一個(gè)Queue中的數(shù)據(jù)的時(shí)候,由于要保證不能多個(gè)線程拿同一條message辅甥,所以就需要行級(jí)別悲觀所(for update),這就導(dǎo)致了consume的性能下降酝润,吞吐量不夠。而kafka為了保證吞吐量璃弄,只允許同一個(gè)consumer group下的一個(gè)consumer線程去訪問(wèn)一個(gè)partition要销。如果覺(jué)得效率不高的時(shí)候,可以加partition的數(shù)量來(lái)橫向擴(kuò)展夏块,那么再加新的consumer thread去消費(fèi)疏咐。如果想多個(gè)不同的業(yè)務(wù)都需要這個(gè)topic的數(shù)據(jù),起多個(gè)consumer group就好了脐供,大家都是順序的讀取message浑塞,offsite的值互不影響。這樣沒(méi)有鎖競(jìng)爭(zhēng)政己,充分發(fā)揮了橫向的擴(kuò)展性酌壕,吞吐量極高。這也就形成了分布式消費(fèi)的概念。

當(dāng)啟動(dòng)一個(gè)consumer group去消費(fèi)一個(gè)topic的時(shí)候仅孩,無(wú)論topic里面有多個(gè)少個(gè)partition托猩,無(wú)論我們consumer group里面配置了多少個(gè)consumer thread,這個(gè)consumer group下面的所有consumer thread一定會(huì)消費(fèi)全部的partition辽慕;即便這個(gè)consumer group下只有一個(gè)consumer thread京腥,那么這個(gè)consumer thread也會(huì)去消費(fèi)所有的partition。因此溅蛉,最優(yōu)的設(shè)計(jì)就是公浪,consumer group下的consumer thread的數(shù)量等于partition數(shù)量,這樣效率是最高的船侧。

同一partition的一條message只能被同一個(gè)Consumer Group內(nèi)的一個(gè)Consumer消費(fèi)欠气。不能夠一個(gè)consumer group的多個(gè)consumer同時(shí)消費(fèi)一個(gè)partition钥弯。

一個(gè)consumer group下豹储,無(wú)論有多少個(gè)consumer,這個(gè)consumer group一定回去把這個(gè)topic下所有的partition都消費(fèi)了喳资。當(dāng)consumer group里面的consumer數(shù)量小于這個(gè)topic下的partition數(shù)量的時(shí)候袁梗,如下圖groupA,groupB宜鸯,就會(huì)出現(xiàn)一個(gè)conusmer thread消費(fèi)多個(gè)partition的情況,總之是這個(gè)topic下的partition都會(huì)被消費(fèi)遮怜。如果consumer group里面的consumer數(shù)量等于這個(gè)topic下的partition數(shù)量的時(shí)候淋袖,如下圖groupC,此時(shí)效率是最高的锯梁,每個(gè)partition都有一個(gè)consumer thread去消費(fèi)即碗。當(dāng)consumer group里面的consumer數(shù)量大于這個(gè)topic下的partition數(shù)量的時(shí)候,如下圖GroupD陌凳,就會(huì)有一個(gè)consumer thread空閑剥懒。因此,我們?cè)谠O(shè)定consumer group的時(shí)候合敦,只需要指明里面有幾個(gè)consumer數(shù)量即可蕊肥,無(wú)需指定對(duì)應(yīng)的消費(fèi)partition序號(hào),consumer會(huì)自動(dòng)進(jìn)行rebalance蛤肌。

多個(gè)Consumer Group下的consumer可以消費(fèi)同一條message壁却,但是這種消費(fèi)也是以o(1)的方式順序的讀取message去消費(fèi),,所以一定會(huì)重復(fù)消費(fèi)這批message的裸准,不能向AMQ那樣多個(gè)BET作為consumer消費(fèi)(對(duì)message加鎖展东,消費(fèi)的時(shí)候不能重復(fù)消費(fèi)message)

-Consumer Rebalance的觸發(fā)條件:(1)Consumer增加或刪除會(huì)觸發(fā) Consumer Group的Rebalance(2)Broker的增加或者減少都會(huì)觸發(fā) Consumer Rebalance

-Consumer:Consumer處理partition里面的message的時(shí)候是o(1)順序讀取的。所以必須維護(hù)著上一次讀到哪里的offsite信息炒俱。high level API,offset存于Zookeeper中盐肃,low level API的offset由自己維護(hù)爪膊。一般來(lái)說(shuō)都是使用high level api的。Consumer的delivery gurarantee砸王,默認(rèn)是讀完message先commmit再處理message推盛,autocommit默認(rèn)是true,這時(shí)候先commit就會(huì)更新offsite+1谦铃,一旦處理失敗耘成,offsite已經(jīng)+1,這個(gè)時(shí)候就會(huì)丟message驹闰;也可以配置成讀完消息處理再commit瘪菌,這種情況下consumer端的響應(yīng)就會(huì)比較慢的,需要等處理完才行嘹朗。

一般情況下师妙,一定是一個(gè)consumer group處理一個(gè)topic的message。Best Practice是這個(gè)consumer group里面consumer的數(shù)量等于topic里面partition的數(shù)量屹培,這樣效率是最高的默穴,一個(gè)consumer thread處理一個(gè)partition。如果這個(gè)consumer group里面consumer的數(shù)量小于topic里面partition的數(shù)量褪秀,就會(huì)有consumer thread同時(shí)處理多個(gè)partition(這個(gè)是kafka自動(dòng)的機(jī)制蓄诽,我們不用指定),但是總之這個(gè)topic里面的所有partition都會(huì)被處理到的溜歪。若专。如果這個(gè)consumer group里面consumer的數(shù)量大于topic里面partition的數(shù)量许蓖,多出的consumer thread就會(huì)閑著啥也不干蝴猪,剩下的是一個(gè)consumer thread處理一個(gè)partition,這就造成了資源的浪費(fèi)膊爪,因?yàn)橐粋€(gè)partition不可能被兩個(gè)consumer thread去處理自阱。所以我們線上的分布式多個(gè)service服務(wù),每個(gè)service里面的kafka consumer數(shù)量都小于對(duì)應(yīng)的topic的partition數(shù)量米酬,但是所有服務(wù)的consumer數(shù)量只和等于partition的數(shù)量沛豌,這是因?yàn)榉植际絪ervice服務(wù)的所有consumer都來(lái)自一個(gè)consumer group,如果來(lái)自不同的consumer group就會(huì)處理重復(fù)的message了(同一個(gè)consumer group下的consumer不能處理同一個(gè)partition赃额,不同的consumer group可以處理同一個(gè)topic加派,那么都是順序處理message,一定會(huì)處理重復(fù)的跳芳。一般這種情況都是兩個(gè)不同的業(yè)務(wù)邏輯芍锦,才會(huì)啟動(dòng)兩個(gè)consumer group來(lái)處理一個(gè)topic)。

如果producer的流量增大飞盆,當(dāng)前的topic的parition數(shù)量=consumer數(shù)量娄琉,這時(shí)候的應(yīng)對(duì)方式就是很想擴(kuò)展:增加topic下的partition次乓,同時(shí)增加這個(gè)consumer group下的consumer。

-Delivery Mode :Kafka producer 發(fā)送message不用維護(hù)message的offsite信息孽水,因?yàn)檫@個(gè)時(shí)候票腰,offsite就相當(dāng)于一個(gè)自增id,producer就盡管發(fā)送message就好了女气。而且Kafka與AMQ不同杏慰,AMQ大都用在處理業(yè)務(wù)邏輯上,而Kafka大都是日志主卫,所以Kafka的producer一般都是大批量的batch發(fā)送message逃默,向這個(gè)topic一次性發(fā)送一大批message,load balance到一個(gè)partition上簇搅,一起插進(jìn)去完域,offsite作為自增id自己增加就好。但是Consumer端是需要維護(hù)這個(gè)partition當(dāng)前消費(fèi)到哪個(gè)message的offsite信息的瘩将,這個(gè)offsite信息吟税,high level api是維護(hù)在Zookeeper上,low level api是自己的程序維護(hù)姿现。(Kafka管理界面上只能顯示high level api的consumer部分肠仪,因?yàn)閘ow level api的partition offsite信息是程序自己維護(hù),kafka是不知道的备典,無(wú)法在管理界面上展示 )當(dāng)使用high level api的時(shí)候异旧,先拿message處理,再定時(shí)自動(dòng)commit offsite+1(也可以改成手動(dòng)), 并且kakfa處理message是沒(méi)有鎖操作的提佣。因此如果處理message失敗吮蛹,此時(shí)還沒(méi)有commit offsite+1,當(dāng)consumer thread重啟后會(huì)重復(fù)消費(fèi)這個(gè)message拌屏。但是作為高吞吐量高并發(fā)的實(shí)時(shí)處理系統(tǒng)潮针,at least once的情況下,至少一次會(huì)被處理到倚喂,是可以容忍的每篷。如果無(wú)法容忍,就得使用low level api來(lái)自己程序維護(hù)這個(gè)offsite信息端圈,那么想什么時(shí)候commit offsite+1就自己搞定了焦读。

-Topic & Partition:Topic相當(dāng)于傳統(tǒng)消息系統(tǒng)MQ中的一個(gè)隊(duì)列queue,producer端發(fā)送的message必須指定是發(fā)送到哪個(gè)topic舱权,但是不需要指定topic下的哪個(gè)partition矗晃,因?yàn)閗afka會(huì)把收到的message進(jìn)行l(wèi)oad balance,均勻的分布在這個(gè)topic下的不同的partition上( hash(message) % [broker數(shù)量] ?)刑巧。物理上存儲(chǔ)上喧兄,這個(gè)topic會(huì)分成一個(gè)或多個(gè)partition无畔,每個(gè)partiton相當(dāng)于是一個(gè)子queue。在物理結(jié)構(gòu)上吠冤,每個(gè)partition對(duì)應(yīng)一個(gè)物理的目錄(文件夾)浑彰,文件夾命名是[topicname]_[partition]_[序號(hào)],一個(gè)topic可以有無(wú)數(shù)多的partition拯辙,根據(jù)業(yè)務(wù)需求和數(shù)據(jù)量來(lái)設(shè)置郭变。在kafka配置文件中可隨時(shí)更高num.partitions參數(shù)來(lái)配置更改topic的partition數(shù)量,在創(chuàng)建Topic時(shí)通過(guò)參數(shù)指定parittion數(shù)量涯保。Topic創(chuàng)建之后通過(guò)Kafka提供的工具也可以修改partiton數(shù)量诉濒。

一般來(lái)說(shuō),(1)一個(gè)Topic的Partition數(shù)量大于等于Broker的數(shù)量夕春,可以提高吞吐率未荒。(2)同一個(gè)Partition的Replica盡量分散到不同的機(jī)器,高可用及志。

當(dāng)add a new partition的時(shí)候片排,partition里面的message不會(huì)重新進(jìn)行分配,原來(lái)的partition里面的message數(shù)據(jù)不會(huì)變速侈,新加的這個(gè)partition剛開(kāi)始是空的率寡,隨后進(jìn)入這個(gè)topic的message就會(huì)重新參與所有partition的load balance

-Partition Replica:每個(gè)partition可以在其他的kafka broker節(jié)點(diǎn)上存副本,以便某個(gè)kafka broker節(jié)點(diǎn)宕機(jī)不會(huì)影響這個(gè)kafka集群倚搬。存replica副本的方式是按照kafka broker的順序存冶共。例如有5個(gè)kafka broker節(jié)點(diǎn),某個(gè)topic有3個(gè)partition每界,每個(gè)partition存2個(gè)副本捅僵,那么partition1存broker1,broker2,partition2存broker2,broker3盆犁。命咐。篡九。以此類(lèi)推(replica副本數(shù)目不能大于kafka broker節(jié)點(diǎn)的數(shù)目谐岁,否則報(bào)錯(cuò)。這里的replica數(shù)其實(shí)就是partition的副本總數(shù)榛臼,其中包括一個(gè)leader伊佃,其他的就是copy副本)。這樣如果某個(gè)broker宕機(jī)沛善,其實(shí)整個(gè)kafka內(nèi)數(shù)據(jù)依然是完整的航揉。但是,replica副本數(shù)越高金刁,系統(tǒng)雖然越穩(wěn)定帅涂,但是回來(lái)帶資源和性能上的下降议薪;replica副本少的話,也會(huì)造成系統(tǒng)丟數(shù)據(jù)的風(fēng)險(xiǎn)媳友。

(1)怎樣傳送消息:producer先把message發(fā)送到partition leader斯议,再由leader發(fā)送給其他partition follower。(如果讓producer發(fā)送給每個(gè)replica那就太慢了)

(2)在向Producer發(fā)送ACK前需要保證有多少個(gè)Replica已經(jīng)收到該消息:根據(jù)ack配的個(gè)數(shù)而定

(3)怎樣處理某個(gè)Replica不工作的情況:如果這個(gè)部工作的partition replica不在ack列表中醇锚,就是producer在發(fā)送消息到partition leader上哼御,partition leader向partition follower發(fā)送message沒(méi)有響應(yīng)而已,這個(gè)不會(huì)影響整個(gè)系統(tǒng)焊唬,也不會(huì)有什么問(wèn)題恋昼。如果這個(gè)不工作的partition replica在ack列表中的話,producer發(fā)送的message的時(shí)候會(huì)等待這個(gè)不工作的partition replca寫(xiě)message成功赶促,但是會(huì)等到time out液肌,然后返回失敗因?yàn)槟硞€(gè)ack列表中的partition replica沒(méi)有響應(yīng),此時(shí)kafka會(huì)自動(dòng)的把這個(gè)部工作的partition replica從ack列表中移除鸥滨,以后的producer發(fā)送message的時(shí)候就不會(huì)有這個(gè)ack列表下的這個(gè)部工作的partition replica了矩屁。

(4)怎樣處理Failed Replica恢復(fù)回來(lái)的情況:如果這個(gè)partition replica之前不在ack列表中,那么啟動(dòng)后重新受Zookeeper管理即可爵赵,之后producer發(fā)送message的時(shí)候吝秕,partition leader會(huì)繼續(xù)發(fā)送message到這個(gè)partition follower上。如果這個(gè)partition replica之前在ack列表中空幻,此時(shí)重啟后烁峭,需要把這個(gè)partition replica再手動(dòng)加到ack列表中。(ack列表是手動(dòng)添加的秕铛,出現(xiàn)某個(gè)部工作的partition replica的時(shí)候自動(dòng)從ack列表中移除的)

-Partition leader與follower:partition也有l(wèi)eader和follower之分约郁。leader是主partition,producer寫(xiě)kafka的時(shí)候先寫(xiě)partition leader但两,再由partition leader push給其他的partition follower鬓梅。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節(jié)點(diǎn)宕機(jī)谨湘,zookeeper會(huì)沖其他的broker的partition follower上選擇follower變?yōu)閜arition leader绽快。

-Topic分配partition和partition replica的算法:(1)將Broker(size=n)和待分配的Partition排序。(2)將第i個(gè)Partition分配到第(i%n)個(gè)Broker上紧阔。(3)將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) % n)個(gè)Broker上

-?消息投遞可靠性

一個(gè)消息如何算投遞成功坊罢,Kafka提供了三種模式:

- 第一種是啥都不管,發(fā)送出去就當(dāng)作成功擅耽,這種情況當(dāng)然不能保證消息成功投遞到broker活孩;

- 第二種是Master-Slave模型,只有當(dāng)Master和所有Slave都接收到消息時(shí)乖仇,才算投遞成功憾儒,這種模型提供了最高的投遞可靠性询兴,但是損傷了性能;

- 第三種模型起趾,即只要Master確認(rèn)收到消息就算投遞成功蕉朵;實(shí)際使用時(shí),根據(jù)應(yīng)用特性選擇阳掐,絕大多數(shù)情況下都會(huì)中和可靠性和性能選擇第三種模型

消息在broker上的可靠性始衅,因?yàn)橄?huì)持久化到磁盤(pán)上,所以如果正常stop一個(gè)broker缭保,其上的數(shù)據(jù)不會(huì)丟失汛闸;但是如果不正常stop,可能會(huì)使存在頁(yè)面緩存來(lái)不及寫(xiě)入磁盤(pán)的消息丟失艺骂,這可以通過(guò)配置flush頁(yè)面緩存的周期诸老、閾值緩解,但是同樣會(huì)頻繁的寫(xiě)磁盤(pán)會(huì)影響性能钳恕,又是一個(gè)選擇題别伏,根據(jù)實(shí)際情況配置。

消息消費(fèi)的可靠性忧额,Kafka提供的是“At least once”模型厘肮,因?yàn)橄⒌淖x取進(jìn)度由offset提供,offset可以由消費(fèi)者自己維護(hù)也可以維護(hù)在zookeeper里睦番,但是當(dāng)消息消費(fèi)后consumer掛掉类茂,offset沒(méi)有即時(shí)寫(xiě)回,就有可能發(fā)生重復(fù)讀的情況托嚣,這種情況同樣可以通過(guò)調(diào)整commit offset周期巩检、閾值緩解,甚至消費(fèi)者自己把消費(fèi)和commit offset做成一個(gè)事務(wù)解決示启,但是如果你的應(yīng)用不在乎重復(fù)消費(fèi)兢哭,那就干脆不要解決,以換取最大的性能夫嗓。

-Partition ack:當(dāng)ack=1迟螺,表示producer寫(xiě)partition leader成功后,broker就返回成功啤月,無(wú)論其他的partition follower是否寫(xiě)成功煮仇。當(dāng)ack=2劳跃,表示producer寫(xiě)partition leader和其他一個(gè)follower成功的時(shí)候谎仲,broker就返回成功,無(wú)論其他的partition follower是否寫(xiě)成功刨仑。當(dāng)ack=-1[parition的數(shù)量]的時(shí)候郑诺,表示只有producer全部寫(xiě)成功的時(shí)候夹姥,才算成功,kafka broker才返回成功信息辙诞。這里需要注意的是辙售,如果ack=1的時(shí)候,一旦有個(gè)broker宕機(jī)導(dǎo)致partition的follower和leader切換飞涂,會(huì)導(dǎo)致丟數(shù)據(jù)旦部。

-message狀態(tài):在Kafka中,消息的狀態(tài)被保存在consumer中较店,broker不會(huì)關(guān)心哪個(gè)消息被消費(fèi)了被誰(shuí)消費(fèi)了士八,只記錄一個(gè)offset值(指向partition中下一個(gè)要被消費(fèi)的消息位置),這就意味著如果consumer處理不好的話梁呈,broker上的一個(gè)消息可能會(huì)被消費(fèi)多次婚度。

-message持久化:Kafka中會(huì)把消息持久化到本地文件系統(tǒng)中,并且保持o(1)極高的效率官卡。我們眾所周知IO讀取是非常耗資源的性能也是最慢的蝗茁,這就是為了數(shù)據(jù)庫(kù)的瓶頸經(jīng)常在IO上,需要換SSD硬盤(pán)的原因寻咒。但是Kafka作為吞吐量極高的MQ哮翘,卻可以非常高效的message持久化到文件。這是因?yàn)镵afka是順序?qū)懭雘(1)的時(shí)間復(fù)雜度毛秘,速度非橙炭溃快。也是高吞吐量的原因熔脂。由于message的寫(xiě)入持久化是順序?qū)懭氲呐逖校虼薽essage在被消費(fèi)的時(shí)候也是按順序被消費(fèi)的,保證partition的message是順序消費(fèi)的霞揉。一般的機(jī)器,單機(jī)每秒100k條數(shù)據(jù)旬薯。

-message有效期:Kafka會(huì)長(zhǎng)久保留其中的消息,以便consumer可以多次消費(fèi)适秩,當(dāng)然其中很多細(xì)節(jié)是可配置的绊序。

-Produer :Producer向Topic發(fā)送message,不需要指定partition秽荞,直接發(fā)送就好了骤公。kafka通過(guò)partition ack來(lái)控制是否發(fā)送成功并把信息返回給producer,producer可以有任意多的thread扬跋,這些kafka服務(wù)器端是不care的阶捆。Producer端的delivery guarantee默認(rèn)是At least once的。也可以設(shè)置Producer異步發(fā)送實(shí)現(xiàn)At most once。Producer可以用主鍵冪等性實(shí)現(xiàn)Exactly once

-Kafka高吞吐量: Kafka的高吞吐量體現(xiàn)在讀寫(xiě)上洒试,分布式并發(fā)的讀和寫(xiě)都非潮渡荩快,寫(xiě)的性能體現(xiàn)在以o(1)的時(shí)間復(fù)雜度進(jìn)行順序?qū)懭肜萜濉Wx的性能體現(xiàn)在以o(1)的時(shí)間復(fù)雜度進(jìn)行順序讀取卒煞,對(duì)topic進(jìn)行partition分區(qū),consume group中的consume線程可以以很高能性能進(jìn)行順序讀叼架。

- Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會(huì)丟畔裕,絕對(duì)不會(huì)重復(fù)傳輸;(2)At least once 消息絕對(duì)不會(huì)丟乖订,但是可能會(huì)重復(fù)傳輸柴钻;(3)Exactly once每條信息肯定會(huì)被傳輸一次且僅傳輸一次,這是用戶(hù)想要的垢粮。

-批量發(fā)送:Kafka支持以消息集合為單位進(jìn)行批量發(fā)送贴届,以提高push效率。

-push-and-pull: Kafka中的Producer和consumer采用的是push-and-pull模式蜡吧,即Producer只管向broker push消息毫蚓,consumer只管從broker pull消息,兩者對(duì)消息的生產(chǎn)和消費(fèi)是異步的昔善。

-Kafka集群中broker之間的關(guān)系:不是主從關(guān)系元潘,各個(gè)broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個(gè)broker節(jié)點(diǎn)君仆。

-負(fù)載均衡方面: Kafka提供了一個(gè) metadata API來(lái)管理broker之間的負(fù)載(對(duì)Kafka0.8.x而言翩概,對(duì)于0.7.x主要靠zookeeper來(lái)實(shí)現(xiàn)負(fù)載均衡)。

-同步異步:Producer采用異步push方式返咱,極大提高Kafka系統(tǒng)的吞吐率(可以通過(guò)參數(shù)控制是采用同步還是異步方式)钥庇。

-分區(qū)機(jī)制partition:Kafka的broker端支持消息分區(qū)partition,Producer可以決定把消息發(fā)到哪個(gè)partition咖摹,在一個(gè)partition中message的順序就是Producer發(fā)送消息的順序评姨,一個(gè)topic中可以有多個(gè)partition,具體partition的數(shù)量是可配置的萤晴。partition的概念使得kafka作為MQ可以橫向擴(kuò)展吐句,吞吐量巨大。partition可以設(shè)置replica副本店读,replica副本存在不同的kafka broker節(jié)點(diǎn)上嗦枢,第一個(gè)partition是leader,其他的是follower,message先寫(xiě)到partition leader上,再由partition leader push到parition follower上。所以說(shuō)kafka可以水平擴(kuò)展,也就是擴(kuò)展partition挪拟。

-離線數(shù)據(jù)裝載:Kafka由于對(duì)可拓展的數(shù)據(jù)持久化的支持择葡,它也非常適合向Hadoop或者數(shù)據(jù)倉(cāng)庫(kù)中進(jìn)行數(shù)據(jù)裝載紧武。

-實(shí)時(shí)數(shù)據(jù)與離線數(shù)據(jù):kafka既支持離線數(shù)據(jù)也支持實(shí)時(shí)數(shù)據(jù)剃氧,因?yàn)閗afka的message持久化到文件敏储,并可以設(shè)置有效期,因此可以把kafka作為一個(gè)高效的存儲(chǔ)來(lái)使用朋鞍,可以作為離線數(shù)據(jù)供后面的分析已添。當(dāng)然作為分布式實(shí)時(shí)消息系統(tǒng),大多數(shù)情況下還是用于實(shí)時(shí)的數(shù)據(jù)處理的滥酥,但是當(dāng)cosumer消費(fèi)能力下降的時(shí)候可以通過(guò)message的持久化在淤積數(shù)據(jù)在kafka更舞。

-插件支持:現(xiàn)在不少活躍的社區(qū)已經(jīng)開(kāi)發(fā)出不少插件來(lái)拓展Kafka的功能,如用來(lái)配合Storm坎吻、Hadoop缆蝉、flume相關(guān)的插件。

-解耦: ?相當(dāng)于一個(gè)MQ瘦真,使得Producer和Consumer之間異步的操作刊头,系統(tǒng)之間解耦

-冗余: ?replica有多個(gè)副本,保證一個(gè)broker node宕機(jī)后不會(huì)影響整個(gè)服務(wù)

-擴(kuò)展性: ?broker節(jié)點(diǎn)可以水平擴(kuò)展诸尽,partition也可以水平增加原杂,partition replica也可以水平增加

-峰值: ?在訪問(wèn)量劇增的情況下,kafka水平擴(kuò)展, 應(yīng)用仍然需要繼續(xù)發(fā)揮作用

-可恢復(fù)性: ?系統(tǒng)的一部分組件失效時(shí)您机,由于有partition的replica副本穿肄,不會(huì)影響到整個(gè)系統(tǒng)。

-順序保證性:由于kafka的producer的寫(xiě)message與consumer去讀message都是順序的讀寫(xiě)际看,保證了高效的性能咸产。

-緩沖:由于producer那面可能業(yè)務(wù)很簡(jiǎn)單,而后端consumer業(yè)務(wù)會(huì)很復(fù)雜并有數(shù)據(jù)庫(kù)的操作仲闽,因此肯定是producer會(huì)比consumer處理速度快锐朴,如果沒(méi)有kafka,producer直接調(diào)用consumer蔼囊,那么就會(huì)造成整個(gè)系統(tǒng)的處理速度慢焚志,加一層kafka作為MQ,可以起到緩沖的作用畏鼓。

-異步通信:作為MQ酱酬,Producer與Consumer異步通信

2.Kafka文件存儲(chǔ)機(jī)制

2.1 Kafka部分名詞解釋如下:

Kafka中發(fā)布訂閱的對(duì)象是topic。我們可以為每類(lèi)數(shù)據(jù)創(chuàng)建一個(gè)topic云矫,把向topic發(fā)布消息的客戶(hù)端稱(chēng)作producer膳沽,從topic訂閱消息的客戶(hù)端稱(chēng)作consumer。Producers和consumers可以同時(shí)從多個(gè)topic讀寫(xiě)數(shù)據(jù)。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成挑社,它負(fù)責(zé)持久化和備份具體的kafka消息陨界。

Broker:Kafka節(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker痛阻,多個(gè)broker可以組成一個(gè)Kafka集群菌瘪。

Topic:一類(lèi)消息,消息存放的目錄即主題阱当,例如page view日志俏扩、click日志等都可以以topic的形式存在,Kafka集群能夠同時(shí)負(fù)責(zé)多個(gè)topic的分發(fā)弊添。

Partition:topic物理上的分組录淡,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列

Segment:partition物理上由多個(gè)segment組成油坝,每個(gè)Segment存著message信息

Producer: 生產(chǎn)message發(fā)送到topic

Consumer: 訂閱topic消費(fèi)message, consumer作為一個(gè)線程來(lái)消費(fèi)

Consumer Group:一個(gè)Consumer Group包含多個(gè)consumer, 這個(gè)是預(yù)先在配置文件中配置好的嫉戚。各個(gè)consumer(consumer 線程)可以組成一個(gè)組(Consumer group ),partition中的每個(gè)message只能被組(Consumer group ) 中的一個(gè)consumer(consumer 線程 )消費(fèi)澈圈,如果一個(gè)message可以被多個(gè)consumer(consumer 線程 ) 消費(fèi)的話彬檀,那么這些consumer必須在不同的組。Kafka不支持一個(gè)partition中的message由兩個(gè)或兩個(gè)以上的consumer thread來(lái)處理极舔,即便是來(lái)自不同的consumer group的也不行凤覆。它不能像AMQ那樣可以多個(gè)BET作為consumer去處理message,這是因?yàn)槎鄠€(gè)BET去消費(fèi)一個(gè)Queue中的數(shù)據(jù)的時(shí)候拆魏,由于要保證不能多個(gè)線程拿同一條message盯桦,所以就需要行級(jí)別悲觀所(for update),這就導(dǎo)致了consume的性能下降,吞吐量不夠渤刃。而kafka為了保證吞吐量拥峦,只允許一個(gè)consumer線程去訪問(wèn)一個(gè)partition。如果覺(jué)得效率不高的時(shí)候卖子,可以加partition的數(shù)量來(lái)橫向擴(kuò)展略号,那么再加新的consumer thread去消費(fèi)。這樣沒(méi)有鎖競(jìng)爭(zhēng)洋闽,充分發(fā)揮了橫向的擴(kuò)展性玄柠,吞吐量極高。這也就形成了分布式消費(fèi)的概念诫舅。

2.2 kafka一些原理概念

1.持久化

kafka使用文件存儲(chǔ)消息(append only log),這就直接決定kafka在性能上嚴(yán)重依賴(lài)文件系統(tǒng)的本身特性.且無(wú)論任何OS下,對(duì)文件系統(tǒng)本身的優(yōu)化是非常艱難的.文件緩存/直接內(nèi)存映射等是常用的手段.因?yàn)閗afka是對(duì)日志文件進(jìn)行append操作,因此磁盤(pán)檢索的開(kāi)支是較小的;同時(shí)為了減少磁盤(pán)寫(xiě)入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤(pán),這樣減少了磁盤(pán)IO調(diào)用的次數(shù).對(duì)于kafka而言,較高性能的磁盤(pán),將會(huì)帶來(lái)更加直接的性能提升.

2.性能

除磁盤(pán)IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問(wèn)題.kafka并沒(méi)有提供太多高超的技巧;對(duì)于producer端,可以將消息buffer起來(lái),當(dāng)消息的條數(shù)達(dá)到一定閥值時(shí),批量發(fā)送給broker;對(duì)于consumer端也是一樣,批量fetch多條消息.不過(guò)消息量的大小可以通過(guò)配置文件來(lái)指定.對(duì)于kafka broker端,似乎有個(gè)sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無(wú)需進(jìn)程再次copy和交換(這里涉及到"磁盤(pán)IO數(shù)據(jù)"/"內(nèi)核內(nèi)存"/"進(jìn)程內(nèi)存"/"網(wǎng)絡(luò)緩沖區(qū)",多者之間的數(shù)據(jù)copy).

其實(shí)對(duì)于producer/consumer/broker三者而言,CPU的開(kāi)支應(yīng)該都不大,因此啟用消息壓縮機(jī)制是一個(gè)良好的策略;壓縮需要消耗少量的CPU資源,不過(guò)對(duì)于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過(guò)壓縮.kafka支持gzip/snappy等多種壓縮方式.

3.負(fù)載均衡

kafka集群中的任何一個(gè)broker,都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請(qǐng)參看zookeeper中的節(jié)點(diǎn)信息). 當(dāng)producer獲取到metadata信息之后, producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過(guò)socket發(fā)送到broker,中間不會(huì)經(jīng)過(guò)任何"路由層".

異步發(fā)送羽利,將多條消息暫且在客戶(hù)端buffer起來(lái),并將他們批量發(fā)送到broker;小數(shù)據(jù)IO太多,會(huì)拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率;不過(guò)這也有一定的隱患,比如當(dāng)producer失效時(shí),那些尚未發(fā)送的消息將會(huì)丟失。

4.Topic模型

其他JMS實(shí)現(xiàn),消息消費(fèi)的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒(méi)有消費(fèi)成功的消息重發(fā)等,同時(shí)還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個(gè)consumer在消費(fèi),且不存在消息狀態(tài)的控制,也沒(méi)有復(fù)雜的消息確認(rèn)機(jī)制,可見(jiàn)kafka broker端是相當(dāng)輕量級(jí)的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊(cè)offset.由此可見(jiàn),consumer客戶(hù)端也很輕量級(jí)刊懈。

kafka中consumer負(fù)責(zé)維護(hù)消息的消費(fèi)記錄,而broker則不關(guān)心這些,這種設(shè)計(jì)不僅提高了consumer端的靈活性,也適度的減輕了broker端設(shè)計(jì)的復(fù)雜度;這是和眾多JMS prodiver的區(qū)別.此外,kafka中消息ACK的設(shè)計(jì)也和JMS有很大不同,kafka中的消息是批量(通常以消息的條數(shù)或者chunk的尺寸為單位)發(fā)送給consumer,當(dāng)消息消費(fèi)成功后,向zookeeper提交消息的offset,而不會(huì)向broker交付ACK.或許你已經(jīng)意識(shí)到,這種"寬松"的設(shè)計(jì),將會(huì)有"丟失"消息/"消息重發(fā)"的危險(xiǎn).

5.消息傳輸一致

Kafka提供3種消息傳輸一致性語(yǔ)義:最多1次这弧,最少1次娃闲,恰好1次。

最少1次:可能會(huì)重傳數(shù)據(jù)匾浪,有可能出現(xiàn)數(shù)據(jù)被重復(fù)處理的情況;

最多1次:可能會(huì)出現(xiàn)數(shù)據(jù)丟失情況;

恰好1次:并不是指真正只傳輸1次皇帮,只不過(guò)有一個(gè)機(jī)制。確保不會(huì)出現(xiàn)“數(shù)據(jù)被重復(fù)處理”和“數(shù)據(jù)丟失”的情況蛋辈。

at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過(guò)程中consumer進(jìn)程失效(crash),導(dǎo)致部分消息未能繼續(xù)處理.那么此后可能其他consumer會(huì)接管,但是因?yàn)閛ffset已經(jīng)提前保存,那么新的consumer將不能fetch到offset之前的消息(盡管它們尚沒(méi)有被處理),這就是"at most once".

at least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異呈羰埃或者consumer失效,導(dǎo)致保存offset操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是"at least once".

"Kafka Cluster"到消費(fèi)者的場(chǎng)景中可以采取以下方案來(lái)得到“恰好1次”的一致性語(yǔ)義:

最少1次+消費(fèi)者的輸出中額外增加已處理消息最大編號(hào):由于已處理消息最大編號(hào)的存在,不會(huì)出現(xiàn)重復(fù)處理消息的情況梯浪。

6.副本

kafka中,replication策略是基于partition,而不是topic;kafka將每個(gè)partition數(shù)據(jù)復(fù)制到多個(gè)server上,任何一個(gè)partition有一個(gè)leader和多個(gè)follower(可以沒(méi)有);備份的個(gè)數(shù)可以通過(guò)broker配置文件來(lái)設(shè)定捌年。leader處理所有的read-write請(qǐng)求,follower需要和leader保持同步.Follower就像一個(gè)"consumer",消費(fèi)消息并保存在本地日志中;leader負(fù)責(zé)跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效,leader將會(huì)把它從replicas同步列表中刪除.當(dāng)所有的follower都將一條消息保存成功,此消息才被認(rèn)為是"committed",那么此時(shí)consumer才能消費(fèi)它,這種同步策略,就要求follower和leader之間必須具有良好的網(wǎng)絡(luò)環(huán)境.即使只有一個(gè)replicas實(shí)例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可.

選擇follower時(shí)需要兼顧一個(gè)問(wèn)題,就是新leader server上所已經(jīng)承載的partition leader的個(gè)數(shù),如果一個(gè)server上有過(guò)多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負(fù)載均衡",partition leader較少的broker將會(huì)更有可能成為新的leader.

7.log

每個(gè)log entry格式為"4個(gè)字節(jié)的數(shù)字N表示消息的長(zhǎng)度" + "N個(gè)字節(jié)的消息內(nèi)容";每個(gè)日志都有一個(gè)offset來(lái)唯一的標(biāo)記一條消息,offset的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置..每個(gè)partition在物理存儲(chǔ)層面,有多個(gè)log file組成(稱(chēng)為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來(lái)表示消息的起始位置,chunk size用來(lái)表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)).根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.

8.分布式

kafka使用zookeeper來(lái)存儲(chǔ)一些meta信息,并使用了zookeeper watch機(jī)制來(lái)發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動(dòng)作(比如consumer失效,觸發(fā)負(fù)載均衡等)

Broker node registry: 當(dāng)一個(gè)kafka broker啟動(dòng)后,首先會(huì)向zookeeper注冊(cè)自己的節(jié)點(diǎn)信息(臨時(shí)znode),同時(shí)當(dāng)broker和zookeeper斷開(kāi)連接時(shí),此znode也會(huì)被刪除.

Broker Topic Registry: 當(dāng)一個(gè)broker啟動(dòng)時(shí),會(huì)向zookeeper注冊(cè)自己持有的topic和partitions信息,仍然是一個(gè)臨時(shí)znode.

Consumer and Consumer group: 每個(gè)consumer客戶(hù)端被創(chuàng)建時(shí),會(huì)向zookeeper注冊(cè)自己的信息;此作用主要是為了"負(fù)載均衡".一個(gè)group中的多個(gè)consumer可以交錯(cuò)的消費(fèi)一個(gè)topic的所有partitions;簡(jiǎn)而言之,保證此topic的所有partitions都能被此group所消費(fèi),且消費(fèi)時(shí)為了性能考慮,讓partition相對(duì)均衡的分散到每個(gè)consumer上.

Consumer id Registry: 每個(gè)consumer都有一個(gè)唯一的ID(host:uuid,可以通過(guò)配置文件指定,也可以由系統(tǒng)生成),此id用來(lái)標(biāo)記消費(fèi)者信息.

Consumer offset Tracking: 用來(lái)跟蹤每個(gè)consumer目前所消費(fèi)的partition中最大的offset.此znode為持久節(jié)點(diǎn),可以看出offset跟group_id有關(guān),以表明當(dāng)group中一個(gè)消費(fèi)者失效,其他consumer可以繼續(xù)消費(fèi).

Partition Owner registry: 用來(lái)標(biāo)記partition正在被哪個(gè)consumer消費(fèi).臨時(shí)znode瓢娜。此節(jié)點(diǎn)表達(dá)了"一個(gè)partition"只能被group下一個(gè)consumer消費(fèi),同時(shí)當(dāng)group下某個(gè)consumer失效,那么將會(huì)觸發(fā)負(fù)載均衡(即:讓partitions在多個(gè)consumer間均衡消費(fèi),接管那些"游離"的partitions)

當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作:

A) 首先進(jìn)行"Consumer id Registry";

B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.

總結(jié):

1) Producer端使用zookeeper用來(lái)"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.

2) Broker端使用zookeeper用來(lái)注冊(cè)broker信息,已經(jīng)監(jiān)測(cè)partition leader存活性.

3) Consumer端使用zookeeper用來(lái)注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來(lái)發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息挂洛。

9.Leader的選擇

Kafka的核心是日志文件,日志文件在集群中的同步是分布式數(shù)據(jù)系統(tǒng)最基礎(chǔ)的要素眠砾。

如果leaders永遠(yuǎn)不會(huì)down的話我們就不需要followers了虏劲!一旦leader down掉了,需要在followers中選擇一個(gè)新的leader.但是followers本身有可能延時(shí)太久或者crash褒颈,所以必須選擇高質(zhì)量的follower作為leader.必須保證柒巫,一旦一個(gè)消息被提交了,但是leader down掉了谷丸,新選出的leader必須可以提供這條消息堡掏。大部分的分布式系統(tǒng)采用了多數(shù)投票法則選擇新的leader,對(duì)于多數(shù)投票法則,就是根據(jù)所有副本節(jié)點(diǎn)的狀況動(dòng)態(tài)的選擇最適合的作為leader.Kafka并不是使用這種方法刨疼。

Kafka動(dòng)態(tài)維護(hù)了一個(gè)同步狀態(tài)的副本的集合(a set of in-sync replicas)泉唁,簡(jiǎn)稱(chēng)ISR,在這個(gè)集合中的節(jié)點(diǎn)都是和leader保持高度一致的揩慕,任何一條消息必須被這個(gè)集合中的每個(gè)節(jié)點(diǎn)讀取并追加到日志中了亭畜,才回通知外部這個(gè)消息已經(jīng)被提交了。因此這個(gè)集合中的任何一個(gè)節(jié)點(diǎn)隨時(shí)都可以被選為leader.ISR在ZooKeeper中維護(hù)迎卤。ISR中有f+1個(gè)節(jié)點(diǎn)拴鸵,就可以允許在f個(gè)節(jié)點(diǎn)down掉的情況下不會(huì)丟失消息并正常提供服。ISR的成員是動(dòng)態(tài)的蜗搔,如果一個(gè)節(jié)點(diǎn)被淘汰了劲藐,當(dāng)它重新達(dá)到“同步中”的狀態(tài)時(shí),他可以重新加入ISR.這種leader的選擇方式是非痴疗啵快速的聘芜,適合kafka的應(yīng)用場(chǎng)景。

一個(gè)邪惡的想法:如果所有節(jié)點(diǎn)都down掉了怎么辦不同?Kafka對(duì)于數(shù)據(jù)不會(huì)丟失的保證厉膀,是基于至少一個(gè)節(jié)點(diǎn)是存活的溶耘,一旦所有節(jié)點(diǎn)都down了,這個(gè)就不能保證了服鹅。

實(shí)際應(yīng)用中凳兵,當(dāng)所有的副本都down掉時(shí),必須及時(shí)作出反應(yīng)企软÷ǎ可以有以下兩種選擇:

1. 等待ISR中的任何一個(gè)節(jié)點(diǎn)恢復(fù)并擔(dān)任leader。

2. 選擇所有節(jié)點(diǎn)中(不只是ISR)第一個(gè)恢復(fù)的節(jié)點(diǎn)作為leader.

這是一個(gè)在可用性和連續(xù)性之間的權(quán)衡仗哨。如果等待ISR中的節(jié)點(diǎn)恢復(fù)形庭,一旦ISR中的節(jié)點(diǎn)起不起來(lái)或者數(shù)據(jù)都是了,那集群就永遠(yuǎn)恢復(fù)不了了厌漂。如果等待ISR意外的節(jié)點(diǎn)恢復(fù)萨醒,這個(gè)節(jié)點(diǎn)的數(shù)據(jù)就會(huì)被作為線上數(shù)據(jù),有可能和真實(shí)的數(shù)據(jù)有所出入苇倡,因?yàn)橛行?shù)據(jù)它可能還沒(méi)同步到富纸。Kafka目前選擇了第二種策略,在未來(lái)的版本中將使這個(gè)策略的選擇可配置旨椒,可以根據(jù)場(chǎng)景靈活的選擇晓褪。

這種窘境不只Kafka會(huì)遇到,幾乎所有的分布式數(shù)據(jù)系統(tǒng)都會(huì)遇到综慎。

10.副本管理

以上僅僅以一個(gè)topic一個(gè)分區(qū)為例子進(jìn)行了討論涣仿,但實(shí)際上一個(gè)Kafka將會(huì)管理成千上萬(wàn)的topic分區(qū).Kafka盡量的使所有分區(qū)均勻的分布到集群所有的節(jié)點(diǎn)上而不是集中在某些節(jié)點(diǎn)上,另外主從關(guān)系也盡量均衡這樣每個(gè)幾點(diǎn)都會(huì)擔(dān)任一定比例的分區(qū)的leader.

優(yōu)化leader的選擇過(guò)程也是很重要的示惊,它決定了系統(tǒng)發(fā)生故障時(shí)的空窗期有多久好港。Kafka選擇一個(gè)節(jié)點(diǎn)作為“controller”,當(dāng)發(fā)現(xiàn)有節(jié)點(diǎn)down掉的時(shí)候它負(fù)責(zé)在游泳分區(qū)的所有節(jié)點(diǎn)中選擇新的leader,這使得Kafka可以批量的高效的管理所有分區(qū)節(jié)點(diǎn)的主從關(guān)系。如果controller down掉了涝涤,活著的節(jié)點(diǎn)中的一個(gè)會(huì)備切換為新的controller.

11.Leader與副本同步

對(duì)于某個(gè)分區(qū)來(lái)說(shuō)媚狰,保存正分區(qū)的"broker"為該分區(qū)的"leader",保存?zhèn)浞莘謪^(qū)的"broker"為該分區(qū)的"follower"阔拳。備份分區(qū)會(huì)完全復(fù)制正分區(qū)的消息崭孤,包括消息的編號(hào)等附加屬性值。為了保持正分區(qū)和備份分區(qū)的內(nèi)容一致糊肠,Kafka采取的方案是在保存?zhèn)浞莘謪^(qū)的"broker"上開(kāi)啟一個(gè)消費(fèi)者進(jìn)程進(jìn)行消費(fèi)辨宠,從而使得正分區(qū)的內(nèi)容與備份分區(qū)的內(nèi)容保持一致。一般情況下货裹,一個(gè)分區(qū)有一個(gè)“正分區(qū)”和零到多個(gè)“備份分區(qū)”嗤形。可以配置“正分區(qū)+備份分區(qū)”的總數(shù)量弧圆,關(guān)于這個(gè)配置赋兵,不同主題可以有不同的配置值笔咽。注意,生產(chǎn)者霹期,消費(fèi)者只與保存正分區(qū)的"leader"進(jìn)行通信叶组。

Kafka允許topic的分區(qū)擁有若干副本,這個(gè)數(shù)量是可以配置的历造,你可以為每個(gè)topic配置副本的數(shù)量甩十。Kafka會(huì)自動(dòng)在每個(gè)副本上備份數(shù)據(jù),所以當(dāng)一個(gè)節(jié)點(diǎn)down掉時(shí)數(shù)據(jù)依然是可用的吭产。

Kafka的副本功能不是必須的侣监,你可以配置只有一個(gè)副本,這樣其實(shí)就相當(dāng)于只有一份數(shù)據(jù)臣淤。

創(chuàng)建副本的單位是topic的分區(qū)橄霉,每個(gè)分區(qū)都有一個(gè)leader和零或多個(gè)followers.所有的讀寫(xiě)操作都由leader處理,一般分區(qū)的數(shù)量都比broker的數(shù)量多的多荒典,各分區(qū)的leader均勻的分布在brokers中酪劫。所有的followers都復(fù)制leader的日志吞鸭,日志中的消息和順序都和leader中的一致寺董。followers向普通的consumer那樣從leader那里拉取消息并保存在自己的日志文件中。

許多分布式的消息系統(tǒng)自動(dòng)的處理失敗的請(qǐng)求刻剥,它們對(duì)一個(gè)節(jié)點(diǎn)是否著(alive)”有著清晰的定義遮咖。Kafka判斷一個(gè)節(jié)點(diǎn)是否活著有兩個(gè)條件:

1. 節(jié)點(diǎn)必須可以維護(hù)和ZooKeeper的連接,Zookeeper通過(guò)心跳機(jī)制檢查每個(gè)節(jié)點(diǎn)的連接造虏。

2. 如果節(jié)點(diǎn)是個(gè)follower,他必須能及時(shí)的同步leader的寫(xiě)操作御吞,延時(shí)不能太久。

符合以上條件的節(jié)點(diǎn)準(zhǔn)確的說(shuō)應(yīng)該是“同步中的(in sync)”漓藕,而不是模糊的說(shuō)是“活著的”或是“失敗的”陶珠。Leader會(huì)追蹤所有“同步中”的節(jié)點(diǎn),一旦一個(gè)down掉了享钞,或是卡住了揍诽,或是延時(shí)太久,leader就會(huì)把它移除栗竖。至于延時(shí)多久算是“太久”暑脆,是由參數(shù)replica.lag.max.messages決定的,怎樣算是卡住了狐肢,怎是由參數(shù)replica.lag.time.max.ms決定的添吗。

只有當(dāng)消息被所有的副本加入到日志中時(shí),才算是“committed”份名,只有committed的消息才會(huì)發(fā)送給consumer碟联,這樣就不用擔(dān)心一旦leader down掉了消息會(huì)丟失妓美。Producer也可以選擇是否等待消息被提交的通知,這個(gè)是由參數(shù)acks決定的鲤孵。

Kafka保證只要有一個(gè)“同步中”的節(jié)點(diǎn)部脚,“committed”的消息就不會(huì)丟失。

2.3 ?kafka拓?fù)浣Y(jié)構(gòu)

一個(gè)典型的Kafka集群中包含若干Producer(可以是web前端FET裤纹,或者是服務(wù)器日志等)委刘,若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多鹰椒,集群吞吐率越高)锡移,若干ConsumerGroup,以及一個(gè)Zookeeper集群漆际。Kafka通過(guò)Zookeeper管理Kafka集群配置:選舉Kafka broker的leader淆珊,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance,因?yàn)閏onsumer消費(fèi)kafka topic的partition的offsite信息是存在Zookeeper的奸汇。Producer使用push模式將消息發(fā)布到broker施符,Consumer使用pull模式從broker訂閱并消費(fèi)消息。

分析過(guò)程分為以下4個(gè)步驟:

topic中partition存儲(chǔ)分布

partiton中文件存儲(chǔ)方式 (partition在linux服務(wù)器上就是一個(gè)目錄(文件夾))

partiton中segment文件存儲(chǔ)結(jié)構(gòu)

在partition中如何通過(guò)offset查找message

通過(guò)上述4過(guò)程詳細(xì)分析擂找,我們就可以清楚認(rèn)識(shí)到kafka文件存儲(chǔ)機(jī)制的奧秘戳吝。

2.3 topic中partition存儲(chǔ)分布

假設(shè)實(shí)驗(yàn)環(huán)境中Kafka集群只有一個(gè)broker,xxx/message-folder為數(shù)據(jù)文件存儲(chǔ)根目錄贯涎,在Kafka broker中server.properties文件配置(參數(shù)log.dirs=xxx/message-folder)听哭,例如創(chuàng)建2個(gè)topic名 稱(chēng)分別為report_push、launch_info, partitions數(shù)量都為partitions=4

存儲(chǔ)路徑和目錄規(guī)則為:

xxx/message-folder

|--report_push-0

|--report_push-1

|--report_push-2

|--report_push-3

|--launch_info-0

|--launch_info-1

|--launch_info-2

|--launch_info-3

在Kafka文件存儲(chǔ)中塘雳,同一個(gè)topic下有多個(gè)不同partition陆盘,每個(gè)partition為一個(gè)目錄,partiton命名規(guī)則為topic名稱(chēng)+有序序號(hào)败明,第一個(gè)partiton序號(hào)從0開(kāi)始隘马,序號(hào)最大值為partitions數(shù)量減1。

消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic妻顶,其本質(zhì)就是一個(gè)目錄酸员,而topic由是由一些Partition組成,其組織結(jié)構(gòu)如下圖所示:

我們可以看到,Partition是一個(gè)Queue的結(jié)構(gòu)盈包,每個(gè)Partition中的消息都是有序的沸呐,生產(chǎn)的消息被不斷追加到Partition上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值呢燥。

Kafka集群會(huì)保存所有的消息崭添,不管消息有沒(méi)有被消費(fèi);我們可以設(shè)定消息的過(guò)期時(shí)間叛氨,只有過(guò)期的數(shù)據(jù)才會(huì)被自動(dòng)清除以釋放磁盤(pán)空間呼渣。比如我們?cè)O(shè)置消息過(guò)期時(shí)間為2天棘伴,那么這2天內(nèi)的所有消息都會(huì)被保存到集群中,數(shù)據(jù)只有超過(guò)了兩天才會(huì)被清除屁置。

Kafka只維護(hù)在Partition中的offset值焊夸,因?yàn)檫@個(gè)offsite標(biāo)識(shí)著這個(gè)partition的message消費(fèi)到哪條了。Consumer每消費(fèi)一個(gè)消息蓝角,offset就會(huì)加1阱穗。其實(shí)消息的狀態(tài)完全是由Consumer控制的,Consumer可以跟蹤和重設(shè)這個(gè)offset值使鹅,這樣的話Consumer就可以讀取任意位置的消息揪阶。

把消息日志以Partition的形式存放有多重考慮,第一患朱,方便在集群中擴(kuò)展鲁僚,每個(gè)Partition可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成裁厅,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了冰沙;第二就是可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫(xiě)了执虹。

通過(guò)上面介紹的我們可以知道拓挥,kafka中的數(shù)據(jù)是持久化的并且能夠容錯(cuò)的。Kafka允許用戶(hù)為每個(gè)topic設(shè)置副本數(shù)量声畏,副本數(shù)量決定了有幾個(gè)broker來(lái)存放寫(xiě)入的數(shù)據(jù)撞叽。如果你的副本數(shù)量設(shè)置為3,那么一份數(shù)據(jù)就會(huì)被存放在3臺(tái)不同的機(jī)器上插龄,那么就允許有2個(gè)機(jī)器失敗。一般推薦副本數(shù)量至少為2科展,這樣就可以保證增減均牢、重啟機(jī)器時(shí)不會(huì)影響到數(shù)據(jù)消費(fèi)。如果對(duì)數(shù)據(jù)持久化有更高的要求才睹,可以把副本數(shù)量設(shè)置為3或者更多徘跪。

Kafka中的topic是以partition的形式存放的,每一個(gè)topic都可以設(shè)置它的partition數(shù)量琅攘,Partition的數(shù)量決定了組成topic的message的數(shù)量垮庐。Producer在生產(chǎn)數(shù)據(jù)時(shí),會(huì)按照一定規(guī)則(這個(gè)規(guī)則是可以自定義的)把消息發(fā)布到topic的各個(gè)partition中坞琴。上面將的副本都是以partition為單位的哨查,不過(guò)只有一個(gè)partition的副本會(huì)被選舉成leader作為讀寫(xiě)用。

關(guān)于如何設(shè)置partition值需要考慮的因素剧辐。一個(gè)partition只能被一個(gè)消費(fèi)者消費(fèi)(一個(gè)消費(fèi)者可以同時(shí)消費(fèi)多個(gè)partition)寒亥,因此邮府,如果設(shè)置的partition的數(shù)量小于consumer的數(shù)量,就會(huì)有消費(fèi)者消費(fèi)不到數(shù)據(jù)溉奕。所以褂傀,推薦partition的數(shù)量一定要大于同時(shí)運(yùn)行的consumer的數(shù)量。另外一方面加勤,建議partition的數(shù)量大于集群broker的數(shù)量仙辟,這樣leader partition就可以均勻的分布在各個(gè)broker中,最終使得集群負(fù)載均衡鳄梅。在Cloudera,每個(gè)topic都有上百個(gè)partition欺嗤。需要注意的是,kafka需要為每個(gè)partition分配一些內(nèi)存來(lái)緩存消息數(shù)據(jù)卫枝,如果partition數(shù)量越大煎饼,就要為kafka分配更大的heap space。

2.4 partiton中文件存儲(chǔ)方式

每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中校赤。但每個(gè)段segment file消息數(shù)量不一定相等吆玖,這種特性方便old segment file快速被刪除。

每個(gè)partiton只需要支持順序讀寫(xiě)就行了马篮,segment文件生命周期由服務(wù)端配置參數(shù)決定沾乘。

這樣做的好處就是能快速刪除無(wú)用文件,有效提高磁盤(pán)利用率浑测。

2.5 partiton中segment文件存儲(chǔ)結(jié)構(gòu)

producer發(fā)message到某個(gè)topic翅阵,message會(huì)被均勻的分布到多個(gè)partition上(隨機(jī)或根據(jù)用戶(hù)指定的回調(diào)函數(shù)進(jìn)行分布),kafka broker收到message往對(duì)應(yīng)partition的最后一個(gè)segment上添加該消息迁央,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過(guò)閾值時(shí)掷匠,segment上的消息會(huì)被flush到磁盤(pán),只有flush到磁盤(pán)上的消息consumer才能消費(fèi)岖圈,segment達(dá)到一定的大小后將不會(huì)再往該segment寫(xiě)數(shù)據(jù)讹语,broker會(huì)創(chuàng)建新的segment。

每個(gè)part在內(nèi)存中對(duì)應(yīng)一個(gè)index蜂科,記錄每個(gè)segment中的第一條消息偏移顽决。

segment file組成:由2大部分組成,分別為index file和data file导匣,此2個(gè)文件一一對(duì)應(yīng)才菠,成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件贡定、數(shù)據(jù)文件.

segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開(kāi)始赋访,后續(xù)每個(gè)segment文件名為上一個(gè)全局partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小,19位數(shù)字字符長(zhǎng)度进每,沒(méi)有數(shù)字用0填充汹粤。

每個(gè)segment中存儲(chǔ)很多條消息,消息id由其邏輯位置決定田晚,即從消息id可直接定位到消息的存儲(chǔ)位置嘱兼,避免id到位置的額外映射。

下面文件列表是筆者在Kafka broker上做的一個(gè)實(shí)驗(yàn)贤徒,創(chuàng)建一個(gè)topicXXX包含1 partition芹壕,設(shè)置每個(gè)segment大小為500MB,并啟動(dòng)producer向Kafka broker寫(xiě)入大量數(shù)據(jù),如下圖2所示segment文件列表形象說(shuō)明了上述2個(gè)規(guī)則:

以上述圖2中一對(duì)segment file文件為例,說(shuō)明segment中index<—->data file對(duì)應(yīng)關(guān)系物理結(jié)構(gòu)如下:

上述圖3中索引文件存儲(chǔ)大量元數(shù)據(jù)接奈,數(shù)據(jù)文件存儲(chǔ)大量消息踢涌,索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。其中以索引文件中 元數(shù)據(jù)3,497為例序宦,依次在數(shù)據(jù)文件中表示第3個(gè)message(在全局partiton表示第368772個(gè)message)睁壁、以及該消息的物理偏移 地址為497。

從上述圖3了解到segment data file由許多message組成互捌,下面詳細(xì)說(shuō)明message物理結(jié)構(gòu)如下:

參數(shù)說(shuō)明:

關(guān)鍵字解釋說(shuō)明

8 byte offset在parition(分區(qū))內(nèi)的每條消息都有一個(gè)有序的id號(hào)潘明,這個(gè)id號(hào)被稱(chēng)為偏移(offset),它可以唯一確定每條消息在parition(分區(qū))內(nèi)的位置。即offset表示partiion的第多少message

4 byte message sizemessage大小

4 byte CRC32用crc32校驗(yàn)message

1 byte “magic"表示本次發(fā)布Kafka服務(wù)程序協(xié)議版本號(hào)

1 byte “attributes"表示為獨(dú)立版本秕噪、或標(biāo)識(shí)壓縮類(lèi)型钳降、或編碼類(lèi)型。

4 byte key length表示key的長(zhǎng)度,當(dāng)key為-1時(shí)腌巾,K byte key字段不填

K byte key可選

value bytes payload表示實(shí)際消息數(shù)據(jù)遂填。

2.6 在partition中如何通過(guò)offset查找message

例如讀取offset=368776的message,需要通過(guò)下面2個(gè)步驟查找澈蝙。

第一步查找segment file

上述圖2為例田篇,其中00000000000000000000.index表示最開(kāi)始的文件展姐,起始偏移量(offset)為0.第二個(gè)文件 00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣郑什,第三個(gè)文件00000000000000737337.index的起始偏移量為737338=737337 + 1赎瞎,其他后續(xù)文件依次類(lèi)推怔毛,以起始偏移量命名并排序這些文件独悴,只要根據(jù)offset **二分查找**文件列表骗灶,就可以快速定位到具體文件艘策。

當(dāng)offset=368776時(shí)定位到00000000000000368769.index|log

第二步通過(guò)segment file查找message通過(guò)第一步定位到segment file况褪,當(dāng)offset=368776時(shí)撕贞,依次定位到00000000000000368769.index的元數(shù)據(jù)物理位置和 00000000000000368769.log的物理偏移地址,然后再通過(guò)00000000000000368769.log順序查找直到 offset=368776為止测垛。

segment index file采取稀疏索引存儲(chǔ)方式捏膨,它減少索引文件大小,通過(guò)mmap可以直接內(nèi)存操作,稀疏索引為數(shù)據(jù)文件的每個(gè)對(duì)應(yīng)message設(shè)置一個(gè)元數(shù)據(jù)指針,它 比稠密索引節(jié)省了更多的存儲(chǔ)空間号涯,但查找起來(lái)需要消耗更多的時(shí)間目胡。

kafka會(huì)記錄offset到zk中。但是链快,zk client api對(duì)zk的頻繁寫(xiě)入是一個(gè)低效的操作誉己。0.8.2 kafka引入了native offset storage,將offset管理從zk移出域蜗,并且可以做到水平擴(kuò)展巨双。其原理就是利用了kafka的compacted topic,offset以consumer group,topic與partion的組合作為key直接提交到compacted topic中霉祸。同時(shí)Kafka又在內(nèi)存中維護(hù)了的三元組來(lái)維護(hù)最新的offset信息筑累,consumer來(lái)取最新offset信息的時(shí)候直接內(nèi)存里拿即可。當(dāng)然丝蹭,kafka允許你快速的checkpoint最新的offset信息到磁盤(pán)上慢宗。

3.Partition Replication原則

Kafka高效文件存儲(chǔ)設(shè)計(jì)特點(diǎn)

Kafka把topic中一個(gè)parition大文件分成多個(gè)小文件段,通過(guò)多個(gè)小文件段奔穿,就容易定期清除或刪除已經(jīng)消費(fèi)完文件镜沽,減少磁盤(pán)占用。

通過(guò)索引信息可以快速定位message和確定response的最大大小巫橄。

通過(guò)index元數(shù)據(jù)全部映射到memory淘邻,可以避免segment file的IO磁盤(pán)操作。

通過(guò)索引文件稀疏存儲(chǔ)湘换,可以大幅降低index文件元數(shù)據(jù)占用空間大小宾舅。

1. Kafka集群partition?replication默認(rèn)自動(dòng)分配分析

下面以一個(gè)Kafka集群中4個(gè)Broker舉例,創(chuàng)建1個(gè)topic包含4個(gè)Partition彩倚,2 Replication筹我;數(shù)據(jù)Producer流動(dòng)如圖所示:

(1)

(2)當(dāng)集群中新增2節(jié)點(diǎn),Partition增加到6個(gè)時(shí)分布情況如下:

副本分配邏輯規(guī)則如下:

在Kafka集群中帆离,每個(gè)Broker都有均等分配Partition的Leader機(jī)會(huì)蔬蕊。

上述圖Broker Partition中,箭頭指向?yàn)楦北靖绻龋訮artition-0為例:broker1中parition-0為L(zhǎng)eader岸夯,Broker2中Partition-0為副本。

上述圖種每個(gè)Broker(按照BrokerId有序)依次分配主Partition,下一個(gè)Broker為副本们妥,如此循環(huán)迭代分配猜扮,多副本都遵循此規(guī)則。

副本分配算法如下:

將所有N Broker和待分配的i個(gè)Partition排序.

將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上.

將第i個(gè)Partition的第j個(gè)副本分配到第((i + j) mod n)個(gè)Broker上.

4.Kafka Broker一些特性

4.1 無(wú)狀態(tài)的Kafka Broker :

1. Broker沒(méi)有副本機(jī)制监婶,一旦broker宕機(jī)旅赢,該broker的消息將都不可用齿桃。

2. Broker不保存訂閱者的狀態(tài),由訂閱者自己保存煮盼。

3. 無(wú)狀態(tài)導(dǎo)致消息的刪除成為難題(可能刪除的消息正在被訂閱)短纵,kafka采用基于時(shí)間的SLA(服務(wù)水平保證),消息保存一定時(shí)間(通常為7天)后會(huì)被刪除僵控。

4. 消息訂閱者可以rewind back到任意位置重新進(jìn)行消費(fèi)香到,當(dāng)訂閱者故障時(shí),可以選擇最小的offset進(jìn)行重新讀取消費(fèi)消息喉祭。

4.2 message的交付與生命周期 :

1. 不是嚴(yán)格的JMS养渴, 因此kafka對(duì)消息的重復(fù)、丟失泛烙、錯(cuò)誤以及順序型沒(méi)有嚴(yán)格的要求理卑。(這是與AMQ最大的區(qū)別)

2. kafka提供at-least-once delivery,即當(dāng)consumer宕機(jī)后,有些消息可能會(huì)被重復(fù)delivery蔽氨。

3. 因每個(gè)partition只會(huì)被consumer group內(nèi)的一個(gè)consumer消費(fèi)藐唠,故kafka保證每個(gè)partition內(nèi)的消息會(huì)被順序的訂閱。

4. Kafka為每條消息為每條消息計(jì)算CRC校驗(yàn)鹉究,用于錯(cuò)誤檢測(cè)宇立,crc校驗(yàn)不通過(guò)的消息會(huì)直接被丟棄掉。

4.3 壓縮

Kafka支持以集合(batch)為單位發(fā)送消息自赔,在此基礎(chǔ)上妈嘹,Kafka還支持對(duì)消息集合進(jìn)行壓縮,Producer端可以通過(guò)GZIP或Snappy格式對(duì)消息集合進(jìn)行壓縮绍妨。Producer端進(jìn)行壓縮之后润脸,在Consumer端需進(jìn)行解壓。壓縮的好處就是減少傳輸?shù)臄?shù)據(jù)量他去,減輕對(duì)網(wǎng)絡(luò)傳輸?shù)膲毫Ρ醒保趯?duì)大數(shù)據(jù)處理上,瓶頸往往體現(xiàn)在網(wǎng)絡(luò)上而不是CPU灾测。

那么如何區(qū)分消息是壓縮的還是未壓縮的呢爆价,Kafka在消息頭部添加了一個(gè)描述壓縮屬性字節(jié),這個(gè)字節(jié)的后兩位表示消息的壓縮采用的編碼媳搪,如果后兩位為0铭段,則表示消息未被壓縮。

4.4 消息可靠性

在消息系統(tǒng)中秦爆,保證消息在生產(chǎn)和消費(fèi)過(guò)程中的可靠性是十分重要的稠项,在實(shí)際消息傳遞過(guò)程中,可能會(huì)出現(xiàn)如下三中情況:

- 一個(gè)消息發(fā)送失敗

- 一個(gè)消息被發(fā)送多次

- 最理想的情況:exactly-once ,一個(gè)消息發(fā)送成功且僅發(fā)送了一次

有許多系統(tǒng)聲稱(chēng)它們實(shí)現(xiàn)了exactly-once鲜结,但是它們其實(shí)忽略了生產(chǎn)者或消費(fèi)者在生產(chǎn)和消費(fèi)過(guò)程中有可能失敗的情況。比如雖然一個(gè)Producer成功發(fā)送一個(gè)消息,但是消息在發(fā)送途中丟失精刷,或者成功發(fā)送到broker拗胜,也被consumer成功取走,但是這個(gè)consumer在處理取過(guò)來(lái)的消息時(shí)失敗了怒允。

從Producer端看:Kafka是這么處理的埂软,當(dāng)一個(gè)消息被發(fā)送后,Producer會(huì)等待broker成功接收到消息的反饋(可通過(guò)參數(shù)控制等待時(shí)間)纫事,如果消息在途中丟失或是其中一個(gè)broker掛掉勘畔,Producer會(huì)重新發(fā)送(我們知道Kafka有備份機(jī)制,可以通過(guò)參數(shù)控制是否等待所有備份節(jié)點(diǎn)都收到消息)丽惶。

從Consumer端看:前面講到過(guò)partition炫七,broker端記錄了partition中的一個(gè)offset值,這個(gè)值指向Consumer下一個(gè)即將消費(fèi)message钾唬。當(dāng)Consumer收到了消息万哪,但卻在處理過(guò)程中掛掉,此時(shí)Consumer可以通過(guò)這個(gè)offset值重新找到上一個(gè)消息再進(jìn)行處理抡秆。Consumer還有權(quán)限控制這個(gè)offset值奕巍,對(duì)持久化到broker端的消息做任意處理。

4.5 備份機(jī)制

備份機(jī)制是Kafka0.8版本的新特性儒士,備份機(jī)制的出現(xiàn)大大提高了Kafka集群的可靠性的止、穩(wěn)定性。有了備份機(jī)制后着撩,Kafka允許集群中的節(jié)點(diǎn)掛掉后而不影響整個(gè)集群工作诅福。一個(gè)備份數(shù)量為n的集群允許n-1個(gè)節(jié)點(diǎn)失敗。在所有備份節(jié)點(diǎn)中睹酌,有一個(gè)節(jié)點(diǎn)作為lead節(jié)點(diǎn)权谁,這個(gè)節(jié)點(diǎn)保存了其它備份節(jié)點(diǎn)列表,并維持各個(gè)備份間的狀體同步憋沿。下面這幅圖解釋了Kafka的備份機(jī)制:

4.6 Kafka高效性相關(guān)設(shè)計(jì)

4.6.1 消息的持久化

Kafka高度依賴(lài)文件系統(tǒng)來(lái)存儲(chǔ)和緩存消息(AMQ的nessage是持久化到mysql數(shù)據(jù)庫(kù)中的)旺芽,因?yàn)橐话愕娜苏J(rèn)為磁盤(pán)是緩慢的,這導(dǎo)致人們對(duì)持久化結(jié)構(gòu)具有競(jìng)爭(zhēng)性持懷疑態(tài)度辐啄。其實(shí)采章,磁盤(pán)的快或者慢,這決定于我們?nèi)绾问褂么疟P(pán)壶辜。因?yàn)榇疟P(pán)線性寫(xiě)的速度遠(yuǎn)遠(yuǎn)大于隨機(jī)寫(xiě)悯舟。線性讀寫(xiě)在大多數(shù)應(yīng)用場(chǎng)景下是可以預(yù)測(cè)的。

4.6.2 常數(shù)時(shí)間性能保證

每個(gè)Topic的Partition的是一個(gè)大文件夾砸民,里面有無(wú)數(shù)個(gè)小文件夾segment抵怎,但partition是一個(gè)隊(duì)列奋救,隊(duì)列中的元素是segment,消費(fèi)的時(shí)候先從第0個(gè)segment開(kāi)始消費(fèi),新來(lái)message存在最后一個(gè)消息隊(duì)列中反惕。對(duì)于segment也是對(duì)隊(duì)列尝艘,隊(duì)列元素是message,有對(duì)應(yīng)的offsite標(biāo)識(shí)是哪個(gè)message。消費(fèi)的時(shí)候先從這個(gè)segment的第一個(gè)message開(kāi)始消費(fèi)姿染,新來(lái)的message存在segment的最后背亥。

消息系統(tǒng)的持久化隊(duì)列可以構(gòu)建在對(duì)一個(gè)文件的讀和追加上,就像一般情況下的日志解決方案悬赏。它有一個(gè)優(yōu)點(diǎn)狡汉,所有的操作都是常數(shù)時(shí)間,并且讀寫(xiě)之間不會(huì)相互阻塞闽颇。這種設(shè)計(jì)具有極大的性能優(yōu)勢(shì):最終系統(tǒng)性能和數(shù)據(jù)大小完全無(wú)關(guān)盾戴,服務(wù)器可以充分利用廉價(jià)的硬盤(pán)來(lái)提供高效的消息服務(wù)。

事實(shí)上還有一點(diǎn)进萄,磁盤(pán)空間的無(wú)限增大而不影響性能這點(diǎn)捻脖,意味著我們可以提供一般消息系統(tǒng)無(wú)法提供的特性。比如說(shuō)中鼠,消息被消費(fèi)后不是立馬被刪除可婶,我們可以將這些消息保留一段相對(duì)比較長(zhǎng)的時(shí)間(比如一個(gè)星期)。

5.Kafka 生產(chǎn)者-消費(fèi)者

消息系統(tǒng)通常都會(huì)由生產(chǎn)者援雇,消費(fèi)者矛渴,Broker三大部分組成,生產(chǎn)者會(huì)將消息寫(xiě)入到Broker惫搏,消費(fèi)者會(huì)從Broker中讀取出消息具温,不同的MQ實(shí)現(xiàn)的Broker實(shí)現(xiàn)會(huì)有所不同,不過(guò)Broker的本質(zhì)都是要負(fù)責(zé)將消息落地到服務(wù)端的存儲(chǔ)系統(tǒng)中筐赔。具體步驟如下:

生產(chǎn)者客戶(hù)端應(yīng)用程序產(chǎn)生消息:

客戶(hù)端連接對(duì)象將消息包裝到請(qǐng)求中發(fā)送到服務(wù)端

服務(wù)端的入口也有一個(gè)連接對(duì)象負(fù)責(zé)接收請(qǐng)求铣猩,并將消息以文件的形式存儲(chǔ)起來(lái)

服務(wù)端返回響應(yīng)結(jié)果給生產(chǎn)者客戶(hù)端

消費(fèi)者客戶(hù)端應(yīng)用程序消費(fèi)消息:

客戶(hù)端連接對(duì)象將消費(fèi)信息也包裝到請(qǐng)求中發(fā)送給服務(wù)端

服務(wù)端從文件存儲(chǔ)系統(tǒng)中取出消息

服務(wù)端返回響應(yīng)結(jié)果給消費(fèi)者客戶(hù)端

客戶(hù)端將響應(yīng)結(jié)果還原成消息并開(kāi)始處理消息

圖4-1?客戶(hù)端和服務(wù)端交互

5.1 ?Producers

Producers直接發(fā)送消息到broker上的leader partition,不需要經(jīng)過(guò)任何中介或其他路由轉(zhuǎn)發(fā)茴丰。為了實(shí)現(xiàn)這個(gè)特性达皿,kafka集群中的每個(gè)broker都可以響應(yīng)producer的請(qǐng)求,并返回topic的一些元信息贿肩,這些元信息包括哪些機(jī)器是存活的峦椰,topic的leader partition都在哪,現(xiàn)階段哪些leader partition是可以直接被訪問(wèn)的汰规。

Producer客戶(hù)端自己控制著消息被推送到哪些partition汤功。實(shí)現(xiàn)的方式可以是隨機(jī)分配、實(shí)現(xiàn)一類(lèi)隨機(jī)負(fù)載均衡算法溜哮,或者指定一些分區(qū)算法滔金。Kafka提供了接口供用戶(hù)實(shí)現(xiàn)自定義的partition色解,用戶(hù)可以為每個(gè)消息指定一個(gè)partitionKey,通過(guò)這個(gè)key來(lái)實(shí)現(xiàn)一些hash分區(qū)算法鹦蠕。比如冒签,把userid作為partitionkey的話,相同userid的消息將會(huì)被推送到同一個(gè)partition钟病。

以Batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka Producer 可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求刚梭。Batch的數(shù)量大小可以通過(guò)Producer的參數(shù)控制肠阱,參數(shù)值可以設(shè)置為累計(jì)的消息的數(shù)量(如500條)、累計(jì)的時(shí)間間隔(如100ms)或者累計(jì)的數(shù)據(jù)大小(64KB)朴读。通過(guò)增加batch的大小屹徘,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤(pán)IO的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡衅金。

Producers可以異步的并行的向kafka發(fā)送消息噪伊,但是通常producer在發(fā)送完消息之后會(huì)得到一個(gè)future響應(yīng),返回的是offset值或者發(fā)送過(guò)程中遇到的錯(cuò)誤氮唯。這其中有個(gè)非常重要的參數(shù)“acks”,這個(gè)參數(shù)決定了producer要求leader partition 收到確認(rèn)的副本個(gè)數(shù)鉴吹,如果acks設(shè)置數(shù)量為0,表示producer不會(huì)等待broker的響應(yīng)惩琉,所以豆励,producer無(wú)法知道消息是否發(fā)送成功,這樣有可能會(huì)導(dǎo)致數(shù)據(jù)丟失瞒渠,但同時(shí)良蒸,acks值為0會(huì)得到最大的系統(tǒng)吞吐量。

若acks設(shè)置為1伍玖,表示producer會(huì)在leader partition收到消息時(shí)得到broker的一個(gè)確認(rèn)嫩痰,這樣會(huì)有更好的可靠性,因?yàn)榭蛻?hù)端會(huì)等待直到broker確認(rèn)收到消息窍箍。若設(shè)置為-1串纺,producer會(huì)在所有備份的partition收到消息時(shí)得到broker的確認(rèn),這個(gè)設(shè)置可以得到最高的可靠性保證仔燕。

Kafka 消息有一個(gè)定長(zhǎng)的header和變長(zhǎng)的字節(jié)數(shù)組組成造垛。因?yàn)閗afka消息支持字節(jié)數(shù)組,也就使得kafka可以支持任何用戶(hù)自定義的序列號(hào)格式或者其它已有的格式如Apache Avro晰搀、protobuf等五辽。Kafka沒(méi)有限定單個(gè)消息的大小,但我們推薦消息大小不要超過(guò)1MB,通常一般消息大小都在1~10kB之前外恕。

發(fā)布消息時(shí)杆逗,kafka client先構(gòu)造一條消息乡翅,將消息加入到消息集set中(kafka支持批量發(fā)布,可以往消息集合中添加多條消息罪郊,一次行發(fā)布)蠕蚜,send消息時(shí),producer client需指定消息所屬的topic悔橄。

5.2 ?Consumers

Kafka提供了兩套consumer api靶累,分為high-level api和sample-api。Sample-api 是一個(gè)底層的API癣疟,它維持了一個(gè)和單一broker的連接挣柬,并且這個(gè)API是完全無(wú)狀態(tài)的,每次請(qǐng)求都需要指定offset值睛挚,因此邪蛔,這套API也是最靈活的。

在kafka中扎狱,當(dāng)前讀到哪條消息的offset值是由consumer來(lái)維護(hù)的侧到,因此,consumer可以自己決定如何讀取kafka中的數(shù)據(jù)淤击。比如匠抗,consumer可以通過(guò)重設(shè)offset值來(lái)重新消費(fèi)已消費(fèi)過(guò)的數(shù)據(jù)。不管有沒(méi)有被消費(fèi)遭贸,kafka會(huì)保存數(shù)據(jù)一段時(shí)間戈咳,這個(gè)時(shí)間周期是可配置的,只有到了過(guò)期時(shí)間壕吹,kafka才會(huì)刪除這些數(shù)據(jù)著蛙。(這一點(diǎn)與AMQ不一樣,AMQ的message一般來(lái)說(shuō)都是持久化到mysql中的耳贬,消費(fèi)完的message會(huì)被delete掉)

High-level API封裝了對(duì)集群中一系列broker的訪問(wèn)踏堡,可以透明的消費(fèi)一個(gè)topic。它自己維持了已消費(fèi)消息的狀態(tài)咒劲,即每次消費(fèi)的都是下一個(gè)消息顷蟆。

High-level API還支持以組的形式消費(fèi)topic,如果consumers有同一個(gè)組名腐魂,那么kafka就相當(dāng)于一個(gè)隊(duì)列消息服務(wù)帐偎,而各個(gè)consumer均衡的消費(fèi)相應(yīng)partition中的數(shù)據(jù)。若consumers有不同的組名蛔屹,那么此時(shí)kafka就相當(dāng)與一個(gè)廣播服務(wù)削樊,會(huì)把topic中的所有消息廣播到每個(gè)consumer。

High level api和Low level api是針對(duì)consumer而言的,和producer無(wú)關(guān)漫贞。

High level api是consumer讀的partition的offsite是存在zookeeper上甸箱。High level api會(huì)啟動(dòng)另外一個(gè)線程去每隔一段時(shí)間,offsite自動(dòng)同步到zookeeper上迅脐。換句話說(shuō)芍殖,如果使用了High level api, 每個(gè)message只能被讀一次谴蔑,一旦讀了這條message之后豌骏,無(wú)論我consumer的處理是否ok。High level api的另外一個(gè)線程會(huì)自動(dòng)的把offiste+1同步到zookeeper上树碱。如果consumer讀取數(shù)據(jù)出了問(wèn)題肯适,offsite也會(huì)在zookeeper上同步。因此成榜,如果consumer處理失敗了,會(huì)繼續(xù)執(zhí)行下一條蹦玫。這往往是不對(duì)的行為赎婚。因此,Best Practice是一旦consumer處理失敗樱溉,直接讓整個(gè)conusmer group拋Exception終止挣输,但是最后讀的這一條數(shù)據(jù)是丟失了,因?yàn)樵趜ookeeper里面的offsite已經(jīng)+1了福贞。等再次啟動(dòng)conusmer group的時(shí)候撩嚼,已經(jīng)從下一條開(kāi)始讀取處理了。

Low level api是consumer讀的partition的offsite在consumer自己的程序中維護(hù)挖帘。不會(huì)同步到zookeeper上完丽。但是為了kafka manager能夠方便的監(jiān)控,一般也會(huì)手動(dòng)的同步到zookeeper上拇舀。這樣的好處是一旦讀取某個(gè)message的consumer失敗了逻族,這條message的offsite我們自己維護(hù),我們不會(huì)+1骄崩。下次再啟動(dòng)的時(shí)候聘鳞,還會(huì)從這個(gè)offsite開(kāi)始讀。這樣可以做到exactly once對(duì)于數(shù)據(jù)的準(zhǔn)確性有保證要拂。

對(duì)于Consumer group:

1. 允許consumer group(包含多個(gè)consumer抠璃,如一個(gè)集群同時(shí)消費(fèi))對(duì)一個(gè)topic進(jìn)行消費(fèi),不同的consumer group之間獨(dú)立消費(fèi)脱惰。

2. 為了對(duì)減小一個(gè)consumer group中不同consumer之間的分布式協(xié)調(diào)開(kāi)銷(xiāo)搏嗡,指定partition為最小的并行消費(fèi)單位,即一個(gè)group內(nèi)的consumer只能消費(fèi)不同的partition枪芒。

Consumer與Partition的關(guān)系:

- 如果consumer比partition多彻况,是浪費(fèi)谁尸,因?yàn)閗afka的設(shè)計(jì)是在一個(gè)partition上是不允許并發(fā)的,所以consumer數(shù)不要大于partition數(shù)

- 如果consumer比partition少纽甘,一個(gè)consumer會(huì)對(duì)應(yīng)于多個(gè)partitions良蛮,這里主要合理分配consumer數(shù)和partition數(shù),否則會(huì)導(dǎo)致partition里面的數(shù)據(jù)被取的不均勻

- 如果consumer從多個(gè)partition讀到數(shù)據(jù)悍赢,不保證數(shù)據(jù)間的順序性决瞳,kafka只保證在一個(gè)partition上數(shù)據(jù)是有序的,但多個(gè)partition左权,根據(jù)你讀的順序會(huì)有不同

- 增減consumer皮胡,broker,partition會(huì)導(dǎo)致rebalance赏迟,所以rebalance后consumer對(duì)應(yīng)的partition會(huì)發(fā)生變化

- High-level接口中獲取不到數(shù)據(jù)的時(shí)候是會(huì)block的

負(fù)載低的情況下可以每個(gè)線程消費(fèi)多個(gè)partition屡贺。但負(fù)載高的情況下,Consumer 線程數(shù)最好和Partition數(shù)量保持一致锌杀。如果還是消費(fèi)不過(guò)來(lái)甩栈,應(yīng)該再開(kāi) Consumer 進(jìn)程,進(jìn)程內(nèi)線程數(shù)同樣和分區(qū)數(shù)一致糕再。

消費(fèi)消息時(shí)量没,kafka client需指定topic以及partition number(每個(gè)partition對(duì)應(yīng)一個(gè)邏輯日志流,如topic代表某個(gè)產(chǎn)品線突想,partition代表產(chǎn)品線的日志按天切分的結(jié)果)殴蹄,consumer client訂閱后,就可迭代讀取消息猾担,如果沒(méi)有消息袭灯,consumer client會(huì)阻塞直到有新的消息發(fā)布。consumer可以累積確認(rèn)接收到的消息垒探,當(dāng)其確認(rèn)了某個(gè)offset的消息妓蛮,意味著之前的消息也都已成功接收到,此時(shí)broker會(huì)更新zookeeper上地offset registry圾叼。

5.3高效的數(shù)據(jù)傳輸

1.發(fā)布者每次可發(fā)布多條消息(將消息加到一個(gè)消息集合中發(fā)布)蛤克,consumer每次迭代消費(fèi)一條消息。

2.不創(chuàng)建單獨(dú)的cache夷蚊,使用系統(tǒng)的page cache构挤。發(fā)布者順序發(fā)布,訂閱者通常比發(fā)布者滯后一點(diǎn)點(diǎn)惕鼓,直接使用Linux的page cache效果也比較后筋现,同時(shí)減少了cache管理及垃圾收集的開(kāi)銷(xiāo)。

3.使用sendfile優(yōu)化網(wǎng)絡(luò)傳輸,減少一次內(nèi)存拷貝矾飞。

6.Kafka 與 Zookeeper

6.1 Zookeeper協(xié)調(diào)控制

1.管理broker與consumer的動(dòng)態(tài)加入與離開(kāi)一膨。(Producer不需要管理,隨便一臺(tái)計(jì)算機(jī)都可以作為Producer向Kakfa Broker發(fā)消息)

2.觸發(fā)負(fù)載均衡洒沦,當(dāng)broker或consumer加入或離開(kāi)時(shí)會(huì)觸發(fā)負(fù)載均衡算法豹绪,使得一

個(gè)consumer group內(nèi)的多個(gè)consumer的消費(fèi)負(fù)載平衡。(因?yàn)橐粋€(gè)comsumer消費(fèi)一個(gè)或多個(gè)partition申眼,一個(gè)partition只能被一個(gè)consumer消費(fèi))

3.維護(hù)消費(fèi)關(guān)系及每個(gè)partition的消費(fèi)信息瞒津。

6.2 Zookeeper上的細(xì)節(jié):

1.每個(gè)broker啟動(dòng)后會(huì)在zookeeper上注冊(cè)一個(gè)臨時(shí)的broker registry,包含broker的ip地址和端口號(hào)括尸,所存儲(chǔ)的topics和partitions信息巷蚪。

2.每個(gè)consumer啟動(dòng)后會(huì)在zookeeper上注冊(cè)一個(gè)臨時(shí)的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。

3.每個(gè)consumer group關(guān)聯(lián)一個(gè)臨時(shí)的owner registry和一個(gè)持久的offset registry濒翻。對(duì)于被訂閱的每個(gè)partition包含一個(gè)owner registry屁柏,內(nèi)容為訂閱這個(gè)partition的consumer id;同時(shí)包含一個(gè)offset registry有送,內(nèi)容為上一次訂閱的offset前联。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市娶眷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌啸臀,老刑警劉巖届宠,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異乘粒,居然都是意外死亡豌注,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)灯萍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)轧铁,“玉大人,你說(shuō)我怎么就攤上這事旦棉〕莘纾” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵绑洛,是天一觀的道長(zhǎng)救斑。 經(jīng)常有香客問(wèn)我,道長(zhǎng)真屯,這世上最難降的妖魔是什么脸候? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上运沦,老公的妹妹穿的比我還像新娘泵额。我一直安慰自己,他們只是感情好携添,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布嫁盲。 她就那樣靜靜地躺著,像睡著了一般薪寓。 火紅的嫁衣襯著肌膚如雪亡资。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天向叉,我揣著相機(jī)與錄音锥腻,去河邊找鬼。 笑死母谎,一個(gè)胖子當(dāng)著我的面吹牛瘦黑,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播奇唤,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼幸斥,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了咬扇?” 一聲冷哼從身側(cè)響起甲葬,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎懈贺,沒(méi)想到半個(gè)月后经窖,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡梭灿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年画侣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堡妒。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡配乱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出皮迟,到底是詐尸還是另有隱情搬泥,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布万栅,位于F島的核電站佑钾,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏烦粒。R本人自食惡果不足惜休溶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一代赁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧兽掰,春花似錦芭碍、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)抑胎。三九已至卖鲤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間筷频,已是汗流浹背熏挎。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工速勇, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人坎拐。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓烦磁,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親哼勇。 傳聞我的和親對(duì)象是個(gè)殘疾皇子都伪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容