什么是mq
MQ,全稱是Message Queue,是基于數(shù)據(jù)結(jié)構(gòu)中“先進先出”的一種數(shù)據(jù)結(jié)構(gòu)颓鲜,指把要傳輸?shù)臄?shù)據(jù)(消息)放在隊列中表窘,用隊列機制來實現(xiàn)消息傳遞——生產(chǎn)者產(chǎn)生消息并把消息放入隊列,然后由消費者去處理甜滨。消費者可以到指定隊列拉取消息乐严,或者訂閱相應(yīng)的隊列,由MQ服務(wù)端給其推送消息衣摩。
mq的使用場景
在技術(shù)小蟲的工作中昂验,在以下場景中用到過
- 每天對數(shù)據(jù)的統(tǒng)計,需要發(fā)送很多封郵件艾扮,但是我們不想因為把發(fā)送郵件這個功能嵌套在我們的統(tǒng)計腳本中(一是為了提高統(tǒng)計效率既琴,二是為了業(yè)務(wù)的解耦)
- 使用rabbitmq的死信隊列完成對庫存、題庫的回收工作栏渺,比如 某個商品被下單了減了庫存呛梆,但是遲遲沒有付款锐涯,超過30分鐘我們就默認訂單取消磕诊,并恢復(fù)庫存。或者在醫(yī)生搶題 答題的業(yè)務(wù)中霎终,有的醫(yī)生搶了10道題滞磺,但是只在有效期(比如1天)內(nèi)答了3道題,但是剩余的7道題不可能一直被這個醫(yī)生綁定莱褒,超過1天就要被解綁回庫击困。
- 數(shù)據(jù)信息的同步,隨著微服務(wù)的盛行广凸,就會存在對于不同的業(yè)務(wù)會有不同的用戶表阅茶,比如,公司原本就有一個用戶表的總表谅海。但是隨著業(yè)務(wù)的發(fā)展脸哀,購車貸是一個服務(wù),他會有自己的用戶表扭吁,而消費貸也是一個服務(wù)撞蜂,也有自己的用戶表。而每個服務(wù)的用戶信息發(fā)生變化侥袜,其實都要同步到用戶表總表中去蝌诡。那么這個時候我們就用到了消息隊列,每當(dāng)用戶修改信息的時候枫吧,都往消息隊列投放一個消息浦旱。我們通過再這個隊列完成信息的同步。
- 還有一些情景會觸發(fā)很多操作由蘑,那么這個時候也會用到消息隊列闽寡,比如 醫(yī)生注冊功能,但是在醫(yī)生完成注冊的時候尼酿,我們要同步給他開通其他賬號爷狈,比如醫(yī)療科普號,科普講壇號這些賬號裳擎。
- 還有一些高并發(fā)的場景涎永,比如一個搶購的活動,年底的時候我們會有積分兌換的活動鹿响,一共兩個小時羡微,那么這個時候打過來的請求非常多,如果每個都取請求來了我們都取更新一下數(shù)據(jù)庫惶我,做一些更新的操作妈倔,可能牽涉的很多個表,那么數(shù)據(jù)庫可能就扛不住绸贡。那么為了避免這種情況盯蝴,我們就可以先把這些數(shù)據(jù)存起來毅哗,讓數(shù)據(jù)庫慢慢去消化。這就是流量削峰捧挺。
mq有哪些產(chǎn)品和對比
為什么是rabbitmq
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng)虑绵,基于AMQP協(xié)議來實現(xiàn)。
AMQP的主要特征是面向消息闽烙、隊列翅睛、路由(包括點對點和發(fā)布/訂閱)、可靠性黑竞、安全捕发。
AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對數(shù)據(jù)一致性很魂、穩(wěn)定性和可靠性要求很高的場景爬骤,對性能和吞吐量的要求還在其次。
RabbitMQ的可靠性是非常好的莫换,數(shù)據(jù)能夠保證百分之百的不丟失霞玄。可以使用鏡像隊列拉岁,它的穩(wěn)定性非常好坷剧。所以說在我們互聯(lián)網(wǎng)的金融行業(yè)。對數(shù)據(jù)的穩(wěn)定性和可靠性要求都非常高的情況下喊暖,我們都會選擇RabbitMQ惫企。當(dāng)然沒有kafka性能好,但是要比AvtiveMQ性能要好很多陵叽。也可以自己做一些性能的優(yōu)化狞尔。
RabbitMQ可以構(gòu)建異地雙活架構(gòu),包括每一個節(jié)點存儲方式可以采用磁盤或者內(nèi)存的方式巩掺。
安裝啟動(ubuntu 18) 參考文章
#先查看一下我的版本號
root@guofu:~# cat /etc/issue
Ubuntu 18.04.5 LTS \n \l
#從前面的mq對比中已經(jīng)說了偏序,rabbitmq是erlang實現(xiàn)的,所以需要安裝erlang
26 sudo apt-get install erlang-nox
# 添加公鑰
27 wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 更新軟件包
28 sudo apt-get update
# 安裝rabbitmq 胖替,安裝完畢自動啟動
29 sudo apt-get install rabbitmq-server
# 查看rabbitmq的運行狀態(tài) service rabbitmq-server status 也可以查看
30 systemctl status rabbitmq-server
info:Active: active (running) since Mon 2021-07-26 11:15:54 CST; 13s ago
#服務(wù)的啟動研儒、停止、重啟
31 sudo service rabbitmq-server stop
32 sudo service rabbitmq-server start
33 sudo service rabbitmq-server
# 安裝可視化的web操作頁面
34 sudo rabbitmq-plugins enable rabbitmq_management
35 sudo service rabbitmq-server restart
36 curl http://localhost:15672
至此独令,rabbitmq安裝完畢端朵,web頁面也可以訪問了。默認用戶名和密碼是guest/guest,但是燃箭,rabbitmq默認會創(chuàng)建guest用戶冲呢,但是只能服務(wù)器本機登錄,建議創(chuàng)建其他新用戶招狸,授權(quán)敬拓,用來做其他操作瓤湘。所以我們接下來開始創(chuàng)建一個新的用戶
# 查看所有用戶
38 sudo rabbitmqctl list_users
#增加用戶admin 密碼是passwd(根據(jù)需求自定義即可)
39 sudo rabbitmqctl add_user admin passwd
# 給普通用戶分配管理員角色
40 sudo rabbitmqctl set_user_tags admin administrator
#賦予virtual host中所有資源的配置、寫恩尾、讀權(quán)限以便管理其中的資源,也是添加遠程訪問權(quán)限
41 sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
使用admin遠程登錄
配置文件解讀
rabbitmq-env.conf rabbitmq的環(huán)境變量
root@guofu:~# cd /etc/rabbitmq/
root@guofu:/etc/rabbitmq# ls
enabled_plugins rabbitmq-env.conf
root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf
# Defaults to rabbit. This can be useful if you want to run more than one node
# per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine
# combination. See the clustering on a single machine guide for details:
# http://www.rabbitmq.com/clustering.html#single-machine
#NODENAME=rabbit --節(jié)點名稱挽懦,如果服務(wù)是集群的形式翰意,每個節(jié)點的名稱必須唯一
# By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
# available. Set this if you only want to bind to one network interface or#
# address family.
#NODE_IP_ADDRESS=127.0.0.1 --節(jié)點的ip地址
# Defaults to 5672.
#NODE_PORT=5672 --節(jié)點的端口號
# Default rabbitmq-server wait timeout.
mq服務(wù)器的架構(gòu)
- 我們先來看一下rabbitmq的架構(gòu)圖
- Broker : 標(biāo)識消息隊列服務(wù)器實體rabbitmq-server
- v-host : Virtual Host 虛擬主機。vhost是rabbitmq分配權(quán)限的最小細粒度信柿,比如冀偶,我有兩個用戶a和b,我如果想讓a用戶只訪問a1隊列,b用戶訪問b1隊列渔嚷,那么在同一個vhost下进鸠,這是做不到的。
我們可以為一個用戶分配一個可以訪問哪個或者哪一些vhost的權(quán)限形病。但是不能為用戶分配一個可以訪問哪一些exchange客年,或者queue的權(quán)限,因為rabbitmq的權(quán)限細粒度沒有細化到交換器和隊列漠吻,他的最小細粒度是vhost(vhost中包含許多的exchanges量瓜,queues,bingdings)途乃。
所以如果exchangeA 和queueA 只能讓用戶A訪問绍傲,exchangeB 和queueB 只能讓用戶B訪問,要達到這種需求耍共,只能為exchangeA 和queueA創(chuàng)建一個vhostA烫饼,為exchangeB 和queueB 創(chuàng)建vhostB,這樣就隔離開來了试读。杠纵。vhost是AMQP概念的基礎(chǔ),必須在鏈接時指定钩骇。
RabbitMQ默認的vhost是 /淡诗。查看所有虛擬主機的命令是
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost
Creating vhost "test_vhost"
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
/
test_vhost
# 查看用戶列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
admin [administrator]
guest [administrator]
# 分配訪問權(quán)限 set_permissions [-p <vhost>] <user> <conf> <write> <read>
# 需要注意的是RabbitMQ會緩存每個connection或channel的權(quán)限驗證結(jié)果、因此權(quán)限發(fā)生變化后需要重連才能生效伊履。
root@guofu:/etc/rabbitmq# set rabbitmqctl set_permissions -p test_host admin ".*" ".*" ".*"
-
exchange:交換器用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列韩容。
從web頁面可以看到,exchange可以選擇的有四種唐瀑,持久化方式有兩種群凶,一種是內(nèi)存,一種是硬盤
- fanout / (Publish/Subscribe) / 發(fā)布訂閱
生產(chǎn)者將消息給交換機哄辣,交換機根據(jù)自身的類型(fanout)將會把所有消息復(fù)制同步到所有與其綁定的隊列请梢,每個隊列可以有一個消費者接收消息進行消費邏輯赠尾。需要我們自己創(chuàng)建交換器并進行綁定,創(chuàng)建多個隊列進行綁定即可毅弧,若一個消費者綁定多個隊列則進行輪詢气嫁,因為mq有閱后即焚的特點,只能保證一個消費者閱讀接受够坐。常用于群發(fā)消息寸宵。
- 路由模式 / Routing / direct
生產(chǎn)者將消息發(fā)送到交換機信息攜帶具體的路由key,交換機的類型是direct,將接收到的信息中的routingKey,比對與之綁定的隊列routingkey元咙。消費者監(jiān)聽一個隊列梯影,獲取消息,執(zhí)行消費邏輯庶香。一個隊列可以綁定一個routingKey也可以綁定多個甲棍。在消息進行路由時會攜帶一個routingKey尋找對應(yīng)的隊列。
- Topic/ 通配符匹配
生產(chǎn)者發(fā)送消息赶掖,消息中帶有具體的路由key感猛,交換機的類型是topic称诗,隊列綁定交換機不在使用具體的路由key而是一個范圍值主经,例如: .yell.,hlll.iii,jjj.#。其中* 表示一個字符串(不能攜帶特殊字符)#表示任意
- header exchange(頭交換機)和主題交換機有點相似兜畸,但是不同于主題交換機的路由是基于路由鍵呈驶,頭交換機的路由值基于消息的header數(shù)據(jù)拷泽。舉栗說明
隊列A:綁定交換機參數(shù)是:format=pdf,type=report,x-match=all,
隊列B: 綁定交換機參數(shù)是:format=pdf,type=log,x-match=any袖瞻,
隊列C:綁定交換機參數(shù)是:format=zip,type=report,x-match=all司致,
消息1發(fā)送交換機的頭參數(shù)是:format=pdf,type=reprot則消息傳送到隊列A
消息2發(fā)送交換機的頭參數(shù)是:format=pdf則消息傳送到隊列A和隊列B
消息3發(fā)送交換機的頭參數(shù)是:format=zip,type=log則消息沒有匹配隊列,此消息會被丟棄
all: 默認值聋迎。一個傳送消息的header里的鍵值對和交換機的header鍵值對全部匹配脂矫,才可以路由到對應(yīng)交換機
any: 一個傳送消息的header里的鍵值對和交換機的header鍵值對任意一個匹配,就可以路由到對應(yīng)交換機
- queen:消息隊列霉晕,用來保存消息直到發(fā)送給消費者庭再。它是消息的容器,也是消息的終點牺堰。一個消息可投入一個或多個隊列拄轻。消息一直在隊列里面,等待消費者連接到這個隊列將其取走伟葫。
Banding : 綁定恨搓,用于消息隊列和交換機之間的關(guān)聯(lián)。一個綁定就是基于路由鍵將交換機和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構(gòu)成的路由表斧抱。### Virtual Host的使用
Channel : 信道常拓,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的TCP連接內(nèi)的虛擬鏈接辉浦,AMQP命令都是通過信道發(fā)出去的弄抬,不管是發(fā)布消息、訂閱隊列還是接收消息宪郊,這些動作都是通過信道完成掂恕。因為對于操作系統(tǒng)來說,建立和銷毀TCP都是非常昂貴的開銷废膘,所以引入了信道的概念,以復(fù)用一條TCP連接慕蔚。
Connection : 網(wǎng)絡(luò)連接丐黄,比如一個TCP連接。
接下來我們根據(jù)上面的exchang的不同類型做一個演示
- 先來創(chuàng)建用戶和vhost(這里為了演示孔飒,會盡可能多的使用到前面講的命令灌闺,具體要根據(jù)需求 是否創(chuàng)建vhost),另外這些操作通過web頁面也可以完成坏瞄。
# 創(chuàng)建vhost
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost
Creating vhost "guofu_vhost"
#查看vhost列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
guofu_vhost
/
test_vhost
# 創(chuàng)建用戶和密碼
root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu
Creating user "guofu"
#查看用戶列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
vhost1 []
admin [administrator]
guofu []
guest [administrator]
# 給用戶設(shè)置角色桂对,否則遠程登錄不了
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator
Setting tags for user "guofu" to [administrator]
#給用戶 vhost的權(quán)限,3個* 代表 配置 讀 寫的權(quán)限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*"
Setting permissions for user "guofu" in vhost "guofu_vhost"
# 查看用戶權(quán)限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu
Listing permissions for user "guofu"
guofu_vhost .* .* .*
配置完畢后鸠匀,我們在頁面也可以看到蕉斜,已經(jīng)生效了
-
新建一個交換機并指定vhost
-
新建兩個隊列并綁定exchange
- 我們把信息配置到代碼中去相關(guān)參考資料
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout 為了方便演示,忽略錯誤捕捉
*/
func main() {
//交換機
var exchange="guofu_exchange"
//建立連接 用戶名+密碼+ip+端口號+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//聲明交換機類型
ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
//定義消息
msgBody:="i am a msg3"
//發(fā)送消息 相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
"", //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err !=nil{
panic(err)
}
}
-
我們通過web頁面看一下
可見對于fanout 發(fā)布訂閱 缀棍,其實我們在推送消息的時候宅此,只用到了exchange和type,而不關(guān)系隊列,因為只要是綁定了該exchange的隊列爬范,都會被推送消息父腕。也就是說,fanout模式青瀑,一個消息會被推送到多個隊列璧亮,那么哪種情景會用到這種模式呢?比如 用戶注冊后斥难,我既要發(fā)郵件枝嘶,又要發(fā)短信,那么發(fā)短信和發(fā)郵件哑诊,就可以用fanout 這種模式
下面我寫一下消費的代碼躬络,消費隊列的方法其實都一樣,這里演示一次搭儒,后面的其他類型的exchange就不演示了穷当。
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout 為了方便演示提茁,忽略錯誤捕捉
*/
func main() {
//交換機
var exchange="guofu_exchange"
//建立連接 用戶名+密碼+ip+端口號+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//聲明交換機類型
ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
//定義消息
msgBody:="i am a msg3"
//發(fā)送消息 相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
"", //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err !=nil{
panic(err)
}
}
-
上面的代碼相信大家都看的明白,但是要注意的是馁菜,里面有一個點 【試探性創(chuàng)建】 這是什么意思茴扁?這是說,如果有這個exchange/queue汪疮,就用峭火,沒有的話就創(chuàng)建,剛才我并沒有創(chuàng)建guofu_queue3,但是我監(jiān)聽這個隊列也得到消息了
那么我們用消費代碼創(chuàng)建一下新的exchange和queue
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
//交換機
var exchange = "guofu_exchange_test"
var queue = "guofu_queue_test"
var key = ""
//建立連接 用戶名+密碼+ip+端口號+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//試探性聲明交換機類型
ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
//試探性創(chuàng)建隊列
//聲明queue 和相關(guān)屬性 相關(guān)參數(shù) name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//綁定隊列 (name, key, exchange string, noWait bool, args Table) 發(fā)布訂閱模式的key為空
ch.QueueBind(queue, key, exchange, false, nil)
// 消費隊列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
msg, err := ch.Consume(
queue,
"",
false,
false,
false,
false,
nil,
)
for d:=range msg{
fmt.Println(string(d.Body))
d.Ack(false)
}
}
-
交換機和隊列被創(chuàng)建
還有一點要注意的是ACK 機制
確認機制分為三種:none智嚷、auto (默認)卖丸、manual
自動 ACK:消息一旦被接收,消費者自動發(fā)送 ACK
手動 ACK:消息接收后盏道,不會發(fā)送 ACK稍浆,需要手動調(diào)用
這兩 ACK 要怎么選擇呢?這需要看消息的重要性:
如果消息不太重要猜嘱,丟失也沒有影響衅枫,那么自動 ACK 會比較方便
如果消息非常重要,不容丟失朗伶。那么最好在消費完成后手動 ACK弦撩,否則接收消息后就自動 ACK,RabbitMQ 就會把消息從隊列中刪除论皆。如果此時消費者宕機益楼,那么消息就丟失了。
-
另外一點是在php和java中点晴,還有一種生產(chǎn)者消息確認機制偏形,消息推送成功后支持函數(shù)回調(diào),但是golang里面我沒有找到這個方法
好了觉鼻,我們回歸exchange的第二種類型direct 路由模式俊扭,這次我們直接使用消費端的代碼直接建立隊列并監(jiān)聽
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
//交換機
var exchange = "direct_guofu_exchange"
var queue = "direct_guofu_queue"
var key = "direct_key"
//建立連接 用戶名+密碼+ip+端口號+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//試探性聲明交換機類型
ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
nil,
)
//試探性創(chuàng)建隊列
//聲明queue 和相關(guān)屬性 相關(guān)參數(shù) name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//綁定隊列 (name, key, exchange string, noWait bool, args Table) 發(fā)布訂閱模式的key為空
ch.QueueBind(queue, key, exchange, false, nil)
// 消費隊列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
msg, err := ch.Consume(
queue,
"",
false,
false,
false,
false,
nil,
)
for d:=range msg{
fmt.Println(string(d.Body))
d.Ack(false)
}
}
使用同樣方法 創(chuàng)建隊列direct_guofu_queue
推送消息到該隊列,需要注意的是,如果你兩個queue使用了同一個key,那么exchange會根據(jù)key 推送給兩個隊列坠陈,如果不是業(yè)務(wù)需要萨惑,盡量避免重復(fù)key ,減少臟數(shù)據(jù)的生成
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout 為了方便演示仇矾,忽略錯誤捕捉
*/
func main() {
var exchange = "direct_guofu_exchange"
var key = "direct_key"
//建立連接 用戶名+密碼+ip+端口號+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//聲明交換機類型
ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
nil,
)
//定義消息
msgBody:="i am a direct"
//發(fā)送消息 相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
key, //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err !=nil{
panic(err)
}
}
- topic模式
topic 類似于mysql的模糊查詢庸蔼,只要是能模糊匹配到的,都會推送消息贮匕。姐仅,推送的路由可以是一個包含了多個屬性,以.分割的字符串,最大程長度是200左右掏膏。推送之后劳翰,其他會匹配其他隊列的路由,如果匹配到了馒疹,則推送進去〖阳ぃ現(xiàn)在我們假設(shè)有以下場景 一共有兩個隊列,第一個隊列animal 如果是動物颖变,進入這個隊列生均,第二個隊列是 plant,第三個隊列是表示顏色yellow腥刹,如果是黃色的都進入這個隊列马胧,現(xiàn)在我們要推送這個幾個到隊列里面去
1.橘貓 既要去animal 也要去 yellow
2.菊花 既要去plant 也要去yellow
如代碼所示,我創(chuàng)建了三個隊列衔峰,綁定的key 分別是 #.animal.#,#.plant.#,yellow.#,
var exchange = "topic_guofu_exchange"
var queue = "topic727_yellow"
var key = "yellow.#"
var exchange = "topic_guofu_exchange"
var queue = "topic727_animal"
var key = "#.animal.#"
var exchange = "topic_guofu_exchange"
var queue = "topic727_plant"
var key = "#.plant.#"
那么當(dāng)我推送消息的時候佩脊,如果我topic綁定的路由鍵 是 yellow.animal.plant ,那么推送的時候 三個消息隊列都會被匹配。我們來看一下
- 把生產(chǎn)的代碼貼出來
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout 為了方便演示朽色,忽略錯誤捕捉
*/
func main() {
var exchange = "topic_guofu_exchange"
var key = "yellow.animal.plant "
var queue = "topic727"
//建立連接 用戶名+密碼+ip+端口號+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//聲明交換機類型
ch.ExchangeDeclare(
exchange,
"topic",
true,
false,
false,
false,
nil,
)
//試探性創(chuàng)建隊列
//聲明queue 和相關(guān)屬性 相關(guān)參數(shù) name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//綁定隊列 (name, key, exchange string, noWait bool, args Table) 發(fā)布訂閱模式的key為空
ch.QueueBind(queue, key, exchange, false, nil)
//定義消息
msgBody := key
//發(fā)送消息 相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
err = ch.Publish(
exchange, //exchange
key, //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err != nil {
panic(err)
}
}
-
推送完畢邻吞,發(fā)現(xiàn)四個隊列都有了數(shù)據(jù)(第一個隊列是topic 推送時候綁定的组题,后面三個是路由匹配的)
-
那么此時葫男,如果我推送的key是yellow.animal,那么路由會匹配到 yellow.# 和 #.animal.#,我們來看一下
topic的功能是比較強大的崔列,利用好topic ,可以實現(xiàn) direct和fanout的功能梢褐,路由密鑰中可以包含任意多個單詞,最多255個字節(jié)赵讯。
- header exchange(頭交換機)和主題交換機有點相似盈咳,但是不同于主題交換機的路由是基于路由鍵,頭交換機的路由值基于消息的header數(shù)據(jù)边翼。在此也不做贅述了鱼响。有興趣的同學(xué)可以去官網(wǎng)看看。