1 MQTT
1.1 MQTT介紹
1.1.1 簡介
MQTT
全稱(Message Queue Telemetry Transport
):一種基于發(fā)布/訂閱(publish/subscribe
)模式的輕量級通訊協(xié)議,通過訂閱相應(yīng)的主題來獲取消息酷麦,是物聯(lián)網(wǎng)(Internet of Thing
)中的一個標(biāo)準(zhǔn)傳輸協(xié)議矿卑。
MQTT
是一種基于發(fā)布/訂閱
模式的輕量級通訊協(xié)議,該協(xié)議構(gòu)建在TCP/IP
協(xié)議上沃饶。 MQTT
最大的有點在于可以以極少的代碼和有限的帶寬母廷,為遠(yuǎn)程設(shè)備提供實時可靠的消息服務(wù)。做為一種低開銷糊肤、低帶寬
占用的即時通訊協(xié)議徘意,MQTT
在物聯(lián)網(wǎng)、小型設(shè)備轩褐、移動應(yīng)用等方面有廣泛應(yīng)用椎咧。
MQTT
協(xié)議將消息的發(fā)布者(publisher
)與訂閱者(subscriber
)進(jìn)行分離,因此可以在不可靠的網(wǎng)絡(luò)環(huán)境中把介,為遠(yuǎn)程連接的設(shè)備提供可靠的消息服務(wù)勤讽,使用方式與傳統(tǒng)的MQ
有點類似。
TCP
協(xié)議位于傳輸層拗踢,MQTT
協(xié)議位于應(yīng)用層脚牍,MQTT
協(xié)議構(gòu)建于TCP/IP
協(xié)議上,也就是說只要支持TCP/IP
協(xié)議棧的地方巢墅,都可以使用MQTT
協(xié)議诸狭。
1.1.2 特點和應(yīng)用
特點:
- 開放消息協(xié)議,簡單易實現(xiàn)
- 發(fā)布訂閱模式君纫,一對多消息發(fā)布
- 基于
TCP/IP
網(wǎng)絡(luò)連接驯遇,提供有序,無損蓄髓,雙向連接
- 2字節(jié)固定報頭叉庐,2字節(jié)心跳報文,最小化傳輸開銷和協(xié)議交換会喝,有效減少網(wǎng)絡(luò)流量
- 消息
QoS
支持陡叠,可靠傳輸保證
應(yīng)用:
- 物聯(lián)網(wǎng)M2M通信玩郊,物聯(lián)網(wǎng)大數(shù)據(jù)采集
- Android消息推送,WEB消息推送
- 智能硬件枉阵、智能家具译红、智能電器
- 車聯(lián)網(wǎng)通信,電動車站樁采集
- 智慧城市兴溜、遠(yuǎn)程醫(yī)療临庇、遠(yuǎn)程教育
- 電力、石油與能源等行業(yè)市場
1.1.3 為什么要用 MQTT協(xié)議
MQTT
協(xié)議為什么在物聯(lián)網(wǎng)(IOT
)中如此受偏愛昵慌?而不是其它協(xié)議假夺,比如我們更為熟悉的 HTTP
協(xié)議呢?
首先HTTP
協(xié)議它是一種同步協(xié)議
斋攀,客戶端請求后需要等待服務(wù)器的響應(yīng)已卷。而在物聯(lián)網(wǎng)(IOT
)環(huán)境中,設(shè)備會很受制于環(huán)境的影響淳蔼,比如帶寬低侧蘸、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信
不穩(wěn)定等鹉梨,顯然異步消息協(xié)議更為適合IOT
應(yīng)用程序讳癌。
HTTP
是單向的,如果要獲取消息客戶端必須發(fā)起連接存皂,而在物聯(lián)網(wǎng)(IOT
)應(yīng)用程序中晌坤,設(shè)備或傳感器往往都是客戶端,這意味著它們無法被動地接收來自網(wǎng)絡(luò)的命令旦袋。
通常需要將一條命令或者消息骤菠,發(fā)送到網(wǎng)絡(luò)上的所有設(shè)備上。HTTP
要實現(xiàn)這樣的功能不但很困難疤孕,而且成本極高商乎。
1.1.4 名詞介紹
-
Publisher
(發(fā)布者):消息的發(fā)出者,負(fù)責(zé)發(fā)送消息 -
Subscriber
(訂閱者):消息的訂閱者祭阀,負(fù)責(zé)接收并處理消息 -
Broker
(代理):消息代理鹉戚,位于消息發(fā)布者和訂閱者之間,各類支持MQTT
協(xié)議的消息中間件都可以充當(dāng) -
Topic
(主題):可以理解為消息隊列中的路由专控,訂閱者訂閱了主題之后抹凳,就可以收到發(fā)送到該主題的消息。 -
Payload
(負(fù)載)踩官;可以理解為發(fā)送消息的內(nèi)容却桶。 -
QoS
(消息質(zhì)量):全稱Quality of Service,即消息的發(fā)送質(zhì)量蔗牡,主要有QoS 0颖系、QoS 1、QoS 2
三個等級辩越,下面分別介紹下:-
QoS 0
(Almost Once):至多一次嘁扼,只發(fā)送一次,會發(fā)生消息丟失或重復(fù)黔攒; -
QoS 1
(Atleast Once):至少一次趁啸,確保消息到達(dá),但消息重復(fù)可能會發(fā)生督惰; -
QoS 2
(Exactly Once):只有一次不傅,確保消息只到達(dá)一次
-
1.2 MQTT控制報文的結(jié)構(gòu)
MQTT
通過交換一些預(yù)定義的MQTT
控制報文來工作,每條MQTT
命令消息的消息頭都包含一個固定的報頭
赏胚,有些消息會攜帶一個可變報文頭
和一個負(fù)荷
访娶。消息格式如下:
固定包頭,存在于所有MQTT控制包
可變包頭觉阅,存在于某些MQTT控制包
載荷崖疤,存在于某些MQTT控制包
1.2.1 固定報文頭(Fixed Header)
MQTT
固定報文頭最少有兩個字節(jié)
,第一個字節(jié)包含消息類型(Message Type)
和QoS級別
等標(biāo)志位典勇。第二個字節(jié)開始是剩余長度
字段劫哼,該長度是后面的可變報文頭加消息負(fù)載的總長度,該字段最多允許四個字節(jié)割笙。
剩余長度
使用了一種可變長度的結(jié)構(gòu)來編碼权烧,這種結(jié)構(gòu)使用單一字節(jié)
表示0-127
的值。大于127的值如下處理伤溉。每個字節(jié)的低7位用來編碼數(shù)據(jù)豪嚎,最高位用來表示是否還有后續(xù)字節(jié)。因此每個字節(jié)可以編碼128個值谈火,再加上一個標(biāo)識位侈询。剩余長度最多可以用四個字節(jié)來表示。
例如十進(jìn)制的數(shù)字64可以被編碼成一個單獨的字節(jié)糯耍,十進(jìn)制為64扔字,八進(jìn)制為0x40。十進(jìn)制數(shù)字321(=65+2×128)被編碼為兩個字節(jié)温技,低位在前革为。第一個字節(jié)是65+128 = 193。注意最高位的128表示后面至少還有一個字節(jié)舵鳞。第二個字節(jié)是2震檩,表示2*127。(注:321 = 11000001 00000010孔庭,第一個字節(jié)是“標(biāo)識符后面還有一個字節(jié)”+65注盈,第二個字節(jié)是“標(biāo)識符后面沒有字節(jié)了”+256)荞膘。
1.2.2 可變報文頭(Variable Header)
可變報文頭主要包含協(xié)議名
履怯、協(xié)議版本
富稻、連接標(biāo)志(Connect Flags)
熏兄、心跳間隔時間(Keep Alive timer)
盾沫、連接返回碼(Connect Return Code)
病附、主題名(Topic Name)
等
1.2.3 有效負(fù)荷和消息類型
有效負(fù)荷(Payload)沸毁,可以理解為消息主題(body)
當(dāng) MQTT
發(fā)送的消息類型是 CONNECT
(連接)峰髓、PUBLISH
(發(fā)布)、SUBSCRIBE
(訂閱)息尺、SUBACK
(訂閱確認(rèn))携兵、則會帶有負(fù)荷。
各種類型消息的控制報文參考:https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/03-ControlPackets.html
MQTT
的消息類型(Message Type)(控制報文類型)
名字 | 值 | 報文流動方向 | 描述 |
---|---|---|---|
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客戶端到服務(wù)端 | 客戶端請求連接服務(wù)端 |
CONNACK | 2 | 服務(wù)端到客戶端 | 連接報文確認(rèn) |
PUBLISH | 3 | 兩個方向都允許 | 發(fā)布消息 |
PUBACK | 4 | 兩個方向都允許 | QoS 1消息發(fā)布收到確認(rèn) |
PUBREC | 5 | 兩個方向都允許 | 發(fā)布收到(保證交付第一步) |
PUBREL | 6 | 兩個方向都允許 | 發(fā)布釋放(保證交付第二步) |
PUBCOMP | 7 | 兩個方向都允許 | QoS 2消息發(fā)布完成(保證交互第三步) |
SUBSCRIBE | 8 | 客戶端到服務(wù)端 | 客戶端訂閱請求 |
SUBACK | 9 | 服務(wù)端到客戶端 | 訂閱請求報文確認(rèn) |
UNSUBSCRIBE | 10 | 客戶端到服務(wù)端 | 客戶端取消訂閱請求 |
UNSUBACK | 11 | 服務(wù)端到客戶端 | 取消訂閱報文確認(rèn) |
PINGREQ | 12 | 客戶端到服務(wù)端 | 心跳請求 |
PINGRESP | 13 | 服務(wù)端到客戶端 | 心跳響應(yīng) |
DISCONNECT | 14 | 客戶端到服務(wù)端 | 客戶端斷開連接 |
Reserved | 15 | 禁止 | 保留 |
1.2.4 消息質(zhì)量(QoS)
消息質(zhì)量(QoS
):
-
QoS 0
:最多分發(fā)一次搂誉。消息的傳遞完全依賴于底層的TCP/IP
協(xié)議徐紧,協(xié)議里沒有定義應(yīng)答和重試,消息要么只會到達(dá)服務(wù)端一次勒葱,要么根本沒有到達(dá)浪汪。
只會發(fā)送一次,不管成不成功 -
QoS 1
:至少分發(fā)一次凛虽。服務(wù)器的消息接收由PUBACK
消息進(jìn)行確認(rèn)死遭,如果通信鏈路或發(fā)送設(shè)備異常,或者指定時間內(nèi)沒有收到確認(rèn)消息凯旋,發(fā)送端會重發(fā)這條在消息頭中設(shè)置了DUP位的消息呀潭。
未成功會繼續(xù)發(fā)送,直到成功至非,可能會收到多次 -
QoS 2
:只分發(fā)一次钠署。這是最高級別的消息傳遞,消息丟失和重復(fù)都是不可接受的荒椭,使用這個服務(wù)質(zhì)量等級會有額外的開銷谐鼎。
未成功會繼續(xù)發(fā)送,但會保證只收到一次
通過下面的例子可以更深刻的理解上面三個傳輸質(zhì)量等級趣惠。
比如目前流行的共享單車智能鎖狸棍,智能鎖可以定時使用QoS level 0
質(zhì)量消息請求服務(wù)器,發(fā)送單車的當(dāng)前位置味悄,如果服務(wù)器沒收到也沒關(guān)系草戈,反正過一段時間又會再發(fā)送一次。
之后用戶可以通過App查詢周圍單車位置侍瑟,找到單車后需要進(jìn)行解鎖唐片,這時候可以使用QoS level 1
質(zhì)量消息,手機App不斷的發(fā)送解鎖消息給單車鎖,確保有一次消息能達(dá)到以解鎖單車费韭。
最后用戶用完單車后茧球,需要提交付款表單,可以使用QoS level 2
質(zhì)量消息揽思,這樣確保只傳遞一次數(shù)據(jù)袜腥,否則用戶就會多付錢了见擦。
1.3 搭建MQTT服務(wù)
1.3.1 直接使用MQTT搭建
在Linux上搭建MQTT服務(wù)
打開EMQ官網(wǎng):https://www.emqx.io/cn/products/broker
點擊開始試用
選擇服務(wù)器對應(yīng)版本
復(fù)制下載命令到ssh工具中執(zhí)行
下載完成
下載完成后執(zhí)行安裝命令
安裝成功后執(zhí)行命令:
sudo emqx start
出現(xiàn)以下信息表示啟動成功
瀏覽器訪問ip:18083
進(jìn)入管理界面钉汗,默認(rèn)賬號為admin,密碼為public
1.3.2 使用RabbitMQ搭建MQTT
點擊了解 RabbitMQ 搭建
搭建完 RabbitMQ
后鲤屡,啟用 RabbitMQ
的 MQTT
插件了损痰,默認(rèn)是不啟用的,使用如下命令開啟即可:rabbitmq-plugins enable rabbitmq_mqtt
開啟成功后酒来,查看管理控制臺卢未,我們可以發(fā)現(xiàn)MQTT服務(wù)運行在1883端口上了。
如果使用
RabbitMQ與Web
端交互堰汉,那么底層使用的是 WebSocket
辽社,所以需要開啟 RabbitMQ
的 MQTT WEB
支持,使用如下命令開啟即可:rabbitmq-plugins enable rabbitmq_web_mqtt
開啟成功后翘鸭,查看管理控制臺滴铅,我們可以發(fā)現(xiàn)MQTT的WEB服務(wù)運行在15675端口上了;
1.4 使用RabbitMQ的SpringBoot操作MQTT
1.4.1 pom.xml
<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.4.2 配置文件
在application.yml中添加MQTT相關(guān)配置就乓,主要是訪問地址汉匙、用戶名密碼、默認(rèn)主題信息生蚁;
rabbitmq:
mqtt:
url: tcp://localhost:1883
username: guest
password: guest
defaultTopic: testTopic
編寫一個Java配置類從配置文件中讀取配置便于使用噩翠;
1.4.3 配置類
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
/**
* RabbitMQ連接用戶名
*/
private String username;
/**
* RabbitMQ連接密碼
*/
private String password;
/**
* RabbitMQ的MQTT默認(rèn)topic
*/
private String defaultTopic;
/**
* RabbitMQ的MQTT連接地址
*/
private String url;
}
1.4.4 訂閱類
添加MQTT消息訂閱者相關(guān)配置,使用@ServiceActivator
注解聲明一個服務(wù)激活器邦投,通過MessageHandler來處理訂閱消息伤锚;
/**
* MQTT消息訂閱者相關(guān)配置
*/
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//設(shè)置消息質(zhì)量:0->至多一次;1->至少一次志衣;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//處理訂閱消息
log.info("handleMessage : {}",message.getPayload());
}
};
}
}
1.4.5 發(fā)布
添加MQTT消息發(fā)布者相關(guān)配置屯援;
/**
* MQTT消息發(fā)布者相關(guān)配置
*/
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
1.4.6 網(wǎng)關(guān)和測試
添加MQTT網(wǎng)關(guān),用于向主題中發(fā)送消息蠢涝;
/**
* MQTT網(wǎng)關(guān)玄呛,通過接口將數(shù)據(jù)傳遞到集成流
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 發(fā)送消息到默認(rèn)topic
*/
void sendToMqtt(String payload);
/**
* 發(fā)送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 發(fā)送消息到指定topic并設(shè)置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
添加MQTT測試接口,使用MQTT網(wǎng)關(guān)向特定主題中發(fā)送消息和二;
/**
* MQTT測試接口
*/
@Api(tags = "MqttController", description = "MQTT測試接口")
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
@ApiOperation("向默認(rèn)主題發(fā)送消息")
public CommonResult sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
@ApiOperation("向指定主題發(fā)送消息")
public CommonResult sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
}
}
1.5 不使用RabbitMQ的SpringBoot搭建提供端
1.5.1 pom.xml
<!--mqtt相關(guān)依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.5.2 修改配置文件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服務(wù)端地址徘铝,端口默認(rèn)為1883,如果有多個,用逗號隔開惕它,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復(fù))
client:
id: provider-id
#MQTT默認(rèn)的消息推送主題怕午,實際可在調(diào)用接口時指定
default:
topic: topic
server:
port: 8081
1.5.3 消息發(fā)布者客戶端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 客戶端連接服務(wù)端
*/
public void connect(){
try {
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄(訂閱主題淹魄,qos)郁惜,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
//設(shè)置為true表示每次連接到服務(wù)端都是以新的身份
options.setCleanSession(true);
//設(shè)置連接用戶名
options.setUserName(username);
//設(shè)置連接密碼
options.setPassword(password.toCharArray());
//設(shè)置超時時間,單位為秒
options.setConnectionTimeout(100);
//設(shè)置心跳時間 單位為秒甲锡,表示服務(wù)器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設(shè)置遺囑消息的話題兆蕉,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false);
//設(shè)置回調(diào)
client.setCallback(new MqttProviderCallBack());
client.connect(options);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 發(fā)布消息
* @param qos 服務(wù)質(zhì)量等級
* 0 只會發(fā)送一次缤沦,不管成不成功
* 1 未成功會繼續(xù)發(fā)送虎韵,直到成功,可能會收到多次
* 2 未成功會繼續(xù)發(fā)送缸废,但會保證只收到一次
* @param retained 保留標(biāo)志
* 如果設(shè)置為true包蓝,服務(wù)端必須存儲這個應(yīng)用消息和它的服務(wù)質(zhì)量等級,當(dāng)有訂閱者訂閱這個主題時企量,會把消息推送給這個訂閱者
* 但服務(wù)端對同一個主題只會保留一條retained消息(最后收到的那條)
*/
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主題目的地测萎,用于發(fā)布/訂閱消息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一種機制來跟蹤消息的傳遞進(jìn)度。
//用于在以非阻塞方式(在后臺運行)執(zhí)行發(fā)布時跟蹤消息的傳遞進(jìn)度
MqttDeliveryToken token;
try {
//將指定消息發(fā)布到主題届巩,但不等待消息傳遞完成硅瞧。返回的token可用于跟蹤消息的傳遞狀態(tài)。
//一旦此方法干凈地返回姆泻,消息就已被客戶端接受發(fā)布零酪。當(dāng)連接可用時,將在后臺完成消息傳遞拇勃。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
1.5.4 消息發(fā)布客戶端回調(diào)
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
public class MqttProviderCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 與服務(wù)器斷開連接的回調(diào)
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println(clientId + "與服務(wù)器斷開連接");
}
/**
* 消息到達(dá)的回調(diào)
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
/**
* 消息發(fā)布成功的回調(diào)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
IMqttAsyncClient client = iMqttDeliveryToken.getClient();
System.out.println(client.getClientId() + "發(fā)布消息成功四苇!");
}
}
1.5.5 創(chuàng)建控制器測試發(fā)布消息
import com.xdemo.mqttprovider.mqtt.MqttProviderConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SendController {
@Autowired
private MqttProviderConfig providerClient;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
providerClient.publish(qos,retained,topic,message);
return "發(fā)送成功";
}catch (Exception e){
e.printStackTrace();
return "發(fā)送失敗";
}
}
}
1.6 不使用RabbitMQ的SpringBoot搭建消費端
在父工程下創(chuàng)建一個Springboot項目作為消息消費者,導(dǎo)入以下依賴
1.6.1 pom.xml
<!--mqtt相關(guān)依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1.6.2 配置文件
spring:
application:
name: consumer
#MQTT配置信息
mqtt:
#MQTT服務(wù)端地址方咆,端口默認(rèn)為1883月腋,如果有多個,用逗號隔開瓣赂,如tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
url: tcp://ip:1883
#用戶名
username: admin
#密碼
password: public
#客戶端id(不能重復(fù))
client:
id: consumer-id
#MQTT默認(rèn)的消息推送主題榆骚,實際可在調(diào)用接口時指定
default:
topic: topic
server:
port: 8082
1.6.3 消費者客戶端配置
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務(wù)器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務(wù)端
*/
public void connect(){
try {
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄煌集,客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
//設(shè)置為true表示每次連接到服務(wù)端都是以新的身份
options.setCleanSession(true);
//設(shè)置連接用戶名
options.setUserName(username);
//設(shè)置連接密碼
options.setPassword(password.toCharArray());
//設(shè)置超時時間妓肢,單位為秒
options.setConnectionTimeout(100);
//設(shè)置心跳時間 單位為秒,表示服務(wù)器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設(shè)置遺囑消息的話題苫纤,若客戶端和服務(wù)器之間的連接意外斷開碉钠,服務(wù)器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務(wù)器斷開連接").getBytes(),0,false);
//設(shè)置回調(diào)
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//訂閱主題
//消息等級纲缓,和主題數(shù)組一一對應(yīng),服務(wù)端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {1,1};
//主題
String[] topics = {"topic1","topic2"};
//訂閱主題
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 斷開連接
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱主題
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
1.6.4 消息消費者客戶端回調(diào)
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallBack implements MqttCallback {
/**
* 客戶端斷開連接的回調(diào)
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("與服務(wù)器斷開連接喊废,可重連");
}
/**
* 消息到達(dá)的回調(diào)
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息發(fā)布成功的回調(diào)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
1.6.5 控制器提供手動建立連接和斷開連接方法
import com.xdemo.mqttconsumer.mqtt.MqttConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class TestController {
@Autowired
private MqttConsumerConfig client;
@Value("${spring.mqtt.client.id}")
private String clientId;
@RequestMapping("connect")
@ResponseBody
public String connect(){
client.connect();
return clientId + "連接到服務(wù)器";
}
@RequestMapping("disConnect")
@ResponseBody
public String disConnect(){
client.disConnect();
return clientId + "與服務(wù)器斷開連接";
}
}
1.7 測試提供端和消費端
1.7.1 使用三分工具
可以使用 MQTT
客戶端來測試 MQTT
的即時通訊功能祝高,這里使用的是 MQTTBox
這個客戶端工具。
首先下載并安裝好 MQTTBox
污筷,下載地址:http://workswithweb.com/mqttbox.html
點擊Create MQTT Client按鈕來創(chuàng)建一個MQTT客戶端工闺;
接下來對MQTT客戶端進(jìn)行配置,主要是配置好協(xié)議端口瓣蛀、連接用戶名密碼和QoS即可陆蟆;
再配置一個訂閱者,訂閱者訂閱testTopicA這個主題揪惦,我們會向這個主題發(fā)送消息遍搞;
發(fā)布者向主題中發(fā)布消息罗侯,訂閱者可以實時接收到
1.7.2 使用代碼測試
分別啟動兩個項目器腋,可以在管理界面看到創(chuàng)建的兩個客戶端
調(diào)用發(fā)布消息接口發(fā)布消息
消費者控制臺打印
客戶端斷線消息恢復(fù)
把消費者與服務(wù)端斷開連接
再調(diào)用發(fā)布消息接口發(fā)送兩條消息到topic1,然后再把消費者連接到服務(wù)端
控制臺沒有東西打印
修改消費者客戶端配置钩杰,把setCleanSession改為false
重啟項目纫塌,把消費者客戶端斷開連接,調(diào)用發(fā)布消息接口發(fā)布兩條消息讲弄,再把消費者和服務(wù)端連接上