在之前的《淺入淺出消息隊列》一文中,我們了解了消息隊列的作用鳄哭、優(yōu)缺點和使用場景锄俄,相信你對消息隊列已經(jīng)有了一個大致的概念奶赠,文末給自己埋的坑說日后會寫一篇實戰(zhàn)教程,正好現(xiàn)在實習(xí)結(jié)束了苇经,也許久沒有寫實戰(zhàn)教程了,于是這就來填坑了。
前置知識
閱讀本文前兼都,建議有一些前置知識,包括且不限于:
- 常見的 Linux 命令
- 消息隊列的相關(guān)知識
- Docker 的基本使用
- docker-compose 的基礎(chǔ)知識
- SpringBoot 的基本使用
那廢話不多說慎王,我們就開始吧。
本文的所涉及到的代碼可在微信公眾號「01 二進制」后臺回復(fù)「rocketmq」獲得咱旱。
為什么要以 RocketMQ 為例?
本文主要是為了通過實例的方式直觀的了解消息隊列诸典。那么問題來了赘阀,消息隊列那么多(ActiveMQ、RabbitMQ轰豆、Kafka),我為什么要選擇 RocketMQ 呢?這里我們不談原理宿刮,只說說體驗,僅是個人選擇磕潮,不喜勿噴。
- 背靠阿里膏潮,不看測評屋谭,純粹看他經(jīng)歷過多次雙十一的檢驗就已經(jīng)知道其性能是處于第一批次的悔耘。
- 作為一個 Java 程序員缓艳,如果選擇一個純 Java 編寫的軟件,后期閱讀其源碼難度也會小很多溪窒。(RabbitMQ 底層是 Erlang,kafka 底層是 Scala)
- 在阿里實習(xí)的時候一直都是使用 RocketMQ 的內(nèi)部版本,于我而言份汗,RocketMQ 更熟悉。
初識 RocketMQ
在使用消息隊列前,我們要知道消息隊列是什么均践,這一塊內(nèi)容參考之前的文章《淺入淺出消息隊列》,這里不再贅述。
本段節(jié)來講解 RocketMQ 所涉及到的相關(guān)概念斯辰,我們先來簡單看下官方給出的 RocketMQ 架構(gòu)圖
從上圖我們可以很直觀的看出,一個完整的 RocketMQ 架構(gòu)包含四個部分:NameServer剪况、Broker、Producer 和 Consumer。
- NameServer:主要用作注冊中心该贾,用于管理 Topic 信息和路由信息的管理
- Broker:負責(zé)存儲杨蛋、消息 tag 過濾和轉(zhuǎn)發(fā)。需將自身信息上報給注冊中心 NameServer
- Producer:生產(chǎn)者
- Consumer:消費者
從寄信的角度理解
上面的解釋可能難以理解理澎,我們從寄信這一實例來看以下四個部分所承擔的責(zé)任逞力。
- Producer 和 Consumer 不必多說,消息的生產(chǎn)者和消費者糠爬,生產(chǎn)者負責(zé)投遞消息,消費者負責(zé)接收消息执隧,是我們要編寫的應(yīng)用程序揩抡。可以理解為寄信人和收信人镀琉。
- Broker 負責(zé)消息存儲峦嗤,以 Topic(主題)為維度,以隊列的形式存儲消息屋摔∷干瑁可以理解為信箱,專門存儲信件钓试,收信人(Consumer)可以從這里獲取信件装黑。
- NameServer 負責(zé)對源數(shù)據(jù)進行管理,包括了對 Topic 和 Broker 的管理弓熏×堤罚可以理解為郵局,負責(zé)管理郵件的分發(fā)硝烂,維護信箱(Broker)的狀態(tài)箕别。
由上各部分角色的功能可知铜幽,我們需要先安裝啟動 NameServer滞谢,再啟動 Broker 即可搭建完 RocketMQ
安裝 RocketMQ
如果你的電腦上已經(jīng)配置好了 rocketmq 的相關(guān)環(huán)境串稀,可以跳過本章節(jié)。
從上面的介紹我們可以得知狮杨,在生產(chǎn)和消費消息之前母截,我們需要安裝好Broker 和 NameServer。
準備工作
為了部署方便橄教,我推薦使用 docker 搭建服務(wù)清寇。此外,由于 rocketmq 需要分別部署 broker 與 nameserver 护蝶,考慮到分開部署比較麻煩华烟,這里我將會使用 docker-compose廉侧。因此漆枚,你需要在你的宿主機中安裝好 docker 和 docker-compose。
此外瘫筐,我們還需要搭建一個 web 可視化控制臺堤魁,用于監(jiān)控 mq 服務(wù)狀態(tài)喂链,以及消息消費情況,這里使用 rocketmq-console妥泉,同樣該程序也將使用 docker 安裝椭微。
如果對 docker 不熟悉的話,可以先閱讀菜鳥教程的 docker 教程學(xué)習(xí) ??Docker 教程
安裝
安裝 Docker
Linux:
執(zhí)行以下命令
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
Mac:
執(zhí)行以下命令
brew cask install docker
Win:
下載對應(yīng)的安裝文件盲链,然后雙擊運行安裝蝇率。下載地址在:https://hub.docker.com/editions/community/docker-ce-desktop-windows
考慮到下載該文件需要科學(xué)上網(wǎng),你可以在微信公眾號「01 二進制」后臺回復(fù)「docker」獲取 docker 安裝包的下載鏈接刽沾。
如果你的 win10 系統(tǒng)可以使用 winget瓢剿,那就執(zhí)行以下命令。(win 終于也有自己的包管理工具了 ??)
winget install Docker.DockerDesktop
國內(nèi)從 DockerHub 拉取鏡像有時會遇到困難悠轩,此時可以配置鏡像加速器间狂。配置教程可參考 ??Docker 鏡像加速
安裝 RocketMQ 鏡像
rocketmq 的 docker 鏡像我們可以自己制作,官方文檔中有詳細介紹 ??apache/rocketmq-docker
為了方便起見火架,這里我們直接使用別人已經(jīng)制作好的鏡像鉴象,鏡像地址 ?? foxiswho/rocketmq
新建一個目錄用于存放相關(guān)腳本,然后在終端執(zhí)行下面的命令 ??
git clone https://github.com/foxiswho/docker-rocketmq.git
cd docker-rocketmq
cd rmq
chmod +x start.sh
./start.sh
在經(jīng)過一段時間的等待后何鸡,我們通過瀏覽器訪問localhost:8180
查看到以下頁面則說明安裝成功纺弊。
安裝腳本解析
通過腳本的方式一鍵安裝確實很方便,但如果只是安裝完成就萬事大吉了自然是不行的骡男,本著授人以漁的態(tài)度淆游,我們來看看安裝腳本里都有些啥:
start.sh
4-7 行在創(chuàng)建目錄,10-13 行在給剛才創(chuàng)建的目錄設(shè)置權(quán)限,至于原因我們之后再說犹菱。
我們看到 16 行使用 docker-compose 命令啟動了容器拾稳,并設(shè)置為了后臺自動啟動,因此我們來看一下這個 docker-compose.yml 文件腊脱。
docker-compose.yml
version: "3.5"
services:
rmqnamesrv:
image: foxiswho/rocketmq:4.7.0
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./rmqs/logs:/opt/logs
- ./rmqs/store:/opt/store
environment:
JAVA_OPT_EXT: "-Duser.home=/opt -Xms512M -Xmx512M -Xmn128m"
command: ["sh", "mqnamesrv"]
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:4.7.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./rmq/logs:/opt/logs
- ./rmq/store:/opt/store
- ./rmq/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/opt -Xms512M -Xmx512M -Xmn128m"
command:
[
"sh",
"mqbroker",
"-c",
"/etc/rocketmq/broker.conf",
"-n",
"rmqnamesrv:9876",
"autoCreateTopicEnable=true",
]
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8180:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
我們創(chuàng)建了三個服務(wù)访得,這三個服務(wù)的名字分別是 rmqnamesrv、rmqbroker 和 rmqconsole陕凹,分別對應(yīng)我們之前所說的 nameserver悍抑、broker 和可視化控制臺。并且對不同的服務(wù)做了不同的端口映射杜耙,同時將本地指定的文件目錄掛載到 docker 容器中搜骡,并以網(wǎng)橋(bridge)的形式進行網(wǎng)絡(luò)連接。
以rmqnamesrv
為例佑女,其基礎(chǔ)鏡像為foxiswho/rocketmq:4.7.0
记靡,創(chuàng)建的容器名為rmqnamesrv
,并將其內(nèi)部的 9876 端口映射到宿主機的 9876 端口珊豹,并將本地的./rmqs/logs
文件掛載到 docker 容器的/opt/logs
目錄中簸呈。
rmqnamesrv:
image: foxiswho/rocketmq:4.7.0
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./rmqs/logs:/opt/logs
- ./rmqs/store:/opt/store
如果對于 docker-compose 不熟悉的讀者,可以先參考相關(guān)的教程學(xué)習(xí)一下 ??Docker Compose
SpringBoot 整合 RocketMQ 小實例
在完成了相對復(fù)雜的安裝店茶、配置后蜕便,我們終于可以實現(xiàn)一個小的 demo 來打通整個流程了。
創(chuàng)建消息主題和訂閱組
使用 RocketMQ 進行發(fā)消息時贩幻,必須要指定 topic轿腺,對于 topic 的設(shè)置有一個開關(guān)autoCreateTopicEnable
,一般在開發(fā)測試環(huán)境中會使用默認設(shè)置autoCreateTopicEnable = true
丛楚,但是這樣就會導(dǎo)致 topic 的設(shè)置不容易規(guī)范管理族壳,沒有統(tǒng)一的審核等等,所以在正式環(huán)境中會在 Broker 啟動時設(shè)置參數(shù)autoCreateTopicEnable = false
趣些。這樣當需要增加 topic 時就需要在 web 管理界面上添加即可仿荆。
在 web 界面添加 topic 的方式如下:
同理,在接受消息時坏平,我們同樣需要對消息訂閱組進行配置拢操,對于消息的訂閱設(shè)置有一個開關(guān)autoCreateSubscriptionGroup
,通常情況下舶替,在生產(chǎn)環(huán)境下令境,我們需要設(shè)置為autoCreateSubscriptionGroup=false
,這就要求了管理者必須去 web 管理界面上創(chuàng)建訂閱組才可以收到消息顾瞪。
在 web 界面添加訂閱組的方式類似舔庶,如下圖所示:
如果只是測試環(huán)境抛蚁,我們可以在配置文件中將這兩個開關(guān)打開,配置文件在
rmq/rmq/brokerconf
目錄下
編寫代碼
apache 官方已經(jīng)提供了 rocketmq 對應(yīng)的 springboot starter惕橙,這極大的簡化了我們所需要做的配置工作瞧甩,因此我們要做的就是先新建一個 springboot 項目,然后按照下面的方式著手實現(xiàn)吕漂。
導(dǎo)入依賴
首先先在 pom.xml 中導(dǎo)入 apache 官方提供的 starter
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
配置 application.yml
依賴導(dǎo)入后亲配,我們需要在 application.yml 配置一個 name-server 地址尘应,具體值看你的機器惶凝。
rocketmq:
name-server: localhost:9876
producer:
group: myGroup
創(chuàng)建一個生產(chǎn)者類
生產(chǎn)者發(fā)送消息:
@RestController
public class RocketController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 發(fā)送給Broker,默認會自動創(chuàng)建topic犬钢,topic和tag用冒號分隔
@GetMapping("/rocket/send")
public String rocketSend() {
LocalDateTime currentTime = LocalDateTime.now();
rocketMQTemplate.convertAndSend("rocket-topic-2", currentTime.toString());
return currentTime.toString();
}
// 延時消息苍鲜,RocketMQ支持這幾個級別的延時消息,不能自定義時長
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@GetMapping("/rocket/delayMsg/send")
public String rocketDelayMsgSend() {
LocalDateTime currentTime = LocalDateTime.now();
rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentTime.toString()).build(), 2000, 3);
return currentTime.toString();
}
}
創(chuàng)建一個消費者
消費者監(jiān)聽消息:
@Component
@Slf4j
public class RokcetServiceListener {
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-2")
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer1 rocket收到消息:{}", s);
}
}
// RocketMQ支持兩種消費方式玷犹,集器消費和廣播消費
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-2",
selectorExpression = "tag2", messageModel = MessageModel.BROADCASTING)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer2 rocket收到消息:{}", s);
}
}
}
測試
我們在瀏覽器中訪問localhost:8080/rocket/send
混滔,即可看到返回的時間戳。
同時在控制臺也可以看到消費者已經(jīng)獲取到這條信息了
同樣的歹颓,我們也可以在可視化控制臺查看到相應(yīng)的消息
我們同樣可以在可視化控制臺查看消費者和生產(chǎn)者對于消息的生產(chǎn)與消費的情況坯屿,這些就留給讀者自己探索了。至此巍扛,一個完整的利用 Docker 安裝 RocketMQ 并結(jié)合 SpringBoot 使用的實例就結(jié)束了领跛。
問題
問題 1:No route info of this topic: xxxxxx
通過翻譯我們可以知道,這個錯誤產(chǎn)生的原因是因為消息隊列中并未產(chǎn)生相對應(yīng)的topic撤奸,所以我們要做的應(yīng)該是去控制臺新建一個 topic
問題 2:連接異常
如果出現(xiàn)類似下述這種連接異常的錯誤
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <172.0.0.120:10909> failed
可能的原因是你并沒有將項目放至 docker 容器中吠昭,因此你的項目代碼不能直接與 rocketmq 容器訪問,因此我們需要將broker.conf
中的 #brokerIP1=xxxxx
前面#
號去掉胧瓜,并且把后面的IP地址
改成你的rocketmq
容器宿主機IP地址
矢棚,配置文件在 rmq/rmq/brokerconf
目錄下。
最后
為了填坑府喳,我選擇了 rocketmq 作為實例講解的對象蒲肋,并在第一節(jié)闡述了我為什么要使用 RocketMQ 的原因,之后解釋了 RocketMQ 中幾個重要的概念钝满,然后利用 docker 快速的部署安裝了一個 rocketmq 的單機實例兜粘,并分析了安裝腳本。最后我們通過 springboot 這一目前主流的 web 框架實現(xiàn)了一個生產(chǎn)者與消費者的實例舱沧,并說明了可能會遇到的問題及解決方案妹沙。
以上就是本文的全部內(nèi)容了,如果你覺得對你有所幫助熟吏,不放關(guān)注點贊支持一波距糖,你們的支持是我更新的最大動力玄窝。