如何用 influxDB + grafana 監(jiān)控 kafka

介紹

生產(chǎn)環(huán)境中的Kafka,需要從CPU泣矛, Memory疲眷, 磁盤,Kafka自身的Metrics等多方面進行監(jiān)控乳蓄。這樣才能在出現(xiàn)問題的時候,做到精確定位夕膀,及時響應(yīng)虚倒。

在眾多指標(biāo)中有一個很重要的指標(biāo)就是Lag,它表示的是消費者消費的滯后程度产舞。如果生產(chǎn)者向kafka的某個topic分區(qū)中寫入了1w條消息魂奥,消費者當(dāng)前消費的offset的是9000, 則Lag就是1000易猫。如果Lag很大耻煤,則表明消費者無法及時消費kafka topic中消息。此時需要定位具體原因准颓,及時處理哈蝇,避免更大的損失。

kafka常用的監(jiān)控方法有三種

  1. 使用kafka命令行工具kafka-consumer-groups
  2. 使用kafka Java Consumer API編程
  3. 使用Kafka自帶的JMX監(jiān)控指標(biāo)

本文主要介紹的是第二種方式攘已。利用kafka java consumer API獲取Lag信息炮赦,然后寫入時序數(shù)據(jù)庫InfluxDB,最后用grafana進行展示样勃。

數(shù)據(jù)流圖

data-flow.png

kafka monitor program

首先定義AdminClient, KafkaConsumer, InfluxDB三個bean吠勘。


  @Bean
  public AdminClient adminClient() {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return KafkaAdminClient.create(properties);
  }

  @Bean
  public KafkaConsumer kafkaConsumer() {
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaMonitorConsumerGroup);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    return new KafkaConsumer(properties);
  }

  @Bean
  public InfluxDB influxDB() {
    InfluxDB influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbUsername, influxdbPassword);
    influxDB.ping();
    return influxDB;
  }
  

然后獲取集群所有ConsumerGroup信息性芬,再通過ConsumerGroup ID獲取分區(qū)和offset信息。

  public List<OffsetEntity> getOffsetEntityFromCluster() {
    ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
    KafkaFuture<Collection<ConsumerGroupListing>> consumerGroupsFuture = listConsumerGroupsResult.all();
    Collection<ConsumerGroupListing> consumerGroupListingCollection = null;

    try {
      consumerGroupListingCollection = consumerGroupsFuture.get();
    } catch (InterruptedException e) {
      logger.error("get consumer groups interrupted exception: " + e.getMessage());
      System.exit(-1);
    } catch (ExecutionException e) {
      logger.error("get consumer groups execution exception: " + e.getMessage());
      System.exit(-1);
    }
    List<OffsetEntity> offsetEntityList = new ArrayList<>();
    for(ConsumerGroupListing consumerGroupListing : consumerGroupListingCollection) {
      offsetEntityList.addAll(getOffsetEntityFromGroup(consumerGroupListing.groupId()));
    }
    return offsetEntityList;
  }

  public List<OffsetEntity> getOffsetEntityFromGroup(String groupId) {
    ListConsumerGroupOffsetsResult offsetResult = adminClient.listConsumerGroupOffsets(groupId);
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetFuture = offsetResult.partitionsToOffsetAndMetadata();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = null;
    try {
      offsetMap = offsetFuture.get();
    } catch (InterruptedException e) {
      logger.error("get offset interrupted exception: " + e.getMessage());
      System.exit(-1);
    } catch (ExecutionException e) {
      logger.error("get offset execution exception: " + e.getMessage());
      System.exit(-1);
    }

    Map<TopicPartition, Long> topicPartitionOffsetMap = kafkaConsumer.endOffsets(offsetMap.keySet());

    List<OffsetEntity> offsetEntityList = new ArrayList<>();
    for(Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsetMap.entrySet()) {
      TopicPartition topicPartition = offsetEntry.getKey();
      OffsetAndMetadata offsetAndMetadata = offsetEntry.getValue();

      OffsetEntity offsetEntity = new OffsetEntity();
      offsetEntity.setRegion(region);
      offsetEntity.setGroupId(groupId);
      offsetEntity.setTopic(topicPartition.topic());
      offsetEntity.setPartition(topicPartition.partition());

      Long logEndOffset = topicPartitionOffsetMap.get(topicPartition);
      Long offset = offsetAndMetadata.offset();
      offsetEntity.setLogEndOffset(logEndOffset);
      offsetEntity.setOffset(offset);
      offsetEntity.setLag(logEndOffset - offset);
      offsetEntityList.add(offsetEntity);
    }
    return offsetEntityList;
  }

最終grafana中展示效果

grafana的配置可以通過https://github.com/samrui/kafkawarden/blob/master/grafana/dashboard.json 文件導(dǎo)入

grafana.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末剧防,一起剝皮案震驚了整個濱河市植锉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌峭拘,老刑警劉巖俊庇,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異棚唆,居然都是意外死亡暇赤,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進店門宵凌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鞋囊,“玉大人,你說我怎么就攤上這事瞎惫×锔” “怎么了?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵瓜喇,是天一觀的道長挺益。 經(jīng)常有香客問我,道長乘寒,這世上最難降的妖魔是什么望众? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮伞辛,結(jié)果婚禮上烂翰,老公的妹妹穿的比我還像新娘。我一直安慰自己蚤氏,他們只是感情好甘耿,可當(dāng)我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著竿滨,像睡著了一般佳恬。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上于游,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天毁葱,我揣著相機與錄音,去河邊找鬼贰剥。 笑死头谜,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的鸠澈。 我是一名探鬼主播柱告,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼截驮,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了际度?” 一聲冷哼從身側(cè)響起葵袭,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎乖菱,沒想到半個月后坡锡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡窒所,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年鹉勒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吵取。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡禽额,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出皮官,到底是詐尸還是另有隱情脯倒,我是刑警寧澤,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布捺氢,位于F島的核電站藻丢,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏摄乒。R本人自食惡果不足惜悠反,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望馍佑。 院中可真熱鬧斋否,春花似錦、人聲如沸挤茄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽穷劈。三九已至,卻和暖如春踊沸,著一層夾襖步出監(jiān)牢的瞬間歇终,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工逼龟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留评凝,地道東北人。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓腺律,卻偏偏與公主長得像奕短,于是被迫代替她去往敵國和親宜肉。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容