為什么寫這篇文章
- 現(xiàn)在的面試要求越來越高了,打開看了看幾個(gè) BOSS 招聘 Node.js 全棧開發(fā)的厕隧,其中都有一條“了解 消息隊(duì)列奔脐,并在項(xiàng)目中應(yīng)用過”,嗚嗚嗚
- 后端開發(fā)者應(yīng)該都知道消息隊(duì)列吁讨,但是一些前端開發(fā)者可能知道的并不多髓迎,但是你們可能好奇
搶票,商品秒殺
等功能是如何實(shí)現(xiàn)的,其實(shí)沒有多么高大上建丧,看了消息隊(duì)列就知道了排龄。
文章導(dǎo)圖(你能學(xué)到)
什么是消息隊(duì)列
“消息隊(duì)列”是在消息的傳輸過程中保存消息的容器。
個(gè)人理解:我把它分成兩個(gè)詞消息
和隊(duì)列
翎朱。當(dāng)一大批客戶端同時(shí)產(chǎn)生大量的網(wǎng)絡(luò)請(qǐng)求(消息
)時(shí)候橄维,服務(wù)器的承受能力肯定是有一個(gè)限制的。這時(shí)候要是有個(gè)容器拴曲,先讓這些消息排隊(duì)就好了争舞,還好有個(gè)叫隊(duì)列
的數(shù)據(jù)結(jié)構(gòu),通過有隊(duì)列屬性的容器
排隊(duì)(先進(jìn)先出)澈灼,把消息再傳到我們的服務(wù)器竞川,壓力減小了好多店溢,這個(gè)很棒的容器
就是消息隊(duì)列
這段理解中還包含這個(gè)兩個(gè)概念: 客戶端->生產(chǎn)者
服務(wù)器->消費(fèi)者
當(dāng)有消息隊(duì)列
出現(xiàn),生產(chǎn)者
和消費(fèi)者
是必不可少的兩個(gè)概念委乌,上面的理解是多個(gè)生產(chǎn)者
對(duì)應(yīng)一個(gè)消費(fèi)者
床牧,當(dāng)然現(xiàn)實(shí)開發(fā)中還有許多消費(fèi)者
的情況哦。接下來的文章也會(huì)多次提到生產(chǎn)-消費(fèi)模型
遭贸。
消息隊(duì)列優(yōu)勢(shì)
-
應(yīng)用解耦
消息隊(duì)列可以使消費(fèi)者和生產(chǎn)者直接互不干涉叠赦,互不影響,只需要把消息發(fā)送到隊(duì)列即可算利,而且可獨(dú)立的擴(kuò)展或修改兩邊的處理過程效拭,只要能確保它們遵守同樣的接口約定,可以生產(chǎn)者用Node.js實(shí)現(xiàn)挤渔,消費(fèi)者用python實(shí)現(xiàn)判导。
-
靈活性和峰值處理能力
當(dāng)客戶端訪問量突然劇增,對(duì)服務(wù)器的訪問已經(jīng)超過服務(wù)所能處理的最大峰值擂红,甚至導(dǎo)致服務(wù)器超時(shí)負(fù)載崩潰昵骤,使用消息隊(duì)列可以解決這個(gè)問題,可以通過
控制消費(fèi)者的處理速度
和生產(chǎn)者可進(jìn)入消息隊(duì)列的數(shù)量
等來避免峰值問題 -
排序保證
消息隊(duì)列可以控制數(shù)據(jù)處理的順序伴栓,因?yàn)橄㈥?duì)列本身使用的是隊(duì)列這個(gè)數(shù)據(jù)結(jié)構(gòu)惑淳,
FIFO
(先進(jìn)選出)歧焦,在一些場(chǎng)景數(shù)據(jù)處理的順序很重要,比如商品下單順序等舰涌。 -
異步通信
消息隊(duì)列中的有些消息,并不需要立即處理搁痛,消息隊(duì)列提供了異步處理機(jī)制鸡典,可以把消息放在隊(duì)列中并不立即處理,需要的時(shí)候處理疗垛,或者異步慢慢處理贷腕,一些不重要的發(fā)送短信和郵箱功能可以使用。
-
可擴(kuò)展性
前面提到了消息隊(duì)列可以做到
解耦
涮总,如果我們想增強(qiáng)消息入隊(duì)和出隊(duì)的處理頻率瀑梗,很簡(jiǎn)單谤职,并不需要改變代碼中任何內(nèi)容允蜈,可以直接對(duì)消息隊(duì)列修改一些配置即可,比如我們想限制每次發(fā)送給消費(fèi)者的消息條數(shù)等凤跑。
有優(yōu)勢(shì)定有它現(xiàn)實(shí)的應(yīng)用場(chǎng)景,文章后面會(huì)針對(duì)優(yōu)勢(shì)講它們對(duì)應(yīng)的應(yīng)用場(chǎng)景褐奥。
消息隊(duì)列的類型介紹
介紹幾款目前市場(chǎng)上主流的消息隊(duì)列(課外知識(shí),可忽略)
-
Kafka:是由 Apache 軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺(tái)呜笑,由 Scala 和 Java 編寫叫胁,是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)驼鹅,支持單機(jī)每秒百萬并發(fā)。另外买乃,Kafka 的定位主要在日志等方面剪验, 因?yàn)镵afka 設(shè)計(jì)的初衷就是
處理日志
的抖韩,可以看做是一個(gè)日志(消息)系統(tǒng)
一個(gè)重要組件茂浮,針對(duì)性很強(qiáng)。0.8 版本開始支持復(fù)制幌羞,不支持事務(wù)属桦,因此對(duì)消息的重復(fù)、丟失诊笤、錯(cuò)誤沒有嚴(yán)格的要求讨跟。 - RocketMQ:阿里開源的消息中間件茶袒,是一款低延遲弹谁、高可靠、可伸縮植康、易于使用的消息中間件供璧,思路起源于 Kafka冻记。最大的問題商業(yè)版收費(fèi)演顾,有些功能不開放钠至。
-
RabbitMQ:由 Erlang(有著和原生 Socket 一樣低的延遲)語言開發(fā)基于 AMQP 協(xié)議的開源消息隊(duì)列系統(tǒng)。能保證消息的可靠性宪卿、穩(wěn)定性、安全性。
高并發(fā)
的特性撒遣,毋庸置疑,RabbitMQ 最高廉涕,原因是它的實(shí)現(xiàn)語言是天生具備高并發(fā)高可用的erlang 語言,天生的分布式
優(yōu)勢(shì)层释。
說明:本文主要以RabbitMQ講解廉白,較為常見。****個(gè)人認(rèn)為這幾種消息隊(duì)列中間件能實(shí)現(xiàn)的功能磅轻,通過 redis 也都能實(shí)現(xiàn),思想痊班。
初識(shí)消息隊(duì)列(消息隊(duì)列在node.js中的簡(jiǎn)單應(yīng)用)
Rabbitmq基本安裝
Mac版安裝
直接通過 HomeBrew 安裝馒胆,執(zhí)行以下命令
brew install rabbitmq
啟動(dòng) rabbitmq
進(jìn)入安裝目錄$ /usr/local/Cellar/rabbitmq/3.7.8啟動(dòng)$ sbin/rabbitmq-server
瀏覽器輸入 http://localhost:15672/#/ 默認(rèn)用戶名密碼 guest
安裝后的基本示意圖
可視化界面可模塊功能介紹:
其他系統(tǒng)安裝請(qǐng)自行網(wǎng)上搜索
幾個(gè)端口區(qū)別說明
5672:通信默認(rèn)端口號(hào) 15672:管理控制臺(tái)默認(rèn)端口號(hào) 25672:集群通信端口號(hào) 注意: 阿里云 ECS 服務(wù)器如果出現(xiàn) RabbitMQ 安裝成功器净,外網(wǎng)不能訪問是因?yàn)榘踩M的問題沒有開放端口 解決方案
Rabbitmq安裝后的基本命令
以下列舉一些在終端常用的操作命令
- whereis rabbitmq:查看 rabbitmq 安裝位置
- rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
- whereis erlang:查看erlang安裝位置
- rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
- rabbitmqctl stop_app:關(guān)閉應(yīng)用
- rabbitmqctl status:節(jié)點(diǎn)狀態(tài)
- rabbitmqctl add_user username password:添加用戶
- rabbitmqctl list_users:列出所有用戶
- rabbitmqctl delete_user username:刪除用戶
- rabbitmqctl add_vhost vhostpath:創(chuàng)建虛擬主機(jī)
- rabbitmqctl list_vhosts:列出所有虛擬主機(jī)
- rabbitmqctl list_queues:查看所有隊(duì)列
- rabbitmqctl -p vhostpath purge_queue blue:清除隊(duì)列里消息
注意:以上終端所有命令,需要進(jìn)入到rabbitmqctl的sbin目錄下執(zhí)行rabbitmqctl命令才有用,否則會(huì)報(bào)錯(cuò):
image
Node.js實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 HelloWorld 消息隊(duì)列
畫一張基本的圖浪慌,HelloWorld 消息隊(duì)列的圖片冤荆,把下面幾個(gè)概念都畫進(jìn)去。
看這段代碼前先說幾個(gè)概念
- 生產(chǎn)者 :生產(chǎn)消息的
- 消費(fèi)者 :接收消息的
- 通道 channel:建立連接后权纤,會(huì)獲取一個(gè) channel 通道
- exchange :交換機(jī)钓简,消息需要先發(fā)送到 exchange 交換機(jī),也可以說是第一步存儲(chǔ)消息的地方(交換機(jī)會(huì)有很多類型汹想,后面會(huì)詳細(xì)說)外邓。
- 消息隊(duì)列 : 到達(dá)消費(fèi)者前一刻存儲(chǔ)消息的地方,exchange 交換機(jī)會(huì)把消息傳遞到此
- ack回執(zhí):收到消息后確認(rèn)消息已經(jīng)消費(fèi)的應(yīng)答
amqplib模塊
推薦一個(gè) npm 模塊amqplib
坐榆。
Github: https://github.com/squaremo/amqp.node
$ npm install amqplib
生產(chǎn)者代碼 product.js
const amqp =require('amqplib');async function product(params) { // 1\. 創(chuàng)建鏈接對(duì)象 const connection = await amqp.connect('amqp://localhost:5672'); // 2\. 獲取通道 const channel = await connection.createChannel(); // 3\. 聲明參數(shù) const routingKey = 'helloKoalaQueue'; const msg = 'hello koala'; for (let i=0; i<10000; i++) { // 4\. 發(fā)送消息 await channel.publish('', routingKey, Buffer.from(`${msg} 第${i}條消息`)); }// 5\. 關(guān)閉通道 await channel.close(); // 6\. 關(guān)閉連接 await connect.close();}product();
生產(chǎn)者代碼解釋與運(yùn)行結(jié)果
執(zhí)行 node product.js
代碼注釋中已經(jīng)把基本的流程講解了,但是我剛開始看的時(shí)候還有疑問交播,我想很多小伙伴也會(huì)有疑問提针,說明下:
-
疑問1
前面提到過交換機(jī)這個(gè)名詞,
生產(chǎn)者
發(fā)消息的時(shí)候必須要指定一個(gè) exchange翔脱,若不指定 exchange(為空)會(huì)默認(rèn)指向 AMQP default 交換機(jī)潮模,AMQP default 路由規(guī)則是根據(jù) routingKey 和 mq 上有沒有相同名字的隊(duì)列進(jìn)行匹配路由神得。上面這段代碼就是默認(rèn)指定的交換機(jī)节榜。不同類型交換機(jī)詳細(xì)講解請(qǐng)往下看。 -
疑問2
生產(chǎn)者發(fā)送消息后碌上,消息是發(fā)送到交換機(jī)exchange冕香,但是這時(shí)候會(huì)創(chuàng)建隊(duì)列嗎?
答案:代碼中我們聲明的是路由是routingKey饵骨,但是它并沒有創(chuàng)建helloKoalaQueue 消息隊(duì)列制市,消息只會(huì)發(fā)送到交exchange交換機(jī)。 運(yùn)行代碼后看隊(duì)列截圖可以證明這一點(diǎn):
-
說明1
生產(chǎn)者發(fā)送消息后嘀略,注意關(guān)閉通道和連接奶段,只要消息發(fā)送成功后,連接就可以關(guān)閉了撑帖,消費(fèi)者用任何語言去獲取消息都可以,這也證明了消息隊(duì)列優(yōu)秀
解耦
的特性 -
說明2
可以多次執(zhí)行
node product.js
生產(chǎn)者代碼,消息會(huì)堆積到交換機(jī)exchange
中禀横,并不會(huì)覆蓋,如果已執(zhí)行過消費(fèi)者并且確認(rèn)了對(duì)應(yīng)的消息隊(duì)列
浙芙,消息會(huì)從exchange交換機(jī)
發(fā)送到消息隊(duì)列
掉蔬,并存入到消息隊(duì)列
壕翩,等待消費(fèi)者
消費(fèi)image
消費(fèi)者代碼 consumer.js
// 構(gòu)建消費(fèi)者const amqp = require('amqplib');async function consumer() { // 1\. 創(chuàng)建鏈接對(duì)象 const connection = await amqp.connect('amqp://localhost:5672'); // 2\. 獲取通道 const channel = await connection.createChannel(); // 3\. 聲明參數(shù) const queueName = 'helloKoalaQueue'; // 4\. 聲明隊(duì)列线召,交換機(jī)默認(rèn)為 AMQP default await channel.assertQueue(queueName); // 5\. 消費(fèi) await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString()); channel.ack(msg); });}consumer();
生產(chǎn)者代碼解釋與運(yùn)行結(jié)果
執(zhí)行 node consumer.js
-
運(yùn)行后的執(zhí)行結(jié)果
image -
說明1
這時(shí)候我改變代碼中的隊(duì)列名稱為
helloKoalaQueueHaHa
,這時(shí)候去看Rabbitmq可視化界面中肺孵,隊(duì)列模塊,創(chuàng)建了這個(gè)隊(duì)列image看到這里再次證明了消息隊(duì)列優(yōu)秀的
解耦特性
颜阐,消費(fèi)者和生產(chǎn)者模型
之間沒有任何聯(lián)系平窘,再次創(chuàng)建這個(gè)helloKoalaQueueHaHa
路由名稱的生產(chǎn)者,消費(fèi)者
也會(huì)正常消費(fèi)凳怨,并且會(huì)打印消息瑰艘,大家可以實(shí)際操作試一下。 -
說明2
這時(shí)候我改變代碼中的隊(duì)列名稱為
helloKoalaQueueHaHa
,這時(shí)候去看Rabbitmq可視化界面中肤舞,隊(duì)列模塊紫新,創(chuàng)建了這個(gè)隊(duì)列image看到這里又再次證明了消息隊(duì)列優(yōu)秀的
解耦特性
,消費(fèi)者和生產(chǎn)者模型
之間沒有任何聯(lián)系李剖,再次創(chuàng)建這個(gè)helloKoalaQueueHaHa
路由名稱的生產(chǎn)者芒率,消費(fèi)者
也會(huì)正常消費(fèi),并且會(huì)打印消息篙顺,大家可以實(shí)際操作試一下偶芍。
如何釋放掉消息隊(duì)列
可視化界面中直接刪除掉消息隊(duì)列
- 訪問http://{rabbitmq安裝IP}:15672,登錄德玫。
- 點(diǎn)擊queues匪蟀,這里可以看到你創(chuàng)建的所有的Queue,
- 選中某一個(gè)Queue宰僧,然后會(huì)進(jìn)入一個(gè)列表界面萄窜,下方有個(gè)Delete按鈕,確認(rèn) Queue刪除隊(duì)列/Purge Message清除消息即可撒桨。
弊端:這樣只能一個(gè)隊(duì)列一個(gè)隊(duì)列的刪除查刻,如果隊(duì)列中的消息過多就會(huì)特別慢。
通過代碼實(shí)現(xiàn)消息隊(duì)列釋放(刪除)
消息隊(duì)列交換機(jī)講解
先記住一句話
生產(chǎn)者發(fā)消息的時(shí)候必須指定一個(gè) exchange凤类,否則消息無法直接到達(dá)消息隊(duì)列穗泵,Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)
然后開始本章節(jié)交換機(jī)的講解
若不指定 exchange(為空)會(huì)默認(rèn)指向 AMQP default 交換機(jī),AMQP default 路由規(guī)則是根據(jù) routingKey 和 mq 上有沒有相同名字的隊(duì)列進(jìn)行匹配路由谜疤。
交換機(jī)的種類
常用的四種類型
fanout
direct
topic
headers
不管是哪一種類型的交換機(jī)佃延,都有一個(gè)綁定binding的操作现诀,只不過根據(jù)不同的交換機(jī)類型有不同的路由綁定策略。不同類型做的下圖紅色框框中的事履肃。
fanout(中文翻譯 廣播)
fanout類型的Exchange路由規(guī)則非常簡(jiǎn)單仔沿,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中,不需要設(shè)置路由鍵尺棋。
上圖中封锉,上圖中,生產(chǎn)者(Producter)發(fā)送到Exchange(X)的所有消息都會(huì)路由到圖中的兩個(gè)Queue膘螟,并最終被兩個(gè)消費(fèi)者(consumer1與consumer2)消費(fèi)成福。
說明:所有消息都會(huì)路由到兩個(gè)Queue中,是兩個(gè)消費(fèi)者都可以收到全部的完全相同的消息嗎? 答案是的荆残,兩個(gè)消費(fèi)者收到的隊(duì)列消息正常應(yīng)該是完全相同的奴艾。這種類型常用于廣播類型的需求,或者也可以消費(fèi)者1記錄日志 内斯,消費(fèi)者2打印日志
對(duì)應(yīng)代碼實(shí)現(xiàn):
生產(chǎn)者:
const amqp = require('amqplib');async function producer() { // 創(chuàng)建鏈接對(duì)象 const connection = await amqp.connect('amqp://localhost:5672'); // 獲取通道 const channel = await connection.createChannel(); // 聲明參數(shù) const exchangeName = 'fanout_koala_exchange'; const routingKey = ''; const msg = 'hello koala'; // 交換機(jī) await channel.assertExchange(exchangeName, 'fanout', { durable: true, }); // 發(fā)送消息 await channel.publish(exchangeName, routingKey, Buffer.from(msg)); // 關(guān)閉鏈接 await channel.close(); await connection.close();}producer();
消費(fèi)者:
const amqp = require('amqplib');async function consumer() { // 創(chuàng)建鏈接對(duì)象 const connection = await amqp.connect('amqp://localhost:5672'); // 獲取通道 const channel = await connection.createChannel(); // 聲明參數(shù) const exchangeName = 'fanout_koala_exchange'; const queueName = 'fanout_kaola_queue'; const routingKey = ''; // 聲明一個(gè)交換機(jī) await channel.assertExchange(exchangeName, 'fanout', { durable: true }); // 聲明一個(gè)隊(duì)列 await channel.assertQueue(queueName); // 綁定關(guān)系(隊(duì)列蕴潦、交換機(jī)、路由鍵) await channel.bindQueue(queueName, exchangeName, routingKey); // 消費(fèi) await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString()); channel.ack(msg); }); console.log('消費(fèi)端啟動(dòng)成功俘闯!');}consumer();
注意:其他類型代碼已經(jīng)放到 github品擎,地址:https://github.com/koala-coding/simple_rabbitmq 歡迎 star 交流。
direct
direct 把消息路由到那些 binding key與 routing key 完全匹配的 Queue中备徐。
以上圖的配置為例萄传,我們以 routingKey=”error” 發(fā)送消息到Exchange,則消息會(huì)路由到 amq1 和 amq2蜜猾;如果我們以 routingKey=”info” 或 routingKey=”warning” 來發(fā)送消息秀菱,則消息只會(huì)路由到 Queue2。如果我們以其他 routingKey 發(fā)送消息蹭睡,則消息不會(huì)路由到這兩個(gè) Queue 中衍菱。
topic
生產(chǎn)者指定 RoutingKey 消息根據(jù)消費(fèi)端指定的隊(duì)列通過模糊匹配的方式進(jìn)行相應(yīng)轉(zhuǎn)發(fā),兩種通配符模式: #:可匹配一個(gè)或多個(gè)關(guān)鍵字 *:只能匹配一個(gè)關(guān)鍵字
headers
header exchange(頭交換機(jī))和主題交換機(jī)有點(diǎn)相似肩豁,但是不同于主題交換機(jī)的路由是基于路由鍵脊串,頭交換機(jī)的路由值基于消息的 header 數(shù)據(jù)。 主題交換機(jī)路由鍵只有是字符串,而頭交換機(jī)可以是整型和哈希值 header Exchange 類型用的比較少清钥,可以自行 google 了解琼锋。
消息隊(duì)列的思考與深入探索
消息隊(duì)列實(shí)現(xiàn)rpc
(本小段內(nèi)容來源網(wǎng)上,參考文章說明)
RPC 遠(yuǎn)程調(diào)用服務(wù)端的方法祟昭,使用 MQ 可以實(shí)現(xiàn) RPC 的異步調(diào)用缕坎,基于 Direct 交換機(jī)實(shí)現(xiàn)
- 客戶端即是生產(chǎn)者又是消費(fèi)者,向 RPC 請(qǐng)求隊(duì)列發(fā)送 RPC 調(diào)用消息篡悟,同時(shí)監(jiān)聽 RPC 響應(yīng)隊(duì)列
- 服務(wù)端監(jiān)聽RPC請(qǐng)求隊(duì)列谜叹,收到消息后執(zhí)行服務(wù)端的方法
- 服務(wù)端將方法執(zhí)行后的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列
(注意匾寝,這里只是提一下 RPC 這個(gè)知識(shí),因?yàn)閱螁我粋€(gè)RPC一篇文章都不一定說說完荷腊,有興趣的可以用隊(duì)列嘗試一下RPC)
是否有消息持久化的必要艳悔?
消息隊(duì)列
是存在內(nèi)存中的,如果出現(xiàn)問題掛掉,消息隊(duì)列中的消息會(huì)丟失女仰。所以對(duì)于一些需求非常有持久化的必要猜年!RabbitMQ 可以開啟持久化。不同開發(fā)語言都可以設(shè)置持久化參數(shù)董栽。
這里以Node.js為例子,其他語言可以自行搜索
await channel.assertExchange(exchangeName, 'direct', { durable: true }); // 注意其中的{ durable: true }企孩,這事對(duì)交換機(jī)持久化锭碳,還有其他的幾種持久化方式
同時(shí)推薦一篇不錯(cuò)的寫持久化的文章: https://juejin.im/post/5d6f6b0ae51d45621512add0
消費(fèi)者完成后是否有消息應(yīng)答的必要?
消息應(yīng)答簡(jiǎn)單的解釋就是消費(fèi)者
完成了消費(fèi)后勿璃,通知一下消息隊(duì)列擒抛。
我覺得這個(gè)配置是有必要打開的,消費(fèi)者完成消息隊(duì)列中的任務(wù)补疑,消費(fèi)者可能中途失敗或者掛掉歧沪,一旦 RabbitMQ 發(fā)送一個(gè)消息給消費(fèi)者然后便迅速將該消息從消息隊(duì)列內(nèi)存
中移除,這種情況下,消費(fèi)者對(duì)應(yīng)工作進(jìn)程失敗或者掛掉后莲组,那該進(jìn)程正在處理的消息也將丟失诊胞。而且,也將丟失所有發(fā)送給該進(jìn)程的未被處理的消息锹杈。
為了確保消息永不丟失撵孤,RabbitMQ 支持消息應(yīng)答機(jī)制。當(dāng)消息被接受竭望,處理之后一條應(yīng)答便會(huì)從消費(fèi)者回傳至發(fā)送方邪码,然后RabbitMQ將其刪除。
如果某個(gè)消費(fèi)者掛掉(信道咬清、鏈接關(guān)閉或者 tcp 鏈接丟失)且沒有發(fā)送 ack 應(yīng)答闭专,RabbitMQ 會(huì)認(rèn)為該消息沒有被處理完全然后會(huì)將其重新放置到隊(duì)列中。通過這種方式你就可以確保消息永不丟失旧烧,甚至某個(gè)工作進(jìn)程偶然掛掉的情況影钉。
默認(rèn)情況下消息應(yīng)答是關(guān)閉的。是時(shí)候使用 false(auto-ack配置項(xiàng))參數(shù)將其開啟了
這里以 Node.js 為例子掘剪,其他語言可以自行搜索
// 消費(fèi)者消費(fèi)時(shí)候的代碼await channel.consume(queueName, msg => { console.log('koala:', msg.content.toString()); //... 這里可以放業(yè)務(wù)邏輯處理的代碼斧拍,消費(fèi)者完成后發(fā)送回執(zhí)應(yīng)答 channel.ack(msg);// 消息應(yīng)答}, { noAck: false });
如何實(shí)現(xiàn)公平調(diào)度?
可以將prefetch count
項(xiàng)的值配置為1杖小,這將會(huì)指示 RabbitMQ 在同一時(shí)間不要發(fā)送超過一條消息給每個(gè)消費(fèi)者肆汹。換句話說愚墓,直到消息被處理和應(yīng)答之前都不會(huì)發(fā)送給該消費(fèi)者任何消息。取而代之的是昂勉,它將會(huì)發(fā)送消息至下一個(gè)比較閑的消費(fèi)者或工作進(jìn)程浪册。
這里以 Node.js 為例子,amqplib 庫(kù)對(duì)于限流實(shí)現(xiàn)提供的接口方法 prefetch岗照。
prefetch 參數(shù)說明:
- count:每次推送給消費(fèi)端 N 條消息數(shù)目村象,如果這 N 條消息沒有被ack,生產(chǎn)端將不會(huì)再次推送直到這 N 條消息被消費(fèi)攒至。
- global:在哪個(gè)級(jí)別上做限制厚者,ture 為 channel 上做限制,false 為消費(fèi)端上做限制迫吐,默認(rèn)為 false库菲。
// 創(chuàng)建消費(fèi)者的時(shí)候 限流參數(shù)設(shè)置await channel.prefetch(1, false);
如何實(shí)現(xiàn)一個(gè)交換機(jī)給多個(gè)消費(fèi)者依次發(fā)送消息,選擇那種交換機(jī)志膀?
如果一個(gè)生產(chǎn)者熙宇,兩個(gè)消費(fèi)者,發(fā)放消息溉浙,我想要的隊(duì)列先給消費(fèi)者1發(fā)烫止,發(fā)完消費(fèi)者1發(fā)消費(fèi)者2,這樣有順序的交互發(fā)送戳稽,應(yīng)該現(xiàn)在哪一種交換機(jī)呢馆蠕?注意是交互,看完之后想一下惊奇?還有消費(fèi)者完成后有沒有手動(dòng)回調(diào)消息隊(duì)列完成的必要荆几?消息持久化有必要沒,持久化有什么好處赊时?
(看完消息隊(duì)列的消息傳遞吨铸,你會(huì)有疑問管道中的消息(生產(chǎn)者)是怎么被消費(fèi)者消費(fèi)的 放入隊(duì)列,然后從隊(duì)列被取出)
消息隊(duì)列應(yīng)用場(chǎng)景
-
雙十一商品秒殺/搶票功能實(shí)現(xiàn)
我們?cè)陔p11的時(shí)候祖秒,當(dāng)我們凌晨大量的秒殺和搶購(gòu)商品诞吱,然后去結(jié)算的時(shí)候,就會(huì)發(fā)現(xiàn)竭缝,界面會(huì)提醒我們房维,讓我們稍等,以及一些友好的圖片文字提醒抬纸。而不是像前幾年的時(shí)代咙俩,動(dòng)不動(dòng)就頁(yè)面卡死,報(bào)錯(cuò)等來呈現(xiàn)給用戶。
用一張圖來解釋消息隊(duì)列在秒殺搶票等場(chǎng)景的使用:(說明:往下看之前阿趁,如果你做過電商類秒殺膜蛔,可以想想你是怎么實(shí)現(xiàn)的,我們可以一起討論哦脖阵。這里只是想說下消息隊(duì)列的作用皂股,并不是最終優(yōu)化的結(jié)果,比如用redis控制總緩存等)
image這里在生成訂單時(shí)候命黔,不需要直接操作數(shù)據(jù)庫(kù) IO 呜呐,預(yù)扣庫(kù)存。先扣除了庫(kù)存悍募,保證不超賣蘑辑,然后異步生成用戶訂單,這里用到一次即時(shí)
消費(fèi)隊(duì)列
坠宴,這樣響應(yīng)給用戶的速度就會(huì)快很多洋魂;而且還要保證不少賣,用戶拿到了訂單啄踊,不支付怎么辦忧设?我們都知道現(xiàn)在訂單都有有效期刁标,再使用一個(gè)消息隊(duì)列
颠通,用于判斷訂單支付超時(shí),比如說用戶五分鐘內(nèi)不支付膀懈,訂單就失效了顿锰,訂單一旦失效,就會(huì)加入新的庫(kù)存启搂。這也是現(xiàn)在很多網(wǎng)上零售企業(yè)保證商品不少賣采用的方案硼控。訂單量比較少的情況下,生成訂單非掣於模快牢撼,用戶幾乎不用排隊(duì)。 -
積分兌換(積分可用于多平臺(tái))
積分兌換模塊疑苫,有一個(gè)公司多個(gè)部門都要用到這個(gè)模塊熏版,這時(shí)候就可以通過消息隊(duì)列解耦這個(gè)特性來實(shí)現(xiàn)。 各部門系統(tǒng)做各部門的事捍掺,但是他們都可以用這個(gè)積分系統(tǒng)進(jìn)行商品的兌換等撼短。其他模塊與積分模塊完全解耦。
-
發(fā)送郵件琅拌,用戶大數(shù)據(jù)分析等 同步變異步功能實(shí)現(xiàn)
這個(gè)功能要說的比較多晨继,從一個(gè)平臺(tái)的用戶注冊(cè)開始睦刃。
正常情況注冊(cè)棚放,不出現(xiàn)高并發(fā):
對(duì)于用戶來說禾嫉,他就是想注冊(cè)用一下這個(gè)軟件灾杰,只要服務(wù)端將他的賬戶信息存到數(shù)據(jù)庫(kù)中他便可以登錄上去做他想做的事情了。用戶并不care這些事夭织,服務(wù)端就可以把其他的操作放入對(duì)應(yīng)的
消息隊(duì)列
中然后馬上返回用戶結(jié)果吭露,由消息隊(duì)列異步
的進(jìn)行這些操作。假如有大量的用戶注冊(cè)尊惰,發(fā)生了高并發(fā):
郵件接口承受不住讲竿,或是分析信息時(shí)的大量計(jì)算使 cpu 滿載,這將會(huì)出現(xiàn)雖然用戶數(shù)據(jù)記錄很快的添加到數(shù)據(jù)庫(kù)中了弄屡,但是卻卡在發(fā)郵件或分析信息時(shí)的情況题禀,導(dǎo)致請(qǐng)求的響應(yīng)時(shí)間大幅增長(zhǎng),甚至出現(xiàn)超時(shí)膀捷,這就有點(diǎn)不劃算了迈嘹。面對(duì)這種情況一般也是將這些操作放入消息隊(duì)列(生產(chǎn)者消費(fèi)者模型),消息隊(duì)列慢慢的進(jìn)行處理全庸,同時(shí)可以很快的完成注冊(cè)請(qǐng)求秀仲,不會(huì)影響用戶使用其他功能。
用戶注冊(cè)
用戶注冊(cè)選擇幾個(gè)興趣標(biāo)簽壶笼,這時(shí)候需要根據(jù)用戶的屬性神僵,用戶分析,計(jì)算出推薦內(nèi)容
注冊(cè)后可能需要發(fā)送郵件給用戶
發(fā)送給用戶一個(gè)包含操作指南的系統(tǒng)通知
等等
-
基于RabbitMQ的Node.js與Python或其他語言實(shí)現(xiàn)通信
這里也是利用了 RabbitMQ 的解耦特性覆劈,不僅僅可以與 Python保礼,還可以與其他很多語言通信,就不具體說了责语。
總結(jié)
親炮障,別只看,你試試呀坤候!直接開啟服務(wù)胁赢,裝個(gè) RabbitMQ,挺有意思的白筹,就算一個(gè) HelloWorld 也能嘗試出很多內(nèi)容智末。而且本文說的很多內(nèi)容都可以用 redis 來實(shí)現(xiàn),也可以去看下我的 redis 文章遍蟋。順便說一句設(shè)計(jì)模式和數(shù)據(jù)結(jié)構(gòu)是兩個(gè)好東西吹害,越來越能感覺到。文章代碼地址:https://github.com/koala-coding/simple_rabbitmq
參考文章
https://www.cnblogs.com/baidawei/p/9172433.html
https://www.sojson.com/blog/48.html
http://www.imooc.com/article/293742
https://www.zhihu.com/question/34243607/answer/58314162
https://bbs.csdn.net/topics/392169691?page=1
https://mp.weixin.qq.com/s/wTkwJXlNr5CaI7uRntJ42A
本文轉(zhuǎn)自:https://mp.weixin.qq.com/s/l55gH6F8eur1tGnJVG-vIg