Analyzing KafkaProducer Send Timeouts in Kafka 0.8.2.1
Problem Description
In our test environment, a user reported ReadTimeout exceptions when publishing Kafka messages. We did a preliminary analysis.
The user's request log:
"request-start@2018-08-22 10:30:45.399" "192.168.5.116 (172.16.74.67) -> 172.16.52.72:80" "POST //kafka/publish HTTP/1.1" 603 process-req-time:60003 "response-end@2018-08-22 10:31:45.402" status:200 97 http-nio-8080-exec-9 "-" "Java/1.8.0_144" traceId:- topic:payment.abc.msg connection-status:???X???
We found multiple such timed-out requests, each taking roughly 60 s, yet the configuration of the topics involved was normal (the topics existed and their leader nodes were up).
Looking further into the Kafka broker logs, we found ERROR entries saying, roughly, that a request referenced an illegal topic name:
[2018-08-22 11:00:00,174] ERROR [KafkaApi-2] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 6226810; ClientId: producer-30; Topics: fincx.xxx.status (wpk-reactor-web),payment.abc.msg (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:748)
Problem Analysis
The puzzling part: sending to a correctly configured topic timed out, while the log contained exceptions for a different topic. Do sends to different topics really interfere with one another?
Why would a single broken topic stop every other, otherwise healthy topic from sending? That would be a serious problem: KafkaProducer is thread-safe, and an application typically uses one KafkaProducer instance to send to many topics.
Reproducing the Problem
After a service restart, the freshly initialized KafkaProducer delivered messages to valid topics without any timeout. We then sent a message to an illegally named topic: the send timed out and the broker threw kafka.common.InvalidTopicException. From then on, every send to the previously healthy topics failed as well.
This proves that a single send to a bad topic renders the entire KafkaProducer unusable.
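A minimal sketch of this reproduction against the 0.8.2.1 client (the broker address is a placeholder; payment.abc.msg stands in for any correctly configured topic):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InvalidTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        trySend(producer, "payment.abc.msg");          // fresh producer, valid topic: succeeds
        trySend(producer, "bad topic (illegal name)"); // blocks ~60 s on metadata, then fails
        trySend(producer, "payment.abc.msg");          // the valid topic now times out too
        producer.close();
    }

    private static void trySend(KafkaProducer<String, String> producer, String topic) {
        try {
            // send() itself blocks in waitOnMetadata (up to metadata.fetch.timeout.ms);
            // the TimeoutException then surfaces when the future is inspected.
            producer.send(new ProducerRecord<>(topic, "key", "value")).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}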
Code Analysis
The 0.8.2.1 KafkaProducer source
From the exception log the cause is easy to deduce: before sending, KafkaProducer fetches TopicMetadata from the broker; if the topic does not exist, the broker calls AdminUtils.createTopic() to auto-create it, and because the topic name is illegal, that call throws. Let's look at how KafkaProducer handles this failure.
The KafkaProducer.waitOnMetadata method:
private void waitOnMetadata(String topic, long maxWaitMs) {
    if (metadata.fetch().partitionsForTopic(topic) != null) {
        // Metadata for this topic is already cached; nothing to wait for.
        return;
    } else {
        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        // Force metadata updates until the topic appears or maxWaitMs elapses.
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            metadata.add(topic);
            sender.wakeup();
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }
}
The maxWaitMs parameter above comes from the producer config metadata.fetch.timeout.ms, whose default is 60000 ms, which matches the ~60 s latency in our timeout logs exactly.
Step 1: metadata.fetch().partitionsForTopic(topic) checks whether metadata for the topic is already cached; if so, it is used directly.
Step 2: fetch the latest TopicMetadata. The while loop forces the metadata update to complete within metadata.fetch.timeout.ms, and metadata.requestUpdate() flags the current metadata for a forced refresh.
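As an aside on this parameter: the blocking window can be bounded more tightly via producer config. A minimal sketch (the broker address is a placeholder and 5000 ms is an arbitrary example); note this only makes the failure faster, it does not cure the poisoned topic cache:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class FastFailProducer {
    public static KafkaProducer<String, String> build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 0.8.2.x new-producer setting: caps the waitOnMetadata block (default 60000 ms).
        props.put("metadata.fetch.timeout.ms", "5000");
        return new KafkaProducer<>(props);
    }
}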
The metadata request itself is built by NetworkClient.metadataRequest:
private ClientRequest metadataRequest(long now, int node, Set<String> topics) {
    MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
    RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
    return new ClientRequest(now, true, send, null);
}
And a fragment of its caller, NetworkClient.maybeUpdateMetadata:
Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
As the code shows, the MetadataRequest carries the producer's entire cached topic list, not just the topic currently being sent.
At this point the full chain behind the send timeouts is clear: because the KafkaProducer's cached topic list contained one illegal topic, every batched metadata update included that illegal topic, and every update therefore failed. This is a defect in KafkaProducer's exception handling. Note that the bug only triggers when auto.create.topics.enable=true; when it is false the broker never attempts to create the topic and no exception is thrown. This is an official Kafka bug: KAFKA-1884.
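Given this root cause, one client-side defense is to validate topic names before they ever reach KafkaProducer, so an illegal name never enters the shared metadata cache. A minimal sketch, mirroring the character rule quoted in the broker's error message (the helper class is ours; the 255-character cap and the '.'/'..' checks reflect the 0.8.x kafka.common.Topic.validate rule as we read it):

import java.util.regex.Pattern;

public final class TopicNames {
    // Same character rule the broker enforces:
    // ASCII alphanumerics, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    public static void requireLegal(String topic) {
        if (topic == null || topic.isEmpty() || topic.length() > 255
                || ".".equals(topic) || "..".equals(topic)
                || !LEGAL.matcher(topic).matches()) {
            throw new IllegalArgumentException("illegal topic name: " + topic);
        }
    }

    private TopicNames() {}
}

Calling TopicNames.requireLegal(topic) before every send keeps one careless caller from breaking the shared producer for everyone else.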
About the auto.create.topics.enable parameter
This parameter defaults to true. On the Kafka cluster behind our old proxy it is set to false, forbidding users from auto-creating topics; the official recommendation is also not to enable it.
Moreover, with auto.create.topics.enable=true, starting a consumer against a nonexistent topic also creates that topic, littering the cluster with junk topics.
Fortunately, automatic topic creation is disabled on all of our production clusters, so this bug never reached the production systems.
Does the 1.0.0 KafkaProducer have the same problem?
Our new messaging system is built on Kafka 1.0.0. Has this obvious design flaw in KafkaProducer been fixed upstream in 1.0.0?
We verified locally:
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9080");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(props);
        sendMessage(kafkaProducer, "test");
        sendMessage(kafkaProducer, "fincx.repayment.status (wpk-reactor-web)"); // illegal topic name
        sendMessage(kafkaProducer, "test");
    }

    private static void sendMessage(KafkaProducer<String, byte[]> kafkaProducer, String topic) {
        try {
            Future<RecordMetadata> f1 = kafkaProducer.send(new ProducerRecord<>(topic, "12", "12".getBytes()));
            RecordMetadata meta = f1.get(5, TimeUnit.SECONDS);
            System.out.println(meta.offset() + ",topic:" + meta.topic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The test shows that the 1.0.0 KafkaProducer has fixed this problem: a broken topic no longer affects delivery to healthy topics. The reason is as follows.
The 1.0.0 broker's KafkaApis.createTopic method:
private def createTopic(topic: String,
                        numPartitions: Int,
                        replicationFactor: Int,
                        properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
  try {
    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
    info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
      .format(topic, numPartitions, replicationFactor))
    new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
      java.util.Collections.emptyList())
  } catch {
    case _: TopicExistsException => // let it go, possibly another broker created this topic
      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
        java.util.Collections.emptyList())
    case ex: Throwable => // Catch all to prevent unhandled errors
      new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
        java.util.Collections.emptyList())
  }
}
In contrast, the 0.8.2.1 broker's KafkaApis invoked AdminUtils.createTopic directly, so an InvalidTopicException propagated up and failed the whole metadata request:
AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
  offsetsTopicReplicationFactor,
  offsetManager.offsetsTopicConfig)
The 1.0.0 broker catches any exception thrown while creating the topic and returns an empty TopicMetadata carrying the error code, with no partition or leader information (much like the response for a nonexistent topic). When the client receives such a TopicMetadata, it simply ignores the errored entry. One bad topic can therefore no longer take down the whole KafkaProducer.
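In other words, on 1.0.0 the failure stays scoped to the offending record, so ordinary per-record error handling is enough. A minimal sketch using a send callback (producer types match the test above; the topic name is a placeholder):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ScopedFailureDemo {
    static void sendWithCallback(KafkaProducer<String, byte[]> producer, String topic) {
        producer.send(new ProducerRecord<>(topic, "12", "12".getBytes()), (metadata, exception) -> {
            if (exception != null) {
                // For an illegal topic the error surfaces here (and on the returned
                // future); sends to other topics on the same producer keep working.
                System.err.println("send to " + topic + " failed: " + exception);
            } else {
                System.out.println("sent to " + metadata.topic() + ", offset " + metadata.offset());
            }
        });
    }
}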
The 0.8.2.1 kafka.javaapi.producer.Producer
Before the upgrade we used kafka.javaapi.producer.Producer, which never showed this problem in either the test or production environment, so let's examine that producer's implementation.
With auto.create.topics.enable=true on the broker, sending to an illegally named topic through kafka.javaapi.producer.Producer makes the client throw:
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:40)
at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:31)
while the broker logs the following exception:
kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.immutable.Set$Set1.map(Set.scala:73)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:748)
The call stack for fetching TopicMetadata
Before sending a message, Kafka must first fetch the TopicMetadata for its topic, which includes the partition count, each partition's leader node, and so on:
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
- locked <0x68d> (a java.lang.Object)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:41)
at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:32)
The key code is BrokerPartitionInfo.updateInfo, which pulls the topic's metadata from the broker:
def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
  topicsMetadata = topicMetadataResponse.topicsMetadata
  // throw partition specific exception
  topicsMetadata.foreach(tmd => {
    trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
    if (tmd.errorCode == ErrorMapping.NoError) {
      topicPartitionInfo.put(tmd.topic, tmd)
    } else
      warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
    tmd.partitionsMetadata.foreach(pmd => {
      if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
        warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
          ErrorMapping.exceptionFor(pmd.errorCode).getClass))
      } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
    })
  })
  producerPool.updateProducer(topicsMetadata)
}
When the topic is illegal, the returned topicsMetadata entry carries the error and its partitionsMetadata is empty:
TopicMetadata for topic fincx.repayment.status (wpk-reactor-web) ->
No partition metadata for topic fincx.repayment.status (wpk-reactor-web) due to kafka.common.InvalidTopicException
Following the call stack: the client handles every failure to fetch TopicMetadata during a send, and the number of retries is bounded by message.send.max.retries (default 3). So kafka.javaapi.producer.Producer fails fast on this kind of exception, and each metadata refresh covers only the topics in the current send rather than the producer's full cached list. The design is sound here, and this bug does not exist in the old producer.
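For completeness, a minimal sketch of the legacy producer setup exercised above (the broker address is a placeholder; the property keys are the 0.8.x scala-producer config names):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LegacyProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092"); // assumed broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("message.send.max.retries", "3"); // bounds the retries discussed above
        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        try {
            // An illegally named topic fails fast with FailedToSendMessageException
            // after the bounded retries, without poisoning other topics.
            producer.send(new KeyedMessage<>("fincx.bad topic", "12"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}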
References
Apache JIRA, KAFKA-1884: New Producer blocks forever for Invalid topic names. https://issues.apache.org/jira/browse/KAFKA-1884
[Kafka-dev] [jira] [Commented] (KAFKA-1884) New Producer blocks forever for Invalid topic names. https://webmail.dev411.com/p/kafka/dev/15260xjzjg/jira-commented-kafka-1884-new-producer-blocks-forever-for-invalid-topic-names
Patching the Kafka source and rebuilding 0.8.2.2 to work around kafka.common.InvalidTopicException (in Chinese). https://blog.csdn.net/u010670689/article/details/78393214