RabbitMQ如何解決各種情況下丟數(shù)據(jù)的問題

1.生產(chǎn)者丟數(shù)據(jù)

如果想學(xué)習(xí)Java工程化蜜暑、高性能及分布式晨川、深入淺出画恰。微服務(wù)彭谁、Spring,MyBatis允扇,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135缠局,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家考润。

生產(chǎn)者的消息沒有投遞到MQ中怎么辦狭园?從生產(chǎn)者弄丟數(shù)據(jù)這個(gè)角度來看,RabbitMQ提供transaction和confirm模式來確保生產(chǎn)者不丟消息糊治。transaction機(jī)制就是說唱矛,發(fā)送消息前,開啟事物(channel.txSelect()),然后發(fā)送消息绎谦,如果發(fā)送過程中出現(xiàn)什么異常管闷,事物就會(huì)回滾(channel.txRollback())境输,如果發(fā)送成功則提交事物(channel.txCommit())逐纬。然而缺點(diǎn)就是吞吐量下降了。因此摹察,按照博主的經(jīng)驗(yàn)冤留,生產(chǎn)上用confirm模式的居多赃蛛。一旦channel進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的ID(從1開始)搀菩,一旦消息被投遞到所有匹配的隊(duì)列之后,rabbitMQ就會(huì)發(fā)送一個(gè)Ack給生產(chǎn)者(包含消息的唯一ID)破托,這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了.如果rabiitMQ沒能處理該消息肪跋,則會(huì)發(fā)送一個(gè)Nack消息給你,你可以進(jìn)行重試操作土砂。

下面演示一下confirm模式:

//測試確認(rèn)后回調(diào)@ServicepublicclassHelloSender1implementsRabbitTemplate.ConfirmCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(){ String context ="你好現(xiàn)在是 "+newDate() +""; System.out.println("HelloSender發(fā)送內(nèi)容 : "+ context);this.rabbitTemplate.setConfirmCallback(this);//exchange,queue 都正確,confirm被回調(diào), ack=true//this.rabbitTemplate.convertAndSend("exchange","topic.message", context);//exchange 錯(cuò)誤,queue 正確,confirm被回調(diào), ack=false//this.rabbitTemplate.convertAndSend("fasss","topic.message", context);//exchange 正確,queue 錯(cuò)誤 ,confirm被回調(diào), ack=true; return被回調(diào) replyText:NO_ROUTE//this.rabbitTemplate.convertAndSend("exchange","", context);//exchange 錯(cuò)誤,queue 錯(cuò)誤,confirm被回調(diào), ack=falsethis.rabbitTemplate.convertAndSend("fasss","fass", context); }@Overridepublicvoidconfirm(CorrelationData correlationData,booleanack, String cause){ System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); }}

2.消息隊(duì)列丟數(shù)據(jù)

處理消息隊(duì)列丟數(shù)據(jù)的情況州既,一般是開啟持久化磁盤的配置。這個(gè)持久化配置可以和confirm機(jī)制配合使用萝映,你可以在消息持久化磁盤后吴叶,再給生產(chǎn)者發(fā)送一個(gè)Ack信號(hào)。這樣序臂,如果消息持久化磁盤之前蚌卤,rabbitMQ陣亡了,那么生產(chǎn)者收不到Ack信號(hào)奥秆,生產(chǎn)者會(huì)自動(dòng)重發(fā)逊彭。那么如何持久化呢,這里順便說一下吧构订,其實(shí)也很容易侮叮,就下面兩步①、將queue的持久化標(biāo)識(shí)durable設(shè)置為true,則代表是一個(gè)持久的隊(duì)列②悼瘾、發(fā)送消息的時(shí)候?qū)eliveryMode=2這樣設(shè)置以后囊榜,rabbitMQ就算掛了,重啟后也能恢復(fù)數(shù)據(jù)亥宿。在消息還沒有持久化到硬盤時(shí)卸勺,可能服務(wù)已經(jīng)死掉,這種情況可以通過引入mirrored-queue即鏡像隊(duì)列箩绍,但也不能保證消息百分百不丟失(整個(gè)集群都掛掉)

/**

* 第二個(gè)參數(shù):queue的持久化是通過durable=true來實(shí)現(xiàn)的孔庭。

* 第三個(gè)參數(shù):exclusive:排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次申明它的連接可見圆到,并在連接斷開時(shí)自動(dòng)刪除怎抛。這里需要注意三點(diǎn):

   1. 排他隊(duì)列是基于連接可見的,同一連接的不同信道是可以同時(shí)訪問同一連接創(chuàng)建的排他隊(duì)列芽淡;

   2.“首次”马绝,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的挣菲,這個(gè)與普通隊(duì)列不同富稻;

   3.即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出白胀,該排他隊(duì)列都會(huì)被自動(dòng)刪除的椭赋,這種隊(duì)列適用于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場景。

* 第四個(gè)參數(shù):自動(dòng)刪除或杠,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話哪怔,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列向抢。

* @param

* @return

* @Author zxj

*/@BeanpublicQueuequeue() {Map arguments =newHashMap<>(); arguments.put("x-message-ttl",25000);//25秒自動(dòng)刪除Queuequeue=newQueue("topic.messages",true,false,true, arguments);returnqueue; }

MessagePropertiesproperties=newMessageProperties();properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化設(shè)置properties.setExpiration("2018-12-15 23:23:23");//設(shè)置到期時(shí)間 Message message=newMessage("hello".getBytes(),properties); this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

3.消費(fèi)者丟數(shù)據(jù)

啟用手動(dòng)確認(rèn)模式可以解決這個(gè)問題①自動(dòng)確認(rèn)模式认境,消費(fèi)者掛掉,待ack的消息回歸到隊(duì)列中挟鸠。消費(fèi)者拋出異常叉信,消息會(huì)不斷的被重發(fā),直到處理成功艘希。不會(huì)丟失消息硼身,即便服務(wù)掛掉,沒有處理完成的消息會(huì)重回隊(duì)列枢冤,但是異常會(huì)讓消息不斷重試鸠姨。②手動(dòng)確認(rèn)模式③不確認(rèn)模式,acknowledge="none"不使用確認(rèn)機(jī)制淹真,只要消息發(fā)送完成會(huì)立即在隊(duì)列移除讶迁,無論客戶端異常還是斷開,只要發(fā)送完就移除核蘸,不會(huì)重發(fā)巍糯。

如果想學(xué)習(xí)Java工程化、高性能及分布式客扎、深入淺出祟峦。微服務(wù)、Spring徙鱼,MyBatis宅楞,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135针姿,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家厌衙。

指定Acknowledge的模式:spring.rabbitmq.listener.direct.acknowledge-mode=manual距淫,表示該監(jiān)聽器手動(dòng)應(yīng)答消息針對(duì)手動(dòng)確認(rèn)模式,有以下特點(diǎn):1.使用手動(dòng)應(yīng)答消息婶希,有一點(diǎn)需要特別注意榕暇,那就是不能忘記應(yīng)答消息,因?yàn)閷?duì)于RabbitMQ來說處理消息沒有超時(shí)喻杈,只要不應(yīng)答消息彤枢,他就會(huì)認(rèn)為仍在正常處理消息,導(dǎo)致消息隊(duì)列出現(xiàn)阻塞筒饰,影響業(yè)務(wù)執(zhí)行缴啡。2.如果消費(fèi)者來不及處理就死掉時(shí),沒有響應(yīng)ack時(shí)瓷们,會(huì)項(xiàng)目啟動(dòng)后會(huì)重復(fù)發(fā)送一條信息給其他消費(fèi)者盟猖;3.可以選擇丟棄消息,這其實(shí)也是一種應(yīng)答换棚,如下,這樣就不會(huì)再次收到這條消息反镇。channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);4.如果消費(fèi)者設(shè)置了手動(dòng)應(yīng)答模式固蚤,并且設(shè)置了重試,出現(xiàn)異常時(shí)無論是否捕獲了異常歹茶,都是不會(huì)重試的5.如果消費(fèi)者沒有設(shè)置手動(dòng)應(yīng)答模式夕玩,并且設(shè)置了重試,那么在出現(xiàn)異常時(shí)沒有捕獲異常會(huì)進(jìn)行重試惊豺,如果捕獲了異常不會(huì)重試燎孟。

重試機(jī)制:

spring.rabbitmq.listener.simple.retry.max-attempts=5最大重試次數(shù)spring.rabbitmq.listener.simple.retry.enabled=true 是否開啟消費(fèi)者重試(為false時(shí)關(guān)閉消費(fèi)者重試,這時(shí)消費(fèi)端代碼異常會(huì)一直重復(fù)收到消息)spring.rabbitmq.listener.simple.retry.initial-interval=5000重試間隔時(shí)間(單位毫秒)spring.rabbitmq.listener.simple.default-requeue-rejected=false 重試次數(shù)超過上面的設(shè)置之后是否丟棄(false不丟棄時(shí)需要寫相應(yīng)代碼將該消息加入死信隊(duì)列)

如果設(shè)置了重試模式尸昧,那么在出現(xiàn)異常時(shí)沒有捕獲異常會(huì)進(jìn)行重試揩页,如果捕獲了異常不會(huì)重試。

當(dāng)出現(xiàn)異常時(shí)烹俗,我們需要把這個(gè)消息回滾到消息隊(duì)列爆侣,有兩種方式:

//ack返回false,并重新回到隊(duì)列幢妄,api里面解釋得很清楚

//ack返回false兔仰,并重新回到隊(duì)列,api里面解釋得很清楚channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);//拒絕消息channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

經(jīng)過開發(fā)中的實(shí)際測試蕉鸳,當(dāng)消息回滾到消息隊(duì)列時(shí)乎赴,這條消息不會(huì)回到隊(duì)列尾部,而是仍是在隊(duì)列頭部,這時(shí)消費(fèi)者會(huì)立馬又接收到這條消息進(jìn)行處理榕吼,接著拋出異常饿序,進(jìn)行 回滾,如此反復(fù)進(jìn)行友题。這種情況會(huì)導(dǎo)致消息隊(duì)列處理出現(xiàn)阻塞嗤堰,消息堆積,導(dǎo)致正常消息也無法運(yùn)行度宦。對(duì)于消息回滾到消息隊(duì)列踢匣,我們希望比較理想的方式時(shí)出現(xiàn)異常的消息到 達(dá)消息隊(duì)列尾部,這樣既保證消息不會(huì)丟失戈抄,又保證了正常業(yè)務(wù)的進(jìn)行离唬,因此我們采取的解決方案是,將消息進(jìn)行應(yīng)答划鸽,這時(shí)消息隊(duì)列會(huì)刪除該消息输莺,同時(shí)我們?cè)俅伟l(fā)送該消息 到消息隊(duì)列,這時(shí)就實(shí)現(xiàn)了錯(cuò)誤消息進(jìn)行消息隊(duì)列尾部的方案裸诽。

//手動(dòng)進(jìn)行應(yīng)答channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//重新發(fā)送消息到隊(duì)尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(newObject()));

如果一個(gè)消息體本身有誤嫂用,會(huì)導(dǎo)致該消息體,一直無法進(jìn)行處理丈冬,而服務(wù)器中刷出大量無用日志嘱函。解決這個(gè)問題可以采取兩種方案:

1.一種是對(duì)于日常細(xì)致處理,分清哪些是可以恢復(fù)的異常埂蕊,哪些是不可以恢復(fù)的異常往弓。對(duì)于可以恢復(fù)的異常我們采取第三條中的解決方案,對(duì)于不可以處理的異常蓄氧,我們采用記錄日志函似,直接丟棄該消息方案。

2.另一種是我們對(duì)每條消息進(jìn)行標(biāo)記喉童,記錄每條消息的處理次數(shù)撇寞,當(dāng)一條消息,多次處理仍不能成功時(shí)堂氯,處理次數(shù)到達(dá)我們?cè)O(shè)置的值時(shí)重抖,我們就丟棄該消息,但需要記錄詳細(xì)的日志祖灰。

消息監(jiān)聽內(nèi)的異常處理有兩種方式:

1.內(nèi)部catch后直接處理钟沛,然后使用channel對(duì)消息進(jìn)行確認(rèn)

2.配置RepublishMessageRecoverer將處理異常的消息發(fā)送到指定隊(duì)列專門處理或記錄。監(jiān)聽的方法內(nèi)拋出異常貌似沒有太大用處局扶。因?yàn)閽伋霎惓>退闶侵卦囈卜浅S锌赡軙?huì)繼續(xù)出現(xiàn)異常恨统,當(dāng)重試次數(shù)完了之后消息就只有重啟應(yīng)用才能接收到了叁扫,很有可能導(dǎo)致消息消費(fèi)不及時(shí)。當(dāng)然可以配置RepublishMessageRecoverer來解決畜埋,但是萬一RepublishMessageRecoverer發(fā)送失敗了呢莫绣。。那就可能造成消息消費(fèi)不及時(shí)了悠鞍。所以即使需要將處理出現(xiàn)異常的消息統(tǒng)一放到另外隊(duì)列去處理对室,個(gè)人建議兩種方式:

①catch異常后,手動(dòng)發(fā)送到指定隊(duì)列咖祭,然后使用channel給rabbitmq確認(rèn)消息已消費(fèi)

②給Queue綁定死信隊(duì)列掩宜,使用nack(requque為false)確認(rèn)消息消費(fèi)失敗

如果想學(xué)習(xí)Java工程化、高性能及分布式么翰、深入淺出牺汤。微服務(wù)、Spring浩嫌,MyBatis檐迟,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135,群里有阿里大牛直播講解技術(shù)码耐,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家追迟。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市骚腥,隨后出現(xiàn)的幾起案子怔匣,更是在濱河造成了極大的恐慌,老刑警劉巖桦沉,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異金闽,居然都是意外死亡纯露,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門代芜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來埠褪,“玉大人,你說我怎么就攤上這事挤庇〕伲” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵嫡秕,是天一觀的道長渴语。 經(jīng)常有香客問我,道長昆咽,這世上最難降的妖魔是什么驾凶? 我笑而不...
    開封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任牙甫,我火速辦了婚禮,結(jié)果婚禮上调违,老公的妹妹穿的比我還像新娘窟哺。我一直安慰自己,他們只是感情好技肩,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開白布且轨。 她就那樣靜靜地躺著,像睡著了一般虚婿。 火紅的嫁衣襯著肌膚如雪旋奢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天雳锋,我揣著相機(jī)與錄音黄绩,去河邊找鬼。 笑死玷过,一個(gè)胖子當(dāng)著我的面吹牛爽丹,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播辛蚊,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼粤蝎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了袋马?” 一聲冷哼從身側(cè)響起初澎,我...
    開封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎虑凛,沒想到半個(gè)月后碑宴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡桑谍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年延柠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锣披。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贞间,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出雹仿,到底是詐尸還是另有隱情增热,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布胧辽,位于F島的核電站峻仇,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏邑商。R本人自食惡果不足惜础浮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一帆调、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧豆同,春花似錦番刊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至鸭廷,卻和暖如春枣抱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辆床。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來泰國打工佳晶, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人讼载。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓轿秧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親咨堤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子菇篡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容