小記一次Kafka集群響應慢問題追查

問題

某一天業(yè)務來找我稚失,反映發(fā)數(shù)據(jù)到某一個Kafka集群特別慢。
并且他們提供了一份自己的測試結(jié)果飞袋,結(jié)果顯示發(fā)送數(shù)據(jù)到Kafka集群A的平均響應延遲在10ms以內(nèi)戳气,但是發(fā)送到Kafka集群B的平均響應延遲卻達到了2000ms+。
這種問題一般是比較頭疼的巧鸭,首先瓶您,我們Kafka集群都有監(jiān)控和報警,通過查看可用性纲仍、流量變化呀袱、Kafka日志等方式,都沒有發(fā)現(xiàn)任何異樣郑叠;其次夜赵,響應慢也有可能和用戶的使用方式和測試方法有關(guān)系。
因此第一步锻拘,我決定驗證一下問題的存在油吭。

驗證問題

kafka/bin目錄中,kafka提供了一個寫請求性能測試腳本kafka-producer-perf-test.sh署拟。
這個腳本會運行kafka中的kafka.perf.ProducerPerformance類婉宰,發(fā)送消息到kafka并輸出CSV報告。
測試命令如下:

kafka/bin/kafka-producer-perf-test.sh --broker-list ${BROKER_LIST} --topics perf-test-topic --show-detailed-stats --messages 10000 --csv-reporter-enabled --metrics-dir ./perf-report

通過分析生成的報告推穷,發(fā)現(xiàn)確實有一臺節(jié)點的響應比較慢:

time min max mean median stddev 95% 99% 99.90%
1 0 0 0 0 0 0 0 0
2 1184.369398 1184.369398 1184.369398 1184.369398 0 1184.369398 1184.369398 1184.369398
3 1184.369398 1308.03076 1246.200079 1246.200079 87.44178764 1308.03076 1308.03076 1308.03076
4 1036.153496 1308.03076 1176.184551 1184.369398 136.1233097 1308.03076 1308.03076 1308.03076
5 1036.153496 1308.03076 1176.184551 1184.369398 136.1233097 1308.03076 1308.03076 1308.03076
6 1036.153496 1308.03076 1170.298591 1168.505053 111.7658942 1308.03076 1308.03076 1308.03076
7 1036.153496 1308.03076 1195.533735 1184.369398 112.0391625 1308.03076 1308.03076 1308.03076
8 1036.153496 1308.03076 1176.72978 1168.505053 110.2893991 1308.03076 1308.03076 1308.03076

可以看到P999分布已經(jīng)達到了1300ms左右心包,這顯然是不正常的,但是原因是什么呢馒铃?

分析

既然日志沒有問題蟹腾,那只能看一下jstack信息了:

"kafka-request-handler-12" daemon prio=10 tid=0x00007fee9c7eb800 nid=0xea5a waiting for monitor entry [0x00007fecfbaf9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
        - waiting to lock <0x0000000640327150> (a java.lang.Object)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:662)
"kafka-request-handler-11" daemon prio=10 tid=0x00007fee9c7e9000 nid=0xea59 waiting for monitor entry [0x00007fecfbbfa000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
        - waiting to lock <0x0000000640327150> (a java.lang.Object)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:662)

如上發(fā)現(xiàn)jstack中有非常奇怪的信息,很多kafka-request-handler線程都處于阻塞狀態(tài)区宇。
這里簡單解釋一下kafka的處理請求線程模型娃殖,引用一篇講Kafka NIO網(wǎng)絡(luò)通信的文章中的圖來說明:

kafka-nio

如圖,kafka采用了Java NIO中的selector模型议谷。一個Acceptor線程負責接受請求炉爆,多個Processor線程負責處理請求。但實際上Processor線程只是把請求封裝成kafka request卧晓,然后丟到RequestChannel中(當然也負責讀取response并返回芬首,這里不展開)。真正執(zhí)行請求的是KafkaRequestHandler逼裆,即jstack中的kafka-request-handler線程郁稍。
所以當kafka-request-handler線程出現(xiàn)大量阻塞的時候,會極大地影響整個節(jié)點的響應效率胜宇。

關(guān)于Java線程中的BLOCKED狀態(tài)耀怜,可以直接看一下Java doc說明:

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */

可見kafka-request-handler線程是因為搶鎖而發(fā)生了阻塞。我們根據(jù)jstack信息中的kafka.cluster.Partition.appendMessagesToLeader定位到了源碼:

  def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
    leaderIsrUpdateLock synchronized {
      val leaderReplicaOpt = leaderReplicaIfLocal()
      leaderReplicaOpt match {
        case Some(leaderReplica) =>
          val log = leaderReplica.log.get
          val info = log.append(messages, assignOffsets = true)
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica)
          info
        case None =>
          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
            .format(topic, partitionId, localBrokerId))
      }
    }
  }

可以看到這個方法確實是同步的掸屡,同步對象是leaderIsrUpdateLock封寞。由于leaderIsrUpdateLock是kafka.cluster.Partition的成員變量,也就是說只有在寫同一個topic partition的時候仅财,才會發(fā)生互斥等待狈究。
所以發(fā)生上面問題的原因只可能是某個topic有大量的寫請求,而且這個topic的partition數(shù)量不多盏求,導致并發(fā)不足抖锥。
于是大量該topic的ProduceRequest占用了kafka-request-handler線程池,但是這些線程之間互相搶鎖碎罚,執(zhí)行效率比較低磅废,從而導致其他topic的請求無法及時被處理。

解決

通過分析日志和查看監(jiān)控流量荆烈,定位到集群中某個topic的ProduceRequest請求的QPS占了整個集群的80%以上拯勉。
通過查看該topic監(jiān)控指標中的單位時間內(nèi)的消息條目數(shù)(MessagesInPerSec)和單位時間內(nèi)的發(fā)送請求數(shù)(ProduceRequestPerSec)竟趾,可以計算出該Topic平均不到10條消息就會觸發(fā)一次kafka寫請求;再考慮到partition數(shù)量宫峦,推測應該是業(yè)務采用了kafka producer的同步模式岔帽,每條消息都觸發(fā)一次kafka寫請求。
解決方法有兩種:

  1. 通過在kafka producer config中配置producer.type=async來使用異步發(fā)送模式导绷。該模式下client會把消息先放到一個queue中犀勒,后臺的發(fā)送線程會從queue中取出消息,以batch(默認200條)的方式發(fā)送到kafka妥曲。這種方式提高了吞吐贾费,妥協(xié)了時效性(可以配置最大發(fā)送間隔,默認5000ms)檐盟,適合數(shù)據(jù)量比較大褂萧,對延遲不敏感的業(yè)務。
  2. 依舊采用默認的同步方式葵萎,不過client需要把要發(fā)送的消息先緩存到buffer中箱玷,然后調(diào)用send接口。其實send接口的參數(shù)是可變參數(shù)陌宿,接收的是message列表锡足,def send(messages: KeyedMessage[K, V]*): Unit;但有一些用戶不注意壳坪,會把自己集合中的一批消息逐條的調(diào)用send舶得,給kafka后端帶來QPS壓力。
  • 錯誤示例
    val messages = Seq("hello", "world")
    val properties = new Properties()
    // custom properties here.
    val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))

    messages.foreach{m =>
      val keyedMessage = new KeyedMessage[String, String]("topic", null, m)
      kafkaProducer.send(keyedMessage)
    }
  • 正確示例
    val messages = Seq("hello", "world")
    val properties = new Properties()
    // custom properties here.
    val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))

    val keyedMessages = messages.map(m => new KeyedMessage[String, String]("topic", null, m))
    kafkaProducer.send(keyedMessages: _*)

當然爽蝴,增加topic partition數(shù)量也能在一定程度上緩解問題沐批,因為不同partition之間的寫請求是不互斥的,但這種方式更像是治標不治本蝎亚,掩蓋了根本問題九孩。

總結(jié)

合理地發(fā)送網(wǎng)絡(luò)請求在分布式系統(tǒng)中非常重要,為了提高效率发框,通常在權(quán)衡時效性和吞吐的情況下躺彬,以“聚少為多”的方式發(fā)送批量的請求。過多的小請求不僅會降低吞吐梅惯,還可能會壓垮后端的服務宪拥。
當然,作為服務提供方铣减,需要通過多租戶她君、限流等方式,避免不正常使用的場景把服務壓垮葫哗。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末缔刹,一起剝皮案震驚了整個濱河市球涛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌校镐,老刑警劉巖宾符,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異灭翔,居然都是意外死亡,警方通過查閱死者的電腦和手機辣苏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門肝箱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人稀蟋,你說我怎么就攤上這事煌张。” “怎么了退客?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵骏融,是天一觀的道長。 經(jīng)常有香客問我萌狂,道長档玻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任茫藏,我火速辦了婚禮误趴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘务傲。我一直安慰自己凉当,他們只是感情好,可當我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布售葡。 她就那樣靜靜地躺著看杭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪挟伙。 梳的紋絲不亂的頭發(fā)上楼雹,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天,我揣著相機與錄音尖阔,去河邊找鬼六敬。 笑死,一個胖子當著我的面吹牛悴能,可吹牛的內(nèi)容都是我干的杂伟。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼筷笨,長吁一口氣:“原來是場噩夢啊……” “哼憔鬼!你這毒婦竟也來了龟劲?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤轴或,失蹤者是張志新(化名)和其女友劉穎昌跌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體照雁,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡蚕愤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了饺蚊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萍诱。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖污呼,靈堂內(nèi)的尸體忽然破棺而出裕坊,到底是詐尸還是另有隱情,我是刑警寧澤燕酷,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布籍凝,位于F島的核電站,受9級特大地震影響苗缩,放射性物質(zhì)發(fā)生泄漏饵蒂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一酱讶、第九天 我趴在偏房一處隱蔽的房頂上張望苹享。 院中可真熱鬧,春花似錦浴麻、人聲如沸得问。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宫纬。三九已至,卻和暖如春膏萧,著一層夾襖步出監(jiān)牢的瞬間漓骚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工榛泛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蝌蹂,地道東北人。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓曹锨,卻偏偏與公主長得像孤个,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子沛简,可洞房花燭夜當晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容