一褒侧、kafka 架構(gòu)和原理
1.1 相關(guān)概念
如圖.1中漱凝,kafka 相關(guān)名詞解釋如下:
1.producer:
消息生產(chǎn)者单山,發(fā)布消息到 kafka 集群的終端或服務(wù)。
2.broker:
kafka 集群中包含的服務(wù)器万矾。
3.topic:
每條發(fā)布到 kafka 集群的消息屬于的類別谤民,即 kafka 是面向 topic 的沸停。
4.partition:
partition 是物理上的概念膜毁,每個(gè) topic 包含一個(gè)或多個(gè) partition。kafka 分配的單位是 partition愤钾。
5.consumer:
從 kafka 集群中消費(fèi)消息的終端或服務(wù)瘟滨。
6.Consumer group:
high-level consumer API 中,每個(gè) consumer 都屬于一個(gè) consumer group能颁,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi)杂瘸,但可以被多個(gè) consumer group 消費(fèi)。
7.replica:
partition 的副本伙菊,保障 partition 的高可用败玉。
8.leader:
replica 中的一個(gè)角色, producer 和 consumer 只跟 leader 交互镜硕。
9.follower:
replica 中的一個(gè)角色运翼,從 leader 中復(fù)制數(shù)據(jù)。
10.controller:
kafka 集群中的其中一個(gè)服務(wù)器兴枯,用來進(jìn)行 leader election 以及 各種 failover血淌。
12.zookeeper:
kafka 通過 zookeeper 來存儲集群的 meta 信息。
[圖1]
1.2 zookeeper 節(jié)點(diǎn)
kafka 在 zookeeper 中的存儲結(jié)構(gòu)如下圖所示:
1.3财剖、內(nèi)部原理及實(shí)現(xiàn)
Kafka依賴zk悠夯,以集群方式工作,每臺機(jī)器稱為一個(gè)blocker(與其他分布式環(huán)境不同的是躺坟,僅有1臺blocker也能工作)沦补,并分別指定blockerId,kafka中同一類型數(shù)據(jù)以topic形式存在咪橙,可對topic進(jìn)行分區(qū)策彤,及指定副本數(shù),例如可將 topic1 分區(qū)為3個(gè)partition:p0,p1,p2 ;每個(gè)分區(qū)副本數(shù)為3匣摘,則每個(gè)分區(qū)塊會均勻的分布在blocker中店诗,對于每個(gè)partition,有多個(gè)副本音榜,會選出一個(gè)leader partition對外接收請求(只有l(wèi)eader partition會接收外部請求庞瘸,其他follower partition只負(fù)責(zé)同步leader數(shù)據(jù),不接收外部請求赠叼,包括讀請求擦囊,這里與zk不同违霞,zk的follower會接收讀請求)。集群有多個(gè)blocker瞬场,會基于zk選擇出一個(gè)Controller买鸽,來管理整個(gè)集群,包括blocker管理贯被、topic管理眼五,partition leader分配等。
每個(gè)blocker只存儲消息體彤灶,不存儲元數(shù)據(jù)(topic信息看幼;分區(qū)信息:topic有哪些分區(qū),哪些副本幌陕,分別在哪臺blocker上诵姜,哪個(gè)是leader;consumer信息及讀取消息后提交的偏移量數(shù)據(jù)等)搏熄,元數(shù)據(jù)存儲在zk中棚唆。
Controller主要功能:
(1)更新元數(shù)據(jù)
若blocker宕機(jī),相關(guān)副本不可用心例,需要重新選擇相關(guān)partition的leader瑟俭,及更新ISR列表,選舉完畢后契邀,controller會更新zk中的元數(shù)據(jù)摆寄,并廣播給其他blocker(每個(gè)blocker都可以對外提供元數(shù)據(jù)信息)
(2)topic管理
新增、刪除topic
(3)分區(qū)重新分配
通過自帶的 kafka-reassign-partitions 腳本可重新制定topic分區(qū)坯门,由controller處理請求微饥,重新分配并選擇每個(gè)分區(qū)的leader。
(4)partition leader 分配
partition leader 選舉的原理 :
1. 在ZooKeeper上創(chuàng)建/admin/preferred_replica_election節(jié)點(diǎn)古戴,并存入需要調(diào)整Preferred Replica的Partition信息欠橘。
2. Controller一直Watch該節(jié)點(diǎn),一旦該節(jié)點(diǎn)被創(chuàng)建现恼,Controller會收到通知肃续,并獲取該內(nèi)容。
3. Controller讀取Preferred Replica叉袍,如果發(fā)現(xiàn)該Replica當(dāng)前并非是Leader并且它在該P(yáng)artition的ISR中始锚,
Controller向該Replica發(fā)送LeaderAndIsrRequest,使該Replica成為Leader喳逛。
如果該Replica當(dāng)前并非是Leader瞧捌,且不在ISR中,Controller為了保證沒有數(shù)據(jù)丟失,并不會將其設(shè)置為Leader姐呐。
(總結(jié):副本partition向zk創(chuàng)建臨時(shí)節(jié)點(diǎn)并寫入數(shù)據(jù)殿怜,Controller broker監(jiān)聽節(jié)點(diǎn),一旦創(chuàng)建成功曙砂,獲取其中的消息头谜,更新副本Leader及ISR列表,完成選舉鸠澈;只會選擇ISR列表中的設(shè)備作為leader)
具體實(shí)現(xiàn):
在kafka中創(chuàng)建一個(gè)topic柱告,controller會接收創(chuàng)建的請求,并在zk的目錄下新增topic節(jié)點(diǎn)
zk中會維護(hù)consumer相關(guān)信息:
二款侵、producer 發(fā)布消息
2.1 寫入方式
producer 采用 push 模式將消息發(fā)布到 broker,每條消息都被 append 到 patition 中侧纯,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高新锈,保障 kafka 吞吐率)。
2.2 消息路由
producer 發(fā)送消息到 broker 時(shí)眶熬,會根據(jù)分區(qū)算法選擇將其存儲到哪一個(gè) partition妹笆。其路由機(jī)制為:
1. 指定了 patition,則直接使用娜氏;
2. 未指定 patition 但指定 key拳缠,通過對 key 的 value 進(jìn)行hash 選出一個(gè) patition
3. patition 和 key 都未指定,使用輪詢選出一個(gè) patition贸弥。
2.3 寫入流程
producer 寫入消息序列圖如下所示:
流程說明:
1. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader
2. producer 將消息發(fā)送給該 leader
3. leader 將消息寫入本地 log
4. followers 從 leader pull 消息窟坐,寫入本地 log 后 leader 發(fā)送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark绵疲,最后 commit 的 offset) 并向 producer 發(fā)送 ACK
2.4 消息傳輸流程
Producer即生產(chǎn)者哲鸳,向Kafka集群發(fā)送消息,在發(fā)送消息之前盔憨,會對消息進(jìn)行分類徙菠,即Topic,上圖展示了兩個(gè)producer發(fā)送了分類為topic1的消息郁岩,另外一個(gè)發(fā)送了topic2的消息婿奔。
Topic即主題,通過對消息指定主題可以將消息分類问慎,消費(fèi)者可以只關(guān)注自己需要的Topic中的消息
Consumer即消費(fèi)者萍摊,消費(fèi)者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息如叼,然后可以對這些消息進(jìn)行處理记餐。
從上圖中就可以看出同一個(gè)Topic下的消費(fèi)者和生產(chǎn)者的數(shù)量并不是對應(yīng)的。
2.5 kafka服務(wù)器消息存儲策略
(1)下面以一個(gè)Kafka集群中4個(gè)Broker舉例
創(chuàng)建1個(gè)topic包含4個(gè)Partition薇正,2 Replication片酝;數(shù)據(jù)Producer流動(dòng)如圖所示:
(2)當(dāng)集群中新增2節(jié)點(diǎn)囚衔,Partition增加到6個(gè)時(shí)分布情況如下:
副本分配邏輯規(guī)則如下:
在Kafka集群中,每個(gè)Broker都有均等分配Partition的Leader機(jī)會雕沿。上述圖Broker Partition中练湿,箭頭指向?yàn)楦北荆訮artition-0為例:broker1中parition-0為Leader审轮,Broker2中Partition-0為副本肥哎。上述圖種每個(gè)Broker(按照BrokerId有序)依次分配主Partition,下一個(gè)Broker為副本,如此循環(huán)迭代分配疾渣,多副本都遵循此規(guī)則篡诽。
副本分配算法如下:
將所有N Broker和待分配的i個(gè)Partition排序.將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上.將第i個(gè)Partition的第j個(gè)副本分配到第((i + j) mod n)個(gè)Broker上.
談到kafka的存儲,就不得不提到分區(qū)榴捡,即partitions杈女,創(chuàng)建一個(gè)topic時(shí),同時(shí)可以指定分區(qū)數(shù)目吊圾,分區(qū)數(shù)越多达椰,其吞吐量也越大,但是需要的資源也越多项乒,同時(shí)也會導(dǎo)致更高的不可用性啰劲,kafka在接收到生產(chǎn)者發(fā)送的消息之后,會根據(jù)均衡策略將消息存儲到不同的分區(qū)中檀何。
在每個(gè)分區(qū)中蝇裤,消息以順序存儲,最晚接收的的消息會最后被消費(fèi)频鉴。
2.6與生產(chǎn)者的交互
生產(chǎn)者在向kafka集群發(fā)送消息的時(shí)候猖辫,可以通過指定分區(qū)來發(fā)送到指定的分區(qū)中
也可以通過指定均衡策略來將消息發(fā)送到不同的分區(qū)中
如果不指定,就會采用默認(rèn)的隨機(jī)均衡策略砚殿,將消息隨機(jī)的存儲到不同的分區(qū)中
2.7 與消費(fèi)者的交互
在消費(fèi)者消費(fèi)消息時(shí)啃憎,kafka使用offset來記錄當(dāng)前消費(fèi)的位置
在kafka的設(shè)計(jì)中,可以有多個(gè)不同的group來同時(shí)消費(fèi)同一個(gè)topic下的消息似炎,如圖辛萍,我們有兩個(gè)不同的group同時(shí)消費(fèi),他們的的消費(fèi)的記錄位置offset各不相同羡藐,不互相干擾贩毕。
對于一個(gè)group而言,消費(fèi)者的數(shù)量不應(yīng)該多余分區(qū)的數(shù)量仆嗦,因?yàn)樵谝粋€(gè)group中辉阶,每個(gè)分區(qū)至多只能綁定到一個(gè)消費(fèi)者上,即一個(gè)消費(fèi)者可以消費(fèi)多個(gè)分區(qū),一個(gè)分區(qū)只能給一個(gè)消費(fèi)者消費(fèi)
因此谆甜,若一個(gè)group中的消費(fèi)者數(shù)量大于分區(qū)數(shù)量的話垃僚,多余的消費(fèi)者將不會收到任何消息。
三规辱、broker 保存消息
3.1 存儲方式
物理上把 topic 分成一個(gè)或多個(gè) patition(對應(yīng) server.properties 中的 num.partitions=3 配置)谆棺,每個(gè) patition 物理上對應(yīng)一個(gè)文件夾(該文件夾存儲該 patition 的所有消息和索引文件),如下:
3.2 存儲策略
無論消息是否被消費(fèi)罕袋,kafka 都會保留所有消息改淑。有兩種策略可以刪除舊數(shù)據(jù):
- 基于時(shí)間:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
需要注意的是浴讯,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1)朵夏,即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)榆纽。
3.3 topic 創(chuàng)建與刪除
3.3.1 創(chuàng)建 topic
創(chuàng)建 topic 的序列圖如下所示:
流程說明:
1. controller 在 ZooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊 watcher仰猖,當(dāng) topic 被創(chuàng)建,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配掠河。
2. controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表亮元,對于 set_p 中的每一個(gè) partition:
2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個(gè)可用的 broker 作為新的 leader猛计,并將AR設(shè)置為新的 ISR
2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest唠摹。
3.3.2 刪除 topic
刪除 topic 的序列圖如下所示:
流程說明:
- controller 在 zooKeeper 的 /brokers/topics 節(jié)點(diǎn)上注冊 watcher,當(dāng) topic 被刪除奉瘤,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配勾拉。
- 若 delete.topic.enable=false,結(jié)束盗温;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire藕赞,controller 通過回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest。
** 四卖局、kafka HA**
4.1 replication
如圖.1所示斧蜕,同一個(gè) partition 可能會有多個(gè) replica(對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有 replica 的情況下砚偶,一旦 broker 宕機(jī)批销,其上所有 patition 的數(shù)據(jù)都不可被消費(fèi),同時(shí) producer 也不能再將數(shù)據(jù)存于其上的 patition染坯。引入replication 之后均芽,同一個(gè) partition 可能會有多個(gè) replica,而這時(shí)需要在這些 replica 之間選出一個(gè) leader单鹿,producer 和 consumer 只與這個(gè) leader 交互掀宋,其它 replica 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。
Kafka 分配 Replica 的算法如下:
1. 將所有 broker(假設(shè)共 n 個(gè) broker)和待分配的 partition 排序
2. 將第 i 個(gè) partition 分配到第(i mod n)個(gè) broker 上
3. 將第 i 個(gè) partition 的第 j 個(gè) replica 分配到第((i + j) mode n)個(gè) broker上
4.2 leader failover
當(dāng) partition 對應(yīng)的 leader 宕機(jī)時(shí),需要從 follower 中選舉出新 leader劲妙。在選舉新leader時(shí)湃鹊,一個(gè)基本的原則是,新的 leader 必須擁有舊 leader commit 過的所有消息是趴。
kafka 在 zookeeper 中(/brokers/.../state)動(dòng)態(tài)維護(hù)了一個(gè) ISR(in-sync replicas)涛舍,由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成員才能選為 leader唆途。對于 f+1 個(gè) replica富雅,一個(gè) partition 可以在容忍 f 個(gè) replica 失效的情況下保證消息不丟失。
當(dāng)所有 replica 都不工作時(shí)肛搬,有兩種可行的方案:
1. 等待 ISR 中的任一個(gè) replica 活過來没佑,并選它作為 leader∥屡猓可保障數(shù)據(jù)不丟失蛤奢,但時(shí)間可能相對較長。
2. 選擇第一個(gè)活過來的 replica(不一定是 ISR 成員)作為 leader陶贼。無法保障數(shù)據(jù)不丟失啤贩,但相對不可用時(shí)間較短。
kafka 0.8.* 使用第二種方式拜秧。
kafka 通過 Controller 來選舉 leader痹屹,流程請參考5.3節(jié)。
4.3 broker failover
kafka broker failover 序列圖如下所示:
流程說明:
1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點(diǎn)注冊 Watcher枉氮,當(dāng) broker 宕機(jī)時(shí) zookeeper 會 fire watch
2. controller 從 /brokers/ids 節(jié)點(diǎn)讀取可用broker
3. controller決定set_p志衍,該集合包含宕機(jī) broker 上的所有 partition
4. 對 set_p 中的每一個(gè) partition
4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點(diǎn)讀取 ISR
4.2 決定新 leader(如4.3節(jié)所描述)
4.3 將新 leader、ISR聊替、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點(diǎn)
5. 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令
4.4 controller failover
當(dāng) controller 宕機(jī)時(shí)會觸發(fā) controller failover楼肪。每個(gè) broker 都會在 zookeeper 的 "/controller" 節(jié)點(diǎn)注冊 watcher,當(dāng) controller 宕機(jī)時(shí) zookeeper 中的臨時(shí)節(jié)點(diǎn)消失惹悄,所有存活的 broker 收到 fire 的通知春叫,每個(gè) broker 都嘗試創(chuàng)建新的 controller path,只有一個(gè)競選成功并當(dāng)選為 controller泣港。
當(dāng)新的 controller 當(dāng)選時(shí)暂殖,會觸發(fā) KafkaController.onControllerFailover 方法,在該方法中完成如下操作:
1. 讀取并增加 Controller Epoch爷速。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher央星。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher。
4. 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher惫东。
5. 若 delete.topic.enable=true(默認(rèn)值是 false)莉给,則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher毙石。
6. 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。
7. 初始化 ControllerContext 對象颓遏,設(shè)置當(dāng)前所有 topic徐矩,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等叁幢。
8. 啟動(dòng) replicaStateMachine 和 partitionStateMachine滤灯。
9. 將 brokerState 狀態(tài)設(shè)置為 RunningAsController。
10. 將每個(gè) partition 的 Leadership 信息發(fā)送給所有“活”著的 broker曼玩。
11. 若 auto.leader.rebalance.enable=true(默認(rèn)值是true)鳞骤,則啟動(dòng) partition-rebalance 線程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值黍判,則刪除相應(yīng)的Topic豫尽。
5. consumer 消費(fèi)消息
5.1 consumer API
kafka 提供了兩套 consumer API:
- The high-level Consumer API
- The SimpleConsumer API
其中 high-level consumer API 提供了一個(gè)從 kafka 消費(fèi)數(shù)據(jù)的高層抽象,而 SimpleConsumer API 則需要開發(fā)人員更多地關(guān)注細(xì)節(jié)顷帖。
5.1.1 The high-level consumer API
high-level consumer API 提供了 consumer group 的語義美旧,一個(gè)消息只能被 group 內(nèi)的一個(gè) consumer 所消費(fèi),且 consumer 消費(fèi)消息時(shí)不關(guān)注 offset贬墩,最后一個(gè) offset 由 zookeeper 保存榴嗅。
使用 high-level consumer API 可以是多線程的應(yīng)用,應(yīng)當(dāng)注意:
- 如果消費(fèi)線程大于 patition 數(shù)量陶舞,則有些線程將收不到消息
- 如果 patition 數(shù)量大于線程數(shù)嗽测,則有些線程多收到多個(gè) patition 的消息
- 如果一個(gè)線程消費(fèi)多個(gè) patition,則無法保證你收到的消息的順序吊说,而一個(gè) patition 內(nèi)的消息是有序的
5.1.2 The SimpleConsumer API
如果你想要對 patition 有更多的控制權(quán)论咏,那就應(yīng)該使用 SimpleConsumer API优炬,比如:
1. 多次讀取一個(gè)消息
2. 只消費(fèi)一個(gè) patition 中的部分消息
3. 使用事務(wù)來保證一個(gè)消息僅被消費(fèi)一次
但是使用此 API 時(shí)颁井,partition、offset蠢护、broker雅宾、leader 等對你不再透明,需要自己去管理葵硕。你需要做大量的額外工作:
1. 必須在應(yīng)用程序中跟蹤 offset眉抬,從而確定下一條應(yīng)該消費(fèi)哪條消息
2. 應(yīng)用程序需要通過程序獲知每個(gè) Partition 的 leader 是誰
3. 需要處理 leader 的變更
使用 SimpleConsumer API 的一般流程如下:
1. 查找到一個(gè)“活著”的 broker,并且找出每個(gè) partition 的 leader
2. 找出每個(gè) partition 的 follower
3. 定義好請求懈凹,該請求應(yīng)該能描述應(yīng)用程序需要哪些數(shù)據(jù)
4. fetch 數(shù)據(jù)
5. 識別 leader 的變化蜀变,并對之作出必要的響應(yīng)
以下針對 high-level Consumer API 進(jìn)行說明。
5.2 consumer group
如 2.2 節(jié)所說介评, kafka 的分配單位是 patition库北。每個(gè) consumer 都屬于一個(gè) group爬舰,一個(gè) partition 只能被同一個(gè) group 內(nèi)的一個(gè) consumer 所消費(fèi)(也就保障了一個(gè)消息只能被 group 內(nèi)的一個(gè) consuemr 所消費(fèi)),但是多個(gè) group 可以同時(shí)消費(fèi)這個(gè) partition寒瓦。
kafka 的設(shè)計(jì)目標(biāo)之一就是同時(shí)實(shí)現(xiàn)離線處理和實(shí)時(shí)處理情屹,根據(jù)這一特性,可以使用 spark/Storm 這些實(shí)時(shí)處理系統(tǒng)對消息在線處理杂腰,同時(shí)使用 Hadoop 批處理系統(tǒng)進(jìn)行離線處理垃你,還可以將數(shù)據(jù)備份到另一個(gè)數(shù)據(jù)中心,只需要保證這三者屬于不同的 consumer group喂很。如下圖所示:
5.3 消費(fèi)方式
consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)惜颇。
push 模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 broker 決定的少辣。它的目標(biāo)是盡可能以最快速度傳遞消息官还,但是這樣很容易造成 consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞毒坛。而 pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息望伦。
對于 Kafka 而言,pull 模式更合適煎殷,它可簡化 broker 的設(shè)計(jì)屯伞,consumer 可自主控制消費(fèi)消息的速率,同時(shí) consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi)豪直,同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義劣摇。
5.4 consumer delivery guarantee
如果將 consumer 設(shè)置為 autocommit,consumer 一旦讀到數(shù)據(jù)立即自動(dòng) commit弓乙。如果只討論這一讀取消息的過程末融,那 Kafka 確保了 Exactly once。
但實(shí)際使用中應(yīng)用程序并非在 consumer 讀取完數(shù)據(jù)就結(jié)束了暇韧,而是要進(jìn)行進(jìn)一步處理勾习,而數(shù)據(jù)處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:
1.讀完消息先 commit 再處理消息。
這種模式下懈玻,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了巧婶,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應(yīng)于 At most once
2.讀完消息先處理再 commit涂乌。
這種模式下艺栈,如果在處理完消息之后 commit 之前 consumer crash 了,下次重新開始工作時(shí)還會處理剛剛未 commit 的消息湾盒,實(shí)際上該消息已經(jīng)被處理過了湿右。這就對應(yīng)于 At least once。
3.如果一定要做到 Exactly once罚勾,就需要協(xié)調(diào) offset 和實(shí)際操作的輸出毅人。
精典的做法是引入兩階段提交漾唉。如果能讓 offset 和操作輸入存在同一個(gè)地方,會更簡潔和通用堰塌。這種方式可能更好赵刑,因?yàn)樵S多輸出系統(tǒng)可能不支持兩階段提交。比如场刑,consumer 拿到數(shù)據(jù)后可能把數(shù)據(jù)放到 HDFS般此,如果把最新的 offset 和數(shù)據(jù)本身一起寫到 HDFS,那就可以保證數(shù)據(jù)的輸出和 offset 的更新要么都完成牵现,要么都不完成铐懊,間接實(shí)現(xiàn) Exactly once。(目前就 high-level API而言瞎疼,offset 是存于Zookeeper 中的科乎,無法存于HDFS,而SimpleConsuemr API的 offset 是由自己去維護(hù)的贼急,可以將之存于 HDFS 中)
總之茅茂,Kafka 默認(rèn)保證 At least once,并且允許通過設(shè)置 producer 異步提交來實(shí)現(xiàn) At most once(見文章《kafka consumer防止數(shù)據(jù)丟失》)太抓。而 Exactly once 要求與外部存儲系統(tǒng)協(xié)作空闲,幸運(yùn)的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。
更多關(guān)于 kafka 傳輸語義的信息請參考《Message Delivery Semantics》走敌。
****5.5 consumer rebalance****
當(dāng)有 consumer 加入或退出碴倾、以及 partition 的改變(如 broker 加入或退出)時(shí)會觸發(fā) rebalance。consumer rebalance算法如下:
1. 將目標(biāo) topic 下的所有 partirtion 排序掉丽,存于PT
2. 對某 consumer group 下所有 consumer 排序米酬,存于 CG绪囱,第 i 個(gè)consumer 記為 Ci
3. N=size(PT)/size(CG)窘行,向上取整
4. 解除 Ci 對原來分配的 partition 的消費(fèi)權(quán)(i從0開始)
5. 將第i*N到(i+1)*N-1個(gè) partition 分配給 Ci
在 0.8.*版本肤舞,每個(gè) consumer 都只負(fù)責(zé)調(diào)整自己所消費(fèi)的 partition,為了保證整個(gè)consumer group 的一致性残邀,當(dāng)一個(gè) consumer 觸發(fā)了 rebalance 時(shí)皆辽,該 consumer group 內(nèi)的其它所有其它 consumer 也應(yīng)該同時(shí)觸發(fā) rebalance柑蛇。這會導(dǎo)致以下幾個(gè)問題:
1.Herd effect
任何 broker 或者 consumer 的增減都會觸發(fā)所有的 consumer 的 rebalance
2.Split Brain
每個(gè) consumer 分別單獨(dú)通過 zookeeper 判斷哪些 broker 和 consumer 宕機(jī)了芥挣,那么不同 consumer 在同一時(shí)刻從 zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的耻台,這就會造成不正確的 reblance 嘗試空免。
3. 調(diào)整結(jié)果不可控
所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,這可能會導(dǎo)致 kafka 工作在一個(gè)不正確的狀態(tài)盆耽。
基于以上問題蹋砚,kafka 設(shè)計(jì)者考慮在0.9.*版本開始使用中心 coordinator 來控制 consumer rebalance扼菠,然后又從簡便性和驗(yàn)證要求兩方面考慮,計(jì)劃在 consumer 客戶端實(shí)現(xiàn)分配方案坝咐。(見文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》)循榆,此處不再贅述。
六墨坚、注意事項(xiàng)
6.1 producer 無法發(fā)送消息的問題
最開始在本機(jī)搭建了kafka偽集群秧饮,本地 producer 客戶端成功發(fā)布消息至 broker。隨后在服務(wù)器上搭建了 kafka 集群泽篮,在本機(jī)連接該集群盗尸,producer 卻無法發(fā)布消息到 broker(奇怪也沒有拋錯(cuò))。最開始懷疑是 iptables 沒開放帽撑,于是開放端口泼各,結(jié)果還不行(又開始是代碼問題、版本問題等等亏拉,倒騰了很久)扣蜻。最后沒辦法,一項(xiàng)一項(xiàng)查看 server.properties 配置及塘,發(fā)現(xiàn)以下兩個(gè)配置:
The address the socket server listens on. It will get the value returned from java.net.InetAddress.getCanonicalHostName() if not configured.
FORMAT:
listeners = security_protocol://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
以上說的就是 advertised.listeners 是 broker 給 producer 和 consumer 連接使用的弱贼,如果沒有設(shè)置,就使用 listeners磷蛹,而如果 host_name 沒有設(shè)置的話吮旅,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主機(jī)名。
修改方法:
- listeners=PLAINTEXT://121.10.26.XXX:9092
- advertised.listeners=PLAINTEXT://121.10.26.XXX:9092
修改后重啟服務(wù)味咳,正常工作庇勃。關(guān)于更多 kafka 配置說明,見文章《Kafka學(xué)習(xí)整理三(borker(0.9.0及0.10.0)配置)》槽驶。
參考:
https://www.cnblogs.com/xifenglou/p/7251112.html
http://www.reibang.com/p/a036405f989c