kafka 知識(shí)整理
kafka介紹
kafka是一種分布式的基于發(fā)布/訂閱的消息系統(tǒng)厉颤。具有如下特征:
? 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力匿又,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能
? 高吞吐率冈止。即使是在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100k條消息的傳輸
? 支持消息分區(qū)及分布式消費(fèi)碴里,同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸
? 同時(shí)支持離線和實(shí)時(shí)數(shù)據(jù)處理
kafka解析
broker
? kafka集群包括有一個(gè)或多個(gè)服務(wù)器肚豺,這種服務(wù)器被稱為broker
topic
? kafka發(fā)送消息的類別劫恒,用于區(qū)分不同消息發(fā)送的哨坪。物理上不同的topic的消息分開存儲(chǔ)庸疾,邏輯上一個(gè)topic的消息雖然保存于一個(gè)或者多個(gè)broker上,但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)当编,而不需要關(guān)心數(shù)據(jù)存于何處
partition
? topic物理上的分組届慈。每個(gè)topic包含一個(gè)或多個(gè)partition,創(chuàng)建topic時(shí)可指定parition數(shù)量忿偷。每個(gè)partition是一個(gè)有序的隊(duì)列金顿,對(duì)應(yīng)于一個(gè)文件夾,該文件夾下存儲(chǔ)該partition的數(shù)據(jù)和索引文件鲤桥。在發(fā)送一條消息時(shí)揍拆,生產(chǎn)者可以指定這條消息的key和分區(qū)機(jī)制來發(fā)送到不同的分區(qū)。
producer
? 負(fù)責(zé)發(fā)送消息到kafka broker茶凳,推送消息
consumer
? 負(fù)責(zé)從kafka broker 拉取消息嫂拴。每個(gè)consumer屬于一個(gè)特定的consumer group,同一個(gè)topic的一條消息只能被同一個(gè)group內(nèi)的一個(gè)consumer消費(fèi)慧妄,但多個(gè)consumer group可同時(shí)消費(fèi)這一topic
kafka架構(gòu)
? 如上圖所示顷牌,一個(gè)kafka集群包含有若干個(gè)producer,若干個(gè)broker(支持水平擴(kuò)展塞淹,broker數(shù)量越多窟蓝,集群的吞吐率越高),若干consumer group饱普,以及一個(gè)zookeeper集群运挫。
? kafka通過zookeeper管理集群配置,選舉leader套耕,以及在consumer group 發(fā)生變化時(shí)進(jìn)行rebalance谁帕。producer通過push模式將消息發(fā)布到broker,consumer使用pull模式從broker訂閱并消費(fèi)消息冯袍。
partition
每一條消息發(fā)送到broker時(shí)匈挖,會(huì)根據(jù)partition規(guī)則選擇被存儲(chǔ)到哪一個(gè)partition碾牌。如果partition規(guī)則設(shè)置合理,所有的消息可以均勻的分布到不同的partition中儡循,這樣就可以實(shí)現(xiàn)水平擴(kuò)展舶吗。在創(chuàng)建topic時(shí),可以在server.properties中指定partition的數(shù)量(num.partition=3)择膝,也可以在創(chuàng)建topic后再去修改partition數(shù)量誓琼。
? ?在發(fā)送一條消息時(shí),可以指定這條消息的key肴捉,producer會(huì)根據(jù)這個(gè)可以和partition機(jī)制來判斷將這條消息發(fā)送到哪個(gè)partition腹侣。partition機(jī)制可以通過producer的partition.class參數(shù)來指定,該class必須要實(shí)現(xiàn)kafka.producer.Partitioner接口齿穗。
kafka刪除舊數(shù)據(jù)策略
kafka提供了2種策略去刪除舊數(shù)據(jù):一是基于時(shí)間傲隶;二是基于partition文件大小
eg:在server.properties中配置
log.retention.hours=168? ? ? ? ? #kafka數(shù)據(jù)保留一周,刪除1周前舊數(shù)據(jù)
log.retention.bytes=1073741824? #partition 文件大小超過1GB則刪除舊數(shù)據(jù)
注:kafka讀取特定消息的時(shí)間復(fù)雜度為O(1)缤灵,即與文件大小無關(guān)伦籍,所以這里刪除文件與kafka性能無關(guān),選擇刪除舊數(shù)據(jù)策略只與磁盤及具體的需求有關(guān)
? ? kafka會(huì)為每一個(gè)consumer group保留一些metadata信息—當(dāng)前消費(fèi)的信息的position? 即offset腮出。這個(gè)offset由consumer控制,正常情況下consumer會(huì)在消費(fèi)完一條消息后線性增加offset芝薇。當(dāng)然胚嘲,這個(gè)offset也可以手動(dòng)設(shè)置,重新消費(fèi)一些消息洛二。因?yàn)閛ffset由consumer控制馋劈,所以kafka broker是無狀態(tài)的,不需要標(biāo)記哪些信息被consumer消費(fèi)過晾嘶,不過需要通過broker去保證同一個(gè)consumer group 只有一個(gè)consumer能消費(fèi)某一條消息妓雾,因此就不需要提供鎖機(jī)制,這也是kafka的高吞吐率提供了有力保障
replication & leader election
kafka從0.8版本開始提供了partition級(jí)別的replication垒迂,可以在server.properties 中配置? default.replication.factor = 1 械姻,默認(rèn)情況下為1
replication與leader election配合提供了自動(dòng)的failover機(jī)制,replication對(duì)kafka的吞吐率有一定的影響机断,但是極大的增強(qiáng)了可用性楷拳。每個(gè)partition都有一個(gè)唯一的leader,所有的讀寫操作都是在leader上完成吏奸,leader批量從leader上拉取數(shù)據(jù)欢揖,一般情況下partition的數(shù)量大于等于broker的數(shù)量,并且所有的partition的leader均勻分布在broker上奋蔚,follower的日志和其leader上的完全一致她混。
kafka判斷broker是否active烈钞,需要有兩個(gè)條件:
(1) 必須維護(hù)與zookeeper的會(huì)話(通過zookeeper的心跳機(jī)制來實(shí)現(xiàn))
(2) follower必須能夠及時(shí)從leader的數(shù)據(jù)復(fù)制過來,不能”落后太多”
? ? ? ? leader 選舉機(jī)制:
? ? ? ? ?每個(gè)partition都有一個(gè)”in sync”的node list,如果有一個(gè)follower宕機(jī)坤按,或者落后太多棵磷,leader將把它從”in sync”list中移除,這里落后太多的標(biāo)準(zhǔn)晋涣,是在server.properties中配置:? ?replica.lag.max.messages=4000; replica.lag.time.max.ms=10000
? ? ? ? ? kafka在zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in sync node list)set仪媒,這個(gè)set里面所有的replicat都跟上了leader,只有isr里面的成員才有可能被選中l(wèi)eader谢鹊;在isr中至少有一個(gè)follower時(shí)算吩,kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,如果partition的所有replica都掛了佃扼,就無法保證數(shù)據(jù)不丟失了偎巢,這種情況下有兩種可行的方案:
(1) 等待isr中的任一個(gè)replica “活”過來,并且選它作為leader
(2) 選擇第一個(gè)“活”過來的replica(不一定是isr中的)作為leader
這就需要在可用性和一致性當(dāng)中做出一個(gè)簡(jiǎn)單的平衡兼耀,如果一定等待isr中的replica活過來压昼,那么可能會(huì)耗費(fèi)比較長(zhǎng)的時(shí)間;如果選擇非isr中的replica作為leader瘤运,即不能保證包含所有已經(jīng)提交的消息窍霞。Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性拯坟。
consumer group
? 每一個(gè)consumer實(shí)例都屬于一個(gè)consumer group但金,每一條消息只會(huì)被同一個(gè)consumer group里的一個(gè)consumer消費(fèi)。(不同group可以同時(shí)消費(fèi)同一條消息)
? Consumer Rebalance
如果某consumer group中consumer數(shù)量少于partition數(shù)量郁季,則至少有一個(gè)consumer會(huì)消費(fèi)多個(gè)partition的數(shù)據(jù)冷溃,如果consumer的數(shù)量與partition數(shù)量相同,則正好一個(gè)consumer消費(fèi)一個(gè)partition的數(shù)據(jù)梦裂,而如果consumer的數(shù)量多于partition的數(shù)量時(shí)似枕,會(huì)有部分consumer無法消費(fèi)該topic下任何一條消息。
每一個(gè)consumer或者broker的增加或者減少都會(huì)觸發(fā)consumer rebalance年柠。因?yàn)槊總€(gè)consumer只負(fù)責(zé)調(diào)整自己所消費(fèi)的partition凿歼,為了保證整個(gè)consumer group的一致性,所以當(dāng)一個(gè)consumer觸發(fā)了rebalance時(shí)彪杉,該consumer group內(nèi)的其它所有consumer也應(yīng)該同時(shí)觸發(fā)rebalance毅往。
文件存儲(chǔ)
同一個(gè)Topic 通常存儲(chǔ)的是一類消息,每個(gè)topic內(nèi)部實(shí)現(xiàn)又被分成多個(gè)partition派近,每個(gè)partition在存儲(chǔ)層面是append log文件攀唯。
在Kafka文件存儲(chǔ)中,同一個(gè)topic下有多個(gè)不同partition渴丸,每個(gè)partition為一個(gè)目錄侯嘀,partiton命名規(guī)則為topic名稱+有序序號(hào)另凌,第一個(gè)partiton序號(hào)從0開始,序號(hào)最大值為partitions數(shù)量減1戒幔。
? 每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中吠谢。但每個(gè)段segment file消息數(shù)量不一定相等,這種特性方便old segment file快速被刪除诗茎。
? 每個(gè)partiton只需要支持順序讀寫就行了工坊,segment文件生命周期由服務(wù)端配置參數(shù)決定。
這樣做的好處就是能快速刪除無用文件敢订,有效提高磁盤利用率王污。
? segment file組成:由2大部分組成,分別為index file和data file楚午,此2個(gè)文件一一對(duì)應(yīng)昭齐,成對(duì)出現(xiàn)找前,后綴”.index”和“.log”分別表示為segment索引文件驮审、數(shù)據(jù)文件。
? segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始舷胜,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值怪蔑。數(shù)值最大為64位long大小里覆,19位數(shù)字字符長(zhǎng)度,沒有數(shù)字用0填充饮睬。
副本放置策略
為了更好的做負(fù)載均衡租谈,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上。Kafka分配Replica的算法如下:
? 將所有存活的N個(gè)Brokers和待分配的Partition排序
? 將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上捆愁,這個(gè)Partition的第一個(gè)Replica存在于這個(gè)分配的Broker上,并且會(huì)作為partition的優(yōu)先副本
? 將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上
假設(shè)集群一共有4個(gè)brokers窟却,一個(gè)topic有4個(gè)partition昼丑,每個(gè)Partition有3個(gè)副本。
同步策略
Producer在發(fā)布消息到某個(gè)Partition時(shí)夸赫,先通過ZooKeeper找到該P(yáng)artition的Leader菩帝,然后無論該Topic的Replication Factor為多少,Producer只將該消息發(fā)送到該P(yáng)artition的Leader茬腿。Leader會(huì)將該消息寫入其本地Log呼奢。每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上切平,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致握础。Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK悴品。一旦Leader收到了ISR中的所有Replica的ACK禀综,該消息就被認(rèn)為已經(jīng)commit了简烘,Leader將增加HW并且向Producer發(fā)送ACK。
為了提高性能定枷,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK孤澎,而非等到數(shù)據(jù)寫入Log中。因此欠窒,對(duì)于已經(jīng)commit的消息覆旭,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤中岖妄,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)型将。
Consumer讀消息也是從Leader讀取,只有被commit過的消息才會(huì)暴露給Consumer衣吠。
Kafka Replication的數(shù)據(jù)流如下圖所示:
acks
枚舉值【-1,0,1】
? 返回值“0”茶敏,表示producer每生產(chǎn)一條數(shù)據(jù),不會(huì)等broker確認(rèn)是否已經(jīng)提交到log
? 返回值“1”缚俏,表示producer每生產(chǎn)一條數(shù)據(jù)惊搏,會(huì)跟leader的replica確認(rèn)是否收到數(shù)據(jù),這種保證了延遲性小的同時(shí)確保了leader成功接收了數(shù)據(jù)
? 返回值“-1”忧换,表示producer每生產(chǎn)一條數(shù)據(jù)恬惯,會(huì)跟所有的replica確認(rèn)是否收到數(shù)據(jù),這種情況延遲性最大
ksql
kafka 監(jiān)控
Kafka Eagle 用于監(jiān)控 Kafka 集群中 Topic 被消費(fèi)的情況亚茬。包含 Lag 的產(chǎn)生酪耳,Offset 的變動(dòng),Partition 的分布刹缝,Owner 碗暗,Topic 被創(chuàng)建的時(shí)間和修改的時(shí)間等信息
下載地址:https://github.com/smartloli/kafka-eagle
文檔:https://ke.smartloli.org/2.Install/2.Installing.html
安裝
? 解壓 tar –zxvf kafka-eagle-${version}-bin.tar.gz
? 添加環(huán)境變量
export KE_HOME=/data/soft/new/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
? 修改系統(tǒng)環(huán)境變量? system-config.properties
cluster1.zk.list=node51:2181,node50:2181
修改mysql的配置信息,包括創(chuàng)建mysql庫ke梢夯,執(zhí)行sql目錄下ke.sql 創(chuàng)建表信息
? 啟動(dòng)kafka eagle
sh bin/ke.sh start
? 瀏覽器訪問
http://node51:8048/ke? ? 登錄用戶名從mysql的user表中查詢言疗,admin/123456
kafka基本命令
? 生產(chǎn)消息
sh kafka-console-consumer.sh --bootstrap-server node51:6667 --topic mp1
? 消費(fèi)消息
sh kafka-console-producer.sh --broker-list node51:6667 --topic mp1
? topic描述
sh kafka-topics.sh --zookeeper node51:2181 --describe --topic mp1
? topic列表
sh kafka-topics.sh --zookeeper node51:2181 --list