探究SpringJMS+ActiveMQ消息阻塞之謎

0x00 背景介紹

最近遇到了一個(gè)消息隊(duì)列阻塞的問題礁阁,查了好幾天勾习,才終于把這個(gè)問題解決。
首先交代下我們的環(huán)境:支付業(yè)務(wù)矾瘾,使用ActiveMQ配合Spring-jms收發(fā)消息女轿,Spring版本3.2.16.RELEASE,ActiveMQ客戶端版本5.11.1霜威,ActiveMQ服務(wù)端版本5.13.2谈喳。消息隊(duì)列消費(fèi)者使用Spring-jms的DefaultMessageListenerContainer作為消息監(jiān)聽容器,用org.springframework.jms.connection.CachingConnectionFactory對(duì)org.apache.activemq.ActiveMQConnectionFactory進(jìn)行了封裝戈泼,提供緩存功能婿禽。DefaultMessageListenerContainer的并發(fā)配置為1-10赏僧。
在較長的時(shí)間里,系統(tǒng)運(yùn)行總體上較為穩(wěn)定扭倾,但是期間發(fā)生過多次消息阻塞淀零。之前查到過由于端口占用問題導(dǎo)致消息阻塞(https://issues.apache.org/jira/browse/AMQ-6362),只能通過重啟解決膛壹,后來通過修改ActiveMQ client源碼請(qǐng)求超時(shí)時(shí)間解決了這一問題(此問題在ActiveMQ 5.14.5得到修復(fù))驾中。但是還有一種偶發(fā)的消息阻塞,時(shí)不常就會(huì)來那么一次模聋,讓人防不勝防肩民。因?yàn)檫@個(gè)問題頻發(fā),之前還用Python寫了一套監(jiān)控MQ隊(duì)列消息阻塞情況的代碼链方,通過微信企業(yè)號(hào)接口發(fā)送持痰。所以能大概知道每次阻塞的持續(xù)時(shí)間。這種阻塞發(fā)生概率較低祟蚀,而且每次可能阻塞的時(shí)間不定工窍,但大部分在30~40分鐘左右。
另外前酿,我使用了兩個(gè)DefaultMessageListenerContainer實(shí)例患雏,分別對(duì)應(yīng)兩個(gè)隊(duì)列消費(fèi)者。除業(yè)務(wù)處理監(jiān)聽外罢维,其他配置都一樣淹仑,但是每次阻塞都發(fā)生在其中一個(gè)固定隊(duì)列。這種情況一度讓我很困惑言津。直到最近再進(jìn)行代碼review攻人,跟蹤spring和activemq的源代碼時(shí),才發(fā)現(xiàn)一點(diǎn)頭緒悬槽。

0x01 相關(guān)概念簡介

先簡單介紹下幾個(gè)涉及的概念:
DefaultMessageListenerContainer (后文簡稱DMLC):SpringJMS中用于異步消息監(jiān)聽的管理類怀吻。主要原理簡單介紹下。首先通過內(nèi)部初始化建立一個(gè)taskExecutor(默認(rèn)實(shí)現(xiàn)是SimpleAsyncTaskExecutor)初婆,用于執(zhí)行消息監(jiān)聽任務(wù)蓬坡,任務(wù)通過DMLC內(nèi)部類AsyncMessageListenerInvoker表示。默認(rèn)情況下磅叛,SimpleAsyncTaskExecutor每次都會(huì)開啟一個(gè)新線程屑咳。AsyncMessageListenerInvoker是實(shí)現(xiàn)了Runnable接口的任務(wù)抽象,在其executeOngoingLoop方法中不斷調(diào)用MessageConsumer#receive(this.receiveTimeout)收取消息(見DMLC父類AbstractPollingMessageListenerContainer代碼)弊琴,這里默認(rèn)的receiveTimeout是1秒兆龙。在接收到消息后,將會(huì)根據(jù)選定DMLC屬性maxConcurrentConsumers(jms:listener-container標(biāo)簽中concurrency的最大值)和當(dāng)前空閑消費(fèi)線程數(shù)量敲董,對(duì)消費(fèi)線程數(shù)量紫皇,也就是同時(shí)運(yùn)行的AsyncMessageListenerInvoker進(jìn)行擴(kuò)容慰安,即由taskExecutor新建線程(通過DefaultMessageListenerContainer調(diào)用持有的ConnectionFactory方法創(chuàng)建session和consumer的方法獲取MessageConsumer對(duì)象)接收消息。每個(gè)AsyncMessageListenerInvoker一次最大執(zhí)行的任務(wù)數(shù)量聪铺,即MessageConsumer#receive(this.receiveTimeout)方法調(diào)用次數(shù)可以通過DMLC的maxMessagesPerTask屬性制定化焕,默認(rèn)為Integer.MIN_VALUE,當(dāng)指定為大于0的值時(shí)铃剔,在執(zhí)行完maxMessagesPerTask次消息收取后撒桨,將會(huì)銷毀當(dāng)前線程。
SingleConnectionFactory(后文簡稱SCF):一個(gè)JMS ConnectionFactory的適配器键兜,通過JDK Proxy代理Connection(參見SingleConnectionFactory$SharedConnectionInvocationHandler)凤类,總是獲取同一個(gè)Connection,且忽略close蝶押。與消息監(jiān)聽容器配合使用的作用是踱蠢,在多個(gè)容器間共享一個(gè)JMS Connection。
CachingConnectionFactory (后文簡稱CCF):在SingleConnectionFactory的基礎(chǔ)上棋电,添加了Session稀轨、MessageConsumer驾霜、MessageProducer的緩存功能,與SingleConnectionFactory類似寸癌,使用JDK Proxy代理了Session榆浓,具體可以查看CachingConnectionFactory$CachedSessionInvocationHandler這個(gè)內(nèi)部類于未,它包裝了原生的Session,主要提供MessageConsumer和MessageProducer的緩存功能陡鹃。緩存session數(shù)量由參數(shù)sessionCacheSize決定烘浦,默認(rèn)為1。

0x02 探究MQ阻塞之謎

通過閱讀源碼和javadoc了解到萍鲸,在DMLC的注釋文檔中闷叉,寫明了DefaultMessageListenerContainer不能與CachingConnectionFactory配合進(jìn)行動(dòng)態(tài)擴(kuò)容。這是因?yàn)镃achingConnectionFactory更傾向于讓監(jiān)聽容器自己處理緩存脊阴。在同時(shí)停止和重啟一個(gè)監(jiān)聽容器時(shí)握侧,只會(huì)在獨(dú)立的且被容器本身緩存的連接上起作用,而不會(huì)在外部緩存的連接上起作用嘿期。

Note: Don't use Spring's org.springframework.jms.connection.CachingConnectionFactory in combination with dynamic scaling. Ideally, don't use it with a message listener container at all, since it is generally preferable to let the listener container itself handle appropriate caching within its lifecycle. Also, stopping and restarting a listener container will only work with an independent, locally cached Connection - not with an externally cached one.

StackOverflow中Spring-jms的開發(fā)主管也說明了為什么DMLC不能使用CCF品擎,簡而言之,CCF由于緩存了Session和MessageConsumer备徐,在DMLC對(duì)消費(fèi)線程進(jìn)行縮容時(shí)萄传,雖然消費(fèi)線程銷毀了,但是消費(fèi)者在蜜猾,Broker還會(huì)一直給MessageConsumer發(fā)消息秀菱。
其實(shí)查到這西设,就完全夠用了,把CCF換掉答朋,程序應(yīng)該就可以跑了贷揽。但那顯然不符合咱們對(duì)技術(shù)刨根問題的風(fēng)格,于是咱們還是探究一下MQ為什么會(huì)阻塞的實(shí)際原因∶瓮耄現(xiàn)在雖然是知道DMLC不能和CCF一起使用禽绪,但因?yàn)槲覀兊腄MLC并未指定maxMessagesPerTask參數(shù),并不存在縮容的情況洪规,所以實(shí)際原因還需要深入調(diào)查印屁。
可能有小伙伴還不太了解ActiveMQ客戶端的消費(fèi)邏輯,這里我們插播下ActiveMQ消費(fèi)邏輯斩例。

0x02.1 ActiveMQ消費(fèi)邏輯

ActiveMQ在消費(fèi)消息時(shí)雄人,并非直接從Broker進(jìn)行網(wǎng)絡(luò)通信,取消息進(jìn)行消費(fèi)念赶。 我們通過FailoverTransport+TcpTransport舉例础钠,也就是failover:(tcp:...)形式的brokerUrl。

activemq-client的消費(fèi)過程大致可以總結(jié)為叉谜,一個(gè)線程和broker進(jìn)行通信旗吁,取得消息,放到一個(gè)緩存中停局,而另一個(gè)線程則不斷地嘗試從這個(gè)緩存中獲取消息很钓。
為了說明ActiveMQ的消費(fèi)邏輯,我從一個(gè)完整的線程堆棧中董栽,拉出了以下兩條線程堆棧码倦。

負(fù)責(zé)和broker通信(tcp長連接)取消息的線程:

"ActiveMQ Transport: tcp://jms.xxx/192.168.30.xx:61618@54095" prio=10 tid=0x00007f2bf0001000 nid=0x5f94 runnable [0x00007f2e470ef000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Thread.java:745)

使用spring-jms DMLC消費(fèi)消息的線程:

"org.springframework.jms.listener.DefaultMessageListenerContainer#0-4" prio=10 tid=0x00007f2b4c076800 nid=0xb442 in Object.wait() [0x00007f2e3f3f2000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007ff81a8e8> (a java.lang.Object)
    at org.apache.activemq.**FifoMessageDispatchChannel**.dequeue(FifoMessageDispatchChannel.java:74)
    - locked <0x00000007ff81a8e8> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:482)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:635)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:243)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)
    at java.lang.Thread.run(Thread.java:745)

這里,這個(gè)org.apache.activemq.FifoMessageDispatchChannel(由ActiveMQMessageConsumer持有锭碳,屬性名稱unconsumedMessages)就是用于緩存消息的類袁稽,其內(nèi)部持有一個(gè)LinkedList<MessageDispatch>,這個(gè)MessageDispatch就是從broker分發(fā)過來還未消費(fèi)的消息封裝工禾。

0x02.2 基于源碼运提,大膽猜測(cè)

拉回整體,繼續(xù)探究MQ阻塞之謎闻葵。
既然MQ的消費(fèi)分兩步民泵,那應(yīng)該就是在MessageConsumer收取到消息到本地后,消費(fèi)線程沒有及時(shí)調(diào)用MessageConsumer#receive()方法槽畔,導(dǎo)致消息一直在本地積壓栈妆。但是broker不知道啊,只要這個(gè)MessageConsumer還在,它就會(huì)一直給它發(fā)消息鳞尔。除非它不在了嬉橙,才會(huì)把已經(jīng)分發(fā)但未消費(fèi)的消息分配給其他的MessageConsumer。
我按照線上環(huán)境寥假,在本機(jī)使用單元測(cè)試進(jìn)行模擬市框。使用反射獲取ActiveMQMessageConsumer的消息緩存unconsumedMessages,每5秒打印一次所有阻塞的消息(包括consumerId, sessionId)糕韧。并在log4j2配置中枫振,將org.springframework.jms和org.apache.activemq的打印級(jí)別調(diào)整為DEBUG。
果然發(fā)生了阻塞萤彩,通過在所有日志中查找阻塞消息的consumerId粪滤,(樣例:session: ActiveMQSession {id=ID:mypc-58436-1505458977674-3:1:2,started=true}, consumer:ActiveMQMessageConsumer { value=ID:mypc-58436-1505458977674-3:1:2:1, started=true },)可以發(fā)現(xiàn),在15:03:06到15:05:04之間雀扶,沒有任何線程嘗試從該阻塞消息對(duì)應(yīng)的session杖小。

image.png

修改DMLC的并發(fā)配置(listener-container的concurrency),從1-10變?yōu)楣潭ㄖ?0愚墓,復(fù)現(xiàn)予权,說明和DMLC的動(dòng)態(tài)改變消費(fèi)線程無關(guān)。
問題查到這里转绷,能斷定的是伟件,本文MQ的阻塞肯定和消費(fèi)線程沒有調(diào)用MessageConsumer#receive方法有關(guān)。但究竟為什么议经,是消費(fèi)線程獲取不到Session,還是獲取不到MessageConsumer谴返,還是獲取到了MessageConsumer不能調(diào)用receive方法煞肾?還不得而知。

順帶說一下ActiveMQSession的toString()格式:

"ActiveMQSession {id=" + connectionId + ":" + 全局唯一遞增序列號(hào) + ",started=" + started.get() + "}" // 參見 ActiveMQSession,ActiveMQConnection,LongSequenceGenerator,SessionId代碼

對(duì)于zw_123-41988-1505188712169-1:1:13613嗓袱,zw_123-41988-1505188712169-1:1是connectionId籍救,而13613是指自JVM啟動(dòng)起來創(chuàng)建的ActiveMQSession序列號(hào)。

看了半天DMLC了渠抹,既然沒有什么線索了蝙昙,就暫時(shí)看下其他的。
CachingConnectionFactory中用于緩存Session的cachedSessions屬性是一個(gè)Map<Integer, LinkedList<Session>>梧却,根據(jù)session類型(0-SESSION_TRANSACTED奇颠,1-AUTO_ACKNOWLEDGE,2-CLIENT_ACKNOWLEDGE放航,3-DUPS_OK_ACKNOWLEDGE)作為key獲取一個(gè)Session List烈拒。這個(gè)sessionList的引用在創(chuàng)建Session代理(CachingConnectionFactory.CachedSessionInvocationHandler) 時(shí)被傳遞進(jìn)去,并作為實(shí)例變量sessionList引用,默認(rèn)情況下荆几,這個(gè)sessionList的size為1吓妆。而在Session代理實(shí)現(xiàn)CachingConnectionFactory.CachedSessionInvocationHandler中的cachedConsumers和cachedProducers負(fù)責(zé)緩存消費(fèi)者和生產(chǎn)者。

以下為CachingConnectionFactory中獲取Session和返還Session涉及Session緩存(sessionList)的處理代碼吨铸。

// CachingConnectionFactory.java
/**
* Checks for a cached Session for the given mode.
*/
protected Session getSession(Connection con, Integer mode) throws JMSException {
LinkedList<Session> sessionList;
synchronized (this.cachedSessions) {
  sessionList = this.cachedSessions.get(mode); // 獲取緩存session列表
  if (sessionList == null) {
  sessionList = new LinkedList<Session>();
  this.cachedSessions.put(mode, sessionList);
  }
}
Session session = null;
synchronized (sessionList) {
  if (!sessionList.isEmpty()) {
  session = sessionList.removeFirst(); // 如果不為空行拢,就取第一個(gè)session。默認(rèn)情況下诞吱,sessionCacheSize數(shù)量為1舟奠,在cachedSessions沒有緩存session時(shí),會(huì)創(chuàng)建session
  }
}
if (session != null) { // 從緩存中取得session
  if (logger.isTraceEnabled()) {
  logger.trace("Found cached JMS Session for mode " + mode + ": " +
    (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session));
  }
}
else { // 如果沒有緩存session狐胎,就創(chuàng)建session
  Session targetSession = createSession(con, mode); // con就是CachingConnectionFactory中的target
  if (logger.isDebugEnabled()) {
  logger.debug("Creating cached JMS Session for mode " + mode + ": " + targetSession);
  }
  session = getCachedSessionProxy(targetSession, sessionList); // 使用jdk方式代理session
}
return session;
}

// CachingConnectionFactory內(nèi)部類 CachedSessionInvocationHandler鸭栖,實(shí)現(xiàn)了InvocationHandler接口,負(fù)責(zé)Session代理
private class CachedSessionInvocationHandler implements InvocationHandler {
// ...
private void logicalClose(Session proxy) throws JMSException { // 邏輯關(guān)閉握巢,沒有實(shí)際關(guān)閉session晕鹊,而是放到sessionList緩存
// Preserve rollback-on-close semantics.
if (this.transactionOpen && this.target.getTransacted()) {
  this.transactionOpen = false;
  this.target.rollback();
}
// Physically close durable subscribers at time of Session close call.
for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
  Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
if (entry.getKey().subscription != null) {
  entry.getValue().close();
  it.remove();
}
}
// Allow for multiple close calls...
boolean returned = false;
synchronized (this.sessionList) {
  if (!this.sessionList.contains(proxy)) {
  this.sessionList.addLast(proxy);
  returned = true;
  }
}
if (returned && logger.isTraceEnabled()) {
  logger.trace("Returned cached Session: " + this.target);
}
}

// ...
}

DMLC每次想要獲得消息,就先得獲取Session暴浦。在使用CCF時(shí)溅话,CCF會(huì)首先判斷Session緩存里有沒有可用的Session,如果有就取出直接使用歌焦,如果沒有就創(chuàng)建一個(gè)新的Session飞几,并在Session使用完畢后嘗試將Session放到Session緩存中,如果Session緩存已經(jīng)滿了独撇,就執(zhí)行Session的物理關(guān)閉屑墨。

0x02.3 通過Eclipse MAT查看對(duì)象信息

基本邏輯差不多都了解了,那么我們查看下在阻塞發(fā)生時(shí)纷铣,Java堆中阻塞消息相關(guān)的MessageConsumer卵史、Session信息。(啥搜立?你還打了消息阻塞時(shí)候的堆信息以躯?--對(duì)呀~)
使用Eclipse MAT打開Java堆信息,使用OQL查詢消息阻塞的相關(guān)Session和Consumer:

SELECT c, c.session, c.unconsumedMessages.list.size FROM org.apache.activemq.ActiveMQMessageConsumer c WHERE (c.unconsumedMessages.list.size > 0)

查詢到ActiveMQMessageConsumer和ActiveMQSession后啄踊,進(jìn)行查看忧设。在查看相關(guān)的ActiveMQSession時(shí),發(fā)現(xiàn)了問題颠通。

image.png

如圖中所示址晕,消息阻塞的MessageConsumer相關(guān)的Session有兩個(gè)MessageConsumer。項(xiàng)目中使用兩個(gè)DMLC分別監(jiān)聽兩個(gè)隊(duì)列蒜哀,使用同一個(gè)CCF封裝ActiveMQConnectionFactory獲取連接和Session斩箫。這兩個(gè)消費(fèi)者很顯然就是由兩個(gè)DMLC調(diào)用同一個(gè)Session創(chuàng)建的吏砂。那么,是不是消費(fèi)線程雖然獲取了Session乘客,但一直在處理另一個(gè)MessageConsumer狐血,而忽略了被阻塞消息所在的MessageConsumer呢?易核?匈织?

0x02.4 鎖定問題-還是日志大法好

有了這個(gè)設(shè)想,就有了一點(diǎn)方向牡直,希望能從日志中找到些蛛絲馬跡缀匕。
正巧近期又有次阻塞,直接從線上查日志碰逸。功夫不負(fù)有心人乡小,通過查找消息阻塞相關(guān)的session {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true},從下面這段看似平淡無奇的日志里饵史,終于看到了一絲曙光满钟。

1203078 2017-09-18 06:09:53,996|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
1203079 2017-09-18 06:09:53,996|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating cached JMS Session for mode 0: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}
1203080 2017-09-18 06:09:53,996|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Created JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:zw_123-41988-1505188712169-1:1,clientId=ID:zw_123-41988-1505188712169-0:1,started=true}]
1203081 2017-09-18 06:09:53,998|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating cached JMS MessageConsumer for destination [queue://ConsumptionOrder.Has.Paied]: ActiveMQMessageConsumer { value=ID:zw_123-41988-1505188712169-1:1:13613:1, started=true }
... 省略...
1292825 2017-09-18 06:37:29,672|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Initiating transaction commit
1292826 2017-09-18 06:37:29,672|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}]
1292827 2017-09-18 06:37:29,672|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|ID:zw_123-41988-1505188712169-1:1:13613 Transaction Commit :null
1292828 2017-09-18 06:37:29,672|TRACE|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Returned cached Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}
1292829 2017-09-18 06:37:29,672|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
1292830 2017-09-18 06:37:29,672|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
1292831 2017-09-18 06:37:29,672|TRACE|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|**Found cached JMS Session for mode 0: ActiveMQSession** {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}
1292832 2017-09-18 06:37:29,672|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Created JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:zw_123-41988-1505188712169-1:1,clientId=ID:zw_123-41988-1505188712169-0:1,started=true}]
1292833 2017-09-18 06:37:29,673|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating cached JMS Session for mode 0: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13629,started=true}
1292834 2017-09-18 06:37:29,673|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Created JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13629,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:zw_123-41988-1505188712169-1:1,clientId=ID:zw_123-41988-1505188712169-0:1,started=true}]
1292835 2017-09-18 06:37:29,674|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Creating cached JMS MessageConsumer for destination [**queue://queue.B**]: ActiveMQMessageConsumer { value=ID:zw_123-41988-1505188712169-1:1:13613:2, started=true }
1292836 2017-09-18 06:37:29,675|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating cached JMS MessageConsumer for destination [**queue://queue.A**]: ActiveMQMessageConsumer { value=ID:zw_123-41988-1505188712169-1:1:13629:1, started=true }

對(duì)看似平淡無奇的日志的分析:
DefaultMessageListenerContainer#0 這個(gè)DMLC實(shí)例負(fù)責(zé)處理隊(duì)列A,DefaultMessageListenerContainer#1這個(gè)DMLC實(shí)例負(fù)責(zé)處理隊(duì)列B胳喷。每個(gè)DMLC實(shí)例在處理消息時(shí)湃番,會(huì)根據(jù)配置可伸縮地使用1或多條線程去處理任務(wù)(伸縮邏輯見DMLC,處理消息邏輯見其內(nèi)部類AsyncMessageListenerInvoker)吭露。此時(shí)有兩個(gè)線程#0-3負(fù)責(zé)處理隊(duì)列A吠撮,#1-1負(fù)責(zé)處理隊(duì)列B。但是由于使用了CachingConnectionFactory(CachingConnectionFactory實(shí)例只有一個(gè)讲竿,其實(shí)例變量cachedSessions負(fù)責(zé)保存創(chuàng)建的session泥兰,默認(rèn)大小為1),#0-3線程在結(jié)束本次Session使用后题禀,將Session放回了緩存中(1292828行)逾条。此時(shí)恰逢#1-1線程請(qǐng)求Session,一看緩存中有可用Session投剥,便取出使用(1292831行)。這個(gè)Session并非原生Session担孔,而是CachingConnectionFactory通過JDK Proxy進(jìn)行代理創(chuàng)建的Session代理(邏輯見內(nèi)部類CachedSessionInvocationHandler)江锨,在Session代理中,對(duì)MessageConsumer進(jìn)行了緩存糕篇。但是由于#1-1線程要處理的是隊(duì)列B啄育,不能使用#0-3之前創(chuàng)建的處理隊(duì)列A的MessageConsumer,所以就創(chuàng)建了一個(gè)新的MessageConsumer負(fù)責(zé)處理隊(duì)列B(1292835行)拌消。
此時(shí)挑豌,線程#1-1將調(diào)用新創(chuàng)建的MessageConsumer的receive方法接收消息。在spring-jms中DMLC中規(guī)定了receive的默認(rèn)超時(shí)時(shí)間為1秒(見DMLC父類AbstractPollingMessageListenerContainer.DEFAULT_RECEIVE_TIMEOUT),如果在1秒內(nèi)沒有接收到消息氓英,將返回Session到緩存侯勉,并繼續(xù)下一個(gè)循環(huán)。
注意铝阐,在接收消息的這一次循環(huán)中址貌,相關(guān)的這個(gè)Session已經(jīng)被該線程霸占,其他線程不能獲取該Session徘键,也就是說练对,該Session的另一個(gè)MessageConsumer中接收到的隊(duì)列A的消息將得不到處理!
然而正常情況下吹害,這種影響較小螟凭。在以下情況下才有可能出現(xiàn)問題:

  1. 在一次消息接收循環(huán)的末尾,線程將Session返回到緩存池(CachingConnectionFactory中的cachedSessions)時(shí)它呀,緩存池有空閑空間螺男。因?yàn)槿绻彺婵臻g滿了,程序?qū)?duì)Session做物理關(guān)閉钟些,Session創(chuàng)建的MessageConsumer和MessageProducer將一并關(guān)閉烟号,可能存在的未消費(fèi)消息也會(huì)得到釋放。
  2. 在1成功的條件下政恍,該線程在下一個(gè)消息接收循環(huán)的開始汪拥,又一次成功獲取該Session。因?yàn)橐坏┢渌€程獲取該Session篙耗,就將調(diào)用另一個(gè)隊(duì)列進(jìn)行消費(fèi)迫筑。這一步也是一個(gè)概率問題,雖然從緩存池取出操作使用removeFirst宗弯,返還操作使用addLast脯燃,即取、還操作分別操作首尾蒙保,但是由于sessionCacheSize默認(rèn)設(shè)置為1辕棚,由于Session的返還和取出操作極快,在并發(fā)不嚴(yán)重的情況下邓厕,往往就會(huì)取到同一個(gè)Session逝嚎。
  3. 在1、2循環(huán)發(fā)生時(shí)详恼,線程將霸占該Session补君,即在該Session的處理隊(duì)列A的MessageConsumer被“遺忘”時(shí),ActiveMQ又將消息分配給它處理昧互。

另挽铁,本項(xiàng)目現(xiàn)有配置為:DefaultMessageListenerContainer配合CachingConnectionFactory使用伟桅,使用2個(gè)DefaultMessageListenerContainer實(shí)例分別監(jiān)聽2個(gè)隊(duì)列,并發(fā)數(shù)為1-10叽掘,其connectionFactory為一個(gè)CachingConnectionFatory楣铁,CachingConnectionFactory的sessionCacheSize為1。推測(cè)在使用DefaultMessageListenerContainer配合CachingConnectionFactory使用時(shí)够掠,使用多個(gè)DefaultMessageListenerContainer共享一個(gè)CachingConnectionFactory時(shí)民褂,有可能會(huì)出現(xiàn)該問題。
繼續(xù)上面的條件分析疯潭,1赊堪、2看起來好像很難發(fā)生。但設(shè)想竖哩,如果所有的線程總是以一定的速率操作從Session緩存池取出哭廉、返還Session,那么是不是同一個(gè)線程拿到相同Session的概率就更大呢相叁?那這個(gè)一定速率又是怎么來的呢遵绰,是不是總是收不到消息,大家都是按照默認(rèn)超時(shí)時(shí)間1秒來循環(huán)操作緩存池呢增淹?

回顧線上環(huán)境當(dāng)時(shí)的情況椿访,消息使用量確實(shí)不高。因此消息阻塞的情況一直持續(xù)了大概40分鐘虑润,才終于被打破成玫。原因是條件1中所說的“緩存池為空”條件不成立了,線程#1-1在返還Session拳喻,碰巧另一個(gè)線程同時(shí)放了個(gè)Session進(jìn)去(又是老冤家#0-3, - -!)哭当,線程#1-1發(fā)現(xiàn)緩存池滿了(其實(shí)就是默認(rèn)值1),決定對(duì)Session進(jìn)行物理關(guān)閉冗澈。Session的關(guān)閉才終于結(jié)束了其創(chuàng)建的MessageConsumer的生命钦勘,而MessageConsumer也終于肯放棄對(duì)未消費(fèi)消息的持有,依依不舍地去了(參見ActiveMQMessageConsumer#close())亚亲。彻采。。之后ActiveMQ客戶端發(fā)送Session和Consumer捌归、Producer的刪除信息給ActiveMQ Broker颊亮,未消費(fèi)的MessageDispatch也將被重新分配給其他的Consumer進(jìn)行消費(fèi)。
以下是阻塞情況被打破的日志陨溅。查看第3仅炊、4行雷恃,#0-3線程返還Session到Session緩存池疙剑,近乎同時(shí),#1-1發(fā)現(xiàn)Session緩存滿了潭陪,只能把Session物理關(guān)閉。此時(shí)其關(guān)聯(lián)的MessageConsumer也被關(guān)閉伪朽,至此未消費(fèi)的消息才被回收重新分發(fā)帮毁。

2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|ID:zw_123-41988-1505188712169-1:1:13686 Transaction Commit :null
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|ID:zw_123-41988-1505188712169-1:1:13613 Transaction Commit :null
**2017-09-18 07:19:49,910|TRACE|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Returned cached Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13686,started=true}
**
**2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Closing cached Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13613,started=true}
**
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2017-09-18 07:19:49,910|TRACE|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Found cached JMS Session for mode 0: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13686,started=true}
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|remove: ID:zw_123-41988-1505188712169-1:1:13613:2, lastDeliveredSequenceId:-1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Created JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13686,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:zw_123-41988-1505188712169-1:1,clientId=ID:zw_123-41988-1505188712169-0:1,started=true}]
2017-09-18 07:19:49,910|TRACE|org.springframework.jms.listener.DefaultMessageListenerContainer#0-3|Found cached JMS MessageConsumer for destination [queue://queue.A]: ActiveMQMessageConsumer { value=ID:zw_123-41988-1505188712169-1:1:13686:1, started=true }
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-51856-1505195602738-5:9324:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-30343-1505195633167-5:9537:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-45783-1505195534974-5:9184:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-30343-1505195633167-5:9539:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-45783-1505195534974-5:9188:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-63173-1505195538255-1:9237:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-30343-1505195633167-5:9542:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-51856-1505195602738-5:9331:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-45783-1505195534974-5:9192:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-45783-1505195534974-5:9194:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_789-25467-1501664276715-5:50772:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-51856-1505195602738-5:9337:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-63173-1505195538255-1:9248:1:1:2
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-51856-1505195602738-5:9340:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-63173-1505195538255-1:9252:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_789-36182-1501664210853-5:50813:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_456-30343-1505195633167-5:9555:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_789-33412-1505357006369-3:316:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:app_789-36182-1501664210853-5:50816:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|on close, rollback duplicate: ID:zw_123-45783-1505195534974-5:9204:1:1:1
2017-09-18 07:19:49,910|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|remove: ID:zw_123-41988-1505188712169-1:1:13613:1, lastDeliveredSequenceId:90527387
2017-09-18 07:19:49,911|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2017-09-18 07:19:49,911|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Creating cached JMS Session for mode 0: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13687,started=true}
2017-09-18 07:19:49,911|DEBUG|org.springframework.jms.listener.DefaultMessageListenerContainer#1-1|Created JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:zw_123-41988-1505188712169-1:1:13687,started=true}] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:zw_123-41988-1505188712169-1:1,clientId=ID:zw_123-41988-1505188712169-0:1,started=true}]

0x03 問題解決

原因分析的差不多了,解決起來也比較簡單吉拳。只要破壞掉以上幾個(gè)條件之一即可质帅。但為了避免引入其他問題,還是依Spring-jms所言留攒,換掉CachingConnectionFactory煤惩,使用其他的ConnectionFactory。
在選擇ConnectionFactory時(shí)炼邀,也遇到了選擇障礙魄揉。是直接使用ActiveMQConnectionFactory,還是使用Spring-jms的SCF拭宁,還是使用PooledConnectionFactory呢洛退?

0x03.1 PooledConnectionFactory為啥不緩存MessageConsumer?

提到了PooledConnectionFactory杰标,就想為什么PooledConnectionFactory不對(duì)Consumer進(jìn)行緩存呢兵怯?看了看PooledConnectionFactory的源碼,才發(fā)現(xiàn)原來注釋就說的很明白腔剂。
org.apache.activemq.jms.pool.PooledConnectionFactory注釋:

A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like Camel and Spring's JmsTemplate and MessagListenerContainer. Connections, sessions and producers are returned to a pool after use so that they can be reused later without having to undergo the cost of creating them again. b>NOTE: while this implementation does allow the creation of a collection of active consumers, it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually just created at startup and left active, handling incoming messages as they come. When a consumer is complete, it is best to close it rather than return it to a pool for later reuse: this is because, even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer, where they'll get held until the consumer is active again. If you are creating a collection of consumers (for example, for multi-threaded message consumption), you might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should be used when configuring this optional feature. Eviction runs contend with client threads for access to objects in the pool, so if they run too frequently performance issues may result. The idle object eviction thread may be configured using the setTimeBetweenExpirationCheckMillis method. By default the value is -1 which means no eviction thread will be run. Set to a non-negative value to configure the idle eviction thread to run.

0x03.2 簡單的性能比對(duì)

但是如果不緩存消費(fèi)者媒区,效率真是比較低,還是選擇緩存消費(fèi)者吧桶蝎。
排除了PooledConnectionFactory后驻仅,我選擇了以下維度進(jìn)行測(cè)試。
不同的DMLC cacheLevel:CACHE_NONE, CACHE_CONNECTION, CACHE_SESSION, CACHE_CONSUMER, AUTO
不同的DMLC connectionFactory: ActiveMQConnectionFactory, SingleConnectionFactory
不同的DMLC注入方式: 使用jms:listener-container標(biāo)簽, 使用bean標(biāo)簽注冊(cè)DMLC實(shí)例

經(jīng)過一番非常不嚴(yán)謹(jǐn)?shù)谋葘?duì)后登渣,發(fā)現(xiàn)性能最高的組合是(基于JDK1.7):
DMLC注入方式(使用bean標(biāo)簽注冊(cè)DMLC實(shí)例)+connectionFactory(SingleConnectionFactory )+cacheLevel(CACHE_CONSUMER)噪服。

0x03.03 使用Wireshark和Python Pandas探究性能謎題

我一直以為使用jms:listener-container標(biāo)簽選擇default也就是使用DMLC的方式和使用bean注入DMLC的方式效率一樣,可實(shí)際測(cè)試才發(fā)現(xiàn)胜茧,使用bean的方式要比使用listener-container快1~3倍(注意是不嚴(yán)謹(jǐn)?shù)臏y(cè)試哦粘优,你測(cè)出來可能和我不一樣的)。
百思不得其解之下呻顽,我嘗試從它們的流量包中分析雹顺。
借助Wireshark,我分別抓了兩種方式下的流量數(shù)據(jù)廊遍,每次都是收發(fā)1000條消息嬉愧。dmlc-tag表示使用jms標(biāo)簽注入DMLC,dmlc-bean表示使用bean標(biāo)簽注入DMLC(名字不好沒關(guān)系喉前,只是代號(hào)而已--)没酣。因?yàn)闇y(cè)試的量比較少王财,兩種方式tag和bean分別使用5194ms和3246ms就完成了。
再看流量包的分析(WireShark 統(tǒng)計(jì)-IPv4 Statistics-All Addresses)裕便,輸入顯示過濾器“tcp.port == 61618”绒净,發(fā)現(xiàn)使用bean方式的網(wǎng)絡(luò)流量包相較使用jms tag的減少了3000+。

image.png

可是這樣看偿衰, 非常不直觀挂疆,是哪種包少了呢?
為了更直觀的查看下翎,使用tshark將流量數(shù)據(jù)包導(dǎo)出成csv進(jìn)行分析缤言。

tshark.exe -r dmlc-tag.pcap -T fields -e frame.number -e frame.time_epoch -e ip.src -e ip.dst -e ip.proto -e openwire -E header=y -E separator="-" -E quote=n  openwire > dmlc-tag.csv
tshark.exe -r dmlc-bean.pcap -T fields -e frame.number -e frame.time_epoch -e ip.src -e ip.dst -e ip.proto -e openwire -E header=y -E separator="-" -E quote=n  openwire > dmlc-bean.csv

再借助Python Pandas進(jìn)行粗略的分析:

import pandas as pd
tagdf = pd.read_csv('920pcap\dmlc-tag.csv',delimiter='-')
beandf = pd.read_csv('920pcap\dmlc-bean.csv',delimiter='-')

tagdf['command'] = tagdf['openwire'].str.split(',').str.get(0)
beandf['command'] = beandf['openwire'].str.split(',').str.get(0)

tagdf.groupby('command')['command'].count()
beandf.groupby('command')['command'].count()
image.png

對(duì)兩組數(shù)據(jù)包含的openwire協(xié)議命令求差集
set(tagdf['command'].unique())^set(beandf['command'].unique())

最終發(fā)現(xiàn),bean的response要少(1010 vs. 44)漏设,而且沒有TransactionInfo墨闲。

又經(jīng)過一番源碼的探索,發(fā)現(xiàn)郑口,如果DMLC的sessionTransacted屬性沒有指明true鸳碧,則將不使用事務(wù)

在Spring-jms源碼org.springframework.jms.config.JmsListenerContainerParser中可以發(fā)現(xiàn)犬性,acknowledge可以影響ListenerContainer的兩個(gè)屬性sessionTransacted和sessionAcknowledgeMode瞻离。

image.png

所以從jms:listener-container標(biāo)簽方式切換到bean注入方式,使用事務(wù)需要注意sessionTransacted這個(gè)屬性乒裆,光是設(shè)置transactionManager可能不行套利。

下面給出使用jms:listener-container和bean方式注入DMLC的代碼,希望小伙伴們避免犯我這樣的錯(cuò)誤鹤耍。

<jms:listener-container
        connection-factory="consumerJmsConnectionFactory"
        transaction-manager="consumerJmsTransactionManager"
        concurrency="1-10"
        **acknowledge="transacted"**
        cache="consumer"
    >
        <jms:listener destination="dest.queue" ref="testConsumer" method="receive"/> 
    </jms:listener-container>
<bean id="consumptionOrderListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="testConsumer"/>
<property name="defaultListenerMethod" value="receive"/>
<!--<property name="messageConverter"-->
</bean>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerJmsConnectionFactory"/>
<property name="transactionManager" ref="consumerJmsTransactionManager"/>
<property name="concurrency" value="1-10"/>
<property name="cacheLevelName" value="CACHE_CONSUMER" />

**<property name="sessionTransacted" value="true"/>**
<property name="maxMessagesPerTask" value="1000"/>
<property name="receiveTimeout" value="1000"/>
<property name="destinationName" value="dest.queue"/>
<property name="messageListener" ref="consumptionOrderListener"/>
</bean>

在設(shè)置完事務(wù)后肉迫,再進(jìn)行測(cè)試,兩種方式的消費(fèi)性能就差不多了稿黄。題外話喊衫,使用事務(wù)竟然對(duì)性能有這么大的影響,回頭再研究下杆怕。
鑒于使用bean直接注入DMLC的方式族购,可以指定maxMessagesPerTask,從而較靈活地伸縮消費(fèi)線程的數(shù)量陵珍,最后還是選擇了使用bean的方式寝杖。
至此,困擾我很長時(shí)間的MQ阻塞之謎才終于解開互纯。啊瑟幕,開心。


參考資料

  1. https://docs.spring.io/spring/docs/3.2.16.RELEASE/spring-framework-reference/htmlsingle/#jms
  2. Why DefaultMessageListenerContainer should not use CachingConnectionFactory? https://stackoverflow.com/questions/21984319/why-defaultmessagelistenercontainer-should-not-use-cachingconnectionfactory
  3. 深入理解DefaultMessageListenerContainer http://bijian1013.iteye.com/blog/2309671
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市收苏,隨后出現(xiàn)的幾起案子亿卤,更是在濱河造成了極大的恐慌,老刑警劉巖鹿霸,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異秆乳,居然都是意外死亡懦鼠,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門屹堰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肛冶,“玉大人,你說我怎么就攤上這事扯键∧佬洌” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵荣刑,是天一觀的道長馅笙。 經(jīng)常有香客問我,道長厉亏,這世上最難降的妖魔是什么董习? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮爱只,結(jié)果婚禮上皿淋,老公的妹妹穿的比我還像新娘。我一直安慰自己恬试,他們只是感情好窝趣,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著训柴,像睡著了一般哑舒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上畦粮,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天散址,我揣著相機(jī)與錄音,去河邊找鬼宣赔。 笑死预麸,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的儒将。 我是一名探鬼主播吏祸,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼钩蚊!你這毒婦竟也來了贡翘?” 一聲冷哼從身側(cè)響起蹈矮,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鸣驱,沒想到半個(gè)月后泛鸟,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡踊东,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年北滥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闸翅。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡再芋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出坚冀,到底是詐尸還是另有隱情济赎,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布记某,位于F島的核電站司训,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏辙纬。R本人自食惡果不足惜豁遭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贺拣。 院中可真熱鬧蓖谢,春花似錦、人聲如沸譬涡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽涡匀。三九已至盯腌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間陨瘩,已是汗流浹背腕够。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舌劳,地道東北人帚湘。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像甚淡,于是被迫代替她去往敵國和親大诸。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)资柔,斷路器焙贷,智...
    卡卡羅2017閱讀 134,599評(píng)論 18 139
  • 1、前言 之前我們通過兩篇文章(架構(gòu)設(shè)計(jì):系統(tǒng)間通信(19)——MQ:消息協(xié)議(上)贿堰、架構(gòu)設(shè)計(jì):系統(tǒng)間通信(20)...
    境里婆娑閱讀 1,868評(píng)論 0 4
  • 背景介紹 Kafka簡介 Kafka是一種分布式的辙芍,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O...
    高廣超閱讀 12,818評(píng)論 8 167
  • 一羹与、 消息隊(duì)列概述 消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件沸手,主要解決應(yīng)用耦合、異步消息注簿、流量削鋒等問題。實(shí)現(xiàn)高性能...
    步積閱讀 56,854評(píng)論 10 138
  • 許多主持人要求來賓用簡單的“是”和“不是”來回答復(fù)雜的問題跳仿。如果來賓用這種方式進(jìn)行回答诡渴,他們就會(huì)被批評(píng)過于...
    鄧潔兒閱讀 188評(píng)論 0 1