摘要:在RocketMQ中,消息消費都是基于Pull消息方式挨务,那么Push模式中又是如何實現Consumer端準實時消費的呢?
在上一篇—“消息中間件—RocketMQ消息消費(一)”中玉组,已經簡要地介紹了下RocketMQ中“Pull和Push兩種消費方式的簡要流程”以及“Push消費方式的啟動流程”(ps:如果不熟悉這幾塊內容的童鞋谎柄,可以自己回顧下上一篇的內容)。本文將詳細介紹RocketMQ中Push消費方式下的“Pull消息的長輪詢機制”和“Consumer端的負載均衡機制”這兩塊關鍵核心內容惯雳。
由于RocketMQ系列的技術分享存在一定的連續(xù)性朝巫,因此希望讀者能回顧下往期RocketMQ分享的篇幅:
(1)消息中間件—RocketMQ的RPC通信(一)
(2)消息中間件—RocketMQ的RPC通信(二)
(3)消息中間件—RocketMQ消息發(fā)送
(4)消息中間件—RocketMQ消息消費(一)
一、RocketMQ中長輪詢的Pull消息機制
在上一篇中石景,已經簡略地介紹過RocketMQ中消息消費時Pull消息的長輪詢機制了劈猿,其主要的思路是:Consumer如果第一次嘗試Pull消息失敗(比如:Broker端沒有可以消費的消息)鸵钝,并不立即給消費者客戶端返回Response的響應糙臼,而是先hold住并且掛起請求。然后在Broker端恩商,通過后臺獨立線程—PullRequestHoldService重復嘗試執(zhí)行Pull消息請求來取消息变逃。同時,另外一個ReputMessageService線程不斷地構建ConsumeQueue/IndexFile數據怠堪,并取出hold住的Pull請求進行二次處理揽乱。通過這種長輪詢機制名眉,即可解決Consumer端需要通過不斷地發(fā)送無效的輪詢Pull請求,而導致整個RocketMQ集群中Broker端負載很高的問題凰棉。
1.1 Consumer向Broker端發(fā)送Pull消息請求的主要過程
在RocketMQ的Consumer端损拢,后臺獨立線程服務—pullMessageService是Pull消息請求的發(fā)起者,它不斷地嘗試從阻塞隊列—LinkedBlockingQueue<PullRequest>中獲取元素PullRequest撒犀,并根據pullRequest中的參數以及訂閱關系信息調用pullAPIWrapper的pullKernelImpl()方法發(fā)送封裝后的Pull消息請求—PullMessageRequestHeader至Broker端來拉取消息(具體完成發(fā)送一次Pull消息的PRC通信請求的是MQClientAPIImpl中的pullMessage()方法)福压。這里涉及細節(jié)的時序圖(ps:時序圖中沒有涉及PRC異步通信中的callback過程)如下:
其中, DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法是發(fā)送Pull消息請求的關鍵:
(1)校驗ProcessQueue是否“drop”或舞, 如果為“drop”為true則直接返回(這個“drop”的設置在下面一節(jié)—“Consumer端的負載均衡機制”中會提到)荆姆;
(2)給ProcessQueue設置Pull消息的時間戳;
(3)做流量控制映凳,對于滿足下面條件的任何一種情況胆筒,稍后再發(fā)起Pull消息的請求;
條件1:正在消費的隊列中诈豌,未被消費的消息數和消息大小超過閥值(默認每個隊列消息數為1000個/消息存儲容量100MB)仆救;
條件2:如果是順序消費,正在消費的隊列中矫渔,消息的跨度超過閥值(默認2000)彤蔽;
(4)根據topic獲取訂閱關系—SubscriptionData送膳;
(5)構建Pull消息的回調對象—PullBack,這里從Broker端Pull消息的返回結果處理是通過異步回調(發(fā)送異步通信RPC請求),其中如果Broker端返回Pull消息成功,在回調方法中先填充至處理隊列—processQueue中(將Pull下來的消息袜啃,設置到ProcessQueue的msgTreeMap容器中)群发,然后通過消費消息的服務線程—consumeMessageService熟妓,將封裝好的ConsumeRequest提交至消費端消費線程池—consumeExecutor異步執(zhí)行處理(具體處理邏輯:通過業(yè)務應用系統(tǒng)在DefaultMQPushConsumer實例中注冊的消息監(jiān)聽器完成業(yè)務端的消息消費);
(6)從Consumer端內存中獲取commitOffsetValue;
(7)通過RocketMQ的Remoting通信層向Broker端發(fā)送Pull消息的RPC請求疤剑;
1.2 Broker端處理Pull消息請求的一般過程
這里先來說下對于一般情況下(即為所要Pull的消息在RocketMQ的Broker端已經是存在疑故,一般可以Pull到的情況)钦铁,Broker端處理這個Pull消息請求的主要過程牛曹。其時序圖(ps:圖中只是畫了大部分的流程,詳細細節(jié)還需要對照源碼看下)如下:
從上面的簡易時序圖中可以看到Broker端Pull消息的主要關鍵點如下:
(1)Pull消息的業(yè)務處理器—PullMessageProcessor的processRequest為處理拉取消息請求的入口不跟,在設置reponse返回結果中的opaque值后,就完成一些前置的校驗(Broker是否可讀史侣、Topic/ConsumerGroup是否存在、讀取隊列Id是否在Topic配置的隊列范圍數內)魏身;
(2)根據“ConsumerGroup”惊橱、“Topic”、“queueId”和“offset”這些參數來調用MessageStore實例的getMessage()方法來產嘗試讀取Broker端的消息箭昵;
(3)其中税朴,通過findConsumeQueue()方法,獲取邏輯消費隊列—ConsumeQueue家制;
(4)根據offset與邏輯消費隊列中的maxOffset正林、minOffset的比較,來設置狀態(tài)值status颤殴,同時計算出下次Pull消息的開始偏移量值—nextBeginOffset觅廓,然后通過MappedFile的方式獲取ConsumeQueue的Buffer映射結果值;
(5)根據算出來的offsetPy(物理偏移量值)和sizePy(消息的物理大泻)杈绸,從commitLog獲取對應消息的Buffer映射結果值,并填充至GetMessageResult返回對象矮瘟,并設置返回結果(狀態(tài)/下次其實偏移量/maxOffset/minOffset)后return瞳脓;
(6)根據isTransferMsgByHeap的設置情況(默認為true),選擇下面兩種方式之一來真正讀取GetMessageResult的消息內容并返回至Consumer端芥永;
方式1:使用JDK NIO的ByteBuffer篡殷,循環(huán)地讀取存有消息內容的messageBufferList至堆內內存中,返回byte[]字節(jié)數組埋涧,并設置到響應的body中;然后奇瘦,通過RPC通信組件—NettyRemotingServer發(fā)送響應至Consumer端棘催;
方式2:采用基于Zero-Copy的Netty組件的FileRegion,其包裝的“FileChannel.tranferTo”實現文件傳輸耳标,可以直接將文件緩沖區(qū)的數據發(fā)送至通信目標通道Channel中醇坝,避免了通過循環(huán)write方式導致的內存拷貝開銷,這種方式性能上更優(yōu);
(7)在PullMessageProcessor業(yè)務處理器的最后呼猪,提交并持久化消息消費的offset偏移量進度画畅;
1.3 Broker端對于Pull請求掛起處理的流程
說完了Pull消息請求的一般流程,下面主要看下Broker端的PullMessageProcessor業(yè)務處理器在RocketMQ中還沒有消息可以拉取情況下(即為:PULL_NOT_FOUND)的處理流程宋距,本節(jié)內容也是RocketMQ中長輪詢機制的關鍵轴踱。
長輪詢機制是對普通輪詢的一種優(yōu)化方案,它平衡了傳統(tǒng)Push/Pull模型的各自缺點谚赎,Server端如果當前沒有Client端請求拉取的相關數據會hold住這個請求淫僻,直到Server端存在相關的數據,或者等待超時時間后返回壶唤。在響應返回后雳灵,Client端又會再次發(fā)起下一次的長輪詢請求。RocketMQ的push模式正是采用了這種長輪詢機制的設計思路闸盔,如果在上面所述的第一次嘗試Pull消息失敗后(比如Broker端暫時沒有可以消費的消息)悯辙,先hold住并且掛起該請求(這里,設置返回響應response為null迎吵,此時不會向Consumer端發(fā)送任何響應的內容躲撰,即不會對響應結果進行處理),然后通過Broker端的后臺線程PullRequestHoldService重新嘗試和后臺線程ReputMessageService的二次處理钓觉。在Broker端茴肥,兩個后臺線程服務PullRequestHoldService和ReputMessageService是實現長輪詢機制的關鍵點。下面就來分別介紹這兩個服務線程:
(1)PullRequestHoldService:該服務線程會從pullRequestTable本地緩存變量中取PullRequest請求荡灾,檢查輪詢條件—“待拉取消息的偏移量是否小于消費隊列最大偏移量”是否成立瓤狐,如果條件成立則說明有新消息達到Broker端,則通過PullMessageProcessor的executeRequestWhenWakeup()方法重新嘗試發(fā)起Pull消息的RPC請求(此處批幌,每隔5S重試一次础锐,默認長輪詢整體的時間設置為30s);
(2)ReputMessageService:該服務線程會在Broker端不斷地從數據存儲對象—commitLog中解析數據并分發(fā)請求荧缘,隨后構建出ConsumeQueue(邏輯消費隊列)和IndexFile(消息索引文件)兩種類型的數據皆警。同時從本地緩存變量—pullRequestTable中,取出hold住的PullRequest請求并執(zhí)行二次處理(具體的做法是截粗,在PullMessageProcessor的executeRequestWhenWakeup()方法中信姓,通過業(yè)務線程池pullMessageExecutor,異步提交重新Pull消息的請求任務绸罗,即為重新調了一次PullMessageProcessor業(yè)務處理器的processRequest()方法意推,來實現Pull消息請求的二次處理)。這里珊蟀,ReputMessageService服務線程菊值,每處理一次,Thread.sleep(1),繼續(xù)下一次處理腻窒。
二昵宇、Consumer端的負載均衡機制
看了上面一節(jié)—“RocketMQ中長輪詢的Pull消息機制”后,大家可能會有這樣子一個疑問:在Consumer端pullMessageService線程作為消息的主動拉取者不斷地從阻塞隊列中獲取元素PullRequest儿子,那么這里的PullRequest是在哪兒由哪個線程放入至阻塞隊列中的呢瓦哎?本節(jié)內容將介紹“Consumer端的負載均衡機制”,同時解答上面的疑問典徊。
2.1 RocketMQ為何需要在Consumer端做負載均衡杭煎?
在RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基于拉模式來Pull消息的卒落,而在Push模式中只是采用了長輪詢的方式而實現了準實時的自動消息拉取羡铲。在兩種基于拉模式的消費方式(Push/Pull)中,均需要Consumer端在知道從Broker端的哪一個消息隊列—MessageQueue中去Pull消息儡毕。因此也切,消息隊列的負載均衡處理(即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費),由Consumer端來主動完成更為合理腰湾。
2.2 Consumer端負載均衡的主要流程
1. Consumer端的心跳包發(fā)送
在Consumer啟動后雷恃,它就會通過定時任務不斷地向RocketMQ集群中的所有Broker實例發(fā)送心跳包(其中包含了,消息消費分組名稱费坊、訂閱關系集合倒槐、消息通信模式和客戶端id的值等信息)。Broker端在收到Consumer的心跳消息后附井,會將它維護在ConsumerManager的本地緩存變量—consumerTable讨越,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負載均衡提供可以依據的元數據信息永毅。
2. Consumer端實現負載均衡的核心類—RebalanceImpl
在上一篇文章的"Consumer啟動流程"中已經介紹了在啟動MQClientInstance實例時候把跨,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執(zhí)行一次)。通過查看源碼可以發(fā)現沼死,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法着逐,該方法是實現Consumer端負載均衡的核心關鍵。
這里意蛀,rebalanceByTopic()方法會根據消費者通信類型為“廣播模式”還是“集群模式”做不同的邏輯處理耸别。這里主要來看下集群模式下的主要處理流程:
(1)從rebalanceImpl實例的本地緩存變量—topicSubscribeInfoTable中,獲取該Topic主題下的消息消費隊列集合(mqSet)县钥;
(2)根據topic和consumerGroup為參數調用mQClientFactory.findConsumerIdList()方法向Broker端發(fā)送獲取該消費組下消費者Id列表的RPC通信請求(Broker端基于前面Consumer端上報的心跳包數據而構建的consumerTable做出響應返回太雨,業(yè)務請求碼:GET_CONSUMER_LIST_BY_GROUP);
(3)先對Topic下的消息消費隊列魁蒜、消費者Id排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列兜看。
這里的平均分配算法锥咸,類似于分頁的算法,將所有MessageQueue排好序類似于記錄细移,將所有消費端Consumer排好序類似頁數搏予,并求出每一頁需要包含的平均size和每個頁面記錄的范圍range,最后遍歷整個range而計算出當前Consumer端應該分配到的記錄(這里即為:MessageQueue)弧轧。具體的算法代碼如下:
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
//省略代碼......
List<MessageQueue> result = new ArrayList<MessageQueue>();
//省略代碼......
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
(4)然后雪侥,調用updateProcessQueueTableInRebalance()方法,具體的做法是精绎,先將分配到的消息隊列集合(mqSet)與processQueueTable做一個過濾比對速缨,具體的過濾比對方式如下圖:
這里可以分如下兩種情況來篩選過濾:
a.圖中processQueueTable標注的紅色部分,表示與分配到的消息隊列集合mqSet互不包含代乃。將這些隊列設置Dropped屬性為true旬牲,然后查看這些隊列是否可以移除出processQueueTable緩存變量,這里具體執(zhí)行removeUnnecessaryMessageQueue()方法搁吓,即每隔1s 查看是否可以獲取當前消費處理隊列的鎖原茅,拿到的話返回true。如果等待1s后堕仔,仍然拿不到當前消費處理隊列的鎖則返回false擂橘。如果返回true,則從processQueueTable緩存變量中移除對應的Entry摩骨;
b.圖中processQueueTable的綠色部分通贞,表示與分配到的消息隊列集合mqSet的交集。判斷該ProcessQueue是否已經過期了仿吞,在Pull模式的不用管滑频,如果是Push模式的,設置Dropped屬性為true唤冈,并且調用removeUnnecessaryMessageQueue()方法峡迷,像上面一樣嘗試移除Entry;
最后你虹,為過濾后的消息隊列集合(mqSet)中的每個MessageQueue創(chuàng)建一個ProcessQueue對象并存入RebalanceImpl的processQueueTable隊列中(其中調用RebalanceImpl實例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對象的下一個進度消費值offset绘搞,隨后填充至接下來要創(chuàng)建的pullRequest對象屬性中),并創(chuàng)建拉取請求對象—pullRequest添加到拉取列表—pullRequestList中傅物,最后執(zhí)行dispatchPullRequest()方法夯辖,將Pull消息的請求對象PullRequest依次放入PullMessageService服務線程的阻塞隊列pullRequestQueue中,待該服務線程取出后向Broker端發(fā)起Pull消息的請求董饰。其中蒿褂,可以重點對比下圆米,RebalancePushImpl和RebalancePullImpl兩個實現類的dispatchPullRequest()方法不同,RebalancePullImpl類里面的該方法為空啄栓,這樣子也就回答了上一篇中最后的那道思考題了娄帖。
三、總結
RocketMQ的消息消費(二)(push模式實現)篇幅就先分析到這里了昙楚。關于RocketMQ消息消費的內容比較多也比較復雜近速,需要讀者結合源碼并多次debug才能對其有一個較為深刻的理解。另外堪旧,對于消息消費部分的““消息ACK機制”削葱、“消費重試機制”等剩余內容將在后續(xù)的篇幅進行介紹和分析。限于筆者的才疏學淺淳梦,對本文內容可能還有理解不到位的地方析砸,如有闡述不合理之處還望留言一起探討。