Kafaka介紹
Kafka是一個分布式的消息隊(duì)列系統(tǒng)(Message Queue)
一、Kafka介紹
kafka是一個高吞吐的分布式消息隊(duì)列系統(tǒng)。特點(diǎn)是生產(chǎn)者消費(fèi)者模式糠悯,先進(jìn)先出(FIFO)保證順序甥雕,自己不丟數(shù)據(jù),默認(rèn)每隔7天清理數(shù)據(jù)萤晴。消息列隊(duì)常見場景:系統(tǒng)之間解耦合吐句、峰值壓力緩沖胁后、異步通信。
二嗦枢、Kafka生產(chǎn)消息攀芯、存儲消息、消費(fèi)消息
Kafka架構(gòu)是由producer(消息生產(chǎn)者)文虏、consumer(消息消費(fèi)者)侣诺、borker(kafka集群的server,負(fù)責(zé)處理消息讀氧秘、寫請求紧武,存儲消息,在kafka cluster這一層這里敏储,其實(shí)里面是有很多個broker)阻星、topic(消息隊(duì)列/分類相當(dāng)于隊(duì)列,里面有生產(chǎn)者和消費(fèi)者模型)已添、zookeeper(元數(shù)據(jù)信息存在zookeeper中妥箕,包括:存儲消費(fèi)偏移量,topic話題信息更舞,partition信息) 這些部分組成畦幢。
kafka里面的消息是有topic來組織的,簡單的我們可以想象為一個隊(duì)列缆蝉,一個隊(duì)列就是一個topic宇葱,然后它把每個topic又分為很多個partition,這個是為了做并行的刊头,在每個partition內(nèi)部消息強(qiáng)有序黍瞧,相當(dāng)于有序的隊(duì)列,其中每個消息都有個序號offset原杂,比如0到12印颤,從前面讀往后面寫。一個partition對應(yīng)一個broker穿肄,一個broker可以管多個partition年局,比如說,topic有6個partition咸产,有兩個broker矢否,那每個broker就管3個partition。這個partition可以很簡單想象為一個文件脑溢,當(dāng)數(shù)據(jù)發(fā)過來的時候它就往這個partition上面append僵朗,追加就行,消息不經(jīng)過內(nèi)存緩沖,直接寫入文件衣迷,kafka和很多消息系統(tǒng)不一樣,很多消息系統(tǒng)是消費(fèi)完了我就把它刪掉酱酬,而kafka是根據(jù)時間策略刪除壶谒,而不是消費(fèi)完就刪除,在kafka里面沒有一個消費(fèi)完這么個概念膳沽,只有過期這樣一個概念汗菜。
producer自己決定往哪個partition里面去寫,這里有一些的策略挑社,譬如hash陨界。consumer自己維護(hù)消費(fèi)到哪個offset,每個consumer都有對應(yīng)的group痛阻,group內(nèi)是queue消費(fèi)模型(各個consumer消費(fèi)不同的partition菌瘪,因此一個消息在group內(nèi)只消費(fèi)一次),group間是publish-subscribe消費(fèi)模型阱当,各個group各自獨(dú)立消費(fèi)俏扩,互不影響,因此一個消息在被每個group消費(fèi)一次弊添。
三录淡、Kafka的特點(diǎn)
? 系統(tǒng)的特點(diǎn):生產(chǎn)者消費(fèi)者模型,F(xiàn)IFO
Partition內(nèi)部是FIFO的油坝,partition之間呢不是FIFO的嫉戚,當(dāng)然我們可以把topic設(shè)為一個partition,這樣就是嚴(yán)格的FIFO澈圈。
? 高性能:單節(jié)點(diǎn)支持上千個客戶端彬檀,百M(fèi)B/s吞吐,接近網(wǎng)卡的極限
? 持久性:消息直接持久化在普通磁盤上且性能好
直接寫到磁盤中去瞬女,就是直接append到磁盤里去凤覆,這樣的好處是直接持久化,數(shù)據(jù)不會丟失拆魏,第二個好處是順序?qū)懚㈣耄缓笙M(fèi)數(shù)據(jù)也是順序的讀,所以持久化的同時還能保證順序渤刃,比較好拥峦,因?yàn)榇疟P順序讀比較好。
? 分布式:數(shù)據(jù)副本冗余卖子、流量負(fù)載均衡略号、可擴(kuò)展
分布式,數(shù)據(jù)副本,也就是同一份數(shù)據(jù)可以到不同的broker上面去玄柠,也就是當(dāng)一份數(shù)據(jù)突梦,磁盤壞掉的時候,數(shù)據(jù)不會丟失羽利,比如3個副本宫患,就是在3個機(jī)器磁盤都壞掉的情況下數(shù)據(jù)才會丟,在大量使用情況下看這樣是非常好的这弧,負(fù)載均衡娃闲,可擴(kuò)展,在線擴(kuò)展匾浪,不需要停服務(wù)皇帮。
? 很靈活:消息長時間持久化+Client維護(hù)消費(fèi)狀態(tài)
消費(fèi)方式非常靈活,第一原因是消息持久化時間跨度比較長蛋辈,一天或者一星期等属拾,第二消費(fèi)狀態(tài)自己維護(hù)消費(fèi)到哪個地方了可以自定義消費(fèi)偏移量。
四冷溶、集群搭建
1.服務(wù)器準(zhǔn)備
Zookeeper集群共三臺服務(wù)器捌年,分別為:node05、node06挂洛、node07
Kafka集群共三臺服務(wù)器礼预,分別為:node05、node06虏劲、node07
2.Zookeeper集群準(zhǔn)備
Kafka是一個分布式消息隊(duì)列托酸,需要依賴ZooKeeper,請先安裝好zk集群柒巫。
Zookeeper集群安裝步驟略励堡,可參照【Storm學(xué)習(xí)筆記三:全分布式搭建】。
3.Kafka安裝解壓
(1)解壓
[root@node05 software] tar xf kafka_2.10-0.9.0.1.tgz -C /opt/sxt
[root@node05 sxt] mv kafka_2.10-0.9.0.1/ kafka
(2)修改配置文件 server.properties
[root@node05 sxt] cd /opt/sxt/kafka/config
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
#broker集群中唯一標(biāo)識id堡掏,0应结、1、2泉唁、3依次增長(broker即Kafka集群中的一臺服務(wù)器)
############################# Zookeeper #############################
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=node05:2181,node06:2181,node07:2181
(3)安裝包分發(fā)到node06/node07
scp -r kafka/ node06:`pwd`
scp -r kafka/ node07:`pwd`
分別修改node06鹅龄、node07上Kafka配置文件中的broker.id為1、2
4.啟動Kafka集群
啟動Zookeeper集群
#分別在三臺服務(wù)器上執(zhí)行以下命令啟動
zkServer.sh start
啟動Kafka集群
#分別在三臺服務(wù)器上執(zhí)行以下命令啟動
cd /opt/sxt/kafka
bin/kafka-server-start.sh config/server.properties
5.測試Kafka
XShell新啟動一個標(biāo)簽頁
[c:\~]$ ssh node05
創(chuàng)建Topic
#查看幫助手冊
bin/kafka-topics.sh --help
#創(chuàng)建Topic
bin/kafka-topics.sh --zookeeper node05:2181,node06:2181,node07:2181 --create --replication-factor 2 --partitions 3 --topic test
參數(shù)說明:
--replication-factor:指定每個分區(qū)的復(fù)制因子個數(shù)亭畜,默認(rèn)1個
--partitions:指定當(dāng)前創(chuàng)建的kafka分區(qū)數(shù)量扮休,默認(rèn)為1個
--topic:指定新建topic的名稱
查看Topic列表
bin/kafka-topics.sh --zookeeper node05:2181,node06:2181,node07:2181 --list
查看"test"Topic描述
bin/kafka-topics.sh --zookeeper node05:2181,node06:2181,node07:2181 --describe --topic test
創(chuàng)建生產(chǎn)者(注意端口9092)
bin/kafka-console-producer.sh --broker-list node05:9092,node06:9092,node07:9092 --topic test
參數(shù)說明:
--topic test 表示給test主題創(chuàng)建生產(chǎn)者
注:此時shell會阻塞住,等待生產(chǎn)數(shù)據(jù)拴鸵,不斷輸入數(shù)據(jù)按回車即可
創(chuàng)建消費(fèi)者
此時從新開一個標(biāo)簽頁執(zhí)行以下命令
bin/kafka-console-consumer.sh --zookeeper node05:2181,node06:2181,node07:2181 --from-beginning --topic test
參數(shù)說明:
--from-beginning 消費(fèi)時從頭消費(fèi)數(shù)據(jù)
--topic test 表示給test主題創(chuàng)建生產(chǎn)者
注:此時就會看到生產(chǎn)者發(fā)送的數(shù)據(jù)玷坠,且生產(chǎn)者新生產(chǎn)一個蜗搔,消費(fèi)者就接收到一個
五、Kafka的leader的均衡機(jī)制
當(dāng)一個broker停止或者crashes時八堡,所有本來將它作為leader的分區(qū)將會把leader轉(zhuǎn)移到其他broker上去樟凄,極端情況下,會導(dǎo)致同一個leader管理多個分區(qū)兄渺,導(dǎo)致負(fù)載不均衡缝龄,同時當(dāng)這個broker重啟時,如果這個broker不再是任何分區(qū)的leader,kafka的client也不會從這個broker來讀取消息溶耘,從而導(dǎo)致資源的浪費(fèi)。
kafka中有一個被稱為優(yōu)先副本(preferred replicas)的概念服鹅。如果一個分區(qū)有3個副本凳兵,且這3個副本的優(yōu)先級別分別為0,1,2,根據(jù)優(yōu)先副本的概念企软,0會作為leader 庐扫。當(dāng)0節(jié)點(diǎn)的broker掛掉時,會啟動1這個節(jié)點(diǎn)broker當(dāng)做leader仗哨。當(dāng)0節(jié)點(diǎn)的broker再次啟動后形庭,會自動恢復(fù)為此partition的leader。不會導(dǎo)致負(fù)載不均衡和資源浪費(fèi)厌漂,這就是leader的均衡機(jī)制萨醒。
在配置文件conf/ server.properties中配置開啟(默認(rèn)就是開啟):
auto.leader.rebalance.enable true
六、Kafka 0.11版本改變
kafka 0.8.2版本消費(fèi)者offset存儲在zookeeper中苇倡,對于zookeeper而言每次寫操作代價(jià)是很昂貴的富纸,而且zookeeper集群是不能擴(kuò)展寫能力。kafka 0.11版本默認(rèn)使用新的消費(fèi)者api ,消費(fèi)者offset會更新到一個kafka自帶的topic【__consumer_offsets】中旨椒。以消費(fèi)者組groupid 為單位晓褪,可以查詢每個組的消費(fèi)topic情況:
#查看所有消費(fèi)者組
./kafka-consumer-groups.sh --bootstrap-server c7node1:9092,c7node2:9092,c7node3:9092 --list
#查看消費(fèi)者消費(fèi)的offset位置信息
./kafka-consumer-groups.sh --bootstrap-server c7node1:9092,c7node2:9092,c7node3:9092 --describe --group MyGroupId
#重置消費(fèi)者組的消費(fèi)offset信息 ,--reset-offsets –all-topics 所有offset综慎。--to-earliest 最小位置涣仿。
# --execute 執(zhí)行
./kafka-consumer-groups.sh --bootstrap-server c7node1:9092,c7node2:9092,c7node3:9092 --group MyGroupId --reset-offsets --all-topics --to-earliest --execute