一睦优、為什么需要消息系統(tǒng)
1.解耦:
允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2.冗余:
消息隊(duì)列把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理衷蜓,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。許多消息隊(duì)列所采用的"插入-獲取-刪除"范式中尘喝,在把一個(gè)消息從隊(duì)列中刪除之前磁浇,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢朽褪。
3.擴(kuò)展性:
因?yàn)橄㈥?duì)列解耦了你的處理過程置吓,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過程即可缔赠。
4.靈活性 & 峰值處理能力:
在訪問量劇增的情況下衍锚,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見嗤堰。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時(shí)待命無疑是巨大的浪費(fèi)戴质。使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰踢匣。
5.可恢復(fù)性:
系統(tǒng)的一部分組件失效時(shí)告匠,不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度离唬,所以即使一個(gè)處理消息的進(jìn)程掛掉凫海,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
6.順序保證:
在大多使用場(chǎng)景下男娄,數(shù)據(jù)處理的順序都很重要行贪。大部分消息隊(duì)列本來就是排序的,并且能保證數(shù)據(jù)會(huì)按照特定的順序來處理模闲。(Kafka 保證一個(gè) Partition 內(nèi)的消息的有序性)
7.緩沖:
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度建瘫,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
8.異步通信:
很多時(shí)候尸折,用戶不想也不需要立即處理消息啰脚。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個(gè)消息放入隊(duì)列,但并不立即處理它橄浓。想向隊(duì)列中放入多少消息就放多少粒梦,然后在需要的時(shí)候再去處理它們。
二荸实、kafka 架構(gòu)
2.1 拓?fù)浣Y(jié)構(gòu)
如下圖:
2.2 相關(guān)概念
如圖.1中匀们,kafka 相關(guān)名詞解釋如下:
1.producer:
消息生產(chǎn)者,發(fā)布消息到 kafka 集群的終端或服務(wù)准给。
2.broker:
kafka 集群中包含的服務(wù)器泄朴。
3.topic:
每條發(fā)布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的露氮。
4.partition:
partition 是物理上的概念祖灰,每個(gè) topic 包含一個(gè)或多個(gè) partition。kafka 分配的單位是 partition畔规。
5.consumer:
從 kafka 集群中消費(fèi)消息的終端或服務(wù)局扶。
6.Consumer group:
high-level consumer API 中,每個(gè) consumer 都屬于一個(gè) consumer group叁扫,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi)详民,但可以被多個(gè) consumer group 消費(fèi)。
7.replica:
partition 的副本陌兑,保障 partition 的高可用沈跨。
8.leader:
replica 中的一個(gè)角色, producer 和 consumer 只跟 leader 交互兔综。
9.follower:
replica 中的一個(gè)角色饿凛,從 leader 中復(fù)制數(shù)據(jù)。
10.controller:
kafka 集群中的其中一個(gè)服務(wù)器软驰,用來進(jìn)行 leader election 以及 各種 failover涧窒。
12.zookeeper:
kafka 通過 zookeeper 來存儲(chǔ)集群的 meta 信息。
2.3 zookeeper 節(jié)點(diǎn)
kafka 在 zookeeper 中的存儲(chǔ)結(jié)構(gòu)如下圖所示:
2.4 kafka controller
Kakfa Broker Leader的選舉:Kakfa Broker集群受Zookeeper管理锭亏。所有的Kafka Broker節(jié)點(diǎn)一起去Zookeeper上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn)纠吴,因?yàn)橹挥幸粋€(gè)Kafka
Broker會(huì)注冊(cè)成功,其他的都會(huì)失敗慧瘤,所以這個(gè)成功在Zookeeper上注冊(cè)臨時(shí)節(jié)點(diǎn)的這個(gè)Kafka Broker會(huì)成為Kafka Broker Controller戴已,其他的Kafka broker叫Kafka Broker follower。(這個(gè)過程叫Controller在ZooKeeper注冊(cè)Watch)锅减。這個(gè)Controller會(huì)監(jiān)聽其他的Kafka Broker的所有信息糖儡,如果這個(gè)kafka broker controller宕機(jī)了,在zookeeper上面的那個(gè)臨時(shí)節(jié)點(diǎn)就會(huì)消失怔匣,此時(shí)所有的kafka
broker又會(huì)一起去Zookeeper上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn)握联,因?yàn)橹挥幸粋€(gè)Kafka Broker會(huì)注冊(cè)成功,其他的都會(huì)失敗,所以這個(gè)成功在Zookeeper上注冊(cè)臨時(shí)節(jié)點(diǎn)的這個(gè)Kafka Broker會(huì)成為Kafka Broker Controller金闽,其他的Kafka broker叫Kafka Broker follower纯露。
三、producer 發(fā)布消息
3.1 寫入方式
producer 采用 push 模式將消息發(fā)布到 broker代芜,每條消息都被 append 到 patition 中埠褪,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障 kafka 吞吐率)蜒犯。
3.2 消息路由
producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition荞膘。其路由機(jī)制為:
1\. 指定了 patition罚随,則直接使用;
2\. 未指定 patition 但指定 key羽资,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition
3\. patition 和 key 都未指定淘菩,使用輪詢選出一個(gè) patition。
3.3 寫入流程
producer 寫入消息序列圖如下所示:
1\. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader
2\. producer 將消息發(fā)送給該 leader
3\. leader 將消息寫入本地 log
4\. followers 從 leader pull 消息屠升,寫入本地 log 后 leader 發(fā)送 ACK
5\. leader 收到所有 ISR 中的 replica 的 ACK 后潮改,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK
3.4 producer delivery guarantee
一般情況下存在三種情況:
1\. At most once 消息可能會(huì)丟腹暖,但絕不會(huì)重復(fù)傳輸
2\. At least one 消息絕不會(huì)丟汇在,但可能會(huì)重復(fù)傳輸
3\. Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次
當(dāng) producer 向 broker 發(fā)送消息時(shí),一旦這條消息被 commit脏答,由于 replication 的存在糕殉,它就不會(huì)丟。但是如果 producer 發(fā)送數(shù)據(jù)給 broker 后殖告,遇到網(wǎng)絡(luò)問題而造成通信中斷阿蝶,那 Producer 就無法判斷該條消息是否已經(jīng) commit。雖然 Kafka 無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么黄绩,但是 producer 可以生成一種類似于主鍵的東西羡洁,發(fā)生故障時(shí)冪等性的重試多次,這樣就做到了 Exactly once爽丹,但目前還并未實(shí)現(xiàn)筑煮。所以目前默認(rèn)情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設(shè)置 producer 異步發(fā)送實(shí)現(xiàn)At most once粤蝎。
四咆瘟、broker 保存消息
4.1 存儲(chǔ)方式
物理上把 topic 分成一個(gè)或多個(gè) patition(對(duì)應(yīng) server.properties 中的 num.partitions=3 配置),每個(gè) patition 物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 patition 的所有消息和索引文件)诽里,如下:
4.2 存儲(chǔ)策略
無論消息是否被消費(fèi)袒餐,kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):
- 基于時(shí)間:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
需要注意的是灸眼,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1)卧檐,即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)焰宣。
4.3 topic 創(chuàng)建與刪除
4.3.1 創(chuàng)建 topic
創(chuàng)建 topic 的序列圖如下所示:
controller 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher霉囚,當(dāng) topic 被創(chuàng)建,則 controller 會(huì)通過 watch 得到該 topic 的 partition/replica 分配匕积。
controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表盈罐,對(duì)于 set_p 中的每一個(gè) partition:
2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個(gè)可用的 broker 作為新的 leader,并將AR設(shè)置為新的 ISR
2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/statecontroller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest闪唆。
4.3.2 刪除 topic
刪除 topic 的序列圖如下所示:
流程說明:
controller 在 zooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊(cè) watcher盅粪,當(dāng) topic 被刪除,則 controller 會(huì)通過 watch 得到該 topic 的 partition/replica 分配悄蕾。
若 delete.topic.enable=false票顾,結(jié)束;否則 controller 注冊(cè)在 /admin/delete_topics 上的 watch 被 fire帆调,controller 通過回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest奠骄。
五、kafka 高可用機(jī)制
5.1 replication
如圖.1所示番刊,同一個(gè) partition 可能會(huì)有多個(gè) replica(對(duì)應(yīng) server.properties 配置中的 default.replication.factor=N)含鳞。沒有 replica 的情況下,一旦 broker 宕機(jī)芹务,其上所有 patition 的數(shù)據(jù)都不可被消費(fèi)民晒,同時(shí) producer 也不能再將數(shù)據(jù)存于其上的 patition。引入replication 之后锄禽,同一個(gè) partition 可能會(huì)有多個(gè) replica潜必,而這時(shí)需要在這些 replica 之間選出一個(gè) leader,producer 和 consumer 只與這個(gè) leader 交互沃但,其它 replica 作為 follower 從 leader 中復(fù)制數(shù)據(jù)磁滚。
Kafka 分配 Replica 的算法如下:
- 將所有 broker(假設(shè)共 n 個(gè) broker)和待分配的 partition 排序
- 將第 i 個(gè) partition 分配到第(i mod n)個(gè) broker 上
- 將第 i 個(gè) partition 的第 j 個(gè) replica 分配到第((i + j) mode n)個(gè) broker上
5.2 leader failover
當(dāng) partition 對(duì)應(yīng)的 leader 宕機(jī)時(shí),需要從 follower 中選舉出新 leader宵晚。在選舉新leader時(shí)垂攘,一個(gè)基本的原則是,新的 leader 必須擁有舊 leader commit 過的所有消息淤刃。
kafka 在 zookeeper 中(/brokers/.../state)動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas)晒他,由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成員才能選為 leader逸贾。對(duì)于 f+1 個(gè) replica陨仅,一個(gè) partition 可以在容忍 f 個(gè) replica 失效的情況下保證消息不丟失津滞。
當(dāng)所有 replica 都不工作時(shí),有兩種可行的方案:
- 等待 ISR 中的任一個(gè) replica 活過來灼伤,并選它作為 leader触徐。可保障數(shù)據(jù)不丟失狐赡,但時(shí)間可能相對(duì)較長(zhǎng)撞鹉。
- 選擇第一個(gè)活過來的 replica(不一定是 ISR 成員)作為 leader。無法保障數(shù)據(jù)不丟失颖侄,但相對(duì)不可用時(shí)間較短鸟雏。
kafka 0.8.* 使用第二種方式。
kafka 通過 Controller 來選舉 leader览祖,流程請(qǐng)參考5.3節(jié)孝鹊。
5.3 broker failover
kafka broker failover 序列圖如下所示:
- controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊(cè) Watcher,當(dāng) broker 宕機(jī)時(shí) zookeeper 會(huì) fire watch
- controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用broker
- controller決定set_p穴墅,該集合包含宕機(jī) broker 上的所有 partition
- 對(duì) set_p 中的每一個(gè) partition
4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR
4.2 決定新 leader(如4.3節(jié)所描述)
4.3 將新 leader惶室、ISR温自、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點(diǎn) - 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令
6. consumer 消費(fèi)消息
6.1 consumer API
kafka 提供了兩套 consumer API:
- The high-level Consumer API
- The SimpleConsumer API
其中 high-level consumer API 提供了一個(gè)從 kafka 消費(fèi)數(shù)據(jù)的高層抽象玄货,而 SimpleConsumer API 則需要開發(fā)人員更多地關(guān)注細(xì)節(jié)。
以下針對(duì) high-level Consumer API 進(jìn)行說明悼泌。
6.2 consumer group
如 2.2 節(jié)所說松捉, kafka 的分配單位是 patition。每個(gè) consumer 都屬于一個(gè) group馆里,一個(gè) partition 只能被同一個(gè) group 內(nèi)的一個(gè) consumer 所消費(fèi)(也就保障了一個(gè)消息只能被 group 內(nèi)的一個(gè) consuemr 所消費(fèi))隘世,但是多個(gè) group 可以同時(shí)消費(fèi)這個(gè) partition。
kafka 的設(shè)計(jì)目標(biāo)之一就是同時(shí)實(shí)現(xiàn)離線處理和實(shí)時(shí)處理鸠踪,根據(jù)這一特性丙者,可以使用 spark/Storm 這些實(shí)時(shí)處理系統(tǒng)對(duì)消息在線處理,同時(shí)使用 Hadoop 批處理系統(tǒng)進(jìn)行離線處理营密,還可以將數(shù)據(jù)備份到另一個(gè)數(shù)據(jù)中心械媒,只需要保證這三者屬于不同的 consumer group。如下圖所示:
6.3 消費(fèi)方式
consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)评汰。
push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者纷捞,因?yàn)橄l(fā)送速率是由 broker 決定的。它的目標(biāo)是盡可能以最快速度傳遞消息被去,但是這樣很容易造成 consumer 來不及處理消息主儡,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息惨缆。
kafka文件的存儲(chǔ)機(jī)制
(1)糜值、kafka文件存儲(chǔ)基本結(jié)構(gòu)
- 在Kafka文件存儲(chǔ)中丰捷,同一個(gè)topic下有多個(gè)不同partition,每個(gè)partition為一個(gè)目錄臀玄,partiton命名規(guī)則為topic名稱+有序序號(hào)瓢阴,第一個(gè)partiton序號(hào)從0開始,序號(hào)最大值為partitions數(shù)量減1健无。
如荣恐,前篇里面的orderMq這個(gè)topic對(duì)應(yīng)的partitions在三臺(tái)機(jī)器上名稱分別為
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-2
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-1
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-2
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-1
注:重復(fù)的是副本,partition是為orderMq-0,orderMq-1,orderMq-2
- 每個(gè)partion(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等segment(段)數(shù)據(jù)文件中累贤。但每個(gè)段segment file消息數(shù)量不一定相等叠穆,這種特性方便old segment file快速被刪除。默認(rèn)保留7天的數(shù)據(jù)臼膏。
如orderMq-0目錄下(index和log為后綴名的文件合稱就是segment 文件)
[root@mini3 orderMq-0]# ll
總用量 4
-rw-r--r--. 1 root root 10485760 11月 21 22:31 00000000000000000000.index
-rw-r--r--. 1 root root 219 11月 22 05:22 00000000000000000000.log
-每個(gè)partiton只需要支持順序讀寫就行了硼被,segment文件生命周期由服務(wù)端配置參數(shù)決定。(什么時(shí)候創(chuàng)建渗磅,什么時(shí)候刪除)
(2)kafka的segment 文件
- Segment file組成:由2大部分組成嚷硫,分別為index file和data file,此2個(gè)文件一一對(duì)應(yīng)始鱼,成對(duì)出現(xiàn)仔掸,后綴”.index”和“.log”分別表示為segment索引文件、數(shù)據(jù)文件医清。
Segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開始起暮,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值。數(shù)值最大為64位long大小会烙,19位數(shù)字字符長(zhǎng)度负懦,沒有數(shù)字用0填充。
索引文件存儲(chǔ)大量元數(shù)據(jù)柏腻,數(shù)據(jù)文件存儲(chǔ)大量消息纸厉,索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。
3五嫂,497:當(dāng)前l(fā)og文件中的第幾條信息颗品,存放在磁盤上的那個(gè)地方
上述圖中索引文件存儲(chǔ)大量元數(shù)據(jù),數(shù)據(jù)文件存儲(chǔ)大量消息贫导,索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址抛猫。
其中以索引文件中元數(shù)據(jù)3,497為例,依次在數(shù)據(jù)文件中表示第3個(gè)message(在全局partiton表示第368772個(gè)message)孩灯、以及該消息的物理偏移地址為497闺金。
(3)、kafka查找message
讀取offset=368776的message峰档,需要通過下面2個(gè)步驟查找败匹。
第一步:查找segment file
00000000000000000000.index表示最開始的文件寨昙,起始偏移量(offset)為0
00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1
00000000000000737337.index的起始偏移量為737338=737337 + 1
其他后續(xù)文件依次類推。
以起始偏移量命名并排序這些文件掀亩,只要根據(jù)offset **二分查找**文件列表舔哪,就可以快速定位到具體文件。當(dāng)offset=368776時(shí)定位到00000000000000368769.index和對(duì)應(yīng)log文件槽棍。
第二步:通過segment file查找message
當(dāng)offset=368776時(shí)捉蚤,依次定位到00000000000000368769.index的元數(shù)據(jù)物理位置和00000000000000368769.log的物理偏移地址
然后再通過00000000000000368769.log順序查找直到offset=368776為止。