背景:由于一些原因锤灿,為了實現(xiàn)ApiGateway的動態(tài)更新,當前微服務架構并沒有總線(Spring cloud bus)堕仔,為了實現(xiàn)的業(yè)務場景是在管理平臺更新某個路由規(guī)則蔑祟,需要通知到所有別的實例。
RabbitMQ
為了實現(xiàn)業(yè)務第一時間想到MQ忌傻。但是RabbitMQ只有基于Queue的大脉。
Exchange有三類:fanout、direct水孩、topic
exchange與queue是綁定管理镰矿。
fanout 會把同一條消息發(fā)送給所有綁定的queue
direct 會把消息路由到那些binding key與routing key完全匹配的Queue
topic 也是將消息路由到binding key與routing key相匹配的Queue中
很明顯,我們選擇的是fanout的Exchange . 在代碼里俘种,動態(tài)創(chuàng)建臨時的queue秤标,動態(tài)綁定到fanout绝淡。由于臨時的queue在實例關閉的時候就會刪除。所以非常滿足當前的業(yè)務場景苍姜。
上代碼:
///創(chuàng)建mqFactory
@Bean(name = "mqConnectionFactory")
@Primary
public ConnectionFactory firstConnectionFactory(@Value("${mq.server.host}") String host,
@Value("${mq.server.port}") int port,
@Value("${mq.server.username}") String username,
@Value("${mq.server.password}") String password,
@Value("${mq.server.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
///創(chuàng)建臨時的queue ,通過KeyGenerator保證queue名唯一(實現(xiàn)可以很多)
@Bean(name = "refreshRoutesQueue")
@Primary
public Queue queue() {
String queueName = "etc.gk." + KeyGenerator.generatorOrderKey();
Queue queue = new Queue(queueName, false, false, true);
return queue;
}
///創(chuàng)建fanout的實例
@Bean(name = "changeFanout")
@Primary
public FanoutExchange fanoutExchange(@Value("${mq.service.routes.change.fanout}") String fanout) {
return new FanoutExchange(fanout);
}
///綁定queue和exchange
@Bean(name = "binding-exchangeMessage")
public Binding bindingExchangeMessage(@Qualifier("refreshRoutesQueue") Queue queueMessage, @Qualifier("changeFanout") FanoutExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange);
}
接下來的操作就是監(jiān)聽這個Queue,創(chuàng)建Listener牢酵,Container 就不贅述了