一蔽介、 問題提出
場(chǎng)景問題:服務(wù)器斷電重啟干花,未被消費(fèi)的消息是否會(huì)在重啟之后被繼續(xù)消費(fèi)绪氛?
兩種選擇:非持久性模式/持久性模式
非持久性模式:服務(wù)器斷電(關(guān)閉)之后闻书,使用非持久性模式時(shí)名斟,沒有被消費(fèi)的消息不會(huì)繼續(xù)消費(fèi)全部丟失;
程序會(huì)報(bào)一個(gè)連接關(guān)閉異常停止運(yùn)行魄眉,繼續(xù)啟動(dòng)服務(wù)器運(yùn)行程序砰盐,不會(huì)接收任何消息。
持久性模式:服務(wù)器斷電(關(guān)閉)后坑律,使用持久性模式時(shí)岩梳,沒有被消費(fèi)的消息會(huì)繼續(xù)消費(fèi);
程序也會(huì)報(bào)連接關(guān)閉異常晃择,但再次啟動(dòng)服務(wù)器和程序后冀值,接收方還能繼續(xù)原來的消息再次接收。
二宫屠、 ActiveMQ的幾種存儲(chǔ)模式選擇
- AMQ消息存儲(chǔ) (默認(rèn)的消息存儲(chǔ))
- 基于文件存儲(chǔ)的消息數(shù)據(jù)庫(kù)列疗,不依賴于第三方數(shù)據(jù)庫(kù)
- KahaDB 消息存儲(chǔ)(提供容量的提升和恢復(fù)能力)
- 消息存儲(chǔ)機(jī)制
- JDBC 消息存儲(chǔ)(消息基于JDBC存儲(chǔ))
- 使用Mysql或Oracle數(shù)據(jù)庫(kù)存儲(chǔ)消息
- Memory 消息存儲(chǔ)(基于內(nèi)容的消息存儲(chǔ))
- ActiveMQ支持將消息保存到內(nèi)存中,將Broker的“prsistent” 屬性設(shè)置為“false”浪蹂。
三抵栈、本文主要記錄將ActiveMQ消息持久化到Mysql數(shù)據(jù)庫(kù)
1. 將以下Jar包添加到到%ACTIVEMQ_HOME%\lib下面
- commons-dbcp-1.4.jar
- commons-pool-1.6.jar
- mysql-connector-java-5.1.30.jar
2. 修改ActiveMQ的conf目錄下的activemq.xml文件告材,修改數(shù)據(jù)持久化的方式
在添加如下配置時(shí),需要注意添加位置
<broker>
<persistenceAdapter>
<!--createTablesOnStartup="true"表示在第一次啟動(dòng)時(shí)自動(dòng)建立表結(jié)構(gòu)古劲,再次啟動(dòng)時(shí)斥赋,應(yīng)該改為false-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
</broker>
<beans>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
3. 啟動(dòng)ActiveMQ,分析數(shù)據(jù)表結(jié)構(gòu)
在啟動(dòng)ActiveMQ之后产艾,會(huì)在數(shù)據(jù)庫(kù)<
activemq
>中建立三張表activemq_acks灿渴、activemq_lock、activemq_msgs
- activemq_acks:用于存儲(chǔ)訂閱關(guān)系胰舆。如果是持久化Topic,訂閱者和服務(wù)器的訂閱關(guān)系在這個(gè)表保存蹬挤。
- CONTAINER:消息的目的地
- SUB_DEST:如果是使用static集群缚窿,這個(gè)字段會(huì)有集群其他系統(tǒng)的信息
- CLIENT_ID:每個(gè)訂閱者客戶端ID
- SUB_NAME:訂閱者名稱
- SELECTOR:選擇器,可以選擇只消費(fèi)滿足條件的消息焰扳。條件可以用自定義屬性實(shí)現(xiàn)倦零,可支持多屬性and和or操作
- LAST_ACKED_ID:記錄消費(fèi)過的消息的id。
- PRIORITY:優(yōu)先級(jí)
- XID:
- activemq_lock:用于記錄哪一個(gè)Broker是Master Broker吨悍。這張表只有在集群環(huán)境中才會(huì)用到扫茅,在集群中,只能有一個(gè)Broker來接收消息育瓜,那么這個(gè)Broker就是主Broker葫隙,其他的作為從Broker,用來備份等待躏仇。
- ID:主鍵
- TIME:時(shí)間
- BROKER_NAME:Broker名稱
- activemq_msgs:用于存儲(chǔ)消息恋脚,Topic和Queue消息都會(huì)保存在這張表中
- ID:自增主鍵
- CONTAINER:容器名稱
- MSGID_PROD:消息發(fā)送者客戶端的主鍵
- MSGID_SEQ:發(fā)送消息的順序,msgid_prod+msg_seq可以組成jms的messageid
- EXPIRATION:消息的過期時(shí)間焰手,存儲(chǔ)的是從1970-01-01到現(xiàn)在的毫秒數(shù)
- MSG:消息本體的java序列化對(duì)象的二進(jìn)制數(shù)據(jù)
- PRIORITY:優(yōu)先級(jí)糟描,從0-9,數(shù)值越大優(yōu)先級(jí)越高