Spring Cloud Alibaba(五)RocketMQ 異步通信實(shí)現(xiàn)

本文探討如何使用 RocketMQ Binder 完成 Spring Cloud 應(yīng)用消息的訂閱和發(fā)布归粉。

介紹

RocketMQ 是一款開源的分布式消息系統(tǒng),基于高可用分布式集群技術(shù)该肴,提供低延時(shí)的、高可靠的消息發(fā)布與訂閱服務(wù)藐不,廣泛應(yīng)用于多個(gè)領(lǐng)域匀哄,包括異步通信解耦秦效、企業(yè)解決方案、金融支付拱雏、電信棉安、電子商務(wù)底扳、快遞物流铸抑、廣告營銷、社交衷模、即時(shí)通信鹊汛、移動(dòng)應(yīng)用、手游阱冶、視頻刁憋、物聯(lián)網(wǎng)、車聯(lián)網(wǎng)等木蹬。

RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件至耻,目前已經(jīng)捐贈(zèng)給 Apache 軟件基金會(huì),并于2017年9月25日成為 Apache 的頂級(jí)項(xiàng)目镊叁。作為經(jīng)歷過多次阿里巴巴雙十一這種“超級(jí)工程”的洗禮并有穩(wěn)定出色表現(xiàn)的國產(chǎn)中間件尘颓,以其高性能、低延時(shí)和高可靠等特性近年來已經(jīng)也被越來越多的國內(nèi)企業(yè)使用晦譬。

RocketMQ特點(diǎn)

  • 是一個(gè)隊(duì)列模型的消息中間件疤苹,具有高性能、高可靠敛腌、高實(shí)時(shí)卧土、分布式等特點(diǎn)
  • Producer、Consumer像樊、隊(duì)列都可以分布式
  • Producer 向一些隊(duì)列輪流發(fā)送消息尤莺,隊(duì)列集合稱為 Topic,Consumer 如果做廣播消費(fèi)生棍,則一個(gè) Consumer 實(shí)例消費(fèi)這個(gè) Topic 對(duì)應(yīng)的所有隊(duì)列颤霎,如果做集群消費(fèi),則多個(gè) Consumer 實(shí)例平均消費(fèi)這個(gè) Topic 對(duì)應(yīng)的隊(duì)列集合
  • 能夠保證嚴(yán)格的消息順序
  • 支持拉(pull)和推(push)兩種消息模式
  • 高效的訂閱者水平擴(kuò)展能力
  • 實(shí)時(shí)的消息訂閱機(jī)制
  • 億級(jí)消息堆積能力
  • 支持多種消息協(xié)議足绅,如 JMS捷绑、OpenMessaging 等
  • 較少的依賴

Spring Cloud Stream

Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。

Spring Cloud Stream 提供了消息中間件配置的統(tǒng)一抽象氢妈,推出了 pub/sub粹污,consumer groups,semantics首量,stateful partition 這些統(tǒng)一的模型支持壮吩。

Spring Cloud Stream 核心構(gòu)件有:Binders进苍、Bindings和Message,應(yīng)用程序通過 inputs 或者 outputs 來與 binder 交互鸭叙,通過我們配置來 binding 觉啊,而 binder 負(fù)責(zé)與中間件交互,Message為數(shù)據(jù)交換的統(tǒng)一數(shù)據(jù)規(guī)范格式沈贝。

  • Binding: 包括 Input Binding 和 Output Binding杠人。

Binding 在消息中間件與應(yīng)用程序提供的 Provider 和 Consumer 之間提供了一個(gè)橋梁,實(shí)現(xiàn)了開發(fā)者只需使用應(yīng)用程序的 Provider 或 Consumer 生產(chǎn)或消費(fèi)數(shù)據(jù)即可宋下,屏蔽了開發(fā)者與底層消息中間件的接觸嗡善。

  • Binder: 跟外部消息中間件集成的組件,用來創(chuàng)建 Binding学歧,各消息中間件都有自己的 Binder 實(shí)現(xiàn)罩引。

比如 Kafka 的實(shí)現(xiàn) KafkaMessageChannelBinderRabbitMQ 的實(shí)現(xiàn) RabbitMessageChannelBinder 以及 RocketMQ 的實(shí)現(xiàn) RocketMQMessageChannelBinder枝笨。

  • Message:是 Spring Framework 中的一個(gè)模塊袁铐,其作用就是統(tǒng)一消息的編程模型。

比如消息 Messaging 對(duì)應(yīng)的模型就包括一個(gè)消息體 Payload 和消息頭 Header横浑。

spring-cloud-stream 官網(wǎng)

Window搭建部署RocketMQ

下載

當(dāng)前最新版本為4.6.0

下載出來解壓到:D:\rocketmq 目錄剔桨,目錄最好不要帶空格和太深,否則服務(wù)運(yùn)行可能會(huì)報(bào)錯(cuò)

啟動(dòng)NameServer服務(wù)

在啟動(dòng)之前需要配置系統(tǒng)環(huán)境伪嫁,不然會(huì)報(bào)錯(cuò)领炫。

Please set the ROCKETMQ_HOME variable in your environment! 

系統(tǒng)環(huán)境變量名:ROCKETMQ_HOME

根據(jù)你解壓的目錄配置環(huán)境變量,比如我的變量值為:D:\rocketmq

進(jìn)入window命令窗口张咳,進(jìn)入D:\rocketmq\bin目錄下帝洪,執(zhí)行

start mqnamesrv.cmd

如上則NameServer啟動(dòng)成功。使用期間脚猾,窗口不要關(guān)閉葱峡。

啟動(dòng)Broker服務(wù)

進(jìn)入bin目錄下,輸入

start mqbroker.cmd -n localhost:9876

如上的 ip+port 是rocketmq的服務(wù)地址和端口龙助。

運(yùn)行如上命令砰奕,可能會(huì)報(bào)如下錯(cuò)誤。找不到或無法加載主類

如果出此情況提鸟,打開bin-->runbroker.cmd军援,修改%CLASSPATH%成"%CLASSPATH%"

保存再次執(zhí)行如上命令。執(zhí)行成功后称勋,提示boot success 代表成功胸哥。

示例

本示例實(shí)現(xiàn)三種消息的發(fā)布以及訂閱接收。

創(chuàng)建 RocketMQ 消息生產(chǎn)者

創(chuàng)建 ali-rocketmq-producer 工程赡鲜,端口為:28081

  • pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>ali-rocketmq-producer</artifactId>
    <packaging>jar</packaging>

    <dependencies>

        <!--rocketmq依賴-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <!--web依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • 配置 Output 的 Binding 信息并配合 @EnableBinding 注解使其生效

application.yml配置

server:
  port: 28081

spring:
  application:
    name: ali-rocketmq-producer
  cloud:
    stream:
      rocketmq:
        binder:
          # RocketMQ 服務(wù)器地址
          name-server: 127.0.0.1:9876
      bindings:
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArProduceApplication.java

@SpringBootApplication
@EnableBinding({MySource.class})
public class ArProduceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArProduceApplication.class, args);
    }
}
  • 消息生產(chǎn)者服務(wù)

MySource.java

package com.easy.arProduce;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    @Output("output1")
    MessageChannel output1();

    @Output("output2")
    MessageChannel output2();
}

SenderService.java

package com.easy.arProduce;

import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class SenderService {

    @Autowired
    private MySource source;

    /**
     * 發(fā)送字符串
     *
     * @param msg
     */
    public void send(String msg) {
        Message message = MessageBuilder.withPayload(msg)
                .build();
        source.output1().send(message);
    }

    /**
     * 發(fā)送帶tag的字符串
     *
     * @param msg
     * @param tag
     */
    public void sendWithTags(String msg, String tag) {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .build();
        source.output1().send(message);
    }

    /**
     * 發(fā)送對(duì)象
     *
     * @param msg
     * @param tag
     * @param <T>
     */
    public <T> void sendObject(T msg, String tag) {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        source.output2().send(message);
    }
}

編寫 TestController.java 控制器方便測試

package com.easy.arProduce;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "test")
public class TestController {
    @Autowired
    SenderService senderService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send(String msg) {
        senderService.send(msg);
        return "字符串消息發(fā)送成功!";
    }

    @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET)
    public String sendWithTags(String msg) {
        senderService.sendWithTags(msg, "tagStr");
        return "帶tag字符串消息發(fā)送成功!";
    }

    @RequestMapping(value = "/sendObject", method = RequestMethod.GET)
    public String sendObject(int index) {
        senderService.sendObject(new Foo(index, "foo"), "tagObj");
        return "Object對(duì)象消息發(fā)送成功!";
    }
}

創(chuàng)建 RocketMQ 消息消費(fèi)者

創(chuàng)建 ali-rocketmq-consumer 工程空厌,端口為:28082

  • pom.xml添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <artifactId>ali-rocketmq-consumer</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

-配置 Input 的 Binding 信息并配合 @EnableBinding 注解使其生效

application.yml配置

server:
  port: 28082

spring:
  application:
    name: ali-rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 #rocketmq 服務(wù)地址
        bindings:
          input1: {consumer.orderly: true}  #是否排序
          input2: {consumer.tags: tagStr}   #訂閱 帶tag值為tagStr的字符串
          input3: {consumer.tags: tagObj}   #訂閱 帶tag值為tabObj的字符串
      bindings:
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
        input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}
        input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArConsumerApplication.java

package com.easy.arConsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MySource.class})
public class ArConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArConsumerApplication.class, args);
    }
}
  • 消息消費(fèi)者服務(wù)

MySource.java

package com.easy.arConsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();

    @Input("input3")
    SubscribableChannel input3();
}

ReceiveService.java

package com.easy.arConsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ReceiveService {

    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        log.info("input1 接收到了消息:" + receiveMsg);
    }

    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        log.info("input2 接收到了消息:" + receiveMsg);
    }

    @StreamListener("input3")
    public void receiveInput3(@Payload Foo foo) {
        log.info("input3 接收到了消息:" + foo);
    }
}

使用示例

示例關(guān)聯(lián)項(xiàng)目

本示例我們創(chuàng)建了兩個(gè)項(xiàng)目實(shí)現(xiàn)

  • ali-rocketmq-producer:RocketMQ 消息服務(wù)生產(chǎn)者庐船,服務(wù)名:ali-rocketmq-producer,端口:28081

  • ali-rocketmq-consumer:RocketMQ 消息服務(wù)消費(fèi)者嘲更,服務(wù)名:ali-rocketmq-producer筐钟,端口:28082

運(yùn)行示例測試

首先要啟動(dòng)ali-rocketmq-producer服務(wù)及ali-rocketmq-consumer服務(wù)

查看服務(wù)消費(fèi)者控制臺(tái),輸出

2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 接收到了消息:yuntian
2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms

表示字符串消費(fèi)成功被input1消費(fèi)了

查看服務(wù)消費(fèi)者控制臺(tái)赋朦,輸出

2019-12-04 15:38:09.586  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input2 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms

表示帶tag的字符串成功被input2和input1消費(fèi)了篓冲,因?yàn)閕nput1也訂閱了test-topic1,并且沒有我們沒有加tag過濾北发,默認(rèn)表示接收所有消息纹因,所以也能成功接收tagyuntian字符串

查看服務(wù)消費(fèi)者控制臺(tái)喷屋,輸出

2019-12-04 15:41:15.285  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input3 接收到了消息:Foo{id=1, bar='foo'}

表示input3成功接收到了tag帶tagObj的對(duì)象消息了琳拨,而input1卻沒有輸出消息,這是因?yàn)閟endObject發(fā)布的消息走的是test-topic2消息管道屯曹,所以不會(huì)發(fā)布給input1及input2訂閱者

資料

Spring Boot狱庇、Cloud 學(xué)習(xí)項(xiàng)目

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市恶耽,隨后出現(xiàn)的幾起案子密任,更是在濱河造成了極大的恐慌,老刑警劉巖偷俭,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件浪讳,死亡現(xiàn)場離奇詭異,居然都是意外死亡涌萤,警方通過查閱死者的電腦和手機(jī)淹遵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來负溪,“玉大人透揣,你說我怎么就攤上這事〈眨” “怎么了辐真?”我有些...
    開封第一講書人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長崖堤。 經(jīng)常有香客問我侍咱,道長,這世上最難降的妖魔是什么密幔? 我笑而不...
    開封第一講書人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任楔脯,我火速辦了婚禮,結(jié)果婚禮上老玛,老公的妹妹穿的比我還像新娘钧敞。我一直安慰自己,他們只是感情好麸粮,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著愚战,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上违诗,一...
    開封第一講書人閱讀 49,749評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音笛粘,去河邊找鬼。 笑死垛膝,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的惑折。 我是一名探鬼主播乍恐,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼测砂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起百匆,我...
    開封第一講書人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤砌些,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后加匈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體存璃,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年雕拼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了纵东。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡啥寇,死狀恐怖偎球,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情辑甜,我是刑警寧澤衰絮,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布,位于F島的核電站磷醋,受9級(jí)特大地震影響猫牡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜邓线,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一淌友、第九天 我趴在偏房一處隱蔽的房頂上張望煌恢。 院中可真熱鬧,春花似錦震庭、人聲如沸症虑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谍憔。三九已至,卻和暖如春主籍,著一層夾襖步出監(jiān)牢的瞬間习贫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來泰國打工千元, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留苫昌,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓幸海,卻偏偏與公主長得像祟身,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子物独,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容