RabbitMQ可靠性保障

來一幅圖:


image.png

說明:可靠性和效率是不可兼得的,保證可靠得犧牲一部分效率。

為了保障消息成功從生產(chǎn)者投遞到broker:
采用comfirm確認(rèn)消息機制结笨,如果Broker端接受到消息,那么就會回送相應(yīng),然后生產(chǎn)者會監(jiān)聽Broker給的應(yīng)答气忠,流程圖:


image.png

實現(xiàn)方式:


image.png

代碼如下:
生產(chǎn)者:

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        
        //1 創(chuàng)建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 獲取C onnection
        Connection connection = connectionFactory.newConnection();
        
        //3 通過Connection創(chuàng)建一個新的Channel
        Channel channel = connection.createChannel();
        
        
        //4 指定我們的消息投遞模式: 消息的確認(rèn)模式 
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";
        
        //5 發(fā)送一條消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        
        //6 添加一個確認(rèn)監(jiān)聽
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------no ack!-----------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.err.println("-------ack!-----------");
            }
        });

    }
}

隊列收到消息之后會自動ACK(或者在消費者端可以手動選擇channel的NCK),那么生產(chǎn)者端的監(jiān)聽就會收到回復(fù)未桥。

2.如果消息路由不到指定的隊列笔刹,處理路由不到的問題,那么方法一就可以使用Return Listener.

流程圖如下:

image.png

如果發(fā)送消息的時候冬耿,可能因為routingkey錯誤舌菜,或者隊列不存在,或者隊列名稱錯誤導(dǎo)致路由失敗亦镶。
使用方式:
使用mandatory參數(shù)(即發(fā)送消息時候的第三個參數(shù)設(shè)置為true)和ReturnListener日月,可以實現(xiàn)消息無法路由的時候返回給生產(chǎn)者。
核心代碼:

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });
        
        
        channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

此時如果消息路由不到缤骨,生產(chǎn)者配置的監(jiān)聽會拿到該消息爱咬。

方法二是采用備份交換機(alternate-exchange),無法路由的消息會發(fā)送到這個交換機上绊起。
代碼如下:

Map<String,Object> arguments = new HashMap<String,Object>(); 
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交換機的備份交換機 
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);

3.確保消息成功從隊列投到消費者(注意是隊列到消費者):
采用ACK機制,在channel操作隊列和消費者時候即首先關(guān)閉autoAck,

    // 手工簽收 必須要關(guān)閉 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

這里我使用的是自定義消費者精拟,自定義消費者代碼:


public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
        
    }


}

如果是第0條消息,那么就拒絕并將此消息重回隊列尾部虱歪,如果不是第0條那么就ACK蜂绎。

生產(chǎn)者的代碼如下,每次發(fā)送消息都有num標(biāo)識:


public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        
        
        for(int i =0; i<5; i ++){
            
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }
        
    }
}

開啟消費者和生產(chǎn)者,控制臺打印如下:


image.png

可以看出第0條消息一直被循環(huán)消費(因為這個隊列只綁定了一個消費者笋鄙,此消費者設(shè)置了第0條消息重回隊列师枣,那么就循環(huán)消費)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末萧落,一起剝皮案震驚了整個濱河市践美,隨后出現(xiàn)的幾起案子洗贰,更是在濱河造成了極大的恐慌,老刑警劉巖陨倡,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敛滋,死亡現(xiàn)場離奇詭異,居然都是意外死亡玫膀,警方通過查閱死者的電腦和手機矛缨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來帖旨,“玉大人箕昭,你說我怎么就攤上這事〗庠模” “怎么了落竹?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長货抄。 經(jīng)常有香客問我述召,道長,這世上最難降的妖魔是什么蟹地? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任积暖,我火速辦了婚禮,結(jié)果婚禮上怪与,老公的妹妹穿的比我還像新娘夺刑。我一直安慰自己,他們只是感情好分别,可當(dāng)我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布遍愿。 她就那樣靜靜地躺著,像睡著了一般耘斩。 火紅的嫁衣襯著肌膚如雪沼填。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天括授,我揣著相機與錄音坞笙,去河邊找鬼。 笑死荚虚,一個胖子當(dāng)著我的面吹牛薛夜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播曲管,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼却邓,長吁一口氣:“原來是場噩夢啊……” “哼硕糊!你這毒婦竟也來了院水?” 一聲冷哼從身側(cè)響起腊徙,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎檬某,沒想到半個月后撬腾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡恢恼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年民傻,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片场斑。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡漓踢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漏隐,到底是詐尸還是另有隱情喧半,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布青责,位于F島的核電站挺据,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏脖隶。R本人自食惡果不足惜扁耐,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望产阱。 院中可真熱鬧婉称,春花似錦、人聲如沸心墅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽怎燥。三九已至瘫筐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铐姚,已是汗流浹背策肝。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留隐绵,地道東北人之众。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像依许,于是被迫代替她去往敵國和親棺禾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,869評論 2 11
  • 0 相關(guān)源碼 1 你將學(xué)到 如何保證消息百分百投遞成功 冪等性 如何避免海量訂單生成時消息的重復(fù)消費 Confir...
    JavaEdge閱讀 1,388評論 0 9
  • 《羅夢幽門》 屈犁緣創(chuàng)新生篇峭跳, 飲是城雙閂趕門膘婶。 唯家以諾亂仗淵缺前, 親景羅莎集烏恬。 支朱鎖那圈層賽悬襟, 意語頻追命...
    春城怡景閱讀 373評論 4 15
  • 在我的世界分大快樂和小快樂衅码。 吃飯逛街看電影是小快樂。 和喜歡的人吃飯談心脊岳,和家人逛街逝段,和朋友討論電影情節(jié)是大快樂...
    思思有片海閱讀 535評論 0 0
  • 如果你想培養(yǎng)一個新習(xí)慣,微習(xí)慣基本上就是它的大幅度縮減版割捅,把每天100個俯臥撐縮減為每天1個奶躯,每天寫3000字縮減...
    周筠桐閱讀 161評論 0 1