?? 文章中所有的示例代碼全部上傳到自己的Github中聋庵,需要參考請(qǐng)移步RabbitMQ
消息隊(duì)列
什么是消息隊(duì)列?
在我真正開(kāi)始學(xué)習(xí)消息隊(duì)列之前就經(jīng)常聽(tīng)到這個(gè)詞,常見(jiàn)于各類(lèi)面試題和高并發(fā)場(chǎng)景孩擂,一般都會(huì)伴隨著分布式系統(tǒng)出現(xiàn),當(dāng)時(shí)自己覺(jué)得這個(gè)真的好難箱熬,好深?yuàn)W类垦,但是學(xué)習(xí)一段時(shí)間后發(fā)現(xiàn)實(shí)際上消息隊(duì)列的使用實(shí)際上并不難狈邑,難點(diǎn)在于對(duì)于業(yè)務(wù)的理解和模型的建立以及對(duì)出現(xiàn)的問(wèn)題處理這方面。
究竟什么是消息隊(duì)列?
在計(jì)算機(jī)科學(xué)中蚤认,消息隊(duì)列(英語(yǔ):Message queue)是一種進(jìn)程間通信或同一進(jìn)程的不同線程間的通信方式米苹,軟件的貯列用來(lái)處理一系列的輸入,通常是來(lái)自用戶砰琢。消息隊(duì)列提供了異步的通信協(xié)議蘸嘶,每一個(gè)貯列中的紀(jì)錄包含詳細(xì)說(shuō)明的數(shù)據(jù),包含發(fā)生的時(shí)間陪汽,輸入設(shè)備的種類(lèi)训唱,以及特定的輸入?yún)?shù),也就是說(shuō):消息的發(fā)送者和接收者不需要同時(shí)與消息隊(duì)列交互掩缓。消息會(huì)保存在隊(duì)列中雪情,直到接收者取回它∧憷保——摘取自維基百科消息隊(duì)列
自己對(duì)于消息隊(duì)列的理解實(shí)際上可以從兩方面入手巡通,消息與隊(duì)列。
消息與隊(duì)列
什么是消息?香農(nóng)在《信息的數(shù)學(xué)理論》中定義能夠攜帶信息量的信息都可以稱(chēng)之為消息舍哄。簡(jiǎn)而言之有發(fā)送方有接收方并且能夠在二者之間傳遞信息的載體都可以稱(chēng)之為消息宴凉。
什么是隊(duì)列(queue)?一種經(jīng)典的先入先出的數(shù)據(jù)結(jié)構(gòu),后端插入數(shù)據(jù)表悬,前端獲取數(shù)據(jù)弥锄。
為什么需要消息隊(duì)列,這需要結(jié)合一些業(yè)務(wù)邏輯來(lái)說(shuō)蟆沫。在計(jì)算機(jī)中籽暇,硬件之間傳輸速率總是不盡相同,例如使用SATA盤(pán)向機(jī)械硬盤(pán)拷貝數(shù)據(jù)饭庞,無(wú)論SATA速率如何高戒悠,總要受到機(jī)械硬盤(pán)的限制。而軟件中同樣舟山,很多情況下绸狐,我們消息在單位時(shí)間內(nèi)獲取的速度和軟件滿足單位時(shí)間內(nèi)寫(xiě)入的速度并不相匹配,例如數(shù)據(jù)庫(kù)寫(xiě)入瓶頸累盗,這時(shí)候就需要使用到消息隊(duì)列作為緩沖寒矿,還有很多情況之后再總結(jié)。
將消息暫存在隊(duì)列中若债,保證消息到達(dá)的順序性同時(shí)還能夠保證消息的準(zhǔn)確性符相,我覺(jué)得這是消息隊(duì)列最可貴的地方。
常見(jiàn)消息隊(duì)列
RabbitMQ
RabbitMQ是使用Erlang編寫(xiě)的一個(gè)開(kāi)源的消息隊(duì)列拆座,本身支持很多的協(xié)議:AMQP主巍,XMPP, SMTP, STOMP冠息,也正因如此,它非常重量級(jí)孕索,更適合于企業(yè)級(jí)的開(kāi)發(fā)逛艰。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)搞旭。對(duì)路由散怖,負(fù) 載均衡或者數(shù)據(jù)持久化都有很好的支持。
Redis
Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫(kù)肄渗,開(kāi)發(fā)維護(hù)很活躍镇眷。雖然它是一個(gè)Key-Value數(shù)據(jù)庫(kù)存儲(chǔ)系統(tǒng),但它本身支持MQ功能翎嫡, 所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來(lái)使用欠动。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬(wàn)次惑申,每10萬(wàn)次記錄一次執(zhí)行時(shí)間具伍。測(cè)試 數(shù)據(jù)分為128Bytes、512Bytes圈驼、1K和10K四個(gè)不同大小的數(shù)據(jù)人芽。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于 RabbitMQ绩脆,而如果數(shù)據(jù)大小超過(guò)了10K萤厅,Redis則慢的無(wú)法忍受;出隊(duì)時(shí)靴迫,無(wú)論數(shù)據(jù)大小惕味,Redis都表現(xiàn)出非常好的性能,而RabbitMQ 的出隊(duì)性能則遠(yuǎn)低于Redis玉锌。
Kafka/Jafka
Kafka是Apache下的一個(gè)子項(xiàng)目赦拘,是一個(gè)高性能跨語(yǔ)言分布式Publish/Subscribe消息隊(duì)列系統(tǒng),而Jafka是在Kafka之上孵 化而來(lái)的芬沉,即Kafka的一個(gè)升級(jí)版。具有以下特性:快速持久化阁猜,可以在O(1)的系統(tǒng)開(kāi)銷(xiāo)下進(jìn)行消息持久化丸逸;高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到 10W/s的吞吐速率剃袍;完全的分布式系統(tǒng)黄刚,Broker、Producer民效、Consumer都原生自動(dòng)支持分布式憔维,自動(dòng)實(shí)現(xiàn)復(fù)雜均衡涛救;支持Hadoop 數(shù)據(jù)并行加載,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng)业扒,但又要求實(shí)時(shí)處理的限制检吆,這是一個(gè)可行的解決方案。Kafka通過(guò)Hadoop的并行 加載機(jī)制來(lái)統(tǒng)一了在線和離線的消息處理程储。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng)蹭沛,除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)章鲤。
ZeroMQ
ZeroMQ號(hào)稱(chēng)最快的消息隊(duì)列系統(tǒng)摊灭,尤其針對(duì)大吞吐量的需求場(chǎng)景。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列败徊,但是開(kāi)發(fā)人員需要自己組合 多種技術(shù)框架帚呼,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn)。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式皱蹦,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件煤杀,因 為你的應(yīng)用程序?qū)缪萘诉@個(gè)服務(wù)角色。你只需要簡(jiǎn)單的引用ZeroMQ程序庫(kù)根欧,可以使用NuGet安裝怜珍,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但 是ZeroMQ僅提供非持久性的隊(duì)列凤粗,也就是說(shuō)如果down機(jī)酥泛,數(shù)據(jù)將會(huì)丟失。其中嫌拣,Twitter的Storm中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳 輸柔袁。
ActiveMQ
ActiveMQ是Apache下的一個(gè)子項(xiàng)目。 類(lèi)似于ZeroMQ异逐,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列捶索。同時(shí)類(lèi)似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場(chǎng)景灰瞻。
消息隊(duì)列常見(jiàn)適用場(chǎng)景
首先來(lái)說(shuō)個(gè)自己之前的經(jīng)歷吧腥例,在之前的電纜監(jiān)控管理項(xiàng)目中,需要與很多網(wǎng)關(guān)進(jìn)行數(shù)據(jù)交換酝润,而這個(gè)時(shí)間節(jié)點(diǎn)是1分鐘一次燎竖,所以在59秒的時(shí)候系統(tǒng)可能會(huì)收到非常多的MODBUS數(shù)據(jù),通過(guò)解析轉(zhuǎn)化為對(duì)象后要销,這些數(shù)據(jù)大約在3000個(gè)/s构回,但是這種數(shù)據(jù)高峰只出現(xiàn)在這一秒,其他的時(shí)候就非常平緩。由于當(dāng)時(shí)的服務(wù)器并不太好纤掸,所以在用戶使用時(shí)脐供,總會(huì)在一分鐘內(nèi)卡頓一兩秒的時(shí)間,用戶體驗(yàn)極差借跪。當(dāng)時(shí)的設(shè)置的解決方案是將Netty分批次阻塞處理政己,也就是說(shuō)讓這3000個(gè)數(shù)據(jù)在1分鐘內(nèi)均勻到達(dá)。
但是這樣有一些缺點(diǎn)垦梆,可能某個(gè)數(shù)據(jù)是1分1秒發(fā)送而我們將他阻塞到了1分59秒匹颤,數(shù)據(jù)的準(zhǔn)確性可能會(huì)下降不少『啵現(xiàn)在看來(lái)這種場(chǎng)景實(shí)際上非常適合消息隊(duì)列溪北,可以先將這些數(shù)據(jù)緩存到消息隊(duì)列中辛臊,再由隊(duì)列寫(xiě)入數(shù)據(jù)庫(kù)稀余。
1.異步處理
假設(shè)現(xiàn)在用戶需要注冊(cè)一個(gè)賬號(hào)寒瓦,注冊(cè)完成后需要發(fā)送注冊(cè)郵件和短信提示被冒,如果讓整個(gè)流程按照串行化執(zhí)行适室,從注冊(cè)到系統(tǒng)響應(yīng)共計(jì)150ms货邓。
如果讓發(fā)送郵件和發(fā)送短信兩個(gè)步驟并行執(zhí)行公浪,需要100ms響應(yīng)他宛,顯然時(shí)間已經(jīng)縮短了1/3.
如果加入了消息隊(duì)列來(lái)處理。
與之前的并行處理方式所不同的是欠气,在加入消息隊(duì)列后厅各,發(fā)送郵件與發(fā)送短信功能變?yōu)榱水惒教幚恚簿褪钦f(shuō)當(dāng)用戶將注冊(cè)信息存入數(shù)據(jù)庫(kù)后预柒,數(shù)據(jù)庫(kù)會(huì)將這個(gè)對(duì)象存入消息隊(duì)列队塘,此時(shí)就完成了響應(yīng)。隨后發(fā)送郵件和短信的模塊會(huì)自己去消息隊(duì)列中獲取該對(duì)象進(jìn)行發(fā)送宜鸯,此時(shí)用戶響應(yīng)為60ms憔古,最為快捷。
2.應(yīng)用解耦
消息隊(duì)列同樣常見(jiàn)于應(yīng)用的解耦方面淋袖,作為一個(gè)中間件鸿市,消息隊(duì)列讓兩個(gè)強(qiáng)耦合的關(guān)系變成二者同時(shí)依賴(lài)于消息隊(duì)列,二者中任意一個(gè)都能夠不依賴(lài)另一方所進(jìn)行工作即碗。如圖(畫(huà)圖不易焰情,取圖請(qǐng)注明)
如果不適用消息隊(duì)列直接讓訂單系統(tǒng)依賴(lài)于庫(kù)存系統(tǒng),則會(huì)出現(xiàn)二者任意一個(gè)崩潰整個(gè)系統(tǒng)都無(wú)法使用剥懒,而在加入消息隊(duì)列后烙样,如果庫(kù)存系統(tǒng)崩潰,用戶依舊可以正常下單蕊肥,只是這些訂單將存儲(chǔ)在訂單隊(duì)列中,待庫(kù)存系統(tǒng)修復(fù)后可從隊(duì)列中獲取進(jìn)行業(yè)務(wù)處理,讓整個(gè)系統(tǒng)更加安全壁却,依賴(lài)程度更低批狱。
3.流量削峰
當(dāng)某個(gè)時(shí)間點(diǎn)流量過(guò)大,會(huì)超過(guò)服務(wù)器負(fù)載的時(shí)候需要用到流量削峰展东,例如在秒殺活動(dòng)中赔硫,可以將秒殺訂單存放進(jìn)入消息隊(duì)列,如果超過(guò)隊(duì)列上限可以丟棄盐肃,然后再進(jìn)行訂單的處理爪膊,緩解服務(wù)器壓力。
RabbitMQ
RabbitMQ是一套開(kāi)源(MPL)的消息隊(duì)列服務(wù)軟件砸王,是由 LShift 提供的一個(gè) Advanced Message Queuing Protocol (AMQP) 的開(kāi)源實(shí)現(xiàn)推盛,由以高性能、健壯以及可伸縮性出名的 Erlang 寫(xiě)成谦铃。
可供選擇的消息隊(duì)列很多耘成,之所以優(yōu)先學(xué)習(xí)了RabbitMQ,主要在于其性能強(qiáng)驹闰,由Erlang編寫(xiě)瘪菌,天生對(duì)于高并發(fā)類(lèi)型友好,而且其學(xué)習(xí)資源豐富嘹朗,遇到問(wèn)題更加容易排查處理师妙。
安裝RabbitMQ
如果有能力最好按照官網(wǎng)的文檔進(jìn)行安裝,任何時(shí)候官方文檔都應(yīng)該是第一選擇屹培。
安裝完成之后默穴,打開(kāi)默認(rèn)RabbitMQ-Server分配的地址localhost:15672,默認(rèn)密碼和用戶名都為guest惫谤。
Get Started In Java
簡(jiǎn)單隊(duì)列
既然是消息隊(duì)列壁顶,我們首先來(lái)用Java寫(xiě)個(gè)Demo往消息隊(duì)列中存一些MSG,示例為簡(jiǎn)單隊(duì)列溜歪,由一個(gè)消費(fèi)者與一個(gè)生產(chǎn)者組成若专。
和數(shù)據(jù)庫(kù)相同,首先要獲取鏈接蝴猪,這里抽象處理调衰,編寫(xiě)一個(gè)RabbitMQConnectionUtils
。
package com.magnoliaory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工具類(lèi)自阱,類(lèi)似JDBC提供MQ的連接
*/
public class RabbitMQUtils {
public static Connection getRabbitMQConnection
(String host , Integer port , String virtualHost , String username , String password) {
//定義一個(gè)工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設(shè)置連接參數(shù)
connectionFactory.setHost(host);
//設(shè)置連接方式的端口嚎莉,例如amqp對(duì)應(yīng)5672
connectionFactory.setPort(port);
//設(shè)置Vhost , 可以看作數(shù)據(jù)庫(kù)
connectionFactory.setVirtualHost(virtualHost);
//設(shè)置用戶名和密碼
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
Connection connection = null;
//獲取連接
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
然后編寫(xiě)一個(gè)生產(chǎn)者類(lèi)來(lái)進(jìn)行消息的寫(xiě)入。
package com.magnoliaory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class RabbitMQProduc {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 11; i++) {
prodceMessage();
Thread.sleep(3000);
}
}
/**
* 隊(duì)列名稱(chēng)
*/
public final static String QUEUE_NAME = "MyProductQueue";
public static void prodceMessage() throws Exception {
//獲取連接
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
//獲取通道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//發(fā)送消息
String msg = "HELLO WORLD";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
//關(guān)閉
channel.close();
connection.close();
}
}
可以看到沛豌,此時(shí)消息隊(duì)列中已經(jīng)有了十一個(gè)數(shù)據(jù)準(zhǔn)備就緒趋箩,等待我們get
赃额。
編寫(xiě)消費(fèi)者
package com.magnoliaory;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Date;
public class RabbitMQConsumer {
public static void main(String[] args) throws IOException {
receiveMessage();
}
public final static String QUEUE_NAME = "MyProductQueue";
public static void receiveMessage() throws IOException {
//獲取連接
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
//隊(duì)列聲明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//事件模型
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("接收端收到信息 : "+msg + " " + new Date());
}
};
//監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
看下控制臺(tái)效果。
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:48 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:51 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:54 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:57 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:00 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:03 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:06 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:09 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:12 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:15 CST 2020
控制臺(tái)中能夠看到叫确,此時(shí)consumer依舊在監(jiān)聽(tīng)消息隊(duì)列跳芳。
工作模式(work)
工作隊(duì)列由一個(gè)生產(chǎn)者與多個(gè)消費(fèi)者組成。
之前代碼示例為簡(jiǎn)單隊(duì)列(Simple Queue)竹勉,其結(jié)構(gòu)為傳統(tǒng)的一對(duì)一類(lèi)型飞盆,但是再實(shí)際生產(chǎn)中,可能存在生產(chǎn)者生產(chǎn)速度遠(yuǎn)遠(yuǎn)高于消費(fèi)者消費(fèi)速度次乓,此時(shí)如果依舊按照一對(duì)一模式消費(fèi)吓歇,則會(huì)造成消息在隊(duì)列中積壓,所以我們可以根據(jù)業(yè)務(wù)產(chǎn)生的速率來(lái)構(gòu)建工作隊(duì)列票腰,讓消費(fèi)者和生產(chǎn)者之間速率接近平衡城看,此時(shí)的模型即為工作隊(duì)列。
實(shí)際上工作隊(duì)和簡(jiǎn)單隊(duì)列差異很小丧慈,更多的是一種業(yè)務(wù)邏輯上的區(qū)別析命。看下生產(chǎn)者代碼逃默。
package workQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import sun.reflect.misc.ConstructorUtil;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @author fanhao
* 工作隊(duì)列消息發(fā)送者
*/
public class WorkQueueProducer {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 100; i++) {
createMsg(i);
System.out.println("第"+i+"消息發(fā)送完成");
Thread.sleep(500);
}
}
public static final String QUEUE_NAME = "MSG_PRODUCER";
public static void createMsg(Integer temp) throws Exception {
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "消息生產(chǎn)者M(jìn)SG_PRDUCER第" + temp + "次發(fā)送消息";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
connection.close();
}
}
消費(fèi)者代碼鹃愤。
package workQueue;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
import java.util.Date;
public class WorkQueueConsumer {
public static void main(String[] args) throws IOException {
receiveMessage();
}
public final static String QUEUE_NAME = "MSG_PRODUCER";
public static void receiveMessage() throws IOException {
//獲取連接
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
//隊(duì)列聲明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//事件模型
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("接收端收到信息 : " + msg + " " + new Date() + Thread.currentThread());
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
這里啟動(dòng)兩個(gè)consumer,可以看到一個(gè)奇怪的現(xiàn)象完域,兩個(gè)消費(fèi)者均分软吐,一個(gè)獲取到的是奇數(shù)消息,另一個(gè)獲取到偶數(shù)吟税,這種情況稱(chēng)之為輪詢分發(fā)凹耙。
輪詢分發(fā) : 無(wú)論Consumer負(fù)載是否相同,任務(wù)消息總是均分處理肠仪,即 K%N 處理方式肖抱。
針對(duì)這種情況可以使用公平分發(fā)的模式來(lái)處理。即使用basicQos(perfetch=1)
异旧。公平分發(fā)主要區(qū)別在于意述,每次MQ都會(huì)按照f(shuō)etch數(shù)為消費(fèi)者發(fā)送消息,之后消費(fèi)者處理完成并給與RabbitMQ響應(yīng)后(手動(dòng)反饋)吮蛹,才會(huì)再次發(fā)送下一個(gè)消息荤崇,這樣就保證了性能強(qiáng)的消費(fèi)者處理更多的消息,即公平分發(fā)潮针。
首先要關(guān)閉consumer的自動(dòng)應(yīng)答术荤。
//監(jiān)聽(tīng)隊(duì)列
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck , defaultConsumer);
然后在生產(chǎn)者中添加對(duì)于公平隊(duì)列發(fā)送的通道設(shè)計(jì)即可。
//確認(rèn)響應(yīng)前只發(fā)送一個(gè)消息
channel.basicQos(1);
此時(shí)啟動(dòng)服務(wù)器每篷,發(fā)現(xiàn)二者的消費(fèi)行為就有了差距瓣戚。
消息應(yīng)答
消息應(yīng)答實(shí)際上就是一種溝通機(jī)制端圈,在消費(fèi)者和MQ之間的行為,當(dāng)消費(fèi)者獲取消息完成后带兜,會(huì)告知RabbitMQ枫笛,同時(shí)消息隊(duì)列將消息從內(nèi)存中刪除。
自動(dòng)應(yīng)答
使用自動(dòng)應(yīng)答autoack = true
時(shí)刚照,消息將以輪詢的方式發(fā)送,此時(shí)如果一個(gè)消息已經(jīng)發(fā)送喧兄,并且目標(biāo)消費(fèi)者出現(xiàn)異常无畔,該消息將永遠(yuǎn)消失,所以這種情況下存在消息丟失的風(fēng)險(xiǎn)吠冤。
手動(dòng)應(yīng)答
如果使用手動(dòng)應(yīng)答浑彰,則在消息隊(duì)列未收到消息處理完成之前,并不會(huì)刪除該消息拯辙,如果消息發(fā)送后消費(fèi)者崩潰郭变,消息隊(duì)列將會(huì)把該消息發(fā)送給其他的消費(fèi)者。
這樣我們就解決了消費(fèi)者異常導(dǎo)致任務(wù)丟失的情況涯保,但是還存在RabbitMQ異常導(dǎo)致消息丟失的情況诉濒。所以RabbitMQ提供了消息持久化的功能保證我們能夠?qū)?nèi)存中的數(shù)據(jù)持久化到磁盤(pán)上,防止出現(xiàn)異常情況夕春。
消息持久化
在之前寫(xiě)一些簡(jiǎn)單的消息隊(duì)列時(shí)都會(huì)看到這樣一個(gè)方法未荒,注意RabbitMQ不允許重新定義不同參數(shù)的隊(duì)列,例如我們剛剛寫(xiě)好的內(nèi)容修改未true及志,將無(wú)法使用片排,只能新建一個(gè)隊(duì)列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
我們來(lái)看看源碼中這些參數(shù)的內(nèi)容速侈。
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
可以看到the queue will survive a server restart
這句話率寡,很明顯,這個(gè)durable即將消息隊(duì)列持久化到磁盤(pán)倚搬,讓隊(duì)列信息在服務(wù)器重啟后得以幸存冶共。持久化只需要將其設(shè)置為true即可。
訂閱模式(fanout)
消息被發(fā)布到通道上潭枣。訂閱者將收到其訂閱的主題上的所有消息比默,并且所有訂閱同一主題的訂閱者將接收到同樣的消息。
和前面兩種隊(duì)列相比較盆犁,訂閱模式有如下幾個(gè)特點(diǎn)命咐。
- 每個(gè)消費(fèi)者擁有自己的消息隊(duì)列
- 消息將由生產(chǎn)者發(fā)送到交換機(jī),再由交換機(jī)轉(zhuǎn)發(fā)到消息隊(duì)列中
- 每個(gè)隊(duì)列都要被綁定在交換機(jī)上
- 生產(chǎn)者發(fā)送的消息經(jīng)過(guò)交換機(jī)到達(dá)隊(duì)列后谐岁,一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)
創(chuàng)建生產(chǎn)者
public class SubscripQueueProducer {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
createMsg();
}
}
//聲明交換機(jī)的名稱(chēng)
public static final String EXCHANGE_NAME = "SUBSCRIPT_QUEUE";
public static void createMsg() throws Exception {
//同之前
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//發(fā)送消息
String msg = "這是一條來(lái)自交換機(jī)的消息醋奠。";
channel.basicPublish(EXCHANGE_NAME,"" , null, msg.getBytes());
channel.close();
connection.close();
}
}
運(yùn)行結(jié)果榛臼,查看控制臺(tái)能夠看到在交換機(jī)一欄中消息已經(jīng)被推送進(jìn)來(lái)。
但是我們發(fā)現(xiàn)窜司,消息卻丟失了沛善,這里需要注意,在RabbitMQ中塞祈,只有隊(duì)列有存儲(chǔ)能力金刁,交換機(jī)無(wú)存儲(chǔ)能力,在沒(méi)有隊(duì)列綁定時(shí)议薪,數(shù)據(jù)將被直接拋棄尤蛮。
下面我們寫(xiě)一個(gè)消費(fèi)者隊(duì)列。
private static final String QUEUE_NAME = "subscriptQueueConsumer";
public static void getMsg() throws Exception {
//同之前
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//將隊(duì)列綁定在交換機(jī)上
channel.queueBind(QUEUE_NAME, SubscriptQueueProducer.EXCHANGE_NAME, "");
//事件模型
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println(" 1接收端收到信息 : "+msg + " " + new Date());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
先啟動(dòng)consumer監(jiān)聽(tīng)斯议,然后啟動(dòng)生產(chǎn)者产捞,這時(shí)候可以看到控制臺(tái)交換機(jī)中有綁定的隊(duì)列。
路由模式(direct)
路由模式實(shí)際上時(shí)對(duì)于訂閱模式的一個(gè)拓展哼御。
訂閱模式中只要生產(chǎn)者發(fā)送消息到交換機(jī)中坯临,交換機(jī)會(huì)將他們推送給所有的消息隊(duì)列,而路由模式則會(huì)在推送消息時(shí)攜帶一個(gè)Key恋昼,只有與該Key匹配的消息隊(duì)列才能接收到該消息看靠,也就是說(shuō),路由模式將能夠決定給哪些消息隊(duì)列推送消息焰雕。
主題模式(topic)
之前我們使用過(guò)一個(gè)方法 :
channel.queueBind(QUEUE_NAME, SubscriptQueueProducer.EXCHANGE_NAME, "");
其中最后一個(gè)參數(shù)未rountingKey(路由鍵)衷笋,而路由模式則和此值有關(guān)。在消費(fèi)者和生產(chǎn)者中只需要增加對(duì)于自身routingKey
的綁定矩屁,同時(shí)修改Exchange的type為direct辟宗。
簡(jiǎn)單來(lái)講,主題模式類(lèi)似于正則表達(dá)式或者通配符吝秕。
如圖泊脐,其中以CH開(kāi)頭的消息都會(huì)發(fā)送到CH.#消息隊(duì)列中,以.News結(jié)尾都會(huì)發(fā)送到#.News消息隊(duì)列中烁峭,以此類(lèi)推容客。
下面就直接上代碼,生產(chǎn)者 :
public class TopicPouducer {
//聲明交換機(jī)的名稱(chēng)
public static final String EXCHANGE_NAME = "Topic";
public static void createMsg() throws Exception {
//同之前
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//發(fā)送消息
String msg = "路由信息";
channel.basicPublish(EXCHANGE_NAME,"goods.car" , null, msg.getBytes());
channel.close();
connection.close();
}
}
消費(fèi)者
public class TopicConsumer {
private static final String QUEUE_NAME = "TopicConsumer";
public static void getMsg() throws Exception {
//同之前
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//將隊(duì)列綁定在交換機(jī)上
//隊(duì)列接收一切以goods.開(kāi)頭的消息
channel.queueBind(QUEUE_NAME, SubscriptQueueProducer.EXCHANGE_NAME, "goods.#");
//事件模型
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println(" 1接收端收到信息 : "+msg + " " + new Date());
}
};
//監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
消息確認(rèn)機(jī)制
之前消息應(yīng)答保證了消費(fèi)者與RabbitMQ之間的消息存儲(chǔ)安全约郁,但是在生產(chǎn)者一方缩挑,我們無(wú)法知道我們的消息是否準(zhǔn)確發(fā)送到了RabbitMQ消息隊(duì)列中。這里總結(jié)兩種方式 :
- AMQP事務(wù)機(jī)制
- Confirm模式
AMQP
try {
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME,"" , null, msg.getBytes());
channel.txCommit();
} catch (IOException e) {
channel.txRollback();
}
和數(shù)據(jù)庫(kù)常用的事務(wù)機(jī)制類(lèi)似鬓梅,都是以commit和rollback為操作標(biāo)準(zhǔn)供置。
confirm
在confirm模式中,一旦channel使用該模式绽快,則其發(fā)送的每一條消息都將會(huì)附帶一個(gè)唯一ID值芥丧,當(dāng)RabbitMQ收到該消息時(shí)紧阔,將返回一個(gè)確認(rèn)給生產(chǎn)者,以告知消息安全到達(dá)续担。
由于confirm模式所使用的是異步處理擅耽,消息發(fā)送與ID確認(rèn)在不同線程處理,結(jié)果通過(guò)回調(diào)處理物遇,能夠有效提高性能乖仇。
//開(kāi)啟confim
channel.confirmSelect();
三種編程模式 :
- 普通模式(串行) : 發(fā)送一條confirm信息
- 批量模式(串行) : 發(fā)送一批confirm信息
- 異步模式 : 提供回調(diào)方法
普通模式
if (!channel.waitForConfirms()) {
System.out.println("消息發(fā)送失敗");
}
批量模式
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHANGE_NAME,"routingKey" , null, msg.getBytes());
}
if (!channel.waitForConfirms()) {
System.out.println("消息發(fā)送失敗");
}
批量實(shí)際上就是讓所有信息發(fā)送完成后再確認(rèn)即可,大部分情況下批量模式性能更好询兴,但是如果批量模式出現(xiàn)問(wèn)題这敬,這一批數(shù)據(jù)都會(huì)重新發(fā)送,具體使用根據(jù)場(chǎng)景而定蕉朵。
異步模式
在異步模式中,生產(chǎn)者每發(fā)送一條消息都會(huì)將該消息的ID值放入一個(gè)集合當(dāng)中阳掐,然后由另一條線程根據(jù)集合進(jìn)行發(fā)送確認(rèn)始衅。
由于是集合,可能會(huì)有多個(gè)消息確認(rèn)收到缭保,這個(gè)multiple
為true時(shí)就是說(shuō)明有多個(gè)符合要求汛闸。
package confirm;
public class ConfirmTest {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 100; i++) {
createMsg();
System.out.println("發(fā)送完成");
}
}
//聲明交換機(jī)的名稱(chēng)
public static final String EXCHANGE_NAME = "SUBSCRIPT_QUEUE";
public static void createMsg() throws Exception {
//同之前
Connection connection = RabbitMQUtils.
getRabbitMQConnection("localhost", 5672,
"/magnolia", "guest", "guest");
Channel channel = connection.createChannel();
//未確認(rèn)的消息集合
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
//沒(méi)有問(wèn)題
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
}else {
confirmSet.remove(deliveryTag);
}
}
//消息沒(méi)有確認(rèn)
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
}else {
confirmSet.remove(deliveryTag);
}
}
});
}
}
感謝您百忙之中能夠看完本篇文章,祝您擁有美好的一天艺骂。
參考文章
- 維基百科 : 消息隊(duì)列
- Arvon : 常見(jiàn)消息隊(duì)列
- RabbitMQ官方文檔