title: 【MQ】初始MQ
date: 2017-12-08 21:48:26
tags: MQ
categories: MQ
接觸 MQ 之前簡單的理解消息隊列就是一個理論上無限大的線性表洽损,接觸后發(fā)現(xiàn) MQ 支持的功能遠不止這些厦取。MQ 的功能可以概括為:削峰填谷赖淤,異步解耦医增。
從模型上看慎皱,MQ 模型不是狹義上的 C/S 模型,而是消息服務(wù)投遞模型:
- 在程序角度:當(dāng)程序連接到 RabbitMQ 時必須決定自己是發(fā)送者還是接受者
- 在 MQ 角度:MQ 及接收消息叶骨,又發(fā)送消息
AMQP(高級消息隊列協(xié)議)是對 MQ 最抽象的描述茫多。
AMQP
AMQP 定義了一個 MQ 的幾個組件,官方的描述還是比較晦澀的忽刽,我以自己的理解描述所以可能不夠準確:
- Server(broker):MQ 服務(wù)器
- Exchange:一個功能強大 router天揖,不做消息的存儲,單純轉(zhuǎn)發(fā)給 MQ
- Message Queue:消息隊列跪帝,具體存儲未被消費的消息
- Message:消息
- Binding:關(guān)聯(lián) Exchange 和 Message Queeu 的路由表
- Connection:鏈接今膊,TCP 鏈接
- Channel:子鏈接,復(fù)用 Connection
- Command:命令
- Virtual Host:服務(wù)器創(chuàng)建的 mini 版的 MQ
Exchange & Binding
這兩個東東算是 MQ 核心功能的實現(xiàn)組件伞剑,網(wǎng)上描述的我覺得不是很清楚斑唬。
可以把 exchange 當(dāng)路由器理解,把 binding 當(dāng)路由表理解黎泣。路由器根據(jù)路由表把數(shù)據(jù)從路由器路由到一下節(jié)點恕刘,exchange 根據(jù) binding 把消息從 exchange 路由到 queue。exchange 的核心功能是路由轉(zhuǎn)發(fā)抒倚,而路由轉(zhuǎn)發(fā)的依據(jù)是 binding褐着。把 binding 當(dāng)路由表的話,那么這個路由表有三項:
- exchange name
- queue name
- router key
三者的關(guān)系需要在實際生產(chǎn)托呕、消費消息之前完成綁定含蓉。而后消息到達 exchange 后根據(jù) routing key 路由到指定 queue。而 exchange 有多種不同實現(xiàn)项郊,不同實現(xiàn)的 exchange 根據(jù) routing key 的路由方式不同谴餐,適用于不同場景。
典型場景
以下例子從 Rabbit MQ 官網(wǎng)給搬運呆抑,傳送門岂嗓。
direct
direct 類型的交換器嚴格根據(jù)消息頭的 exchange name, queue name, router key 將消息路由到對應(yīng)的隊列
消息投遞到一個隊列
所有消息由默認交換器根據(jù)消息頭 queue name 投遞到隊列。沒有聲明交換器鹊碍,自動將隊列綁定到了默認交換器厌殉。下面代碼的第二個參數(shù)很容易被當(dāng)做 queue name,實際上這個字段是 routing key侈咕,發(fā)送方是不關(guān)心 queue 的公罕。
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 聲明隊列
channel.basicPublish("", // exchange name,空則投遞到默認交換器
QUEUE_NAME, // 以 queue_name 作為 routing key
null,
message.getBytes());
消息投遞到一個隊列由多個消費者消費
可以用于負載均衡的生產(chǎn)者消費者模型耀销,每個消息正常只被消費一次楼眷。
投遞過程與上一個一樣,隊列的消息同時由多個消費者消費
消息有選擇的分散到多個隊列
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 聲明 direct 類型交換器
String queueName = channel.queueDeclare().getQueue(); // 聲明隨機隊列,并獲取該隊列名字
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); // 綁定
// 發(fā)送
channel.basicPublish(EXCHANGE_NAME,
ROUTING_KEY,
null,
message.getBytes());
完整的交換器罐柳,隊列聲明并綁定掌腰,消息根據(jù)綁定信息投遞到對應(yīng)隊列。
一個交換器與多個隊列使用相同的 routing key 進行綁定张吉,當(dāng)該 routing key 消息發(fā)送至交換器可以形成廣播的形式齿梁。
fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 聲明 fanout 類型交換器
String queueName = channel.queueDeclare().getQueue(); // 創(chuàng)建非持久的吊洼,唯一的闷畸,自動刪除的隊列
channel.queueBind(queueName, EXCHANGE_NAME, ""); // 綁定隊列與交換器尖淘,不要 routing key
// 發(fā)送
channel.basicPublish(EXCHANGE_NAME,
"", // routing key
null,
message.getBytes());
交換器收到的消息廣播至所有綁定的隊列积蜻,綁定不需要給定 routing key
topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 聲明 topic 類型交換器
String queueName = channel.queueDeclare().getQueue(); // 創(chuàng)建非持久的、唯一的玫坛、自動刪除的隊列
channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY); // 綁定隊列墩虹,交換器航厚,路由鍵
// 發(fā)送
channel.basicPublish(EXCHANGE_NAME,
ROUTING_KEY,
null,
msg.getBytes());
編碼過程與使用 direct 交換器的完整過程一直昆码,但是 routing key 可以使用通配符:
-
*
將 . 視為分隔符進行匹配 -
#
將任意字符串視為關(guān)鍵字匹配
其他
- 消息發(fā)后即忘:消息單向傳遞气忠,默認并不會向發(fā)送方確認發(fā)送,也不會做持久化
- prefetch count:在 direct 的第二場景下未桥,消息會被平均分配給各個消費者笔刹,而不考慮消費者的消費能力芥备《ⅲ可以使用設(shè)置 Prefetch count 保持各消費者負載均衡
- binding key:有的文章將 bindling 中使用的 routing key 也稱作 binding key,我統(tǒng)一稱為 binding key 了
- 相關(guān) demo 傳送門