上一節(jié)講述了RabbitMQ的基礎(chǔ)Api和參數(shù)詳解.本節(jié)將深入學(xué)習(xí)使用一些RabbitMQ的高階用法.比如死信隊列\(zhòng)延時隊列等
1. mandatory參數(shù)和備份交換器
- mandatory參數(shù)
在聲明交換器的exchangeDeclare方法中,有一個mandatory參數(shù),當(dāng)設(shè)置為true時,交換器無法根據(jù)自身類型和RoutingKey找到隊列的時候,RabbitMQ會調(diào)用Basic.Return命令將消息返回給生產(chǎn)者.生產(chǎn)者可以在channel中添加一個ReturnListener監(jiān)聽器回調(diào)服務(wù)器返回的消息.代碼如下:
public void mandatoryTest() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("mandatory_exchange", BuiltinExchangeType.DIRECT);
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println(replyCode + "\t" + replyText);
System.out.println(new String(body));
});
channel.basicPublish("mandatory_exchange", "", true, MessageProperties.TEXT_PLAIN, "mandatory message".getBytes());
RabbitMQUtils.close(channel, connection);
}
控制臺運(yùn)行結(jié)果如下:
- 備份交換器
備份交換器(Alternate Exchange),可以通過聲明備份交換器的方式處理未被路由的消息,從而避免消息丟失.
在聲明交換器時,添加alternate-exchange參數(shù),添加交換器的備份交換器.也可以通過策略Policy,同時使用時,前者的優(yōu)先級更高.
代碼如下:
public void alternateExchangeTest() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
// 聲明一個備份交換器, 類型通常為fanout的.從而使任何沒有去處的消息都發(fā)送到備份交換器綁定的隊列中.
channel.exchangeDeclare("alternate_exchange_test", BuiltinExchangeType.FANOUT, true, false, null);
// 聲明交換器并添加備份交換器參數(shù)
Map<String, Object> args = new HashMap<>();
args.put("alternate-exchange", "alternate_exchange_test");
channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT, true, false, args);
channel.queueDeclare("alternate_queue_test", true, false, false, null);
channel.queueDeclare("normal_queue", true, false, false, null);
channel.queueBind("normal_queue", "normal_exchange", "normal_routing");
// 綁定備份交換器和對應(yīng)的隊列
channel.queueBind("alternate_queue_test", "alternate_exchange_test", "alternate_routing");
channel.basicPublish("normal_exchange", "wrong_routing",new AMQP.BasicProperties.Builder().build(), "testMessage".getBytes());
RabbitMQUtils.close(channel, connection);
}
消息發(fā)送后,由于沒有對應(yīng)的路由鍵,因此消息將會發(fā)送到備份交換器,通過備份交換器發(fā)送到綁定的隊列中.可以在管理頁面看到:
對于備份交換器有以下幾點:
- 如果設(shè)置的備份交換器不存在,則消息丟失,沒有異常
- 如果備份交換器沒有綁定任何隊列,則消息丟失,沒有異常
- 如果備份交換器沒有匹配的RoutingKey的隊列,則消息丟失,沒有異常
- 如果備份交換器和mandatory參數(shù)同事使用,則mandatory參數(shù)不生效
2.過期時間(TTL)
RabbitMQ可以對消息和隊列設(shè)置過期時間
- 對消息設(shè)置過期時間
- 通過隊列設(shè)置消息的過期時間.
在聲明隊列時,加入x-message-ttl參數(shù)實現(xiàn)的,參數(shù)單位是ms
代碼實例如下:
public void messageTTLByQueueTest() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 5000);
channel.queueDeclare("ttl_queue", true, false, false, map);
channel.queueBind("ttl_queue", RabbitMQUtils.TEST_EXCHANGE, "ttl_routing");
channel.basicPublish(RabbitMQUtils.TEST_EXCHANGE, "ttl_routing", null, "ttl_message".getBytes());
RabbitMQUtils.close(channel, connection);
}
- 通過消息屬性設(shè)置消息的過期時間
在channel.basicPublish方法中加入expiration的屬性參數(shù),單位為ms.示例代碼如下:
public void messageTTLByMessageTest() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish(RabbitMQUtils.TEST_EXCHANGE, RabbitMQUtils.TEST_ROUTING,
new AMQP.BasicProperties().builder().expiration("30000").build(), "message_ttl_test".getBytes());
RabbitMQUtils.close(channel, connection);
}
對于第一種隊列設(shè)置TTL屬性的方法,一旦消息過期,就會從隊列中抹去.而在第二種方法中,即使消息過期,也不會馬上從隊列中抹去.因為每條消息是否過期是在即將投遞前判定的.而第二種方法中消息過期時間不同.
- 設(shè)置隊列的ttl
聲明隊列時加入x-expires參數(shù).如下:
public void queueTTLTest() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<>();
args.put("x-expires", 60000);
channel.queueDeclare("ttl_queue_test", true, false, false, args);
RabbitMQUtils.close(channel, connection);
}
隊列到期后會自動被刪除
3. 死信隊列
DLX: Dead-Letter-Exchange 死信交換器.當(dāng)消息在一個隊列中變成死信之后,能夠被重新投遞到另一個交換器中.這個交換器就是DLX.消息變成死信一般由于以下幾種情況:
- 消息被拒絕 basic.reject basic.nack 并且設(shè)置requeue為false
- 消息過期
- 隊列達(dá)到最大長度
代碼示例如下:
public void DLXTest() throws IOException {
Connection connection = ConnectionCreator.getConnection();
Channel channel = connection.createChannel();
// 聲明一個作為死信隊列的路由和隊列并綁定
channel.exchangeDeclare("dlx_test", BuiltinExchangeType.DIRECT);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_test", "dlx_routing");
// 聲明一個隊列,設(shè)置死信交換器和消息過期時間
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
args.put("x-dead-letter-exchange", "dlx_test");
args.put("x-dead-letter-exchange-routing-key", "dlx_routing");
channel.queueDeclare("normal_queue", true, false, false, args);
channel.queueBind("normal_queue", RabbitMQUtils.TEST_EXCHANGE, "dlx_routing");
channel.basicPublish(RabbitMQUtils.TEST_EXCHANGE, "dlx_routing", null, "dead letter queue test".getBytes());
RabbitMQUtils.close(channel, connection);
}
4.延遲隊列
上面的死信隊列+過期時間配合使用就實現(xiàn)了一個延遲隊列.
消息到達(dá)隊列后,一段時間沒有被消費(fèi),就進(jìn)入到死信隊列.
5. 優(yōu)先級隊列
可以通過隊列的x-max-priority屬性設(shè)置隊列的最大優(yōu)先級.
發(fā)布消息的時候設(shè)置priority屬性設(shè)置消息的優(yōu)先級.
如果節(jié)點中消息有堆積.則高優(yōu)先級的消息會被優(yōu)先消費(fèi).