ActiveMQ事物/持久化/其他

事務(wù)
jms中事務(wù)分為生產(chǎn)者和消費者兩塊犹菇,消息的生產(chǎn)和消費不能包含在同一個事務(wù)中德迹。

生產(chǎn)者:
在事務(wù)狀態(tài)下進行發(fā)送操作,消息并未真正投遞到中間件揭芍。而只有進行session.commit操作之后胳搞,消息才會發(fā)送到中間件,再轉(zhuǎn)發(fā)到適當(dāng)?shù)南M者進行處理。如果是調(diào)用rollback操作流酬,則表明币厕,當(dāng)前事務(wù)期間內(nèi)所發(fā)送的消息都取消掉。
在支持事務(wù)的session中芽腾,producer發(fā)送message時在message中帶有transactionID。broker收到message后判斷是否有transactionID页衙,如果有就把message保存在transaction store中摊滔,等待commit或者rollback消息。所以ActiveMq的事務(wù)是針對broker而不是producer的店乐,不管session是否commit艰躺,broker都會收到message。如果producer發(fā)送模式選擇了persistent眨八,那么message過期后會進入死亡隊列腺兴。在message進入死亡隊列之前,ActiveMQ會刪除message中的transaction ID廉侧,這樣過期的message就不在事務(wù)中了页响,不會保存在transaction store中,會直接進入死亡隊列段誊。

消費者:
在Spring整合JMS的應(yīng)用中闰蚕,我們要進行本地的事務(wù)管理,只需要指定對應(yīng)的監(jiān)聽容器的sessionTransacted屬性為true连舍。對于SessionAwareMessageListener没陡,在接收到消息后發(fā)送一個返回消息時也處于同一事務(wù)下,但是對于其他操作如數(shù)據(jù)庫訪問等將不屬于該事務(wù)控制索赏。

    <bean id="jmsContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="destination" ref="queueDestination" />  
        <property name="messageListener" ref="consumerMessageListener" />  
        <property name="sessionTransacted" value="true"/>  
    </bean> 

jta事務(wù):
如果想要接收消息和數(shù)據(jù)庫訪問處于同一事務(wù)中盼玄,那么我們就可以配置一個外部的事務(wù)管理同時配置一個支持外部事務(wù)管理的消息監(jiān)聽容器(如DefaultMessageListenerContainer)。要配置這樣一個參與分布式事務(wù)管理的消息監(jiān)聽容器潜腻,我們可以配置一個JtaTransactionManager埃儿,當(dāng)然底層的JMS ConnectionFactory需要能夠支持分布式事務(wù)管理,并正確地注冊我們的JtaTransactionManager砾赔。這樣消息監(jiān)聽器進行消息接收和對應(yīng)的數(shù)據(jù)庫訪問就會處于同一數(shù)據(jù)庫控制下蝌箍,當(dāng)消息接收失敗或數(shù)據(jù)庫訪問失敗都會進行事務(wù)回滾操作。

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="messageListener" ref="jmsQueueReceiver" />
        <property name="destination" ref="queueDestination" />
        <property name="sessionTransacted" value="true"/>
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

當(dāng)指定了transactionManager時暴心,消息監(jiān)聽容器將忽略sessionTransacted的值妓盲。

持久化
默認持久化到文件中:
打開安裝目錄下的配置文件,注意這里使用的是kahaDB专普,是一個基于文件支持事務(wù)的消息存儲器悯衬。以日志形式存儲消息,消息索引以B-Tree結(jié)構(gòu)存儲。在D:\ActiveMQ\apache-activemq\conf\activemq.xml中會發(fā)現(xiàn)默認的配置項:

    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

消息存儲在基于文件的數(shù)據(jù)日志中筋粗。如果消息發(fā)送成功策橘,變標(biāo)記為可刪除的。系統(tǒng)會周期性的清除或者歸檔日志文件娜亿。
消息文件的位置索引存儲在內(nèi)存中丽已,這樣能快速定位到。定期將內(nèi)存中的消息索引保存到metadata store中买决,避免大量消息未發(fā)送時沛婴,消息索引占用過多內(nèi)存空間。

如需持久化到數(shù)據(jù)庫中:
首先需要把MySql的驅(qū)動放到ActiveMQ的Lib目錄下督赤,例如:mysql-connector-java-5.0.4-bin.jar嘁灯,在conf/acticvemq.xml中更改persistenceAdapter節(jié)點配置并且引用定義的mysql-ds數(shù)據(jù)源。

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
            <kahaDB directory="${activemq.data}/kahadb"/>
        -->
        <persistenceAdapter> 
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 
        </persistenceAdapter>

dataSource指定持久化數(shù)據(jù)庫的bean躲舌,createTablesOnStartup是否在啟動的時候創(chuàng)建數(shù)據(jù)表丑婿,默認值是true,這樣每次啟動都會去創(chuàng)建數(shù)據(jù)表了没卸,一般是第一次啟動的時候設(shè)置為true羹奉,之后改成false。

在conf/acticvemq.xml中定義mysql-ds办悟,如下尘奏。

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value=""/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

然后重新啟動消息隊列,你會發(fā)現(xiàn)多了3張表activemq_acks病蛉,activemq_lock炫加,activemq_msgs。

  • activemq_msgs用于存儲消息铺然,然后啟動消費者俗孝,發(fā)現(xiàn)Mysql中已經(jīng)沒有這條消息了。
  • activemq_acks用于存儲訂閱關(guān)系魄健。如果是持久化Topic赋铝,訂閱者和服務(wù)器的訂閱關(guān)系在這個表保存。
  • activemq_lock在集群環(huán)境中才有用沽瘦。

PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息)革骨,默認為持久消息。持久化的消息在MQ服務(wù)器宕機之后析恋,消息不會丟失良哲,在重啟服務(wù)的時候,消息將恢復(fù)助隧。

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
    <property name="connectionFactory" ref="cachingConnectionFactory"></property>  
    <property name="defaultDestination" ref="dest" />  
    <property name="messageConverter" ref="messageConverter" />  
    <property name="pubSubDomain" value="false" />  
    <property name="explicitQosEnabled" value="true" />  
    <!-- 發(fā)送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
    <property name="deliveryMode" value="1" />   
</bean>  

prefetch:
ActiveMQ的prefetch機制筑凫。當(dāng)消費者去獲取消息時,不會一條一條去獲取,而是一次性獲取一批巍实,默認是1000條滓技。
假設(shè)有三個消費者,接收從1到99棚潦,共99條消息:
consumer A:1,4,7...
consumer B:2,5,8...
consumer C:3,6,9...
按照默認分配策略令漂,將會把消息如上預(yù)分配。

這些預(yù)獲取的消息丸边,在還沒確認消費之前洗显,在管理控制臺還是可以看見這些消息的,但是不會再分配給其他消費者原环,此時這些消息的狀態(tài)應(yīng)該算作“已分配未消費”,如果消息最后被消費处窥,則會在服務(wù)器端被刪除嘱吗。如果消費者崩潰,則這些消息會被重新分配給新的消費者滔驾。但是如果消費者既不消費確認谒麦,又不崩潰,那這些消息就永遠躺在消費者的緩存區(qū)里無法處理哆致。

假設(shè)一種情況:某個consumer C性能較差绕德,處理信息速度很慢。會導(dǎo)致consumer C任有消息積壓摊阀,但consumer A耻蛇, consumer B已經(jīng)空閑。
解決方案:將consumer C 的 prefetch設(shè)為1胞此,每次處理1條消息臣咖,處理完再去取。

    prefetchPolicy.setQueuePrefetch(1);
    connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//實例化連接工廠
    connectionFactory.setPrefetchPolicy(prefetchPolicy);

TimeToLive:

表示一個消息的有效期漱牵。只有在這個有效期內(nèi)夺蛇,消息消費者才可以消費這個消息。默認值為0酣胀,表示消息永不過期刁赦。

如果使用TTL來判定消息的過期,那么就首先需要確保Producer闻镶、broker兩者的系統(tǒng)時間要盡可能的一致甚脉,Consumer也盡可能的和broker的時間保持一致。Broker將會在接收Producer消息時儒溉,以及將消息發(fā)送給Consumer之前都會檢測消息是否過期宦焦,判斷過期的方法也就是根據(jù)JMSExpiration和當(dāng)前時間戳比較。

可以通過下面的方式設(shè)置:

    producer.setTimeToLive(3600000); //有效期1小時 (1000毫秒 * 60秒 * 60分)

可以在消息發(fā)送時,為當(dāng)前消息設(shè)定ttl波闹。

    messageProducer.send(Message message, int deliveryMode, int priority, long timeToLive)

如果消息過期酝豪,將會把消息發(fā)送到DLQ中,此消息不會被Consumer消費精堕。如果broker端對DLQ使用Discard策略或者Broker沒有開啟DLQ相關(guān)策略孵淘,這些過期的消息可能將不復(fù)存在。在conf/activemq.xml中:

    <destinationPolicy>
        <policyMap>
         <policyEntries>
           <policyEntry queue=">" topic=">">
             <deadLetterStrategy>
               <sharedDeadLetterStrategy processExpired="false" />
             </deadLetterStrategy>
             <!-- discard all -->
             <!--
             <discardingDeadLetterStrategy />
             -->
           </policyEntry>
         </policyEntries>
        </policyMap>
    </destinationPolicy>

Priority:

我們可以在發(fā)送消息時歹篓,指定消息的權(quán)重瘫证,broker可以建議權(quán)重較高的消息將會優(yōu)先發(fā)送給Consumer。不過因為各種原因庄撮,priority并不能決定消息傳送的嚴(yán)格順序(order)背捌。
JMS標(biāo)準(zhǔn)中約定priority可以為09的數(shù)值,值越大表示權(quán)重越高洞斯,默認值為4毡庆。不過activeMQ中各個存儲器對priority的支持并非完全一樣。比如JDBC存儲器可以支持09烙如。但是對于kahadb/levelDB等這種基于日志文件的存儲器而言么抗,priority支持相對較弱,只能識別三種優(yōu)先級(LOW: < 4,NORMAL: =4,HIGH: > 4)亚铁。在broker端蝇刀,默認是不支持priority排序的,我們需要手動開啟徘溢。在conf/activemq.xml中::

    <policyEntry queue=">" prioritizedMessages="true"/>

設(shè)置message的優(yōu)先級:
TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" +i);//創(chuàng)建一條文本消息
message.setJMSPriority(9);

failover:
如果集群中的某一臺消息服務(wù)器宕機吞琐,與該臺消息服務(wù)器相連接的生產(chǎn)者和消費者需要能夠自動連接到其他正常工作的消息服務(wù)器。對此ActiveMQ提供了一種叫做失效轉(zhuǎn)移(也叫故障轉(zhuǎn)移甸昏,F(xiàn)ailOver)的策略顽分。失效轉(zhuǎn)移提供了在傳輸層上重新連接到其他任何傳輸器的功能。

只需要在uri中配置施蜜,語法如下:

    failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN

例子:

    failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
    failover:(tcp://localhost:61616,tcp://remotehost:61616)?randomize=false&initialReconnectDelay=100 

如果某個ActiveMQ客戶端發(fā)現(xiàn)uri1地址失效了卒蘸,它會立即轉(zhuǎn)向uri地址列表中其他可以連接的消息服務(wù)器進行重連,以保證繼續(xù)正常工作翻默,這種選擇其他地址的方式默認是隨機的缸沃,以保證負載均衡。如果你想關(guān)閉隨機修械,可以transportOptions中加入randomize=false趾牧。

Java例子:

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    "failover:(
    tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0,
    tcp://192.168.0.87:61617?wireFormat.maxInactivityDuration=0
    )");    
    Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);    
  • transportOptions有多種參數(shù)可以選擇,如下:
  • initialReconnectDelay:默認為10肯污,單位毫秒翘单,表示第一次嘗試重連之前等待的時間吨枉。
  • maxReconnectDelay:默認30000,單位毫秒哄芜,表示兩次重連之間的最大時間間隔貌亭。
  • useExponentialBackOff:默認為true,表示重連時是否加入避讓指數(shù)來避免高并發(fā)认臊。
  • reconnectDelayExponent:默認為2.0圃庭,重連時使用的避讓指數(shù)。
  • maxReconnectAttempts:5.6版本之前默認為-1, 5.6版本及其以后失晴,默認為0剧腻。 0表示重連的次數(shù)無限,配置大于0可以指定最大重連次數(shù)涂屁。
  • randomize:默認為true书在,表示在URI列表中選擇URI連接時是否采用隨機策略。如果為true的話拆又,有可能生產(chǎn)者連接的是第一個蕊温,而消費者連接的是第二個,造成一個服務(wù)器上只有生產(chǎn)者遏乔,一個服務(wù)器上只有消費者。
  • backup:默認為false发笔,表示是否在連接初始化時將URI列表中的所有地址都初始化連接盟萨,以便快速的失效轉(zhuǎn)移,默認是不開啟了讨。
  • timeout:默認為-1捻激,單位毫秒,是否允許在重連過程中設(shè)置超時時間來中斷的正在阻塞的發(fā)送操作前计。-1表示不允許胞谭,其他表示超時時間。

5.9版本使用levelDB+zookeeper的方式來實現(xiàn)HA了男杈。


參考:
http://haohaoxuexi.iteye.com/blog/1983532
http://manzhizhen.iteye.com/blog/2105572

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末丈屹,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子伶棒,更是在濱河造成了極大的恐慌旺垒,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件肤无,死亡現(xiàn)場離奇詭異先蒋,居然都是意外死亡,警方通過查閱死者的電腦和手機宛渐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門竞漾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來眯搭,“玉大人,你說我怎么就攤上這事业岁×巯桑” “怎么了?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵叨襟,是天一觀的道長繁扎。 經(jīng)常有香客問我,道長糊闽,這世上最難降的妖魔是什么梳玫? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮右犹,結(jié)果婚禮上提澎,老公的妹妹穿的比我還像新娘。我一直安慰自己念链,他們只是感情好盼忌,可當(dāng)我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著掂墓,像睡著了一般谦纱。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上君编,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天跨嘉,我揣著相機與錄音,去河邊找鬼吃嘿。 笑死祠乃,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的兑燥。 我是一名探鬼主播亮瓷,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼降瞳!你這毒婦竟也來了嘱支?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤挣饥,失蹤者是張志新(化名)和其女友劉穎斗塘,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體亮靴,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡馍盟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了茧吊。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贞岭。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡八毯,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出瞄桨,到底是詐尸還是另有隱情话速,我是刑警寧澤,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布芯侥,位于F島的核電站泊交,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏柱查。R本人自食惡果不足惜廓俭,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望唉工。 院中可真熱鬧研乒,春花似錦、人聲如沸淋硝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谣膳。三九已至竿报,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間继谚,已是汗流浹背仰楚。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留犬庇,地道東北人。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓侨嘀,卻偏偏與公主長得像臭挽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子咬腕,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理欢峰,服務(wù)發(fā)現(xiàn),斷路器涨共,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,469評論 0 34
  • 背景介紹 Kafka簡介 Kafka是一種分布式的纽帖,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O...
    高廣超閱讀 12,831評論 8 167
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,827評論 4 54
  • 煙鬼的自白 我就愛抽煙举反, 一抽十幾年懊直, 提起那滋味, 飄飄欲仙火鼻。點燃細細的煙室囊, 燃燒在纖細的指間雕崩, 淡定從容, 顯...
    阿超Lilian閱讀 146評論 0 0