RabbitMQ學習(四)可靠性投遞和confirm機制

RMQ可靠性投遞

一.什么是RMQ的可靠性投遞

1.保障消息的成功發(fā)出
2.保障MQ節(jié)點的成功接收
3.發(fā)送端收到MQ節(jié)點(broker)確認應答
4.完善的消息補償機制
在實際生產中,很難保障前三點的完全可靠欧募,比如在極端的環(huán)境中,生產者發(fā)送消息失敗了处坪,發(fā)送端在接受確認應答時突然發(fā)生網絡閃斷等等情況,很難保障可靠性投遞颤枪,所以就需要有第四點完善的消息補償機制寿烟。

二虽抄、大廠的解決方案

第一種:消息落庫

消息落庫流程圖.JPG

流程的示意圖如上所示:
step1:比如我下單成功了走搁,對我的業(yè)務數據進行入庫,同時對消息數據入庫迈窟;
setp2:發(fā)送消息到MQ服務上私植;
step3: 按照正常的流程就是消費者監(jiān)聽到該消息,就根據唯一id修改該消息的狀態(tài)為已消費车酣,并給一個確認應答ack到Listener曲稼。
step4: 修改消息的投遞狀態(tài);
step5: 如果出現意外情況湖员,消費者未接收到或者Listener接收確認時發(fā)生網絡閃斷贫悄,接收不到,這時候就需要用到我們的分布式定時任務來從msg數據庫抓取那些超時了還未被消費的消息;
step6: 對超時任務重新發(fā)送一遍娘摔;
step7: 重試機制里面要設置重試次數限制窄坦,因為一些外部的原因導致一直發(fā)送失敗的,不能重試太多次凳寺,要不然會拖垮整個服務鸭津。
例如重試三次還是失敗的,就把消息的status設置成2肠缨,然后通過補償機制逆趋,人工去處理。實際生產中晒奕,這種情況還是比較少的闻书,但是你不能沒有這個補償機制,要不然就做不到可靠性了脑慧。
優(yōu)點
架構實現清晰魄眉,流程簡單;
缺點
一次業(yè)務操作2次入庫漾橙,高并發(fā)場景下影響性能杆融。
有人可能就想到采用分布式事務來保證數據的一致性,但是在大型互聯網中霜运,基本很少采用事務脾歇,都是采用補償機制蒋腮。
第二種:延遲投遞,做二次確認藕各,回調檢查池摧。
既然第一種方案在高并發(fā)場景下不適合,這時候需要我們的第二種方案激况,流程圖如下作彤。
延遲投遞實現流程.JPG

step1:完成業(yè)務操作落庫后,發(fā)送消息出去乌逐;
step2: 業(yè)務消息發(fā)送后竭讳,延遲3分鐘或5分鐘再發(fā)送消息數據;
step3: 消費者訂閱消息進行消費;
step4: 完成后重新組裝一個確認消費的消息浙踢;
step5: 補償服務監(jiān)聽消費者發(fā)送到特定隊列的確認消息绢慢,修改消息數據庫的信息;
step6: 補償服務延遲檢查生產者發(fā)出的延遲檢查的消息洛波,去消息數據庫判斷胰舆,如果已經消費成功,檢查通過蹬挤;
step7: 如果檢查沒通過缚窿,則向生產者發(fā)送重新發(fā)送消息的命令。

雖然第二種方案也是無法做到100%的可靠傳遞焰扳,在特別極端的情況倦零,還是需要定時任務和補償機制進行輔助的。但是第二種方案的核心是減少數據庫操作蓝翰,在高并發(fā)場景下光绕,我們考慮的不是百分百的可靠性了,而是考慮可用性畜份,性能能否扛得住這個流量诞帐,所以我能減少一次數據庫操作就減少一次。我上游服務減少了一次數據庫操作爆雹,我的服務性能相對而言就提高了一些停蕉,而且又能把異步callback Server補償服務解耦出來。

confirm機制

消息確認機制.JPG
  1. 消息的確認钙态,是指生產投遞消息后慧起,如果broker收到消息,會給生產者一個應答
  2. 生產者接受應答册倒,用來確認這條消息是否正常發(fā)送到broker
    如何實現消息的確認機制蚓挤?
    第一步在channe開啟確認模式,channel.confirmSelect();
    第二步在channel添加監(jiān)聽,addConfirmListener,根據監(jiān)聽的結果對消息進行重新發(fā)送或記錄日志等處理灿意。
    代碼如下:
    Producer類
package com.bfxy.rabbitmq.api.confirm;

import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer { 
    public static void main(String[] args) throws Exception {   
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.198.52");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");    
        //2 獲取C onnection
        Connection connection = connectionFactory.newConnection();  
        //3 通過Connection創(chuàng)建一個新的Channel
        Channel channel = connection.createChannel();       
        //4 指定我們的消息投遞模式: 消息的確認模式 
        channel.confirmSelect();        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save"; 
        //5 發(fā)送一條消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());       
        //6 添加一個確認監(jiān)聽
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }           
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });     
    }
}

Consumer類

package com.bfxy.rabbitmq.api.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer { 
    public static void main(String[] args) throws Exception {               
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.198.52");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");        
        //2 獲取C onnection
        Connection connection = connectionFactory.newConnection();      
        //3 通過Connection創(chuàng)建一個新的Channel
        Channel channel = connection.createChannel();       
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.#";
        String queueName = "test_confirm_queue";        
        //4 聲明交換機和隊列 然后進行綁定設置, 最后制定路由Key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);      channel.queueBind(queueName,exchangeName,routingKey);
        //5 創(chuàng)建消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        //6 設置信道    channel.basicConsume(queueName,true,queueingConsumer);
        //7 消費消息內容
        while (true){
        Delivery delivery = queueingConsumer.nextDelivery();
        String msg = new String(delivery.getBody());
            System.out.println("消費端:"+ msg);
        }       
    }
}

先啟動消費者估灿,再啟動生產者: 消費者打印出消費成功的信息,然后重新發(fā)送消息缤剧,生產者接收到確認消費成功的消息后馅袁,打印出確認信息。
首先消費者控制臺如下

comsumer.JPG

然后生產者控制臺打印信息如下:
producer.JPG

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末荒辕,一起剝皮案震驚了整個濱河市汗销,隨后出現的幾起案子,更是在濱河造成了極大的恐慌抵窒,老刑警劉巖弛针,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異估脆,居然都是意外死亡钦奋,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門疙赠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人朦拖,你說我怎么就攤上這事圃阳。” “怎么了璧帝?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵捍岳,是天一觀的道長。 經常有香客問我睬隶,道長锣夹,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任苏潜,我火速辦了婚禮银萍,結果婚禮上,老公的妹妹穿的比我還像新娘恤左。我一直安慰自己贴唇,他們只是感情好,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布飞袋。 她就那樣靜靜地躺著戳气,像睡著了一般。 火紅的嫁衣襯著肌膚如雪巧鸭。 梳的紋絲不亂的頭發(fā)上瓶您,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機與錄音,去河邊找鬼呀袱。 笑死芯肤,一個胖子當著我的面吹牛,可吹牛的內容都是我干的压鉴。 我是一名探鬼主播崖咨,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼油吭!你這毒婦竟也來了击蹲?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤婉宰,失蹤者是張志新(化名)和其女友劉穎歌豺,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體心包,經...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡类咧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了蟹腾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片痕惋。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖娃殖,靈堂內的尸體忽然破棺而出值戳,到底是詐尸還是另有隱情,我是刑警寧澤炉爆,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布堕虹,位于F島的核電站,受9級特大地震影響芬首,放射性物質發(fā)生泄漏赴捞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一郁稍、第九天 我趴在偏房一處隱蔽的房頂上張望赦政。 院中可真熱鬧,春花似錦艺晴、人聲如沸昼钻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽然评。三九已至,卻和暖如春狈究,著一層夾襖步出監(jiān)牢的瞬間碗淌,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留亿眠,地道東北人碎罚。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像纳像,于是被迫代替她去往敵國和親荆烈。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內容