本文主要介紹RocketMQ的多端口監(jiān)聽(tīng)機(jī)制呵曹,通過(guò)本文慰于,你可以了解到Broker端源碼中remotingServer和fastRemotingServer的區(qū)別钮科,以及客戶(hù)端配置中,vipChannelEnabled的作用婆赠。
1 多端口監(jiān)聽(tīng)
在RocketMQ中绵脯,可以通過(guò)broker.conf配置文件中指定listenPort配置項(xiàng)來(lái)指定Broker監(jiān)聽(tīng)客戶(hù)端請(qǐng)求的端口,如果不指定,默認(rèn)監(jiān)聽(tīng)10911端口桨嫁。
listenPort=10911
不過(guò)植兰,Broker啟動(dòng)時(shí),實(shí)際上會(huì)監(jiān)聽(tīng)3個(gè)端口:10909璃吧、10911楣导、10912,如下所示:
$?lsof?-iTCP?-nP?|?grep?LISTEN
java??1892656?tianshouzhi.robin???96u??IPv6?14889281??0t0??TCP?*:10912?(LISTEN)
java??1892656?tianshouzhi.robin??101u??IPv6?14889285??0t0??TCP?*:10911?(LISTEN)
java??1892656?tianshouzhi.robin??102u??IPv6?14889288??0t0??TCP?*:10909?(LISTEN)
而其他兩個(gè)端口是根據(jù)listenPort的值畜挨,動(dòng)態(tài)計(jì)算出來(lái)的筒繁。這三個(gè)端口由Broker內(nèi)部不同的組件使用,作用分別如下:
????remotingServer:監(jiān)聽(tīng)listenPort配置項(xiàng)指定的監(jiān)聽(tīng)端口巴元,默認(rèn)10911
????fastRemotingServer:監(jiān)聽(tīng)端口值listenPort-2,即默認(rèn)為10909
????HAService:監(jiān)聽(tīng)端口為值為listenPort+1逮刨,即10912呕缭,該端口用于Broker的主從同步
本文主要聚焦于remotingServer和fastRemotingServer的區(qū)別:
Broker端:remotingServer可以處理客戶(hù)端所有請(qǐng)求,如:生產(chǎn)者發(fā)送消息的請(qǐng)求修己,消費(fèi)者拉取消息的請(qǐng)求恢总。fastRemotingServer功能基本與remotingServer相同,唯一不同的是不可以處理消費(fèi)者拉取消息的請(qǐng)求睬愤。Broker在向NameServer注冊(cè)時(shí)片仿,只會(huì)上報(bào)remotingServer監(jiān)聽(tīng)的listenPort端口。
客戶(hù)端:默認(rèn)情況下尤辱,生產(chǎn)者發(fā)送消息是請(qǐng)求fastRemotingServer砂豌,我們也可以通過(guò)配置讓其請(qǐng)求remotingServer;消費(fèi)者拉取消息只能請(qǐng)求remotingServer光督。
下面通過(guò)源碼進(jìn)行驗(yàn)證Broker端構(gòu)建remotingServer和fastRemotingServer時(shí)的區(qū)別阳距,以及客戶(hù)端如何配置。
2 Broker端
在BrokerController內(nèi)部定義了remotingServer和fastRemotingServer兩個(gè)字段
private?RemotingServer?remotingServer;
private?RemotingServer?fastRemotingServer;
在初始化時(shí)可帽,在initiallize方法內(nèi)部會(huì)對(duì)這兩個(gè)字段的進(jìn)行初始化:
BrokerController#initialize
public?boolean?initialize()?throws?CloneNotSupportedException?{
????boolean?result?=?this.topicConfigManager.load();
????result?=?result?&&?this.consumerOffsetManager.load();
????result?=?result?&&?this.subscriptionGroupManager.load();
????result?=?result?&&?this.consumerFilterManager.load();
????if?(result)?{..}//加載message?store娄涩,略
????result?=?result?&&?this.messageStore.load();
????if?(result)?{
????????//1?remotingServer監(jiān)聽(tīng)listenPort端口,默認(rèn)10911
????????this.remotingServer?=?new?NettyRemotingServer(this.nettyServerConfig,?
??????????????????????????????????????????????????????this.clientHousekeepingService);
????????//2?fastRemotingServer監(jiān)聽(tīng)listenPort-2端口映跟,默認(rèn)10990
????????NettyServerConfig?fastConfig?=?(NettyServerConfig)?this.nettyServerConfig.clone();
????????fastConfig.setListenPort(nettyServerConfig.getListenPort()?-?2);
????????this.fastRemotingServer?=?new?NettyRemotingServer(fastConfig,?
????????????????????????????????????????????????????this.clientHousekeepingService);
????????//...啟動(dòng)異步線程池蓄拣,略
????????//3?注冊(cè)請(qǐng)求處理器
????????this.registerProcessor();
可以看到,這兩個(gè)字段實(shí)例化時(shí):remotingServer使用了nettyServerConfig配置努隙;而fastRemotingServer將配置克隆了一份球恤,然后只是修改了監(jiān)聽(tīng)的的端口號(hào),其他不變荸镊。
創(chuàng)建完之后remotingServer和fastRemotingServer咽斧,會(huì)調(diào)用registerProcessor注冊(cè)請(qǐng)求處理器堪置。fastRemotingServer與remotingServer注冊(cè)的請(qǐng)求處理器類(lèi)型幾乎完全相同,相關(guān)源碼如下紅色框所示:
org.apache.rocketmq.broker.BrokerController#registerProcessor
public?void?registerProcessor()?{
????/**
?????*?SendMessageProcessor
?????*/
????SendMessageProcessor?sendProcessor?=?new?SendMessageProcessor(this);
????sendProcessor.registerSendMessageHook(sendMessageHookList);
????sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
????this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,?sendProcessor,?this.sendMessageExecutor);
????this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,?sendProcessor,?this.sendMessageExecutor);
????this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE,?sendProcessor,this.sendMessageExecutor);
????this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK,?sendProcessor,?this.sendMessageExecutor);
????this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE,?sendProcessor,?this.sendMessageExecutor);
????this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,?sendProcessor,?this.sendMessageExecutor);
????this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE,?sendProcessor,?this.sendMessageExecutor);
????this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK,?sendProcessor,?this.sendMessageExecutor);
????/**
?????*?PullMessageProcessor张惹,注意這里只注冊(cè)到了到了remotingServer中舀锨,沒(méi)有注冊(cè)到fastRemotingServer
?????*/
????this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE,?this.pullMessageProcessor,?this.pullMessageExecutor);
????this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
????//...
可以看到,唯一不同的是宛逗,對(duì)于PullMessageProcessor坎匿,只在remotingServer中注冊(cè)了,并沒(méi)有在fastRemotingServer注冊(cè)雷激。意味著為fastRemotingServer不可以處理消費(fèi)者拉取消息的請(qǐng)求(還有很多其他的處理器類(lèi)型替蔬,這里并沒(méi)有完全列出)。
在明白了fastRemotingServer和remotingServer之后屎暇,下面從客戶(hù)端分析承桥,如何進(jìn)行選擇。
3 客戶(hù)端
客戶(hù)端的DefaultMQProducer和DefaultMQPushConsumer都繼承了ClientConfig類(lèi)根悼,這個(gè)類(lèi)中有一些公共的配置項(xiàng)凶异,其中包含一個(gè)布爾字段vipChannelEnabled。從字面意思看挤巡,其用于控制是否開(kāi)啟VIP通道唠帝,如果為true,生產(chǎn)者發(fā)送的消息會(huì)請(qǐng)求fastRemotingServer玄柏,否則請(qǐng)求remotingServer。
在RocketMQ 4.5.0及之前贴铜,vipChannelEnabled字段值默認(rèn)為true粪摘。在RocketMQ 4.5.1之后,修改為了false绍坝∨且猓可以通過(guò)JVM參數(shù) -Dcom.rocketmq.sendMessageWithVIPChannel=false,來(lái)修改這個(gè)默認(rèn)值轩褐。
org.apache.rocketmq.client.ClientConfig
public?class?ClientConfig?{
//是否啟用VIP通道
??private?boolean?vipChannelEnabled?=?Boolean.parseBoolean(
????????????????System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY,?
????????????????"true"));
...
??public?boolean?isVipChannelEnabled()?{
??????return?vipChannelEnabled;
??}
??public?void?setVipChannelEnabled(final?boolean?vipChannelEnabled)?{
??????this.vipChannelEnabled?=?vipChannelEnabled;
??}
...
}
生產(chǎn)者:
生產(chǎn)者在發(fā)送消息時(shí)椎咧,都會(huì)通過(guò)DefaultMQProducerImpl#sendKernelImpl方法,這個(gè)方法內(nèi)部會(huì)判斷是否開(kāi)啟VIP通道把介,如下圖紅色框:
private?SendResult?sendKernelImpl(final?Message?msg,
????final?MessageQueue?mq,
????final?CommunicationMode?communicationMode,
????final?SendCallback?sendCallback,
????final?TopicPublishInfo?topicPublishInfo,
????final?long?timeout)?throws?MQClientException?{
????long?beginStartTime?=?System.currentTimeMillis();
????String?brokerAddr?=?this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
????if?(null?==?brokerAddr)?{
????????tryToFindTopicPublishInfo(mq.getTopic());
????????brokerAddr?=?this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
????}
????SendMessageContext?context?=?null;
????//判斷是否開(kāi)啟了VIP通道
????if?(brokerAddr?!=?null)?{
????????brokerAddr?=?MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(),?
?????????????????????????????????????????????brokerAddr);
????//...
在開(kāi)啟VIP通道的情況下勤讽,會(huì)將請(qǐng)求的broker 端口地址-2,改為請(qǐng)求fastRemotingServer拗踢,如下所示:
org.apache.rocketmq.common.MixAll#brokerVIPChannel
public?static?String?brokerVIPChannel(final?boolean?isChange,?final?String?brokerAddr)?{
????if?(isChange)?{
????????String[]?ipAndPort?=?brokerAddr.split(":");
????????String?brokerAddrNew?=?ipAndPort[0]?+?":"?
???????????????????????????????+?(Integer.parseInt(ipAndPort[1])?-?2);
????????return?brokerAddrNew;
????}?else?{
????????return?brokerAddr;
????}
}
消費(fèi)者
消費(fèi)者拉取消息總是會(huì)調(diào)用remotingServer脚牍,因?yàn)镻ullMessageProcessor只在remotingServer中進(jìn)行了注冊(cè),fastRemotingServer無(wú)法處理這個(gè)請(qǐng)求巢墅,因此并不會(huì)修改端口诸狭,可參考PullAPIWrapper類(lèi)券膀。
關(guān)于其他請(qǐng)求:
Broker支持很多客戶(hù)端請(qǐng)求類(lèi)型,除了發(fā)送/拉取消息之外驯遇,還包括創(chuàng)建Topic芹彬、查詢(xún)/更新offset,發(fā)送心跳信息叉庐,查詢(xún)消費(fèi)者組ID列表等舒帮。具體請(qǐng)求哪個(gè)端口,主要是看有沒(méi)有調(diào)用MixAl#brokerVIPChannel方法修改端口眨唬。例如對(duì)于心跳請(qǐng)求会前,即使設(shè)置了brokerVIPChannel也不起作用,因?yàn)樾奶?qǐng)求之前不會(huì)修改端口號(hào)匾竿,因此總是請(qǐng)求remotingServer瓦宜。
免費(fèi)學(xué)習(xí)視頻歡迎關(guān)注云圖智聯(lián):https://e.yuntuzhilian.com/