使用nodejs 開發(fā)websocket消息分發(fā)系統(tǒng)
websocket使用事件觸發(fā)機(jī)制傳輸密集傳輸數(shù)據(jù)钞护,跟nodejs的事件機(jī)制相同懂拾,比起早期comet技術(shù)使用的http響應(yīng)式更為高效惩激,客戶端不用重復(fù)的與服務(wù)端握手更加節(jié)約服務(wù)器性能缘滥。
1眶明、在客戶端調(diào)用
socket事件
message 消息接收
open 與服務(wù)器握手完成后觸發(fā)相對(duì)與服務(wù)端的upgrade事件
close socket關(guān)閉
error 錯(cuò)誤消息
ws.send 發(fā)送消息艾扮,消息內(nèi)容必須為字符串或buffer
let url = "ws://127.0.0.1:3000/socket?id="+id+"&name="+name+"&gid="+teamid;
let ws= new WebSocket(url);
ws.addEventListener("message",this.message.bind(this));
ws.addEventListener("open",this.open.bind(this));
ws.addEventListener("close",this.close.bind(this));
ws.addEventListener("error",this.error.bind(this));
ws.send("消息發(fā)送");
2既琴、服務(wù)端接收調(diào)用
在服務(wù)端使用koa-websocket
其響應(yīng)原理為客戶端請(qǐng)求后,服務(wù)端切換響應(yīng)為websocket栏渺,在upgrade事件中
1呛梆、收到客戶端請(qǐng)求頭
upgrade:websocket
connection:upgrade
Sec-WebSocket-Key:xxxx==
2、服務(wù)端使用258EAFA5-E914-47DA-95CA-C5AB0DC85B11通過sha1散列算法合并key手動(dòng)修改Sec-WebSocket-Key 返回客戶端完成
3磕诊、upgrade事件參數(shù)req,socket,upgradehead,socket.setNoDelay(true)
socket.write(修改過端返回頭)
websocket = new WebSocfket()
websocket.setSocket(socket) 創(chuàng)建websocket鏈接
message響應(yīng)客戶端事件 ws.send() 發(fā)送客戶端事件
let ws = ctx.websocket;
ws.on("message",this.EVNET_MESSAGE.bind(this));
ws.on("error",this.EVENT_ERROR.bind(this));
ws.on("close",this.EVENT_CLOSE.bind(this));
ws.on("timeout",this.EVENT_ERROR.bind(this));
3填物、在后期開發(fā)中加入了redis與mssql 來進(jìn)行數(shù)據(jù)存儲(chǔ)操作
因?yàn)閞edis自己有重連機(jī)制纹腌,不需要在處理重新連接,與sql數(shù)據(jù)庫操作不同滞磺,不用重復(fù)端去調(diào)用初始化升薯,此處為坑
this.client = redis.createClient({
host:host,port:port,
retry_strategy(options){
if (options.error.code === 'ECONNREFUSED') {
console.log('連接被拒絕');
Log.error('連接被拒絕');
}
if (options.times_connected > 10) {
console.log('重試連接超過十次');
Log.error('重試連接超過十次');
}
return Math.max(options.attempt * 100, 3000);
}
});
4、mssql
使用 new sql.ConnectionPool(this.config).connect();創(chuàng)建鏈接
在查詢完成后sql.close();來關(guān)閉鏈接击困,跟api上說明使用pool.close()不同涎劈,pool會(huì)造成二次連接數(shù)據(jù)庫失敗。此處為坑
const sql = require('mssql');
class xx{
connect(){ //創(chuàng)建鏈接
return new sql.ConnectionPool(this.config).connect();
}
async query(str){ //查詢數(shù)據(jù)
console.log("查詢或入庫")
let pool;
let result;
try{
pool = await this.connect()
result = await pool.request().query(str)
}catch(e){
console.log("數(shù)據(jù)庫鏈接錯(cuò)誤:"+e.message);
Log.error("數(shù)據(jù)庫鏈接錯(cuò)誤:"+e.message);
return Promise.reject(e.message).catch(str=>{
this.emit("DBError",str);
})
}
let rows = result.recordset;
let data=[]
if(result.recordsets.length){
data = result.recordsets[0];
}
sql.close();
return Promise.resolve(toJson(data))
}
}