0.8.2.1版本KafkaProducer消息發(fā)送超時原因分析

0.8.2.1版本KafkaProducer消息發(fā)送超時原因分析

問題描述

我們在測試環(huán)境收到用戶反饋發(fā)送kafka消息出現(xiàn)ReadTimeout異常。對此我們進行了簡單的分析
用戶的請求日志

"request-start@2018-08-22 10:30:45.399" "192.168.5.116 (172.16.74.67) -> 172.16.52.72:80"            "POST //kafka/publish HTTP/1.1" 603 process-req-time:60003 "response-end@2018-08-22 10:31:45.402" status:200 97            http-nio-8080-exec-9 "-" "Java/1.8.0_144" traceId:- topic:payment.abc.msg connection-status:???X???

我們發(fā)現(xiàn)多條請求超時日志,出現(xiàn)大約60s左右的耗時猾担,檢查這些topic的配置都是正常的(topic存在,leader節(jié)點存在) .
繼續(xù)查看kafka服務端日志,有部分ERROR日志识埋,大致意思是有不合法的topic請求存在

[2018-08-22 11:00:00,174] ERROR [KafkaApi-2] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 6226810; ClientId: producer-30; Topics: fincx.xxx.status (wpk-reactor-web),payment.abc.msg (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
        at kafka.common.Topic$.validate(Topic.scala:42)
        at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:92)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
        at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:748)

問題分析

這個問題困惑的地方在于發(fā)送一個配置正常的topic竟然發(fā)送超時免绿,日志中含有其他topic的異常錯誤信息唧席,不同topic之間的消息發(fā)送竟然相互干擾?
為什么會因為有一個異常topic存在,導致其他原本正常的topic都無法發(fā)送消息淌哟,這是個比較嚴重的問題了迹卢。因為KafkaProducer是線程安全,一個應用中往往使用一個KafkaProducer發(fā)送多個topic消息徒仓。

重現(xiàn)問題

重啟服務腐碱,新初始化的KafkaProducer發(fā)送正常topic的消息發(fā)送成功,沒有超時掉弛,嘗試發(fā)送topic命名不合法的topic症见,發(fā)送超時,kafka拋出異常kafka.common.InvalidTopicException殃饿。此后再發(fā)送正常topic的消息均發(fā)送失敗谋作。
證明KafkaProducer會因為發(fā)送一個異常topic消息導致完全不可用。

代碼分析

0.8.2.1的KafkaProducer源碼分析

根據(jù)異常日志我們比較容易分析出異常原因是KafkaProducer發(fā)送消息前需要拉取broker端的TopicMetadata乎芳;如果發(fā)現(xiàn)topic不存在遵蚜,會調用AdminUtils.createTopic()方法創(chuàng)建,此時topic命名不規(guī)范秒咐,拋出異常谬晕。接下來我們看下KafkaProducer是如何處理這一異常的。

分析KafkaProducer.waitOnMetadata方法:

private void waitOnMetadata(String topic, long maxWaitMs) {
        if (metadata.fetch().partitionsForTopic(topic) != null) {
            return;
        } else {
            long begin = time.milliseconds();
            long remainingWaitMs = maxWaitMs;
            while (metadata.fetch().partitionsForTopic(topic) == null) {
                log.trace("Requesting metadata update for topic {}.", topic);
                int version = metadata.requestUpdate();
                metadata.add(topic);
                sender.wakeup();
                metadata.awaitUpdate(version, remainingWaitMs);
                long elapsed = time.milliseconds() - begin;
                if (elapsed >= maxWaitMs)
                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
                remainingWaitMs = maxWaitMs - elapsed;
            }
        }
    }

上述方法maxWaitMs參數(shù)取自生產者配置參數(shù)metadata.fetch.timeout.ms携取,官方默認值為60000ms攒钳,和前面我們超時日志請求時間60s是完全對的上的。
第一步:metadata.fetch().partitionsForTopic(topic)檢查topic的meta信息是否已經有緩存雷滋,有的化就可以直接用了
第二步:獲取最新TopicMetadata數(shù)據(jù)不撑,此處while循環(huán),強制在metadata.fetch.timeout.ms時間內完成TopicMetadata的更新操作晤斩,metadata.requestUpdate設置當前TopicMetadata需要強制更新

調用NetworkClient.metadataRequest發(fā)送消息

private ClientRequest metadataRequest(long now, int node, Set<String> topics) {
        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
        return new ClientRequest(now, true, send, null);
    }

NetworkClient.metadataRequest的調用方NetworkClient.maybeUpdateMetadata方法片段

Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);

我們看到MetadataRequest的請求是包含了整個KafkaProducer緩存的所有topic列表數(shù)據(jù)焕檬,并不是只對當前發(fā)送的topic進行元數(shù)據(jù)更新。
至此我們大致了解了KafkaProducer發(fā)送超時的前因后果:因為KafkaProducer緩存的topic列表中有一個非法的topic澳泵,導致每次批量更新元數(shù)據(jù)時实愚,都包含了非法的topic,每次元數(shù)據(jù)更新操作都失敗兔辅。這是KafkaProducer異常處理缺陷導致腊敲。不過該BUG只有在auto.create.topics.enable=true才觸發(fā),如果為false维苔,就不會創(chuàng)建topic碰辅,不會拋出異常。這是一個kafka官方BUG KAFKA-1884

關于auto.create.topics.enable參數(shù)

該參數(shù)默認為true介时,老proxy代理的kafka集群該參數(shù)配置為false没宾,禁止用戶自動創(chuàng)建topic凌彬,官方也不建議開啟該參數(shù)。
此外循衰,auto.create.topics.enable=true時候铲敛,當啟動一個消費者,消費的topic不存在時羹蚣,也會導致topic創(chuàng)建原探,導致服務器上存在一堆垃圾topic。
萬幸的是我們線上業(yè)務集群都是禁用自動生成topic的顽素,使得這一BUG沒有影響到線上系統(tǒng)咽弦。

1.0.0的KafkaProducer是否存在同樣問題

我們新研發(fā)的消息系統(tǒng)是基于Kafka1.0.0版本的,針對KafkaProducer這一明顯設計缺陷胁出,1.0.0版本官方是否已經解決型型?
我們在本地進行了驗證

public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props=new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9080");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<String, byte[]> kafkaProducer=new KafkaProducer(props);
        sendMessage(kafkaProducer,"test");
        sendMessage(kafkaProducer,"fincx.repayment.status (wpk-reactor-web)");
        sendMessage(kafkaProducer,"test");
    }

    private static void sendMessage(KafkaProducer kafkaProducer,String topic) {
        try{
            Future<RecordMetadata> f1= kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, "12", "12".getBytes()));
            RecordMetadata meta=f1.get(5, TimeUnit.SECONDS);
            System.out.println(meta.offset()+",topic:"+meta.topic());
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

測試結果發(fā)現(xiàn)1.0.0版本的KafkaProducer已經修復了這一問題,異常的topic不會影響正常topic數(shù)據(jù)的發(fā)送全蝶。 具體原因分析如下
1.0.0版本服務端KafkaApis.createTopic方法

private def createTopic(topic: String,
                          numPartitions: Int,
                          replicationFactor: Int,
                          properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
    try {
      AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
      info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
        .format(topic, numPartitions, replicationFactor))
      new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
        java.util.Collections.emptyList())
    } catch {
      case _: TopicExistsException => // let it go, possibly another broker created this topic
        new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
          java.util.Collections.emptyList())
      case ex: Throwable  => // Catch all to prevent unhandled errors
        new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, isInternal(topic),
          java.util.Collections.emptyList())
    }
  }

0.8.2.1版本服務端KafkaApis中createTopic方法

AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
                                     offsetsTopicReplicationFactor,
                                     offsetManager.offsetsTopicConfig)

1.0.0版本的broker對創(chuàng)建topic的異常進行了捕獲闹蒜,并且返回一個帶有異常信息、不包含分區(qū)及l(fā)eader信息的空的TopicMetadata(這種場景和topic不存在類似)抑淫。并且在客戶端獲取到該TopicMetadata時绷落,會自動忽略帶有異常信息的TopicMetadata。從而避免一個topic異常導致整個KafkaProducer不可用始苇。

0.8.2.1的kafka.javaapi.producer.Producer分析

未升級前我們使用的是kafka.javaapi.producer.Producer砌烁,并且在測試環(huán)境和線上使用時均未出現(xiàn)問題,我們可以分析下改Producer的實現(xiàn)催式。
kafka broker配置auto.create.topics.enable=true函喉。用kafka.javaapi.producer.Producer嘗試發(fā)送一個topic命名不合法的消息,客戶端拋出如下:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:40)
    at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:31)

kafka服務端拋出如下異常

kafka.common.InvalidTopicException: topic name fincx.repayment.status (wpk-reactor-web) is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
    at kafka.common.Topic$.validate(Topic.scala:42)
    at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
    at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.immutable.Set$Set1.map(Set.scala:73)
    at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
    at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:748)

分析發(fā)送TopicMetadata的調用棧

kafka在發(fā)消息前需要先獲取該消息的TopicMetadata信息荣月,這些信息包含了分區(qū)數(shù)管呵,各個分區(qū)的leader節(jié)點等信息。

"main@1" prio=5 tid=0x1 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
      at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
      at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
      at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
      at kafka.utils.Utils$.swallow(Utils.scala:172)
      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
      at kafka.utils.Utils$.swallowError(Utils.scala:45)
      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
      at kafka.producer.Producer.send(Producer.scala:77)
      - locked <0x68d> (a java.lang.Object)
      at kafka.javaapi.producer.Producer.send(Producer.scala:33)
      at com.wacai.oldkafka.proxy.ScalaProducerTest.sendMessage(ScalaProducerTest.java:41)
      at com.wacai.oldkafka.proxy.ScalaProducerTest.main(ScalaProducerTest.java:32)

分析關鍵代碼BrokerPartitionInfo.updateInfo哺窄。此處從broker拉取topic相關的metadata信息捐下,

def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
    topicsMetadata = topicMetadataResponse.topicsMetadata
    // throw partition specific exception
    topicsMetadata.foreach(tmd =>{
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if(tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd)
      } else
        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      tmd.partitionsMetadata.foreach(pmd =>{
        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })
    })
    producerPool.updateProducer(topicsMetadata)
  }

當topic不合法時候,topicsMetadata返回包含錯誤信息,并且partitionsMetadata為空集萌业。

    TopicMetadata for topic fincx.repayment.status (wpk-reactor-web) -> 
No partition metadata for topic fincx.repayment.status (wpk-reactor-web) due to kafka.common.InvalidTopicException

跟蹤調用棧蔑担,客戶端發(fā)送消息時,對拉取TopicMetaData失敗的異常均有處理咽白,并且最大重試次數(shù)受message.send.max.retries=3參數(shù)控制。所以kafka.javaapi.producer.Producer遇到此種異常會快速失敗鸟缕,并且TopicMetaData不是全量更新晶框。設計上也是合理的排抬,不存在改BUG。

相關資料

apache官方bug說明 https://issues.apache.org/jira/browse/KAFKA-1884
[Kafka-dev] [jira] [Commented] (KAFKA-1884) New Producer blocks forever for Invalid topic names
https://webmail.dev411.com/p/kafka/dev/15260xjzjg/jira-commented-kafka-1884-new-producer-blocks-forever-for-invalid-topic-names
修改kafka源碼,編譯kafka 0.8.2.2,解決bug kafka.common.InvalidTopicException https://blog.csdn.net/u010670689/article/details/78393214

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末授段,一起剝皮案震驚了整個濱河市蹲蒲,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌侵贵,老刑警劉巖届搁,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異窍育,居然都是意外死亡卡睦,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門漱抓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來表锻,“玉大人,你說我怎么就攤上這事乞娄∷惭罚” “怎么了?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵仪或,是天一觀的道長确镊。 經常有香客問我,道長范删,這世上最難降的妖魔是什么蕾域? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮瓶逃,結果婚禮上束铭,老公的妹妹穿的比我還像新娘。我一直安慰自己厢绝,他們只是感情好契沫,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著昔汉,像睡著了一般懈万。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上靶病,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天会通,我揣著相機與錄音,去河邊找鬼娄周。 笑死涕侈,一個胖子當著我的面吹牛,可吹牛的內容都是我干的煤辨。 我是一名探鬼主播裳涛,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼木张,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了端三?” 一聲冷哼從身側響起舷礼,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎郊闯,沒想到半個月后妻献,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡团赁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年育拨,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片然痊。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡至朗,死狀恐怖,靈堂內的尸體忽然破棺而出剧浸,到底是詐尸還是另有隱情锹引,我是刑警寧澤,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布唆香,位于F島的核電站嫌变,受9級特大地震影響,放射性物質發(fā)生泄漏躬它。R本人自食惡果不足惜腾啥,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冯吓。 院中可真熱鬧倘待,春花似錦、人聲如沸组贺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽失尖。三九已至啊奄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間掀潮,已是汗流浹背菇夸。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留仪吧,地道東北人庄新。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像薯鼠,于是被迫代替她去往敵國和親择诈。 傳聞我的和親對象是個殘疾皇子凡蚜,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內容

  • 姓名:周小蓬 16019110037 轉載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評論 13 425
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn)吭从,斷路器,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • Kafka入門經典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,827評論 4 54
  • kafka數(shù)據(jù)可靠性深度解讀 Kafka起初是由LinkedIn公司開發(fā)的一個分布式的消息系統(tǒng)恶迈,后成為Apache...
    it_zzy閱讀 2,007評論 2 20
  • 起風了 「文」水金時 起風了 密閉的房間里也有波動 冷空氣襲殺進屋 沖進身體里取暖 今夜 該有一場大雪 匹配這恰如...
    水金時閱讀 213評論 4 5