Spring 整合MQTT

Spring提供了對(duì)多種消息中間件的整合,其中也包括MQTT。具體請(qǐng)參見(jiàn)以下鏈接:

https://docs.spring.io/spring-integration/reference/html/

Spring整合MQTT步驟如下:

  1. 創(chuàng)建Spring Boot Maven工程根穷,引入如下依賴:

    <dependencies>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
    
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
         </dependency>
    
         <dependency>
             <groupId>org.springframework.integration</groupId>
             <artifactId>spring-integration-mqtt</artifactId>
             <version>5.1.3.RELEASE</version>
         </dependency>
     </dependencies>
    
  2. 配置MQTT消費(fèi)端

    添加SpringConfig.java類,添加消息消費(fèi)Bean

    /*****
         * 創(chuàng)建MqttPahoClientFactory覆履,設(shè)置MQTT Broker連接屬性怨酝,如果使用SSL驗(yàn)證,也在這里設(shè)置漆魔。
         * @return
         */
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(new String[]{"tcp://10.69.94.176:1883"});
            factory.setConnectionOptions(options);
            return factory;
        }
    
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient",
                    mqttClientFactory(), "topic1", "topic2");
            adapter.setCompletionTimeout(5000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }
    
        @Bean
        //ServiceActivator注解表明當(dāng)前方法用于處理MQTT消息坷檩,inputChannel參數(shù)指定了用于接收消息信息的channel却音。
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return message -> {
                String payload = message.getPayload().toString();
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                // 根據(jù)topic分別進(jìn)行消息處理。
                if (topic.equals("topic1")) {
                    System.out.println("topic1: 處理消息 " + payload);
                } else if (topic.equals("topic2")) {
                    System.out.println("topic2: 處理消息 " + payload);
                } else {
                    System.out.println(topic + ": 丟棄消息 " + payload);
                }
            };
        }
    

    @ServiceActivator注解表明當(dāng)前方法用于處理MQTT消息矢炼,inputChannel參數(shù)指定了用于接收消息的channel系瓢。

    當(dāng)接收到消息時(shí),可以先拿到topic句灌,然后根據(jù)不同的topic分別對(duì)消息進(jìn)行處理夷陋。

  3. 配置MQTT消息發(fā)送端。

    在MQTT使用場(chǎng)景中胰锌,一般處理接收消息的同時(shí)骗绕,也會(huì)發(fā)送消息。在SpringConfig.java配置文件中添加如下Bean注入资昧,用于消息發(fā)送酬土。

     @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
    
        /*****
         * 發(fā)送消息和消費(fèi)消息Channel可以使用相同MqttPahoClientFactory
         * @return
         */
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler outbound() {
            // 在這里進(jìn)行mqttOutboundChannel的相關(guān)設(shè)置
            MqttPahoMessageHandler messageHandler =
                    new MqttPahoMessageHandler("publishClient", mqttClientFactory());
            messageHandler.setAsync(true); //如果設(shè)置成true,發(fā)送消息時(shí)將不會(huì)阻塞格带。
            messageHandler.setDefaultTopic("testTopic");
            return messageHandler;
        }
    
        @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
        public interface MqttGateway {
            // 定義重載方法撤缴,用于消息發(fā)送
            void sendToMqtt(String payload);
            // 指定topic進(jìn)行消息發(fā)送
            void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
            void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
        }
    

    @MessagingGateway是一個(gè)用于提供消息網(wǎng)關(guān)代理整合的注解,參數(shù)defaultRequestChannel指定發(fā)送消息綁定的channel叽唱。

    在這里我們定義了MqttGateway接口腹泌,該接口可以被注入到其它類中,用于消息發(fā)送尔觉。

     @Autowired
        private SpringConfig.MqttGateway mqttGateway;
        
        // 然后在類方法中調(diào)用下面方法發(fā)送消息
        // mqttGateway.sendToMqtt("testTopic",message);
    
  4. 創(chuàng)建Rest Controller凉袱,通過(guò)http請(qǐng)求發(fā)送MQTT消息。

    @RestController
    public class MqttController {
    
        @Autowired
        private SpringConfig.MqttGateway mqttGateway;
    
        @RequestMapping("/send/{topic}/{message}")
        public String send(@PathVariable String topic, @PathVariable String message) {
            mqttGateway.sendToMqtt(topic, message);
            return "send message : " + message;
        }
    }
    

    在瀏覽器中輸入如下網(wǎng)址進(jìn)行測(cè)試侦铜。

    http://localhost:8080/send/topic1/message1

    http://localhost:8080/send/topic2/message2

  5. 項(xiàng)目源代碼可參考https://github.com/40925645/zengbiaobiao/new/master/mqtt-demo

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末专甩,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子钉稍,更是在濱河造成了極大的恐慌涤躲,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贡未,死亡現(xiàn)場(chǎng)離奇詭異种樱,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)俊卤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門嫩挤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人消恍,你說(shuō)我怎么就攤上這事岂昭。” “怎么了狠怨?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵约啊,是天一觀的道長(zhǎng)邑遏。 經(jīng)常有香客問(wèn)我,道長(zhǎng)恰矩,這世上最難降的妖魔是什么记盒? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮外傅,結(jié)果婚禮上孽鸡,老公的妹妹穿的比我還像新娘。我一直安慰自己栏豺,他們只是感情好彬碱,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著奥洼,像睡著了一般巷疼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上灵奖,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天嚼沿,我揣著相機(jī)與錄音,去河邊找鬼瓷患。 笑死骡尽,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的擅编。 我是一名探鬼主播攀细,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼爱态!你這毒婦竟也來(lái)了谭贪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤锦担,失蹤者是張志新(化名)和其女友劉穎俭识,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體洞渔,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡套媚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了磁椒。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堤瘤。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖衷快,靈堂內(nèi)的尸體忽然破棺而出宙橱,到底是詐尸還是另有隱情姨俩,我是刑警寧澤蘸拔,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布师郑,位于F島的核電站,受9級(jí)特大地震影響调窍,放射性物質(zhì)發(fā)生泄漏宝冕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一邓萨、第九天 我趴在偏房一處隱蔽的房頂上張望地梨。 院中可真熱鬧,春花似錦缔恳、人聲如沸宝剖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)万细。三九已至,卻和暖如春纸泄,著一層夾襖步出監(jiān)牢的瞬間赖钞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工聘裁, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留雪营,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓衡便,卻偏偏與公主長(zhǎng)得像献起,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子镣陕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容