SpringBoot中一般我們創(chuàng)建RabbitMQ隊列以及綁定關(guān)系枫弟,是通過@Bean的方式萧恕,但是RabbitMQ提供了
AmqpAdmin
對象玄柏,可以在代碼中聲明隊列以及綁定關(guān)系八拱。
讀取配置文件的RabbitMQ的組件信息柱恤,然后動態(tài)的去創(chuàng)建關(guān)系。
配置文件格式:
/**
* RabbitMq的隊列拷况,交互機作煌,綁定關(guān)系的對象
*
*/
@Data
public class RabbitModuleInfo {
/**
* 路由主鍵
*/
private String routingKey;
/**
* 隊列信息
*/
private Queue queue;
/**
* 交換機信息
*/
private Exchange exchange;
/**
* 交換機的詳細配置
*/
@Data
public static class Exchange {
/**
* 交互機名稱
*/
private String name;
/**
* 交互機類型。
* 默認:直連型號
*/
private ExchangeTypeEnum type = ExchangeTypeEnum.DIRECT;
/**
* 是否持久化
* 默認true:當(dāng)RabbitMq重啟時赚瘦,消息不丟失
*/
private boolean durable = true;
/**
* 當(dāng)所有綁定隊列都不在使用時粟誓,是否自動 刪除交換器
* 默認值false:不自動刪除,推薦使用起意。
*/
private boolean autoDelete = false;
/**
* 判斷是否是延遲交互機
*/
private boolean delayed;
/**
* 交互機的額外參數(shù)
*/
private java.util.Map<String, Object> arguments;
}
/**
* 隊列的詳細信息
* 提供默認的配置參數(shù)
*/
@Data
public static class Queue {
/**
* 隊列名
* 必填
*/
private String name;
/**
* 是否持久化
* 默認true:當(dāng)RabbitMq重啟時鹰服,消息不丟失
*/
private boolean durable = true;
/**
* 是否具有排他性
* 默認false:可以多個消息者消費同一個隊列
*/
private boolean exclusive = false;
/**
* 當(dāng)消費者客戶端均斷開連接,是否自動刪除隊列
* 默認值false:不自動刪除,推薦使用悲酷,避免消費者斷開后套菜,隊列中丟棄消息
*/
private boolean autoDelete = false;
/**
* 需要綁定的死信隊列的交換機名稱
*/
private String deadExchangeName;
/**
* 需要綁定的死信隊列的路由key的名稱
*/
private String deadRoutingKey;
/**
* 隊列的額外參數(shù)
*/
private java.util.Map<String, Object> arguments;
}
}
@Data
@ConfigurationProperties(prefix = "my.rabbit")
public class SealRabbitProperty {
/**
* 組件信息(讀取配置文件,自動創(chuàng)建隊列信息)
*/
private List<RabbitModuleInfo> moduleInfos;
}
配置文件:
my:
rabbit:
module-infos:
# ocr隊列
- routing-key:my.routing.xx
queue:
name:my.queue.xx
exchange:
name: my.exchage.xx
- routing-key: my.routing.yy
queue:
name:my.queue.yy
exchange:
name: my.exchage.yy
type: FANOUT
聲明操作類(容器中單例bean創(chuàng)建完畢设易,執(zhí)行回調(diào)方法):
@Bean
@ConditionalOnMissingBean
public DeclareRabbitModule declareRabbitModule(RegisterRabbitModule rabbitModule, SealRabbitProperty sealRabbitProperty) {
return new DeclareRabbitModule(rabbitModule, sealRabbitProperty);
}
@Slf4j
public class RegisterRabbitModule {
private AmqpAdmin amqpAdmin;
public void setAmqpAdmin(AmqpAdmin amqpAdmin) {
this.amqpAdmin = amqpAdmin;
}
/**
* 注冊RabbitMq的組件信息
*
* @param queue 隊列對象
* @param exchange 交換機對象
* @param binding 隊列與交換機綁定關(guān)系對象
*/
public void registerModule(Queue queue, Exchange exchange, Binding binding) {
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
}
}
@Slf4j
public class DeclareRabbitModule implements SmartInitializingSingleton {
/**
* 死信隊列 交換機標(biāo)識符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信隊列交換機綁定鍵標(biāo)識符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
private AmqpAdmin amqpAdmin;
private SealRabbitProperty sealRabbitProperty;
public DeclareRabbitModule(AmqpAdmin amqpAdmin, SealRabbitProperty sealRabbitProperty) {
this.amqpAdmin = amqpAdmin;
this.sealRabbitProperty = sealRabbitProperty;
}
@Override
public void afterSingletonsInstantiated() {
log.info("動態(tài)創(chuàng)建MQ配置信息...");
declareModule();
}
private void declareModule() {
//獲取組件信息
List<RabbitModuleInfo> moduleInfos = sealRabbitProperty.getModuleInfos();
if (CollectionUtils.isEmpty(moduleInfos)) {
log.warn("配置文件中不含有組件信息笼踩,不進行注冊!");
return;
}
//注冊組件信息
for (RabbitModuleInfo moduleInfo : moduleInfos) {
//數(shù)據(jù)校驗
declareValidate(moduleInfo);
//獲取隊列
Queue queue = tranSealQueue(moduleInfo.getQueue());
//獲取交換機
Exchange exchange = tranSealExchange(moduleInfo.getExchange());
//綁定關(guān)系
Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE,
exchange.getName(), moduleInfo.getRoutingKey(), null);
//創(chuàng)建隊列
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
}
}
/**
* 聲明模塊數(shù)據(jù)校驗
*
* @param moduleInfo 配置文件的模塊信息
*/
public void declareValidate(RabbitModuleInfo moduleInfo) {
//判斷關(guān)鍵參數(shù)是否存在且合法
if (moduleInfo.getRoutingKey() == null) {
throw new RabbitDeclareModuleException("RoutingKey 不存在亡嫌!");
}
if (moduleInfo.getExchange() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置exchange掘而!", moduleInfo.getRoutingKey()));
}
if (moduleInfo.getExchange().getName() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中挟冠,routingKey:[%s]未配置exchange的name屬性!", moduleInfo.getRoutingKey()));
}
if (moduleInfo.getQueue() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中袍睡,routingKey:[%s]未配置queue知染!", moduleInfo.getRoutingKey()));
}
if (moduleInfo.getQueue().getName() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置queue的name屬性斑胜!", moduleInfo.getRoutingKey()));
}
}
/**
* 隊列的對象轉(zhuǎn)換
*
* @param queue 自定義的隊列信息
* @return RabbitMq的Queue對象
*/
private Queue tranSealQueue(RabbitModuleInfo.Queue queue) {
Map<String, Object> arguments = queue.getArguments();
//判斷是否需要綁定死信隊列
if (queue.getDeadExchangeName() != null && queue.getDeadRoutingKey() != null) {
//設(shè)置響應(yīng)參數(shù)
if (queue.getArguments() == null) {
arguments = new HashMap<>(2);
}
arguments.put(DEAD_LETTER_QUEUE_KEY, queue.getDeadExchangeName());
arguments.put(DEAD_LETTER_ROUTING_KEY, queue.getDeadRoutingKey());
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
/**
* 將配置信息轉(zhuǎn)換交換機信息
*
* @param exchangeInfo 自定義交換機信息
* @return RabbitMq的Exchange的信息
*/
private Exchange tranSealExchange(RabbitModuleInfo.Exchange exchangeInfo) {
AbstractExchange exchange = null;
//判斷類型
switch (exchangeInfo.getType()) {
//直連模式
case DIRECT:
exchange = new DirectExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
//廣播模式:
case FANOUT:
exchange = new FanoutExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
//通配符模式
case TOPIC:
exchange = new TopicExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
case HEADERS:
exchange = new HeadersExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
}
//設(shè)置延遲隊列
exchange.setDelayed(exchangeInfo.isDelayed());
return exchange;
}
}
在項目啟動后控淡,便會去讀取配置文件的信息,然后去創(chuàng)建RabbitMQ的組件信息止潘,實現(xiàn)了配置集中管理掺炭。