-
下載安裝
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
# 解壓
tar -zxvf kafka_2.13-3.3.1.tgz -C /opt/module/
mv kafka_2.13-3.3.1/ kafka
# 修改配置文件
cd config/
vi server.properties #設置broker.id、log.dirs目錄、zookeeper.connect
# 添加到系統(tǒng)變量
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
-
kafka啟停腳本
#!/bin/bash
# Start or stop the Kafka broker on every node of the cluster over ssh.
# Usage: kf.sh {start|stop}
case $1 in
"start")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 啟動 $i kafka ---"
        # -daemon runs the broker in the background on the remote host
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
;;
"stop")
    for i in hadoop1 hadoop2 hadoop3
    do
        echo "--- 停止 $i kafka ---"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
;;
*)
    # FIX: the original silently did nothing on a missing/unknown argument
    echo "Usage: $0 {start|stop}"
;;
esac
-
kafka主題使用
# 創(chuàng)建主題
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --create --partitions 1 --replication-factor 3
# 查看所有主題
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list
# 查看主題詳細描述
bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --describe
-
kafka生產(chǎn)者使用
bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic one
生產(chǎn)者有main線程和sender線程,main線程中的緩衝區(qū)(RecordAccumulator)默認32M,其中的雙端隊列(Deque)每個批次默認16K
batch.size:默認16K,數(shù)據(jù)積累到batch.size后,sender才會發(fā)送;或者
linger.ms:如果長時間積累不到batch.size,可以設置等待時間,到時后sender也會發(fā)送,默認0ms
應答機制
0:生產(chǎn)者不需要等待數(shù)據(jù)落盤應答
1:生產(chǎn)者要等Leader收到數(shù)據(jù)后應答
-1:生產(chǎn)者要等Leader和ISR隊列中所有節(jié)點收齊數(shù)據(jù)后應答
異步發(fā)送
# 導入依賴
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
KafkaProducer類
//callback換成.get()就是同步
public class KafkaProducer {
public static void main(String[] args) {
Properties properties = new Properties();
//連接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");
//指定序列化類型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
1 properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
for (int i = 1; i <= 600; i++) {
//參數(shù)1:topic名, 參數(shù)2:消息文本啤贩; ProducerRecord多個重載的構造方法
kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i),new Callback(){
@Override
public void onCompletion(RecordMetadata metadata, Exception exception){
if(exception == null){
System.out.println("主題:"+ metadata.topic() + " 分區(qū): "+metadata.partition());
}
}
});
System.out.println("message"+i);
}
kafkaProducer.close();
}
}
-
kafka消費者使用
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic one
# --from-beginning 從開始(歷史數(shù)據(jù)也接收)接收
-
kafka自定義分區(qū)
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.test.kafka.MyPartition"); //partition類名全路徑
//Custom partitioner: records whose value contains "hello" go to partition 0, all others to partition 1.
//Registered via ProducerConfig.PARTITIONER_CLASS_CONFIG with this class's fully-qualified name.
public class MyPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String msgValues = value.toString();
        //FIX: the original computed the partition but never returned it (missing return = compile error);
        //the unused Random field was also removed
        if (msgValues.contains("hello")) {
            return 0;
        }
        return 1;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
-
提高生產(chǎn)者吞吐量
# KafkaProducer類中加入
//緩沖區(qū)大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
//壓縮,壓縮可配置gzip,snappy,lz4,zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
-
ACK應答級別
1.acks=0,生產(chǎn)者發(fā)送過來的數(shù)據(jù)就不管了,可靠性差,效率高
2.acks=1,生產(chǎn)者發(fā)送過來數(shù)據(jù)后,Leader收到即應答,可靠性中等,效率中等
3.acks=-1,生產(chǎn)者發(fā)送過來數(shù)據(jù)后,Leader和ISR隊列里所有Follower都收齊才應答,可靠性高,效率低。
//具體配置
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重試次數(shù)
properties.put(ProducerConfig.RETRIES_CONFIG,3);
-
kafka事務
注意:開啟事務,必須開啟冪等性;每個broker都有一個事務協(xié)調器,
//冪等性默認是開啟的
enable.idempotence=true
生產(chǎn)者事務
# KafkaProducer類中加入
//0. Set the transactional id (required before any transactional API can be used)
//FIX: the constant is TRANSACTIONAL_ID_CONFIG — the original "TRANSCATIONAL_ID_CONFIG" does not exist
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transcational_id");
//1. Initialize transactions
//FIX: the API method is initTransactions() (plural), not initTransaction()
kafkaProducer.initTransactions();
//2. Begin the transaction
kafkaProducer.beginTransaction();
//3. Send data
try {
    for (int i = 1; i <= 600; i++) {
        //arg1: topic name, arg2: message payload; ProducerRecord has several overloaded constructors
        kafkaProducer.send(new ProducerRecord<String, String>("one", "message" + i));
    }
    //4. Commit — all records become visible atomically
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    //On failure, abort so none of the sent records are exposed to read_committed consumers
    kafkaProducer.abortTransaction();
} finally {
    //Release resources
    kafkaProducer.close();
}
-
數(shù)據(jù)的有序與亂序
1.kafka在1.x版本之前保證數(shù)據(jù)單分區(qū)有序,條件是max.in.flight.requests.per.connection=1,無需考慮是否開啟冪等性
2.1.x及以后的版本分兩種情況:未開啟冪等性時,max.in.flight.requests.per.connection需要設置為1;開啟冪等性時,max.in.flight.requests.per.connection需要設置為小于等于5。
原因:因為1.x后,啟用冪等后,kafka服務器會緩存producer發(fā)來的最近5個request的元數(shù)據(jù),
故無論如何,都可以保證最近5個request的數(shù)據(jù)都是有序的。