概念:
Kafka作為一個分布式的流平臺具有三個關(guān)鍵能力:
1.發(fā)布和訂閱消息(流)才漆,在這方面,它類似于一個消息隊列或企業(yè)消息系統(tǒng)
2.以容錯的方式存儲消息(流)
3.在消息流發(fā)生時處理它們
優(yōu)勢:
1.構(gòu)建實時的流數(shù)據(jù)管道佛点,可靠地獲取系統(tǒng)和應(yīng)用程序之間的數(shù)據(jù)
2.構(gòu)建實時流的應(yīng)用程序醇滥,對數(shù)據(jù)流進行轉(zhuǎn)換或反應(yīng)
首先幾個概念:
1.kafka作為一個集群運行在一個或多個服務(wù)器上。
2.kafka集群存儲的消息是以topic為類別記錄的超营。
3.每個消息(也叫記錄record鸳玩,我習(xí)慣叫消息)是由一個key,一個value和時間戳構(gòu)成演闭。
kafka有四個核心API:
1.應(yīng)用程序使用 Producer API 發(fā)布消息到1個或多個topic(主題)不跟。
2.應(yīng)用程序使用 Consumer API 來訂閱一個或多個topic,并處理產(chǎn)生的消息米碰。
3.應(yīng)用程序使用StreamsAPI充當一個流處理器窝革,從1個或多個topic消費輸入流,
并生產(chǎn)一個輸出流到1個或多個輸出topic吕座,有效地將輸入流轉(zhuǎn)換到輸出流虐译。
4.ConnectorAPI允許構(gòu)建或運行可重復(fù)使用的生產(chǎn)者或消費者,將topic連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)吴趴。
下載安裝:
我是在自己的mac安裝的菱蔬,下面給出安裝命令:
brew install kafka (會自動安裝Zookeeper)
啟動:
brew services start zookeeper
brew services start kafka
if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
使用kafka:
1.創(chuàng)建一個名為 dog 的Topic, 只有一個分區(qū)和一個備份
kafka-topics --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic dog
2.查看創(chuàng)建的topic信息
kafka-topics --list --zookeeper localhost:9092
3.發(fā)送消息
kafka-console-producer --broker-list localhost:9092 --topic dog
4.消費消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic dog
生產(chǎn)消息 和 消費消息 見下圖:
kafka在linux的集群搭建
1.下載: http://kafka.apache.org/downloads.html
2.安裝:
#解壓到目錄: cd usr/local/kafka
tar -zxvf kafka_2.12-2.5.0.tgz
# 創(chuàng)建logs目錄
mkdir logs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# ls
bin config libs LICENSE logs NOTICE site-docs
## 修改配置文件: vim /config/server.properties ##
# broker的全局唯一編號,不能重復(fù)
broker.id=2
# 處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3
# 用來處理磁盤IO的線程數(shù)量
num.io.threads=8
# 發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
# 接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
# 請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka/logs
# topic在當前broker上的分區(qū)個數(shù)
num.partitions=1
listeners=PLAINTEXT://hadoop2:9092
advertised.listeners=PLAINTEXT://hadoop2:9092
# 配置連接Zookeeper集群地址
zookeeper.connect=hadoop2:2181,hadoop3:2181,hadoop4:2181
# 配置環(huán)境變量
## sudo vim /etc/profile
kafka_home
export KAFKA_HOME=/usr/local/kafka/kafka
export PATH=$PATH:$KAFKA_HOME/bin
## source /etc/profile
## 分發(fā)到各臺機器(假設(shè)當前在hadoop2這臺機器)
[root@hadoop2 kafka]# ls
kafka
[root@hadoop2 kafka]# pwd
/usr/local/kafka
[root@hadoop2 kafka]# scp -r kafka root@hadoop3:/usr/local/kafka
# 修改每臺機器的broker.id
## vim /config/server.properties
broker.id = 2 (hadoop2)
broker.id = 3 (hadoop3)
broker.id = 4 (hadoop4)
# 啟動集群
## 依次在hadoop2史侣、hadoop3拴泌、hadoop4 啟動kafka
[root@hadoop2 kafka]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# bin/kafka-server-start.sh config/server.properties &
# 啟動集群方式2
[root@hadoop2 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop2 kafka]# jps
9462 Kafka
9480 Jps
3419 QuorumPeerMain
# 關(guān)閉集群
[root@hadoop2 kafka]# bin/kafka-server-stop.sh stop
Kafka命令行操作
# 查看當前服務(wù)器中的所有topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
# 創(chuàng)建topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181 \ --create --replication-factor 3 --partitions 1 --topic first
# 報錯:
[root@hadoop2 kafka]# bin/kafka-topics.sh zookeeper hadoop4:2181 --list
Exception in thread "main" java.lang.IllegalArgumentException: Only one of --bootstrap-server or --zookeeper must be specified
at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:702)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:52)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
## 注意: broker.id 要與對應(yīng)的zookeepr的myid保持一致
主機名 host(ip) zookeeper myid
kafka-2 hadoop2 server.2 2
kafka-3 hadoop3 server.3 3
kafka-4 hadoop4 server.4 4
- 嘗試正確的創(chuàng)建topic命令
# 在hadoop4機器上創(chuàng)建topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop4:2181 --replication-factor 3 --partitions 6 --topic test
Created topic test.
## 選項說明
--topic : 定義topic名
--partitions: 定義分區(qū)數(shù)
--replication-factor: 定義副本數(shù)
# 查看創(chuàng)建的topic(創(chuàng)建成功后hadoop2、hadoop3都可查看)
[root@hadoop4 kafka] bin/kafka-topics.sh --describe --zookeeper hadoop4:2181 --topic test
> Topic: test PartitionCount: 6 ReplicationFactor: 3 Configs:
> Topic: test Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2
> Topic: test Partition: 1 Leader: 3 Replicas: 3,4,2 Isr: 3,4,2
> Topic: test Partition: 2 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
> Topic: test Partition: 3 Leader: 2 Replicas: 2,4,3 Isr: 2
> Topic: test Partition: 4 Leader: 3 Replicas: 3,2,4 Isr: 3,2,4
> Topic: test Partition: 5 Leader: 4 Replicas: 4,3,2 Isr: 4,3,2
# 刪除topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181 --delete --topic test
# 在hadoop2上啟動消費者
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop2:9092 --topic test
# 在hadoop4啟動生產(chǎn)者
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list hadoop2:9092 --topic test
調(diào)整準則:
一般來說惊橱,若是集群較序礁(小于6個brokers),則配置2 x broker數(shù)的partition數(shù)税朴。在這里主要考慮的是之后的擴展回季。若是集群擴展了一倍(例如12個)家制,則不用擔心會有partition不足的現(xiàn)象發(fā)生
一般來說,若是集群較大(大于12個)泡一,則配置1 x broker 數(shù)的partition數(shù)颤殴。因為這里不需要再考慮集群的擴展情況,與broker數(shù)相同的partition數(shù)已經(jīng)足夠應(yīng)付常規(guī)場景鼻忠。若有必要涵但,則再手動調(diào)整
考慮最高峰吞吐需要的并行consumer數(shù),調(diào)整partition的數(shù)目帖蔓。若是應(yīng)用場景需要有20個(同一個consumer group中的)consumer并行消費矮瘟,則據(jù)此設(shè)置為20個partition
考慮producer所需的吞吐,調(diào)整partition數(shù)目(如果producer的吞吐非常高塑娇,或是在接下來兩年內(nèi)都比較高澈侠,則增加partition的數(shù)目)
kafka工作流程分析
寫入方式:producer 采用推(push)模式將消息發(fā)布到 broker,每條消息都被追加(append)到分 區(qū)(patition)中埋酬,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高哨啃,保障 kafka 吞吐率)
-
分區(qū): 消息發(fā)送時都被發(fā)送到一個 topic,其本質(zhì)就是一個目錄写妥,而 topic 是由一些 Partition Logs(分區(qū)日志)組成棘催,其組織結(jié)構(gòu)如下圖所示
522069A3-BD57-465D-8AF5-91594CC5DA62.png
我們可以看到,每個 Partition 中的消息都是有序的耳标,生產(chǎn)的消息被不斷追加到 Partition log 上醇坝,其中的每一個消息都被賦予了一個唯一的 offset 值。
分區(qū)的原因
1>方便在集群中擴展次坡,每個 Partition 可以通過調(diào)整以適應(yīng)它所在的機器呼猪,而一個 topic 又可以有多個 Partition 組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了
2>可以提高并發(fā)砸琅,因為可以以 Partition 為單位讀寫了分區(qū)原則
1>指定了 patition宋距,則直接使用
2>未指定 patition 但指定 key,通過對 key 的 value 進行 hash 出一個 patition
3>patition 和 key 都未指定症脂,使用輪詢選出一個 patition副本
同一個 partition 可能會有多個 replication(對應(yīng) server.properties 配置中的 default.replication.factor=N)谚赎。沒有 replication 的情況下,一旦 broker 宕機诱篷,其上所有 patition 的數(shù)據(jù)都不可被消費壶唤,同時 producer 也不能再將數(shù)據(jù)存于其上的 patition。引入 replication 之 后棕所,同一個 partition 可能會有多個 replication闸盔,而這時需要在這些 replication 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互琳省,其它 replication 作為 follower 從 leader 中復(fù)制數(shù)據(jù)
寫入流程
1>producer 先從 zookeeper 的 "/brokers/.../state"節(jié)點找到該 partition 的 leader
2>producer 將消息發(fā)送給該 leader
3>leader 將消息寫入本地 log
4>followers 從 leader pull 消息迎吵,寫入本地 log 后向 leader 發(fā)送 ACK
5>leader 收到所有 ISR 中的 replication 的 ACK 后躲撰,增加 HW(high watermark,最后 commit 的 offset)并向 producer 發(fā)送 ACK
保存消息
存儲方式
物理上把topic分成一個或多個patition(對應(yīng) server.properties 中的num.partitions=3配 置)击费,每個 patition 物理上對應(yīng)一個文件夾(該文件夾存儲該 patition 的所有消息和索引文 件)存儲策略
無論消息是否被消費拢蛋,kafka 都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
1>基于時間:log.retention.hours=168
2>基于大小:log.retention.bytes=1073741824
需要注意的是蔫巩,因為 Kafka 讀取特定消息的時間復(fù)雜度為 O(1)谆棱,即與文件大小無關(guān),所
以這里刪除過期文件與提高 Kafka 性能無關(guān)
注意:
ACK的確認方式有:
0: 不需要確認, 速度塊
1: 生產(chǎn)者需要Leader的確認
all: 生產(chǎn)者需要Leder批幌、Follower的確認
小試牛刀
- 創(chuàng)建一個Maven項目 , pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafkademo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients !-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- 日志包 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
</project>
- 在resources目錄下創(chuàng)建一個log4j.properties文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- 代碼
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
// 配置信息
// 1.kafka集群
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");
// 2.應(yīng)答機制
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
// 3.重試次數(shù)
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
// 4.批量大小
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); // 16k
// 提交延時
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
// 緩存
properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // 32M
// KV的序列化類
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 創(chuàng)建生產(chǎn)者對象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=10; i<20; i++){
producer.send(new ProducerRecord<String, String>("test", Integer.valueOf(i).toString()), (recordMetadata, e) -> {
if (e == null) {
System.out.println(recordMetadata.partition() + "-----" + recordMetadata.offset());
} else {
System.out.println("發(fā)送失敗");
}
});
}
// 關(guān)閉資源
producer.close();
}
}
運行程序
- 開啟消費者
[root@hadoop4 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop4:9092 --topic test
0
1
2
3
4
5
6
7
8
9
注意:如果topic有多(3)個分區(qū)础锐,那么則是按照分區(qū)順序輸出的嗓节,有可能是輸出 0 3 6 9 1 4 7 2 5 8, 但是每個分區(qū)內(nèi)部是有序的
創(chuàng)建多分區(qū)多副本
[root@hadoop2 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop2:2181 --replication-factor 3 --partitions 3 --topic secondTest
Created topic secondTest.
# 查看創(chuàng)建多分區(qū)多副本的topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --describe --zookeeper hadoop4:2181 --topic secondTest
Topic: secondTest PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: secondTest Partition: 0 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4
Topic: secondTest Partition: 1 Leader: 3 Replicas: 3,4,2 Isr: 3,4,2
Topic: secondTest Partition: 2 Leader: 4 Replicas: 4,2,3 Isr: 4,2,3
[root@hadoop2 kafka]#
- 自定義分區(qū)
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
自定義分區(qū)
*/
public class CustomerPartitioner implements Partitioner {
@Override
public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// return 0;
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
int partitionNum = partitionInfoList.size();
Random random = new Random();
return random.nextInt(partitionNum); // [0 partitionNum) 中的一個隨機數(shù)
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
# 生產(chǎn)者(Producer)代碼 參考上例
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "CustomerPartitioner");
控制臺輸出:
分區(qū)=0-----偏移量=2
分區(qū)=0-----偏移量=3
分區(qū)=1-----偏移量=2
分區(qū)=1-----偏移量=3
分區(qū)=1-----偏移量=4
分區(qū)=1-----偏移量=5
分區(qū)=1-----偏移量=6
分區(qū)=1-----偏移量=7
分區(qū)=1-----偏移量=8
分區(qū)=2-----偏移量=6
kafka消費者輸出:
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop2:9092 --topic secondTest
1
5
0
2
3
6
7
8
9
4
由上可知荧缘, 1,5 在分區(qū)0中,0,2,3,6,7,8,9在分區(qū)1中拦宣, 4在分區(qū)2中 截粗, 每個分區(qū)維護著一套自己的offset, 每個分區(qū)內(nèi)部有序
消費者(Consumer)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
// kafka集群
properties.setProperty("bootstrap.servers", "hadoop2:9092");
// 消費者組id
properties.setProperty("group.id", "secondTest");
// 設(shè)置自動提交offset
properties.setProperty("enable.auto.commit", "true");
// 延時提交
properties.setProperty("auto.commit.interval.ms", "1000");
// KV的反序列化
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 創(chuàng)建消費者對象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// 指定topic
// topic = "third" 這個topic是沒有創(chuàng)建的,開啟了log4j后會提示
// WARN [org.apache.kafka.clients.NetworkClient] - Error while fetching metadata with correlation id 2 : {third=LEADER_NOT_AVAILABLE}"
kafkaConsumer.subscribe(Arrays.asList("test", "secondTest","third"));
// 如果想只消費某一個指定的topic
// kafkaConsumer.subscribe(Collections.singletonList("test"));
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record: consumerRecords) {
System.out.println(
"topic=" + record.topic() + "---" +
"patition=" + record.partition() + "---" +
"key=" + record.key() + "---" +
"value=" + record.value() + "---" +
"headers=" + record.headers().toString()
) ;
}
}
}
}
# 生產(chǎn)測試數(shù)據(jù)
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list hadoop4:9092 --topic secondTest
>wudy haha
>still water run deep
>
# console輸出
topic=secondTest---patition=2---key=null---value=wudy haha---headers=RecordHeaders(headers = [], isReadOnly = false)
topic=secondTest---patition=0---key=null---value=still water run deep---headers=RecordHeaders(headers = [], isReadOnly = false)
- kafka如何重復(fù)消費?
1.使用低級API直接指定到哪個offset
2.設(shè)置auto.offset.reset=earliest鸵隧, 并更換組名
properties.setProperty("group.id", "wudyGroup"); // 原來 == secondTest
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- kafka seek方法
# 可能存在的報錯
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition secondTest-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
at Consumer.main(Consumer.java:51)
# 解決方式1
// 指定消費某一個partition分區(qū)的數(shù)據(jù)
kafkaConsumer.assign(Collections.singletonList(new TopicPartition("secondTest", 0)));
// 指定獲取某個topic的指定分區(qū)的指定offset的數(shù)據(jù)
kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);
# 解決方式2
// 如果想只消費某一個指定的topi
kafkaConsumer.subscribe(Collections.singletonList("secondTest"));
kafkaConsumer.poll(0);
// 指定獲取某個topic的指定分區(qū)的指定offset的數(shù)
kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);
攔截器
Producer攔截器(interceptor)主要用于實現(xiàn)clients端的定制化控制邏輯
對于Producer而言绸罗,interceptor使得用戶在消息發(fā)送前以及producer回掉邏輯前有機會對消息做一些定制化需求,比如修改消息
同時Producer允許用戶指定多個interceptor按序作用于同一消息從而形成一個攔截鏈(org.apache.kafka.clients.producer.ProducerInterceptor)
1.configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用2.onSend(ProducerRecord)
該方法封裝進kafkaProducer.send方法中豆瘫,即它運行在用戶主線程中珊蟀,Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作外驱,但最好保證不要修改消息所屬的topic和分區(qū)3.onAcknowledgement(RecordMetadata, Exception):
該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用育灸,通常都是在producer回掉邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中昵宇,不要在該方法中放很重的邏輯磅崭,否則會拖慢Producer的消息發(fā)送效率4.close
關(guān)閉interceptor, 主要用于執(zhí)行一些資源清理工作。
interceptor可能被運行在多個線程中瓦哎,因此在具體實現(xiàn)時需要用戶自行確保線程安全砸喻,倘若指定了多個interceptor,則producer將按照指定順序調(diào)用她們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志再向上傳遞