最近做了個(gè)即時(shí)通訊的小項(xiàng)目也在這期間接觸到了MQTT這個(gè)東西,如果你能搜到這篇博客钉跷,說(shuō)明你對(duì)MQTT起碼有大致了解的弥鹦,就不在這進(jìn)行介紹了。今天我們要講的是MQTT的一個(gè)node.js實(shí)現(xiàn)-----Mosca,由于本身對(duì)MQTT規(guī)范了解也不深,在這里就主要以代碼為主彬坏。
什么是Mosca
Mosca是MQTT在Node.js中的一個(gè)Broker的開(kāi)源實(shí)現(xiàn)朦促,通俗講也就是MQTT中的Server實(shí)現(xiàn)。
同時(shí)作者也維護(hù)著MQTT.js這一模塊栓始,這一模塊大家可理解為MQTT的Client實(shí)現(xiàn)务冕。而縱觀整個(gè)Node.js的module中比較有分量的也就以上兩個(gè)module.
如何實(shí)現(xiàn)即時(shí)聊天功能
沒(méi)有調(diào)研過(guò)其他的聊天軟件是怎么實(shí)現(xiàn)一對(duì)一推送的,下面簡(jiǎn)單介紹下MQTT是怎么實(shí)現(xiàn)的幻赚。MQTT是簡(jiǎn)單的發(fā)布訂閱模式禀忆,也就是說(shuō)當(dāng)一條消息發(fā)出去的時(shí)候,誰(shuí)訂閱了誰(shuí)就會(huì)受到落恼,而應(yīng)用于即時(shí)通訊的話油湖,我們就需要進(jìn)行1對(duì)1的進(jìn)行聊天,這時(shí)候我們就需要领跛,每個(gè)客戶端訂閱自己的一個(gè)專(zhuān)屬channel乏德,然后由Broker來(lái)進(jìn)行推送(Broker做的話,可以實(shí)現(xiàn)消息過(guò)濾吠昭,離線消息等)喊括。
在這過(guò)程中也遇到了一些問(wèn)題,比如對(duì)于即時(shí)通訊來(lái)說(shuō)矢棚,客戶端應(yīng)該以什么作為自己的專(zhuān)屬channel郑什,在我接手這個(gè)項(xiàng)目之前他們訂閱的是設(shè)備號(hào),這樣其實(shí)有很多問(wèn)題蒲肋,無(wú)法實(shí)現(xiàn)多設(shè)備同時(shí)在線蘑拯。每次用戶換設(shè)備登陸,服務(wù)器端都需要將相應(yīng)的離線消息進(jìn)行轉(zhuǎn)移兜粘,耗時(shí)耗力而且一些需求無(wú)法滿足申窘,所以我覺(jué)得客戶端訂閱應(yīng)該以用戶的唯一ID為channel,這樣就實(shí)現(xiàn)了客戶端多設(shè)備在線孔轴,以及離線消息等功能剃法。
如何實(shí)現(xiàn)離線消息功能
這里所說(shuō)的離線消息是指,當(dāng)設(shè)備離線Mqtt斷開(kāi)了之后路鹰,這時(shí)候向用戶推送的數(shù)據(jù)贷洲,會(huì)在用戶下次上線的時(shí)候推給用戶。
這個(gè)地方客戶端有兩個(gè)關(guān)鍵點(diǎn):
- 客戶端
cleanSession
需要設(shè)置為false.則該鏈接便會(huì)認(rèn)為是持久連接晋柱,當(dāng)鏈接斷開(kāi)的時(shí)候优构,發(fā)送的消息便會(huì)進(jìn)行報(bào)錯(cuò),直到下次鏈接再次建立雁竞,會(huì)將這些消息發(fā)送給客戶端 -
Qos
標(biāo)志钦椭,在Mqtt標(biāo)準(zhǔn)鐘,對(duì)Qos有3種設(shè)置,在這里我們需要將Qos設(shè)置為1玉凯。更多Qos介紹势腮,請(qǐng)點(diǎn)擊
上面所說(shuō)的兩個(gè)關(guān)鍵是是客戶端需要設(shè)置的联贩。在在這里我們用到的Mosca也有需要注意的地方便是發(fā)布訂閱的模型選擇漫仆。詳情點(diǎn)擊ascoltatori。這一模塊是構(gòu)建Mosca的核心模塊之一泪幌。
我之前一同事進(jìn)行過(guò)測(cè)試盲厌,當(dāng)采用redis,mongodb作為發(fā)布訂閱模型的話會(huì)出現(xiàn)離線消息數(shù)據(jù)錯(cuò)亂的情況,經(jīng)過(guò)多次測(cè)試之后祸泪,我發(fā)現(xiàn)ZeroMQ吗浩,和RabbitMQ這種消息隊(duì)列的離線數(shù)據(jù)是完全按照發(fā)送的順序,離線發(fā)送没隘。
示例代碼
在這里我們將演示一下離線消息發(fā)送懂扼,在這里我們采用的是zeroMQ最為訂閱發(fā)布模型
Client pub
var mqtt = require('mqtt');
var client = mqtt.createClient(5112, '182.92.149.22');
//client.subscribe('presence');
var num = 0;
setInterval(function (){
client.publish('order', 'Hello mqtt ' + (num++),{qos:1, retain: true});
}, 1000);
Client Sub
var mqtt = require('mqtt');
var client = mqtt.createClient(5112, 'localhost',{clientId:'1',clean:false});
client.subscribe('test',{qos:1});
client.on('message', function (topic, message) {
console.log(message);
});
Mosca Server
var mosca = require('mosca')
var settings = {
port: 5112,
backend:{
type: 'zmq',
json: false,
zmq: require("zmq"),
port: "tcp://127.0.0.1:33333",
controlPort: "tcp://127.0.0.1:33334",
delay: 5
},
persistence:{
factory: mosca.persistence.Mongo,
url: "mongodb://localhost:27017/mosca"
}
};
var server = new mosca.Server(settings);
server.on('ready', function(){
console.log('Mosca server is up and running');
});
server.on('published', function(packet, client) {
console.log('Published', packet.payload);
});
在上面我們看到我們使用zeromq作為發(fā)布訂閱模型, mongodb作為持久化Db右蒲,你可以先將Mosca server以及 Client pub啟動(dòng)阀湿,過(guò)一段時(shí)間你再啟動(dòng)Client sub。這時(shí)候你會(huì)發(fā)現(xiàn)瑰妄,收到的消息會(huì)是從1開(kāi)始的陷嘴,,