Kafka問題總結(jié)及性能優(yōu)化最佳實(shí)踐

一惯吕、書寫背景:

? 最近陸續(xù)碰到不少朋友在導(dǎo)論在使用kafka時(shí)遇到了不少問題腺劣,特別在高流量的場景下倒庵,更是問題百出褒墨,防不勝防。剛好在之前的項(xiàng)目中也遇到了類似場景的問題擎宝,通過各種摸爬滾打郁妈,算是最近解決了遇到的問題。因此趁此機(jī)會(huì)把實(shí)踐與大家共享绍申,期望對(duì)各位遇到問題的朋友有一定的幫助噩咪。

二、最佳實(shí)踐:

1.? 安裝及時(shí)使用:

? ?kafka的安裝及基本使用可參考:https://www.cnblogs.com/dadonggg/p/8205302.html

2.?百億流量規(guī)劃:


流量分解及環(huán)境分析過程


3. JVM參數(shù)設(shè)置

kafka是scala語言開發(fā)极阅,運(yùn)行在JVM上胃碾,需要對(duì)JVM參數(shù)合理設(shè)置,修改bin/kafka-start-server.sh中的jvm設(shè)置筋搏,假設(shè)機(jī)器是32G內(nèi)存仆百,可以如下設(shè)置:

export KAFKA_HEAP_OPTS="‐Xmx16G‐Xms16G‐Xmn10G‐XX:MetaspaceSize=256M‐XX:+UseG1GC‐XX:MaxGCPauseMillis=50‐XX:G1HeapRegionSize=16M"

這種大內(nèi)存的情況一般都要用G1垃圾收集器,因?yàn)槟贻p代內(nèi)存比較大奔脐,用G1可以設(shè)置GC最大停頓時(shí)間(針對(duì)每個(gè)參數(shù)的具體含義如有不清楚的可百度俄周,在此不對(duì)其參數(shù)的含義做過多的講解)吁讨,不至于一次minorgc就花費(fèi)太長時(shí)間,當(dāng)然峦朗,因?yàn)橄駅afka建丧,rocketmq,es這些中間件波势,寫數(shù)據(jù)到磁盤會(huì)用到操作系統(tǒng)的page cache(對(duì)于系統(tǒng)的所有文件I/O請求翎朱,操作系統(tǒng)都是通過page cache機(jī)制實(shí)現(xiàn)的,對(duì)于操作系統(tǒng)而言尺铣,磁盤文件都是由一系列的數(shù)據(jù)塊順序組成闭翩,數(shù)據(jù)塊的大小隨系統(tǒng)不同而不同,x86 linux系統(tǒng)下是4KB(一個(gè)標(biāo)準(zhǔn)頁面大小)迄埃。內(nèi)核在處理文件I/O請求時(shí),首先到page cache中查找(page cache中的每一個(gè)數(shù)據(jù)塊都設(shè)置了文件以及偏移信息)兑障,如果未命中侄非,則啟動(dòng)磁盤I/O,將磁盤文件中的數(shù)據(jù)塊加載到page cache中的一個(gè)空閑塊流译。之后再copy到用戶緩沖區(qū)中)逞怨,所以JVM內(nèi)存不宜分配過大,需要給操作系統(tǒng)的緩存留出幾個(gè)G福澡。

4. 線上問題及優(yōu)化:

1). 消息丟失情況:

消息發(fā)送端:

(1)acks=0: 表示producer不需要等待任何broker確認(rèn)收到消息的回復(fù)叠赦,就可以繼續(xù)發(fā)送下一條消息。性能最高革砸,但是最容易丟消息除秀。大數(shù)據(jù)統(tǒng)計(jì)報(bào)表場景,對(duì)性能要求很高算利,對(duì)數(shù)據(jù)丟失不敏感的情況可以用這種册踩。

(2)acks=1: 至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是不需要等待所有follower是否成功寫入效拭。就可以繼續(xù)發(fā)送下一條消息暂吉。這種情況下,如果follower沒有成功備份數(shù)據(jù)缎患,而此時(shí)leader又掛掉慕的,則消息會(huì)丟失。

(3)acks=-1或all: 這意味著leader需要等待所有備份(min.insync.replicas配置的備份個(gè)數(shù))都成功寫入日志挤渔,這種策略會(huì)保證只要有一個(gè)備份存活就不會(huì)丟失數(shù)據(jù)肮街。這是最強(qiáng)的數(shù)據(jù)保證。一般除非是金融級(jí)別蚂蕴,或跟錢打交道的場景才會(huì)使用這種配置低散。當(dāng)然如果min.insync.replicas配置的是1則也可能丟消息俯邓,跟acks=1情況類似。

消息消費(fèi)端:

如果消費(fèi)這邊配置的是自動(dòng)提交熔号,萬一消費(fèi)到數(shù)據(jù)還沒處理完稽鞭,就自動(dòng)提交offset了,但是此時(shí)你consumer直接宕機(jī)了引镊,未處理完的數(shù)據(jù)丟失了朦蕴,下次也消費(fèi)不到了。

2). 消息重復(fù)消費(fèi)

消息發(fā)送端:

發(fā)送消息如果配置了重試機(jī)制弟头,比如網(wǎng)絡(luò)抖動(dòng)時(shí)間過長導(dǎo)致發(fā)送端發(fā)送超時(shí)吩抓,實(shí)際broker可能已經(jīng)接收到消息,但發(fā)送方會(huì)重新發(fā)送消息

消息消費(fèi)端:

如果消費(fèi)這邊配置的是自動(dòng)提交赴恨,剛拉取了一批數(shù)據(jù)處理了一部分疹娶,但還沒來得及提交,服務(wù)掛了伦连,下次重啟又會(huì)拉取相同的一批數(shù)據(jù)重復(fù)處理

一般消費(fèi)端都是要做消費(fèi)冪等處理的雨饺。

3). 消息亂序

如果發(fā)送端配置了重試機(jī)制,kafka不會(huì)等之前那條消息完全發(fā)送成功才去發(fā)送下一條消息惑淳,這樣可能會(huì)出現(xiàn)额港,發(fā)送了1,2歧焦,3條消息移斩,第一條超時(shí)了,后面兩條發(fā)送成功绢馍,再重試發(fā)送第1條消息向瓷,這時(shí)消息在broker端的順序就是2,3舰涌,1了

所以风罩,是否一定要配置重試要根據(jù)業(yè)務(wù)情況而定。也可以用同步發(fā)送的模式去發(fā)消息舵稠,當(dāng)然acks不能設(shè)置為0超升,這樣也能保證消息發(fā)送的有序。

kafka保證全鏈路消息順序消費(fèi)哺徊,需要從發(fā)送端開始室琢,將所有有序消息發(fā)送到同一個(gè)分區(qū),然后用一個(gè)消費(fèi)者去消費(fèi)落追,但是這種性能比較低盈滴,可以在消費(fèi)者端接收到消息后將需要保證順序消費(fèi)的幾條消費(fèi)發(fā)到內(nèi)存隊(duì)列(可以搞多個(gè)),一個(gè)內(nèi)存隊(duì)列開啟一個(gè)線程順序處理消息。

4). 消息積壓

(1)線上有時(shí)因?yàn)榘l(fā)送方發(fā)送消息速度過快巢钓,或者消費(fèi)方處理消息過慢病苗,可能會(huì)導(dǎo)致broker積壓大量未消費(fèi)消息。

此種情況如果積壓了上百萬未消費(fèi)消息需要緊急處理症汹,可以修改消費(fèi)端程序硫朦,讓其將收到的消息快速轉(zhuǎn)發(fā)到其他topic(可以設(shè)置很多分區(qū)),然后再啟動(dòng)多個(gè)消費(fèi)者同時(shí)消費(fèi)新主題的不同分區(qū)背镇。

(2)由于消息數(shù)據(jù)格式變動(dòng)或消費(fèi)者程序有bug咬展,導(dǎo)致消費(fèi)者一直消費(fèi)不成功,也可能導(dǎo)致broker積壓大量未消費(fèi)消息瞒斩。

此種情況可以將這些消費(fèi)不成功的消息轉(zhuǎn)發(fā)到其它隊(duì)列里去(類似死信隊(duì)列)破婆,后面再慢慢分析死信隊(duì)列里的消息處理問題。

5). 延時(shí)隊(duì)列

延時(shí)隊(duì)列存儲(chǔ)的對(duì)象是延時(shí)消息胸囱。所謂的“延時(shí)消息”是指消息被發(fā)送以后祷舀,并不想讓消費(fèi)者立刻獲取,而是等待特定的時(shí)間后烹笔,消費(fèi)者才能獲取這個(gè)消息進(jìn)行消費(fèi)蔑鹦,延時(shí)隊(duì)列的使用場景有很多, 比如 :

1)在訂單系統(tǒng)中箕宙, 一個(gè)用戶下單之后通常有 30 分鐘的時(shí)間進(jìn)行支付,如果 30 分鐘之內(nèi)沒有支付成功铺纽,那么這個(gè)訂單將進(jìn)行異常處理柬帕,這時(shí)就可以使用延時(shí)隊(duì)列來處理這些訂單了。

2)訂單完成1小時(shí)后通知用戶進(jìn)行評(píng)價(jià)狡门。

實(shí)現(xiàn)思路:發(fā)送延時(shí)消息時(shí)先把消息按照不同的延遲時(shí)間段發(fā)送到指定的隊(duì)列中(topic_1s陷寝,topic_5s,topic_10s其馏,...topic_n凤跑,這個(gè)一般不能支持任意時(shí)間段的延時(shí)),然后通過定時(shí)器進(jìn)行輪詢消費(fèi)這些topic叛复,查看消息是否到期仔引,如果到期就把這個(gè)消息發(fā)送到具體業(yè)務(wù)處理的topic中,隊(duì)列中消息越靠前的到期時(shí)間越早褐奥,具體來說就是定時(shí)器在一次消費(fèi)過程中咖耘,對(duì)消息的發(fā)送時(shí)間做判斷,看下是否延遲到對(duì)應(yīng)時(shí)間了撬码,如果到了就轉(zhuǎn)發(fā)儿倒,如果還沒到這一次定時(shí)任務(wù)就可以提前結(jié)束了。

6). 消息回溯

如果某段時(shí)間對(duì)已消費(fèi)消息計(jì)算的結(jié)果覺得有問題呜笑,可能是由于程序bug導(dǎo)致的計(jì)算錯(cuò)誤夫否,當(dāng)程序bug修復(fù)后彻犁,這時(shí)可能需要對(duì)之前已消費(fèi)的消息重新消費(fèi),可以指定從多久之前的消息回溯消費(fèi)凰慈,這種可以用consumer的offsetsForTimes汞幢、seek等方法指定從某個(gè)offset偏移的消息開始消費(fèi)。

7). 分區(qū)數(shù)越多吞吐量越高嗎

可以用kafka壓測工具自己測試分區(qū)數(shù)不同溉瓶,各種情況下的吞吐量

# 往test里發(fā)送一百萬條消息(--num-records 1000000)急鳄,每條設(shè)置1KB(record-size 1024字節(jié))

# throughput 用來進(jìn)行限流控制,當(dāng)設(shè)定的值小于 0 時(shí)不限流堰酿,當(dāng)設(shè)定的值大于 0 時(shí)疾宏,當(dāng)發(fā)送的吞吐量大于該值時(shí)就會(huì)被阻塞一段時(shí)間

bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.2.4:9092 acks=1

壓測效果圖

網(wǎng)絡(luò)上很多資料都說分區(qū)數(shù)越多吞吐量越高 , 但從壓測結(jié)果來看触创,分區(qū)數(shù)到達(dá)某個(gè)值吞吐量反而開始下降坎藐,實(shí)際上很多事情都會(huì)有一個(gè)臨界值,當(dāng)超過這個(gè)臨界值之后哼绑,很多原本符合既定邏輯的走向又會(huì)變得不同岩馍。一般情況分區(qū)數(shù)跟集群機(jī)器數(shù)量相當(dāng)就差不多了。

當(dāng)然吞吐量的數(shù)值和走勢還會(huì)和磁盤抖韩、文件系統(tǒng)蛀恩、 I/O調(diào)度策略等因素相關(guān)。

注意:如果分區(qū)數(shù)設(shè)置過大茂浮,比如設(shè)置10000双谆,可能會(huì)設(shè)置不成功,后臺(tái)會(huì)報(bào)錯(cuò)"java.io.IOException : Too manyopenfiles"席揽。

異常中最關(guān)鍵的信息是“ Too many open flies”顽馋,這是一種常見的 Linux 系統(tǒng)錯(cuò)誤,通常意味著文件描述符不足幌羞,它一般發(fā)生在創(chuàng)建線程寸谜、創(chuàng)建 Socket、打開文件這些場景下 属桦。 在 Linux系統(tǒng)的默認(rèn)設(shè)置下熊痴,這個(gè)文件描述符的個(gè)數(shù)不是很多 ,通過 ulimit -n 命令可以查看:一般默認(rèn)是1024聂宾,可以將該值增大愁拭,比如:ulimit-n 65535

8). 消息傳遞保障

at most once(消費(fèi)者最多收到一次消息,0--1次):acks = 0 可以實(shí)現(xiàn)亏吝。

at least once(消費(fèi)者至少收到一次消息岭埠,1--多次):ack = all 可以實(shí)現(xiàn)。

exactly once(消費(fèi)者剛好收到一次消息):at least once 加上消費(fèi)者冪等性可以實(shí)現(xiàn),還可以用kafka生產(chǎn)者的冪等性來實(shí)現(xiàn)惜论。

kafka生產(chǎn)者的冪等性:因?yàn)榘l(fā)送端重試導(dǎo)致的消息重復(fù)發(fā)送問題许赃,kafka的冪等性可以保證重復(fù)發(fā)送的消息只接收一次,只需在生產(chǎn)者加上參數(shù) props.put(“enable.idempotence”, true) 即可馆类,默認(rèn)是false不開啟 混聊。--也可以在消費(fèi)端的業(yè)務(wù)代碼中實(shí)現(xiàn)冪等性

9). kafka的事務(wù)

Kafka的事務(wù)不同于Rocketmq,Rocketmq是保障本地事務(wù)(比如數(shù)據(jù)庫)與mq消息發(fā)送的事務(wù)一致性乾巧,Kafka的事務(wù)主要是保障一次發(fā)送多條消息的事務(wù)一致性(要么同時(shí)成功要么同時(shí)失敗)句喜,一般在kafka的流式計(jì)算場景用得多一點(diǎn),比如沟于,kafka需要對(duì)一個(gè)topic里的消息做不同的流式計(jì)算處理咳胃,處理完分別發(fā)到不同的topic里,這些topic分別被不同的下游系統(tǒng)消費(fèi)(比如hbase旷太,redis展懈,es等),這種我們肯定希望系統(tǒng)發(fā)送到多個(gè)topic的數(shù)據(jù)保持事務(wù)一致性供璧。Kafka要實(shí)現(xiàn)類似Rocketmq的分布式事務(wù)需要額外開發(fā)功能存崖。

kafka的事務(wù)處理可以參考官方文檔

實(shí)現(xiàn)代碼可參考如下:

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("transactional.id", "my-transactional-id");

Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

//初始化事務(wù)

producer.initTransactions();

try {

? ? //開啟事務(wù)

? ? producer.beginTransaction();

? ? for (int i = 0; i < 100; i++){

? ? ? ? //發(fā)到不同的主題的不同分區(qū)

? ? ? ? producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));

? ? ? ? producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));

? ? ? ? producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));

? ? }

? ? //提交事務(wù)

? ? producer.commitTransaction();

} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {

? ? // We can't recover from these exceptions, so our only option is to close the producer and exit.

? ? producer.close();

} catch (KafkaException e) {

? ? // For all other exceptions, just abort the transaction and try again.

? ? //回滾事務(wù)

? ? producer.abortTransaction();

}

producer.close();

10). kafka高性能的原因

(1). 磁盤順序讀寫:kafka消息不能修改以及不會(huì)從文件中間刪除保證了磁盤順序讀,kafka的消息寫入文件都是追加在文件末尾睡毒,不會(huì)寫入文件中的某個(gè)位置(隨機(jī)寫)保證了磁盤順序?qū)憽?/p>

(2). 數(shù)據(jù)傳輸?shù)牧憧截?零拷貝主要指數(shù)據(jù)操作時(shí)在內(nèi)核空間可直接完成来惧,不用與用戶空間(JVM)進(jìn)行交互),其原理如下:


(3). 讀寫數(shù)據(jù)的批量batch處理以及壓縮傳輸

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市演顾,隨后出現(xiàn)的幾起案子供搀,更是在濱河造成了極大的恐慌,老刑警劉巖偶房,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異军浆,居然都是意外死亡棕洋,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門乒融,熙熙樓的掌柜王于貴愁眉苦臉地迎上來掰盘,“玉大人,你說我怎么就攤上這事赞季±⒉叮” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵申钩,是天一觀的道長次绘。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么邮偎? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任管跺,我火速辦了婚禮,結(jié)果婚禮上禾进,老公的妹妹穿的比我還像新娘豁跑。我一直安慰自己,他們只是感情好泻云,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布艇拍。 她就那樣靜靜地躺著,像睡著了一般宠纯。 火紅的嫁衣襯著肌膚如雪卸夕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天征椒,我揣著相機(jī)與錄音娇哆,去河邊找鬼。 笑死勃救,一個(gè)胖子當(dāng)著我的面吹牛碍讨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蒙秒,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼勃黍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了晕讲?” 一聲冷哼從身側(cè)響起覆获,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎瓢省,沒想到半個(gè)月后弄息,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡勤婚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年摹量,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片馒胆。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡缨称,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出祝迂,到底是詐尸還是另有隱情睦尽,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布型雳,位于F島的核電站当凡,受9級(jí)特大地震影響山害,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜宁玫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一粗恢、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧欧瘪,春花似錦眷射、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至芥被,卻和暖如春欧宜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拴魄。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來泰國打工冗茸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人匹中。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓夏漱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親顶捷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子挂绰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容