安裝篇
- 下載地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zip
- 解壓吼蚁、編譯
unzip rocketmq-all-4.2.0-source-release.zip
cd rocketmq-all-4.2.0/
mvn -Prelease-all -DskipTests clean install -U
- 進入到編譯好的目錄
/data/rocketmq-all-4.2.0/distribution/target/apache-rocketmq
- 啟動 NameServer并查看日志
nohup sh bin/mqnamesrv -n 172.16.4.26:9876 &
tail -f ~/logs/rocketmqlogs/namesrv.log
- 啟動 Broker并查看日志
nohup sh bin/mqbroker -n 172.16.4.26:9876 -c conf/broker.conf autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log
- 運維相關命令
sh bin/mqshutdown broker // 停止 broker
sh bin/mqshutdown namesrv // 停止 nameserver
sh bin/mqbroker -m // 查看broker參數(shù)
查看集群情況 ./mqadmin clusterList -n 127.0.0.1:9876
查看 broker 狀態(tài) ./mqadmin brokerStatus -n 127.0.0.1:9876 -b 172.20.1.138:10911 (注意換成你的 broker 地址)
查看 topic 列表 ./mqadmin topicList -n 127.0.0.1:9876
查看 topic 狀態(tài) ./mqadmin topicStatus -n 127.0.0.1:9876 -t MyTopic (換成你想查詢的 topic)
查看 topic 路由 ./mqadmin topicRoute -n 127.0.0.1:9876 -t MyTopic
- 還需要注意一點的是要開啟9876,10911這兩個端口
客戶端示例篇(以springboot 2.x整合rocketMQ為示例)
- 添加maven依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
- 生產(chǎn)者示例
@Slf4j
@RestController
public class ScheduledMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
@RequestMapping(value = "send")
public String send() {
String msg = "呵呵呵我是消息體";
String status = "";
try{
//普通消息發(fā)送
rocketMQTemplate.convertAndSend("delay-message-test", msg);
Message<String> message = new GenericMessage<>(msg);
//延時消息發(fā)送,level對應的默認延時時間依次為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
rocketMQTemplate.syncSend("delay-message-test", message, 10000, 3);
log.info("消息已發(fā)送:{}", msg);
status="消息發(fā)送成功";
}
catch (Exception ex){
log.error("發(fā)送失敗:" + ex.getMessage(), ex);
status="消息發(fā)送失敗";
}
return status;
}
}
- 消費者示例
@Slf4j
@Component
@RocketMQMessageListener(topic = "delay-message-test", consumerGroup = "xht-group")
public class ScheduledMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("received message: {}", message);
}
}
問題解決
遇到There is insufficient memory for the Java Runtime Environment to continue問題時的解決辦法:
修改/distribution/target/apache-rocketmq/bin 下的 3 個配置文件: runserver.sh撵幽、runbroker.sh 草添、tools.sh
調整jvm為合適大小嚷节,因為默認初始值比較大