你在使用消息隊列的時候關注過吞吐量嗎亲配?
思考過吞吐量的影響因素嗎尘应?
考慮過怎么提高嗎?
總結過最佳實踐嗎吼虎?
本文帶你一起探討下消息隊列消費端高吞吐的 Go
框架實現(xiàn)犬钢。Let’s go!
關于吞吐量的一些思考
-
寫入消息隊列吞吐量取決于以下兩個方面
- 網(wǎng)絡帶寬
- 消息隊列(比如Kafka)寫入速度
最佳吞吐量是讓其中之一打滿,而一般情況下內(nèi)網(wǎng)帶寬都會非常高思灰,不太可能被打滿玷犹,所以自然就是講消息隊列的寫入速度打滿,這就就有兩個點需要平衡
- 批量寫入的消息量大小或者字節(jié)數(shù)多少
- 延遲多久寫入
go-zero 的
PeriodicalExecutor
和ChunkExecutor
就是為了這種情況設計的 -
從消息隊列里消費消息的吞吐量取決于以下兩個方面
- 消息隊列的讀取速度洒疚,一般情況下消息隊列本身的讀取速度相比于處理消息的速度都是足夠快的
- 處理速度歹颓,這個依賴于業(yè)務
這里有個核心問題是不能不考慮業(yè)務處理速度,而讀取過多的消息到內(nèi)存里油湖,否則可能會引起兩個問題:
- 內(nèi)存占用過高巍扛,甚至出現(xiàn)OOM,
pod
也是有memory limit
的 - 停止
pod
時堆積的消息來不及處理而導致消息丟失
解決方案和實現(xiàn)
借用一下 Rob Pike
的一張圖乏德,這個跟隊列消費異曲同工撤奸。左邊4個 gopher
從隊列里取,右邊4個 gopher
接過去處理喊括。比較理想的結果是左邊和右邊速率基本一致胧瓜,沒有誰浪費,沒有誰等待郑什,中間交換處也沒有堆積府喳。
我們來看看 go-zero
是怎么實現(xiàn)的:
-
Producer
端
for {
select {
case <-q.quit:
logx.Info("Quitting producer")
return
default:
if v, ok := q.produceOne(producer); ok {
q.channel <- v
}
}
}
沒有退出事件就會通過 produceOne
去讀取一個消息,成功后寫入 channel
蘑拯。利用 chan
就可以很好的解決讀取和消費的銜接問題劫拢。
-
Consumer
端
for {
select {
case message, ok := <-q.channel:
if ok {
q.consumeOne(consumer, message)
} else {
logx.Info("Task channel was closed, quitting consumer...")
return
}
case event := <-eventChan:
consumer.OnEvent(event)
}
}
這里如果拿到消息就去處理肉津,當 ok
為 false
的時候表示 channel
已被關閉,可以退出整個處理循環(huán)了舱沧。同時我們還在 redis queue
上支持了 pause/resume
,我們原來在社交場景里大量使用這樣的隊列偶洋,可以通知 consumer
暫停和繼續(xù)熟吏。
- 啟動
queue
,有了這些我們就可以通過控制producer/consumer
的數(shù)量來達到吞吐量的調(diào)優(yōu)了
func (q *Queue) Start() {
q.startProducers(q.producerCount)
q.startConsumers(q.consumerCount)
q.producerRoutineGroup.Wait()
close(q.channel)
q.consumerRoutineGroup.Wait()
}
這里需要注意的是玄窝,先要停掉 producer
牵寺,再去等 consumer
處理完。
到這里核心控制代碼基本就講完了恩脂,其實看起來還是挺簡單的帽氓,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整實現(xiàn)。
如何使用
基本的使用流程:
- 創(chuàng)建
producer
或consumer
- 啟動
queue
- 生產(chǎn)消息 / 消費消息
對應到 queue
中俩块,大致如下:
創(chuàng)建 queue
// 生產(chǎn)者創(chuàng)建工廠
producer := newMockedProducer()
// 消費者創(chuàng)建工廠
consumer := newMockedConsumer()
// 將生產(chǎn)者以及消費者的創(chuàng)建工廠函數(shù)傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
我們看看 NewQueue
需要什么參數(shù):
-
producer
工廠方法 -
consumer
工廠方法
將 producer & consumer
的工廠函數(shù)傳遞 queue
黎休,由它去負責創(chuàng)建∮窨框架提供了 Producer
和 Consumer
的接口以及工廠方法定義势腮,然后整個流程的控制 queue
實現(xiàn)會自動完成。
生產(chǎn) message
我們通過自定義一個 mockedProducer
來模擬:
type mockedProducer struct {
total int32
count int32
// 使用waitgroup來模擬任務的完成
wait sync.WaitGroup
}
// 實現(xiàn) Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return "item", true
}
time.Sleep(time.Second)
return "", false
}
queue
中的生產(chǎn)者編寫都必須實現(xiàn):
-
Produce()
:由開發(fā)者編寫生產(chǎn)消息的邏輯 -
AddListener()
:添加事件listener
消費 message
我們通過自定義一個 mockedConsumer
來模擬:
type mockedConsumer struct {
count int32
}
func (c *mockedConsumer) Consume(string) error {
atomic.AddInt32(&c.count, 1)
return nil
}
啟動 queue
啟動漫仆,然后驗證我們上述的生產(chǎn)者和消費者之間的數(shù)據(jù)是否傳輸成功:
func main() {
// 創(chuàng)建 queue
q := NewQueue(func() (Producer, error) {
return newMockedProducer(), nil
}, func() (Consumer, error) {
return newMockedConsumer(), nil
})
// 啟動panic了也可以確保stop被執(zhí)行以清理資源
defer q.Stop()
// 啟動
q.Start()
}
以上就是 queue
最簡易的實現(xiàn)示例捎拯。我們通過這個 core/queue
框架實現(xiàn)了基于 redis
和 kafka
等的消息隊列服務,在不同業(yè)務場景中經(jīng)過了充分的實踐檢驗盲厌。你也可以根據(jù)自己的業(yè)務實際情況署照,實現(xiàn)自己的消息隊列服務。
整體設計
整體流程如上圖:
- 全體的通信都由
channel
進行 -
Producer
和Consumer
的數(shù)量可以設定以匹配不同業(yè)務需求 -
Produce
和Consume
具體實現(xiàn)由開發(fā)者定義,queue
負責整體流程
總結
本篇文章講解了如何通過 channel
來平衡從隊列中讀取和處理消息的速度,以及如何實現(xiàn)一個通用的消息隊列處理框架系冗,并通過 mock
示例簡單展示了如何基于 core/queue
實現(xiàn)一個消息隊列處理服務牙言。你可以通過類似的方式實現(xiàn)一個基于 rocketmq
等的消息隊列處理服務。
關于 go-zero
更多的設計和實現(xiàn)文章壳咕,可以關注『微服務實踐』公眾號。
項目地址
https://github.com/tal-tech/go-zero
歡迎使用 go-zero 并 star 支持我們!
微信交流群
關注『微服務實踐』公眾號并點擊 進群 獲取社區(qū)群二維碼屡限。
go-zero 系列文章見『微服務實踐』公眾號