Rocketmq源碼分析01:搭建源碼調(diào)試環(huán)境
轉(zhuǎn)載地址:https://mp.weixin.qq.com/s/waDzMr4rOaC_NlxSqF0YCg
1. 基本架構(gòu)
RocketMQ
架構(gòu)上主要分為四部分搏嗡,如下圖所示:
Producer
:消息發(fā)布的角色惜姐,支持分布式集群方式部署较木。Producer
通過MQ
的負(fù)載均衡模塊選擇相應(yīng)的Broker
集群隊(duì)列進(jìn)行消息投遞武翎,投遞的過程支持快速失敗并且低延遲饺藤。Consumer
:消息消費(fèi)的角色,支持分布式集群方式部署诊沪。支持以push
推被啼,pull
拉兩種模式對(duì)消息進(jìn)行消費(fèi)。同時(shí)也支持集群方式和廣播方式的消費(fèi)猪勇,它提供實(shí)時(shí)消息訂閱機(jī)制设褐,可以滿足大多數(shù)用戶的需求。-
NameServer
:NameServer
是一個(gè)非常簡單的Topic
路由注冊(cè)中心泣刹,其角色類似Dubbo
中的zookeeper
助析,支持Broker
的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。主要包括兩個(gè)功能:NameServer
通常也是集群的方式部署椅您,各實(shí)例間相互不進(jìn)行信息通訊外冀。Broker
是向每一臺(tái)NameServer
注冊(cè)自己的路由信息,所以每一個(gè)NameServer
實(shí)例上面都保存一份完整的路由信息掀泳。當(dāng)某個(gè)NameServer
因某種原因下線了雪隧,Broker
仍然可以向其它NameServer
同步其路由信息,Producer
,Consumer
仍然可以動(dòng)態(tài)感知Broker
的路由的信息开伏。 -
Broker
管理膀跌,NameServer
接受Broker
集群的注冊(cè)信息并且保存下來作為路由信息的基本數(shù)據(jù)遭商。然后提供心跳檢測機(jī)制固灵,檢查Broker
是否還存活; - 路由信息管理劫流,每個(gè)
NameServer
將保存關(guān)于Broker
集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息巫玻。然后Producer
和Conumser
通過NameServer
就可以知道整個(gè)Broker
集群的路由信息丛忆,從而進(jìn)行消息的投遞和消費(fèi)。
-
BrokerServer
:Broker
主要負(fù)責(zé)消息的存儲(chǔ)仍秤、投遞和查詢以及服務(wù)高可用保證熄诡,為了實(shí)現(xiàn)這些功能,Broker
包含了以下幾個(gè)重要子模塊:-
Client Manager
:負(fù)責(zé)管理客戶端(Producer
/Consumer
)和維護(hù)Consumer
的Topic
訂閱信息 -
Store Service
:提供方便簡單的API接口處理消息存儲(chǔ)到物理硬盤和查詢功能诗力。 -
HA Service
:高可用服務(wù)凰浮,提供Master Broker
和Slave Broker
之間的數(shù)據(jù)同步功能。 -
Index Service
:根據(jù)特定的Message key
對(duì)投遞到Broker
的消息進(jìn)行索引服務(wù)苇本,以提供消息的快速查詢袜茧。 -
Remoting Module
:整個(gè)Broker
的實(shí)體,負(fù)責(zé)處理來自clients
端的請(qǐng)求瓣窄。
-
2. 獲取源碼
rocketMq項(xiàng)目的github
倉庫為https://github.com/apache/rocketmq.git笛厦,由于網(wǎng)絡(luò)原因,我們并不會(huì)直接使用github
倉庫俺夕,而是將其導(dǎo)入到gitee
上裳凸,只需在gitee
創(chuàng)建新倉庫時(shí),選擇導(dǎo)入已有倉庫即可:
導(dǎo)入到gitee
后劝贸,就可以進(jìn)行checkout
了姨谷,本文對(duì)應(yīng)的gitee倉庫為https://gitee.com/funcy/rocketmq.git。
checkout
源碼到本地后映九,默認(rèn)是master
分支菠秒,本人習(xí)慣基于tag
創(chuàng)建自己的分支,然后在自己的分支上進(jìn)行分析氯迂,rocketMq
的tag
如下:
最新版本是4.8.0
践叠,我們將基于此tag創(chuàng)建新分支,使用的命令如下:
# 切換到 rocketmq-all-4.8.0
git checkout rocketmq-all-4.8.0
# 基于 rocketmq-all-4.8.0 創(chuàng)建自己的分析嚼蚀,名稱為 rocketmq-all-4.8.0-LEARN
git checkout -b rocketmq-all-4.8.0-LEARN
# 將 rocketmq-all-4.8.0-LEARN 分支推送到遠(yuǎn)程倉庫
git push -u origin rocketmq-all-4.8.0-LEARN
接下來禁灼,我們所有的操作都是在rocketmq-all-4.8.0-LEARN
分支上進(jìn)行了。
3. 本地啟動(dòng)
拿到代碼后轿曙,我們就開始進(jìn)行本地啟動(dòng)了弄捕,沒錯(cuò),就是在idea中進(jìn)行啟動(dòng)导帝。
3.1 復(fù)制conf
目錄
在啟動(dòng)項(xiàng)目前守谓,我們需要進(jìn)行一些配置,rocketMq
項(xiàng)目的配置文件位于rocketmq/distribution
模塊下的conf
目錄中您单,直接整個(gè)復(fù)制到rocketmq
目錄下:
也不需要改動(dòng)斋荞,復(fù)制出來就行了,這些配置的內(nèi)容后面分析源碼時(shí)再講解吧虐秦。
3.2 啟動(dòng)nameServer
nameServer
的主類為org.apache.rocketmq.namesrv.NamesrvStartup
:
如果我們直接運(yùn)行main()
方法平酿,會(huì)報(bào)錯(cuò):
報(bào)錯(cuò)信息已經(jīng)很明確了凤优,需要我們配置ROCKETMQ_HOME
目錄,我們?cè)?code>idea中進(jìn)行配置即可:
打開配置界面:
填寫ROCKETMQ_HOME
配置:
這里我填寫的是ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq
蜈彼,這個(gè)ROCKETMQ_HOME
路徑就是conf
文件夾所在的目錄筑辨。
填寫好后,就可以啟動(dòng)了:
3.3 啟動(dòng)broker
broker
的主類為org.apache.rocketmq.broker.BrokerStartup
幸逆,啟動(dòng)方式與nameServer
很相似棍辕,啟動(dòng)前也要配置ROCKETMQ_HOME
路徑:
Idea 老版本配置
相比于nameServer
,這里多配置了啟動(dòng)參數(shù):
-n localhost:9876 autoCreateTopicEnable=true
這個(gè)啟動(dòng)參數(shù)是指定nameServer
的地址还绘,以及開啟自動(dòng)創(chuàng)建topic
的功能痢毒。
配置完成之后就可以啟動(dòng)了:
3.4 啟動(dòng)管理后臺(tái)
rocketMq
的管理后臺(tái)在另一個(gè)倉庫https://github.com/apache/rocketmq-externals,除了后臺(tái)蚕甥,這個(gè)倉庫還包含了許多的其他模塊:
我們并不需要分析這個(gè)項(xiàng)目哪替,源碼本可以不必下載,但我在找這個(gè)項(xiàng)目的release
版本時(shí)菇怀,發(fā)現(xiàn)并沒有提供已編譯好的jar包凭舶,需要自己構(gòu)建代碼,因此我就再次下載了這個(gè)代碼源碼爱沟。當(dāng)然帅霜,由于網(wǎng)絡(luò)的原因,這個(gè)項(xiàng)目的源碼也被我導(dǎo)入到了gitee
上呼伸,地址為https://gitee.com/funcy/rocketmq-externals.git.
這個(gè)項(xiàng)目的代碼我們并不分析身冀,因此直接在master
分支上操作即可,
管理后臺(tái)項(xiàng)目為rocketmq-console
括享,主類為org.apache.rocketmq.console.App
:
在啟動(dòng)前搂根,我們需要修改下application.properties
的配置,找到rocketmq.config.namesrvAddr
配置铃辖,添加nameServer
的ip與端口剩愧,這里我們連接的是本地應(yīng)用,直接填寫localhost:9876
:
...
rocketmq.config.namesrvAddr=localhost:9876
...
啟動(dòng)娇斩,結(jié)果如下:
訪問http://localhost:8080
仁卷,結(jié)果如下:
可以看到broker
已經(jīng)出現(xiàn)在cluster
列表中了,這就表明啟動(dòng)成功了犬第。
4. 收發(fā)消息測試
rocketMq
項(xiàng)目的example
模塊下有大量的測試示例锦积,我們選擇其一進(jìn)行消息收發(fā)測試。
4.1 啟動(dòng)Consumer
我們先找到org.apache.rocketmq.example.simple.PushConsumer
歉嗓,代碼如下:
public class PushConsumer {
public static void main(String[] args)
throws InterruptedException, MQClientException {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
這個(gè)Consumer
監(jiān)聽的topic
是TopicTest
丰介,后面我們就會(huì)往這個(gè)topic
發(fā)送消息。另外,需要注意nameServer
的配置基矮,我們是在本地啟動(dòng)的nameServer
,因此這里配置的是localhost:9876
冠场。
運(yùn)行main()
方法家浇,結(jié)果如下:
4.2 啟動(dòng)Producer
我們找到 org.apache.rocketmq.example.simple.Producer
類,代碼如下:
public class Producer {
public static void main(String[] args)
throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
同樣地碴裙,這里使用的是的nameServer
地址是localhost:9876
钢悲,topic
是TopicTest
,運(yùn)行舔株,結(jié)果如下:
再回過頭看看PushConsumer
的控制臺(tái):
可以看到莺琳,Producer
發(fā)送消息成功了,PushConsumer
也成功獲取到消息了载慈。
4.3 異常分析
如圖所示:
如果出現(xiàn)異常:
org.apache.rocketmq.client.exception.MQClientException:
No route info of this topic: TopicTest
這表明當(dāng)前broker
中沒有TopicTest
的topic
惭等,這時(shí)我們可以手動(dòng)創(chuàng)建topic
,也可以在啟動(dòng)時(shí)指定autoCreateTopicEnable=true
.
如果是按上面步驟進(jìn)行的办铡,請(qǐng)確認(rèn)下org.apache.rocketmq.broker.BrokerStartup
是否配置啟動(dòng)參數(shù)
-n localhost:9876 autoCreateTopicEnable=true
配置方式就按3.3節(jié)
的方式配置就行了辞做。
5. 總結(jié)
本文主要介紹了rocketMq
的基本架構(gòu),通過源碼展示了rocketMq
的啟動(dòng)方式寡具,最后通過rocketMq
項(xiàng)目下example
模塊中的測試代碼展示了消息的收發(fā)過程秤茅。
總的來說,本文還是在準(zhǔn)備源碼分析的環(huán)境童叠,下篇文章開始框喳,我們就正式開始rocketMq
的源碼分析了。