activeMQ 了解一下(三)——重發(fā)機制+項目應用

  • 項目場景:

推送通知給第三方,并得到第三方的feedback徙歼,json交互

  • 問題:

推送過程遇到網(wǎng)絡異常碎乃?第三方異常姊扔?數(shù)據(jù)格式錯誤?等等問題怎么辦

【這就是重試機制了荠锭,即什么時候觸發(fā)+觸發(fā)后做什么】

  • 希望達成的目的:

1.網(wǎng)絡異常旱眯,代碼異常,對方響應碼為失敗等無關業(yè)務數(shù)據(jù)的推送失敗证九,重試發(fā)送消息删豺,上限為3次
2.數(shù)據(jù)異常,針對消息里數(shù)據(jù)解析后對方給回失敗響應愧怜,不重發(fā)呀页,記錄發(fā)送失敗

  • 實現(xiàn)方式:

1,使用activemq拥坛,mq的好處見參見文章activeMQ了解一下(一)蓬蝶。且其有重發(fā)機制尘分,剛好適合本場景(配置mq)
2,每次重發(fā)丸氛,都記錄次數(shù)培愁;視情況記錄發(fā)送結果為失敗/成功(數(shù)據(jù)庫記錄)

搭建mq見文章activeMQ了解一下(二),本文只說如何配置重發(fā)機制

【第一步缓窜,配置xml】

spring-activemq.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:mongo="http://www.springframework.org/schema/data/mongo"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans    
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd    
           http://www.springframework.org/schema/aop     
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd    
           http://www.springframework.org/schema/tx    
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd    
           http://www.springframework.org/schema/context    
           http://www.springframework.org/schema/context/spring-context-3.2.xsd">  

    <context:component-scan base-package="com.latech"/>
    <!-- 重發(fā)機制  -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間 -->
        <property name="useExponentialBackOff" value="true"></property>
        <!--重發(fā)次數(shù),默認為6次   這里設置為2次 -->
        <property name="maximumRedeliveries" value="2"></property>
        <!--重發(fā)時間間隔,默認為5秒定续,設置為1秒 -->
        <property name="initialRedeliveryDelay" value="1000"></property>
        <!--第一次失敗后重新發(fā)送之前等待1秒,第二次失敗再等待1 * 2秒,這里的2就是value -->
        <property name="backOffMultiplier" value="2"></property>
        <!--最大傳送延遲,最大重發(fā)時間間隔時,以后每次重連時間間隔都為最大重連時間間隔禾锤。 -->
        <property name="maximumRedeliveryDelay" value="10000"></property>
    </bean>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
                <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用緩存可以提升效率-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
    
    <!-- 配置JMS模板(Queue)私股,Spring提供的JMS工具類,它發(fā)送恩掷、接收消息倡鲸。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>    
    <!-- 定義推送中獎消息隊列(Queue) -->
    <bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="awardMsgDestinationQueue"/>
    </bean>  
    <!-- 配置監(jiān)聽者(Queue) -->
    <bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />
    <!-- 配置多個消息監(jiān)聽容器,配置連接工廠黄娘,監(jiān)聽的目標是defaultDestinationQueue峭状,監(jiān)聽器是上面定義的監(jiān)聽器 -->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="awardMsgDestinationQueue" />
        <property name="messageListener" ref="awardMsgQueueListener" />
        <property name="sessionTransacted" value="true"/>
    </bean>
</beans>

敲重點:

  • 要配置重發(fā)機制
  • 監(jiān)聽器要開啟session事務,見配置最后一行

【第二步逼争,代碼里觸發(fā)】

public class AwardMsgQueueListener implements MessageListener{
    
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Autowired
    private IOrderService orderService;
    
    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        Integer sendTimes = 0;
        String msg="",data = "",notifyUrl="",channel="";
        try {
            msg = tm.getText();
        } catch (JMSException e1) {
            logger.error("從消息隊列獲取消息出現(xiàn)異常宁炫,請檢查");
            e1.printStackTrace();
            return;
        }
        logger.info("接收到的消息為:"+msg);
        try{
            Map<String, Map<String, Object>> msgMap = JSONObject.parseObject(msg, new TypeReference<Map<String,Map<String,Object>>>(){});
            notifyUrl = msgMap.keySet().iterator().next();
            data = JSONObject.toJSONString(msgMap.get(notifyUrl));
//          notifyUrl = "http://127.0.0.1:10003/order/receiveMsg.json";
            Map<String, Object> dataMap = JSONObject.parseObject(data,new TypeReference<Map<String,Object>>(){});
            channel = (String)dataMap.get("channel");
            CloseableHttpClient httpClient = HttpClients.createDefault();
            HttpPost httpPost= new HttpPost(notifyUrl);
            httpPost.setHeader("Content-type", "application/json");
            httpPost.setEntity(new StringEntity(data));
            CloseableHttpResponse response = httpClient.execute(httpPost);
            if(200 !=response.getStatusLine().getStatusCode()){
                //請求失敗時,記錄推送次數(shù),狀態(tài)仍為推送中氮凝,不提交事務
                logger.error("推送失敗,statusCode為:"+response.getStatusLine().getStatusCode());
                this.updateOrderAfterFail(data);
                throw new RuntimeException();
            }else{
                HttpEntity responseEntity = response.getEntity();
                if(responseEntity == null || responseEntity.getContent()==null){
                    logger.error("推送商戶中獎信息成功望忆,但返回內容為空罩阵,url:"+notifyUrl);
                    this.updateOrderAfterFail(data);
                    throw new RuntimeException("推送商戶中獎信息成功,但返回內容為空启摄,url:"+notifyUrl);
                }else{
                    StringBuilder entityStringBuilder = new StringBuilder();  
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(responseEntity.getContent(),"UTF-8"), 8 * 1024);  
                    String line = null;  
                    while ((line = bufferedReader.readLine()) != null) {  
                        entityStringBuilder.append(line);  
                    } 
                    String result = entityStringBuilder.toString();
                    Map<String,Object> resultMap = JSONObject.parseObject(result,new TypeReference<Map<String,Object>>(){});
                    if(!resultMap.containsKey("data") || !resultMap.containsKey("security") || !resultMap.containsKey("response")){
                        logger.error("推送商戶中獎信息成功稿壁,但返回內容不合規(guī),響應數(shù)據(jù)中可能不包含“data,security,reponse”中的一個或多個");
                        this.updateOrderAfterFail(data);
                        throw new RuntimeException("推送商戶中獎信息成功歉备,但返回內容不合規(guī)傅是,響應數(shù)據(jù)中可能不包含“data,security,reponse”中的一個或多個");
                    }
                    if("11111".equals((String)resultMap.get("response"))){
                        logger.error("推送商戶中獎信息成功,但返回內容響應碼為11111");
                        this.updateOrderAfterFail(data);
                        throw new RuntimeException("推送商戶中獎信息成功蕾羊,但返回內容響應碼為11111");
                    }
                    List<Map<String,String>> orderList = (List<Map<String,String>>)resultMap.get("data");
                    for(Map<String,String> order : orderList){
                        List<OrderInfo> orderResult = orderService.getByOrderNumAndChannel(order.get("orderNum"),channel);
                        String serialNumber = orderResult.get(0).getSerialNumber();
                        OrderInfoVo existOrder = orderService.queryBySerialNumber(serialNumber);
                        OrderInfo orderInfo = new OrderInfo();
                        orderInfo.setSerialNumber(serialNumber);
                        orderInfo.setSendAwardTimes(existOrder.getSendAwardTimes()+1);
                        orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_SUCCESS.getCode());
                        if("111111".equals(order.get("code"))){
                            orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_FAIL.getCode());
                        }
                        orderService.updateByOrderInfo(orderInfo);
                    }
                }
            }
        } catch (Exception e){
            logger.error("推送消息異常捕捉:"+e.getMessage());
            e.printStackTrace();
            this.updateOrderAfterFail(data);
            throw new RuntimeException();//拋出此異常喧笔,觸發(fā)重發(fā)機制
        }
    }

敲重點:

在需要重發(fā)時,拋出RuntimeException異常龟再,會自動觸發(fā)重發(fā)機制
由于還涉及到其他可能的異常书闸,所以整體代碼try..catch..最后統(tǒng)一throw

不過呢,現(xiàn)在還存在一個問題利凑,配置里設置了重發(fā)次數(shù)為2浆劲,即包括初次發(fā)嫌术,一共發(fā)三次。但是最后數(shù)據(jù)庫顯示是最后發(fā)了6次的牌借,6次是mq默認的次數(shù)度气,說明那個配置沒生效,暫時還未解決這個問題

另外膨报,消息隊列里的消息這里是同步發(fā)送的磷籍,即按順序一條條的來。前一條發(fā)送失敗并重發(fā)時候丙躏,后一條是等待的择示。也可以配置異步發(fā)送,這里暫不做研究

PS:這里用到的消費者監(jiān)聽器時最簡單基礎的晒旅,還有一種SessionAwareMessageListener栅盲,重發(fā)機制的配置會略有不同,待嘗試

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末废恋,一起剝皮案震驚了整個濱河市谈秫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鱼鼓,老刑警劉巖拟烫,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異迄本,居然都是意外死亡硕淑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門嘉赎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來置媳,“玉大人,你說我怎么就攤上這事公条∧茨遥” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵靶橱,是天一觀的道長寥袭。 經(jīng)常有香客問我,道長关霸,這世上最難降的妖魔是什么传黄? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮谒拴,結果婚禮上尝江,老公的妹妹穿的比我還像新娘。我一直安慰自己英上,他們只是感情好炭序,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布啤覆。 她就那樣靜靜地躺著,像睡著了一般惭聂。 火紅的嫁衣襯著肌膚如雪窗声。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天辜纲,我揣著相機與錄音笨觅,去河邊找鬼。 笑死耕腾,一個胖子當著我的面吹牛见剩,可吹牛的內容都是我干的。 我是一名探鬼主播扫俺,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼苍苞,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了狼纬?” 一聲冷哼從身側響起羹呵,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎疗琉,沒想到半個月后冈欢,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡盈简,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年凑耻,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片柠贤。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡拳话,死狀恐怖,靈堂內的尸體忽然破棺而出种吸,到底是詐尸還是另有隱情,我是刑警寧澤呀非,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布坚俗,位于F島的核電站,受9級特大地震影響岸裙,放射性物質發(fā)生泄漏猖败。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一降允、第九天 我趴在偏房一處隱蔽的房頂上張望恩闻。 院中可真熱鬧,春花似錦剧董、人聲如沸幢尚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽尉剩。三九已至真慢,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間理茎,已是汗流浹背黑界。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留皂林,地道東北人朗鸠。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像础倍,于是被迫代替她去往敵國和親烛占。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355