Kafka

概念:
Kafka作為一個分布式的流平臺具有三個關(guān)鍵能力:
1.發(fā)布和訂閱消息(流)才漆,在這方面,它類似于一個消息隊列或企業(yè)消息系統(tǒng)
2.以容錯的方式存儲消息(流)
3.在消息流發(fā)生時處理它們

優(yōu)勢:
1.構(gòu)建實時的流數(shù)據(jù)管道佛点,可靠地獲取系統(tǒng)和應(yīng)用程序之間的數(shù)據(jù)
2.構(gòu)建實時流的應(yīng)用程序醇滥,對數(shù)據(jù)流進行轉(zhuǎn)換或反應(yīng)

首先幾個概念:
1.kafka作為一個集群運行在一個或多個服務(wù)器上。
2.kafka集群存儲的消息是以topic為類別記錄的超营。
3.每個消息(也叫記錄record鸳玩,我習(xí)慣叫消息)是由一個key,一個value和時間戳構(gòu)成演闭。

kafka有四個核心API:
1.應(yīng)用程序使用 Producer API 發(fā)布消息到1個或多個topic(主題)不跟。
2.應(yīng)用程序使用 Consumer API 來訂閱一個或多個topic,并處理產(chǎn)生的消息米碰。
3.應(yīng)用程序使用StreamsAPI充當一個流處理器窝革,從1個或多個topic消費輸入流,
并生產(chǎn)一個輸出流到1個或多個輸出topic吕座,有效地將輸入流轉(zhuǎn)換到輸出流虐译。
4.ConnectorAPI允許構(gòu)建或運行可重復(fù)使用的生產(chǎn)者或消費者,將topic連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)吴趴。

下載安裝:
我是在自己的mac安裝的菱蔬,下面給出安裝命令:
brew install kafka (會自動安裝Zookeeper)

啟動:
brew services start zookeeper
brew services start kafka

if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

使用kafka:
1.創(chuàng)建一個名為 dog 的Topic, 只有一個分區(qū)和一個備份
kafka-topics --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic dog
2.查看創(chuàng)建的topic信息
kafka-topics --list --zookeeper localhost:9092
3.發(fā)送消息
kafka-console-producer --broker-list localhost:9092 --topic dog
4.消費消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic dog

生產(chǎn)消息 和 消費消息 見下圖:


9AC323F5-A495-4A5F-BE2C-86990F4C15EC.png

kafka在linux的集群搭建

1.下載: http://kafka.apache.org/downloads.html
2.安裝:

#解壓到目錄: cd  usr/local/kafka
tar  -zxvf  kafka_2.12-2.5.0.tgz
# 創(chuàng)建logs目錄
mkdir  logs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs

## 修改配置文件: vim  /config/server.properties ##

# broker的全局唯一編號,不能重復(fù)
broker.id=2

# 處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3

# 用來處理磁盤IO的線程數(shù)量
num.io.threads=8

# 發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400

# 接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400

# 請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600

log.dirs=/usr/local/kafka/kafka/logs

# topic在當前broker上的分區(qū)個數(shù)
num.partitions=1

listeners=PLAINTEXT://hadoop2:9092
advertised.listeners=PLAINTEXT://hadoop2:9092

# 配置連接Zookeeper集群地址
zookeeper.connect=hadoop2:2181,hadoop3:2181,hadoop4:2181
# 配置環(huán)境變量
## sudo  vim /etc/profile
kafka_home
export KAFKA_HOME=/usr/local/kafka/kafka
export PATH=$PATH:$KAFKA_HOME/bin

## source /etc/profile
## 分發(fā)到各臺機器(假設(shè)當前在hadoop2這臺機器)

[root@hadoop2 kafka]# ls
kafka
[root@hadoop2 kafka]# pwd
/usr/local/kafka
[root@hadoop2 kafka]# scp -r  kafka  root@hadoop3:/usr/local/kafka

# 修改每臺機器的broker.id 
## vim  /config/server.properties
broker.id = 2  (hadoop2)
broker.id = 3   (hadoop3)
broker.id = 4   (hadoop4)
# 啟動集群
## 依次在hadoop2史侣、hadoop3拴泌、hadoop4 啟動kafka
[root@hadoop2 kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# bin/kafka-server-start.sh config/server.properties &

# 啟動集群方式2
[root@hadoop2 kafka]# bin/kafka-server-start.sh -daemon config/server.properties 
[root@hadoop2 kafka]# jps
9462 Kafka
9480 Jps
3419 QuorumPeerMain


# 關(guān)閉集群
[root@hadoop2 kafka]# bin/kafka-server-stop.sh stop

Kafka命令行操作

# 查看當前服務(wù)器中的所有topic
[root@hadoop2 kafka]# bin/kafka-topics.sh  --zookeeper hadoop2:2181 --list
# 創(chuàng)建topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181 \ --create --replication-factor 3 --partitions 1 --topic first
# 報錯:
[root@hadoop2 kafka]# bin/kafka-topics.sh zookeeper hadoop4:2181 --list
Exception in thread "main" java.lang.IllegalArgumentException: Only one of --bootstrap-server or --zookeeper must be specified
    at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:702)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:52)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

## 注意: broker.id 要與對應(yīng)的zookeepr的myid保持一致
主機名          host(ip)      zookeeper    myid
kafka-2        hadoop2        server.2      2 
kafka-3        hadoop3        server.3      3
kafka-4        hadoop4        server.4      4
  • 嘗試正確的創(chuàng)建topic命令
# 在hadoop4機器上創(chuàng)建topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop4:2181 --replication-factor 3 --partitions 6 --topic test
Created topic test.
## 選項說明
--topic : 定義topic名
--partitions: 定義分區(qū)數(shù)
--replication-factor: 定義副本數(shù)


# 查看創(chuàng)建的topic(創(chuàng)建成功后hadoop2、hadoop3都可查看)
[root@hadoop4 kafka] bin/kafka-topics.sh --describe --zookeeper  hadoop4:2181 --topic test

>   Topic: test     PartitionCount: 6           ReplicationFactor: 3    Configs: 
>   Topic: test     Partition: 0    Leader: 2   Replicas: 2,3,4         Isr: 2
>   Topic: test     Partition: 1    Leader: 3   Replicas: 3,4,2         Isr: 3,4,2
>   Topic: test     Partition: 2    Leader: 4   Replicas: 4,2,3         Isr: 4,2,3
>   Topic: test     Partition: 3    Leader: 2   Replicas: 2,4,3         Isr: 2
>   Topic: test     Partition: 4    Leader: 3   Replicas: 3,2,4         Isr: 3,2,4
>   Topic: test     Partition: 5    Leader: 4   Replicas: 4,3,2         Isr: 4,3,2

# 刪除topic  
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181  --delete --topic test

# 在hadoop2上啟動消費者
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  hadoop2:9092 --topic test

# 在hadoop4啟動生產(chǎn)者
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list  hadoop2:9092 --topic test

調(diào)整準則:
一般來說惊橱,若是集群較序礁(小于6個brokers),則配置2 x broker數(shù)的partition數(shù)税朴。在這里主要考慮的是之后的擴展回季。若是集群擴展了一倍(例如12個)家制,則不用擔心會有partition不足的現(xiàn)象發(fā)生
一般來說,若是集群較大(大于12個)泡一,則配置1 x broker 數(shù)的partition數(shù)颤殴。因為這里不需要再考慮集群的擴展情況,與broker數(shù)相同的partition數(shù)已經(jīng)足夠應(yīng)付常規(guī)場景鼻忠。若有必要涵但,則再手動調(diào)整
考慮最高峰吞吐需要的并行consumer數(shù),調(diào)整partition的數(shù)目帖蔓。若是應(yīng)用場景需要有20個(同一個consumer group中的)consumer并行消費矮瘟,則據(jù)此設(shè)置為20個partition
考慮producer所需的吞吐,調(diào)整partition數(shù)目(如果producer的吞吐非常高塑娇,或是在接下來兩年內(nèi)都比較高澈侠,則增加partition的數(shù)目)

kafka工作流程分析

32402AB8-6275-42DC-96D2-026C327B8AD2.png
  • 寫入方式:producer 采用推(push)模式將消息發(fā)布到 broker,每條消息都被追加(append)到分 區(qū)(patition)中埋酬,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高哨啃,保障 kafka 吞吐率)

  • 分區(qū): 消息發(fā)送時都被發(fā)送到一個 topic,其本質(zhì)就是一個目錄写妥,而 topic 是由一些 Partition Logs(分區(qū)日志)組成棘催,其組織結(jié)構(gòu)如下圖所示


    522069A3-BD57-465D-8AF5-91594CC5DA62.png

我們可以看到,每個 Partition 中的消息都是有序的耳标,生產(chǎn)的消息被不斷追加到 Partition log 上醇坝,其中的每一個消息都被賦予了一個唯一的 offset 值。

  • 分區(qū)的原因
    1>方便在集群中擴展次坡,每個 Partition 可以通過調(diào)整以適應(yīng)它所在的機器呼猪,而一個 topic 又可以有多個 Partition 組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了
    2>可以提高并發(fā)砸琅,因為可以以 Partition 為單位讀寫了

  • 分區(qū)原則
    1>指定了 patition宋距,則直接使用
    2>未指定 patition 但指定 key,通過對 key 的 value 進行 hash 出一個 patition
    3>patition 和 key 都未指定症脂,使用輪詢選出一個 patition

  • 副本
    同一個 partition 可能會有多個 replication(對應(yīng) server.properties 配置中的 default.replication.factor=N)谚赎。沒有 replication 的情況下,一旦 broker 宕機诱篷,其上所有 patition 的數(shù)據(jù)都不可被消費壶唤,同時 producer 也不能再將數(shù)據(jù)存于其上的 patition。引入 replication 之 后棕所,同一個 partition 可能會有多個 replication闸盔,而這時需要在這些 replication 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互琳省,其它 replication 作為 follower 從 leader 中復(fù)制數(shù)據(jù)

寫入流程

12A28A8B-BC04-4FF8-84AE-46A5CB6B653A.png

1>producer 先從 zookeeper 的 "/brokers/.../state"節(jié)點找到該 partition 的 leader
2>producer 將消息發(fā)送給該 leader
3>leader 將消息寫入本地 log
4>followers 從 leader pull 消息迎吵,寫入本地 log 后向 leader 發(fā)送 ACK
5>leader 收到所有 ISR 中的 replication 的 ACK 后躲撰,增加 HW(high watermark,最后 commit 的 offset)并向 producer 發(fā)送 ACK

保存消息

  • 存儲方式
    物理上把topic分成一個或多個patition(對應(yīng) server.properties 中的num.partitions=3配 置)击费,每個 patition 物理上對應(yīng)一個文件夾(該文件夾存儲該 patition 的所有消息和索引文 件)

  • 存儲策略
    無論消息是否被消費拢蛋,kafka 都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
    1>基于時間:log.retention.hours=168
    2>基于大小:log.retention.bytes=1073741824
    需要注意的是蔫巩,因為 Kafka 讀取特定消息的時間復(fù)雜度為 O(1)谆棱,即與文件大小無關(guān),所
    以這里刪除過期文件與提高 Kafka 性能無關(guān)

注意:
ACK的確認方式有:
0:  不需要確認, 速度塊
1: 生產(chǎn)者需要Leader的確認
all: 生產(chǎn)者需要Leder批幌、Follower的確認

小試牛刀

  • 創(chuàng)建一個Maven項目 , pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafkademo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients !-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <!-- 日志包 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>

    </dependencies>

</project>
  • 在resources目錄下創(chuàng)建一個log4j.properties文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 代碼
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {

        Properties properties = new Properties();

        // 配置信息
        // 1.kafka集群
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");

        // 2.應(yīng)答機制
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

        // 3.重試次數(shù)
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");

        // 4.批量大小
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");  // 16k

        // 提交延時
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");

        // 緩存
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");  // 32M

        // KV的序列化類
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 創(chuàng)建生產(chǎn)者對象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i=10; i<20; i++){
            producer.send(new ProducerRecord<String, String>("test", Integer.valueOf(i).toString()), (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println(recordMetadata.partition() + "-----" + recordMetadata.offset());
                    } else {
                        System.out.println("發(fā)送失敗");
                    }
            });
        }

        // 關(guān)閉資源
        producer.close();
    }
}

運行程序

  • 開啟消費者
[root@hadoop4 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  hadoop4:9092   --topic  test
0
1
2
3
4
5
6
7
8
9

注意:如果topic有多(3)個分區(qū)础锐,那么則是按照分區(qū)順序輸出的嗓节,有可能是輸出 0 3 6 9 1 4 7 2 5 8, 但是每個分區(qū)內(nèi)部是有序的

創(chuàng)建多分區(qū)多副本

[root@hadoop2 kafka]# bin/kafka-topics.sh --create --zookeeper  hadoop2:2181 --replication-factor 3 --partitions  3 --topic  secondTest
Created topic secondTest.

# 查看創(chuàng)建多分區(qū)多副本的topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --describe --zookeeper  hadoop4:2181 --topic  secondTest
Topic: secondTest   PartitionCount: 3   ReplicationFactor: 3    Configs: 
Topic: secondTest   Partition: 0        Leader: 2 Replicas: 2,3,4   Isr: 2,3,4
Topic: secondTest   Partition: 1        Leader: 3 Replicas: 3,4,2   Isr: 3,4,2
Topic: secondTest   Partition: 2        Leader: 4 Replicas: 4,2,3   Isr: 4,2,3
[root@hadoop2 kafka]# 
  • 自定義分區(qū)
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

/**
  自定義分區(qū)
 */
public class CustomerPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//        return 0;
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int partitionNum = partitionInfoList.size();
        Random random = new Random();
        return random.nextInt(partitionNum);  // [0 partitionNum) 中的一個隨機數(shù)
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

# 生產(chǎn)者(Producer)代碼 參考上例
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "CustomerPartitioner");

控制臺輸出:

分區(qū)=0-----偏移量=2
分區(qū)=0-----偏移量=3
分區(qū)=1-----偏移量=2
分區(qū)=1-----偏移量=3
分區(qū)=1-----偏移量=4
分區(qū)=1-----偏移量=5
分區(qū)=1-----偏移量=6
分區(qū)=1-----偏移量=7
分區(qū)=1-----偏移量=8
分區(qū)=2-----偏移量=6

kafka消費者輸出:

[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  hadoop2:9092 --topic secondTest
1
5
0
2
3
6
7
8
9
4

由上可知荧缘, 1,5 在分區(qū)0中,0,2,3,6,7,8,9在分區(qū)1中拦宣, 4在分區(qū)2中 截粗, 每個分區(qū)維護著一套自己的offset, 每個分區(qū)內(nèi)部有序

消費者(Consumer)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // kafka集群
        properties.setProperty("bootstrap.servers", "hadoop2:9092");
        // 消費者組id
        properties.setProperty("group.id", "secondTest");
        // 設(shè)置自動提交offset
        properties.setProperty("enable.auto.commit", "true");
        // 延時提交
        properties.setProperty("auto.commit.interval.ms", "1000");
        // KV的反序列化
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 創(chuàng)建消費者對象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // 指定topic
        // topic = "third" 這個topic是沒有創(chuàng)建的,開啟了log4j后會提示
        // WARN [org.apache.kafka.clients.NetworkClient] - Error while fetching metadata with correlation id 2 : {third=LEADER_NOT_AVAILABLE}"
        kafkaConsumer.subscribe(Arrays.asList("test", "secondTest","third"));
        // 如果想只消費某一個指定的topic
//        kafkaConsumer.subscribe(Collections.singletonList("test"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record: consumerRecords) {
                System.out.println(
                        "topic=" + record.topic() + "---" +
                        "patition=" + record.partition() + "---" +
                                "key=" + record.key() + "---" +
                                "value=" + record.value() + "---" +
                                "headers=" + record.headers().toString()
                ) ;
            }
        }
    }
}

# 生產(chǎn)測試數(shù)據(jù)
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list  hadoop4:9092 --topic secondTest
>wudy  haha
>still water run deep
>

# console輸出
topic=secondTest---patition=2---key=null---value=wudy  haha---headers=RecordHeaders(headers = [], isReadOnly = false)
topic=secondTest---patition=0---key=null---value=still water run deep---headers=RecordHeaders(headers = [], isReadOnly = false)
  • kafka如何重復(fù)消費?

1.使用低級API直接指定到哪個offset
2.設(shè)置auto.offset.reset=earliest鸵隧, 并更換組名

properties.setProperty("group.id", "wudyGroup"); // 原來 == secondTest
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  1. kafka seek方法
# 可能存在的報錯

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition secondTest-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
    at Consumer.main(Consumer.java:51)
# 解決方式1
// 指定消費某一個partition分區(qū)的數(shù)據(jù)
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition("secondTest", 0)));
        // 指定獲取某個topic的指定分區(qū)的指定offset的數(shù)據(jù)
        kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);
# 解決方式2
// 如果想只消費某一個指定的topi
 kafkaConsumer.subscribe(Collections.singletonList("secondTest"));
 kafkaConsumer.poll(0);
 // 指定獲取某個topic的指定分區(qū)的指定offset的數(shù)
  kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);

攔截器

Producer攔截器(interceptor)主要用于實現(xiàn)clients端的定制化控制邏輯
對于Producer而言绸罗,interceptor使得用戶在消息發(fā)送前以及producer回掉邏輯前有機會對消息做一些定制化需求,比如修改消息
同時Producer允許用戶指定多個interceptor按序作用于同一消息從而形成一個攔截鏈(org.apache.kafka.clients.producer.ProducerInterceptor)

  • 1.configure(configs)
    獲取配置信息和初始化數(shù)據(jù)時調(diào)用

  • 2.onSend(ProducerRecord)
    該方法封裝進kafkaProducer.send方法中豆瘫,即它運行在用戶主線程中珊蟀,Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作外驱,但最好保證不要修改消息所屬的topic和分區(qū)

  • 3.onAcknowledgement(RecordMetadata, Exception):
    該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用育灸,通常都是在producer回掉邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中昵宇,不要在該方法中放很重的邏輯磅崭,否則會拖慢Producer的消息發(fā)送效率

  • 4.close
    關(guān)閉interceptor, 主要用于執(zhí)行一些資源清理工作。
    interceptor可能被運行在多個線程中瓦哎,因此在具體實現(xiàn)時需要用戶自行確保線程安全砸喻,倘若指定了多個interceptor,則producer將按照指定順序調(diào)用她們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志再向上傳遞

參考文章:https://mp.weixin.qq.com/s?__biz=Mzg2MjEwMjI1Mg==&mid=2247498512&idx=3&sn=d3f2fc7124fbfa556ecdfdd789417788&chksm=ce0e4c93f979c5859e0bb8bbebca80d389041894c4bbbf3c16af66040ab57c987afa6a03bb05&mpshare=1&scene=23&srcid=0813E6fZRaGThkrOfoStJoBs&sharer_sharetime=1597325081562&sharer_shareid=f770d25bc57f1c2f9159f85750f854dc%23rd

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蒋譬,一起剝皮案震驚了整個濱河市割岛,隨后出現(xiàn)的幾起案子纵柿,更是在濱河造成了極大的恐慌递递,老刑警劉巖宣增,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件土匀,死亡現(xiàn)場離奇詭異帚湘,居然都是意外死亡,警方通過查閱死者的電腦和手機俏拱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門埃篓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人疆股,你說我怎么就攤上這事费坊。” “怎么了旬痹?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵附井,是天一觀的道長。 經(jīng)常有香客問我两残,道長永毅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任人弓,我火速辦了婚禮沼死,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘崔赌。我一直安慰自己意蛀,他們只是感情好,可當我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布健芭。 她就那樣靜靜地躺著县钥,像睡著了一般。 火紅的嫁衣襯著肌膚如雪慈迈。 梳的紋絲不亂的頭發(fā)上若贮,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天,我揣著相機與錄音痒留,去河邊找鬼谴麦。 笑死,一個胖子當著我的面吹牛狭瞎,可吹牛的內(nèi)容都是我干的细移。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼熊锭,長吁一口氣:“原來是場噩夢啊……” “哼弧轧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起碗殷,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤精绎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后锌妻,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體代乃,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了搁吓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片原茅。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖堕仔,靈堂內(nèi)的尸體忽然破棺而出擂橘,到底是詐尸還是另有隱情,我是刑警寧澤摩骨,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布通贞,位于F島的核電站,受9級特大地震影響恼五,放射性物質(zhì)發(fā)生泄漏昌罩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一灾馒、第九天 我趴在偏房一處隱蔽的房頂上張望茎用。 院中可真熱鬧,春花似錦你虹、人聲如沸绘搞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至琉预,卻和暖如春董饰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背圆米。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工卒暂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人娄帖。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓也祠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親近速。 傳聞我的和親對象是個殘疾皇子诈嘿,可洞房花燭夜當晚...
    茶點故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內(nèi)容