一、MQTT簡介
??MQTT(Message Queuing Telemetry Transport讥裤,消息隊列遙測傳輸)是IBM開發(fā)的一個即時通訊協(xié)議己英,有可能成為物聯(lián)網(wǎng)的重要組成部分损肛。該協(xié)議支持所有平臺治拿,幾乎可以把所有聯(lián)網(wǎng)物品和外部連接起來劫谅,被用來當(dāng)做傳感器和制動器(比如通過Twitter讓房屋聯(lián)網(wǎng))的通信協(xié)議。
主要特征:
MQTT協(xié)議是為大量計算能力有限未檩,且工作在低帶寬冤狡、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計的協(xié)議挎峦,它具有以下主要的幾項特性:
1.使用發(fā)布/訂閱消息模式坦胶,提供一對多的消息發(fā)布顿苇,解除應(yīng)用程序耦合纪岁;
2.對負(fù)載內(nèi)容屏蔽的消息傳輸;
3.使用TCP/IP 提供網(wǎng)絡(luò)連接西壮;
4.有三種消息發(fā)布服務(wù)質(zhì)量:
“至多一次”贡定,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)蚓耽。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù)答姥,丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送敲长。
“至少一次”泽铛,確保消息到達(dá)盔腔,但消息重復(fù)可能會發(fā)生。
“只有一次”,確保消息到達(dá)一次盐杂。這一級別可用于如下情況,在計費系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果漩勤。
5.小型傳輸拆祈,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量坪蚁;
二贱田、Maven依賴
<!--web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--mqtt-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
三耗拓、apollo服務(wù)器
- 配置文件(application.properties)
#MQTT配置信息
spring.mqtt.username=admin
spring.mqtt.password=password
## MQTT-服務(wù)器連接地址竿刁,如果有多個副编,用逗號隔開,
# 如:tcp://127.0.0.1:61613带污,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:61613
## MQTT-連接服務(wù)器默認(rèn)客戶端ID
spring.mqtt.client.id=clientId
## MQTT-連接服務(wù)器默認(rèn)服務(wù)端ID
spring.mqtt.server.id=serverId
## MQTT-默認(rèn)的消息推送主題充易,實際可在調(diào)用接口時指定
spring.mqtt.default.topic=topic
- 消息發(fā)送配置類
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 連接設(shè)置
* @return
*/
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
/**
* 客戶端工廠
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
/**
* 發(fā)布通知
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
/**
* 發(fā)布通道為直連
* @return
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
- 消息發(fā)送接口
/**
* 消息發(fā)送接口
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {
void sendToMqtt(String data);
void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
消息發(fā)送接口,需要發(fā)送消息的時候直接調(diào)用就行了,提供了幾個重載方法payload或者data是發(fā)送消息的內(nèi)容
topic是消息發(fā)送的主題,這里可以自己靈活定義,也可以使用默認(rèn)的主題,就是配置文件的主題,qos是mqtt 對消息處理的幾種機(jī)制分為0,1,2 其中0表示的是訂閱者沒收到消息不會再次發(fā)送,消息會丟失,1表示的是會嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息,2相比多了一次去重的動作,確保訂閱者收到的消息有一次
當(dāng)然,這三種模式下的性能肯定也不一樣,qos=0是最好的,2是最差的 。
- 測試
@RestController
public class MqttController {
@Autowired
private MsgWriter msgWriter;
@RequestMapping("/send")
public String sendMqtt(String sendData){
msgWriter.sendToMqtt(sendData,"hello");
return "OK";
}
}
- 消息消費(本測試用的是同一項目盹靴,建議創(chuàng)建單獨消費項目進(jìn)行測試)
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttConsumersConfig {
// 訂閱的主題可以指定,我訂閱的是剛才發(fā)的too主題,還有訂閱方的id 別和發(fā)送方的id 一樣
@Value("${spring.mqtt.server.id}")
private String serverId;
/**
* 使用MqttSenderConfig中生成的工廠對象炸茧。
* 如果單獨服務(wù)器瑞妇,請使用以下@Bean代碼。
*/
@Autowired
private MqttPahoClientFactory mqttClientFactory;
// @Bean
// public MqttPahoClientFactory mqttClientFactory() {
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// factory.setServerURIs("tcp://localhost:61613");
// factory.setUserName("admin");
// factory.setPassword("password");
// return factory;
// }
/**
* consumer 訂閱者監(jiān)聽消息
* @return
*/
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", 收到消息梭冠,來自MQTT")
.handle(logger())
.get();
}
/**
* 處理日志
* @return
*/
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
/**
* 訂閱主題
* @return
*/
@Bean
public MessageProducerSupport mqttInbound() {
/**
* 訂閱的主題可以指定,我訂閱的是剛才發(fā)的too主題,還有訂閱方的id 別和發(fā)送方的id 一樣
*/
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverId,
mqttClientFactory, "hello");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
}
注意:
主題名稱生產(chǎn)者與消費者一定要對應(yīng)辕狰,否則取不到消息 。
四控漠、EMQ服務(wù)器
- 配置
# MQTT-密碼
# MQTT-服務(wù)器連接地址蔓倍,如果有多個,用逗號隔開盐捷,如:tcp://127.0.0.1:1883偶翅,tcp://192.168.2.133:61613
spring.mqtt.url=tcp://192.168.77.132:1883
# MQTT-連接服務(wù)器默認(rèn)客戶端ID
spring.mqtt.client.id=mqttId
# MQTT-連接服務(wù)器默認(rèn)服務(wù)端ID
spring.mqtt.server.id=serverId
# MQTT-默認(rèn)的消息推送主題,實際可在調(diào)用接口時指定
spring.mqtt.default.topic=topic
spring.mqtt.username=admin
spring.mqtt.password=public
- 代碼
與apollo服務(wù)器相同碉渡。 -
測試結(jié)果
測試方法同apollo服務(wù)器聚谁。
測試結(jié)果