前言
1.rocketmq 安裝可參考:http://www.reibang.com/p/f3713adfa3dd
2.啟動好nameserv 和 broker
3.官方RocketMQ+springcloud stream 例子 https://github.com/alibaba/spring-cloud-alibaba/blob/2021.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
- 本文將說明普通消息發(fā)送/消費、廣播消息發(fā)送/消費吴菠、延時消息發(fā)送消費三種模式
項目環(huán)境/依賴:
<properties>
<spring-boot-version>2.3.12.RELEASE</spring-boot-version>
<spring-cloud-version>Hoxton.SR12</spring-cloud-version>
<spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
<rocketmq.version>2021.1</rocketmq.version>
</properties>
!-- Environment START-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>${rocketmq.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.4</version>
</dependency>
依賴說明:spring-cloud-starter-stream-rocketmq 排除了rocketmq-client芝加、rocketmq-acl依賴是因為我想換成新一點的依賴忠怖,不排除也是可以的威恼。
1.普通消息發(fā)送
新建模塊A用于消息發(fā)送
創(chuàng)建一個controller用戶測試消息發(fā)送
@RestController
public class RocketMqSendMsgController {
@Autowired
private StreamBridge streamBridge;
@PostMapping(value = "/cluster")
public void sendClusterMsg(@RequestParam("message") String message) {
Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(CLUSTER_MESSAGE_OUTPUT,"",message));
boolean result = streamBridge.send(CLUSTER_MESSAGE_OUTPUT, msg);
System.out.println(Thread.currentThread().getName() + " 消息集群發(fā)送: " + msg.getPayload().getData());
}
}
yml配置
server:
port: 10004
spring:
application:
name: search-server
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
bindings:
cluster-out-0:
destination: cluster
配置說明
1.配置name-server服務(wù)地址暗膜,必須要配置
2.cluster-out-0 :channel 通道名稱 默認的一個規(guī)則吧 發(fā)送消息就是 -out- 這樣子
- destination: cluster :topic為cluster
附上代碼中用到的常量類
package com.ly.tuliy.commons.base.mq;
/**
* 類說明: mq 常量類
*
* @author wqf
* @date 2022/9/7 9:30
*/
public class MessageConstant {
//生產(chǎn)者-集群消息主題
public static String CLUSTER_MESSAGE_OUTPUT="cluster-out-0";
//生產(chǎn)者-廣播消息主題
public static String BROADCAST_MESSAGE_OUTPUT="broadcast-out-0";
//生產(chǎn)者-延時消息主題
public static String DELAYED_MESSAGE_OUTPUT="delayed-out-0";
//消費者-集群消息主題
public static String CLUSTER_MESSAGE_INPUT="cluster-in-0";
//消費者-廣播消息主題
public static String BROADCAST_MESSAGE_INPUT="broadcast-in-0";
//消費者-延時消息主題
public static String DELAYED_MESSAGE_INPUT="delayed-in-0";
}
import java.io.Serializable;
import java.util.Map;
/**
* @Author: wqf
* @Date: 2022/09/09
* @Description: mq 發(fā)送消息的內(nèi)容體基礎(chǔ)內(nèi)容
*/
@ToString
public class BaseMessage<T> implements Serializable {
/**
* 消息主題
*/
private String topic;
/**
* 消息標簽
*/
private String tag;
/**
* 消息內(nèi)容
*/
private T data;
/**
*
*/
private Map<String, Object> header;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public Map<String, Object> getHeader() {
return header;
}
public void setHeader(Map<String, Object> header) {
this.header = header;
}
public BaseMessage(String topic, String tag, T data, Map<String, Object> header) {
this.topic = topic;
this.tag = tag;
this.data = data;
this.header = header;
}
public BaseMessage(String topic, String tag, T data) {
this.topic = topic;
this.tag = tag;
this.data = data;
}
public BaseMessage(String topic, T data) {
this.topic = topic;
this.data = data;
}
public BaseMessage() {
}
}
新建模塊B用于消息消費
創(chuàng)建一個類接收消息
/**
* @Author: wqf
* @Date: 2022/09/09
* @Description:
*/
@RestController
public class RocketMqReceiveMsgController {
@Autowired
private StreamBridge streamBridge;
/**
* 函數(shù)式編輯接收消息
*/
@Bean
public Consumer<String> cluster() {
return message -> {
System.out.println("接收的集群消息為:" + message);
};
}
yml配置
server:
port: 10005 #${random.int[10000,19999]} # 隨機端口绵载,方便啟動多個消費者
spring:
application:
name: seckill-server
cloud:
stream:
function:
#消費者端配置
definition: cluster
rocketmq:
binder:
name-server: localhost:9876
bindings:
cluster-in-0:
destination: cluster
group: cluster-group
配置說明:
1.definition: cluster 消費者端配置饲梭,這里配置的cluster 必須和我們接收消息類中的方法名稱一致
2.cluster-in-0:也是默認的規(guī)則 -in- 標識接收消息
3.group:消費組名稱配置 期吓,這個一定要配早歇,名稱命名沒有要求
測試:
用postman在生產(chǎn)者端(A)發(fā)送消息,消費端(B)能正常接收到消息讨勤。將消費端B多啟動幾個端口箭跳,創(chuàng)建多消費者環(huán)境,此時我們發(fā)送消息可以觀測到消息將隨即被幾個消費者消費悬襟,一個消息只會被消費一次
出現(xiàn)的問題: 消息接收不到或者是報錯衅码,請先檢查下主題是否創(chuàng)建(rocketmq 控制臺看看),或者啟動broker時修改配置為自動創(chuàng)建主題脊岳。
2.廣播消息發(fā)送
生產(chǎn)者(A)controller添加測試接口
@PostMapping(value = "/broadcast")
public void sendBroadcastMsg(@RequestParam("message") String message) {
Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(BROADCAST_MESSAGE_OUTPUT,"",message));
boolean result = streamBridge.send(BROADCAST_MESSAGE_OUTPUT, msg);
System.out.println(Thread.currentThread().getName() + " 消息廣播發(fā)送: " + msg.getPayload().getData());
}
消費者端(B)添加以下配置
/**
* 函數(shù)式編輯接收消息
*/
@Bean
public Consumer<String> broadcast() {
return message -> {
System.out.println("接收的廣播消息為:" + message);
};
}
server:
port: 10005 #${random.int[10000,19999]} # 隨機端口逝段,方便啟動多個消費者
spring:
application:
name: seckill-server
cloud:
stream:
function:
#消費者端配置
definition: cluster;broadcast
rocketmq:
binder:
name-server: localhost:9876
bindings:
broadcast-in-0:
consumer:
#配置是否開啟廣播消息 默認為false
broadcasting: true
bindings:
cluster-in-0:
destination: cluster
group: cluster-group
broadcast-in-0:
destination: broadcast
group: broadcast-group
配置說明:
1.consumer.broadcasting: true 該配置默認是false,true表示開啟廣播消費
測試:
啟動多個消費者割捅,發(fā)送消息時奶躯,每個消費者都能接收到每條生產(chǎn)者的消息
3.延時消息發(fā)送
生產(chǎn)者(A)controller添加測試接口
@PostMapping(value = "/delayed")
public void sendDelayedMsg(@RequestParam("message") String message) {
String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// 設(shè)置延時等級1~10
headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 4);
BaseMessage<String> baseMessage = new BaseMessage<>(MessageConstant.DELAYED_MESSAGE_OUTPUT, message);
baseMessage.setHeader(headers);
Message<BaseMessage<String>> msg = new GenericMessage<>(baseMessage, headers);
streamBridge.send(MessageConstant.DELAYED_MESSAGE_OUTPUT, msg);
System.out.println(Thread.currentThread().getName() + " 延時消息: " + msg.getPayload().getData());
}
}
參數(shù)說明:
messageDelayLevel :延時有18個等級(我試了前4個等級),每個等級延時時間如代碼
yml添加配置
server:
port: 10004
spring:
application:
name: search-server
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
bindings:
delayed-out-0:
producer:
group: delayed-group
sync: true
bindings:
cluster-out-0:
destination: cluster
broadcast-out-0:
destination: broadcast
delayed-out-0:
destination: delayed
配置說明:
bindings.delayed-out-0.producer.sync=true 該項配置只在生產(chǎn)端配置亿驾,表示消息發(fā)送通道delayed-out-0開啟消息異步發(fā)送嘹黔,一定要有,不然延時消息沒效果
消費者端(B)添加以下配置
/**
* 函數(shù)式編輯接收消息
*/
@Bean
public Consumer<String> delayed() {
return message -> {
System.out.println("接收的延時消息為:" + message);
};
}
server:
port: 10005 #${random.int[10000,19999]} # 隨機端口莫瞬,方便啟動多個消費者
spring:
application:
name: seckill-server
cloud:
stream:
function:
#消費者端配置
definition: cluster;broadcast;delayed
rocketmq:
binder:
name-server: localhost:9876
bindings:
broadcast-in-0:
consumer:
#配置是否開啟廣播消息 默認為false
broadcasting: true
bindings:
cluster-in-0:
destination: cluster
group: cluster-group
broadcast-in-0:
destination: broadcast
group: broadcast-group
delayed-in-0:
destination: delayed
group: delayed-group