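Introduction
A production Kafka deployment needs to be monitored from several angles: CPU, memory, disk, and Kafka's own metrics. Only then can problems be located precisely and handled promptly when they occur.
Among these metrics, Lag is one of the most important: it measures how far a consumer has fallen behind. If producers have written 10,000 messages to a topic partition and the consumer's current offset is 9000, the Lag is 1000. A large Lag means the consumer cannot keep up with the messages in the Kafka topic. When that happens, the cause needs to be identified and dealt with promptly to avoid greater losses.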
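The arithmetic above can be written as a one-line function. This is a minimal sketch; the class name `LagExample` and the method name `lag` are chosen for illustration and do not come from the original program.

```java
// Minimal sketch: lag for one partition = log end offset - consumer offset.
final class LagExample {

    /** Returns how many messages the consumer still has to read on one partition. */
    static long lag(long logEndOffset, long consumerOffset) {
        return logEndOffset - consumerOffset;
    }

    public static void main(String[] args) {
        // Figures from the text: 10,000 messages written, consumer at offset 9000.
        System.out.println(lag(10_000L, 9_000L)); // prints 1000
    }
}
```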
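There are three common ways to monitor Kafka:
- Use the Kafka command-line tool kafka-consumer-groups
- Write a program against the Kafka Java Consumer API
- Use the JMX metrics that ship with Kafka
This article covers the second approach: use the Kafka Java consumer API to obtain the Lag, write it to the time-series database InfluxDB, and display it with Grafana.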
Data flow diagram
[Figure: data flow through the kafka monitor program]
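The flow (collect Lag from Kafka, write it to InfluxDB, let Grafana read it) implies that the monitor program runs repeatedly. The article does not show how the program is scheduled, so the following is only a sketch under assumptions: the class `LagMonitorLoop`, its `collector` and `sink` parameters, and the use of a plain `ScheduledExecutorService` are all hypothetical stand-ins for whatever the real program uses.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of the collect-then-write loop; not the original program.
final class LagMonitorLoop<T> {
    private final Supplier<List<T>> collector; // e.g. reads offsets from Kafka
    private final Consumer<List<T>> sink;      // e.g. writes points to InfluxDB

    LagMonitorLoop(Supplier<List<T>> collector, Consumer<List<T>> sink) {
        this.collector = collector;
        this.sink = sink;
    }

    /** Runs one collect-and-write cycle. */
    void runOnce() {
        try {
            sink.accept(collector.get());
        } catch (RuntimeException e) {
            // scheduleAtFixedRate cancels all later runs if one run throws,
            // so failures are reported here instead of being propagated.
            System.err.println("lag collection failed: " + e.getMessage());
        }
    }

    /** Starts the cycle on a single thread; the caller shuts the scheduler down. */
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::runOnce, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

A single-threaded scheduler is used here because KafkaConsumer is not safe for concurrent use from multiple threads.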
Main code
First, define three beans: AdminClient, KafkaConsumer, and InfluxDB.
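import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.context.annotation.Bean;

// Admin client: used to list consumer groups and their committed offsets.
@Bean
public AdminClient adminClient() {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    return AdminClient.create(properties);
}

// Consumer: used only to look up log end offsets; it never subscribes or commits.
@Bean
public KafkaConsumer<String, String> kafkaConsumer() {
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaMonitorConsumerGroup);
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return new KafkaConsumer<>(properties);
}

// InfluxDB client: ping() makes one round trip to the server at startup.
@Bean
public InfluxDB influxDB() {
    InfluxDB influxDB = InfluxDBFactory.connect(influxdbUrl, influxdbUsername, influxdbPassword);
    influxDB.ping();
    return influxDB;
}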
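The collection code in this article fills in an OffsetEntity, but the class itself is not shown. The following is a minimal sketch of what it might look like; the field names and types are inferred from the setters the article calls (setRegion, setGroupId, setTopic, setPartition, setLogEndOffset, setOffset, setLag) and are assumptions, not the original definition.

```java
// Hypothetical sketch of OffsetEntity: one record per (group, topic, partition).
// Field names and types are inferred from the setters used in the article.
final class OffsetEntity {
    private String region;      // label for the cluster or data center
    private String groupId;     // consumer group ID
    private String topic;
    private int partition;
    private long logEndOffset;  // offset the next produced message will get
    private long offset;        // offset committed by the consumer group
    private long lag;           // logEndOffset - offset

    public String getRegion() { return region; }
    public void setRegion(String region) { this.region = region; }

    public String getGroupId() { return groupId; }
    public void setGroupId(String groupId) { this.groupId = groupId; }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }

    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }

    public long getLogEndOffset() { return logEndOffset; }
    public void setLogEndOffset(long logEndOffset) { this.logEndOffset = logEndOffset; }

    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }

    public long getLag() { return lag; }
    public void setLag(long lag) { this.lag = lag; }
}
```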
Next, fetch all the consumer groups in the cluster, then use each consumer group ID to fetch its partitions and offsets.
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

// Collects offset and Lag information for every consumer group in the cluster.
public List<OffsetEntity> getOffsetEntityFromCluster() {
    ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
    KafkaFuture<Collection<ConsumerGroupListing>> consumerGroupsFuture = listConsumerGroupsResult.all();
    Collection<ConsumerGroupListing> consumerGroupListingCollection = null;
    try {
        consumerGroupListingCollection = consumerGroupsFuture.get();
    } catch (InterruptedException e) {
        logger.error("get consumer groups interrupted exception", e);
        System.exit(-1);
    } catch (ExecutionException e) {
        logger.error("get consumer groups execution exception", e);
        System.exit(-1);
    }
    List<OffsetEntity> offsetEntityList = new ArrayList<>();
    for (ConsumerGroupListing consumerGroupListing : consumerGroupListingCollection) {
        offsetEntityList.addAll(getOffsetEntityFromGroup(consumerGroupListing.groupId()));
    }
    return offsetEntityList;
}

// Collects committed offset, log end offset, and Lag for each partition of one group.
public List<OffsetEntity> getOffsetEntityFromGroup(String groupId) {
    ListConsumerGroupOffsetsResult offsetResult = adminClient.listConsumerGroupOffsets(groupId);
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetFuture = offsetResult.partitionsToOffsetAndMetadata();
    Map<TopicPartition, OffsetAndMetadata> offsetMap = null;
    try {
        offsetMap = offsetFuture.get();
    } catch (InterruptedException e) {
        logger.error("get offset interrupted exception", e);
        System.exit(-1);
    } catch (ExecutionException e) {
        logger.error("get offset execution exception", e);
        System.exit(-1);
    }
    // Log end offsets for exactly the partitions the group has offsets for.
    Map<TopicPartition, Long> topicPartitionOffsetMap = kafkaConsumer.endOffsets(offsetMap.keySet());
    List<OffsetEntity> offsetEntityList = new ArrayList<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsetMap.entrySet()) {
        TopicPartition topicPartition = offsetEntry.getKey();
        OffsetAndMetadata offsetAndMetadata = offsetEntry.getValue();
        if (offsetAndMetadata == null) {
            // The group has no committed offset for this partition, so Lag is undefined.
            continue;
        }
        long logEndOffset = topicPartitionOffsetMap.get(topicPartition);
        long offset = offsetAndMetadata.offset();
        OffsetEntity offsetEntity = new OffsetEntity();
        offsetEntity.setRegion(region);
        offsetEntity.setGroupId(groupId);
        offsetEntity.setTopic(topicPartition.topic());
        offsetEntity.setPartition(topicPartition.partition());
        offsetEntity.setLogEndOffset(logEndOffset);
        offsetEntity.setOffset(offset);
        offsetEntity.setLag(logEndOffset - offset);
        offsetEntityList.add(offsetEntity);
    }
    return offsetEntityList;
}
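The article does not show the step that writes the collected values to InfluxDB. As an illustration of what each data point could contain, the sketch below formats one partition's values as a line of InfluxDB line protocol by hand, rather than going through the InfluxDB client bean defined earlier. The measurement name `kafka_lag`, the tag and field names, and the class `LagLineProtocol` are assumptions made for this example.

```java
// Hypothetical sketch: format one partition's Lag as InfluxDB line protocol.
// Layout: measurement,tag=value,... field=value,... timestamp
final class LagLineProtocol {

    /** Escapes the characters that are special inside a tag value: comma, equals sign, space. */
    static String escapeTag(String value) {
        return value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    /** Builds one line; integer fields carry the "i" suffix, the timestamp is in nanoseconds. */
    static String toLine(String region, String groupId, String topic, int partition,
                         long logEndOffset, long offset, long timestampNanos) {
        long lag = logEndOffset - offset;
        return "kafka_lag"                        // assumed measurement name
                + ",region=" + escapeTag(region)
                + ",group=" + escapeTag(groupId)
                + ",topic=" + escapeTag(topic)
                + ",partition=" + partition
                + " logEndOffset=" + logEndOffset + "i"
                + ",offset=" + offset + "i"
                + ",lag=" + lag + "i"
                + " " + timestampNanos;
    }

    public static void main(String[] args) {
        System.out.println(toLine("r1", "my group", "orders", 0, 10_000L, 9_000L, 1L));
        // prints kafka_lag,region=r1,group=my\ group,topic=orders,partition=0 logEndOffset=10000i,offset=9000i,lag=1000i 1
    }
}
```

Region, group, topic, and partition are stored as tags so that a dashboard can filter and group by them, while the three offsets are numeric fields.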
Final result in Grafana
The Grafana configuration can be imported from the file https://github.com/samrui/kafkawarden/blob/master/grafana/dashboard.json