以中間件栈源,路由区转,跨進程事件的姿勢使用WebSocket--Node.js篇

上一篇文章介紹了在瀏覽器端以中間件匹摇,路由婆跑,跨進程事件的姿勢使用原生WebSocket。這篇文章將介紹如何使用Node.js以相同的編程模式來實現(xiàn)WebSocket服務端魏割。

Node.js中比較流行的兩個WebSocket庫分別是socket.iows譬嚣。其中socket.io已經(jīng)實現(xiàn)了跨進程事件,廣播钞它,群發(fā)等功能拜银,并且服務端與瀏覽器端是配套的,在不支持WebSocket技術(shù)的瀏覽器會降級為使用ajax輪詢须揣。所以盐股。這里選擇使用相對而言較為底層或原始的ws,在其基礎(chǔ)上實現(xiàn)文章標題所提到的編程模式。

WS

使用ws簡簡單單就可以啟動一個WebSocket服務:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

上面的wss支持的事件有connection,close,error,headers,listening,ws支持的事件有message,close,error耻卡。更多詳情可以這里疯汁。

中間件

對ws進行封裝,上面提到的事件:WSS的connection卵酪,ws的message,close,error幌蚊。分別提供注冊中間件的接口

class EasySocket {
    constructor() {
        this.connectionMiddleware = [];
        this.closeMiddleware = [];
        this.messageMiddleware = [];
        this.errorMiddleware = [];

        this.connectionFn = Promise.resolve();
        this.closeFn = Promise.resolve();
        this.messageFn = Promise.resolve();
        this.errorFn = Promise.resolve();
    }
    connectionUse(fn, runtime) {
        this.connectionMiddleware.push(fn);
        if (runtime) {
            this.connectionFn = compose(this.connectionMiddleware);
        }
        return this;
    }
    closeUse(fn, runtime) {
        this.closeMiddleware.push(fn);
        if (runtime) {
            this.closeFn = compose(this.closeMiddleware);
        }
        return this;
    }
    messageUse(fn, runtime) {
        this.messageMiddleware.push(fn);
        if (runtime) {
            this.messageFn = compose(this.messageMiddleware);
        }
        return this;
    }
    errorUse(fn, runtime) {
        this.errorMiddleware.push(fn);
        if (runtime) {
            this.errorFn = compose(this.errorMiddleware);
        }
        return this;
    }
    
}

通過xxxUse注冊相應的中間件。 xxxMiddleware中就是相應的中間件溃卡。xxxFn是中間件通過compose處理后的結(jié)構(gòu)溢豆。使用runtime參數(shù)可以在運行時注冊中間件。

再添加一個listen方法瘸羡,處理相應的中間件并且實例化WebSocket.Server

listen(config) {
        this.socket = new WebSocket.Server(config);
        this.connectionFn = compose(this.connectionMiddleware);
        this.messageFn = compose(this.messageMiddleware);
        this.closeFn = compose(this.closeMiddleware);
        this.errorFn = compose(this.errorMiddleware);
        this.socket.on('connection', (client, req) => {
            let context = { server: this, client, req };
            this.connectionFn(context).catch(error => { console.log(error) });

            client.on('message', (message) => {
                let req;
                try {
                    req = JSON.parse(message);
                } catch (error) {
                    req = message;
                }
                let messageContext = { server: this, client, req }
                this.messageFn(messageContext).catch(error => { console.log(error) })
            });

            client.on('close', (code, message) => {
                let closeContext = { server: this, client, code, message };
                this.closeFn(closeContext).catch(error => { console.log(error) })
            });

            client.on('error', (error) => {
                let errorContext = { server: this, client, error };
                this.errorFn(errorContext).catch(error => { console.log(error) })
            });
        })
    }

使用koa-compose模塊處理中間件漩仙。注意xxContext傳入了哪些東西,后續(xù)定義中間件的時候都可以使用犹赖。

compose的作用可看這篇文章 傻瓜式解讀koa中間件處理模塊koa-compose

使用:

import EasySocket from 'easy-socket-node';

const config = {
    port: 3001,
    perMessageDeflate: {
        zlibDeflateOptions: { // See zlib defaults.
            chunkSize: 1024,
            memLevel: 7,
            level: 3,
        },
        zlibInflateOptions: {
            chunkSize: 10 * 1024
        },
        // Other options settable:
        clientNoContextTakeover: true, // Defaults to negotiated value.
        serverNoContextTakeover: true, // Defaults to negotiated value.
        //clientMaxWindowBits: 10,       // Defaults to negotiated value.
        serverMaxWindowBits: 10,       // Defaults to negotiated value.
        // Below options specified as default values.
        concurrencyLimit: 10,          // Limits zlib concurrency for perf.
        threshold: 1024,               // Size (in bytes) below which messages
        // should not be compressed.
    }
}
const easySocket = new EasySocket();
//使用中間件獲取token
easySocket
    .connectionUse((context,next)=>{
       console.log("new Connected");
       let location = url.parse(context.req.url, true);
       let token=location.query.token;
       if(!token){
           client.send("invalid token");
           client.close(1003, "invalid token");
           return;
       }
       context.client.token=token;
       next();
    });
easySocket
    .listen(config)

console.log('Now start WebSocket server on port ' + config.port + '...')

使用messageUse可以注冊多個處理消息的中間件队他,比如

 easySocket.messageUse((context, next) => {
    //群聊處理中間件
    if (context.req.action === 'roomChatMessage') {
      //可以在這里持久化消息,將消息發(fā)送給其它群聊客戶端
      console.log(context.req);
    }
    next();
  })
  .messageUse((context, next) => {
    //私聊處理中間件
    if (context.req.action === 'privateChatMessage') {
      //可以在這里持久化消息峻村,將消息發(fā)送給私聊客戶端
      console.log(context.req);
    }
    next();
  })

每個中間件都要判斷context.req.action麸折,而這個context.res就是瀏覽器端或客戶端發(fā)送的數(shù)據(jù)。怎么消除這個頻繁的if判斷呢? 我們實現(xiàn)一個簡單的消息處理路由粘昨。

路由

定義消息路由中間件

messageRouteMiddleware.js

export default (routes) => {
    return async (context, next) => {
        if (routes[context.req.action]) {
            await routes[context.req.action](context,next);
        } else {
            console.log(context.req)
            next();
        }
    }
}

定義路由

router.js

export default {
    roomChatMessage:function(context,next){
        //可以在這里持久化消息垢啼,將消息發(fā)送給其它群聊客戶端,以及其它業(yè)務邏輯
        console.log(context.req);
        next();
    },
    privateChatMessage:function(context,next){
        //可以在這里持久化消息张肾,將消息發(fā)送給私聊客戶端芭析,以及其它業(yè)務邏輯
        console.log(context.req);
        next();
    }
}

使用:

easySocket.messageUse(messageRouteMiddleware(router))

跨進程事件

上一篇文章已經(jīng)介紹了跨進程事件,這里直接說實現(xiàn)吞瞪。

使用Node的原生事件模塊

import compose from './compose';
const WebSocket = require('ws');
var EventEmitter = require('events').EventEmitter;
export default class EasySocket extends EventEmitter {
    constructor() {
        ...
        this.remoteEmitMiddleware = [];

        ...
        this.remoteEmitFn = Promise.resolve();
    }
    ...
    remoteEmitUse(fn, runtime) {
        this.remoteEmitMiddleware.push(fn);
        if (runtime) {
            this.remoteEmitFn = compose(this.remoteEmitMiddleware);
        }
        return this;
    }
    listen(config) {
        this.socket = new WebSocket.Server(config);
        ...
        this.remoteEmitFn = compose(this.remoteEmitMiddleware);

        ...
    }
    emit(event, args, isLocal = false) { 
        let arr = [event, args];
        if (isLocal) {
            super.emit.apply(this, arr);
            return this;
        }
        let evt = {
            event: event,
            args: args
        }
        let remoteEmitContext = { server: this, event: evt };
        this.remoteEmitFn(remoteEmitContext).catch(error => { console.log(error) })
        return this;
    }
}

最后

源碼地址:easy-socket-node

基于easy-socket-nodeeasy-socket-browser一個完整例子:

index.html

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>
</body>
<script src="https://unpkg.com/easy-socket-browser@1.1.1/lib/easy-socket.min.js"></script>
<script>
    <!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>
</body>
<script src="https://unpkg.com/easy-socket-browser@1.1.1/lib/easy-socket.min.js"></script>
<script>
    var client = new EasySocket({
        name: 'demo',
        autoReconnect: true,
        pingMsg: '{"type":"event","event":"ping","args":"ping"}'//模擬emit 消息體
    });
    client.openUse((context, next) => {
        console.log("open");
        next();
    })
        .closeUse((context, next) => {
            console.log("close");
            next();
        }).errorUse((context, next) => {
            console.log("error", context.event);
            next();
        }).messageUse((context, next) => {
            if (context.res.type === 'event') {
                context.client.emit(context.res.event, context.res.args, true);
            }
            next();
        })
        .reconnectUse((context, next) => {
            console.log('正在進行重連')
            next();
        })
        .remoteEmitUse((context, next) => {
            let client = context.client;
            let event = context.event;
            if (client.socket.readyState !== 1) {
                console.log("連接已斷開");
            } else {
                client.socket.send(JSON.stringify({
                    type: 'event',
                    event: event.event,
                    args: event.args
                }));
                next();
            }
        });


    client.connect('ws://localhost:3001');
    var msg = 1;
    setInterval(() => {
        client.emit('chatMessage', msg++)
    }, 3000);
    client.on("serverMessage", (data) => {
        console.log("serverMessage:" + data)
    });

</script>

</html>

</script>

</html>

server.js

var EasySocket = require('easy-socket-node').default;
var config = {
    port: 3001,
    perMessageDeflate: {
        zlibDeflateOptions: { // See zlib defaults.
            chunkSize: 1024,
            memLevel: 7,
            level: 3,
        },
        zlibInflateOptions: {
            chunkSize: 10 * 1024
        },
        // Other options settable:
        clientNoContextTakeover: true, // Defaults to negotiated value.
        serverNoContextTakeover: true, // Defaults to negotiated value.
        //clientMaxWindowBits: 10,       // Defaults to negotiated value.
        serverMaxWindowBits: 10,       // Defaults to negotiated value.
        // Below options specified as default values.
        concurrencyLimit: 10,          // Limits zlib concurrency for perf.
        threshold: 1024,               // Size (in bytes) below which messages
        // should not be compressed.
    }
}

var remoteEmitMiddleware = (context, next) => {
    var server = context.server;
    var event = context.event;
    for (let client of server.clients.values()) {
        client.readyState == 1 && client.send(makeEventMessage(event));
    }
}
function makeEventMessage(event) {
    return JSON.stringify({
        type: 'event',
        event: event.event,
        args: event.args
    })
}
var messageRouteMiddleware = (routes) => {
    return (context, next) => {
        if (context.req.type === 'event') {
            if (routes[context.req.event]) {
                routes[context.req.event](context, next);
            } else {
                context.server.emit(context.req.event, context.req.args);//將會直接觸發(fā)remoteEmitMiddleware 中間件的調(diào)用
                next();
            }
        } else {
            next();
        }
    }
}
var router = {
    chatMessage: (context, next) => {
        var req = context.req;
        context.server.emit('serverMessage', req.args);
    }
}
var server = new EasySocket();
server
    .connectionUse((context, next) => {
        context.server.clients.set(1, context.client)
        console.log('new connection')
    })
    .closeUse((context, next) => {
        console.log('close')
    })
    .messageUse(messageRouteMiddleware(router))
    .remoteEmitUse(remoteEmitMiddleware)
    .listen(config)

console.log('Now start WebSocket server on port ' + config.port + '...')

運行過程放刨,可以停止后端服務,然后再啟動尸饺,測下心跳重連

實現(xiàn)的聊天室例子:online chat demo

聊天室前端源碼:lazy-mock-im

聊天室服務端源碼:lazy-mock

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末进统,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子浪听,更是在濱河造成了極大的恐慌螟碎,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件迹栓,死亡現(xiàn)場離奇詭異掉分,居然都是意外死亡,警方通過查閱死者的電腦和手機克伊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門酥郭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人愿吹,你說我怎么就攤上這事不从。” “怎么了犁跪?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵椿息,是天一觀的道長。 經(jīng)常有香客問我坷衍,道長寝优,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任枫耳,我火速辦了婚禮乏矾,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘迁杨。我一直安慰自己钻心,他們只是感情好,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布仑最。 她就那樣靜靜地躺著扔役,像睡著了一般。 火紅的嫁衣襯著肌膚如雪警医。 梳的紋絲不亂的頭發(fā)上亿胸,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音预皇,去河邊找鬼侈玄。 笑死,一個胖子當著我的面吹牛吟温,可吹牛的內(nèi)容都是我干的序仙。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼鲁豪,長吁一口氣:“原來是場噩夢啊……” “哼潘悼!你這毒婦竟也來了律秃?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤治唤,失蹤者是張志新(化名)和其女友劉穎棒动,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宾添,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡船惨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了缕陕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粱锐。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖扛邑,靈堂內(nèi)的尸體忽然破棺而出怜浅,到底是詐尸還是另有隱情,我是刑警寧澤鹿榜,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布海雪,位于F島的核電站,受9級特大地震影響舱殿,放射性物質(zhì)發(fā)生泄漏奥裸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一沪袭、第九天 我趴在偏房一處隱蔽的房頂上張望湾宙。 院中可真熱鬧,春花似錦冈绊、人聲如沸侠鳄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽伟恶。三九已至,卻和暖如春毅该,著一層夾襖步出監(jiān)牢的瞬間博秫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工眶掌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留挡育,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓朴爬,卻偏偏與公主長得像即寒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容