springboot使用Rabbitmq實(shí)現(xiàn)消息隊(duì)列服務(wù)

rabbitmq中間件搭建在本地虛擬機(jī)上雏门,詳情搭建過程可查看:rabbitmq安裝部署
使用上次搭建的dubbo項(xiàng)目補(bǔ)充rabbitmq實(shí)現(xiàn),代碼可參考:20分鐘springboot搭建dubbo服務(wù)

首先查看virtual-host配置(VirtualHost相當(dāng)月一個(gè)相對獨(dú)立的RabbitMQ服務(wù)器,每個(gè)VirtualHost之間是相互隔離的缚陷。exchange、queue游昼、message不能互通仪缸。)


image.png

rabbitmq原理結(jié)構(gòu)


image.png

生產(chǎn)者/消費(fèi)者模型,類似于交換機(jī)痴荐。Exchange交換器血柳,共有四種類型,不同的類型對應(yīng)不同的路由策略生兆。
Queue:消息隊(duì)列难捌,接收消息膝宁、緩存消息。

Exchange:交換機(jī)根吁,一方面接收生產(chǎn)者發(fā)送來的消息员淫。另一方面知道如何處理消息,例如交給特別的隊(duì)列击敌,或者全部的隊(duì)列介返,或者將消息丟棄。到底如何操作取決于Exchange是哪種類型:

根據(jù)交換機(jī)類型不同沃斤,分為3種發(fā)布模式:
Direct<定向>:1對1-----一個(gè)消息只能被一個(gè)消費(fèi)者消費(fèi)圣蝎;把消息交給符合特定routing key(queue與exchange的關(guān)系key) 的隊(duì)列。
Topic<通配符>:1對多-----一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)(輪詢);把消息交給符合routing pattern (路由模式)的隊(duì)列衡瓶。
Fanout<廣播>:將消息分發(fā)給所有綁定到交換機(jī)的隊(duì)列晓避。

消息隊(duì)列內(nèi)生產(chǎn)者添加消息隊(duì)列數(shù)據(jù)放坏,消費(fèi)者接收并使用隊(duì)列中的數(shù)據(jù)玫荣,上次搭建的簡單的dubbo服務(wù)中consumer發(fā)出請求璧针,provider提供查詢數(shù)據(jù)庫的服務(wù),具體如下圖:


image.png

繼續(xù)完成代碼實(shí)現(xiàn)

consumer主體結(jié)構(gòu)如下:


image.png

補(bǔ)充consumer的pom文件rabbitmq配置


    <!--rabbitmq-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

補(bǔ)充consumer內(nèi)yml的rabbitmq配置

spring:
  application:
    name: consumer
  profiles:
    active: test
  #配置rabbitMq 服務(wù)器
  rabbitmq:
    host: 10.1.31.199
    port: 5672
    username: admin
    password: admin
    #虛擬host 可以不設(shè)置,使用server默認(rèn)host
    virtual-host: /

注意1) rabbitmq的默認(rèn)web端口號是15672十厢,接扣訪問端口是5672
2)rabbitmq的默認(rèn)virtualhost配置為"/"
在config文件夾添加DirectRabbitConfig類缭裆,配置rabbitmq的配置信息如下:

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {
    //隊(duì)列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會(huì)被存儲(chǔ)在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在寿烟,暫存隊(duì)列:當(dāng)前連接有效
        // exclusive:默認(rèn)也是false澈驼,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除筛武。此參考優(yōu)先級高于durable
        // autoDelete:是否自動(dòng)刪除缝其,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除徘六。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)false
        return new Queue("TestDirectQueue",true);
    }

    //Direct交換機(jī) 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //綁定  將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }



    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }
}

創(chuàng)建測試接口SendMessageController類内边,完成消息隊(duì)列數(shù)據(jù)的添加

package com.example.consumer.openapi;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/demo")
public class SendMessageController {


    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發(fā)送等等方法

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }
}

注意:1)convertAndSend的方法中exchange是Virtual host的name決定了在哪個(gè)queue存放消息,routingKey則確定了queue與exchange的綁定待锈,不填寫時(shí)自動(dòng)為exchange的name漠其。
2)rabbitTemplate與amqpTemplate方法,rabbitTemplate實(shí)現(xiàn)自amqpTemplate接口,使用起來并無區(qū)別

啟動(dòng)項(xiàng)目竿音,訪問url和屎,執(zhí)行rabbitmq消息寫入:


image.png

寫入成功:


image.png

provider主體結(jié)構(gòu):


image.png

首先同理consumer,修改provider的pom文件及yml文件

在service文件夾內(nèi)添加DirectReceiver類如下:

package com.example.provider.service.rabbitmq;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊(duì)列名稱 TestDirectQueue
public class DirectReceiver {


    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("DirectReceiver消費(fèi)者收到消息  : " + testMessage.toString());
    }
}

啟動(dòng)provider項(xiàng)目春瞬,查看監(jiān)聽到的消息如下:


image.png

簡單的消息隊(duì)列完成柴信。


RabbitTemplate和AmqpTemplate的使用區(qū)別:

兩者都能調(diào)用convertAndSend方法向隊(duì)列發(fā)送消息,而
API:amqpTemplate.convertAndSend("隊(duì)列名"宽气,“消息內(nèi)容”)此處隊(duì)列名必須與創(chuàng)建的隊(duì)列一致随常。
API:amqpTemplate.convertAndSend("交換機(jī)名"潜沦,“路由鍵”,“消息內(nèi)容”)
具體實(shí)現(xiàn)可詳看使用方法绪氛。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末唆鸡,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子枣察,更是在濱河造成了極大的恐慌争占,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件询件,死亡現(xiàn)場離奇詭異,居然都是意外死亡唆樊,警方通過查閱死者的電腦和手機(jī)宛琅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來逗旁,“玉大人嘿辟,你說我怎么就攤上這事∑В” “怎么了红伦?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長淀衣。 經(jīng)常有香客問我昙读,道長,這世上最難降的妖魔是什么膨桥? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任蛮浑,我火速辦了婚禮,結(jié)果婚禮上只嚣,老公的妹妹穿的比我還像新娘沮稚。我一直安慰自己,他們只是感情好册舞,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布蕴掏。 她就那樣靜靜地躺著,像睡著了一般调鲸。 火紅的嫁衣襯著肌膚如雪盛杰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天藐石,我揣著相機(jī)與錄音饶唤,去河邊找鬼。 笑死贯钩,一個(gè)胖子當(dāng)著我的面吹牛募狂,可吹牛的內(nèi)容都是我干的办素。 我是一名探鬼主播,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼祸穷,長吁一口氣:“原來是場噩夢啊……” “哼性穿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起雷滚,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤需曾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后祈远,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體呆万,經(jīng)...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年车份,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了谋减。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,764評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扫沼,死狀恐怖出爹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情缎除,我是刑警寧澤严就,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站器罐,受9級特大地震影響梢为,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜轰坊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一抖誉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧衰倦,春花似錦袒炉、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至驻襟,卻和暖如春夺艰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背沉衣。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工郁副, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人豌习。 一個(gè)月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓存谎,卻偏偏與公主長得像拔疚,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子既荚,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,665評論 2 354

推薦閱讀更多精彩內(nèi)容