RabitMQ 發(fā)布確認(rèn)

每日一句

軍人天生就舍棄了戰(zhàn)斗的意義!

概述

RabitMQ 發(fā)布確認(rèn)小染,保證消息在磁盤上援岩。

前提條件

1。隊(duì)列必須持久化 隊(duì)列持久化

2锹锰。隊(duì)列中的消息必須持久化 消息持久化

使用

三種發(fā)布確認(rèn)的方式:

1芥炭。單個(gè)發(fā)布確認(rèn)

2。批量發(fā)布確認(rèn)

3城须。異步批量發(fā)布確認(rèn)

開啟發(fā)布確認(rèn)的方法

 //創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
**
 //開啟發(fā)布確認(rèn)
channel.confirmSelect();**

單個(gè)確認(rèn)

最簡(jiǎn)單的確認(rèn)方式蚤认,它是一種同步發(fā)布確認(rèn)的方式米苹,也就是說(shuō)發(fā)送一個(gè)消息后只有它被確認(rèn)糕伐,后續(xù)的消息才能繼續(xù)發(fā)布。

最大缺點(diǎn)是:發(fā)布速度特別的滿蘸嘶。

吞吐量:每秒不超過數(shù)百條發(fā)布的消息

/**
 * 單個(gè)確認(rèn)
 */
public static void publishSingleMessage() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //生命隊(duì)列
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);
    **//開啟發(fā)布確認(rèn)
    channel.confirmSelect();**
    //開始時(shí)間
    long begin = System.currentTimeMillis();

    for (int i = 0; i < 1000; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //單個(gè)消息馬上進(jìn)行確認(rèn)
       ** boolean b = channel.waitForConfirms();**
        if (b) {
            System.out.println("消息發(fā)送成功A记啤!训唱!");
        }
    }

    //結(jié)束時(shí)間
    long end = System.currentTimeMillis();

    System.out.println("發(fā)送消息1000褥蚯,單個(gè)發(fā)布確認(rèn)用時(shí): " + (end - begin) + " ms");
}

批量確認(rèn)

與單個(gè)等待確認(rèn)消息相比,先發(fā)布一批消息然后一起確認(rèn)可以極大地提高吞吐量况增。

當(dāng)然這種方式的缺點(diǎn)就是:當(dāng)發(fā)生故障導(dǎo)致發(fā)布出現(xiàn)問題時(shí)赞庶,不知道是哪個(gè)消息出現(xiàn)問題了,我們必須將整個(gè)批處理保存在內(nèi)存中澳骤,以記錄重要的信息而后重新發(fā)布消息歧强。

當(dāng)然這種方案仍然是同步的,也一樣阻塞消息的發(fā)布

/**
 * 批量確認(rèn)
 */
public static void publishBatchMessage() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //生命隊(duì)列
    String queueName = UUID.randomUUID().toString();

    channel.queueDeclare(queueName, true, false, false, null);
    **//開啟發(fā)布確認(rèn)
    channel.confirmSelect();
    //批量確認(rèn)消息大小
    int batchSize = 100;
    //未確認(rèn)消息個(gè)數(shù)
    int outstandingMessageCount = 0;**

    //開始時(shí)間
    long begin = System.currentTimeMillis();

    for (int i = 0; i < 1000; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        **outstandingMessageCount++;
        //發(fā)送的消息 == 確認(rèn)消息的大小后才批量確認(rèn)
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }**
    }
    **//為了確保還有剩余沒有確認(rèn)消息 再次確認(rèn)
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }**
    //結(jié)束時(shí)間
    long end = System.currentTimeMillis();

    System.out.println("發(fā)送消息1000为肮,批量發(fā)布確認(rèn)100個(gè)用時(shí): " + (end - begin) + " ms");
}

異步確認(rèn)

它是利用回調(diào)函數(shù)來(lái)達(dá)到消息可靠性傳遞的摊册,這個(gè)中間件也是通過函數(shù)回調(diào)來(lái)保證是否投遞成功

/**
 * 異步批量確認(rèn)
 *
 * @throws Exception
 */
public static void publishAsyncMessage() throws Exception {
    try (Channel channel = RabbitMqUtils.getChannel()) {
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
       ** //開啟發(fā)布確認(rèn)
        channel.confirmSelect();
**
        //線程安全有序的一個(gè)哈希表,適用于高并發(fā)的情況
        //1.輕松的將序號(hào)與消息進(jìn)行關(guān)聯(lián) 2.輕松批量刪除條目 只要給到序列號(hào) 3.支持并發(fā)訪問
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        **//確認(rèn)收到消息的一個(gè)回調(diào)**
        //1.消息序列號(hào)
        //2.multiple  是否是批量確認(rèn)
        //false 確認(rèn)當(dāng)前序列號(hào)消息
        ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
            if (multiple) {
                //返回的是小于等于當(dāng)前序列號(hào)的未確認(rèn)消息 是一個(gè) map
                ConcurrentNavigableMap<Long, String> confirmed =
                        outstandingConfirms.headMap(sequenceNumber, true);
                //清除該部分未確認(rèn)消息
                confirmed.clear();
            } else {
                //只清除當(dāng)前序列號(hào)的消息
                outstandingConfirms.remove(sequenceNumber);
            }
        };
        //未確認(rèn)消息的回調(diào)
        ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
            String message = outstandingConfirms.get(sequenceNumber);
            System.out.println("發(fā)布的消息" + message + "未被確認(rèn)颊艳,序列號(hào)" + sequenceNumber);
        };

        **//添加一個(gè)異步確認(rèn)的監(jiān)聽器
        //1.確認(rèn)收到消息的回調(diào)
        //2.未收到消息的回調(diào)
        channel.addConfirmListener(ackCallback, nackCallback);**

        long begin = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            String message = "消息" + i;
            **//channel.getNextPublishSeqNo()獲取下一個(gè)消息的序列號(hào)
            //通過序列號(hào)與消息體進(jìn)行一個(gè)關(guān)聯(lián),全部都是未確認(rèn)的消息體
            //將發(fā)布的序號(hào)和發(fā)布消息保存到map中
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);**
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        long end = System.currentTimeMillis();
        System.out.println("發(fā)布" + 1000 + "個(gè)異步確認(rèn)消息,耗時(shí)" + (end - begin) + "ms");
    }

}

如何處理異步未確認(rèn)消息

最好的解決的解決方案就是把未確認(rèn)的消息放到一個(gè)基于內(nèi)存的能被發(fā)布線程訪問茅特,適用于高并發(fā)的的隊(duì)列忘分。
比如說(shuō)用 ConcurrentLinkedQueue 、這個(gè)隊(duì)列在 confirm callbacks 與發(fā)布線程之間進(jìn)行消息的傳遞白修。

ConcurrentSkipListMap

等等都可妒峦。

面試題

如何保證消息不丟失?

就市面上常見的消息隊(duì)列而言兵睛,只要配置得當(dāng)舟山,我們的消息就不會(huì)丟失。

消息隊(duì)列主要有三個(gè)階段:

1卤恳。生產(chǎn)消息

2累盗。存儲(chǔ)消息

3。消費(fèi)消息

1突琳。生產(chǎn)消息

生產(chǎn)者發(fā)送消息至 Broker 若债,需要處理 Broker 的響應(yīng),不論是同步還是異步發(fā)送消息拆融,同步和異步回調(diào)都需要做好 try-catch 蠢琳,妥善的處理響應(yīng)。

如果 Broker 返回寫入失敗等錯(cuò)誤消息镜豹,需要重試發(fā)送傲须。

當(dāng)多次發(fā)送失敗需要作報(bào)警,日志記錄等趟脂。這樣就能保證在生產(chǎn)消息階段消息不會(huì)丟失泰讽。

2。存儲(chǔ)消息

存儲(chǔ)消息階段需要在消息刷盤之后再給生產(chǎn)者響應(yīng)昔期,假設(shè)消息寫入緩存中就返回響應(yīng)已卸,那么機(jī)器突然斷電這消息就沒了,而生產(chǎn)者以為已經(jīng)發(fā)送成功了硼一。

如果 Broker 是集群部署累澡,有多副本機(jī)制,即消息不僅僅要寫入當(dāng)前 Broker ,還需要寫入副本機(jī)中般贼。

那配置成至少寫入兩臺(tái)機(jī)子后再給生產(chǎn)者響應(yīng)愧哟。這樣基本上就能保證存儲(chǔ)的可靠了。一臺(tái)掛了還有一臺(tái)還

在呢(假如怕兩臺(tái)都掛了..那就再多些)哼蛆。

3蕊梧。消費(fèi)消息

我們應(yīng)該在消費(fèi)者真正執(zhí)行完業(yè)務(wù)邏輯之后,再發(fā)送給 Broker 消費(fèi)成功人芽,這才是真正的消費(fèi)了望几。

所以只要我們?cè)谙I(yè)務(wù)邏輯處理完成之后再給 Broker 響應(yīng),那么消費(fèi)階段消息就不會(huì)丟失

總結(jié):

1萤厅。生產(chǎn)者 需要處理好 Broker 的響應(yīng)橄抹,出錯(cuò)情況下利用重試靴迫、報(bào)警等手段

2。Broker 需要控制響應(yīng)的時(shí)機(jī)楼誓,單機(jī)情況下是消息刷盤后返回響應(yīng)玉锌,集群多副本情況下,即發(fā)送至兩個(gè)副本及以上的情況下再返回響應(yīng)疟羹。

3主守。消費(fèi)者 需要在執(zhí)行完真正的業(yè)務(wù)邏輯之后再返回響應(yīng)給 Broker

volatile 關(guān)鍵字的作用?

1榄融。保證內(nèi)存可見性

1.1 基本概念

可見性 是指線程之間的可見性参淫,一個(gè)線程修改的狀態(tài)對(duì)另一個(gè)線程是可見的。也就是一個(gè)線程修改的結(jié)果愧杯,另一個(gè)線程馬上就能夠看到涎才。

1.2 實(shí)現(xiàn)原理

當(dāng)對(duì)非volatile變量進(jìn)行讀寫的時(shí)候,每個(gè)線程先從主內(nèi)存拷貝變量到CPU緩存中力九,如果計(jì)算機(jī)有多個(gè)CPU耍铜,每個(gè)線程可能在不同的CPU上被處理,這意味著每個(gè)線程可以拷貝到不同的CPU cache中跌前。volatile變量不會(huì)被緩存在寄存器或者對(duì)其他處理器不可見的地方棕兼,保證了每次讀寫變量都從主內(nèi)存中讀,跳過CPU cache這一步抵乓。當(dāng)一個(gè)線程修改了這個(gè)變量的值伴挚,新值對(duì)于其他線程是立即得知的。

2臂寝。禁止指令重排序

2.1 基本概念

指令重排序是JVM為了優(yōu)化指令章鲤、提高程序運(yùn)行效率,在不影響單線程程序執(zhí)行結(jié)果的前提下咆贬,盡可能地提高并行度。指令重排序包括編譯器重排序和運(yùn)行時(shí)重排序帚呼。在JDK1.5之后掏缎,可以使用volatile變量禁止指令重排序。針對(duì)volatile修飾的變量煤杀,在讀寫操作指令前后會(huì)插入內(nèi)存屏障眷蜈,指令重排序時(shí)不能把后面的指令重排序到內(nèi)存屏

示例說(shuō)明:
double r = 2.1; //(1) 
double pi = 3.14;//(2) 
double area = pi*r*r;//(3)
雖然代碼語(yǔ)句的定義順序?yàn)?->2->3,但是計(jì)算順序1->2->3與2->1->3對(duì)結(jié)果并無(wú)影響沈自,所以編譯時(shí)和運(yùn)行時(shí)可以根據(jù)需要對(duì)1酌儒、2語(yǔ)句進(jìn)行重排序。

2.2 指令重排帶來(lái)的問題

線程A中
{
    context = loadContext();
    inited = true;
}

線程B中
{
    if (inited) 
        fun(context);
}
如果線程A中的指令發(fā)生了重排序枯途,那么B中很可能就會(huì)拿到一個(gè)尚未初始化或尚未初始化完成的context,從而引發(fā)程序錯(cuò)誤忌怎。

2.3 禁止指令重排的原理

olatile關(guān)鍵字提供內(nèi)存屏障的方式來(lái)防止指令被重排籍滴,編譯器在生成字節(jié)碼文件時(shí),會(huì)在指令序列中插入內(nèi)存屏障來(lái)禁止特定類型的處理器重排序榴啸。

JVM內(nèi)存屏障插入策略:

  • 每個(gè)volatile寫操作的前面插入一個(gè)StoreStore屏障孽惰;
  • 在每個(gè)volatile寫操作的后面插入一個(gè)StoreLoad屏障;
  • 在每個(gè)volatile讀操作的后面插入一個(gè)LoadLoad屏障鸥印;
  • 在每個(gè)volatile讀操作的后面插入一個(gè)LoadStore屏障勋功。

3。適用場(chǎng)景

(1)volatile關(guān)鍵字無(wú)法同時(shí)保證內(nèi)存可見性和原子性库说。加鎖機(jī)制既可以確笨裥可見性也可以確保原子性。

(2)volatile屏蔽掉了JVM中必要的代碼優(yōu)化潜的,所以在效率上比較低要销,因此一定在必要時(shí)才使用此關(guān)鍵字。

介紹一下Netty夏块?

  1. Netty是一個(gè)高性能疏咐、異步事件驅(qū)動(dòng)的NIO框架。

  2. 簡(jiǎn)化并優(yōu)化了TCP和UDP套接字等網(wǎng)絡(luò)編程脐供,性能和安全等很多方面都做了優(yōu)化浑塞。

3.支持多種協(xié)議,如FTP政己、SMTP酌壕、HTTP以及各種二進(jìn)制和基于文本的傳統(tǒng)協(xié)議。

在網(wǎng)絡(luò)編程中歇由,Netty是絕對(duì)的王者卵牍。

有很多開源項(xiàng)目都用到了Netty。

1沦泌。市面上很多消息推送系統(tǒng)都是基于Netty來(lái)做的糊昙。

2。我們常用的框架:Dubbo谢谦、RocketMQ释牺、ES等等都用到了Netty。

使用Netty的項(xiàng)目統(tǒng)計(jì):https://netty.io/wiki/related-projects.html

你好回挽,我是yltrcc没咙,日常分享技術(shù)點(diǎn)滴,歡迎關(guān)注我:ylcoder
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布千劈!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末祭刚,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌涡驮,老刑警劉巖暗甥,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異遮怜,居然都是意外死亡淋袖,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門锯梁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)即碗,“玉大人,你說(shuō)我怎么就攤上這事陌凳“粒” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵合敦,是天一觀的道長(zhǎng)初橘。 經(jīng)常有香客問我,道長(zhǎng)充岛,這世上最難降的妖魔是什么保檐? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮崔梗,結(jié)果婚禮上夜只,老公的妹妹穿的比我還像新娘。我一直安慰自己蒜魄,他們只是感情好扔亥,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谈为,像睡著了一般旅挤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伞鲫,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天粘茄,我揣著相機(jī)與錄音,去河邊找鬼榔昔。 笑死驹闰,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的撒会。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼师妙,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼诵肛!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤怔檩,失蹤者是張志新(化名)和其女友劉穎褪秀,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體薛训,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡媒吗,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了乙埃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闸英。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖介袜,靈堂內(nèi)的尸體忽然破棺而出甫何,到底是詐尸還是另有隱情,我是刑警寧澤遇伞,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布辙喂,位于F島的核電站,受9級(jí)特大地震影響鸠珠,放射性物質(zhì)發(fā)生泄漏巍耗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一渐排、第九天 我趴在偏房一處隱蔽的房頂上張望炬太。 院中可真熱鬧,春花似錦飞盆、人聲如沸娄琉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)孽水。三九已至,卻和暖如春城看,著一層夾襖步出監(jiān)牢的瞬間女气,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工测柠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留炼鞠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓轰胁,卻偏偏與公主長(zhǎng)得像谒主,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子赃阀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 內(nèi)存模型以及分區(qū) JVM分為虛擬機(jī)棧霎肯、堆、方法區(qū)、本地方法區(qū)堆观游,用來(lái)存放實(shí)例化對(duì)象搂捧、非static成員變量,屬于線...
    北京黃小胖閱讀 1,217評(píng)論 0 0
  • 面試 一般都是由淺到深去問,思路是:先考察基礎(chǔ)是否過關(guān),因?yàn)榛A(chǔ)知識(shí)決定了一個(gè)技術(shù)人員發(fā)展的上限再通過深度考察是否...
    攻城獅Chova閱讀 798評(píng)論 0 1
  • 1-Java基礎(chǔ) 1.1-String和StringBuffer區(qū)別,為什么是可變的懂缕,不可變的 String 類中...
    楊慶祥閱讀 937評(píng)論 0 0
  • 一 基礎(chǔ)篇 1.1 Java基礎(chǔ) 面向?qū)ο蟮奶卣鞒橄?將一類對(duì)象的共同特征總結(jié)出來(lái)構(gòu)建類的過程允跑。繼承:對(duì)已有類的一...
    essential_note閱讀 686評(píng)論 0 0
  • 前言 在茫茫的互聯(lián)網(wǎng)海洋中尋尋覓覓拌屏,我收藏了近千道Java經(jīng)典面試題潮针,分享給你們。建議大家收藏起來(lái)倚喂,在茶余飯后拿出...
    風(fēng)平浪靜如碼閱讀 543評(píng)論 0 1