在使用Message Queue的過程中土居,總會由于種種原因而導(dǎo)致消息失敗。一個(gè)經(jīng)典的場景是一個(gè)生成者向Queue中發(fā)消息嬉探,里面包含了一組郵件地址和郵件內(nèi)容擦耀。而消費(fèi)者從Queue中將消息一條條讀出來,向指定郵件地址發(fā)送郵件涩堤。消費(fèi)者在發(fā)送消息的過程中由于種種原因會導(dǎo)致失敗眷蜓,比如網(wǎng)絡(luò)超時(shí)、當(dāng)前郵件服務(wù)器不可用等胎围。這樣我們就希望建立一種機(jī)制吁系,對于未發(fā)送成功的郵件再重新發(fā)送,也就是重新處理白魂。重新處理超過一定次數(shù)還不成功汽纤,就放棄對該消息的處理,記錄下來碧聪,繼續(xù)對剩余消息進(jìn)行處理冒版。
ActiveMQ為我們實(shí)現(xiàn)了這一功能液茎,叫做ReDelivery(重新投遞)逞姿。當(dāng)消費(fèi)者在處理消息時(shí)有異常發(fā)生辞嗡,會將消息重新放回Queue里,進(jìn)行下一次處理滞造。當(dāng)超過重試次數(shù)時(shí)续室,會給broker發(fā)送一個(gè)"Poison ack",這個(gè)消息被認(rèn)為是a poison pill(毒丸)谒养,這時(shí)broker會將這個(gè)消息發(fā)送到DLQ挺狰。
在以下四種情況中,ActiveMQ消息會被重發(fā)給客戶端/消費(fèi)者:
- 在一個(gè)事務(wù)session中买窟,并且調(diào)用了session.rollback()方法丰泊。
- 在一個(gè)事務(wù)session中,session.commit()之前調(diào)用了commit.close()始绍。
- 在session中使用CLIENT_ACKNOWLEDGE簽收模式瞳购,并且調(diào)用了session.recover()方法。
- 在session中使用AUTO_ACKNOWLEDGE簽收模式亏推,在異步(messageListener)消費(fèi)消息情況下学赛,如果onMessage方法異常且沒有被catch,此消息會被redelivery吞杭。
缺省情況下:持久消息過期盏浇,會被送到DLQ,非持久消息不會送到DLQ(不會redelivery)芽狗。
可以在connectionFactory中注入自定義的redeliveryPolicy來改變?nèi)笔?shù):
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次嘗試重新發(fā)送失敗后,增長這個(gè)等待時(shí)間-->
<property name="useExponentialBackOff" value="true"></property>
<!--重發(fā)次數(shù),默認(rèn)為6次-->
<property name="maximumRedeliveries" value="5"></property>
<!--重發(fā)時(shí)間間隔,默認(rèn)為1秒-->
<property name="initialRedeliveryDelay" value="1000"></property>
<!--第一次失敗后重新發(fā)送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value-->
<property name="backOffMultiplier" value="2"></property>
<!--最大傳送延遲绢掰,只在useExponentialBackOff為true時(shí)有效,當(dāng)重連間隔大于最大重連間隔時(shí)童擎,以后每次重連間隔都為最大重連間隔曼月。-->
<property name="maximumRedeliveryDelay" value="1000"></property>
</bean>
<!-- 在ConnectionFactory中應(yīng)用這個(gè)Policy。 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://188.166.236.173:61616"/>
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
<!-- <property name="useAsyncSend" value="true"/> 默認(rèn)就是異步發(fā)送-->
</bean>
在ActiveMQ 服務(wù)端的conf/activemq.xmlzhong的broker節(jié)點(diǎn)下添加:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
<!-- 如果不想將過期消息放到DLQ中
<sharedDeadLetterStrategy processExpired="false" />
-->
<!-- 如果想將非持久消息放入DLQ
<sharedDeadLetterStrategy processNonPersistent="true" />
-->
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
測試會重發(fā)消息(redelivery)的四種方法:
在一個(gè)事務(wù)session中柔昼,并且調(diào)用了session.rollback():
<bean id="jmsQueueContainerForDLQ" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListenerDLQ" />
<!-- 如果支持事物的話哑芹,在接收消息后rollback會重發(fā)消息,進(jìn)入死信隊(duì)列捕透,默認(rèn)為false -->
<property name="sessionTransacted" value="true" />
</bean>
public class ConsumerMessageListenerDLQ implements SessionAwareMessageListener<TextMessage> {
public void onMessage(TextMessage message, Session session) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
if ("i want to redelivery".equals(text)){
throw new JMSException("process failed to test redelivery and DLQ");
}
} catch (JMSException e) {
System.out.println("there is JMS exception: " + e.getMessage() );
//throw JmsUtils.convertJmsAccessException(e);
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
在一個(gè)事務(wù)session中聪姿,session.commit()之前調(diào)用了commit.close():
<bean id="jmsQueueContainerForDLQ" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListenerDLQ" />
<!-- 如果支持事物的話,在接收消息后rollback會重發(fā)消息乙嘀,進(jìn)入死信隊(duì)列末购,默認(rèn)為false -->
<property name="sessionTransacted" value="true" />
</bean>
public class ConsumerMessageListenerDLQ implements SessionAwareMessageListener<TextMessage> {
public void onMessage(TextMessage message, Session session) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
if ("i want to redelivery".equals(text)){
throw new JMSException("process failed to test redelivery and DLQ");
}
} catch (JMSException e) {
System.out.println("there is JMS exception: " + e.getMessage() );
//throw JmsUtils.convertJmsAccessException(e);
try {
session.close();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
在session中使用CLIENT_ACKNOWLEDGE簽收模式,并且調(diào)用了session.recover():
<bean id="jmsQueueContainerForDLQ" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListenerDLQ" />
<!-- 自動應(yīng)答模式消息不會重發(fā)虎谢,進(jìn)入死信隊(duì)列 -->
<property name="sessionAcknowledgeMode" value="2"/>
</bean>
public class ConsumerMessageListenerDLQ implements SessionAwareMessageListener<TextMessage> {
public void onMessage(TextMessage message, Session session) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
if ("i want to redelivery".equals(text)){
throw new JMSException("process failed to test redelivery and DLQ");
}
} catch (JMSException e) {
System.out.println("there is JMS exception: " + e.getMessage() );
//throw JmsUtils.convertJmsAccessException(e);
try {
session.recover();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
在session中使用AUTO_ACKNOWLEDGE簽收模式盟榴,異步Listener的onMessage()異常未被捕捉:
public class Listener implements MessageListener {
public void onMessage(Message message) {
int i = 8/0;//會導(dǎo)致redelivery
try {
if(message instanceof ActiveMQTextMessage){
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
System.out.println("收到的消息:" + textMessage.getText()); }
} catch (Exception e) {
e.printStackTrace();
}
}
}
持久消息過期,會被送到DLQ:
<!-- 定義JmsTemplate的Queue類型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個(gè)connectionFactory對應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<property name="messageConverter" ref="messageConverter"></property>
<!-- 非pub/sub模型(發(fā)布/訂閱)婴噩,即隊(duì)列模式 -->
<property name="pubSubDomain" value="false" />
<!-- 發(fā)送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
<property name="deliveryMode" value="2" />
<!-- 2秒后過期擎场,這個(gè)對點(diǎn)對點(diǎn)模式有效 -->
<property name="timeToLive" value="2000" />
</bean>
Junit測試:
@Test
public void testDLQ() throws Exception{
//jpaUserService.findOne("3");
for(int i = 1;i<=10;i++){
queueProducer.sendMessages("我是第"+i+"個(gè)");
}
Thread.sleep(50000);
System.out.print("全部執(zhí)行完畢!!!");
}