MQTT詳解以及實際操作

1 MQTT

1.1 MQTT介紹

1.1.1 簡介

MQTT 全稱(Message Queue Telemetry Transport):一種基于發(fā)布/訂閱(publish/subscribe)模式的輕量級通訊協(xié)議,通過訂閱相應(yīng)的主題來獲取消息酷麦,是物聯(lián)網(wǎng)(Internet of Thing)中的一個標(biāo)準(zhǔn)傳輸協(xié)議矿卑。
MQTT是一種基于發(fā)布/訂閱模式的輕量級通訊協(xié)議,該協(xié)議構(gòu)建在TCP/IP協(xié)議上沃饶。 MQTT最大的有點在于可以以極少的代碼和有限的帶寬母廷,為遠(yuǎn)程設(shè)備提供實時可靠的消息服務(wù)。做為一種低開銷糊肤、低帶寬占用的即時通訊協(xié)議徘意,MQTT在物聯(lián)網(wǎng)、小型設(shè)備轩褐、移動應(yīng)用等方面有廣泛應(yīng)用椎咧。
MQTT協(xié)議將消息的發(fā)布者(publisher)與訂閱者(subscriber)進(jìn)行分離,因此可以在不可靠的網(wǎng)絡(luò)環(huán)境中把介,為遠(yuǎn)程連接的設(shè)備提供可靠的消息服務(wù)勤讽,使用方式與傳統(tǒng)的MQ有點類似。

b768f8788d01f2acf0152ebc965e5498_b1a7a484075640c4907c9fa7e3a61d93.png

TCP協(xié)議位于傳輸層拗踢,MQTT 協(xié)議位于應(yīng)用層脚牍,MQTT 協(xié)議構(gòu)建于TCP/IP協(xié)議上,也就是說只要支持TCP/IP協(xié)議棧的地方巢墅,都可以使用MQTT協(xié)議诸狭。

1.1.2 特點和應(yīng)用

特點:

  • 開放消息協(xié)議,簡單易實現(xiàn)
  • 發(fā)布訂閱模式君纫,一對多消息發(fā)布
  • 基于TCP/IP網(wǎng)絡(luò)連接驯遇,提供有序,無損蓄髓,雙向連接
  • 2字節(jié)固定報頭叉庐,2字節(jié)心跳報文,最小化傳輸開銷和協(xié)議交換会喝,有效減少網(wǎng)絡(luò)流量
  • 消息QoS支持陡叠,可靠傳輸保證

應(yīng)用:

  • 物聯(lián)網(wǎng)M2M通信玩郊,物聯(lián)網(wǎng)大數(shù)據(jù)采集
  • Android消息推送,WEB消息推送
  • 智能硬件枉阵、智能家具译红、智能電器
  • 車聯(lián)網(wǎng)通信,電動車站樁采集
  • 智慧城市兴溜、遠(yuǎn)程醫(yī)療临庇、遠(yuǎn)程教育
  • 電力、石油與能源等行業(yè)市場

1.1.3 為什么要用 MQTT協(xié)議

MQTT 協(xié)議為什么在物聯(lián)網(wǎng)(IOT)中如此受偏愛昵慌?而不是其它協(xié)議假夺,比如我們更為熟悉的 HTTP協(xié)議呢?
首先HTTP協(xié)議它是一種同步協(xié)議斋攀,客戶端請求后需要等待服務(wù)器的響應(yīng)已卷。而在物聯(lián)網(wǎng)(IOT)環(huán)境中,設(shè)備會很受制于環(huán)境的影響淳蔼,比如帶寬低侧蘸、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定等鹉梨,顯然異步消息協(xié)議更為適合IOT應(yīng)用程序讳癌。
HTTP是單向的,如果要獲取消息客戶端必須發(fā)起連接存皂,而在物聯(lián)網(wǎng)(IOT)應(yīng)用程序中晌坤,設(shè)備或傳感器往往都是客戶端,這意味著它們無法被動地接收來自網(wǎng)絡(luò)的命令旦袋。
通常需要將一條命令或者消息骤菠,發(fā)送到網(wǎng)絡(luò)上的所有設(shè)備上。HTTP要實現(xiàn)這樣的功能不但很困難疤孕,而且成本極高商乎。

1.1.4 名詞介紹

  • Publisher(發(fā)布者):消息的發(fā)出者,負(fù)責(zé)發(fā)送消息
  • Subscriber(訂閱者):消息的訂閱者祭阀,負(fù)責(zé)接收并處理消息
  • Broker(代理):消息代理鹉戚,位于消息發(fā)布者和訂閱者之間,各類支持MQTT協(xié)議的消息中間件都可以充當(dāng)
  • Topic(主題):可以理解為消息隊列中的路由专控,訂閱者訂閱了主題之后抹凳,就可以收到發(fā)送到該主題的消息。
  • Payload(負(fù)載)踩官;可以理解為發(fā)送消息的內(nèi)容却桶。
  • QoS(消息質(zhì)量):全稱Quality of Service,即消息的發(fā)送質(zhì)量蔗牡,主要有QoS 0颖系、QoS 1、QoS 2三個等級辩越,下面分別介紹下:
    • QoS 0(Almost Once):至多一次嘁扼,只發(fā)送一次,會發(fā)生消息丟失或重復(fù)黔攒;
    • QoS 1(Atleast Once):至少一次趁啸,確保消息到達(dá),但消息重復(fù)可能會發(fā)生督惰;
    • QoS 2(Exactly Once):只有一次不傅,確保消息只到達(dá)一次

1.2 MQTT控制報文的結(jié)構(gòu)

MQTT通過交換一些預(yù)定義的MQTT控制報文來工作,每條MQTT命令消息的消息頭都包含一個固定的報頭赏胚,有些消息會攜帶一個可變報文頭和一個負(fù)荷访娶。消息格式如下:

固定包頭,存在于所有MQTT控制包
可變包頭觉阅,存在于某些MQTT控制包
載荷崖疤,存在于某些MQTT控制包

1.2.1 固定報文頭(Fixed Header)

MQTT固定報文頭最少有兩個字節(jié),第一個字節(jié)包含消息類型(Message Type)QoS級別等標(biāo)志位典勇。第二個字節(jié)開始是剩余長度字段劫哼,該長度是后面的可變報文頭加消息負(fù)載的總長度,該字段最多允許四個字節(jié)割笙。

剩余長度使用了一種可變長度的結(jié)構(gòu)來編碼权烧,這種結(jié)構(gòu)使用單一字節(jié)表示0-127的值。大于127的值如下處理伤溉。每個字節(jié)的低7位用來編碼數(shù)據(jù)豪嚎,最高位用來表示是否還有后續(xù)字節(jié)。因此每個字節(jié)可以編碼128個值谈火,再加上一個標(biāo)識位侈询。剩余長度最多可以用四個字節(jié)來表示。

例如十進(jìn)制的數(shù)字64可以被編碼成一個單獨的字節(jié)糯耍,十進(jìn)制為64扔字,八進(jìn)制為0x40。十進(jìn)制數(shù)字321(=65+2×128)被編碼為兩個字節(jié)温技,低位在前革为。第一個字節(jié)是65+128 = 193。注意最高位的128表示后面至少還有一個字節(jié)舵鳞。第二個字節(jié)是2震檩,表示2*127。(注:321 = 11000001 00000010孔庭,第一個字節(jié)是“標(biāo)識符后面還有一個字節(jié)”+65注盈,第二個字節(jié)是“標(biāo)識符后面沒有字節(jié)了”+256)荞膘。

1.2.2 可變報文頭(Variable Header)

可變報文頭主要包含協(xié)議名履怯、協(xié)議版本富稻、連接標(biāo)志(Connect Flags)熏兄、心跳間隔時間(Keep Alive timer)盾沫、連接返回碼(Connect Return Code)病附、主題名(Topic Name)

1.2.3 有效負(fù)荷和消息類型

有效負(fù)荷(Payload)沸毁,可以理解為消息主題(body)
當(dāng) MQTT 發(fā)送的消息類型是 CONNECT(連接)峰髓、PUBLISH(發(fā)布)、SUBSCRIBE(訂閱)息尺、SUBACK(訂閱確認(rèn))携兵、則會帶有負(fù)荷。

各種類型消息的控制報文參考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html

MQTT的消息類型(Message Type)(控制報文類型)

名字 報文流動方向 描述
Reserved 0 禁止 保留
CONNECT 1 客戶端到服務(wù)端 客戶端請求連接服務(wù)端
CONNACK 2 服務(wù)端到客戶端 連接報文確認(rèn)
PUBLISH 3 兩個方向都允許 發(fā)布消息
PUBACK 4 兩個方向都允許 QoS 1消息發(fā)布收到確認(rèn)
PUBREC 5 兩個方向都允許 發(fā)布收到(保證交付第一步)
PUBREL 6 兩個方向都允許 發(fā)布釋放(保證交付第二步)
PUBCOMP 7 兩個方向都允許 QoS 2消息發(fā)布完成(保證交互第三步)
SUBSCRIBE 8 客戶端到服務(wù)端 客戶端訂閱請求
SUBACK 9 服務(wù)端到客戶端 訂閱請求報文確認(rèn)
UNSUBSCRIBE 10 客戶端到服務(wù)端 客戶端取消訂閱請求
UNSUBACK 11 服務(wù)端到客戶端 取消訂閱報文確認(rèn)
PINGREQ 12 客戶端到服務(wù)端 心跳請求
PINGRESP 13 服務(wù)端到客戶端 心跳響應(yīng)
DISCONNECT 14 客戶端到服務(wù)端 客戶端斷開連接
Reserved 15 禁止 保留

1.2.4 消息質(zhì)量(QoS)

消息質(zhì)量(QoS):

  • QoS 0:最多分發(fā)一次搂誉。消息的傳遞完全依賴于底層的TCP/IP協(xié)議徐紧,協(xié)議里沒有定義應(yīng)答和重試,消息要么只會到達(dá)服務(wù)端一次勒葱,要么根本沒有到達(dá)浪汪。
    只會發(fā)送一次,不管成不成功
  • QoS 1:至少分發(fā)一次凛虽。服務(wù)器的消息接收由PUBACK消息進(jìn)行確認(rèn)死遭,如果通信鏈路或發(fā)送設(shè)備異常,或者指定時間內(nèi)沒有收到確認(rèn)消息凯旋,發(fā)送端會重發(fā)這條在消息頭中設(shè)置了DUP位的消息呀潭。
    未成功會繼續(xù)發(fā)送,直到成功至非,可能會收到多次
  • QoS 2:只分發(fā)一次钠署。這是最高級別的消息傳遞,消息丟失和重復(fù)都是不可接受的荒椭,使用這個服務(wù)質(zhì)量等級會有額外的開銷谐鼎。
    未成功會繼續(xù)發(fā)送,但會保證只收到一次

通過下面的例子可以更深刻的理解上面三個傳輸質(zhì)量等級趣惠。
比如目前流行的共享單車智能鎖狸棍,智能鎖可以定時使用QoS level 0質(zhì)量消息請求服務(wù)器,發(fā)送單車的當(dāng)前位置味悄,如果服務(wù)器沒收到也沒關(guān)系草戈,反正過一段時間又會再發(fā)送一次。
之后用戶可以通過App查詢周圍單車位置侍瑟,找到單車后需要進(jìn)行解鎖唐片,這時候可以使用QoS level 1質(zhì)量消息,手機App不斷的發(fā)送解鎖消息給單車鎖,確保有一次消息能達(dá)到以解鎖單車费韭。
最后用戶用完單車后茧球,需要提交付款表單,可以使用QoS level 2質(zhì)量消息揽思,這樣確保只傳遞一次數(shù)據(jù)袜腥,否則用戶就會多付錢了见擦。

1.3 搭建MQTT服務(wù)

1.3.1 直接使用MQTT搭建

在Linux上搭建MQTT服務(wù)
打開EMQ官網(wǎng):https://www.emqx.io/cn/products/broker
點擊開始試用

image.png

選擇服務(wù)器對應(yīng)版本


image.png

復(fù)制下載命令到ssh工具中執(zhí)行


image.png

下載完成
下載完成后執(zhí)行安裝命令


image.png

安裝成功后執(zhí)行命令:

sudo emqx start

出現(xiàn)以下信息表示啟動成功


image.png

瀏覽器訪問ip:18083進(jìn)入管理界面钉汗,默認(rèn)賬號為admin,密碼為public

image.png

1.3.2 使用RabbitMQ搭建MQTT

點擊了解 RabbitMQ 搭建
搭建完 RabbitMQ 后鲤屡,啟用 RabbitMQMQTT 插件了损痰,默認(rèn)是不啟用的,使用如下命令開啟即可:rabbitmq-plugins enable rabbitmq_mqtt
開啟成功后酒来,查看管理控制臺卢未,我們可以發(fā)現(xiàn)MQTT服務(wù)運行在1883端口上了。

image.png

如果使用 RabbitMQ與Web 端交互堰汉,那么底層使用的是 WebSocket辽社,所以需要開啟 RabbitMQMQTT WEB支持,使用如下命令開啟即可:rabbitmq-plugins enable rabbitmq_web_mqtt
開啟成功后翘鸭,查看管理控制臺滴铅,我們可以發(fā)現(xiàn)MQTT的WEB服務(wù)運行在15675端口上了;
image.png

1.4 使用RabbitMQ的SpringBoot操作MQTT

1.4.1 pom.xml

<!--Spring集成MQTT-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1.4.2 配置文件

在application.yml中添加MQTT相關(guān)配置就乓,主要是訪問地址汉匙、用戶名密碼、默認(rèn)主題信息生蚁;

rabbitmq:
  mqtt:
    url: tcp://localhost:1883
    username: guest
    password: guest
    defaultTopic: testTopic

編寫一個Java配置類從配置文件中讀取配置便于使用噩翠;

1.4.3 配置類

@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
    /**
     * RabbitMQ連接用戶名
     */
    private String username;
    /**
     * RabbitMQ連接密碼
     */
    private String password;
    /**
     * RabbitMQ的MQTT默認(rèn)topic
     */
    private String defaultTopic;
    /**
     * RabbitMQ的MQTT連接地址
     */
    private String url;
}

1.4.4 訂閱類

添加MQTT消息訂閱者相關(guān)配置,使用@ServiceActivator注解聲明一個服務(wù)激活器邦投,通過MessageHandler來處理訂閱消息伤锚;

/**
 * MQTT消息訂閱者相關(guān)配置
 */
@Slf4j
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //設(shè)置消息質(zhì)量:0->至多一次;1->至少一次志衣;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //處理訂閱消息
                log.info("handleMessage : {}",message.getPayload());
            }

        };
    }
}

1.4.5 發(fā)布

添加MQTT消息發(fā)布者相關(guān)配置屯援;

/**
 * MQTT消息發(fā)布者相關(guān)配置
 */
@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

1.4.6 網(wǎng)關(guān)和測試

添加MQTT網(wǎng)關(guān),用于向主題中發(fā)送消息蠢涝;

/**
 * MQTT網(wǎng)關(guān)玄呛,通過接口將數(shù)據(jù)傳遞到集成流
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 發(fā)送消息到默認(rèn)topic
     */
    void sendToMqtt(String payload);

    /**
     * 發(fā)送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 發(fā)送消息到指定topic并設(shè)置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

添加MQTT測試接口,使用MQTT網(wǎng)關(guān)向特定主題中發(fā)送消息和二;

/**
 * MQTT測試接口
 */
@Api(tags = "MqttController", description = "MQTT測試接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    @ApiOperation("向默認(rèn)主題發(fā)送消息")
    public CommonResult sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
        return CommonResult.success(null);
    }

    @PostMapping("/sendToTopic")
    @ApiOperation("向指定主題發(fā)送消息")
    public CommonResult sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
        return CommonResult.success(null);
    }
}

1.5 不使用RabbitMQ的SpringBoot搭建提供端

1.5.1 pom.xml

<!--mqtt相關(guān)依賴-->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1.5.2 修改配置文件

spring:
  application:
    name: provider
  #MQTT配置信息
  mqtt:
    #MQTT服務(wù)端地址徘铝,端口默認(rèn)為1883,如果有多個,用逗號隔開惕它,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://ip:1883
    #用戶名
    username: admin
    #密碼
    password: public
    #客戶端id(不能重復(fù))
    client:
      id: provider-id
    #MQTT默認(rèn)的消息推送主題怕午,實際可在調(diào)用接口時指定
    default:
      topic: topic
server:
  port: 8081

1.5.3 消息發(fā)布者客戶端配置

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MqttProviderConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客戶端對象
     */
    private MqttClient client;

    /**
     * 客戶端連接服務(wù)端
     */

    public void connect(){
        try {
            //創(chuàng)建MQTT客戶端對象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //連接設(shè)置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄(訂閱主題淹魄,qos)郁惜,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
            //設(shè)置為true表示每次連接到服務(wù)端都是以新的身份
            options.setCleanSession(true);
            //設(shè)置連接用戶名
            options.setUserName(username);
            //設(shè)置連接密碼
            options.setPassword(password.toCharArray());
            //設(shè)置超時時間,單位為秒
            options.setConnectionTimeout(100);
            //設(shè)置心跳時間 單位為秒甲锡,表示服務(wù)器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
            options.setKeepAliveInterval(20);
            //設(shè)置遺囑消息的話題兆蕉,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
            options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false);
            //設(shè)置回調(diào)
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
    * 發(fā)布消息
    * @param qos  服務(wù)質(zhì)量等級
    *  0 只會發(fā)送一次缤沦,不管成不成功
    *  1 未成功會繼續(xù)發(fā)送虎韵,直到成功,可能會收到多次
    *  2 未成功會繼續(xù)發(fā)送缸废,但會保證只收到一次
    * @param retained  保留標(biāo)志
    * 如果設(shè)置為true包蓝,服務(wù)端必須存儲這個應(yīng)用消息和它的服務(wù)質(zhì)量等級,當(dāng)有訂閱者訂閱這個主題時企量,會把消息推送給這個訂閱者
   * 但服務(wù)端對同一個主題只會保留一條retained消息(最后收到的那條)
   */
   public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主題目的地测萎,用于發(fā)布/訂閱消息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一種機制來跟蹤消息的傳遞進(jìn)度。
        //用于在以非阻塞方式(在后臺運行)執(zhí)行發(fā)布時跟蹤消息的傳遞進(jìn)度
        MqttDeliveryToken token;
        try {
            //將指定消息發(fā)布到主題届巩,但不等待消息傳遞完成硅瞧。返回的token可用于跟蹤消息的傳遞狀態(tài)。
            //一旦此方法干凈地返回姆泻,消息就已被客戶端接受發(fā)布零酪。當(dāng)連接可用時,將在后臺完成消息傳遞拇勃。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

1.5.4 消息發(fā)布客戶端回調(diào)

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;


@Configuration
public class MqttProviderCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 與服務(wù)器斷開連接的回調(diào)
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println(clientId + "與服務(wù)器斷開連接");
    }

    /**
     * 消息到達(dá)的回調(diào)
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    /**
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        IMqttAsyncClient client = iMqttDeliveryToken.getClient();
        System.out.println(client.getClientId() + "發(fā)布消息成功四苇!");
    }
}

1.5.5 創(chuàng)建控制器測試發(fā)布消息

import com.xdemo.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SendController {

    @Autowired
    private MqttProviderConfig providerClient;

    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(int qos,boolean retained,String topic,String message){
        try {
            providerClient.publish(qos,retained,topic,message);
            return "發(fā)送成功";
        }catch (Exception e){
            e.printStackTrace();
            return "發(fā)送失敗";
        }
    }
}

1.6 不使用RabbitMQ的SpringBoot搭建消費端

在父工程下創(chuàng)建一個Springboot項目作為消息消費者,導(dǎo)入以下依賴

1.6.1 pom.xml

<!--mqtt相關(guān)依賴-->
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1.6.2 配置文件

spring:
  application:
    name: consumer
  #MQTT配置信息
  mqtt:
    #MQTT服務(wù)端地址方咆,端口默認(rèn)為1883月腋,如果有多個,用逗號隔開瓣赂,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://ip:1883
    #用戶名
    username: admin
    #密碼
    password: public
    #客戶端id(不能重復(fù))
    client:
      id: consumer-id
    #MQTT默認(rèn)的消息推送主題榆骚,實際可在調(diào)用接口時指定
    default:
      topic: topic
server:
  port: 8082

1.6.3 消費者客戶端配置

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;


@Configuration
public class MqttConsumerConfig {
    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * 客戶端對象
     */
    private MqttClient client;

    /**
     * 在bean初始化后連接到服務(wù)器
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客戶端連接服務(wù)端
     */
    public void connect(){
        try {
            //創(chuàng)建MQTT客戶端對象
            client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
            //連接設(shè)置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄煌集,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
            //設(shè)置為true表示每次連接到服務(wù)端都是以新的身份
            options.setCleanSession(true);
            //設(shè)置連接用戶名
            options.setUserName(username);
            //設(shè)置連接密碼
            options.setPassword(password.toCharArray());
            //設(shè)置超時時間妓肢,單位為秒
            options.setConnectionTimeout(100);
            //設(shè)置心跳時間 單位為秒,表示服務(wù)器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
            options.setKeepAliveInterval(20);
            //設(shè)置遺囑消息的話題苫纤,若客戶端和服務(wù)器之間的連接意外斷開碉钠,服務(wù)器將發(fā)布客戶端的遺囑信息
            options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false);
            //設(shè)置回調(diào)
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
            //訂閱主題
            //消息等級纲缓,和主題數(shù)組一一對應(yīng),服務(wù)端將按照指定等級給訂閱了主題的客戶端推送消息
            int[] qos = {1,1};
            //主題
            String[] topics = {"topic1","topic2"};
           //訂閱主題
            client.subscribe(topics,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 斷開連接
     */
    public void disConnect(){
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 訂閱主題
     */
    public void subscribe(String topic,int qos){
        try {
            client.subscribe(topic,qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

1.6.4 消息消費者客戶端回調(diào)

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttConsumerCallBack implements MqttCallback {
    /**
     * 客戶端斷開連接的回調(diào)
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("與服務(wù)器斷開連接喊废,可重連");
    }

    /**
     * 消息到達(dá)的回調(diào)
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主題 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }

    /**
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

1.6.5 控制器提供手動建立連接和斷開連接方法

import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class TestController {
    @Autowired
    private MqttConsumerConfig client;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @RequestMapping("connect")
    @ResponseBody
    public String connect(){
        client.connect();
        return clientId + "連接到服務(wù)器";
    }

    @RequestMapping("disConnect")
    @ResponseBody
    public String disConnect(){
        client.disConnect();
        return clientId + "與服務(wù)器斷開連接";
    }
}

1.7 測試提供端和消費端

1.7.1 使用三分工具

可以使用 MQTT 客戶端來測試 MQTT 的即時通訊功能祝高,這里使用的是 MQTTBox 這個客戶端工具。
首先下載并安裝好 MQTTBox污筷,下載地址:http://workswithweb.com/mqttbox.html

image.png

點擊Create MQTT Client按鈕來創(chuàng)建一個MQTT客戶端工闺;
image.png

接下來對MQTT客戶端進(jìn)行配置,主要是配置好協(xié)議端口瓣蛀、連接用戶名密碼和QoS即可陆蟆;
image.png

再配置一個訂閱者,訂閱者訂閱testTopicA這個主題揪惦,我們會向這個主題發(fā)送消息遍搞;
image.png

發(fā)布者向主題中發(fā)布消息罗侯,訂閱者可以實時接收到

1.7.2 使用代碼測試

分別啟動兩個項目器腋,可以在管理界面看到創(chuàng)建的兩個客戶端


image.png

調(diào)用發(fā)布消息接口發(fā)布消息


image.png

消費者控制臺打印


image.png

客戶端斷線消息恢復(fù)

把消費者與服務(wù)端斷開連接


image.png

再調(diào)用發(fā)布消息接口發(fā)送兩條消息到topic1,然后再把消費者連接到服務(wù)端


image.png

控制臺沒有東西打印


image.png

修改消費者客戶端配置钩杰,把setCleanSession改為false

image.png

重啟項目纫塌,把消費者客戶端斷開連接,調(diào)用發(fā)布消息接口發(fā)布兩條消息讲弄,再把消費者和服務(wù)端連接上


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末措左,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子避除,更是在濱河造成了極大的恐慌怎披,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓶摆,死亡現(xiàn)場離奇詭異凉逛,居然都是意外死亡,警方通過查閱死者的電腦和手機群井,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進(jìn)店門状飞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人书斜,你說我怎么就攤上這事诬辈。” “怎么了荐吉?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵焙糟,是天一觀的道長。 經(jīng)常有香客問我样屠,道長穿撮,這世上最難降的妖魔是什么搓劫? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮混巧,結(jié)果婚禮上枪向,老公的妹妹穿的比我還像新娘。我一直安慰自己咧党,他們只是感情好秘蛔,可當(dāng)我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著傍衡,像睡著了一般深员。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蛙埂,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天倦畅,我揣著相機與錄音,去河邊找鬼绣的。 笑死叠赐,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的屡江。 我是一名探鬼主播芭概,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼惩嘉!你這毒婦竟也來了罢洲?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤文黎,失蹤者是張志新(化名)和其女友劉穎惹苗,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體耸峭,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡桩蓉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了抓艳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片触机。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖玷或,靈堂內(nèi)的尸體忽然破棺而出儡首,到底是詐尸還是另有隱情,我是刑警寧澤偏友,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布蔬胯,位于F島的核電站,受9級特大地震影響位他,放射性物質(zhì)發(fā)生泄漏氛濒。R本人自食惡果不足惜产场,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望舞竿。 院中可真熱鬧京景,春花似錦、人聲如沸骗奖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽执桌。三九已至鄙皇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間仰挣,已是汗流浹背伴逸。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留膘壶,地道東北人错蝴。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像香椎,于是被迫代替她去往敵國和親漱竖。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容