一咖刃、為什么需要消息系統(tǒng)
1抖誉、解耦:允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束钦奋。
2座云、冗余:消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)付材。許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中朦拖,在把一個(gè)消息從隊(duì)列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢厌衔,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢璧帝。
3、擴(kuò)展性:因?yàn)橄㈥?duì)列解耦了你的處理過程富寿,所以增大消息入隊(duì)和處理的頻率是很容易的睬隶,只要另外增加處理過程即可。
4页徐、靈活性 & 峰值處理能力:在訪問量劇增的情況下苏潜,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見变勇。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)恤左。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請求而完全崩潰搀绣。
5飞袋、可恢復(fù)性:系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)豌熄。消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉物咳,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理锣险。
6蹄皱、順序保證:在大多使用場景下,數(shù)據(jù)處理的順序都很重要芯肤。大部分消息隊(duì)列本來就是排序的巷折,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理。(Kafka 保證一個(gè) Partition 內(nèi)的消息的有序性)
7崖咨、緩沖:有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度锻拘,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
8击蹲、異步通信:很多時(shí)候署拟,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制歌豺,允許用戶把一個(gè)消息放入隊(duì)列推穷,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少类咧,然后在需要的時(shí)候再去處理它們馒铃。
二、消息隊(duì)列的兩種模式
- 1痕惋、點(diǎn)對點(diǎn)模式(一對一区宇,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除)
消息生產(chǎn)者生產(chǎn)消息發(fā)送到Queue中值戳,然后消息消費(fèi)者從Queue中取出并且消費(fèi)消息议谷。
消息被消費(fèi)以后,queue 中不再有存儲述寡,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息柿隙。
Queue 支持存在多個(gè)消費(fèi)者,但是對一個(gè)消息而言鲫凶,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)禀崖。
- 2、發(fā)布/訂閱模式(一對多螟炫,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會(huì)清除消息)
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中波附,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消
息。和點(diǎn)對點(diǎn)方式不同昼钻,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)掸屡。
發(fā)布者發(fā)送到topic的消息,只有訂閱了topic的訂閱者才會(huì)收到消息然评。
三仅财、Kafka 基礎(chǔ)架構(gòu)
3.1、Kafka拓?fù)浣Y(jié)構(gòu)
3.2碗淌、上圖中kafka相關(guān)術(shù)語
1盏求、Producer:消息生產(chǎn)者抖锥,發(fā)布消息到 kafka 集群的終端或服務(wù)。
2碎罚、Broker:代理磅废,kafka集群包含的服務(wù)端(每個(gè)服務(wù)端稱之為代理),負(fù)責(zé)處理消息讀寫請求及存儲消息荆烈。
3拯勉、Topic:每條發(fā)布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的憔购。
4宫峦、Partition:partition 是物理上的概念,每個(gè) topic 包含一個(gè)或多個(gè) partition倦始。kafka 分配的單位是 partition斗遏。
5、Consumer:從 kafka 集群中消費(fèi)消息的終端或服務(wù)鞋邑。
6诵次、Consumer group:high-level consumer API 中,每個(gè) consumer 都屬于一個(gè) consumer group枚碗,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi)逾一,但可以被多個(gè) consumer group 消費(fèi)。
7肮雨、Replica:partition 的副本遵堵,保障 partition 的高可用。
8怨规、Leader:replica 中的一個(gè)角色陌宿, producer 和 consumer 只跟 leader 交互。
9波丰、Follower:replica 中的一個(gè)角色壳坪,從 leader 中復(fù)制數(shù)據(jù)。
10掰烟、Controller:kafka 集群中的其中一個(gè)服務(wù)器爽蝴,用來進(jìn)行 leader election 以及 各種 failover。
Kafka集群中的其中一個(gè)Broker會(huì)被選舉為Controller纫骑,主要負(fù)責(zé)Partition管理和副本狀態(tài)管理蝎亚,也會(huì)執(zhí)行類似于重分配Partition之類的管理任務(wù)。如果當(dāng)前的Controller失敗先馆,會(huì)從其他正常的Broker中重新選舉Controller(leader election選舉算法)发框。11、Zookeeper:一個(gè)分布式應(yīng)用程序協(xié)調(diào)服務(wù)煤墙,kafka 通過 zookeeper 來存儲集群的 meta 信息梅惯。
3.3顾患、zookeeper 節(jié)點(diǎn)
kafka 在 zookeeper 中的存儲結(jié)構(gòu)如下圖所示:
四、Kafka 工作流程與文件存儲機(jī)制
4.1个唧、工作流程
Kafka 中消息是以 topic 進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息设预,消費(fèi)者消費(fèi)消息徙歼,都是面向 topic 的。
topic 是邏輯上的概念鳖枕,而 partition 是物理上的概念魄梯,每個(gè) partition 對應(yīng)于一個(gè) log 文件,該 log 文件中存儲的就是 producer 生產(chǎn)的數(shù)據(jù)宾符。
Producer 生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該 log 文件末端酿秸,且每條數(shù)據(jù)都有自己的 offset。消費(fèi)者組中的每個(gè)消費(fèi)者魏烫,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset辣苏,以便出錯(cuò)恢復(fù)時(shí),從上次的位置繼續(xù)消費(fèi)哄褒。
4.2稀蟋、文件存儲機(jī)制
由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 log 文件末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下呐赡,Kafka 采取了分片和索引機(jī)制退客,將每個(gè) partition 分為多個(gè) segment。
每個(gè) segment 對應(yīng)兩個(gè)文件:“.index”文件和“.log”文件链嘀。
這些文件位于一個(gè)文件夾下萌狂,該文件夾的命名規(guī)則為:topic 名稱 + 分區(qū)序號。
例如怀泊,first 這個(gè) topic 有三個(gè)分區(qū)茫藏,則其對應(yīng)的文件夾為 first0、first-1包个、first-2刷允。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index 和 log 文件以當(dāng)前 segment 的第一條消息的 offset 命名。下圖為 index 文件和 log 文件的結(jié)構(gòu)示意圖碧囊。
“.index”文件存儲大量的索引信息树灶,“.log”文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 message 的物理偏移地址糯而。
四天通、Zookeeper 在 Kafka 中的作用
Kafka 集群中有一個(gè) broker 會(huì)被選舉為 Controller,負(fù)責(zé)管理集群 broker 的上下線熄驼,所
有 topic 的分區(qū)副本分配和 leader 選舉等工作像寒。
Controller 的管理工作都是依賴于 Zookeeper 的烘豹。
以下為 partition 的 leader 選舉過程:
五、kafka HA
5.1诺祸、replication
如Kafka拓?fù)浣Y(jié)構(gòu)圖所示携悯,同一個(gè) partition 可能會(huì)有多個(gè) replica(對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有 replica 的情況下筷笨,一旦 broker 宕機(jī)憔鬼,其上所有 patition 的數(shù)據(jù)都不可被消費(fèi),同時(shí) producer 也不能再將數(shù)據(jù)存于其上的 patition胃夏。引入replication 之后轴或,同一個(gè) partition 可能會(huì)有多個(gè) replica,而這時(shí)需要在這些 replica 之間選出一個(gè) leader仰禀,producer 和 consumer 只與這個(gè) leader 交互照雁,其它 replica 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。
Kafka 分配 Replica 的算法如下:
- 1答恶、將所有 broker(假設(shè)共 n 個(gè) broker)和待分配的 partition 排序饺蚊。
- 2、將第 i 個(gè) partition 分配到第(i mod n)個(gè) broker 上悬嗓。
- 3卸勺、將第 i 個(gè) partition 的第 j 個(gè) replica 分配到第((i + j) mode n)個(gè) broker上。
5.2烫扼、leader failover
當(dāng) partition 對應(yīng)的 leader 宕機(jī)時(shí)曙求,需要從 follower 中選舉出新 leader。在選舉新leader時(shí)映企,一個(gè)基本的原則是悟狱,新的 leader 必須擁有舊 leader commit 過的所有消息。
kafka 在 zookeeper 中(/brokers/.../state)動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas)堰氓,由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader挤渐,只有 ISR 里面的成員才能選為 leader。對于 f+1 個(gè) replica双絮,一個(gè) partition 可以在容忍 f 個(gè) replica 失效的情況下保證消息不丟失浴麻。
當(dāng)所有 replica 都不工作時(shí),有兩種可行的方案:
- 1囤攀、等待 ISR 中的任一個(gè) replica 活過來软免,并選它作為 leader》倌樱可保障數(shù)據(jù)不丟失膏萧,但時(shí)間可能相對較長。
- 2、選擇第一個(gè)活過來的 replica(不一定是 ISR 成員)作為 leader榛泛。無法保障數(shù)據(jù)不丟失蝌蹂,但相對不可用時(shí)間較短。
kafka 0.8.* 使用第二種方式曹锨。
5.3孤个、broker failover
kafka broker failover 序列圖如下所示:
kafka 通過 Controller 來選舉 leader,流程說明:
- 1沛简、controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊 Watcher硼身,當(dāng) broker 宕機(jī)時(shí) zookeeper 會(huì) fire watch。
- 2覆享、controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用broker。
- 3营袜、controller決定set_p撒顿,該集合包含宕機(jī) broker 上的所有 partition。
- 4荚板、對 set_p 中的每一個(gè) partition
- 4.1凤壁、從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR。
- 4.2跪另、決定新 leader(如4.3節(jié)所描述)拧抖。
- 4.3、將新 leader免绿、ISR唧席、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點(diǎn)。
- 5嘲驾、通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令淌哟。
5.4、controller failover
當(dāng) controller 宕機(jī)時(shí)會(huì)觸發(fā) controller failover辽故。每個(gè) broker 都會(huì)在 zookeeper 的 "/controller" 節(jié)點(diǎn)注冊 watcher徒仓,當(dāng) controller 宕機(jī)時(shí) zookeeper 中的臨時(shí)節(jié)點(diǎn)消失,所有存活的 broker 收到 fire 的通知誊垢,每個(gè) broker 都嘗試創(chuàng)建新的 controller path掉弛,只有一個(gè)競選成功并當(dāng)選為 controller。
當(dāng)新的 controller 當(dāng)選時(shí)喂走,會(huì)觸發(fā) KafkaController.onControllerFailover 方法殃饿,在該方法中完成如下操作:
- 1、讀取并增加 Controller Epoch芋肠。
- 2壁晒、在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher。
- 3、在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher秒咐。
- 4谬晕、通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。
- 5携取、若 delete.topic.enable=true(默認(rèn)值是 false)攒钳,則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher。
- 6雷滋、通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch不撑。
- 7、初始化 ControllerContext 對象晤斩,設(shè)置當(dāng)前所有 topic焕檬,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等澳泵。
- 8实愚、啟動(dòng) replicaStateMachine 和 partitionStateMachine。
- 9兔辅、將 brokerState 狀態(tài)設(shè)置為 RunningAsController腊敲。
- 10、將每個(gè) partition 的 Leadership 信息發(fā)送給所有“活”著的 broker维苔。
- 11碰辅、若 auto.leader.rebalance.enable=true(默認(rèn)值是true),則啟動(dòng) partition-rebalance 線程介时。
- 12没宾、若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic沸柔。
參考:
https://www.cnblogs.com/cyfonly/p/5954614.html
https://www.cnblogs.com/xifenglou/p/7251112.html