消息隊(duì)列助你成為高薪 Node.js 工程師

為什么寫這篇文章

  • 現(xiàn)在的面試要求越來越高了,打開看了看幾個(gè) BOSS 招聘 Node.js 全棧開發(fā)的厕隧,其中都有一條“了解 消息隊(duì)列奔脐,并在項(xiàng)目中應(yīng)用過”,嗚嗚嗚
  • 后端開發(fā)者應(yīng)該都知道消息隊(duì)列吁讨,但是一些前端開發(fā)者可能知道的并不多髓迎,但是你們可能好奇搶票,商品秒殺等功能是如何實(shí)現(xiàn)的,其實(shí)沒有多么高大上建丧,看了消息隊(duì)列就知道了排龄。

文章導(dǎo)圖(你能學(xué)到)

image

什么是消息隊(duì)列

“消息隊(duì)列”是在消息的傳輸過程中保存消息的容器。

個(gè)人理解:我把它分成兩個(gè)詞消息隊(duì)列翎朱。當(dāng)一大批客戶端同時(shí)產(chǎn)生大量的網(wǎng)絡(luò)請(qǐng)求(消息)時(shí)候橄维,服務(wù)器的承受能力肯定是有一個(gè)限制的。這時(shí)候要是有個(gè)容器拴曲,先讓這些消息排隊(duì)就好了争舞,還好有個(gè)叫隊(duì)列的數(shù)據(jù)結(jié)構(gòu),通過有隊(duì)列屬性的容器排隊(duì)(先進(jìn)先出)澈灼,把消息再傳到我們的服務(wù)器竞川,壓力減小了好多店溢,這個(gè)很棒的容器就是消息隊(duì)列

這段理解中還包含這個(gè)兩個(gè)概念: 客戶端->生產(chǎn)者 服務(wù)器->消費(fèi)者當(dāng)有消息隊(duì)列出現(xiàn),生產(chǎn)者消費(fèi)者是必不可少的兩個(gè)概念委乌,上面的理解是多個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者床牧,當(dāng)然現(xiàn)實(shí)開發(fā)中還有許多消費(fèi)者的情況哦。接下來的文章也會(huì)多次提到生產(chǎn)-消費(fèi)模型遭贸。

消息隊(duì)列優(yōu)勢(shì)

  • 應(yīng)用解耦

    消息隊(duì)列可以使消費(fèi)者和生產(chǎn)者直接互不干涉叠赦,互不影響,只需要把消息發(fā)送到隊(duì)列即可算利,而且可獨(dú)立的擴(kuò)展或修改兩邊的處理過程效拭,只要能確保它們遵守同樣的接口約定,可以生產(chǎn)者用Node.js實(shí)現(xiàn)挤渔,消費(fèi)者用python實(shí)現(xiàn)判导。

  • 靈活性和峰值處理能力

    當(dāng)客戶端訪問量突然劇增,對(duì)服務(wù)器的訪問已經(jīng)超過服務(wù)所能處理的最大峰值擂红,甚至導(dǎo)致服務(wù)器超時(shí)負(fù)載崩潰昵骤,使用消息隊(duì)列可以解決這個(gè)問題,可以通過控制消費(fèi)者的處理速度生產(chǎn)者可進(jìn)入消息隊(duì)列的數(shù)量等來避免峰值問題

  • 排序保證

    消息隊(duì)列可以控制數(shù)據(jù)處理的順序伴栓,因?yàn)橄㈥?duì)列本身使用的是隊(duì)列這個(gè)數(shù)據(jù)結(jié)構(gòu)惑淳,FIFO(先進(jìn)選出)歧焦,在一些場(chǎng)景數(shù)據(jù)處理的順序很重要,比如商品下單順序等舰涌。

  • 異步通信

    消息隊(duì)列中的有些消息,并不需要立即處理搁痛,消息隊(duì)列提供了異步處理機(jī)制鸡典,可以把消息放在隊(duì)列中并不立即處理,需要的時(shí)候處理疗垛,或者異步慢慢處理贷腕,一些不重要的發(fā)送短信和郵箱功能可以使用。

  • 可擴(kuò)展性

    前面提到了消息隊(duì)列可以做到解耦涮总,如果我們想增強(qiáng)消息入隊(duì)和出隊(duì)的處理頻率瀑梗,很簡(jiǎn)單谤职,并不需要改變代碼中任何內(nèi)容允蜈,可以直接對(duì)消息隊(duì)列修改一些配置即可,比如我們想限制每次發(fā)送給消費(fèi)者的消息條數(shù)等凤跑。

有優(yōu)勢(shì)定有它現(xiàn)實(shí)的應(yīng)用場(chǎng)景,文章后面會(huì)針對(duì)優(yōu)勢(shì)講它們對(duì)應(yīng)的應(yīng)用場(chǎng)景褐奥。

消息隊(duì)列的類型介紹

介紹幾款目前市場(chǎng)上主流的消息隊(duì)列(課外知識(shí),可忽略)

  • Kafka:是由 Apache 軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái)呜笑,由 Scala 和 Java 編寫叫胁,是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)驼鹅,支持單機(jī)每秒百萬并發(fā)。另外买乃,Kafka 的定位主要在日志等方面剪验, 因?yàn)镵afka 設(shè)計(jì)的初衷就是處理日志的抖韩,可以看做是一個(gè)日志(消息)系統(tǒng)一個(gè)重要組件茂浮,針對(duì)性很強(qiáng)。0.8 版本開始支持復(fù)制幌羞,不支持事務(wù)属桦,因此對(duì)消息的重復(fù)、丟失诊笤、錯(cuò)誤沒有嚴(yán)格的要求讨跟。
  • RocketMQ:阿里開源的消息中間件茶袒,是一款低延遲弹谁、高可靠、可伸縮植康、易于使用的消息中間件供璧,思路起源于 Kafka冻记。最大的問題商業(yè)版收費(fèi)演顾,有些功能不開放钠至。
  • RabbitMQ:由 Erlang(有著和原生 Socket 一樣低的延遲)語言開發(fā)基于 AMQP 協(xié)議的開源消息隊(duì)列系統(tǒng)。能保證消息的可靠性宪卿、穩(wěn)定性、安全性。高并發(fā)的特性撒遣,毋庸置疑,RabbitMQ 最高廉涕,原因是它的實(shí)現(xiàn)語言是天生具備高并發(fā)高可用的erlang 語言,天生的分布式優(yōu)勢(shì)层释。

說明本文主要以RabbitMQ講解廉白,較為常見。****個(gè)人認(rèn)為這幾種消息隊(duì)列中間件能實(shí)現(xiàn)的功能磅轻,通過 redis 也都能實(shí)現(xiàn),思想痊班。

初識(shí)消息隊(duì)列(消息隊(duì)列在node.js中的簡(jiǎn)單應(yīng)用)

Rabbitmq基本安裝

Mac版安裝

直接通過 HomeBrew 安裝馒胆,執(zhí)行以下命令

brew install rabbitmq

啟動(dòng) rabbitmq

進(jìn)入安裝目錄$ /usr/local/Cellar/rabbitmq/3.7.8啟動(dòng)$ sbin/rabbitmq-server

瀏覽器輸入 http://localhost:15672/#/ 默認(rèn)用戶名密碼 guest

安裝后的基本示意圖

image

可視化界面可模塊功能介紹:


其他系統(tǒng)安裝請(qǐng)自行網(wǎng)上搜索

幾個(gè)端口區(qū)別說明

5672:通信默認(rèn)端口號(hào) 15672:管理控制臺(tái)默認(rèn)端口號(hào) 25672:集群通信端口號(hào) 注意: 阿里云 ECS 服務(wù)器如果出現(xiàn) RabbitMQ 安裝成功器净,外網(wǎng)不能訪問是因?yàn)榘踩M的問題沒有開放端口 解決方案

Rabbitmq安裝后的基本命令

以下列舉一些在終端常用的操作命令

  • whereis rabbitmq:查看 rabbitmq 安裝位置
  • rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
  • whereis erlang:查看erlang安裝位置
  • rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
  • rabbitmqctl stop_app:關(guān)閉應(yīng)用
  • rabbitmqctl status:節(jié)點(diǎn)狀態(tài)
  • rabbitmqctl add_user username password:添加用戶
  • rabbitmqctl list_users:列出所有用戶
  • rabbitmqctl delete_user username:刪除用戶
  • rabbitmqctl add_vhost vhostpath:創(chuàng)建虛擬主機(jī)
  • rabbitmqctl list_vhosts:列出所有虛擬主機(jī)
  • rabbitmqctl list_queues:查看所有隊(duì)列
  • rabbitmqctl -p vhostpath purge_queue blue:清除隊(duì)列里消息

注意:以上終端所有命令,需要進(jìn)入到rabbitmqctl的sbin目錄下執(zhí)行rabbitmqctl命令才有用,否則會(huì)報(bào)錯(cuò):

image

Node.js實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 HelloWorld 消息隊(duì)列

畫一張基本的圖浪慌,HelloWorld 消息隊(duì)列的圖片冤荆,把下面幾個(gè)概念都畫進(jìn)去。


看這段代碼前先說幾個(gè)概念

  • 生產(chǎn)者 :生產(chǎn)消息的
  • 消費(fèi)者 :接收消息的
  • 通道 channel:建立連接后权纤,會(huì)獲取一個(gè) channel 通道
  • exchange :交換機(jī)钓简,消息需要先發(fā)送到 exchange 交換機(jī),也可以說是第一步存儲(chǔ)消息的地方(交換機(jī)會(huì)有很多類型汹想,后面會(huì)詳細(xì)說)外邓。
  • 消息隊(duì)列 : 到達(dá)消費(fèi)者前一刻存儲(chǔ)消息的地方,exchange 交換機(jī)會(huì)把消息傳遞到此
  • ack回執(zhí):收到消息后確認(rèn)消息已經(jīng)消費(fèi)的應(yīng)答

amqplib模塊

推薦一個(gè) npm 模塊amqplib坐榆。

Github: https://github.com/squaremo/amqp.node

$ npm install amqplib

生產(chǎn)者代碼 product.js

const amqp =require('amqplib');async function  product(params) {     // 1\. 創(chuàng)建鏈接對(duì)象     const connection = await amqp.connect('amqp://localhost:5672');     // 2\. 獲取通道     const channel = await connection.createChannel();     // 3\. 聲明參數(shù)     const routingKey = 'helloKoalaQueue';     const msg = 'hello koala';     for (let i=0; i<10000; i++) {         // 4\. 發(fā)送消息         await channel.publish('', routingKey, Buffer.from(`${msg} 第${i}條消息`));     }// 5\. 關(guān)閉通道     await channel.close();     // 6\. 關(guān)閉連接     await connect.close();}product();

生產(chǎn)者代碼解釋與運(yùn)行結(jié)果

執(zhí)行 node product.js

代碼注釋中已經(jīng)把基本的流程講解了,但是我剛開始看的時(shí)候還有疑問交播,我想很多小伙伴也會(huì)有疑問提针,說明下:

  • 疑問1

    前面提到過交換機(jī)這個(gè)名詞,生產(chǎn)者發(fā)消息的時(shí)候必須要指定一個(gè) exchange翔脱,若不指定 exchange(為空)會(huì)默認(rèn)指向 AMQP default 交換機(jī)潮模,AMQP default 路由規(guī)則是根據(jù) routingKey 和 mq 上有沒有相同名字的隊(duì)列進(jìn)行匹配路由神得。上面這段代碼就是默認(rèn)指定的交換機(jī)节榜。不同類型交換機(jī)詳細(xì)講解請(qǐng)往下看。

  • 疑問2

    生產(chǎn)者發(fā)送消息后碌上,消息是發(fā)送到交換機(jī)exchange冕香,但是這時(shí)候會(huì)創(chuàng)建隊(duì)列嗎?

    答案:代碼中我們聲明的是路由是routingKey饵骨,但是它并沒有創(chuàng)建helloKoalaQueue 消息隊(duì)列制市,消息只會(huì)發(fā)送到交exchange交換機(jī)。 運(yùn)行代碼后看隊(duì)列截圖可以證明這一點(diǎn):

  • 說明1

    生產(chǎn)者發(fā)送消息后嘀略,注意關(guān)閉通道和連接奶段,只要消息發(fā)送成功后,連接就可以關(guān)閉了撑帖,消費(fèi)者用任何語言去獲取消息都可以,這也證明了消息隊(duì)列優(yōu)秀解耦的特性

  • 說明2

    可以多次執(zhí)行node product.js生產(chǎn)者代碼,消息會(huì)堆積到交換機(jī)exchange中禀横,并不會(huì)覆蓋,如果已執(zhí)行過消費(fèi)者并且確認(rèn)了對(duì)應(yīng)的消息隊(duì)列浙芙,消息會(huì)從exchange交換機(jī)發(fā)送到消息隊(duì)列掉蔬,并存入到消息隊(duì)列壕翩,等待消費(fèi)者消費(fèi)

    image

消費(fèi)者代碼 consumer.js

// 構(gòu)建消費(fèi)者const amqp = require('amqplib');async function consumer() {    // 1\. 創(chuàng)建鏈接對(duì)象    const connection = await amqp.connect('amqp://localhost:5672');    // 2\. 獲取通道    const channel = await connection.createChannel();    // 3\. 聲明參數(shù)    const queueName = 'helloKoalaQueue';    // 4\. 聲明隊(duì)列线召,交換機(jī)默認(rèn)為 AMQP default    await channel.assertQueue(queueName);    // 5\. 消費(fèi)    await channel.consume(queueName, msg => {        console.log('Consumer:', msg.content.toString());        channel.ack(msg);    });}consumer();

生產(chǎn)者代碼解釋與運(yùn)行結(jié)果

執(zhí)行 node consumer.js
  • 運(yùn)行后的執(zhí)行結(jié)果

    image
  • 說明1

    這時(shí)候我改變代碼中的隊(duì)列名稱為helloKoalaQueueHaHa,這時(shí)候去看Rabbitmq可視化界面中肺孵,隊(duì)列模塊,創(chuàng)建了這個(gè)隊(duì)列

    image

    看到這里再次證明了消息隊(duì)列優(yōu)秀的解耦特性颜阐,消費(fèi)者和生產(chǎn)者模型之間沒有任何聯(lián)系平窘,再次創(chuàng)建這個(gè)helloKoalaQueueHaHa路由名稱的生產(chǎn)者,消費(fèi)者也會(huì)正常消費(fèi)凳怨,并且會(huì)打印消息瑰艘,大家可以實(shí)際操作試一下。

  • 說明2

    這時(shí)候我改變代碼中的隊(duì)列名稱為helloKoalaQueueHaHa,這時(shí)候去看Rabbitmq可視化界面中肤舞,隊(duì)列模塊紫新,創(chuàng)建了這個(gè)隊(duì)列

    image

    看到這里又再次證明了消息隊(duì)列優(yōu)秀的解耦特性消費(fèi)者和生產(chǎn)者模型之間沒有任何聯(lián)系李剖,再次創(chuàng)建這個(gè)helloKoalaQueueHaHa路由名稱的生產(chǎn)者芒率,消費(fèi)者也會(huì)正常消費(fèi),并且會(huì)打印消息篙顺,大家可以實(shí)際操作試一下偶芍。

如何釋放掉消息隊(duì)列

可視化界面中直接刪除掉消息隊(duì)列

  1. 訪問http://{rabbitmq安裝IP}:15672,登錄德玫。
  2. 點(diǎn)擊queues匪蟀,這里可以看到你創(chuàng)建的所有的Queue,
  3. 選中某一個(gè)Queue宰僧,然后會(huì)進(jìn)入一個(gè)列表界面萄窜,下方有個(gè)Delete按鈕,確認(rèn) Queue刪除隊(duì)列/Purge Message清除消息即可撒桨。

弊端:這樣只能一個(gè)隊(duì)列一個(gè)隊(duì)列的刪除查刻,如果隊(duì)列中的消息過多就會(huì)特別慢。

通過代碼實(shí)現(xiàn)消息隊(duì)列釋放(刪除)

消息隊(duì)列交換機(jī)講解

先記住一句話

生產(chǎn)者發(fā)消息的時(shí)候必須指定一個(gè) exchange凤类,否則消息無法直接到達(dá)消息隊(duì)列穗泵,Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)

然后開始本章節(jié)交換機(jī)的講解

若不指定 exchange(為空)會(huì)默認(rèn)指向 AMQP default 交換機(jī),AMQP default 路由規(guī)則是根據(jù) routingKey 和 mq 上有沒有相同名字的隊(duì)列進(jìn)行匹配路由谜疤。

交換機(jī)的種類

常用的四種類型

  • fanout

  • direct

  • topic

  • headers

不管是哪一種類型的交換機(jī)佃延,都有一個(gè)綁定binding的操作现诀,只不過根據(jù)不同的交換機(jī)類型有不同的路由綁定策略。不同類型做的下圖紅色框框中的事履肃。

image

fanout(中文翻譯 廣播)

fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單仔沿,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中,不需要設(shè)置路由鍵尺棋。

image

上圖中封锉,上圖中,生產(chǎn)者(Producter)發(fā)送到Exchange(X)的所有消息都會(huì)路由到圖中的兩個(gè)Queue膘螟,并最終被兩個(gè)消費(fèi)者(consumer1與consumer2)消費(fèi)成福。

說明:所有消息都會(huì)路由到兩個(gè)Queue中,是兩個(gè)消費(fèi)者都可以收到全部的完全相同的消息嗎? 答案是的荆残,兩個(gè)消費(fèi)者收到的隊(duì)列消息正常應(yīng)該是完全相同的奴艾。這種類型常用于廣播類型的需求,或者也可以消費(fèi)者1記錄日志 内斯,消費(fèi)者2打印日志

對(duì)應(yīng)代碼實(shí)現(xiàn)

生產(chǎn)者:

const amqp = require('amqplib');async function producer() {    // 創(chuàng)建鏈接對(duì)象    const connection = await amqp.connect('amqp://localhost:5672');    // 獲取通道    const channel = await connection.createChannel();    // 聲明參數(shù)    const exchangeName = 'fanout_koala_exchange';    const routingKey = '';    const msg = 'hello koala';    // 交換機(jī)    await channel.assertExchange(exchangeName, 'fanout', {        durable: true,    });    // 發(fā)送消息    await channel.publish(exchangeName, routingKey, Buffer.from(msg));    // 關(guān)閉鏈接    await channel.close();    await connection.close();}producer();

消費(fèi)者:

const amqp = require('amqplib');async function consumer() {    // 創(chuàng)建鏈接對(duì)象    const connection = await amqp.connect('amqp://localhost:5672');    // 獲取通道    const channel = await connection.createChannel();    // 聲明參數(shù)    const exchangeName = 'fanout_koala_exchange';    const queueName = 'fanout_kaola_queue';    const routingKey = '';    // 聲明一個(gè)交換機(jī)    await channel.assertExchange(exchangeName, 'fanout', { durable: true });    // 聲明一個(gè)隊(duì)列    await channel.assertQueue(queueName);    // 綁定關(guān)系(隊(duì)列蕴潦、交換機(jī)、路由鍵)    await channel.bindQueue(queueName, exchangeName, routingKey);    // 消費(fèi)    await channel.consume(queueName, msg => {        console.log('Consumer:', msg.content.toString());        channel.ack(msg);    });    console.log('消費(fèi)端啟動(dòng)成功俘闯!');}consumer();

注意:其他類型代碼已經(jīng)放到 github品擎,地址:https://github.com/koala-coding/simple_rabbitmq 歡迎 star 交流。

direct

direct 把消息路由到那些 binding key與 routing key 完全匹配的 Queue中备徐。

image

以上圖的配置為例萄传,我們以 routingKey=”error” 發(fā)送消息到Exchange,則消息會(huì)路由到 amq1 和 amq2蜜猾;如果我們以 routingKey=”info” 或 routingKey=”warning” 來發(fā)送消息秀菱,則消息只會(huì)路由到 Queue2。如果我們以其他 routingKey 發(fā)送消息蹭睡,則消息不會(huì)路由到這兩個(gè) Queue 中衍菱。

topic

生產(chǎn)者指定 RoutingKey 消息根據(jù)消費(fèi)端指定的隊(duì)列通過模糊匹配的方式進(jìn)行相應(yīng)轉(zhuǎn)發(fā),兩種通配符模式: #:可匹配一個(gè)或多個(gè)關(guān)鍵字 *:只能匹配一個(gè)關(guān)鍵字

image

headers

header exchange(頭交換機(jī))和主題交換機(jī)有點(diǎn)相似肩豁,但是不同于主題交換機(jī)的路由是基于路由鍵脊串,頭交換機(jī)的路由值基于消息的 header 數(shù)據(jù)。 主題交換機(jī)路由鍵只有是字符串,而頭交換機(jī)可以是整型和哈希值 header Exchange 類型用的比較少清钥,可以自行 google 了解琼锋。

消息隊(duì)列的思考與深入探索

消息隊(duì)列實(shí)現(xiàn)rpc

(本小段內(nèi)容來源網(wǎng)上,參考文章說明)

image

RPC 遠(yuǎn)程調(diào)用服務(wù)端的方法祟昭,使用 MQ 可以實(shí)現(xiàn) RPC 的異步調(diào)用缕坎,基于 Direct 交換機(jī)實(shí)現(xiàn)

  1. 客戶端即是生產(chǎn)者又是消費(fèi)者,向 RPC 請(qǐng)求隊(duì)列發(fā)送 RPC 調(diào)用消息篡悟,同時(shí)監(jiān)聽 RPC 響應(yīng)隊(duì)列
  2. 服務(wù)端監(jiān)聽RPC請(qǐng)求隊(duì)列谜叹,收到消息后執(zhí)行服務(wù)端的方法
  3. 服務(wù)端將方法執(zhí)行后的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列

(注意匾寝,這里只是提一下 RPC 這個(gè)知識(shí),因?yàn)閱螁我粋€(gè)RPC一篇文章都不一定說說完荷腊,有興趣的可以用隊(duì)列嘗試一下RPC)

是否有消息持久化的必要艳悔?

消息隊(duì)列是存在內(nèi)存中的,如果出現(xiàn)問題掛掉,消息隊(duì)列中的消息會(huì)丟失女仰。所以對(duì)于一些需求非常有持久化的必要猜年!RabbitMQ 可以開啟持久化。不同開發(fā)語言都可以設(shè)置持久化參數(shù)董栽。

這里以Node.js為例子,其他語言可以自行搜索

    await channel.assertExchange(exchangeName, 'direct', { durable: true });    // 注意其中的{ durable: true }企孩,這事對(duì)交換機(jī)持久化锭碳,還有其他的幾種持久化方式

同時(shí)推薦一篇不錯(cuò)的寫持久化的文章: https://juejin.im/post/5d6f6b0ae51d45621512add0

消費(fèi)者完成后是否有消息應(yīng)答的必要?

消息應(yīng)答簡(jiǎn)單的解釋就是消費(fèi)者完成了消費(fèi)后勿璃,通知一下消息隊(duì)列擒抛。

我覺得這個(gè)配置是有必要打開的,消費(fèi)者完成消息隊(duì)列中的任務(wù)补疑,消費(fèi)者可能中途失敗或者掛掉歧沪,一旦 RabbitMQ 發(fā)送一個(gè)消息給消費(fèi)者然后便迅速將該消息從消息隊(duì)列內(nèi)存中移除,這種情況下,消費(fèi)者對(duì)應(yīng)工作進(jìn)程失敗或者掛掉后莲组,那該進(jìn)程正在處理的消息也將丟失诊胞。而且,也將丟失所有發(fā)送給該進(jìn)程的未被處理的消息锹杈。

為了確保消息永不丟失撵孤,RabbitMQ 支持消息應(yīng)答機(jī)制。當(dāng)消息被接受竭望,處理之后一條應(yīng)答便會(huì)從消費(fèi)者回傳至發(fā)送方邪码,然后RabbitMQ將其刪除。

如果某個(gè)消費(fèi)者掛掉(信道咬清、鏈接關(guān)閉或者 tcp 鏈接丟失)且沒有發(fā)送 ack 應(yīng)答闭专,RabbitMQ 會(huì)認(rèn)為該消息沒有被處理完全然后會(huì)將其重新放置到隊(duì)列中。通過這種方式你就可以確保消息永不丟失旧烧,甚至某個(gè)工作進(jìn)程偶然掛掉的情況影钉。

默認(rèn)情況下消息應(yīng)答是關(guān)閉的。是時(shí)候使用 false(auto-ack配置項(xiàng))參數(shù)將其開啟了

這里以 Node.js 為例子掘剪,其他語言可以自行搜索

// 消費(fèi)者消費(fèi)時(shí)候的代碼await channel.consume(queueName, msg => {    console.log('koala:', msg.content.toString());    //... 這里可以放業(yè)務(wù)邏輯處理的代碼斧拍,消費(fèi)者完成后發(fā)送回執(zhí)應(yīng)答    channel.ack(msg);// 消息應(yīng)答}, { noAck: false });

如何實(shí)現(xiàn)公平調(diào)度?

可以將prefetch count項(xiàng)的值配置為1杖小,這將會(huì)指示 RabbitMQ 在同一時(shí)間不要發(fā)送超過一條消息給每個(gè)消費(fèi)者肆汹。換句話說愚墓,直到消息被處理和應(yīng)答之前都不會(huì)發(fā)送給該消費(fèi)者任何消息。取而代之的是昂勉,它將會(huì)發(fā)送消息至下一個(gè)比較閑的消費(fèi)者或工作進(jìn)程浪册。

這里以 Node.js 為例子,amqplib 庫(kù)對(duì)于限流實(shí)現(xiàn)提供的接口方法 prefetch岗照。

prefetch 參數(shù)說明

  • count:每次推送給消費(fèi)端 N 條消息數(shù)目村象,如果這 N 條消息沒有被ack,生產(chǎn)端將不會(huì)再次推送直到這 N 條消息被消費(fèi)攒至。
  • global:在哪個(gè)級(jí)別上做限制厚者,ture 為 channel 上做限制,false 為消費(fèi)端上做限制迫吐,默認(rèn)為 false库菲。
// 創(chuàng)建消費(fèi)者的時(shí)候 限流參數(shù)設(shè)置await channel.prefetch(1, false);

如何實(shí)現(xiàn)一個(gè)交換機(jī)給多個(gè)消費(fèi)者依次發(fā)送消息,選擇那種交換機(jī)志膀?

如果一個(gè)生產(chǎn)者熙宇,兩個(gè)消費(fèi)者,發(fā)放消息溉浙,我想要的隊(duì)列先給消費(fèi)者1發(fā)烫止,發(fā)完消費(fèi)者1發(fā)消費(fèi)者2,這樣有順序的交互發(fā)送戳稽,應(yīng)該現(xiàn)在哪一種交換機(jī)呢馆蠕?注意是交互,看完之后想一下惊奇?還有消費(fèi)者完成后有沒有手動(dòng)回調(diào)消息隊(duì)列完成的必要荆几?消息持久化有必要沒,持久化有什么好處赊时?

(看完消息隊(duì)列的消息傳遞吨铸,你會(huì)有疑問管道中的消息(生產(chǎn)者)是怎么被消費(fèi)者消費(fèi)的 放入隊(duì)列,然后從隊(duì)列被取出)

消息隊(duì)列應(yīng)用場(chǎng)景

  1. 雙十一商品秒殺/搶票功能實(shí)現(xiàn)

    我們?cè)陔p11的時(shí)候祖秒,當(dāng)我們凌晨大量的秒殺和搶購(gòu)商品诞吱,然后去結(jié)算的時(shí)候,就會(huì)發(fā)現(xiàn)竭缝,界面會(huì)提醒我們房维,讓我們稍等,以及一些友好的圖片文字提醒抬纸。而不是像前幾年的時(shí)代咙俩,動(dòng)不動(dòng)就頁(yè)面卡死,報(bào)錯(cuò)等來呈現(xiàn)給用戶。

    用一張圖來解釋消息隊(duì)列在秒殺搶票等場(chǎng)景的使用:(說明:往下看之前阿趁,如果你做過電商類秒殺膜蛔,可以想想你是怎么實(shí)現(xiàn)的,我們可以一起討論哦脖阵。這里只是想說下消息隊(duì)列的作用皂股,并不是最終優(yōu)化的結(jié)果,比如用redis控制總緩存等)

    image

    這里在生成訂單時(shí)候命黔,不需要直接操作數(shù)據(jù)庫(kù) IO 呜呐,預(yù)扣庫(kù)存。先扣除了庫(kù)存悍募,保證不超賣蘑辑,然后異步生成用戶訂單,這里用到一次即時(shí)消費(fèi)隊(duì)列坠宴,這樣響應(yīng)給用戶的速度就會(huì)快很多洋魂;而且還要保證不少賣,用戶拿到了訂單啄踊,不支付怎么辦忧设?我們都知道現(xiàn)在訂單都有有效期刁标,再使用一個(gè)消息隊(duì)列颠通,用于判斷訂單支付超時(shí),比如說用戶五分鐘內(nèi)不支付膀懈,訂單就失效了顿锰,訂單一旦失效,就會(huì)加入新的庫(kù)存启搂。這也是現(xiàn)在很多網(wǎng)上零售企業(yè)保證商品不少賣采用的方案硼控。訂單量比較少的情況下,生成訂單非掣於模快牢撼,用戶幾乎不用排隊(duì)。

  2. 積分兌換(積分可用于多平臺(tái))

    積分兌換模塊疑苫,有一個(gè)公司多個(gè)部門都要用到這個(gè)模塊熏版,這時(shí)候就可以通過消息隊(duì)列解耦這個(gè)特性來實(shí)現(xiàn)。 各部門系統(tǒng)做各部門的事捍掺,但是他們都可以用這個(gè)積分系統(tǒng)進(jìn)行商品的兌換等撼短。其他模塊與積分模塊完全解耦。

  3. 發(fā)送郵件琅拌,用戶大數(shù)據(jù)分析等 同步變異步功能實(shí)現(xiàn)

    這個(gè)功能要說的比較多晨继,從一個(gè)平臺(tái)的用戶注冊(cè)開始睦刃。

    正常情況注冊(cè)棚放,不出現(xiàn)高并發(fā)

    對(duì)于用戶來說禾嫉,他就是想注冊(cè)用一下這個(gè)軟件灾杰,只要服務(wù)端將他的賬戶信息存到數(shù)據(jù)庫(kù)中他便可以登錄上去做他想做的事情了。用戶并不care這些事夭织,服務(wù)端就可以把其他的操作放入對(duì)應(yīng)的消息隊(duì)列中然后馬上返回用戶結(jié)果吭露,由消息隊(duì)列異步的進(jìn)行這些操作。

    假如有大量的用戶注冊(cè)尊惰,發(fā)生了高并發(fā)

    郵件接口承受不住讲竿,或是分析信息時(shí)的大量計(jì)算使 cpu 滿載,這將會(huì)出現(xiàn)雖然用戶數(shù)據(jù)記錄很快的添加到數(shù)據(jù)庫(kù)中了弄屡,但是卻卡在發(fā)郵件或分析信息時(shí)的情況题禀,導(dǎo)致請(qǐng)求的響應(yīng)時(shí)間大幅增長(zhǎng),甚至出現(xiàn)超時(shí)膀捷,這就有點(diǎn)不劃算了迈嘹。面對(duì)這種情況一般也是將這些操作放入消息隊(duì)列(生產(chǎn)者消費(fèi)者模型),消息隊(duì)列慢慢的進(jìn)行處理全庸,同時(shí)可以很快的完成注冊(cè)請(qǐng)求秀仲,不會(huì)影響用戶使用其他功能。

  4. 用戶注冊(cè)

  5. 用戶注冊(cè)選擇幾個(gè)興趣標(biāo)簽壶笼,這時(shí)候需要根據(jù)用戶的屬性神僵,用戶分析,計(jì)算出推薦內(nèi)容

  6. 注冊(cè)后可能需要發(fā)送郵件給用戶

  7. 發(fā)送給用戶一個(gè)包含操作指南的系統(tǒng)通知

  8. 等等

  9. 基于RabbitMQ的Node.js與Python或其他語言實(shí)現(xiàn)通信

    這里也是利用了 RabbitMQ 的解耦特性覆劈,不僅僅可以與 Python保礼,還可以與其他很多語言通信,就不具體說了责语。

總結(jié)

親炮障,別只看,你試試呀坤候!直接開啟服務(wù)胁赢,裝個(gè) RabbitMQ,挺有意思的白筹,就算一個(gè) HelloWorld 也能嘗試出很多內(nèi)容智末。而且本文說的很多內(nèi)容都可以用 redis 來實(shí)現(xiàn),也可以去看下我的 redis 文章遍蟋。順便說一句設(shè)計(jì)模式和數(shù)據(jù)結(jié)構(gòu)是兩個(gè)好東西吹害,越來越能感覺到。文章代碼地址:https://github.com/koala-coding/simple_rabbitmq

參考文章

https://www.cnblogs.com/baidawei/p/9172433.html

https://www.sojson.com/blog/48.html

http://www.imooc.com/article/293742

https://www.zhihu.com/question/34243607/answer/58314162

https://bbs.csdn.net/topics/392169691?page=1

https://mp.weixin.qq.com/s/wTkwJXlNr5CaI7uRntJ42A

本文轉(zhuǎn)自:https://mp.weixin.qq.com/s/l55gH6F8eur1tGnJVG-vIg

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末虚青,一起剝皮案震驚了整個(gè)濱河市它呀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖纵穿,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件下隧,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡谓媒,警方通過查閱死者的電腦和手機(jī)淆院,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來句惯,“玉大人土辩,你說我怎么就攤上這事∏酪埃” “怎么了拷淘?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)指孤。 經(jīng)常有香客問我启涯,道長(zhǎng),這世上最難降的妖魔是什么恃轩? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任结洼,我火速辦了婚禮,結(jié)果婚禮上叉跛,老公的妹妹穿的比我還像新娘松忍。我一直安慰自己,他們只是感情好昧互,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布挽铁。 她就那樣靜靜地躺著伟桅,像睡著了一般敞掘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上楣铁,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天玖雁,我揣著相機(jī)與錄音,去河邊找鬼盖腕。 笑死赫冬,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的溃列。 我是一名探鬼主播劲厌,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼听隐!你這毒婦竟也來了补鼻?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎风范,沒想到半個(gè)月后咨跌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡硼婿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年锌半,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片寇漫。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡刊殉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出州胳,到底是詐尸還是另有隱情冗澈,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布陋葡,位于F島的核電站亚亲,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏腐缤。R本人自食惡果不足惜捌归,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望岭粤。 院中可真熱鬧惜索,春花似錦、人聲如沸剃浇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)虎囚。三九已至角塑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間淘讥,已是汗流浹背圃伶。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蒲列,地道東北人窒朋。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蝗岖,于是被迫代替她去往敵國(guó)和親侥猩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359