import { Injectable } from '@angular/core';
import { Paho } from 'ng2-mqtt/mqttws31';
import { Observable, ReplaySubject } from 'rxjs';
export class TopicDefinition {
name: string;
option?: any;
}
@Injectable({
providedIn: 'root'
})
export class MqttService {
private client: Paho.MQTT.Client;
private topics: string[];
private payload: ReplaySubject<Paho.MQTT.Message>;
private connection: ReplaySubject<boolean>;
public status: boolean;
private config: any;
constructor() {
this.topics = [];
this.payload = new ReplaySubject<Paho.MQTT.Message>(1);
this.connection = new ReplaySubject<boolean>(1);
}
//constructor(host: string, clientId: string);
//constructor(host: string, port: number, clientId: string);
//constructor(host: string, port: number, path: string, clientId: string);
public initMqttClient(host: string, port: number, clientID?: string): Promise<boolean> {
clientID = clientID ? clientID : this.generateUUID();
//創(chuàng)建客戶端實(shí)例
this.client = new Paho.MQTT.Client(host, port, '', clientID);
// 建立連線,連接成功返回true,否則返回false
return new Promise((resolve, reject) => {
// 建立連線
this.client.connect({
cleanSession: false,
onSuccess: () => {
console.log(
'%c ?? MQTT Connection Success ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
this.status = true;
this.connection.next(true);
resolve(true);
}
});
// 無法連接或斷線時觸發(fā)
this.client.onConnectionLost = async (responseObject: object) => {
console.log(
'%c ? MQTT Connection Lost ',
'background: #000; color: ##E43935; line-height: 26px;'
);
this.status = false;
this.connection.next(false);
await this.reonnect();
reject(false);
};
// 當(dāng)接收到訂閱訊息時觸發(fā)
this.client.onMessageArrived = this.onMessageArrived.bind(this);
});
}
/**
* MQTT的Client Getter
*
* @method public
* @return 回傳MQTT的Client
*/
public get mqttClient(): Paho.MQTT.Client {
return this.client;
}
/**
* 監(jiān)聽MQTT連線是否斷線
*
* @method public
* @return 回傳一個Observable午阵,讓呼叫者可以監(jiān)聽MQTT連線是否斷線
*/
public listenConnection(): Observable<boolean> {
return this.connection.asObservable();
}
/**
* MQTT重新連縣
*
* @method public
*/
public reonnect(): Promise<boolean> {
return new Promise<boolean>((resolve, reject) => {
this.client.connect({
cleanSession: false,
onSuccess: () => {
console.log(
'%c ?? MQTT Connection Success ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
this.status = true;
this.connection.next(true);
resolve(true);
}
});
});
}
/**
* 將MQTT斷線
*
* @method public
*/
public disconnect(): void {
this.client.disconnect();
}
/**
* ---------------------------------------------------------------------------
* @NOTE MQTT訂閱Topic
* ---------------------------------------------------------------------------
*/
/**
* 訂閱MQTT的Topic
*
* @method public
* @param topic 訂閱的Topics
*/
public subscribeTopic(topics: TopicDefinition[]): void {
console.log('Subscribe topic:', topics);
console.log(
'%c ?? Subscribe MQTT Topics ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
if (this.topics.length === 0) {
this.topics = [];
}
topics.forEach(topic => {
this.topics.push(topic.name);
this.client.subscribe(topic.name, topic.option);
});
}
/**
* 取消訂閱所有的Topics
*
* @method public
*/
public unsubscribeAllTopics(): void {
console.log(
'%c ?? Unsubscribe MQTT All Topics ',
'background: #000; color: #5FBA7D; line-height: 26px;'
);
if (this.topics) {
this.topics.forEach(topic => {
this.client.unsubscribe(topic, null);
});
this.topics = [];
}
}
/**
* 取消訂閱MQTT的特定Topic
*
* @method public
* @param topic MQTT的特定Topic
*/
public unsubscribeTopic(topic: string): void {
console.log(
`%c ?? Unsubscribe MQTT Topics: ${topic}`,
'background: #000; color: #5FBA7D; line-height: 26px;'
);
const topicIndex = this.topics.indexOf(topic);
this.topics.splice(topicIndex, 1);
this.client.unsubscribe(topic, null);
}
/**
* 當(dāng)收到MQTT訂閱的訊息
*
* @method private
* @param payload 訂閱的訊息
*/
private onMessageArrived(payload: Paho.MQTT.Message): void {
// console.log(
// '%c ?? MQTT Message Arrvived ',
// 'background: #000; color: #5FBA7D; line-height: 26px;'
// );
const payloads: Paho.MQTT.Message = {
destinationName: payload.destinationName,
payloadBytes: payload.payloadBytes,
payloadString: this.byteToString(payload.payloadBytes),
duplicate: payload.duplicate,
retained: payload.retained,
qos: payload.qos,
};
this.payload.next(payloads);
}
/**
* 監(jiān)聽MQTT訂閱的訊息
*
* @method public
* @return 回傳一個Observable,讓呼叫者可以訂閱MQTT的訊息
*/
public listenMessage(): Observable<Paho.MQTT.Message> {
return this.payload.asObservable();
}
/**
* 將MQTT Payload Bytes轉(zhuǎn)成String
*
* @method private
* @param bytes MQTT Payload的Bytes
* @return 回傳轉(zhuǎn)換後的Payload
*/
private decodePayloads(bytes: any): string {
const result = String.fromCharCode(...bytes);
return result;
}
/**
* ---------------------------------------------------------------------------
* @NOTE 送出訊息
* ---------------------------------------------------------------------------
*/
/**
* Publish至MQTT
*
* @method public
* @param topic 目的地Topic
* @param value 要送出的數(shù)據(jù)
*/
public send(topic: string, value: any): void {
const payloads = new Paho.MQTT.Message(value);
payloads.destinationName = topic;
this.client.send(payloads);
}
/**
* ---------------------------------------------------------------------------
* @NOTE UUID
* ---------------------------------------------------------------------------
*/
/**
* 產(chǎn)生UUID
*
* @method public
* @return 回傳UUID
*/
public generateUUID(): string {
return this.generateS4() + this.generateS4() + '-' + this.generateS4() + '-'
+ this.generateS4() + '-' + this.generateS4() + '-' + this.generateS4() +
this.generateS4() + this.generateS4();
}
/**
* 產(chǎn)生16位隨機(jī)碼
* @return 回傳隨機(jī)碼
*/
private generateS4(): string {
return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);
}
byteToString(arr): string {
if (typeof arr === 'string') {
return arr;
}
let str = '';
// tslint:disable-next-line:variable-name
const _arr = arr;
for (let i = 0; i < _arr.length; i++) {
// tslint:disable-next-line:one-variable-per-declaration
const one = _arr[i].toString(2),
v = one.match(/^1+?(?=0)/);
if (v && one.length === 8) {
const bytesLength = v[0].length;
let store = _arr[i].toString(2).slice(7 - bytesLength);
for (let st = 1; st < bytesLength; st++) {
store += _arr[st + i].toString(2).slice(2);
}
str += String.fromCharCode(parseInt(store, 2));
i += bytesLength - 1;
} else {
str += String.fromCharCode(_arr[i]);
}
}
return str;
}
InitMessage(msg): { evt_tp: any, value: any } {
try {
const rawMsg = JSON.parse(msg.payloadString);
// console.log(rawMsg.evt_data.value);
return { evt_tp: rawMsg.evt_tp, value: rawMsg.evt_data.value };
} catch (e) {
console.log(e);
}
}
InitMessageforlist(msg): { linename: any,stationname:any, value: any } {
try {
const rawMsg = JSON.parse(msg.payloadString);
return { linename: rawMsg.evt_data['line'],stationname:rawMsg.evt_data['station'], value: rawMsg.evt_data.value[0] };
} catch (e) {
console.log(e);
}
}
}
MQTT services
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吹泡,“玉大人骤星,你說我怎么就攤上這事”疲” “怎么了洞难?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長揭朝。 經(jīng)常有香客問我队贱,道長,這世上最難降的妖魔是什么萝勤? 我笑而不...
- 正文 為了忘掉前任露筒,我火速辦了婚禮,結(jié)果婚禮上敌卓,老公的妹妹穿的比我還像新娘。我一直安慰自己伶氢,他們只是感情好趟径,可當(dāng)我...
- 文/花漫 我一把揭開白布瘪吏。 她就那樣靜靜地躺著,像睡著了一般蜗巧。 火紅的嫁衣襯著肌膚如雪掌眠。 梳的紋絲不亂的頭發(fā)上,一...
- 文/蒼蘭香墨 我猛地睜開眼鸥跟,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了盔沫?” 一聲冷哼從身側(cè)響起医咨,我...
- 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎架诞,沒想到半個月后拟淮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
- 正文 獨(dú)居荒郊野嶺守林人離奇死亡谴忧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
- 正文 我和宋清朗相戀三年很泊,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俏蛮。...
- 正文 年R本政府宣布辣恋,位于F島的核電站亮垫,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏伟骨。R本人自食惡果不足惜饮潦,卻給世界環(huán)境...
- 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望携狭。 院中可真熱鬧继蜡,春花似錦、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至碘举,卻和暖如春忘瓦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背引颈。 一陣腳步聲響...
- 正文 我出身青樓凌停,卻偏偏與公主長得像,于是被迫代替她去往敵國和親李丰。 傳聞我的和親對象是個殘疾皇子苦锨,可洞房花燭夜當(dāng)晚...
推薦閱讀更多精彩內(nèi)容
- 問題 早上使用SC命令部署Service,啟動服務(wù)一直報(bào)The **** service on Local Com...
- composer 自己的 git 倉庫出現(xiàn)問題: composer update "hdll/services:d...
- 協(xié)議就是通信雙方的一個約定趴泌,即舟舒,表示第1位傳輸?shù)氖裁础⒌?位傳輸?shù)氖裁础茹尽T贛QTT協(xié)議中秃励,一個MQTT數(shù)據(jù)包由...
- 如果不了解MQTT的可以看這篇文章http://www.cnblogs.com/yangfengwu/p/7764...
- 在項(xiàng)目開發(fā)過程中呐舔,由于需要數(shù)據(jù)的實(shí)時推送币励,但是在實(shí)際過程中,用到了MQtt的方法珊拼,搜遍網(wǎng)上 那個的講解食呻,...