rabbitmq如何保證消息可靠性不丟失

[TOC]

之前我們簡單介紹了rabbitmq的功能嘶朱。他的作用就是方便我們的消息解耦特愿。緊接著問題就會暴露出來仲墨。解耦就設(shè)計(jì)到雙方系統(tǒng)不穩(wěn)定問題。在mq中有生產(chǎn)者揍障、mq目养、消費(fèi)者三個角色。其中一個角色down機(jī)或者重啟后毒嫡。就設(shè)計(jì)到消息的丟失問題癌蚁。

因?yàn)镸Q整個消息周期設(shè)計(jì)到上述的三個角色,所以我們從這個三個角色開始討論丟失數(shù)據(jù)的情況。并如何解決

生產(chǎn)者丟失消息

  • 在生產(chǎn)數(shù)據(jù)程序中努释,消息已經(jīng)處理好還未發(fā)送給MQ這個階段碘梢,生產(chǎn)者因?yàn)橐馔馇闆r中斷了。這個時(shí)候生產(chǎn)者這條消息就會丟失伐蒂。因?yàn)槌绦蛑貑⒑弥罂赡懿粫俅紊a(chǎn)該消息煞躬。

實(shí)際案列1

  • 購物商城中已經(jīng)選購了商品提交到支付界面。在支付成功后我們的程序需要發(fā)送消息給商家逸邦。這個時(shí)候程序中斷了恩沛。待重啟后客戶界面訂單狀態(tài)是付款成功的。但是這個訂單就沒有及時(shí)通知給商家缕减。這會造成商家延遲發(fā)貨复唤。

實(shí)際案例2

  • 同樣是購物支付,A客戶先付款order1訂單,支付成功后發(fā)送MQ前直線異常但并未導(dǎo)致程序中斷烛卧。這個時(shí)候order1商家收不到通知佛纫,然后B客戶對order2訂單進(jìn)行支付且整個過程正常。order2訂單就會通知到對應(yīng)的商家总放。整個周期order1訂單就屬于丟失

總結(jié)

  • 兩種情況都是在發(fā)送消息是出現(xiàn)問題呈宇。第一種是程序中斷,第二種是訂單異常局雄,第一種異常級別高會影響整個程序使用反而是好排查甥啄。第二種程序不異常。這種情況很難發(fā)現(xiàn)炬搭,只會是個別情況蜈漓。

解決方案

  • 針對上述情況mq也提供了兩種方法解決。
  • 1宫盔、開啟rabbitmq事務(wù)(同步)
  • 2融虽、開啟confirm模式(異步)

代碼模擬


Map<String, Object> resultMap = new HashMap<String, Object>(){
    {
        put("code", 200);
    }
};
String msg = "";
Integer index = 0;
if (params.containsKey("msg")) {
    msg = params.get("msg").toString();
}
if (params.containsKey("index")) {
    index = Integer.valueOf(params.get("index").toString());
}
if (index != 0) {
    //這里開始模擬異常出現(xiàn)。消息將會丟失
    int i = 1 / 0;
}
rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", msg);
return resultMap;

  • 上述代碼http://localhost:8282/rabbitmq/sendTopic?msg=test&index=1就會發(fā)生異常灼芭,這個時(shí)候數(shù)據(jù)丟失
  • http://localhost:8282/rabbitmq/sendTopic?msg=test可以正常發(fā)送有额。讀者可以自行測試
  • 其實(shí)通過rabbitmq的事務(wù)并不能解決上面的丟失情況。但是加上事務(wù)會保證消息發(fā)送的可靠性彼绷。上面發(fā)送消息后出異常這時(shí)候我們就沒法回退消息了巍佑。但是事務(wù)可以幫我們實(shí)現(xiàn)

事務(wù)


String msg = "trantest";
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(true);
try {
    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
    int i = 1 / 0;
} catch (IOException e) {
    channel.txRollback();
    e.printStackTrace();
}
channel.txCommit();
connection.close();

  • 最終測試效果是mq沒有收到消息的。

confirm模式確實(shí)


Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
channel.confirmSelect();
try {
    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
} catch (IOException e) {
    e.printStackTrace();
}
boolean b = channel.waitForConfirms();
if (b) {
    System.out.println("mq接收消息成功");
    Thread.sleep(1000*5);
}
System.out.println("end1");
channel.confirmSelect();
channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
    @SneakyThrows
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息成功發(fā)送到交換機(jī)");
        Thread.sleep(1000 * 5);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息發(fā)送到交換機(jī)失敗");
    }
});
System.out.println("end2");
channel.close();
connection.close();

  • 上面使用了兩種確認(rèn)方式寄悯,前者是同步確認(rèn)萤衰,后者是異步確認(rèn)。因?yàn)樵谕粋€方法里猜旬。msg都是能獲取到的脆栋。所以在ConfimListener中就沒有返回消息胳螟。

數(shù)據(jù)退回監(jiān)聽

  • 上面兩種一個增加安全可靠性。一個增加確認(rèn)機(jī)制筹吐。還有一種情況是數(shù)據(jù)回退糖耸。當(dāng)交換機(jī)沒有隊(duì)列綁定是這個時(shí)候發(fā)送數(shù)據(jù)后如果設(shè)置了回退屬性,那么消息會回退到監(jiān)聽器匯中的丘薛。channel中的mandatory表示是否檢測分發(fā)到隊(duì)列中嘉竟。

String msg = "Hello World!";
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
channel.confirmSelect();
//return機(jī)制:監(jiān)控交換機(jī)是否將消息分發(fā)到隊(duì)列
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        //如果交換機(jī)分發(fā)消息到隊(duì)列失敗,則會執(zhí)行此方法(用來處理交換機(jī)分發(fā)消息到隊(duì)列失敗的情況)
        System.out.println("*****"+i);  //標(biāo)識
        System.out.println("*****"+s);  //
        System.out.println("*****"+s1); //交換機(jī)名
        System.out.println("*****"+s2); //交換機(jī)對應(yīng)的隊(duì)列的key
        System.out.println("*****"+new String(bytes));  //發(fā)送的消息
    }
});
//發(fā)送消息
//channel.basicPublish("ex2", "c", null, msg.getBytes());
channel.basicPublish(RabbitConfig.DIRECTEXCHANGE, "c", true, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
    @SneakyThrows
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息成功發(fā)送到交換機(jī)");
        Thread.sleep(1000 * 5);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息發(fā)送到交換機(jī)失敗");
    }
});

  • 上面ReturnListener就會被觸發(fā)洋侨,這個時(shí)候confirm監(jiān)聽器也被觸發(fā)認(rèn)為成功接收的只不過被退回舍扰。

MQ事務(wù)相關(guān)軟文推薦

mq事務(wù)開啟分析

MQ丟失信息

  • 在發(fā)送消息到MQ時(shí)我們可以設(shè)置消息屬性是否為可持久化。如果設(shè)置了直接就會存儲在磁盤上希坚。在內(nèi)存可用時(shí)也會同步到內(nèi)存中提高效率边苹。如果消息屬性中設(shè)置的是非持久化的話,就會直接存儲在內(nèi)存里裁僧,當(dāng)內(nèi)存不足是會將數(shù)據(jù)備份至磁盤上个束。

消費(fèi)者丟失信息

  • 消費(fèi)端如果沒有單獨(dú)設(shè)置的話默認(rèn)就是MQ不管理。換句話說MQ只負(fù)責(zé)發(fā)送消息聊疲。mq為我們提供了三種模式
    NONE,
    MANUAL,
    AUTO; 默認(rèn)的

  • 我們需要手動將連接工廠設(shè)置MANUAL后再接收到消息后我們需要手動確認(rèn)茬底,mq才會刪除消息。否則會一直等待到消費(fèi)端重啟才會進(jìn)行重新分發(fā)數(shù)據(jù)

  • channel.basicAck(long,boolean); 確認(rèn)收到消息获洲,消息將被隊(duì)列移除阱表,false只確認(rèn)當(dāng)前consumer一個消息收到,true確認(rèn)所有consumer獲得的消息贡珊。

  • channel.basicNack(long,boolean,boolean); 確認(rèn)否定消息最爬,第一個boolean表示一個consumer還是所有,第二個boolean表示requeue是否重新回到隊(duì)列门岔,true重新入隊(duì)爱致。

  • channel.basicReject(long,boolean); 拒絕消息,requeue=false 表示不再重新入隊(duì)固歪,如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列蒜鸡。

當(dāng)消息回滾到消息隊(duì)列時(shí)胯努,這條消息不會回到隊(duì)列尾部牢裳,而是仍是在隊(duì)列頭部,這時(shí)消費(fèi)者會又接收到這條消息叶沛,如果想消息進(jìn)入隊(duì)尾蒲讯,須確認(rèn)消息后再次發(fā)送消息。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末灰署,一起剝皮案震驚了整個濱河市判帮,隨后出現(xiàn)的幾起案子局嘁,更是在濱河造成了極大的恐慌,老刑警劉巖晦墙,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件悦昵,死亡現(xiàn)場離奇詭異,居然都是意外死亡晌畅,警方通過查閱死者的電腦和手機(jī)但指,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抗楔,“玉大人棋凳,你說我怎么就攤上這事×铮” “怎么了剩岳?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長入热。 經(jīng)常有香客問我拍棕,道長,這世上最難降的妖魔是什么勺良? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任莫湘,我火速辦了婚禮,結(jié)果婚禮上郑气,老公的妹妹穿的比我還像新娘幅垮。我一直安慰自己,他們只是感情好尾组,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布忙芒。 她就那樣靜靜地躺著,像睡著了一般讳侨。 火紅的嫁衣襯著肌膚如雪呵萨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天跨跨,我揣著相機(jī)與錄音潮峦,去河邊找鬼。 笑死勇婴,一個胖子當(dāng)著我的面吹牛忱嘹,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播耕渴,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼拘悦,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了橱脸?” 一聲冷哼從身側(cè)響起础米,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤分苇,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后屁桑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體医寿,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年蘑斧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了糟红。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡乌叶,死狀恐怖盆偿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情准浴,我是刑警寧澤事扭,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站乐横,受9級特大地震影響求橄,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜葡公,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一罐农、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧催什,春花似錦涵亏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至旋圆,卻和暖如春宠默,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背灵巧。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工搀矫, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人刻肄。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓瓤球,卻偏偏與公主長得像,于是被迫代替她去往敵國和親肄方。 傳聞我的和親對象是個殘疾皇子冰垄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容