-
項目場景:
推送通知給第三方,并得到第三方的feedback徙歼,json交互
-
問題:
推送過程遇到網(wǎng)絡異常碎乃?第三方異常姊扔?數(shù)據(jù)格式錯誤?等等問題怎么辦
【這就是重試機制了荠锭,即什么時候觸發(fā)+觸發(fā)后做什么】
-
希望達成的目的:
1.網(wǎng)絡異常旱眯,代碼異常,對方響應碼為失敗等無關業(yè)務數(shù)據(jù)的推送失敗证九,重試發(fā)送消息删豺,上限為3次
2.數(shù)據(jù)異常,針對消息里數(shù)據(jù)解析后對方給回失敗響應愧怜,不重發(fā)呀页,記錄發(fā)送失敗
-
實現(xiàn)方式:
1,使用activemq拥坛,mq的好處見參見文章activeMQ了解一下(一)蓬蝶。且其有重發(fā)機制尘分,剛好適合本場景(配置mq)
2,每次重發(fā)丸氛,都記錄次數(shù)培愁;視情況記錄發(fā)送結果為失敗/成功(數(shù)據(jù)庫記錄)
搭建mq見文章activeMQ了解一下(二),本文只說如何配置重發(fā)機制
【第一步缓窜,配置xml】
spring-activemq.xml如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mongo="http://www.springframework.org/schema/data/mongo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<context:component-scan base-package="com.latech"/>
<!-- 重發(fā)機制 -->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間 -->
<property name="useExponentialBackOff" value="true"></property>
<!--重發(fā)次數(shù),默認為6次 這里設置為2次 -->
<property name="maximumRedeliveries" value="2"></property>
<!--重發(fā)時間間隔,默認為5秒定续,設置為1秒 -->
<property name="initialRedeliveryDelay" value="1000"></property>
<!--第一次失敗后重新發(fā)送之前等待1秒,第二次失敗再等待1 * 2秒,這里的2就是value -->
<property name="backOffMultiplier" value="2"></property>
<!--最大傳送延遲,最大重發(fā)時間間隔時,以后每次重連時間間隔都為最大重連時間間隔禾锤。 -->
<property name="maximumRedeliveryDelay" value="10000"></property>
</bean>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!--使用緩存可以提升效率-->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- 配置JMS模板(Queue)私股,Spring提供的JMS工具類,它發(fā)送恩掷、接收消息倡鲸。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- 定義推送中獎消息隊列(Queue) -->
<bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="awardMsgDestinationQueue"/>
</bean>
<!-- 配置監(jiān)聽者(Queue) -->
<bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />
<!-- 配置多個消息監(jiān)聽容器,配置連接工廠黄娘,監(jiān)聽的目標是defaultDestinationQueue峭状,監(jiān)聽器是上面定義的監(jiān)聽器 -->
<bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="awardMsgDestinationQueue" />
<property name="messageListener" ref="awardMsgQueueListener" />
<property name="sessionTransacted" value="true"/>
</bean>
</beans>
敲重點:
- 要配置重發(fā)機制
- 監(jiān)聽器要開啟session事務,見配置最后一行
【第二步逼争,代碼里觸發(fā)】
public class AwardMsgQueueListener implements MessageListener{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private IOrderService orderService;
@Override
@SuppressWarnings("unchecked")
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
Integer sendTimes = 0;
String msg="",data = "",notifyUrl="",channel="";
try {
msg = tm.getText();
} catch (JMSException e1) {
logger.error("從消息隊列獲取消息出現(xiàn)異常宁炫,請檢查");
e1.printStackTrace();
return;
}
logger.info("接收到的消息為:"+msg);
try{
Map<String, Map<String, Object>> msgMap = JSONObject.parseObject(msg, new TypeReference<Map<String,Map<String,Object>>>(){});
notifyUrl = msgMap.keySet().iterator().next();
data = JSONObject.toJSONString(msgMap.get(notifyUrl));
// notifyUrl = "http://127.0.0.1:10003/order/receiveMsg.json";
Map<String, Object> dataMap = JSONObject.parseObject(data,new TypeReference<Map<String,Object>>(){});
channel = (String)dataMap.get("channel");
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpPost httpPost= new HttpPost(notifyUrl);
httpPost.setHeader("Content-type", "application/json");
httpPost.setEntity(new StringEntity(data));
CloseableHttpResponse response = httpClient.execute(httpPost);
if(200 !=response.getStatusLine().getStatusCode()){
//請求失敗時,記錄推送次數(shù),狀態(tài)仍為推送中氮凝,不提交事務
logger.error("推送失敗,statusCode為:"+response.getStatusLine().getStatusCode());
this.updateOrderAfterFail(data);
throw new RuntimeException();
}else{
HttpEntity responseEntity = response.getEntity();
if(responseEntity == null || responseEntity.getContent()==null){
logger.error("推送商戶中獎信息成功望忆,但返回內容為空罩阵,url:"+notifyUrl);
this.updateOrderAfterFail(data);
throw new RuntimeException("推送商戶中獎信息成功,但返回內容為空启摄,url:"+notifyUrl);
}else{
StringBuilder entityStringBuilder = new StringBuilder();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(responseEntity.getContent(),"UTF-8"), 8 * 1024);
String line = null;
while ((line = bufferedReader.readLine()) != null) {
entityStringBuilder.append(line);
}
String result = entityStringBuilder.toString();
Map<String,Object> resultMap = JSONObject.parseObject(result,new TypeReference<Map<String,Object>>(){});
if(!resultMap.containsKey("data") || !resultMap.containsKey("security") || !resultMap.containsKey("response")){
logger.error("推送商戶中獎信息成功稿壁,但返回內容不合規(guī),響應數(shù)據(jù)中可能不包含“data,security,reponse”中的一個或多個");
this.updateOrderAfterFail(data);
throw new RuntimeException("推送商戶中獎信息成功歉备,但返回內容不合規(guī)傅是,響應數(shù)據(jù)中可能不包含“data,security,reponse”中的一個或多個");
}
if("11111".equals((String)resultMap.get("response"))){
logger.error("推送商戶中獎信息成功,但返回內容響應碼為11111");
this.updateOrderAfterFail(data);
throw new RuntimeException("推送商戶中獎信息成功蕾羊,但返回內容響應碼為11111");
}
List<Map<String,String>> orderList = (List<Map<String,String>>)resultMap.get("data");
for(Map<String,String> order : orderList){
List<OrderInfo> orderResult = orderService.getByOrderNumAndChannel(order.get("orderNum"),channel);
String serialNumber = orderResult.get(0).getSerialNumber();
OrderInfoVo existOrder = orderService.queryBySerialNumber(serialNumber);
OrderInfo orderInfo = new OrderInfo();
orderInfo.setSerialNumber(serialNumber);
orderInfo.setSendAwardTimes(existOrder.getSendAwardTimes()+1);
orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_SUCCESS.getCode());
if("111111".equals(order.get("code"))){
orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_FAIL.getCode());
}
orderService.updateByOrderInfo(orderInfo);
}
}
}
} catch (Exception e){
logger.error("推送消息異常捕捉:"+e.getMessage());
e.printStackTrace();
this.updateOrderAfterFail(data);
throw new RuntimeException();//拋出此異常喧笔,觸發(fā)重發(fā)機制
}
}
敲重點:
在需要重發(fā)時,拋出RuntimeException異常龟再,會自動觸發(fā)重發(fā)機制
由于還涉及到其他可能的異常书闸,所以整體代碼try..catch..最后統(tǒng)一throw
不過呢,現(xiàn)在還存在一個問題利凑,配置里設置了重發(fā)次數(shù)為2浆劲,即包括初次發(fā)嫌术,一共發(fā)三次。但是最后數(shù)據(jù)庫顯示是最后發(fā)了6次的牌借,6次是mq默認的次數(shù)度气,說明那個配置沒生效,暫時還未解決這個問題
另外膨报,消息隊列里的消息這里是同步發(fā)送的磷籍,即按順序一條條的來。前一條發(fā)送失敗并重發(fā)時候丙躏,后一條是等待的择示。也可以配置異步發(fā)送,這里暫不做研究
PS:這里用到的消費者監(jiān)聽器時最簡單基礎的晒旅,還有一種SessionAwareMessageListener栅盲,重發(fā)機制的配置會略有不同,待嘗試