RabitMQ 發(fā)布確認(rèn)

每日一句

軍人天生就舍棄了戰(zhàn)斗的意義意述!

概述

RabitMQ 發(fā)布確認(rèn)浴栽,保證消息在磁盤(pán)上击困。

前提條件

1涎劈。隊(duì)列必須持久化 隊(duì)列持久化

2。隊(duì)列中的消息必須持久化 消息持久化

使用

三種發(fā)布確認(rèn)的方式:

1阅茶。單個(gè)發(fā)布確認(rèn)

2蛛枚。批量發(fā)布確認(rèn)

3。異步批量發(fā)布確認(rèn)

開(kāi)啟發(fā)布確認(rèn)的方法

 //創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
**
 //開(kāi)啟發(fā)布確認(rèn)
channel.confirmSelect();**

單個(gè)確認(rèn)

最簡(jiǎn)單的確認(rèn)方式脸哀,它是一種同步發(fā)布確認(rèn)的方式蹦浦,也就是說(shuō)發(fā)送一個(gè)消息后只有它被確認(rèn),后續(xù)的消息才能繼續(xù)發(fā)布撞蜂。

最大缺點(diǎn)是:發(fā)布速度特別的滿(mǎn)盲镶。

吞吐量:每秒不超過(guò)數(shù)百條發(fā)布的消息

/**
 * 單個(gè)確認(rèn)
 */
public static void publishSingleMessage() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //生命隊(duì)列
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);
    **//開(kāi)啟發(fā)布確認(rèn)
    channel.confirmSelect();**
    //開(kāi)始時(shí)間
    long begin = System.currentTimeMillis();

    for (int i = 0; i < 1000; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //單個(gè)消息馬上進(jìn)行確認(rèn)
       ** boolean b = channel.waitForConfirms();**
        if (b) {
            System.out.println("消息發(fā)送成功!r蚬睢溉贿!");
        }
    }

    //結(jié)束時(shí)間
    long end = System.currentTimeMillis();

    System.out.println("發(fā)送消息1000,單個(gè)發(fā)布確認(rèn)用時(shí): " + (end - begin) + " ms");
}

批量確認(rèn)

與單個(gè)等待確認(rèn)消息相比浦旱,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量宇色。

當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問(wèn)題時(shí),不知道是哪個(gè)消息出現(xiàn)問(wèn)題了,我們必須將整個(gè)批處理保存在內(nèi)存中宣蠕,以記錄重要的信息而后重新發(fā)布消息例隆。

當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布

/**
 * 批量確認(rèn)
 */
public static void publishBatchMessage() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //生命隊(duì)列
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);
    **//開(kāi)啟發(fā)布確認(rèn)
    channel.confirmSelect();
    //批量確認(rèn)消息大小
    int batchSize = 100;
    //未確認(rèn)消息個(gè)數(shù)
    int outstandingMessageCount = 0;**

    //開(kāi)始時(shí)間
    long begin = System.currentTimeMillis();

    for (int i = 0; i < 1000; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        **outstandingMessageCount++;
        //發(fā)送的消息 == 確認(rèn)消息的大小后才批量確認(rèn)
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }**
    }
    **//為了確保還有剩余沒(méi)有確認(rèn)消息 再次確認(rèn)
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }**
    //結(jié)束時(shí)間
    long end = System.currentTimeMillis();

    System.out.println("發(fā)送消息1000植影,批量發(fā)布確認(rèn)100個(gè)用時(shí): " + (end - begin) + " ms");
}

異步確認(rèn)

它是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的裳擎,這個(gè)中間件也是通過(guò)函數(shù)回調(diào)來(lái)保證是否投遞成功

[圖片上傳失敗...(image-4b7849-1653380757612)]

/**
 * 異步批量確認(rèn)
 *
 * @throws Exception
 */
public static void publishAsyncMessage() throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
       ** //開(kāi)啟發(fā)布確認(rèn)
        channel.confirmSelect();
**
        //線(xiàn)程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況
        //1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián) 2.輕松批量刪除條目 只要給到序列號(hào) 3.支持并發(fā)訪問(wèn)
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        **//確認(rèn)收到消息的一個(gè)回調(diào)**
        //1.消息序列號(hào)
        //2.multiple  是否是批量確認(rèn)
        //false 確認(rèn)當(dāng)前序列號(hào)消息
        ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
            if (multiple) {
                //返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息 是一個(gè) map
                ConcurrentNavigableMap<Long, String> confirmed =
                        outstandingConfirms.headMap(sequenceNumber, true);
                //清除該部分未確認(rèn)消息
                confirmed.clear();
            } else {
                //只清除當(dāng)前序列號(hào)的消息
                outstandingConfirms.remove(sequenceNumber);
            }
        };
        //未確認(rèn)消息的回調(diào)
        ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
            String message = outstandingConfirms.get(sequenceNumber);
            System.out.println("發(fā)布的消息" + message + "未被確認(rèn)思币,序列號(hào)" + sequenceNumber);
        };

        **//添加一個(gè)異步確認(rèn)的監(jiān)聽(tīng)器
        //1.確認(rèn)收到消息的回調(diào)
        //2.未收到消息的回調(diào)
        channel.addConfirmListener(ackCallback, nackCallback);**

        long begin = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            String message = "消息" + i;
            **//channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)
            //通過(guò)序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián),全部都是未確認(rèn)的消息體
            //將發(fā)布的序號(hào)和發(fā)布消息保存到map中
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);**
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        long end = System.currentTimeMillis();
        System.out.println("發(fā)布" + 1000 + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) + "ms");
    }

}

如何處理異步未確認(rèn)消息

最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線(xiàn)程訪問(wèn)鹿响,適用于高并發(fā)的的隊(duì)列。
比如說(shuō)用 ConcurrentLinkedQueue 谷饿、這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線(xiàn)程之間進(jìn)行消息的傳遞惶我。

ConcurrentSkipListMap

等等都可。

面試題

如何保證消息不丟失博投?

就市面上常見(jiàn)的消息隊(duì)列而言绸贡,只要配置得當(dāng),我們的消息就不會(huì)丟失毅哗。

消息隊(duì)列主要有三個(gè)階段:

1听怕。生產(chǎn)消息

2。存儲(chǔ)消息

3虑绵。消費(fèi)消息

1尿瞭。生產(chǎn)消息

生產(chǎn)者發(fā)送消息至 Broker ,需要處理 Broker 的響應(yīng)翅睛,不論是同步還是異步發(fā)送消息声搁,同步和異步回調(diào)都需要做好 try-catch ,妥善的處理響應(yīng)捕发。

如果 Broker 返回寫(xiě)入失敗等錯(cuò)誤消息疏旨,需要重試發(fā)送。

當(dāng)多次發(fā)送失敗需要作報(bào)警扎酷,日志記錄等檐涝。這樣就能保證在生產(chǎn)消息階段消息不會(huì)丟失。

2法挨。存儲(chǔ)消息

存儲(chǔ)消息階段需要在消息刷盤(pán)之后再給生產(chǎn)者響應(yīng)骤铃,假設(shè)消息寫(xiě)入緩存中就返回響應(yīng),那么機(jī)器突然斷電這消息就沒(méi)了坷剧,而生產(chǎn)者以為已經(jīng)發(fā)送成功了。

如果 Broker 是集群部署喊暖,有多副本機(jī)制惫企,即消息不僅僅要寫(xiě)入當(dāng)前 Broker ,還需要寫(xiě)入副本機(jī)中。

那配置成至少寫(xiě)入兩臺(tái)機(jī)子后再給生產(chǎn)者響應(yīng)。這樣基本上就能保證存儲(chǔ)的可靠了狞尔。一臺(tái)掛了還有一臺(tái)還

在呢(假如怕兩臺(tái)都掛了..那就再多些)丛版。

3。消費(fèi)消息

我們應(yīng)該在消費(fèi)者真正執(zhí)行完業(yè)務(wù)邏輯之后偏序,再發(fā)送給 Broker 消費(fèi)成功页畦,這才是真正的消費(fèi)了。

所以只要我們?cè)谙I(yè)務(wù)邏輯處理完成之后再給 Broker 響應(yīng)研儒,那么消費(fèi)階段消息就不會(huì)丟失

總結(jié):

1豫缨。生產(chǎn)者 需要處理好 Broker 的響應(yīng),出錯(cuò)情況下利用重試端朵、報(bào)警等手段

2好芭。Broker 需要控制響應(yīng)的時(shí)機(jī),單機(jī)情況下是消息刷盤(pán)后返回響應(yīng)冲呢,集群多副本情況下舍败,即發(fā)送至兩個(gè)副本及以上的情況下再返回響應(yīng)。

3敬拓。消費(fèi)者 需要在執(zhí)行完真正的業(yè)務(wù)邏輯之后再返回響應(yīng)給 Broker

volatile 關(guān)鍵字的作用邻薯?

1。保證內(nèi)存可見(jiàn)性

1.1 基本概念

可見(jiàn)性 是指線(xiàn)程之間的可見(jiàn)性乘凸,一個(gè)線(xiàn)程修改的狀態(tài)對(duì)另一個(gè)線(xiàn)程是可見(jiàn)的厕诡。也就是一個(gè)線(xiàn)程修改的結(jié)果,另一個(gè)線(xiàn)程馬上就能夠看到翰意。

1.2 實(shí)現(xiàn)原理

[圖片上傳失敗...(image-ce5d21-1653380757612)]

當(dāng)對(duì)非volatile變量進(jìn)行讀寫(xiě)的時(shí)候木人,每個(gè)線(xiàn)程先從主內(nèi)存拷貝變量到CPU緩存中,如果計(jì)算機(jī)有多個(gè)CPU冀偶,每個(gè)線(xiàn)程可能在不同的CPU上被處理醒第,這意味著每個(gè)線(xiàn)程可以拷貝到不同的CPU cache中。volatile變量不會(huì)被緩存在寄存器或者對(duì)其他處理器不可見(jiàn)的地方进鸠,保證了每次讀寫(xiě)變量都從主內(nèi)存中讀稠曼,跳過(guò)CPU cache這一步。當(dāng)一個(gè)線(xiàn)程修改了這個(gè)變量的值客年,新值對(duì)于其他線(xiàn)程是立即得知的霞幅。

2。禁止指令重排序

2.1 基本概念

指令重排序是JVM為了優(yōu)化指令量瓜、提高程序運(yùn)行效率司恳,在不影響單線(xiàn)程程序執(zhí)行結(jié)果的前提下,盡可能地提高并行度绍傲。指令重排序包括編譯器重排序和運(yùn)行時(shí)重排序扔傅。在JDK1.5之后耍共,可以使用volatile變量禁止指令重排序。針對(duì)volatile修飾的變量猎塞,在讀寫(xiě)操作指令前后會(huì)插入內(nèi)存屏障试读,指令重排序時(shí)不能把后面的指令重排序到內(nèi)存屏

示例說(shuō)明:
double r = 2.1; //(1) 
double pi = 3.14;//(2) 
double area = pi*r*r;//(3)
雖然代碼語(yǔ)句的定義順序?yàn)?->2->3,但是計(jì)算順序1->2->3與2->1->3對(duì)結(jié)果并無(wú)影響荠耽,所以編譯時(shí)和運(yùn)行時(shí)可以根據(jù)需要對(duì)1钩骇、2語(yǔ)句進(jìn)行重排序。

2.2 指令重排帶來(lái)的問(wèn)題

線(xiàn)程A中
{
    context = loadContext();
    inited = true;
}

線(xiàn)程B中
{
    if (inited) 
        fun(context);
}
如果線(xiàn)程A中的指令發(fā)生了重排序铝量,那么B中很可能就會(huì)拿到一個(gè)尚未初始化或尚未初始化完成的context,從而引發(fā)程序錯(cuò)誤倘屹。

2.3 禁止指令重排的原理

olatile關(guān)鍵字提供內(nèi)存屏障的方式來(lái)防止指令被重排,編譯器在生成字節(jié)碼文件時(shí)款违,會(huì)在指令序列中插入內(nèi)存屏障來(lái)禁止特定類(lèi)型的處理器重排序唐瀑。

JVM內(nèi)存屏障插入策略:

  • 每個(gè)volatile寫(xiě)操作的前面插入一個(gè)StoreStore屏障;
  • 在每個(gè)volatile寫(xiě)操作的后面插入一個(gè)StoreLoad屏障插爹;
  • 在每個(gè)volatile讀操作的后面插入一個(gè)LoadLoad屏障哄辣;
  • 在每個(gè)volatile讀操作的后面插入一個(gè)LoadStore屏障。

3赠尾。適用場(chǎng)景

(1)volatile關(guān)鍵字無(wú)法同時(shí)保證內(nèi)存可見(jiàn)性和原子性力穗。加鎖機(jī)制既可以確保可見(jiàn)性也可以確保原子性气嫁。

(2)volatile屏蔽掉了JVM中必要的代碼優(yōu)化当窗,所以在效率上比較低,因此一定在必要時(shí)才使用此關(guān)鍵字寸宵。

介紹一下Netty崖面?

  1. Netty是一個(gè)高性能、異步事件驅(qū)動(dòng)的NIO框架梯影。

  2. 簡(jiǎn)化并優(yōu)化了TCP和UDP套接字等網(wǎng)絡(luò)編程巫员,性能和安全等很多方面都做了優(yōu)化。

3.支持多種協(xié)議甲棍,如FTP简识、SMTP、HTTP以及各種二進(jìn)制和基于文本的傳統(tǒng)協(xié)議感猛。

在網(wǎng)絡(luò)編程中七扰,Netty是絕對(duì)的王者。

有很多開(kāi)源項(xiàng)目都用到了Netty陪白。

1颈走。市面上很多消息推送系統(tǒng)都是基于Netty來(lái)做的。

2咱士。我們常用的框架:Dubbo疫鹊、RocketMQ袖瞻、ES等等都用到了Netty。

使用Netty的項(xiàng)目統(tǒng)計(jì):https://netty.io/wiki/related-projects.html

你好拆吆,我是yltrcc,日常分享技術(shù)點(diǎn)滴脂矫,歡迎關(guān)注我:ylcoder

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末枣耀,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子庭再,更是在濱河造成了極大的恐慌捞奕,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拄轻,死亡現(xiàn)場(chǎng)離奇詭異颅围,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)恨搓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門(mén)院促,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人斧抱,你說(shuō)我怎么就攤上這事常拓。” “怎么了辉浦?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵弄抬,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我宪郊,道長(zhǎng)掂恕,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任弛槐,我火速辦了婚禮懊亡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丐黄。我一直安慰自己斋配,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開(kāi)白布灌闺。 她就那樣靜靜地躺著艰争,像睡著了一般。 火紅的嫁衣襯著肌膚如雪桂对。 梳的紋絲不亂的頭發(fā)上甩卓,一...
    開(kāi)封第一講書(shū)人閱讀 51,287評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音蕉斜,去河邊找鬼逾柿。 笑死缀棍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的机错。 我是一名探鬼主播爬范,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼弱匪!你這毒婦竟也來(lái)了青瀑?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤萧诫,失蹤者是張志新(化名)和其女友劉穎斥难,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體帘饶,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡哑诊,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了及刻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片镀裤。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖提茁,靈堂內(nèi)的尸體忽然破棺而出淹禾,到底是詐尸還是另有隱情,我是刑警寧澤茴扁,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布铃岔,位于F島的核電站,受9級(jí)特大地震影響峭火,放射性物質(zhì)發(fā)生泄漏毁习。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一卖丸、第九天 我趴在偏房一處隱蔽的房頂上張望纺且。 院中可真熱鬧,春花似錦稍浆、人聲如沸载碌。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嫁艇。三九已至,卻和暖如春弦撩,著一層夾襖步出監(jiān)牢的瞬間步咪,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工益楼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留猾漫,地道東北人点晴。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像悯周,于是被迫代替她去往敵國(guó)和親粒督。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 每日一句 軍人天生就舍棄了戰(zhàn)斗的意義队橙! 概述 RabitMQ 發(fā)布確認(rèn)坠陈,保證消息在磁盤(pán)上。 前提條件 1捐康。隊(duì)列必須...
    darklovy閱讀 137評(píng)論 0 0
  • 內(nèi)存模型以及分區(qū) JVM分為虛擬機(jī)棧、堆庸蔼、方法區(qū)解总、本地方法區(qū)堆,用來(lái)存放實(shí)例化對(duì)象姐仅、非static成員變量,屬于線(xiàn)...
    北京黃小胖閱讀 1,231評(píng)論 0 0
  • 面試 一般都是由淺到深去問(wèn),思路是:先考察基礎(chǔ)是否過(guò)關(guān),因?yàn)榛A(chǔ)知識(shí)決定了一個(gè)技術(shù)人員發(fā)展的上限再通過(guò)深度考察是否...
    攻城獅Chova閱讀 806評(píng)論 0 1
  • 1-Java基礎(chǔ) 1.1-String和StringBuffer區(qū)別花枫,為什么是可變的,不可變的 String 類(lèi)中...
    楊慶祥閱讀 964評(píng)論 0 0
  • 一 基礎(chǔ)篇 1.1 Java基礎(chǔ) 面向?qū)ο蟮奶卣鞒橄?將一類(lèi)對(duì)象的共同特征總結(jié)出來(lái)構(gòu)建類(lèi)的過(guò)程掏膏。繼承:對(duì)已有類(lèi)的一...
    essential_note閱讀 695評(píng)論 0 0