RabbitMQ for Node 官方文檔筆錄

Hello World

RabbitMQ 接收處理和轉發(fā)二進制blob數據

一些術語

  • Producer:發(fā)送消息的程序
  • Producing:發(fā)送消息
  • Queue:消息緩沖器白翻,暫存消息,在RabbitMQ中
  • Consuming:等待接收消息
  • Consumer:等待并接收消息的程序

使用amqp.node客戶端

任務需求

  1. 安裝RabbitMQ for Node模塊(amqp.node)
  2. 編寫消息發(fā)送程序
  3. 編寫消息接收程序
  4. 運行測試代碼

step1 安裝客戶端模塊

npm install amqplib

發(fā)送

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
    // 創(chuàng)建一個頻道皆尔,是大多數API所在的位置
    conn.createChannel(function(err,ch){
        var q = 'hello'; // 對列名
        // 聲明一個隊列,該操作是冪等的熬粗,隊列不存在才會創(chuàng)建
        ch.assertQueue(q,{durable:false});
        // 向隊列發(fā)送消息库说,消息使用字節(jié)數組發(fā)送(Buffer)
        ch.sendToQueue(q,new Buffer('Hello World'));
        console.log('[x] Sent \'Hello World\'');
    });
});

接收

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn){
    // 創(chuàng)建頻道
    conn.createChannel(function(err, ch){
        var q = 'hello'; 
        ch.assertQueue(q,{durable:false});  // 聲明隊列
        // 注冊監(jiān)聽
        ch.consume(q, function(msg){
            console.log('[x] Received%s ', msg.content.toString());
        },{noAck:true});
    });
});

Work Queues 工作隊列

WorkQueue用于在多個Consumer之間分配耗時的任務垢村。

實現(xiàn)思想

流程

將任務封裝成消息并將其發(fā)送到隊列娇哆,工作進程彈出任務并最終執(zhí)行作業(yè)末荐。

實現(xiàn)

MQ充當任務隊列侧纯,Producer向MQ中發(fā)送任務,多個Consumer領取任務并執(zhí)行甲脏。

Step 1 實現(xiàn)Producer 和 Consumer

Producer

Producer的代碼

// new_task.js 每次啟動后往隊列中發(fā)送一條task

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
    conn.createChannel(function(err,ch){
        var q = 'test_queue';
        var msg = 'New Task!';
        ch.assertQueue(q, {durable:true});
        // 發(fā)送消息
        ch.sendToQueue(q, new Buffer(msg), {persistent: true});
        console.log('[x]Sent \'%s\'',msg);
    }); 
});

Consumer

Consumer的代碼

// worker.js模擬一個消費者眶熬,監(jiān)聽并接收MQ分配給自己的task。

var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost',function(err,conn){
    conn.createChannel(function(err, ch){
        var q = 'test_queue';
        ch.assertQueue(q, {durable:true});
        // 監(jiān)聽MQ發(fā)送的消息块请,并在calback中處理
        ch.consume(q, function(msg){
            var secs = msg.content.toString().split('.').length -1;
            console.log('[x] Received %s',msg.content.toString());
            // 模擬處理時間
            setTimeout(function(){
                console.log('[x] Done');
            },secs * 1000);
        },{noAck: true});
    });
});

啟動步驟

  1. 打開多個控制臺娜氏,啟動多個worker.js。
  2. 啟動多次new_task.js负乡,向MQ中發(fā)送多條信息牍白。

結果:每個worker依次接收到任務并打印。

默認情況下抖棘,RabbitMQ將按順序將每條消息發(fā)送給下一個消費者茂腥。
平均而言,每個消費者將獲得相同數量的消息切省。這種分發(fā)消息的方式稱為循環(huán)法最岗。

問題:在實際應用中,上述程序存在一些問題

  1. 每個任務都會消耗一定的時間朝捆,這個時間是不確定的般渡。
  2. 上面的程序中,當worker領取任務后芙盘,消息便會從MQ中彈出驯用,不論是否執(zhí)行成功。若執(zhí)行失敗儒老,則任務丟失蝴乔。

解決方案:使用消息確認解決。

Step 2 使用消息確認機制

為了確保消息永不丟失驮樊,RabbitMQ支持消息確認

消費者發(fā)回ack(acknowledgement)告訴RabbitMQ已收到薇正,處理了特定消息片酝,RabbitMQ可以自由刪除它。

如果Consumer死亡(其通道關閉挖腰,連接關閉或TCP連接丟失)而不發(fā)送ack雕沿,RabbitMQ會將該消息重新排隊。如果其他消費者同時在線猴仑,則會迅速將其重新發(fā)送給其他消費者审轮。這樣就可以確保沒有消息丟失,即使Consumer偶爾會死亡宁脊。

實現(xiàn)原理

使用consume方法的noAck屬性断国,將其設置成false開啟消息確認。

然后調用Chanel.ack(MSG)方法應答MQ

代碼如下

ch.consume(q, function(msg){ 
    // ...do something sync
    setTimeout(function(){
        // ...do something async
        ch.ack(MSG) // 發(fā)送ack
    },1000);
 },{noAck: true}); // 開啟ack

注意:確認必須在收到的交付的同一信道上發(fā)送榆苞。嘗試使用不同的通道進行確認將導致通道級協(xié)議異常。

Step3 消息持久化

當RabbitMQ退出或崩潰時霞捡,它將丟失隊列和消息坐漏。要確保消息不會丟失,我們需要將隊列和消息都標記為持久碧信。

開啟隊列的持久化

// 聲明隊列時赊琳,設置durable屬性為true開啟隊列持久化
ch.assertQueue('hello', {durable: true}); 

開啟消息的持久化

// 發(fā)送消息時,設置persistent屬性為true砰碴,讓RabbitMQ持久化當前消息
ch.sendToQueue(q, new Buffer(msg), {persistent: true});

有關消息持久化的注意事項

注意:將消息標記為持久性并不能完全保證消息不會丟失躏筏。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息并且尚未保存消息時呈枉,仍然有一個短時間窗口期趁尼。如果需要更強的保證,那么可以使用 發(fā)布者確認猖辫。

Step4 公平派遣任務

問題

平均依次分配任務不能保證每個worker接收到相同分量的任務酥泞。當出現(xiàn)一些任務很重,另一些很輕時啃憎,經常會出現(xiàn)一個worker將經常忙碌而另一個worker經持ザ冢空閑。

解決方案

使用prefetch方法控制worker同時最多接收的任務數量辛萍。當worker待處理的任務達到最大數量時悯姊,MQ不會向其發(fā)送新任務,而是會向空閑的worker發(fā)送新任務贩毕。使用這種機制可以達到負載均衡的效果悯许。

Consumer(Worker)的代碼

// ...imports
amqp.connect('amqp://localhost',function(err,conn){
    conn.createChannel(function(err,ch){
        var q = 'task_queue';
        ch.prefetch(1); // 當有一個任務未完成時,不再接受新任務
        ch.consume(q,function(msg){
            // ...do something 
            setTimeout(()=>{
                // ...do something async
                 ch.ack(MSG); // 確認執(zhí)行完畢
            },1000);
        },{noAck:false});
    });
});

Publish/Subscribe

Routing

Topics

RPC

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末耳幢,一起剝皮案震驚了整個濱河市岸晦,隨后出現(xiàn)的幾起案子欧啤,更是在濱河造成了極大的恐慌,老刑警劉巖启上,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邢隧,死亡現(xiàn)場離奇詭異,居然都是意外死亡冈在,警方通過查閱死者的電腦和手機倒慧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來包券,“玉大人纫谅,你說我怎么就攤上這事。” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵旋膳,是天一觀的道長莹汤。 經常有香客問我,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮猛计,結果婚禮上,老公的妹妹穿的比我還像新娘爆捞。我一直安慰自己奉瘤,他們只是感情好,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布煮甥。 她就那樣靜靜地躺著盗温,像睡著了一般。 火紅的嫁衣襯著肌膚如雪苛秕。 梳的紋絲不亂的頭發(fā)上肌访,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天,我揣著相機與錄音艇劫,去河邊找鬼吼驶。 笑死,一個胖子當著我的面吹牛店煞,可吹牛的內容都是我干的蟹演。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼顷蟀,長吁一口氣:“原來是場噩夢啊……” “哼酒请!你這毒婦竟也來了?” 一聲冷哼從身側響起鸣个,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤羞反,失蹤者是張志新(化名)和其女友劉穎布朦,沒想到半個月后,有當地人在樹林里發(fā)現(xiàn)了一具尸體昼窗,經...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡是趴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了澄惊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唆途。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖掸驱,靈堂內的尸體忽然破棺而出肛搬,到底是詐尸還是另有隱情,我是刑警寧澤毕贼,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布温赔,位于F島的核電站,受9級特大地震影響鬼癣,放射性物質發(fā)生泄漏让腹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一扣溺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瓜晤,春花似錦锥余、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至足画,卻和暖如春雄驹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背淹辞。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工医舆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人象缀。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓蔬将,卻偏偏與公主長得像,于是被迫代替她去往敵國和親央星。 傳聞我的和親對象是個殘疾皇子霞怀,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn)莉给,斷路器毙石,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器廉沮。支持消息的持久化、事務徐矩、擁塞控...
    jiangmo閱讀 10,359評論 2 34
  • Kafka簡介 Kafka是一種分布式的滞时,基于發(fā)布/訂閱的消息系統(tǒng)。主要設計目標如下: 以時間復雜度為O(1)的方...
    Alukar閱讀 3,081評論 0 43
  • 背景介紹 Kafka簡介 Kafka是一種分布式的丧蘸,基于發(fā)布/訂閱的消息系統(tǒng)漂洋。主要設計目標如下: 以時間復雜度為O...
    高廣超閱讀 12,831評論 8 167
  • 最近會思考一個問題:python3 中 while 與 while true 有啥不區(qū)別 刽漂? while True...
    liqun奮斗struggle閱讀 37,321評論 1 7