圖示
生產(chǎn)消費(fèi)過(guò)程
Partition Leader 和 Partition Follower之間的關(guān)系
Partition中消息的追加方式
0. 基礎(chǔ)知識(shí)
0.1 名詞解釋
Cluster(集群)?
Broker(代理) , 每一個(gè)Server叫一個(gè)Broker
Producer(生產(chǎn)者)?
Consumer(消費(fèi)者)?
Topic(邏輯分區(qū)), 每一類消息會(huì)落入不同的topic類中
Partition(物理分區(qū))?
Segment(段),最小分區(qū), 含有index和log文件
Zookeeper(分布式應(yīng)用程序協(xié)調(diào)服務(wù))
Lag(消費(fèi)滯后)?
Rebalance(平衡)?
Append(追加)?
Offset(偏移量)?
Coodinator 協(xié)調(diào)者
HW(high watermark),這是一個(gè)分區(qū)的最后一條消息的offset
ACK(Acknowledgement)接收站發(fā)給發(fā)送站的一種傳輸類控制字符。表示發(fā)來(lái)的數(shù)據(jù)已確認(rèn)接收無(wú)誤
SYN? 同步序列編號(hào)(Synchronize Sequence Numbers)犹菇。是TCP/IP建立連接時(shí)使用的握手信號(hào)
ISR同步副本 (in-sync replicas)揭芍。ISR的副本保持和leader的同步卸例,當(dāng)然leader本身也在ISR中
0.2 消息隊(duì)列
消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件筷转,主要解決應(yīng)用耦合呜舒、異步消息、流量削鋒等問(wèn)題唤殴。實(shí)現(xiàn)高性能朵逝、高可用配名、可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件宇整。?
目前在生產(chǎn)環(huán)境没陡,使用較多的消息隊(duì)列有ActiveMQ索赏、RabbitMQ潜腻、ZeroMQ融涣、Kafka精钮、MetaMQ轨香、RocketMQ等。?
0.3 Kafka存活的條件
1.必須維護(hù)與Zookeeper的session(這個(gè)通過(guò)Zookeeper的Heartbeat機(jī)制來(lái)實(shí)現(xiàn))
2.Follower必須能夠及時(shí)將Leader的消息復(fù)制過(guò)來(lái)科雳,不能“落后太多”
Leader會(huì)跟蹤與其保持同步的Replica列表糟秘,該列表稱為ISR(即in-sync Replica)
//replica.lag.max.messages配置尿赚,其默認(rèn)值是4000? 允許落后的最大值
//replica.lag.time.max.ms來(lái)配置凌净,其默認(rèn)值是10000? 允許的最大超時(shí)時(shí)間
1. Broker
1.1 Leader選舉
Zookeeper 主要用來(lái)跟蹤Kafka 集群中的節(jié)點(diǎn)狀態(tài), 以及Kafka Topic, message 等等其他信息. 同時(shí), Kafka 依賴于Zookeeper, 沒(méi)有Zookeeper 是不能運(yùn)行起來(lái)Kafka 的.?
Kafka 通過(guò) Zookeeper 管理集群配置泻蚊,選舉 leader性雄,以及在 consumer group 發(fā)生變化時(shí)進(jìn)行 rebalance
1.Controller 選舉:?
Controller 是一個(gè)特殊的Broker, 其負(fù)責(zé)所有Partition 的leader/follower 關(guān)系.?
記錄集群中都有哪些活躍著的Broker.?
Zookeeper 負(fù)責(zé)從Broker 中選舉出一個(gè)作為Controller, 并確保其唯一性. 同時(shí), 當(dāng)Controller 宕機(jī)時(shí), 選舉一個(gè)新的.?
當(dāng)broker啟動(dòng)的時(shí)候秒旋,都會(huì)創(chuàng)建KafkaController對(duì)象
這些每個(gè)節(jié)點(diǎn)上的KafkaController會(huì)在指定的zookeeper路徑下創(chuàng)建臨時(shí)節(jié)點(diǎn)迁筛,只有第一個(gè)成功創(chuàng)建的節(jié)點(diǎn)的KafkaController才可以成為leader细卧,其余的都是follower
Broker1=>被選舉為Controller Leader, 這個(gè)Broker1 負(fù)責(zé)管理整個(gè)集群中分區(qū)和副本的狀態(tài)
Broker2 => 其余每個(gè)單獨(dú)的server叫做Replica Manager
Broker3
2.Topic 配置:?
有哪些Topic, Topic 都有哪些Partition, Replica 存放在哪里, Leader 是誰(shuí).?
1.2 Topic
Partition
文件存儲(chǔ)
1.會(huì)根據(jù) partition 規(guī)則選擇被存儲(chǔ)到哪一個(gè) partition蜘犁。?
2.順序追加到log中?
3.每條消息在log文件中的位置成為offset(偏移量)这橙,offset為一個(gè)long型數(shù)字导披,唯一標(biāo)記一條消息,?
文件復(fù)制和同步
1.Producer只將該消息發(fā)送到該P(yáng)artition的Leader , Consumer讀消息也是從Leader讀取
2. partition有n個(gè)副本...?
N 個(gè) replicas 中撩匕,其中一個(gè) replica 為 leader止毕,其他都為 follower?
leader 處理 partition 的所有讀寫(xiě)請(qǐng)求滓技,與此同時(shí),follower 會(huì)被動(dòng)定期地去復(fù)制 leader 上的數(shù)據(jù)
副本個(gè)數(shù)N = leader + follower1 + follower2....?
leader掛了, 會(huì)從follower中 重新選舉一個(gè)leader?
3.Leader =>將消息 =>寫(xiě)入本地Log
4.Follower => pull 消息=> Leader ,? 寫(xiě)入其Log后膝昆,向Leader發(fā)送ACK
//為了提高性能,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK纬朝,而非等到數(shù)據(jù)寫(xiě)入Log中
因此收叶,對(duì)于已經(jīng)commit的消息判没,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤(pán)中
也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)
5.一旦Leader收到了ISR中的所有Replica的ACK嫉沽,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW并且向Producer發(fā)送ACK
2. Consumer
2.1 位移管理
1.kafka會(huì)定期把group消費(fèi)情況保存起來(lái)玻佩,做成一個(gè)offset map
2.將offset信息寫(xiě)入 __consumer_offsets topic, 這個(gè)topic 保存了每個(gè)consumer group某一時(shí)刻提交的offset信息
2.2 Rebalance的概念
1.rebalance本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè)consumer group下的所有consumer如何達(dá)成一致來(lái)分配訂閱topic的每個(gè)分區(qū)酣胀。
比如某個(gè)group下有20個(gè)consumer,它訂閱了一個(gè)具有100個(gè)分區(qū)的topic娶聘。正常情況下闻镶,Kafka平均會(huì)為每個(gè)consumer分配5個(gè)分區(qū)。這個(gè)分配的過(guò)程就叫rebalance丸升。
2.3 Rebalance的時(shí)機(jī)
組成員發(fā)生變更(新consumer加入組铆农、已有consumer主動(dòng)離開(kāi)組或已有consumer崩潰了——這兩者的區(qū)別后面會(huì)談到)
訂閱主題數(shù)發(fā)生變更——這當(dāng)然是可能的,如果你使用了正則表達(dá)式的方式進(jìn)行訂閱狡耻,那么新建匹配正則表達(dá)式的topic就會(huì)觸發(fā)rebalance
訂閱主題的分區(qū)數(shù)發(fā)生變更
2.4 Rebalance的過(guò)程
1.JoinGroup
consumer1 => 發(fā)送訂閱信息,并請(qǐng)求入組 => coordinator => 選擇consumer1作為leader, 并發(fā)送給c1成員信息和訂閱信息, 讓其負(fù)責(zé) "消費(fèi)分配方案的制定"
consumer2 => 發(fā)送訂閱信息,并請(qǐng)求入組 => coordinator => 告訴c2,你們組leader是c1
2.SyncGroup
consumer1 => 發(fā)送分配消費(fèi)的方案 => coordinator => 下發(fā)方案給c1
consumer2 => 發(fā)送分配消費(fèi)的方案(非leader 內(nèi)容為空) => coordinator => 下發(fā)方案給c1
2.5 誰(shuí)來(lái)執(zhí)行rebalance和consumer group管理
1.Coordinator來(lái)執(zhí)行對(duì)于consumer group的管理墩剖。
2.先確定consumer group位移信息寫(xiě)入__consumers_offsets的哪個(gè)分區(qū), 則該分區(qū)leader 所在的broker 就是被選定的coordinator
3. Zookeeper
3.1 組件
KafkaController: 負(fù)責(zé)管理整個(gè)集群中分區(qū)和副本的狀態(tài)
ReplicaManager:負(fù)責(zé)管理當(dāng)前broker所有分區(qū)和副本的信息,會(huì)處理KafkaController發(fā)起的一些請(qǐng)求夷狰,副本狀態(tài)的切換岭皂,添加/讀取消息等
ZookeeperLeaderElector: 主要用于KafkController Leader選舉
ControllerContext: 維護(hù)了controller需要用到的上下文沼头,同時(shí)也緩存一些zookeeper數(shù)據(jù),包括可用的broker,全部的topic,分區(qū)和副本信息
ControllerChannelManager: 維護(hù)Controller Leader與集群中其他broker之間連接解幽,是管理這個(gè)集群的基礎(chǔ)
TopicDeletionManager: 用于刪除指定的topic
PartitionStateMachine: 用于管理集群所有partition狀態(tài)的狀態(tài)機(jī)
ReplicaStateMachine: 用于管理集群中所有副本狀態(tài)的狀態(tài)機(jī)
ControllerBrokerRequestBatch: 實(shí)現(xiàn)了向broker批量發(fā)送請(qǐng)求的功能
PartitionLeaderSelector:選舉leader副本的選舉策略
IzkChildListener:是zookeeper上的監(jiān)聽(tīng)器徘溢,實(shí)現(xiàn)了對(duì)zookeeper上某些節(jié)點(diǎn)數(shù)據(jù),子節(jié)點(diǎn)或者session狀態(tài)的監(jiān)聽(tīng)助被,被處罰后調(diào)用對(duì)應(yīng)的業(yè)務(wù)邏輯
? ? ? ? ?
?? ?
?? ?
?? ?
??
? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
li.li1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Helvetica}ul.ul1 {list-style-type: disc}
li.li1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Helvetica}ul.ul1 {list-style-type: disc}
li.li1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Helvetica}ul.ul1 {list-style-type: disc}
li.li1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Helvetica}ul.ul1 {list-style-type: disc}
li.li1 {margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px Helvetica}ul.ul1 {list-style-type: disc}