2.一文搞定Kafka的生產(chǎn)者

1.介紹

這篇文章就是為了幫助大家來理解Produce存在的意義以及工作的方式來進(jìn)行編寫的淮韭,整篇文章也會從整體組成、運(yùn)行時(shí)架構(gòu)兩個方面去進(jìn)行講解,希望大家能夠看懂渤早,如果有什么錯誤的描述,希望大家可以留言告訴我瘫俊,讓我能夠及時(shí)的做出更改鹊杖。

2.整體組成

kafka的生產(chǎn)者主要就是負(fù)責(zé)給kafka中發(fā)送數(shù)據(jù)的,其核心的本質(zhì)工作內(nèi)容就是快速且不丟失完成自己的工作內(nèi)容扛芽。我還是先畫一個組成架構(gòu)圖骂蓖,方便大家理解。

kafka.jpg

這個圖差不多就是這個樣子的川尖,也是從根本上描述了生產(chǎn)者到kafka之間的工作結(jié)構(gòu)登下,并且通過箭頭還能夠讀出工作流程。接下來我們就先說結(jié)構(gòu)叮喳,再說流程被芳。

2.1 結(jié)構(gòu)

從結(jié)構(gòu)上看呢,kafka的生產(chǎn)者主要有兩個線程來完成工作馍悟,一個是main線程畔濒,另外一個是sender線程。main線程呢赋朦,主要負(fù)責(zé)數(shù)據(jù)的讀取篓冲、攔截李破、序列化和發(fā)送,并且在啟動的時(shí)候壹将,會創(chuàng)建一個雙端隊(duì)列RecordAccmulator嗤攻,所有的數(shù)據(jù)都是發(fā)送到這個雙端隊(duì)列中來的,這個雙端隊(duì)列的內(nèi)存默認(rèn)是32M诽俯,它的內(nèi)部是一個一個的Dquene妇菱,而每個Dquene中都有很多個塊,也就是ProducerBatch暴区。這個塊默認(rèn)是16K闯团。當(dāng)它符合某種規(guī)則的時(shí)候,數(shù)據(jù)就會從Dquene中進(jìn)入到下一個階段仙粱,這個規(guī)則有兩種房交,一種是當(dāng)batch的大小到達(dá)觸發(fā)發(fā)送機(jī)制的大小的時(shí)候數(shù)據(jù)被發(fā)送,另外一種就是設(shè)置的等待時(shí)間到了伐割,數(shù)據(jù)就會被發(fā)送候味。上述內(nèi)容記本上就是main線程要做的事情了。

至于sender線程隔心,它會不斷的從雙端隊(duì)列中拉取數(shù)據(jù)發(fā)送給KafkaBroker白群,但是單純的拉取數(shù)據(jù)肯定是不夠嚴(yán)謹(jǐn)?shù)摹.?dāng)sender線程開始工作的時(shí)候硬霍,它主要要做兩件事情帜慢,第一件事情就是創(chuàng)建networkClient用于存儲對kafka發(fā)送數(shù)據(jù)的請求,這個請求默認(rèn)是5個唯卖。第二件事是創(chuàng)建一個selector用于發(fā)送數(shù)據(jù)粱玲,并且等待kafka的回應(yīng),如果kafka回應(yīng)了耐床,證明數(shù)據(jù)寫入完成密幔,那這個時(shí)候就會清理掉請求列表中的請求,并且通知雙端隊(duì)列清除已經(jīng)寫完數(shù)據(jù)的對應(yīng)的Dquene中的batch撩轰。如果發(fā)送數(shù)據(jù)失敗了,那selector就i會觸發(fā)重試昧廷,這個重試的次數(shù)就是int類型的最大值堪嫂。

2.2 工作流程

降完了組成就要將工作流程了,工作流程就干了一件事兒木柬,那就是將生產(chǎn)者的數(shù)據(jù)發(fā)送給Kafka皆串。

1.kafkaProducer創(chuàng)建main線程和sender線程用于數(shù)據(jù)的發(fā)送工作。

2.main線程創(chuàng)建對應(yīng)的攔截器眉枕、序列化器恶复、分區(qū)器怜森,并且會創(chuàng)建一個能夠緩存發(fā)送數(shù)據(jù)的雙端隊(duì)列。

3.然后分區(qū)器向雙端隊(duì)列中寫入數(shù)據(jù)谤牡,在雙端隊(duì)列的內(nèi)部由一個一個的Dquene消息隊(duì)列來存儲這些數(shù)據(jù)副硅。

4.sender線程會周期性的監(jiān)控這個雙端隊(duì)列,一旦達(dá)到觸發(fā)拉取的時(shí)間或者大小就會向kafka的producer發(fā)送請求翅萤,請求發(fā)送數(shù)據(jù)。

5.這些請求內(nèi)容會被存儲在networkCLient中。

6.一旦kafka能夠接收數(shù)據(jù)雄妥,就由sender線程的selector來負(fù)責(zé)數(shù)據(jù)的發(fā)送工作索抓。

7.當(dāng)kafka接收完成數(shù)據(jù)之后,會發(fā)送ack驗(yàn)證碼給selector胚泌,seletor會根據(jù)是否接收到ack碼來確定數(shù)據(jù)是否發(fā)送成功省咨。

8.數(shù)據(jù)發(fā)送成功之后,selector會刪除network中的請求玷室,通知main線程刪除雙端隊(duì)列中的Dquene中的batch中的數(shù)據(jù)茸炒;如果失敗,那就重試阵苇,默認(rèn)可以重試Int類型的最大值次壁公。

3.數(shù)據(jù)發(fā)送方式

producer發(fā)送數(shù)據(jù)的方式有兩種,分別是異步發(fā)送和同步發(fā)送绅项。并且還要區(qū)分是否需要回調(diào)信息紊册,那接下來,就讓我一個一個的分別介紹給大家快耿。

3.1 不帶回調(diào)信息的異步發(fā)送

大家注意一下哈囊陡,這個異不異步不是針對sender到kafka的異步,而是外部數(shù)據(jù)到雙端隊(duì)列的異步掀亥。因?yàn)樯a(chǎn)者發(fā)送數(shù)據(jù)實(shí)際上就是往雙端隊(duì)列中寫入數(shù)據(jù)撞反,然后sender從雙端隊(duì)列中拿取數(shù)據(jù)給kafka的過程,因?yàn)檫@個異步就是圍繞著雙端隊(duì)列進(jìn)行劃分的搪花。在這一階段遏片,異步就是指外部數(shù)據(jù)一直往雙端隊(duì)列中寫入數(shù)據(jù),并不會在乎Dquene中的數(shù)據(jù)是否成功寫入到kafka中撮竿。

如果用代碼實(shí)現(xiàn)的話吮便,大家可以看一下:

<pre data-language="java" id="LvH9u" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//1.引入依賴
<dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.0.0</version>
            </dependency>
//2.編寫程序
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerStudy {
    public static void main(String[] args) {
        //todo 0.創(chuàng)建配置類
        Properties properties = new Properties();
        //kafka的地址
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"had1:9092");
        //kafka序列化
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        //todo 創(chuàng)建kafka生產(chǎn)者對象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);

        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(
                    new ProducerRecord<>("testTopic","number"+i)
            );
        }

        //todo 關(guān)閉資源
        kafkaProducer.close();
    }
}
</pre>

是不是看起來沒什么感覺?哈哈幢踏,我也覺著沒什么感覺髓需,但是帶回調(diào)信息的異步發(fā)送就有一點(diǎn)點(diǎn)不一樣了,不過因?yàn)樾枰却卣{(diào)信息的返回房蝉,所以速度會慢一些僚匆。

3.2 異步發(fā)送帶回調(diào)信息

這個回調(diào)函數(shù)會在producer的selector接收到ack機(jī)制的時(shí)候被調(diào)用微渠,調(diào)用方式也是異步的。如果返回的信息是異常的話咧擂,就代表著數(shù)據(jù)發(fā)送失敗了逞盆。如果失敗了,程序會自動重試屋确。

具體的代碼如下:

<pre data-language="java" id="XeBCc" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerStudyWithSentTwo {
    public static void main(String[] args) {
        //todo 1.配置類
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("testTopic","number:"+i)
                    , new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){
                        System.out.println("主題:"+recordMetadata.topic()+"->"+
                                "分區(qū):"+recordMetadata.partition());
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }

        //todo 關(guān)閉生產(chǎn)者
        producer.close();
    }
}</pre>

3.3 同步發(fā)送API

同步發(fā)送就需要雙端隊(duì)列等到kafka返回?cái)?shù)據(jù)保存成功的消息之后纳击,外部數(shù)據(jù)才能夠把數(shù)據(jù)發(fā)送到雙端隊(duì)列中。而且同步發(fā)送在代碼上就多了一個get

<pre data-language="java" id="JkMJa" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerStudyWithSentThr {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //todo 添加配置文件信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        //todo 創(chuàng)建kafka生產(chǎn)者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("testTopic","number:"+i)).get();
        }
        //todo 關(guān)閉資源
        producer.close();
    }
}</pre>

4.生產(chǎn)者的分區(qū)

生產(chǎn)者的分區(qū)主要是針對kafka的topic來實(shí)現(xiàn)的攻臀,如果一份數(shù)據(jù)只固定的發(fā)送給一個topic的一個partition的話焕数,那效率是很低下的。所以生產(chǎn)者發(fā)送數(shù)據(jù)的時(shí)候刨啸,是針對topic中所有的leader分區(qū)來進(jìn)行拆解發(fā)送的堡赔,這樣不僅會讓發(fā)送的數(shù)據(jù)能夠分散在多個broker上,實(shí)現(xiàn)負(fù)責(zé)均衡设联。還能夠提高并行度善已,畢竟是向多個partition同時(shí)發(fā)送數(shù)據(jù),肯定要比向一個partition發(fā)送來的快得多离例。

不過既然要通過使用分區(qū)的方式完成數(shù)據(jù)的發(fā)送换团,那么就需要有對應(yīng)的發(fā)送策略還維持生產(chǎn)者發(fā)送數(shù)據(jù)時(shí)的平衡。Kafka生產(chǎn)者的默認(rèn)分區(qū)策略是:

1.指明分區(qū)的時(shí)候宫蛆,直接向這個分區(qū)寫入數(shù)據(jù)艘包,所有的數(shù)據(jù)都如此寫入。

2.當(dāng)沒有指明分區(qū)的時(shí)候耀盗,如果有key值想虎,可以對key值取Hash,然后對分區(qū)數(shù)取模叛拷,這樣所有的數(shù)據(jù)就能夠進(jìn)入到不同的分區(qū)中去了舌厨。

3.如果既沒有key也沒有partition,那么kafka就會采用黏粘性分區(qū)器(Sticky)忿薇,隨機(jī)選擇一個分區(qū)裙椭,然后一直向這個分區(qū)中寫入數(shù)據(jù),如果雙端隊(duì)列中的Dquene滿了煌恢,就換一個其他的分區(qū)繼續(xù)寫骇陈,以此類推。

下面用幾段代碼來描述一下不同的場景:

<pre data-language="java" id="vxmOq" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//指定topic
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerStudyWithSentFor {
    public static void main(String[] args) {
        //todo 添加配置文件信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        //todo 創(chuàng)建kafka生產(chǎn)者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0 ; i< 5 ; i++){
            //僅指定partition
            producer.send(new ProducerRecord<>("testTopic", 0, "", "number:" + i)
                    , new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            //todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
                            if (e == null){
                                System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
                            }else {
                                e.printStackTrace();
                            }
                        }
                    });
        }
        //todo 關(guān)閉ziyuan
        producer.close();
    }
}</pre>
<pre data-language="java" id="jZDNu" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//todo 沒指明partition的瑰抵,但是指明了key
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerStudyWithSentFive {
    public static void main(String[] args) {
        //todo 添加配置文件信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        //todo 創(chuàng)建kafka生產(chǎn)者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0 ; i< 5 ; i++){
            //僅指定key,觀察數(shù)據(jù)是否都進(jìn)入到了一個分區(qū)內(nèi)部
            producer.send(new ProducerRecord<>("testTopic", "a", "number:" + i)
                    , new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            //todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
                            if (e == null){
                                System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
                            }else {
                                e.printStackTrace();
                            }
                        }
                    });
        }
        //todo 關(guān)閉ziyuan
        producer.close();
    }
}
</pre>

除此之外器联,kafka生產(chǎn)者還能夠自己定義分區(qū)器二汛,通過如下方法婿崭,就能夠讓包含博主昵稱的數(shù)據(jù)進(jìn)入到0號分區(qū)了。

<pre data-language="java" id="OauTG" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//todo 實(shí)現(xiàn)partitioner接口肴颊,重寫patition方法
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String msgValue = s.toString();

        int partition;

        if (msgValue.contains("迷茫的小黑狗")){
            partition = 0;
        }else {
            partition = 1;
        }
        return partition;
    }
    //關(guān)閉資源
    @Override
    public void close() {

    }
    //配置方法
    @Override
    public void configure(Map<String, ?> map) {

    }
}
</pre>

那既然創(chuàng)建了自定義的分區(qū)器氓栈,就要應(yīng)用到程序中,具體的應(yīng)用方法如下段代碼所示:

<pre data-language="java" id="aClnC" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">public class KafkaProducerStudyWithSentSix {
    public static void main(String[] args) {
        //todo 添加配置文件信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                MyPartitioner.class.getName());
        //todo 創(chuàng)建kafka生產(chǎn)者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0 ; i< 5 ; i++){
            if (i % 2 ==0){
            //不包含博主昵稱的
            producer.send(new ProducerRecord<>("testTopic", "number:" + i)
                    , new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            //todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
                            if (e == null){
                                System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
                            }else {
                                e.printStackTrace();
                            }
                        }
                    });
            }else {
                producer.send(new ProducerRecord<>("testTopic", "迷茫的小黑狗" + i)
                        , new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                //todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
                                if (e == null){
                                    System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
                                }else {
                                    e.printStackTrace();
                                }
                            }
                        });
            }
        }
        //todo 關(guān)閉資源
        producer.close();
    }
}</pre>

4.1 如何提高生產(chǎn)者的吞吐量

提升吞吐量的目的就是為了能夠加快數(shù)據(jù)的發(fā)送婿着,如果我們把數(shù)據(jù)單次發(fā)送的能力加強(qiáng)了授瘦,是不是就代表著生產(chǎn)者的吞吐量加強(qiáng)了呢?通過這個思路竟宋,我們可以選擇增大雙端隊(duì)列的容積提完、延長Dquene的等待時(shí)間、加大Dquene中batch的塊容積丘侠,通過這幾種方法徒欣,就能夠強(qiáng)化吞吐量啦。

<pre data-language="java" id="rvQzk" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerStudyWithSentServen {
    public static void main(String[] args) {
        //todo 添加配置文件信息
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                MyPartitioner.class.getName());
        //將默認(rèn)的16k增大到32K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384 * 2 );
        //增大等待時(shí)間蜗字,從默認(rèn)的0(不生效)改為5-100ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,5);
        //增大雙端隊(duì)列的緩沖區(qū)大小打肝,由默認(rèn)的32變成64
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432 * 2);
        //開啟壓縮,增大單批次數(shù)據(jù)傳輸?shù)臄?shù)量
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

        //todo 創(chuàng)建kafka生產(chǎn)者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        producer.send(new ProducerRecord<>("testTopic","迷茫的小黑狗"));
        //todo 關(guān)閉資源
        producer.close();
    }
}
</pre>

4.2 數(shù)據(jù)的可靠性

既然提交了吞吐量挪捕,就代表著數(shù)據(jù)發(fā)送的能力變強(qiáng)了粗梭。不過能力變強(qiáng)了,準(zhǔn)度可不能下降级零,所以還要研究一下數(shù)據(jù)傳輸?shù)目煽啃远弦健2贿^大家要記住哈,這個可靠性不是精準(zhǔn)一次性妄讯,它只會代表數(shù)據(jù)不丟失孩锡,不會讓數(shù)據(jù)只有一次。

在kafka生產(chǎn)者可靠性這一部分亥贸,引入了一個ack機(jī)制和ISR隊(duì)列兩個概念躬窜,所以在具體說之前,我要給大家講一下這兩個機(jī)制都是干嘛的:

ack:kafka對producer發(fā)送數(shù)據(jù)的一種應(yīng)答機(jī)制炕置,不同的返回結(jié)果代表了kafka對發(fā)送過來的數(shù)據(jù)的不同的處理規(guī)則荣挨。

ISR:所有分區(qū)的副本集合,如果一個topic里面有三個分區(qū)朴摊,每個分區(qū)有三個副本默垄,那么它的ISR隊(duì)列就是【0,1甚纲,2】口锭,是一個分區(qū)的Leader和Follower的集合。

明確了這兩個概念之后,就可以繼續(xù)往下說啦鹃操,sender線程的seletor方法將數(shù)據(jù)發(fā)送到kafka的broker之后韭寸,broker會返回一個ack碼,這個ack碼有三種不同的內(nèi)容荆隘,分別是 0 恩伺,1 ,-1椰拒。

0:生產(chǎn)者發(fā)送過來數(shù)據(jù)之后就不管了晶渠,丟失與否不重要。

1:Leader把生產(chǎn)者發(fā)送過來的數(shù)據(jù)保存下來之后燃观,就返回給producer褒脯。Follower是否同步不重要。

-1(all):只有當(dāng)leader接收完成數(shù)據(jù)仪壮,所有的Follower都備份完了憨颠,才會返回這個。

這三種ack碼可以由我們來指定积锅,當(dāng)我們確定時(shí)候那種策略之后爽彤,kafka生產(chǎn)者就會尊崇那種策略,并且依照策略進(jìn)行工作缚陷。但是0和1都會面臨數(shù)據(jù)丟失的問題适篙,所以如果想要保證數(shù)據(jù)最少一次性,那就要使用ack碼為-1的這種情況箫爷。

代碼實(shí)現(xiàn)如下:

<pre data-language="java" id="qXUp8" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">public class KafkaProducerAck {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                MyPartitioner.class.getName());
        //todo 設(shè)置ack為 -1
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        //todo 設(shè)置發(fā)送失敗的重試次數(shù)
        properties.put(ProducerConfig.RETRIES_CONFIG,10);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        for (int i = 0; i < 100; i++) {
            kafkaProducer.send(new ProducerRecord<>("testTopic","number:"+i));
        }

        kafkaProducer.close();
    }
}</pre>

4.3 數(shù)據(jù)的唯一性

ack機(jī)制只能保證數(shù)據(jù)不丟失嚷节,也就是數(shù)據(jù)的最少一次性,但是也正是因?yàn)檫@個最少一次性虎锚,就會造成數(shù)據(jù)丟失的隱患硫痰。所以就必須使用某種技術(shù),來輔助ack機(jī)制完成數(shù)據(jù)的精準(zhǔn)一次性窜护。

那這個時(shí)候就又要請出老哥倆了效斑,冪等和事務(wù)。

4.3.1 冪等

冪等是默認(rèn)開啟的柱徙,它會保證數(shù)據(jù)無論是發(fā)送了多少次缓屠,在broker里面也就只有一條,不會讓數(shù)據(jù)重復(fù)护侮。也就是說敌完,它是通過下面的這種方式來完成數(shù)據(jù)的精準(zhǔn)一次性的:

精準(zhǔn)一次性 = 冪等 + 最少一次性

不過冪等性維持的這個精準(zhǔn)一次性僅僅只能在單會話、單分區(qū)內(nèi)有效羊初,單分區(qū)內(nèi)是無可厚非的滨溉,因?yàn)榉謪^(qū)間數(shù)據(jù)本來就是沒什么關(guān)系的。只不過單會話就比較慘了,如果kafka一重啟业踏,可能會導(dǎo)致有些數(shù)據(jù)就會重復(fù)禽炬,這是因?yàn)閮绲鹊呐袛鄻?biāo)準(zhǔn)是:

<PID,Partition,SeqNumber>這樣一個類似于元組的信息作為判斷主鍵涧卵,如果主鍵一樣那這條數(shù)據(jù)就不會被寫入啦勤家。在三個參數(shù)中柳恐,SeqNumber是一直單調(diào)遞增的,Partition是分區(qū)號讼庇,正好對應(yīng)了同分區(qū)內(nèi)部的這樣一個概念,兒PID是kafka每啟動一次就會重新分配的東西蠕啄,所以冪等只能保證是在單分區(qū)、單會話內(nèi)歼跟。

4.3.2 事務(wù)

如果向開啟事務(wù)格遭,就要讓冪等性保證開啟,不過冪等默認(rèn)就是開啟的拒迅,所以做什么改變,直接開啟事務(wù)就好璧微。當(dāng)使用事務(wù)之后,一旦發(fā)生故障胞得,那么在同一個事務(wù)批次里面在故障前寫入的數(shù)據(jù)就會回滾,回滾就會讓重復(fù)寫入的數(shù)據(jù)消失开瞭,所以也就實(shí)現(xiàn)了精準(zhǔn)一次性懒震。

只不過事務(wù)需要多加入一個內(nèi)容,就是自定義一個唯一的transactional.id嗤详,有了它个扰,重啟之后也能找到原來的位置,這樣就使得數(shù)據(jù)的精準(zhǔn)一次性不會再像冪等一樣僅局限在單會話里面了葱色。

雖然好用递宅,但是還是有點(diǎn)繁瑣的,因?yàn)槿绻褂檬聞?wù)的話,還要加入一個事務(wù)協(xié)調(diào)器(transaction coordinator)的概念办龄,下面我就按照開啟事務(wù)的場景烘绽,給大家說一下開啟事務(wù)后的提交流程。

1.producer向事務(wù)協(xié)調(diào)器請求producer id俐填,事務(wù)協(xié)調(diào)器會為不同的producer返回對應(yīng)的PID安接。

2.produce得到PID之后,就會向topic里面的Leader分區(qū)發(fā)送數(shù)據(jù)英融,當(dāng)數(shù)據(jù)發(fā)送一階段之后盏檐,生產(chǎn)者請求提交事務(wù)。

3.事務(wù)協(xié)調(diào)器會處理這個事務(wù)提交請求驶悟,它會向kafka中存儲事務(wù)信息的特殊主題發(fā)送請求胡野,希望可以持久話這次事務(wù)提交請求。

4.這個topic內(nèi)部默認(rèn)由50個分區(qū)痕鳍,每一個分區(qū)都負(fù)責(zé)處理一部分事務(wù)硫豆。當(dāng)事務(wù)提交請求發(fā)送過來的時(shí)候,就會根據(jù)這個事務(wù)TID的hash值笼呆,對50取模熊响,算出自己該保存在哪個partition中,那這個partition的leader副本所在的broker節(jié)點(diǎn)抄邀,就是這個TID對應(yīng)的事務(wù)協(xié)調(diào)器節(jié)點(diǎn)耘眨。

<pre data-language="java" id="EaW1t" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//事務(wù)實(shí)現(xiàn)的代碼如下
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTran {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        //todo 定義producer 的 Transaction ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "Tid_0");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //todo 初始化事務(wù)
        kafkaProducer.initTransactions();
        //todo 開啟事務(wù)
        kafkaProducer.beginTransaction();
        try {
            for (int o = 5 ; o < 100 ; o ++){
                kafkaProducer.send(new ProducerRecord<>("testTopic","number:" + o));
            }
            //定義一個異常,看看事務(wù)有沒有回滾
            int i = 1 / 0;
            //todo 提交事務(wù)
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            //todo 終止事務(wù)
            kafkaProducer.abortTransaction();
        }finally {
            //關(guān)閉資源
            kafkaProducer.close();
        }
    }</pre>

4.4 生產(chǎn)者順序

生產(chǎn)者順序發(fā)送數(shù)據(jù)境肾,就是生產(chǎn)者發(fā)送數(shù)據(jù)保障的最后一個階段啦剔难。這個順序就是單分區(qū)內(nèi)有序,多分區(qū)間無序的偶宫。那么如果解決數(shù)據(jù)亂序呢纯趋?還記得之前提到的sender線程中維護(hù)了一個network client么吵冒,它的內(nèi)部就是存儲的一個又一個的請求痹栖,這個請求就是數(shù)據(jù)向kafka中發(fā)送數(shù)據(jù)的請求順序揪阿,如果kafka的broker按照這個順序來讀取數(shù)據(jù)南捂,數(shù)據(jù)就不會出現(xiàn)亂序的問題溺健。

但是這一組(5個為一組)中的請求矿瘦,如果在讀取第三個請求中的數(shù)據(jù)的時(shí)候出錯了缚去,等到第三個請求重試完成了易结,第4搞动、5個請求都發(fā)送完了怎么辦鹦肿?

這不是問題箩溃,因?yàn)樵?.x版本之后涣旨,kafka內(nèi)部會緩存這些請求的順序,如果發(fā)生了12453的情況止状,那kafka是不會向下游發(fā)送數(shù)據(jù)的怯疤,而是會按照緩存的順序旅薄,將處理過的請求排序,在向下游發(fā)送矫付。

5.結(jié)尾

截止到目前位置买优,與kafka生產(chǎn)者有關(guān)的內(nèi)容就說完啦杀赢。明天我會從kafka本身的角度來講講kafka處理數(shù)據(jù)時(shí)候是個什么樣子的情況脂崔。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末砌左,一起剝皮案震驚了整個濱河市汇歹,隨后出現(xiàn)的幾起案子产弹,更是在濱河造成了極大的恐慌痰哨,老刑警劉巖作谭,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異锐秦,居然都是意外死亡酱床,警方通過查閱死者的電腦和手機(jī)扇谣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門罐寨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鸯绿,“玉大人瓶蝴,你說我怎么就攤上這事舷手【鬯” “怎么了蝎宇?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長汇鞭。 經(jīng)常有香客問我霍骄,道長读整,這世上最難降的妖魔是什么米间? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任的榛,我火速辦了婚禮逻锐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘晓淀。我一直安慰自己纲熏,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鱼填,像睡著了一般毅戈。 火紅的嫁衣襯著肌膚如雪苇经。 梳的紋絲不亂的頭發(fā)上扇单,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天蜘澜,我揣著相機(jī)與錄音鄙信,去河邊找鬼装诡。 笑死,一個胖子當(dāng)著我的面吹牛蚓土,可吹牛的內(nèi)容都是我干的蜀漆。 我是一名探鬼主播确丢,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼褂始!你這毒婦竟也來了崎苗?” 一聲冷哼從身側(cè)響起胆数,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤必尼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后育谬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體膛檀,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡宿刮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了踩叭。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片容贝。...
    茶點(diǎn)故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖斤富,靈堂內(nèi)的尸體忽然破棺而出满力,到底是詐尸還是另有隱情轻纪,我是刑警寧澤刻帚,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站航厚,受9級特大地震影響衙吩,放射性物質(zhì)發(fā)生泄漏坤塞。R本人自食惡果不足惜澈蚌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一浮禾、第九天 我趴在偏房一處隱蔽的房頂上張望盈电。 院中可真熱鬧匆帚,春花似錦吸重、人聲如沸嚎幸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坡疼。三九已至,卻和暖如春柄瑰,著一層夾襖步出監(jiān)牢的瞬間教沾,已是汗流浹背译断。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工堪唐, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留翎蹈,地道東北人淮菠。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像荤堪,于是被迫代替她去往敵國和親合陵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容