上一篇文章介紹了在瀏覽器端以中間件匹摇,路由婆跑,跨進程事件的姿勢使用原生WebSocket。這篇文章將介紹如何使用Node.js以相同的編程模式來實現(xiàn)WebSocket服務端魏割。
Node.js中比較流行的兩個WebSocket庫分別是socket.io與ws譬嚣。其中socket.io已經(jīng)實現(xiàn)了跨進程事件,廣播钞它,群發(fā)等功能拜银,并且服務端與瀏覽器端是配套的,在不支持WebSocket技術(shù)的瀏覽器會降級為使用ajax輪詢须揣。所以盐股。這里選擇使用相對而言較為底層或原始的ws,在其基礎(chǔ)上實現(xiàn)文章標題所提到的編程模式。
WS
使用ws簡簡單單就可以啟動一個WebSocket服務:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
ws.send('something');
});
上面的wss支持的事件有connection
,close
,error
,headers
,listening
,ws支持的事件有message
,close
,error
耻卡。更多詳情可以這里疯汁。
中間件
對ws進行封裝,上面提到的事件:WSS的connection
卵酪,ws的message
,close
,error
幌蚊。分別提供注冊中間件的接口
class EasySocket {
constructor() {
this.connectionMiddleware = [];
this.closeMiddleware = [];
this.messageMiddleware = [];
this.errorMiddleware = [];
this.connectionFn = Promise.resolve();
this.closeFn = Promise.resolve();
this.messageFn = Promise.resolve();
this.errorFn = Promise.resolve();
}
connectionUse(fn, runtime) {
this.connectionMiddleware.push(fn);
if (runtime) {
this.connectionFn = compose(this.connectionMiddleware);
}
return this;
}
closeUse(fn, runtime) {
this.closeMiddleware.push(fn);
if (runtime) {
this.closeFn = compose(this.closeMiddleware);
}
return this;
}
messageUse(fn, runtime) {
this.messageMiddleware.push(fn);
if (runtime) {
this.messageFn = compose(this.messageMiddleware);
}
return this;
}
errorUse(fn, runtime) {
this.errorMiddleware.push(fn);
if (runtime) {
this.errorFn = compose(this.errorMiddleware);
}
return this;
}
}
通過xxxUse注冊相應的中間件。 xxxMiddleware中就是相應的中間件溃卡。xxxFn是中間件通過compose處理后的結(jié)構(gòu)溢豆。使用runtime參數(shù)可以在運行時注冊中間件。
再添加一個listen方法瘸羡,處理相應的中間件并且實例化WebSocket.Server
listen(config) {
this.socket = new WebSocket.Server(config);
this.connectionFn = compose(this.connectionMiddleware);
this.messageFn = compose(this.messageMiddleware);
this.closeFn = compose(this.closeMiddleware);
this.errorFn = compose(this.errorMiddleware);
this.socket.on('connection', (client, req) => {
let context = { server: this, client, req };
this.connectionFn(context).catch(error => { console.log(error) });
client.on('message', (message) => {
let req;
try {
req = JSON.parse(message);
} catch (error) {
req = message;
}
let messageContext = { server: this, client, req }
this.messageFn(messageContext).catch(error => { console.log(error) })
});
client.on('close', (code, message) => {
let closeContext = { server: this, client, code, message };
this.closeFn(closeContext).catch(error => { console.log(error) })
});
client.on('error', (error) => {
let errorContext = { server: this, client, error };
this.errorFn(errorContext).catch(error => { console.log(error) })
});
})
}
使用koa-compose模塊處理中間件漩仙。注意xxContext傳入了哪些東西,后續(xù)定義中間件的時候都可以使用犹赖。
compose的作用可看這篇文章 傻瓜式解讀koa中間件處理模塊koa-compose
使用:
import EasySocket from 'easy-socket-node';
const config = {
port: 3001,
perMessageDeflate: {
zlibDeflateOptions: { // See zlib defaults.
chunkSize: 1024,
memLevel: 7,
level: 3,
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
// Other options settable:
clientNoContextTakeover: true, // Defaults to negotiated value.
serverNoContextTakeover: true, // Defaults to negotiated value.
//clientMaxWindowBits: 10, // Defaults to negotiated value.
serverMaxWindowBits: 10, // Defaults to negotiated value.
// Below options specified as default values.
concurrencyLimit: 10, // Limits zlib concurrency for perf.
threshold: 1024, // Size (in bytes) below which messages
// should not be compressed.
}
}
const easySocket = new EasySocket();
//使用中間件獲取token
easySocket
.connectionUse((context,next)=>{
console.log("new Connected");
let location = url.parse(context.req.url, true);
let token=location.query.token;
if(!token){
client.send("invalid token");
client.close(1003, "invalid token");
return;
}
context.client.token=token;
next();
});
easySocket
.listen(config)
console.log('Now start WebSocket server on port ' + config.port + '...')
使用messageUse可以注冊多個處理消息的中間件队他,比如
easySocket.messageUse((context, next) => {
//群聊處理中間件
if (context.req.action === 'roomChatMessage') {
//可以在這里持久化消息,將消息發(fā)送給其它群聊客戶端
console.log(context.req);
}
next();
})
.messageUse((context, next) => {
//私聊處理中間件
if (context.req.action === 'privateChatMessage') {
//可以在這里持久化消息峻村,將消息發(fā)送給私聊客戶端
console.log(context.req);
}
next();
})
每個中間件都要判斷context.req.action麸折,而這個context.res就是瀏覽器端或客戶端發(fā)送的數(shù)據(jù)。怎么消除這個頻繁的if判斷呢? 我們實現(xiàn)一個簡單的消息處理路由粘昨。
路由
定義消息路由中間件
messageRouteMiddleware.js
export default (routes) => {
return async (context, next) => {
if (routes[context.req.action]) {
await routes[context.req.action](context,next);
} else {
console.log(context.req)
next();
}
}
}
定義路由
router.js
export default {
roomChatMessage:function(context,next){
//可以在這里持久化消息垢啼,將消息發(fā)送給其它群聊客戶端,以及其它業(yè)務邏輯
console.log(context.req);
next();
},
privateChatMessage:function(context,next){
//可以在這里持久化消息张肾,將消息發(fā)送給私聊客戶端芭析,以及其它業(yè)務邏輯
console.log(context.req);
next();
}
}
使用:
easySocket.messageUse(messageRouteMiddleware(router))
跨進程事件
上一篇文章已經(jīng)介紹了跨進程事件,這里直接說實現(xiàn)吞瞪。
使用Node的原生事件模塊
import compose from './compose';
const WebSocket = require('ws');
var EventEmitter = require('events').EventEmitter;
export default class EasySocket extends EventEmitter {
constructor() {
...
this.remoteEmitMiddleware = [];
...
this.remoteEmitFn = Promise.resolve();
}
...
remoteEmitUse(fn, runtime) {
this.remoteEmitMiddleware.push(fn);
if (runtime) {
this.remoteEmitFn = compose(this.remoteEmitMiddleware);
}
return this;
}
listen(config) {
this.socket = new WebSocket.Server(config);
...
this.remoteEmitFn = compose(this.remoteEmitMiddleware);
...
}
emit(event, args, isLocal = false) {
let arr = [event, args];
if (isLocal) {
super.emit.apply(this, arr);
return this;
}
let evt = {
event: event,
args: args
}
let remoteEmitContext = { server: this, event: evt };
this.remoteEmitFn(remoteEmitContext).catch(error => { console.log(error) })
return this;
}
}
最后
源碼地址:easy-socket-node
基于easy-socket-node與easy-socket-browser一個完整例子:
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
</body>
<script src="https://unpkg.com/easy-socket-browser@1.1.1/lib/easy-socket.min.js"></script>
<script>
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
</body>
<script src="https://unpkg.com/easy-socket-browser@1.1.1/lib/easy-socket.min.js"></script>
<script>
var client = new EasySocket({
name: 'demo',
autoReconnect: true,
pingMsg: '{"type":"event","event":"ping","args":"ping"}'//模擬emit 消息體
});
client.openUse((context, next) => {
console.log("open");
next();
})
.closeUse((context, next) => {
console.log("close");
next();
}).errorUse((context, next) => {
console.log("error", context.event);
next();
}).messageUse((context, next) => {
if (context.res.type === 'event') {
context.client.emit(context.res.event, context.res.args, true);
}
next();
})
.reconnectUse((context, next) => {
console.log('正在進行重連')
next();
})
.remoteEmitUse((context, next) => {
let client = context.client;
let event = context.event;
if (client.socket.readyState !== 1) {
console.log("連接已斷開");
} else {
client.socket.send(JSON.stringify({
type: 'event',
event: event.event,
args: event.args
}));
next();
}
});
client.connect('ws://localhost:3001');
var msg = 1;
setInterval(() => {
client.emit('chatMessage', msg++)
}, 3000);
client.on("serverMessage", (data) => {
console.log("serverMessage:" + data)
});
</script>
</html>
</script>
</html>
server.js
var EasySocket = require('easy-socket-node').default;
var config = {
port: 3001,
perMessageDeflate: {
zlibDeflateOptions: { // See zlib defaults.
chunkSize: 1024,
memLevel: 7,
level: 3,
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
// Other options settable:
clientNoContextTakeover: true, // Defaults to negotiated value.
serverNoContextTakeover: true, // Defaults to negotiated value.
//clientMaxWindowBits: 10, // Defaults to negotiated value.
serverMaxWindowBits: 10, // Defaults to negotiated value.
// Below options specified as default values.
concurrencyLimit: 10, // Limits zlib concurrency for perf.
threshold: 1024, // Size (in bytes) below which messages
// should not be compressed.
}
}
var remoteEmitMiddleware = (context, next) => {
var server = context.server;
var event = context.event;
for (let client of server.clients.values()) {
client.readyState == 1 && client.send(makeEventMessage(event));
}
}
function makeEventMessage(event) {
return JSON.stringify({
type: 'event',
event: event.event,
args: event.args
})
}
var messageRouteMiddleware = (routes) => {
return (context, next) => {
if (context.req.type === 'event') {
if (routes[context.req.event]) {
routes[context.req.event](context, next);
} else {
context.server.emit(context.req.event, context.req.args);//將會直接觸發(fā)remoteEmitMiddleware 中間件的調(diào)用
next();
}
} else {
next();
}
}
}
var router = {
chatMessage: (context, next) => {
var req = context.req;
context.server.emit('serverMessage', req.args);
}
}
var server = new EasySocket();
server
.connectionUse((context, next) => {
context.server.clients.set(1, context.client)
console.log('new connection')
})
.closeUse((context, next) => {
console.log('close')
})
.messageUse(messageRouteMiddleware(router))
.remoteEmitUse(remoteEmitMiddleware)
.listen(config)
console.log('Now start WebSocket server on port ' + config.port + '...')
運行過程放刨,可以停止后端服務,然后再啟動尸饺,測下心跳重連
實現(xiàn)的聊天室例子:online chat demo
聊天室前端源碼:lazy-mock-im
聊天室服務端源碼:lazy-mock