在上一節(jié)中我們主要介紹了為什么要使用消息隊(duì)列、常見(jiàn)的消息隊(duì)列對(duì)比分析以及初步認(rèn)識(shí)了RabbitMQ的整體架構(gòu)和基本概念。那么在這一節(jié)中疤剑,主要是從代碼的角度出發(fā)介紹一下如何使用RabbitMQ以及講一下在實(shí)際項(xiàng)目中踩的坑。
這里我用的是Go語(yǔ)言敛助,連接驅(qū)動(dòng)采用的是開(kāi)源的庫(kù)amqp
github地址:https://github.com/streadway/amqp
生產(chǎn)者
- 建立連接
// 建立連接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
- 創(chuàng)建channel
// 創(chuàng)建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
-
創(chuàng)建隊(duì)列
// 創(chuàng)建隊(duì)列 q, err := ch.QueueDeclare( /*name*/ "queue", // 隊(duì)列名稱 /*durable*/ false, // 是否持久化 /*autoDelete*/ false, // 是否自動(dòng)刪除 /*exclusive*/ false, // 排他 /*noWait*/ false, // 是否等待服務(wù)器確認(rèn) /*args*/ nil, // 其他配置 ) failOnError(err, "Failed to declare a queue")
參數(shù)說(shuō)明:
-
exclusive
排他隊(duì)列只對(duì)首次創(chuàng)建它的連接可見(jiàn)捻浦,排他隊(duì)列是基于連接 (Connection) 可見(jiàn)的晤揣,并且該連接內(nèi)的所有信道 (Channel) 都可以訪問(wèn)這個(gè)排他隊(duì)列,在這個(gè)連接斷開(kāi)之后朱灿,該隊(duì)列自動(dòng)刪除昧识,由此可見(jiàn)這個(gè)隊(duì)列可以說(shuō)是綁到連接上的,對(duì)同一服務(wù)器的其他連接不可見(jiàn)盗扒。
同一連接中不允許建立同名的排他隊(duì)列的
這種排他優(yōu)先于持久化跪楞,即使設(shè)置了隊(duì)列持久化,在連接斷開(kāi)后侣灶,該隊(duì)列也會(huì)自動(dòng)刪除甸祭。
非排他隊(duì)列不依附于連接而存在,同一服務(wù)器上的多個(gè)連接都可以訪問(wèn)這個(gè)隊(duì)列褥影。 -
autoDelete
為 true 則設(shè)置隊(duì)列為自動(dòng)刪除池户。
自動(dòng)刪除的前提是:至少有一個(gè)消費(fèi)者連接到這個(gè)隊(duì)列,之后所有與這個(gè)隊(duì)列連接的消費(fèi)者都斷開(kāi)時(shí)凡怎,才會(huì)自動(dòng)刪除校焦。
不能把這個(gè)參數(shù)錯(cuò)誤地理解為: 當(dāng)連接到此隊(duì)列的所有客戶端斷開(kāi)時(shí),這個(gè)隊(duì)列自動(dòng)刪除统倒,因?yàn)樯a(chǎn)者客戶端創(chuàng)建這個(gè)隊(duì)列寨典,或者沒(méi)有消費(fèi)者客戶端與這個(gè)隊(duì)列連接時(shí),都不會(huì)自動(dòng)刪除這個(gè)隊(duì)列房匆。 -
noWait
當(dāng) noWait 為 true 時(shí)耸成,聲明時(shí)無(wú)需等待服務(wù)器的確認(rèn)。
該通道可能由于錯(cuò)誤而關(guān)閉浴鸿。 添加一個(gè) NotifyClose 偵聽(tīng)器應(yīng)對(duì)任何異常井氢。
-
-
創(chuàng)建交換機(jī)
// 創(chuàng)建交換機(jī) ch.ExchangeDeclare( /*name*/ "exchange", // 交換機(jī)名稱 /*kind*/ "fanout", // 交換機(jī)類型 /*durable*/ true, // 是否持久化 /*autoDelete*/ false, // 是否自動(dòng)刪除 /*internal*/ false, // 是否是內(nèi)置交換機(jī) /*noWait*/ false, // 是否等待確認(rèn) /*args*/ nil) // 其他配置 failOnError(err, "Failed to declare a exchange")
參數(shù)說(shuō)明:
-
internal:
內(nèi)置交換器是一種特殊的交換器,這種交換器不能直接接收生產(chǎn)者發(fā)送的消息赚楚,
只能作為類似于隊(duì)列的方式綁定到另一個(gè)交換器毙沾,來(lái)接收這個(gè)交換器中路由的消息,
內(nèi)置交換器同樣可以綁定隊(duì)列和路由消息宠页,只是其接收消息的來(lái)源與普通交換器不同左胞。
-
-
綁定交換機(jī)和隊(duì)列
// 綁定交換機(jī)和隊(duì)列 err = ch.QueueBind( /*name*/ q.Name, // 隊(duì)列名稱 /*key*/ "", // Routing Key 因?yàn)椴捎玫氖莊anout模式,所以這里為空 /*exchange*/ "exchange", // 交換機(jī)名稱 /*noWait*/ false, // 是否等待確認(rèn) /*args*/ nil) // 其他配置 failOnError(err, "Failed to bind")
-
發(fā)送消息
// 發(fā)送消息 body := "Hello World!" err = ch.Publish( /*exchange*/ "exchange", // 交換機(jī)名稱 /*key*/ "", // routing key /*mandatory*/ false, // 是否為無(wú)法路由的消息進(jìn)行返回處理 /*immediate*/ false, // 是否對(duì)路由到無(wú)消費(fèi)者隊(duì)列的消息進(jìn)行返回處理 RabbitMQ 3.0 廢棄 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), DeliveryMode: 2, // 2代表著消息持久化举户,1代表否 }) log.Printf(" [x] Sent %s", body) failOnError(err, "Failed to publish a message")
參數(shù)說(shuō)明:
-
mandatory
消息發(fā)布的時(shí)候設(shè)置消息的 mandatory 屬性用于設(shè)置消息在發(fā)送到交換器之后無(wú)法路由到隊(duì)列的情況對(duì)消息的處理方式烤宙,
設(shè)置為 true 表示將消息返回到生產(chǎn)者,否則直接丟棄消息俭嘁。 -
immediate
參數(shù)告訴服務(wù)器至少將該消息路由到一個(gè)隊(duì)列中躺枕,否則將消息返回給生產(chǎn)者。imrnediate 參數(shù)告訴服務(wù)器,如果該消息關(guān)聯(lián)的隊(duì)列上有消費(fèi)者拐云,則立刻投遞:如果所有匹配的隊(duì)列上都沒(méi)有消費(fèi)者罢猪,則直接將消息返還給生產(chǎn)者,不用將消息存入隊(duì)列而等待消費(fèi)者了叉瘩。
-
如果在客戶端提前創(chuàng)建了交換機(jī)和隊(duì)列并且也綁定在一起了膳帕,那么可以省略3、4薇缅、5步驟危彩,直接發(fā)送消息即可。
發(fā)送了一條消息泳桦,如果這條消息還沒(méi)有被消費(fèi)掉汤徽,那么在客戶端就能看到這條消息:
[圖片上傳失敗...(image-12925a-1593443389768)]
消費(fèi)者
消費(fèi)者和生產(chǎn)者步驟類似,這里就貼一下demo:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 建立連接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 創(chuàng)建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 創(chuàng)建隊(duì)列
q, err := ch.QueueDeclare(
"queue", // 隊(duì)列名稱
true, // 是否持久化
false, // 是否自動(dòng)刪除
false, // 排他
false, // 是否等待確認(rèn)
nil, // 其他配置
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // 隊(duì)列名稱
"", // consumer
true, // 是否自動(dòng)ack確認(rèn)
false, // 排他
false, // no-local 設(shè)置為 true 則表示不能將同一個(gè) Connection 中生產(chǎn)者發(fā)送的消息傳送給這個(gè) Connection 中的消費(fèi)者
false, // 是否等待確認(rèn)
nil, // 其他配置
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
go run一下就能接收生產(chǎn)者發(fā)送的消息
持久化
RabbitMQ 持久化包含3個(gè)部分
- exchange 持久化灸撰,在聲明時(shí)指定 durable 為 true
- queue 持久化谒府,在聲明時(shí)指定 durable 為 true
- message 持久化,在投遞時(shí)指定 delivery_mode=2(1是非持久化)
這里需要注意以下幾點(diǎn):
- queue 的持久化能保證本身的元數(shù)據(jù)不會(huì)因異常而丟失梧奢,但是不能保證內(nèi)部的 message 不會(huì)丟失狱掂。要確保 message 不丟失演痒,還需要將 message 也持久化亲轨。
我之前就是因?yàn)闆](méi)有指定message 持久化,重啟了RabbitMQ就發(fā)送數(shù)據(jù)不見(jiàn)了......
如果 exchange 和 queue 都是持久化的鸟顺,那么它們之間的 binding 也是持久化的惦蚊。
如果 exchange 和 queue 兩者之間有一個(gè)持久化,一個(gè)非持久化讯嫂,就不允許建立綁定蹦锋。
一旦確定了 exchange 和 queue 的 durable,就不能修改了欧芽。如果非要修改莉掂,唯一的辦法就是刪除原來(lái)的 exchange 或 queue 后,然后重新創(chuàng)建千扔。
數(shù)據(jù)丟失了怎么辦
我們使用消息隊(duì)列總不能把數(shù)據(jù)弄丟了吧憎妙,這是基本原則。
這里數(shù)據(jù)丟失主要分三種情況:生產(chǎn)者弄丟了數(shù)據(jù)曲楚、RabbitMQ弄丟了數(shù)據(jù)以及消費(fèi)者弄丟了數(shù)據(jù)厘唾。
-
生產(chǎn)者弄丟了數(shù)據(jù)
生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了龙誊,因?yàn)榫W(wǎng)絡(luò)問(wèn)題啥的抚垃,都有可能。為了避免這種情況發(fā)生,主要有以下兩點(diǎn)解決方案:
-
利用RabbitMQ事物機(jī)制
這個(gè)呢就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開(kāi)啟 RabbitMQ 事務(wù)
ch.Tx()
鹤树,然后發(fā)送消息铣焊,如果消息沒(méi)有成功被 RabbitMQ 接收到,那么生產(chǎn)者會(huì)收到異常報(bào)錯(cuò)罕伯,此時(shí)就可以回滾事務(wù)ch.TxRollback()
粗截,然后重試發(fā)送消息;如果收到了消息捣炬,那么可以提交事務(wù)ch.TxCommit()
熊昌。// 發(fā)送消息 body := "Hello World!" ch.Tx() err = ch.Publish( /*exchange*/ "exchange", // 交換機(jī)名稱 /*key*/ "", // routing key /*mandatory*/ false, // 是否為無(wú)法路由的消息進(jìn)行返回處理 /*immediate*/ false, // 是否對(duì)路由到無(wú)消費(fèi)者隊(duì)列的消息進(jìn)行返回處理 RabbitMQ 3.0 廢棄 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), DeliveryMode: 2, // 2代表著消息持久化,1代表否 }) if err != nil { log.Printf("tx rollback") ch.TxRollback() } else { log.Printf(" [x] Sent %s", body) ch.TxCommit() }
-
采用confirm機(jī)制
在生產(chǎn)者那里設(shè)置開(kāi)啟
confirm
模式之后湿酸,你每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的 id婿屹,然后如果寫(xiě)入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè)ack
消息推溃,告訴你說(shuō)這個(gè)消息 ok 了昂利。如果 RabbitMQ 沒(méi)能處理這個(gè)消息,ack=false
铁坎,告訴你這個(gè)消息接收失敗蜂奸,你可以重試。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息 id 的狀態(tài)硬萍,如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào)扩所,那么你可以重發(fā)。confirm := ch.NotifyPublish(make(chan amqp.Confirmation)) ch.Confirm(false) go func() { ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 1")}) ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 2")}) ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 3")}) ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 4")}) }() for i := 0; i < 4; i++ { ack := <-confirm if ack.Ack { // 消息成功發(fā)送 } else { // 消息未成功發(fā)送 } fmt.Println(ack.DeliveryTag) }
對(duì)比這兩種方法朴乖,我們一般采取第二種方案祖屏,因?yàn)榈谝环N是同步的,你提交一個(gè)事務(wù)之后會(huì)阻塞在那兒买羞。但是
confirm
機(jī)制是異步的袁勺,你發(fā)送個(gè)消息之后就可以發(fā)送下一個(gè)消息,然后那個(gè)消息 RabbitMQ 接收了之后會(huì)異步回調(diào)你的一個(gè)接口通知你這個(gè)消息接收到了畜普。 -
-
RabbitMQ弄丟了數(shù)據(jù)
就是 RabbitMQ 自己弄丟了數(shù)據(jù)期丰,這個(gè)你必須開(kāi)啟 RabbitMQ 的持久化,就是消息寫(xiě)入之后會(huì)持久化到磁盤吃挑,哪怕是 RabbitMQ 自己掛了钝荡,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟儒鹿。除非極其罕見(jiàn)的是化撕,RabbitMQ 還沒(méi)持久化,自己就掛了约炎,可能導(dǎo)致少量數(shù)據(jù)丟失植阴,但是這個(gè)概率較小蟹瘾。
如果碰到這種情況怎么辦呢?
持久化可以跟生產(chǎn)者那邊的
confirm
機(jī)制配合起來(lái)掠手,只有消息被持久化到磁盤之后憾朴,才會(huì)通知生產(chǎn)者ack
了,所以哪怕是在持久化到磁盤之前喷鸽,RabbitMQ 掛了众雷,數(shù)據(jù)丟了,生產(chǎn)者收不到ack
砾省,你也是可以自己重發(fā)的。 -
消費(fèi)者弄丟了數(shù)據(jù)
RabbitMQ 如果丟失了數(shù)據(jù)混槐,主要是因?yàn)槟阆M(fèi)的時(shí)候编兄,剛消費(fèi)到,還沒(méi)處理声登,結(jié)果進(jìn)程掛了狠鸳,比如重啟了,那么就尷尬了悯嗓,RabbitMQ 認(rèn)為你都消費(fèi)了件舵,這數(shù)據(jù)就丟了。
這個(gè)時(shí)候就需要RabbitMQ提供的
ack
機(jī)制脯厨,消費(fèi)者消費(fèi)完成后會(huì)發(fā)送一個(gè)ack,RabbitMQ接收到了才會(huì)去刪除數(shù)據(jù)铅祸,否則保留數(shù)據(jù)。前提是關(guān)閉RabbitMQ自動(dòng)ack:
image-20200627125028885.pngmessages, err := rabbitmq.Consume() if err != nil { logger.ErrorF("[Consume] rabbitmq Consume fail,err: %s", err.Error()) } go func() { ch := make(chan int, 50) for msg := range messages { go func(m amqp.Delivery) { defer func() { <-ch }() ch <- 1 if err = handleMessage(m.Body); err == nil { m.Ack(false) // 發(fā)送ack } else { logger.ErrorF("[Consume] handleMessage fail,err: %s", err.Error()) } }(msg) } }()
m.Ack(true)的時(shí)候代表同一channe俄认,此傳遞和所有先前未確認(rèn)的傳遞將一起被確認(rèn)个少,這在批量處理的時(shí)候會(huì)用到。
m.Ack(false)的時(shí)候就是一個(gè)一個(gè)確認(rèn)眯杏。
總結(jié)
本節(jié)主要是分享了RabbitMQ在Go里的基本使用以及我們?nèi)绾稳ヌ幚硐G失的一個(gè)問(wèn)題。
當(dāng)然了還有其他很多更深層次的問(wèn)題壳澳,比如說(shuō)我們?nèi)绾稳ゴ_保消息隊(duì)列的高可用岂贩、如何去確保消息的順序性、遇到消息積壓等等等等巷波。因?yàn)槟乇救藢?duì)于RabbitMQ也沒(méi)有更深入的了解萎津,后續(xù)如果發(fā)現(xiàn)比較值得學(xué)習(xí)的地方再來(lái)給大家分享。