Hello World
RabbitMQ 接收處理和轉發(fā)二進制blob數據
一些術語
- Producer:發(fā)送消息的程序
- Producing:發(fā)送消息
- Queue:消息緩沖器白翻,暫存消息,在RabbitMQ中
- Consuming:等待接收消息
- Consumer:等待并接收消息的程序
使用amqp.node客戶端
任務需求
- 安裝RabbitMQ for Node模塊(amqp.node)
- 編寫消息發(fā)送程序
- 編寫消息接收程序
- 運行測試代碼
step1 安裝客戶端模塊
npm install amqplib
發(fā)送
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
// 創(chuàng)建一個頻道皆尔,是大多數API所在的位置
conn.createChannel(function(err,ch){
var q = 'hello'; // 對列名
// 聲明一個隊列,該操作是冪等的熬粗,隊列不存在才會創(chuàng)建
ch.assertQueue(q,{durable:false});
// 向隊列發(fā)送消息库说,消息使用字節(jié)數組發(fā)送(Buffer)
ch.sendToQueue(q,new Buffer('Hello World'));
console.log('[x] Sent \'Hello World\'');
});
});
接收
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
// 創(chuàng)建頻道
conn.createChannel(function(err, ch){
var q = 'hello';
ch.assertQueue(q,{durable:false}); // 聲明隊列
// 注冊監(jiān)聽
ch.consume(q, function(msg){
console.log('[x] Received%s ', msg.content.toString());
},{noAck:true});
});
});
Work Queues 工作隊列
WorkQueue用于在多個Consumer之間分配耗時的任務垢村。
實現(xiàn)思想
流程
將任務封裝成消息并將其發(fā)送到隊列娇哆,工作進程彈出任務并最終執(zhí)行作業(yè)末荐。
實現(xiàn)
MQ充當任務隊列侧纯,Producer向MQ中發(fā)送任務,多個Consumer領取任務并執(zhí)行甲脏。
Step 1 實現(xiàn)Producer 和 Consumer
Producer
Producer的代碼
// new_task.js 每次啟動后往隊列中發(fā)送一條task
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
conn.createChannel(function(err,ch){
var q = 'test_queue';
var msg = 'New Task!';
ch.assertQueue(q, {durable:true});
// 發(fā)送消息
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log('[x]Sent \'%s\'',msg);
});
});
Consumer
Consumer的代碼
// worker.js模擬一個消費者眶熬,監(jiān)聽并接收MQ分配給自己的task。
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
conn.createChannel(function(err, ch){
var q = 'test_queue';
ch.assertQueue(q, {durable:true});
// 監(jiān)聽MQ發(fā)送的消息块请,并在calback中處理
ch.consume(q, function(msg){
var secs = msg.content.toString().split('.').length -1;
console.log('[x] Received %s',msg.content.toString());
// 模擬處理時間
setTimeout(function(){
console.log('[x] Done');
},secs * 1000);
},{noAck: true});
});
});
啟動步驟
- 打開多個控制臺娜氏,啟動多個worker.js。
- 啟動多次new_task.js负乡,向MQ中發(fā)送多條信息牍白。
結果:每個worker依次接收到任務并打印。
默認情況下抖棘,RabbitMQ將按順序將每條消息發(fā)送給下一個消費者茂腥。
平均而言,每個消費者將獲得相同數量的消息切省。這種分發(fā)消息的方式稱為循環(huán)法最岗。
問題:在實際應用中,上述程序存在一些問題
- 每個任務都會消耗一定的時間朝捆,這個時間是不確定的般渡。
- 上面的程序中,當worker領取任務后芙盘,消息便會從MQ中彈出驯用,不論是否執(zhí)行成功。若執(zhí)行失敗儒老,則任務丟失蝴乔。
解決方案:使用消息確認解決。
Step 2 使用消息確認機制
為了確保消息永不丟失驮樊,RabbitMQ支持消息確認
消費者發(fā)回ack(acknowledgement)告訴RabbitMQ已收到薇正,處理了特定消息片酝,RabbitMQ可以自由刪除它。
如果Consumer死亡(其通道關閉挖腰,連接關閉或TCP連接丟失)而不發(fā)送ack雕沿,RabbitMQ會將該消息重新排隊。如果其他消費者同時在線猴仑,則會迅速將其重新發(fā)送給其他消費者审轮。這樣就可以確保沒有消息丟失,即使Consumer偶爾會死亡宁脊。
實現(xiàn)原理
使用consume方法的noAck屬性断国,將其設置成false開啟消息確認。
然后調用Chanel.ack(MSG)方法應答MQ
代碼如下
ch.consume(q, function(msg){
// ...do something sync
setTimeout(function(){
// ...do something async
ch.ack(MSG) // 發(fā)送ack
},1000);
},{noAck: true}); // 開啟ack
注意:確認必須在收到的交付的同一信道上發(fā)送榆苞。嘗試使用不同的通道進行確認將導致通道級協(xié)議異常。
Step3 消息持久化
當RabbitMQ退出或崩潰時霞捡,它將丟失隊列和消息坐漏。要確保消息不會丟失,我們需要將隊列和消息都標記為持久碧信。
開啟隊列的持久化
// 聲明隊列時赊琳,設置durable屬性為true開啟隊列持久化
ch.assertQueue('hello', {durable: true});
開啟消息的持久化
// 發(fā)送消息時,設置persistent屬性為true砰碴,讓RabbitMQ持久化當前消息
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
有關消息持久化的注意事項
注意:將消息標記為持久性并不能完全保證消息不會丟失躏筏。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息并且尚未保存消息時呈枉,仍然有一個短時間窗口期趁尼。如果需要更強的保證,那么可以使用 發(fā)布者確認猖辫。
Step4 公平派遣任務
問題
平均依次分配任務不能保證每個worker接收到相同分量的任務酥泞。當出現(xiàn)一些任務很重,另一些很輕時啃憎,經常會出現(xiàn)一個worker將經常忙碌而另一個worker經持ザ冢空閑。
解決方案
使用prefetch方法控制worker同時最多接收的任務數量辛萍。當worker待處理的任務達到最大數量時悯姊,MQ不會向其發(fā)送新任務,而是會向空閑的worker發(fā)送新任務贩毕。使用這種機制可以達到負載均衡的效果悯许。
Consumer(Worker)的代碼
// ...imports
amqp.connect('amqp://localhost',function(err,conn){
conn.createChannel(function(err,ch){
var q = 'task_queue';
ch.prefetch(1); // 當有一個任務未完成時,不再接受新任務
ch.consume(q,function(msg){
// ...do something
setTimeout(()=>{
// ...do something async
ch.ack(MSG); // 確認執(zhí)行完畢
},1000);
},{noAck:false});
});
});