分別運(yùn)行訂閱模式和P2P模式,可以發(fā)現(xiàn)陪踩,P2P模式缺省把消息進(jìn)行持久化杖们,而topic模式是沒(méi)有的。
一般topic模式實(shí)驗(yàn):
1肩狂、 啟動(dòng)兩個(gè)消費(fèi)者摘完,啟動(dòng)一個(gè)生產(chǎn)者,發(fā)送消息傻谁,兩個(gè)消費(fèi)者都可以收到孝治。
2、 關(guān)閉一個(gè)消費(fèi)者审磁,生產(chǎn)者發(fā)送消息谈飒,活躍的消費(fèi)者可以收到消息,啟動(dòng)被關(guān)閉的消費(fèi)者态蒂,無(wú)法收到消息杭措。
3、 關(guān)閉所有消費(fèi)者吃媒,生產(chǎn)者發(fā)送消息瓤介,在ActiveMQ控制臺(tái)可以看見(jiàn)消息已被接收,關(guān)閉再啟動(dòng)ActiveMQ赘那,啟動(dòng)消費(fèi)者收不到消息刑桑。
如果topic模式下,需要消費(fèi)者在離線又上線后募舟,不管ActiveMQ是否重啟過(guò)祠斧,都保證可以接受到消息,就需要進(jìn)行持久化訂閱拱礁。具體代碼參見(jiàn)模塊no-spirng包durabletopic琢锋。
持久Topic消費(fèi)者端
需要設(shè)置客戶端id:connection.setClientID("Mark");
消息的destination變?yōu)?Topic
消費(fèi)者類型變?yōu)門opicSubscriber
消費(fèi)者創(chuàng)建時(shí)變?yōu)閟ession.createDurableSubscriber(destination,"任意名字,代表訂閱名 ");
運(yùn)行一次消費(fèi)者呢灶,將消費(fèi)者在ActiveMQ上進(jìn)行一次注冊(cè)吴超。然后在ActiveMQ的管理控制臺(tái)subscribers頁(yè)面可以看見(jiàn)我們的消費(fèi)者。
效果:
1鸯乃、 運(yùn)行生產(chǎn)者鲸阻,發(fā)布消息,多個(gè)消費(fèi)者可以正常收到。
2鸟悴、 關(guān)閉一個(gè)消費(fèi)者陈辱,運(yùn)行生產(chǎn)者,發(fā)布消息后再啟動(dòng)被關(guān)閉的消費(fèi)者细诸,可以收到離線后的消息沛贪;
3、 關(guān)閉所有消費(fèi)者震贵,運(yùn)行生產(chǎn)者利赋,發(fā)布消息后,關(guān)閉ActiveMQ再啟動(dòng)屏歹,啟動(dòng)所有消費(fèi)者隐砸,都可以收到消息。
注意:生產(chǎn)者端無(wú)需另外單獨(dú)配置
消息非持久化
修改messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);來(lái)設(shè)置消息本身的持久化屬性為非持久化蝙眶。重復(fù)上述實(shí)驗(yàn)季希,可以發(fā)現(xiàn),第1,2點(diǎn)保持不變幽纷,但是第三點(diǎn)式塌,當(dāng)關(guān)閉ActiveMQ再啟動(dòng),消費(fèi)者關(guān)閉后再啟動(dòng)友浸,是收不到消息的峰尝。
說(shuō)明,即使進(jìn)行了持久訂閱收恢,但是消息本身如果是不持久化的武学,ActiveMQ關(guān)閉再啟動(dòng),這些非持久化的消息會(huì)丟失伦意,進(jìn)行持久訂閱的消費(fèi)者也是收不到自身離線期間的消息的火窒。
代碼
public class JmsDurableTopicConsumerOther {
private static final String USERNAME
= ActiveMQConnection.DEFAULT_USER;//默認(rèn)連接用戶名
private static final String PASSWORD
= ActiveMQConnection.DEFAULT_PASSWORD;//默認(rèn)連接密碼
private static final String BROKEURL
= ActiveMQConnection.DEFAULT_BROKER_URL;//默認(rèn)連接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//連接工廠
Connection connection = null;//連接
Session session;//會(huì)話 接受或者發(fā)送消息的線程
TopicSubscriber messageConsumer;//消息的消費(fèi)者
//實(shí)例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(JmsDurableTopicConsumerOther.USERNAME,
JmsDurableTopicConsumerOther.PASSWORD, JmsDurableTopicConsumerOther.BROKEURL);
try {
//通過(guò)連接工廠獲取連接
connection = connectionFactory.createConnection();
connection.setClientID("Lison");
//啟動(dòng)連接
connection.start();
//創(chuàng)建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("DurableTopic2");
//創(chuàng)建消息消費(fèi)者
messageConsumer = session.createDurableSubscriber(destination,"xiangxue");
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("Accept msg : "
+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}