繼續(xù)翻譯第一次嘗試進行這樣模式的學(xué)習(xí)系洛,感覺好難進行螃征,不過還是要堅持住搪桂!
簡介
在之前的教程中,我們創(chuàng)建了一個工作隊列盯滚,工作隊列使用情況的假設(shè)是:每個人物都交付給一個Worker踢械,也就是消費者。在這部分中魄藕,我們將做一些完全不同的事情——我們將向多個消費者傳遞消息内列。這樣的模式被稱為“發(fā)布/訂閱"模式,檢查P/S模式背率。
為了說明這個模式话瞧,我們將會構(gòu)建一個簡單的日志記錄系統(tǒng)嫩与。它將由兩個程序組成:1.第一個程序發(fā)送日志消息。2.第二個程序?qū)⒔邮艽蛴∵@些日志交排。
在我們的日志系統(tǒng)中划滋,有接收功能的程序都將得到消息。所以埃篓,我們就可以運行一個接收器处坪,將這些日志引導(dǎo)到磁盤。同時我們再運行另一個接收器架专,功能是讓我們在屏幕上看到日志同窘。
本質(zhì)是:生產(chǎn)者發(fā)送的消息,會被傳播到所有消費者那里去部脚。
Exchange——交換器
在之前的教程之中塞椎,我們只是通過隊列發(fā)送和接受消息。接下來我們需要了解一下完整的消息傳遞模型睛低。
讓我們快速的回顧一下前面教程中介紹的內(nèi)容:
- 生產(chǎn)者是一個應(yīng)用程序,它的任務(wù)是發(fā)送消息
- 隊列是存儲消息的緩沖區(qū)
- 消費者是一個應(yīng)用程序服傍,它的任務(wù)是接收消息
完整的RabbitMQ消息傳遞模型的核心思想是——生產(chǎn)者不會直接向隊列發(fā)送任何消息钱雷!甚至消費者都不知道消息是否會被傳遞到哪些隊列。
那么這些消息發(fā)送給誰了呢吹零?
生產(chǎn)者只能將消息發(fā)送到Exchange罩抗,也就是交換器里面,交換是一個很簡單的事情灿椅。一方面套蒂,它接受來自生產(chǎn)者的消息,另一方面則把消息推送到隊列里面茫蛹。交換器必須知道如何處理它收到的消息——它是否應(yīng)該推送到特定的隊列中操刀?它是否應(yīng)該被推送到N個隊列里面?獲取它應(yīng)該被拋棄婴洼?
答:這個規(guī)則是由交換類型定義的骨坑。
有一些可用的交換類型:direct、topic柬采、headers和fanout欢唾。我們這次主要關(guān)注點在fanout上,我們將會創(chuàng)建這種類型的交換粉捻,并調(diào)用它的日志礁遣。
channel.exchangeDeclare("logs","fanout");
fanout類型很簡單,它會將接收到的所有消息肩刃,傳播到它知道的所有隊列中去祟霍。對于我們的系統(tǒng)來說杏头,這正是我們需要的。
簡單說一下這幾個交換類型
direct : 所有發(fā)送到direct類型的交換器中的消息都會被轉(zhuǎn)發(fā)到”RouteKey“中指定的隊列浅碾。
fanout : 所有發(fā)送到fanout類型的交換器中的消息都會被轉(zhuǎn)發(fā)到所有與該交換器綁定的隊列大州。
topic : 所有發(fā)送到topic類型的交換器中的消息都會被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定的話題的隊列。
header : 這個用的比較少垂谢,忽略了RouteKey的路由方式厦画,使用Headers來匹配。Headers是個鍵值對滥朱。
(我自己也是在學(xué)習(xí)中根暑,不是很熟悉,以后我研究研究徙邻,明白了單寫一個番外)
交換器列表
我們可以通過rabbitmqctl語句列出我們可以運行的可用的交換器列表:
rabbitmqctl list_exchanges
出來這些東西排嫌,莫方!很多帶著amq做開頭的交換器和沒有命名的交換器缰犁,這些都是默認(rèn)創(chuàng)建的淳地,而且我們目前用不上他們。
沒有名字的交換器
教程的前面幾個部分里面帅容,我們對交換器一無所知颇象,但是仍然能夠?qū)⑾l(fā)送到隊列里面去。這是為啥并徘?
我們使用的是默認(rèn)的交換器遣钳,我們用空字符串("")去識別它。
回憶一下前面我們是如何發(fā)送消息的:
channel.basicPublish("","hello",null,message.getBytes());
第一個參數(shù)是空字符串麦乞,這個就是我們使用的交換器的名稱蕴茴。空字符串表示默認(rèn)或者匿名的交換器姐直。如果消息存在倦淀,那么則使用RoutingKey指定的名稱將消息放到隊列中去。
現(xiàn)在声畏,我們可以發(fā)布到我們自己命名的交換器啦:
channel.basicPublish("logs","",null,message.getBytes());
臨時隊列——Temporyary Queues
你可能記得我們使用過有指定名稱的隊列(還記得"hello"和task_queue"嗎晃听?)。對于我們來說砰识,能夠給隊列命名能扒,是至關(guān)重要的,因為我們需要把Worker(消費者)指向相同的隊列辫狼。當(dāng)我們想要在消費者和生產(chǎn)者之間共享隊列的時候初斑,給隊列命名就會尤為重要。(我也不知道為啥官方把這句話說了兩遍膨处,可能很重要吧ㄟ( ▔, ▔ )ㄏ...)
但是<印I笆!對于我們的Log來說鹃答,情況就不一樣啦乎澄。我們希望拿到所有關(guān)于日志的消息,而不只是它們中的一部分测摔。我們也只對當(dāng)前流動的消息感興趣置济,而不是舊消息。想要解決這個問題锋八,我們需要理清兩件事:
首先浙于,每當(dāng)我們連接到RabbitMQ,我們都需要一個新的挟纱,空的隊列羞酗。要做到這一點,我們可以創(chuàng)建一個帶有隨機名稱的隊列紊服,或者檀轨,我們選擇更好的方式——讓服務(wù)器為我們選擇一個隨機的隊列名稱。其次欺嗤,一旦我們斷開了消費者的連接裤园,應(yīng)該自動刪除隊列。在Java客戶端剂府,當(dāng)我們沒有向queueDeclare()提供參數(shù)的時候,我們會創(chuàng)建一個非持久的剃盾,獨占的腺占,自動刪除的隊列,并會生成一個名稱痒谴。
String queueName = channel.queueDeclare().getQueue();
此時衰伯,queueName包含一個隨機隊列名稱。比如积蔚,他可能看起來像amq.gen-jzty20brgko-hjmuj0wlg (總之是亂七八糟的)
綁定——Bindings
我們已經(jīng)創(chuàng)建了一個fanout類型的交換器意鲸,和一個隊列,現(xiàn)在我們需要告訴交換器尽爆,讓他將消息發(fā)送到我們的隊列中區(qū)怎顾。交換器和隊列之間的關(guān)系叫做綁定。
channel.queueBind(queueName,"logs","")
從現(xiàn)在開始漱贱,我們的log交換器將會向我們隊列添加消息了槐雾。
完整示例
這次我沒有像之前,先貼例子幅狮。這次我選擇先寫小部分募强,最后寫完整的株灸,這樣就和官方基本上一模一樣了,會在某些地方加上自己的理解擎值。還有注釋慌烧!我還是會寫的很完整的!
發(fā)布日志消息的生產(chǎn)者程序與前一期沒啥太大的不同鸠儿,不過有些變化比較重要屹蚊。我們現(xiàn)在想要將消息發(fā)布到日志交換器中,而不是無名的交換器捆交。我們需要在發(fā)送的時候提供一個路由鍵(RoutingKey),但是我們忽略了它的value淑翼,因為我們的交換類型是fanout。
EmitLog.java
public class EmitLog {
//設(shè)置交換器的名字
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明交換器,給它名字,設(shè)置交換類型為fanout
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//待傳遞的消息內(nèi)容
String message = getMessage(args);
//傳消息
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("[x] Sent '"+message+"'");
//關(guān)閉連接通道
channel.close();
connection.close();
}
private static String getMessage(String[] strings) {
if (strings.length<1){
return "hello world";
}
return joinStrings(strings," ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0){
return "";
}
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1;i < length; i++){
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
(我貼心的把官方?jīng)]給你們的工具函數(shù)也寫上了品追!快夸我(☆▽☆))
正如你們看到的玄括,建立連接之后,我們聲明了交換器肉瓦,這個步驟是必要的遭京,因為把消息發(fā)布到一個不存在的交換器上是禁止的!
如果沒有隊列綁定到交換中泞莉,消息將消失哪雕,但是對于我們是允許的,因為如果沒有消費者在收集消息鲫趁,我們可以安全的丟棄信息斯嚎。
ReceiveLogs.java
public class ReceiveLogs {
//設(shè)置交換器的名字
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明交換器,給它名字,設(shè)置交換類型為fanout
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//得到隊列的名字
String queueName = channel.queueDeclare().getQueue();
//隊列和交換器進行綁定
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("[*] Waiting for message.To exit press CTRL+C");
//接收
Consumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("[x]Receive '"+ message+"'");
}
};
boolean autoAck = true;
channel.basicConsume(queueName,autoAck,consumer);
}
}
這里可以自己試試,打開兩個或者三個ReceiveLogs挨厚,然后嘗試看看發(fā)送一條信息堡僻,會發(fā)生什么?
結(jié)果
本來想你們自己嘗試來著疫剃,不過我還是回來把結(jié)果圖貼上钉疫。具體怎么做請看第二期工作隊列模式里面教的方式。
生產(chǎn)者:
打開的三個消費者:
第三個反正長得一樣就不貼了巢价,╭(╯^╰)╮
試驗成功牲阁,一個發(fā)送,三個同時接受成功壤躲!