為了演示Topic模式搪花,所以模擬一個需求:
用戶A想接收開封市的天氣
用戶B想接收開封市的新聞
用戶C想接收開封市的新聞和天氣
1.創(chuàng)建基于SpringBoot的生產(chǎn)者模塊和消費者模塊咱娶,兩個模塊結(jié)構(gòu)相同,在hello包下創(chuàng)建SpringBoot主啟動類。
2.引入starter
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3.在resources包下創(chuàng)建application.yml文件懊蒸,內(nèi)容如下(根據(jù)實際情況填寫地址用戶名密碼等等):
4.在生產(chǎn)者模塊創(chuàng)建config包,在此包下創(chuàng)建一個配置類(功能是聲明隊列测秸,聲明交換機,聲明RoutingKey,聲明隊列和交換機的綁定關(guān)系)系宜,結(jié)構(gòu)和內(nèi)容如下:
package com.hello.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//為了方便演示我都使用的public
//隊列
public static final String QUEUE_NEWS = "queue_news";
public static final String QUEUE_WEATHER = "queue_weather";
public static final String QUEUE_NEWS_WEATHER = "queue_news_weather";
//交換機
public static final String EXCHANGE_TOPIC_KAIFENG = "exchange_topic_kaifeng";
//交換機與隊列綁定的RoutingKey
public static final String ROUTINGKEY_NEWS = "*.news";
public static final String ROUTINGKEY_WEATHER = "*.weather";
public static final String ROUTINGKEY_ALL = "kaifeng.*";
//聲明隊列
@Bean(QUEUE_NEWS)
public Queue QUEUE_NEWS(){ //新聞的隊列
return new Queue(QUEUE_NEWS);
}
@Bean(QUEUE_WEATHER)
public Queue QUEUE_WEATHER(){ //天氣的隊列
return new Queue(QUEUE_WEATHER);
}
@Bean(QUEUE_NEWS_WEATHER)
public Queue QUEUE_NEWS_WEATHER(){ //新聞和天氣的隊列
return new Queue(QUEUE_NEWS_WEATHER);
}
//聲明交換機
@Bean(EXCHANGE_TOPIC_KAIFENG)
public Exchange EXCHANGE_TOPIC_INFORM(){
//聲明了一個Topic類型的交換機照激,durable是持久化(重啟rabbitmq這個交換機不會被自動刪除)
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_KAIFENG).durable(true).build();
}
//聲明news隊列和交換機綁定關(guān)系,并且指定RoutingKey
@Bean
public Binding NEWS_BINDING_TOPIC(@Qualifier(QUEUE_NEWS) Queue queue,
@Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_NEWS).noargs();
}
//聲明weather隊列和交換機綁定關(guān)系盹牧,并且指定RoutingKey
@Bean
public Binding WEATHER_BINDING_TOPIC(@Qualifier(QUEUE_WEATHER) Queue queue,
@Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_WEATHER).noargs();
}
//聲明news+weather隊列和交換機綁定關(guān)系俩垃,并且指定RoutingKey
@Bean
public Binding NEWS_WEATHER_BINDING_TOPIC(@Qualifier(QUEUE_NEWS_WEATHER) Queue queue,
@Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_ALL).noargs();
}
}
5. 在測試類發(fā)送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendNews(){
//指定交換機,指定routing key汰寓,發(fā)送消息的內(nèi)容
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_KAIFENG,"kaifeng.news","開封今年糧食產(chǎn)量提升10%");
}
@Test
public void sendWeather(){
//指定交換機口柳,指定routing key,發(fā)送消息的內(nèi)容
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_KAIFENG,"kaifeng.weather","開封明天白天多云15℃");
}
}
6.當前整個結(jié)構(gòu)如下
7.我們也可以Rabbit網(wǎng)頁管理平臺上面查看隊列和交換機都已經(jīng)創(chuàng)建,交換機綁定的隊列關(guān)系也已經(jīng)綁定,消息因為沒有消費者所以也是存儲在MQ中有滑。
7.在消費者模塊創(chuàng)建一個類MQConsumer接收消息
內(nèi)容如下:
package com.hello.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MQConsumer {
//模擬三個用戶接收
@RabbitListener(queues = {"queue_weather"})
public void getMessageA(String msg){
System.out.println("A用戶想看天氣接收到:" + msg);
}
@RabbitListener(queues = {"queue_news"})
public void getMessageB(Message message){ //我們也可以用Message作為參數(shù)接收
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("B用戶想看新聞接收到:" + msg);
}
@RabbitListener(queues = {"queue_news_weather"})
public void getMessageC(String msg){
System.out.println("C用戶想看新聞和天氣接收到:" + msg);
}
}