1.介紹
這篇文章就是為了幫助大家來理解Produce存在的意義以及工作的方式來進(jìn)行編寫的淮韭,整篇文章也會從整體組成、運(yùn)行時(shí)架構(gòu)兩個方面去進(jìn)行講解,希望大家能夠看懂渤早,如果有什么錯誤的描述,希望大家可以留言告訴我瘫俊,讓我能夠及時(shí)的做出更改鹊杖。
2.整體組成
kafka的生產(chǎn)者主要就是負(fù)責(zé)給kafka中發(fā)送數(shù)據(jù)的,其核心的本質(zhì)工作內(nèi)容就是快速且不丟失完成自己的工作內(nèi)容扛芽。我還是先畫一個組成架構(gòu)圖骂蓖,方便大家理解。
這個圖差不多就是這個樣子的川尖,也是從根本上描述了生產(chǎn)者到kafka之間的工作結(jié)構(gòu)登下,并且通過箭頭還能夠讀出工作流程。接下來我們就先說結(jié)構(gòu)叮喳,再說流程被芳。
2.1 結(jié)構(gòu)
從結(jié)構(gòu)上看呢,kafka的生產(chǎn)者主要有兩個線程來完成工作馍悟,一個是main線程畔濒,另外一個是sender線程。main線程呢赋朦,主要負(fù)責(zé)數(shù)據(jù)的讀取篓冲、攔截李破、序列化和發(fā)送,并且在啟動的時(shí)候壹将,會創(chuàng)建一個雙端隊(duì)列RecordAccmulator嗤攻,所有的數(shù)據(jù)都是發(fā)送到這個雙端隊(duì)列中來的,這個雙端隊(duì)列的內(nèi)存默認(rèn)是32M诽俯,它的內(nèi)部是一個一個的Dquene妇菱,而每個Dquene中都有很多個塊,也就是ProducerBatch暴区。這個塊默認(rèn)是16K闯团。當(dāng)它符合某種規(guī)則的時(shí)候,數(shù)據(jù)就會從Dquene中進(jìn)入到下一個階段仙粱,這個規(guī)則有兩種房交,一種是當(dāng)batch的大小到達(dá)觸發(fā)發(fā)送機(jī)制的大小的時(shí)候數(shù)據(jù)被發(fā)送,另外一種就是設(shè)置的等待時(shí)間到了伐割,數(shù)據(jù)就會被發(fā)送候味。上述內(nèi)容記本上就是main線程要做的事情了。
至于sender線程隔心,它會不斷的從雙端隊(duì)列中拉取數(shù)據(jù)發(fā)送給KafkaBroker白群,但是單純的拉取數(shù)據(jù)肯定是不夠嚴(yán)謹(jǐn)?shù)摹.?dāng)sender線程開始工作的時(shí)候硬霍,它主要要做兩件事情帜慢,第一件事情就是創(chuàng)建networkClient用于存儲對kafka發(fā)送數(shù)據(jù)的請求,這個請求默認(rèn)是5個唯卖。第二件事是創(chuàng)建一個selector用于發(fā)送數(shù)據(jù)粱玲,并且等待kafka的回應(yīng),如果kafka回應(yīng)了耐床,證明數(shù)據(jù)寫入完成密幔,那這個時(shí)候就會清理掉請求列表中的請求,并且通知雙端隊(duì)列清除已經(jīng)寫完數(shù)據(jù)的對應(yīng)的Dquene中的batch撩轰。如果發(fā)送數(shù)據(jù)失敗了,那selector就i會觸發(fā)重試昧廷,這個重試的次數(shù)就是int類型的最大值堪嫂。
2.2 工作流程
降完了組成就要將工作流程了,工作流程就干了一件事兒木柬,那就是將生產(chǎn)者的數(shù)據(jù)發(fā)送給Kafka皆串。
1.kafkaProducer創(chuàng)建main線程和sender線程用于數(shù)據(jù)的發(fā)送工作。
2.main線程創(chuàng)建對應(yīng)的攔截器眉枕、序列化器恶复、分區(qū)器怜森,并且會創(chuàng)建一個能夠緩存發(fā)送數(shù)據(jù)的雙端隊(duì)列。
3.然后分區(qū)器向雙端隊(duì)列中寫入數(shù)據(jù)谤牡,在雙端隊(duì)列的內(nèi)部由一個一個的Dquene消息隊(duì)列來存儲這些數(shù)據(jù)副硅。
4.sender線程會周期性的監(jiān)控這個雙端隊(duì)列,一旦達(dá)到觸發(fā)拉取的時(shí)間或者大小就會向kafka的producer發(fā)送請求翅萤,請求發(fā)送數(shù)據(jù)。
5.這些請求內(nèi)容會被存儲在networkCLient中。
6.一旦kafka能夠接收數(shù)據(jù)雄妥,就由sender線程的selector來負(fù)責(zé)數(shù)據(jù)的發(fā)送工作索抓。
7.當(dāng)kafka接收完成數(shù)據(jù)之后,會發(fā)送ack驗(yàn)證碼給selector胚泌,seletor會根據(jù)是否接收到ack碼來確定數(shù)據(jù)是否發(fā)送成功省咨。
8.數(shù)據(jù)發(fā)送成功之后,selector會刪除network中的請求玷室,通知main線程刪除雙端隊(duì)列中的Dquene中的batch中的數(shù)據(jù)茸炒;如果失敗,那就重試阵苇,默認(rèn)可以重試Int類型的最大值次壁公。
3.數(shù)據(jù)發(fā)送方式
producer發(fā)送數(shù)據(jù)的方式有兩種,分別是異步發(fā)送和同步發(fā)送绅项。并且還要區(qū)分是否需要回調(diào)信息紊册,那接下來,就讓我一個一個的分別介紹給大家快耿。
3.1 不帶回調(diào)信息的異步發(fā)送
大家注意一下哈囊陡,這個異不異步不是針對sender到kafka的異步,而是外部數(shù)據(jù)到雙端隊(duì)列的異步掀亥。因?yàn)樯a(chǎn)者發(fā)送數(shù)據(jù)實(shí)際上就是往雙端隊(duì)列中寫入數(shù)據(jù)撞反,然后sender從雙端隊(duì)列中拿取數(shù)據(jù)給kafka的過程,因?yàn)檫@個異步就是圍繞著雙端隊(duì)列進(jìn)行劃分的搪花。在這一階段遏片,異步就是指外部數(shù)據(jù)一直往雙端隊(duì)列中寫入數(shù)據(jù),并不會在乎Dquene中的數(shù)據(jù)是否成功寫入到kafka中撮竿。
如果用代碼實(shí)現(xiàn)的話吮便,大家可以看一下:
<pre data-language="java" id="LvH9u" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//1.引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
//2.編寫程序
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerStudy {
public static void main(String[] args) {
//todo 0.創(chuàng)建配置類
Properties properties = new Properties();
//kafka的地址
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"had1:9092");
//kafka序列化
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//todo 創(chuàng)建kafka生產(chǎn)者對象
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<String, String>(properties);
for (int i = 0; i < 5; i++) {
kafkaProducer.send(
new ProducerRecord<>("testTopic","number"+i)
);
}
//todo 關(guān)閉資源
kafkaProducer.close();
}
}
</pre>
是不是看起來沒什么感覺?哈哈幢踏,我也覺著沒什么感覺髓需,但是帶回調(diào)信息的異步發(fā)送就有一點(diǎn)點(diǎn)不一樣了,不過因?yàn)樾枰却卣{(diào)信息的返回房蝉,所以速度會慢一些僚匆。
3.2 異步發(fā)送帶回調(diào)信息
這個回調(diào)函數(shù)會在producer的selector接收到ack機(jī)制的時(shí)候被調(diào)用微渠,調(diào)用方式也是異步的。如果返回的信息是異常的話咧擂,就代表著數(shù)據(jù)發(fā)送失敗了逞盆。如果失敗了,程序會自動重試屋确。
具體的代碼如下:
<pre data-language="java" id="XeBCc" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerStudyWithSentTwo {
public static void main(String[] args) {
//todo 1.配置類
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getClass());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("testTopic","number:"+i)
, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("主題:"+recordMetadata.topic()+"->"+
"分區(qū):"+recordMetadata.partition());
}else {
e.printStackTrace();
}
}
});
}
//todo 關(guān)閉生產(chǎn)者
producer.close();
}
}</pre>
3.3 同步發(fā)送API
同步發(fā)送就需要雙端隊(duì)列等到kafka返回?cái)?shù)據(jù)保存成功的消息之后纳击,外部數(shù)據(jù)才能夠把數(shù)據(jù)發(fā)送到雙端隊(duì)列中。而且同步發(fā)送在代碼上就多了一個get
<pre data-language="java" id="JkMJa" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerStudyWithSentThr {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//todo 添加配置文件信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//todo 創(chuàng)建kafka生產(chǎn)者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("testTopic","number:"+i)).get();
}
//todo 關(guān)閉資源
producer.close();
}
}</pre>
4.生產(chǎn)者的分區(qū)
生產(chǎn)者的分區(qū)主要是針對kafka的topic來實(shí)現(xiàn)的攻臀,如果一份數(shù)據(jù)只固定的發(fā)送給一個topic的一個partition的話焕数,那效率是很低下的。所以生產(chǎn)者發(fā)送數(shù)據(jù)的時(shí)候刨啸,是針對topic中所有的leader分區(qū)來進(jìn)行拆解發(fā)送的堡赔,這樣不僅會讓發(fā)送的數(shù)據(jù)能夠分散在多個broker上,實(shí)現(xiàn)負(fù)責(zé)均衡设联。還能夠提高并行度善已,畢竟是向多個partition同時(shí)發(fā)送數(shù)據(jù),肯定要比向一個partition發(fā)送來的快得多离例。
不過既然要通過使用分區(qū)的方式完成數(shù)據(jù)的發(fā)送换团,那么就需要有對應(yīng)的發(fā)送策略還維持生產(chǎn)者發(fā)送數(shù)據(jù)時(shí)的平衡。Kafka生產(chǎn)者的默認(rèn)分區(qū)策略是:
1.指明分區(qū)的時(shí)候宫蛆,直接向這個分區(qū)寫入數(shù)據(jù)艘包,所有的數(shù)據(jù)都如此寫入。
2.當(dāng)沒有指明分區(qū)的時(shí)候耀盗,如果有key值想虎,可以對key值取Hash,然后對分區(qū)數(shù)取模叛拷,這樣所有的數(shù)據(jù)就能夠進(jìn)入到不同的分區(qū)中去了舌厨。
3.如果既沒有key也沒有partition,那么kafka就會采用黏粘性分區(qū)器(Sticky)忿薇,隨機(jī)選擇一個分區(qū)裙椭,然后一直向這個分區(qū)中寫入數(shù)據(jù),如果雙端隊(duì)列中的Dquene滿了煌恢,就換一個其他的分區(qū)繼續(xù)寫骇陈,以此類推。
下面用幾段代碼來描述一下不同的場景:
<pre data-language="java" id="vxmOq" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//指定topic
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerStudyWithSentFor {
public static void main(String[] args) {
//todo 添加配置文件信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//todo 創(chuàng)建kafka生產(chǎn)者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0 ; i< 5 ; i++){
//僅指定partition
producer.send(new ProducerRecord<>("testTopic", 0, "", "number:" + i)
, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
if (e == null){
System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
}else {
e.printStackTrace();
}
}
});
}
//todo 關(guān)閉ziyuan
producer.close();
}
}</pre>
<pre data-language="java" id="jZDNu" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//todo 沒指明partition的瑰抵,但是指明了key
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerStudyWithSentFive {
public static void main(String[] args) {
//todo 添加配置文件信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//todo 創(chuàng)建kafka生產(chǎn)者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0 ; i< 5 ; i++){
//僅指定key,觀察數(shù)據(jù)是否都進(jìn)入到了一個分區(qū)內(nèi)部
producer.send(new ProducerRecord<>("testTopic", "a", "number:" + i)
, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
if (e == null){
System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
}else {
e.printStackTrace();
}
}
});
}
//todo 關(guān)閉ziyuan
producer.close();
}
}
</pre>
除此之外器联,kafka生產(chǎn)者還能夠自己定義分區(qū)器二汛,通過如下方法婿崭,就能夠讓包含博主昵稱的數(shù)據(jù)進(jìn)入到0號分區(qū)了。
<pre data-language="java" id="OauTG" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//todo 實(shí)現(xiàn)partitioner接口肴颊,重寫patition方法
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
String msgValue = s.toString();
int partition;
if (msgValue.contains("迷茫的小黑狗")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
//關(guān)閉資源
@Override
public void close() {
}
//配置方法
@Override
public void configure(Map<String, ?> map) {
}
}
</pre>
那既然創(chuàng)建了自定義的分區(qū)器氓栈,就要應(yīng)用到程序中,具體的應(yīng)用方法如下段代碼所示:
<pre data-language="java" id="aClnC" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">public class KafkaProducerStudyWithSentSix {
public static void main(String[] args) {
//todo 添加配置文件信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
MyPartitioner.class.getName());
//todo 創(chuàng)建kafka生產(chǎn)者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0 ; i< 5 ; i++){
if (i % 2 ==0){
//不包含博主昵稱的
producer.send(new ProducerRecord<>("testTopic", "number:" + i)
, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
if (e == null){
System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
}else {
e.printStackTrace();
}
}
});
}else {
producer.send(new ProducerRecord<>("testTopic", "迷茫的小黑狗" + i)
, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//todo 觀察是否進(jìn)入到了對應(yīng)的分區(qū)
if (e == null){
System.out.println("數(shù)據(jù)進(jìn)入到了分區(qū):"+recordMetadata.topic());
}else {
e.printStackTrace();
}
}
});
}
}
//todo 關(guān)閉資源
producer.close();
}
}</pre>
4.1 如何提高生產(chǎn)者的吞吐量
提升吞吐量的目的就是為了能夠加快數(shù)據(jù)的發(fā)送婿着,如果我們把數(shù)據(jù)單次發(fā)送的能力加強(qiáng)了授瘦,是不是就代表著生產(chǎn)者的吞吐量加強(qiáng)了呢?通過這個思路竟宋,我們可以選擇增大雙端隊(duì)列的容積提完、延長Dquene的等待時(shí)間、加大Dquene中batch的塊容積丘侠,通過這幾種方法徒欣,就能夠強(qiáng)化吞吐量啦。
<pre data-language="java" id="rvQzk" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerStudyWithSentServen {
public static void main(String[] args) {
//todo 添加配置文件信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
MyPartitioner.class.getName());
//將默認(rèn)的16k增大到32K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384 * 2 );
//增大等待時(shí)間蜗字,從默認(rèn)的0(不生效)改為5-100ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,5);
//增大雙端隊(duì)列的緩沖區(qū)大小打肝,由默認(rèn)的32變成64
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432 * 2);
//開啟壓縮,增大單批次數(shù)據(jù)傳輸?shù)臄?shù)量
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
//todo 創(chuàng)建kafka生產(chǎn)者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.send(new ProducerRecord<>("testTopic","迷茫的小黑狗"));
//todo 關(guān)閉資源
producer.close();
}
}
</pre>
4.2 數(shù)據(jù)的可靠性
既然提交了吞吐量挪捕,就代表著數(shù)據(jù)發(fā)送的能力變強(qiáng)了粗梭。不過能力變強(qiáng)了,準(zhǔn)度可不能下降级零,所以還要研究一下數(shù)據(jù)傳輸?shù)目煽啃远弦健2贿^大家要記住哈,這個可靠性不是精準(zhǔn)一次性妄讯,它只會代表數(shù)據(jù)不丟失孩锡,不會讓數(shù)據(jù)只有一次。
在kafka生產(chǎn)者可靠性這一部分亥贸,引入了一個ack機(jī)制和ISR隊(duì)列兩個概念躬窜,所以在具體說之前,我要給大家講一下這兩個機(jī)制都是干嘛的:
ack:kafka對producer發(fā)送數(shù)據(jù)的一種應(yīng)答機(jī)制炕置,不同的返回結(jié)果代表了kafka對發(fā)送過來的數(shù)據(jù)的不同的處理規(guī)則荣挨。
ISR:所有分區(qū)的副本集合,如果一個topic里面有三個分區(qū)朴摊,每個分區(qū)有三個副本默垄,那么它的ISR隊(duì)列就是【0,1甚纲,2】口锭,是一個分區(qū)的Leader和Follower的集合。
明確了這兩個概念之后,就可以繼續(xù)往下說啦鹃操,sender線程的seletor方法將數(shù)據(jù)發(fā)送到kafka的broker之后韭寸,broker會返回一個ack碼,這個ack碼有三種不同的內(nèi)容荆隘,分別是 0 恩伺,1 ,-1椰拒。
0:生產(chǎn)者發(fā)送過來數(shù)據(jù)之后就不管了晶渠,丟失與否不重要。
1:Leader把生產(chǎn)者發(fā)送過來的數(shù)據(jù)保存下來之后燃观,就返回給producer褒脯。Follower是否同步不重要。
-1(all):只有當(dāng)leader接收完成數(shù)據(jù)仪壮,所有的Follower都備份完了憨颠,才會返回這個。
這三種ack碼可以由我們來指定积锅,當(dāng)我們確定時(shí)候那種策略之后爽彤,kafka生產(chǎn)者就會尊崇那種策略,并且依照策略進(jìn)行工作缚陷。但是0和1都會面臨數(shù)據(jù)丟失的問題适篙,所以如果想要保證數(shù)據(jù)最少一次性,那就要使用ack碼為-1的這種情況箫爷。
代碼實(shí)現(xiàn)如下:
<pre data-language="java" id="qXUp8" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">public class KafkaProducerAck {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
MyPartitioner.class.getName());
//todo 設(shè)置ack為 -1
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//todo 設(shè)置發(fā)送失敗的重試次數(shù)
properties.put(ProducerConfig.RETRIES_CONFIG,10);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
for (int i = 0; i < 100; i++) {
kafkaProducer.send(new ProducerRecord<>("testTopic","number:"+i));
}
kafkaProducer.close();
}
}</pre>
4.3 數(shù)據(jù)的唯一性
ack機(jī)制只能保證數(shù)據(jù)不丟失嚷节,也就是數(shù)據(jù)的最少一次性,但是也正是因?yàn)檫@個最少一次性虎锚,就會造成數(shù)據(jù)丟失的隱患硫痰。所以就必須使用某種技術(shù),來輔助ack機(jī)制完成數(shù)據(jù)的精準(zhǔn)一次性窜护。
那這個時(shí)候就又要請出老哥倆了效斑,冪等和事務(wù)。
4.3.1 冪等
冪等是默認(rèn)開啟的柱徙,它會保證數(shù)據(jù)無論是發(fā)送了多少次缓屠,在broker里面也就只有一條,不會讓數(shù)據(jù)重復(fù)护侮。也就是說敌完,它是通過下面的這種方式來完成數(shù)據(jù)的精準(zhǔn)一次性的:
精準(zhǔn)一次性 = 冪等 + 最少一次性
不過冪等性維持的這個精準(zhǔn)一次性僅僅只能在單會話、單分區(qū)內(nèi)有效羊初,單分區(qū)內(nèi)是無可厚非的滨溉,因?yàn)榉謪^(qū)間數(shù)據(jù)本來就是沒什么關(guān)系的。只不過單會話就比較慘了,如果kafka一重啟业踏,可能會導(dǎo)致有些數(shù)據(jù)就會重復(fù)禽炬,這是因?yàn)閮绲鹊呐袛鄻?biāo)準(zhǔn)是:
<PID,Partition,SeqNumber>這樣一個類似于元組的信息作為判斷主鍵涧卵,如果主鍵一樣那這條數(shù)據(jù)就不會被寫入啦勤家。在三個參數(shù)中柳恐,SeqNumber是一直單調(diào)遞增的,Partition是分區(qū)號讼庇,正好對應(yīng)了同分區(qū)內(nèi)部的這樣一個概念,兒PID是kafka每啟動一次就會重新分配的東西蠕啄,所以冪等只能保證是在單分區(qū)、單會話內(nèi)歼跟。
4.3.2 事務(wù)
如果向開啟事務(wù)格遭,就要讓冪等性保證開啟,不過冪等默認(rèn)就是開啟的拒迅,所以做什么改變,直接開啟事務(wù)就好璧微。當(dāng)使用事務(wù)之后,一旦發(fā)生故障胞得,那么在同一個事務(wù)批次里面在故障前寫入的數(shù)據(jù)就會回滾,回滾就會讓重復(fù)寫入的數(shù)據(jù)消失开瞭,所以也就實(shí)現(xiàn)了精準(zhǔn)一次性懒震。
只不過事務(wù)需要多加入一個內(nèi)容,就是自定義一個唯一的transactional.id嗤详,有了它个扰,重啟之后也能找到原來的位置,這樣就使得數(shù)據(jù)的精準(zhǔn)一次性不會再像冪等一樣僅局限在單會話里面了葱色。
雖然好用递宅,但是還是有點(diǎn)繁瑣的,因?yàn)槿绻褂檬聞?wù)的話,還要加入一個事務(wù)協(xié)調(diào)器(transaction coordinator)的概念办龄,下面我就按照開啟事務(wù)的場景烘绽,給大家說一下開啟事務(wù)后的提交流程。
1.producer向事務(wù)協(xié)調(diào)器請求producer id俐填,事務(wù)協(xié)調(diào)器會為不同的producer返回對應(yīng)的PID安接。
2.produce得到PID之后,就會向topic里面的Leader分區(qū)發(fā)送數(shù)據(jù)英融,當(dāng)數(shù)據(jù)發(fā)送一階段之后盏檐,生產(chǎn)者請求提交事務(wù)。
3.事務(wù)協(xié)調(diào)器會處理這個事務(wù)提交請求驶悟,它會向kafka中存儲事務(wù)信息的特殊主題發(fā)送請求胡野,希望可以持久話這次事務(wù)提交請求。
4.這個topic內(nèi)部默認(rèn)由50個分區(qū)痕鳍,每一個分區(qū)都負(fù)責(zé)處理一部分事務(wù)硫豆。當(dāng)事務(wù)提交請求發(fā)送過來的時(shí)候,就會根據(jù)這個事務(wù)TID的hash值笼呆,對50取模熊响,算出自己該保存在哪個partition中,那這個partition的leader副本所在的broker節(jié)點(diǎn)抄邀,就是這個TID對應(yīng)的事務(wù)協(xié)調(diào)器節(jié)點(diǎn)耘眨。
<pre data-language="java" id="EaW1t" class="ne-codeblock language-java" style="border: 1px solid #e8e8e8; border-radius: 2px; background: #f9f9f9; padding: 16px; font-size: 13px; color: #595959">//事務(wù)實(shí)現(xiàn)的代碼如下
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerTran {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hdp1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
//todo 定義producer 的 Transaction ID
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"Tid_0");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//todo 初始化事務(wù)
kafkaProducer.initTransactions();
//todo 開啟事務(wù)
kafkaProducer.beginTransaction();
try {
for (int o = 5 ; o < 100 ; o ++){
kafkaProducer.send(new ProducerRecord<>("testTopic","number:" + o));
}
//定義一個異常,看看事務(wù)有沒有回滾
int i = 1 / 0;
//todo 提交事務(wù)
kafkaProducer.commitTransaction();
}catch (Exception e){
//todo 終止事務(wù)
kafkaProducer.abortTransaction();
}finally {
//關(guān)閉資源
kafkaProducer.close();
}
}</pre>
4.4 生產(chǎn)者順序
生產(chǎn)者順序發(fā)送數(shù)據(jù)境肾,就是生產(chǎn)者發(fā)送數(shù)據(jù)保障的最后一個階段啦剔难。這個順序就是單分區(qū)內(nèi)有序,多分區(qū)間無序的偶宫。那么如果解決數(shù)據(jù)亂序呢纯趋?還記得之前提到的sender線程中維護(hù)了一個network client么吵冒,它的內(nèi)部就是存儲的一個又一個的請求痹栖,這個請求就是數(shù)據(jù)向kafka中發(fā)送數(shù)據(jù)的請求順序揪阿,如果kafka的broker按照這個順序來讀取數(shù)據(jù)南捂,數(shù)據(jù)就不會出現(xiàn)亂序的問題溺健。
但是這一組(5個為一組)中的請求矿瘦,如果在讀取第三個請求中的數(shù)據(jù)的時(shí)候出錯了缚去,等到第三個請求重試完成了易结,第4搞动、5個請求都發(fā)送完了怎么辦鹦肿?
這不是問題箩溃,因?yàn)樵?.x版本之后涣旨,kafka內(nèi)部會緩存這些請求的順序,如果發(fā)生了12453的情況止状,那kafka是不會向下游發(fā)送數(shù)據(jù)的怯疤,而是會按照緩存的順序旅薄,將處理過的請求排序,在向下游發(fā)送矫付。
5.結(jié)尾
截止到目前位置买优,與kafka生產(chǎn)者有關(guān)的內(nèi)容就說完啦杀赢。明天我會從kafka本身的角度來講講kafka處理數(shù)據(jù)時(shí)候是個什么樣子的情況脂崔。