0. 版本信息
引入版本:0.10.0.0
Kafka Improvement Proposals:KIP-42
1.引入動(dòng)機(jī)
Kafka在 KIP-42中詳細(xì)描述了引入攔截器的動(dòng)機(jī)和對(duì)kafka API做的修改沮翔。
現(xiàn)在愉舔,Kafka指標(biāo)的采集都僅包括客戶端或broker,這使得用戶跟蹤消息在集群內(nèi)的傳遞路徑咸灿,構(gòu)建系統(tǒng)端到端的性能和行為畫像變的困難。從技術(shù)上講,通過(guò)修改應(yīng)用以收集或跟蹤額外的信息來(lái)測(cè)量系統(tǒng)端到端的性能是可行的,但對(duì)于關(guān)鍵的基礎(chǔ)設(shè)施應(yīng)用來(lái)說(shuō)商佑,這種方案并不一定是切實(shí)可行的。在生產(chǎn)環(huán)境中厢塘,能夠快速部署工具來(lái)觀察茶没,測(cè)量和監(jiān)控Kafka客戶端行為(粒度直至消息級(jí)別),是非常有用的晚碾。同時(shí)抓半,不同應(yīng)用的度量指標(biāo)需要的上下文元數(shù)據(jù)各異。無(wú)需重新編寫代碼或重新編譯即可實(shí)現(xiàn)監(jiān)控客戶端的能力十分重要(在某些場(chǎng)景下格嘁,這種能力有助于連接到正在運(yùn)行的應(yīng)用程序)笛求。
為了實(shí)現(xiàn)這個(gè)功能,kafka 更加傾向于增加生產(chǎn)者和消費(fèi)者攔截器糕簿,攔截器可以在生產(chǎn)者和消費(fèi)者處理消息的不同階段攔截消息探入。在Apache Flume 攔截器接口的啟發(fā)下,kafka開(kāi)發(fā)了現(xiàn)在的機(jī)制懂诗。雖然蜂嗽,有很多功能都可以使用攔截器實(shí)現(xiàn)(例如,異常檢測(cè)殃恒,數(shù)據(jù)加密植旧,字段過(guò)濾等),但是每個(gè)功能都需要仔細(xì)的評(píng)估是否應(yīng)該使用攔截器還是其他機(jī)制來(lái)完成离唐。當(dāng)這些場(chǎng)景有明確的使用動(dòng)機(jī)時(shí)隆嗅,提供明確的API是一種良好的實(shí)踐。因此侯繁,kafka提供了最小化的生產(chǎn)者和消費(fèi)者攔截器接口,旨在僅支持測(cè)量和監(jiān)控泡躯。
盡管增加更多的指標(biāo)或改進(jìn)kafka的監(jiān)控是可能的贮竟,但是基于以下原因我們認(rèn)為提供靈活的丽焊,用戶可定制的接口更加有益:
構(gòu)建通用監(jiān)控工具。在一家大公司咕别,不同的團(tuán)隊(duì)合作構(gòu)建系統(tǒng)技健。通常來(lái)說(shuō),隨著時(shí)間的推移惰拱,不同的團(tuán)隊(duì)開(kāi)發(fā)部署不同的組件雌贱。此外,組織對(duì)于通用的指標(biāo)偿短、數(shù)據(jù)格式和數(shù)據(jù)系統(tǒng)希望實(shí)現(xiàn)標(biāo)準(zhǔn)化欣孤。對(duì)于一個(gè)組織,我們認(rèn)為開(kāi)發(fā)部署通用的Kafka客戶端監(jiān)控工具并在所有使用Kafka的應(yīng)用中部署該工具是非常有價(jià)值的昔逗。
高昂的監(jiān)控代價(jià)降传。向kafka添加其他指標(biāo)可能會(huì)影響kafka的性能。不幸的是勾怒,有時(shí)候需要在系統(tǒng)性能和數(shù)據(jù)收集之間進(jìn)行權(quán)衡婆排。舉個(gè)例子,考慮檢測(cè)消息大小的場(chǎng)景笔链。代價(jià)最低段只,最簡(jiǎn)單,最直接的方法是計(jì)算消息的平均大小鉴扫。計(jì)算分布式系統(tǒng)中的百分比要比計(jì)算簡(jiǎn)單的平均值代價(jià)更高赞枕,更復(fù)雜,但是在很多應(yīng)用中這是非常有用的幔妨。我們希望能夠讓客戶使用不同的算法收集指標(biāo)數(shù)據(jù)鹦赎,或者不收集。
應(yīng)用對(duì)指標(biāo)的要求不同误堡。例如古话,一個(gè)用戶可能認(rèn)為監(jiān)控kafka中不同key的消息數(shù)非常的重要。在kafka內(nèi)部提供所有的指標(biāo)是不切實(shí)際的锁施。插件化的攔截系統(tǒng)為指標(biāo)的定制化提供了簡(jiǎn)單可行的能力陪踩。
在一個(gè)組織中kafka通常是大型基礎(chǔ)設(shè)施的一部分,在基礎(chǔ)設(shè)施中實(shí)現(xiàn)端到端的跟蹤是非常有用的悉抵。攔截器提供了在相同基礎(chǔ)設(shè)施中跟蹤kafka客戶端的能力肩狂。
為了支持?jǐn)r截器功能,Kafka在0.10.0.0版本增加了兩個(gè)全新的接口:ProducerInterceptor和ConsumerInterceptor并支持實(shí)現(xiàn)和配置攔截器鏈姥饰。攔截器API允許修改消息以支持給消息增加額外元數(shù)據(jù)實(shí)現(xiàn)端到端跟蹤的能力傻谁。
2. API 介紹
生產(chǎn)者攔截器 ProducerInterceptor
public interface ProducerInterceptor<K, V> extends Configurable {
/**
* 鍵值序列化和分配分區(qū)之前, KafkaProducer.send(ProducerRecord) and
* KafkaProducer.send(ProducerRecord, Callback)方法調(diào)用該方法列粪。
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
?
/**
* 該方法在消息發(fā)送到服務(wù)端而且得到服務(wù)端確認(rèn)時(shí)或者在發(fā)送之前失敗時(shí)被調(diào)用审磁。
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
?
/**
* 該方法在關(guān)閉攔截器時(shí)調(diào)用谈飒。
*/
public void close();
}
消費(fèi)者攔截器 ConsumerInterceptor
public interface ConsumerInterceptor<K, V> extends Configurable {
/**
* 在KafkaConsumer.poll(java.time.Duration)返回消息之前
*/
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
?
/**
* 當(dāng)offset提交后調(diào)用該方法。
*/
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
/**
* 該方法在關(guān)閉攔截器時(shí)調(diào)用态蒂。
*/
void close();
}
3 .動(dòng)手實(shí)踐
下面以實(shí)現(xiàn)一個(gè)簡(jiǎn)單的kafka指標(biāo)采集小功能為例杭措,進(jìn)一步了解kafka攔截器的功能和使用方法。采集指標(biāo)包括:
生產(chǎn)和消費(fèi)消費(fèi)的線程名
生產(chǎn)者生產(chǎn)消息成功失敗次數(shù)統(tǒng)計(jì)
3.1 修改消息钾恢,增加處理線程名
在生產(chǎn)端手素,實(shí)現(xiàn)ProducerInterceptor接口并覆寫onSend方法,修改ProducerRecord瘩蚪,在Heads中增加生產(chǎn)者線程名:
public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String,String> onSend(ProducerRecord<String,String > record) {
Header producerThread = new RecordHeader("producerThread",Thread.currentThread().getName().getBytes());
record.headers().add(producerThread);
return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),record.headers());
}
}
在消費(fèi)端泉懦,實(shí)現(xiàn)ConsumerInterceptor接口并覆寫onConsume方法,修改ConsumerRecord募舟,在Heads中增加消費(fèi)者線程名:
public class TraceConsumerInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
byte[] currentThreadName = Thread.currentThread().getName().getBytes();
Header header = new RecordHeader("consumer Thread", currentThreadName);
records.forEach(record -> record.headers().add(header));
return records;
}
}
3.2 實(shí)現(xiàn)生產(chǎn)者消息成功失敗統(tǒng)計(jì)
在生產(chǎn)端祠斧,實(shí)現(xiàn)ProducerInterceptor接口并覆寫onAcknowledgement方法,對(duì)發(fā)送成功和失敗的消息進(jìn)行統(tǒng)計(jì)拱礁,并在攔截器關(guān)閉時(shí)將數(shù)據(jù)打印到控制臺(tái):
public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {
private AtomicLong successCounts = new AtomicLong(0);
private AtomicLong failedCounts = new AtomicLong(0);
?
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (null == exception) {
successCounts.getAndIncrement();
} else {
failedCounts.getAndIncrement();
}
}
?
@Override
public void close() {
System.out.println("success counts " + successCounts.get());
System.out.println("failed counts " + failedCounts);
}
}
3.3 . 攔截器配置:
生產(chǎn)者和消費(fèi)者可以通過(guò)interceptor.classes
屬性配置攔截器琢锋,屬性的值為一個(gè)字符串集合,集合中的元素為攔截器類的全路徑名(包括包名)呢灶。
生產(chǎn)者只包含攔截器的配置如下:
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
消費(fèi)者只包含攔截器的配置如下:
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
3.4. 測(cè)試
生產(chǎn)者使用三個(gè)線程吴超,每個(gè)線程發(fā)送一個(gè)消息到kafka,在主線程啟動(dòng)消費(fèi)者消費(fèi)kafka的消息鸯乃,收到的每條消息打印消息的Heads信息鲸阻。
- 生產(chǎn)者
創(chuàng)建發(fā)送消息的線程池:
private static ExecutorService executor = Executors.newFixedThreadPool(3);
為了避免主線程退出導(dǎo)致發(fā)送消息失敗,在添加任務(wù)時(shí)缨睡,將返回的Future對(duì)象保存到隊(duì)列中鸟悴,然后逐個(gè)檢查任務(wù)是否完成,詳細(xì)的代碼如下:
// 生產(chǎn)者配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("delivery.timeout.ms", 300000);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 配置攔截器
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceProducerInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// 創(chuàng)建生產(chǎn)者并發(fā)送消息
Producer<String, String> producer = new KafkaProducer<>(props);
List<Future> futures = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
futures.add(executor.submit(() -> {
producer.send(new ProducerRecord<>("TEST", "hello world "));
}));
}
// 檢查任務(wù)是否完成
futures.forEach(future -> {
try {
future.get();
} catch (Exception e) {
System.out.println(e.getMessage());
}
});
// 關(guān)閉生產(chǎn)者
producer.close();
代碼的輸出結(jié)果如下:
success counts 3
failed counts 0
- 消費(fèi)者
消費(fèi)者拉取消息奖年,打印收到消息的Heads信息以驗(yàn)證攔截器是否生效细诸。
// 消費(fèi)者配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "chentong");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 配置攔截器
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
?
// 創(chuàng)建消費(fèi)者對(duì)象并拉取消息
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(new TopicPartition("TEST", 0)));
consumer.seek(new TopicPartition("TEST", 0), 0L);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
records.forEach(record -> {
record.headers().headers("producer thread")
.forEach(header -> System.out.print("producer thread = " + new String(header.value())));
record.headers().headers("consumer thread")
.forEach(header -> System.out.println("\t consumer thread = " + new String(header.value())));
});
}
代碼輸出結(jié)果如下:
producer thread = pool-1-thread-2 consumer thread = main
producer thread = pool-1-thread-1 consumer thread = main
producer thread = pool-1-thread-3 consumer thread = main
4. 總結(jié)
本文首先介紹了kafka攔截器引入的動(dòng)機(jī),主要為了解決當(dāng)前kafka指標(biāo)采集和監(jiān)控的痛點(diǎn)問(wèn)題陋守;接著簡(jiǎn)單介紹了ProducerInterceptor和ConsumerInterceptor兩個(gè)接口震贵,最后以一個(gè)實(shí)際修改kafka消息Heads的例子進(jìn)一步闡述了如何使用kafka提供的攔截器功能。文中所有樣例的代碼可以從GitHub下載水评。