# RabbitMQ使用總結(jié)(二)

rabbitmq.jpeg

在上一節(jié)中我們主要介紹了為什么要使用消息隊(duì)列、常見(jiàn)的消息隊(duì)列對(duì)比分析以及初步認(rèn)識(shí)了RabbitMQ的整體架構(gòu)和基本概念。那么在這一節(jié)中疤剑,主要是從代碼的角度出發(fā)介紹一下如何使用RabbitMQ以及講一下在實(shí)際項(xiàng)目中踩的坑。

這里我用的是Go語(yǔ)言敛助,連接驅(qū)動(dòng)采用的是開(kāi)源的庫(kù)amqp

github地址:https://github.com/streadway/amqp

生產(chǎn)者

  1. 建立連接
// 建立連接
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
  1. 創(chuàng)建channel
// 創(chuàng)建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
  1. 創(chuàng)建隊(duì)列

    // 創(chuàng)建隊(duì)列
     q, err := ch.QueueDeclare(
         /*name*/ "queue", // 隊(duì)列名稱
         /*durable*/ false, // 是否持久化
         /*autoDelete*/ false, // 是否自動(dòng)刪除
         /*exclusive*/ false, // 排他
         /*noWait*/ false, // 是否等待服務(wù)器確認(rèn)
         /*args*/ nil, // 其他配置
     )
     failOnError(err, "Failed to declare a queue")
    

    參數(shù)說(shuō)明:

    • exclusive

      排他隊(duì)列只對(duì)首次創(chuàng)建它的連接可見(jiàn)捻浦,排他隊(duì)列是基于連接 (Connection) 可見(jiàn)的晤揣,并且該連接內(nèi)的所有信道 (Channel) 都可以訪問(wèn)這個(gè)排他隊(duì)列,在這個(gè)連接斷開(kāi)之后朱灿,該隊(duì)列自動(dòng)刪除昧识,由此可見(jiàn)這個(gè)隊(duì)列可以說(shuō)是綁到連接上的,對(duì)同一服務(wù)器的其他連接不可見(jiàn)盗扒。
      同一連接中不允許建立同名的排他隊(duì)列的
      這種排他優(yōu)先于持久化跪楞,即使設(shè)置了隊(duì)列持久化,在連接斷開(kāi)后侣灶,該隊(duì)列也會(huì)自動(dòng)刪除甸祭。
      非排他隊(duì)列不依附于連接而存在,同一服務(wù)器上的多個(gè)連接都可以訪問(wèn)這個(gè)隊(duì)列褥影。

    • autoDelete

      為 true 則設(shè)置隊(duì)列為自動(dòng)刪除池户。
      自動(dòng)刪除的前提是:至少有一個(gè)消費(fèi)者連接到這個(gè)隊(duì)列,之后所有與這個(gè)隊(duì)列連接的消費(fèi)者都斷開(kāi)時(shí)凡怎,才會(huì)自動(dòng)刪除校焦。
      不能把這個(gè)參數(shù)錯(cuò)誤地理解為: 當(dāng)連接到此隊(duì)列的所有客戶端斷開(kāi)時(shí),這個(gè)隊(duì)列自動(dòng)刪除统倒,因?yàn)樯a(chǎn)者客戶端創(chuàng)建這個(gè)隊(duì)列寨典,或者沒(méi)有消費(fèi)者客戶端與這個(gè)隊(duì)列連接時(shí),都不會(huì)自動(dòng)刪除這個(gè)隊(duì)列房匆。

    • noWait

      當(dāng) noWait 為 true 時(shí)耸成,聲明時(shí)無(wú)需等待服務(wù)器的確認(rèn)。
      該通道可能由于錯(cuò)誤而關(guān)閉浴鸿。 添加一個(gè) NotifyClose 偵聽(tīng)器應(yīng)對(duì)任何異常井氢。

  2. 創(chuàng)建交換機(jī)

    // 創(chuàng)建交換機(jī)
    ch.ExchangeDeclare(
       /*name*/ "exchange", // 交換機(jī)名稱
       /*kind*/ "fanout", // 交換機(jī)類型
       /*durable*/ true, // 是否持久化
       /*autoDelete*/ false, // 是否自動(dòng)刪除
       /*internal*/ false, // 是否是內(nèi)置交換機(jī)
       /*noWait*/ false, // 是否等待確認(rèn)
       /*args*/ nil) // 其他配置
    failOnError(err, "Failed to declare a exchange")
    

    參數(shù)說(shuō)明:

    • internal:

      內(nèi)置交換器是一種特殊的交換器,這種交換器不能直接接收生產(chǎn)者發(fā)送的消息赚楚,
      只能作為類似于隊(duì)列的方式綁定到另一個(gè)交換器毙沾,來(lái)接收這個(gè)交換器中路由的消息,
      內(nèi)置交換器同樣可以綁定隊(duì)列和路由消息宠页,只是其接收消息的來(lái)源與普通交換器不同左胞。

  3. 綁定交換機(jī)和隊(duì)列

    // 綁定交換機(jī)和隊(duì)列
     err = ch.QueueBind(
         /*name*/ q.Name, // 隊(duì)列名稱
         /*key*/ "", // Routing Key 因?yàn)椴捎玫氖莊anout模式,所以這里為空
         /*exchange*/ "exchange", // 交換機(jī)名稱
         /*noWait*/ false, // 是否等待確認(rèn)
         /*args*/ nil) // 其他配置
     failOnError(err, "Failed to bind")
    
  4. 發(fā)送消息

    // 發(fā)送消息
     body := "Hello World!"
     err = ch.Publish(
         /*exchange*/ "exchange", // 交換機(jī)名稱
         /*key*/ "", // routing key
         /*mandatory*/ false, // 是否為無(wú)法路由的消息進(jìn)行返回處理
         /*immediate*/ false, // 是否對(duì)路由到無(wú)消費(fèi)者隊(duì)列的消息進(jìn)行返回處理 RabbitMQ 3.0 廢棄
         amqp.Publishing{
             ContentType: "text/plain",
             Body:        []byte(body),
             DeliveryMode: 2,  // 2代表著消息持久化举户,1代表否
         })
     log.Printf(" [x] Sent %s", body)
     failOnError(err, "Failed to publish a message")
    

    參數(shù)說(shuō)明:

    • mandatory

      消息發(fā)布的時(shí)候設(shè)置消息的 mandatory 屬性用于設(shè)置消息在發(fā)送到交換器之后無(wú)法路由到隊(duì)列的情況對(duì)消息的處理方式烤宙,
      設(shè)置為 true 表示將消息返回到生產(chǎn)者,否則直接丟棄消息俭嘁。

    • immediate

      參數(shù)告訴服務(wù)器至少將該消息路由到一個(gè)隊(duì)列中躺枕,否則將消息返回給生產(chǎn)者。imrnediate 參數(shù)告訴服務(wù)器,如果該消息關(guān)聯(lián)的隊(duì)列上有消費(fèi)者拐云,則立刻投遞:如果所有匹配的隊(duì)列上都沒(méi)有消費(fèi)者罢猪,則直接將消息返還給生產(chǎn)者,不用將消息存入隊(duì)列而等待消費(fèi)者了叉瘩。

如果在客戶端提前創(chuàng)建了交換機(jī)和隊(duì)列并且也綁定在一起了膳帕,那么可以省略3、4薇缅、5步驟危彩,直接發(fā)送消息即可。

發(fā)送了一條消息泳桦,如果這條消息還沒(méi)有被消費(fèi)掉汤徽,那么在客戶端就能看到這條消息:

[圖片上傳失敗...(image-12925a-1593443389768)]

消費(fèi)者

消費(fèi)者和生產(chǎn)者步驟類似,這里就貼一下demo:

package main

import (
   "log"

   "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
   if err != nil {
      log.Fatalf("%s: %s", msg, err)
   }
}

func main() {
   // 建立連接
   conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
   failOnError(err, "Failed to connect to RabbitMQ")
   defer conn.Close()

   // 創(chuàng)建channel
   ch, err := conn.Channel()
   failOnError(err, "Failed to open a channel")
   defer ch.Close()

   // 創(chuàng)建隊(duì)列
   q, err := ch.QueueDeclare(
      "queue", // 隊(duì)列名稱
      true,    // 是否持久化
      false,   // 是否自動(dòng)刪除
      false,   // 排他
      false,   // 是否等待確認(rèn)
      nil,     // 其他配置
   )
   failOnError(err, "Failed to declare a queue")

   msgs, err := ch.Consume(
      q.Name, // 隊(duì)列名稱
      "",     // consumer
      true,   // 是否自動(dòng)ack確認(rèn)
      false,  // 排他
      false,  // no-local 設(shè)置為 true 則表示不能將同一個(gè) Connection 中生產(chǎn)者發(fā)送的消息傳送給這個(gè) Connection 中的消費(fèi)者
      false,  // 是否等待確認(rèn)
      nil,    // 其他配置
   )
   failOnError(err, "Failed to register a consumer")

   forever := make(chan bool)
   go func() {
      for d := range msgs {
         log.Printf("Received a message: %s", d.Body)
      }
   }()

   log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
   <-forever
}

go run一下就能接收生產(chǎn)者發(fā)送的消息


image-20200627000417465.png

持久化

RabbitMQ 持久化包含3個(gè)部分

  • exchange 持久化灸撰,在聲明時(shí)指定 durable 為 true
  • queue 持久化谒府,在聲明時(shí)指定 durable 為 true
  • message 持久化,在投遞時(shí)指定 delivery_mode=2(1是非持久化)

這里需要注意以下幾點(diǎn):

  • queue 的持久化能保證本身的元數(shù)據(jù)不會(huì)因異常而丟失梧奢,但是不能保證內(nèi)部的 message 不會(huì)丟失狱掂。要確保 message 不丟失演痒,還需要將 message 也持久化亲轨。

我之前就是因?yàn)闆](méi)有指定message 持久化,重啟了RabbitMQ就發(fā)送數(shù)據(jù)不見(jiàn)了......

  • 如果 exchange 和 queue 都是持久化的鸟顺,那么它們之間的 binding 也是持久化的惦蚊。

  • 如果 exchange 和 queue 兩者之間有一個(gè)持久化,一個(gè)非持久化讯嫂,就不允許建立綁定蹦锋。

  • 一旦確定了 exchange 和 queue 的 durable,就不能修改了欧芽。如果非要修改莉掂,唯一的辦法就是刪除原來(lái)的 exchange 或 queue 后,然后重新創(chuàng)建千扔。

數(shù)據(jù)丟失了怎么辦

我們使用消息隊(duì)列總不能把數(shù)據(jù)弄丟了吧憎妙,這是基本原則。

這里數(shù)據(jù)丟失主要分三種情況:生產(chǎn)者弄丟了數(shù)據(jù)曲楚、RabbitMQ弄丟了數(shù)據(jù)以及消費(fèi)者弄丟了數(shù)據(jù)厘唾。

  • 生產(chǎn)者弄丟了數(shù)據(jù)

    生產(chǎn)者將數(shù)據(jù)發(fā)送到 RabbitMQ 的時(shí)候,可能數(shù)據(jù)就在半路給搞丟了龙誊,因?yàn)榫W(wǎng)絡(luò)問(wèn)題啥的抚垃,都有可能。為了避免這種情況發(fā)生,主要有以下兩點(diǎn)解決方案:

    1. 利用RabbitMQ事物機(jī)制

      這個(gè)呢就是生產(chǎn)者發(fā)送數(shù)據(jù)之前開(kāi)啟 RabbitMQ 事務(wù)ch.Tx() 鹤树,然后發(fā)送消息铣焊,如果消息沒(méi)有成功被 RabbitMQ 接收到,那么生產(chǎn)者會(huì)收到異常報(bào)錯(cuò)罕伯,此時(shí)就可以回滾事務(wù) ch.TxRollback() 粗截,然后重試發(fā)送消息;如果收到了消息捣炬,那么可以提交事務(wù) ch.TxCommit() 熊昌。

      // 發(fā)送消息
      body := "Hello World!"
      ch.Tx()
      err = ch.Publish(
         /*exchange*/ "exchange", // 交換機(jī)名稱
         /*key*/ "", // routing key
         /*mandatory*/ false, // 是否為無(wú)法路由的消息進(jìn)行返回處理
         /*immediate*/ false, // 是否對(duì)路由到無(wú)消費(fèi)者隊(duì)列的消息進(jìn)行返回處理 RabbitMQ 3.0 廢棄
         amqp.Publishing{
            ContentType:  "text/plain",
            Body:         []byte(body),
            DeliveryMode: 2, // 2代表著消息持久化,1代表否
         })
      if err != nil {
         log.Printf("tx rollback")
         ch.TxRollback()
      } else {
         log.Printf(" [x] Sent %s", body)
         ch.TxCommit()
      }
      
    2. 采用confirm機(jī)制

      在生產(chǎn)者那里設(shè)置開(kāi)啟 confirm 模式之后湿酸,你每次寫(xiě)的消息都會(huì)分配一個(gè)唯一的 id婿屹,然后如果寫(xiě)入了 RabbitMQ 中,RabbitMQ 會(huì)給你回傳一個(gè) ack 消息推溃,告訴你說(shuō)這個(gè)消息 ok 了昂利。如果 RabbitMQ 沒(méi)能處理這個(gè)消息,ack=false铁坎,告訴你這個(gè)消息接收失敗蜂奸,你可以重試。而且你可以結(jié)合這個(gè)機(jī)制自己在內(nèi)存里維護(hù)每個(gè)消息 id 的狀態(tài)硬萍,如果超過(guò)一定時(shí)間還沒(méi)接收到這個(gè)消息的回調(diào)扩所,那么你可以重發(fā)。

      confirm := ch.NotifyPublish(make(chan amqp.Confirmation))
      ch.Confirm(false)
      go func() {
         ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 1")})
         ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 2")})
         ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 3")})
         ch.Publish("exchange", "", false, false, amqp.Publishing{Body: []byte("pub 4")})
      }()
      
      for i := 0; i < 4; i++ {
         ack := <-confirm
         if ack.Ack {
            // 消息成功發(fā)送
         } else {
            // 消息未成功發(fā)送
         }
         fmt.Println(ack.DeliveryTag)
      }
      

    對(duì)比這兩種方法朴乖,我們一般采取第二種方案祖屏,因?yàn)榈谝环N是同步的,你提交一個(gè)事務(wù)之后會(huì)阻塞在那兒买羞。但是 confirm 機(jī)制是異步的袁勺,你發(fā)送個(gè)消息之后就可以發(fā)送下一個(gè)消息,然后那個(gè)消息 RabbitMQ 接收了之后會(huì)異步回調(diào)你的一個(gè)接口通知你這個(gè)消息接收到了畜普。

  • RabbitMQ弄丟了數(shù)據(jù)

    就是 RabbitMQ 自己弄丟了數(shù)據(jù)期丰,這個(gè)你必須開(kāi)啟 RabbitMQ 的持久化,就是消息寫(xiě)入之后會(huì)持久化到磁盤吃挑,哪怕是 RabbitMQ 自己掛了钝荡,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟儒鹿。除非極其罕見(jiàn)的是化撕,RabbitMQ 還沒(méi)持久化,自己就掛了约炎,可能導(dǎo)致少量數(shù)據(jù)丟失植阴,但是這個(gè)概率較小蟹瘾。

    如果碰到這種情況怎么辦呢?

    持久化可以跟生產(chǎn)者那邊的 confirm 機(jī)制配合起來(lái)掠手,只有消息被持久化到磁盤之后憾朴,才會(huì)通知生產(chǎn)者 ack 了,所以哪怕是在持久化到磁盤之前喷鸽,RabbitMQ 掛了众雷,數(shù)據(jù)丟了,生產(chǎn)者收不到 ack 砾省,你也是可以自己重發(fā)的。

  • 消費(fèi)者弄丟了數(shù)據(jù)

    RabbitMQ 如果丟失了數(shù)據(jù)混槐,主要是因?yàn)槟阆M(fèi)的時(shí)候编兄,剛消費(fèi)到,還沒(méi)處理声登,結(jié)果進(jìn)程掛了狠鸳,比如重啟了,那么就尷尬了悯嗓,RabbitMQ 認(rèn)為你都消費(fèi)了件舵,這數(shù)據(jù)就丟了。

    這個(gè)時(shí)候就需要RabbitMQ提供的ack機(jī)制脯厨,消費(fèi)者消費(fèi)完成后會(huì)發(fā)送一個(gè)ack,RabbitMQ接收到了才會(huì)去刪除數(shù)據(jù)铅祸,否則保留數(shù)據(jù)。

    前提是關(guān)閉RabbitMQ自動(dòng)ack:

    image-20200627125028885.png
    messages, err := rabbitmq.Consume()
    if err != nil {
       logger.ErrorF("[Consume] rabbitmq Consume fail,err: %s", err.Error())
    }
    go func() {
       ch := make(chan int, 50)
       for msg := range messages {
          go func(m amqp.Delivery) {
             defer func() {
                <-ch
             }()
             ch <- 1
             if err = handleMessage(m.Body); err == nil {
                m.Ack(false) // 發(fā)送ack
             } else {
                logger.ErrorF("[Consume] handleMessage fail,err: %s", err.Error())
             }
          }(msg)
       }
    }()
    
    • m.Ack(true)的時(shí)候代表同一channe俄认,此傳遞和所有先前未確認(rèn)的傳遞將一起被確認(rèn)个少,這在批量處理的時(shí)候會(huì)用到。

    • m.Ack(false)的時(shí)候就是一個(gè)一個(gè)確認(rèn)眯杏。

總結(jié)

本節(jié)主要是分享了RabbitMQ在Go里的基本使用以及我們?nèi)绾稳ヌ幚硐G失的一個(gè)問(wèn)題。

當(dāng)然了還有其他很多更深層次的問(wèn)題壳澳,比如說(shuō)我們?nèi)绾稳ゴ_保消息隊(duì)列的高可用岂贩、如何去確保消息的順序性、遇到消息積壓等等等等巷波。因?yàn)槟乇救藢?duì)于RabbitMQ也沒(méi)有更深入的了解萎津,后續(xù)如果發(fā)現(xiàn)比較值得學(xué)習(xí)的地方再來(lái)給大家分享。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末抹镊,一起剝皮案震驚了整個(gè)濱河市锉屈,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌垮耳,老刑警劉巖颈渊,帶你破解...
    沈念sama閱讀 219,427評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遂黍,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡俊嗽,警方通過(guò)查閱死者的電腦和手機(jī)雾家,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)绍豁,“玉大人芯咧,你說(shuō)我怎么就攤上這事≈褡幔” “怎么了敬飒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,747評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)芬位。 經(jīng)常有香客問(wèn)我驶拱,道長(zhǎng),這世上最難降的妖魔是什么晶衷? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,939評(píng)論 1 295
  • 正文 為了忘掉前任蓝纲,我火速辦了婚禮,結(jié)果婚禮上晌纫,老公的妹妹穿的比我還像新娘税迷。我一直安慰自己,他們只是感情好锹漱,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布箭养。 她就那樣靜靜地躺著,像睡著了一般哥牍。 火紅的嫁衣襯著肌膚如雪毕泌。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,737評(píng)論 1 305
  • 那天嗅辣,我揣著相機(jī)與錄音撼泛,去河邊找鬼。 笑死澡谭,一個(gè)胖子當(dāng)著我的面吹牛愿题,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蛙奖,決...
    沈念sama閱讀 40,448評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼潘酗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了雁仲?” 一聲冷哼從身側(cè)響起仔夺,我...
    開(kāi)封第一講書(shū)人閱讀 39,352評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎攒砖,沒(méi)想到半個(gè)月后缸兔,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體日裙,經(jīng)...
    沈念sama閱讀 45,834評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評(píng)論 3 338
  • 正文 我和宋清朗相戀三年灶体,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了阅签。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蝎抽,死狀恐怖政钟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情樟结,我是刑警寧澤养交,帶...
    沈念sama閱讀 35,815評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站瓢宦,受9級(jí)特大地震影響碎连,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜驮履,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評(píng)論 3 331
  • 文/蒙蒙 一鱼辙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧玫镐,春花似錦倒戏、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,022評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至矫夷,卻和暖如春葛闷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背双藕。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,147評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工淑趾, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蔓彩。 一個(gè)月前我還...
    沈念sama閱讀 48,398評(píng)論 3 373
  • 正文 我出身青樓治笨,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親赤嚼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評(píng)論 2 355