Kafka的架構(gòu)
包括Kafka的基本組成朝抖,Kafka的拓?fù)浣Y(jié)構(gòu)以及Kafka的內(nèi)部通信協(xié)議糊闽。Kafka內(nèi)部的通信協(xié)議是建立在Kafka的拓?fù)浣Y(jié)構(gòu)之上梳玫,而Kafka的拓?fù)浣Y(jié)構(gòu)是由Kafka的基本模塊所組成的。
AK RELEASE 2.5.0
APRIL 15, 2020
Kafka的基本組成
Kafka集群中生產(chǎn)者將消息發(fā)送給以Topic命名的消息隊(duì)列Queue中右犹,消費(fèi)者訂閱發(fā)往以某個Topic命名的消息隊(duì)列Queue中的消息提澎。其中Kafka集群由若干個Broker組成,Topic由若干個Partition組成傀履,每個Partition里的消息通過Offset來獲取虱朵。
基本組成包括:
Broker:一臺Kafka服務(wù)器就是一個Broker,一個集群由多個Broker組成钓账,一個Broker可以容納多個Topic碴犬,Broker和Broker之間沒有Master和Standby的概念,他們之間地位是平等的
Topic:每條發(fā)送到Kafka集群的消息都屬于某個主題梆暮,這個主題就稱為Topic服协。物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存在一個或多個Broker上啦粹,但是用戶只需指定消息主題Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不需要關(guān)心數(shù)據(jù)存放在何處偿荷。
Partition:為了實(shí)現(xiàn)可擴(kuò)展性,一個非常大的Topic可以被分為多個Partition唠椭,從而分布到多臺Broker上跳纳。Partition中的每條消息都會被分配一個自增Id(Offset)。Kafka只保證按照一個Partition中的順序?qū)⑾l(fā)送給消費(fèi)者贪嫂,但是不保證單個Topic中多個Partition之間的順序寺庄。
Offset:消息在Topic的Partition中的位置,同一個Partition中的消息隨著消息的寫入其對應(yīng)的Offset也自增力崇。
Replica:副本斗塘,Topic的Partition有N個副本,N為副本因子亮靴。其中一個Replica為Leader馍盟,其他都為Follower,Leader處理Partition的所有讀寫請求茧吊,F(xiàn)ollower定期同步Leader上的數(shù)據(jù)贞岭。
Message:消息是通信的基本單位八毯。每個Producer可以向一個Topic發(fā)布消息
Producer:消息生產(chǎn)者,將消息發(fā)布到指定的Topic中曹步,也能夠決定消息所屬的Partition:比如基于Round-Robin或者Hash算法
Consumer:消息消費(fèi)者宪彩,向指定的Topic獲取消息,根據(jù)指定Topic的分區(qū)索引及其對應(yīng)分區(qū)上的消息偏移量來獲取消息
Consumer Group:消費(fèi)者組讲婚,每個消費(fèi)者都屬于一個組尿孔。當(dāng)消費(fèi)者具有相同組時,消息會在消費(fèi)者之間負(fù)載均衡筹麸。一個Partition的消息只會被相同消費(fèi)者組中的某個消費(fèi)者消費(fèi)活合。不同消費(fèi)者組是相互獨(dú)立的。
Zookeeper:存放Kafka集群相關(guān)元數(shù)據(jù)的組件物赶。Zookeeper集群中保存了Topic的狀態(tài)信息白指,例如分區(qū)個數(shù)、分區(qū)組成酵紫、分區(qū)的分布情況等告嘲;保存Broker的狀態(tài)信息;保存消費(fèi)者的消費(fèi)信息等奖地。通過這些信息橄唬,Kafka很好地將消息生產(chǎn)、消息存儲参歹、消息消費(fèi)的過程結(jié)合起來仰楚。
Kafka的拓?fù)浣Y(jié)構(gòu)
一個典型的Kafka集群中包含若干個Producer(可以是某個模塊下發(fā)的Command,或者是Web前端產(chǎn)生的PageView犬庇,或者是服務(wù)器日志僧界,系統(tǒng)CPU、Memory等)臭挽,若干個Broker(Kafka集群支持水平擴(kuò)展捂襟,一般Broker數(shù)量越多,整個Kafka集群的吞吐率也就越高)欢峰,若干個Consumer Group笆豁,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置赤赊。Producer使用Push模式將消息發(fā)布到Broker上,Consumer使用Pull模式從Broker上訂閱并消費(fèi)消息煞赢。
簡單的消息發(fā)送流程如下:
1. Producer根據(jù)指定的路由方法抛计,將消息Push到Topic的某個Partition里
2. Kafka集群接收到Producer發(fā)過來的消息并持久化到硬盤,并保留消息指定時長(可配置)照筑,不關(guān)注消息是否被消費(fèi)吹截。
3. Consumer從Kafka集群Pull數(shù)據(jù)瘦陈,并控制獲取消息的Offset
Kafka內(nèi)部的通信協(xié)議
Kafka內(nèi)部各個Broker之間的角色并不是完全相等的,Broker內(nèi)部負(fù)責(zé)管理分區(qū)和副本狀態(tài)以及異常情況下分區(qū)的重新分片等這些功能的模塊稱為KafkaController波俄。每個Kafka集群中有且只有一個Leader狀態(tài)的KafkaController晨逝,當(dāng)其出現(xiàn)異常時,其余Standby狀態(tài)的KafkaController會通過Zookeeper選舉出有一個Leader狀態(tài)的KafkaController懦铺。
維度一:通信協(xié)議詳情
- ProducerRequest:生產(chǎn)者發(fā)送消息的請求捉貌,生產(chǎn)者將消息發(fā)送至Kafka集群中的某個Broker,Broker接收到此請求后持久化此消息并更新相關(guān)元數(shù)據(jù)信息冬念。
ProducerRequest.requiredAcks的取值為0時趁窃,生產(chǎn)者不關(guān)心Broker Server端持久化執(zhí)行結(jié)果,但是高級消費(fèi)者發(fā)送的提交偏移量的請求還是需要返回具體執(zhí)行結(jié)果急前。為1則生產(chǎn)者消費(fèi)者都需要將Broker Server端持久化的執(zhí)行結(jié)果返回客戶端醒陆。為-1時,不會立刻返回Broker Server端消息持久化的結(jié)果裆针,而是需要等待Partition的ISR列表中的Replica完成數(shù)據(jù)同步刨摩,并且ISR列表的個數(shù)大于min.insync.replicas時才會將響應(yīng)返回給對應(yīng)客戶端。這里采用的是稱為Purgatory的策略世吨。Broker Server上對應(yīng)的Partition的HighWatermark發(fā)生改變才觸發(fā)檢查澡刹。
TopicMetadataRequest:獲取Topic元數(shù)據(jù)信息的請求,無論是生產(chǎn)者還是消費(fèi)者都需要通過此請求來獲取感興趣的Topic的元數(shù)據(jù)另假。
FetchRequest:消費(fèi)者獲取Topic的某個分區(qū)的消息的請求像屋。分區(qū)狀態(tài)為Follower的副本也需要利用此請求去同步分區(qū)狀態(tài)為Leader的對應(yīng)副本數(shù)據(jù)。
OffsetRequest:消費(fèi)者發(fā)送至Kafka集群來獲取感興趣Topic的分區(qū)偏移量的請求边篮,通過此請求可以獲知當(dāng)前Topic所有分區(qū)在不同時間段的偏移量詳情己莺。
OffsetCommitRequest:消費(fèi)者提交Topic被消費(fèi)的分區(qū)偏移量信息至Broker,Broker接收到此請求后持久化相關(guān)偏移量信息戈轿。
OffsetFetchRequest:消費(fèi)者發(fā)送獲取提交至Kafka集群的相關(guān)Topic被消費(fèi)詳細(xì)信息凌受,和OffsetCommitRequest相互對應(yīng)。
LeaderAndIsrRequest:當(dāng)Topic的某個分區(qū)狀態(tài)發(fā)送變化時思杯,處于Leader狀態(tài)的KafkaController發(fā)送至相關(guān)Broker通知其做出相應(yīng)處理胜蛉。
當(dāng)某個Replica稱為Leader:暫停Fetch→添加進(jìn)Assigned Replica列表→添加進(jìn)In-Sync Replica列表→刪除已經(jīng)不存在的Assigned Replica→初始化Leader Replica的HighWatermark
當(dāng)某個Replica成為Follower:暫停舊的Fetch線程→截?cái)鄶?shù)據(jù)至HighWatermark以下→開啟新的Fetch線程→添加進(jìn)Assigned Replica列表→刪除已經(jīng)不存在的Assigned Replica
StopReplicaRequest:當(dāng)Topic的某個分區(qū)被刪除或者下線的時候,處于Leader狀態(tài)KafkaController發(fā)送至相關(guān)Broker通知其做出相應(yīng)處理色乾。
UpdateMetadataRequest:當(dāng)Topic的元數(shù)據(jù)信息發(fā)生變化時誊册,處于Leader狀態(tài)的KafkaController發(fā)送至相關(guān)Broker通知其做出相應(yīng)處理。
BrokerControllerShutdownRequest:當(dāng)Broker正常下線時暖璧,發(fā)送此請求到處于Leader狀態(tài)的KafkaController案怯。
ConsumerMetadataRequest:獲取保存特定Consumer Group消費(fèi)詳情的分區(qū)信息
維度二:通信協(xié)議交互
Producer和Kafka集群:Producer需要利用ProducerRequest和TopicMetadataRequest來完成Topic元數(shù)據(jù)的查詢、消息的發(fā)送澎办。
Consumer和Kafka集群:Consumer需要利用TopicMetadataRequest請求嘲碱、FetchRequest請求金砍、OffsetRequest、OffsetCommitRequest麦锯、OffsetFetchRequest恕稠、ConsumerMetadataRequest來完成Topic元數(shù)據(jù)的查詢、消息的訂閱扶欣、歷史偏移量的查詢鹅巍、偏移量的提交、當(dāng)前偏移量的查詢宵蛀。
KafkaController狀態(tài)為Leader的Broker和KafkaController狀態(tài)為Standby的Broker:Leader需要用LeaderAndIsrRequest昆著、StopReplicaRequest、UpdateMetadataRequest來完成對Topic的管理术陶。Standby需要利用BrokerControllerShutdownRequest來通知Leader自己的下線動作凑懂。
Broker和Broker之間:Broker相互之間需要利用FetchRequest請求來同步Topic分區(qū)的副本數(shù)據(jù),這樣才能使Topic分區(qū)各副本數(shù)據(jù)保持一致梧宫。
Broker概述
Broker內(nèi)部存在的功能模塊包括SocketServer接谨、KafkaRequestHandlerPool、LogManager塘匣、ReplicaManager脓豪、OffsetManager、KafkaScheduler忌卤、KafkaApis扫夜、KafkaHealthcheck和TopicConfigManager九大基本模塊以及KafkaController集群控制管理模塊。
SocketServer:首先開啟一個Acceptor線程驰徊,新的Socket連接成功建立時會將對應(yīng)的SocketChannel以輪詢方式轉(zhuǎn)發(fā)給N個Processor線程中的某一個笤闯,由其處理接下來SocketChannel的請求,將請求放置在RequestChannel中的請求隊(duì)列棍厂;當(dāng)Processor線程監(jiān)聽到SocketChannel請求的響應(yīng)時颗味,會將響應(yīng)從RequestChannel中的響應(yīng)隊(duì)列中取出來并發(fā)給客戶端
KafkaRequestHandlerPool:真正處理Socket請求的線程池,其個數(shù)默認(rèn)為8個牺弹,由參數(shù)num.io.threads決定浦马。該線程池里面的線程KafkaRequestHandler從RequestChannel的請求隊(duì)列中獲取Socket請求,然后調(diào)用KafkaApis完成真正業(yè)務(wù)邏輯最后將響應(yīng)寫回至RequestChannel中的響應(yīng)隊(duì)列张漂,并交給SocketServer中對應(yīng)的Processer線程發(fā)給客戶端晶默。
LogManager:Kafka的日志管理模塊,主要提供針刪除任何過期數(shù)據(jù)和冗余數(shù)據(jù)航攒,刷新臟數(shù)據(jù)磺陡,對日志文件進(jìn)行Checkpoint以及日志合并的功能。
負(fù)責(zé)提供Broker Server上Topic的分區(qū)數(shù)據(jù)讀取和寫入功能,負(fù)責(zé)讀取和寫入位于Broker Server上的所有分區(qū)副本數(shù)據(jù)仅政。如果Partition有多個Replica,則每個Broker Server不會存在相同Partition的Replica盆驹,如果存在一旦遇到Broker Server下線會丟失Partition的多份副本可用性降低圆丹。
LogManager中包含多個TopicAndPartition,每個TopicAndPartition對應(yīng)一個Log躯喇,每個Log中包含多個LogSegment(每個LogSegment文件包括一個日志數(shù)據(jù)文件【.log】和兩個索引文件(偏移量索引文件【.index】和消息時間戳索引文件【.timeindex】))辫封。LogSegment的結(jié)構(gòu)中l(wèi)og代表消息集合,每條消息都有一個Offset廉丽,這是針對Partition中的偏移量倦微;index代表的是消息的索引信息,以KV對的形式記錄正压,其中K為消息在log中的相對偏移量欣福,V為消息在log中的絕對位置;baseOffset代表的是該LogSegment日志段的起始偏移量焦履;indexIntervalByte代表的索引粒度拓劝,即寫入多少字節(jié)之后生成一條索引。OffsetIndex不會保存每條消息的索引嘉裤,因此其索引文件是一個稀疏索引文件(稀疏索引:索引項(xiàng)中只對應(yīng)主文件中的部分記錄郑临,即不會給每條記錄建立索引)。
后臺還會維護(hù)一個日志合并線程屑宠,Kafka發(fā)送消息的時候需要攜帶3個參數(shù)(Topic厢洞,Key,Message)典奉,針對相同的Key值不同的Message只保留最后一個Key值對應(yīng)的消息內(nèi)容躺翻。
- ReplicaManager:Kafka副本管理模塊。主要提供針對Topic分區(qū)副本數(shù)據(jù)的管理功能秋柄,包括有關(guān)副本的Leader和ISR的狀態(tài)變化获枝、副本的刪除、副本的監(jiān)測等骇笔。ISR全稱是In-Syn Replicas省店,處于同步狀態(tài)的副本。AR全稱是Assign Replicas的縮寫笨触,代表分配給Partition的副本懦傍。
主要利用ReplicaFetcherThread(副本數(shù)據(jù)拉取線程)和High Watermark Mechanism(高水位線機(jī)制)來實(shí)現(xiàn)數(shù)據(jù)的同步管理。單個ReplicaFetcherThread線程負(fù)責(zé)某個Broker Server上部分TopicAndPartition的Replica數(shù)據(jù)同步芦劣。將拉取的消息寫入log粗俱,更新當(dāng)前Replica的HighWatermark,代表的是ISR中所有replicas的last commited message的最小起始偏移量虚吟。當(dāng)某個Broker Server上被分配到Replica的時候會進(jìn)入becomeLeaderOrFollower處理流程寸认;當(dāng)Replica被刪除為進(jìn)入stopReplicas處理流程签财;當(dāng)Follower狀態(tài)的Replica長時間沒有同步Leader狀態(tài)的Replica的時候會進(jìn)入maybeShrinkIsr處理流程。
OffsetManager:Kafka的偏移量管理模塊偏塞,主要提供針對偏移量的保存讀取的功能唱蒸,兩種方式保存:一種是把偏移量保存到Zookeeper上,另一種是Kafka灸叼,把偏移量提交至Kafka內(nèi)部Topic為"__consumer_offsets"的日志里面神汹,主要由offsets.storage參數(shù)決定。默認(rèn)為Zookeeper古今。
KafkaScheduler:Kafka的后臺任務(wù)調(diào)度資源池屁魏。提供后臺定時任務(wù)的調(diào)度,主要為LogManager捉腥、ReplicaManager氓拼、OffsetManager提供調(diào)度服務(wù)。
KafkaApis:Kafka的業(yè)務(wù)邏輯實(shí)現(xiàn)層但狭,根據(jù)不同的Request執(zhí)行不同的操作披诗,其中利用LogManager、ReplicaManager立磁、OffsetManager來完成內(nèi)部處理呈队。
KafkaHealthcheck:Broker Server在./brokers/ids上注冊自己的ID,當(dāng)Broker在線的時候唱歧,則對應(yīng)的ID存在宪摧;當(dāng)離線時對應(yīng)ID不存在,從而達(dá)到集群狀態(tài)監(jiān)測的目的颅崩。
TopicConfigManager:在/config/changes上注冊自己的回調(diào)函數(shù)來監(jiān)測Topic配置信息的變化
-
KafkaController:Kafka集群控制管理模塊几于。由于Zookeeper上保存了Kafka機(jī)器的元數(shù)據(jù)信息,因?yàn)镵afkaController通過在不同目錄注冊不同的回調(diào)函數(shù)來達(dá)到監(jiān)測集群狀態(tài)的目的沿后,以及響應(yīng)集群狀態(tài)的變化:
1. /controller目錄保存了Kafka集群中狀態(tài)為Leader的KafkaController標(biāo)識沿彭,通過監(jiān)測這個目錄的變化可以即使響應(yīng)KafkaController狀態(tài)的切換
2. /admin/reassign_partitions目錄保存了Topic重分區(qū)的信息,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Topic分區(qū)變化的請求
3. /admin/preferred_replica_election目錄保存了Topic分區(qū)副本的信息尖滚,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Topic分區(qū)副本變化的請求喉刘。
4. /brokers/topics目錄保存了Topic的信息,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Topic創(chuàng)建和刪除的請求漆弄。
5. /brokers/ids目錄保存了Broker的狀態(tài)睦裳,通過監(jiān)測這個目錄的變化可以及時響應(yīng)Broker的上下線情況。
Broker的控制管理模塊
每個Broker內(nèi)部都會存在一個KafkaController模塊撼唾,但是有且只有一個Broker內(nèi)部的KafkaController模塊對外提供控制管理Kafka集群的功能廉邑,例如負(fù)責(zé)Topic的創(chuàng)建熬甫、分區(qū)的重分配以及分區(qū)副本Leader的重新選舉等颂斜。
KafkaController的選舉策略
Leader和Follower的選舉是基于Zookeeper實(shí)現(xiàn)的,嘗試在Zookeeper的相同路徑上創(chuàng)建瞬時節(jié)點(diǎn)(Ephemeral Node)筏勒,只有一個KafkaController會創(chuàng)建成功盖矫。其中負(fù)責(zé)狀態(tài)管理的類為ZookeeperLeaderElector缰揪,字面意思上就可以看出是基于Zookeeper的Leader選舉權(quán)误窖。其中包含了controllerContext當(dāng)前Topic的元數(shù)據(jù)信息以及集群的元數(shù)據(jù)信息等碉京;electionPath為多個KafkaController競爭寫的路徑,其值為/controller课舍;onBecomingLeader為狀態(tài)轉(zhuǎn)化成Leader時候的回調(diào)函數(shù);onResigningAsLeader為狀態(tài)轉(zhuǎn)化位Follower時候的回調(diào)函數(shù)他挎;brokerId為當(dāng)前Broker Server的Id筝尾。ZookeeperLeaderElector啟動后負(fù)責(zé)觀察數(shù)據(jù)節(jié)點(diǎn)狀態(tài),瞬時節(jié)點(diǎn)消失觸發(fā)再次選舉办桨,嘗試寫入的節(jié)點(diǎn)內(nèi)容就是brokerId筹淫。
KafkaController的初始化
當(dāng)選舉為Leader時分為下面幾步:
1. 初始化Kafka集群內(nèi)部的時鐘,存放在Zookeeper的/controller_epoch呢撞,Broker Server用這個值區(qū)分請求的時效性
2. 注冊各種監(jiān)聽函數(shù)损姜,針對Zookeeper不同目錄下Kafka存儲的不同元數(shù)據(jù)進(jìn)行監(jiān)聽。
3. 通過initializeControllerContext()殊霞、replicaStateMachine.startup()和partitionStateMachine.startup()初始化Kafka集群內(nèi)部元數(shù)據(jù)信息摧阅。建立和集群內(nèi)其他狀態(tài)為Follower的KafkaController的通信鏈路,處理集群啟動前沒有及時處理的用戶請求绷蹲,此時可能會變更Kafka集群內(nèi)部的元數(shù)據(jù)信息棒卷,最后通過sendUpdateMetadataRequest()將Kafka集群內(nèi)部的元數(shù)據(jù)信息同步給其他狀態(tài)為Follower的KafkaController。
4. 根據(jù)auto.leader.rebalance.enable配置項(xiàng)按需啟動Kafka集群內(nèi)部的負(fù)載均衡線程祝钢,默認(rèn)開啟
5. 根據(jù)delete.topic.enable配置項(xiàng)按需啟動Kafka集群內(nèi)部的Topic刪除線程比规,默認(rèn)關(guān)閉
選舉為Follower的時候分為下面幾步,正好與Controller相反:
1. 取消Zookeeper路徑上的監(jiān)聽函數(shù)
- 根據(jù)delete.topic.enable配置項(xiàng)按需啟動Kafka集群內(nèi)部的Topic刪除線程拦英,默認(rèn)關(guān)閉
3. 關(guān)閉Kafka集群內(nèi)部的負(fù)載均衡線程
4. 斷開和集群內(nèi)其他狀態(tài)為Follower的KafkaController的通信線路
5. 重置集群內(nèi)部時鐘
Topic的分區(qū)狀態(tài)轉(zhuǎn)化機(jī)制
Topic的分區(qū)狀態(tài)維護(hù)是由PartitionStateMachine模塊負(fù)責(zé)的蜒什,通過在/brokers/topics 和 /admin/delete_topics目錄上注冊不同的監(jiān)聽函數(shù),監(jiān)聽Topic的創(chuàng)建和刪除事件觸發(fā)Topic分區(qū)狀態(tài)的轉(zhuǎn)換疤估。
PartitionStateMachine中分區(qū)狀態(tài)由PartitionState用一個字節(jié)表示不同狀態(tài)灾常,分為四種:
NonExistentPartition:代表分區(qū)從來沒有被創(chuàng)建或者被創(chuàng)建之后又被刪除呃狀態(tài)
NewPartition:分區(qū)剛創(chuàng)建包含AR,但是此時Leader或ISR還沒有創(chuàng)建做裙,處于非活動狀態(tài)無法接收數(shù)據(jù)
OnlinePartition:分區(qū)Leader已經(jīng)被選舉岗憋,產(chǎn)生來對應(yīng)的ISR,處于活動狀態(tài)可以接收數(shù)據(jù)
OfflinePartition:代表分區(qū)Leader由于某種原因下線時導(dǎo)致分區(qū)暫時不可用
每個狀態(tài)都是由一個合理的前置狀態(tài)轉(zhuǎn)換而來锚贱。
Topic分區(qū)的領(lǐng)導(dǎo)者副本選舉策略
Topic分區(qū)的Leader Replica在不同場景下的選舉策略是不一樣的仔戈,不同選舉策略都基礎(chǔ)PartitionLeaderSelector。其根據(jù)Topic、Partition监徘、當(dāng)前Leader晋修、當(dāng)前的ISR選舉出新的Leader,新的ISR和新的AR(在線狀態(tài))凰盔,共有5種不同的策略:
NoOpLeaderSelector:默認(rèn)的選舉策略
ReassignedPartitionLeaderSelector:當(dāng)分區(qū)AR重新分配時使用的策略
PreferredReplicaPartitionLeaderSelector:集群內(nèi)部自動平衡負(fù)載或者用戶觸發(fā)手動平衡負(fù)載時使用的策略
隨著Topic的新建刪除以及Broker Server的上下線墓卦,原本Topic分區(qū)的Leader Replica在集群中的分布越來越不均勻。 auto.leader.rebalance.enable為true户敬,則會自動觸發(fā)分區(qū)的Leader Replica選舉落剪,或者管理員下發(fā)分區(qū)Leader Replica選舉指令。這會在Zookeeper的 /admin/preferred_replica_election指定具體的Topic和分區(qū)尿庐,此時Leader狀態(tài)的KafkaController監(jiān)測到這個路徑的數(shù)據(jù)變化就會觸發(fā)相應(yīng)的回調(diào)函數(shù)忠怖,促使對應(yīng)的Topic分區(qū)發(fā)生Leader Replica的選舉。
- OfflinePartitionLeaderSelector:分區(qū)狀態(tài)從OfflinePartition或者NewPartition切換為OnlinePartition時使用的策略
1. 篩選出在線的ISR和AR
2. 優(yōu)先從在線的ISR中選擇抄瑟,如果列表不為空則選擇列表中的第一個凡泣,選舉結(jié)束
3. 在線ISR為空,根據(jù) unclean.leader.election.enable 決定是否從在線的AR中選舉Leader皮假,如果允許鞋拟,則選擇AR列表中的第一個,結(jié)束選舉惹资,如果AR列表為空選舉失敗贺纲。
- ControllerShutdownLeaderSelector:Leader狀態(tài)的KafkaController處理其他Broker Server下線導(dǎo)致分區(qū)的Leader Replica發(fā)生切換時使用的策略。
1. 篩選出在線的ISR
2. 剔除離線的ISR形成新的ISR列表
3. 如果新的ISR列表不為空褪测,則選舉第一個Replica作為新的Leader哮笆,否則選舉失敗
Topic分區(qū)的副本狀態(tài)轉(zhuǎn)換機(jī)制
Topic分區(qū)的副本狀態(tài)維護(hù)是由ReplicaStateMachine模塊負(fù)責(zé)的,Topic分區(qū)的副本狀態(tài)伴隨著Topic分區(qū)狀態(tài)的變化而變化
分區(qū)副本狀態(tài)只要有7種:
NewReplica:分區(qū)剛被分配但是沒有開始工作的狀態(tài)
OnlineReplica:分區(qū)副本開始工作時的狀態(tài)汰扭,此時該副本時該分區(qū)的Leader或者Follower
OfflineReplica:分區(qū)副本所在的Broker Server宕機(jī)時所導(dǎo)致的副本狀態(tài)
ReplicaDeletionStarted:分區(qū)副本下線之后準(zhǔn)備開始刪除的狀態(tài)
ReplicaDeletionSuccessful:相關(guān)Broker Server正確響應(yīng)分區(qū)副本被刪除請求之后的狀態(tài)
ReplicaDeletionIneligible:相關(guān)Broker Server錯誤響應(yīng)分區(qū)副本被刪除請求之后的狀態(tài)
NonExistentReplica:代表分區(qū)副本被徹底刪除之后的狀態(tài)
目標(biāo)狀態(tài)也是由合理的前置狀態(tài)轉(zhuǎn)換而來的稠肘。
KafkaController內(nèi)部的監(jiān)聽器
KafkaController內(nèi)部通過監(jiān)聽函數(shù)來維護(hù)集群的元數(shù)據(jù)。
TopicChangeListener:注冊在 /broker/topics 路徑萝毛,監(jiān)聽Topic的創(chuàng)建
AddPartitionListener:在Topic創(chuàng)建過程中會在 /broker/topics/[topic]目錄下注冊AddPartitionListener用于監(jiān)聽Topic分區(qū)的變化
PartitionReassignedListener:KafkaController轉(zhuǎn)換為Leader的過程中在路徑 /admin/reassign_partitions注冊了PartitionReassignedListener用于監(jiān)聽Topic分區(qū)的重分配项阴。在正式啟動重分配之前會判斷是否需要進(jìn)行重分配,重分配之后的AR列表和當(dāng)前的AR列表不相同并且重分配之后的AR列表所在的Broker Server都在線笆包,滿足上面兩個條件才會觸發(fā)分區(qū)的重分配环揽。
ReassignedPartitionsIsrChangeListener:當(dāng)Leader Replica所在的Broker Server接收到來自Follower Replica的FetchRequest請求時,KafkaApis的handleFetchRequest會統(tǒng)計(jì)每個Replica的狀態(tài)庵佣,一旦發(fā)現(xiàn)改Replica同步上Leader Replica之后歉胶,此時會調(diào)用Partition的updateLeaderHWAndMaybeExpandIsr及時更新 /brokers/topics/[topic]/partitions/partitionId/state/ 目錄上的Partition狀態(tài),包括Leader巴粪、ISR等信息通今,監(jiān)聽到分區(qū)狀態(tài)發(fā)生變化會觸發(fā)ReassignedPartitionsIsrChangeListener
PreferredReplicaElectionListener:每個Partition可以有多個Replica粥谬,即AR列表。在這個列表中的第一個Replica稱為“Preferred Replica”辫塌,當(dāng)創(chuàng)建Topic時漏策,Kafka要確保所有的Topic的“Preferred Replica”均勻地分布在Kafka集群中。Topic的Partition需要重新均衡Leader Replica至Preferred Replica臼氨,此時會觸發(fā)PreferredReplicaElectionListener
BrokerChangeListener:Broker Server的上下線影響著其中所有Replica的狀態(tài)掺喻,因此ReplicaStateMachine在路徑為/broker/topic的路徑上注冊了BrokerChangeListener,用于監(jiān)聽Broker Server的上下線储矩。
Broker Server上線時步驟為:
1. Leader狀態(tài)的KafkaController同步Kafka集群所有的Topic信息給新上線的Broker Server感耙。
2. 將原本位于該Broker Server上的所有Replica狀態(tài)切換至OnlineReplica
3. 如果Partition的Replica有且只有一個,并且正好位于Broker Server上持隧,則切換Partition狀態(tài)至OnlinePartition抑月。
4. 如果分區(qū)重分配之前由于該Broker Server下線導(dǎo)致推出的話,則嘗試重新進(jìn)行分區(qū)重分配舆蝴。
5. 如果之前由于Broker Server下線導(dǎo)致對應(yīng)的Replica無法刪除的話,則恢復(fù)刪除流程题诵。
Broker Server下線步驟:
1. 更新ControllerContext內(nèi)部正在下線的Broker Server列表
2. 將Leader Replica位于該Broker Server上的分區(qū)狀態(tài)切換為OnlinePartition洁仗,緊接著觸發(fā)分區(qū)狀態(tài)切換為OnlinePartition,利用OfflinePartitionLeaderSelector副本選舉策略進(jìn)行Leader Replica的選舉性锭。
3. 將位于該Broker Server上的Replica狀態(tài)切換為OfflineReplica
4. 如果對應(yīng)Replica的Topic處于刪除隊(duì)列中的話赠潦,則標(biāo)記暫時無法刪除。
- DeleteTopicsListener:監(jiān)聽Topic的刪除
Kafka集群的負(fù)載均衡流程
Partition的AR列表的第一個Replica稱為“Preferred Replica”草冈,并均勻分布在整個Kafka集群中她奥。由于每個Partition只有Leader Replica對外提供讀寫服務(wù),并且Partition創(chuàng)建的時候默認(rèn)的Leader Replica位于Preferred Replica之上怎棱,此時Kafka集群的負(fù)載是均衡的哩俭,如果Kafka集群長時間運(yùn)行,Broker Server中途由于異常而發(fā)生重啟拳恋,此時Partition的Leader Replica會發(fā)生遷移凡资,這樣會導(dǎo)致其Partition的Leader Replica在集群中不再均衡了。
Kafka集群的Topic刪除流程
Topic是由Partition組成的谬运,而Partition是由Replica組成的隙赁,因此只有Partition的Assigned Replica全部被刪除了該P(yáng)artition才可以被刪除;只有Topic的所有Partition都被刪除了梆暖,該Topic才可以最終真正的被刪除伞访。
KafkaController的通信模塊
ControllerChannelManager提供了Leader狀態(tài)的KafkaController和集群其他Broker Server通信的功能,內(nèi)部針對每一個在線的Broker Server會維護(hù)一個通信鏈路轰驳,并分別通過各自的RequestSendThread線程將請求發(fā)送給對應(yīng)的Broker Server厚掷。
Topic管理工具
kafka-topics.sh提供了Topic的創(chuàng)建弟灼、修改、列舉蝗肪、描述袜爪、刪除功能,在內(nèi)部時通過TopicCommand類來實(shí)現(xiàn)的薛闪。
kafka-reassign-partitions.sh提供來重新分配分區(qū)副本的能力辛馆。該工具可以促進(jìn)Kafka集群的負(fù)載均衡。因?yàn)镕ollower Replica需要從Leader Replica Fetch數(shù)據(jù)以保持與與Leader Replica同步豁延,僅保持Leader Replica分布的平衡對整個集群的負(fù)載均衡時不夠的昙篙。另外當(dāng)Kafka集群擴(kuò)容后,該工具可以將已有Topic的Partition遷移到新加入的Broker上诱咏。
分區(qū)重分片是一個異步的流程苔可,因此該腳本還提供了查看當(dāng)前分區(qū)重分配進(jìn)度的指令。
kafka-preferred-replica-election.sh用于在整個集群中恢復(fù)Leader Replica為Preferred Replica袋狞。
生產(chǎn)者
生產(chǎn)者是指消息的生成者焚辅。生產(chǎn)者可以通過特定的分區(qū)函數(shù)決定消息路由到Topic的某個分區(qū)。消息的生產(chǎn)者發(fā)送消息有兩種模式苟鸯,分別為同步模式和異步模式同蜻。
kafka.javaapi.producer.Producer#send方法發(fā)送
指定 metadata.broker.list 屬性,配置Broker地址
指定 partitioner.class 屬性早处,配置分區(qū)函數(shù)湾蔓,分區(qū)函數(shù)決定路由。分區(qū)函數(shù)必須實(shí)現(xiàn) kafka.producer.Partitioner的 partition接口砌梆,參數(shù)為消息key值默责,分區(qū)總數(shù),返回值為分區(qū)的索引咸包。
Producer內(nèi)部包括以下幾個主要模塊:
ProducerSendThread:當(dāng)producer.type配置為async桃序,則其主要用于緩存客戶端的KeyedMessage,然后累積到batch.num.messages配置數(shù)量或者間隔 queue.enqueue.timeout.ms配置的時間還沒有獲取到新的客戶端的KeyedMessage烂瘫,則調(diào)用DefaultEventHandler將KeyedMessage發(fā)送出去
ProducerPool:緩存客戶端和各個Broker Server的通信葡缰,DefaultEventHandler從ProducerPool中獲取和某個Broker Server的通信對象SyncProducer,然后通過SyncProducer將KeyedMessage發(fā)送給指定的Broker Server忱反。
DefaultEventHandler:將KeyedMessage集合按照分區(qū)規(guī)則計(jì)算不同Broker Server所應(yīng)該接收的部分KeyedMessage泛释,然后通過SyncProducer將KeyedMessage發(fā)送出去。在DefaultEventHandler模塊內(nèi)部提供來SyncProducer發(fā)送失敗的重試機(jī)制和平滑擴(kuò)容Broker Server的機(jī)制温算。
發(fā)送模式
生產(chǎn)者由兩種發(fā)送模式:同步和異步
當(dāng)producer.type配置為sync時怜校,同步發(fā)送消息。
當(dāng)producer.type配置為async時注竿,異步發(fā)送消息茄茁。
消費(fèi)者
Kafka提供了兩種不同的方式來獲取消息:簡單消費(fèi)者和高級消費(fèi)者魂贬。簡單消費(fèi)者獲取消息時,用戶需要知道待消費(fèi)的消息位于哪個Topic的哪個分區(qū)裙顽,并且該目的分區(qū)的Leader Replica位于哪個Broker Server上付燥;高級消費(fèi)者獲取消息時,只需要指定待消費(fèi)的消息屬于哪個Topic即可愈犹。
簡單消費(fèi)者
簡單消費(fèi)者提供的客戶端API稱為低級API键科,本質(zhì)上客戶端獲取消息最終時利用FetchRequest請求從目的端Broker Server拉取消息。
FetchRequest請求中可以指定Topic的名稱漩怎,Topic的分區(qū)勋颖,起始偏移量、最大字節(jié)數(shù)勋锤。
客戶端無論生產(chǎn)消息還是消費(fèi)消息饭玲,最終都是通過與目的地端Broker Server建立通信鏈路,并且以阻塞模式允許叁执,然后通過該條鏈路將不同的請求發(fā)送出去茄厘。
高級消費(fèi)者
高級消費(fèi)者以Consumer Group(消費(fèi)組)的形式來管理消息的消費(fèi),以Stream(流)的形式來提供具體消息的讀取谈宛。Stream是指來自若干個Broker Server上的若干個Partition的消息次哈。客戶端需要正確設(shè)置Stream的個數(shù)入挣,并且應(yīng)該針對每個Stream開啟一個線程進(jìn)行消息的消費(fèi)。一個Stream代表了多個Partition消息的聚合硝拧,但是每一個Partition只能映射到一個Stream径筏。
消息的最終獲取是通過遍歷KafkaStream的迭代器ConsumerIterator來逐條獲取的,其數(shù)據(jù)來源于分配給該KafkaStream的阻塞消息隊(duì)列BlockingQueue障陶,而BlockingQueue的數(shù)據(jù)來源針對每個Broker Server的FetchThread線程滋恬。FetchThread線程會將Broker Server上的部分Partition數(shù)據(jù)發(fā)送給對應(yīng)的阻塞消息隊(duì)列BlockingQueue,而KafkaStream正是從該阻塞消息隊(duì)列BlockingQueue中不斷的消費(fèi)消息抱究。
ConsumerThread本質(zhì)上是客戶端的消費(fèi)線程恢氯,消費(fèi)若干個Partition上的數(shù)據(jù),并且與BlockingQueueu相互映射鼓寺,只要確定了ConsumerThread和Partition之間的關(guān)系勋拟,也就確定了BlockingQueue和Partition之間的關(guān)系。Kafka提供了兩種ConsumerThread和Partition的分配算法Range(范圍分區(qū)分配)和RoundRobin(循環(huán)分區(qū)分配)
高級消費(fèi)者中妈候,每個具體消費(fèi)者實(shí)例啟動之后會在/consumers/[group]/ids/的Zookeeper目錄下注冊自己的id敢靡;Kafka集群內(nèi)部Topic會在/brokers/topics/[topic]/的Zookeeper目錄下注冊自己的Partition,因此消費(fèi)者實(shí)例一旦發(fā)現(xiàn)以上2個路徑的數(shù)據(jù)發(fā)生變化時苦银,則會觸發(fā)高級消費(fèi)者的負(fù)載均衡流程啸胧,除此之外赶站,消費(fèi)者實(shí)例一旦和Zookeeper的鏈接重新建立時也會觸發(fā)高級消費(fèi)者的負(fù)載均衡流程。
高級消費(fèi)者內(nèi)部針對Zookeeper的連接建立纺念、Topic的Partition變化贝椿、Consumer的新增會建立3個不同的Listener,分別是ZKSessionExpireListener陷谱、ZKTopicPartitionChangeListener和ZKRebalancerListener烙博。
高級消費(fèi)者消費(fèi)消息時提供了兩種持久化偏移量的機(jī)制,由參數(shù)auto.commit.enable叭首,默認(rèn)為true自動提交习勤。否則需要手動調(diào)用ZookeeperConsumerConnector的commitOffsets。Kafka根據(jù)參數(shù)offsets.storage焙格,默認(rèn)為zookeeper(保存路徑為/consumers/[group]/offset/[topic]/[partition])图毕,可以設(shè)置為kafka(保存再Topic為“__consumer_offsets”的日志中)。高級消費(fèi)者內(nèi)部會自動間隔一定時間(由參數(shù) auto.commit.interval.ms決定眷唉,默認(rèn)60*1000ms)
面試相關(guān)
kafka高吞吐量的原因:
一予颤、順序讀寫磁盤,充分利用了操作系統(tǒng)的預(yù)讀機(jī)制冬阳。
二蛤虐、linux中使用sendfile命令,減少一次數(shù)據(jù)拷貝肝陪,如下驳庭。
①把數(shù)據(jù)從硬盤讀取到內(nèi)核中的頁緩存。
②把數(shù)據(jù)從內(nèi)核中讀取到用戶空間氯窍。(sendfile命令將跳過此步驟)
③把用戶空間中的數(shù)據(jù)寫到socket緩沖區(qū)中饲常。
④操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)中復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
三狼讨、生產(chǎn)者客戶端緩存消息批量發(fā)送贝淤,消費(fèi)者批量從broker獲取消息,減少網(wǎng)絡(luò)io次數(shù)政供,充分利用磁盤順序讀寫的性能播聪。
四、通常情況下kafka的瓶頸不是cpu或者磁盤布隔,而是網(wǎng)絡(luò)帶寬离陶,所以生產(chǎn)者可以對數(shù)據(jù)進(jìn)行壓縮。
kafka在高并發(fā)的情況下,如何避免消息丟失和消息重復(fù)?
消息丟失解決方案:
首先對kafka進(jìn)行限速衅檀, 其次啟用重試機(jī)制枕磁,重試間隔時間設(shè)置長一些,最后Kafka設(shè)置acks=all术吝,即需要相應(yīng)的所有處于ISR的分區(qū)都確認(rèn)收到該消息后计济,才算發(fā)送成功
消息重復(fù)解決方案:
消息可以使用唯一id標(biāo)識
生產(chǎn)者(ack=all 代表至少成功發(fā)送一次)
消費(fèi)者 (offset手動提交茸苇,業(yè)務(wù)邏輯成功處理后,提交offset)
落表(主鍵或者唯一索引的方式沦寂,避免重復(fù)數(shù)據(jù))
業(yè)務(wù)邏輯處理(選擇唯一主鍵存儲到Redis或者mongdb中学密,先查詢是否存在,若存在則不處理传藏;若不存在腻暮,先插入Redis或Mongdb,再進(jìn)行業(yè)務(wù)邏輯處理)
kafka怎么保證數(shù)據(jù)消費(fèi)一次且僅消費(fèi)一次
冪等producer:保證發(fā)送單個分區(qū)的消息只會發(fā)送一次,不會出現(xiàn)重復(fù)消息
事務(wù)(transaction):保證原子性地寫入到多個分區(qū)毯侦,即寫入到多個分區(qū)的消息要么全部成功哭靖,要么全部回滾流處理EOS:流處理本質(zhì)上可看成是“讀取-處理-寫入”的管道。此EOS保證整個過程的操作是原子性侈离。注意试幽,這只適用于Kafka Streams
kafka保證數(shù)據(jù)一致性和可靠性
數(shù)據(jù)一致性保證
一致性定義:若某條消息對client可見,那么即使Leader掛了卦碾,在新Leader上數(shù)據(jù)依然可以被讀到
HW-HighWaterMark: client可以從Leader讀到的最大msg offset铺坞,即對外可見的最大offset, HW=max(replica.offset)
對于Leader新收到的msg洲胖,client不能立刻消費(fèi)济榨,Leader會等待該消息被所有ISR中的replica同步后,更新HW绿映,此時該消息才能被client消費(fèi)擒滑,這樣就保證了如果Leader fail,該消息仍然可以從新選舉的Leader中獲取叉弦。
對于來自內(nèi)部Broker的讀取請求丐一,沒有HW的限制。同時卸奉,F(xiàn)ollower也會維護(hù)一份自己的HW钝诚,F(xiàn)olloer.HW = min(Leader.HW, Follower.offset)
數(shù)據(jù)可靠性保證
當(dāng)Producer向Leader發(fā)送數(shù)據(jù)時颖御,可以通過acks參數(shù)設(shè)置數(shù)據(jù)可靠性的級別
0: 不論寫入是否成功榄棵,server不需要給Producer發(fā)送Response,如果發(fā)生異常潘拱,server會終止連接疹鳄,觸發(fā)Producer更新meta數(shù)據(jù);
1: Leader寫入成功后即發(fā)送Response芦岂,此種情況如果Leader fail瘪弓,會丟失數(shù)據(jù)
-1: 等待所有ISR接收到消息后再給Producer發(fā)送Response,這是最強(qiáng)保證
kafka到spark streaming怎么保證數(shù)據(jù)完整性禽最,怎么保證數(shù)據(jù)不重復(fù)消費(fèi)腺怯?
保證數(shù)據(jù)不丟失(at-least)
spark RDD內(nèi)部機(jī)制可以保證數(shù)據(jù)at-least語義袱饭。
Receiver方式
開啟WAL(預(yù)寫日志),將從kafka中接受到的數(shù)據(jù)寫入到日志文件中呛占,所有數(shù)據(jù)從失敗中可恢復(fù)虑乖。
Direct方式
依靠checkpoint機(jī)制來保證。
保證數(shù)據(jù)不重復(fù)(exactly-once)
要保證數(shù)據(jù)不重復(fù)晾虑,即Exactly once語義疹味。
冪等操作:重復(fù)執(zhí)行不會產(chǎn)生問題,不需要做額外的工作即可保證數(shù)據(jù)不重復(fù)帜篇。
業(yè)務(wù)代碼添加事務(wù)操作
就是說針對每個partition的數(shù)據(jù)糙捺,產(chǎn)生一個uniqueId,只有這個partition的所有數(shù)據(jù)被完全消費(fèi)笙隙,則算成功洪灯,否則算失效,要回滾逃沿。下次重復(fù)執(zhí)行這個uniqueId時婴渡,如果已經(jīng)被執(zhí)行成功,則skip掉凯亮。