RocketMQ 是阿里開(kāi)源的一款高性能、高吞吐量的消息中間件切心。
RocketMQ 架構(gòu)介紹
RocketMQ架構(gòu)分為4部分片吊,Producer、Consumer全谤、Nameserver爷贫、Broker漫萄,四部分均可集群部署。
Producer毕骡、Consumer為客戶端。Producer是消息發(fā)送端葵孤,Consumer是消費(fèi)消息端橱赠。開(kāi)發(fā)過(guò)程中,我們會(huì)對(duì)這一部分關(guān)注更多宰啦。
Nameserver饼拍、Broker為服務(wù)端。Namesever可以看作架構(gòu)的大腦漓柑,負(fù)責(zé)服務(wù)的發(fā)現(xiàn)和路由叨吮;Broker可以看作消息的中轉(zhuǎn)站茶鉴,Producer發(fā)送的消息會(huì)先到Broker,然后由Consumer消費(fèi)惭蹂。
RocketMQ 安裝
下面介紹如何在本地安裝RocketMQ割粮,并使用源碼中自帶的demo發(fā)送消息穆刻、消費(fèi)消息。
環(huán)境準(zhǔn)備:
- 64bit OS, Linux/Unix/Mac 平臺(tái)
- 64bit JDK 1.8+;
- Maven 3.2.x;
下載代碼:RocketMQ項(xiàng)目由github托管榜轿,我們將從github上下載rocketMQ的源碼
鏈接:RocketMQ github倉(cāng)庫(kù)
如果裝有g(shù)it朵锣,可以通過(guò)git checkout 代碼到本地诚些,如果git沒(méi)有也可以直接下載安裝包解壓得到文件皇型,文件目錄如下
安裝步驟如下:
構(gòu)建弃鸦,進(jìn)入文件目錄幢痘,通過(guò)maven構(gòu)建項(xiàng)目
mvn -Prelease-all -DskipTests clean install -U
稍等片刻,構(gòu)建成功
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.4.0 4.4.0 ........................ SUCCESS [ 3.836 s]
[INFO] rocketmq-logging 4.4.0 ............................. SUCCESS [ 2.971 s]
[INFO] rocketmq-remoting 4.4.0 ............................ SUCCESS [ 2.395 s]
[INFO] rocketmq-common 4.4.0 .............................. SUCCESS [ 3.386 s]
[INFO] rocketmq-client 4.4.0 .............................. SUCCESS [ 4.092 s]
[INFO] rocketmq-store 4.4.0 ............................... SUCCESS [ 2.340 s]
[INFO] rocketmq-srvutil 4.4.0 ............................. SUCCESS [ 0.589 s]
[INFO] rocketmq-filter 4.4.0 .............................. SUCCESS [ 1.187 s]
[INFO] rocketmq-acl 4.4.0 ................................. SUCCESS [ 0.862 s]
[INFO] rocketmq-broker 4.4.0 .............................. SUCCESS [ 2.778 s]
[INFO] rocketmq-tools 4.4.0 ............................... SUCCESS [ 1.686 s]
[INFO] rocketmq-namesrv 4.4.0 ............................. SUCCESS [ 0.986 s]
[INFO] rocketmq-logappender 4.4.0 ......................... SUCCESS [ 0.845 s]
[INFO] rocketmq-openmessaging 4.4.0 ....................... SUCCESS [ 0.742 s]
[INFO] rocketmq-example 4.4.0 ............................. SUCCESS [ 0.843 s]
[INFO] rocketmq-test 4.4.0 ................................ SUCCESS [ 1.221 s]
[INFO] rocketmq-distribution 4.4.0 4.4.0 .................. SUCCESS [ 6.102 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 37.160 s
[INFO] Finished at: 2019-02-17T22:26:28+08:00
[INFO] ------------------------------------------------------------------------
進(jìn)入構(gòu)建后的輸出目錄,按順序依次啟動(dòng)Name Server
和Broker
cd distribution/target/apache-rocketmq
啟動(dòng)Name Server喊积,nohup表示不掛斷執(zhí)行玄妈,terminal退出時(shí)MQ的進(jìn)程不會(huì)退出
nohup sh bin/mqnamesrv &
查看Name Server 啟動(dòng)日志
tail -f ~/logs/rocketmqlogs/namesrv.log
啟動(dòng)成功
2019-02-23 22:38:09 INFO main - tls.client.keyPath = null
2019-02-23 22:38:09 INFO main - tls.client.keyPassword = null
2019-02-23 22:38:09 INFO main - tls.client.certPath = null
2019-02-23 22:38:09 INFO main - tls.client.authServer = false
2019-02-23 22:38:09 INFO main - tls.client.trustCertPath = null
2019-02-23 22:38:09 INFO main - Using OpenSSL provider
2019-02-23 22:38:10 INFO main - SSLContext created for server
2019-02-23 22:38:10 INFO NettyEventExecutor - NettyEventExecutor service started
2019-02-23 22:38:10 INFO FileWatchService - FileWatchService service started
2019-02-23 22:38:10 INFO main - The Name Server boot success. serializeType=JSON
啟動(dòng)Broker拟蜻,并指定注冊(cè)到Name Server的地址 127.0.0.1:9786
nohup sh bin/mqbroker -n 127.0.0.1:9876 &
查看Broker 啟動(dòng)日志
tail -f ~/logs/rocketmqlogs/broker.log
啟動(dòng)成功
2019-02-23 22:46:18 WARN main - Load default discard message hook service: DefaultTransactionalMessageCheckListener
2019-02-23 22:46:18 INFO main - The broker dose not enable acl
2019-02-23 22:46:18 INFO FileWatchService - FileWatchService service started
2019-02-23 22:46:18 INFO PullRequestHoldService - PullRequestHoldService service started
2019-02-23 22:46:18 INFO brokerOutApi_thread_1 - register broker to name server 127.0.0.1:9876 OK
2019-02-23 22:46:18 INFO main - Start transaction service!
2019-02-23 22:46:18 INFO main - The broker[zhangjianweideMacBook-Pro.local, 192.168.1.113:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 415169 bytes
2019-02-23 22:46:28 INFO brokerOutApi_thread_2 - register broker to name server 127.0.0.1:9876 OK
至此瞭郑,RocketMQ的服務(wù)端已全部啟動(dòng)運(yùn)行了鸭你,下面我們將通過(guò)源碼中的示例驗(yàn)證袱巨,讓Producer發(fā)送消息,Consumer來(lái)消費(fèi)消息愉老。
運(yùn)行示例代碼
接下來(lái)我們就可以運(yùn)行示例代碼嫉入,簡(jiǎn)單驗(yàn)證一下服務(wù)端是否啟動(dòng)并開(kāi)始工作。
設(shè)置Name Server 地址到環(huán)境變量熬拒,Producer和Consumer工作時(shí)都會(huì)用到此地址
export NAMESRV_ADDR=127.0.0.1:9876
啟動(dòng)客戶端垫竞,驗(yàn)證發(fā)送與接收消息蛀序,producer和consumer的啟動(dòng)沒(méi)有先后順序徐裸。
啟動(dòng)Producer啸盏,Producer啟動(dòng)后將發(fā)送多條Topic為“TopicTest”的消息,然后自動(dòng)終止進(jìn)程
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360003E3, offsetMsgId=C0A8017100002A9F00000000000BCFA1, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=998]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360103E4, offsetMsgId=C0A8017100002A9F00000000000BD055, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=1], queueOffset=1001]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360203E5, offsetMsgId=C0A8017100002A9F00000000000BD109, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=2], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360303E6, offsetMsgId=C0A8017100002A9F00000000000BD1BD, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=3], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360403E7, offsetMsgId=C0A8017100002A9F00000000000BD271, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=999]
......
啟動(dòng)Consumer,Consumer啟動(dòng)后將訂閱Topic名稱(chēng)為“TopicTest”消息粉怕,持續(xù)監(jiān)聽(tīng)贫贝,收到消息后進(jìn)行消費(fèi),將消息打印輸出
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=983, sysFlag=0, bornTimestamp=1550936154570, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154571, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BA571, commitLogOffset=763249, bodyCRC=1431313338, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205508, UNIQ_KEY=C0A8017107605E2DE80C765C35CA03A7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 53], transactionId='null'}]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=995, sysFlag=0, bornTimestamp=1550936154615, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154616, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC5C9, commitLogOffset=771529, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F703D5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=994, sysFlag=0, bornTimestamp=1550936154613, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154614, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC2F9, commitLogOffset=770809, bodyCRC=1597161362, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F503D1, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 55], transactionId='null'}]]
......
以上就是示例代碼的演示
關(guān)閉服務(wù)端:進(jìn)入安裝目錄中,依次關(guān)閉broker鸳劳、namesrv也搓,如果忘記關(guān)閉,下次啟動(dòng)時(shí)會(huì)提示端口沖突幔摸。
cd distribution/target/apache-rocketmq
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
小結(jié)
本文介紹了RocketMQ架構(gòu)的主要輪廓颤练,講解如何安裝環(huán)境、運(yùn)行示例代碼患雇。