架構(gòu)設(shè)計(jì)之NodeJS操作消息隊(duì)列RabbitMQ

一. 什么是消息隊(duì)列凤覆?

消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)链瓦。消息可以非常簡(jiǎn)單,比如只包含文本字符串盯桦,也可以更復(fù)雜慈俯,可能包含嵌入對(duì)象。

消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式俺附,消息發(fā)送后可以立即返回肥卡,由消息系統(tǒng)來確保消息的可靠傳遞。消息發(fā)布者只管把消息發(fā)布到 MQ 中而不用管誰來取事镣,消息使用者只管從 MQ 中取消息而不管是誰發(fā)布的步鉴。這樣發(fā)布者和使用者都不用知道對(duì)方的存在。

二. 常用的消息隊(duì)列有哪些璃哟?

RabbitMQ氛琢、RocketMQ、ActiveMQ随闪、Kafka阳似、ZeroMQ、MetaMq铐伴。

甚至現(xiàn)在部分NoSQL也可做消息隊(duì)列撮奏,如Redis。

三. 消息隊(duì)列的使用場(chǎng)景当宴?

  • 異步處理

  • 應(yīng)用解耦

  • 流量削峰

四. 使用案例

上規(guī)模的公司都會(huì)有自己的日志分析系統(tǒng)畜吊,日志系統(tǒng)是怎么實(shí)現(xiàn)的呢?

image.png

圖解:用戶在訪問應(yīng)用的時(shí)候户矢,我們要記錄下用戶的操作記錄和系統(tǒng)的異常日志玲献,常規(guī)的做法是將系統(tǒng)產(chǎn)生的日志保存到服務(wù)器磁盤,在服務(wù)器中開啟定時(shí)任務(wù)梯浪,定時(shí)將磁盤的日志信息傳入mq中(生產(chǎn)者)捌年,也定時(shí)將mq中的消息取出并存到相應(yīng)的數(shù)據(jù)庫,如ElasticSearch或Hive中挂洛。

五. 如何安裝RabbitMQ礼预?

上面的案例介紹了MQ的一個(gè)使用場(chǎng)景,我這里是用RabbitMQ舉例虏劲,現(xiàn)實(shí)項(xiàng)目中可能用到的是Kafka逆瑞。

  1. 首先安裝brew(mac為例)

    /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"復(fù)制代碼
    
  2. 安裝RabbitMQ

    brew install rabbitmq復(fù)制代碼
    
  3. 運(yùn)行RabbitMQ

    進(jìn)入到 /usr/local/Cellar/rabbitmq/3.7.7荠藤,執(zhí)行

    localhost:3.6.6 lidong$ sbin/rabbitmq-server復(fù)制代碼
    
  4. 啟動(dòng)插件

    進(jìn)入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

    ./rabbitmq-plugins enable rabbitmq_management復(fù)制代碼
    
  5. 登陸管理界面

    打開瀏覽器輸入:http://localhost:15672,RabbitMQ默認(rèn)15672端口

image.png

六. Nodejs操作RabbitMQ

網(wǎng)上可以找到好幾個(gè)相應(yīng)的Node SDK,這里推薦amqplib

  1. 生產(chǎn)者
/**
 * 對(duì)RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts = [];
        this.index = 0;
        this.length = this.hosts.length;
        this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, new Buffer(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}
復(fù)制代碼

2. 消費(fèi)者

/**
 * 對(duì)RabbitMQ的封裝
 */
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }復(fù)制代碼

3. 通過生產(chǎn)者向MQ發(fā)送一個(gè)消息兼都,并創(chuàng)建隊(duì)列

let mq = new RabbitMQ();
mq.sendQueueMsg('testQueue', 'my first message', (error) => {
    console.log(error)
})復(fù)制代碼

執(zhí)行之后生巡,我們打開管理平臺(tái),發(fā)現(xiàn)RabbbitMQ已經(jīng)接受到了一條消息:

image.png

并且RabbbitMQ新增了一個(gè)隊(duì)列testQueue

image.png

4. 獲取指定隊(duì)列的消息

let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg)
})
// 輸出結(jié)果:my first message復(fù)制代碼

此時(shí)打開RabbitMQ管理平臺(tái)冕臭,消息數(shù)量已經(jīng)變?yōu)?:

image.png

綜上:我們簡(jiǎn)單講述了消息隊(duì)列及RabbitMQ相關(guān)的一些知識(shí),以及我們?nèi)绾瓮ㄟ^nodejs來生產(chǎn)與消費(fèi)消息,上面講的比較簡(jiǎn)單摊趾,之后會(huì)發(fā)表更多文章講述消息隊(duì)列集群搭建及容災(zāi)的實(shí)現(xiàn)。

作者:吳空
鏈接:https://juejin.im/post/5b4839b2f265da0f6640142b
來源:掘金
著作權(quán)歸作者所有游两。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán)砾层,非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末贱案,一起剝皮案震驚了整個(gè)濱河市肛炮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌宝踪,老刑警劉巖侨糟,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異瘩燥,居然都是意外死亡秕重,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門厉膀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來溶耘,“玉大人,你說我怎么就攤上這事服鹅〉时” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵菱魔,是天一觀的道長(zhǎng)留荔。 經(jīng)常有香客問我,道長(zhǎng)澜倦,這世上最難降的妖魔是什么聚蝶? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮藻治,結(jié)果婚禮上碘勉,老公的妹妹穿的比我還像新娘。我一直安慰自己桩卵,他們只是感情好验靡,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布倍宾。 她就那樣靜靜地躺著,像睡著了一般胜嗓。 火紅的嫁衣襯著肌膚如雪高职。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天辞州,我揣著相機(jī)與錄音怔锌,去河邊找鬼。 笑死变过,一個(gè)胖子當(dāng)著我的面吹牛埃元,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播媚狰,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼岛杀,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了崭孤?” 一聲冷哼從身側(cè)響起类嗤,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎裳瘪,沒想到半個(gè)月后土浸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡彭羹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年黄伊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片派殷。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡还最,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出毡惜,到底是詐尸還是另有隱情拓轻,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布经伙,位于F島的核電站扶叉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏帕膜。R本人自食惡果不足惜枣氧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望垮刹。 院中可真熱鬧达吞,春花似錦、人聲如沸荒典。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至覆糟,卻和暖如春刻剥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背滩字。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國打工透敌, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人踢械。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像魄藕,于是被迫代替她去往敵國和親内列。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 消息隊(duì)列目前流行的有KafKa背率、RabbitMQ话瞧、ActiveMQ等,它們的誕生無非不是為了解決消息的分布式消費(fèi)寝姿,...
    恒宇少年閱讀 12,450評(píng)論 24 101
  • 關(guān)于消息隊(duì)列交排,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了饵筑,但一直沒騰出空埃篓,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 584,412評(píng)論 51 785
  • 關(guān)于消息隊(duì)列根资,從前年開始斷斷續(xù)續(xù)看了些資料架专,想寫很久了,但一直沒騰出空玄帕,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型部脚,是時(shí)...
    中v中閱讀 1,967評(píng)論 0 20
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料裤纹,想寫很久了委刘,但一直沒騰出空,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型鹰椒,是時(shí)...
    Johnson_zx閱讀 1,107評(píng)論 0 5
  • 關(guān)于消息隊(duì)列锡移,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了吹零,但一直沒騰出空罩抗,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    Java機(jī)械師閱讀 544評(píng)論 0 2