1.生產(chǎn)者丟數(shù)據(jù)
如果想學(xué)習(xí)Java工程化蜜暑、高性能及分布式晨川、深入淺出画恰。微服務(wù)彭谁、Spring,MyBatis允扇,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135缠局,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家考润。
生產(chǎn)者的消息沒有投遞到MQ中怎么辦狭园?從生產(chǎn)者弄丟數(shù)據(jù)這個(gè)角度來看,RabbitMQ提供transaction和confirm模式來確保生產(chǎn)者不丟消息糊治。transaction機(jī)制就是說唱矛,發(fā)送消息前,開啟事物(channel.txSelect()),然后發(fā)送消息绎谦,如果發(fā)送過程中出現(xiàn)什么異常管闷,事物就會(huì)回滾(channel.txRollback())境输,如果發(fā)送成功則提交事物(channel.txCommit())逐纬。然而缺點(diǎn)就是吞吐量下降了。因此摹察,按照博主的經(jīng)驗(yàn)冤留,生產(chǎn)上用confirm模式的居多赃蛛。一旦channel進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的ID(從1開始)搀菩,一旦消息被投遞到所有匹配的隊(duì)列之后,rabbitMQ就會(huì)發(fā)送一個(gè)Ack給生產(chǎn)者(包含消息的唯一ID)破托,這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了.如果rabiitMQ沒能處理該消息肪跋,則會(huì)發(fā)送一個(gè)Nack消息給你,你可以進(jìn)行重試操作土砂。
下面演示一下confirm模式:
//測試確認(rèn)后回調(diào)@ServicepublicclassHelloSender1implementsRabbitTemplate.ConfirmCallback{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(){ String context ="你好現(xiàn)在是 "+newDate() +""; System.out.println("HelloSender發(fā)送內(nèi)容 : "+ context);this.rabbitTemplate.setConfirmCallback(this);//exchange,queue 都正確,confirm被回調(diào), ack=true//this.rabbitTemplate.convertAndSend("exchange","topic.message", context);//exchange 錯(cuò)誤,queue 正確,confirm被回調(diào), ack=false//this.rabbitTemplate.convertAndSend("fasss","topic.message", context);//exchange 正確,queue 錯(cuò)誤 ,confirm被回調(diào), ack=true; return被回調(diào) replyText:NO_ROUTE//this.rabbitTemplate.convertAndSend("exchange","", context);//exchange 錯(cuò)誤,queue 錯(cuò)誤,confirm被回調(diào), ack=falsethis.rabbitTemplate.convertAndSend("fasss","fass", context); }@Overridepublicvoidconfirm(CorrelationData correlationData,booleanack, String cause){ System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); }}
2.消息隊(duì)列丟數(shù)據(jù)
處理消息隊(duì)列丟數(shù)據(jù)的情況州既,一般是開啟持久化磁盤的配置。這個(gè)持久化配置可以和confirm機(jī)制配合使用萝映,你可以在消息持久化磁盤后吴叶,再給生產(chǎn)者發(fā)送一個(gè)Ack信號(hào)。這樣序臂,如果消息持久化磁盤之前蚌卤,rabbitMQ陣亡了,那么生產(chǎn)者收不到Ack信號(hào)奥秆,生產(chǎn)者會(huì)自動(dòng)重發(fā)逊彭。那么如何持久化呢,這里順便說一下吧构订,其實(shí)也很容易侮叮,就下面兩步①、將queue的持久化標(biāo)識(shí)durable設(shè)置為true,則代表是一個(gè)持久的隊(duì)列②悼瘾、發(fā)送消息的時(shí)候?qū)eliveryMode=2這樣設(shè)置以后囊榜,rabbitMQ就算掛了,重啟后也能恢復(fù)數(shù)據(jù)亥宿。在消息還沒有持久化到硬盤時(shí)卸勺,可能服務(wù)已經(jīng)死掉,這種情況可以通過引入mirrored-queue即鏡像隊(duì)列箩绍,但也不能保證消息百分百不丟失(整個(gè)集群都掛掉)
/**
* 第二個(gè)參數(shù):queue的持久化是通過durable=true來實(shí)現(xiàn)的孔庭。
* 第三個(gè)參數(shù):exclusive:排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次申明它的連接可見圆到,并在連接斷開時(shí)自動(dòng)刪除怎抛。這里需要注意三點(diǎn):
1. 排他隊(duì)列是基于連接可見的,同一連接的不同信道是可以同時(shí)訪問同一連接創(chuàng)建的排他隊(duì)列芽淡;
2.“首次”马绝,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的挣菲,這個(gè)與普通隊(duì)列不同富稻;
3.即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出白胀,該排他隊(duì)列都會(huì)被自動(dòng)刪除的椭赋,這種隊(duì)列適用于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場景。
* 第四個(gè)參數(shù):自動(dòng)刪除或杠,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話哪怔,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列向抢。
* @param
* @return
* @Author zxj
*/@BeanpublicQueuequeue() {Map arguments =newHashMap<>(); arguments.put("x-message-ttl",25000);//25秒自動(dòng)刪除Queuequeue=newQueue("topic.messages",true,false,true, arguments);returnqueue; }
MessagePropertiesproperties=newMessageProperties();properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化設(shè)置properties.setExpiration("2018-12-15 23:23:23");//設(shè)置到期時(shí)間 Message message=newMessage("hello".getBytes(),properties); this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);
3.消費(fèi)者丟數(shù)據(jù)
啟用手動(dòng)確認(rèn)模式可以解決這個(gè)問題①自動(dòng)確認(rèn)模式认境,消費(fèi)者掛掉,待ack的消息回歸到隊(duì)列中挟鸠。消費(fèi)者拋出異常叉信,消息會(huì)不斷的被重發(fā),直到處理成功艘希。不會(huì)丟失消息硼身,即便服務(wù)掛掉,沒有處理完成的消息會(huì)重回隊(duì)列枢冤,但是異常會(huì)讓消息不斷重試鸠姨。②手動(dòng)確認(rèn)模式③不確認(rèn)模式,acknowledge="none"不使用確認(rèn)機(jī)制淹真,只要消息發(fā)送完成會(huì)立即在隊(duì)列移除讶迁,無論客戶端異常還是斷開,只要發(fā)送完就移除核蘸,不會(huì)重發(fā)巍糯。
如果想學(xué)習(xí)Java工程化、高性能及分布式客扎、深入淺出祟峦。微服務(wù)、Spring徙鱼,MyBatis宅楞,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135针姿,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家厌衙。
指定Acknowledge的模式:spring.rabbitmq.listener.direct.acknowledge-mode=manual距淫,表示該監(jiān)聽器手動(dòng)應(yīng)答消息針對(duì)手動(dòng)確認(rèn)模式,有以下特點(diǎn):1.使用手動(dòng)應(yīng)答消息婶希,有一點(diǎn)需要特別注意榕暇,那就是不能忘記應(yīng)答消息,因?yàn)閷?duì)于RabbitMQ來說處理消息沒有超時(shí)喻杈,只要不應(yīng)答消息彤枢,他就會(huì)認(rèn)為仍在正常處理消息,導(dǎo)致消息隊(duì)列出現(xiàn)阻塞筒饰,影響業(yè)務(wù)執(zhí)行缴啡。2.如果消費(fèi)者來不及處理就死掉時(shí),沒有響應(yīng)ack時(shí)瓷们,會(huì)項(xiàng)目啟動(dòng)后會(huì)重復(fù)發(fā)送一條信息給其他消費(fèi)者盟猖;3.可以選擇丟棄消息,這其實(shí)也是一種應(yīng)答换棚,如下,這樣就不會(huì)再次收到這條消息反镇。channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);4.如果消費(fèi)者設(shè)置了手動(dòng)應(yīng)答模式固蚤,并且設(shè)置了重試,出現(xiàn)異常時(shí)無論是否捕獲了異常歹茶,都是不會(huì)重試的5.如果消費(fèi)者沒有設(shè)置手動(dòng)應(yīng)答模式夕玩,并且設(shè)置了重試,那么在出現(xiàn)異常時(shí)沒有捕獲異常會(huì)進(jìn)行重試惊豺,如果捕獲了異常不會(huì)重試燎孟。
重試機(jī)制:
spring.rabbitmq.listener.simple.retry.max-attempts=5最大重試次數(shù)spring.rabbitmq.listener.simple.retry.enabled=true 是否開啟消費(fèi)者重試(為false時(shí)關(guān)閉消費(fèi)者重試,這時(shí)消費(fèi)端代碼異常會(huì)一直重復(fù)收到消息)spring.rabbitmq.listener.simple.retry.initial-interval=5000重試間隔時(shí)間(單位毫秒)spring.rabbitmq.listener.simple.default-requeue-rejected=false 重試次數(shù)超過上面的設(shè)置之后是否丟棄(false不丟棄時(shí)需要寫相應(yīng)代碼將該消息加入死信隊(duì)列)
如果設(shè)置了重試模式尸昧,那么在出現(xiàn)異常時(shí)沒有捕獲異常會(huì)進(jìn)行重試揩页,如果捕獲了異常不會(huì)重試。
當(dāng)出現(xiàn)異常時(shí)烹俗,我們需要把這個(gè)消息回滾到消息隊(duì)列爆侣,有兩種方式:
//ack返回false,并重新回到隊(duì)列幢妄,api里面解釋得很清楚
//ack返回false兔仰,并重新回到隊(duì)列,api里面解釋得很清楚channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);//拒絕消息channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
經(jīng)過開發(fā)中的實(shí)際測試蕉鸳,當(dāng)消息回滾到消息隊(duì)列時(shí)乎赴,這條消息不會(huì)回到隊(duì)列尾部,而是仍是在隊(duì)列頭部,這時(shí)消費(fèi)者會(huì)立馬又接收到這條消息進(jìn)行處理榕吼,接著拋出異常饿序,進(jìn)行 回滾,如此反復(fù)進(jìn)行友题。這種情況會(huì)導(dǎo)致消息隊(duì)列處理出現(xiàn)阻塞嗤堰,消息堆積,導(dǎo)致正常消息也無法運(yùn)行度宦。對(duì)于消息回滾到消息隊(duì)列踢匣,我們希望比較理想的方式時(shí)出現(xiàn)異常的消息到 達(dá)消息隊(duì)列尾部,這樣既保證消息不會(huì)丟失戈抄,又保證了正常業(yè)務(wù)的進(jìn)行离唬,因此我們采取的解決方案是,將消息進(jìn)行應(yīng)答划鸽,這時(shí)消息隊(duì)列會(huì)刪除該消息输莺,同時(shí)我們?cè)俅伟l(fā)送該消息 到消息隊(duì)列,這時(shí)就實(shí)現(xiàn)了錯(cuò)誤消息進(jìn)行消息隊(duì)列尾部的方案裸诽。
//手動(dòng)進(jìn)行應(yīng)答channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//重新發(fā)送消息到隊(duì)尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(newObject()));
如果一個(gè)消息體本身有誤嫂用,會(huì)導(dǎo)致該消息體,一直無法進(jìn)行處理丈冬,而服務(wù)器中刷出大量無用日志嘱函。解決這個(gè)問題可以采取兩種方案:
1.一種是對(duì)于日常細(xì)致處理,分清哪些是可以恢復(fù)的異常埂蕊,哪些是不可以恢復(fù)的異常往弓。對(duì)于可以恢復(fù)的異常我們采取第三條中的解決方案,對(duì)于不可以處理的異常蓄氧,我們采用記錄日志函似,直接丟棄該消息方案。
2.另一種是我們對(duì)每條消息進(jìn)行標(biāo)記喉童,記錄每條消息的處理次數(shù)撇寞,當(dāng)一條消息,多次處理仍不能成功時(shí)堂氯,處理次數(shù)到達(dá)我們?cè)O(shè)置的值時(shí)重抖,我們就丟棄該消息,但需要記錄詳細(xì)的日志祖灰。
消息監(jiān)聽內(nèi)的異常處理有兩種方式:
1.內(nèi)部catch后直接處理钟沛,然后使用channel對(duì)消息進(jìn)行確認(rèn)
2.配置RepublishMessageRecoverer將處理異常的消息發(fā)送到指定隊(duì)列專門處理或記錄。監(jiān)聽的方法內(nèi)拋出異常貌似沒有太大用處局扶。因?yàn)閽伋霎惓>退闶侵卦囈卜浅S锌赡軙?huì)繼續(xù)出現(xiàn)異常恨统,當(dāng)重試次數(shù)完了之后消息就只有重啟應(yīng)用才能接收到了叁扫,很有可能導(dǎo)致消息消費(fèi)不及時(shí)。當(dāng)然可以配置RepublishMessageRecoverer來解決畜埋,但是萬一RepublishMessageRecoverer發(fā)送失敗了呢莫绣。。那就可能造成消息消費(fèi)不及時(shí)了悠鞍。所以即使需要將處理出現(xiàn)異常的消息統(tǒng)一放到另外隊(duì)列去處理对室,個(gè)人建議兩種方式:
①catch異常后,手動(dòng)發(fā)送到指定隊(duì)列咖祭,然后使用channel給rabbitmq確認(rèn)消息已消費(fèi)
②給Queue綁定死信隊(duì)列掩宜,使用nack(requque為false)確認(rèn)消息消費(fèi)失敗
如果想學(xué)習(xí)Java工程化、高性能及分布式么翰、深入淺出牺汤。微服務(wù)、Spring浩嫌,MyBatis檐迟,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135,群里有阿里大牛直播講解技術(shù)码耐,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家追迟。