四種交換機(jī)類型
fanout ? 它會將發(fā)送到該交換機(jī)的消息路由到所有與該交互機(jī)存在binding的隊列中,無視RoutingKey
direct ?把消費(fèi)發(fā)送到RoutingKey與BindingKey完全匹配的隊列中洛口。
-
topic ? 類似direct交換機(jī)策精,但是BindingKey可以是模糊匹配的規(guī)則
* 代表任意個單詞(可為零)
# 代表只有一個單詞
headers ? 性能較差,一般不使用
重名交換機(jī)或隊列
生產(chǎn)者和消費(fèi)者都可以聲明一個隊列莱革。如果聲明一個已存在的交換機(jī)或隊列峻堰,只要聲明的參數(shù)完全匹配現(xiàn)存的交換機(jī)或隊列,則RabbitMQ什么都不做盅视,并成功返回捐名。否則,將會拋出異常闹击。
聲明交換機(jī)
聲明交換機(jī)時有兩個參數(shù)需要注意
durable: 是否持久化镶蹋。 持久化可以講交換機(jī)保存在磁盤中,服務(wù)器重啟后不會丟失信息赏半。
autoDelete: 是否自動刪除贺归。自動刪除的前提是至少有一個交換機(jī)或?qū)﹃犃信c這個交換機(jī)綁定,之后所有的交換機(jī)和隊列與這個交換機(jī)解綁断箫。
聲明隊列
durable: 是否持久化拂酣。 持久化可以講交換機(jī)保存在磁盤中,服務(wù)器重啟后不會丟失信息仲义。
exclusive: 是否排他婶熬。如果一個隊列被聲明為排他隊列,則該隊列僅對首次聲明它的連接可見埃撵,并且在連接斷開的時候自動刪除赵颅。這里需要注意三點:排他隊列是基于連接(connection)可見的,同一個連接的不同信道(channel)可以訪問同一連接創(chuàng)建的排他隊列暂刘〗让“首次” 是指如果體格連接已經(jīng)創(chuàng)建了一個排他隊列,則其他連接不允許再創(chuàng)建同名的排他隊列鸳惯。即使該排他隊列是持久化的(durable=true),一點連接關(guān)閉或客戶端退出商蕴,該隊列就會被刪除。
autoDelete: 是否自動刪除芝发。自動刪除的前提是:至少有一個消費(fèi)者連接到該隊列绪商,之后所有的消費(fèi)者與這個隊列的連接都斷開時,才會自動刪除辅鲸。
queueDeclarePassive
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
這個方法用來檢測隊列是否已經(jīng)存在格郁。如果存在則正常返回,不存在則拋異常:404 channel exception 同時channel也會被關(guān)閉。
未確認(rèn)的消息
RabbitMQ不會為未確認(rèn)的消息設(shè)置過期時間例书,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者是否斷開連接锣尉,這么設(shè)計的原因是RabbitMQ需要消費(fèi)者話很久的時間來處理這條消息。
mandatory參數(shù)
當(dāng)mandatory參數(shù)設(shè)置為true的時候决采,若交換機(jī)無法根據(jù)自己的類型和路由鍵找到合適的隊列自沧,那么RabbitMQ會調(diào)用basic.return命令將該消息返回給生產(chǎn)者。當(dāng)mandatory設(shè)置為false時树瞭,該消息將會被直接丟棄拇厢。
在springboot整合RabbitMQ的starter中,對應(yīng)的方法為RabbitTempalte.ReturnCallback接口的returnedMessage方法晒喷。
注意:當(dāng)找不到隊里時才會調(diào)用這個returnedMessage方法孝偎。當(dāng)交換機(jī)找不到時,會直接到RabbitTempalte.ConfirmCallback的confirm方法
消息的過期時間
針對隊列中的所有消息設(shè)置過期時間
@Bean
public Queue queue() {
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl","100");// 設(shè)置隊列中消息的過期時間,單位毫秒
return new Queue(QUEUE_1, true,false,false,args);
}
針對單個消息設(shè)置過期時間
public void sendDirectAck(){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessageProperties messageProperties = new MessageProperties();
// 設(shè)置消息的過期時間
messageProperties.setExpiration("1000"); //設(shè)置每條消息具體的過期時間 單位毫秒
Message message = new Message(user1.toString().getBytes(),messageProperties);// 不要在意字節(jié)數(shù)組的細(xì)節(jié)
rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE,RabbitmqConfig.ROUTING_KEY1,message,correlationData);
}
死信隊列
DLX, Dead-Letter-Exchange凉敲。利用DLX, 當(dāng)消息在一個隊列中變成死信(dead message)之后衣盾,它能被重新publish到另一個Exchange,這個Exchange就是DLX爷抓。消息變成死信一向有一下幾種情況:
- 消息被拒絕(basic.reject/ basic.nack)并且requeue=false
- 消息TTL過期
- 隊列達(dá)到最大長度
DLX也是一個正常的Exchange势决,和一般的Exchange沒有區(qū)別,它能在任何的隊列上被指定废赞,實際上就是設(shè)置某個隊列的屬性徽龟,當(dāng)這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發(fā)布到設(shè)置的Exchange上去唉地,進(jìn)而被路由到另一個隊列,可以監(jiān)聽這個隊列中消息做相應(yīng)的處理传透,這個特性可以彌補(bǔ)RabbitMQ 3.0以前支持的immediate參數(shù)的功能耘沼。
核心代碼實現(xiàn):通過在queueDeclare方法中加入“x-dead-letter-exchange”實現(xiàn)。
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
你也可以為這個DLX指定routing key朱盐,如果沒有特殊指定群嗤,則使用原隊列的routing key
args.put("x-dead-letter-routing-key", "some-routing-key");
延遲隊列
延遲隊列存儲的對象肯定是對應(yīng)的延遲消息,所謂”延遲消息”是指當(dāng)消息被發(fā)送以后兵琳,并不想讓消費(fèi)者立即拿到消息狂秘,而是等待指定時間后,消費(fèi)者才拿到這個消息進(jìn)行消費(fèi)躯肌。
場景一:在訂單系統(tǒng)中者春,一個用戶下單之后通常有30分鐘的時間進(jìn)行支付,如果30分鐘之內(nèi)沒有支付成功清女,那么這個訂單將進(jìn)行一場處理钱烟。這是就可以使用延遲隊列將訂單信息發(fā)送到延遲隊列。
場景二:用戶希望通過手機(jī)遠(yuǎn)程遙控家里的智能設(shè)備在指定的時間進(jìn)行工作。這時候就可以將用戶指令發(fā)送到延遲隊列拴袭,當(dāng)指令設(shè)定的時間到了再將指令推送到只能設(shè)備读第。
消息的持久化
// 設(shè)置消息持久化 (默認(rèn)是持久化的)
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);
如果隊列沒有設(shè)置持久化,那么及時消息設(shè)置了持久化拥刻,重啟后消息依舊會消失
持久化會嚴(yán)重影響rabbitmq的性能
消息的推拉模式
在rabbitmq中支持兩種消息處理的模式怜瞒,一種是訂閱模式(也叫push模式),由broker主動將消息推送給訂閱隊列的消費(fèi)者般哼;另一種是檢索模式(也叫pull模式)盼砍,需要消費(fèi)者調(diào)用channel.basicGet方法,主動從隊列中拉取消息逝她。
訂閱模式(push)
訂閱模式接收消息是最有效的一種消息處理方式浇坐,當(dāng)消息到達(dá)broker時,broker會自動將消息投遞給匹配的消費(fèi)者黔宛,而不需要消費(fèi)端手動去拉取近刘。在同一個通道channel中,每個消費(fèi)者Consumer都有著不同的consumer-tag標(biāo)識臀晃,這個標(biāo)識可以是客戶端指定觉渴,也可以由broker服務(wù)端自動生成(如果客戶端手動指定了,則以客戶端的為準(zhǔn)徽惋,如果沒有指定則由服務(wù)端自動生成)案淋。
檢索模式(pull)
通過使用Channel.basicGet顯示拉取消息,返回的數(shù)據(jù)類型是GetResponse實例险绘。
消息分發(fā)
多個消費(fèi)者訂閱同一個隊列踢京,這時候隊列中的消息會采用輪詢(Round-Robin)的方式發(fā)送給消費(fèi)者,即每個消息只會有一些消費(fèi)者來處理宦棺,避免消息的重復(fù)消費(fèi)瓣距。
但這種模式有一個潛在的問題,就說如果消費(fèi)者A處理消息的速度很快代咸,而B處理得很慢蹈丸。采用輪詢分發(fā)的時候有可能出現(xiàn)A消費(fèi)者處理空閑,而B消費(fèi)者卻出現(xiàn)消息堆積的問題呐芥。
此時可以在消費(fèi)者端調(diào)用channel.basicQos(...)方法指定每個消費(fèi)者未確認(rèn)的消息的數(shù)量(只在推(push)模式下有效)
此方法有三個重載
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount) throws IOException;
關(guān)于 global和prefetch的含義逻杖,參考官方文檔Consumer Prefetch
Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over the limit. This is slow on a single machine, and very slow when consuming across a cluster.
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
當(dāng)單個channel有多個消費(fèi)者時,協(xié)調(diào)兩個消費(fèi)者不超過limit會嚴(yán)重影響性能思瘟。
Therefore RabbitMQ redefines the meaning of the global flag in the basic.qos method:
global | Meaning of prefetch_count in AMQP 0-9-1 | Meaning of prefetch_count in RabbitMQ |
---|---|---|
false | shared across all consumers on the channel | applied separately to each new consumer on the channel |
true | shared across all consumers on the connection | shared across all consumers on the channel |
可以設(shè)置global=false荸百,是每個消費(fèi)者獨(dú)享消息數(shù)量的限制(默認(rèn)即為false)