上一篇中我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng),我們可以把日志消息廣播給多個(gè)接受者荧关。
這篇中我們將來(lái)添加一個(gè)特性只接收部分消息。例如我只將一些錯(cuò)誤log存到文件中,把所有的log都打印到控制臺(tái)里岭辣。
1沦童、綁定(Bindings)
在上篇博文中偷遗,我們已經(jīng)創(chuàng)建了一個(gè)binding氏豌,代碼如下:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一個(gè)binding就是exchange和Queue之間的一個(gè)關(guān)系般妙。可以簡(jiǎn)單的理解為:這個(gè)Queue對(duì)其相對(duì)于的exchange的消息之間建立了一個(gè)關(guān)系旺隙。
Binding可以使用一個(gè)已經(jīng)存在的routingKey參數(shù)伏社。為了避免和basic_publish參數(shù)混淆,我們稱之為binding key罕容。下邊就是我們?cè)趺从胟ey來(lái)創(chuàng)建一個(gè)binding:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key的意義有時(shí)候取決于exchange的類型旅择。對(duì)于Fanout類型的exchange捺宗,會(huì)忽略binding key。
2、Direct類型的exchange
我們上篇博文中的日志系統(tǒng)會(huì)把所有的log消息廣播給所有的消費(fèi)者。我們想擴(kuò)展來(lái)根據(jù)他們的日志級(jí)別來(lái)過(guò)濾log消息蕊连。例如:我們只想把error級(jí)別的日志寫到磁盤文件中载庭,而其它級(jí)別的日志消息則過(guò)濾掉顽铸。
我們之前使用的fanout類型的exchange鬼譬,但這樣就不會(huì)有太多的靈活性。
在這里我們將要使用direct類型的exchange。Direct類型exchange的路由算法是很簡(jiǎn)單的:要想一個(gè)消息能到達(dá)這個(gè)隊(duì)列颤枪,需要binding key和routing key正好能匹配得上汗捡。
在這樣的結(jié)構(gòu)中扇住,我們可以看到direct類型的exchange X女阀,有兩個(gè)queue綁定到它。第一個(gè)queue是以orange為binding key綁定到exchange X上的,第二個(gè)queue是由兩個(gè)binding key(black和green)綁定到exchange X的晓淀。
在這樣的設(shè)置中所袁,一條消息被推送到exchange,如果使用的routing key是error懦窘,那么消息就會(huì)被路由到C1中前翎;如果使用的routing key是error或者info或者warning,那么該消息將會(huì)被路由到C2中畅涂。其它的消息都將會(huì)被丟棄掉港华。
3、多重綁定(Multiple bindings)
用同一個(gè)binding來(lái)把多個(gè)queue綁定到同一個(gè)exchange也是可行的午衰。例如在之前例子的基礎(chǔ)上立宜,在X和Q1之間添加binding key名字為black,這樣的話臊岸,這里的direct類型的exchange就和fanout類型的一樣了橙数,可以把消息推送給所有的queue。帶有routing key為black的消息將會(huì)被推送到Q1和Q2中帅戒。
4灯帮、發(fā)送日志(Emitting logs)
我們將會(huì)使用這種模型,不使用fanout類型的exchange逻住,而是使用direct類型的钟哥。我們使用日志級(jí)別做為routing key,接收端根據(jù)設(shè)置的日志級(jí)別做為binding key來(lái)接收消息瞎访。首先來(lái)看看發(fā)射日志:
如之前一樣腻贰,首先來(lái)創(chuàng)建一個(gè)exchange:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后準(zhǔn)備發(fā)送消息;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
這里的”severity”可以是”info”、“warning”装诡、”error”等银受。
那么下面我們用代碼實(shí)現(xiàn)以下:
5践盼、生產(chǎn)者
package com.hrabbit.rabbitmq.routing.send;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-06-30 下午7:41
* @Description:
*/
public class Send {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個(gè)交換機(jī),一個(gè)參數(shù)為交換機(jī)名稱宾巍,第二個(gè)參數(shù)為模式
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息內(nèi)容
String message = "id=1的商品刪除了";
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
在上面的生產(chǎn)者我發(fā)送了一個(gè)info類型的內(nèi)容咕幻,此時(shí)應(yīng)該C2可以接受到這條消息。
6顶霞、消費(fèi)者1號(hào)
消費(fèi)者定義的routingKey 為error
肄程。
package com.hrabbit.rabbitmq.routing.recover;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-06-30 下午7:42
* @Description:
*/
public class Recover {
//隊(duì)列名稱
private final static String QUEUE_NAME = "hrabbit_queue_direct_1";
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
//------------下面邏輯和work模式一樣-----
// 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息到達(dá) 觸發(fā)這個(gè)方法
String msg = new String(body, "utf-8");
System.out.println("[error]:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("error消息執(zhí)行完畢!");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
6选浑、消費(fèi)者2號(hào)
消費(fèi)者定義的routingKey 為error
蓝厌、info
、warning
古徒。
package com.hrabbit.rabbitmq.routing.recover;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-06-30 下午7:42
* @Description:
*/
public class Recover2 {
//隊(duì)列名稱
private final static String QUEUE_NAME = "hrabbit_queue_direct_2";
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
//------------下面邏輯和work模式一樣-----
// 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息到達(dá) 觸發(fā)這個(gè)方法
String msg = new String(body, "utf-8");
System.out.println("[info]:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("info消息執(zhí)行完畢拓提!");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
7.測(cè)試結(jié)果
在消費(fèi)者2中輸出了類型info
的消息
總結(jié):
要記住生產(chǎn)者端的routing key,那么在消費(fèi)者端設(shè)置binding key和之前的routing key一樣隧膘,就可以用direct類型的exchange了代态,以此來(lái)獲取到自己需要的消息。
系列文章:
RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
RabbitMQ:spring整合RabbitMQ