ps: 本文所有代碼可在 這里 查看捶码。
背景
記得之前有一個場景磨德,網(wǎng)關(guān)接收各種類型設(shè)備上傳過來的數(shù)據(jù)包矿筝,然后根據(jù)不同類型的數(shù)據(jù),通過 MQ
轉(zhuǎn)發(fā)到相應(yīng)的處理器進行消費念脯。舉個例子:現(xiàn)在有2種類型的設(shè)備采集器狞洋,分別為 水位監(jiān)測器 和 溫度監(jiān)測器,最后會分發(fā)到各自的處理器進行處理绿店。
解決方案
一般做法
所有處理不同類型數(shù)據(jù)的隊列吉懊,監(jiān)聽同一個 Topic
,然后消費時假勿,通過判斷數(shù)據(jù)的類型來決定是否需要處理借嗽。比如上面的例子,每來一條數(shù)據(jù)转培,2個處理器都會去消費這條數(shù)據(jù)恶导,對于 水位監(jiān)測器 的處理器,如果是 水位監(jiān)測器 的數(shù)據(jù)浸须,那剛好惨寿,正常消費,如果是 溫度監(jiān)測器 的數(shù)據(jù)羽戒,直接跳過缤沦。
這種做法,優(yōu)點很明顯易稠,即不用增加其他配置缸废,只需在消費時做下類型判斷;但缺點也特別明顯驶社,所有消息企量,每一個隊列都需要消費一次。為什么這么說呢亡电?我們都知道届巩,消息在投遞過程中,消息是需要序列化和反序列化的(一般使用的是 json
)份乒,序列化和反序列化是需要耗系統(tǒng)資源的恕汇,而且投遞過程中也是需要占用帶寬的腕唧,而消息到達消費端時,大部分情況下都會因為類型不符而跳過處理瘾英,最后還要通知交換機處理結(jié)果枣接,這樣就會造成不必要的資源浪費。
可以看到缺谴,如果數(shù)據(jù)量小但惶,分類不多画侣,缺點并不會造成多嚴重的后果絮缅,但如果數(shù)據(jù)量一大,分類一多墙基,那將會極大的浪費系統(tǒng)資源阳啥。我們都知道添谊,物聯(lián)網(wǎng)的各種設(shè)備何止千千萬,不同設(shè)備類型更是繁多苫纤,那么一旦使用這種方案碉钠,本來10臺機器能搞定的事情,最后可能需要幾十臺卷拘,數(shù)據(jù)分類多的話喊废,可能還需要更多。而且數(shù)據(jù)量如果突然劇增栗弟,系統(tǒng)也很容易就扛不住污筷。
綜上,這種方案大多情況下是不適用的乍赫。那有沒有更好的方案瓣蛀,比如不同處理器,只處理一種對應(yīng)的設(shè)備上報的數(shù)據(jù)包雷厂?答案是肯定的惋增,那就是——動態(tài)路由。
動態(tài)路由
何為 動態(tài)路由改鲫?簡單的說诈皿,就是:消息到達交換機后,會根據(jù)動態(tài)的 routingKey
像棘,投遞到與交換機綁定時 bindingKey
相同的隊列中稽亏。
舉個例子,水位監(jiān)測器 的隊列與交換機綁定時使用的 bindingKey
為 waterLevel
缕题,這時如果來了一條監(jiān)測到的水位數(shù)據(jù)截歉,消息在發(fā)布時使用的動態(tài) routingKey
也為 waterLevel
,那這條數(shù)據(jù) 水位監(jiān)測器 的處理器能正常處理烟零,而 bindingKey
為 temperature
的 溫度監(jiān)測器 隊列則收不到這條數(shù)據(jù)瘪松。
我們都知道咸作, bindingKey
可以通過配置 spring.cloud.stream.rabbit.bindings.<channelName>.consumer.bindingRoutingKey
來達到效果,那難點就剩下:如何在發(fā)布消息時指定想要的 routingKey
宵睦。翻了下官方文檔性宏,找到這樣一個配置:
很明顯,這就是我們想要的状飞,支持一個 SpEL
表達式,如果是固定的 routingKey
书斜,寫一個常量字符串即可诬辈。
文檔鏈接在 這里。
接下來荐吉,我們來進行一個簡單的 demo
試一下焙糟。
application-dynamic.yml
spring:
cloud:
stream:
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
waterLevelInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}.waterLevel
binder: rabbit
temperatureInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}.temperature
binder: rabbit
rabbit:
bindings:
packetUplinkOutput:
producer:
# 生產(chǎn)者配置RabbitMq的動態(tài)路由鍵
routingKeyExpression: headers.type
waterLevelInput:
consumer:
bindingRoutingKey: waterLevel # 將queue綁定到exchange時使用的routing key。默認'#'
temperatureInput:
consumer:
bindingRoutingKey: temperature # 將queue綁定到exchange時使用的routing key样屠。默認'#'
ScasDynamicRoutingTest
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dynamic")
@EnableBinding({ScasDynamicRoutingTest.MessageSink.class, ScasDynamicRoutingTest.MessageSource.class})
public class ScasDynamicRoutingTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 5; i++) {
String devEui = getDevEuis();
String type = "waterLevel";
packetUplinkProducer.publish(new PacketModel(devEui, type));
}
for (int i = 0; i < 5; i++) {
String devEui = getDevEuis();
String type = "temperature";
packetUplinkProducer.publish(new PacketModel(devEui, type));
}
Thread.sleep(10000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
Message<PacketModel> message = MessageBuilder.withPayload(model).setHeader("type", model.getType()).build();
messageSource.packetUplinkOutput().send(message);
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("waterLevelInput")
public void handleWaterLevelPacket(PacketModel model) throws InterruptedException {
log.info("消費【水位監(jiān)測器】數(shù)據(jù)包消息. model: [{}].", model);
}
@StreamListener("temperatureInput")
public void handleTemperaturePacket(PacketModel model) throws InterruptedException {
log.info("消費【溫度監(jiān)測器】數(shù)據(jù)包消息. model: [{}].", model);
}
}
public interface MessageSink {
@Input("waterLevelInput")
SubscribableChannel waterLevelInput();
@Input("temperatureInput")
SubscribableChannel temperatureInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class PacketModel {
/**
* 設(shè)備 eui
*/
private String devEui;
/**
* 設(shè)備類型
*/
private String type;
}
}
測試結(jié)果
再看一下可視化界面的隊列詳情:
可以看到穿撮,
routing key
為我們配置的 waterLevel
。
如果將其中某一個的 bindingRoutingKey
去掉或改成默認值 #
痪欲,結(jié)果如下:
代碼分析
其實最關(guān)鍵的一行代碼是:
// ...
Message<PacketModel> message = MessageBuilder.withPayload(model).setHeader("type", model.getType()).build();
// ...
構(gòu)建消息時悦穿,自定義一個 key
為 type
的 header
,而我們在定義生產(chǎn)者時业踢,指定了 routingKeyExpression
為 headers.type
栗柒,也就是說,在投遞時會以 type
的值作為最后的 routingKey
知举。所以瞬沦,這樣也就達到了我們想要的效果。
總結(jié)
這種配置方式雇锡,適合:生產(chǎn)者只有一個逛钻,消費者有多個,且需要將不同的消息投遞到不同的目標隊列锰提。這樣的場景很多曙痘,除了上面舉的例子,還有:不同平臺(天貓欲账、淘寶屡江、京東、有贊等)的訂單赛不,需要被各自的處理器進行消費惩嘉。
相關(guān)鏈接
推薦閱讀
Spring Cloud 進階玩法
統(tǒng)一異常處理介紹及實戰(zhàn)
Spring Cloud Stream 進階配置——使用延遲隊列實現(xiàn)“定時關(guān)閉超時未支付訂單”
Spring Cloud Stream 進階配置——高可用(二)——死信隊列