轉(zhuǎn)載請(qǐng)注明:
原始地址:http://www.reibang.com/p/586c6bc7951b
原作者:wonder
1、寫(xiě)在前面
上一篇《owt-server 的集群管理者祷愉、集群工作站窗宦、消息隊(duì)列(一)》主要介紹了 owt-server 的集群管理者(clusterManager)的相關(guān)定義,同時(shí)提到了消息交互的底層是依賴于消息隊(duì)列的二鳄。owt-server中使用了node.js 的 amqp 庫(kù)去連接rabbitmq實(shí)例赴涵,源碼中對(duì) amqp 操作進(jìn)一步封裝形成適合其他模塊調(diào)用的消息隊(duì)列接口。
本篇將對(duì)上述的amqp封裝模塊 amqp_client.js進(jìn)行分解介紹订讼,但是在開(kāi)始模塊分解之前髓窜,簡(jiǎn)要介紹一下amqp這種協(xié)議(amqp其實(shí)是一種協(xié)議,定義了一種消息隊(duì)列的組織模塊欺殿、基本概念)寄纵。了解 amqp 協(xié)議有助于初學(xué)者快速理解消息隊(duì)列的原理,在走讀 owt-server 以及其他使用消息隊(duì)列的源碼中會(huì)輕松很多)
在本篇之后祈餐,你將對(duì)owt-server的集群管理(clusterManager)擂啥、底層消息機(jī)制(rpc、消息隊(duì)列)有了一定了解帆阳。那么在《owt-server 的集群管理者哺壶、集群工作站、消息隊(duì)列(三)
》中蜒谤,就將開(kāi)始介紹集群工作者(clusterWorker)山宾,clusterWorkers 將同樣利用本篇所述的rpc機(jī)制與clusterManagers(準(zhǔn)確的說(shuō)是master)進(jìn)行交互,owt-server系統(tǒng)的全貌也將由此逐漸展現(xiàn)鳍徽。
2资锰、amqp(Advanced Message Queuing Protocol)協(xié)議簡(jiǎn)要介紹
如果你已經(jīng)了解 “高級(jí)消息隊(duì)列協(xié)議”,縮寫(xiě)是amqp阶祭,直接跳過(guò)本節(jié)绷杜。
如果你需要深入了解amqp協(xié)議直秆、學(xué)習(xí)一些栗子,可以參閱 rabbitmq 的官方資料 或者 其他優(yōu)秀的博客鞭盟,比如:
rabbitmq 對(duì)于AMQP的介紹: [https://www.rabbitmq.com/tutorials/amqp-concepts.html#message-acknowledge](https://www.rabbitmq.com/tutorials/amqp-concepts.html#message-acknowledge)
【推薦看這篇圾结,全面且有拓展】[http://www.reibang.com/p/a4d92d0d7e19](http://www.reibang.com/p/a4d92d0d7e19)
下圖中包含了 amqp 的幾個(gè)元素:publisher、exchange齿诉、routes筝野、queue、consumer
publisher: 消息的發(fā)布者
consumer: 消息的消費(fèi)者
exchange: 發(fā)布的消息將通過(guò)關(guān)聯(lián)的routes被路由到指定queue
routes: 路由規(guī)則
queue: 存儲(chǔ)消息的隊(duì)列
其中exchange有若干種類型:
Direct exchange 直連式
Fanout exchangea 廣播式
Topic exchange 主題式
Headers exchange 頭匹配式
發(fā)布者和消費(fèi)者要執(zhí)行的動(dòng)作:
消息發(fā)布者(publisher)發(fā)布(publish)消息到某個(gè)exchange粤剧,消息根據(jù)規(guī)則(routes)被路由到對(duì)應(yīng)queue歇竟;
消息消費(fèi)者(consumer)訂閱(subscribe)該queue才可以接收到queue中的消息;
3抵恋、owt-server 中 amqp 封裝模塊分解
amqp_client.js 位于 owt-server源碼目錄下的 **source/common/ **文件夾中
root@ubuntu:/home/wonder/OWT/owt-server-master/source/common# ls
amqp_client.js cipher.js clusterWorker.js loadCollector.js logger.js makeRPC.js mediaUtil.js rpcChannel.js
查看amqp_client.js 文件焕议,有以下定義
3.1 var declareExchange 定義
var declareExchange = function(conn, name, type, autoDelete, on_ok, on_failure) { //創(chuàng)建exchange元素,conn為rabittmq的連接實(shí)例
var ok = false;
var exc = conn.exchange(name, {type: type, autoDelete: autoDelete}, function (exchange) { //用rabbitmq連接實(shí)例創(chuàng)建指定名稱馋记、類型的exchange
log.debug('Exchange ' + exchange.name + ' is open');
ok = true;
on_ok(exc); //創(chuàng)建成功執(zhí)行回調(diào)函數(shù)号坡,把exchange元素傳回調(diào)用者
});
};
3.2 var rpcClient 定義
用消息隊(duì)列方法實(shí)現(xiàn)了遠(yuǎn)程調(diào)用客戶端的角色
var rpcClient = function(bus, conn, on_ready, on_failure) { //bus為外部透?jìng)鲄?shù),conn為rabbitmq實(shí)例
var handler = {bus: bus};
var call_map = {},
corrID = 0,
ready = false,
reply_q,
exc;
var queueBindingName;
declareExchange(conn, 'owtRpc', 'direct', true, function (exc_got) { //生成一個(gè) direct 類型的 exchange對(duì)象
exc = exc_got; //生成的exchange對(duì)象
reply_q = conn.queue( ' ', function (q) { //創(chuàng)建一個(gè)響應(yīng)消息隊(duì)列梯醒,用于接收rpc消息的響應(yīng)(簡(jiǎn)稱響應(yīng))
log.debug('Reply queue for rpc client ' + q.name + ' is open');
// Save queueBindingName once.
queueBindingName = q.name; //保留該響應(yīng)隊(duì)列的名稱
reply_q.bind(exc.name, queueBindingName, function () { //響應(yīng)隊(duì)列綁定exchange和route名
reply_q.subscribe(function (message) { //響應(yīng)隊(duì)列訂閱開(kāi)啟(開(kāi)始接收消息)
try {
log.debug('New message received', message);
if(call_map[message.corrID] !== undefined) { //接收到的響應(yīng)是有rpc發(fā)起者的id記錄在案的
log.debug('Callback', message.type, ' - ', message.data);
clearTimeout(call_map[message.corrID].timer); //清除該發(fā)起者id定時(shí)器
call_map[message.corrID].fn[message.type].call({}, message.data, message.err); //使用指定處理函數(shù)處理該響應(yīng)
if (call_map[message.corrID].fn['onStatus']) { //狀態(tài)響應(yīng)需要一起時(shí)間生效
setTimeout(function() { //一定時(shí)間后,銷(xiāo)毀對(duì)應(yīng)rpc消息記錄
(call_map[message.corrID] !== undefined) && (delete call_map[message.corrID]);
}, REMOVAL_TIMEOUT);
} else { //銷(xiāo)毀對(duì)應(yīng)rpc消息記錄
(call_map[message.corrID] !== undefined) && (delete call_map[message.corrID]);
}
} else {
log.warn('Late rpc reply:', message);
}
} catch(err) {
log.error('Error processing response: ', err);
}
});
ready = true; //響應(yīng)隊(duì)列綁定成功
on_ready(); //響應(yīng)隊(duì)列綁定成功的回調(diào)
});
});
}, on_failure);
handler.remoteCall = function(to, method, args, callbacks, timeout) { //執(zhí)行一次遠(yuǎn)程調(diào)用(rpc)腌紧,to為該rpc對(duì)應(yīng)的消息隊(duì)列名稱茸习,method為rpc方法名,args為rpc方法的參數(shù)壁肋,callbacks為rpc響應(yīng)的處理函數(shù)号胚,timeout為rpc超時(shí)時(shí)間
log.debug('remoteCall, corrID:', corrID, 'to:', to, 'method:', method);
if (ready) { //響應(yīng)隊(duì)列可用
var corr_id = corrID++; //消息序號(hào)
call_map[corr_id] = {};
call_map[corr_id].fn = callbacks || {callback: function() {}}; //記錄rpc響應(yīng)的處理函數(shù)
call_map[corr_id].timer = setTimeout(function() { //設(shè)置rpc超時(shí)時(shí)鐘
log.debug('remoteCall timeout, corrID:', corr_id);
if (call_map[corr_id]) {
for (var i in call_map[corr_id].fn) {
(typeof call_map[corr_id].fn[i] === 'function' ) && call_map[corr_id].fn[i]('timeout'); //所有rpc響應(yīng)的處理函數(shù)置為超時(shí)
}
delete call_map[corr_id]; //清理rpc記錄
}
}, timeout || TIMEOUT);
exc.publish(to, {method: method, args: args, corrID: corr_id, replyTo: queueBindingName}); //發(fā)布rpc消息
} else { //響應(yīng)隊(duì)列未就緒
for (var i in callbacks) {
(typeof callbacks[i] === 'function' ) && callbacks[i]('error', 'rpc client is not ready'); //響應(yīng)的處理函數(shù)置為錯(cuò)誤
}
}
};
handler.remoteCast = function(to, method, args) { //向遠(yuǎn)端投遞消息,與rpc的區(qū)別是浸遗,投遞消息不需要響應(yīng)
exc && exc.publish(to, {method: method, args: args}); //向exchange發(fā)布
};
handler.close = function() { //關(guān)閉該rpc client
for (var i in call_map) { //清除所有rpc記錄的超時(shí)
clearTimeout(call_map[i].timer);
}
call_map = {};
reply_q && reply_q.destroy();
reply_q = undefined;
exc && exc.destroy(true);
exc = undefined;
};
return handler;
};
3.3 var rpcClient 定義
用消息隊(duì)列方法實(shí)現(xiàn)了遠(yuǎn)程調(diào)用服務(wù)端的角色
var rpcServer = function(bus, conn, id, methods, on_ready, on_failure) { //bus為外部透?jìng)鲄?shù)猫胁,conn為rabbitmq實(shí)例,id為該rpc服務(wù)名跛锌,methodes為rpc方法表
var handler = {bus: bus};
var exc, request_q;
declareExchange(conn, 'owtRpc', 'direct', true, function (exc_got) { //創(chuàng)建 “direct” 類型的exchange
exc = exc_got;
var ready = false;
request_q = conn.queue(id, function (queueCreated) { //創(chuàng)建rpc請(qǐng)求接收隊(duì)列弃秆,簡(jiǎn)稱“請(qǐng)求隊(duì)列”
log.debug('Request queue for rpc server ' + queueCreated.name + ' is open');
request_q.bind(exc.name, id, function() { //請(qǐng)求隊(duì)列綁定exchange名稱和route名
request_q.subscribe(function (message) { //請(qǐng)求開(kāi)啟訂閱(開(kāi)始接收消息)
try {
log.debug('New message received', message);
message.args = message.args || [];
if (message.replyTo && message.corrID !== undefined) { //響應(yīng)目標(biāo)和消息序號(hào)正常
message.args.push(function(type, result, err) { //在請(qǐng)求消息參數(shù)中加入響應(yīng)發(fā)送回調(diào)函數(shù)
exc.publish(message.replyTo, {data: result, corrID: message.corrID, type: type, err: err});
});
}
if (typeof methods[message.method] === 'function') { //rpc請(qǐng)求的方法存在且是函數(shù)
methods[message.method].apply(methods, message.args); //調(diào)用該函數(shù)
} else { //rcp請(qǐng)求該方法不存在
log.warn('RPC server does not support this method:', message.method);
if (message.replyTo && message.corrID !== undefined) { //響應(yīng)消息置為錯(cuò)誤
exc.publish(message.replyTo, {data: 'error', corrID: message.corrID, type: 'callback', err: 'Not support method'}); //發(fā)送響應(yīng)
}
}
} catch (error) {
log.error('message:', message);
log.error('Error processing call: ', error);
}
});
ready = true;
on_ready();
});
});
}, on_failure);
handler.close = function() { //rpc server關(guān)閉
request_q && request_q.destroy();
request_q = undefined;
exc && exc.destroy(true);
exc = undefined;
};
return handler;
};
3.4 var topicParticipant 定義
用消息隊(duì)列方法實(shí)現(xiàn)了主題消息參與者的角色
var topicParticipant = function(bus, conn, excName, on_ready, on_failure) { //bus為外部透?jìng)鲄?shù),conn為rabbitmq實(shí)例髓帽,excName為主題名稱
var that = {bus: bus},
message_processor;
var exc, msg_q;
declareExchange(conn, excName, 'topic', false, function (exc_got) { //創(chuàng)建“topic” 類型exchange
exc = exc_got;
var ready = false;
msg_q = conn.queue('', function (queueCreated) { //創(chuàng)建消息隊(duì)列
log.debug('Message queue for topic participant is open:', queueCreated.name);
ready = true;
on_ready();
});
}, on_failure);
that.subscribe = function(patterns, on_message, on_ok) { //訂閱 patterns 中的主題菠赚,on_message為消息回調(diào)函數(shù)
if (msg_q) {
message_processor = on_message;
patterns.map(function(pattern) { //消息隊(duì)列綁定patterns中的每個(gè)主題
msg_q.bind(exc.name, pattern, function() {
log.debug('Follow topic [' + pattern + '] ok.');
});
});
msg_q.subscribe(function(message) { //消息隊(duì)列訂閱開(kāi)啟(開(kāi)始接收消息)
try {
message_processor && message_processor(message); //回調(diào)消息
} catch (error) {
log.error('Error processing topic message:', message, 'and error:', error);
}
});
on_ok();
}
};
that.unsubscribe = function(patterns) { //取消訂閱patterns中的主題
if (msg_q) {
message_processor = undefined;
patterns.map(function(pattern) { //對(duì)patterns中的每條主題取消訂閱
msg_q.unbind(exc.name, pattern);
log.debug('Ignore topic [' + pattern + ']');
});
}
};
that.publish = function(topic, data) {
exc && exc.publish(topic, data); //在一個(gè)主題上發(fā)布一條消息
};
that.close = function () {
msg_q.destroy();
msg_q = undefined;
exc && exc.destroy(true);
exc = undefined;
};
return that;
};
3.4 var faultMonitor定義
用消息隊(duì)列方法實(shí)現(xiàn)了差錯(cuò)監(jiān)視者的角色
var faultMonitor = function(bus, conn, on_message, on_ready, on_failure) { //bus為外部透?jìng)鲄?shù),conn為rabbitmq實(shí)例郑藏,on_message為消息處理(回調(diào))函數(shù)
var that = {bus: bus},
msg_receiver = on_message;
var exc, msg_q;
declareExchange(conn, 'owtMonitoring', 'topic', false, function (exc_got) { //創(chuàng)建“topic”類型的exchange
exc = exc_got;
var ready = false;
msg_q = conn.queue('', function (queueCreated) { //創(chuàng)建消息隊(duì)列
log.debug('Message queue for monitoring is open:', queueCreated.name);
ready = true;
msg_q.bind(exc.name, 'exit.#', function() { //綁定消息隊(duì)列到“exit.#”主題
log.debug('Monitoring queue bind ok.');
});
msg_q.subscribe(function(msg) { //消息隊(duì)列定義開(kāi)啟
try {
log.debug('received monitoring message:', msg);
msg_receiver && msg_receiver(msg); //處理消息
} catch (error) {
log.error('Error processing monitored message:', msg, 'and error:', error);
}
});
on_ready();
});
}, on_failure);
that.setMsgReceiver = function (on_message) { //重設(shè)置消息處理函數(shù)
msg_receiver = on_message;
};
that.close = function () {
msg_q.destroy();
msg_q = undefined;
exc && exc.destroy(true);
exc = undefined;
};
return that;
};
3.5 var monitoringTarget定義
用消息隊(duì)列方法實(shí)現(xiàn)了差錯(cuò)通告者的角色
var monitoringTarget = function(bus, conn, on_ready, on_failure) { //bus為外部透?jìng)鲄?shù)衡查,conn為rabbitmq實(shí)例
var that = {bus: bus};
var exc;
declareExchange(conn, 'owtMonitoring', 'topic', false, function (exc_got) { //創(chuàng)建“topic”類型exchange
exc = exc_got;
on_ready();
}, on_failure);
that.notify = function(reason, message) { //發(fā)送主題為“exit.”+resason的通告
exc && exc.publish('exit.' + reason, {reason: reason, message: message});
};
that.close = function () {
exc.destroy(true);
exc = undefined;
};
return that;
};
3.6 amqp_client.js的模塊導(dǎo)出定義
模塊導(dǎo)出部分提供了上述幾個(gè)定義的對(duì)外接口,具體就不全部貼出來(lái)了必盖,可以參考源碼
module.exports = function() {
//省略部分定義
...
that.connect = function(options, on_ok, on_failure) {
log.debug('Connecting to rabbitMQ server:', options);
var setupConnection = function(options) { //建立ampq到rabbitmq的連接實(shí)例
//以下是一些源碼注釋
/*
* `amqp.createConnection([options, [implOptions]])` takes two options
* objects as parameters. The first options object has these defaults:
*
* { host: 'localhost'
* , port: 5672
* , login: 'guest'
* , password: 'guest'
* , connectionTimeout: 10000
* , authMechanism: 'AMQPLAIN'
* , vhost: '/'
* , noDelay: true
* , ssl: { enabled : false
* }
* }
*
* The second options are specific to the node AMQP implementation. It has
* the default values:
*
* { defaultExchangeName: ''
* , reconnect: true
* , reconnectBackoffStrategy: 'linear'
* , reconnectExponentialLimit: 120000
* , reconnectBackoffTime: 1000
* }
*/
// Note that we use the default option.
// So the reconnect is enabled, and reconnect strategy is
// 'linear', which means the broken connection will try to
// recover after every 'reconnectBackoffTime' which is
// 1000ms by default.
var conn = amqp.createConnection(options); //創(chuàng)建連接拌牲,異步進(jìn)行
var connected = false;
conn.on('ready', function() { //設(shè)置連接狀態(tài)“ready”的處理函數(shù)
delete options.password; //刪除配置中rabbitmq的密碼俱饿,保證程序運(yùn)行期間密碼不存于內(nèi)存
log.info('Connecting to rabbitMQ server OK, options:', options);
connection = conn;
// The 'ready' event will be triggered each time
// the connection is OK. So just invoke the success
// callback once to avoid the duplicate logic when
// reconnecting is done.
if (!connected) {
connected = true;
on_ok();
}
});
conn.on('error', function(e) { //設(shè)置連接狀態(tài)“error”的處理函數(shù)
// The amqp client will try to reconnect by
// the default option, so just notify something here.
log.info('Connection to rabbitMQ server error', e);
});
conn.on('disconnect', function() { //設(shè)置連接狀態(tài)“disconnect”的處理函數(shù)
if (connection) {
close();
connection = undefined;
log.info('Connection to rabbitMQ server disconnected.');
on_failure('amqp connection disconnected');
}
});
};
if (fs.existsSync(cipher.astore)) { //獲取授權(quán)信息文件
cipher.unlock(cipher.k, cipher.astore, function cb (err, authConfig) { //使用cipher庫(kù)解密出cipher.astore文件中存儲(chǔ)的關(guān)于rabbit的授權(quán)信息(用戶名和密碼),cipher庫(kù)實(shí)現(xiàn)請(qǐng)參考源碼
if (!err) {
if (authConfig.rabbit) {
options.login = authConfig.rabbit.username;
options.password = authConfig.rabbit.password;
}
setupConnection(options); //建立和rabbit的連接
} else {
log.error('Failed to get rabbitmq auth:', err);
setupConnection(options);
}
});
} else {
setupConnection(options);
}
};
}