1蔫浆、安裝
1.1殖属、Erlang:
Erlang下載地址,下載后安裝即可瓦盛。
1.2忱辅、RabbitMQ安裝
RabbitMQ下載地址,下載后安裝即可谭溉。
注意:Erlang的版本要與RabbitMQ版本需要匹配才行墙懂。
RabbitMQ | Minimum required Erlang/OTP | Maximum supported Erlang/OTP |
---|---|---|
3.7.x | 19.3 | 20.3.x |
3.6.15 | 19.3 | 20.3.x |
3.6.14、 3.6.13 扮念、 3.6.12损搬、 3.6.11 | R16B03 | 20.1.x |
3.6.10 、 3.6.9柜与、 3.6.8巧勤、 3.6.7、 3.6.6弄匕、 3.6.5颅悉、 3.6.4 | R16B03 | 19.3.x |
3.6.3 、 3.6.2迁匠、 3.6.1剩瓶、 3.6.0 | R16B03 | 18.3.x |
3.5.x | R14B04 | 17.5.x |
3.4.x | R13B03 | 16B03 |
2、可視化管理界面
-
Erlang和RabbitMQ安裝完成后城丧,通過命令行進(jìn)入到RabbitMQ的安裝目錄下的sbin目錄延曙,輸入以下命令,等待返回亡哄。
- rabbitmq-plugins enable rabbitmq_management
訪問http://ip:15672/, 使用guest/guest或者admin/admin登錄枝缔。
如果使用的是Linux系統(tǒng),記得把防火墻的端口15672開放或者把防火墻關(guān)閉蚊惯。
如果代碼中需要使用新用戶作為測試愿卸,需要在
Admin
標(biāo)簽頁中新建一個用戶,并同時設(shè)置密碼和virtual hosts
截型。
3趴荸、RabbitMQ術(shù)語
- Server(Broker):接收客戶端連接,實(shí)現(xiàn)AMQP協(xié)議的消息隊列和路由功能的進(jìn)程菠劝;
- Virtual Host:虛擬主機(jī)的概念赊舶,類似權(quán)限控制組睁搭,一個Virtual Host里可以有多個Exchange和Queue,權(quán)限控制的最小麗都是Virtual Host;
- Exchange:交換機(jī)笼平,接收生產(chǎn)者發(fā)送的消息园骆,并根據(jù)Routing Key將消息路由到服務(wù)器中的隊列Queue。
- ExchangeType:交換機(jī)類型決定了路由消息行為寓调,RabbitMQ中主要有三種類型Exchange锌唾,分別是fanout、direct夺英、topic晌涕;
- Message Queue:消息隊列,用于存儲還未被消費(fèi)者消費(fèi)的消息;
- Message:由Header和body組成,Header是由生產(chǎn)者添加的各種屬性的集合战得,包括Message是否被持久化懂傀、優(yōu)先級是多少萄喳、由哪個Message Queue接收等;body是真正需要發(fā)送的數(shù)據(jù)內(nèi)容;
- BindingKey(RouteKey):綁定關(guān)鍵字,將一個特定的Exchange和一個特定的Queue綁定起來垮衷。
4、與Spring Boot的整合(簡單版HelloWorld)
4.1乖坠、引入RabbitMQ依賴
首先當(dāng)然是添加RabbitMQ的依賴?yán)膊笸唬瑥膍vn repository找到SpringBoot整合RabbitMQ的整合包。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot-amqp.version}</version>
</dependency>
來看看其依賴情況:
4.2熊泵、添加RabbitMQ配置信息
spring.rabbitmq.host=ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=用戶名
spring.rabbitmq.password=密碼
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true
這里配置的信息是最基礎(chǔ)的仰迁,復(fù)雜的配置可以自行百度嘗試。
4.3戈次、創(chuàng)建消息生產(chǎn)者
通過注入
AmqpTemplate
接口的實(shí)例來實(shí)現(xiàn)消息的發(fā)送轩勘,AmqpTemplate
接口定義了一套針對AMQP協(xié)議的基礎(chǔ)操作筒扒。在Spring Boot中會根據(jù)配置來注入其具體實(shí)現(xiàn)怯邪。
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String content = "hello : " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
amqpTemplate.convertAndSend("hello", content);
}
}
此時的AmqpTemplate對象其實(shí)是RabbitTemplate的實(shí)例,因為RabbitTemplate是AmqpTemplate的子類:
至于為啥能自動注入這個Bean花墩,后面會講解到悬秉,先不急。
4.4冰蘑、創(chuàng)建消息消費(fèi)者
通過
@RabbitListener
注解定義該類對hello
隊列的監(jiān)聽和泌,并用@RabbitHandler
注解來指定對消息的處理方法。所以祠肥,該消費(fèi)者實(shí)現(xiàn)了對hello
隊列的消費(fèi)武氓,消費(fèi)操作為輸出消息的字符串內(nèi)容。
@RabbitListener(queues = {"hello"})
@Component
public class Receiver {
@RabbitHandler
public void handler(String content){
System.out.println("接收到:" + content);
}
}
4.5、創(chuàng)建RabbitMQ配置類
用來配置隊列县恕、交換器东羹、路由等高級信息。
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}
4.6忠烛、編寫測試類
注入消息生產(chǎn)者用于向隊列中發(fā)送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestRabbitmq {
@Autowired
private Sender sender;
@Test
public void test(){
sender.send();
}
}
4.7属提、測試
- 先啟動主程序類,用于監(jiān)聽隊列美尸;
- 然后運(yùn)行測試類冤议,使用生產(chǎn)者向隊列中發(fā)送消息。
4.8 师坎、疑問
學(xué)過RabbitMQ基礎(chǔ)的童靴肯定知道恕酸,要通過RabbitMQ發(fā)送消息的話,需要創(chuàng)建通道胯陋,交換機(jī)尸疆,隊列,并將通道與交換機(jī)惶岭、交換機(jī)與隊列綁定起來寿弱,而上述的簡單例子中,為什么沒看到通道按灶、交換機(jī)的創(chuàng)建症革,也沒看到綁定的操作呢?其實(shí)在RabbitMQ中鸯旁,在不創(chuàng)建交換機(jī)的情況下噪矛,RabbitMQ會創(chuàng)建一個默認(rèn)的交換機(jī),通過RabbitMQ可視化管理界面可以看到:
而且創(chuàng)建的隊列铺罢,默認(rèn)也就綁定到該交換機(jī)上艇挨,見下圖:
再仔細(xì)看這個默認(rèn)交換機(jī),能看到這個交換機(jī)(Exchange)類型是Direct模式的韭赘,至于什么是Direct模式缩滨,后面會講。
5泉瞻、Spring Boot自動配置RabbitMQ
同之前文章一樣脉漏,SpringBoot整合RabbitMQ同樣有個自動配置類,只不過RabbitMQ的自動配置類是由SpringBoot官方自行提供袖牙,而不像Mybatis是由Mybatis方提供的侧巨。這個自動配置類在spring-boot-autoconfigure-xxx.jar包中:
這里說個題外話,是關(guān)于自定義Starter的小知識點(diǎn):
- 啟動器(Starter)只用來做依賴導(dǎo)入鞭达;
- 需要專門寫一個自動配置模塊司忱;
- 啟動器依賴自動配置模塊皇忿,使用的時候只需要引入啟動器(Starter);
而在命名規(guī)范中約定如下:
- 官方命名空間:
- 前綴:spring-boot-starter-xxx
- 模式:spring-boot-starter-模塊名
- 舉例:spring-boot-starter-web、spring-boot-starter-jdbc坦仍、...
- 自定義命名空間:
- 前綴:xxx-spring-boot-starter
- 模式:模塊名-spring-boot-starter
- 舉例:mybatis-spring-boot-starter禁添、pagehelper-spring-boot-starter、...
進(jìn)入正題之前桨踪,咱們先來看看Spring與RabbitMQ的整合時的配置信息:
<!-- RabbitMQ start -->
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
password="${mq.password}" port="${mq.port}" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 消息隊列客戶端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
<!-- queue 隊列聲明 -->
<!--
durable 是否持久化
exclusive 僅創(chuàng)建者可以使用的私有隊列老翘,斷開后自動刪除
auto-delete 當(dāng)所有消費(fèi)端連接斷開后,是否自動刪除隊列 -->
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
<!-- Topic交換機(jī)定義锻离,其他類型交換機(jī)類似 -->
<!--
交換機(jī):一個交換機(jī)可以綁定多個隊列铺峭,一個隊列也可以綁定到多個交換機(jī)上。
如果沒有隊列綁定到交換機(jī)上汽纠,則發(fā)送到該交換機(jī)上的信息則會丟失卫键。
-->
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 設(shè)置消息Queue匹配的pattern (direct模式為key) -->
<rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean name="rabbitmqService" class="xxx.yyy.zzz"></bean>
<!-- 配置監(jiān)聽 消費(fèi)者 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<!--
queues 監(jiān)聽隊列,多個用逗號分隔
ref 監(jiān)聽器 -->
<rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>
配合上面的xml配置文件來看看SpringBoot中RabbitMQ的自動配置類RabbitAutoConfiguration:
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
throws Exception {
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
//...
return connectionFactory;
}
}
@Configuration
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//...
return rabbitTemplate;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
}
該自動配置類中自動注冊了三個重要的Bean虱朵,分別是rabbitConnectionFactory莉炉、rabbitTemplate、amqpAdmin碴犬,剛好與xml配置文件中的前三個Bean一一對應(yīng)絮宁。當(dāng)然RabbitMQ的配置信息由RabbitProperties類進(jìn)行導(dǎo)入:
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
private String host = "localhost";
private int port = 5672;
private String username;
private String password;
private final Ssl ssl = new Ssl();
private String virtualHost;
private String addresses;
private Integer requestedHeartbeat;
private boolean publisherConfirms;
private boolean publisherReturns;
private Integer connectionTimeout;
private final Cache cache = new Cache();
private final Listener listener = new Listener();
private final Template template = new Template();
private List<Address> parsedAddresses;
public static class Ssl {
private boolean enabled;
private String keyStore;
private String keyStorePassword;
private String trustStore;
private String trustStorePassword;
private String algorithm;
}
public static class Cache {
private final Channel channel = new Channel();
private final Connection connection = new Connection();
public static class Channel {
private Integer size;
private Long checkoutTimeout;
}
public static class Connection {
private CacheMode mode = CacheMode.CHANNEL;
private Integer size;
}
}
public static class Listener {
@NestedConfigurationProperty
private final AmqpContainer simple = new AmqpContainer();
}
public static class AmqpContainer {
private boolean autoStartup = true;
private AcknowledgeMode acknowledgeMode;
private Integer concurrency;
private Integer maxConcurrency;
private Integer prefetch;
private Integer transactionSize;
private Boolean defaultRequeueRejected;
private Long idleEventInterval;
@NestedConfigurationProperty
private final ListenerRetry retry = new ListenerRetry();
}
public static class Template {
@NestedConfigurationProperty
private final Retry retry = new Retry();
private Boolean mandatory;
private Long receiveTimeout;
private Long replyTimeout;
}
public static class Retry {
private boolean enabled;
private int maxAttempts = 3;
private long initialInterval = 1000L;
private double multiplier = 1.0;
private long maxInterval = 10000L;
}
public static class ListenerRetry extends Retry {
private boolean stateless = true;
}
private static final class Address {
private static final String PREFIX_AMQP = "amqp://";
private static final int DEFAULT_PORT = 5672;
private String host;
private int port;
private String username;
private String password;
private String virtualHost;
}
}
大家自行嘗試,這里就不多描述了服协。
在RabbitAutoConfiguration配置類上有個簽名:
@Import(RabbitAnnotationDrivenConfiguration.class)
來看看RabbitAnnotationDrivenConfiguration類:
public class SimpleRabbitListenerContainerFactory
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
implements ApplicationContextAware, ApplicationEventPublisherAware {
private Executor taskExecutor;
private PlatformTransactionManager transactionManager;
private Integer txSize;
private Integer concurrentConsumers;
private Integer maxConcurrentConsumers;
private Long startConsumerMinInterval;
private Long stopConsumerMinInterval;
private Integer consecutiveActiveTrigger;
private Integer consecutiveIdleTrigger;
private Integer prefetchCount;
private Long receiveTimeout;
private Boolean defaultRequeueRejected;
private Advice[] adviceChain;
private BackOff recoveryBackOff;
private Boolean missingQueuesFatal;
private Boolean mismatchedQueuesFatal;
private ConsumerTagStrategy consumerTagStrategy;
private Long idleEventInterval;
private ApplicationEventPublisher applicationEventPublisher;
private ApplicationContext applicationContext;
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
}
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance) {
//other code...
}
}
它其實(shí)是SimpleMessageListenerContainer的工廠類绍昂,而SimpleMessageListenerContainer又是<rabbit:listener-container />標(biāo)簽的具體實(shí)現(xiàn)類,剛好又同xml配置文件的消費(fèi)監(jiān)聽容器對應(yīng)偿荷。
至于其他的配置信息窘游,如隊列和交換機(jī)的創(chuàng)建,以及隊列與交換機(jī)的綁定就由配置類自行定義了跳纳。請往下接著看...
6忍饰、RabbitMQ交換機(jī)及工作模式
RabbitMQ的交換機(jī)Exchange有如下幾種類型:
Fanout
Direct
Topic
**Header **
其中header類型的Exchange由于用的相對較少,所以本章主要講述其他三種類型的Exchange寺庄。
RabbitMQ的工作模式:
- 發(fā)布/訂閱模式:對應(yīng)Fanout類型的交換機(jī)艾蓝。
- 路由模式:對應(yīng)Direct類型的交換機(jī)。
- 通配符模式:對應(yīng)Topic類型的交換機(jī)铣揉。
6.1饶深、發(fā)布/訂閱模式(Fanout)
任何發(fā)送到Fanout Exchange的消息都會被轉(zhuǎn)發(fā)到與該Exchange綁定(Binding)的所有Queue上。
- 可以理解為路由表的模式逛拱;
- 這種模式不需要RouteKey;
- 這種模式需要提前將Exchange與Queue進(jìn)行綁定台猴,一個Exchange可以綁定多個Queue朽合,一個Queue可以同多個Exchange進(jìn)行綁定俱两;
- 如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄曹步。
代碼示例:
FanoutConfig配置類代碼宪彩,配置了兩個隊列和一個交換機(jī),并綁定:
@Configuration
public class FanoutConfig {
public static final String FANOUT_QUEUE_NAME_1 = "fanout-queue-1";
public static final String FANOUT_QUEUE_NAME_2 = "fanout-queue-2";
public static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";
@Bean
public Queue fanoutQueue1() {
// return new Queue(FANOUT_QUEUE_NAME_1);//默認(rèn)情況讲婚,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(FANOUT_QUEUE_NAME_1).build();
}
@Bean
public Queue fanoutQueue2() {
// return new Queue(FANOUT_QUEUE_NAME_1);//默認(rèn)情況尿孔,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(FANOUT_QUEUE_NAME_2).build();
}
@Bean
public FanoutExchange fanoutExchange() {
// return new FanoutExchange(FANOUT_EXCHANGE_NAME);//默認(rèn)情況下,durable為true,auto-delete為false
return (FanoutExchange) ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
消息生產(chǎn)者:
@Component(value = "fanout-sender")
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String name){
String content = "hello : " + name + "筹麸,當(dāng)前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
amqpTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, "", content);
}
}
消費(fèi)者1號:
@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_1})
@Component("fanout-receiver1")
public class Receiver1 {
@RabbitHandler
public void handler(String content){
System.out.println("Fanout.Receiver1接收到:" + content);
}
}
消費(fèi)者2號:
@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_2})
@Component("fanout-receiver2")
public class Receiver2 {
@RabbitHandler
public void handler(String content){
System.out.println("Fanout.Receiver2接收到:" + content);
}
}
Fanout控制器:
@RestController
public class FanoutController {
@Autowired
@Qualifier("fanout-sender")
private Sender sender;
@RequestMapping("/fanout")
public String hello(String name){
sender.send(name);
return "success";
}
}
輸入url:http://localhost:8081/fanout?name=Cay 觀看控制臺輸出:
6.2活合、路由模式(Direct)
RabbitMQ默認(rèn)自帶Exchange,該Exchange的名字為空字符串物赶,當(dāng)前也可以自己指定名字白指;
在默認(rèn)的Exchange下,不需要將Exchange與Queue綁定酵紫, RabbitMQ會自動綁定告嘲;而如果使用自定義的Exchange,則需要在將Exchange綁定到Queue的時候需要指定一個RouteKey奖地;
在消息傳遞時需要一個RouteKey橄唬;
所有發(fā)送到Direct Exchange的消息會被轉(zhuǎn)發(fā)到RouteKey中指定的Queue。
如果vhost中不存在RouteKey中指定的隊列名参歹,則該消息會被拋棄轧坎。
代碼示例:
DirectConfig配置類代碼,配置兩個隊列泽示,通過兩個不同的routeKey綁定到同一個Exchange上:
@Configuration
public class DirectConfig {
public static final String DIRECT_QUEUE_NAME_1 = "direct-queue-1";
public static final String DIRECT_QUEUE_NAME_2 = "direct-queue-2";
public static final String DIRECT_EXCHANGE_NAME = "direct-exchange";
public static final String ROUTE_KEY_1 = "direct.route.key.1";
public static final String ROUTE_KEY_2 = "direct.route.key.2";
@Bean
public Queue directQueue1() {
// return new Queue(DIRECT_QUEUE_NAME_1);//默認(rèn)情況缸血,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(DIRECT_QUEUE_NAME_1).build();
}
@Bean
public Queue directQueue2() {
// return new Queue(DIRECT_QUEUE_NAME_2);//默認(rèn)情況,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(DIRECT_QUEUE_NAME_2).build();
}
@Bean
public DirectExchange directExchange() {
// return new DirectExchange(DIRECT_EXCHANGE_NAME_1);//默認(rèn)情況下械筛,durable為true,auto-delete為false
return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTE_KEY_1);
}
@Bean
public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTE_KEY_2);
}
}
消息生產(chǎn)者:
@Component("direct-sender")
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(Integer selector) {
String content = "hello捎泻,我是%d號,當(dāng)前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String routeKey = "";
if (selector.intValue() == 1) {
content = String.format(content, 1);
routeKey = DirectConfig.ROUTE_KEY_1;
} else if (selector.intValue() == 2) {
content = String.format(content, 2);
routeKey = DirectConfig.ROUTE_KEY_2;
}
amqpTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE_NAME, routeKey, content);
}
}
消費(fèi)者1號:
@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_1})
@Component("direct-receiver1")
public class Receiver1 {
@RabbitHandler
public void handler(String content){
System.out.println("Direct.Receiver1接收到:" + content);
}
}
消費(fèi)者2號:
@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_2})
@Component("direct-receiver2")
public class Receiver2 {
@RabbitHandler
public void handler(String content){
System.out.println("Direct.Receiver2接收到:" + content);
}
}
Direct控制器:
@RestController
public class DirectController {
private static final Logger logger = LoggerFactory.getLogger(DirectController.class);
@Autowired
@Qualifier("direct-sender")
private Sender sender;
@RequestMapping("/direct")
public String hello(@RequestParam(defaultValue = "1") int selector){
logger.info("參數(shù)selector:" + selector);
sender.send(selector);
return "success";
}
}
輸入兩次不同的參數(shù)selector:
6.3埋哟、通配符模式(Topic)
任何發(fā)送到Topic Exchange的消息都會被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定的Queue上笆豁。
這種模式較為復(fù)雜,簡單來說赤赊,就是每個隊列都有其關(guān)心的主題闯狱,所有的消息都帶有一個“標(biāo)題”(RouteKey),Exchange會將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與RouteKey模糊匹配的隊列抛计。
這種模式需要RouteKey哄孤,也需要提前綁定Exchange與Queue。
在進(jìn)行綁定時吹截,要提供一個該隊列關(guān)心的主題瘦陈,如“#.log.#”表示該隊列關(guān)心所有涉及l(fā)og的消息(一個RouteKey為”MQ.log.error”的消息會被轉(zhuǎn)發(fā)到該隊列)凝危。
“#”表示0個或若干個關(guān)鍵字,“*”表示一個關(guān)鍵字晨逝。如“l(fā)og.*”能與“l(fā)og.warn”匹配蛾默,但是無法與“l(fā)og.warn.timeout”匹配;但是“l(fā)og.#”能與上述兩者匹配捉貌。
同樣支鸡,如果Exchange沒有發(fā)現(xiàn)能夠與RouteKey匹配的Queue,則會拋棄此消息趁窃。
代碼示例:
TopicConfig配置類牧挣,聲明了兩個隊列,分別對應(yīng)兩個routeKey:topic.#和topic.*
@Configuration
public class TopicConfig {
public static final String TOPIC_QUEUE_NAME_1 = "topic-queue-1";
public static final String TOPIC_QUEUE_NAME_2 = "topic-queue-2";
public static final String TOPIC_EXCHANGE_NAME = "topic-exchange";
public static final String ROUTE_KEY_1 = "topic.#";
public static final String ROUTE_KEY_2 = "topic.*";
@Bean
public TopicExchange topicExchange() {
// return new TopicExchange(TOPIC_EXCHANGE_NAME);//默認(rèn)情況下棚菊,durable為true,auto-delete為false
return (TopicExchange) ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE_NAME_1);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE_NAME_2);
}
@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTE_KEY_1);
}
@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTE_KEY_2);
}
}
消息生產(chǎn)者:
@Component("topic-sender")
public class Sender {
private static final String TOPIC_PREFIX = "topic.";
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String selector){
String content = "hello浸踩,當(dāng)前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
amqpTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, TOPIC_PREFIX + selector, content);
}
}
消費(fèi)者1號:
@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_1})
@Component("topic-receiver1")
public class Receiver1 {
@RabbitHandler
public void handler(String content){
System.out.println("Topic.Receiver1接收到:" + content);
}
}
消費(fèi)者2號:
@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_2})
@Component("topic-receiver2")
public class Receiver2 {
@RabbitHandler
public void handler(String content){
System.out.println("Topic.Receiver2接收到:" + content);
}
}
Topic控制器:
@RestController
@RequestMapping("/topic")
public class TopicController {
@Autowired
@Qualifier("topic-sender")
private Sender sender;
@RequestMapping("/send")
public String send(String routeKey){
sender.send(routeKey);
return "success";
}
}
通過url訪問不同的routeKey來查看結(jié)果:
可以看到,如果參數(shù)為message的時候即routeKey為topic.message统求,兩個隊列都能接收到消息检碗,而如果參數(shù)為message.a的時候即routeKey為topic.message.a,只有隊列1能接收到消息码邻,而隊列2不能接收到消息折剃。
7、消息確認(rèn)與回調(diào)
? 默認(rèn)情況下像屋,RabbitMQ發(fā)送消息以及接收消息是自動確認(rèn)的怕犁,意思也就是說,消息發(fā)送方發(fā)送消息的時候己莺,認(rèn)為消息已經(jīng)成功發(fā)送到了RabbitMQ服務(wù)器奏甫,而當(dāng)消息發(fā)送給消費(fèi)者后,RabbitMQ服務(wù)器就立即自動確認(rèn)凌受,然后將消息從隊列中刪除了阵子。而這樣的自動機(jī)制會造成消息的丟失,我們常常聽到“丟消息”的字眼胜蛉。
? 為了解決消息的丟失挠进,RabbitMQ便產(chǎn)生了手動確認(rèn)的機(jī)制:
- 發(fā)送者:
- 當(dāng)消息不能路由到任何隊列時,會進(jìn)行確認(rèn)失敗操作誊册,如果發(fā)送方設(shè)置了mandatory=true模式领突,則先會調(diào)用basic.return方法,然后調(diào)用basic.ack方法案怯;
- 當(dāng)消息可以路由時君旦,消息被發(fā)送到所有綁定的隊列時,進(jìn)行消息的確認(rèn)basic.ack。
- 接收者:
- 當(dāng)消息成功被消費(fèi)時于宙,可以進(jìn)行消息的確認(rèn)basic.ack浮驳;
- 當(dāng)消息不能正常被消息時悍汛,可以進(jìn)行消息的反確認(rèn)basic.nack 或者拒絕basic.reject捞魁。
當(dāng)設(shè)置mandatory=true時,其中basic.ack和basic.nack會調(diào)用自定義的RabbitTemplate.ConfirmCallback接口的confirm方法离咐。
public interface ConfirmCallback {
/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(CorrelationData correlationData, boolean ack, String cause);
}
而發(fā)送者發(fā)送消息時無法路由后谱俭,會調(diào)用baisc.return方法,其確認(rèn)機(jī)制由RabbitTemplate.ReturnCallback接口的returnedMessage方法實(shí)現(xiàn)宵蛀。
public interface ReturnCallback {
/**
* Returned message callback.
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey);
}
7.1昆著、使用Spring配置RabbitMQ的確認(rèn)機(jī)制
- 修改publisher-confirms為true
<!--連接工廠 -->
<rabbit:connection-factory id="connectionFactory" host="{ip地址}" port="{端口}" username="{用戶名}" password="{密碼}" publisher-confirms="true"/>
- 修改消息回調(diào)方法confirm-callback和return-callback為bean的id
<!-- mandatory必須設(shè)置true,return callback才生效 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"
/>
- 消息回調(diào)類
@Service
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//other code...
}
}
@Service
class returnCallBackListener implements RabbitTemplate.ReturnCallback{
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//other code...
}
}
- 修改消息確認(rèn)機(jī)制改成手動確認(rèn)manual:
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" >
<!-- other xml -->
</rabbit:listener-container>
7.2、使用SpringBoot配置RabbitMQ的確認(rèn)機(jī)制
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
//other code...
//發(fā)送者是否確認(rèn)
cachingConnectionFactory.setPublisherConfirms(true);
return cachingConnectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
//other code...
//修改成手動確認(rèn)
rabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return rabbitListenerContainerFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
//重點(diǎn)
rabbitTemplate.setMandatory(true);
//消息回調(diào)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
});
return rabbitTemplate;
}