- 關(guān)于異步發(fā)送消息:
ActiveMQ官方說(shuō)異步發(fā)送是很多模式下默認(rèn)的傳輸方式菌赖,但是在發(fā)送非事物持久化消息的時(shí)候默認(rèn)使用的是同步發(fā)送模式盲厌。
同步發(fā)送時(shí),Producer.send() 方法會(huì)被阻塞,直到 broker 發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)者,這個(gè)確認(rèn)消息暗示生產(chǎn)者 broker 已經(jīng)成功地將它發(fā)送的消息路由到目標(biāo)目的并把消息保存到二級(jí)存儲(chǔ)中镣衡。
同步發(fā)送持久消息能夠提供更好的可靠性,但這潛在地影響了程序的相應(yīng)速度档悠,因?yàn)樵诮邮艿?broker 的確認(rèn)消息之前應(yīng)用程序或線程會(huì)被阻塞捆探。如果應(yīng)用程序能夠容忍一些消息的丟失,那么可以使用異步發(fā)送站粟。異步發(fā)送不會(huì)在受到 broker 的確認(rèn)之前一直阻塞 Producer.send 方法。
解決辦法:
1)添加事務(wù)
2)設(shè)置異步參數(shù)
[1].設(shè)置ConnectionFactory時(shí)指定使用異步
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
[2].不在構(gòu)造函數(shù)中指定曾雕,而是修改ConnectionFactory的配置
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
[3].在實(shí)例化后的ActiveMQConnection對(duì)象中設(shè)置異步發(fā)送
((ActiveMQConnection)connection).setUseAsyncSend(true)
異步發(fā)送的弊端
1)異步發(fā)送丟失消息的場(chǎng)景是:生產(chǎn)者設(shè)置UseAsyncSend=true奴烙,使用producer.send(msg)持續(xù)發(fā)送消息。由于消息不阻塞,生產(chǎn)者會(huì)認(rèn)為所有send的消息均被成功發(fā)送至MQ切诀。如果服務(wù)端突然宕機(jī)揩环,此時(shí)生產(chǎn)者端內(nèi)存中尚未被發(fā)送至MQ的消息都會(huì)丟失。
此時(shí)幅虑,需要在發(fā)送時(shí)設(shè)置回調(diào)函數(shù)丰滑,從而知道消息是否發(fā)送成功:
public void sendMessage(ActiveMQMessage msg, final String msgid) throws JMSException {
producer.send(msg, new AsyncCallback() {
@Override
public void onSuccess() {
// 使用msgid標(biāo)識(shí)來(lái)進(jìn)行消息發(fā)送成功的處理
System.out.println(msgid+" has been successfully sent.");
}
@Override
public void onException(JMSException exception) {
// 使用msgid表示進(jìn)行消息發(fā)送失敗的處理
System.out.println(msgid+" fail to send to mq.");
exception.printStackTrace();
}
});
}
-
延遲投遞/定時(shí)投遞
1)activemq開啟延遲和定時(shí)機(jī)制:修改activemq.xml中的配置項(xiàng),在<broker>標(biāo)簽中添加屬性schedulerSupport=true;
2)關(guān)于延遲和定時(shí)的消息屬性如下:
time.png
具體設(shè)置如下:
Message message = session.createTextMessage("sss");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,3000);
//設(shè)置延遲3s發(fā)送
-
消息重試機(jī)制:
try.png
默認(rèn):消息被重復(fù)發(fā)送到消費(fèi)者6次后倒庵,會(huì)將該消息放置死信隊(duì)列中(DLQ)褒墨;
- DLQ死信隊(duì)列:主要是用來(lái)處理失敗的消息,多數(shù)為人工干預(yù)處理擎宝;
- 避免消息重復(fù)消費(fèi):
解決方法:增加消息狀態(tài)表郁妈。
通俗來(lái)說(shuō)就是一個(gè)賬本,用來(lái)記錄消息的處理狀態(tài)绍申,每次處理消息之前噩咪,都去狀態(tài)表中查詢一次,如果已經(jīng)有相同的消息存在极阅,那么不處理胃碾,可以防止重復(fù)發(fā)送。 - 消息丟失處理方式:
解決方案:用持久化消息【可以使用對(duì)數(shù)據(jù)進(jìn)行持久化JDBC筋搏,AMQ(日志文件)仆百,KahaDB和LevelDB】,或者非持久化消息及時(shí)處理不要堆積拆又,或者啟動(dòng)事務(wù)儒旬,啟動(dòng)事務(wù)后,commit()方法會(huì)負(fù)責(zé)任的等待服務(wù)器的返回帖族,也就不會(huì)關(guān)閉連接導(dǎo)致消息丟失了栈源。