kafka整理

kafka 知識(shí)整理

kafka介紹

kafka是一種分布式的基于發(fā)布/訂閱的消息系統(tǒng)厉颤。具有如下特征:

? 以時(shí)間復(fù)雜度為O(1)的方式提供消息持久化能力匿又,即使對(duì)TB級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能

? 高吞吐率冈止。即使是在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒100k條消息的傳輸

? 支持消息分區(qū)及分布式消費(fèi)碴里,同時(shí)保證每個(gè)partition內(nèi)的消息順序傳輸

? 同時(shí)支持離線和實(shí)時(shí)數(shù)據(jù)處理

kafka解析

broker

? kafka集群包括有一個(gè)或多個(gè)服務(wù)器肚豺,這種服務(wù)器被稱為broker

topic

? kafka發(fā)送消息的類別劫恒,用于區(qū)分不同消息發(fā)送的哨坪。物理上不同的topic的消息分開存儲(chǔ)庸疾,邏輯上一個(gè)topic的消息雖然保存于一個(gè)或者多個(gè)broker上,但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)当编,而不需要關(guān)心數(shù)據(jù)存于何處

partition

? topic物理上的分組届慈。每個(gè)topic包含一個(gè)或多個(gè)partition,創(chuàng)建topic時(shí)可指定parition數(shù)量忿偷。每個(gè)partition是一個(gè)有序的隊(duì)列金顿,對(duì)應(yīng)于一個(gè)文件夾,該文件夾下存儲(chǔ)該partition的數(shù)據(jù)和索引文件鲤桥。在發(fā)送一條消息時(shí)揍拆,生產(chǎn)者可以指定這條消息的key和分區(qū)機(jī)制來發(fā)送到不同的分區(qū)。

producer

? 負(fù)責(zé)發(fā)送消息到kafka broker茶凳,推送消息

consumer

? 負(fù)責(zé)從kafka broker 拉取消息嫂拴。每個(gè)consumer屬于一個(gè)特定的consumer group,同一個(gè)topic的一條消息只能被同一個(gè)group內(nèi)的一個(gè)consumer消費(fèi)慧妄,但多個(gè)consumer group可同時(shí)消費(fèi)這一topic

kafka架構(gòu)


? 如上圖所示顷牌,一個(gè)kafka集群包含有若干個(gè)producer,若干個(gè)broker(支持水平擴(kuò)展塞淹,broker數(shù)量越多窟蓝,集群的吞吐率越高),若干consumer group饱普,以及一個(gè)zookeeper集群运挫。

? kafka通過zookeeper管理集群配置,選舉leader套耕,以及在consumer group 發(fā)生變化時(shí)進(jìn)行rebalance谁帕。producer通過push模式將消息發(fā)布到broker,consumer使用pull模式從broker訂閱并消費(fèi)消息冯袍。

partition

每一條消息發(fā)送到broker時(shí)匈挖,會(huì)根據(jù)partition規(guī)則選擇被存儲(chǔ)到哪一個(gè)partition碾牌。如果partition規(guī)則設(shè)置合理,所有的消息可以均勻的分布到不同的partition中儡循,這樣就可以實(shí)現(xiàn)水平擴(kuò)展舶吗。在創(chuàng)建topic時(shí),可以在server.properties中指定partition的數(shù)量(num.partition=3)择膝,也可以在創(chuàng)建topic后再去修改partition數(shù)量誓琼。

? ?在發(fā)送一條消息時(shí),可以指定這條消息的key肴捉,producer會(huì)根據(jù)這個(gè)可以和partition機(jī)制來判斷將這條消息發(fā)送到哪個(gè)partition腹侣。partition機(jī)制可以通過producer的partition.class參數(shù)來指定,該class必須要實(shí)現(xiàn)kafka.producer.Partitioner接口齿穗。

kafka刪除舊數(shù)據(jù)策略

kafka提供了2種策略去刪除舊數(shù)據(jù):一是基于時(shí)間傲隶;二是基于partition文件大小

eg:在server.properties中配置

log.retention.hours=168? ? ? ? ? #kafka數(shù)據(jù)保留一周,刪除1周前舊數(shù)據(jù)

log.retention.bytes=1073741824? #partition 文件大小超過1GB則刪除舊數(shù)據(jù)


注:kafka讀取特定消息的時(shí)間復(fù)雜度為O(1)缤灵,即與文件大小無關(guān)伦籍,所以這里刪除文件與kafka性能無關(guān),選擇刪除舊數(shù)據(jù)策略只與磁盤及具體的需求有關(guān)

? ? kafka會(huì)為每一個(gè)consumer group保留一些metadata信息—當(dāng)前消費(fèi)的信息的position? 即offset腮出。這個(gè)offset由consumer控制,正常情況下consumer會(huì)在消費(fèi)完一條消息后線性增加offset芝薇。當(dāng)然胚嘲,這個(gè)offset也可以手動(dòng)設(shè)置,重新消費(fèi)一些消息洛二。因?yàn)閛ffset由consumer控制馋劈,所以kafka broker是無狀態(tài)的,不需要標(biāo)記哪些信息被consumer消費(fèi)過晾嘶,不過需要通過broker去保證同一個(gè)consumer group 只有一個(gè)consumer能消費(fèi)某一條消息妓雾,因此就不需要提供鎖機(jī)制,這也是kafka的高吞吐率提供了有力保障

replication & leader election

kafka從0.8版本開始提供了partition級(jí)別的replication垒迂,可以在server.properties 中配置? default.replication.factor = 1 械姻,默認(rèn)情況下為1

replication與leader election配合提供了自動(dòng)的failover機(jī)制,replication對(duì)kafka的吞吐率有一定的影響机断,但是極大的增強(qiáng)了可用性楷拳。每個(gè)partition都有一個(gè)唯一的leader,所有的讀寫操作都是在leader上完成吏奸,leader批量從leader上拉取數(shù)據(jù)欢揖,一般情況下partition的數(shù)量大于等于broker的數(shù)量,并且所有的partition的leader均勻分布在broker上奋蔚,follower的日志和其leader上的完全一致她混。

kafka判斷broker是否active烈钞,需要有兩個(gè)條件:

(1) 必須維護(hù)與zookeeper的會(huì)話(通過zookeeper的心跳機(jī)制來實(shí)現(xiàn))

(2) follower必須能夠及時(shí)從leader的數(shù)據(jù)復(fù)制過來,不能”落后太多”

? ? ? ? leader 選舉機(jī)制:

? ? ? ? ?每個(gè)partition都有一個(gè)”in sync”的node list,如果有一個(gè)follower宕機(jī)坤按,或者落后太多棵磷,leader將把它從”in sync”list中移除,這里落后太多的標(biāo)準(zhǔn)晋涣,是在server.properties中配置:? ?replica.lag.max.messages=4000; replica.lag.time.max.ms=10000

? ? ? ? ? kafka在zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in sync node list)set仪媒,這個(gè)set里面所有的replicat都跟上了leader,只有isr里面的成員才有可能被選中l(wèi)eader谢鹊;在isr中至少有一個(gè)follower時(shí)算吩,kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,如果partition的所有replica都掛了佃扼,就無法保證數(shù)據(jù)不丟失了偎巢,這種情況下有兩種可行的方案:

(1) 等待isr中的任一個(gè)replica “活”過來,并且選它作為leader

(2) 選擇第一個(gè)“活”過來的replica(不一定是isr中的)作為leader

這就需要在可用性和一致性當(dāng)中做出一個(gè)簡(jiǎn)單的平衡兼耀,如果一定等待isr中的replica活過來压昼,那么可能會(huì)耗費(fèi)比較長(zhǎng)的時(shí)間;如果選擇非isr中的replica作為leader瘤运,即不能保證包含所有已經(jīng)提交的消息窍霞。Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性拯坟。

consumer group

? 每一個(gè)consumer實(shí)例都屬于一個(gè)consumer group但金,每一條消息只會(huì)被同一個(gè)consumer group里的一個(gè)consumer消費(fèi)。(不同group可以同時(shí)消費(fèi)同一條消息)


? Consumer Rebalance

如果某consumer group中consumer數(shù)量少于partition數(shù)量郁季,則至少有一個(gè)consumer會(huì)消費(fèi)多個(gè)partition的數(shù)據(jù)冷溃,如果consumer的數(shù)量與partition數(shù)量相同,則正好一個(gè)consumer消費(fèi)一個(gè)partition的數(shù)據(jù)梦裂,而如果consumer的數(shù)量多于partition的數(shù)量時(shí)似枕,會(huì)有部分consumer無法消費(fèi)該topic下任何一條消息。

每一個(gè)consumer或者broker的增加或者減少都會(huì)觸發(fā)consumer rebalance年柠。因?yàn)槊總€(gè)consumer只負(fù)責(zé)調(diào)整自己所消費(fèi)的partition凿歼,為了保證整個(gè)consumer group的一致性,所以當(dāng)一個(gè)consumer觸發(fā)了rebalance時(shí)彪杉,該consumer group內(nèi)的其它所有consumer也應(yīng)該同時(shí)觸發(fā)rebalance毅往。

文件存儲(chǔ)


同一個(gè)Topic 通常存儲(chǔ)的是一類消息,每個(gè)topic內(nèi)部實(shí)現(xiàn)又被分成多個(gè)partition派近,每個(gè)partition在存儲(chǔ)層面是append log文件攀唯。

在Kafka文件存儲(chǔ)中,同一個(gè)topic下有多個(gè)不同partition渴丸,每個(gè)partition為一個(gè)目錄侯嘀,partiton命名規(guī)則為topic名稱+有序序號(hào)另凌,第一個(gè)partiton序號(hào)從0開始,序號(hào)最大值為partitions數(shù)量減1戒幔。

? 每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中吠谢。但每個(gè)段segment file消息數(shù)量不一定相等,這種特性方便old segment file快速被刪除诗茎。

? 每個(gè)partiton只需要支持順序讀寫就行了工坊,segment文件生命周期由服務(wù)端配置參數(shù)決定。

這樣做的好處就是能快速刪除無用文件敢订,有效提高磁盤利用率王污。

? segment file組成:由2大部分組成,分別為index file和data file楚午,此2個(gè)文件一一對(duì)應(yīng)昭齐,成對(duì)出現(xiàn)找前,后綴”.index”和“.log”分別表示為segment索引文件驮审、數(shù)據(jù)文件。

? segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始舷胜,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值怪蔑。數(shù)值最大為64位long大小里覆,19位數(shù)字字符長(zhǎng)度,沒有數(shù)字用0填充饮睬。


副本放置策略

為了更好的做負(fù)載均衡租谈,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上。Kafka分配Replica的算法如下:

? 將所有存活的N個(gè)Brokers和待分配的Partition排序

? 將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上捆愁,這個(gè)Partition的第一個(gè)Replica存在于這個(gè)分配的Broker上,并且會(huì)作為partition的優(yōu)先副本

? 將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上

假設(shè)集群一共有4個(gè)brokers窟却,一個(gè)topic有4個(gè)partition昼丑,每個(gè)Partition有3個(gè)副本。


同步策略

Producer在發(fā)布消息到某個(gè)Partition時(shí)夸赫,先通過ZooKeeper找到該P(yáng)artition的Leader菩帝,然后無論該Topic的Replication Factor為多少,Producer只將該消息發(fā)送到該P(yáng)artition的Leader茬腿。Leader會(huì)將該消息寫入其本地Log呼奢。每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上切平,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致握础。Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK悴品。一旦Leader收到了ISR中的所有Replica的ACK禀综,該消息就被認(rèn)為已經(jīng)commit了简烘,Leader將增加HW并且向Producer發(fā)送ACK。

為了提高性能定枷,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK孤澎,而非等到數(shù)據(jù)寫入Log中。因此欠窒,對(duì)于已經(jīng)commit的消息覆旭,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤中岖妄,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)型将。

Consumer讀消息也是從Leader讀取,只有被commit過的消息才會(huì)暴露給Consumer衣吠。

Kafka Replication的數(shù)據(jù)流如下圖所示:



acks

枚舉值【-1,0,1】

? 返回值“0”茶敏,表示producer每生產(chǎn)一條數(shù)據(jù),不會(huì)等broker確認(rèn)是否已經(jīng)提交到log

? 返回值“1”缚俏,表示producer每生產(chǎn)一條數(shù)據(jù)惊搏,會(huì)跟leader的replica確認(rèn)是否收到數(shù)據(jù),這種保證了延遲性小的同時(shí)確保了leader成功接收了數(shù)據(jù)

? 返回值“-1”忧换,表示producer每生產(chǎn)一條數(shù)據(jù)恬惯,會(huì)跟所有的replica確認(rèn)是否收到數(shù)據(jù),這種情況延遲性最大

ksql


kafka 監(jiān)控

Kafka Eagle 用于監(jiān)控 Kafka 集群中 Topic 被消費(fèi)的情況亚茬。包含 Lag 的產(chǎn)生酪耳,Offset 的變動(dòng),Partition 的分布刹缝,Owner 碗暗,Topic 被創(chuàng)建的時(shí)間和修改的時(shí)間等信息

下載地址:https://github.com/smartloli/kafka-eagle

文檔:https://ke.smartloli.org/2.Install/2.Installing.html

安裝

? 解壓 tar –zxvf kafka-eagle-${version}-bin.tar.gz

? 添加環(huán)境變量

export KE_HOME=/data/soft/new/kafka-eagle

export PATH=$PATH:$KE_HOME/bin

? 修改系統(tǒng)環(huán)境變量? system-config.properties

cluster1.zk.list=node51:2181,node50:2181

修改mysql的配置信息,包括創(chuàng)建mysql庫ke梢夯,執(zhí)行sql目錄下ke.sql 創(chuàng)建表信息

? 啟動(dòng)kafka eagle

sh bin/ke.sh start

? 瀏覽器訪問

http://node51:8048/ke? ? 登錄用戶名從mysql的user表中查詢言疗,admin/123456


kafka基本命令

? 生產(chǎn)消息

sh kafka-console-consumer.sh --bootstrap-server node51:6667 --topic mp1

? 消費(fèi)消息

sh kafka-console-producer.sh --broker-list node51:6667 --topic mp1

? topic描述

sh kafka-topics.sh --zookeeper node51:2181 --describe --topic mp1

? topic列表

sh kafka-topics.sh --zookeeper node51:2181 --list

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市颂砸,隨后出現(xiàn)的幾起案子噪奄,更是在濱河造成了極大的恐慌,老刑警劉巖人乓,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件勤篮,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡色罚,警方通過查閱死者的電腦和手機(jī)碰缔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來保屯,“玉大人手负,你說我怎么就攤上這事涤垫。” “怎么了竟终?”我有些...
    開封第一講書人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵蝠猬,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我统捶,道長(zhǎng)榆芦,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任喘鸟,我火速辦了婚禮匆绣,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘什黑。我一直安慰自己崎淳,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開白布愕把。 她就那樣靜靜地躺著拣凹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪恨豁。 梳的紋絲不亂的頭發(fā)上嚣镜,一...
    開封第一講書人閱讀 49,772評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音橘蜜,去河邊找鬼菊匿。 笑死,一個(gè)胖子當(dāng)著我的面吹牛计福,可吹牛的內(nèi)容都是我干的跌捆。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼象颖,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼疹蛉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起力麸,我...
    開封第一講書人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎育韩,沒想到半個(gè)月后克蚂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡筋讨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年埃叭,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片悉罕。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡赤屋,死狀恐怖立镶,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情类早,我是刑警寧澤媚媒,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站涩僻,受9級(jí)特大地震影響缭召,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜逆日,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一嵌巷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧室抽,春花似錦搪哪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至神年,卻和暖如春已维,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背已日。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工垛耳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人飘千。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓堂鲜,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親护奈。 傳聞我的和親對(duì)象是個(gè)殘疾皇子缔莲,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容