Sometimes you need to check the offset range of every partition of a Kafka topic. For example, when specifying fromOffsets for Spark Streaming, an out-of-range offset will cause an error if the boundaries are not validated first. Kafka ships with a command-line tool for this check; here is a way to do it with the Java API instead.
The code looks like this:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config);
consumer.subscribe(topics);
// poll once so the consumer joins the group and receives a partition assignment
ConsumerRecords<String, byte[]> records = consumer.poll(1000);
// KafkaConsumer is not thread-safe, so iterate sequentially rather than with parallelStream()
return records.partitions().stream().map(topicPartition -> {
    // jump to the earliest offset still retained for this partition...
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    // ...and read it back as the lower bound of the offset range
    long offset = consumer.position(topicPartition);
    return new TopicPartitionInfo(topicPartition.topic(), topicPartition.partition(), offset);
}).collect(Collectors.toList());
Full code: See Here
Dependency:
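The TopicPartitionInfo holder used above is not part of kafka-clients; a minimal sketch of such a value class (hypothetical, assuming only topic, partition, and starting-offset fields) could be:

```java
// Hypothetical value class for the snippet above; not part of kafka-clients.
class TopicPartitionInfo {
    private final String topic;
    private final int partition;
    private final long offset;

    TopicPartitionInfo(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    String getTopic() { return topic; }
    int getPartition() { return partition; }
    long getOffset() { return offset; }

    @Override
    public String toString() {
        return topic + "-" + partition + "@" + offset;
    }
}
```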
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
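The config / kafkaParams maps passed to the consumer constructors are not shown in the snippets. A minimal sketch, assuming a hypothetical broker at localhost:9092 and the String-key / byte-array-value deserializers the snippets imply:

```java
import java.util.Properties;

class KafkaConfigSketch {
    // Minimal consumer configuration for an offset check; broker address and group id are placeholders.
    static Properties consumerConfig() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        config.put("group.id", "offset-checker");          // placeholder group id
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        config.put("enable.auto.commit", "false");         // read-only check: never commit offsets
        return config;
    }
}
```

Disabling auto-commit matters here: the checker consumer seeks around the log and should never overwrite the offsets your real application has committed.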
Update, 2018-01-09
In the code above, if the records returned by poll(1000) do not contain records from every partition, then records.partitions() will not cover all partitions of the topic: it only returns the partitions that appear in that batch of records. So you may need something like this instead:
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
// fill fromOffsets from your own local offset store here
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaParams);
consumer.subscribe(topics);
// poll once so the consumer joins the group and receives its partition assignment
consumer.poll(100);
for (TopicPartition topicPartition : fromOffsets.keySet()) {
    // earliest offset the broker still retains for this partition
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long offset = consumer.position(topicPartition);
    long consumedOffset = fromOffsets.getOrDefault(topicPartition, 0L);
    if (offset > consumedOffset) {
        // the stored offset has already been deleted by retention; move the start forward
        log.warn("At partition {}, our system has consumed to {} but we can start only from {} because of retention expiration.", topicPartition.partition(), consumedOffset, offset);
        log.warn("At partition {}, start offset has been adjusted to {}", topicPartition.partition(), offset);
        fromOffsets.put(topicPartition, offset);
    }
}
consumer.unsubscribe();
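The adjustment loop above boils down to one rule: clamp each stored offset to the earliest offset the broker still retains. A standalone sketch of that rule, using plain maps in place of the consumer (the string partition keys here are purely illustrative):

```java
import java.util.HashMap;
import java.util.Map;

class OffsetClamp {
    // For each partition, start from the stored offset unless retention has already
    // deleted it; in that case fall back to the earliest offset still on the broker.
    static Map<String, Long> resolveStartOffsets(Map<String, Long> stored, Map<String, Long> earliest) {
        Map<String, Long> resolved = new HashMap<>();
        for (Map.Entry<String, Long> e : stored.entrySet()) {
            long earliestAvailable = earliest.getOrDefault(e.getKey(), 0L);
            resolved.put(e.getKey(), Math.max(e.getValue(), earliestAvailable));
        }
        return resolved;
    }
}
```

For example, a stored offset of 5 on a partition whose log now begins at 20 is moved up to 20, while a stored offset of 100 on a partition whose log begins at 50 is kept as-is.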