rocketmq模型如上圖所示泰演,分為如下幾個(gè)部分:
- NameServer:主要用作注冊中心谒所,用于管理Topic信息和路由信息的管理
- Broker:負(fù)責(zé)存儲(chǔ)向拆、消息tag過濾和轉(zhuǎn)發(fā)罪既。需將自身信息上報(bào)給注冊中心NameServer
- Producer:生產(chǎn)者
- Consumer:消費(fèi)者
由上各部分角色的功能可知,我們需要先安裝啟動(dòng)NameServer汇在,再啟動(dòng)Broker即可搭建完RocketMQ
1. 部署NameServer
首先下載鏡像:
docker pull rocketmqinc/rocketmq:4.4.0
啟動(dòng)NameServer翰萨,暴露9876端口
docker run --name rmqnamesrv -d -p 9876:9876 rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
啟動(dòng)完成后,可以curl 9876端口測試服務(wù)是否啟動(dòng)成功
2. 部署B(yǎng)roker
RocketMQ是Java編寫的程序糕殉,Broker和NameServer都在上面的鏡像中亩鬼,只是啟動(dòng)命令不同而已。
啟動(dòng)Broker
docker run --name rmqbroker -d -p 10911:10911 -p 10909:10909 --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq:4.4.0 sh mqbroker
--link 將NameServer容器起個(gè)別名糙麦,Broker中需要配置一個(gè)NAMESRV_ADDR參數(shù)指向NameServer地址辛孵。
同上,這里也可以使用curl localhost:10911驗(yàn)證下服務(wù)器是否啟動(dòng)
3. 部署RocketMQ可視化界面控制臺(tái)
這一個(gè)步驟不做也可以通過Java等客戶端訪問到RocketMQ了赡磅,不過有可視化界面便于觀察RocketMQ數(shù)據(jù)魄缚。不需要的可以跳過這一步
下載鏡像:
docker pull pangliang/rocketmq-console-ng
啟動(dòng)容器:
docker run --name rmqconsole -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" pangliang/rocketmq-console-ng
自此,也可以使用curl命令測試控制臺(tái)界面是否成功啟動(dòng)焚廊。curl localhost:8080冶匹,如下表示啟動(dòng)成功。
宿主機(jī)也可以登錄訪問控制臺(tái)界面。
4. SpringBoot整合RocketMQ小實(shí)例
maven中先導(dǎo)入apache官方提供的starter
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
application.yml配置一個(gè)name-server地址,具體值看你的機(jī)器斧拍。
這里也可以通過accessKey和secureKey登錄連接楣富。默認(rèn)配置在RocketMQ的配置文件中腰涧,默認(rèn)值是:
accessKey: RocketMQ
secureKey: 12345678
生產(chǎn)者發(fā)送消息:
@RestController
public class RocketController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 發(fā)送給Broker汛骂,默認(rèn)會(huì)自動(dòng)創(chuàng)建topic竟终,topic和tag用冒號(hào)分隔
@GetMapping("/rocket/send")
public String rocketSend() {
LocalDateTime currentTime = LocalDateTime.now();
rocketMQTemplate.convertAndSend("rocket-topic-1", currentTime.toString());
return currentTime.toString();
}
// 延時(shí)消息茎毁,RocketMQ支持這幾個(gè)級(jí)別的延時(shí)消息卧檐,不能自定義時(shí)長
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
@GetMapping("/rocket/delayMsg/send")
public String rocketDelayMsgSend() {
LocalDateTime currentTime = LocalDateTime.now();
rocketMQTemplate.syncSend("rocket-topic-1:tag-2", MessageBuilder.withPayload(currentTime.toString()).build(), 2000, 3);
return currentTime.toString();
}
}
消費(fèi)者:
@Component
@Slf4j
public class RokcetServiceListener {
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-1", topic = "rocket-topic-1")
public class Consumer1 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer1 rocket收到消息:{}", s);
}
}
// RocketMQ支持兩種消費(fèi)方式墓懂,集器消費(fèi)和廣播消費(fèi)
@Service
@RocketMQMessageListener(consumerGroup = "consumer-group-2", topic = "rocket-topic-1",
selectorExpression = "tag2", messageModel = MessageModel.BROADCASTING)
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("consumer2 rocket收到消息:{}", s);
}
}
}