kafka由producer consumer broker topic partitions(分區(qū))組成
kafka cluster(集群)就是由多個(gè)broker topic partitions(分區(qū))組成
開局一張圖
?
上面表示一個(gè)kafka集群交互的流程圖捺弦,kafka cluster中有3個(gè)brock(表示有3臺kafka服務(wù)的集群)股毫,有一個(gè)topic名稱為topic0的主題凸郑,topic0主題有3個(gè)partitions(分區(qū))拨脉,每個(gè)partition有3個(gè)副本(包含本身,partition本身也是副本焦影,是leader副本车遂。一個(gè)leader分區(qū),兩個(gè)follwer分區(qū))
Producers往Brokers里面的指定topic0中寫消息(發(fā)消息時(shí)指定主題的名稱斯辰,默認(rèn)會(huì)創(chuàng)建主題)舶担,Consumers從Brokers里面拉去指定topic0的消息。
topic只是大的概論彬呻,真正存放消息的是partitions(分區(qū))
從圖可以看出 topic0創(chuàng)建了3個(gè)分區(qū) ([1 p2 p0)衣陶,每個(gè)brock分配一個(gè)分區(qū),上面的圖包含了兩個(gè)副本闸氮,如果只創(chuàng)建分區(qū)本身是圖如下
?
分區(qū)上的存儲的?就是每一條的消息(相當(dāng)于rocketmq的隊(duì)列)剪况,消息是有順序的,從0開始(offset)蒲跨,消費(fèi)者pull消息的時(shí)候译断,就是從分區(qū)上獲取里面存放的消息,topic的每個(gè)分區(qū)存放的消息都不一樣
因?yàn)閠opic的每個(gè)分區(qū)存放的消息都不一樣或悲。所有當(dāng)brock0掛掉后孙咪,p1上未被消費(fèi)的消費(fèi)就消費(fèi)不到了堪唐。只有當(dāng)重啟之后才能消息。這樣就沒有起到消息集群容錯(cuò)的效果翎蹈。所以有了分區(qū)副本的概念
?
上圖表示topic0主題有3個(gè)分區(qū)(p1 p2 p0)淮菠,每個(gè)分區(qū)有兩個(gè)副本(replica)(也可以說topic0有3個(gè)不同的分區(qū),每個(gè)分區(qū)有3個(gè)副本(包括自己本身荤堪,本身是leader分區(qū)副本合陵,其他兩個(gè)是follwer分區(qū)副本))
分區(qū)分為leader分區(qū)和follwer分區(qū),我們可以在zookeeper上查看topic0的分區(qū)的leader分布在哪個(gè)brock上
在zk的客戶端執(zhí)行命令get /brokers/topics/topic0 (這個(gè)命令可以查到topic0主題下partitions分布在哪個(gè)brock)
?
可以看到"partitions":{"2":[1,2,0],"1":[0,1,2],"0":[2,0,1]}}澄阳,表示p2分區(qū)分布在brock.id為1和2和0的服務(wù)器上(因?yàn)橛?個(gè)分布曙寡,每個(gè)brock上都會(huì)有,所有1 2 0 brock都有p2分區(qū))
每個(gè)brock上都有p2分區(qū)寇荧,但是leader分區(qū)只有一個(gè),其他的都是follwer分區(qū)执隧。
在zk的客戶端執(zhí)行命令 get /brokers/topics/topic0/partitions/0/state (可以查詢主題下的partitions的leader partition(分區(qū))在哪個(gè)brock上)
?
"leader":2 可以看出 p0分區(qū)在brock.id=2的服務(wù)器上揩抡。
其他的分區(qū)leader分布按上面的命令查詢到(沒有截圖出來)。p1分區(qū)在brock.id=0的服務(wù)器上镀琉、p2分區(qū)在brock.id=1的服務(wù)器上
leader分區(qū)在一圖中都標(biāo)紅色了峦嗤,其他都是follwer。
分區(qū)副本的作用是當(dāng)leader分區(qū)的brock掛了屋摔,會(huì)在fowller分區(qū)上重新選出一個(gè)分區(qū)作為leader分區(qū)烁设,能實(shí)現(xiàn)集群容錯(cuò)效果
leader副本:處理所有的讀寫請求(只有一個(gè)leader,其他的都是follwer)
follwer副本:不接收任何請求處理钓试,只從leader副本同步消息日志
副本分配算法
副本是如何分配到brock上的装黑?
將所有N個(gè)Brock和待分配的i個(gè)partition排序,將第i個(gè)partition分配到第(i%n)個(gè)Brock上弓熏,將第i個(gè)partition的第j個(gè)副本分配到第((i+j)%n)個(gè)brock上
kafka高性能原因
1.消息順序?qū)懭氲酱疟P
將寫磁盤的過程變?yōu)轫樞驅(qū)懥堤罚蓸O大提高對磁盤的利用率。Consumer通過offset順序消費(fèi)這些數(shù)據(jù)
2.零拷貝(直接在內(nèi)核空間將數(shù)據(jù)拷貝到網(wǎng)卡緩存挽鞠。減少了用戶空間的操作過程)
(類似于NIO的直接緩沖區(qū)疚颊,減少jvm內(nèi)存的操作過程)
消息從發(fā)送到落地保存,broker 維護(hù)的消息日志本身就是文件目錄信认,每個(gè)文件都是二進(jìn)制保存材义,生產(chǎn)者和消費(fèi)者使 用相同的格式來處理。在消費(fèi)者獲取消息時(shí)嫁赏,服務(wù)器先從 硬盤讀取數(shù)據(jù)到內(nèi)存其掂,然后把內(nèi)存中的數(shù)據(jù)原封不動(dòng)的通 過 socket 發(fā)送給消費(fèi)者。雖然這個(gè)操作描述起來很簡單潦蝇, 但實(shí)際上經(jīng)歷了很多步驟清寇。
?
? 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存(Linux內(nèi)核一種重要的磁盤高速緩存)
? 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
? 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到 socket 緩存中
? 操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū)喘漏,以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
這個(gè)過程涉及到 4 次上下文切換以及 4 次數(shù)據(jù)復(fù)制,并且有兩次復(fù)制操作是由 CPU 完成华烟。但是這個(gè)過程中翩迈,第二、三步操作數(shù)據(jù)完全沒有進(jìn)行變化盔夜,僅僅是從磁盤復(fù)制到網(wǎng)卡緩沖區(qū)负饲。
通過“零拷貝”技術(shù),可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作喂链, 同時(shí)也會(huì)減少上下文切換次數(shù)》凳現(xiàn)代的 unix 操作系統(tǒng)提供 一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)?socket椭微; 在 Linux 中洞坑,是通過 sendfile 系統(tǒng)調(diào)用來完成的。Java 提 供了訪問這個(gè)系統(tǒng)調(diào)用的方法:FileChannel.transferTo API
?
使用 sendfile蝇率,只需要一次拷貝就行迟杂,允許操作系統(tǒng)將數(shù)據(jù) 直接從頁緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個(gè)優(yōu)化的路徑中本慕, 只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的排拷。
java api kafka配置信息分析
producer 配置可選參數(shù)
acks
配置表示 producer 發(fā)送消息到 broker 上以后的確認(rèn)值。有三個(gè)可選項(xiàng)
1. acks=0表示 producer 不需要等待 broker 的消息確認(rèn)锅尘,發(fā)出消息那么就認(rèn)為消息已成功寫入Kafka监氢,時(shí)效率高,但同時(shí)風(fēng)險(xiǎn)最大藤违,server 宕機(jī)時(shí)浪腐,數(shù)據(jù)將會(huì)丟失
2. acks=1 表示 producer 只需要獲得 kafka 集群中的 leader 節(jié)點(diǎn)確認(rèn)即可,這個(gè)選擇時(shí)延較小同時(shí)確保了 leader 節(jié)點(diǎn)確認(rèn)接收成功
3. acks=all leader 節(jié)點(diǎn)在返回確認(rèn)或錯(cuò)誤響應(yīng)之前顿乒,會(huì)等待所有同步副本都收到消息牛欢。如果和min.insync.replicas參數(shù)結(jié)合起來,就可以決定在返回確認(rèn)前至少有多個(gè)副本能夠收到消息淆游。比如min.insync.replicas=1就需要至少一個(gè)follwer確認(rèn)收到消息傍睹。相對安全,但是效率較低犹菱。但是由于 ISR 可能會(huì)縮小到僅包含一個(gè) Replica拾稳,所以設(shè)置參數(shù)為all并不能一定避免數(shù)據(jù)丟失
batch.size
生產(chǎn)者發(fā)送多個(gè)消息到 broker 上的同一個(gè)分區(qū)時(shí),為了減少網(wǎng)絡(luò)請求帶來的 性能開銷腊脱,通過批量的方式來提交消息访得,可以通過這個(gè)參數(shù)來控制批量提交的 字節(jié)數(shù)大小,默認(rèn)大小是 16384byte,也就是 16kb,意味著當(dāng)一批消息大小達(dá) 到指定的 batch.size 的時(shí)候會(huì)統(tǒng)一發(fā)送
linger.ms
Producer 默認(rèn)會(huì)把兩次發(fā)送時(shí)間間隔內(nèi)收集到的所有 Requests 進(jìn)行一次聚合 然后再發(fā)送悍抑,以此提高吞吐量鳄炉,而 linger.ms 就是為每次發(fā)送到 broker 的請求 增加一些 delay,以此來聚合更多的 Message 請求搜骡。 這個(gè)有點(diǎn)想 TCP 里面的 Nagle 算法拂盯,在 TCP 協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送记靡,采用了 Nagle 算法谈竿,也就是基于小包的等-停協(xié)議。
batch.size 和 linger.ms 這兩個(gè)參數(shù)是 kafka 性能優(yōu)化的關(guān)鍵參數(shù)當(dāng)二者都配置的時(shí)候摸吠,只要滿足其中一個(gè)要 求空凸,就會(huì)發(fā)送請求到 broker 上
max.request.size
設(shè)置請求的數(shù)據(jù)的最大字節(jié)數(shù),為了防止發(fā)生較大的數(shù)據(jù)包影響到吞吐量寸痢,默認(rèn)值為 1MB
consumer配置可選參數(shù)
group.id
當(dāng)producer發(fā)送一條消息呀洲,相同group.id的多個(gè)consumer只有其中一個(gè)consumer能消費(fèi)到
(比如 topic=hehe的主題發(fā)送了一條消息,group.id=666的組啼止,有三個(gè)消費(fèi)者監(jiān)聽了這個(gè)主題道逗,但這條消息只會(huì)被其中一個(gè)consumer消費(fèi)到),group.id=999的一個(gè)consumer也能消費(fèi)這條消息族壳。
enable.auto.commit
消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后趣些,該消息才不會(huì)被再次接 收到仿荆,還可以配合 auto.commit.interval.ms 控制自動(dòng)提交的頻率。 當(dāng)然坏平,我們也可以通過 consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交
auto.offset.reset
這個(gè)參數(shù)是針對新的 groupid 中的消費(fèi)者而言的拢操,當(dāng)有新 groupid 的消費(fèi)者來 消費(fèi)指定的 topic 時(shí),對于該參數(shù)的配置舶替,會(huì)有不同的語義
auto.offset.reset=latest 情況下令境,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的 offset 處開始消費(fèi) Topic 下的消息
auto.offset.reset= earliest 情況下,新的消費(fèi)者會(huì)從該 topic 最早的消息開始 消費(fèi)
auto.offset.reset=none 情況下顾瞪,新的消費(fèi)者加入以后舔庶,由于之前不存在 offset,則會(huì)直接拋出異常陈醒。
max.poll.records
此設(shè)置限制每次調(diào)用 poll 返回的消息數(shù)惕橙,這樣可以更容易的預(yù)測每次 poll 間隔 要處理的最大值。通過調(diào)整此值钉跷,可以減少 poll 間隔