分布式消息隊(duì)列rabbitmq 總結(jié)

什么時(shí)候使用

用于系統(tǒng)的物理解耦和邏輯解耦

場(chǎng)景

  • 削峰填谷
  • 數(shù)據(jù)驅(qū)動(dòng)的任務(wù)依賴
  • 多個(gè)接收方刁憋,上游不關(guān)心多下游執(zhí)行結(jié)果
  • upsteam 關(guān)注結(jié)果但時(shí)間很長(zhǎng)

消息隊(duì)列的推送

  • 推送 push
    • 優(yōu)點(diǎn)
      • 實(shí)時(shí)性好
      • 中間件服務(wù)器做負(fù)載均衡
    • 缺點(diǎn)
      • 需要確認(rèn)收到
  • 拉取 pull
    • 優(yōu)點(diǎn)
      • 可以拉取多條
      • 服務(wù)端邏輯少
    • 缺點(diǎn):
      1. 可能導(dǎo)致消息堆積
      2. 消費(fèi)端主動(dòng)輪訓(xùn)

rabbitmq的push和pull


  • 這種方式Api比較簡(jiǎn)單醇滥,但是需要自己控制拉取節(jié)奏佩谣,
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)

Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be preferred.

If there was a delivery waiting on the queue and that delivery was received, the second return value will be true. If there was no delivery waiting or an error occurred, the ok bool will be false.

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
c.deliveryChan, err = c.channel.Consume(c.queueName,
        c.tag,
        false,
        false,
        false,
        false,
        nil)
    if err != nil {
        c.RabbitMQClient.Close()
        return err
    }
    
    // important: 在一個(gè)goroutine中同時(shí)對(duì)msgs和notifyClose兩個(gè)channel進(jìn)行讀取可能會(huì)導(dǎo)致死鎖凡怎。
    // 因?yàn)閙sgs被關(guān)閉就會(huì)結(jié)束相應(yīng)的goroutine,
    // 此時(shí)notifyClose因?yàn)闆](méi)有接收者宫蛆,而在amqp.channel關(guān)閉的過(guò)程中出現(xiàn)死鎖艘包。
    go c.handle(c.deliveryChan) //另外啟動(dòng)一個(gè)攜程處理任務(wù)

消息不重復(fù)消費(fèi)

在消息中添加唯一的消息ID的猛;同時(shí)確保消息的冪等性;

消息不丟

消息發(fā)送的時(shí)候Producer要收到rabbitmq的Confirm消息想虎;消費(fèi)端收到消息后應(yīng)該給rabbitmq發(fā)送ACK卦尊;

保證送達(dá)

  1. 在保證消息不丟的前提下,在發(fā)送到rabbitmq之后寫入數(shù)據(jù)庫(kù)舌厨,當(dāng)消息被consumer處理之后更新數(shù)據(jù)庫(kù)中的狀態(tài)岂却;
  2. 啟動(dòng)一個(gè)異步認(rèn)為定時(shí)檢查數(shù)據(jù)庫(kù)中的任務(wù),如果狀態(tài)沒(méi)有被更新就取出來(lái)重新發(fā)送到消息隊(duì)列裙椭;
  3. 在保證消息冪等性的前提下躏哩,可以保證消息被送達(dá)

消息順序性

消息中只有一個(gè)接收者的情況下,可以保證消息的順序消費(fèi)

消息隊(duì)列的延時(shí)以及過(guò)期失效問(wèn)題

延時(shí)隊(duì)列可以通過(guò)以下2種方式實(shí)現(xiàn):

  1. 死信隊(duì)列

    • 死信產(chǎn)生:

      • 消息被拒絕(basic.reject / basic.nack)揉燃,并且requeue = false
      • 消息TTL過(guò)期
      • 隊(duì)列達(dá)到最大長(zhǎng)度
    • 死信說(shuō)明
      DLX也是一個(gè)正常的Exchange扫尺,和一般的Exchange沒(méi)有區(qū)別,它能在任 何的隊(duì)列上被指定炊汤,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性正驻。當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去抢腐,進(jìn)而被路由到另一個(gè)隊(duì)列拨拓。可以監(jiān)聽(tīng)這個(gè)隊(duì)列中的消息做相應(yīng)的處理氓栈。

    • 延時(shí)消息
      基于上面的說(shuō)明,發(fā)送一條消息到一個(gè)沒(méi)有consumer的exchange 并設(shè)置ttl的過(guò)期時(shí)間為我們需要延時(shí)的時(shí)間比如30(秒),當(dāng)ttl過(guò)期之后就會(huì)根據(jù)私信dlx.exchange路由到指定的queue中婿着;然后再死信隊(duì)列中的consumer復(fù)制消費(fèi)這個(gè)消息

  2. 使用插件 delayed_exchange
    rabbitmq-delayed-message-exchange
    創(chuàng)建exchange的時(shí)候需要按照下圖所示指定類型

    arg := make(map[string]interface{})
    arg["x-delayed-type"] = "topic"
    ex := mq.Exchange{
        Name: "delay-task2",
        Kind: "x-delayed-message",
        Args: arg,
    }
    

    發(fā)送message的時(shí)候指定消息的delay的時(shí)間

     table := make(map[string]interface{})
     table["x-delay"] = 3000  // 指定delay 3s
     err := p.Publish(ctx, d, "delay.order.abc", table);
    

限流

  // 限定prefetch count  prefetch_size, global
  //Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
  if err := c.channel.Qos(1, 0, false); err != nil {
  }
c.deliveryChan, err = c.channel.Consume(c.queueName,
      c.tag,
      false, // 關(guān)閉auto ack
      false,
      false,
      false,
      nil)
  if err != nil {
      c.RabbitMQClient.Close()
      return err
  }

斷線重連

單獨(dú)啟動(dòng)一個(gè)協(xié)程授瘦,執(zhí)行下面的操作

func (c *RabbitMQConsumer) reconnect() error {
    // 重新連接成功之后,重新執(zhí)行consume竟宋;

    for {
        // 是否發(fā)生錯(cuò)誤
        select {
        case err := <-c.connNotify:
            if err != nil {
                log.Println("rabbitmq consumer - connection NotifyClose: ", err)
            }
        case err := <-c.channelNotify:
            if err != nil {
                log.Println("rabbitmq consumer - channel NotifyClose: ", err)
            }
        case <-c.quit:
            return nil
        }
        // 連接未關(guān)閉
        if !c.conn.IsClosed() {
            var errConn, errChannel *amqp.Error
            for errChannel = range c.channelNotify {
                log.Println(errChannel)
            }
            for errConn = range c.connNotify {
                log.Println(errConn)
            }

            // 關(guān)閉 SubMsg common delivery
            if err := c.channel.Cancel(c.tag, true); err != nil {
                log.Println("rabbitmq consumer - channel cancel failed: ", err)
            }
            if err := c.channel.Close(); err != nil {

            }
            if err := c.conn.Close(); err != nil {
                log.Println("rabbitmq consumer - connection close failed: ", err)
            }
        } else {
            log.Println("conn is closed")
        }
        // IMPORTANT: 必須清空 Notify提完,否則死連接不會(huì)釋放 如果還有error一起讀完否則連接不能釋放

    retry:
        for {
            select {
            case <-c.quit:
                return nil
            default:
                log.Println("rabbitmq consumer - reconnect")
                // 第二次重新連接
                if err := c.Init(); err != nil {
                    // 等待;然后重試
                    time.Sleep(time.Second * 10)
                    log.Println("loop again continue")
                    continue
                }
                break retry
            }
        }
    }
}

高可用

  1. 主備
    只有主節(jié)點(diǎn)提供讀寫丘侠;備用節(jié)點(diǎn)只是在主節(jié)點(diǎn)掛掉的情況下服務(wù)徒欣;并發(fā)量并不大的情況可以使用haproxy做主備;

  2. 鏡像模式
    mirror鏡像隊(duì)列蜗字;保證rabbitmq數(shù)據(jù)的高可靠性打肝,實(shí)現(xiàn)數(shù)據(jù)同步2-3個(gè)節(jié)點(diǎn)的數(shù)據(jù)同步;前端需要自己做負(fù)載均衡


    image.png
  3. Federation
    在broker之間傳輸消息的插件
    https://github.com/rabbitmq/rabbitmq-federation

image.png

如上圖所示挪捕,F(xiàn)ederation Exchanges,可以看成Downstream從Upstream主動(dòng)拉取消息粗梭,但并不是拉取所有消息,必須是在Downstream上已經(jīng)明確定義Bindings關(guān)系的Exchange级零,也就是有實(shí)際的物理Queue來(lái)接收消息断医,才會(huì)從Upstream拉取消息到Downstream。使用AMQP協(xié)議實(shí)施代理間通信,Downstream會(huì)將綁定關(guān)系組合在一起鉴嗤,綁定/解綁命令將會(huì)發(fā)送到Upstream交換機(jī)斩启。因此,F(xiàn)ederationExchange只接收具有訂閱的消息醉锅。

rabbitmq 思維導(dǎo)圖

rabbit mq.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末兔簇,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子荣挨,更是在濱河造成了極大的恐慌男韧,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件默垄,死亡現(xiàn)場(chǎng)離奇詭異此虑,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)口锭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門朦前,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人鹃操,你說(shuō)我怎么就攤上這事韭寸。” “怎么了荆隘?”我有些...
    開(kāi)封第一講書人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵恩伺,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我椰拒,道長(zhǎng)晶渠,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任燃观,我火速辦了婚禮褒脯,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘缆毁。我一直安慰自己番川,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布脊框。 她就那樣靜靜地躺著颁督,像睡著了一般。 火紅的嫁衣襯著肌膚如雪浇雹。 梳的紋絲不亂的頭發(fā)上适篙,一...
    開(kāi)封第一講書人閱讀 51,698評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音箫爷,去河邊找鬼嚷节。 笑死聂儒,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的硫痰。 我是一名探鬼主播衩婚,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼效斑!你這毒婦竟也來(lái)了非春?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤缓屠,失蹤者是張志新(化名)和其女友劉穎奇昙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體敌完,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡储耐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了滨溉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片什湘。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖晦攒,靈堂內(nèi)的尸體忽然破棺而出闽撤,到底是詐尸還是另有隱情,我是刑警寧澤脯颜,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布哟旗,位于F島的核電站,受9級(jí)特大地震影響栋操,放射性物質(zhì)發(fā)生泄漏热幔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一讼庇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧近尚,春花似錦蠕啄、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至格遭,卻和暖如春哈街,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拒迅。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工骚秦, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留她倘,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓作箍,卻偏偏與公主長(zhǎng)得像硬梁,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子胞得,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容