使用Interceptors實(shí)現(xiàn)消息端到端跟蹤

0. 版本信息

引入版本:0.10.0.0

Kafka Improvement Proposals:KIP-42

1.引入動(dòng)機(jī)

Kafka在 KIP-42中詳細(xì)描述了引入攔截器的動(dòng)機(jī)和對(duì)kafka API做的修改沮翔。


現(xiàn)在愉舔,Kafka指標(biāo)的采集都僅包括客戶端或broker,這使得用戶跟蹤消息在集群內(nèi)的傳遞路徑咸灿,構(gòu)建系統(tǒng)端到端的性能和行為畫像變的困難。從技術(shù)上講,通過(guò)修改應(yīng)用以收集或跟蹤額外的信息來(lái)測(cè)量系統(tǒng)端到端的性能是可行的,但對(duì)于關(guān)鍵的基礎(chǔ)設(shè)施應(yīng)用來(lái)說(shuō)商佑,這種方案并不一定是切實(shí)可行的。在生產(chǎn)環(huán)境中厢塘,能夠快速部署工具來(lái)觀察茶没,測(cè)量和監(jiān)控Kafka客戶端行為(粒度直至消息級(jí)別),是非常有用的晚碾。同時(shí)抓半,不同應(yīng)用的度量指標(biāo)需要的上下文元數(shù)據(jù)各異。無(wú)需重新編寫代碼或重新編譯即可實(shí)現(xiàn)監(jiān)控客戶端的能力十分重要(在某些場(chǎng)景下格嘁,這種能力有助于連接到正在運(yùn)行的應(yīng)用程序)笛求。

為了實(shí)現(xiàn)這個(gè)功能,kafka 更加傾向于增加生產(chǎn)者和消費(fèi)者攔截器糕簿,攔截器可以在生產(chǎn)者和消費(fèi)者處理消息的不同階段攔截消息探入。在Apache Flume 攔截器接口的啟發(fā)下,kafka開(kāi)發(fā)了現(xiàn)在的機(jī)制懂诗。雖然蜂嗽,有很多功能都可以使用攔截器實(shí)現(xiàn)(例如,異常檢測(cè)殃恒,數(shù)據(jù)加密植旧,字段過(guò)濾等),但是每個(gè)功能都需要仔細(xì)的評(píng)估是否應(yīng)該使用攔截器還是其他機(jī)制來(lái)完成离唐。當(dāng)這些場(chǎng)景有明確的使用動(dòng)機(jī)時(shí)隆嗅,提供明確的API是一種良好的實(shí)踐。因此侯繁,kafka提供了最小化的生產(chǎn)者和消費(fèi)者攔截器接口,旨在僅支持測(cè)量和監(jiān)控泡躯。

盡管增加更多的指標(biāo)或改進(jìn)kafka的監(jiān)控是可能的贮竟,但是基于以下原因我們認(rèn)為提供靈活的丽焊,用戶可定制的接口更加有益:

  1. 構(gòu)建通用監(jiān)控工具。在一家大公司咕别,不同的團(tuán)隊(duì)合作構(gòu)建系統(tǒng)技健。通常來(lái)說(shuō),隨著時(shí)間的推移惰拱,不同的團(tuán)隊(duì)開(kāi)發(fā)部署不同的組件雌贱。此外,組織對(duì)于通用的指標(biāo)偿短、數(shù)據(jù)格式和數(shù)據(jù)系統(tǒng)希望實(shí)現(xiàn)標(biāo)準(zhǔn)化欣孤。對(duì)于一個(gè)組織,我們認(rèn)為開(kāi)發(fā)部署通用的Kafka客戶端監(jiān)控工具并在所有使用Kafka的應(yīng)用中部署該工具是非常有價(jià)值的昔逗。

  2. 高昂的監(jiān)控代價(jià)降传。向kafka添加其他指標(biāo)可能會(huì)影響kafka的性能。不幸的是勾怒,有時(shí)候需要在系統(tǒng)性能和數(shù)據(jù)收集之間進(jìn)行權(quán)衡婆排。舉個(gè)例子,考慮檢測(cè)消息大小的場(chǎng)景笔链。代價(jià)最低段只,最簡(jiǎn)單,最直接的方法是計(jì)算消息的平均大小鉴扫。計(jì)算分布式系統(tǒng)中的百分比要比計(jì)算簡(jiǎn)單的平均值代價(jià)更高赞枕,更復(fù)雜,但是在很多應(yīng)用中這是非常有用的幔妨。我們希望能夠讓客戶使用不同的算法收集指標(biāo)數(shù)據(jù)鹦赎,或者不收集。

  3. 應(yīng)用對(duì)指標(biāo)的要求不同误堡。例如古话,一個(gè)用戶可能認(rèn)為監(jiān)控kafka中不同key的消息數(shù)非常的重要。在kafka內(nèi)部提供所有的指標(biāo)是不切實(shí)際的锁施。插件化的攔截系統(tǒng)為指標(biāo)的定制化提供了簡(jiǎn)單可行的能力陪踩。

  4. 在一個(gè)組織中kafka通常是大型基礎(chǔ)設(shè)施的一部分,在基礎(chǔ)設(shè)施中實(shí)現(xiàn)端到端的跟蹤是非常有用的悉抵。攔截器提供了在相同基礎(chǔ)設(shè)施中跟蹤kafka客戶端的能力肩狂。

為了支持?jǐn)r截器功能,Kafka在0.10.0.0版本增加了兩個(gè)全新的接口:ProducerInterceptorConsumerInterceptor并支持實(shí)現(xiàn)和配置攔截器鏈姥饰。攔截器API允許修改消息以支持給消息增加額外元數(shù)據(jù)實(shí)現(xiàn)端到端跟蹤的能力傻谁。

2. API 介紹

生產(chǎn)者攔截器 ProducerInterceptor

public interface ProducerInterceptor<K, V> extends Configurable {

 /**
 * 鍵值序列化和分配分區(qū)之前, KafkaProducer.send(ProducerRecord) and
 * KafkaProducer.send(ProducerRecord, Callback)方法調(diào)用該方法列粪。
 */
 public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
?
 /**
 * 該方法在消息發(fā)送到服務(wù)端而且得到服務(wù)端確認(rèn)時(shí)或者在發(fā)送之前失敗時(shí)被調(diào)用审磁。
 */
 public void onAcknowledgement(RecordMetadata metadata, Exception exception);
?
 /**
 * 該方法在關(guān)閉攔截器時(shí)調(diào)用谈飒。
 */
 public void close();
}

消費(fèi)者攔截器 ConsumerInterceptor

public interface ConsumerInterceptor<K, V> extends Configurable {

 /**
 * 在KafkaConsumer.poll(java.time.Duration)返回消息之前
 */
 ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
?
 /**
 * 當(dāng)offset提交后調(diào)用該方法。
 */
 void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);

 /**
 * 該方法在關(guān)閉攔截器時(shí)調(diào)用态蒂。
 */
 void close();
}

3 .動(dòng)手實(shí)踐

下面以實(shí)現(xiàn)一個(gè)簡(jiǎn)單的kafka指標(biāo)采集小功能為例杭措,進(jìn)一步了解kafka攔截器的功能和使用方法。采集指標(biāo)包括:

  1. 生產(chǎn)和消費(fèi)消費(fèi)的線程名

  2. 生產(chǎn)者生產(chǎn)消息成功失敗次數(shù)統(tǒng)計(jì)

3.1 修改消息钾恢,增加處理線程名

在生產(chǎn)端手素,實(shí)現(xiàn)ProducerInterceptor接口并覆寫onSend方法,修改ProducerRecord瘩蚪,在Heads中增加生產(chǎn)者線程名:

public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {

 @Override
 public ProducerRecord<String,String> onSend(ProducerRecord<String,String > record) {
 Header producerThread = new RecordHeader("producerThread",Thread.currentThread().getName().getBytes());
 record.headers().add(producerThread);
 return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),record.headers());
 }
}

在消費(fèi)端泉懦,實(shí)現(xiàn)ConsumerInterceptor接口并覆寫onConsume方法,修改ConsumerRecord募舟,在Heads中增加消費(fèi)者線程名:

public class TraceConsumerInterceptor implements ConsumerInterceptor<String, String> {

 @Override
 public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
 byte[] currentThreadName = Thread.currentThread().getName().getBytes();
 Header header = new RecordHeader("consumer Thread", currentThreadName);
 records.forEach(record -> record.headers().add(header));
 return records;
 }
}

3.2 實(shí)現(xiàn)生產(chǎn)者消息成功失敗統(tǒng)計(jì)

在生產(chǎn)端祠斧,實(shí)現(xiàn)ProducerInterceptor接口并覆寫onAcknowledgement方法,對(duì)發(fā)送成功和失敗的消息進(jìn)行統(tǒng)計(jì)拱礁,并在攔截器關(guān)閉時(shí)將數(shù)據(jù)打印到控制臺(tái):

public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {
 private AtomicLong successCounts = new AtomicLong(0);
 private AtomicLong failedCounts = new AtomicLong(0);
?
 @Override
 public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 if (null == exception) {
 successCounts.getAndIncrement();
 } else {
 failedCounts.getAndIncrement();
 }
 }
?
 @Override
 public void close() {
 System.out.println("success counts " + successCounts.get());
 System.out.println("failed counts " + failedCounts);
 }
}

3.3 . 攔截器配置:

生產(chǎn)者和消費(fèi)者可以通過(guò)interceptor.classes屬性配置攔截器琢锋,屬性的值為一個(gè)字符串集合,集合中的元素為攔截器類的全路徑名(包括包名)呢灶。

生產(chǎn)者只包含攔截器的配置如下:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

消費(fèi)者只包含攔截器的配置如下:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

3.4. 測(cè)試

生產(chǎn)者使用三個(gè)線程吴超,每個(gè)線程發(fā)送一個(gè)消息到kafka,在主線程啟動(dòng)消費(fèi)者消費(fèi)kafka的消息鸯乃,收到的每條消息打印消息的Heads信息鲸阻。

  • 生產(chǎn)者

創(chuàng)建發(fā)送消息的線程池:

private static ExecutorService executor = Executors.newFixedThreadPool(3);

為了避免主線程退出導(dǎo)致發(fā)送消息失敗,在添加任務(wù)時(shí)缨睡,將返回的Future對(duì)象保存到隊(duì)列中鸟悴,然后逐個(gè)檢查任務(wù)是否完成,詳細(xì)的代碼如下:

 // 生產(chǎn)者配置信息
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("delivery.timeout.ms", 300000);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // 配置攔截器
 List<String> interceptors = new ArrayList<>();
 interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceProducerInterceptor");
 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

 // 創(chuàng)建生產(chǎn)者并發(fā)送消息
 Producer<String, String> producer = new KafkaProducer<>(props);
 List<Future> futures = new ArrayList<>(3);
 for (int i = 0; i < 3; i++) {
 futures.add(executor.submit(() -> {
 producer.send(new ProducerRecord<>("TEST", "hello world "));
 }));
 }

 // 檢查任務(wù)是否完成
 futures.forEach(future -> {
 try {
 future.get();
 } catch (Exception e) {
 System.out.println(e.getMessage());
 }
 });

 // 關(guān)閉生產(chǎn)者
 producer.close();

代碼的輸出結(jié)果如下:

success counts 3
failed counts 0
  • 消費(fèi)者

消費(fèi)者拉取消息奖年,打印收到消息的Heads信息以驗(yàn)證攔截器是否生效细诸。

 // 消費(fèi)者配置信息
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "chentong");
 props.put("enable.auto.commit", "false");
 props.put("key.deserializer",         "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 // 配置攔截器
 List<String> interceptors = new ArrayList<>();
 interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor");
 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
?
 // 創(chuàng)建消費(fèi)者對(duì)象并拉取消息
 Consumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.assign(Arrays.asList(new TopicPartition("TEST", 0)));
 consumer.seek(new TopicPartition("TEST", 0), 0L);
 while (true) {
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
 records.forEach(record -> {
 record.headers().headers("producer thread")
 .forEach(header -> System.out.print("producer thread = " + new String(header.value())));
 record.headers().headers("consumer thread")
 .forEach(header -> System.out.println("\t consumer thread = " + new String(header.value())));
 });
 }

代碼輸出結(jié)果如下:

producer thread = pool-1-thread-2  consumer thread = main
producer thread = pool-1-thread-1  consumer thread = main
producer thread = pool-1-thread-3  consumer thread = main

4. 總結(jié)

本文首先介紹了kafka攔截器引入的動(dòng)機(jī),主要為了解決當(dāng)前kafka指標(biāo)采集和監(jiān)控的痛點(diǎn)問(wèn)題陋守;接著簡(jiǎn)單介紹了ProducerInterceptorConsumerInterceptor兩個(gè)接口震贵,最后以一個(gè)實(shí)際修改kafka消息Heads的例子進(jìn)一步闡述了如何使用kafka提供的攔截器功能。文中所有樣例的代碼可以從GitHub下載水评。

5. 參考

  1. KIP-42

  2. Kafka API doc

  3. Release Notes

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末猩系,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子中燥,更是在濱河造成了極大的恐慌寇甸,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異幽纷,居然都是意外死亡式塌,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門友浸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人偏窝,你說(shuō)我怎么就攤上這事收恢。” “怎么了祭往?”我有些...
    開(kāi)封第一講書人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵伦意,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我硼补,道長(zhǎng)驮肉,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任已骇,我火速辦了婚禮离钝,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘褪储。我一直安慰自己卵渴,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布鲤竹。 她就那樣靜靜地躺著浪读,像睡著了一般。 火紅的嫁衣襯著肌膚如雪辛藻。 梳的紋絲不亂的頭發(fā)上碘橘,一...
    開(kāi)封第一講書人閱讀 51,274評(píng)論 1 300
  • 那天,我揣著相機(jī)與錄音吱肌,去河邊找鬼痘拆。 笑死,一個(gè)胖子當(dāng)著我的面吹牛岩榆,可吹牛的內(nèi)容都是我干的错负。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼勇边,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼犹撒!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起粒褒,我...
    開(kāi)封第一講書人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤识颊,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體祥款,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡清笨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了刃跛。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片抠艾。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖桨昙,靈堂內(nèi)的尸體忽然破棺而出检号,到底是詐尸還是另有隱情,我是刑警寧澤蛙酪,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布齐苛,位于F島的核電站,受9級(jí)特大地震影響桂塞,放射性物質(zhì)發(fā)生泄漏凹蜂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一阁危、第九天 我趴在偏房一處隱蔽的房頂上張望玛痊。 院中可真熱鬧,春花似錦欲芹、人聲如沸卿啡。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)颈娜。三九已至,卻和暖如春浙宜,著一層夾襖步出監(jiān)牢的瞬間官辽,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工粟瞬, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留同仆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓裙品,卻偏偏與公主長(zhǎng)得像俗批,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子市怎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容