原文地址:https://ruby-china.org/topics/34240
本文描述了如何實現(xiàn)一個消息回調(diào)中間件,得益于 golang 管道和協(xié)程的編程思想沿癞,通過巧妙的設(shè)計棒掠,只需要約500行代碼就可以實現(xiàn)高性能愁茁、優(yōu)雅關(guān)閉整葡、自動重連等特性,基于作者的源碼自己改了一版 https://github.com/weylau/mcenter
問題
隨著業(yè)務(wù)復(fù)雜度的增加鲫懒,服務(wù)拆分后服務(wù)數(shù)量不斷增加,異步消息隊列的引入是必不可少的刽辙。當(dāng)服務(wù)較少的時候窥岩,比如業(yè)務(wù)早期,很多時候就是一個比較大的單體應(yīng)用或者少量幾個服務(wù)宰缤,消息隊列(之后寫做 MQ颂翼,Message Queue)的使用方法如下:
- 發(fā)送端,直接連接 MQ慨灭,根據(jù)業(yè)務(wù)需求發(fā)送消息朦乏;
- 消費端,通過一個后臺進程氧骤,通過長連接連接至 MQ呻疹,然后實時消費消息,然后進行相應(yīng)的處理筹陵;
相對來說刽锤,發(fā)送端比較簡單,消費端比較復(fù)雜朦佩,需要處理的邏輯比較多并思。比如目前我們公司使用的 sneakers 需要處理如下的邏輯:
- 消費端需要長連接,需要獨立的進程實時消費消息(某些語言可能是一個獨立的線程)语稠;
- 消費消息之后宋彼,需要加載業(yè)務(wù)框架(比如 Sneakers 需要加入 Rails 環(huán)境執(zhí)行業(yè)務(wù)代碼)調(diào)用相關(guān)代碼來消費消息;
- MQ 無法連接時仙畦,需要自動重連宙暇,同時應(yīng)用也需要能夠優(yōu)雅重啟,不至于丟消息议泵。
- 消費消息很可能處理失敗占贫,這個時候需要比較安全可靠的機制保證不能丟失消息,同時也要求能夠過一段時間對消息進行重試先口,重試多次之后也需要能夠?qū)ο⑦M一步做處理型奥;
而隨著服務(wù)增多,如果每個需要消費消息的服務(wù)都部署一個這樣的后臺進程顯然不夠環(huán)保:
- 每個服務(wù)增加一個進程碉京,增加了部署運維的成本厢汹;
- 對于隊列的管理(創(chuàng)建、銷毀谐宙、binding)以及消息重試機制烫葬,每個服務(wù)來自己負責(zé)的話,很容易造成標(biāo)準(zhǔn)不統(tǒng)一;
- 如果不同的服務(wù)是不同的語言搭综、不同的框架垢箕,每個語言又都要實現(xiàn)一遍,會浪費不少開發(fā)資源兑巾;
那有沒有更好的辦法呢条获?
其中一般辦法是打造一個全局的、高性能的消息回調(diào)中間件蒋歌,中間件來負責(zé)隊列的管理帅掘、消息的收發(fā)、重試以及出錯處理堂油,這樣就不再需要每個服務(wù)去考慮諸如消息丟失修档、消息重試等問題了,基本解決了上面的缺點府框。具體這個消息回調(diào)中心應(yīng)該具備哪些功能呢吱窝?
- 統(tǒng)一管理所有 MQ 隊列的創(chuàng)建和消息監(jiān)聽;
- 當(dāng)有消息接收到時寓免,中間件調(diào)用相關(guān)服務(wù)的回調(diào)地址,因為回調(diào)中心負責(zé)所有的服務(wù)计维,該中間件必須是高性能袜香、高并發(fā)的;
- 中間件應(yīng)當(dāng)具備消息重試的功能鲫惶,同時重試消息的時候不應(yīng)該丟失消息蜈首;
- 中間件應(yīng)當(dāng)具備「重連」和「優(yōu)雅關(guān)閉」等基礎(chǔ)功能,這樣才能保證不丟消息欠母;
這樣的話欢策,每個服務(wù)的工作就變得輕量了很多。本文的目的就是來實現(xiàn)一版生產(chǎn)環(huán)境可用的消息回調(diào)中間件赏淌。當(dāng)然踩寇,我們第一版的回調(diào)中心也不需要太多功能,有如下的限制:
- 整個重試流程需要 RabbitMQ 內(nèi)置功能支持六水,所以暫時只支持 RabbitMQ俺孙;
- 目前只支持 HTTP 回調(diào)方式;
基本的需求有了掷贾,如何實現(xiàn)一個這樣的消息回調(diào)中間件呢睛榄?
解決思路
開發(fā)語言選擇
Golang 作為「系統(tǒng)級開發(fā)語言」,非常適合開發(fā)這類中間件想帅。內(nèi)建的 goroutine/channel 機制非常容易實現(xiàn)高并發(fā)场靴。而作為 Golang 新手,這個項目也不復(fù)雜,很適合練手和進一步學(xué)習(xí)旨剥。
消息可靠性
關(guān)于重試和出錯處理呢咧欣?我們從 Sneakers 的實現(xiàn)中借鑒了它的方法,通過利用 RabbitMQ 內(nèi)置的機制泞边,也就是通過 x-dead-letter 機制來保證消息在重試時可以做到高可靠性该押,具體可以參考前段時間我寫的這篇文章。簡單總結(jié)一下文中的思路:
- 消息正常被處理時阵谚,直接 ack 消息就好蚕礼;
- 當(dāng)消息處理出錯,需要重試時梢什,reject 消息奠蹬,此時消息會進入到單獨的 retry 隊列;
- retry 隊列配置好了 ttl 超時時間嗡午,等到超時時囤躁,消息會進入到 requeue Exchange(RabbitMQ 的概念,用來做消息的路由)荔睹;
- 消息會再次進入工作隊列狸演,等待被下次重試;
- 如果消息的重試次數(shù)超過了一定的值僻他,那么消息會進入到錯誤隊列等待進一步處理宵距;
這里面有兩個地方利用了 RabbitMQ 的 Dead-Letter 機制:
- 當(dāng)消息被 reject 之后,消息進入到該隊列的 dead-letter Exchange 吨拗,也就是重試隊列满哪;
- 當(dāng)重試隊列的消息,在超時時(隊列設(shè)置了 ttl-expires 時長)劝篷,消息進入該隊列的 dead-letter Exchange哨鸭,也就是重新進入了工作隊列。
通過這種機制娇妓,可以保證在進行消息處理的時候像鸡,不管是正常、還是出錯時哈恰,消息都不會丟失坟桅。關(guān)于這里進一步的細節(jié)可以參考上面的文章。
實現(xiàn)高并發(fā)
對于中間件蕊蝗,性能的要求比較高仅乓,性能也包含兩個方面:低延遲和高并發(fā)。低延遲在這個場景中我們無法解決蓬戚,因為一個消息回調(diào)之后的延遲是其他業(yè)務(wù)服務(wù)決定的夸楣。所以我們更多的是追求高并發(fā)。
如何獲得高并發(fā)?首先是開發(fā)語言的選擇豫喧,這類底層的中間件很適合用 Golang 來實現(xiàn)石洗,為什么呢?因為回調(diào)中心的主邏輯就是不斷回調(diào)各個服務(wù)紧显,而各個服務(wù)的延遲時間中間件無法控制讲衫,所以如果想獲得高并發(fā),最好是使用異步事件這種機制孵班。而借助于 Golang 內(nèi)置的 Channel 涉兽,既可以獲得接近于異步事件的性能,又可以讓整個開發(fā)變得簡單高效篙程,是一個比較合適的選擇枷畏。
具體實現(xiàn)呢?其實對于一個回調(diào)中心來說虱饿,大概分成這么幾個步驟:
- 獲取消息:連接消息隊列( 目前我們只需要支持 RabbitMQ 即可)拥诡,消費消息;
- 回調(diào)業(yè)務(wù)接口:消費消息之后氮发,根據(jù)配置信息渴肉,不同的隊列可能需要調(diào)用不同的回調(diào)地址,開始調(diào)用業(yè)務(wù)接口(目前我們只需要支持 HTTP 協(xié)議即可)爽冕;
- 根據(jù)回調(diào)結(jié)果處理消息:如果調(diào)用業(yè)務(wù)接口如果成功仇祭,則直接 ack 消息即可;如果調(diào)用失敗扇售,則 reject 此消息前塔;如果超過最大重試次數(shù)嚣艇,則進入出錯處理邏輯承冰;
- 出錯處理邏輯:把原有消息 ack,同時轉(zhuǎn)發(fā)此消息進入 error 隊列食零,等待進一步處理(可能是報警困乒,然后人工處理);
通過消息這么一個「實體」可以把所有上面的流程串聯(lián)起來贰谣,是不是很像 pipeline 娜搂?而 pipeline 的設(shè)計模式是 Golang 非常推薦的實現(xiàn)高并發(fā)的方式。上面的每一個步驟可以看做一組協(xié)程(goroutine)吱抚,他們之間通過管道通信百宇,因此不存在資源競爭的情況,大大降低了開發(fā)成本秘豹。
而上面每個步驟可以通過設(shè)計不同的 Goroutine 模型來實現(xiàn)高并發(fā):
- 獲取消息:需要長連接 RabbitMQ携御,較好的實現(xiàn)方式是每個隊列有獨立的一組協(xié)程,這樣隊列之間的消息接受互相不會干擾,如果出現(xiàn)了繁忙隊列和較閑的隊列時啄刹,也不會出現(xiàn)消息處理不及時的情況涮坐;
- 回調(diào)業(yè)務(wù)接口:每個消息都會調(diào)用業(yè)務(wù)接口,但是業(yè)務(wù)接口的處理時長對于中間件來說是透明的誓军。因此袱讹,這里最好的模型是每個消息一個協(xié)程。如果出現(xiàn)了較慢的接口昵时,那么通過 goroutine 的內(nèi)部調(diào)度機制捷雕,并不會影響系統(tǒng)的吞吐,同時 goroutine 可以支持上百萬的并發(fā)债查,因此用這種模式最合適非区。
- 根據(jù)回調(diào)結(jié)果處理消息:這個步驟主要是要連接 RabbitMQ,發(fā)送 ack/reject 消息盹廷。默認我們認為 RabbitMQ 是可靠的征绸,這里統(tǒng)一用同一組協(xié)程來處理即可。
- 出錯處理邏輯:這里的消息量應(yīng)該大大降低俄占,因為多次失敼艿 (超過重試次數(shù))的消息才會進入到這里。我們也采用同一組協(xié)程處理即可缸榄。
上面四個步驟渤弛,我們用了三種協(xié)程的設(shè)計模型,細化一下上面的圖就是這個樣子的甚带。
實現(xiàn)
有了上面的設(shè)計過程她肯,代碼并不復(fù)雜,大概分為幾部分:配置管理鹰贵、主流程晴氨、消息對象、重試邏輯以及優(yōu)雅關(guān)閉等的實現(xiàn)碉输。詳細的代碼放在 github:fishtrip/watchman
配置管理
配置管理這部分籽前,這個版本我們實現(xiàn)的比較簡單,就是讀取 yml 配置文件敷钾。配置文件主要包含的主要是三部分信息:
- 消息隊列定義枝哄。要根據(jù)消息隊列的配置調(diào)用 RabbitMQ 接口生成相關(guān)的隊列(重試隊列、錯誤隊列等)阻荒;
- 回調(diào)地址配置挠锥。不同的消息隊列需要不同的回調(diào)地址;
- 其他配置侨赡。如重試次數(shù)蓖租、超時等纱控。
# config/queues.example.yml
projects:
- name: test
queues_default:
notify_base: "http://localhost:8080"
notify_timeout: 5
retry_times: 40
retry_duration: 300
binding_exchange: fishtrip
queues:
- queue_name: "order_processor"
notify_path: "/orders/notify"
routing_key:
- "order.state.created"
- "house.state.#"
我們使用 yaml.v2 包可以很方便的解析 yaml 配置文件到 struct 之中,比如對于 queue 的定義菜秦,struct 實現(xiàn)如下:
// config.go 28-38
type QueueConfig struct {
QueueName string `yaml:"queue_name"`
RoutingKey []string `yaml:"routing_key"`
NotifyPath string `yaml:"notify_path"`
NotifyTimeout int `yaml:"notify_timeout"`
RetryTimes int `yaml:"retry_times"`
RetryDuration int `yaml:"retry_duration"`
BindingExchange string `yaml:"binding_exchange"`
project *ProjectConfig
}
上面之所以需要一個 ProjectConfig 的指針甜害,主要是為了方便讀取 project的配置,因此加載的時候需要把隊列指向 project球昨。
// config.go
func loadQueuesConfig(configFileName string, allQueues []*QueueConfig) []*QueueConfig {
// ......
projects := projectsConfig.Projects
for i, project := range projects {
log.Printf("find project: %s", project.Name)
// 這里不能寫作 queue := project.Queues
queues := projects[i].Queues
for j, queue := range queues {
log.Printf("find queue: %v", queue)
// 這里不能寫作 queues[j].project = &queue
queues[j].project = &projects[i]
allQueues = append(allQueues, &queues[j])
}
}
// .......
}
上面代碼中有個地方容易出錯尔店,就是在 for 循環(huán)內(nèi)部設(shè)置指針的時候不能直接使用 queue 變量,因為此時獲取的 queue 變量是一份復(fù)制出來的數(shù)據(jù)主慰,并不是原始數(shù)據(jù)嚣州。
另外,config.go 中大部分邏輯是按照面向?qū)ο蟮乃伎挤绞絹頃鴮懙墓猜荩热纾?/p>
// config.go
func (qc QueueConfig) ErrorQueueName() string {
return fmt.Sprintf("%s-error", qc.QueueName)
}
func (qc QueueConfig) WorkerExchangeName() string {
if qc.BindingExchange == "" {
return qc.project.QueuesDefaultConfig.BindingExchange
}
return qc.BindingExchange
}
通過這種方式该肴,可以寫出更清晰可維護的代碼。
消息對象封裝
整個程序中藐不,在 channel 中傳遞的數(shù)據(jù)都是 Message 對象匀哄,通過這種對象封裝,可以非常方便的在各種類型的 Goroutine 之間傳遞數(shù)據(jù)雏蛮。
Message 類的定義如下:
type Message struct {
queueConfig QueueConfig // 消息來自于哪個隊列
amqpDelivery *amqp.Delivery // RabbitMQ 的消息封裝
notifyResponse NotifyResponse // 消息回調(diào)結(jié)果
}
我們把 RabbitMQ 中的原生消息以及隊列信息涎嚼、回調(diào)結(jié)果封裝在一起,通過這種方式把 Message 對象在管道之間傳遞挑秉。同時 Message 封裝了眾多的方法來供其他協(xié)程方便的調(diào)用法梯。
// Message 相關(guān)方法
func (m Message) CurrentMessageRetries() int {}
func (m *Message) Notify(client *http.Client) *Message {}
func (m Message) IsMaxRetry() bool {}
func (m Message) IsNotifySuccess() bool {}
func (m Message) Ack() error {}
func (m Message) Reject() error {}
func (m Message) Republish(out chan<- Message) error {}
func (m Message) CloneAndPublish(channel *amqp.Channel) error {}
注意上面方法的接收對象,帶指針的接收對象表示會修改對象的值犀概。
主流程
主流程就是我們上面說的立哑,通過 pipeline 的模式、把消息的整條流程串聯(lián)起來姻灶。最核心的代碼在這里:
// main.go
<-resendMessage(ackMessage(workMessage(receiveMessage(allQueues, done))))
上面每個函數(shù)都接收相同的管道定義铛绰,因此可以串聯(lián)使用。其實每個函數(shù)的實現(xiàn)區(qū)別不大木蹬,不同的協(xié)程模型可能需要不同的寫法至耻。
下面是 receiveMessage 的寫法若皱,并進行了詳細的注釋镊叁。revceiveMessage 對每個消息隊列都生成了 N 個協(xié)程,然后把讀取的消息全部寫入管道走触。
// main.go
func receiveMessage(queues []*QueueConfig, done <-chan struct{}) <-chan Message {
// 創(chuàng)建一個管道晦譬,這個管道會作為函數(shù)的返回值
out := make(chan Message, ChannelBufferLength)
// WaitGroup 用于同步,這里來控制協(xié)程是否結(jié)束
var wg sync.WaitGroup
// 入?yún)⑹顷犃信渲没ス悖@個見下文傳入的值
receiver := func(qc QueueConfig) {
defer wg.Done()
// RECONNECT 標(biāo)記用于跳出循環(huán)來重新連接 RabbitMQ
RECONNECT:
for {
_, channel, err := setupChannel()
if err != nil {
PanicOnError(err)
}
// 消費消息
msgs, err := channel.Consume(
qc.WorkerQueueName(), // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
PanicOnError(err)
for {
select {
case msg, ok := <-msgs:
if !ok {
log.Printf("receiver: channel is closed, maybe lost connection")
time.Sleep(5 * time.Second)
continue RECONNECT
}
// 這里生成消息的 UUID敛腌,用來跟蹤整個消息流卧土,稍后會解釋
msg.MessageId = fmt.Sprintf("%s", uuid.NewV4())
message := Message{qc, &msg, 0}
// 這里把消息寫到出管道
out <- message
message.Printf("receiver: received msg")
case <-done:
// 當(dāng)主協(xié)程收到 done 信號的時候,自己也退出
log.Printf("receiver: received a done signal")
return
}
}
}
}
for _, queue := range queues {
wg.Add(ReceiverNum)
for i := 0; i < ReceiverNum; i++ {
// 每個隊列生成 N 個協(xié)程共同消費
go receiver(*queue)
}
}
// 控制協(xié)程像樊,當(dāng)所有的消費協(xié)程退出時尤莺,出口管道也需要關(guān)閉,通知下游協(xié)程
go func() {
wg.Wait()
log.Printf("all receiver is done, closing channel")
close(out)
}()
return out
}
里面有幾個關(guān)鍵點需要注意生棍。
- 每個函數(shù)都是類似的結(jié)構(gòu)颤霎,一組工作協(xié)程和協(xié)作協(xié)程,當(dāng)全部工作協(xié)程退出時涂滴,關(guān)閉出口管道友酱,通知下游協(xié)程。注意 golang 中柔纵,對于管道的使用缔杉,需要從寫入端關(guān)閉,否則很容易出現(xiàn)崩潰搁料。
- 我們在每個消息中或详,記錄了一個唯一的 uuid,這個 uuid 用來打日志郭计,來跟蹤一整條信息流鸭叙。
- 因為可能出現(xiàn)的網(wǎng)絡(luò)狀況,我們要進行判斷拣宏,如果出現(xiàn)了連接失敗的情況沈贝,直接 sleep 一段時間,然后重連勋乾。
- done 這個管道是在主協(xié)程進行控制的宋下,主要用作優(yōu)雅關(guān)閉。優(yōu)雅關(guān)閉的作用是在升級配置辑莫、升級主程序的時候可以保證不丟消息(等待消息真的完成之后才會結(jié)束協(xié)程学歧,整個程序才會退出)。
總結(jié)
得益于 Golang 的高效的表達能力各吨,通過大約 500 行代碼實現(xiàn)了一個穩(wěn)定的消息回調(diào)中間件枝笨,同時具備下面的特性:
- 高性能。在 macbook pro 15 上簡單測試揭蜒,每個隊列的處理能力可以輕松達到 3000 message/second 以上横浑,多個隊列也可以做到線性的增加性能,整體應(yīng)用達到幾萬每秒很輕松屉更。同時徙融,得益于 golang 的協(xié)程設(shè)計,如果下游出現(xiàn)了慢調(diào)用瑰谜,那么也不會影響并發(fā)欺冀。
- 優(yōu)雅關(guān)閉树绩。通過對信號的監(jiān)聽,整個程序可以在不丟消息的情況下優(yōu)雅關(guān)閉隐轩,利于配置更改和程序重啟饺饭。這個在生產(chǎn)環(huán)境非常重要。
- 自動重連职车。當(dāng) RabbitMQ 服務(wù)無法連接的時候砰奕,應(yīng)用可以自動重連。
當(dāng)然提鸟,我們團隊目前還都是 golang 新手军援,也沒有做太多的單元測試和性能測試,下一步可能會繼續(xù)優(yōu)化称勋,完善測試工作胸哥,并且優(yōu)化配置的管理,歡迎各位去 github 圍觀源碼赡鲜。