JMS如何保證消息的可靠性堡赔?
JMS是通過持久化(Persistent)识脆、事務(wù)(Transaction)、 簽收來(acknowledge)保證的
1善已、持久化
首先消息的持久化是在消息的生產(chǎn)者中設(shè)置的灼捂,由消息的生產(chǎn)者告訴消息中間件是否進(jìn)行消息的持久化,如果生產(chǎn)者端沒有設(shè)置换团,默認(rèn)是持久化的
通過session創(chuàng)建出來的生產(chǎn)者生產(chǎn)的Queue消息為持久性
- 持久化:當(dāng)服務(wù)器宕機(jī)悉稠,消息依然存在;當(dāng)服務(wù)器再次重啟艘包,消息能夠被消費(fèi)者繼續(xù)消費(fèi)的猛。
- 非持久化:當(dāng)服務(wù)器宕機(jī),消息不會被保留辑甜。
設(shè)置通過session創(chuàng)建出來的生產(chǎn)者生產(chǎn)的Queue消息為持久性
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
生產(chǎn)者代碼實(shí)現(xiàn):
public class JmsProducer {
public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
public static final String QUEUE_ANME="queue01";
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
//2.通過連接工廠衰絮,獲得連接connection并啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.創(chuàng)建會話session
// 兩個(gè)參數(shù):事務(wù)/簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建目的地(隊(duì)列或主題topic)
// Destination destination = session.createQueue(QUEUE_ANME);
Queue queue = session.createQueue(QUEUE_ANME);
//5.創(chuàng)建消息的生產(chǎn)者
MessageProducer messageProducer = session.createProducer(queue);
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化
// messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//持久化
//6 通過使用messageProducer生產(chǎn)3條消息發(fā)送到MQ的隊(duì)列里面
for (int i = 1; i <=3; i++) {
//7創(chuàng)建消息
// TextMessage textMessage = session.createTextMessage("msg--" + i);
TextMessage textMessage = session.createTextMessage("textMessage msg--" + i);
//8.通過messageProducer發(fā)送給mq
messageProducer.send(textMessage);
}
//9.關(guān)閉資源
messageProducer.close();
session.close();
connection.close();
System.out.println("MQ消息發(fā)布成功");
}
}
消費(fèi)者代碼實(shí)現(xiàn):
public class JmsConsumer {
public static final String ACTIVEMQ_URL="tcp://192.168.137.199:61616";
public static final String QUEUE_ANME="queue01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("*****我是2號消費(fèi)者");
//1.創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
//2.通過連接工廠,獲得連接connection并啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.創(chuàng)建會話session
// 兩個(gè)參數(shù):事務(wù)/簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建目的地(隊(duì)列或主題topic)
// Destination destination = session.createQueue(QUEUE_ANME);
Queue queue = session.createQueue(QUEUE_ANME);
//5.創(chuàng)建消費(fèi)者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
TextMessage message = (TextMessage) messageConsumer.receive(20000L);
if(null != message){
System.out.println("TX消費(fèi)者接收到消息:"+message.getText());
// message.acknowledge();
}else {
break;
}
}
// System.in.read();//保證控制臺不關(guān)閉
messageConsumer.close();
// session.commit();
session.close();
connection.close();
}
}
持久化消息是隊(duì)列的默認(rèn)傳遞模式磷醋,此模式保證這些消息只被傳送一次和成功使用一次猫牡。對于這些消息,可靠性是優(yōu)先考慮的因素邓线。
可靠性的另一個(gè)重要方面是確保持久性消息傳送至目標(biāo)后淌友,消息服務(wù)在向消費(fèi)者傳送它們之前不會丟失這些消息。
2骇陈、事務(wù)
消息生產(chǎn)者偏重于事務(wù)震庭,開啟事務(wù)后生產(chǎn)者生產(chǎn)的消息,只有在session執(zhí)行commit后才會被提交到服務(wù)器你雌,不然此次提交記錄無效器联。事務(wù)開啟的意義在于,如果對于多條必須同批次傳輸?shù)南⑿稣福梢允褂檬聞?wù)拨拓,如果一條傳輸失敗,可以將事務(wù)回滾氓栈,再次傳輸渣磷,保證數(shù)據(jù)的完整性。
如果開啟事務(wù)授瘦,事務(wù)優(yōu)先級>簽收醋界;
- 在事務(wù)性會話中竟宋,當(dāng)一個(gè)事務(wù)被成功提交則消息被自動簽收。
- 如果事務(wù)回滾形纺,則消息會被再次傳送
- 對于消費(fèi)者而言丘侠,如果開啟事務(wù),createSession設(shè)置為 ture挡篓,消費(fèi)消息需要commit,否則會出現(xiàn)多次消費(fèi)的情況
//這邊的第一個(gè)參數(shù)true表示事務(wù)開啟
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//事務(wù)如果是true: 需要commit
session.commit();
3婉陷、簽收
消費(fèi)者偏重于簽收
簽收主要針對于消費(fèi)者,而且一般在非事務(wù)的情況下生效官研,因?yàn)殚_啟事務(wù)后秽澳,簽收與否取決于事務(wù)的提交或回滾,與簽收設(shè)置的方式無關(guān)戏羽。
簽收方式有4種担神,但平時(shí)常用的有兩種:
- 自動簽收
Session.AUTO_ACKNOWLEDGE;//自動簽收
- 手動簽收
Session.CLIENT_ACKNOWLEDGE; //手動簽收
message.acknowledge(); // 手動簽收需要ack