1 Kafka 概述
1.1 kafaka 簡介
Apache Kafka 是一個(gè)快速梳杏、可擴(kuò)展的够吩、高吞吐的拉一、可容錯(cuò)的分布式“發(fā)布-訂閱”消息系統(tǒng), 使用 Scala 與Java 語言編寫鸽疾,能夠?qū)⑾囊粋€(gè)端點(diǎn)傳遞到另一個(gè)端點(diǎn),較之傳統(tǒng)的消息中間件(例如 ActiveMQ训貌、RabbitMQ)制肮,Kafka 具有高吞吐量、內(nèi)置分區(qū)递沪、支持消息副本和高容錯(cuò)的特性豺鼻,非常適合大規(guī)模消息處理應(yīng)用程序。
Kafka 官網(wǎng): http://kafka.apache.org/
1.2 Kafa 系統(tǒng)架構(gòu)
1.3 應(yīng)用場景
Kafka 的應(yīng)用場景很多款慨,這里就舉幾個(gè)最常見的場景儒飒。
1.3.1 用戶的活動(dòng)追蹤
用戶在網(wǎng)站的不同活動(dòng)消息發(fā)布到不同的主題中心,然后可以對(duì)這些消息進(jìn)行實(shí)時(shí)監(jiān)測實(shí)時(shí)處理檩奠。當(dāng)然约素,也可加載到 Hadoop 或離線處理數(shù)據(jù)倉庫,對(duì)用戶進(jìn)行畫像笆凌。像淘寶圣猎、京東這些大型的電商平臺(tái),用戶的所有活動(dòng)都是要進(jìn)行追蹤的乞而。
1.3.2 日志聚合
1.3.3 限流削峰
1.4 kafka 高吞吐率實(shí)現(xiàn)
Kafka 與其它 MQ 相比送悔,其最大的特點(diǎn)就是高吞吐率。為了增加存儲(chǔ)能力爪模,Kafka 將所有的消息都寫入到了低速大容的硬盤欠啤。按理說,這將導(dǎo)致性能損失屋灌,但實(shí)際上洁段,kafka 仍可保持超高的吞吐率,性能并未受到影響共郭。其主要采用了如下的方式實(shí)現(xiàn)了高吞吐率祠丝。
?順序讀寫:Kafka 將消息寫入到了分區(qū) partition 中疾呻,而分區(qū)中消息是順序讀寫的。順序讀寫要遠(yuǎn)快于隨機(jī)讀寫写半。
?零拷貝:生產(chǎn)者岸蜗、消費(fèi)者對(duì)于kafka 中消息的操作是采用零拷貝實(shí)現(xiàn)的。
?批量發(fā)送:Kafka 允許使用批量消息發(fā)送模式叠蝇。
?消息壓縮:Kafka 支持對(duì)消息集合進(jìn)行壓縮璃岳。
2 Kafka 工作原理與工作過程
2.1 Kafka 基本術(shù)語
對(duì)于Kafka 基本原理的介紹,可以通過對(duì)以下基本術(shù)語的介紹進(jìn)行悔捶。
2.1.1 Topic
主題铃慷。在 Kafka 中,使用一個(gè)類別屬性來劃分消息的所屬類蜕该,劃分消息的這個(gè)類稱為 topic枚冗。topic 相當(dāng)于消息的分類標(biāo)簽,是一個(gè)邏輯概念蛇损。
2.1.2 Partition
分區(qū)赁温。topic 中的消息被分割為一個(gè)或多個(gè) partition,其是一個(gè)物理概念淤齐,對(duì)應(yīng)到系統(tǒng)上就是一個(gè)或若干個(gè)目錄股囊。
2.1.3 segment
段。將 partition 進(jìn)一步細(xì)分為了若干的 segment更啄,每個(gè) segment 文件的最大大小相等稚疹。
2.1.4 Broker
Kafka 集群包含一個(gè)或多個(gè)服務(wù)器,每個(gè)服務(wù)器節(jié)點(diǎn)稱為一個(gè) broker祭务。partition 的數(shù)量設(shè)置為 broker 數(shù)量的整數(shù)倍内狗。
2.1.5 Producer
生產(chǎn)者。即消息的發(fā)布者义锥,其會(huì)將某 topic 的消息發(fā)布到相應(yīng)的 partition 中柳沙。
2.1.6 Consumer
消費(fèi)者“璞叮可以從 broker 中讀取消息赂鲤。一個(gè)消費(fèi)者可以消費(fèi)多個(gè) topic 的消息。
一個(gè)消費(fèi)者也可以消費(fèi)一個(gè) topic 中的多個(gè) partition 中的消息柱恤。一個(gè) partition 允許多個(gè)無關(guān)的消費(fèi)者同時(shí)消費(fèi)数初。
消費(fèi)者與 partition 的關(guān)系是多對(duì)多的。
2.1.7 Consumer Group
consumer group 是kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制梗顺。組內(nèi)可以有多個(gè)消費(fèi)者泡孩,它們共享一個(gè)公共的 ID,即 group ID寺谤。組內(nèi)的所有消費(fèi)者會(huì)協(xié)調(diào)在一起平均消費(fèi)訂閱主題的所有分區(qū)仑鸥。
Kafka 可以保證在穩(wěn)定狀態(tài)下吮播,一個(gè) partition 中的消息只能被同一個(gè) consumer group 中的一個(gè)consumer 消費(fèi),而一個(gè)組內(nèi) consumer 只會(huì)消費(fèi)某一個(gè)或幾個(gè)特定的 partition锈候。當(dāng)然, 一個(gè)消息可以同時(shí)被多個(gè) consumer group 消費(fèi)敞贡。
組中 consumer 數(shù)量與 partition 數(shù)量的對(duì)應(yīng)關(guān)系如下泵琳。
2.1.8 Replicas of partition
分區(qū)副本。副本是一個(gè)分區(qū)的備份誊役,是為了防止消息丟失而創(chuàng)建的分區(qū)的備份获列。
2.1.9Partition Leader
當(dāng) partition 有多個(gè)副本時(shí),其中有且僅有一個(gè)作為 Leader蛔垢,Leader 是當(dāng)前負(fù)責(zé)消息讀寫的 partition击孩。即所有讀寫操作只能發(fā)生于 Leader 分區(qū)上。
2.1.10 Partition Follower
所有 Follower 都需要從Leader 同步消息鹏漆,F(xiàn)ollower 與Leader 始終保持消息同步巩梢。Leader 與 Follower 是主備關(guān)系,而非主從關(guān)系艺玲。
leader 不是 zk 選出來的括蝠,是 broker controller 選出來的。
2.1.11 ISR
ISR饭聚,In-Sync Replicas忌警,是指副本同步列表。AR秒梳,Assigned Replicas法绵,指定的副本OSR,Outof-Sync Replicas酪碘。
AR = ISR + OSR
2.1.12 offset
偏移量朋譬。每條消息都有一個(gè)當(dāng)前 Partition 下唯一的 64 字節(jié)的 offset,它是相對(duì)于當(dāng)前分區(qū)第一條消息的偏移量兴垦。
2.1.13 offset commit
Consumer 從 partition 中取出一批消息寫入到 buffer 對(duì)其進(jìn)行消費(fèi)此熬,在規(guī)定時(shí)間內(nèi)消費(fèi)完消息后,會(huì)自動(dòng)將其消費(fèi)消息的 offset 提交給 broker滑进,以讓 broker 記錄下哪些消息是消費(fèi)過的犀忱。當(dāng)然,若在時(shí)限內(nèi)沒有消費(fèi)完畢扶关,其是不會(huì)提交 offset 的阴汇。
2.1.14 HW 與 LEO
HW,HighWatermark节槐,高水位搀庶,表示 Consumer 可以消費(fèi)到的最高 partition 偏移量拐纱。HW 保證了 Kafka 集群中消息的一致性。確切地說哥倔,是在 broker 集群正常運(yùn)轉(zhuǎn)的狀態(tài)下秸架,保證了partition 的 Follower 與 Leader 間數(shù)據(jù)的一致性。
LEO咆蒿,Log End Offset东抹,日志最后消息的偏移量。消息是被寫入到 Kafka 的日志文件中的沃测, 這是當(dāng)前最后一個(gè)寫入的消息在 Partition 中的偏移量缭黔。
對(duì)于 leader 新寫入的消息,consumer 是不能立刻消費(fèi)的蒂破。leader 會(huì)等待該消息被所有ISR 中的 partition follower 同步后才會(huì)更新 HW馏谨,此時(shí)消息才能被 consumer 消費(fèi)。
2.1.15 Rebalance
當(dāng)消費(fèi)者組中消費(fèi)者數(shù)量發(fā)生變化附迷,或 Topic 中的 partition 數(shù)量發(fā)生了變化時(shí)惧互,partition 的所有權(quán)會(huì)在消費(fèi)者間轉(zhuǎn)移,即 partition 會(huì)重新分配喇伯,這個(gè)過程稱為再均衡 Rebalance壹哺。
再均衡能夠給消費(fèi)者組及 broker 集群帶來高可用性和伸縮性,但在再均衡期間消費(fèi)者是無法讀取消息的艘刚,即整個(gè) broker 集群有一小段時(shí)間是不可用的管宵。因此要避免不必要的再均衡。
2.1.16 Broker Controller
Kafka 集群的多個(gè) broker 中攀甚,有一個(gè)會(huì)被選舉為 controller箩朴,負(fù)責(zé)管理整個(gè)集群中 partition和副本 replicas 的狀態(tài)。
2.1.17 Zookeeper
Zookeeper 負(fù)責(zé)維護(hù)和協(xié)調(diào) broker秋度,負(fù)責(zé) Broker Controller 的選舉炸庞。
2.1.18 Coordinator
Coordinator 一般指的是運(yùn)行在每個(gè) broker 上的 group Coordinator 進(jìn)程,用于管理Consumer Group 中的各個(gè)成員荚斯,主要用于 offset 位移管理和 Rebalance埠居。一個(gè) Coordinator 可以同時(shí)管理多個(gè)消費(fèi)者組。
2.2 Kafka 工作原理與過程
2.2.1 消息路由策略
在通過 API 方式發(fā)布消息時(shí)事期,生產(chǎn)者是以 Record 為消息進(jìn)行發(fā)布的滥壕。Record 中包含 key 與 value,value 才是我們真正的消息本身兽泣,而 key 用于路由消息所要存放的 Partition绎橘。消息要寫入到哪個(gè) Partition 并不是隨機(jī)的,而是有路由策略的唠倦。
1)若指定了 partition称鳞,則直接寫入到指定的 partition涮较;
2)若未指定 partition 但指定了 key,則通過對(duì) key 的 hash 值與 partition 數(shù)量取模冈止,該取模結(jié)果就是要選出的 partition 索引狂票;
3)若 partition 和 key 都未指定,則使用輪詢算法選出一個(gè) partition熙暴。
在進(jìn)行 offset commit 時(shí)闺属,offset 也是以消息的形式寫入到了 consumer_offset 主題的partition 中的。寫入到了哪個(gè) partition 呢怨咪?系統(tǒng)會(huì)為每個(gè)提交的 offset 生成一個(gè) key屋剑,然后該 key 的 hash 值與 50 取模結(jié)果即為 partition 的索引润匙。
2.2.2 消息寫入算法
消息生產(chǎn)者將消息發(fā)送給 broker诗眨,并形成最終的可供消費(fèi)者消費(fèi)的 log,是一個(gè)比較復(fù)雜的過程孕讳。
1)producer 向 broker 集群提交連接請(qǐng)求匠楚,其所連接上的任意 broker 都會(huì)向其發(fā)送 broker controller 的通信 URL,即 broker controller 主機(jī)配置文件中的 listeners 地址
2)當(dāng) producer 指定了要生產(chǎn)消息的 topic 后厂财,其會(huì)向 broker controller 發(fā)送請(qǐng)求芋簿,請(qǐng)求當(dāng)前topic 中所有 partition 的 leader 列表地址
3)broker controller 在接收到請(qǐng)求后,會(huì)從zk 中查找到指定topic 的所有partition 的leader璃饱, 并返回給 producer
4)producer 在接收到 leader 列表地址后与斤,根據(jù)消息路由策略找到當(dāng)前要發(fā)送消息所要發(fā)送的 partition leader,然后將消息發(fā)送給該 leader
5)leader 將消息寫入本地 log荚恶,并通知 ISR 中的 followers
6)ISR 中的followers 從 leader 中同步消息后向 leader 發(fā)送 ACK
7)leader 收到所有 ISR 中的 followers 的 ACK 后撩穿,增加 HW,表示消費(fèi)者已經(jīng)可以消費(fèi)到該位置了
8)若 leader 在等待的 followers 的 ack 超時(shí)了谒撼,發(fā)現(xiàn)還有 follower 沒有發(fā)送 ack食寡,則會(huì)將該follower 從 ISR 列表中踢出去,然后增加 HW
2.2.3 HW 截?cái)鄼C(jī)制
如果 partition leader 接收到了新的消息廓潜, ISR 中其它 Follower 正在同步過程中抵皱,還未同步完畢時(shí)leader 掛了。此時(shí)就需要選舉出新的leader辩蛋。若沒有HW 截?cái)鄼C(jī)制呻畸,將會(huì)導(dǎo)致partition 中 leader 與 follower 數(shù)據(jù)的不一致。
當(dāng)原 leader 宕機(jī)后又恢復(fù)時(shí)悼院,將其 LEO 回退到宕機(jī)時(shí)的 HW擂错,然后再與新的 leader 進(jìn)行數(shù)據(jù)同步。這種機(jī)制稱為 HW 截?cái)鄼C(jī)制樱蛤。
2.2.4 消息發(fā)送的可靠性機(jī)制
生產(chǎn)者向kafka 發(fā)送消息時(shí)钮呀,可以選擇需要的可靠性級(jí)別剑鞍。通過 acks 參數(shù)的值進(jìn)行設(shè)置。
(1) 0 值
異步發(fā)送爽醋。生產(chǎn)者向 kafka 發(fā)送消息而不需要 kafka 反饋成功 ack蚁署。該方式效率最高,但可靠性最低蚂四。其可能會(huì)存在消息丟失的情況光戈。
消息丟失的原因: 在發(fā)送前丟失:
在傳輸過程中丟失:
(2) 1 值
同步發(fā)送,默認(rèn)值遂赠。生產(chǎn)者發(fā)送消息給kafka久妆,broker 的 partition leader 在收到消息后馬上發(fā)送成功ack(無需等待 ISR 中的 follower 同步完成),生產(chǎn)者收到后知道消息發(fā)送成功跷睦,然后會(huì)再發(fā)送消息筷弦。如果一直未收到 kafka 的 ack,則生產(chǎn)者會(huì)認(rèn)為消息發(fā)送失敗抑诸,會(huì)重發(fā)消息烂琴。
該設(shè)置無法保證消息發(fā)送的成功,但可以確認(rèn)消息發(fā)送失敗蜕乡。只要沒有收到 ack 一定是失敗的奸绷,但收到了 ack 并不代表成功。
(3) -1 值
同步發(fā)送层玲。其值等同于 all号醉。生產(chǎn)者發(fā)送消息給 kafka,kafka 收到消息后要等到 ISR 列表中的所有副本都同步消息完成后辛块,才向生產(chǎn)者發(fā)送成功 ack畔派。如果一直未收到 kafka 的 ack, 則認(rèn)為消息發(fā)送失敗憨降,會(huì)自動(dòng)重發(fā)消息父虑。
該設(shè)置可能會(huì)出現(xiàn)部分 Follower 重復(fù)接收(不等同于重復(fù)消費(fèi))。
2.2.5 消費(fèi)者消費(fèi)過程解析
生產(chǎn)者將消息發(fā)送到 topic 中授药,消費(fèi)者即可對(duì)其進(jìn)行消費(fèi)士嚎,其消費(fèi)過程如下:
1)consumer 向 broker 集群提交連接請(qǐng)求,其所連接上的任意 broker 都會(huì)向其發(fā)送 broker controller 的通信 URL悔叽,即 broker controller 主機(jī)配置文件中的 listeners 地址
2)當(dāng) consumer 指定了要消費(fèi)的 topic 后莱衩,其會(huì)向 broker controller 發(fā)送 poll 請(qǐng)求
3)broker controller 會(huì)為 consumer 分配一個(gè)或幾個(gè) partition leader,并將該 partitioin 的當(dāng)前 offset 發(fā)送給 consumer
4)consumer 會(huì)按照 broker controller 分配的 partition娇澎,從給定的 offset 處開始笨蚁,一次性讀取一批消息進(jìn)行消費(fèi)
5)當(dāng)這批消息消費(fèi)完該后,消費(fèi)者會(huì)向 broker 發(fā)送消息已被消費(fèi)的反饋,即最后一條消息的 offset
6)當(dāng) broker 接到消費(fèi)者的 offset 后括细,會(huì)更新到相應(yīng)的 consumer_offset 中
7)以上過程一直重復(fù)伪很,直到消費(fèi)者停止請(qǐng)求消息
8)消費(fèi)者可以重置 offset,從而可以靈活消費(fèi)存儲(chǔ)在 broker 上的消息
2.2.6 Partition Leader 選舉范圍
當(dāng) leader 掛了后 broker controller 會(huì)從 ISR 中選一個(gè) follower 成為新的 leader奋单。但锉试,若 ISR 中的所有副本都掛了怎么辦?可以通過 unclean.leader.election.enable 的取值來設(shè)置 Leader 選舉的范圍览濒。
(1)false
必須等待ISR 列表中有副本活過來才進(jìn)行新的選舉呆盖。該策略可靠性有保證,但可用性低贷笛。
(2)true
在 ISR 中沒有副本的情況下可以選擇任何一個(gè)沒有宕機(jī)主機(jī)中該 topic 的 partition 副本作為新的 leader应又,該策略可用性高,但可靠性沒有保證乏苦。
2.2.7 重復(fù)消費(fèi)問題及解決方案
最常見的重復(fù)消費(fèi)有兩種:
(1)同一個(gè) consumer 重復(fù)消費(fèi)
當(dāng) Consumer 由于消費(fèi)能力較低而引發(fā)了消費(fèi)超時(shí)時(shí)株扛,則可能會(huì)形成重復(fù)消費(fèi)。
當(dāng) consumer 由于消費(fèi)能力較低而在自動(dòng)提交超時(shí)時(shí)限內(nèi)沒有消費(fèi)完成其取出的消息時(shí)邑贴,
consumer 會(huì)向 broker 提交一個(gè)異常席里。
解決方案:
?可以延長自動(dòng)提交超時(shí)時(shí)限
?將自動(dòng)提交改為手動(dòng)提交
(2)不同的 consumer 重復(fù)消費(fèi)
當(dāng) Consumer 消費(fèi)了消息但還未提交 offset 時(shí)宕機(jī)叔磷,此時(shí)會(huì)發(fā)生 rebalance拢驾,然后原來的partition 會(huì)被分配給新的 consumer,那么改基,該 partiton 中被原來的 partition 消費(fèi)過的消息則可能會(huì)被新的 consumer 再次消費(fèi)繁疤。這些已被消費(fèi)過的消息會(huì)被重復(fù)消費(fèi)。
解決方案:
?將自動(dòng)提交改為手動(dòng)提交
2.3 Kafka 集群搭建
在生產(chǎn)環(huán)境中為了防止單點(diǎn)問題秕狰,Kafka 都是以集群方式出現(xiàn)的稠腊。下面要搭建一個(gè) Kafka集群,包含三個(gè) Kafka 主機(jī)鸣哀,即三個(gè) Broker架忌。
2.3.1 Kafka 的下載
2.3.2 安裝并配置第一臺(tái)主機(jī)
(1)上傳并解壓
將下載好的 Kafka 壓縮包上傳至 CentOS 虛擬機(jī),并解壓我衬。
(2)創(chuàng)建軟鏈接
(3)修改配置文件
在 kafka 安裝目錄下有一個(gè) config/server.properties 文件叹放,修改該文件。
2.3.3 再克隆兩臺(tái)Kafka
以 kafkaOS1 為母機(jī)再克隆兩臺(tái) Kafka 主機(jī)挠羔。在克隆完畢后井仰,需要修改 server.properties中的 broker.id、listeners 與 advertised.listeners破加。
2.3.4 kafka 的啟動(dòng)與停止
(1)啟動(dòng)zookeeper
(2)啟動(dòng) kafka
在命令后添加-daemon 參數(shù)俱恶,可以使 kafka 以守護(hù)進(jìn)程方式啟動(dòng),即不占用窗口。
(3)停止 kafka
2.3.5 kafka 操作
(1)創(chuàng)建 topic
(2)查看 topic
(3)發(fā)送消息
該命令會(huì)創(chuàng)建一個(gè)生產(chǎn)者,然后由其生產(chǎn)消息咒唆。
(4)消費(fèi)消息
(5)繼續(xù)生產(chǎn)消費(fèi)
(6)刪除 topic
2.4 日志查看
我們這里說的日志不是Kafka 的啟動(dòng)日志蜜托,啟動(dòng)日志在Kafka 安裝目錄下的logs/server.log 中。消息在磁盤上都是以日志的形式保存的捶惜。我們這里說的日志是存放在/tmp/kafka_logs 目錄中的消息日志,即 partition 與 segment荔烧。
2.4.1 查看分區(qū)與備份
(1)1 個(gè)分區(qū) 1 個(gè)備份
我們前面創(chuàng)建的 test 主題是 1 個(gè)分區(qū) 1 個(gè)備份吱七。
(2)3 個(gè)分區(qū) 1 個(gè)備份
再次創(chuàng)建一個(gè)主題,命名為 one鹤竭,創(chuàng)建三個(gè)分區(qū)踊餐,但仍為一個(gè)備份。 依次查看三臺(tái)broker臀稚,可以看到每臺(tái) broker 中都有一個(gè) one 主題的分區(qū)吝岭。
(3)3 個(gè)分區(qū) 3 個(gè)備份
再次創(chuàng)建一個(gè)主題,命名為 two吧寺,創(chuàng)建三個(gè)分區(qū)窜管,三個(gè)備份。依次查看三臺(tái) broker稚机,可以看到每臺(tái) broker 中都有三份 two 主題的分區(qū)幕帆。
2.4.2 查看分區(qū)與備份在 zk 中的信息
使用zkCli.sh 命令連接上 zk,可以查看到kafka 在zk 的信息赖条。
(1)/brokers 目錄
(2)/brokers/ids 目錄
存放的是kafka 集群中各個(gè)主機(jī)的 broker-id 列表失乾。
每個(gè) id 的數(shù)據(jù)內(nèi)容為當(dāng)前主機(jī)的信息。
(3)/brokers/topics
/brokers/topics/city/partitions 中存放的是 city 主題下所包含的 partition纬乍。這里的 0碱茁、1、2仿贬,在/tmp/kafka-logs 目錄中即為 city-0纽竣,city-1,city-2茧泪。
2.4.3 查看段segment
(1)segment 文件
segment 是一個(gè)邏輯概念蜓氨,其由兩類物理文件組成,分別為“.index”文件和“.log”文件调炬∮镉“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引缰泡。
(2)查看segment
對(duì)于 segment 中的 log 文件刀荒,不能直接通過 cat 命令查找其內(nèi)容代嗤,而是需要通過kafka自帶的一個(gè)工具查看。
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
一個(gè)用戶的一個(gè)主題會(huì)被提交到一個(gè) consumer_offsets 分區(qū)中缠借。使用主題字符串的hash 值與 50 取模干毅,結(jié)果即為分區(qū)索引。
3 Kafka API
首先在命令行創(chuàng)建一個(gè)名稱為 cities 的主題泼返,并創(chuàng)建該主題的訂閱者硝逢。
3.1 使用 kafka 原生 API
3.1.1 創(chuàng)建工程
創(chuàng)建一個(gè) Maven 的 Java 工程,命名為 kafkaDemo绅喉。創(chuàng)建時(shí)無需導(dǎo)入依賴渠鸽。為了簡單, 后面的發(fā)布者與消費(fèi)者均創(chuàng)建在該工程中柴罐。
3.1.2 導(dǎo)入依賴
<!-- kafka 依賴 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.1</version>
</dependency>
3.1.3 創(chuàng)建發(fā)布者 OneProducer
(1)創(chuàng)建發(fā)布者類 OneProducer
(2)創(chuàng)建測試類 OneProducerTest
3.1.4 創(chuàng)建發(fā)布者 TwoProducer
前面的方式在消息發(fā)送成功后徽缚,代碼中沒有任何提示,這里可以使用回調(diào)方式革屠,即發(fā)送成功后凿试,會(huì)觸發(fā)回調(diào)方法的執(zhí)行。
(1)創(chuàng)建發(fā)布者類TwoProducer
復(fù)制 OneProducer 類似芝,僅修改 sendMsg()方法那婉。
(2)創(chuàng)建測試類TwoProducerTest
3.1.5 批量發(fā)送消息
(1)創(chuàng)建發(fā)布者類 SomeProducerBatch
復(fù)制前面的發(fā)布者類,在其基礎(chǔ)上進(jìn)行修改党瓮。
(2)創(chuàng)建測試類 ProducerBatchTest
3.1.6 消費(fèi)者組
(1)創(chuàng)建消費(fèi)者類 SomeConsumer
(2)創(chuàng)建測試類 ConsumerTest
3.1.7 消費(fèi)者同步手動(dòng)提交
(1)自動(dòng)提交的問題
前面的消費(fèi)者都是以自動(dòng)提交 offset 的方式對(duì) broker 中的消息進(jìn)行消費(fèi)的详炬,但自動(dòng)提交可能會(huì)出現(xiàn)消息重復(fù)消費(fèi)的情況。所以在生產(chǎn)環(huán)境下麻诀,很多時(shí)候需要對(duì) offset 進(jìn)行手動(dòng)提交痕寓, 以解決重復(fù)消費(fèi)的問題傲醉。
(2)手動(dòng)提交分類
手動(dòng)提交又可以劃分為同步提交蝇闭、異步提交、同異步聯(lián)合提交硬毕。這些提交方式僅僅是doWork()方法不相同呻引,其構(gòu)造器是相同的。所以下面首先在前面消費(fèi)者類的基礎(chǔ)上進(jìn)行構(gòu)造器的修改吐咳,然后再分別實(shí)現(xiàn)三種不同的提交方式逻悠。
(3)創(chuàng)建消費(fèi)者類 SyncManualConsumer
A、原理
同步提交方式是韭脊,消費(fèi)者向 broker 提交 offset 后等待 broker 成功響應(yīng)童谒。若沒有收到響應(yīng),則會(huì)重新提交沪羔,直到獲取到響應(yīng)饥伊。而在這個(gè)等待過程中,消費(fèi)者是阻塞的。其嚴(yán)重影響了消費(fèi)者的吞吐量琅豆。
B愉豺、 修改構(gòu)造器
直接復(fù)制前面的 SomeConsumer,在其基礎(chǔ)上進(jìn)行修改茫因。
C蚪拦、 修改 doWork()方法
(4)創(chuàng)建測試類 SyncManulTest
3.1.8 消費(fèi)者異步手動(dòng)提交
(1)原理
手動(dòng)同步提交方式需要等待 broker 的成功響應(yīng),效率太低冻押,影響消費(fèi)者的吞吐量驰贷。異步提交方式是,消費(fèi)者向 broker 提交 offset 后不用等待成功響應(yīng)洛巢,所以其增加了消費(fèi)者的吞吐量饱苟。
(2)創(chuàng)建消費(fèi)者類 AsyncManualConsumer
復(fù)制前面的 SyncManualConsumer 類,在其基礎(chǔ)上進(jìn)行修改狼渊。
(3)創(chuàng)建測試類 AsyncManulTest
3.1.9 消費(fèi)者同異步手動(dòng)提交
(1)原理
同異步提交箱熬,即同步提交與異步提交組合使用。一般情況下狈邑,在異步手動(dòng)提交時(shí)城须,若偶爾出現(xiàn)提交失敗,其也不會(huì)影響消費(fèi)者的消費(fèi)米苹。因?yàn)楹罄m(xù)提交最終會(huì)將這次提交失敗的 offset 給提交了糕伐。
但異步手動(dòng)提交會(huì)產(chǎn)生重復(fù)消費(fèi),為了防止重復(fù)消費(fèi)蘸嘶,可以將同步提交與異常提交聯(lián)合使用良瞧。
(2)創(chuàng)建消費(fèi)者類 SyncAsyncManualConsumer
復(fù)制前面的 AsyncManualConsumer 類,在其基礎(chǔ)上進(jìn)行修改训唱。
3.2 Spring Boot Kafka
為了簡單褥蚯,以下代碼是將消息發(fā)布者與訂閱者定義到了一個(gè)工程中的。
3.2.1 創(chuàng)建工程
創(chuàng)建一個(gè) Spring Boot 工程况增,導(dǎo)入如下依賴赞庶。
3.2.2 定義發(fā)布者
Spring 是通過 KafkaTemplate 來完成對(duì) Kafka 的操作的。
(1)修改配置文件
(2)定義發(fā)布者處理器
Spring Kafka 通過KafkaTemplate 完成消息的發(fā)布澳骤。
3.2.3 定義消費(fèi)者
Spring 是通過監(jiān)聽方式實(shí)現(xiàn)消費(fèi)者的歧强。
(1)修改配置文件
在配置文件中添加如下內(nèi)容。注意为肮,Spring 中要求必須為消費(fèi)者指定組摊册。
(2)定義消費(fèi)者
Spring Kafka 是通過 KafkaListener 監(jiān)聽方式來完成消息訂閱與接收的。當(dāng)監(jiān)聽到有指定主題的消息時(shí)颊艳,就會(huì)觸發(fā)@KafkaListener 注解所標(biāo)注的方法的執(zhí)行茅特。