消息中間件—RocketMQ消息消費(二)(push模式實現)

摘要:在RocketMQ中,消息消費都是基于Pull消息方式挨务,那么Push模式中又是如何實現Consumer端準實時消費的呢?
在上一篇—“消息中間件—RocketMQ消息消費(一)”中玉组,已經簡要地介紹了下RocketMQ中“Pull和Push兩種消費方式的簡要流程”以及“Push消費方式的啟動流程”(ps:如果不熟悉這幾塊內容的童鞋谎柄,可以自己回顧下上一篇的內容)。本文將詳細介紹RocketMQ中Push消費方式下的“Pull消息的長輪詢機制”和“Consumer端的負載均衡機制”這兩塊關鍵核心內容惯雳。
由于RocketMQ系列的技術分享存在一定的連續(xù)性朝巫,因此希望讀者能回顧下往期RocketMQ分享的篇幅:
(1)消息中間件—RocketMQ的RPC通信(一)
(2)消息中間件—RocketMQ的RPC通信(二)
(3)消息中間件—RocketMQ消息發(fā)送
(4)消息中間件—RocketMQ消息消費(一)

一、RocketMQ中長輪詢的Pull消息機制

在上一篇中石景,已經簡略地介紹過RocketMQ中消息消費時Pull消息的長輪詢機制了劈猿,其主要的思路是:Consumer如果第一次嘗試Pull消息失敗(比如:Broker端沒有可以消費的消息)鸵钝,并不立即給消費者客戶端返回Response的響應糙臼,而是先hold住并且掛起請求。然后在Broker端恩商,通過后臺獨立線程—PullRequestHoldService重復嘗試執(zhí)行Pull消息請求來取消息变逃。同時,另外一個ReputMessageService線程不斷地構建ConsumeQueue/IndexFile數據怠堪,并取出hold住的Pull請求進行二次處理揽乱。通過這種長輪詢機制名眉,即可解決Consumer端需要通過不斷地發(fā)送無效的輪詢Pull請求,而導致整個RocketMQ集群中Broker端負載很高的問題凰棉。

1.1 Consumer向Broker端發(fā)送Pull消息請求的主要過程

在RocketMQ的Consumer端损拢,后臺獨立線程服務—pullMessageService是Pull消息請求的發(fā)起者,它不斷地嘗試從阻塞隊列—LinkedBlockingQueue<PullRequest>中獲取元素PullRequest撒犀,并根據pullRequest中的參數以及訂閱關系信息調用pullAPIWrapper的pullKernelImpl()方法發(fā)送封裝后的Pull消息請求—PullMessageRequestHeader至Broker端來拉取消息(具體完成發(fā)送一次Pull消息的PRC通信請求的是MQClientAPIImpl中的pullMessage()方法)福压。這里涉及細節(jié)的時序圖(ps:時序圖中沒有涉及PRC異步通信中的callback過程)如下:

Consumer向Broker端發(fā)送長輪詢請求的時序圖.jpg

其中, DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法是發(fā)送Pull消息請求的關鍵:
(1)校驗ProcessQueue是否“drop”或舞, 如果為“drop”為true則直接返回(這個“drop”的設置在下面一節(jié)—“Consumer端的負載均衡機制”中會提到)荆姆;
(2)給ProcessQueue設置Pull消息的時間戳;
(3)做流量控制映凳,對于滿足下面條件的任何一種情況胆筒,稍后再發(fā)起Pull消息的請求;
條件1:正在消費的隊列中诈豌,未被消費的消息數和消息大小超過閥值(默認每個隊列消息數為1000個/消息存儲容量100MB)仆救;
條件2:如果是順序消費,正在消費的隊列中矫渔,消息的跨度超過閥值(默認2000)彤蔽;
(4)根據topic獲取訂閱關系—SubscriptionData送膳;
(5)構建Pull消息的回調對象—PullBack,這里從Broker端Pull消息的返回結果處理是通過異步回調(發(fā)送異步通信RPC請求),其中如果Broker端返回Pull消息成功,在回調方法中先填充至處理隊列—processQueue中(將Pull下來的消息袜啃,設置到ProcessQueue的msgTreeMap容器中)群发,然后通過消費消息的服務線程—consumeMessageService熟妓,將封裝好的ConsumeRequest提交至消費端消費線程池—consumeExecutor異步執(zhí)行處理(具體處理邏輯:通過業(yè)務應用系統(tǒng)在DefaultMQPushConsumer實例中注冊的消息監(jiān)聽器完成業(yè)務端的消息消費);
(6)從Consumer端內存中獲取commitOffsetValue;
(7)通過RocketMQ的Remoting通信層向Broker端發(fā)送Pull消息的RPC請求疤剑;

1.2 Broker端處理Pull消息請求的一般過程

這里先來說下對于一般情況下(即為所要Pull的消息在RocketMQ的Broker端已經是存在疑故,一般可以Pull到的情況)钦铁,Broker端處理這個Pull消息請求的主要過程牛曹。其時序圖(ps:圖中只是畫了大部分的流程,詳細細節(jié)還需要對照源碼看下)如下:

Broker端接受長輪詢請求的處理時序圖.jpg

從上面的簡易時序圖中可以看到Broker端Pull消息的主要關鍵點如下:
(1)Pull消息的業(yè)務處理器—PullMessageProcessor的processRequest為處理拉取消息請求的入口不跟,在設置reponse返回結果中的opaque值后,就完成一些前置的校驗(Broker是否可讀史侣、Topic/ConsumerGroup是否存在、讀取隊列Id是否在Topic配置的隊列范圍數內)魏身;
(2)根據“ConsumerGroup”惊橱、“Topic”、“queueId”和“offset”這些參數來調用MessageStore實例的getMessage()方法來產嘗試讀取Broker端的消息箭昵;
(3)其中税朴,通過findConsumeQueue()方法,獲取邏輯消費隊列—ConsumeQueue家制;
(4)根據offset與邏輯消費隊列中的maxOffset正林、minOffset的比較,來設置狀態(tài)值status颤殴,同時計算出下次Pull消息的開始偏移量值—nextBeginOffset觅廓,然后通過MappedFile的方式獲取ConsumeQueue的Buffer映射結果值;
(5)根據算出來的offsetPy(物理偏移量值)和sizePy(消息的物理大泻)杈绸,從commitLog獲取對應消息的Buffer映射結果值,并填充至GetMessageResult返回對象矮瘟,并設置返回結果(狀態(tài)/下次其實偏移量/maxOffset/minOffset)后return瞳脓;
(6)根據isTransferMsgByHeap的設置情況(默認為true),選擇下面兩種方式之一來真正讀取GetMessageResult的消息內容并返回至Consumer端芥永;
方式1:使用JDK NIO的ByteBuffer篡殷,循環(huán)地讀取存有消息內容的messageBufferList至堆內內存中,返回byte[]字節(jié)數組埋涧,并設置到響應的body中;然后奇瘦,通過RPC通信組件—NettyRemotingServer發(fā)送響應至Consumer端棘催;
方式2:采用基于Zero-Copy的Netty組件的FileRegion,其包裝的“FileChannel.tranferTo”實現文件傳輸耳标,可以直接將文件緩沖區(qū)的數據發(fā)送至通信目標通道Channel中醇坝,避免了通過循環(huán)write方式導致的內存拷貝開銷,這種方式性能上更優(yōu);
(7)在PullMessageProcessor業(yè)務處理器的最后呼猪,提交并持久化消息消費的offset偏移量進度画畅;

1.3 Broker端對于Pull請求掛起處理的流程

說完了Pull消息請求的一般流程,下面主要看下Broker端的PullMessageProcessor業(yè)務處理器在RocketMQ中還沒有消息可以拉取情況下(即為:PULL_NOT_FOUND)的處理流程宋距,本節(jié)內容也是RocketMQ中長輪詢機制的關鍵轴踱。
長輪詢機制是對普通輪詢的一種優(yōu)化方案,它平衡了傳統(tǒng)Push/Pull模型的各自缺點谚赎,Server端如果當前沒有Client端請求拉取的相關數據會hold住這個請求淫僻,直到Server端存在相關的數據,或者等待超時時間后返回壶唤。在響應返回后雳灵,Client端又會再次發(fā)起下一次的長輪詢請求。RocketMQ的push模式正是采用了這種長輪詢機制的設計思路闸盔,如果在上面所述的第一次嘗試Pull消息失敗后(比如Broker端暫時沒有可以消費的消息)悯辙,先hold住并且掛起該請求(這里,設置返回響應response為null迎吵,此時不會向Consumer端發(fā)送任何響應的內容躲撰,即不會對響應結果進行處理),然后通過Broker端的后臺線程PullRequestHoldService重新嘗試和后臺線程ReputMessageService的二次處理钓觉。在Broker端茴肥,兩個后臺線程服務PullRequestHoldService和ReputMessageService是實現長輪詢機制的關鍵點。下面就來分別介紹這兩個服務線程:
(1)PullRequestHoldService:該服務線程會從pullRequestTable本地緩存變量中取PullRequest請求荡灾,檢查輪詢條件—“待拉取消息的偏移量是否小于消費隊列最大偏移量”是否成立瓤狐,如果條件成立則說明有新消息達到Broker端,則通過PullMessageProcessor的executeRequestWhenWakeup()方法重新嘗試發(fā)起Pull消息的RPC請求(此處批幌,每隔5S重試一次础锐,默認長輪詢整體的時間設置為30s);
(2)ReputMessageService:該服務線程會在Broker端不斷地從數據存儲對象—commitLog中解析數據并分發(fā)請求荧缘,隨后構建出ConsumeQueue(邏輯消費隊列)和IndexFile(消息索引文件)兩種類型的數據皆警。同時從本地緩存變量—pullRequestTable中,取出hold住的PullRequest請求并執(zhí)行二次處理(具體的做法是截粗,在PullMessageProcessor的executeRequestWhenWakeup()方法中信姓,通過業(yè)務線程池pullMessageExecutor,異步提交重新Pull消息的請求任務绸罗,即為重新調了一次PullMessageProcessor業(yè)務處理器的processRequest()方法意推,來實現Pull消息請求的二次處理)。這里珊蟀,ReputMessageService服務線程菊值,每處理一次,Thread.sleep(1),繼續(xù)下一次處理腻窒。

二昵宇、Consumer端的負載均衡機制

看了上面一節(jié)—“RocketMQ中長輪詢的Pull消息機制”后,大家可能會有這樣子一個疑問:在Consumer端pullMessageService線程作為消息的主動拉取者不斷地從阻塞隊列中獲取元素PullRequest儿子,那么這里的PullRequest是在哪兒由哪個線程放入至阻塞隊列中的呢瓦哎?本節(jié)內容將介紹“Consumer端的負載均衡機制”,同時解答上面的疑問典徊。

2.1 RocketMQ為何需要在Consumer端做負載均衡杭煎?

在RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基于拉模式來Pull消息的卒落,而在Push模式中只是采用了長輪詢的方式而實現了準實時的自動消息拉取羡铲。在兩種基于拉模式的消費方式(Push/Pull)中,均需要Consumer端在知道從Broker端的哪一個消息隊列—MessageQueue中去Pull消息儡毕。因此也切,消息隊列的負載均衡處理(即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費),由Consumer端來主動完成更為合理腰湾。

2.2 Consumer端負載均衡的主要流程

1. Consumer端的心跳包發(fā)送
在Consumer啟動后雷恃,它就會通過定時任務不斷地向RocketMQ集群中的所有Broker實例發(fā)送心跳包(其中包含了,消息消費分組名稱费坊、訂閱關系集合倒槐、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后附井,會將它維護在ConsumerManager的本地緩存變量—consumerTable讨越,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負載均衡提供可以依據的元數據信息永毅。
2. Consumer端實現負載均衡的核心類—RebalanceImpl
在上一篇文章的"Consumer啟動流程"中已經介紹了在啟動MQClientInstance實例時候把跨,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執(zhí)行一次)。通過查看源碼可以發(fā)現沼死,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法着逐,該方法是實現Consumer端負載均衡的核心關鍵。
這里意蛀,rebalanceByTopic()方法會根據消費者通信類型為“廣播模式”還是“集群模式”做不同的邏輯處理耸别。這里主要來看下集群模式下的主要處理流程:
(1)從rebalanceImpl實例的本地緩存變量—topicSubscribeInfoTable中,獲取該Topic主題下的消息消費隊列集合(mqSet)县钥;
(2)根據topic和consumerGroup為參數調用mQClientFactory.findConsumerIdList()方法向Broker端發(fā)送獲取該消費組下消費者Id列表的RPC通信請求(Broker端基于前面Consumer端上報的心跳包數據而構建的consumerTable做出響應返回太雨,業(yè)務請求碼:GET_CONSUMER_LIST_BY_GROUP);
(3)先對Topic下的消息消費隊列魁蒜、消費者Id排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列兜看。

Consumer端負載均衡策略的分配.jpg

這里的平均分配算法锥咸,類似于分頁的算法,將所有MessageQueue排好序類似于記錄细移,將所有消費端Consumer排好序類似頁數搏予,并求出每一頁需要包含的平均size和每個頁面記錄的范圍range,最后遍歷整個range而計算出當前Consumer端應該分配到的記錄(這里即為:MessageQueue)弧轧。具體的算法代碼如下:

@Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        //省略代碼......
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        //省略代碼......
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;

(4)然后雪侥,調用updateProcessQueueTableInRebalance()方法,具體的做法是精绎,先將分配到的消息隊列集合(mqSet)與processQueueTable做一個過濾比對速缨,具體的過濾比對方式如下圖:

RebalancePushImpl負載均衡(分發(fā)pullRequest到pullRequestQueue).jpg

這里可以分如下兩種情況來篩選過濾:
a.圖中processQueueTable標注的紅色部分,表示與分配到的消息隊列集合mqSet互不包含代乃。將這些隊列設置Dropped屬性為true旬牲,然后查看這些隊列是否可以移除出processQueueTable緩存變量,這里具體執(zhí)行removeUnnecessaryMessageQueue()方法搁吓,即每隔1s 查看是否可以獲取當前消費處理隊列的鎖原茅,拿到的話返回true。如果等待1s后堕仔,仍然拿不到當前消費處理隊列的鎖則返回false擂橘。如果返回true,則從processQueueTable緩存變量中移除對應的Entry摩骨;
b.圖中processQueueTable的綠色部分通贞,表示與分配到的消息隊列集合mqSet的交集。判斷該ProcessQueue是否已經過期了仿吞,在Pull模式的不用管滑频,如果是Push模式的,設置Dropped屬性為true唤冈,并且調用removeUnnecessaryMessageQueue()方法峡迷,像上面一樣嘗試移除Entry;
最后你虹,為過濾后的消息隊列集合(mqSet)中的每個MessageQueue創(chuàng)建一個ProcessQueue對象并存入RebalanceImpl的processQueueTable隊列中(其中調用RebalanceImpl實例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對象的下一個進度消費值offset绘搞,隨后填充至接下來要創(chuàng)建的pullRequest對象屬性中),并創(chuàng)建拉取請求對象—pullRequest添加到拉取列表—pullRequestList中傅物,最后執(zhí)行dispatchPullRequest()方法夯辖,將Pull消息的請求對象PullRequest依次放入PullMessageService服務線程的阻塞隊列pullRequestQueue中,待該服務線程取出后向Broker端發(fā)起Pull消息的請求董饰。其中蒿褂,可以重點對比下圆米,RebalancePushImpl和RebalancePullImpl兩個實現類的dispatchPullRequest()方法不同,RebalancePullImpl類里面的該方法為空啄栓,這樣子也就回答了上一篇中最后的那道思考題了娄帖。

三、總結

RocketMQ的消息消費(二)(push模式實現)篇幅就先分析到這里了昙楚。關于RocketMQ消息消費的內容比較多也比較復雜近速,需要讀者結合源碼并多次debug才能對其有一個較為深刻的理解。另外堪旧,對于消息消費部分的““消息ACK機制”削葱、“消費重試機制”等剩余內容將在后續(xù)的篇幅進行介紹和分析。限于筆者的才疏學淺淳梦,對本文內容可能還有理解不到位的地方析砸,如有闡述不合理之處還望留言一起探討。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末谭跨,一起剝皮案震驚了整個濱河市干厚,隨后出現的幾起案子,更是在濱河造成了極大的恐慌螃宙,老刑警劉巖蛮瞄,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異谆扎,居然都是意外死亡挂捅,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門堂湖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來闲先,“玉大人,你說我怎么就攤上這事无蜂∷趴罚” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵斥季,是天一觀的道長训桶。 經常有香客問我,道長酣倾,這世上最難降的妖魔是什么舵揭? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮躁锡,結果婚禮上午绳,老公的妹妹穿的比我還像新娘。我一直安慰自己映之,他們只是感情好拦焚,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布蜡坊。 她就那樣靜靜地躺著,像睡著了一般耕漱。 火紅的嫁衣襯著肌膚如雪算色。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天螟够,我揣著相機與錄音,去河邊找鬼峡钓。 笑死妓笙,一個胖子當著我的面吹牛,可吹牛的內容都是我干的能岩。 我是一名探鬼主播寞宫,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼拉鹃!你這毒婦竟也來了辈赋?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤膏燕,失蹤者是張志新(化名)和其女友劉穎钥屈,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體坝辫,經...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡篷就,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了近忙。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片竭业。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖及舍,靈堂內的尸體忽然破棺而出未辆,到底是詐尸還是另有隱情,我是刑警寧澤锯玛,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布咐柜,位于F島的核電站,受9級特大地震影響更振,放射性物質發(fā)生泄漏炕桨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一肯腕、第九天 我趴在偏房一處隱蔽的房頂上張望献宫。 院中可真熱鬧,春花似錦实撒、人聲如沸姊途。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽捷兰。三九已至立叛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間贡茅,已是汗流浹背秘蛇。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留顶考,地道東北人赁还。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像驹沿,于是被迫代替她去往敵國和親艘策。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內容

  • 文章摘要:在發(fā)送消息給RocketMQ后渊季,消費者需要消費朋蔫。消息的消費比發(fā)送要復雜一些,那么RocketMQ是如何來...
    癲狂俠閱讀 36,787評論 8 43
  • 分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐 來源:http://www.reibang.com/p/453...
    meng_philip123閱讀 12,896評論 6 104
  • metaq是阿里團隊的消息中間件却汉,之前也有用過和了解過kafka驯妄,據說metaq是基于kafka的源碼改過來的,他...
    菜鳥小玄閱讀 32,841評論 0 14
  • 消息隊列已經逐漸成為企業(yè)IT系統(tǒng)內部通信的核心手段病涨。它具有低耦合富玷、可靠投遞、廣播既穆、流量控制赎懦、最終一致性等一系列功能...
    Sophie12138閱讀 721評論 0 7
  • 文/八爪青蛙 我同事是個話嘮一說起來就沒完,平常也是嘴特別貧的一個人幻工,可是昨天認真起來了励两。突然說到七月份結束實習回...
    八爪青蛙閱讀 288評論 0 0