使用Nodejs實現(xiàn)聊天服務(wù)器

兩年前在項目中使用nodejs+socket.io+redis實現(xiàn)的聊天和推送服務(wù)器,
基本上幾百行代碼就實現(xiàn)了整個功能员串,在項目中單服務(wù)器單進(jìn)程可以跑到支持
5000人左右同時在線彬坏。

主要思路

  1. 用戶上線后蟹腾,根據(jù)用戶的userid和socket骂删,保存到一個全局的map中
  2. 發(fā)送消息時,根據(jù)對方的userid找到對應(yīng)的socket祭刚,通過socket.emit發(fā)送消息給對方
  3. 如果對方不在線袁梗,在redis中保留消息,在對方上線后推送給用戶
  4. 用戶下線后遮怜,從全局的map中刪除對應(yīng)的用戶socket
  5. 由于需要保持長連接,客戶端需要定時發(fā)心跳給服務(wù)端鸿市,所以定義了一個心跳消息,可以5分鐘發(fā)一次
  • 用戶上線
io.sockets.on('connection', function (socket) {
    var address = socket.handshake.address;
    console.log(Date() + " New connection from " + address.address + ":" + address.port);
    socket.on('login', function (userinfo) {
        userid = userinfo.myAuraId
        var address = socket.handshake.address;
        var deviceid = userinfo.deviceId
        console.log(Date() + " Login from " + address.address + ":" + address.port + " " + userid + " " + deviceid);
        old_socket = sockets[userid]
        if (old_socket && old_socket.deviceid && deviceid && old_socket.deviceid != deviceid) {
            old_socket.relogin = 1
            old_socket.emit('logout')
            console.log("logout " + old_socket.userid + " " + old_socket.deviceid)
        }

        if (old_socket && old_socket != socket) {
            old_socket.disconnect()
        }

        socket.relogin = 0
        socket.userid = userid
        socket.deviceid = deviceid
        //發(fā)送離線消息
        send_store_msg(socket, userid)

        sockets[userid] = socket
        
        //通知業(yè)務(wù)服務(wù)器陌凳,用戶已登錄
        pub.publish("login_message_channel", JSON.stringify(userinfo))

    })
  • 發(fā)送消息
    socket.on('chat', function (msg, ack) {
        //process_msg(msg)
        pub.publish("chat_filter_channel", JSON.stringify(msg))
        socket.userid = msg.from
        sockets[socket.userid] = socket
        if (ack) {
            ack(1)
        }

    })
  • 接收和回應(yīng)心跳
    socket.on('hb', function (msg, ack) {
        if (ack) {
            ack(1)
        }
    })
  • 用戶下線内舟,刪除對應(yīng)的socket
    socket.on("disconnect", function () {
        var address = socket.handshake.address;
        console.log(Date() + " Disconnect from " + address.address + ":" + address.port);
        if (!socket.relogin) {
            delete sockets[socket.userid]
        }
    })

推送服務(wù)器

實現(xiàn)了聊天服務(wù)器后,對推送來說就很簡單了

  1. 在redis里開個channel验游,業(yè)務(wù)服務(wù)器往這個channel里publish數(shù)據(jù)
  2. nodejs subscribe這個channel監(jiān)聽數(shù)據(jù),找到對應(yīng)用戶發(fā)送消息即可耕蝉。
  3. 用戶不在線崔梗,可能也需要對這個離線推送消息做保留,具體看業(yè)務(wù)定義
var notification = redis.createClient()
notification.subscribe("notification")

// check redis notifcation channel
notification.on("message", function (pattern, msg) {
    var msgobj = JSON.parse(msg)
    var keys = msgobj.toWho
    var needStore = msgobj.needStore
    for (index in keys) {
        var key = keys[index]
        if (!needStore) {
            if (sockets[key]) {
                sockets[key].emit("notification", msg)
            }
        }
        else {
            var list = []
            store.hget("nodejs_notification", key, function (e, v) {
                if (v) {
                    list = JSON.parse(v);
                }
                list.push(msg)
                var msglist = JSON.stringify(list)
                store.hset("nodejs_notification", key, msglist, function (e, r) {
                })
            })

            if (sockets[key]) {
                send_notification(sockets[key], msg)
            }

        }
    }
})

function send_notification(socket, notif) {
    socket.emit("notification", notif, function ack() {
        store.hdel("nodejs_notification", socket.userid)
    })
}

客戶端Lib庫

服務(wù)端是使用socket.io實現(xiàn)垒在,基本上socket.io的Lib都能兼容

以下是推薦的兩個客戶端Lib:

其他語言版本蒜魄,可以在github搜索socket.io,找到對應(yīng)的Lib庫

Server端全部代碼

//var io = require('socket.io').listen(80)
var app = require('http').createServer(handler)
    , io = require('socket.io').listen(app)

app.listen(80);

function handler(req, res) {
    if (req.url == "/monitor") {
        res.writeHead(200);
        res.end("OK");
    }
    else {
        res.writeHead(404);
        res.end();
    }
}


//io.disable('heartbeats')
//io.set('heartbeats', false);
io.set('transports', ['websocket', 'xhr-polling']);
io.set('heartbeat timeout', 5 * 60)
io.set('heartbeat interval', 4 * 60)
io.set('close timeout', 1 * 30);
io.set("log level", 1)
io.set("browser client", false)
io.set("browser client cache", false)
io.set("browser client cache", false)
var redis = require("redis")
var pub = redis.createClient()
var store = redis.createClient()
var snschat = redis.createClient()
var notification = redis.createClient()
var PUSH_TO_IOS_DELAY_TIME = 120000
snschat.subscribe("snschat");
notification.subscribe("notification")
var sockets = {}

pub.on("error", function (err) {
    console.log("Error " + err);
});

store.on("error", function (err) {
    console.log("Error " + err);
});

snschat.on("error", function (err) {
    console.log("Error " + err);
});


function send_msg_delay(socket) {
    store.hget("chat_history", socket.userid, function (e, v) {
        if (v) {
            list = JSON.parse(v);
            if (list.length > 0) {
                var msg = JSON.stringify(list)
                socket.isSendingChatMessage = false
                send_msg(socket, msg)
            }
        }
    })
}

function send_msg(socket, msg) {
    //delay for 5 sec
    if (socket.isSendingChatMessage) {
        setTimeout(function () {
            send_msg_delay(socket)
        }, 5000)
        return
    }
    socket.isSendingChatMessage = true

    //start send
    var callSendToIOS = sendToIOSDealy(socket.userid, PUSH_TO_IOS_DELAY_TIME)
    socket.emit("chat", msg, function ack(size) {
        clearTimeout(callSendToIOS)
        store.hget("chat_history", socket.userid, function (e, v) {
            if (v) {
                list = JSON.parse(v);
                //console.log("size="+size)
                if (list.length == size) {
                    store.hdel("chat_history", socket.userid, function (e, r) {
                    })
                }
                else if (size < list.length) {
                    list = list.splice(size)
                    var msglist = JSON.stringify(list)
                    store.hset("chat_history", socket.userid, msglist, function (e, r) {
                    })
                }
            }
            socket.isSendingChatMessage = false

        })
    })
}


function sendToIOSDealy(toWho, time) {
    return setTimeout(function () {
        sendToIOS(toWho)
    }, time)
}

function sendToIOS(toWho) {
    var obj = {"toWho": toWho}
    var msg = JSON.stringify(obj)
    console.log("delay send to ios channel:" + msg)
    pub.publish("chat_message_channel", msg)
}

function send_notification(socket, notif) {
    socket.emit("notification", notif, function ack() {
        store.hdel("nodejs_notification", socket.userid)
    })
}

function send_store_msg(socket, userid) {

    if (socket.isSendStoreMsg) {
        return;
    }

    socket.isSendingChatMessage = false

    store.hget("chat_history", userid, function (e, msg) {
        if (msg) {
            send_msg(socket, msg)
            store.hdel("chat_history", socket.userid, function (e, r) {
            })
        }
    })

    store.hget("nodejs_notification", userid, function (e, msg) {
        if (msg) {
            var msglist = JSON.parse(msg)
            for (var i = 0; i < msglist.length; i++) {
                send_notification(socket, msglist[i])
            }
            //socket.emit("notification", msg)
            //store.hdel("nodejs_notification", userid)
        }
    })
    socket.isSendStoreMsg = true
}

function saveToChatHistory(msg) {
    var list = []
    store.hget("chat_history", msg.to, function (e, v) {
        if (v) {
            list = JSON.parse(v);
        }
        list.push(msg)
        var msglist = JSON.stringify(list)
        store.hset("chat_history", msg.to, msglist, function (e, r) {
        })
    })
}


function pushToChatHistoryChannel(msg) {
    var msgStr = JSON.stringify(msg)
    pub.publish("chat_message_history_channel", msgStr)
}

function process_msg(msg) {
    var list = []
    store.hget("chat_history", msg.to, function (e, v) {
        if (v) {
            list = JSON.parse(v);
        }
        list.push(msg)
        var msglist = JSON.stringify(list)
        store.hset("chat_history", msg.to, msglist, function (e, r) {
        })
        if (sockets[msg.to]) {
            send_msg(sockets[msg.to], msglist)
        }
        else {
            sendToIOS(msg.to)
        }
        pushToChatHistoryChannel(msg)
    })
}

// check redis notifcation channel
notification.on("message", function (pattern, msg) {
    var msgobj = JSON.parse(msg)
    var keys = msgobj.toWho
    var needStore = msgobj.needStore
    for (index in keys) {
        var key = keys[index]
        if (!needStore) {
            if (sockets[key]) {
                sockets[key].emit("notification", msg)
            }
        }
        else {
            var list = []
            store.hget("nodejs_notification", key, function (e, v) {
                if (v) {
                    list = JSON.parse(v);
                }
                list.push(msg)
                var msglist = JSON.stringify(list)
                store.hset("nodejs_notification", key, msglist, function (e, r) {
                })
            })

            if (sockets[key]) {
                send_notification(sockets[key], msg)
            }

        }
    }
})

// check redis snschat channel
snschat.on("message", function (pattern, data) {
    msg = JSON.parse(data)
    process_msg(msg)
})


io.sockets.on('connection', function (socket) {
    var address = socket.handshake.address;
    console.log(Date() + " New connection from " + address.address + ":" + address.port);
    socket.on('login', function (userinfo) {
        userid = userinfo.myAuraId
        var address = socket.handshake.address;
        var deviceid = userinfo.deviceId
        console.log(Date() + " Login from " + address.address + ":" + address.port + " " + userid + " " + deviceid);
        old_socket = sockets[userid]
        if (old_socket && old_socket.deviceid && deviceid && old_socket.deviceid != deviceid) {
            old_socket.relogin = 1
            old_socket.emit('logout')
            console.log("logout " + old_socket.userid + " " + old_socket.deviceid)
        }

        if (old_socket && old_socket != socket) {
            old_socket.disconnect()
        }

        socket.relogin = 0
        socket.userid = userid
        socket.deviceid = deviceid

        send_store_msg(socket, userid)

        sockets[userid] = socket
        pub.publish("login_message_channel", JSON.stringify(userinfo))

    })

    socket.on('geo', function (geo, ack) {
        if (geo.myAuraId) {
            var now = new Date()
            pub.publish("geo", JSON.stringify({
                geo: geo, time: now.getTime()
            }))
            socket.userid = geo.myAuraId
            sockets[socket.userid] = socket

            if (ack) {
                ack(1)
                send_store_msg(socket, userid)

            }
        }
    })

    socket.on('chat', function (msg, ack) {
        //process_msg(msg)
        pub.publish("chat_filter_channel", JSON.stringify(msg))
        socket.userid = msg.from
        sockets[socket.userid] = socket
        if (ack) {
            ack(1)
        }

    })

    socket.on('hb', function (msg, ack) {
        if (ack) {
            ack(1)
        }
    })


    socket.on("disconnect", function () {
        var address = socket.handshake.address;
        console.log(Date() + " Disconnect from " + address.address + ":" + address.port);
        if (!socket.relogin) {
            delete sockets[socket.userid]
        }
    })

})

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子谈为,更是在濱河造成了極大的恐慌旅挤,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,607評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件峦阁,死亡現(xiàn)場離奇詭異谦铃,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)榔昔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評論 3 395
  • 文/潘曉璐 我一進(jìn)店門驹闰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人撒会,你說我怎么就攤上這事嘹朗。” “怎么了诵肛?”我有些...
    開封第一講書人閱讀 164,960評論 0 355
  • 文/不壞的土叔 我叫張陵屹培,是天一觀的道長。 經(jīng)常有香客問我怔檩,道長褪秀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,750評論 1 294
  • 正文 為了忘掉前任薛训,我火速辦了婚禮媒吗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘乙埃。我一直安慰自己闸英,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,764評論 6 392
  • 文/花漫 我一把揭開白布介袜。 她就那樣靜靜地躺著甫何,像睡著了一般。 火紅的嫁衣襯著肌膚如雪遇伞。 梳的紋絲不亂的頭發(fā)上辙喂,一...
    開封第一講書人閱讀 51,604評論 1 305
  • 那天加派,我揣著相機(jī)與錄音跳芳,去河邊找鬼飞盆。 笑死吓歇,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的女气。 我是一名探鬼主播炼鞠,決...
    沈念sama閱讀 40,347評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼谒主,長吁一口氣:“原來是場噩夢啊……” “哼霎肯!你這毒婦竟也來了榛斯?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,253評論 0 276
  • 序言:老撾萬榮一對情侶失蹤懂缕,失蹤者是張志新(化名)和其女友劉穎提佣,沒想到半個月后荤崇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,702評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡倚喂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,893評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了子库。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,015評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡宴倍,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出俗他,到底是詐尸還是另有隱情兆衅,我是刑警寧澤,帶...
    沈念sama閱讀 35,734評論 5 346
  • 正文 年R本政府宣布羡亩,位于F島的核電站夕春,受9級特大地震影響及志,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜速侈,卻給世界環(huán)境...
    茶點故事閱讀 41,352評論 3 330
  • 文/蒙蒙 一倚搬、第九天 我趴在偏房一處隱蔽的房頂上張望每界。 院中可真熱鬧家卖,春花似錦、人聲如沸趴樱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽永罚。三九已至议薪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間醇锚,已是汗流浹背坯临。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留赶促,地道東北人鸥滨。 一個月前我還...
    沈念sama閱讀 48,216評論 3 371
  • 正文 我出身青樓婿滓,卻偏偏與公主長得像粥喜,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子额湘,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,969評論 2 355

推薦閱讀更多精彩內(nèi)容