問題
某一天業(yè)務來找我稚失,反映發(fā)數(shù)據(jù)到某一個Kafka集群特別慢。
并且他們提供了一份自己的測試結(jié)果飞袋,結(jié)果顯示發(fā)送數(shù)據(jù)到Kafka集群A的平均響應延遲在10ms以內(nèi)戳气,但是發(fā)送到Kafka集群B的平均響應延遲卻達到了2000ms+。
這種問題一般是比較頭疼的巧鸭,首先瓶您,我們Kafka集群都有監(jiān)控和報警,通過查看可用性纲仍、流量變化呀袱、Kafka日志等方式,都沒有發(fā)現(xiàn)任何異樣郑叠;其次夜赵,響應慢也有可能和用戶的使用方式和測試方法有關(guān)系。
因此第一步锻拘,我決定驗證一下問題的存在油吭。
驗證問題
在kafka/bin
目錄中,kafka提供了一個寫請求性能測試腳本kafka-producer-perf-test.sh
署拟。
這個腳本會運行kafka中的kafka.perf.ProducerPerformance
類婉宰,發(fā)送消息到kafka并輸出CSV報告。
測試命令如下:
kafka/bin/kafka-producer-perf-test.sh --broker-list ${BROKER_LIST} --topics perf-test-topic --show-detailed-stats --messages 10000 --csv-reporter-enabled --metrics-dir ./perf-report
通過分析生成的報告推穷,發(fā)現(xiàn)確實有一臺節(jié)點的響應比較慢:
time | min | max | mean | median | stddev | 95% | 99% | 99.90% |
---|---|---|---|---|---|---|---|---|
1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
2 | 1184.369398 | 1184.369398 | 1184.369398 | 1184.369398 | 0 | 1184.369398 | 1184.369398 | 1184.369398 |
3 | 1184.369398 | 1308.03076 | 1246.200079 | 1246.200079 | 87.44178764 | 1308.03076 | 1308.03076 | 1308.03076 |
4 | 1036.153496 | 1308.03076 | 1176.184551 | 1184.369398 | 136.1233097 | 1308.03076 | 1308.03076 | 1308.03076 |
5 | 1036.153496 | 1308.03076 | 1176.184551 | 1184.369398 | 136.1233097 | 1308.03076 | 1308.03076 | 1308.03076 |
6 | 1036.153496 | 1308.03076 | 1170.298591 | 1168.505053 | 111.7658942 | 1308.03076 | 1308.03076 | 1308.03076 |
7 | 1036.153496 | 1308.03076 | 1195.533735 | 1184.369398 | 112.0391625 | 1308.03076 | 1308.03076 | 1308.03076 |
8 | 1036.153496 | 1308.03076 | 1176.72978 | 1168.505053 | 110.2893991 | 1308.03076 | 1308.03076 | 1308.03076 |
可以看到P999分布已經(jīng)達到了1300ms左右心包,這顯然是不正常的,但是原因是什么呢馒铃?
分析
既然日志沒有問題蟹腾,那只能看一下jstack信息了:
"kafka-request-handler-12" daemon prio=10 tid=0x00007fee9c7eb800 nid=0xea5a waiting for monitor entry [0x00007fecfbaf9000]
java.lang.Thread.State: BLOCKED (on object monitor)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
- waiting to lock <0x0000000640327150> (a java.lang.Object)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:662)
"kafka-request-handler-11" daemon prio=10 tid=0x00007fee9c7e9000 nid=0xea59 waiting for monitor entry [0x00007fecfbbfa000]
java.lang.Thread.State: BLOCKED (on object monitor)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:350)
- waiting to lock <0x0000000640327150> (a java.lang.Object)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:662)
如上發(fā)現(xiàn)jstack中有非常奇怪的信息,很多kafka-request-handler線程都處于阻塞狀態(tài)区宇。
這里簡單解釋一下kafka的處理請求線程模型娃殖,引用一篇講Kafka NIO網(wǎng)絡(luò)通信的文章中的圖來說明:
如圖,kafka采用了Java NIO中的selector模型议谷。一個Acceptor線程負責接受請求炉爆,多個Processor線程負責處理請求。但實際上Processor線程只是把請求封裝成kafka request卧晓,然后丟到RequestChannel中(當然也負責讀取response并返回芬首,這里不展開)。真正執(zhí)行請求的是KafkaRequestHandler逼裆,即jstack中的kafka-request-handler線程郁稍。
所以當kafka-request-handler線程出現(xiàn)大量阻塞的時候,會極大地影響整個節(jié)點的響應效率胜宇。
關(guān)于Java線程中的BLOCKED狀態(tài)耀怜,可以直接看一下Java doc說明:
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
可見kafka-request-handler線程是因為搶鎖而發(fā)生了阻塞。我們根據(jù)jstack信息中的kafka.cluster.Partition.appendMessagesToLeader
定位到了源碼:
def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
leaderIsrUpdateLock synchronized {
val leaderReplicaOpt = leaderReplicaIfLocal()
leaderReplicaOpt match {
case Some(leaderReplica) =>
val log = leaderReplica.log.get
val info = log.append(messages, assignOffsets = true)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
case None =>
throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
.format(topic, partitionId, localBrokerId))
}
}
}
可以看到這個方法確實是同步的掸屡,同步對象是leaderIsrUpdateLock封寞。由于leaderIsrUpdateLock是kafka.cluster.Partition
的成員變量,也就是說只有在寫同一個topic partition的時候仅财,才會發(fā)生互斥等待狈究。
所以發(fā)生上面問題的原因只可能是某個topic有大量的寫請求,而且這個topic的partition數(shù)量不多盏求,導致并發(fā)不足抖锥。
于是大量該topic的ProduceRequest占用了kafka-request-handler線程池,但是這些線程之間互相搶鎖碎罚,執(zhí)行效率比較低磅废,從而導致其他topic的請求無法及時被處理。
解決
通過分析日志和查看監(jiān)控流量荆烈,定位到集群中某個topic的ProduceRequest請求的QPS占了整個集群的80%以上拯勉。
通過查看該topic監(jiān)控指標中的單位時間內(nèi)的消息條目數(shù)(MessagesInPerSec)和單位時間內(nèi)的發(fā)送請求數(shù)(ProduceRequestPerSec)竟趾,可以計算出該Topic平均不到10條消息就會觸發(fā)一次kafka寫請求;再考慮到partition數(shù)量宫峦,推測應該是業(yè)務采用了kafka producer的同步模式岔帽,每條消息都觸發(fā)一次kafka寫請求。
解決方法有兩種:
- 通過在kafka producer config中配置
producer.type=async
來使用異步發(fā)送模式导绷。該模式下client會把消息先放到一個queue中犀勒,后臺的發(fā)送線程會從queue中取出消息,以batch(默認200條)的方式發(fā)送到kafka妥曲。這種方式提高了吞吐贾费,妥協(xié)了時效性(可以配置最大發(fā)送間隔,默認5000ms)檐盟,適合數(shù)據(jù)量比較大褂萧,對延遲不敏感的業(yè)務。 - 依舊采用默認的同步方式葵萎,不過client需要把要發(fā)送的消息先緩存到buffer中箱玷,然后調(diào)用send接口。其實send接口的參數(shù)是可變參數(shù)陌宿,接收的是message列表锡足,
def send(messages: KeyedMessage[K, V]*): Unit
;但有一些用戶不注意壳坪,會把自己集合中的一批消息逐條的調(diào)用send舶得,給kafka后端帶來QPS壓力。
- 錯誤示例
val messages = Seq("hello", "world")
val properties = new Properties()
// custom properties here.
val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))
messages.foreach{m =>
val keyedMessage = new KeyedMessage[String, String]("topic", null, m)
kafkaProducer.send(keyedMessage)
}
- 正確示例
val messages = Seq("hello", "world")
val properties = new Properties()
// custom properties here.
val kafkaProducer = new Producer[String, String](new ProducerConfig(new Properties()))
val keyedMessages = messages.map(m => new KeyedMessage[String, String]("topic", null, m))
kafkaProducer.send(keyedMessages: _*)
當然爽蝴,增加topic partition數(shù)量也能在一定程度上緩解問題沐批,因為不同partition之間的寫請求是不互斥的,但這種方式更像是治標不治本蝎亚,掩蓋了根本問題九孩。
總結(jié)
合理地發(fā)送網(wǎng)絡(luò)請求在分布式系統(tǒng)中非常重要,為了提高效率发框,通常在權(quán)衡時效性和吞吐的情況下躺彬,以“聚少為多”的方式發(fā)送批量的請求。過多的小請求不僅會降低吞吐梅惯,還可能會壓垮后端的服務宪拥。
當然,作為服務提供方铣减,需要通過多租戶她君、限流等方式,避免不正常使用的場景把服務壓垮葫哗。