Apache thrift是一個(gè)開源的RPC框架拖叙,看到跟protocol buffer一樣也適用多種語言,就想著用rabbitMQ來處理thrift的消息赂乐,因?yàn)閠hrift字節(jié)比pb更少薯鳍,可以適用于大量傳送數(shù)據(jù)的場(chǎng)景,例如挨措,每個(gè)消息10K挖滤,傳送100條這樣的消息,就是10*100=1M浅役,但用thrift可以壓縮40%斩松,這里數(shù)據(jù)就少得很可觀了。
這里就記錄一下担租,我的使用過程:
首先定義一個(gè)Message.thrift
struct Message {
1: i32 messageid,
2: string message
}
然后用thrift生成相應(yīng)的js文件
thrift --gen js:node Message.thrift
這里生成了一個(gè)Message_types.js文件
這就是我們用來序列化數(shù)據(jù)的文件
好了砸民,我們?cè)儆胣ode寫一個(gè)rabbitMQ的發(fā)送文件
這里我們就叫send.js
var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');
var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);
var AMPQ_URI = 'amqp://localhost:5672';
amqp.connect(AMPQ_URI, function(err, conn){
conn.createChannel(function(err, ch){
var q = 'hello';
var buf = obj2buf({messageid:1,message:'{message:"1234"}'});
ch.assertQueue(q, {durable: false});
ch.sendToQueue(q, buf);
console.log(" [x] Send Data Finish");
});
setTimeout(function(){
conn.close();
process.exit(0);
}, 500);
})
/**
* 將對(duì)象轉(zhuǎn)換成buffer
* @param {[type]} obj [description]
* @return {[type]} [description]
*/
var obj2buf = function(obj){
var message = new Message(obj);
message.write(protocol);
var outBuffers = transport.outBuffers;
var outCount = transport.outCount;
var result = new Buffer(outCount);
var pos = 0;
outBuffers.forEach(function(buf) {
buf.copy(result, pos, 0);
pos += buf.length;
});
return result;
}
其中obj2buf就是thrift將數(shù)據(jù)轉(zhuǎn)換成buffer的方法抵怎,別問我怎么得來的奋救,我也是從網(wǎng)上找的,但這個(gè)方法能用反惕,自己親測(cè)
我們?cè)賹懸粋€(gè)receiver.js尝艘,這個(gè)方法是用來處理rabbitMQ消息的
var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');
var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);
var AMQP_URI = 'amqp://localhost:5672';
amqp.connect(AMQP_URI, function(err, conn){
conn.createChannel(function(err, ch){
var q = 'hello';
ch.assertQueue(q, {durable: false});
console.log('[*] Waiting for message in %s. To exit press CTRL+C', q);
ch.consume(q, function(msg){
// console.log(msg);
var message = buf2obj(msg.content);
console.log(message);
console.log('[x] Received Data Finish');
}, {noAck: true});
})
})
/**
* 將buffer轉(zhuǎn)換成對(duì)象
* @param {[type]} buffer [description]
* @return {[type]} [description]
*/
var buf2obj = function(buffer){
var data = buffer;
data.copy(transport.inBuf, transport.writeCursor, 0);
transport.writeCursor += data.length;
var message = new Message();
message.read(protocol);
return message;
}
這里的buf2obj就是將buffer轉(zhuǎn)換成對(duì)象,
rabbitMQ里面?zhèn)魉拖⒍际且詁uffer類型姿染。
好了背亥,我們可以先跑
node receiver.js
屏幕快照 2017-05-18 下午3.10.07.png
再開一個(gè)窗口運(yùn)行
node send.js
屏幕快照 2017-05-18 下午3.10.18.png