go-micro集成RabbitMQ實戰(zhàn)和原理

在go-micro中異步消息的收發(fā)是通過Broker這個組件來完成的,底層實現(xiàn)有RabbitMQ杆煞、Kafka魏宽、Redis等等很多種方式,這篇文章主要介紹go-micro使用RabbitMQ收發(fā)數(shù)據(jù)的方法和原理决乎。

Broker的核心功能

Broker的核心功能是Publish和Subscribe队询,也就是發(fā)布和訂閱。它們的定義是:

Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

發(fā)布

發(fā)布第一個參數(shù)是topic(主題)构诚,用于標識某類消息蚌斩。

發(fā)布的數(shù)據(jù)是通過Message承載的,其包括消息頭和消息體范嘱,定義如下:

type Message struct {
    Header map[string]string
    Body   []byte
}

消息頭是map送膳,也就是一組KV(鍵值對)员魏。

消息體是字節(jié)數(shù)組,在發(fā)送和接收時需要開發(fā)者進行編碼和解碼的處理叠聋。

訂閱

訂閱的第一個參數(shù)也是topic(主題)撕阎,用于過濾出要接收的消息。

訂閱的數(shù)據(jù)是通過Handler處理的碌补,Handler是一個函數(shù)闻书,其定義如下:

type Handler func(Event) error

其中的參數(shù)Event是一個接口,需要具體的Broker來實現(xiàn)脑慧,其定義如下:

type Event interface {
    Topic() string
    Message() *Message
    Ack() error
    Error() error
}
  • Topic() 用于獲取當前消息的topic魄眉,也是發(fā)布者發(fā)送時的topic。
  • Message() 用于獲取消息體闷袒,也是發(fā)布者發(fā)送時的Message坑律,其中包括Header和Body。
  • Ack() 用于通知Broker消息已經(jīng)收到了囊骤,Broker可以刪除消息了晃择,可用來保證消息至少被消費一次。
  • Error() 用于獲取Broker處理消息過成功的錯誤也物。

開發(fā)者訂閱數(shù)據(jù)時宫屠,需要實現(xiàn)Handler這個函數(shù),接收Event的實例滑蚯,提取數(shù)據(jù)進行處理浪蹂,根據(jù)不同的Broker,可能還需要調(diào)用Ack()告材,處理出現(xiàn)錯誤時坤次,返回error。

go-micro集成RabbitMQ實戰(zhàn)

大概了解了Broker的定義之后斥赋,再來看下如何使用go-micro收發(fā)RabbitMQ消息缰猴。

啟動一個RabbitMQ

如果你已經(jīng)有一個RabbitMQ服務器,請?zhí)^這個步驟疤剑。

這里介紹一個使用docker快速啟動RabbitMQ的方法滑绒,當然前提是你得安裝了docker。

執(zhí)行如下命令啟動一個rabbitmq的docker容器:

docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq

然后進入容器進行一些設置:

docker exec -it rabbitmq1 /bin/bash

啟動管理工具隘膘、禁用指標采集(會導致某些API500錯誤):

rabbitmq-plugins enable rabbitmq_management
 
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

最后重啟容器:

docker restart rabbitmq1

最后瀏覽器中輸入 http://127.0.0.0:15672 即可訪問疑故,默認用戶名和密碼都是 guest 。

編寫收發(fā)函數(shù)

為了方便演示棘幸,先來定義發(fā)布消息和接收消息的函數(shù)焰扳。其中發(fā)布函數(shù)使用了go-micro提供的Event類型,還有其它類型也可以提供Publish的功能误续,這里發(fā)送的數(shù)據(jù)格式是Json字符串吨悍。接收消息的函數(shù)名稱可以隨意取,但是參數(shù)和返回值必須符合規(guī)范蹋嵌,也就是下邊代碼中的樣子育瓜,這個函數(shù)也可以是綁定到某個類型的。

// 定義一個發(fā)布消息的函數(shù):每隔1秒發(fā)布一條消息
func loopPublish(event micro.Event) {
    for {
        time.Sleep(time.Duration(1) * time.Second)

        curUnix := strconv.FormatInt(time.Now().Unix(), 10)
        msg := "{\"Id\":" + curUnix + ",\"Name\":\"張三\"}"
        event.Publish(context.TODO(), msg)
    }
}

// 定義一個接收消息的函數(shù):將收到的消息打印出來
func handle(ctx context.Context, msg interface{}) (err error) {
    defer func() {
        if r := recover(); r != nil {
            err = errors.New(fmt.Sprint(r))
            log.Println(err)
        }
    }()

    b, err := json.Marshal(msg)
    if err != nil {
        log.Println(err)
        return
    }

    log.Println(string(b))
    return
}

編寫主體代碼

這里先給出代碼栽烂,里面提供了一些注釋躏仇,后邊還會有詳細介紹。

func main() {
    // RabbitMQ的連接參數(shù)
    rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"
    exchangeName := "amq.topic"
    subcribeTopic := "test"
    queueName := "rabbitmqdemo_test"

    // 默認是application/protobuf腺办,這里演示用的是Json焰手,所以要改下
    server.DefaultContentType = "application/json"

    // 創(chuàng)建 RabbitMQ Broker
    b := rabbitmq.NewBroker(
        broker.Addrs(rabbitmqUrl),           // RabbitMQ訪問地址,含VHost
        rabbitmq.ExchangeName(exchangeName), // 交換機的名稱
        rabbitmq.DurableExchange(),          // 消息在Exchange中時會進行持久化處理
        rabbitmq.PrefetchCount(1),           // 同時消費的最大消息數(shù)量
    )

    // 創(chuàng)建Service怀喉,內(nèi)部會初始化一些東西书妻,必須在NewSubscribeOptions前邊
    service := micro.NewService(
        micro.Broker(b),
    )
    service.Init()

    // 初始化訂閱上下文:這里不是必需的,訂閱會有默認值
    subOpts := broker.NewSubscribeOptions(
        rabbitmq.DurableQueue(),   // 隊列持久化躬拢,消費者斷開連接后躲履,消息仍然保存到隊列中
        rabbitmq.RequeueOnError(), // 消息處理函數(shù)返回error時,消息再次入隊列
        rabbitmq.AckOnSuccess(),   // 消息處理函數(shù)沒有error返回時聊闯,go-micro發(fā)送Ack給RabbitMQ
    )

    // 注冊訂閱
    micro.RegisterSubscriber(
        subcribeTopic,    // 訂閱的Topic
        service.Server(), // 注冊到的rpcServer
        handle,           // 消息處理函數(shù)
        server.SubscriberContext(subOpts.Context), // 訂閱上下文工猜,也可以使用默認的
        server.SubscriberQueue(queueName),         // 隊列名稱
    )

    // 發(fā)布事件消息
    event := micro.NewEvent(subcribeTopic, service.Client())
    go loopPublish(event)

    log.Println("Service is running ...")
    if err := service.Run(); err != nil {
        log.Println(err)
    }
}

主要邏輯是:

1、先創(chuàng)建一個RabbitMQ Broker菱蔬,它實現(xiàn)了標準的Broker接口篷帅。其中主要的參數(shù)是RabbitMQ的訪問地址和RabbitMQ交換機,PrefetchCount是訂閱者(或稱為消費者)使用的拴泌。

2犹褒、然后通過 NewService 創(chuàng)建go-micro服務,并將broker設置進去弛针。這里邊會初始化很多東西叠骑,最核心的是創(chuàng)建一個rpcServer,并將rpcServer和這個broker綁定起來削茁。

3宙枷、然后是通過 RegisterSubscriber 注冊訂閱慰丛,這個注冊有兩個層面的功能:一是如果RabbitMQ上還不存在這個隊列時創(chuàng)建隊列诅病,并訂閱指定topic的消息;二是定義go-micro程序從這個RabbitMQ隊列接收數(shù)據(jù)的處理方式蝇棉。

這里詳細看下訂閱的參數(shù):

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
  • topic:go-micro使用的是Topic模式篡殷,發(fā)布者發(fā)送消息的時候要指定一個topic,訂閱者根據(jù)需要只接收某個或某幾個topic的消息埋涧;
  • s:消息從RabbitMQ接收后會進入這個Server進行處理板辽,它是NewService的時候內(nèi)部創(chuàng)建的;
  • h:使用了上一步創(chuàng)建的接收消息的函數(shù) handle棘催,Server中的方法會調(diào)用這個函數(shù)劲弦;
  • opts 是訂閱的一些選項,這里需要指定RabbitMQ隊列的名稱醇坝;另外SubscriberContext定義了訂閱的一些行為邑跪,這里DurableQueue設置RabbitMQ訂閱消息的持久化方式,一般我們都希望消息不丟失纲仍,這個設置的作用是即使程序與RabbitMQ的連接斷開呀袱,消息也會保存在RabbitMQ隊列中;AckOnSuccess和RequeueOnError定義了程序處理消息出現(xiàn)錯誤時的行為郑叠,如果handle返回error夜赵,消息會重新返回RabbitMQ,然后再投遞給程序乡革。

4寇僧、然后這里為了演示,通過NewEvent創(chuàng)建了一個Event沸版,通過它每隔一秒發(fā)送1條消息嘁傀。

5、最后通過service.Run()把這個程序啟動起來细办。

辛苦寫了半天,看一下這個程序的運行效果:

image-20220429214626817

注意一般發(fā)布者和訂閱者是在不同的程序中,這里只是為了方便演示瓤狐,才把他們放在一個程序中嗓节。所以如果只是發(fā)布消息赦政,就不需要訂閱的代碼耀怜,如果只是訂閱,也不需要發(fā)布消息的代碼左痢,大家使用的時候根據(jù)需要自己裁剪吧。

go-micro集成RabbitMQ的處理流程

這個部分來看一下消息在go-micro和RabbitMQ中是怎么流轉(zhuǎn)的定页,我畫了一個示意圖:

go-micro-rabbiitmq

這個圖有點復雜,這里詳細講解下卒落。

首先分成三塊:RabbitMQ儡毕、消息發(fā)布部分、消息接收部分檐盟,這里用不同的顏色進行了區(qū)分。

  • RabbitMQ不是本文的重點谎痢,就把它看成一個整體就行了节猿。
  • 消息發(fā)布部分:從生產(chǎn)者程序調(diào)用Event.Publish開始,然后調(diào)用Client.Publish太雨,到這里為止,都是在go-micro的核心模塊中進行處理锥咸;然后再調(diào)用Broker.Publish,這里的Broker是RabbitMQ插件的Broker實例缔刹,從這里開始進入了RabbiitMQ插件部分,然后再依次通過RabbitMQ Connection的Publish方法鸟廓、RabbitMQ Channle的Publish方法,最終發(fā)送到RabbitMQ中员咽。
  • 消息接收部分:Service.Run內(nèi)部會調(diào)用rpcServer.Start契讲,這個方法內(nèi)部會調(diào)用Broker.Subscribe捡偏,這個方法是RabbitMQ插件中定義的,它會讀取RegisterSubscriber時的一些RabbitMQ隊列設置彤避,然后再依次傳遞到RabbitMQ Connection的Consume方法挟伙、RabbitMQ Channel的ConsumeQueue方法贮缅,最終連接到RabbitMQ,并在RabbitMQ上設置好要訂閱的隊列桂肌;這些方法還會返回一個類型為amqp.Delivery的Go Channel,Broker.Subscribe不斷的從這個Go Channel中讀取數(shù)據(jù)谭跨,然后再發(fā)送到調(diào)用Broker.Subscribe時傳入的一個消息處理方法中所坯,這里就是rpcServer.HandleEvnet堂湖,消息經(jīng)過一些處理后再進入rpcServer內(nèi)部的路由處理模塊饵蒂,這里就是route.ProcessMessage,這個方法內(nèi)部會根據(jù)當前消息的topic查找RegisterSubscriber時注冊的訂閱渊迁,并最終調(diào)用到當時注冊的用于接收消息的函數(shù)。

這個處理過程還可以劃分為業(yè)務部分、核心模塊部分和插件部分耕漱。

  • 首先創(chuàng)建一個插件的Broker實現(xiàn),把它注冊到核心模塊的rpcServer中;
  • 消息的發(fā)送從業(yè)務部分進入核心模塊部分寞宫,再進入具體實現(xiàn)Broker的插件部分毛俏;
  • 消息的接收則首先進入插件部分焕蹄,然后再流轉(zhuǎn)到核心模塊部分,再流轉(zhuǎn)到業(yè)務部分永品。

從上邊的圖中可以看到消息都需要經(jīng)過這個RabbitMQ插件進行處理做鹰,實際上可以只使用這個插件,就能實現(xiàn)消息的發(fā)送和接收鼎姐。這個演示代碼我已經(jīng)提交到了Github钾麸,有興趣的同學可以在文末獲取Github倉庫的地址。

從上邊這些劃分中炕桨,我們可以理解到設計者的整體設計思路饭尝,把握關(guān)鍵節(jié)點吭净,用好用對庶柿,出現(xiàn)問題時可以快速定位。

填的幾個坑

不能接收其它框架發(fā)布的消息

這個是因為route.ProcessMessage查找訂閱時使用了go-micro專用的一個頭信息:

// get the subscribers by topic
    subs, ok := router.subscribers[msg.Topic()]

這個msg.Topic返回的是如下實例中的topic字段:

    rpcMsg := &rpcMessage{
        topic:       msg.Header["Micro-Topic"],
        contentType: ct,
        payload:     &raw.Frame{Data: msg.Body},
        codec:       cf,
        header:      msg.Header,
        body:        msg.Body,
    }

其它框架不會有這么一個頭信息赎懦,除非專門適配go-micro迁酸。

因為使用RabbitMQ的場景下,整個開發(fā)都是圍繞RabbitMQ做的更胖,而且go-micro的處理邏輯沒有考慮RabbitMQ訂閱可以使用通配符的情況,發(fā)布消息的Topic题画、接收消息的Topic與Micro-Topic的值匹配時都是按照是否相等的原則處理的衙四,因此可以用RabbitMQ消息自帶的topic來設置這個消息頭漂彤。rabbitmq.rbroker.Subscribe 中接收到消息后泉哈,就可以進行這個設置:

// Messages sent from other frameworks to rabbitmq do not have this header.
        // The 'RoutingKey' in the message can be used as this header.
        // Then the message can be transfered to the subscriber which bind this topic.
        msgTopic := header["Micro-Topic"]
        if msgTopic == "" {
            header["Micro-Topic"] = msg.RoutingKey
        }

這樣go-micro開發(fā)的消費者程序就能接收其它框架發(fā)布的消息了煤率,其它框架無需適配。

RabbitMQ重啟后訂閱者和發(fā)布者無限阻塞

go-micro的RabbitMQ插件底層使用另一個庫:github.com/streadway/amqp

對于發(fā)布者多糠,RabbitMQ斷開連接時amqp庫會通過Go Channel同步通知go-micro,然后go-micro可以發(fā)起重新連接吞滞。問題出現(xiàn)在這個同步通知上疾掰,go-micro的RabbitMQ插件設置了接收連接和通道的關(guān)閉通知芬迄,但是只處理了一個通知就去重新連接了哗戈,這就導致有一個Go Channel一直阻塞沐兵,而這個阻塞會導致某個鎖不能釋放预吆,這個鎖又是Publish時候需要的龙填,因此導致發(fā)布者無限阻塞。解決辦法就是外層增加一個循環(huán)拐叉,等所有的通知都收到了觅够,再去做重新連接。

對于訂閱者巷嚣,RabbitMQ斷開連接時,它會一直阻塞在某個Go Channel上钳吟,直到它返回一個值廷粒,這個值代表連接已經(jīng)重新建立,訂閱者可以重建消費通道。問題也是出現(xiàn)在這個阻塞的Go Channel上坝茎,因為這個Go Channel在每次收到amqp的關(guān)閉通知時會重新賦值涤姊,而訂閱者等待的Go Channel可能是之前的舊值,永遠也不會返回嗤放,訂閱者也就無限阻塞了思喊。解決辦法呢,就是在select時增加一個time.After次酌,讓等待的Go Channel有機會更新到新值恨课。

代碼就不貼了,有興趣的可以到Github中去看:https://github.com/go-micro/plugins/commit/9f64710807221f3cc649ba4fe05f75b07c66c00c

關(guān)于這兩個問題的修改已經(jīng)合并到官方倉庫中岳服,大家去get最新的代碼就可以了剂公。

這兩個坑填了,基本上就能滿足我的需要了吊宋。當然可能還有其它的坑纲辽,比如go-micro的RabbitMQ插件好像沒有發(fā)布者確認的功能,這個要實現(xiàn)璃搜,還得好好想想怎么改拖吼。


好了,以上就是本文的主要內(nèi)容这吻。

老規(guī)矩吊档,代碼已經(jīng)上傳到Github,歡迎訪問:https://github.com/bosima/go-demo/tree/main/go-micro-broker-rabbitmq

收獲更多架構(gòu)知識橘原,請關(guān)注公眾號 螢火架構(gòu)籍铁。原創(chuàng)內(nèi)容,轉(zhuǎn)載請注明出處趾断。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拒名,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子芋酌,更是在濱河造成了極大的恐慌增显,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脐帝,死亡現(xiàn)場離奇詭異同云,居然都是意外死亡,警方通過查閱死者的電腦和手機堵腹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門炸站,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人疚顷,你說我怎么就攤上這事旱易〗耍” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵阀坏,是天一觀的道長如暖。 經(jīng)常有香客問我,道長忌堂,這世上最難降的妖魔是什么盒至? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮士修,結(jié)果婚禮上枷遂,老公的妹妹穿的比我還像新娘。我一直安慰自己李命,他們只是感情好登淘,可當我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著封字,像睡著了一般黔州。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上阔籽,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天流妻,我揣著相機與錄音,去河邊找鬼笆制。 笑死绅这,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的在辆。 我是一名探鬼主播证薇,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼匆篓!你這毒婦竟也來了浑度?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤鸦概,失蹤者是張志新(化名)和其女友劉穎箩张,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窗市,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡先慷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了咨察。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片论熙。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖摄狱,靈堂內(nèi)的尸體忽然破棺而出赴肚,到底是詐尸還是另有隱情素跺,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布誉券,位于F島的核電站,受9級特大地震影響刊愚,放射性物質(zhì)發(fā)生泄漏踊跟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一鸥诽、第九天 我趴在偏房一處隱蔽的房頂上張望商玫。 院中可真熱鬧,春花似錦牡借、人聲如沸拳昌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽炬藤。三九已至,卻和暖如春碴里,著一層夾襖步出監(jiān)牢的瞬間沈矿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工咬腋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留羹膳,地道東北人。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓根竿,卻偏偏與公主長得像陵像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子寇壳,可洞房花燭夜當晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容