SpringCloud Stream整合RocketMQ

前言

1.rocketmq 安裝可參考:http://www.reibang.com/p/f3713adfa3dd
2.啟動好nameserv 和 broker
3.官方RocketMQ+springcloud stream 例子 https://github.com/alibaba/spring-cloud-alibaba/blob/2021.x/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md

  1. 本文將說明普通消息發(fā)送/消費、廣播消息發(fā)送/消費吴菠、延時消息發(fā)送消費三種模式

項目環(huán)境/依賴:

    <properties>
        <spring-boot-version>2.3.12.RELEASE</spring-boot-version>
        <spring-cloud-version>Hoxton.SR12</spring-cloud-version>
        <spring-cloud-alibaba-version>2.2.7.RELEASE</spring-cloud-alibaba-version>
        <rocketmq.version>2021.1</rocketmq.version>
    </properties>
    !-- Environment START-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>${spring-boot-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>${spring-cloud-alibaba-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud-version}</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>${rocketmq.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.4</version>
    </dependency>

依賴說明:spring-cloud-starter-stream-rocketmq 排除了rocketmq-client芝加、rocketmq-acl依賴是因為我想換成新一點的依賴忠怖,不排除也是可以的威恼。

1.普通消息發(fā)送

新建模塊A用于消息發(fā)送
創(chuàng)建一個controller用戶測試消息發(fā)送

@RestController
public class RocketMqSendMsgController {

    @Autowired
    private StreamBridge streamBridge;

    @PostMapping(value = "/cluster")
    public void sendClusterMsg(@RequestParam("message") String message) {
        Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(CLUSTER_MESSAGE_OUTPUT,"",message));
        boolean result = streamBridge.send(CLUSTER_MESSAGE_OUTPUT, msg);
        System.out.println(Thread.currentThread().getName() + " 消息集群發(fā)送: " + msg.getPayload().getData());
    }
}

yml配置

server:
  port: 10004
spring:
  application:
    name: search-server
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        cluster-out-0:
          destination: cluster

配置說明
1.配置name-server服務(wù)地址暗膜,必須要配置
2.cluster-out-0 :channel 通道名稱 默認的一個規(guī)則吧 發(fā)送消息就是 -out- 這樣子

  1. destination: cluster :topic為cluster

附上代碼中用到的常量類

package com.ly.tuliy.commons.base.mq;

/**
 * 類說明: mq 常量類
 *
 * @author wqf
 * @date 2022/9/7 9:30
 */
public class MessageConstant {

    //生產(chǎn)者-集群消息主題
    public static String CLUSTER_MESSAGE_OUTPUT="cluster-out-0";
    //生產(chǎn)者-廣播消息主題
    public static String BROADCAST_MESSAGE_OUTPUT="broadcast-out-0";
    //生產(chǎn)者-延時消息主題
    public static String DELAYED_MESSAGE_OUTPUT="delayed-out-0";


    //消費者-集群消息主題
    public static String CLUSTER_MESSAGE_INPUT="cluster-in-0";
    //消費者-廣播消息主題
    public static String BROADCAST_MESSAGE_INPUT="broadcast-in-0";
    //消費者-延時消息主題
    public static String DELAYED_MESSAGE_INPUT="delayed-in-0";

}

import java.io.Serializable;
import java.util.Map;

/**
 * @Author: wqf
 * @Date: 2022/09/09
 * @Description: mq 發(fā)送消息的內(nèi)容體基礎(chǔ)內(nèi)容
 */
@ToString
public class BaseMessage<T> implements Serializable {
    /**
     * 消息主題
     */
    private String topic;
    /**
     * 消息標簽
     */
    private String tag;
    /**
     * 消息內(nèi)容
     */
    private T data;
    /**
     *
     */
    private Map<String, Object> header;


    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public Map<String, Object> getHeader() {
        return header;
    }

    public void setHeader(Map<String, Object> header) {
        this.header = header;
    }

    public BaseMessage(String topic, String tag, T data, Map<String, Object> header) {
        this.topic = topic;
        this.tag = tag;
        this.data = data;
        this.header = header;
    }

    public BaseMessage(String topic, String tag, T data) {
        this.topic = topic;
        this.tag = tag;
        this.data = data;
    }

    public BaseMessage(String topic,  T data) {
        this.topic = topic;
        this.data = data;
    }

    public BaseMessage() {
    }
}

新建模塊B用于消息消費
創(chuàng)建一個類接收消息

/**
 * @Author: wqf
 * @Date: 2022/09/09
 * @Description:
 */
@RestController
public class RocketMqReceiveMsgController {

    @Autowired
    private StreamBridge streamBridge;

    /**
     * 函數(shù)式編輯接收消息
     */
    @Bean
    public Consumer<String> cluster() {
        return message -> {
            System.out.println("接收的集群消息為:" + message);
        };
    }

yml配置

server:
  port: 10005 #${random.int[10000,19999]} # 隨機端口绵载,方便啟動多個消費者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消費者端配置
        definition: cluster
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group

配置說明:
1.definition: cluster 消費者端配置饲梭,這里配置的cluster 必須和我們接收消息類中的方法名稱一致


image.png

2.cluster-in-0:也是默認的規(guī)則 -in- 標識接收消息
3.group:消費組名稱配置 期吓,這個一定要配早歇,名稱命名沒有要求

測試:
用postman在生產(chǎn)者端(A)發(fā)送消息,消費端(B)能正常接收到消息讨勤。將消費端B多啟動幾個端口箭跳,創(chuàng)建多消費者環(huán)境,此時我們發(fā)送消息可以觀測到消息將隨即被幾個消費者消費悬襟,一個消息只會被消費一次

出現(xiàn)的問題: 消息接收不到或者是報錯衅码,請先檢查下主題是否創(chuàng)建(rocketmq 控制臺看看),或者啟動broker時修改配置為自動創(chuàng)建主題脊岳。

2.廣播消息發(fā)送

生產(chǎn)者(A)controller添加測試接口

    @PostMapping(value = "/broadcast")
    public void sendBroadcastMsg(@RequestParam("message") String message) {
        Message<BaseMessage<String>> msg = new GenericMessage<>(new BaseMessage<>(BROADCAST_MESSAGE_OUTPUT,"",message));
        boolean result = streamBridge.send(BROADCAST_MESSAGE_OUTPUT, msg);
        System.out.println(Thread.currentThread().getName() + " 消息廣播發(fā)送: " + msg.getPayload().getData());
    }

消費者端(B)添加以下配置

    /**
     * 函數(shù)式編輯接收消息
     */
    @Bean
    public Consumer<String> broadcast() {
        return message -> {
            System.out.println("接收的廣播消息為:" + message);
        };
    }
server:
  port: 10005 #${random.int[10000,19999]} # 隨機端口逝段,方便啟動多個消費者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消費者端配置
        definition: cluster;broadcast
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          broadcast-in-0:
            consumer:
              #配置是否開啟廣播消息 默認為false
              broadcasting: true
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group
        broadcast-in-0:
          destination: broadcast
          group: broadcast-group

配置說明:
1.consumer.broadcasting: true 該配置默認是false,true表示開啟廣播消費

測試:
啟動多個消費者割捅,發(fā)送消息時奶躯,每個消費者都能接收到每條生產(chǎn)者的消息

3.延時消息發(fā)送

生產(chǎn)者(A)controller添加測試接口

    @PostMapping(value = "/delayed")
    public void sendDelayedMsg(@RequestParam("message") String message) {
        String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

        for (int i = 0; i < 100; i++) {
            String key = "KEY" + i;
            Map<String, Object> headers = new HashMap<>();
            headers.put(MessageConst.PROPERTY_KEYS, key);
            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
            // 設(shè)置延時等級1~10
            headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 4);
            BaseMessage<String> baseMessage = new BaseMessage<>(MessageConstant.DELAYED_MESSAGE_OUTPUT, message);
            baseMessage.setHeader(headers);
            Message<BaseMessage<String>> msg = new GenericMessage<>(baseMessage, headers);
            streamBridge.send(MessageConstant.DELAYED_MESSAGE_OUTPUT, msg);
            System.out.println(Thread.currentThread().getName() + " 延時消息: " + msg.getPayload().getData());
        }
    }

參數(shù)說明:
messageDelayLevel :延時有18個等級(我試了前4個等級),每個等級延時時間如代碼

yml添加配置

server:
  port: 10004
spring:
  application:
    name: search-server
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          delayed-out-0:
            producer:
              group: delayed-group
              sync: true
      bindings:
        cluster-out-0:
          destination: cluster
        broadcast-out-0:
          destination: broadcast
        delayed-out-0:
          destination: delayed

配置說明:
bindings.delayed-out-0.producer.sync=true 該項配置只在生產(chǎn)端配置亿驾,表示消息發(fā)送通道delayed-out-0開啟消息異步發(fā)送嘹黔,一定要有,不然延時消息沒效果

消費者端(B)添加以下配置

    /**
     * 函數(shù)式編輯接收消息
     */
    @Bean
    public Consumer<String> delayed() {
        return message -> {
            System.out.println("接收的延時消息為:" + message);
        };
    }
server:
  port: 10005 #${random.int[10000,19999]} # 隨機端口莫瞬,方便啟動多個消費者
spring:
  application:
    name: seckill-server
  cloud:
    stream:
      function:
        #消費者端配置
        definition: cluster;broadcast;delayed
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          broadcast-in-0:
            consumer:
              #配置是否開啟廣播消息 默認為false
              broadcasting: true
      bindings:
        cluster-in-0:
          destination: cluster
          group: cluster-group
        broadcast-in-0:
          destination: broadcast
          group: broadcast-group
        delayed-in-0:
          destination: delayed
          group: delayed-group
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末儡蔓,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子疼邀,更是在濱河造成了極大的恐慌喂江,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件旁振,死亡現(xiàn)場離奇詭異获询,居然都是意外死亡涨岁,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門吉嚣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來梢薪,“玉大人,你說我怎么就攤上這事尝哆”玻” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵较解,是天一觀的道長畜疾。 經(jīng)常有香客問我,道長印衔,這世上最難降的妖魔是什么啡捶? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮奸焙,結(jié)果婚禮上瞎暑,老公的妹妹穿的比我還像新娘。我一直安慰自己与帆,他們只是感情好了赌,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著玄糟,像睡著了一般勿她。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上阵翎,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天逢并,我揣著相機與錄音,去河邊找鬼郭卫。 笑死砍聊,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的贰军。 我是一名探鬼主播玻蝌,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼词疼!你這毒婦竟也來了俯树?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤贰盗,失蹤者是張志新(化名)和其女友劉穎聘萨,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體童太,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了书释。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片翘贮。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖爆惧,靈堂內(nèi)的尸體忽然破棺而出狸页,到底是詐尸還是另有隱情,我是刑警寧澤扯再,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布芍耘,位于F島的核電站,受9級特大地震影響熄阻,放射性物質(zhì)發(fā)生泄漏斋竞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一秃殉、第九天 我趴在偏房一處隱蔽的房頂上張望坝初。 院中可真熱鬧,春花似錦钾军、人聲如沸鳄袍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拗小。三九已至,卻和暖如春樱哼,著一層夾襖步出監(jiān)牢的瞬間哀九,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工唇礁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留勾栗,地道東北人。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓盏筐,卻偏偏與公主長得像围俘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子琢融,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容