用pm2起一個nodejs服務(wù),安裝amqp-connection-manager褂微,ioredis:
1)建立amqp連接瓜饥,創(chuàng)建channels,綁定queue冷溃,并消費:
const amqp = require('amqp-connection-manager');
const amqpConfig = {
"amqp": {
"cluster": [
"amqp://用戶名:密碼@服務(wù)器ip:5672"
],
"exchange": "****"
}
}
amqp.connect(amqpConfig.amqp.cluster, { json: true })
.createChannel({ setup: channelSetup })
.waitForConnect().then(() => {
console.log("MQ client is listening for messages.");
}).catch(err => {
console.log("MQ client setup failed.", err);
});
function channelSetup(ch) {
return ch.assertExchange(Config.amqp.exchange, 'direct', { durable: true })
.then(function (exchg) {
//消息類型
var qp_new_task = `push.queue.task.new`;
var qks = [
{ "queue": qp_new_task, "key": qp_new_task, "durable": true }
];
_.each(qks, function (qk) {
ch.assertQueue(qk.queue, { durable: qk.durable }).then(function (qok) {
return qok.queue;
}).then(function (queue) {
return ch.bindQueue(queue, exchg.exchange, qk.key).then(function () {
return queue;
});
}).then(function (queue) {
console.log("*** Message Queue [" + queue + "] ready");
return ch.consume(queue, function (msg) {
console.log('[receive message from amqp]')
}, { noAck: true });
});
});
});
}
如果amqp連接成功钱磅,會在amqp的管理界面看到如下節(jié)點信息:2)建立redis連接:
const Redis = require('ioredis');
function createRedisClient(){
const config = {
"redis": {
"password": "***",
"cluster": [
{
"host": "192.168.0.0",
"port": 1234
},
{
"host": "192.168.0.1",
"port": 1234
}
]
}}
//建立單個redis service連接:
return new Redis({
host: config.redis.cluster[0].host,
port: config.redis.cluster[0].port,
password: config.redis.password
});
//建立多個redis service連接:
//new Redis.Cluster(config.redis.cluster, {
// redisOptions: {
// password: config.redis.password
// }
//});
}
//向redis注冊用戶信息:
createRedisClient().sadd(userId, userToken);
3)socket.io的存在是橋梁作用似枕,socket.io分為socket.io和socket.io-client盖淡。在web端實例化socket.io-client,用戶登錄后凿歼,獲取到用戶信息和token褪迟,推送給nodejs服務(wù)中的socket.io。socket.io接收到用戶信息后在redis中注冊答憔。當amqp推送消息過來時味赃,nodejs服務(wù)會消費到消息隊列中的這條消息,然后找到對應(yīng)的userId,利用socket.io推送給socket.io-client,從而實現(xiàn)聊天系統(tǒng)中的消息推送虐拓。
實現(xiàn)效果:采坑記錄:
首先檢查服務(wù)器上rabbitMQ和redis的端口是否正常訪問
-- 檢查方法:linux和centos:telnet 10.20.66.37 5672
macos: nc -vz -w 2 10.20.66.37 5672
補充說明:
--------補充更新-------
當我們在聊天系統(tǒng)中發(fā)送消息時城榛,首先會調(diào)用后臺的api揪利,后臺api會將消息放到amqp的消息隊列中,其次狠持,每個連接amqp的nodejs服務(wù)相當于一個消費者疟位,消息隊列中一旦有消息會被消費者消費。假設(shè)當前有兩個nodejs服務(wù)同時在消費喘垂,當amqp消息隊列中有消息產(chǎn)生時甜刻,如果被其中一個nodejs服務(wù)消費掉,則另外一個nodejs將消費不到這條消息王污。換句話說罢吃,當有多個nodejs存在時,只會有一個nodejs接收到amqp發(fā)來的消息
后續(xù)會整理個demo出來昭齐,更不動了尿招。。