架構(gòu)
生產(chǎn)者向broker發(fā)送消息玻熙,消費(fèi)者接收消息撬腾,broker是物理概念,部署幾個(gè)kafka即幾個(gè)broker,topic是邏輯概念顺囊,往topic里發(fā)送消息會發(fā)送到設(shè)置好的幾個(gè)partion上,每個(gè)partion存儲作為不同隊(duì)列存儲不同數(shù)據(jù),partion有l(wèi)eader和follower備份機(jī)制蕉拢,消息發(fā)送時(shí)會輪循發(fā)送到不同broker的不同partion中特碳,同一消費(fèi)者只能消費(fèi)同一分區(qū),通過offset記錄消費(fèi)位置晕换,消費(fèi)者組可以訪問一個(gè)topic的不同partion
docker中kafka的使用
啟動(dòng)鏡像
docker run -dit -p 9200:9200 kafka鏡像id (端口映射之前有說過)
啟動(dòng)kafka可以帶上參數(shù)午乓,這樣會自動(dòng)修改kafka里的配置文件(/opt/kafka_版本/conf/server.properties),否則不帶參數(shù)需要自己進(jìn)入進(jìn)行手動(dòng)修改 帶參數(shù)版啟動(dòng)可參考
docker run -dit --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.3:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.4:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
其中172.17.0.3需要改成自己docker的網(wǎng)橋連接地址
查看已啟動(dòng)容器
docker ps
查看所有容器
docker ps -a
啟動(dòng)未啟動(dòng)的容器
docker start 容器id
進(jìn)入kafka容器
docker exec -it 容器id bash(默認(rèn)進(jìn)入/目錄)
創(chuàng)建主題
./kafka-topics.sh --create --zookeeper 172.17.0.3:2181/kafka --replication-factor 1 --partitions 3 --topic sun
主題和分區(qū)可以理解為:topic是邏輯劃分,kafka通過topic進(jìn)行區(qū)分消息闸准,topic的數(shù)據(jù)會被存儲到日志中益愈,如果數(shù)據(jù)量太大可以引入partion(同時(shí)提高讀寫吞吐量)來分段存儲數(shù)據(jù)。其中replication-factor作用是將任意分區(qū)復(fù)制到broker上夷家,broker是物理概念蒸其,部署了一個(gè)kafka可認(rèn)為broker數(shù)為1,我本機(jī)只有一個(gè)kafka所以這里replication-factor超過1會報(bào)錯(cuò)库快。綜上幾個(gè)概念可以理解為:集群中有多個(gè)broker枣接,創(chuàng)建主題時(shí)可以指明topic有多個(gè)partitions(消息拆分到不同分區(qū)進(jìn)行存儲,一個(gè)partion只能被一個(gè)消費(fèi)者消費(fèi)--partion內(nèi)部保證接收數(shù)據(jù)順序)缺谴,可以為分區(qū)創(chuàng)建多個(gè)副本replication但惶,不同副本在不同的broker中(作為備份使用,這里有l(wèi)eader和flower的區(qū)分)湿蛔。
查看topic信息
./kafka-topics.sh --describe --zookeeper 172.17.0.3/kafka --topic sun
集群部署
可以通過compose集群化部署過es膀曾,這里通過創(chuàng)建另一個(gè)compose.yml文件來部署kafka,配置文件參考 docker-compose集群部署
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d
spring 中kafka生產(chǎn)消費(fèi)
生產(chǎn)者:
./kafka-console-producer.sh --bootstrap-server node1:9092 --topic my-kafka-topic
消費(fèi)者:
方式一:從當(dāng)前主題的遷移量位置+1開始取數(shù)據(jù)
./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic my-kafka-topic
方式二:從當(dāng)前主題第一條消息開始消費(fèi)
./kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic my-kafka-top
生產(chǎn)者將消息發(fā)送broker阳啥,broker將消息保存到本地日志中添谊,消息的保存時(shí)有序的
單播消息:
當(dāng)存在一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者組的時(shí)候察迟,一個(gè)消費(fèi)者組中只有一個(gè)消費(fèi)者會收到消息
./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup --topic my-kafka-topic
多播消息:
當(dāng)存在一個(gè)生產(chǎn)者斩狱,多個(gè)消費(fèi)組,不同消費(fèi)組只有一個(gè)消費(fèi)者收到消息
./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup1 --topic my-kafka-topic
./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup2 --topic my-kafka-topic
查看消費(fèi)組詳細(xì)信息:
./kafka-console-consumer.sh --bootstrap-server node1:9092 --describe --group testGroup
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
cat 2 79668 79668 0
CURRENT-OFFSET:最后被消費(fèi)的偏移量
LOG-END-OFFSET:消息總量(最后一條消息的偏移量)
LAG :積壓了多少條消息
常見問題:
1扎瓶、如何防止消息丟失
生產(chǎn)者:使用同步消息發(fā)送所踊;ack設(shè)置為1/all;設(shè)置同步分區(qū)數(shù)>=2
消費(fèi)者:把自動(dòng)提交改成手動(dòng)提交
2、如何防止消息的重復(fù)消費(fèi)
針對網(wǎng)絡(luò)抖動(dòng)導(dǎo)致的生產(chǎn)者重試(發(fā)送消息)概荷,可以設(shè)置消費(fèi)者加鎖解決秕岛;
3、消息積壓
消費(fèi)者使用多線程異步處理接收數(shù)據(jù);創(chuàng)建多個(gè)消費(fèi)者組部署到其他機(jī)器上继薛;通過業(yè)務(wù)架構(gòu)設(shè)計(jì)修壕,提升業(yè)務(wù)層面消費(fèi)性能。
ps:
緩沖區(qū):kafka默認(rèn)會創(chuàng)建一個(gè)消息緩沖區(qū)去存放要發(fā)送的消息遏考,大小是32M慈鸠,每次本地線程會去緩沖區(qū)拉16K數(shù)據(jù)發(fā)送到broker,如果不到16K等待10ms也會將數(shù)據(jù)發(fā)送到broker
參考鏈接:
1灌具、kafka安裝教程--推薦
2青团、kafka配置文件server.properties參數(shù)說明
3、創(chuàng)建主題分區(qū)數(shù)
4稽亏、解決docker容器啟動(dòng)不了的問題
5壶冒、通過docker-compose集群部署
6、學(xué)習(xí)視頻