人生的磨難是很多的黔州,所以我們不可對于每一件輕微的傷害都過于敏感辅搬。在生活磨難面前,精神上的堅強和無動于衷是我們抵抗罪惡和人生意外的最好武器索赏。 —— 洛克
1. 獨占消費者(Exclusive Consumer)
??Queue中的消息是按照順序被分發(fā)到consumers的侵贵。然而届搁,當你有多個consumers同時從相同的queue中提取消息時,你將失去這個保證窍育。因為這些消息是被多個線程并發(fā)的處理卡睦。有的時候,保證消息按照順序處理是很重要的漱抓。
??ActiveMQ提供了Exclusive Consumer機制表锻,Broker從多個Consumer中挑選一個Consumer來處理Queue中的所有消息。若是在此過程辽旋,這個Consumer掛掉浩嫌,則會自動切換到其他的Consumer繼續(xù)處理。
??可以通過Destination Options 來創(chuàng)建一個Exclusive Consumer补胚。如下
queue = new ActiveMQQueue("ORDER.PRICE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
2. 消息分組(Message Groups)
??Message Groups可以看成是并發(fā)的Exclusive Consumer码耐,Message Groups機制中是使用JMS消息屬性的JMSXGroupID來區(qū)分的。Message Groups保證所有具有相同的JMSXGroupID的消息會被分發(fā)到相同的Consumer溶其。該特性也是一種負載均衡的機制骚腥。
??在消息被分發(fā)到Consumer前,Broker首先會檢查消息JMSXGroupID屬性瓶逃。若存在束铭,那么會檢查是否有具有該Message Group的Consumer。若沒有厢绝,Broker則會選擇一個Consumer契沫,將該關聯到Message Group上,此后該Consumer會接收該Message Group的所有消息昔汉。直到:
??(1) Consumer被關閉懈万。
??(2) Message group被關閉。發(fā)送一個消息,并設置該消息的JMSXGroupSeq為0会通。
??示例如下:
// 發(fā)送者:
Destination destination = session.createQueue("Order.Group.Queue");
MessageProducer messageProducer = session.createProducer(destination);
for(int i = 1; i <= 3; i++){
TextMessage textMessage = session.createTextMessage("Message From OrderGroup-A:" + i);
textMessage.setStringProperty("JMSXGroupID", "OrderGroup-A");
messageProducer.send(textMessage);
}
// 消費者
Destination destination = session.createQueue(");
MessageConsumer messageConsumer = session.createConsumer("Order.Group.Queue");
messageConsumer.setMessageListener(new MessageListener(){@Override
public void onMessage(Message message) {
try {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 執(zhí)行結果
INFO | Successfully connected to tcp://localhost:61616
收到的消息:Message From OrderGroup-A:1
收到的消息:Message From OrderGroup-B:1
收到的消息:Message From OrderGroup-A:2
收到的消息:Message From OrderGroup-B:2
收到的消息:Message From OrderGroup-A:3
收到的消息:Message From OrderGroup-B:3
??關閉一個Message Groups口予,只需要在message對象上設置屬性即可,如下:
message.setStringProperty("JMSXGroupID","OrderGroup-A");
message.setIntProperty("JMSXGroupSeq", -1);
3. 消息選擇器(JMS Selector)
??JMS Selectors用于在訂閱中涕侈,基于消息屬性對進行消息的過濾沪停。JMS Selectors由SQL92語法定義。如下面示例:
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");
??在JMS Selectors表達式中裳涛,可以使用IN木张、NOT IN、LIKE等调违,例如:
??(1)LIKE '12%3' ('123' true窟哺,'12993' true,'1234' false)
??(2)LIKE 'l_se' ('lose' true技肩,'loose' false)
??(3)LIKE '_%' ESCAPE '' ('_foo' true,'foo' false)
??需要注意的是浮声,JMS Selectors表達式中的日期和時間需要使用標準的long型毫秒值虚婿。另外表達式中的屬性不會自動進行類型轉換,例如:
message.setStringProperty("NumberOfOrders", "2"); //"NumberOfOrders > 1" 求值結果是false泳挥。
??Message Groups雖然可以保證具有相同message group的消息被唯一的consumer順序處理然痊,但是卻不能確定被哪個consumer處理。在某些情況下屉符,Message Groups可以和JMS Selector一起工作,如:
??有三個consumers分別是A剧浸、B和C。你可以在producer中為消息設置三個message groups分別是“A”矗钟、“B”和“C”唆香。然后令consumer A使用“JMXGroupID = ‘A’”作為selector。B和C也同理吨艇。這樣就可以保證message group A的消息只被consumer A處理躬它。需要注意的是,這種做法有以下缺點:
??(1) producer必須知道當前正在運行的consumers东涡。
??(2) 如果某個consumer失效冯吓,那么應該被這個consumer消費的消息將會一直被積壓在broker上。