Kafka學習(一)

  • 下載安裝

wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
# 解壓
tar -zxvf  kafka_2.13-3.3.1.tgz -C /opt/module/
mv  kafka_2.13-3.3.1/ kafka
# 修改配置文件
cd config/
vi server.properties #設置broker.id没佑、log.dirs目錄毕贼、zookeeper.connect
 # 添加到系統(tǒng)變量
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

  • kafka啟停腳本

#!/bin/bash

case $1 in 
"start")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 啟動 $i kafka ---"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
;;
"stop")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 停止 $i kafka ---"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
        done
;;
esac
  • kafka主題使用

# 創(chuàng)建主題
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --create --partitions 1 --replication-factor 3
# 查看所有主題
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list
# 查看主題詳細描述
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --describe
  • kafka生產(chǎn)者使用

bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic one 

生產(chǎn)者有main線程和sender線程,main中的分區(qū)器默認32M,DQuene默認16K
batch.size:16K,數(shù)據(jù)積累到16K后蛤奢,sender才會發(fā)送或者
linger.ms:發(fā)果長時間不到batch.size鬼癣,可以設置等待時間,默認0ms
應答機制
0:生產(chǎn)者不需要等待數(shù)據(jù)落盤應答
1:生產(chǎn)者要等Leader收到數(shù)據(jù)后應答
-1:生產(chǎn)者要等Leader和ISR隊列中所有節(jié)點收齊數(shù)據(jù)后應答

異步發(fā)送

# 導入依賴
<dependencies>  
    <dependency>  
        <groupId>org.apache.kafka</groupId>  
        <artifactId>kafka-clients</artifactId>  
        <version>3.0.0</version>  
    </dependency>  
</dependencies>

KafkaProducer類

//callback換成.get()就是同步
public class KafkaProducer {
 
    public static void main(String[] args) {
        Properties properties = new Properties();
        //連接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");
        //指定序列化類型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
1       properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            //參數(shù)1:topic名, 參數(shù)2:消息文本啤贩; ProducerRecord多個重載的構造方法
            kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i),new Callback(){
                 @Override
                 public void onCompletion(RecordMetadata metadata, Exception exception){
                       if(exception == null){
                           System.out.println("主題:"+ metadata.topic() + "  分區(qū): "+metadata.partition());
                       }
                 }
            });
            System.out.println("message"+i);
        }
        kafkaProducer.close();
    }
}
  • kafka消費者使用

bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic one 
# --from-beginning 從開始(歷史數(shù)據(jù)也接收)接收
  • kafka自定義分區(qū)

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.test.kafka.MyPartition");   //partition類名全路徑

public class MyPartition implements Partitioner {
  private Random random = new Random();
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       String msgValues = value.toString();
        int partition;
        if(msgValues.contains("hello")){
            partition = 0;
        }else{
            partition = 1; 
       }
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}
  • 提高生產(chǎn)者吞吐量

# KafkaProducer類中加入
//緩沖區(qū)大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
//壓縮,壓縮可配置gzip,snappy,lz4,zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  • ACK應答級別

1.acks=0,生產(chǎn)者發(fā)送過來的數(shù)據(jù)就不管待秃,可靠性差,效率高
2.acks=1,生產(chǎn)者發(fā)送過來數(shù)據(jù)Leader應答痹屹,可靠性中等章郁,效率中等
3.acks=-1,生產(chǎn)者發(fā)送過來數(shù)據(jù)Leader和ISR隊列里所有Follwer應答志衍,可靠性高暖庄,效率低。

//具體配置
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重試次數(shù)
properties.put(ProducerConfig.RETRIES_CONFIG,3);
  • kafka事務

注意:開啟事務足画,必須開啟冪等性,每個broker都有一個事務協(xié)調器雄驹,

//冪等性默認是開啟的
enable.idempotence=true

生產(chǎn)者事務

# KafkaProducer類中加入
//0.指定事務id
properties.put(ProducerConfig.TRANSCATIONAL_ID_CONFIG,"transcational_id");
//1.初始化事務
kafkaProducer.initTransaction();
//2.開啟事務
kafkaProducer.beginTransaction();
//3.發(fā)送數(shù)據(jù)
try{
    for (int i = 1; i <= 600; i++) {
            //參數(shù)1:topic名, 參數(shù)2:消息文本; ProducerRecord多個重載的構造方法
            kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i));
    }
    //4.提交事務
    kafkaProducer.commitTransaction();
}catch(Exception e){
    kafkaProducer.abortTransaction();
}finally{
    //關閉資源
    kafkaProducer.close();
}
  • 數(shù)據(jù)的有序與亂序

1.kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序淹辞,條件是max.in.flight.requests.connection=1,無需考慮是否開啟冪等性
2.1.x以后的版本医舆,分為未開啟冪等性max.in.flight.requests.connection=1,需要設置為1俘侠,開啟冪等性,max.in.flight.requests.connection=需要設置小于等于5.
原因:因為1.x后蔬将,啟用冪等后爷速,kafka服務器會緩存producer發(fā)來的最近5個request的元數(shù)據(jù),
故無論如何霞怀,都可以保證最近5個request的數(shù)據(jù)都是有序的惫东。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市毙石,隨后出現(xiàn)的幾起案子廉沮,更是在濱河造成了極大的恐慌,老刑警劉巖徐矩,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件滞时,死亡現(xiàn)場離奇詭異,居然都是意外死亡滤灯,警方通過查閱死者的電腦和手機坪稽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鳞骤,“玉大人窒百,你說我怎么就攤上這事≡ゾ。” “怎么了篙梢?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長美旧。 經(jīng)常有香客問我庭猩,道長,這世上最難降的妖魔是什么陈症? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任蔼水,我火速辦了婚禮,結果婚禮上录肯,老公的妹妹穿的比我還像新娘趴腋。我一直安慰自己,他們只是感情好论咏,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布优炬。 她就那樣靜靜地躺著,像睡著了一般厅贪。 火紅的嫁衣襯著肌膚如雪蠢护。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天养涮,我揣著相機與錄音葵硕,去河邊找鬼眉抬。 笑死,一個胖子當著我的面吹牛懈凹,可吹牛的內容都是我干的蜀变。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼介评,長吁一口氣:“原來是場噩夢啊……” “哼库北!你這毒婦竟也來了?” 一聲冷哼從身側響起们陆,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤寒瓦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后坪仇,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體孵构,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年烟很,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蜡镶。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡雾袱,死狀恐怖,靈堂內的尸體忽然破棺而出官还,到底是詐尸還是另有隱情芹橡,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布望伦,位于F島的核電站林说,受9級特大地震影響,放射性物質發(fā)生泄漏屯伞。R本人自食惡果不足惜腿箩,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望劣摇。 院中可真熱鬧珠移,春花似錦、人聲如沸末融。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽勾习。三九已至浓瞪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間巧婶,已是汗流浹背乾颁。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工涂乌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人钮孵。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓骂倘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親巴席。 傳聞我的和親對象是個殘疾皇子历涝,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內容