前言
Kafka最初由Linkedin公司開發(fā),是一個分布式、支持分區(qū)的(partition)热幔、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)讼庇,它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景:比如基于hadoop的批處理系統(tǒng)绎巨、低延遲的實時系統(tǒng)、storm/Spark流式處理引擎蠕啄,web/nginx日志场勤、訪問日志,消息服務(wù)等等歼跟,用scala語言編寫和媳,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
1.簡介
1.1? 特性
????1. 高吞吐量哈街、低延遲:kafka每秒可以處理幾十萬條消息留瞳,它的延遲最低只有幾毫秒。
????2. 可擴展性:kafka集群支持熱擴展骚秦。
????3. 持久性她倘、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失作箍。
????4. 容錯性:允許集群中節(jié)點失斢擦骸(若副本數(shù)量為n,則允許n-1個節(jié)點失敗)
????5. 高并發(fā):支持?jǐn)?shù)千個客戶端同時讀寫
? ? 6.?不可靠性:為了高性能胞得,降低了部分可靠性荧止,消息存在丟失和重復(fù)的情況。
1.2?使用場景
基于Kafka的特性懒震,一般應(yīng)用在日志收集罩息,消息系統(tǒng),流式處理等對吞吐量要求較高个扰,但對可靠性要求較低的場景下瓷炮。
與Kafka相比,RabbitMQ/RocketMQ更側(cè)重于消息的可靠性递宅,一般用于金融或電商訂單業(yè)務(wù)娘香。
2.架構(gòu)圖
3.消息存儲
3.1?消息格式
每個partition分區(qū)在broker上保存為一個文件目錄苍狰,命名為<topic_name>_<partition_id>。
每個partition目錄下包含多個相同大小的segment文件烘绽,并以文件內(nèi)首個消息的offset命名淋昭,擴展名為.log。
segment文件內(nèi)消息存儲格式為:<offset> <message_size> <message>安接,每個partition分區(qū)的offset都是獨立并遞增的翔忽。
每個segment文件維護一個索引,擴展名.index盏檐,支持針對offset的二分查找歇式。
3.2?消息刪除
無論消息是否被消費,Kafka 都會保存所有的消息胡野。那對于舊數(shù)據(jù)有什么刪除策略呢材失?
基于時間,默認(rèn)配置是 168 小時(7 天)硫豆。
基于大小龙巨,默認(rèn)配置是 1073741824。
需要注意的是熊响,Kafka 讀取特定消息的時間復(fù)雜度是 O(1)旨别,所以這里刪除過期的文件并不會提高 Kafka 的性能!
4.生產(chǎn)者設(shè)計
4.1?producer寫
寫入時需要指定topic耘眨,key和partition可選昼榛,如果partition沒有指定,則根據(jù)key做hash取模得到partition剔难,如果key也沒有設(shè)置胆屿,則用輪詢。
每個topic-partition有一個發(fā)送隊列偶宫,業(yè)務(wù)將消息寫入隊列非迹,后臺線程根據(jù)batch_size和linger.ms最大等待時間執(zhí)行批量發(fā)送,這樣會造成消息的延遲纯趋,但是卻減少網(wǎng)絡(luò)IO憎兽,提高了吞吐量。
4.2 集群寫
producer從broker集群中獲取當(dāng)前分區(qū)對應(yīng)的leader吵冒,并將消息發(fā)送給leader纯命。
leader負(fù)責(zé)將消息寫入log,并等待其他副本更新痹栖。
其他從leader同步消息亿汞,并寫入log,返回給leader?ack揪阿。
leader收到所有副本返回的ack疗我,判定消息寫入成功咆畏,返回給producer成功。
為提高吞吐量吴裤,默認(rèn)配置為當(dāng)leader寫入成功旧找,就返回成功,此時如果leader服務(wù)掛掉麦牺,會造成數(shù)據(jù)丟失钮蛛。
5.消費者設(shè)計
5.1 消費流程
由于partition分區(qū)只對應(yīng)一個consumer,所以推薦consumer的個數(shù)和分區(qū)個數(shù)一樣剖膳,這樣能形成點對點愿卒,處理效率是最高的,如果consumer個數(shù)小于分區(qū)個數(shù)潮秘,則每個consumer會被分配多個partition,如果大于分區(qū)個數(shù)易结,則超出的consumer節(jié)點不會分配到partition枕荞,消費不到數(shù)據(jù)。
consumer從broker集群獲取當(dāng)前分區(qū)的leader搞动,并從leader批量pull消息躏精,并提交消息的offset,為提高吞吐量鹦肿,默認(rèn)為5秒自動提交一次offset矗烛,但這樣會造成消息的丟失和重復(fù)。
5.2?已消費offset
Kafka集群在每個partition上為每個消費組維護一個已消費offset箩溃,每次consumer消費完成并提交后瞭吃,集群都會更新這個offset。
在歷史版本中涣旨,這個offset信息是維護在zookeeper中歪架,新版本維護在 __consumer_offsets 這個 Topic 中。
5.3 rebalanced機制
Rebalance 本質(zhì)上是一種協(xié)議霹陡,規(guī)定了一個 Consumer Group 下的所有 consumer 如何達(dá)成一致和蚪,來分配訂閱 Topic 的每個分區(qū)。
Rebalance 的觸發(fā)條件主要有2個:
????1.組成員個數(shù)發(fā)生變化烹棉,增加組員或者減少組員攒霹。
????2.訂閱的partition分區(qū)數(shù)發(fā)生變化。
5.4?心跳監(jiān)測
集群通過消費端的兩個線程來監(jiān)測狀態(tài)浆洗,一個是心跳線程催束,一個是用戶poll線程。
心跳線程根據(jù)heartbeat.interval.ms參數(shù)(默認(rèn)3s)辅髓,定時向集群發(fā)送心跳包泣崩,心跳線程用于快速監(jiān)測消費端的故障少梁,盡早rebalance。
用戶poll線程從集群循環(huán)拉取消息矫付,如果兩次poll的時間間隔超過了max.poll.interval.ms(默認(rèn)300s)凯沪,則認(rèn)定消費端故障,執(zhí)行rebalance买优。
Kafka 0.10版本之前心跳包是放在poll線程去發(fā)的妨马,這樣導(dǎo)致為了滿足業(yè)務(wù)處理時間,heartbeat.interval.ms時間要設(shè)置的很大才行杀赢,如果消費端出了故障烘跺,心跳監(jiān)測不能馬上檢查到。
6.?高可用設(shè)計
Kafka在0.8以前的版本中脂崔,并不提供High Availablity機制滤淳,一旦一個或多個Broker宕機,則宕機期間其上所有Partition都無法繼續(xù)提供服務(wù)砌左。
6.1?副本機制
同一個Partition會有多個Replication脖咐,并選出一個Leader,Producer和Consumer只與這個Leader交互汇歹,其它Replica作為Follower從Leader中復(fù)制數(shù)據(jù)屁擅。
6.2 zookeeper管理
引入zookeeper來管理broker的動態(tài)加入和離開,實現(xiàn)故障發(fā)現(xiàn)和leader選舉产弹。
zookeeper同時也會管理consumer的動態(tài)加入與離開派歌,Producer不需要管理,隨便一臺計算機都可以作為Producer向Kakfa Broker發(fā)消息痰哨。
7.?高吞吐設(shè)計
Kafka基于頁緩存計算+磁盤順序?qū)懡汗瑢崿F(xiàn)了寫入數(shù)據(jù)的超高性能。
基于零拷貝技術(shù)作谭,提高了讀取數(shù)據(jù)的性能稽物。
7.1? 頁緩存技術(shù)
文件讀寫并不是直接訪問磁盤,而是利用到了操作系統(tǒng)的page-cache(頁緩存)折欠,所以寫磁盤文件其實就是在寫內(nèi)存贝或。
7.2 磁盤順序?qū)?/h2>
普通的機械磁盤隨機寫的性能極差,也就是隨便找到文件的某個位置來寫數(shù)據(jù)锐秦。
如果是追加文件末尾按照順序來寫數(shù)據(jù)的話咪奖,和寫內(nèi)存的性能是差不多的。
7.3?零拷貝技術(shù)
正常的數(shù)據(jù)發(fā)送流程:將數(shù)據(jù)從page-cache拷貝到應(yīng)用程序的進程緩存中酱床,然后調(diào)用write方法羊赵,將數(shù)據(jù)再拷貝到內(nèi)核socket發(fā)送緩沖區(qū)中,再經(jīng)過網(wǎng)卡發(fā)送出去。
零拷貝發(fā)送流程是:僅僅拷貝socket的描述符昧捷,然后數(shù)據(jù)就直接從page-cache中發(fā)送到網(wǎng)卡闲昭,節(jié)省了兩次數(shù)據(jù)的拷貝。
零拷貝的好處有:
? ? 1.避免操作系統(tǒng)內(nèi)核緩沖區(qū)之間進行數(shù)據(jù)拷貝操作靡挥。
? ? 2.?避免操作系統(tǒng)內(nèi)核和用戶應(yīng)用程序地址空間這兩者之間進行數(shù)據(jù)拷貝操作序矩。
? ? 3.?減少內(nèi)核和用戶進程的上下文切換。
? ? 4.?數(shù)據(jù)傳輸盡量讓 DMA 來做跋破,解放了cpu簸淀。
8.?不可靠特性
Kafka是為了高吞吐量設(shè)計的,在滿足性能的前提下毒返,不可避免的會帶來一些不可靠問題租幕。
8.1?消息丟失
生產(chǎn)者丟失
生產(chǎn)者采用定時批量發(fā)送數(shù)據(jù),如果期間生產(chǎn)者進程掛掉拧簸,消息來不及發(fā)送出去劲绪,則消息丟失,解決辦法是減少消息發(fā)送的最大等待時間盆赤,比如可以配置為5ms珠叔,從而減少消息丟失的數(shù)量和幾率。
集群丟失
Kafka默認(rèn)是同步寫入弟劲,只要leader寫入成功就返回成功,此時如果leader掛掉姥芥,其他副本還沒來得及同步消息兔乞,則消息丟失,解決辦法是配置為等待所有副本寫入成功后凉唐,才返回成功,此時會降低寫入的性能,影響吞吐量畅铭。
消費者丟失
消費者設(shè)置為自動提交時譬重,如果消息被提交后,還沒來得及處理簿训,進程掛掉咱娶,此時消息丟失,解決辦法是改為手動提交强品,犧牲性能膘侮。
8.2 重復(fù)消費
重復(fù)消費問題無法完全避免,如果業(yè)務(wù)系統(tǒng)不能容忍消息重復(fù)的榛,需要自己實現(xiàn)冪等性琼了。
生產(chǎn)者重復(fù)
生產(chǎn)者發(fā)送完消息,因為網(wǎng)絡(luò)問題沒有收到response夫晌,此時會重發(fā)消息雕薪,造成消息重復(fù)昧诱。
消費者重復(fù)
消費者設(shè)置為自動提交時,如果業(yè)務(wù)層消息處理時間太久所袁,超過了max.poll.interval.ms(默認(rèn)300s)盏档,則判定消費端故障產(chǎn)生rebalance,再次poll時仍獲取到之前的消息纲熏,導(dǎo)致重復(fù)妆丘。解決辦法是減少max.poll.records(poll的消息個數(shù)),盡量保證消息處理的夠快局劲。
在自動提交模式下勺拣,只要集群產(chǎn)生rebalance,已處理過但來不及提交的消息都會被再消費一次鱼填,導(dǎo)致重復(fù)药有。
8.3 同分區(qū)消息亂序
生產(chǎn)者發(fā)送消息時,如果前一個消息未響應(yīng)苹丸,可以繼續(xù)發(fā)送消息愤惰,如果前一個消息最終超時導(dǎo)致重發(fā),則會出現(xiàn)消息亂序赘理。
配置max.in.flight.requests.per.connection:限制客戶端在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)宦言。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求,但吞吐量會下降