![](http://7xjq5l.com1.z0.glb.clouddn.com/nodejs.jpg)
兩年前在項目中使用nodejs+socket.io+redis實現(xiàn)的聊天和推送服務(wù)器,
基本上幾百行代碼就實現(xiàn)了整個功能员串,在項目中單服務(wù)器單進(jìn)程可以跑到支持
5000人左右同時在線彬坏。
主要思路
- 用戶上線后蟹腾,根據(jù)用戶的userid和socket骂删,保存到一個全局的map中
- 發(fā)送消息時,根據(jù)對方的userid找到對應(yīng)的socket祭刚,通過socket.emit發(fā)送消息給對方
- 如果對方不在線袁梗,在redis中保留消息,在對方上線后推送給用戶
- 用戶下線后遮怜,從全局的map中刪除對應(yīng)的用戶socket
- 由于需要保持長連接,客戶端需要定時發(fā)心跳給服務(wù)端鸿市,所以定義了一個心跳消息,可以5分鐘發(fā)一次
- 用戶上線
io.sockets.on('connection', function (socket) {
var address = socket.handshake.address;
console.log(Date() + " New connection from " + address.address + ":" + address.port);
socket.on('login', function (userinfo) {
userid = userinfo.myAuraId
var address = socket.handshake.address;
var deviceid = userinfo.deviceId
console.log(Date() + " Login from " + address.address + ":" + address.port + " " + userid + " " + deviceid);
old_socket = sockets[userid]
if (old_socket && old_socket.deviceid && deviceid && old_socket.deviceid != deviceid) {
old_socket.relogin = 1
old_socket.emit('logout')
console.log("logout " + old_socket.userid + " " + old_socket.deviceid)
}
if (old_socket && old_socket != socket) {
old_socket.disconnect()
}
socket.relogin = 0
socket.userid = userid
socket.deviceid = deviceid
//發(fā)送離線消息
send_store_msg(socket, userid)
sockets[userid] = socket
//通知業(yè)務(wù)服務(wù)器陌凳,用戶已登錄
pub.publish("login_message_channel", JSON.stringify(userinfo))
})
- 發(fā)送消息
socket.on('chat', function (msg, ack) {
//process_msg(msg)
pub.publish("chat_filter_channel", JSON.stringify(msg))
socket.userid = msg.from
sockets[socket.userid] = socket
if (ack) {
ack(1)
}
})
- 接收和回應(yīng)心跳
socket.on('hb', function (msg, ack) {
if (ack) {
ack(1)
}
})
- 用戶下線内舟,刪除對應(yīng)的socket
socket.on("disconnect", function () {
var address = socket.handshake.address;
console.log(Date() + " Disconnect from " + address.address + ":" + address.port);
if (!socket.relogin) {
delete sockets[socket.userid]
}
})
推送服務(wù)器
實現(xiàn)了聊天服務(wù)器后,對推送來說就很簡單了
- 在redis里開個channel验游,業(yè)務(wù)服務(wù)器往這個channel里publish數(shù)據(jù)
- nodejs subscribe這個channel監(jiān)聽數(shù)據(jù),找到對應(yīng)用戶發(fā)送消息即可耕蝉。
- 用戶不在線崔梗,可能也需要對這個離線推送消息做保留,具體看業(yè)務(wù)定義
var notification = redis.createClient()
notification.subscribe("notification")
// check redis notifcation channel
notification.on("message", function (pattern, msg) {
var msgobj = JSON.parse(msg)
var keys = msgobj.toWho
var needStore = msgobj.needStore
for (index in keys) {
var key = keys[index]
if (!needStore) {
if (sockets[key]) {
sockets[key].emit("notification", msg)
}
}
else {
var list = []
store.hget("nodejs_notification", key, function (e, v) {
if (v) {
list = JSON.parse(v);
}
list.push(msg)
var msglist = JSON.stringify(list)
store.hset("nodejs_notification", key, msglist, function (e, r) {
})
})
if (sockets[key]) {
send_notification(sockets[key], msg)
}
}
}
})
function send_notification(socket, notif) {
socket.emit("notification", notif, function ack() {
store.hdel("nodejs_notification", socket.userid)
})
}
客戶端Lib庫
服務(wù)端是使用socket.io實現(xiàn)垒在,基本上socket.io的Lib都能兼容
以下是推薦的兩個客戶端Lib:
其他語言版本蒜魄,可以在github搜索socket.io,找到對應(yīng)的Lib庫
Server端全部代碼
//var io = require('socket.io').listen(80)
var app = require('http').createServer(handler)
, io = require('socket.io').listen(app)
app.listen(80);
function handler(req, res) {
if (req.url == "/monitor") {
res.writeHead(200);
res.end("OK");
}
else {
res.writeHead(404);
res.end();
}
}
//io.disable('heartbeats')
//io.set('heartbeats', false);
io.set('transports', ['websocket', 'xhr-polling']);
io.set('heartbeat timeout', 5 * 60)
io.set('heartbeat interval', 4 * 60)
io.set('close timeout', 1 * 30);
io.set("log level", 1)
io.set("browser client", false)
io.set("browser client cache", false)
io.set("browser client cache", false)
var redis = require("redis")
var pub = redis.createClient()
var store = redis.createClient()
var snschat = redis.createClient()
var notification = redis.createClient()
var PUSH_TO_IOS_DELAY_TIME = 120000
snschat.subscribe("snschat");
notification.subscribe("notification")
var sockets = {}
pub.on("error", function (err) {
console.log("Error " + err);
});
store.on("error", function (err) {
console.log("Error " + err);
});
snschat.on("error", function (err) {
console.log("Error " + err);
});
function send_msg_delay(socket) {
store.hget("chat_history", socket.userid, function (e, v) {
if (v) {
list = JSON.parse(v);
if (list.length > 0) {
var msg = JSON.stringify(list)
socket.isSendingChatMessage = false
send_msg(socket, msg)
}
}
})
}
function send_msg(socket, msg) {
//delay for 5 sec
if (socket.isSendingChatMessage) {
setTimeout(function () {
send_msg_delay(socket)
}, 5000)
return
}
socket.isSendingChatMessage = true
//start send
var callSendToIOS = sendToIOSDealy(socket.userid, PUSH_TO_IOS_DELAY_TIME)
socket.emit("chat", msg, function ack(size) {
clearTimeout(callSendToIOS)
store.hget("chat_history", socket.userid, function (e, v) {
if (v) {
list = JSON.parse(v);
//console.log("size="+size)
if (list.length == size) {
store.hdel("chat_history", socket.userid, function (e, r) {
})
}
else if (size < list.length) {
list = list.splice(size)
var msglist = JSON.stringify(list)
store.hset("chat_history", socket.userid, msglist, function (e, r) {
})
}
}
socket.isSendingChatMessage = false
})
})
}
function sendToIOSDealy(toWho, time) {
return setTimeout(function () {
sendToIOS(toWho)
}, time)
}
function sendToIOS(toWho) {
var obj = {"toWho": toWho}
var msg = JSON.stringify(obj)
console.log("delay send to ios channel:" + msg)
pub.publish("chat_message_channel", msg)
}
function send_notification(socket, notif) {
socket.emit("notification", notif, function ack() {
store.hdel("nodejs_notification", socket.userid)
})
}
function send_store_msg(socket, userid) {
if (socket.isSendStoreMsg) {
return;
}
socket.isSendingChatMessage = false
store.hget("chat_history", userid, function (e, msg) {
if (msg) {
send_msg(socket, msg)
store.hdel("chat_history", socket.userid, function (e, r) {
})
}
})
store.hget("nodejs_notification", userid, function (e, msg) {
if (msg) {
var msglist = JSON.parse(msg)
for (var i = 0; i < msglist.length; i++) {
send_notification(socket, msglist[i])
}
//socket.emit("notification", msg)
//store.hdel("nodejs_notification", userid)
}
})
socket.isSendStoreMsg = true
}
function saveToChatHistory(msg) {
var list = []
store.hget("chat_history", msg.to, function (e, v) {
if (v) {
list = JSON.parse(v);
}
list.push(msg)
var msglist = JSON.stringify(list)
store.hset("chat_history", msg.to, msglist, function (e, r) {
})
})
}
function pushToChatHistoryChannel(msg) {
var msgStr = JSON.stringify(msg)
pub.publish("chat_message_history_channel", msgStr)
}
function process_msg(msg) {
var list = []
store.hget("chat_history", msg.to, function (e, v) {
if (v) {
list = JSON.parse(v);
}
list.push(msg)
var msglist = JSON.stringify(list)
store.hset("chat_history", msg.to, msglist, function (e, r) {
})
if (sockets[msg.to]) {
send_msg(sockets[msg.to], msglist)
}
else {
sendToIOS(msg.to)
}
pushToChatHistoryChannel(msg)
})
}
// check redis notifcation channel
notification.on("message", function (pattern, msg) {
var msgobj = JSON.parse(msg)
var keys = msgobj.toWho
var needStore = msgobj.needStore
for (index in keys) {
var key = keys[index]
if (!needStore) {
if (sockets[key]) {
sockets[key].emit("notification", msg)
}
}
else {
var list = []
store.hget("nodejs_notification", key, function (e, v) {
if (v) {
list = JSON.parse(v);
}
list.push(msg)
var msglist = JSON.stringify(list)
store.hset("nodejs_notification", key, msglist, function (e, r) {
})
})
if (sockets[key]) {
send_notification(sockets[key], msg)
}
}
}
})
// check redis snschat channel
snschat.on("message", function (pattern, data) {
msg = JSON.parse(data)
process_msg(msg)
})
io.sockets.on('connection', function (socket) {
var address = socket.handshake.address;
console.log(Date() + " New connection from " + address.address + ":" + address.port);
socket.on('login', function (userinfo) {
userid = userinfo.myAuraId
var address = socket.handshake.address;
var deviceid = userinfo.deviceId
console.log(Date() + " Login from " + address.address + ":" + address.port + " " + userid + " " + deviceid);
old_socket = sockets[userid]
if (old_socket && old_socket.deviceid && deviceid && old_socket.deviceid != deviceid) {
old_socket.relogin = 1
old_socket.emit('logout')
console.log("logout " + old_socket.userid + " " + old_socket.deviceid)
}
if (old_socket && old_socket != socket) {
old_socket.disconnect()
}
socket.relogin = 0
socket.userid = userid
socket.deviceid = deviceid
send_store_msg(socket, userid)
sockets[userid] = socket
pub.publish("login_message_channel", JSON.stringify(userinfo))
})
socket.on('geo', function (geo, ack) {
if (geo.myAuraId) {
var now = new Date()
pub.publish("geo", JSON.stringify({
geo: geo, time: now.getTime()
}))
socket.userid = geo.myAuraId
sockets[socket.userid] = socket
if (ack) {
ack(1)
send_store_msg(socket, userid)
}
}
})
socket.on('chat', function (msg, ack) {
//process_msg(msg)
pub.publish("chat_filter_channel", JSON.stringify(msg))
socket.userid = msg.from
sockets[socket.userid] = socket
if (ack) {
ack(1)
}
})
socket.on('hb', function (msg, ack) {
if (ack) {
ack(1)
}
})
socket.on("disconnect", function () {
var address = socket.handshake.address;
console.log(Date() + " Disconnect from " + address.address + ":" + address.port);
if (!socket.relogin) {
delete sockets[socket.userid]
}
})
})