內(nèi)容來自:RabbitMQ Tutorials Java版
Topics
在上一個(gè)教程中我們改進(jìn)了我們的日志系統(tǒng):使用direct
路由器替代了fanout
路由器渤早,從而可以選擇性地接收日志。
盡管使用direct路由器給我們的日志系統(tǒng)帶了了改進(jìn)演痒,但仍然有一些限制:不能基于多種標(biāo)準(zhǔn)進(jìn)行路由艺普。
在我們的日志系統(tǒng)中届氢,我們可能不僅需要根據(jù)日志的嚴(yán)重級別來接收日志,而且有時(shí)想基于日志來源進(jìn)行路由坡锡。如果你知道syslog這個(gè)Unix工具猴蹂,你可能了解這個(gè)概念勃黍,sysylog
會(huì)基于日志嚴(yán)重級別(info/warn/crit...
)和設(shè)備(auth/cron/kern...
)進(jìn)行日志分發(fā)。
如果我們可以監(jiān)聽來自corn
的錯(cuò)誤日志晕讲,同時(shí)也監(jiān)聽kern
的所有日志覆获,那么我們的日志系統(tǒng)就會(huì)更加靈活。
為了實(shí)現(xiàn)這個(gè)功能瓢省,我們需要了解一個(gè)復(fù)雜的路由器:topic
路由器弄息。
主題路由器(Topic Exchange)
發(fā)送到topic
路由器的消息的路由鍵routing_key
不能任意給定:它必須是一些單詞的集合,中間用點(diǎn)號.
分割勤婚。這些單詞可以是任意的摹量,但通常會(huì)體現(xiàn)出消息的特征。一些有效的路由鍵示例:stock.usd.nyse
馒胆,nyse.vmw
缨称,quick.orange.rabbit
。這些路由鍵可以包含很多單詞祝迂,但路由鍵總長度不能超過255個(gè)字節(jié)睦尽。
綁定鍵binding key
也必須是這種形式。topic
路由器背后的邏輯與direct
路由器類似:以特定路由鍵發(fā)送的消息將會(huì)發(fā)送到所有綁定鍵與之匹配的隊(duì)列中型雳。但綁定鍵有兩種特殊的情況:
①*(星號)僅代表一個(gè)單詞
②#(井號)代表任意個(gè)單詞
下圖可以很好地解釋這兩個(gè)符號的含義:
對于上圖的例子当凡,我們將會(huì)發(fā)送描述動(dòng)物的消息。這些消息將會(huì)以由三個(gè)單詞組成的路由鍵發(fā)送纠俭。路由鍵中的第一個(gè)單詞描述了速度沿量,第二個(gè)描述了顏色,第三個(gè)描述了物種:<speed>.<colour>.<species>
冤荆。
我們創(chuàng)建了三個(gè)綁定朴则,Q1的綁定鍵為*.orange.*
,Q2的綁定鍵有兩個(gè)钓简,分別是*.*.rabbit
和lazy.#
乌妒。
上述綁定關(guān)系可以描述為:
①Q(mào)1關(guān)注所有顏色為orange
的動(dòng)物。
②Q2關(guān)注所有的rabbit
涌庭,以及所有的lazy
的動(dòng)物芥被。
如果一個(gè)消息的路由鍵是quick.orange.rabbit
欧宜,那么Q1和Q2都可以接收到坐榆,路由鍵是lazy.orange.elephant
的消息同樣如此。但是冗茸,路由鍵是quick.orange.fox
的消息只會(huì)到達(dá)Q1席镀,路由鍵是lazy.brown.fox
的消息只會(huì)到達(dá)Q2匹中。注意,路由鍵為lazy.pink.rabbit
的消息只會(huì)到達(dá)Q2一次豪诲,盡管它匹配了兩個(gè)綁定鍵顶捷。路由鍵為quick.brown.fox
的消息因?yàn)椴缓腿我獾慕壎ㄦI匹配,所以將會(huì)被丟棄屎篱。
假如我們不按常理出牌:發(fā)送一個(gè)路由鍵只有一個(gè)單詞或者四個(gè)單詞的消息服赎,像orange
或者quick.orange.male.rabbit
,這樣的話交播,這些消息因?yàn)椴缓腿我饨壎ㄦI匹配重虑,都將會(huì)丟棄。但是秦士,lazy.orange.male.rabbit
消息因?yàn)楹?code>lazy.#匹配缺厉,所以會(huì)到達(dá)Q2,盡管它包含四個(gè)單詞隧土。
Topic exchange
Topic exchange
非常強(qiáng)大提针,可以實(shí)現(xiàn)其他任意路由器的功能。
</br>當(dāng)一個(gè)隊(duì)列以綁定鍵#
綁定曹傀,它將會(huì)接收到所有的消息,而無視路由鍵(實(shí)際是綁定鍵#
匹配了任意的路由鍵)揖曾。----這和fanout
路由器一樣了亥啦。
</br>當(dāng)*
和#
這兩個(gè)特殊的字符不出現(xiàn)在綁定鍵中炭剪,Topic exchange
就會(huì)和direct exchange
類似了。
放在一塊
我們將會(huì)在我們的日志系統(tǒng)中使用主題路由器Topic exchange
翔脱,并假設(shè)所有的日志消息以兩個(gè)單詞<facility>.<severity>
為路由鍵。
代碼和上個(gè)教程幾乎一樣届吁。
生產(chǎn)者EmitLogTopic.java
:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
//建立連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//聲明路由器和路由器類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//定義路由鍵和消息
String routingKey = "";
String message = "msg.....";
//發(fā)布消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
消費(fèi)者ReceiveLogsTopic.java
:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
//建立連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明路由器和路由器類型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//
String bingingKeys[] = {""};
for (String bindingKey : bingingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//監(jiān)聽消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
現(xiàn)在,可以動(dòng)手實(shí)驗(yàn)了疚沐。
開頭提到的:日志嚴(yán)重級別info/warn/crit...
和設(shè)備auth/cron/kern...
暂氯。
消費(fèi)者:
將String bingingKeys[] = {""}
改為String bingingKeys[] = {"#"}
,啟動(dòng)第一個(gè)消費(fèi)者痴施;
再改為String bingingKeys[] = {"kern.*"}
,啟動(dòng)第二個(gè)消費(fèi)者辣吃;
再改為String bingingKeys[] = {"*.critical"}
,啟動(dòng)第三個(gè)消費(fèi)者厘惦;
再改為String bingingKeys[] = {"kern.*", "*.critical"}
哩簿,啟動(dòng)第四個(gè)消費(fèi)者宵蕉。
生產(chǎn)者节榜,發(fā)送多個(gè)消息,如:
路由鍵為kern.critical
的消息:A critical kernel error
缝左;
路由鍵為kern.info
的消息:A kernel info
浓若;
路由鍵為kern.warn
的消息:A kernel warning
;
路由鍵為auth.critical
的消息:A critical auth error
挪钓;
路由鍵為cron.warn
的消息:A cron waning
;
路由鍵為cron.critical
的消息:A critical cron error
倚评;
試試最后的結(jié)果:第一個(gè)消費(fèi)者將會(huì)接收到所有的消息馏予,第二個(gè)消費(fèi)者將會(huì)kern
的所有嚴(yán)重級別的日志,第三個(gè)消費(fèi)者將會(huì)接收到所有設(shè)備的critical
消息霞丧,第四個(gè)消費(fèi)者將會(huì)接收到kern
設(shè)備的所有消息和所有
critical
消息。
下個(gè)教程蛹尝,我們將會(huì)學(xué)習(xí)如何讓消息往返后豫,以此來作為一個(gè)遠(yuǎn)程過程調(diào)用(RPC)突那。
說明
①與原文略有出入,如有疑問早龟,請參閱原文
②原文均是編譯后通過javacp命令直接運(yùn)行程序,我是在IDE中進(jìn)行的拄衰,相應(yīng)的操作做了修改饵骨。