長恨人生不如水隆圆,等閑平地起波瀾抹凳。 —— 劉禹錫
1. Wildcards(通配符)
??Wildcars用來支持名字分層體系漂彤,它不是JMS規(guī)范的一部分鸯匹,是ActiveMQ的擴展坊饶。
??ActiveMQ支持以下三種通配符:
- ".":用于作為路徑上名字間的分隔符
- ">":用于遞歸的匹配任何以這個名字開始的Destination(目的地)
- "*":用于作為路徑上任何名字。
??舉例來說殴蓬,如有以下兩個Destination:
??PRICE.COMPUTER.JD.APPLE(蘋果電腦在京東上的價格)
??PRICE.COMPUTER.TMALL.APPLE(蘋果電腦在天貓上的價格)
??1. PRICE.> :匹配任何產(chǎn)品的價格變動
??2. PRICE.COMPUTER.> :匹配任何電腦產(chǎn)品的價格變動
??3. PRICE.COMPUTER.JD.*:匹配任何在京東上的電腦的價格變動
??4. PRICE.COMPUTER.*.APPLE:匹配蘋果電腦京東或天貓上的價格變動
??JAVA代碼示例:
??生產(chǎn)者:
// 實例化連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
// 通過連接工廠獲取連接
Connection connection = connectionFactory.createConnection();
// 啟動連接
connection.start();
// 創(chuàng)建session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建隊列
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 創(chuàng)建生產(chǎn)者
MessageProducer messageProducer = session.createProducer(destination);
for (int i = 1; i <= 10; i++) {
TextMessage textMessage = session.createTextMessage(message);
messageProducer.send("Mac Air價格:" + i * 1000);
System.out.println("發(fā)送消息 - " + textMessage.getText());
}
session.commit();
??消費者:
// 實例化連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=false");
// 通過連接工廠獲取連接
Connection connection = connectionFactory.createConnection();
// 啟動連接
connection.start();
// 創(chuàng)建session
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建隊列
Destination destination = session.createQueue("PRICE.COMPUTER.>");
// 創(chuàng)建消費者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message) {
try {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
??生產(chǎn)者發(fā)送消息日志:
發(fā)送消息:Mac Air價格:1000
發(fā)送消息:Mac Air價格:2000
發(fā)送消息:Mac Air價格:3000
發(fā)送消息:Mac Air價格:4000
發(fā)送消息:Mac Air價格:5000
發(fā)送消息:Mac Air價格:6000
發(fā)送消息:Mac Air價格:7000
發(fā)送消息:Mac Air價格:8000
發(fā)送消息:Mac Air價格:9000
發(fā)送消息:Mac Air價格:10000
??消費者接收到的消息日志:
收到的消息:Mac Air價格:1000
收到的消息:Mac Air價格:2000
收到的消息:Mac Air價格:3000
收到的消息:Mac Air價格:4000
收到的消息:Mac Air價格:5000
收到的消息:Mac Air價格:6000
收到的消息:Mac Air價格:7000
收到的消息:Mac Air價格:8000
收到的消息:Mac Air價格:9000
收到的消息:Mac Air價格:10000
??從消費者的接收日志來看匿级,當消費者使用通配符隊列時,是能正常接收消息的染厅。
通配符中是為消費者服務(wù)的痘绎。即:通配符只能配置在消費端。
2. 組合列隊(Composite Destinations)
??組合列隊即通過一個虛擬的Destination代表多個Destination肖粮,這樣就可以通過該虛擬的Destination同時向多個Destination發(fā)送消息孤页。
??實現(xiàn)虛擬Destination可通過以下方式實現(xiàn):
??(1) 客戶端方式
??生產(chǎn)者:
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,PRICE.COMPUTER.TMALL.APPLE");
// 多個Destination使用,分隔
??消費者:
Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE");
// 或者
Destination destination = session.createQueue("PRICE.COMPUTER.TMALL.APPLE");
// 以上兩個消費者都能接收到生產(chǎn)者的消息
若使用不同類型的destination,那么需要加上前綴如queue:// 或topic://涩馆。如:Destination destination = session.createQueue("PRICE.COMPUTER.JD.APPLE,topic://PRICE.COMPUTER.TMALL.APPLE");
??(2) 配置方式
??在activemq.xml的broker中添加以下配置:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<compositeQueue name="PRICE.COMPUTER">
<forwardTo>
<queue physicalName="PRICE.COMPUTER.JD.APPLE"/>
<queue physicalName="PRICE.COMPUTER.TMALL.APPLE" />
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
??然后在生產(chǎn)者中行施,直接向隊列PRICE.COMPUTER中發(fā)送消息。那么監(jiān)聽PRICE.COMPUTER.JD.APPLE或PRICE.COMPUTER.TMALL.APPLE的消費者都能同時接收到生產(chǎn)者發(fā)送的消息魂那。
注意:修改完配置文件后蛾号,須重啟Broker服務(wù),不然配置是不會生效的涯雅!
3. 配置啟動隊列
??若需要在ActiveMQ啟動的時候鲜结,創(chuàng)建Destination的話,可以如下配置conf/activemq.xml的broker下:
<destinations>
<queue physicalName="PRICE.COMPUTER.JD.APPLE" />
<topic physicalName="PRICE.COMPUTER.TMALL.APPLE" />
</destinations>
4. 隊列選項
??隊列選項是給consumer在JMS規(guī)范之外添加的功能特性活逆,通過在隊列名稱后面使用類似URL的語法添加多個選項精刷。包括:
?? 1:consumer.prefetchSize,consumer持有的未確認最大消息數(shù)量划乖,默認值 variable贬养。
?? 2:consumer.maximumPendingMessageLimit:用來控制非持久化的topic在存在慢消費者的情況下,丟棄的數(shù)量,默認0宋欺。
??3:consumer.noLocal :默認false。
?? 4:consumer.dispatchAsync :是否異步分發(fā) 蔚晨,默認true儿礼。
??5:consumer.retroactive:是否為回溯消費者 咖杂,默認false。
?? 6:consumer.selector:Jms的Selector蚊夫,默認null诉字。
?? 7:consumer.exclusive:是否為獨占消費者 ,默認false知纷。
?? 8:consumer.priority:設(shè)置消費者的優(yōu)先級壤圃,默認0。
??示例如下:
queue = new ActiveMQQueue("PRICE.COMPUTER.TMALL.APPLE?consumer.dispatchAsync=true&consumer.prefetchSize=20");
consumer = session.createConsumer(queue);
5. 虛擬Destination
??ActiveMQ支持虛擬Destination有以下兩種方式:
??(1) 虛擬主題
??(2) 組合Destinations(前面已介紹過)
5.1 為何使用虛擬主題
??ActiveMQ中琅轧,topic只有在持久訂閱下才是持久化的伍绳。持久訂閱時,每個持久訂閱者乍桂,都相當于一個queue的客戶端冲杀,它會收取所有消息。這種情況下存在兩個問題:
??(1) 同一應(yīng)用內(nèi)consumer端負載均衡的問題:即同一個應(yīng)用上的一個持久訂閱不能使用多個consumer來共同承擔消息處理功能睹酌。因為每個consumer都會獲取所有消息权谁。queue模式可以解決這個問題,但broker端又不能將消息發(fā)送到多個應(yīng)用端憋沿。
??(2) 由于只能使用單個的持久訂閱者旺芽,如果這個訂閱者出錯,則應(yīng)用就無法處理消息了卤妒,系統(tǒng)的健壯性不高甥绿。
??在ActiveMQ,可以通過虛擬主題來解決以上兩個問題则披。
5.2 虛擬主題的使用
??對于消息發(fā)布者來說共缕,就是一個正常的Topic,名稱以VirtualTopic.開頭士复。例如VirtualTopic.Mobile图谷。示例:
Topic destination = session.createTopic("VirtualTopic.Mobille");
??對于消息接收端來說,是個隊列阱洪,不同應(yīng)用里使用不同的前綴作為隊列的名稱便贵,即可表明自己的身份即可實現(xiàn)消費端應(yīng)用分組。如Consumer.A.VirtualTopic.Mobille冗荸,說明它是名稱為A的消費端承璃,同理Consumer.B.VirtualTopic.Mobille說明是一個名稱為B的客戶端“霰荆可以在同一個應(yīng)用里使用多個consumer消費此Topic盔粹,則可以實現(xiàn)上面兩個功能隘梨。又因為不同應(yīng)用使用的queue名稱不同(前綴不同),所以不同的應(yīng)用中都可以接收到全部的消息舷嗡。代碼示例如下:
??應(yīng)用A的消費者:
Destination destination = session.createQueue("Consumer.A.VirtualTopic.Mobile");
應(yīng)用A的消費者啟動兩個及以上轴猎,下面稱為A1、A2.
??應(yīng)用B的消費者:
Destination destination = session.createQueue("Consumer.B.VirtualTopic.Mobile");
??應(yīng)用A的消費者和應(yīng)用B的消費者依次啟動訂閱进萄,發(fā)布者發(fā)送消息捻脖。
??應(yīng)用A消費者A1消費消息日志:
收到的消息:ActiveMQ 發(fā)送的消息:10
收到的消息:ActiveMQ 發(fā)送的消息:12
收到的消息:ActiveMQ 發(fā)送的消息:14
收到的消息:ActiveMQ 發(fā)送的消息:16
收到的消息:ActiveMQ 發(fā)送的消息:18
??應(yīng)用A消費者A2消費消息日志:
收到的消息:ActiveMQ 發(fā)送的消息:11
收到的消息:ActiveMQ 發(fā)送的消息:13
收到的消息:ActiveMQ 發(fā)送的消息:15
收到的消息:ActiveMQ 發(fā)送的消息:17
收到的消息:ActiveMQ 發(fā)送的消息:19
??應(yīng)用B消費者消息日志:
收到的消息:ActiveMQ 發(fā)送的消息:10
收到的消息:ActiveMQ 發(fā)送的消息:11
收到的消息:ActiveMQ 發(fā)送的消息:12
收到的消息:ActiveMQ 發(fā)送的消息:13
收到的消息:ActiveMQ 發(fā)送的消息:14
收到的消息:ActiveMQ 發(fā)送的消息:15
收到的消息:ActiveMQ 發(fā)送的消息:16
收到的消息:ActiveMQ 發(fā)送的消息:17
收到的消息:ActiveMQ 發(fā)送的消息:18
收到的消息:ActiveMQ 發(fā)送的消息:19
??從以上消費者日志可以看出,應(yīng)用A的A1中鼠、A2兩個消費者共同消費了10條消息可婶,而應(yīng)用B的消費者也同樣消費了10條消息,但應(yīng)用B只要一個消費者援雇。
注意:若是activemq.xml中配置了destinationInterceptors扰肌,則需要將destinationInterceptors刪除,然后重啟Broker服務(wù)熊杨。
??虛擬主題的前綴默認是VirtualTopic,若是需要修改盗舰,可通過以下方式修改:
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>