超詳細的RabbitMQ快速入門!!你不拿走嗎滑燃?

什么是mq

MQ,全稱是Message Queue,是基于數(shù)據(jù)結(jié)構(gòu)中“先進先出”的一種數(shù)據(jù)結(jié)構(gòu)颓鲜,指把要傳輸?shù)臄?shù)據(jù)(消息)放在隊列中表窘,用隊列機制來實現(xiàn)消息傳遞——生產(chǎn)者產(chǎn)生消息并把消息放入隊列,然后由消費者去處理甜滨。消費者可以到指定隊列拉取消息乐严,或者訂閱相應(yīng)的隊列,由MQ服務(wù)端給其推送消息衣摩。

mq的使用場景

在技術(shù)小蟲的工作中昂验,在以下場景中用到過

  • 每天對數(shù)據(jù)的統(tǒng)計,需要發(fā)送很多封郵件艾扮,但是我們不想因為把發(fā)送郵件這個功能嵌套在我們的統(tǒng)計腳本中(一是為了提高統(tǒng)計效率既琴,二是為了業(yè)務(wù)的解耦)
  • 使用rabbitmq的死信隊列完成對庫存、題庫的回收工作栏渺,比如 某個商品被下單了減了庫存呛梆,但是遲遲沒有付款锐涯,超過30分鐘我們就默認訂單取消磕诊,并恢復(fù)庫存。或者在醫(yī)生搶題 答題的業(yè)務(wù)中霎终,有的醫(yī)生搶了10道題滞磺,但是只在有效期(比如1天)內(nèi)答了3道題,但是剩余的7道題不可能一直被這個醫(yī)生綁定莱褒,超過1天就要被解綁回庫击困。
  • 數(shù)據(jù)信息的同步,隨著微服務(wù)的盛行广凸,就會存在對于不同的業(yè)務(wù)會有不同的用戶表阅茶,比如,公司原本就有一個用戶表的總表谅海。但是隨著業(yè)務(wù)的發(fā)展脸哀,購車貸是一個服務(wù),他會有自己的用戶表扭吁,而消費貸也是一個服務(wù)撞蜂,也有自己的用戶表。而每個服務(wù)的用戶信息發(fā)生變化侥袜,其實都要同步到用戶表總表中去蝌诡。那么這個時候我們就用到了消息隊列,每當(dāng)用戶修改信息的時候枫吧,都往消息隊列投放一個消息浦旱。我們通過再這個隊列完成信息的同步。
  • 還有一些情景會觸發(fā)很多操作由蘑,那么這個時候也會用到消息隊列闽寡,比如 醫(yī)生注冊功能,但是在醫(yī)生完成注冊的時候尼酿,我們要同步給他開通其他賬號爷狈,比如醫(yī)療科普號,科普講壇號這些賬號裳擎。
  • 還有一些高并發(fā)的場景涎永,比如一個搶購的活動,年底的時候我們會有積分兌換的活動鹿响,一共兩個小時羡微,那么這個時候打過來的請求非常多,如果每個都取請求來了我們都取更新一下數(shù)據(jù)庫惶我,做一些更新的操作妈倔,可能牽涉的很多個表,那么數(shù)據(jù)庫可能就扛不住绸贡。那么為了避免這種情況盯蝴,我們就可以先把這些數(shù)據(jù)存起來毅哗,讓數(shù)據(jù)庫慢慢去消化。這就是流量削峰捧挺。

mq有哪些產(chǎn)品和對比

image.png

為什么是rabbitmq

RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng)虑绵,基于AMQP協(xié)議來實現(xiàn)。
AMQP的主要特征是面向消息闽烙、隊列翅睛、路由(包括點對點和發(fā)布/訂閱)、可靠性黑竞、安全捕发。
AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對數(shù)據(jù)一致性很魂、穩(wěn)定性和可靠性要求很高的場景爬骤,對性能和吞吐量的要求還在其次。
RabbitMQ的可靠性是非常好的莫换,數(shù)據(jù)能夠保證百分之百的不丟失霞玄。可以使用鏡像隊列拉岁,它的穩(wěn)定性非常好坷剧。所以說在我們互聯(lián)網(wǎng)的金融行業(yè)。對數(shù)據(jù)的穩(wěn)定性和可靠性要求都非常高的情況下喊暖,我們都會選擇RabbitMQ惫企。當(dāng)然沒有kafka性能好,但是要比AvtiveMQ性能要好很多陵叽。也可以自己做一些性能的優(yōu)化狞尔。
RabbitMQ可以構(gòu)建異地雙活架構(gòu),包括每一個節(jié)點存儲方式可以采用磁盤或者內(nèi)存的方式巩掺。

安裝啟動(ubuntu 18) 參考文章

#先查看一下我的版本號
root@guofu:~# cat /etc/issue
Ubuntu 18.04.5 LTS \n \l

#從前面的mq對比中已經(jīng)說了偏序,rabbitmq是erlang實現(xiàn)的,所以需要安裝erlang
   26  sudo apt-get install erlang-nox
# 添加公鑰
   27  wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 更新軟件包
   28  sudo apt-get update
# 安裝rabbitmq 胖替,安裝完畢自動啟動
   29  sudo apt-get install rabbitmq-server
# 查看rabbitmq的運行狀態(tài)  service rabbitmq-server status  也可以查看
   30  systemctl status rabbitmq-server
   info:Active: active (running) since Mon 2021-07-26 11:15:54 CST; 13s ago

#服務(wù)的啟動研儒、停止、重啟
   31  sudo service rabbitmq-server stop
   32  sudo service rabbitmq-server start
   33  sudo service rabbitmq-server 

# 安裝可視化的web操作頁面
   34  sudo rabbitmq-plugins enable rabbitmq_management
   35  sudo service rabbitmq-server restart
   36  curl  http://localhost:15672

至此独令,rabbitmq安裝完畢端朵,web頁面也可以訪問了。默認用戶名和密碼是guest/guest,但是燃箭,rabbitmq默認會創(chuàng)建guest用戶冲呢,但是只能服務(wù)器本機登錄,建議創(chuàng)建其他新用戶招狸,授權(quán)敬拓,用來做其他操作瓤湘。所以我們接下來開始創(chuàng)建一個新的用戶

# 查看所有用戶
   38  sudo rabbitmqctl list_users
#增加用戶admin 密碼是passwd(根據(jù)需求自定義即可)
   39  sudo rabbitmqctl add_user admin  passwd
# 給普通用戶分配管理員角色
   40  sudo rabbitmqctl set_user_tags admin administrator
#賦予virtual host中所有資源的配置、寫恩尾、讀權(quán)限以便管理其中的資源,也是添加遠程訪問權(quán)限
   41  sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'

使用admin遠程登錄


image.png

配置文件解讀

rabbitmq-env.conf rabbitmq的環(huán)境變量

root@guofu:~# cd /etc/rabbitmq/
root@guofu:/etc/rabbitmq# ls
enabled_plugins  rabbitmq-env.conf
root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf
# Defaults to rabbit. This can be useful if you want to run more than one node
# per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine
# combination. See the clustering on a single machine guide for details:
# http://www.rabbitmq.com/clustering.html#single-machine
#NODENAME=rabbit  --節(jié)點名稱挽懦,如果服務(wù)是集群的形式翰意,每個節(jié)點的名稱必須唯一

# By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
# available. Set this if you only want to bind to one network interface or#
# address family.
#NODE_IP_ADDRESS=127.0.0.1 --節(jié)點的ip地址

# Defaults to 5672.
#NODE_PORT=5672 --節(jié)點的端口號

# Default rabbitmq-server wait timeout.

mq服務(wù)器的架構(gòu)

相關(guān)參考
相關(guān)參考

  • 我們先來看一下rabbitmq的架構(gòu)圖
image.png
image.png
  • Broker : 標(biāo)識消息隊列服務(wù)器實體rabbitmq-server
  • v-host : Virtual Host 虛擬主機。vhost是rabbitmq分配權(quán)限的最小細粒度信柿,比如冀偶,我有兩個用戶a和b,我如果想讓a用戶只訪問a1隊列,b用戶訪問b1隊列渔嚷,那么在同一個vhost下进鸠,這是做不到的。
    我們可以為一個用戶分配一個可以訪問哪個或者哪一些vhost的權(quán)限形病。但是不能為用戶分配一個可以訪問哪一些exchange客年,或者queue的權(quán)限,因為rabbitmq的權(quán)限細粒度沒有細化到交換器和隊列漠吻,他的最小細粒度是vhost(vhost中包含許多的exchanges量瓜,queues,bingdings)途乃。
    所以如果exchangeA 和queueA 只能讓用戶A訪問绍傲,exchangeB 和queueB 只能讓用戶B訪問,要達到這種需求耍共,只能為exchangeA 和queueA創(chuàng)建一個vhostA烫饼,為exchangeB 和queueB 創(chuàng)建vhostB,這樣就隔離開來了试读。杠纵。vhost是AMQP概念的基礎(chǔ),必須在鏈接時指定钩骇。
    RabbitMQ默認的vhost是 /淡诗。查看所有虛擬主機的命令是
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost
Creating vhost "test_vhost"
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
/
test_vhost

# 查看用戶列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
admin   [administrator]
guest   [administrator]

# 分配訪問權(quán)限  set_permissions [-p <vhost>] <user> <conf> <write> <read>
# 需要注意的是RabbitMQ會緩存每個connection或channel的權(quán)限驗證結(jié)果、因此權(quán)限發(fā)生變化后需要重連才能生效伊履。
root@guofu:/etc/rabbitmq# set rabbitmqctl set_permissions -p test_host  admin ".*" ".*" ".*"


  • exchange:交換器用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列韩容。


    image.png

從web頁面可以看到,exchange可以選擇的有四種唐瀑,持久化方式有兩種群凶,一種是內(nèi)存,一種是硬盤

  • fanout / (Publish/Subscribe) / 發(fā)布訂閱
image

生產(chǎn)者將消息給交換機哄辣,交換機根據(jù)自身的類型(fanout)將會把所有消息復(fù)制同步到所有與其綁定的隊列请梢,每個隊列可以有一個消費者接收消息進行消費邏輯赠尾。需要我們自己創(chuàng)建交換器并進行綁定,創(chuàng)建多個隊列進行綁定即可毅弧,若一個消費者綁定多個隊列則進行輪詢气嫁,因為mq有閱后即焚的特點,只能保證一個消費者閱讀接受够坐。常用于群發(fā)消息寸宵。

  • 路由模式 / Routing / direct
image

生產(chǎn)者將消息發(fā)送到交換機信息攜帶具體的路由key,交換機的類型是direct,將接收到的信息中的routingKey,比對與之綁定的隊列routingkey元咙。消費者監(jiān)聽一個隊列梯影,獲取消息,執(zhí)行消費邏輯庶香。一個隊列可以綁定一個routingKey也可以綁定多個甲棍。在消息進行路由時會攜帶一個routingKey尋找對應(yīng)的隊列。

  • Topic/ 通配符匹配
image

生產(chǎn)者發(fā)送消息赶掖,消息中帶有具體的路由key感猛,交換機的類型是topic称诗,隊列綁定交換機不在使用具體的路由key而是一個范圍值主经,例如: .yell.,hlll.iii,jjj.#。其中* 表示一個字符串(不能攜帶特殊字符)#表示任意

  • header exchange(頭交換機)和主題交換機有點相似兜畸,但是不同于主題交換機的路由是基于路由鍵呈驶,頭交換機的路由值基于消息的header數(shù)據(jù)拷泽。舉栗說明
隊列A:綁定交換機參數(shù)是:format=pdf,type=report,x-match=all,
隊列B: 綁定交換機參數(shù)是:format=pdf,type=log,x-match=any袖瞻,
隊列C:綁定交換機參數(shù)是:format=zip,type=report,x-match=all司致,

消息1發(fā)送交換機的頭參數(shù)是:format=pdf,type=reprot則消息傳送到隊列A
消息2發(fā)送交換機的頭參數(shù)是:format=pdf則消息傳送到隊列A和隊列B
消息3發(fā)送交換機的頭參數(shù)是:format=zip,type=log則消息沒有匹配隊列,此消息會被丟棄

all: 默認值聋迎。一個傳送消息的header里的鍵值對和交換機的header鍵值對全部匹配脂矫,才可以路由到對應(yīng)交換機
any: 一個傳送消息的header里的鍵值對和交換機的header鍵值對任意一個匹配,就可以路由到對應(yīng)交換機

  • queen:消息隊列霉晕,用來保存消息直到發(fā)送給消費者庭再。它是消息的容器,也是消息的終點牺堰。一個消息可投入一個或多個隊列拄轻。消息一直在隊列里面,等待消費者連接到這個隊列將其取走伟葫。
  • Banding : 綁定恨搓,用于消息隊列和交換機之間的關(guān)聯(lián)。一個綁定就是基于路由鍵將交換機和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構(gòu)成的路由表斧抱。### Virtual Host的使用

  • Channel : 信道常拓,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的TCP連接內(nèi)的虛擬鏈接辉浦,AMQP命令都是通過信道發(fā)出去的弄抬,不管是發(fā)布消息、訂閱隊列還是接收消息宪郊,這些動作都是通過信道完成掂恕。因為對于操作系統(tǒng)來說,建立和銷毀TCP都是非常昂貴的開銷废膘,所以引入了信道的概念,以復(fù)用一條TCP連接慕蔚。

  • Connection : 網(wǎng)絡(luò)連接丐黄,比如一個TCP連接。

接下來我們根據(jù)上面的exchang的不同類型做一個演示
  • 先來創(chuàng)建用戶和vhost(這里為了演示孔飒,會盡可能多的使用到前面講的命令灌闺,具體要根據(jù)需求 是否創(chuàng)建vhost),另外這些操作通過web頁面也可以完成坏瞄。
# 創(chuàng)建vhost
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost
Creating vhost "guofu_vhost"
#查看vhost列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
guofu_vhost
/
test_vhost
# 創(chuàng)建用戶和密碼
root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu
Creating user "guofu"
#查看用戶列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
vhost1  []
admin   [administrator]
guofu   []
guest   [administrator]
# 給用戶設(shè)置角色桂对,否則遠程登錄不了
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator
Setting tags for user "guofu" to [administrator]
#給用戶 vhost的權(quán)限,3個* 代表 配置 讀 寫的權(quán)限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*"
Setting permissions for user "guofu" in vhost "guofu_vhost"
# 查看用戶權(quán)限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu
Listing permissions for user "guofu"
guofu_vhost     .*      .*      .*

配置完畢后鸠匀,我們在頁面也可以看到蕉斜,已經(jīng)生效了


image.png
  • 新建一個交換機并指定vhost


    image.png
  • 新建兩個隊列并綁定exchange


    image.png
package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout     為了方便演示,忽略錯誤捕捉
 */
func main() {
    //交換機
    var exchange="guofu_exchange"
    //建立連接  用戶名+密碼+ip+端口號+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //聲明交換機類型
    ch.ExchangeDeclare(
        exchange,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
        )

    //定義消息
    msgBody:="i am a msg3"
    //發(fā)送消息  相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange,     //exchange
        "", //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err !=nil{
        panic(err)
    }
}


  • 我們通過web頁面看一下


    image.png

    image.png
  • 可見對于fanout 發(fā)布訂閱 缀棍,其實我們在推送消息的時候宅此,只用到了exchange和type,而不關(guān)系隊列,因為只要是綁定了該exchange的隊列爬范,都會被推送消息父腕。也就是說,fanout模式青瀑,一個消息會被推送到多個隊列璧亮,那么哪種情景會用到這種模式呢?比如 用戶注冊后斥难,我既要發(fā)郵件枝嘶,又要發(fā)短信,那么發(fā)短信和發(fā)郵件哑诊,就可以用fanout 這種模式

  • 下面我寫一下消費的代碼躬络,消費隊列的方法其實都一樣,這里演示一次搭儒,后面的其他類型的exchange就不演示了穷当。

package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout     為了方便演示提茁,忽略錯誤捕捉
 */
func main() {
    //交換機
    var exchange="guofu_exchange"
    //建立連接  用戶名+密碼+ip+端口號+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //聲明交換機類型
    ch.ExchangeDeclare(
        exchange,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
        )

    //定義消息
    msgBody:="i am a msg3"
    //發(fā)送消息  相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange,     //exchange
        "", //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err !=nil{
        panic(err)
    }
}

  • 上面的代碼相信大家都看的明白,但是要注意的是馁菜,里面有一個點 【試探性創(chuàng)建】 這是什么意思茴扁?這是說,如果有這個exchange/queue汪疮,就用峭火,沒有的話就創(chuàng)建,剛才我并沒有創(chuàng)建guofu_queue3,但是我監(jiān)聽這個隊列也得到消息了


    image.png
  • 那么我們用消費代碼創(chuàng)建一下新的exchange和queue

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    //交換機
    var exchange = "guofu_exchange_test"
    var queue = "guofu_queue_test"
    var key = ""
    //建立連接  用戶名+密碼+ip+端口號+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //試探性聲明交換機類型
    ch.ExchangeDeclare(
        exchange,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )

    //試探性創(chuàng)建隊列
    //聲明queue 和相關(guān)屬性 相關(guān)參數(shù) name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //綁定隊列 (name, key, exchange string, noWait bool, args Table) 發(fā)布訂閱模式的key為空
    ch.QueueBind(queue, key, exchange, false, nil)

    //  消費隊列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        queue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d:=range msg{
        fmt.Println(string(d.Body))
        d.Ack(false)

    }




}

  • 交換機和隊列被創(chuàng)建


    image.png
  • 還有一點要注意的是ACK 機制
    確認機制分為三種:none智嚷、auto (默認)卖丸、manual
    自動 ACK:消息一旦被接收,消費者自動發(fā)送 ACK
    手動 ACK:消息接收后盏道,不會發(fā)送 ACK稍浆,需要手動調(diào)用
    這兩 ACK 要怎么選擇呢?這需要看消息的重要性:
    如果消息不太重要猜嘱,丟失也沒有影響衅枫,那么自動 ACK 會比較方便
    如果消息非常重要,不容丟失朗伶。那么最好在消費完成后手動 ACK弦撩,否則接收消息后就自動 ACK,RabbitMQ 就會把消息從隊列中刪除论皆。如果此時消費者宕機益楼,那么消息就丟失了。

image.png
  • 另外一點是在php和java中点晴,還有一種生產(chǎn)者消息確認機制偏形,消息推送成功后支持函數(shù)回調(diào),但是golang里面我沒有找到這個方法


    image.png
  • 好了觉鼻,我們回歸exchange的第二種類型direct 路由模式俊扭,這次我們直接使用消費端的代碼直接建立隊列并監(jiān)聽

package main

import (
    "fmt"
    "github.com/streadway/amqp"
)

func main() {
    //交換機
    var exchange = "direct_guofu_exchange"
    var queue = "direct_guofu_queue"
    var key = "direct_key"
    //建立連接  用戶名+密碼+ip+端口號+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //試探性聲明交換機類型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )

    //試探性創(chuàng)建隊列
    //聲明queue 和相關(guān)屬性 相關(guān)參數(shù) name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //綁定隊列 (name, key, exchange string, noWait bool, args Table) 發(fā)布訂閱模式的key為空
    ch.QueueBind(queue, key, exchange, false, nil)

    //  消費隊列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
    msg, err := ch.Consume(
        queue,
        "",
        false,
        false,
        false,
        false,
        nil,
    )
    for d:=range msg{
        fmt.Println(string(d.Body))
        d.Ack(false)

    }




}


image.png
  • 使用同樣方法 創(chuàng)建隊列direct_guofu_queue

  • 推送消息到該隊列,需要注意的是,如果你兩個queue使用了同一個key,那么exchange會根據(jù)key 推送給兩個隊列坠陈,如果不是業(yè)務(wù)需要萨惑,盡量避免重復(fù)key ,減少臟數(shù)據(jù)的生成

package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout     為了方便演示仇矾,忽略錯誤捕捉
 */
func main() {
    var exchange = "direct_guofu_exchange"
    var key = "direct_key"
    //建立連接  用戶名+密碼+ip+端口號+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //聲明交換機類型
    ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        nil,
        )

    //定義消息
    msgBody:="i am a direct"
    //發(fā)送消息  相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
    err := ch.Publish(
        exchange,     //exchange
        key, //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })


    if err !=nil{
        panic(err)
    }
}

image.png
  • topic模式
    topic 類似于mysql的模糊查詢庸蔼,只要是能模糊匹配到的,都會推送消息贮匕。姐仅,推送的路由可以是一個包含了多個屬性,以.分割的字符串,最大程長度是200左右掏膏。推送之后劳翰,其他會匹配其他隊列的路由,如果匹配到了馒疹,則推送進去〖阳ぃ現(xiàn)在我們假設(shè)有以下場景 一共有兩個隊列,第一個隊列animal 如果是動物颖变,進入這個隊列生均,第二個隊列是 plant,第三個隊列是表示顏色yellow腥刹,如果是黃色的都進入這個隊列马胧,現(xiàn)在我們要推送這個幾個到隊列里面去
    1.橘貓 既要去animal 也要去 yellow
    2.菊花 既要去plant 也要去yellow

如代碼所示,我創(chuàng)建了三個隊列衔峰,綁定的key 分別是 #.animal.#,#.plant.#,yellow.#,

    var exchange = "topic_guofu_exchange"
    var queue = "topic727_yellow"
    var key = "yellow.#"

    var exchange = "topic_guofu_exchange"
    var queue = "topic727_animal"
    var key = "#.animal.#"


    var exchange = "topic_guofu_exchange"
    var queue = "topic727_plant"
    var key = "#.plant.#"

那么當(dāng)我推送消息的時候佩脊,如果我topic綁定的路由鍵 是 yellow.animal.plant ,那么推送的時候 三個消息隊列都會被匹配。我們來看一下


執(zhí)行前
  • 把生產(chǎn)的代碼貼出來
package main

import (
    "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange類型-生產(chǎn),fanout     為了方便演示朽色,忽略錯誤捕捉
 */
func main() {
    var exchange = "topic_guofu_exchange"
    var key = "yellow.animal.plant "
    var queue = "topic727"
    //建立連接  用戶名+密碼+ip+端口號+vhost
    conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
    //建立通道
    ch, _ := conn.Channel()
    //聲明交換機類型
    ch.ExchangeDeclare(
        exchange,
        "topic",
        true,
        false,
        false,
        false,
        nil,
    )

    //試探性創(chuàng)建隊列
    //聲明queue 和相關(guān)屬性 相關(guān)參數(shù) name string, durable, autoDelete, exclusive, noWait bool, args Table
    _, err := ch.QueueDeclare(
        queue,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(err)
    }
    //綁定隊列 (name, key, exchange string, noWait bool, args Table) 發(fā)布訂閱模式的key為空
    ch.QueueBind(queue, key, exchange, false, nil)

    //定義消息
    msgBody := key
    //發(fā)送消息  相關(guān)參數(shù) exchange, key string, mandatory, immediate bool, msg Publishing
    err = ch.Publish(
        exchange, //exchange
        key,      //routing key(queue name)
        false,
        false,
        amqp.Publishing{
            DeliveryMode: amqp.Persistent, //Msg set as persistent
            ContentType:  "text/plain",
            Body:         []byte(msgBody),
        })

    if err != nil {
        panic(err)
    }
}

  • 推送完畢邻吞,發(fā)現(xiàn)四個隊列都有了數(shù)據(jù)(第一個隊列是topic 推送時候綁定的组题,后面三個是路由匹配的)


    image.png
  • 那么此時葫男,如果我推送的key是yellow.animal,那么路由會匹配到 yellow.# 和 #.animal.#,我們來看一下


    image.png
  • topic的功能是比較強大的崔列,利用好topic ,可以實現(xiàn) direct和fanout的功能梢褐,路由密鑰中可以包含任意多個單詞,最多255個字節(jié)赵讯。

  • header exchange(頭交換機)和主題交換機有點相似盈咳,但是不同于主題交換機的路由是基于路由鍵,頭交換機的路由值基于消息的header數(shù)據(jù)边翼。在此也不做贅述了鱼响。有興趣的同學(xué)可以去官網(wǎng)看看。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末组底,一起剝皮案震驚了整個濱河市丈积,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌债鸡,老刑警劉巖江滨,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異厌均,居然都是意外死亡唬滑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來晶密,“玉大人擒悬,你說我怎么就攤上這事∪切” “怎么了茄螃?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長连锯。 經(jīng)常有香客問我归苍,道長,這世上最難降的妖魔是什么运怖? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任拼弃,我火速辦了婚禮,結(jié)果婚禮上摇展,老公的妹妹穿的比我還像新娘吻氧。我一直安慰自己,他們只是感情好咏连,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布盯孙。 她就那樣靜靜地躺著,像睡著了一般祟滴。 火紅的嫁衣襯著肌膚如雪振惰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天垄懂,我揣著相機與錄音骑晶,去河邊找鬼。 笑死草慧,一個胖子當(dāng)著我的面吹牛桶蛔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播漫谷,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼仔雷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了舔示?” 一聲冷哼從身側(cè)響起碟婆,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎斩郎,沒想到半個月后脑融,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡缩宜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年肘迎,在試婚紗的時候發(fā)現(xiàn)自己被綠了甥温。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡妓布,死狀恐怖姻蚓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情匣沼,我是刑警寧澤狰挡,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站释涛,受9級特大地震影響加叁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜唇撬,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一它匕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧窖认,春花似錦豫柬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至喝噪,卻和暖如春础嫡,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背仙逻。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工驰吓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留涧尿,地道東北人系奉。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像姑廉,于是被迫代替她去往敵國和親缺亮。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容