1. 安裝mqtt
npm i mqtt
2.封裝
- 新建NotifyMessage.ts 消息返回的類型
/**
 * Envelope of a notification delivered over MQTT.
 *
 * `type`, `source` and `data` are supplied at construction time;
 * `timestamp` is optional and is never set by the constructor.
 */
export default class NotifyMessage {
  /** Kind of notification. */
  type: string
  /** Origin of the notification. */
  source: string
  /** Payload carried by the notification. */
  data: any
  /** Optional timestamp; left unset by the constructor. */
  timestamp?: string

  /**
   * @param type   value stored in `type`
   * @param source value stored in `source`
   * @param data   value stored in `data`
   */
  constructor(type: string, source: string, data: any) {
    // Copy the three constructor arguments onto the instance in one step.
    Object.assign(this, { type, source, data })
  }
}
- 新建mqtt.ts 創建mqtt類 (連接、訂閱、取消訂閱、收到消息)
import * as mqtt from "mqtt"
import {ISubscriptionGrant, MqttClient} from "mqtt/types/lib/client";
import {IConnackPacket, IDisconnectPacket, IPublishPacket} from "mqtt-packet";
import NotifyMessage from "./NotifyMessage";
/** Signature of a topic listener: receives the `data` and `type` fields of a parsed message. */
type TopicCallback = (message: any, type?: string) => void

/**
 * MQTT client wrapper that keeps a registry of per-topic callbacks and
 * dispatches every incoming JSON message to the callbacks of its topic.
 *
 * The connection is opened as soon as an instance is constructed.
 * Dispatch looks topics up by their exact string, so a callback registered
 * under a wildcard filter is not matched against concrete topic names.
 */
export class Mqtt {
  host: string
  port: number
  username: string
  password: string
  path: string
  /** Underlying client; assigned by `connect()`. */
  client?: MqttClient
  /** Registered callbacks, keyed by the exact topic string they were subscribed with. */
  topicMap = new Map<string, Set<TopicCallback>>()

  /**
   * Stores the connection settings and immediately calls `connect()`.
   *
   * @param host     broker host name
   * @param port     broker port
   * @param username user name sent to the broker
   * @param password password sent to the broker
   * @param path     path component passed to the client options
   */
  constructor(host: string, port: number, username: string, password: string, path: string) {
    this.host = host
    this.port = port
    this.username = username
    this.password = password
    this.path = path
    this.connect()
  }

  /**
   * Creates the client and wires up the lifecycle listeners.
   * The protocol is "ws" when the page is served over http: and "wss" otherwise.
   */
  connect() {
    const client = mqtt.connect({
      host: this.host,
      port: this.port,
      hostname: this.host,
      username: this.username,
      password: this.password,
      path: this.path,
      protocol: location.protocol === "http:" ? "ws" : "wss",
      keepalive: 15,
    })
    this.client = client
    // Registered with `once`: onConnect runs for the first 'connect' event only.
    client.once('connect', (packet) => this.onConnect(packet))
    client.on('reconnect', () => console.debug(process.env.VUE_APP_name + " mq reconnect."))
    client.on('disconnect', (packet: IDisconnectPacket) => console.debug(process.env.VUE_APP_name + " mq disconnect.", packet))
    client.on('close', () => console.debug(process.env.VUE_APP_name + " mq close."))
  }

  /**
   * Installs the message listener and subscribes to every topic that was
   * registered through `subscribe()` before the connection was established.
   *
   * @param p CONNACK packet, only used for logging
   */
  onConnect(p: IConnackPacket) {
    console.debug(process.env.VUE_APP_name + " connect to mq success. ", p)
    // Drop any previous 'message' listener so messages are never dispatched twice.
    this.client!.removeAllListeners('message')
    this.client!.on('message', (topic, payload, packet) => this.onMessage(topic, payload.toString(), packet))
    const topics = Array.from(this.topicMap.keys())
    if (topics.length > 0) {
      this.client!.subscribe(topics, (err: Error | null, granted: ISubscriptionGrant[]) => {
        if (err) {
          console.error(process.env.VUE_APP_name + " subscribe to mq error: ", err, granted)
        } else {
          console.debug(process.env.VUE_APP_name + " subscribe to mq granted: ", granted)
        }
      })
    }
  }

  /**
   * Parses a message as JSON and passes its `data` and `type` fields to every
   * callback registered for `topic`. A payload that is not valid JSON is
   * reported with `console.error` and otherwise ignored.
   *
   * @param topic   topic the message arrived on
   * @param payload message body as text
   * @param packet  raw publish packet (unused)
   */
  onMessage(topic: string, payload: string, packet: IPublishPacket) {
    let notify: NotifyMessage
    try {
      notify = JSON.parse(payload) as NotifyMessage
    } catch (e) {
      // Previously swallowed silently; surface malformed payloads so they can be diagnosed.
      console.error(process.env.VUE_APP_name + " parse mq message error, topic: " + topic, e)
      return
    }
    console.debug(notify)
    const topicCBList = this.topicMap.get(topic)
    if (!topicCBList) {
      console.debug(process.env.VUE_APP_name + " can't find subscribe callback.")
      return
    }
    topicCBList.forEach((f) => {
      // Isolate each callback so one failing listener does not block the others.
      try {
        f(notify.data, notify.type)
      } catch (e) {
        console.debug(process.env.VUE_APP_name + " process topic cb fun error.", e)
      }
    })
  }

  /**
   * Registers `callback` for `topic`. The broker subscription is requested
   * only the first time a topic is seen; later callbacks share it.
   *
   * @param topic    topic to listen on
   * @param callback listener invoked with the message `data` and `type`
   */
  subscribe(topic: string, callback: TopicCallback) {
    if (!this.topicMap.has(topic)) {
      this.client?.subscribe(topic, (err: Error | null, granted: ISubscriptionGrant[]) => {
        if (err) {
          console.error(process.env.VUE_APP_name + " subscribe to mq error: ", err, granted)
        } else {
          console.debug(process.env.VUE_APP_name + " subscribe to mq granted: ", granted)
        }
      })
      this.topicMap.set(topic, new Set())
    }
    this.topicMap.get(topic)?.add(callback)
  }

  /**
   * Removes `callback` from `topic`, or every callback of the topic when
   * `callback` is omitted. Once no callback is left, the topic is dropped
   * from the registry and unsubscribed on the client.
   *
   * @param topic    topic to stop listening on
   * @param callback specific listener to remove; omit to remove all
   */
  unsubscribe(topic: string, callback?: TopicCallback) {
    const callbacks = this.topicMap.get(topic)
    if (!callbacks) {
      return
    }
    if (callback) {
      callbacks.delete(callback)
    } else {
      callbacks.clear()
    }
    if (callbacks.size <= 0) {
      this.topicMap.delete(topic)
      this.client?.unsubscribe(topic)
    }
  }
}
- 創建config.json 全局配置數據 --- 這裡默認配置mqtt
{
"logoConfig": {
"systemTitle": "管理系統"
},
"mqttConfig": {
"host": "",
"port": 0,
"username": "admin",
"password": "admin",
"path": "/activemq",
"use": true
}
}
- 創建global/customConfig.ts (mqtt配置的類型)
/**
 * Application-wide custom configuration.
 *
 * Every property of the source object is copied onto the instance; a
 * `mqttConfig` entry, when present, is additionally re-created as a
 * `MqttConfig` instance.
 */
export class CustomConfig {
  mqttConfig?: MqttConfig
  logoConfig?: LogoConfig

  /**
   * @param obj source object whose properties are copied onto this instance
   */
  constructor(obj: CustomConfig) {
    Object.assign(this, obj)

    const rawMqtt = obj.mqttConfig
    if (!rawMqtt) {
      return
    }
    // Replace the copied value with a real MqttConfig instance.
    this.mqttConfig = new MqttConfig(rawMqtt)
  }
}
/**
 * MQTT connection settings.
 *
 * An empty `host` or a `port` of 0 means "use the address the page was
 * loaded from"; see `getHost()` and `getPort()`.
 */
export class MqttConfig {
  host: string = ""
  port: number = 61614
  username: string = "admin"
  password: string = "admin"
  path: string = "/activemq"
  /** Whether MQTT should be used at all. */
  use: boolean = false

  /**
   * @param obj values overriding the defaults above; properties that are
   *            absent keep their default
   */
  constructor(obj: Partial<MqttConfig> = {}) {
    Object.assign(this, obj)
  }

  /** Returns the configured host, or the page's host name when `host` is empty. */
  getHost(): string {
    return this.host === "" ? document.location.hostname : this.host
  }

  /**
   * Returns the configured port, or the page's port when `port` is 0.
   *
   * `location.port` is an empty string when the page is served on the
   * protocol's default port, which would parse to NaN; in that case the
   * default port of the page protocol is returned (443 for https:, else 80).
   */
  getPort(): number {
    if (this.port !== 0) {
      return this.port
    }
    const pagePort = Number.parseInt(document.location.port, 10)
    if (!Number.isNaN(pagePort)) {
      return pagePort
    }
    return document.location.protocol === "https:" ? 443 : 80
  }
}
/** Branding settings. */
export class LogoConfig {
  /** Default system title (stray "(tǒng)" annotation removed from the literal). */
  systemTitle: string = "融合通信管理系統"
}
- 創建global/index.ts (初始化mqtt方法並連接)
import axios from "axios";
import {Mqtt} from "@/mqtt/mqtt";
import {CustomConfig} from "@/global/customConfig";
/** Loaded configuration; assigned by `InitCustomConfig()` once the fetch succeeds. */
export let customConfig: CustomConfig
/** Shared MQTT client; assigned elsewhere in this module. */
export let mqttInstants: Mqtt

/**
 * Loads the custom configuration from `process.env.VUE_APP_customConfigPath`
 * and caches it in `customConfig`.
 *
 * When a configuration has already been loaded it is returned as-is and no
 * request is made.
 *
 * @returns the cached or freshly loaded configuration
 * @throws Error("get custom config failed!") when the response status is not
 *         200; request errors are propagated unchanged (both surface as a
 *         rejection of the returned promise)
 */
export async function InitCustomConfig(): Promise<CustomConfig> {
  // Return the cached instance without fetching again or replacing it.
  if (customConfig != null) {
    return customConfig
  }

  const res = await axios.get(process.env.VUE_APP_customConfigPath as string)
  if (res.status !== 200) {
    throw new Error("get custom config failed!")
  }

  customConfig = new CustomConfig(res.data)
  return customConfig
}
/**
 * Creates the shared MQTT client from `customConfig.mqttConfig`.
 *
 * Logs an error when no MQTT section is configured, and does nothing when
 * the section exists but its `use` flag is falsy.
 */
export function InitMqtt() {
  const settings = customConfig.mqttConfig
  if (!settings) {
    console.error("customConfig.mqttConfig is null.")
    return
  }
  if (!settings.use) {
    return
  }

  const brokerHost = settings.getHost()
  const brokerPort = settings.getPort()
  mqttInstants = new Mqtt(brokerHost, brokerPort, settings.username, settings.password, settings.path)
}
...
import {InitCustomConfig, InitMqtt} from "@/global";
// 將自動注冊所有組件為全局組件
import dataV from '@jiaminghi/data-view'
// Application bootstrap: load the custom configuration first, then create,
// configure and mount the app inside the success callback.
// NOTE(review): no rejection handler is attached to this promise in the visible
// snippet, so a failed configuration load leaves the app unmounted — confirm.
InitCustomConfig().then(config => {
const app = createApp(App)
// Use the configured system title as the document title, or an empty string when none is configured.
document.title = config.logoConfig?.systemTitle ?? ""
// Make the loaded configuration reachable from components as a global property.
app.config.globalProperties.$customConfig = config
app.use(router)
app.use(store)
app.use(dataV)
app.use(ElementPlus, {
// Pick the Element Plus locale from the VUE_APP_LANGUAGE environment variable.
locale: process.env.VUE_APP_LANGUAGE == 'en' ? en : zhCn,
})
app.use(VueI18n)
// Register every Element Plus icon as a global component
for (const [key, component] of Object.entries(ElementPlusIconsVue)) {
app.component(key, component)
}
// Initialise MQTT after the configuration is available and before the app is mounted.
InitMqtt()
...
app.mount('#app')
})
使用
<template>
</template>
<script lang='ts' setup>
import { ref, onMounted, onUnmounted } from 'vue'
import { mqttInstants } from '@/global';
// Listener for messages on the subscribed topic: logs the message payload.
const onMessage = (msg:any,type?:string) => {
  console.log('消息:',msg)
}
// Topic this component listens on.
const mqtopic = '/omc-server/present/alarm'
onMounted(()=>{
  // Subscribe when the component is mounted. `mqttInstants` stays unassigned
  // when MQTT is disabled in the configuration, so guard the access.
  mqttInstants?.subscribe(mqtopic,onMessage)
})
onUnmounted(()=>{
  // Unsubscribe this component's listener when it is unmounted.
  mqttInstants?.unsubscribe(mqtopic,onMessage)
})
</script>