Spring Cloud Stream 進階配置——動態(tài)路由

ps: 本文所有代碼可在 這里 查看捶码。

背景

記得之前有一個場景磨德,網(wǎng)關(guān)接收各種類型設(shè)備上傳過來的數(shù)據(jù)包矿筝,然后根據(jù)不同類型的數(shù)據(jù),通過 MQ 轉(zhuǎn)發(fā)到相應(yīng)的處理器進行消費念脯。舉個例子:現(xiàn)在有2種類型的設(shè)備采集器狞洋,分別為 水位監(jiān)測器溫度監(jiān)測器,最后會分發(fā)到各自的處理器進行處理绿店。

解決方案

一般做法

所有處理不同類型數(shù)據(jù)的隊列吉懊,監(jiān)聽同一個 Topic,然后消費時假勿,通過判斷數(shù)據(jù)的類型來決定是否需要處理借嗽。比如上面的例子,每來一條數(shù)據(jù)转培,2個處理器都會去消費這條數(shù)據(jù)恶导,對于 水位監(jiān)測器 的處理器,如果是 水位監(jiān)測器 的數(shù)據(jù)浸须,那剛好惨寿,正常消費,如果是 溫度監(jiān)測器 的數(shù)據(jù)羽戒,直接跳過缤沦。

這種做法,優(yōu)點很明顯易稠,即不用增加其他配置缸废,只需在消費時做下類型判斷;但缺點也特別明顯驶社,所有消息企量,每一個隊列都需要消費一次。為什么這么說呢亡电?我們都知道届巩,消息在投遞過程中,消息是需要序列化和反序列化的(一般使用的是 json)份乒,序列化和反序列化是需要耗系統(tǒng)資源的恕汇,而且投遞過程中也是需要占用帶寬的腕唧,而消息到達消費端時,大部分情況下都會因為類型不符而跳過處理瘾英,最后還要通知交換機處理結(jié)果枣接,這樣就會造成不必要的資源浪費。

可以看到缺谴,如果數(shù)據(jù)量小但惶,分類不多画侣,缺點并不會造成多嚴重的后果絮缅,但如果數(shù)據(jù)量一大,分類一多墙基,那將會極大的浪費系統(tǒng)資源阳啥。我們都知道添谊,物聯(lián)網(wǎng)的各種設(shè)備何止千千萬,不同設(shè)備類型更是繁多苫纤,那么一旦使用這種方案碉钠,本來10臺機器能搞定的事情,最后可能需要幾十臺卷拘,數(shù)據(jù)分類多的話喊废,可能還需要更多。而且數(shù)據(jù)量如果突然劇增栗弟,系統(tǒng)也很容易就扛不住污筷。

綜上,這種方案大多情況下是不適用的乍赫。那有沒有更好的方案瓣蛀,比如不同處理器,只處理一種對應(yīng)的設(shè)備上報的數(shù)據(jù)包雷厂?答案是肯定的惋增,那就是——動態(tài)路由。

動態(tài)路由

何為 動態(tài)路由改鲫?簡單的說诈皿,就是:消息到達交換機后,會根據(jù)動態(tài)的 routingKey像棘,投遞到與交換機綁定時 bindingKey 相同的隊列中稽亏。

舉個例子,水位監(jiān)測器 的隊列與交換機綁定時使用的 bindingKeywaterLevel缕题,這時如果來了一條監(jiān)測到的水位數(shù)據(jù)截歉,消息在發(fā)布時使用的動態(tài) routingKey 也為 waterLevel,那這條數(shù)據(jù) 水位監(jiān)測器 的處理器能正常處理烟零,而 bindingKeytemperature溫度監(jiān)測器 隊列則收不到這條數(shù)據(jù)瘪松。

我們都知道咸作, bindingKey 可以通過配置 spring.cloud.stream.rabbit.bindings.<channelName>.consumer.bindingRoutingKey 來達到效果,那難點就剩下:如何在發(fā)布消息時指定想要的 routingKey宵睦。翻了下官方文檔性宏,找到這樣一個配置:

routingKeyExpression

很明顯,這就是我們想要的状飞,支持一個 SpEL 表達式,如果是固定的 routingKey书斜,寫一個常量字符串即可诬辈。

文檔鏈接在 這里

接下來荐吉,我們來進行一個簡單的 demo 試一下焙糟。

application-dynamic.yml

spring:
  cloud:
    stream:
      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit

        waterLevelInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}.waterLevel
          binder: rabbit

        temperatureInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}.temperature
          binder: rabbit

      rabbit:
        bindings:
          packetUplinkOutput:
            producer:
              # 生產(chǎn)者配置RabbitMq的動態(tài)路由鍵
              routingKeyExpression: headers.type

          waterLevelInput:
            consumer:
              bindingRoutingKey: waterLevel # 將queue綁定到exchange時使用的routing key。默認'#'
          temperatureInput:
            consumer:
              bindingRoutingKey: temperature # 將queue綁定到exchange時使用的routing key样屠。默認'#'

ScasDynamicRoutingTest

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dynamic")
@EnableBinding({ScasDynamicRoutingTest.MessageSink.class, ScasDynamicRoutingTest.MessageSource.class})
public class ScasDynamicRoutingTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    @Test
    public void test() throws InterruptedException {
        
        for (int i = 0; i < 5; i++) {
            String devEui = getDevEuis();
            String type = "waterLevel";
            packetUplinkProducer.publish(new PacketModel(devEui, type));
        }

        for (int i = 0; i < 5; i++) {
            String devEui = getDevEuis();
            String type = "temperature";
            packetUplinkProducer.publish(new PacketModel(devEui, type));
        }

        Thread.sleep(10000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
            log.info("發(fā)布上行數(shù)據(jù)包消息. model: [{}].", model);
            Message<PacketModel> message = MessageBuilder.withPayload(model).setHeader("type", model.getType()).build();
            messageSource.packetUplinkOutput().send(message);
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("waterLevelInput")
        public void handleWaterLevelPacket(PacketModel model) throws InterruptedException {
            log.info("消費【水位監(jiān)測器】數(shù)據(jù)包消息. model: [{}].", model);
        }

        @StreamListener("temperatureInput")
        public void handleTemperaturePacket(PacketModel model) throws InterruptedException {
            log.info("消費【溫度監(jiān)測器】數(shù)據(jù)包消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("waterLevelInput")
        SubscribableChannel waterLevelInput();

        @Input("temperatureInput")
        SubscribableChannel temperatureInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class PacketModel {
        /**
         * 設(shè)備 eui
         */
        private String devEui;

        /**
         * 設(shè)備類型
         */
        private String type;
    }

}

測試結(jié)果

result

再看一下可視化界面的隊列詳情:

waterLevel queue

可以看到穿撮,routing key 為我們配置的 waterLevel

如果將其中某一個的 bindingRoutingKey 去掉或改成默認值 #痪欲,結(jié)果如下:

another result

代碼分析

其實最關(guān)鍵的一行代碼是:

// ...
Message<PacketModel> message = MessageBuilder.withPayload(model).setHeader("type", model.getType()).build();
// ...

構(gòu)建消息時悦穿,自定義一個 keytypeheader,而我們在定義生產(chǎn)者時业踢,指定了 routingKeyExpressionheaders.type栗柒,也就是說,在投遞時會以 type 的值作為最后的 routingKey知举。所以瞬沦,這樣也就達到了我們想要的效果。

總結(jié)

這種配置方式雇锡,適合:生產(chǎn)者只有一個逛钻,消費者有多個,且需要將不同的消息投遞到不同的目標隊列锰提。這樣的場景很多曙痘,除了上面舉的例子,還有:不同平臺(天貓欲账、淘寶屡江、京東、有贊等)的訂單赛不,需要被各自的處理器進行消費惩嘉。

相關(guān)鏈接

Spring Cloud Stream Guide

推薦閱讀

Spring Cloud 進階玩法
統(tǒng)一異常處理介紹及實戰(zhàn)
Spring Cloud Stream 進階配置——使用延遲隊列實現(xiàn)“定時關(guān)閉超時未支付訂單”
Spring Cloud Stream 進階配置——高可用(二)——死信隊列

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市踢故,隨后出現(xiàn)的幾起案子文黎,更是在濱河造成了極大的恐慌惹苗,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耸峭,死亡現(xiàn)場離奇詭異桩蓉,居然都是意外死亡,警方通過查閱死者的電腦和手機劳闹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門院究,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人本涕,你說我怎么就攤上這事业汰。” “怎么了菩颖?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵样漆,是天一觀的道長。 經(jīng)常有香客問我晦闰,道長放祟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任呻右,我火速辦了婚禮跪妥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘窿冯。我一直安慰自己骗奖,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布醒串。 她就那樣靜靜地躺著执桌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪芜赌。 梳的紋絲不亂的頭發(fā)上仰挣,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音缠沈,去河邊找鬼膘壶。 笑死,一個胖子當(dāng)著我的面吹牛洲愤,可吹牛的內(nèi)容都是我干的颓芭。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼柬赐,長吁一口氣:“原來是場噩夢啊……” “哼亡问!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤州藕,失蹤者是張志新(化名)和其女友劉穎束世,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體床玻,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡毁涉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了锈死。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贫堰。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖待牵,靈堂內(nèi)的尸體忽然破棺而出严嗜,到底是詐尸還是另有隱情,我是刑警寧澤洲敢,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站茄蚯,受9級特大地震影響压彭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜渗常,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一壮不、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧皱碘,春花似錦询一、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至踢俄,卻和暖如春缩功,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背都办。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工嫡锌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人琳钉。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓势木,卻偏偏與公主長得像,于是被迫代替她去往敵國和親歌懒。 傳聞我的和親對象是個殘疾皇子啦桌,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345