mqtt是什么椿浓?
MQTT 是一種基于發(fā)布/訂閱模式的 輕量級物聯(lián)網(wǎng)消息傳輸協(xié)議 ,可在嚴(yán)重受限的硬件設(shè)備和低帶寬德玫、高延遲的網(wǎng)絡(luò)上實(shí)現(xiàn)穩(wěn)定傳輸匪蟀。
后端集成mqtt客戶端-生產(chǎn)者
- pom
<!-- mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!--mqtt相關(guān)依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
-
發(fā)布者
public class PublishSample { public static void main(String[] args) { //為了讓前端js 接受到消息只能先websocket 協(xié)議 String broker = "ws://broker.emqx.io:8083/mqtt"; // String broker = "tcp://broker.emqx.io:1883"; String topic = "topic-123"; String clientid = "publish_clien"; String content = "各位好椎麦!"; int qos = 0; try { MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); // 連接參數(shù) MqttConnectOptions options = new MqttConnectOptions(); options.setConnectionTimeout(60); options.setKeepAliveInterval(60); // 連接 client.connect(options); // 創(chuàng)建消息并設(shè)置 QoS // MqttMessage message = new MqttMessage(content.getBytes()); // message.setQos(qos); // 發(fā)布消息 // client.publish(topic, message); // System.out.println("Message published"); // System.out.println("topic: " + topic); // System.out.println("message content: " + content); //模擬產(chǎn)生日志 for (int i = 0; i < 50; i++) { try { TimeUnit.MILLISECONDS.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } String msg = "發(fā)送【" + i + "】條日志信息-" + UUID.randomUUID().toString(); MqttMessage message = new MqttMessage(msg.getBytes()); message.setQos(qos); client.publish(topic, message); System.out.println("Message published"); System.out.println("topic: " + topic); System.out.println("message content: " + content); } // 關(guān)閉連接 client.disconnect(); // 關(guān)閉客戶端 client.close(); } catch (MqttException e) { throw new RuntimeException(e); } } }
前端集成mqtt客戶端-消費(fèi)者
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>mqtt</title>
<style>
#txt {
border: 1px solid;
width: 800px;
height: 500px;
overflow-y: scroll;
}
</style>
</head>
<body>
<h5>日志實(shí)時展示</h5>
<div id="txt">
</div>
</body>
<script src="https://cdn.bootcdn.net/ajax/libs/mqtt/4.1.0/mqtt.min.js"></script>
<script>
// Broker: broker.emqx.io
// TCP Port: 1883
// Websocket Port: 8083
const connectUrl = `ws://broker.emqx.io:8083/mqtt`;
// const connectUrl = `ws://broker.emqx.io/mqtt`;
client = mqtt.connect(connectUrl, {
clean: true,
connectTimeout: 4000,
reconnectPeriod: 1000,
clientId: 'emqx_test',
username: 'emqx_test',
password: 'emqx_test'
})
// 需要訂閱的主題
const topic = 'topic-123';
//成功連接后觸發(fā)的回調(diào)
client.on('connect', () => {
console.log('已經(jīng)連接成功');
// 這里可以訂閱多個主題
client.subscribe([topic], () => {
console.log(`訂閱了主題 ${topic}`)
})
});
// 當(dāng)客戶端收到一個發(fā)布過來的消息時觸發(fā)回調(diào)
client.on('message', function (topic, message, packet) {
// 這里有可能拿到的數(shù)據(jù)格式是Uint8Array格式,所以可以直接用toString轉(zhuǎn)成字符串
// let data = JSON.parse(message.toString);
// var s = JSON.stringify(message.toString());
console.log("返回的數(shù)據(jù):", message.toString())
// console.log("返回的數(shù)據(jù)2:", s)
//將字節(jié)數(shù)組轉(zhuǎn)換 成 普通 字符串 utf-8編碼
var blob = new Blob([message]);
var fileReader = new FileReader();
fileReader.onload = function (event) {
var result = event.target.result;
console.log("解析收到消息:" + result)
//渲染到頁面上
var txtDiv = document.querySelector("#txt");
var p = document.createElement("p");
p.textContent = `${result}`;
txtDiv.appendChild(p);
}
fileReader.readAsText(blob)
});
// 連接斷開后觸發(fā)的回調(diào)
client.on("close", function () {
console.log("已斷開連接")
});
</script>
</html>