rabbitmq中間件搭建在本地虛擬機(jī)上雏门,詳情搭建過程可查看:rabbitmq安裝部署
使用上次搭建的dubbo項(xiàng)目補(bǔ)充rabbitmq實(shí)現(xiàn),代碼可參考:20分鐘springboot搭建dubbo服務(wù)
首先查看virtual-host配置(VirtualHost相當(dāng)月一個(gè)相對獨(dú)立的RabbitMQ服務(wù)器,每個(gè)VirtualHost之間是相互隔離的缚陷。exchange、queue游昼、message不能互通仪缸。)
rabbitmq原理結(jié)構(gòu)
生產(chǎn)者/消費(fèi)者模型,類似于交換機(jī)痴荐。Exchange交換器血柳,共有四種類型,不同的類型對應(yīng)不同的路由策略生兆。
Queue:消息隊(duì)列难捌,接收消息膝宁、緩存消息。
Exchange:交換機(jī)根吁,一方面接收生產(chǎn)者發(fā)送來的消息员淫。另一方面知道如何處理消息,例如交給特別的隊(duì)列击敌,或者全部的隊(duì)列介返,或者將消息丟棄。到底如何操作取決于Exchange是哪種類型:
根據(jù)交換機(jī)類型不同沃斤,分為3種發(fā)布模式:
Direct<定向>:1對1-----一個(gè)消息只能被一個(gè)消費(fèi)者消費(fèi)圣蝎;把消息交給符合特定routing key(queue與exchange的關(guān)系key) 的隊(duì)列。
Topic<通配符>:1對多-----一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)(輪詢);把消息交給符合routing pattern (路由模式)的隊(duì)列衡瓶。
Fanout<廣播>:將消息分發(fā)給所有綁定到交換機(jī)的隊(duì)列晓避。
消息隊(duì)列內(nèi)生產(chǎn)者添加消息隊(duì)列數(shù)據(jù)放坏,消費(fèi)者接收并使用隊(duì)列中的數(shù)據(jù)玫荣,上次搭建的簡單的dubbo服務(wù)中consumer發(fā)出請求璧针,provider提供查詢數(shù)據(jù)庫的服務(wù),具體如下圖:
繼續(xù)完成代碼實(shí)現(xiàn)
consumer主體結(jié)構(gòu)如下:
補(bǔ)充consumer的pom文件rabbitmq配置
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
補(bǔ)充consumer內(nèi)yml的rabbitmq配置
spring:
application:
name: consumer
profiles:
active: test
#配置rabbitMq 服務(wù)器
rabbitmq:
host: 10.1.31.199
port: 5672
username: admin
password: admin
#虛擬host 可以不設(shè)置,使用server默認(rèn)host
virtual-host: /
注意1) rabbitmq的默認(rèn)web端口號是15672十厢,接扣訪問端口是5672
2)rabbitmq的默認(rèn)virtualhost配置為"/"
在config文件夾添加DirectRabbitConfig類缭裆,配置rabbitmq的配置信息如下:
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
//隊(duì)列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會(huì)被存儲(chǔ)在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在寿烟,暫存隊(duì)列:當(dāng)前連接有效
// exclusive:默認(rèn)也是false澈驼,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除筛武。此參考優(yōu)先級高于durable
// autoDelete:是否自動(dòng)刪除缝其,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除徘六。
// return new Queue("TestDirectQueue",true,true,false);
//一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)false
return new Queue("TestDirectQueue",true);
}
//Direct交換機(jī) 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//綁定 將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
創(chuàng)建測試接口SendMessageController類内边,完成消息隊(duì)列數(shù)據(jù)的添加
package com.example.consumer.openapi;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/demo")
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發(fā)送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
注意:1)convertAndSend的方法中exchange是Virtual host的name決定了在哪個(gè)queue存放消息,routingKey則確定了queue與exchange的綁定待锈,不填寫時(shí)自動(dòng)為exchange的name漠其。
2)rabbitTemplate與amqpTemplate方法,rabbitTemplate實(shí)現(xiàn)自amqpTemplate接口,使用起來并無區(qū)別
啟動(dòng)項(xiàng)目竿音,訪問url和屎,執(zhí)行rabbitmq消息寫入:
寫入成功:
provider主體結(jié)構(gòu):
首先同理consumer,修改provider的pom文件及yml文件
在service文件夾內(nèi)添加DirectReceiver類如下:
package com.example.provider.service.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")//監(jiān)聽的隊(duì)列名稱 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消費(fèi)者收到消息 : " + testMessage.toString());
}
}
啟動(dòng)provider項(xiàng)目春瞬,查看監(jiān)聽到的消息如下:
簡單的消息隊(duì)列完成柴信。
RabbitTemplate和AmqpTemplate的使用區(qū)別:
兩者都能調(diào)用convertAndSend方法向隊(duì)列發(fā)送消息,而
API:amqpTemplate.convertAndSend("隊(duì)列名"宽气,“消息內(nèi)容”)此處隊(duì)列名必須與創(chuàng)建的隊(duì)列一致随常。
API:amqpTemplate.convertAndSend("交換機(jī)名"潜沦,“路由鍵”,“消息內(nèi)容”)
具體實(shí)現(xiàn)可詳看使用方法绪氛。