本文主要講解 Kafka 的架構(gòu)包括工作流程和存儲(chǔ)機(jī)制圈驼,以及生產(chǎn)者和消費(fèi)者人芽,最終大家會(huì)掌握 Kafka 中最重要的原理和概念,分別是 broker绩脆、producer萤厅、consumer橄抹、consumer group、topic惕味、partition楼誓、replica、leader名挥、follower疟羹,這是學(xué)會(huì)和理解 Kafka 的基礎(chǔ)和必備內(nèi)容,建議收藏禀倔。
1. 定義
Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue)榄融,主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
1.1 消息隊(duì)列
Kafka 本質(zhì)上是一個(gè) MQ(Message Queue)救湖,使用消息隊(duì)列的好處愧杯? (面試會(huì)問)
- 解耦:允許我們獨(dú)立的擴(kuò)展或修改隊(duì)列兩邊的處理過程。
- 可恢復(fù)性:即使一個(gè)處理消息的進(jìn)程掛掉捎谨,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理民效。
- 緩沖:有助于解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
- 靈活性&峰值處理能力:不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請求而完全崩潰涛救,消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力畏邢。
- 異步通信:消息隊(duì)列允許用戶把消息放入隊(duì)列但不立即處理它。
1.2 發(fā)布/訂閱模式
一對多检吆,生產(chǎn)者將消息發(fā)布到 topic 中舒萎,有多個(gè)消費(fèi)者訂閱該主題,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)蹭沛,被消費(fèi)的數(shù)據(jù)不會(huì)立即從 topic 清除臂寝。
2. 架構(gòu)
Kafka 存儲(chǔ)的消息來自任意多被稱為 Producer 生產(chǎn)者的進(jìn)程。數(shù)據(jù)從而可以被發(fā)布到不同的 Topic 主題下的不同 Partition 分區(qū)摊灭。在一個(gè)分區(qū)內(nèi)咆贬,這些消息被索引并連同時(shí)間戳存儲(chǔ)在一起。其它被稱為 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱消息帚呼。Kafka 運(yùn)行在一個(gè)由一臺(tái)或多臺(tái)服務(wù)器組成的集群上掏缎,并且分區(qū)可以跨集群結(jié)點(diǎn)分布。
下面給出 Kafka 一些重要概念煤杀,讓大家對 Kafka 有個(gè)整體的認(rèn)識(shí)和感知眷蜈,后面還會(huì)詳細(xì)的解析每一個(gè)概念的作用以及更深入的原理。
- Producer: 消息生產(chǎn)者沈自,向 Kafka Broker 發(fā)消息的客戶端酌儒。
- Consumer: 消息消費(fèi)者,從 Kafka Broker 取消息的客戶端枯途。
- Consumer Group: 消費(fèi)者組(CG)忌怎,消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)籍滴,提高消費(fèi)能力。一個(gè)分區(qū)只能由組內(nèi)一個(gè)消費(fèi)者消費(fèi)呆躲,消費(fèi)者組之間互不影響异逐。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者插掂。
- Broker: 一臺(tái) Kafka 機(jī)器就是一個(gè) broker灰瞻。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic辅甥。
- Topic: 可以理解為一個(gè)隊(duì)列酝润,topic 將消息分類,生產(chǎn)者和消費(fèi)者面向的是同一個(gè) topic璃弄。
- Partition: 為了實(shí)現(xiàn)擴(kuò)展性要销,提高并發(fā)能力,一個(gè)非常大的 topic 可以分布到多個(gè) broker (即服務(wù)器)上夏块,一個(gè) topic 可以分為多個(gè) partition疏咐,每個(gè) partition 是一個(gè) 有序的隊(duì)列。
- Replica: 副本脐供,為實(shí)現(xiàn)備份的功能浑塞,保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失政己,且 Kafka 仍然能夠繼續(xù)工作酌壕,Kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本歇由,一個(gè) leader 和若干個(gè) follower卵牍。
- Leader: 每個(gè)分區(qū)多個(gè)副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對象沦泌,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對象糊昙,都是 leader。
- Follower: 每個(gè)分區(qū)多個(gè)副本的“從”副本谢谦,實(shí)時(shí)從 leader 中同步數(shù)據(jù)溅蛉,保持和 leader 數(shù)據(jù)的同步汹桦。leader 發(fā)生故障時(shí)诊杆,某個(gè) follower 還會(huì)成為新的 leader奴愉。
- offset: 消費(fèi)者消費(fèi)的位置信息,監(jiān)控?cái)?shù)據(jù)消費(fèi)到什么位置厅各,當(dāng)消費(fèi)者掛掉再重新恢復(fù)的時(shí)候,可以從消費(fèi)位置繼續(xù)消費(fèi)预柒。
- Zookeeper: Kafka 集群能夠正常工作队塘,需要依賴于 zookeeper袁梗,zookeeper 幫助 Kafka 存儲(chǔ)和管理集群信息。
3 工作流程
Kafka集群將 Record 流存儲(chǔ)在稱為 topic 的類別中憔古,每個(gè)記錄由一個(gè)鍵遮怜、一個(gè)值和一個(gè)時(shí)間戳組成。Kafka 是一個(gè)分布式流平臺(tái)鸿市,這到底是什么意思锯梁?
- 發(fā)布和訂閱記錄流,類似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)焰情。
- 以容錯(cuò)的持久方式存儲(chǔ)記錄流陌凳。
- 處理記錄流。
Kafka 中消息是以 topic 進(jìn)行分類的内舟,生產(chǎn)者生產(chǎn)消息合敦,消費(fèi)者消費(fèi)消息,面向的都是同一個(gè) topic验游。
topic 是邏輯上的概念充岛,而 partition 是物理上的概念,每個(gè) partition 對應(yīng)于一個(gè) log 文件耕蝉,該 log 文件中存儲(chǔ)的就是 Producer 生產(chǎn)的數(shù)據(jù)崔梗。Producer 生產(chǎn)的數(shù)據(jù)會(huì)不斷追加到該 log 文件末端,且每條數(shù)據(jù)都有自己的 offset赔硫。消費(fèi)者組中的每個(gè)消費(fèi)者炒俱,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便出錯(cuò)恢復(fù)時(shí)爪膊,從上次的位置繼續(xù)消費(fèi)权悟。
4 存儲(chǔ)機(jī)制
由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 log 文件末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下推盛,Kafka 采取了分片和索引機(jī)制峦阁,將每個(gè) partition 分為多個(gè) segment,每個(gè) segment 對應(yīng)兩個(gè)文件:“.index” 索引文件和 “.log” 數(shù)據(jù)文件耘成。這些文件位于同一文件下榔昔,該文件夾的命名規(guī)則為:topic 名-分區(qū)號(hào)。例如瘪菌,first 這個(gè) topic 有三分分區(qū)撒会,則其對應(yīng)的文件夾為 first-0,first-1师妙,first-2诵肛。
# ls /root/data/kafka/first-0
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
00000000000000009014.snapshot
leader-epoch-checkpoint
index 和 log 文件以當(dāng)前 segment 的第一條消息的 offset 命名。下圖為 index 文件 和 log 文件的結(jié)構(gòu)示意圖默穴。
“.index” 文件存儲(chǔ)大量的索引信息怔檩,“.log” 文件存儲(chǔ)大量的數(shù)據(jù)褪秀,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 message 的物理偏移量。
5. 生產(chǎn)者
5.1 分區(qū)策略
5.1.1 分區(qū)原因
- 方便在集群中擴(kuò)展薛训,每個(gè) partition 可以通過調(diào)整以適應(yīng)它所在的機(jī)器媒吗,而一個(gè) topic 又可以有多個(gè) partition 組成,因此可以以 partition 為單位讀寫了乙埃。
- 可以提高并發(fā)闸英,因此可以以 partition 為單位讀寫了。
5.1.2 分區(qū)原則
我們需要將 Producer 發(fā)送的數(shù)據(jù)封裝成一個(gè) ProducerRecord 對象膊爪。該對象需要指定一些參數(shù):
- topic:string 類型自阱,NotNull
- partition:int 類型,可選
- timestamp:long 類型米酬,可選
- key:string類型沛豌,可選
- value:string 類型,可選
- headers:array 類型赃额,Nullable
(1) 指明 partition 的情況下加派,直接將給定的 value 作為 partition 的值。
(2) 沒有指明 partition 但有 key 的情況下跳芳,將 key 的 hash 值與分區(qū)數(shù)取余得到 partition 值芍锦。
(3) 既沒有 partition 有沒有 key 的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用都在這個(gè)整數(shù)上自增)飞盆,將這個(gè)值與可用的分區(qū)數(shù)取余娄琉,得到 partition 值,也就是常說的 round-robin 輪詢算法吓歇。
5.2 數(shù)據(jù)可靠性保證
為保證 producer 發(fā)送的數(shù)據(jù)孽水,能可靠地發(fā)送到指定的 topic,topic 的每個(gè) partition 收到 producer 發(fā)送的數(shù)據(jù)后城看,都需要向 producer 發(fā)送 ack(acknowledge 確認(rèn)收到)女气,如果 producer 收到 ack,就會(huì)進(jìn)行下一輪的發(fā)送测柠,否則重新發(fā)送數(shù)據(jù)炼鞠。
5.2.1 副本數(shù)據(jù)同步策略
(1)何時(shí)發(fā)送 ack?
確保有 follower 與 leader 同步完成轰胁,leader 再發(fā)送 ack谒主,這樣才能保證 leader 掛掉之后,能在 follower 中選舉出新的 leader 而不丟數(shù)據(jù)赃阀。
(2)多少個(gè) follower 同步完成后發(fā)送 ack霎肯?
全部 follower 同步完成,再發(fā)送 ack。
5.2.2 ISR
采用第二種方案姿现,所有 follower 完成同步,producer 才能繼續(xù)發(fā)送數(shù)據(jù)肖抱,設(shè)想有一個(gè) follower 因?yàn)槟撤N原因出現(xiàn)故障备典,那 leader 就要一直等到它完成同步。這個(gè)問題怎么解決意述?
leader維護(hù)了一個(gè)動(dòng)態(tài)的 in-sync replica set(ISR):和 leader 保持同步的 follower 集合提佣。當(dāng) ISR 集合中的 follower 完成數(shù)據(jù)的同步之后,leader 就會(huì)給 follower 發(fā)送 ack荤崇。如果 follower 長時(shí)間未向 leader 同步數(shù)據(jù)拌屏,則該 follower 將被踢出 ISR 集合,該時(shí)間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定术荤。leader 發(fā)生故障后倚喂,就會(huì)從 ISR 中選舉出新的 leader。
5.2.3 ack 應(yīng)答機(jī)制
對于某些不太重要的數(shù)據(jù)瓣戚,對數(shù)據(jù)的可靠性要求不是很高端圈,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的 follower 全部接受成功子库。
所以 Kafka 為用戶提供了三種可靠性級別舱权,用戶根據(jù)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置仑嗅。
(1)ack 參數(shù)配置:
- 0:producer 不等待 broker 的 ack宴倍,這提供了最低延遲,broker 一收到數(shù)據(jù)還沒有寫入磁盤就已經(jīng)返回仓技,當(dāng) broker 故障時(shí)有可能丟失數(shù)據(jù)鸵贬。
- 1:producer 等待 broker 的 ack,partition 的 leader 落盤成功后返回 ack浑彰,如果在 follower 同步成功之前 leader 故障恭理,那么將會(huì)丟失數(shù)據(jù)。
- -1(all):producer 等待 broker 的 ack郭变,partition 的 leader 和 follower 全部落盤成功后才返回 ack颜价。但是在 broker 發(fā)送 ack 時(shí),leader 發(fā)生故障诉濒,則會(huì)造成數(shù)據(jù)重復(fù)周伦。
5.2.4 故障處理細(xì)節(jié)
LEO:每個(gè)副本最大的 offset。
HW:消費(fèi)者能見到的最大的 offset未荒,ISR 隊(duì)列中最小的 LEO专挪。
(1)Follower 故障
follower 發(fā)生故障后會(huì)被臨時(shí)踢出 ISR 集合,待該 follower 恢復(fù)后,follower 會(huì) 讀取本地磁盤記錄的上次的 HW寨腔,并將 log 文件高于 HW 的部分截取掉速侈,從 HW 開始向 leader 進(jìn)行同步數(shù)據(jù)操作。等該 follower 的 LEO 大于等于該 partition 的 HW迫卢,即 follower 追上 leader 后倚搬,就可以重新加入 ISR 了。
(2)Leader 故障
leader 發(fā)生故障后乾蛤,會(huì)從 ISR 中選出一個(gè)新的 leader每界,之后,為保證多個(gè)副本之間的數(shù)據(jù)一致性家卖,其余的 follower 會(huì)先將各自的 log 文件高于 HW 的部分截掉眨层,然后從新的 leader 同步數(shù)據(jù)。
注意:這只能保證副本之間的數(shù)據(jù)一致性上荡,并不能保證數(shù)據(jù)不丟失或者不重復(fù)趴樱。
5.3 Exactly Once 語義
將服務(wù)器的 ACK 級別設(shè)置為-1,可以保證 producer 到 server 之間不會(huì)丟失數(shù)據(jù)榛臼,即 At Least Once 語義伊佃。相對的,將服務(wù)器 ACK 級別設(shè)置為0沛善,可以保證生產(chǎn)者每條消息只會(huì)被發(fā)送一次航揉,即At Most Once 語義。
At Least Once 可以保證數(shù)據(jù)不丟失金刁,但是不能保證數(shù)據(jù)不重復(fù)帅涂;相對的,At Most Once 可以保證數(shù)據(jù)不重復(fù)尤蛮,但是不能保證數(shù)據(jù)不丟失媳友。但是,對于一些非常重要的信息产捞,比如交易數(shù)據(jù)醇锚,下游數(shù)據(jù)消費(fèi)者要求數(shù)據(jù)既不重復(fù)也不丟失,即 Exactly Once 語義坯临。
0.11版本的 Kafka焊唬,引入了冪等性:producer 不論向 server 發(fā)送多少重復(fù)數(shù)據(jù),server 端都只會(huì)持久化一條看靠。即:
At Least Once + 冪等性 = Exactly Once
要啟用冪等性赶促,只需要將 producer 的參數(shù)中 enable.idompotence
設(shè)置為 true
即可。開啟冪等性的 producer 在初始化時(shí)會(huì)被分配一個(gè) PID挟炬,發(fā)往同一 partition 的消息會(huì)附帶 Sequence Number鸥滨。而 borker 端會(huì)對 <PID,Partition,SeqNumber> 做緩存嗦哆,當(dāng)具有相同主鍵的消息提交時(shí),broker 只會(huì)持久化一條婿滓。
但是 PID 重啟后就會(huì)變化老速,同時(shí)不同的 partition 也具有不同主鍵,所以冪等性無法保證跨分區(qū)會(huì)話的 Exactly Once凸主。
6. 消費(fèi)者
6.1 消費(fèi)方式
consumer 采用 pull(拉人盖汀)模式從 broker 中讀取數(shù)據(jù)。
consumer 采用 push(推送)模式秕铛,broker 給 consumer 推送消息的速率是由 broker 決定的,很難適應(yīng)消費(fèi)速率不同的消費(fèi)者缩挑。它的目標(biāo)是盡可能以最快速度傳遞消息但两,但是這樣很容易造成 consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞供置。而 pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息谨湘。
pull 模式不足之處是,如果 Kafka 沒有數(shù)據(jù)芥丧,消費(fèi)者可能會(huì)陷入循環(huán)中紧阔,一直返回空數(shù)據(jù)。因?yàn)橄M(fèi)者從 broker 主動(dòng)拉取數(shù)據(jù)续担,需要維護(hù)一個(gè)長輪詢擅耽,針對這一點(diǎn), Kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)時(shí)長參數(shù) timeout物遇,如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi)乖仇,consumer 會(huì)等待一段時(shí)間之后再返回,這段時(shí)長即為 timeout询兴。
6.2 分區(qū)分配策略
一個(gè) consumer group 中有多個(gè) consumer乃沙,一個(gè) topic 有多個(gè) partition,所以必然會(huì)涉及到 partition 的分配問題诗舰,即確定哪個(gè) partition 由哪個(gè) consumer 來消費(fèi)警儒。
Kafka 有兩種分配策略,一個(gè)是 RoundRobin眶根,一個(gè)是 Range蜀铲,默認(rèn)為range,當(dāng)消費(fèi)者組內(nèi)消費(fèi)者發(fā)生變化時(shí)汛闸,會(huì)觸發(fā)分區(qū)分配策略(方法重新分配)蝙茶。
(1) RoundRobin
RoundRobin 輪詢方式將分區(qū)所有作為一個(gè)整體進(jìn)行 hash 排序,消費(fèi)者組內(nèi)分配分區(qū)個(gè)數(shù)最大差別為1诸老,是按照組來分的隆夯,可以解決多個(gè)消費(fèi)者消費(fèi)數(shù)據(jù)不均衡的問題钳恕。
但是,當(dāng)消費(fèi)者組內(nèi)訂閱不同主題時(shí)蹄衷,可能造成消費(fèi)混亂忧额,如下圖所示,consumer0 訂閱主題A愧口,consumer1 訂閱主題B睦番,將 A、B主題的分區(qū)排序后分配給消費(fèi)者組耍属,TopicB 分區(qū)中的數(shù)據(jù)可能分配到 consumer0 中托嚣。
(2)Range
range 方式是按照主題來分的,不會(huì)產(chǎn)生輪詢方式的消費(fèi)混亂問題厚骗。
但是示启,如下圖所示,consumer0领舰、consumer1 同時(shí)訂閱了主題A和B夫嗓,可能造成消息分配不對等問題,當(dāng)消費(fèi)者組內(nèi)訂閱的主題越多冲秽,分區(qū)分配可能越不均衡舍咖。
6.3 offset 的維護(hù)
由于 consumer 在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后锉桑,需要從故障前的位置繼續(xù)消費(fèi)排霉,所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)民轴。
Kafka 0.9 版本之前郑诺,consumer 默認(rèn)將 offset 保存在 Zookeeper 中,從 0.9 版本開始杉武,consumer 默認(rèn)將 offset 保存在 Kafka 一個(gè)內(nèi)置的 topic 中辙诞,該 topic 為 __consumer_offsets。
剛剛我們一起深入探討了 Kafka 是什么轻抱,比較偏重理論和基礎(chǔ)飞涂,這是掌握 Kafka 的必要內(nèi)容,接下來我會(huì)以代碼和實(shí)例的方式祈搜,更新 Kafka 有關(guān)API 以及事務(wù)较店、攔截器、監(jiān)控等高級篇容燕,讓大家徹底理解并且會(huì)用 Kafka梁呈。如果對你有幫助,點(diǎn)個(gè)贊相互鼓勵(lì)一下吧~