RabbitMQ學(xué)習(xí)筆記

四種交換機(jī)類型

  • fanout ? 它會將發(fā)送到該交換機(jī)的消息路由到所有與該交互機(jī)存在binding的隊列中,無視RoutingKey

  • direct ?把消費(fèi)發(fā)送到RoutingKey與BindingKey完全匹配的隊列中洛口。

  • topic ? 類似direct交換機(jī)策精,但是BindingKey可以是模糊匹配的規(guī)則

    * 代表任意個單詞(可為零)

    # 代表只有一個單詞

  • headers ? 性能較差,一般不使用

重名交換機(jī)或隊列

生產(chǎn)者和消費(fèi)者都可以聲明一個隊列莱革。如果聲明一個已存在的交換機(jī)或隊列峻堰,只要聲明的參數(shù)完全匹配現(xiàn)存的交換機(jī)或隊列,則RabbitMQ什么都不做盅视,并成功返回捐名。否則,將會拋出異常闹击。

聲明交換機(jī)

聲明交換機(jī)時有兩個參數(shù)需要注意

  • durable: 是否持久化镶蹋。 持久化可以講交換機(jī)保存在磁盤中,服務(wù)器重啟后不會丟失信息赏半。

  • autoDelete: 是否自動刪除贺归。自動刪除的前提是至少有一個交換機(jī)或?qū)﹃犃信c這個交換機(jī)綁定,之后所有的交換機(jī)和隊列與這個交換機(jī)解綁断箫。

聲明隊列

  • durable: 是否持久化拂酣。 持久化可以講交換機(jī)保存在磁盤中,服務(wù)器重啟后不會丟失信息仲义。

  • exclusive: 是否排他婶熬。如果一個隊列被聲明為排他隊列,則該隊列僅對首次聲明它的連接可見埃撵,并且在連接斷開的時候自動刪除赵颅。這里需要注意三點:排他隊列是基于連接(connection)可見的,同一個連接的不同信道(channel)可以訪問同一連接創(chuàng)建的排他隊列暂刘〗让“首次” 是指如果體格連接已經(jīng)創(chuàng)建了一個排他隊列,則其他連接不允許再創(chuàng)建同名的排他隊列鸳惯。即使該排他隊列是持久化的(durable=true),一點連接關(guān)閉或客戶端退出商蕴,該隊列就會被刪除。

  • autoDelete: 是否自動刪除芝发。自動刪除的前提是:至少有一個消費(fèi)者連接到該隊列绪商,之后所有的消費(fèi)者與這個隊列的連接都斷開時,才會自動刪除辅鲸。

queueDeclarePassive

Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

這個方法用來檢測隊列是否已經(jīng)存在格郁。如果存在則正常返回,不存在則拋異常:404 channel exception 同時channel也會被關(guān)閉。

未確認(rèn)的消息

RabbitMQ不會為未確認(rèn)的消息設(shè)置過期時間例书,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者是否斷開連接锣尉,這么設(shè)計的原因是RabbitMQ需要消費(fèi)者話很久的時間來處理這條消息。

mandatory參數(shù)

當(dāng)mandatory參數(shù)設(shè)置為true的時候决采,若交換機(jī)無法根據(jù)自己的類型和路由鍵找到合適的隊列自沧,那么RabbitMQ會調(diào)用basic.return命令將該消息返回給生產(chǎn)者。當(dāng)mandatory設(shè)置為false時树瞭,該消息將會被直接丟棄拇厢。

在springboot整合RabbitMQ的starter中,對應(yīng)的方法為RabbitTempalte.ReturnCallback接口的returnedMessage方法晒喷。

注意:當(dāng)找不到隊里時才會調(diào)用這個returnedMessage方法孝偎。當(dāng)交換機(jī)找不到時,會直接到RabbitTempalte.ConfirmCallback的confirm方法

消息的過期時間

針對隊列中的所有消息設(shè)置過期時間

@Bean
public Queue queue() {
    Map<String,Object> args = new HashMap<>();
    args.put("x-message-ttl","100");// 設(shè)置隊列中消息的過期時間,單位毫秒
    return new Queue(QUEUE_1, true,false,false,args);
}

針對單個消息設(shè)置過期時間

public void sendDirectAck(){
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    MessageProperties messageProperties = new MessageProperties();
    // 設(shè)置消息的過期時間
    messageProperties.setExpiration("1000"); //設(shè)置每條消息具體的過期時間 單位毫秒
    Message message = new Message(user1.toString().getBytes(),messageProperties);// 不要在意字節(jié)數(shù)組的細(xì)節(jié)
    rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_EXCHANGE,RabbitmqConfig.ROUTING_KEY1,message,correlationData);
}

死信隊列

DLX, Dead-Letter-Exchange凉敲。利用DLX, 當(dāng)消息在一個隊列中變成死信(dead message)之后衣盾,它能被重新publish到另一個Exchange,這個Exchange就是DLX爷抓。消息變成死信一向有一下幾種情況:

  • 消息被拒絕(basic.reject/ basic.nack)并且requeue=false
  • 消息TTL過期
  • 隊列達(dá)到最大長度

DLX也是一個正常的Exchange势决,和一般的Exchange沒有區(qū)別,它能在任何的隊列上被指定废赞,實際上就是設(shè)置某個隊列的屬性徽龟,當(dāng)這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發(fā)布到設(shè)置的Exchange上去唉地,進(jìn)而被路由到另一個隊列,可以監(jiān)聽這個隊列中消息做相應(yīng)的處理传透,這個特性可以彌補(bǔ)RabbitMQ 3.0以前支持的immediate參數(shù)的功能耘沼。

核心代碼實現(xiàn):通過在queueDeclare方法中加入“x-dead-letter-exchange”實現(xiàn)。

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

你也可以為這個DLX指定routing key朱盐,如果沒有特殊指定群嗤,則使用原隊列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key");

延遲隊列

延遲隊列存儲的對象肯定是對應(yīng)的延遲消息,所謂”延遲消息”是指當(dāng)消息被發(fā)送以后兵琳,并不想讓消費(fèi)者立即拿到消息狂秘,而是等待指定時間后,消費(fèi)者才拿到這個消息進(jìn)行消費(fèi)躯肌。

場景一:在訂單系統(tǒng)中者春,一個用戶下單之后通常有30分鐘的時間進(jìn)行支付,如果30分鐘之內(nèi)沒有支付成功清女,那么這個訂單將進(jìn)行一場處理钱烟。這是就可以使用延遲隊列將訂單信息發(fā)送到延遲隊列。

場景二:用戶希望通過手機(jī)遠(yuǎn)程遙控家里的智能設(shè)備在指定的時間進(jìn)行工作。這時候就可以將用戶指令發(fā)送到延遲隊列拴袭,當(dāng)指令設(shè)定的時間到了再將指令推送到只能設(shè)備读第。


延遲隊列

消息的持久化

// 設(shè)置消息持久化 (默認(rèn)是持久化的)    
messageProperties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);

如果隊列沒有設(shè)置持久化,那么及時消息設(shè)置了持久化拥刻,重啟后消息依舊會消失

持久化會嚴(yán)重影響rabbitmq的性能

消息的推拉模式

在rabbitmq中支持兩種消息處理的模式怜瞒,一種是訂閱模式(也叫push模式),由broker主動將消息推送給訂閱隊列的消費(fèi)者般哼;另一種是檢索模式(也叫pull模式)盼砍,需要消費(fèi)者調(diào)用channel.basicGet方法,主動從隊列中拉取消息逝她。

訂閱模式(push)

訂閱模式接收消息是最有效的一種消息處理方式浇坐,當(dāng)消息到達(dá)broker時,broker會自動將消息投遞給匹配的消費(fèi)者黔宛,而不需要消費(fèi)端手動去拉取近刘。在同一個通道channel中,每個消費(fèi)者Consumer都有著不同的consumer-tag標(biāo)識臀晃,這個標(biāo)識可以是客戶端指定觉渴,也可以由broker服務(wù)端自動生成(如果客戶端手動指定了,則以客戶端的為準(zhǔn)徽惋,如果沒有指定則由服務(wù)端自動生成)案淋。

檢索模式(pull)

通過使用Channel.basicGet顯示拉取消息,返回的數(shù)據(jù)類型是GetResponse實例险绘。

消息分發(fā)

多個消費(fèi)者訂閱同一個隊列踢京,這時候隊列中的消息會采用輪詢(Round-Robin)的方式發(fā)送給消費(fèi)者,即每個消息只會有一些消費(fèi)者來處理宦棺,避免消息的重復(fù)消費(fèi)瓣距。

但這種模式有一個潛在的問題,就說如果消費(fèi)者A處理消息的速度很快代咸,而B處理得很慢蹈丸。采用輪詢分發(fā)的時候有可能出現(xiàn)A消費(fèi)者處理空閑,而B消費(fèi)者卻出現(xiàn)消息堆積的問題呐芥。

此時可以在消費(fèi)者端調(diào)用channel.basicQos(...)方法指定每個消費(fèi)者未確認(rèn)的消息的數(shù)量(只在推(push)模式下有效

此方法有三個重載

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount) throws IOException;

關(guān)于 global和prefetch的含義逻杖,參考官方文檔Consumer Prefetch

Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over the limit. This is slow on a single machine, and very slow when consuming across a cluster.

Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

當(dāng)單個channel有多個消費(fèi)者時,協(xié)調(diào)兩個消費(fèi)者不超過limit會嚴(yán)重影響性能思瘟。

Therefore RabbitMQ redefines the meaning of the global flag in the basic.qos method:

global Meaning of prefetch_count in AMQP 0-9-1 Meaning of prefetch_count in RabbitMQ
false shared across all consumers on the channel applied separately to each new consumer on the channel
true shared across all consumers on the connection shared across all consumers on the channel

可以設(shè)置global=false荸百,是每個消費(fèi)者獨(dú)享消息數(shù)量的限制(默認(rèn)即為false)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市潮太,隨后出現(xiàn)的幾起案子管搪,更是在濱河造成了極大的恐慌虾攻,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件更鲁,死亡現(xiàn)場離奇詭異霎箍,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)澡为,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門漂坏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人媒至,你說我怎么就攤上這事顶别。” “怎么了拒啰?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵驯绎,是天一觀的道長。 經(jīng)常有香客問我谋旦,道長剩失,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任册着,我火速辦了婚禮拴孤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘甲捏。我一直安慰自己演熟,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布司顿。 她就那樣靜靜地躺著芒粹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪免猾。 梳的紋絲不亂的頭發(fā)上是辕,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天,我揣著相機(jī)與錄音猎提,去河邊找鬼。 笑死旁蔼,一個胖子當(dāng)著我的面吹牛锨苏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播棺聊,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼伞租,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了限佩?” 一聲冷哼從身側(cè)響起葵诈,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤裸弦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后作喘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體理疙,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年泞坦,在試婚紗的時候發(fā)現(xiàn)自己被綠了窖贤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡贰锁,死狀恐怖赃梧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情豌熄,我是刑警寧澤授嘀,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站锣险,受9級特大地震影響蹄皱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜囱持,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一夯接、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧纷妆,春花似錦盔几、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽广恢。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蘸泻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工虱歪, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留霹抛,地道東北人。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓轮听,卻偏偏與公主長得像骗露,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子血巍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容

  • RabbitMQ 簡介 MQ 消息隊列萧锉,上承生產(chǎn)者,下接消費(fèi)者述寡。從生產(chǎn)者側(cè)獲取消息柿隙,然后將消息轉(zhuǎn)發(fā)給消費(fèi)者叶洞。由此可...
    2205閱讀 3,487評論 1 11
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單禀崖,比如只包含文本字符串衩辟,也可以更復(fù)雜...
    lijun_m閱讀 1,340評論 0 1
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化帆焕、事務(wù)惭婿、擁塞控...
    jiangmo閱讀 10,350評論 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,892評論 2 11
  • 1.vhost(虛擬主機(jī)) 虛擬主機(jī)相當(dāng)于一個隔離的空間,多個虛擬主機(jī)可以對不同的用戶叶雹,不同的作用分割開來 2.P...
    MR_Hanjc閱讀 782評論 0 0