RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)

9824247-b8c9f19a9e567080.jpg

1、概述

在Rabbitmq中我們可以通過持久化來解決因?yàn)榉?wù)器異常而導(dǎo)致丟失的問題, 除此之外我們還會(huì)遇到一個(gè)問題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有正確到達(dá) Rabbit服務(wù)器呢?如果不錯(cuò)得數(shù)處理,我們是不知道的,(即Rabbit服務(wù)器不會(huì)反饋任何消息給生產(chǎn)者),也就是默認(rèn)的情況下是不知道消息有沒有正確到達(dá);

導(dǎo)致的問題:消息到達(dá)服務(wù)器之前丟失,那么持久化也不能解決此問題,因?yàn)橄⒏揪蜎]有到達(dá)Rabbit服務(wù)器!

RabbitMQ為我們提供了兩種方式:

  • 通過AMQP事務(wù)機(jī)制實(shí)現(xiàn),這也是AMQP協(xié)議層面提供的解決方案撤蚊;
  • 通過將channel設(shè)置成confirm模式來實(shí)現(xiàn)什乙;
事務(wù)機(jī)制

RabbitMQ中與事務(wù)機(jī)制有關(guān)的方法有三個(gè):txSelect(), txCommit()以及txRollback(), txSelect用于將當(dāng)前channel設(shè)置成transaction模式肄扎,txCommit用于提交事務(wù)拦赠,txRollback用于回滾事務(wù)沪羔,在通過txSelect開啟事務(wù)之后饥伊,我們便可以發(fā)布消息給broker代理服務(wù)器了,如果txCommit提交成功了蔫饰,則消息一定到達(dá)了broker了琅豆,如果在txCommit執(zhí)行之前broker異常崩潰或者由于其他原因拋出異常,這個(gè)時(shí)候我們便可以捕獲異常通過txRollback回滾事務(wù)了篓吁。 關(guān)鍵代碼:

channel.txSelect(); 
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); 
channel.txCommit(); 

2茫因、事務(wù)機(jī)制

事務(wù)機(jī)制類似于數(shù)據(jù)庫的事務(wù)機(jī)制

2.1 生產(chǎn)者
package com.hrabbit.rabbitmq.amqp.send;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:05
 * @Description:
 */
public class Send {
    private static final String QUEUE_NAME = "QUEUE_simple";

    public static void main(String[] args) throws IOException, TimeoutException {
         /* 獲取一個(gè)連接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 從連接中創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "Hello  Simple QUEUE !";
        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            int result = 1 / 0;
            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("----msg rollabck ");
        } finally {
            channel.close();
            connection.close();
        }
    }
}
2.1 消費(fèi)者
package com.hrabbit.rabbitmq.amqp.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:08
 * @Description:
 */
public class Recover {
    private static final String QUEUE_NAME = "QUEUE_simple";
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //獲取到達(dá)的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        //監(jiān)聽隊(duì)列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

此種方式雖然可以解決事物的問題,但是此種模式還是很耗時(shí)的,采用這種方式 降低了Rabbitmq的消息吞吐量杖剪。

2冻押、Confirm模式

2.1概念

上面我們介紹了RabbitMQ可能會(huì)遇到的一個(gè)問題,即生成者不知道消息是否真正到達(dá)broker盛嘿,隨后通過AMQP協(xié)議層面為我們提供了事務(wù)機(jī)制解決了這個(gè)問題洛巢,但是采用事務(wù)機(jī)制實(shí)現(xiàn)會(huì)降低RabbitMQ的消息吞吐量,那么有沒有更加高效的解決方式呢次兆?答案是采用Confirm模式稿茉。

2.2 producer端confirm模式的實(shí)現(xiàn)原理

生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式类垦,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1開始)狈邑,一旦消息被投遞到所有匹配的隊(duì)列之后城须,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了蚤认,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)將消息寫入磁盤之后發(fā)出糕伐,broker回傳給生產(chǎn)者的確認(rèn)消息中deliver-tag域包含了確認(rèn)消息的序列號(hào)砰琢,此外broker也可以設(shè)置basic.ack的multiple域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理良瞧。 confirm模式最大的好處在于他是異步的陪汽,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息褥蚯,當(dāng)消息最終得到確認(rèn)之后挚冤,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失赞庶,就會(huì)發(fā)送一條nack消息训挡,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息澳骤。

2.3 開啟confirm模式的方法

已經(jīng)在transaction事務(wù)模式的channel是不能再設(shè)置成confirm模式的,即這兩種模式是不能共存的澜薄。
生產(chǎn)者通過調(diào)用channel的confirmSelect方法將channel設(shè)置為confirm模式
核心代碼:

//生產(chǎn)者通過調(diào)用channel的confirmSelect方法將channel設(shè)置為confirm模式  
channel.confirmSelect(); 

編程模式

  1. 普通confirm模式:每發(fā)送一條消息后为肮,調(diào)用waitForConfirms()方法,等待服務(wù)器端confirm肤京。實(shí)際上是一種串行confirm了颊艳。
  2. 批量confirm模式:每發(fā)送一批消息后,調(diào)用waitForConfirms()方法忘分,等待服務(wù)器端confirm棋枕。
  3. 異步confirm模式:提供一個(gè)回調(diào)方法,服務(wù)端confirm了一條或者多條消息后Client端會(huì)回調(diào)這個(gè)方法妒峦。
2.4 普通confirm模式
package com.hrabbit.rabbitmq.confirm;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:20
 * @Description:
 */
public class SendConfirm {
    private static final String QUEUE_NAME = "QUEUE_simple_confirm";

    @Test
    public void sendMsg() throws IOException, TimeoutException, InterruptedException {
        /* 獲取一個(gè)連接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 從連接中創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //生產(chǎn)者通過調(diào)用channel的confirmSelect方法將channel設(shè)置為confirm模式
        channel.confirmSelect();
        String msg = "Hello   QUEUE !";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        if (!channel.waitForConfirms()) {
            System.out.println("send message 失敗");
        } else {
            System.out.println(" send messgae ok ...");
        }
        channel.close();
        connection.close();
    }
}
2.4 批量confirm模式

批量confirm模式稍微復(fù)雜一點(diǎn)戒悠,客戶端程序需要定期(每隔多少秒)或者定量(達(dá)到多少條)或者兩則結(jié)合起來publish消息,然后等待服務(wù)器端confirm, 相比普通confirm模式舟山,批量極大提升confirm效率绸狐,但是問題在于一旦出現(xiàn)confirm返回false或者超時(shí)的情況時(shí),客戶端需要將這一批次的消息全部重發(fā)累盗,這會(huì)帶來明顯的重復(fù)消息數(shù)量寒矿,并且,當(dāng)消息經(jīng)常丟失時(shí)若债,批量confirm性能應(yīng)該是不升反降的符相。

package com.hrabbit.rabbitmq.confirm;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:25
 * @Description:
 */
public class SendbatchConfirm {

    private static final String QUEUE_NAME = "QUEUE_simple_confirm";

    @Test
    public void sendMsg() throws IOException, TimeoutException, InterruptedException {
        /* 獲取一個(gè)連接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 從連接中創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //生產(chǎn)者通過調(diào)用channel的confirmSelect方法將channel設(shè)置為confirm模式
        channel.confirmSelect();

        //生產(chǎn)者通過調(diào)用channel的confirmSelect方法將channel設(shè)置為confirm模式
        channel.confirmSelect();
        String msg = "Hello   QUEUE !";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null,msg.getBytes());
        }

        if (!channel.waitForConfirms()) {
            System.out.println("send message error");
        } else {
            System.out.println(" send messgae ok ...");
        }
        channel.close();
        connection.close();
    }
}

2.5 異步confirm模式

Channel對(duì)象提供的ConfirmListener()回調(diào)方法只包含deliveryTag(當(dāng)前Chanel發(fā)出的消息序號(hào)),我們需要自己為每一個(gè)Channel維護(hù)一個(gè)unconfirm的消息序號(hào)集合蠢琳,每publish一條數(shù)據(jù)啊终,集合中元素加1,每回調(diào)一次handleAck方法傲须,unconfirm集合刪掉相應(yīng)的一條(multiple=false)或多條(multiple=true)記錄蓝牲。從程序運(yùn)行效率上看,這個(gè)unconfirm集合最好采用有序集合SortedSet存儲(chǔ)結(jié)構(gòu)泰讽。實(shí)際上例衍,SDK中的waitForConfirms()方法也是通過SortedSet維護(hù)消息序號(hào)的。

package com.hrabbit.rabbitmq.confirm;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-07-02 下午3:28
 * @Description:
 */
public class SendAync {

    private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync";
    public static void main(String[] args) throws IOException, TimeoutException {
        /* 獲取一個(gè)連接 */
        Connection connection = ConnectionUtils.getConnection();
        /* 從連接中創(chuàng)建通道 */
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //生產(chǎn)者通過調(diào)用channel的confirmSelect方法將channel設(shè)置為confirm模式
        channel.confirmSelect();
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        channel.addConfirmListener(new ConfirmListener() {
            //每回調(diào)一次handleAck方法已卸,unconfirm集合刪掉相應(yīng)的一條(multiple=false)或多條(multiple=true)記錄佛玄。
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("--multiple--");
                    confirmSet.headSet(deliveryTag + 1).clear();
                    //用一個(gè)SortedSet, 返回此有序集合中小于end的所有元素。
                    } else {
                    System.out.println("--multiple false--");
                    confirmSet.remove(deliveryTag);
                }
            }
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
            }
        });
        String msg = "Hello   QUEUE !";
        while (true) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(nextSeqNo);
        }
    }
}

系列文章:

RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:spring整合RabbitMQ

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末累澡,一起剝皮案震驚了整個(gè)濱河市梦抢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌愧哟,老刑警劉巖奥吩,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件具伍,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡圈驼,警方通過查閱死者的電腦和手機(jī)人芽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绩脆,“玉大人萤厅,你說我怎么就攤上這事⊙テ龋” “怎么了惕味?”我有些...
    開封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)玉锌。 經(jīng)常有香客問我名挥,道長(zhǎng),這世上最難降的妖魔是什么主守? 我笑而不...
    開封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任禀倔,我火速辦了婚禮,結(jié)果婚禮上参淫,老公的妹妹穿的比我還像新娘救湖。我一直安慰自己,他們只是感情好涎才,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開白布鞋既。 她就那樣靜靜地躺著,像睡著了一般耍铜。 火紅的嫁衣襯著肌膚如雪邑闺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天棕兼,我揣著相機(jī)與錄音陡舅,去河邊找鬼。 笑死程储,一個(gè)胖子當(dāng)著我的面吹牛蹭沛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播章鲤,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼咆贬!你這毒婦竟也來了败徊?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤掏缎,失蹤者是張志新(化名)和其女友劉穎皱蹦,沒想到半個(gè)月后煤杀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沪哺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年沈自,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片辜妓。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡枯途,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出籍滴,到底是詐尸還是另有隱情酪夷,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布孽惰,位于F島的核電站晚岭,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏勋功。R本人自食惡果不足惜坦报,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望狂鞋。 院中可真熱鬧燎竖,春花似錦、人聲如沸要销。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽疏咐。三九已至纤掸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間浑塞,已是汗流浹背借跪。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留酌壕,地道東北人掏愁。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像卵牍,于是被迫代替她去往敵國和親果港。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351