SpringBoot--實戰(zhàn)開發(fā)--MQTT消息推送(六十)

一、MQTT簡介

??MQTT(Message Queuing Telemetry Transport讥裤,消息隊列遙測傳輸)是IBM開發(fā)的一個即時通訊協(xié)議己英,有可能成為物聯(lián)網(wǎng)的重要組成部分损肛。該協(xié)議支持所有平臺治拿,幾乎可以把所有聯(lián)網(wǎng)物品和外部連接起來劫谅,被用來當(dāng)做傳感器和制動器(比如通過Twitter讓房屋聯(lián)網(wǎng))的通信協(xié)議。

主要特征:
MQTT協(xié)議是為大量計算能力有限未檩,且工作在低帶寬冤狡、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計的協(xié)議挎峦,它具有以下主要的幾項特性:
1.使用發(fā)布/訂閱消息模式坦胶,提供一對多的消息發(fā)布顿苇,解除應(yīng)用程序耦合纪岁;
2.對負(fù)載內(nèi)容屏蔽的消息傳輸;
3.使用TCP/IP 提供網(wǎng)絡(luò)連接西壮;
4.有三種消息發(fā)布服務(wù)質(zhì)量:
“至多一次”贡定,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)蚓耽。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù)答姥,丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送敲长。
“至少一次”泽铛,確保消息到達(dá)盔腔,但消息重復(fù)可能會發(fā)生。
“只有一次”,確保消息到達(dá)一次盐杂。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果漩勤。
5.小型傳輸拆祈,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量坪蚁;

二贱田、Maven依賴

<!--web-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!--mqtt-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

三耗拓、apollo服務(wù)器

  1. 配置文件(application.properties)
#MQTT配置信息
spring.mqtt.username=admin
spring.mqtt.password=password
## MQTT-服務(wù)器連接地址竿刁,如果有多個副编,用逗號隔開,
# 如:tcp://127.0.0.1:61613带污,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:61613
## MQTT-連接服務(wù)器默認(rèn)客戶端ID
spring.mqtt.client.id=clientId
## MQTT-連接服務(wù)器默認(rèn)服務(wù)端ID
spring.mqtt.server.id=serverId
## MQTT-默認(rèn)的消息推送主題充易,實際可在調(diào)用接口時指定
spring.mqtt.default.topic=topic

  1. 消息發(fā)送配置類
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 連接設(shè)置
     * @return
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }

    /**
     * 客戶端工廠
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * 發(fā)布通知
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    /**
     * 發(fā)布通道為直連
     * @return
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
  1. 消息發(fā)送接口
/**
 * 消息發(fā)送接口
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {
    void sendToMqtt(String data);
    void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

消息發(fā)送接口,需要發(fā)送消息的時候直接調(diào)用就行了,提供了幾個重載方法payload或者data是發(fā)送消息的內(nèi)容
topic是消息發(fā)送的主題,這里可以自己靈活定義,也可以使用默認(rèn)的主題,就是配置文件的主題,qos是mqtt 對消息處理的幾種機(jī)制分為0,1,2 其中0表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失,1表示的是會嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息,2相比多了一次去重的動作,確保訂閱者收到的消息有一次
當(dāng)然,這三種模式下的性能肯定也不一樣,qos=0是最好的,2是最差的 。

  1. 測試
@RestController
public class MqttController {
    @Autowired
    private MsgWriter msgWriter;
    @RequestMapping("/send")
    public String sendMqtt(String  sendData){
        msgWriter.sendToMqtt(sendData,"hello");
        return "OK";
    }
}

接口測試

Web查看
  1. 消息消費(本測試用的是同一項目盹靴,建議創(chuàng)建單獨消費項目進(jìn)行測試)
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttConsumersConfig {
//    訂閱的主題可以指定,我訂閱的是剛才發(fā)的too主題,還有訂閱方的id 別和發(fā)送方的id 一樣
    @Value("${spring.mqtt.server.id}")
    private String serverId;
    /**
     * 使用MqttSenderConfig中生成的工廠對象炸茧。
     * 如果單獨服務(wù)器瑞妇,請使用以下@Bean代碼。
     */
    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

//    @Bean
//    public MqttPahoClientFactory mqttClientFactory() {
//        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
//        factory.setServerURIs("tcp://localhost:61613");
//        factory.setUserName("admin");
//        factory.setPassword("password");
//        return factory;
//    }

    /**
     * consumer 訂閱者監(jiān)聽消息
     * @return
     */
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p + ", 收到消息梭冠,來自MQTT")
                .handle(logger())
                .get();
    }

    /**
     * 處理日志
     * @return
     */
    private LoggingHandler logger() {
        LoggingHandler loggingHandler = new LoggingHandler("INFO");
        loggingHandler.setLoggerName("siSample");
        return loggingHandler;
    }

    /**
     * 訂閱主題
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        /**
         * 訂閱的主題可以指定,我訂閱的是剛才發(fā)的too主題,還有訂閱方的id 別和發(fā)送方的id 一樣
         */
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,
                mqttClientFactory, "hello");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }
}

注意:
主題名稱生產(chǎn)者與消費者一定要對應(yīng)辕狰,否則取不到消息 。

消費結(jié)果

四控漠、EMQ服務(wù)器

  1. 配置
# MQTT-密碼
# MQTT-服務(wù)器連接地址蔓倍,如果有多個,用逗號隔開盐捷,如:tcp://127.0.0.1:1883偶翅,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:1883
# MQTT-連接服務(wù)器默認(rèn)客戶端ID
spring.mqtt.client.id=mqttId
# MQTT-連接服務(wù)器默認(rèn)服務(wù)端ID
spring.mqtt.server.id=serverId
# MQTT-默認(rèn)的消息推送主題,實際可在調(diào)用接口時指定
spring.mqtt.default.topic=topic
spring.mqtt.username=admin
spring.mqtt.password=public

  1. 代碼
    與apollo服務(wù)器相同碉渡。
  2. 測試結(jié)果
    測試方法同apollo服務(wù)器聚谁。


    測試結(jié)果
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市滞诺,隨后出現(xiàn)的幾起案子形导,更是在濱河造成了極大的恐慌,老刑警劉巖铭段,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件骤宣,死亡現(xiàn)場離奇詭異,居然都是意外死亡序愚,警方通過查閱死者的電腦和手機(jī)憔披,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來爸吮,“玉大人芬膝,你說我怎么就攤上這事⌒谓浚” “怎么了锰霜?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長桐早。 經(jīng)常有香客問我癣缅,道長,這世上最難降的妖魔是什么哄酝? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任友存,我火速辦了婚禮,結(jié)果婚禮上陶衅,老公的妹妹穿的比我還像新娘屡立。我一直安慰自己,他們只是感情好搀军,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布膨俐。 她就那樣靜靜地躺著勇皇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪焚刺。 梳的紋絲不亂的頭發(fā)上敛摘,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機(jī)與錄音檩坚,去河邊找鬼着撩。 笑死,一個胖子當(dāng)著我的面吹牛匾委,可吹牛的內(nèi)容都是我干的拖叙。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼赂乐,長吁一口氣:“原來是場噩夢啊……” “哼薯鳍!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起挨措,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤挖滤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后浅役,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斩松,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年觉既,在試婚紗的時候發(fā)現(xiàn)自己被綠了惧盹。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡瞪讼,死狀恐怖钧椰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情符欠,我是刑警寧澤嫡霞,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站希柿,受9級特大地震影響诊沪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜曾撤,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一娄徊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧盾戴,春花似錦、人聲如沸兵多。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至衅斩,卻和暖如春盆顾,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背畏梆。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工您宪, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人奠涌。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓宪巨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親溜畅。 傳聞我的和親對象是個殘疾皇子捏卓,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容