每日一句
軍人天生就舍棄了戰(zhàn)斗的意義!
概述
RabitMQ 發(fā)布確認(rèn)小染,保證消息在磁盤上援岩。
前提條件
1。隊(duì)列必須持久化 隊(duì)列持久化
2锹锰。隊(duì)列中的消息必須持久化 消息持久化
使用
三種發(fā)布確認(rèn)的方式:
1芥炭。單個(gè)發(fā)布確認(rèn)
2。批量發(fā)布確認(rèn)
3城须。異步批量發(fā)布確認(rèn)
開啟發(fā)布確認(rèn)的方法
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
**
//開啟發(fā)布確認(rèn)
channel.confirmSelect();**
單個(gè)確認(rèn)
最簡(jiǎn)單的確認(rèn)方式蚤认,它是一種同步發(fā)布確認(rèn)的方式米苹,也就是說(shuō)發(fā)送一個(gè)消息后只有它被確認(rèn)糕伐,后續(xù)的消息才能繼續(xù)發(fā)布。
最大缺點(diǎn)是:發(fā)布速度特別的滿蘸嘶。
吞吐量:每秒不超過數(shù)百條發(fā)布的消息
/**
* 單個(gè)確認(rèn)
*/
public static void publishSingleMessage() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//生命隊(duì)列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
**//開啟發(fā)布確認(rèn)
channel.confirmSelect();**
//開始時(shí)間
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//單個(gè)消息馬上進(jìn)行確認(rèn)
** boolean b = channel.waitForConfirms();**
if (b) {
System.out.println("消息發(fā)送成功A记啤!训唱!");
}
}
//結(jié)束時(shí)間
long end = System.currentTimeMillis();
System.out.println("發(fā)送消息1000褥蚯,單個(gè)發(fā)布確認(rèn)用時(shí): " + (end - begin) + " ms");
}
批量確認(rèn)
與單個(gè)等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量况增。
當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時(shí)赞庶,不知道是哪個(gè)消息出現(xiàn)問題了,我們必須將整個(gè)批處理保存在內(nèi)存中澳骤,以記錄重要的信息而后重新發(fā)布消息歧强。
當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布
/**
* 批量確認(rèn)
*/
public static void publishBatchMessage() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//生命隊(duì)列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
**//開啟發(fā)布確認(rèn)
channel.confirmSelect();
//批量確認(rèn)消息大小
int batchSize = 100;
//未確認(rèn)消息個(gè)數(shù)
int outstandingMessageCount = 0;**
//開始時(shí)間
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
**outstandingMessageCount++;
//發(fā)送的消息 == 確認(rèn)消息的大小后才批量確認(rèn)
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}**
}
**//為了確保還有剩余沒有確認(rèn)消息 再次確認(rèn)
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}**
//結(jié)束時(shí)間
long end = System.currentTimeMillis();
System.out.println("發(fā)送消息1000为肮,批量發(fā)布確認(rèn)100個(gè)用時(shí): " + (end - begin) + " ms");
}
異步確認(rèn)
它是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的摊册,這個(gè)中間件也是通過函數(shù)回調(diào)來(lái)保證是否投遞成功
/**
* 異步批量確認(rèn)
*
* @throws Exception
*/
public static void publishAsyncMessage() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
** //開啟發(fā)布確認(rèn)
channel.confirmSelect();
**
//線程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況
//1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián) 2.輕松批量刪除條目 只要給到序列號(hào) 3.支持并發(fā)訪問
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
**//確認(rèn)收到消息的一個(gè)回調(diào)**
//1.消息序列號(hào)
//2.multiple 是否是批量確認(rèn)
//false 確認(rèn)當(dāng)前序列號(hào)消息
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息 是一個(gè) map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除該部分未確認(rèn)消息
confirmed.clear();
} else {
//只清除當(dāng)前序列號(hào)的消息
outstandingConfirms.remove(sequenceNumber);
}
};
//未確認(rèn)消息的回調(diào)
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("發(fā)布的消息" + message + "未被確認(rèn)颊艳,序列號(hào)" + sequenceNumber);
};
**//添加一個(gè)異步確認(rèn)的監(jiān)聽器
//1.確認(rèn)收到消息的回調(diào)
//2.未收到消息的回調(diào)
channel.addConfirmListener(ackCallback, nackCallback);**
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = "消息" + i;
**//channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)
//通過序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián),全部都是未確認(rèn)的消息體
//將發(fā)布的序號(hào)和發(fā)布消息保存到map中
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);**
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("發(fā)布" + 1000 + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) + "ms");
}
}
如何處理異步未確認(rèn)消息
最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線程訪問茅特,適用于高并發(fā)的的隊(duì)列忘分。
比如說(shuō)用 ConcurrentLinkedQueue 、這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線程之間進(jìn)行消息的傳遞白修。
ConcurrentSkipListMap
等等都可妒峦。
面試題
如何保證消息不丟失?
就市面上常見的消息隊(duì)列而言兵睛,只要配置得當(dāng)舟山,我們的消息就不會(huì)丟失。
消息隊(duì)列主要有三個(gè)階段:
1卤恳。生產(chǎn)消息
2累盗。存儲(chǔ)消息
3。消費(fèi)消息
1突琳。生產(chǎn)消息
生產(chǎn)者發(fā)送消息至 Broker 若债,需要處理 Broker 的響應(yīng),不論是同步還是異步發(fā)送消息拆融,同步和異步回調(diào)都需要做好 try-catch 蠢琳,妥善的處理響應(yīng)。
如果 Broker 返回寫入失敗等錯(cuò)誤消息镜豹,需要重試發(fā)送傲须。
當(dāng)多次發(fā)送失敗需要作報(bào)警,日志記錄等趟脂。這樣就能保證在生產(chǎn)消息階段消息不會(huì)丟失泰讽。
2。存儲(chǔ)消息
存儲(chǔ)消息階段需要在消息刷盤之后再給生產(chǎn)者響應(yīng)昔期,假設(shè)消息寫入緩存中就返回響應(yīng)已卸,那么機(jī)器突然斷電這消息就沒了,而生產(chǎn)者以為已經(jīng)發(fā)送成功了硼一。
如果 Broker 是集群部署累澡,有多副本機(jī)制,即消息不僅僅要寫入當(dāng)前 Broker ,還需要寫入副本機(jī)中般贼。
那配置成至少寫入兩臺(tái)機(jī)子后再給生產(chǎn)者響應(yīng)愧哟。這樣基本上就能保證存儲(chǔ)的可靠了。一臺(tái)掛了還有一臺(tái)還
在呢(假如怕兩臺(tái)都掛了..那就再多些)哼蛆。
3蕊梧。消費(fèi)消息
我們應(yīng)該在消費(fèi)者真正執(zhí)行完業(yè)務(wù)邏輯之后,再發(fā)送給 Broker 消費(fèi)成功人芽,這才是真正的消費(fèi)了望几。
所以只要我們?cè)谙I(yè)務(wù)邏輯處理完成之后再給 Broker 響應(yīng),那么消費(fèi)階段消息就不會(huì)丟失
總結(jié):
1萤厅。生產(chǎn)者 需要處理好 Broker 的響應(yīng)橄抹,出錯(cuò)情況下利用重試靴迫、報(bào)警等手段
2。Broker 需要控制響應(yīng)的時(shí)機(jī)楼誓,單機(jī)情況下是消息刷盤后返回響應(yīng)玉锌,集群多副本情況下,即發(fā)送至兩個(gè)副本及以上的情況下再返回響應(yīng)疟羹。
3主守。消費(fèi)者 需要在執(zhí)行完真正的業(yè)務(wù)邏輯之后再返回響應(yīng)給 Broker
volatile 關(guān)鍵字的作用?
1榄融。保證內(nèi)存可見性
1.1 基本概念
可見性 是指線程之間的可見性参淫,一個(gè)線程修改的狀態(tài)對(duì)另一個(gè)線程是可見的。也就是一個(gè)線程修改的結(jié)果愧杯,另一個(gè)線程馬上就能夠看到涎才。
1.2 實(shí)現(xiàn)原理
當(dāng)對(duì)非volatile變量進(jìn)行讀寫的時(shí)候,每個(gè)線程先從主內(nèi)存拷貝變量到CPU緩存中力九,如果計(jì)算機(jī)有多個(gè)CPU耍铜,每個(gè)線程可能在不同的CPU上被處理,這意味著每個(gè)線程可以拷貝到不同的CPU cache中跌前。volatile變量不會(huì)被緩存在寄存器或者對(duì)其他處理器不可見的地方棕兼,保證了每次讀寫變量都從主內(nèi)存中讀,跳過CPU cache這一步抵乓。當(dāng)一個(gè)線程修改了這個(gè)變量的值伴挚,新值對(duì)于其他線程是立即得知的。
2臂寝。禁止指令重排序
2.1 基本概念
指令重排序是JVM為了優(yōu)化指令章鲤、提高程序運(yùn)行效率,在不影響單線程程序執(zhí)行結(jié)果的前提下咆贬,盡可能地提高并行度。指令重排序包括編譯器重排序和運(yùn)行時(shí)重排序帚呼。在JDK1.5之后掏缎,可以使用volatile變量禁止指令重排序。針對(duì)volatile修飾的變量煤杀,在讀寫操作指令前后會(huì)插入內(nèi)存屏障眷蜈,指令重排序時(shí)不能把后面的指令重排序到內(nèi)存屏
示例說(shuō)明:
double r = 2.1; //(1)
double pi = 3.14;//(2)
double area = pi*r*r;//(3)
雖然代碼語(yǔ)句的定義順序?yàn)?->2->3,但是計(jì)算順序1->2->3與2->1->3對(duì)結(jié)果并無(wú)影響沈自,所以編譯時(shí)和運(yùn)行時(shí)可以根據(jù)需要對(duì)1酌儒、2語(yǔ)句進(jìn)行重排序。
2.2 指令重排帶來(lái)的問題
線程A中
{
context = loadContext();
inited = true;
}
線程B中
{
if (inited)
fun(context);
}
如果線程A中的指令發(fā)生了重排序枯途,那么B中很可能就會(huì)拿到一個(gè)尚未初始化或尚未初始化完成的context,從而引發(fā)程序錯(cuò)誤忌怎。
2.3 禁止指令重排的原理
olatile關(guān)鍵字提供內(nèi)存屏障的方式來(lái)防止指令被重排籍滴,編譯器在生成字節(jié)碼文件時(shí),會(huì)在指令序列中插入內(nèi)存屏障來(lái)禁止特定類型的處理器重排序榴啸。
JVM內(nèi)存屏障插入策略:
- 每個(gè)volatile寫操作的前面插入一個(gè)StoreStore屏障孽惰;
- 在每個(gè)volatile寫操作的后面插入一個(gè)StoreLoad屏障;
- 在每個(gè)volatile讀操作的后面插入一個(gè)LoadLoad屏障鸥印;
- 在每個(gè)volatile讀操作的后面插入一個(gè)LoadStore屏障勋功。
3。適用場(chǎng)景
(1)volatile關(guān)鍵字無(wú)法同時(shí)保證內(nèi)存可見性和原子性库说。加鎖機(jī)制既可以確笨裥可見性也可以確保原子性。
(2)volatile屏蔽掉了JVM中必要的代碼優(yōu)化潜的,所以在效率上比較低要销,因此一定在必要時(shí)才使用此關(guān)鍵字。
介紹一下Netty夏块?
Netty是一個(gè)高性能疏咐、異步事件驅(qū)動(dòng)的NIO框架。
簡(jiǎn)化并優(yōu)化了TCP和UDP套接字等網(wǎng)絡(luò)編程脐供,性能和安全等很多方面都做了優(yōu)化浑塞。
3.支持多種協(xié)議,如FTP政己、SMTP酌壕、HTTP以及各種二進(jìn)制和基于文本的傳統(tǒng)協(xié)議。
在網(wǎng)絡(luò)編程中歇由,Netty是絕對(duì)的王者卵牍。
有很多開源項(xiàng)目都用到了Netty。
1沦泌。市面上很多消息推送系統(tǒng)都是基于Netty來(lái)做的糊昙。
2。我們常用的框架:Dubbo谢谦、RocketMQ释牺、ES等等都用到了Netty。
使用Netty的項(xiàng)目統(tǒng)計(jì):https://netty.io/wiki/related-projects.html
你好回挽,我是yltrcc没咙,日常分享技術(shù)點(diǎn)滴,歡迎關(guān)注我:ylcoder
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布千劈!