Spring Cloud Alibaba - 5躲撰、Stream & RabbitMQ

1、RabbitMQ 環(huán)境安裝

鏈接(linux):https://www.linuxprobe.com/install-rabbitmq-on-centos-7.html
鏈接(win):https://www.cnblogs.com/JustinLau/p/11738511.html

2击费、Producer 生產(chǎn)者

  • main 服務(wù)啟動(dòng)類(lèi)
package com.wyh;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding(Producer.TestTopic.class)
@SpringBootApplication
@EnableDiscoveryClient
public class Producer {
    public static void main(String[] args) {
        SpringApplication.run(Producer.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
            return "ok";
        }

    }


    interface TestTopic {

        String OUTPUT = "example-topic-output";

        @Output(OUTPUT)
        MessageChannel output();

/*
        如果存在多個(gè)通道的話可以這樣使用
        String OUTPUT2 = "example-topic-output";

        String INPUT2 = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output2();

        @Input(INPUT)
        SubscribableChannel input2();*/

    }
}

  • application.yml
server:
  port: 8006
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
  application:
    name: rabbitmq-provider   #本服務(wù)的名稱(chēng)
  cloud:                           #注冊(cè)到nacos
    nacos:
      discovery:
        server-addr: localhost:8848
    stream:
      binders:  #需要綁定的rabbitmq的服務(wù)信息
        defaultRabbit: #定義的名稱(chēng)拢蛋,用于binding整合
          type: rabbit #消息組件類(lèi)型
      bindings: # 服務(wù)的整合處理(這里需要注意:如果存在多條管道,那么需要多個(gè)綁定)
        example-topic-output:
          destination: test-topic
management:
  endpoints:
    web:
      exposure:
        include: "*"
  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myspring-cloud</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>producer</artifactId>

    <dependencies>

        <!-- Spring Cloud Stream RabbitMQ-->

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!--  SpringCloud alibaba nacos    -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <!--  web組件      -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>


    </dependencies>
</project>

3荡灾、consumer-01 消費(fèi)者

  • main 服務(wù)啟動(dòng)類(lèi)
package com.wyh;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@EnableBinding(consumer01.TestTopic.class)
@SpringBootApplication
@EnableDiscoveryClient
public class consumer01 {
    public static void main(String[] args) {
        SpringApplication.run(consumer01.class, args);
    }


    /**
     * 消息消費(fèi)邏輯
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
        public void receiveV1(String payload, @Header("version") String version) {
            log.info("Received v1 com-01: " + payload + ", " + version);
        }

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
        public void receiveV2(String payload, @Header("version") String version) {
            log.info("Received v2 com-01: " + payload + ", " + version);
        }

    }

    interface TestTopic {

        String INPUT = "example-topic-input";

        @Input(INPUT)
        SubscribableChannel input();


    }

}

  • application.yml
server:
  port: 8004
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
  application:
    name: consumer-01   #本服務(wù)的名稱(chēng)
  cloud:                           #注冊(cè)到nacos
    nacos:
      discovery:
        server-addr: localhost:8848
    stream:
      binders:  #需要綁定的rabbitmq的服務(wù)信息
        defaultRabbit: #定義的名稱(chēng)瓤狐,用于binding整合
          type: rabbit #消息組件類(lèi)型
      bindings: # 服務(wù)的整合處理(這里需要注意:如果存在多條管道,那么需要多個(gè)綁定)
        example-topic-input:
          destination: test-topic  #exchange名稱(chēng)批幌,交換模式默認(rèn)是topic
          group: stream-content-route #分組
management:
  endpoints:
    web:
      exposure:
        include: "*"
  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myspring-cloud</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer-01</artifactId>

    <dependencies>

        <!-- Spring Cloud Stream RabbitMQ-->

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!--  SpringCloud alibaba nacos    -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>

        <!--  web組件      -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>


    </dependencies>
</project>

4础锐、consumer-02 消費(fèi)者

  • 01copy一份即可,端口號(hào)修改一下
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末荧缘,一起剝皮案震驚了整個(gè)濱河市皆警,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌截粗,老刑警劉巖信姓,帶你破解...
    沈念sama閱讀 218,451評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異绸罗,居然都是意外死亡意推,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)珊蟀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)菊值,“玉大人,你說(shuō)我怎么就攤上這事∧逯希” “怎么了昵宇?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,782評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)儿子。 經(jīng)常有香客問(wèn)我瓦哎,道長(zhǎng),這世上最難降的妖魔是什么柔逼? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,709評(píng)論 1 294
  • 正文 為了忘掉前任蒋譬,我火速辦了婚禮,結(jié)果婚禮上愉适,老公的妹妹穿的比我還像新娘羡铲。我一直安慰自己,他們只是感情好儡毕,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著扑媚,像睡著了一般腰湾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上疆股,一...
    開(kāi)封第一講書(shū)人閱讀 51,578評(píng)論 1 305
  • 那天费坊,我揣著相機(jī)與錄音,去河邊找鬼旬痹。 笑死附井,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的两残。 我是一名探鬼主播永毅,決...
    沈念sama閱讀 40,320評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼人弓!你這毒婦竟也來(lái)了沼死?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,241評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤崔赌,失蹤者是張志新(化名)和其女友劉穎意蛀,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體健芭,經(jīng)...
    沈念sama閱讀 45,686評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡县钥,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了慈迈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片若贮。...
    茶點(diǎn)故事閱讀 39,992評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出兜看,到底是詐尸還是另有隱情锥咸,我是刑警寧澤,帶...
    沈念sama閱讀 35,715評(píng)論 5 346
  • 正文 年R本政府宣布细移,位于F島的核電站搏予,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏弧轧。R本人自食惡果不足惜雪侥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望精绎。 院中可真熱鬧速缨,春花似錦、人聲如沸代乃。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,912評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)搁吓。三九已至原茅,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間堕仔,已是汗流浹背擂橘。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,040評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留摩骨,地道東北人通贞。 一個(gè)月前我還...
    沈念sama閱讀 48,173評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像恼五,于是被迫代替她去往敵國(guó)和親昌罩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評(píng)論 2 355