其實我有點猶豫要不要寫這個文章币绩,因為筆者的kafka客戶端版本并不是最新的喝噪,不知道新版本是否會有這個問題。
版本提示
kafka server版本
Scala 2.11 - kafka_2.11-2.0.0.tgz
kafka client版本
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
問題現(xiàn)象
筆者在優(yōu)化公司kafka producer性能時發(fā)現(xiàn)一個很奇怪的現(xiàn)象灼芭,當 1000tps 從外部輸入消息給kafka producer的時候,即每秒1000個record癣猾,但實際上會發(fā)現(xiàn)kafka.producer:type=producer-metrics
mbean的record-send-rate
會達到14000到15000r/s,record-error-rate會達到1000r/s榨了。因為配置了batch.size為5MB和linger.ms為5秒煎谍,所以消息是會緩沖一段時間發(fā)送出去的,但是不至于能長時間持續(xù)積攢出每秒1萬多的記錄龙屉,這樣大的發(fā)送速度發(fā)下去呐粘,也沒消息能積攢下來啊。
我發(fā)現(xiàn)kafka客戶端的日志沒有打印转捕,于是配置了org.apache.kafka.clients
logger, 日志級別設置為TRACE
作岖。
我在日志中發(fā)現(xiàn)了下面的報錯,關鍵詞是MESSAGE_TOO_LARGE
五芝。
2020-05-14 21:29:58.624 [kafka-producer-network-thread | producer-1] WARN (org.apache.kafka.clients.producer.internals.Sender:509) completeBatch - [Producer clientId=producer-1] Got error produce response in correlation id 226233 on topic-partition span-1, splitting and retrying (3 attempts left). Error: MESSAGE_TOO_LARGE
后續(xù)還試過batch.size
設置為1MB痘儡,也是一樣的現(xiàn)象,憑空多出很多消息的感覺枢步。
首次確定報錯含義
搜索這個報錯沉删,是kafka broker側(cè)有個配置message.max.bytes
,默認為1MB醉途,如果發(fā)送的消息體積大于這個矾瑰,就會報錯。
《kafka權(quán)威指南》說這個是限制單條record消息體積大小的隘擎。
官方文檔 也是這么說的
The request included a message larger than the max message size the server will accept.
這些都指向request里面某個record超過了kafka broker側(cè)message.max.bytes
配置殴穴。
當服務端返回MESSAGE_TOO_LARGE
報錯后,客戶端將消息拆分成更小的批次進行重新發(fā)送,所以會憑空產(chǎn)生很多消息的錯覺采幌。沒去探究拆分邏輯劲够,是怎么從1000r/s拆到15000r/s的也是奇怪,最好合理設置參數(shù)避免這種場景休傍。
確定kafka broker側(cè)message.max.bytes
配置大小
開始我想通過kafka自帶的kafka-config.sh征绎,卻發(fā)現(xiàn)在我這個版本,這個腳本帶的命令不會反饋所有配置尊残,只會返回你動態(tài)修改過的那些炒瘸。
kafka-manager界面點擊topic的配置淤堵,也只會顯示空
通過搜索上面這個問題寝衫,我發(fā)現(xiàn)了 Getting broker configuration via kafka-configs.sh
Yes this is a known behavior: https://issues.apache.org/jira/browse/KAFKA-7720
Currently this tool only shows configurations that have been dynamically overridden.
To see all broker configs, you have 2 options:
Use the AdminClient.describeConfigs() API
Tweak the ConfigCommand.scala tool. Just remove the filter on https://github.com/apache/kafka/blob/ad26914de65e2db244fbe36a2a5fd03de87dfb79/core/src/main/scala/kafka/admin/ConfigCommand.scala#L347
Update: This issue is being addressed by KIP-524 which is currently targetted for Kafka 2.5
后面我到server.log去grep
我在server.log里發(fā)現(xiàn)了這個配置的大小kafka實際生效的message.max.bytes值是1000012字節(jié),0.95左右拐邪,小于1MB慰毅。
bash-4.4# cat server.log.*|grep message.max.bytes
message.max.bytes = 1000012
message.max.bytes = 1000012
message.max.bytes = 1000012
message.max.bytes = 1000012
message.max.bytes = 1000012
message.max.bytes = 1000012
topic層的配置沒有進行覆蓋,所以參數(shù)值確定為1000012字節(jié)
疑問
但實際上我這次壓測采用的消息體積是固定的扎阶,kafka producer metrics的record-size-avg
是2.7KB汹胃,record-size-max
是9.6KB,遠遠小于1000012字節(jié)东臀。
為什么這樣還會被拒絕呢着饥。
調(diào)整batch.size重新測試
我將batch.size調(diào)整到512KB, max.request.size調(diào)整到2MB,重新壓測,這次日志里不再刷MESSAGE_TOO_LARGE
報錯了惰赋。
record-send-rate
恢復到1000r/s每秒宰掉,record-error-rate
為0.問題消除
這里我已經(jīng)感覺message.max.bytes
可能限制的不是單條消息的大小,而是限制的batch.size
的大小赁濒。
去kafka源碼探究message.max.bytes
https://github.com/apache/kafka/blob/847ff8f/core/src/main/scala/kafka/log/Log.scala#L1377
// Check if the message sizes are valid.
val batchSize = batch.sizeInBytes
if (batchSize > config.maxMessageSize) {
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
}
不知道我這段看的對不對轨奄,不過確實非常可能是
message.max.bytes
是限制的batch.size
的大小拒炎。
總結(jié)
batch.size
最好比服務端實際生效的message.max.bytes
配置小一些挪拟,否則在筆者所在的版本會遇到MESSAGE_TOO_LARGE
報錯,客戶端進行重試發(fā)送造成很大的消息發(fā)送量击你。這個重試是可以避免發(fā)生的玉组。