每日一句
軍人天生就舍棄了戰(zhàn)斗的意義意述!
概述
RabitMQ 發(fā)布確認(rèn)浴栽,保證消息在磁盤(pán)上击困。
前提條件
1涎劈。隊(duì)列必須持久化 隊(duì)列持久化
2。隊(duì)列中的消息必須持久化 消息持久化
使用
三種發(fā)布確認(rèn)的方式:
1阅茶。單個(gè)發(fā)布確認(rèn)
2蛛枚。批量發(fā)布確認(rèn)
3。異步批量發(fā)布確認(rèn)
開(kāi)啟發(fā)布確認(rèn)的方法
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
**
//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();**
單個(gè)確認(rèn)
最簡(jiǎn)單的確認(rèn)方式脸哀,它是一種同步發(fā)布確認(rèn)的方式蹦浦,也就是說(shuō)發(fā)送一個(gè)消息后只有它被確認(rèn),后續(xù)的消息才能繼續(xù)發(fā)布撞蜂。
最大缺點(diǎn)是:發(fā)布速度特別的滿(mǎn)盲镶。
吞吐量:每秒不超過(guò)數(shù)百條發(fā)布的消息
/**
* 單個(gè)確認(rèn)
*/
public static void publishSingleMessage() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//生命隊(duì)列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
**//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();**
//開(kāi)始時(shí)間
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//單個(gè)消息馬上進(jìn)行確認(rèn)
** boolean b = channel.waitForConfirms();**
if (b) {
System.out.println("消息發(fā)送成功!r蚬睢溉贿!");
}
}
//結(jié)束時(shí)間
long end = System.currentTimeMillis();
System.out.println("發(fā)送消息1000,單個(gè)發(fā)布確認(rèn)用時(shí): " + (end - begin) + " ms");
}
批量確認(rèn)
與單個(gè)等待確認(rèn)消息相比浦旱,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量宇色。
當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問(wèn)題時(shí),不知道是哪個(gè)消息出現(xiàn)問(wèn)題了,我們必須將整個(gè)批處理保存在內(nèi)存中宣蠕,以記錄重要的信息而后重新發(fā)布消息例隆。
當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布
/**
* 批量確認(rèn)
*/
public static void publishBatchMessage() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//生命隊(duì)列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
**//開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();
//批量確認(rèn)消息大小
int batchSize = 100;
//未確認(rèn)消息個(gè)數(shù)
int outstandingMessageCount = 0;**
//開(kāi)始時(shí)間
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
**outstandingMessageCount++;
//發(fā)送的消息 == 確認(rèn)消息的大小后才批量確認(rèn)
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}**
}
**//為了確保還有剩余沒(méi)有確認(rèn)消息 再次確認(rèn)
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}**
//結(jié)束時(shí)間
long end = System.currentTimeMillis();
System.out.println("發(fā)送消息1000植影,批量發(fā)布確認(rèn)100個(gè)用時(shí): " + (end - begin) + " ms");
}
異步確認(rèn)
它是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的裳擎,這個(gè)中間件也是通過(guò)函數(shù)回調(diào)來(lái)保證是否投遞成功
[圖片上傳失敗...(image-4b7849-1653380757612)]
/**
* 異步批量確認(rèn)
*
* @throws Exception
*/
public static void publishAsyncMessage() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
** //開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();
**
//線(xiàn)程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況
//1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián) 2.輕松批量刪除條目 只要給到序列號(hào) 3.支持并發(fā)訪問(wèn)
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
**//確認(rèn)收到消息的一個(gè)回調(diào)**
//1.消息序列號(hào)
//2.multiple 是否是批量確認(rèn)
//false 確認(rèn)當(dāng)前序列號(hào)消息
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息 是一個(gè) map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認(rèn)消息
confirmed.clear();
} else {
//只清除當(dāng)前序列號(hào)的消息
outstandingConfirms.remove(sequenceNumber);
}
};
//未確認(rèn)消息的回調(diào)
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發(fā)布的消息" + message + "未被確認(rèn)思币,序列號(hào)" + sequenceNumber);
};
**//添加一個(gè)異步確認(rèn)的監(jiān)聽(tīng)器
//1.確認(rèn)收到消息的回調(diào)
//2.未收到消息的回調(diào)
channel.addConfirmListener(ackCallback, nackCallback);**
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = "消息" + i;
**//channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)
//通過(guò)序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián),全部都是未確認(rèn)的消息體
//將發(fā)布的序號(hào)和發(fā)布消息保存到map中
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);**
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + 1000 + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) + "ms");
}
}
如何處理異步未確認(rèn)消息
最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線(xiàn)程訪問(wèn)鹿响,適用于高并發(fā)的的隊(duì)列。
比如說(shuō)用 ConcurrentLinkedQueue 谷饿、這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線(xiàn)程之間進(jìn)行消息的傳遞惶我。
ConcurrentSkipListMap
等等都可。
面試題
如何保證消息不丟失博投?
就市面上常見(jiàn)的消息隊(duì)列而言绸贡,只要配置得當(dāng),我們的消息就不會(huì)丟失毅哗。
消息隊(duì)列主要有三個(gè)階段:
1听怕。生產(chǎn)消息
2。存儲(chǔ)消息
3虑绵。消費(fèi)消息
1尿瞭。生產(chǎn)消息
生產(chǎn)者發(fā)送消息至 Broker ,需要處理 Broker 的響應(yīng)翅睛,不論是同步還是異步發(fā)送消息声搁,同步和異步回調(diào)都需要做好 try-catch ,妥善的處理響應(yīng)捕发。
如果 Broker 返回寫(xiě)入失敗等錯(cuò)誤消息疏旨,需要重試發(fā)送。
當(dāng)多次發(fā)送失敗需要作報(bào)警扎酷,日志記錄等檐涝。這樣就能保證在生產(chǎn)消息階段消息不會(huì)丟失。
2法挨。存儲(chǔ)消息
存儲(chǔ)消息階段需要在消息刷盤(pán)之后再給生產(chǎn)者響應(yīng)骤铃,假設(shè)消息寫(xiě)入緩存中就返回響應(yīng),那么機(jī)器突然斷電這消息就沒(méi)了坷剧,而生產(chǎn)者以為已經(jīng)發(fā)送成功了。
如果 Broker 是集群部署喊暖,有多副本機(jī)制惫企,即消息不僅僅要寫(xiě)入當(dāng)前 Broker ,還需要寫(xiě)入副本機(jī)中。
那配置成至少寫(xiě)入兩臺(tái)機(jī)子后再給生產(chǎn)者響應(yīng)。這樣基本上就能保證存儲(chǔ)的可靠了狞尔。一臺(tái)掛了還有一臺(tái)還
在呢(假如怕兩臺(tái)都掛了..那就再多些)丛版。
3。消費(fèi)消息
我們應(yīng)該在消費(fèi)者真正執(zhí)行完業(yè)務(wù)邏輯之后偏序,再發(fā)送給 Broker 消費(fèi)成功页畦,這才是真正的消費(fèi)了。
所以只要我們?cè)谙I(yè)務(wù)邏輯處理完成之后再給 Broker 響應(yīng)研儒,那么消費(fèi)階段消息就不會(huì)丟失
總結(jié):
1豫缨。生產(chǎn)者 需要處理好 Broker 的響應(yīng),出錯(cuò)情況下利用重試端朵、報(bào)警等手段
2好芭。Broker 需要控制響應(yīng)的時(shí)機(jī),單機(jī)情況下是消息刷盤(pán)后返回響應(yīng)冲呢,集群多副本情況下舍败,即發(fā)送至兩個(gè)副本及以上的情況下再返回響應(yīng)。
3敬拓。消費(fèi)者 需要在執(zhí)行完真正的業(yè)務(wù)邏輯之后再返回響應(yīng)給 Broker
volatile 關(guān)鍵字的作用邻薯?
1。保證內(nèi)存可見(jiàn)性
1.1 基本概念
可見(jiàn)性 是指線(xiàn)程之間的可見(jiàn)性乘凸,一個(gè)線(xiàn)程修改的狀態(tài)對(duì)另一個(gè)線(xiàn)程是可見(jiàn)的厕诡。也就是一個(gè)線(xiàn)程修改的結(jié)果,另一個(gè)線(xiàn)程馬上就能夠看到翰意。
1.2 實(shí)現(xiàn)原理
[圖片上傳失敗...(image-ce5d21-1653380757612)]
當(dāng)對(duì)非volatile變量進(jìn)行讀寫(xiě)的時(shí)候木人,每個(gè)線(xiàn)程先從主內(nèi)存拷貝變量到CPU緩存中,如果計(jì)算機(jī)有多個(gè)CPU冀偶,每個(gè)線(xiàn)程可能在不同的CPU上被處理醒第,這意味著每個(gè)線(xiàn)程可以拷貝到不同的CPU cache中。volatile變量不會(huì)被緩存在寄存器或者對(duì)其他處理器不可見(jiàn)的地方进鸠,保證了每次讀寫(xiě)變量都從主內(nèi)存中讀稠曼,跳過(guò)CPU cache這一步。當(dāng)一個(gè)線(xiàn)程修改了這個(gè)變量的值客年,新值對(duì)于其他線(xiàn)程是立即得知的霞幅。
2。禁止指令重排序
2.1 基本概念
指令重排序是JVM為了優(yōu)化指令量瓜、提高程序運(yùn)行效率司恳,在不影響單線(xiàn)程程序執(zhí)行結(jié)果的前提下,盡可能地提高并行度绍傲。指令重排序包括編譯器重排序和運(yùn)行時(shí)重排序扔傅。在JDK1.5之后耍共,可以使用volatile變量禁止指令重排序。針對(duì)volatile修飾的變量猎塞,在讀寫(xiě)操作指令前后會(huì)插入內(nèi)存屏障试读,指令重排序時(shí)不能把后面的指令重排序到內(nèi)存屏
示例說(shuō)明:
double r = 2.1; //(1)
double pi = 3.14;//(2)
double area = pi*r*r;//(3)
雖然代碼語(yǔ)句的定義順序?yàn)?->2->3,但是計(jì)算順序1->2->3與2->1->3對(duì)結(jié)果并無(wú)影響荠耽,所以編譯時(shí)和運(yùn)行時(shí)可以根據(jù)需要對(duì)1钩骇、2語(yǔ)句進(jìn)行重排序。
2.2 指令重排帶來(lái)的問(wèn)題
線(xiàn)程A中
{
context = loadContext();
inited = true;
}
線(xiàn)程B中
{
if (inited)
fun(context);
}
如果線(xiàn)程A中的指令發(fā)生了重排序铝量,那么B中很可能就會(huì)拿到一個(gè)尚未初始化或尚未初始化完成的context,從而引發(fā)程序錯(cuò)誤倘屹。
2.3 禁止指令重排的原理
olatile關(guān)鍵字提供內(nèi)存屏障的方式來(lái)防止指令被重排,編譯器在生成字節(jié)碼文件時(shí)款违,會(huì)在指令序列中插入內(nèi)存屏障來(lái)禁止特定類(lèi)型的處理器重排序唐瀑。
JVM內(nèi)存屏障插入策略:
- 每個(gè)volatile寫(xiě)操作的前面插入一個(gè)StoreStore屏障;
- 在每個(gè)volatile寫(xiě)操作的后面插入一個(gè)StoreLoad屏障插爹;
- 在每個(gè)volatile讀操作的后面插入一個(gè)LoadLoad屏障哄辣;
- 在每個(gè)volatile讀操作的后面插入一個(gè)LoadStore屏障。
3赠尾。適用場(chǎng)景
(1)volatile關(guān)鍵字無(wú)法同時(shí)保證內(nèi)存可見(jiàn)性和原子性力穗。加鎖機(jī)制既可以確保可見(jiàn)性也可以確保原子性气嫁。
(2)volatile屏蔽掉了JVM中必要的代碼優(yōu)化当窗,所以在效率上比較低,因此一定在必要時(shí)才使用此關(guān)鍵字寸宵。
介紹一下Netty崖面?
Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的NIO框架梯影。
簡(jiǎn)化并優(yōu)化了TCP和UDP套接字等網(wǎng)絡(luò)編程巫员,性能和安全等很多方面都做了優(yōu)化。
3.支持多種協(xié)議甲棍,如FTP简识、SMTP、HTTP以及各種二進(jìn)制和基于文本的傳統(tǒng)協(xié)議感猛。
在網(wǎng)絡(luò)編程中七扰,Netty是絕對(duì)的王者。
有很多開(kāi)源項(xiàng)目都用到了Netty陪白。
1颈走。市面上很多消息推送系統(tǒng)都是基于Netty來(lái)做的。
2咱士。我們常用的框架:Dubbo疫鹊、RocketMQ袖瞻、ES等等都用到了Netty。
使用Netty的項(xiàng)目統(tǒng)計(jì):https://netty.io/wiki/related-projects.html
你好拆吆,我是yltrcc,日常分享技術(shù)點(diǎn)滴脂矫,歡迎關(guān)注我:ylcoder