ack機(jī)制

每個(gè)Consumer可能需要一段時(shí)間才能處理完收到的數(shù)據(jù)凡桥。如果在這個(gè)過程中,Consumer出錯(cuò)了喷好,異常退出了翔横,而數(shù)據(jù)還沒有處理完成,那么非常不幸绒窑,這段數(shù)據(jù)就丟失了棕孙。

因?yàn)槲覀儾捎胣o-ack的方式進(jìn)行確認(rèn),也就是說些膨,每次Consumer接到數(shù)據(jù)后蟀俊,而不管是否處理完 成,RabbitMQ Server會(huì)立即把這個(gè)Message標(biāo)記為完成订雾,然后從queue中刪除了肢预。
如果一個(gè)Consumer異常退出了,它處理的數(shù)據(jù)能夠被另外的Consumer處理洼哎,這樣數(shù)據(jù)在這種情況下就不會(huì)丟失了(注意是這種情況下)烫映。

為了保證數(shù)據(jù)不被丟失沼本,RabbitMQ支持消息確認(rèn)機(jī)制,即acknowledgments锭沟。為了保證數(shù)據(jù)能被正確處理而不僅僅是被Consumer收到抽兆,那么我們不能采用no-ack。而應(yīng)該是在處理完數(shù)據(jù)后發(fā)送ack族淮。即手動(dòng)ACK辫红。

在處理數(shù)據(jù)后發(fā)送的ack,就是告訴RabbitMQ數(shù)據(jù)已經(jīng)被接收祝辣,處理完成贴妻,RabbitMQ可以去安全的刪除它了。
如果Consumer退出了但是沒有發(fā)送ack蝙斜,那么RabbitMQ就會(huì)把這個(gè)Message發(fā)送到下一個(gè)Consumer名惩。這樣就保證了在Consumer異常退出的情況下數(shù)據(jù)也不會(huì)丟失。
這里并沒有用到超時(shí)機(jī)制孕荠。RabbitMQ僅僅通過Consumer的連接中斷來確認(rèn)該Message并沒有被正確處理娩鹉。也就是說,RabbitMQ給了Consumer足夠長的時(shí)間來做數(shù)據(jù)處理稚伍。
這樣即使你通過Ctr-C中斷了Recieve.cs底循,那么Message也不會(huì)丟失了,它會(huì)被分發(fā)到下一個(gè)Consumer槐瑞。如果忘記了ack熙涤,那么后果很嚴(yán)重。當(dāng)Consumer退出時(shí)困檩,Message會(huì)重新分發(fā)祠挫。然后RabbitMQ會(huì)占用越來越多的內(nèi)存,由于 RabbitMQ會(huì)長時(shí)間運(yùn)行悼沿,因此這個(gè)“內(nèi)存泄漏”是致命的等舔。
你的 consumer 代碼必須能夠處理各種異常,確保只要收到一條消息糟趾,最終一定能夠執(zhí)行一條 ACK / NACK

去調(diào)試這種錯(cuò)誤慌植,可以通過以下命令打印un-acked Messages.
如果連接沒有斷開應(yīng)用要通知服務(wù)器讓消息重新發(fā)送:
可以通過channel.nack(message)來讓不通過的消息再次進(jìn)入消息隊(duì)列。
if(body==’Hello World3!’){chnl.nack(msg);
//這樣就可以讓這個(gè)消息再次進(jìn)入隊(duì)列而不用重啟服務(wù)义郑。
}else{
console.log(‘a(chǎn)ck’);
chnl.ack(msg);
}


消費(fèi)端的手工ACK與NACK

當(dāng)我們?cè)O(shè)置 autoACK=false 時(shí)蝶柿,就可以使用手工ACK方式了,那么其實(shí)手工方式包括了手工ACK與NACK非驮。
當(dāng)我們手工 ACK 時(shí)交汤,會(huì)發(fā)送給Broker一個(gè)應(yīng)答,代表消息成功處理了劫笙,Broker就可以回送響應(yīng)給生產(chǎn)端了芙扎。NACK 則表示消息處理失敗了星岗,如果設(shè)置重回隊(duì)列,Broker端就會(huì)將沒有成功處理的消息重新發(fā)送戒洼。

使用方式

消費(fèi)端進(jìn)行消費(fèi)的時(shí)候俏橘,如果由于業(yè)務(wù)異常我們可以手工 NACK 并進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償圈浇!
方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
requeue為true敷矫,表示deliveryTag=n之前未確認(rèn)的消息都處理失敗且將這些消息重新放回隊(duì)列中。
requeue為false,表示deliveryTag=n之前未確認(rèn)的消息都處理失敗且將這些消息直接丟棄汉额。

如果由于服務(wù)器宕機(jī)等嚴(yán)重問題,那我們就需要手工進(jìn)行 ACK 保障消費(fèi)端消費(fèi)成功榨汤!
方法:void basicAck(long deliveryTag, boolean multiple)

消費(fèi)端的重回隊(duì)列

  • 消費(fèi)端重回隊(duì)列是為了對(duì)沒有處理成功的消息蠕搜,把消息重新會(huì)遞給Broker!
  • 重回隊(duì)列收壕,會(huì)把消費(fèi)失敗的消息重新添加到隊(duì)列的尾端妓灌,供消費(fèi)者繼續(xù)消費(fèi)。
  • 一般我們?cè)趯?shí)際應(yīng)用中蜜宪,都會(huì)關(guān)閉重回隊(duì)列虫埂,也就是設(shè)置為false

演示重回隊(duì)列

生產(chǎn)端

對(duì)消息設(shè)置自定義屬性以便進(jìn)行區(qū)分

public class Producer {
    public static void main(String[] args) throws Exception {
        //1 創(chuàng)建ConnectionFactorys
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通過Connection創(chuàng)建一個(gè)新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        for(int i =0; i<5; i ++){
            //設(shè)置消息屬性
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            //發(fā)送消息
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }   
    }
}
自定義消費(fèi)者

對(duì)第一條消息進(jìn)行NACK,并設(shè)置重回隊(duì)列

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            //NACK圃验,參數(shù)三requeue:是否重回隊(duì)列
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}
消費(fèi)端

關(guān)閉自動(dòng)簽收功能

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        //聲明交換機(jī)和隊(duì)列掉伏,然后進(jìn)行綁定設(shè)置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //手工簽收 必須要設(shè)置 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
運(yùn)行說明

先啟動(dòng)消費(fèi)端,然后啟動(dòng)生產(chǎn)端澳窑,消費(fèi)端打印如下斧散,顯然第一條消息由于我們調(diào)用了NACK,并且設(shè)置了重回隊(duì)列摊聋,所以會(huì)導(dǎo)致該條消息一直重復(fù)發(fā)送鸡捐,消費(fèi)端就會(huì)一直循環(huán)消費(fèi)

-----------consume message---------- body: Hello RabbitMQ ACK Message 0 
-----------consume message---------- body: Hello RabbitMQ ACK Message 1 
-----------consume message---------- body: Hello RabbitMQ ACK Message 2 
-----------consume message---------- body: Hello RabbitMQ ACK Message 3 
-----------consume message---------- body: Hello RabbitMQ ACK Message 4 
-----------consume message---------- body: Hello RabbitMQ ACK Message 0 
-----------consume message---------- body: Hello RabbitMQ ACK Message 0 
-----------consume message----------

一般工作中不會(huì)設(shè)置重回隊(duì)列這個(gè)屬性,我們都是自己去做補(bǔ)償或者投遞到延遲隊(duì)列里的麻裁,然后指定時(shí)間去處理即可箍镜。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市煎源,隨后出現(xiàn)的幾起案子色迂,更是在濱河造成了極大的恐慌,老刑警劉巖手销,帶你破解...
    沈念sama閱讀 222,807評(píng)論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脚草,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡原献,警方通過查閱死者的電腦和手機(jī)馏慨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門埂淮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人写隶,你說我怎么就攤上這事倔撞。” “怎么了慕趴?”我有些...
    開封第一講書人閱讀 169,589評(píng)論 0 363
  • 文/不壞的土叔 我叫張陵痪蝇,是天一觀的道長。 經(jīng)常有香客問我冕房,道長躏啰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,188評(píng)論 1 300
  • 正文 為了忘掉前任耙册,我火速辦了婚禮给僵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘详拙。我一直安慰自己帝际,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評(píng)論 6 398
  • 文/花漫 我一把揭開白布饶辙。 她就那樣靜靜地躺著蹲诀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪弃揽。 梳的紋絲不亂的頭發(fā)上脯爪,一...
    開封第一講書人閱讀 52,785評(píng)論 1 314
  • 那天,我揣著相機(jī)與錄音矿微,去河邊找鬼披粟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛冷冗,可吹牛的內(nèi)容都是我干的守屉。 我是一名探鬼主播,決...
    沈念sama閱讀 41,220評(píng)論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼蒿辙,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼拇泛!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起思灌,我...
    開封第一講書人閱讀 40,167評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤俺叭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后泰偿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體熄守,經(jīng)...
    沈念sama閱讀 46,698評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評(píng)論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了裕照。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片攒发。...
    茶點(diǎn)故事閱讀 40,912評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖晋南,靈堂內(nèi)的尸體忽然破棺而出惠猿,到底是詐尸還是另有隱情,我是刑警寧澤负间,帶...
    沈念sama閱讀 36,572評(píng)論 5 351
  • 正文 年R本政府宣布偶妖,位于F島的核電站,受9級(jí)特大地震影響政溃,放射性物質(zhì)發(fā)生泄漏趾访。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評(píng)論 3 336
  • 文/蒙蒙 一董虱、第九天 我趴在偏房一處隱蔽的房頂上張望扼鞋。 院中可真熱鬧,春花似錦空扎、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至楚殿,卻和暖如春撮慨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脆粥。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評(píng)論 1 274
  • 我被黑心中介騙來泰國打工砌溺, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人变隔。 一個(gè)月前我還...
    沈念sama閱讀 49,359評(píng)論 3 379
  • 正文 我出身青樓规伐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親匣缘。 傳聞我的和親對(duì)象是個(gè)殘疾皇子猖闪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評(píng)論 2 361