當(dāng)我們想通過(guò)多條線程處理activemq中的消息,直覺(jué)上會(huì)使用固定大小線程池去處理贝奇,然而這種方式并不妥當(dāng)肥荔,這么做我們只是將消息從activemq轉(zhuǎn)移到線程池的阻塞隊(duì)列之中加酵,當(dāng)線程池開(kāi)始工作,activemq中的消息快速被消費(fèi)完畢姜盈,而消息所代表的任務(wù)卻并未真正被處理低千, 他們被堆積在處理程序的內(nèi)存中,并陸續(xù)由線程中的線程處理馏颂。這會(huì)產(chǎn)生副作用示血,此時(shí)當(dāng)處理程序因?yàn)槟撤N原因而崩潰,這些待處理的任務(wù)都將丟失救拉。
如何實(shí)現(xiàn)既能通過(guò)多個(gè)線程處理任務(wù)难审,又能保證未完成的任務(wù)的安全性,此時(shí) SynchronousQueue 就有了用武之地亿絮。
我們可以把SynchronousQueue 當(dāng)作長(zhǎng)度為1的阻塞隊(duì)列告喊,當(dāng)隊(duì)列被塞入一個(gè)元素,假如這個(gè)元素未被消費(fèi)掉壹无,那么后續(xù)的塞入操作將被阻塞葱绒。我們可以利用它的這個(gè)特性,把它當(dāng)作是activemq與處理線程之間的緩沖層斗锭。在 SynchronousQueue 的一端地淀,我們從activemq中讀取一個(gè)元素,并將它put進(jìn)SynchronousQueue 岖是。在另一端帮毁,多條線程分別從 SynchronousQueue 中 take 元素進(jìn)行處理实苞,只有當(dāng) SynchronousQueue 中不存在任何元素,也就是線程們將當(dāng)前的任務(wù)都處理完畢烈疚,還有一端的從activemq中提取消息的操作才能執(zhí)行黔牵,反之則將被阻塞。 通過(guò)這種方式爷肝,我們便能保證任務(wù)不丟失的同時(shí)又能通過(guò)多線程處理它們猾浦。示例代碼如下
初始化一個(gè) SynchronousQueue
private SynchronousQueue<ActiveMQObjectMessage> synchronousQueue = new SynchronousQueue<>();
從activemq中將消息轉(zhuǎn)移至synchronousQueue,一次轉(zhuǎn)移一條灯抛,如果上一條未被處理金赦,下一條不能繼續(xù)
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
.DEFAULT_PASSWORD, brokerUrl);
connection = factory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(dest);
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
try {
Message message = consumer.receive();
if (message instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) message;
synchronousQueue.put(activeMQObjectMessage);
} else {
if (message != null) {
message.acknowledge();
logger.error("消息格式錯(cuò)誤,msg={}",message.toString());
}
}
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
開(kāi)啟多條線程同時(shí)處理消息
Runnable task = () -> {
while (true) {
try {
ActiveMQObjectMessage activeMQObjectMessage = synchronousQueue.take();
//消費(fèi)消息对嚼,處理成功后確認(rèn)
boolean complete = handle(msg);
if (complete) {
activeMQObjectMessage.acknowledge();
}
} catch ( JMSException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < threads; i++) {
Thread thread = new Thread(task);
thread.setName("log-task-" + i);
thread.start();
}