springboot如何配置RabbitMq
<!-- Add Spring Boot support for AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接著在application.properties中配置mq賬戶密碼等
發送者
// AMQP template used to publish messages; injected by Spring via @Autowired.
@Autowired
private AmqpTemplate template;
/**
 * Publishes a JSON payload to RabbitMQ and logs the outgoing message.
 *
 * @param exchangeName name of the exchange to publish to
 * @param routingKey   routing key attached to the message
 * @param json         payload to send
 */
public void sendMqData(String exchangeName, String routingKey, String json) {
    // Build the trace line first so the log call and the send stay separate steps.
    final String traceLine = routingKey + "===========sendmq:" + json;
    log.info(traceLine);
    template.convertAndSend(exchangeName, routingKey, json);
}
接收者
springboot注解的方式很簡單，注意里面的參數怎么寫
/**
 * Listener for the queue {@code RabbitMqConfig.reciveQueueName}, which is bound to the
 * topic exchange {@code RabbitMqConfig.defaultExchangeName} with the routing pattern
 * {@code XXX.aaa.gateway.#}. Each received message is logged and passed to
 * {@code gatewayReciveDataService.reciveDataHandle}.
 *
 * @param message the message body as a string
 * @param channel the channel supplied by the listener infrastructure; not used by this method
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = RabbitMqConfig.reciveQueueName, autoDelete = "true"),
        exchange = @Exchange(value = RabbitMqConfig.defaultExchangeName, type = "topic"),
        key = "XXX.aaa.gateway.#"
))
@RabbitHandler
public void reciveMqMessage(String message, Channel channel) {
    log.info("===========recivemq:" + message);
    try {
        gatewayReciveDataService.reciveDataHandle(message);
    } catch (Exception e) {
        // Handler failures are logged (with stack trace and the offending message) through
        // the application logger and are not propagated to the listener container.
        log.error("===========recivemq handle failed:" + message, e);
    }
}
如何動態創建隊列并監聽隊列
先寫配置類
package com.iot.service;
import com.iot.listener.RabbitMqListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration class.
 * GYB
 * 20190320
 */
@Configuration
public class RabbitMqConf {

    // Why a Java configuration class: the listener was originally going to be declared
    // in XML, along the lines of
    //   <rabbit:listener-container connection-factory="mqconnectionFactory">
    //     <rabbit:listener ref="rabbitMqListener" queue-names=""/>
    //   </rabbit:listener-container>
    // but that form requires the queue names to be specified, so the annotation-based
    // configuration below is used instead.

    // Custom class that receives the data; per the author's note it should implement
    // MessageListener and override onMessage.
    @Autowired
    RabbitMqListener rabbitMqListener;

    /**
     * Builds the listener container bean, wired to the given connection factory and to
     * {@code rabbitMqListener}, with acknowledge mode {@code AUTO}.
     *
     * @param connectionFactory connection factory the container uses
     * @return the configured container; no queue names are set on it here
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setMessageListener(rabbitMqListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // A fixed queue (e.g. setQueueNames("test-queue1")) is left disabled: the container
        // is returned without any queue names.
        return listenerContainer;
    }
}
然后我們在自己的邏輯業務中，動態創建隊列和監聽隊列
// Listener container that newly created queues are attached to; injected by Spring.
@Autowired
SimpleMessageListenerContainer container;
// Connection factory used to open the channel on which queues are declared and bound.
@Autowired
ConnectionFactory connectionFactory;
/**
 * Creates a queue, binds it to {@code RabbitMqMemory.defaultExchangeName} with the given
 * routing keys, and starts listening on it.
 *
 * <p>If {@code RabbitMqMemory.IotMqChannel} is already set, nothing is created; the call
 * is delegated to {@code queueBindRouteKeys} to bind the routing keys only.
 *
 * <p>If declaring or binding fails with an {@link IOException}, the stack trace is printed
 * and the method returns without registering the queue on the listener container and
 * without caching the channel, so a later call goes through the creation path again.
 *
 * @param queuename name of the queue to declare (or to bind keys to)
 * @param routeKeys routing keys to bind to the queue; may be empty
 */
public void createOrBindQueue(String queuename, String... routeKeys) {
    if (RabbitMqMemory.IotMqChannel != null) {
        // A queue and channel already exist; only the routing keys need to be bound.
        queueBindRouteKeys(queuename, routeKeys);
        return;
    }

    // Create the queue on a fresh, non-transactional channel.
    Channel channel = connectionFactory.createConnection().createChannel(false);
    try {
        // Flags in RabbitMQ client order: durable=false, exclusive=false, autoDelete=true.
        // NOTE(review): an earlier comment described this as an exclusive queue removed when
        // the connection closes, but the exclusive flag passed here is false — confirm intent.
        channel.queueDeclare(queuename, false, false, true, null);
        // Bind the queue to the exchange once per routing key (no-op for an empty array).
        for (String routeKey : routeKeys) {
            channel.queueBind(queuename, RabbitMqMemory.defaultExchangeName, routeKey);
        }
    } catch (IOException e) {
        e.printStackTrace();
        // Do not listen on a queue that may not exist, and do not cache a channel whose
        // setup failed; leaving IotMqChannel unset lets the next call retry creation.
        return;
    }

    // Attach the new queue to the listener container.
    container.addQueueNames(queuename);
    System.out.println("隊(duì)列監(jiān)聽中========================");
    RabbitMqMemory.IotMqChannel = channel;
}