RabbitMQ學(xué)習(xí)筆記 - RabbitMQ整合SpringCloud

RabbitMQ整合SpringCloud

????你好宙枷!歡迎來到Java成長筆記,主要是用于相互交流茧跋,相互學(xué)習(xí)慰丛,也希望分享能幫到大家,如有錯誤之處瘾杭,希望指正诅病,謝謝!

整合RabbitMQ簡介

1、SpringCloudStream定位是兼容主流消息中間件的集成使用贤笆,減少不同消息中間件集成的配置蝇棉,它整合RabbitMQ配置相較簡單,不需要定義相應(yīng)的交換機(jī)苏潜、隊列银萍、以及關(guān)系綁定,使相應(yīng)的配置減少恤左。
2、SpringCloudStream通過上層結(jié)構(gòu)上的處理搀绣,使消息生產(chǎn)端飞袋、消費(fèi)端可以多樣化,不需要拘泥于生產(chǎn)消費(fèi)端使用相同的消息中間件链患。例如:生產(chǎn)端可以使用RabbitMQ巧鸭,而消費(fèi)端可以使用Kafka,讓開發(fā)者省去了相應(yīng)不同的配置的集成麻捻,開發(fā)者只需要使用好相應(yīng)的幾個注解纲仍,就能實(shí)現(xiàn)高性能的生產(chǎn)和消費(fèi)的場景。
3贸毕、當(dāng)然它也有一個非常大的問題就是不能實(shí)現(xiàn)消息的可靠性投遞郑叠,也就是不能保證消息的100%可靠性,會存在少量的消息丟失明棍。

引入依賴配置

主要依賴

// 指定統(tǒng)一配置
<properties>
    <java.version>1.8</java.version>
    <spring.cloud-version>Greenwich.SR6</spring.cloud-version>
    <spring.boot-version>Brussels-SR17</spring.boot-version>
</properties>

// springcloud依賴
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring.cloud-version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

// 其它配置
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
</dependencies>

使用注解說明

@Output:輸出注解乡革,用于定義發(fā)送消息接口
@input:輸入注解,用于定義消息的消費(fèi)者接口
@StreamListener:用于定義監(jiān)聽方法的注解
@EnableBinding:啟動綁定關(guān)系

消息生產(chǎn)端

生產(chǎn)端配置說明

spring:
  cloud:
    stream:
      binders:
        defaultRabbit:                          # 定義的名稱摊腋,用于bidding整合
          type: rabbit                          # 指定消息類型
          environment:
            spring:
              rabbitmq:                         # rabbitmq 配置信息
                addresses: 127.0.0.1:5672       # rabbitmq 連接地址
                username: rabbitmq              # rabbitmq 用戶
                password: 123456                # rabbitmq 連接密碼
                virtual-host: /                 # 虛擬路徑
      bindings:
        userOutPutChannel:
          destination: exchange_cloud           # Exchange名稱沸版,交換模式默認(rèn)topic,把stream輸出通道綁定到exchange_cloud交換機(jī)
          group: userGroup                      # 分組名稱兴蒸,生產(chǎn)端视粮、消費(fèi)端名稱需要一致
          default-binder: defaultRabbit         # 和上面定義的 binders:defaultRabbit需要一致
          content-type: application/json        # 設(shè)置消息類型 為json 

生產(chǎn)端代碼

展示如下:

// 定義輸出類型的通道
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public interface Barista {

    final static String OUTPUT_CHANNEL = "userOutPutChannel";

    /*
     * @Description: 定義一個輸出類型的通道
     * @Author ly
     * @param []
     * @return org.springframework.messaging.MessageChannel
     * @date 2021/3/22 17:07
     */
    @Output(Barista.OUTPUT_CHANNEL)
    MessageChannel userOutPutChannel();

}

// 實(shí)現(xiàn)類封裝
import com.alibaba.fastjson.JSON;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Map;

@EnableBinding(Barista.class)
@Service
@Slf4j
public class RabbitSender {

    @Resource
    @Output(Barista.OUTPUT_CHANNEL)
    private MessageChannel channel;

    @Resource
    private Barista barista;

    /*
     * @Description: channel 發(fā)送消息
     * @Author ly
     * @date 2021/3/23 11:12
     */
    public void sendMsg (String msg) {
        channel.send(MessageBuilder.withPayload(msg).build());
        log.error("channel消息發(fā)送成功:{} " + msg);
    }

    /*
     * @Description:  barista 發(fā)送消息
     * @Author ly
     * @date 2021/3/23 11:12
     */
   public void sendMessage (Object message, Map<String, Object> properties) {
        final MessageHeaders messageHeaders = new MessageHeaders(properties);
        final Message msg = MessageBuilder.createMessage(message, messageHeaders);
        final boolean sendResult = barista.userOutPutChannel().send(msg);
        log.error("barista消息發(fā)送成功:{},sendResult:{} " + JSON.toJSONString(msg), sendResult);
    }

}

// 測試類處理
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.impl.RabbitSender;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ECloudProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class ECloudProducerApplicationTests {

    @Resource
    private RabbitSender rabbitSender;

    @Test
    public void sendMessage () {
        final String message = "Hello RabbitMQ";
        rabbitSender.sendMsg(message);
    }

    @Test
    public void sendRabbitMessage () {
        final Map<String, Object> properties = ImmutableMap.of("cloud-stream", "cloud-stream");
        final User user = new User("simon","123456", 22,  new BigDecimal(100));
        rabbitSender.sendMessage(user, properties);
    }

}

消費(fèi)端代碼

消費(fèi)端配置說明

spring:
  cloud:
    stream:
      binders:
        defaultRabbit:                              # 此配置為rabbitmq配置說明
          type: rabbit
          environment:
            spring:
              rabbitmq:
                addresses: 127.0.0.1:5672
                username: rabbit
                password: 123456
                virtual-host: /
      bindings:
        userInChannel:                              # 定義輸入管道名稱          
          destination: exchange_cloud               # 交換模式默認(rèn)是topic,把stream的消息輸出通道綁定到exchange_cloud交換器
          group: userGroup                          # 分組名稱與生產(chǎn)端名稱一致
          content-type: application/json            # 消費(fèi)端消息類型 json
          default-binder: defaultRabbit             # 與binders:defaultRabbit名稱一致 環(huán)境名稱
          consumer:
            concurrency: 1                          # 默認(rèn)監(jiān)聽數(shù)量
      rabbit:
        bindings:
          userInChannel:                            # 和bindings:userInChannel 名稱一致
            consumer:                               # 消費(fèi)端配置     
              requeue-reject: false                 # 是否支持return
              acknowledge-mode: MANUAL              # 簽收模式 手動簽收
              recovery-interval: 3000               # 3s重新連接 
              durable-subscription: true            # 是否啟動持久化訂閱
              max-concurrency: 5                    # 最大監(jiān)聽數(shù)量

消費(fèi)端代碼

// 定義一個輸入通道
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@Component
public interface Barista {
      
    String INPUT_CHANNEL = "userInChannel";

    /*
     * @Description: 定義一個輸入類型的通道
     * @Author ly
     * @param []
     * @return org.springframework.messaging.SubscribableChannel
     * @date 2021/3/22 17:34
     */
    @Input(Barista.INPUT_CHANNEL)
    SubscribableChannel userInChannel();

}  

// 定義輸入通道監(jiān)聽類
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@EnableBinding(Barista.class)
@Service
@Slf4j
public class MQReceiver {

    @StreamListener(Barista.INPUT_CHANNEL)
    public void receiver(Message message) throws Exception {
        Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        log.error("消費(fèi)完畢:{}, Object:{}", System.currentTimeMillis(), JSON.toJSON(message));
        channel.basicAck(deliveryTag, false);
    }

}  

接收消息返回信息示例

sendMessage測試返回

對象消息返回信息示例
sendRabbitMessage測試返回

本章完結(jié)橙凳,后續(xù)還會持續(xù)更新蕾殴,分享Java成長筆記,希望我們能一起成長痕惋。如果你覺得我的分享有用区宇,記得點(diǎn)贊和關(guān)注哦!這對我是最好的鼓勵值戳。謝謝议谷!

PS:轉(zhuǎn)載請注明出處!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末堕虹,一起剝皮案震驚了整個濱河市卧晓,隨后出現(xiàn)的幾起案子芬首,更是在濱河造成了極大的恐慌,老刑警劉巖逼裆,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件郁稍,死亡現(xiàn)場離奇詭異,居然都是意外死亡胜宇,警方通過查閱死者的電腦和手機(jī)耀怜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來桐愉,“玉大人财破,你說我怎么就攤上這事〈踊澹” “怎么了左痢?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長系洛。 經(jīng)常有香客問我俊性,道長,這世上最難降的妖魔是什么描扯? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任定页,我火速辦了婚禮,結(jié)果婚禮上荆烈,老公的妹妹穿的比我還像新娘拯勉。我一直安慰自己,他們只是感情好憔购,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布宫峦。 她就那樣靜靜地躺著,像睡著了一般玫鸟。 火紅的嫁衣襯著肌膚如雪导绷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天屎飘,我揣著相機(jī)與錄音妥曲,去河邊找鬼。 笑死钦购,一個胖子當(dāng)著我的面吹牛檐盟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播押桃,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼葵萎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起羡忘,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤谎痢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后卷雕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體节猿,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年漫雕,在試婚紗的時候發(fā)現(xiàn)自己被綠了滨嘱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡浸间,死狀恐怖九孩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情发框,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布煤墙,位于F島的核電站梅惯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏仿野。R本人自食惡果不足惜铣减,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望脚作。 院中可真熱鬧葫哗,春花似錦、人聲如沸球涛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽亿扁。三九已至捺典,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間从祝,已是汗流浹背襟己。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留牍陌,地道東北人擎浴。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親榜贴。 傳聞我的和親對象是個殘疾皇子做瞪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容