路由(routing)
在上一個(gè)教程中,我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的日志系統(tǒng)买喧。我們將日志消息廣播到很多個(gè)消費(fèi)者。
在這個(gè)教程我們將給它加一個(gè)特性 —— 我們將使它可以只訂閱消息的一個(gè)子集。例如:我們直接將致命的錯(cuò)誤信息打印到日志(保存在磁盤中)旨巷,同時(shí)能將所有的日志信息打印在控制臺(tái)上。
綁定(Bindings)
在前面的教程中我們已經(jīng)用到了綁定添忘,可以重新調(diào)用那段代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一個(gè)exchange和queue之間的綁定關(guān)系采呐。換句話說(shuō):這個(gè)queue對(duì)exchange中的消息感興趣。
綁定需要增加一個(gè)額外的參數(shù)routingKey
搁骑。為了避免和推送消息中basic_publish
的參數(shù)名造成混亂斧吐,消費(fèi)者中我們叫它binding key
。下面展示創(chuàng)建一個(gè)binding key
:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key
的關(guān)鍵在于依賴的exchange類型仲器。類型為fanout
的exchange煤率,上個(gè)教程用到的,忽略了它的值乏冀。
直接交換(direct exchange)
上個(gè)教程我們的日志系統(tǒng)直接將所有的信息廣播給所有的消費(fèi)者蝶糯。我們想在這個(gè)的基礎(chǔ)上根據(jù)消息的嚴(yán)重程度進(jìn)行過(guò)濾。例如:我們想有一個(gè)程序用來(lái)將重要的錯(cuò)誤信息存儲(chǔ)到磁盤當(dāng)中辆沦,而不像浪費(fèi)空間去存儲(chǔ)warning
或者是info
級(jí)別的日志消息昼捍。
我們使用的fanout
類型的exchange,并不能給我們提供這樣的靈活性 —— 它只能盲目的進(jìn)行廣播肢扯。
我們將用direct
類型的exchange來(lái)代替它妒茬。direct
exchange背后的路由算法很簡(jiǎn)單 —— 生產(chǎn)者的routing key
完全匹配消費(fèi)者中的binding key
。
為了說(shuō)明這些蔚晨,請(qǐng)看下圖中的配置:
在上圖的配置中乍钻,我們看到有兩個(gè)隊(duì)列綁定了類型為direct
的exchange X
。第一個(gè)隊(duì)列綁定了的key為orange
,第二個(gè)隊(duì)列有兩個(gè)綁定团赁,一個(gè)綁定的key為black
育拨,另一個(gè)是green
。
在這樣的一個(gè)配置中欢摄,推送到exchange的消息熬丧,routing key為orange
的消息將路由到隊(duì)列1(Q1
)中,routing key為black
或者為green
的消息將路由到隊(duì)列2(Q2
)中怀挠。其他的消息將被丟棄析蝴。
多個(gè)綁定(multiple bindings)
多個(gè)隊(duì)列綁定相同的key是完全允許的。在我們的例子當(dāng)中绿淋,我們可以在X
和Q1
之間添加一個(gè)綁定key為black
的關(guān)系闷畸。這樣,類型為direct
的exchange就類似于fanout
了吞滞,能將消息廣播到所有匹配的隊(duì)列中佑菩。當(dāng)消息的routing key為black
時(shí),將分發(fā)到Q1
和Q2
中裁赠。
發(fā)送日志(emitting logs)
我們將在我們的日志系統(tǒng)中使用這個(gè)模式殿漠。將消息發(fā)送到類型為direct
的exchange中,而不是fanout
類型的exchange佩捞。這樣接收程序就能選擇重要的消息接收了绞幌。
像以前一樣,我們需要?jiǎng)?chuàng)建一個(gè)exchange先:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后發(fā)送一個(gè)消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
簡(jiǎn)單起見一忱,我們將嚴(yán)重的級(jí)別的定義為:info莲蜘、warning、error帘营。
訂閱(subscribing)
接收消息將和前面的教程差不多票渠,但有一點(diǎn)除外 —— 我們將給我們所有感興趣的每種嚴(yán)重的消息創(chuàng)建綁定關(guān)系。
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
信息匯總
官網(wǎng)的使用命令行執(zhí)行的仪吧。這里我們將一次性的向exchange發(fā)送6條消息庄新,info、warn薯鼠、error三個(gè)級(jí)別各兩條择诈。如上圖,我們創(chuàng)建兩個(gè)消費(fèi)者出皇,這里我們創(chuàng)建兩個(gè)類羞芍,一個(gè)接收routing key為error的消息,并將其打印到文件中郊艘;另一個(gè)接收所有消息并打印到控制臺(tái)荷科。
package com.roachfu.tutorial.rabbitmq.website.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct.log";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
System.out.println(" [*] begin sent message to exchange");
/* 分別發(fā)送兩條 info,warn,error基本的消息 */
channel.basicPublish(EXCHANGE_NAME, "error", null, "[error] - this is first error message.".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "warn", null, "[warn ] - this is first warn message.".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "info", null, "[info ] - this is first info message.".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "error", null, "[error] - this is second error message.".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "warn", null, "[warn ] - this is second warn message.".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "info", null, "[info ] - this is second info message.".getBytes("UTF-8"));
System.out.println( " [x] done. . . ");
channel.close();
connection.close();
}
}
package com.roachfu.tutorial.rabbitmq.website.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogDirectToConsole {
private static final String EXCHANGE_NAME = "direct.log";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warn");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" [*] Waiting for message and handle it to console . . . ");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] receive " + envelope.getRoutingKey() + " : '" + message + "'");
}
};
channel.basicConsume(queueName,consumer);
}
}
package com.roachfu.tutorial.rabbitmq.website.direct;
import com.rabbitmq.client.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogDirectToFile {
private static final String EXCHANGE_NAME = "direct.log";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" [*] Waiting for message and handle it to file . . . ");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
File file = new File("/temp/direct.log");
FileOutputStream out = new FileOutputStream(file, true);
out.write(body);
out.write(("\r\n").getBytes());
out.flush();
out.close();
}
};
channel.basicConsume(queueName,consumer);
}
}
我們先運(yùn)行兩個(gè)消費(fèi)者唯咬,然后運(yùn)行生產(chǎn)者∥方看輸出結(jié)果:
ReceiveLogDirectToConsole 消費(fèi)者
ReceiveLogDierctToFile 消費(fèi)者