一涤躲、RocketMQ的優(yōu)勢(shì)?
- 底層是Java實(shí)現(xiàn)的虱咧,于閱讀源碼熊榛、了解實(shí)現(xiàn)有利(RabbitMQ 底層是 Erlang,kafka 底層是 Scala)
- 能夠保證嚴(yán)格的消息順序
- 提供了豐富的消息拉取模式
- 高效的訂閱者水平擴(kuò)展能力
- 實(shí)時(shí)的消息訂閱機(jī)制
- 億級(jí)的消息堆積能力
二腕巡、整體流程
官方給出的 RocketMQ 架構(gòu)圖
- 啟動(dòng) Namesrv玄坦,Namesrv起來(lái)后監(jiān)聽(tīng)端口,等待Broker绘沉、Producer煎楣、Consumer連上來(lái),相當(dāng)于一個(gè)路由控制中心
- Broker啟動(dòng)车伞,跟所有的Namesrv保持長(zhǎng)連接择懂,定時(shí)發(fā)送心跳包(心跳包中,包含當(dāng)前Broker信息-IP和端口等另玖,以及存儲(chǔ)所有Topic信息困曙,注冊(cè)成功后,Namesrv集群中就有Topic跟Broker 的映射關(guān)系)
- 收發(fā)消息前谦去,先創(chuàng)建Topic慷丽。創(chuàng)建Topic時(shí),需要指定該Topic要存儲(chǔ)在哪些Broker上哪轿。也可以在發(fā)送消息時(shí)自動(dòng)創(chuàng)建Topic盈魁。
- Producer 發(fā)送消息。(啟東時(shí)窃诉,先跟Namesrv集群中的其中一臺(tái)建立長(zhǎng)連接杨耙,并從Namesrv中獲取當(dāng)前發(fā)送的Topic存在哪些Broker上,然后跟對(duì)應(yīng)的Broker建立長(zhǎng)連接飘痛,直接向Broker發(fā)消息珊膜。)
- Consumer消費(fèi)消息。(Consumer跟Producer類似宣脉,跟其中一臺(tái)Namesrv建立長(zhǎng)連接车柠,獲取當(dāng)前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道塑猖,開(kāi)始消費(fèi)消息竹祷。)
三、Docker搭建RocketMQ
rocketmq的docker鏡像可以自己制作羊苟,官方文檔中有詳細(xì)介紹:rocketmq-docker
我找到了全網(wǎng)最快捷的搭建方式塑陵,使用foxiswho的鏡像:foxiswho/rocketmq
在自己新建的rocketmq目錄下打開(kāi)終端,執(zhí)行以下命令:
| git clone https://github.com/foxiswho/docker-rocketmq.git
| cd docker-rocketmq
| cd rmq
| chmod +x start.sh
| ./start.sh
控制臺(tái)會(huì)輸出rocketmq三臺(tái)容器的狀態(tài)
RocketMQ Docker 容器狀態(tài)
此時(shí)我們通過(guò)瀏覽器訪問(wèn)localhost:8180查看到以下頁(yè)面則說(shuō)明安裝成功蜡励。
RocketMQ 控制臺(tái)
四令花、SpringBoot 整合 RocketMQ Demo
1. 創(chuàng)建一個(gè)SpringBoot項(xiàng)目阻桅,
使用IDEA-File-New Project-Spring Initializr,可以很輕松的創(chuàng)建出一個(gè)簡(jiǎn)單的Web工程兼都。
2. 引入RocketMQ依賴
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
3. 配置 application.yml
# rocketmq 配置項(xiàng)嫂沉,對(duì)應(yīng) RocketMQProperties 配置類
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置項(xiàng)
producer:
group: demo-producer-group # 生產(chǎn)者分組
send-message-timeout: 3000 # 發(fā)送消息超時(shí)時(shí)間,單位:毫秒扮碧。默認(rèn)為 3000 趟章。
compress-message-body-threshold: 4096 # 消息壓縮閥值,當(dāng)消息體的大小超過(guò)該閥值后芬萍,進(jìn)行消息壓縮尤揣。默認(rèn)為 4 * 1024B
max-message-size: 4194304 # 消息體的最大允許大小搔啊。柬祠。默認(rèn)為 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步發(fā)送消息時(shí),失敗重試次數(shù)负芋。默認(rèn)為 2 次漫蛔。
retry-times-when-send-async-failed: 2 # 異步發(fā)送消息時(shí),失敗重試次數(shù)旧蛾。默認(rèn)為 2 次莽龟。
retry-next-server: false # 發(fā)送消息給 Broker 時(shí),如果發(fā)送失敗锨天,是否重試另外一臺(tái) Broker 毯盈。默認(rèn)為 false
access-key: # Access Key ,可閱讀 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文檔
secret-key: # Secret Key
enable-msg-trace: true # 是否開(kāi)啟消息軌跡功能病袄。默認(rèn)為 true 開(kāi)啟搂赋。可閱讀 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文檔
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定義消息軌跡的 Topic 益缠。默認(rèn)為 RMQ_SYS_TRACE_TOPIC 脑奠。
# Consumer 配置項(xiàng)
consumer:
listeners: # 配置某個(gè)消費(fèi)分組,是否監(jiān)聽(tīng)指定 Topic 幅慌。結(jié)構(gòu)為 Map<消費(fèi)者分組, <Topic, Boolean>> 宋欺。默認(rèn)情況下,不配置表示監(jiān)聽(tīng)胰伍。
test-consumer-group:
topic1: false # 關(guān)閉 test-consumer-group 對(duì) topic1 的監(jiān)聽(tīng)消費(fèi)
4. 創(chuàng)建一個(gè)生產(chǎn)者類
生產(chǎn)者發(fā)送消息
@RestController
public class RocketController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 延時(shí)消息齿诞,RocketMQ支持這幾個(gè)級(jí)別的延時(shí)消息,自定義需要修改broker配置文件
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@GetMapping("/rocket/delayMsg/send")
public String rocketDelayMsgSend() {
LocalDateTime currentDateTime = LocalDateTime.now();
rocketMQTemplate.syncSend("rocket-topic-2:tag-2", MessageBuilder.withPayload(currentDateTime.toString()).build(), 2000, 3);
return currentDateTime.toString();
}
}
5. 創(chuàng)建一個(gè)消費(fèi)者
消費(fèi)者監(jiān)聽(tīng)消息
@Slf4j
@Component
public class RokcetServiceListener {
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-2")
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer1 rocket收到消息:{}", s);
}
}
//MessageModel.BROADCASTING 廣播消息模式
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-2", selectorExpression = "tag-2", messageModel = MessageModel.BROADCASTING)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer2 rocket收到消息:{}", s);
}
}
}
6.測(cè)試
我們?cè)跒g覽器中訪問(wèn)localhost:8080/rocket/send骂租,即可看到返回的時(shí)間戳
瀏覽器返回
同時(shí)在控制臺(tái)可以看到兩個(gè)消費(fèi)者都獲取到了這條消息(延時(shí)10s)
Consumer1和Consumer2都獲取到了消息
在rocketMq-console也可以看到這條消息
rocketMq-console控制臺(tái)
網(wǎng)絡(luò)問(wèn)題
- org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to failed
本地調(diào)試項(xiàng)目時(shí)祷杈,不能直接訪問(wèn) docker rocketmq 容器,因此我們需要將修改broker.conf配置菩咨,將/rmq/rmq/brokerconf目錄下的broker.conf中的#brokerIP1=xxxxx注釋去掉吠式,并將IP地址改成局域網(wǎng)IP陡厘。