事務(wù)
jms中事務(wù)分為生產(chǎn)者和消費者兩塊犹菇,消息的生產(chǎn)和消費不能包含在同一個事務(wù)中德迹。
生產(chǎn)者:
在事務(wù)狀態(tài)下進行發(fā)送操作,消息并未真正投遞到中間件揭芍。而只有進行session.commit操作之后胳搞,消息才會發(fā)送到中間件,再轉(zhuǎn)發(fā)到適當(dāng)?shù)南M者進行處理。如果是調(diào)用rollback操作流酬,則表明币厕,當(dāng)前事務(wù)期間內(nèi)所發(fā)送的消息都取消掉。
在支持事務(wù)的session中芽腾,producer發(fā)送message時在message中帶有transactionID。broker收到message后判斷是否有transactionID页衙,如果有就把message保存在transaction store中摊滔,等待commit或者rollback消息。所以ActiveMq的事務(wù)是針對broker而不是producer的店乐,不管session是否commit艰躺,broker都會收到message。如果producer發(fā)送模式選擇了persistent眨八,那么message過期后會進入死亡隊列腺兴。在message進入死亡隊列之前,ActiveMQ會刪除message中的transaction ID廉侧,這樣過期的message就不在事務(wù)中了页响,不會保存在transaction store中,會直接進入死亡隊列段誊。
消費者:
在Spring整合JMS的應(yīng)用中闰蚕,我們要進行本地的事務(wù)管理,只需要指定對應(yīng)的監(jiān)聽容器的sessionTransacted屬性為true连舍。對于SessionAwareMessageListener没陡,在接收到消息后發(fā)送一個返回消息時也處于同一事務(wù)下,但是對于其他操作如數(shù)據(jù)庫訪問等將不屬于該事務(wù)控制索赏。
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
<property name="sessionTransacted" value="true"/>
</bean>
jta事務(wù):
如果想要接收消息和數(shù)據(jù)庫訪問處于同一事務(wù)中盼玄,那么我們就可以配置一個外部的事務(wù)管理同時配置一個支持外部事務(wù)管理的消息監(jiān)聽容器(如DefaultMessageListenerContainer)。要配置這樣一個參與分布式事務(wù)管理的消息監(jiān)聽容器潜腻,我們可以配置一個JtaTransactionManager埃儿,當(dāng)然底層的JMS ConnectionFactory需要能夠支持分布式事務(wù)管理,并正確地注冊我們的JtaTransactionManager砾赔。這樣消息監(jiān)聽器進行消息接收和對應(yīng)的數(shù)據(jù)庫訪問就會處于同一數(shù)據(jù)庫控制下蝌箍,當(dāng)消息接收失敗或數(shù)據(jù)庫訪問失敗都會進行事務(wù)回滾操作。
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="messageListener" ref="jmsQueueReceiver" />
<property name="destination" ref="queueDestination" />
<property name="sessionTransacted" value="true"/>
<property name="transactionManager" ref="jtaTransactionManager"/>
</bean>
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
當(dāng)指定了transactionManager時暴心,消息監(jiān)聽容器將忽略sessionTransacted的值妓盲。
持久化
默認持久化到文件中:
打開安裝目錄下的配置文件,注意這里使用的是kahaDB专普,是一個基于文件支持事務(wù)的消息存儲器悯衬。以日志形式存儲消息,消息索引以B-Tree結(jié)構(gòu)存儲。在D:\ActiveMQ\apache-activemq\conf\activemq.xml中會發(fā)現(xiàn)默認的配置項:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
消息存儲在基于文件的數(shù)據(jù)日志中筋粗。如果消息發(fā)送成功策橘,變標(biāo)記為可刪除的。系統(tǒng)會周期性的清除或者歸檔日志文件娜亿。
消息文件的位置索引存儲在內(nèi)存中丽已,這樣能快速定位到。定期將內(nèi)存中的消息索引保存到metadata store中买决,避免大量消息未發(fā)送時沛婴,消息索引占用過多內(nèi)存空間。
如需持久化到數(shù)據(jù)庫中:
首先需要把MySql的驅(qū)動放到ActiveMQ的Lib目錄下督赤,例如:mysql-connector-java-5.0.4-bin.jar嘁灯,在conf/acticvemq.xml中更改persistenceAdapter節(jié)點配置并且引用定義的mysql-ds數(shù)據(jù)源。
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
<kahaDB directory="${activemq.data}/kahadb"/>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>
dataSource指定持久化數(shù)據(jù)庫的bean躲舌,createTablesOnStartup是否在啟動的時候創(chuàng)建數(shù)據(jù)表丑婿,默認值是true,這樣每次啟動都會去創(chuàng)建數(shù)據(jù)表了没卸,一般是第一次啟動的時候設(shè)置為true羹奉,之后改成false。
在conf/acticvemq.xml中定義mysql-ds办悟,如下尘奏。
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value=""/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
然后重新啟動消息隊列,你會發(fā)現(xiàn)多了3張表activemq_acks病蛉,activemq_lock炫加,activemq_msgs。
- activemq_msgs用于存儲消息铺然,然后啟動消費者俗孝,發(fā)現(xiàn)Mysql中已經(jīng)沒有這條消息了。
- activemq_acks用于存儲訂閱關(guān)系魄健。如果是持久化Topic赋铝,訂閱者和服務(wù)器的訂閱關(guān)系在這個表保存。
- activemq_lock在集群環(huán)境中才有用沽瘦。
PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息)革骨,默認為持久消息。持久化的消息在MQ服務(wù)器宕機之后析恋,消息不會丟失良哲,在重啟服務(wù)的時候,消息將恢復(fù)助隧。
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"></property>
<property name="defaultDestination" ref="dest" />
<property name="messageConverter" ref="messageConverter" />
<property name="pubSubDomain" value="false" />
<property name="explicitQosEnabled" value="true" />
<!-- 發(fā)送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
<property name="deliveryMode" value="1" />
</bean>
prefetch:
ActiveMQ的prefetch機制筑凫。當(dāng)消費者去獲取消息時,不會一條一條去獲取,而是一次性獲取一批巍实,默認是1000條滓技。
假設(shè)有三個消費者,接收從1到99棚潦,共99條消息:
consumer A:1,4,7...
consumer B:2,5,8...
consumer C:3,6,9...
按照默認分配策略令漂,將會把消息如上預(yù)分配。
這些預(yù)獲取的消息丸边,在還沒確認消費之前洗显,在管理控制臺還是可以看見這些消息的,但是不會再分配給其他消費者原环,此時這些消息的狀態(tài)應(yīng)該算作“已分配未消費”,如果消息最后被消費处窥,則會在服務(wù)器端被刪除嘱吗。如果消費者崩潰,則這些消息會被重新分配給新的消費者滔驾。但是如果消費者既不消費確認谒麦,又不崩潰,那這些消息就永遠躺在消費者的緩存區(qū)里無法處理哆致。
假設(shè)一種情況:某個consumer C性能較差绕德,處理信息速度很慢。會導(dǎo)致consumer C任有消息積壓摊阀,但consumer A耻蛇, consumer B已經(jīng)空閑。
解決方案:將consumer C 的 prefetch設(shè)為1胞此,每次處理1條消息臣咖,處理完再去取。
prefetchPolicy.setQueuePrefetch(1);
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//實例化連接工廠
connectionFactory.setPrefetchPolicy(prefetchPolicy);
TimeToLive:
表示一個消息的有效期漱牵。只有在這個有效期內(nèi)夺蛇,消息消費者才可以消費這個消息。默認值為0酣胀,表示消息永不過期刁赦。
如果使用TTL來判定消息的過期,那么就首先需要確保Producer闻镶、broker兩者的系統(tǒng)時間要盡可能的一致甚脉,Consumer也盡可能的和broker的時間保持一致。Broker將會在接收Producer消息時儒溉,以及將消息發(fā)送給Consumer之前都會檢測消息是否過期宦焦,判斷過期的方法也就是根據(jù)JMSExpiration和當(dāng)前時間戳比較。
可以通過下面的方式設(shè)置:
producer.setTimeToLive(3600000); //有效期1小時 (1000毫秒 * 60秒 * 60分)
可以在消息發(fā)送時,為當(dāng)前消息設(shè)定ttl波闹。
messageProducer.send(Message message, int deliveryMode, int priority, long timeToLive)
如果消息過期酝豪,將會把消息發(fā)送到DLQ中,此消息不會被Consumer消費精堕。如果broker端對DLQ使用Discard策略或者Broker沒有開啟DLQ相關(guān)策略孵淘,這些過期的消息可能將不復(fù)存在。在conf/activemq.xml中:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" topic=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
<!-- discard all -->
<!--
<discardingDeadLetterStrategy />
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
Priority:
我們可以在發(fā)送消息時歹篓,指定消息的權(quán)重瘫证,broker可以建議權(quán)重較高的消息將會優(yōu)先發(fā)送給Consumer。不過因為各種原因庄撮,priority并不能決定消息傳送的嚴(yán)格順序(order)背捌。
JMS標(biāo)準(zhǔn)中約定priority可以為09的數(shù)值,值越大表示權(quán)重越高洞斯,默認值為4毡庆。不過activeMQ中各個存儲器對priority的支持并非完全一樣。比如JDBC存儲器可以支持09烙如。但是對于kahadb/levelDB等這種基于日志文件的存儲器而言么抗,priority支持相對較弱,只能識別三種優(yōu)先級(LOW: < 4,NORMAL: =4,HIGH: > 4)亚铁。在broker端蝇刀,默認是不支持priority排序的,我們需要手動開啟徘溢。在conf/activemq.xml中::
<policyEntry queue=">" prioritizedMessages="true"/>
設(shè)置message的優(yōu)先級:
TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" +i);//創(chuàng)建一條文本消息
message.setJMSPriority(9);
failover:
如果集群中的某一臺消息服務(wù)器宕機吞琐,與該臺消息服務(wù)器相連接的生產(chǎn)者和消費者需要能夠自動連接到其他正常工作的消息服務(wù)器。對此ActiveMQ提供了一種叫做失效轉(zhuǎn)移(也叫故障轉(zhuǎn)移甸昏,F(xiàn)ailOver)的策略顽分。失效轉(zhuǎn)移提供了在傳輸層上重新連接到其他任何傳輸器的功能。
只需要在uri中配置施蜜,語法如下:
failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN
例子:
failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
failover:(tcp://localhost:61616,tcp://remotehost:61616)?randomize=false&initialReconnectDelay=100
如果某個ActiveMQ客戶端發(fā)現(xiàn)uri1地址失效了卒蘸,它會立即轉(zhuǎn)向uri地址列表中其他可以連接的消息服務(wù)器進行重連,以保證繼續(xù)正常工作翻默,這種選擇其他地址的方式默認是隨機的缸沃,以保證負載均衡。如果你想關(guān)閉隨機修械,可以transportOptions中加入randomize=false趾牧。
Java例子:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"failover:(
tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0,
tcp://192.168.0.87:61617?wireFormat.maxInactivityDuration=0
)");
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
- transportOptions有多種參數(shù)可以選擇,如下:
- initialReconnectDelay:默認為10肯污,單位毫秒翘单,表示第一次嘗試重連之前等待的時間吨枉。
- maxReconnectDelay:默認30000,單位毫秒哄芜,表示兩次重連之間的最大時間間隔貌亭。
- useExponentialBackOff:默認為true,表示重連時是否加入避讓指數(shù)來避免高并發(fā)认臊。
- reconnectDelayExponent:默認為2.0圃庭,重連時使用的避讓指數(shù)。
- maxReconnectAttempts:5.6版本之前默認為-1, 5.6版本及其以后失晴,默認為0剧腻。 0表示重連的次數(shù)無限,配置大于0可以指定最大重連次數(shù)涂屁。
- randomize:默認為true书在,表示在URI列表中選擇URI連接時是否采用隨機策略。如果為true的話拆又,有可能生產(chǎn)者連接的是第一個蕊温,而消費者連接的是第二個,造成一個服務(wù)器上只有生產(chǎn)者遏乔,一個服務(wù)器上只有消費者。
- backup:默認為false发笔,表示是否在連接初始化時將URI列表中的所有地址都初始化連接盟萨,以便快速的失效轉(zhuǎn)移,默認是不開啟了讨。
- timeout:默認為-1捻激,單位毫秒,是否允許在重連過程中設(shè)置超時時間來中斷的正在阻塞的發(fā)送操作前计。-1表示不允許胞谭,其他表示超時時間。
5.9版本使用levelDB+zookeeper的方式來實現(xiàn)HA了男杈。
參考:
http://haohaoxuexi.iteye.com/blog/1983532
http://manzhizhen.iteye.com/blog/2105572