kafka基本概念:
1.什么是kafka?
1). apache kafka 是一個開源的分布式消息隊列 (生產(chǎn)者消費者模式)
2).apache kafka 目標(biāo): 構(gòu)建企業(yè)中統(tǒng)一的,高通量的,低延遲的消息平臺
3).大多的消息隊列(消息中間件) 都是基于 JMS 標(biāo)準(zhǔn)實現(xiàn)的, apache kafka 類似于 JMS 的實現(xiàn)
1.2 kafka 的特點
*作為緩沖(流量消減),來異構(gòu),解耦系統(tǒng)
? ? *用戶注冊需要完成多個步驟,每個步驟執(zhí)行都需要很長時間,代表用戶等待時間是所有步驟的累計時間
? ? *為了減少用戶等待的時間,使用并執(zhí)行,有多少個步驟,就開啟多少個線程來執(zhí)行,代表用戶等待時間是所有步驟中消耗時間最長的半個步驟時間
? ? *有了新的問題: 開啟多個線程執(zhí)行每個步驟,如果有個步驟執(zhí)行異常,或者嚴(yán)重超時,用戶等待的時間就不可控制了.
? ? *通過消息隊列來保證
? ? *注冊時,立即返回成功
? ? *發(fā)送注冊成功的消息到消息平臺
2.Apache kafka 的基本架構(gòu)
* kafka cluster :由多個服務(wù)器組成, 每個服務(wù)器單獨的名字 broker
* kafka broker : kafka 集群中包含的服務(wù)器
* kafka producer : 消息生產(chǎn)者, 發(fā)布消息到 kafka 集群的終端或服務(wù)
* kafka consumer : 消息消費者 ,負(fù)責(zé)消費數(shù)據(jù)
* kafka topic : 主題,一個類型消息的名稱.存儲數(shù)據(jù)時將同一類的數(shù)據(jù)存放在某個 topic 下,消費數(shù)據(jù)也是消費一類的數(shù)據(jù)
比如:
? ? 訂單系統(tǒng): 創(chuàng)建一個 topic ,叫做 order
? ? 用戶系統(tǒng): 創(chuàng)建一個 topic ,叫做 user
? ? 商品系統(tǒng): 創(chuàng)建一個topic ,叫做product
*注意: kafka 的元數(shù)據(jù)都是存放在 zookeeper 中的
3. 搭建kafka 集群
3.1 準(zhǔn)備工作:
1) 準(zhǔn)備三臺服務(wù)器,安裝jdk 1.8 ,其中每一臺虛擬機(jī)的hosts 文件中都要配置如下(域名映射)的內(nèi)容:
192.168.72.141 node01
192.168.72.142 node02
192.168.72.143 node03
2) 安裝目錄的 /export/servers? ? ? ? ?mkdir -p /export/servers/
3) zookeeper 集群安裝并啟動好(參見我的zookeeper集群安裝與配置)
3.2 去官網(wǎng)下載安裝包
由于kafka是scala 語言編寫的,基于Scala的多個版本,kafka 發(fā)布了多個版本,
這里推薦使用 2.11 版本.
3.3 上傳壓縮包并解壓
啟動命令:? ./kafka-server-start.sh /export/servers/kafka/config/server.properties 1>/dev/null 2>&1 &
由于kafka集群并沒有UI界面可以查看,所以我們可以通過查看zookeeper ,來判斷 kafka 集群是否正常運(yùn)行.
1)使用zookeeper的可視化工具進(jìn)行查看.
到這里kafka的集群已經(jīng)安裝完畢,啟動嫌麻煩的話,可以自己寫一個腳本,下面附上腳本代碼.給有興趣的人.
為什么要免密登錄 因為集群節(jié)點眾多, 所以一般在主節(jié)點啟動從節(jié)點, 這個時候就需要程序自動在主節(jié)點 登錄到從節(jié)點中, 如果不能免密就每次都要輸入密碼, 非常麻煩
?免密 SSH 登錄的原理?
1. 需要先在 B節(jié)點 配置 A節(jié)點 的公鑰
?2. A節(jié)點 請求 B節(jié)點 要求登錄?
3. B節(jié)點 使用 A節(jié)點 的公鑰, 加密一段隨機(jī)文本
4. A節(jié)點 使用私鑰解密, 并發(fā)回給 B節(jié)點 5. B節(jié)點 驗證文本是否正確
第一步:三臺機(jī)器生成公鑰與私鑰
在三臺機(jī)器執(zhí)行以下命令扁誓,生成公鑰與私鑰
ssh-keygen -t rsa
執(zhí)行該命令之后防泵,按下三個回車即可
第二步:拷貝公鑰到同一臺機(jī)器
三臺機(jī)器將拷貝公鑰到第一臺機(jī)器
三臺機(jī)器執(zhí)行命令:
ssh-copy-id node01
第三步:復(fù)制第一臺機(jī)器的認(rèn)證到其他機(jī)器
將第一臺機(jī)器的公鑰拷貝到其他機(jī)器上
在第一天機(jī)器上面指向以下命令
scp /root/.ssh/authorized_keys node02:/root/.ssh
scp /root/.ssh/authorized_keys node03:/root/.ssh
4.kafka 的基本使用
kafka 其本身就是一個消息隊列的中間件,主要是用來實現(xiàn)系統(tǒng)與系統(tǒng)之間信息的傳輸,一般有兩大角色,一個是生產(chǎn)者,一個是消費者.
所以kafka 的基本使用,就是用來如何使生產(chǎn)者發(fā)送數(shù)據(jù),如何去消費數(shù)據(jù),kafka提供了兩種方式來進(jìn)行實現(xiàn), 一種是采用kafaka 自帶的腳本來操作,另一種是使用相關(guān)的語言的 API 來進(jìn)行操作.
4.1 使用腳本操作kafka
說明: 索引執(zhí)行的腳本文件都存放在kafka的bin目錄中, 需要先進(jìn)入bin目錄才可以執(zhí)行?
cd /export/servers/kafka/bin
1) 創(chuàng)建一個topic: topic: 指的是話題, 主題的意思, 在消息發(fā)送的時候, 我們需要對消息進(jìn)行分類, 生產(chǎn)者和消費 者需要在同一個topic下, 才可以進(jìn)行發(fā)送和接收
./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 -partitions 1 --topic order
2) 使用Kafka自帶一個命令行客戶端啟動一個生產(chǎn)者,生產(chǎn)數(shù)據(jù)
./kafka-console-producer.sh --broker-list node01:9092 --topic order
3) 使用Kafka自帶一個命令行客戶端啟動一個消費者蝗敢,消費數(shù)據(jù)
./kafka-console-consumer.sh --bootstrap-server node01:9092 --topic order
該消費語句捷泞,只能獲取最新的數(shù)據(jù),要想歷史數(shù)據(jù)寿谴,需要添加選項--from-beginning
如:bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic order
4) 查看有哪些topic
./kafka-topics.sh --list --zookeeper node01:2181
5) 查看某一個具體的Topic的詳細(xì)信息
./kafka-topics.sh --describe --topic order --zookeeper node01:2181
6) 刪除topic
./kafka-topics.sh --delete --topic order --zookeeper node01:2181
注意:徹底刪除一個topic锁右,需要在server.properties中配置delete.topic.enable=true,否則只 是標(biāo)記刪除 配置完成之后讶泰,需要重啟kafka服務(wù)咏瑟。
也可以通過zookeeper的客戶端工具, 直接將topic的對應(yīng)節(jié)點刪除
4.2 使用API 操作kafka,這里以java API 為例.直接去官網(wǎng)復(fù)制例子進(jìn)行修改就好了.
5. apache kafka 原理
5.1 分片與副本機(jī)制:
此處的分片值得是對 topic 中的數(shù)據(jù)進(jìn)行分片和建立副本,一個個topic 可以理解為 solrCloud 中一個個大的索引庫
分片機(jī)制: 主要解決了單臺服務(wù)器存儲容量有限的問題
當(dāng)數(shù)據(jù)量非常大的時候,一個服務(wù)器存放不下,就將數(shù)據(jù)分成兩個或者多個部分,存放在多臺服務(wù)器上,每個服務(wù)器上的額數(shù)據(jù),就叫做一個分片.
副本: 副本備份機(jī)制是為了解決數(shù)據(jù)存儲的高可用的問題
當(dāng)數(shù)據(jù)只保存一份的時候,有丟失的風(fēng)險,為了更好的容錯和容災(zāi),將數(shù)據(jù)拷貝幾份,保存到不同的機(jī)器上面.
5.2 kafka 保證數(shù)據(jù)不丟失的機(jī)制
5.2.1 保證生產(chǎn)者端不丟失
1) 消息生產(chǎn)分為同步模式/異步模式
2)消息確認(rèn)分為 三個狀態(tài):
0 : 生產(chǎn)者只負(fù)責(zé)數(shù)據(jù)的發(fā)送:
1 : 某個 partition 的 leader 收到數(shù)據(jù)給出響應(yīng)
-1 : 某個 partition 的所有副本都收到數(shù)據(jù)后給出響應(yīng) (partition 指上文說的一個分片,leader 指的是所有的副本中只有一個leader能執(zhí)行寫,修改等操作,其他follower的只能提供讀的操作,并且和leader 進(jìn)行同步)
3) 在同步模式下
a) 生產(chǎn)者等待 10s ,如果 broker(指集群中的一臺kafka服務(wù)器) 沒有給出 ack 響應(yīng),就認(rèn)為失敗.
b) 生產(chǎn)者重試3次, 如果還沒有響應(yīng),就報錯
4)在異步模式下:
a) 先將數(shù)據(jù)保存在生產(chǎn)者端的 buffer 中,buffer 大小是2 萬條
b)滿足數(shù)據(jù)閥值或者數(shù)量(時間)閥值其中的一個條件就可以發(fā)送數(shù)據(jù)
c) 發(fā)送一批數(shù)據(jù)的大小是 500 條
如果broker 遲遲不給 ack ,而 buffer 又滿了
開發(fā)者可以設(shè)置是否清空 buffer 中的數(shù)據(jù),如果不清空,表示等待
未完待續(xù)....有空更新