如何讓消息隊列達到最大吞吐量?

你在使用消息隊列的時候關注過吞吐量嗎亲配?

思考過吞吐量的影響因素嗎尘应?

考慮過怎么提高嗎?

總結過最佳實踐嗎吼虎?

本文帶你一起探討下消息隊列消費端高吞吐的 Go 框架實現(xiàn)犬钢。Let’s go!

關于吞吐量的一些思考

  • 寫入消息隊列吞吐量取決于以下兩個方面

    • 網(wǎng)絡帶寬
    • 消息隊列(比如Kafka)寫入速度

    最佳吞吐量是讓其中之一打滿,而一般情況下內(nèi)網(wǎng)帶寬都會非常高思灰,不太可能被打滿玷犹,所以自然就是講消息隊列的寫入速度打滿,這就就有兩個點需要平衡

    • 批量寫入的消息量大小或者字節(jié)數(shù)多少
    • 延遲多久寫入

    go-zero 的 PeriodicalExecutorChunkExecutor 就是為了這種情況設計的

  • 從消息隊列里消費消息的吞吐量取決于以下兩個方面

    • 消息隊列的讀取速度洒疚,一般情況下消息隊列本身的讀取速度相比于處理消息的速度都是足夠快的
    • 處理速度歹颓,這個依賴于業(yè)務

    這里有個核心問題是不能不考慮業(yè)務處理速度,而讀取過多的消息到內(nèi)存里油湖,否則可能會引起兩個問題:

    • 內(nèi)存占用過高巍扛,甚至出現(xiàn)OOM,pod 也是有 memory limit
    • 停止 pod 時堆積的消息來不及處理而導致消息丟失

解決方案和實現(xiàn)

借用一下 Rob Pike 的一張圖乏德,這個跟隊列消費異曲同工撤奸。左邊4個 gopher 從隊列里取,右邊4個 gopher 接過去處理喊括。比較理想的結果是左邊和右邊速率基本一致胧瓜,沒有誰浪費,沒有誰等待郑什,中間交換處也沒有堆積府喳。

我們來看看 go-zero 是怎么實現(xiàn)的:

  • Producer
    for {
        select {
        case <-q.quit:
            logx.Info("Quitting producer")
            return
        default:
            if v, ok := q.produceOne(producer); ok {
                q.channel <- v
            }
        }
    }

沒有退出事件就會通過 produceOne 去讀取一個消息,成功后寫入 channel蘑拯。利用 chan 就可以很好的解決讀取和消費的銜接問題劫拢。

  • Consumer
    for {
        select {
        case message, ok := <-q.channel:
            if ok {
                q.consumeOne(consumer, message)
            } else {
                logx.Info("Task channel was closed, quitting consumer...")
                return
            }
        case event := <-eventChan:
            consumer.OnEvent(event)
        }
    }

這里如果拿到消息就去處理肉津,當 okfalse 的時候表示 channel 已被關閉,可以退出整個處理循環(huán)了舱沧。同時我們還在 redis queue 上支持了 pause/resume,我們原來在社交場景里大量使用這樣的隊列偶洋,可以通知 consumer 暫停和繼續(xù)熟吏。

  • 啟動 queue,有了這些我們就可以通過控制 producer/consumer 的數(shù)量來達到吞吐量的調(diào)優(yōu)了
func (q *Queue) Start() {
    q.startProducers(q.producerCount)
    q.startConsumers(q.consumerCount)

    q.producerRoutineGroup.Wait()
    close(q.channel)
    q.consumerRoutineGroup.Wait()
}

這里需要注意的是玄窝,先要停掉 producer牵寺,再去等 consumer 處理完。

到這里核心控制代碼基本就講完了恩脂,其實看起來還是挺簡單的帽氓,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整實現(xiàn)。

如何使用

基本的使用流程:

  1. 創(chuàng)建 producerconsumer
  2. 啟動 queue
  3. 生產(chǎn)消息 / 消費消息

對應到 queue 中俩块,大致如下:

創(chuàng)建 queue

// 生產(chǎn)者創(chuàng)建工廠
producer := newMockedProducer()
// 消費者創(chuàng)建工廠
consumer := newMockedConsumer()
// 將生產(chǎn)者以及消費者的創(chuàng)建工廠函數(shù)傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我們看看 NewQueue 需要什么參數(shù):

  1. producer 工廠方法
  2. consumer 工廠方法

producer & consumer 的工廠函數(shù)傳遞 queue 黎休,由它去負責創(chuàng)建∮窨框架提供了 ProducerConsumer 的接口以及工廠方法定義势腮,然后整個流程的控制 queue 實現(xiàn)會自動完成。

生產(chǎn) message

我們通過自定義一個 mockedProducer 來模擬:

type mockedProducer struct {
    total int32
    count int32
  // 使用waitgroup來模擬任務的完成
    wait  sync.WaitGroup
}
// 實現(xiàn) Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
    if atomic.AddInt32(&p.count, 1) <= p.total {
        p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}

queue 中的生產(chǎn)者編寫都必須實現(xiàn):

  • Produce():由開發(fā)者編寫生產(chǎn)消息的邏輯
  • AddListener():添加事件 listener

消費 message

我們通過自定義一個 mockedConsumer 來模擬:

type mockedConsumer struct {
    count  int32
}

func (c *mockedConsumer) Consume(string) error {
    atomic.AddInt32(&c.count, 1)
    return nil
}

啟動 queue

啟動漫仆,然后驗證我們上述的生產(chǎn)者和消費者之間的數(shù)據(jù)是否傳輸成功:

func main() {
    // 創(chuàng)建 queue
    q := NewQueue(func() (Producer, error) {
        return newMockedProducer(), nil
    }, func() (Consumer, error) {
        return newMockedConsumer(), nil
    })
  // 啟動panic了也可以確保stop被執(zhí)行以清理資源
  defer q.Stop()
    // 啟動
    q.Start()
}

以上就是 queue 最簡易的實現(xiàn)示例捎拯。我們通過這個 core/queue 框架實現(xiàn)了基于 rediskafka 等的消息隊列服務,在不同業(yè)務場景中經(jīng)過了充分的實踐檢驗盲厌。你也可以根據(jù)自己的業(yè)務實際情況署照,實現(xiàn)自己的消息隊列服務。

整體設計

整體流程如上圖:

  1. 全體的通信都由 channel 進行
  2. ProducerConsumer 的數(shù)量可以設定以匹配不同業(yè)務需求
  3. ProduceConsume 具體實現(xiàn)由開發(fā)者定義,queue 負責整體流程

總結

本篇文章講解了如何通過 channel 來平衡從隊列中讀取和處理消息的速度,以及如何實現(xiàn)一個通用的消息隊列處理框架系冗,并通過 mock 示例簡單展示了如何基于 core/queue 實現(xiàn)一個消息隊列處理服務牙言。你可以通過類似的方式實現(xiàn)一個基于 rocketmq 等的消息隊列處理服務。

關于 go-zero 更多的設計和實現(xiàn)文章壳咕,可以關注『微服務實踐』公眾號。

項目地址

https://github.com/tal-tech/go-zero

歡迎使用 go-zero 并 star 支持我們!

微信交流群

關注『微服務實踐』公眾號并點擊 進群 獲取社區(qū)群二維碼屡限。

go-zero 系列文章見『微服務實踐』公眾號

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市炕倘,隨后出現(xiàn)的幾起案子钧大,更是在濱河造成了極大的恐慌,老刑警劉巖罩旋,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件啊央,死亡現(xiàn)場離奇詭異眶诈,居然都是意外死亡,警方通過查閱死者的電腦和手機瓜饥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門逝撬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人乓土,你說我怎么就攤上這事宪潮。” “怎么了趣苏?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵狡相,是天一觀的道長。 經(jīng)常有香客問我食磕,道長尽棕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任彬伦,我火速辦了婚禮滔悉,結果婚禮上,老公的妹妹穿的比我還像新娘媚朦。我一直安慰自己氧敢,他們只是感情好,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布询张。 她就那樣靜靜地躺著孙乖,像睡著了一般。 火紅的嫁衣襯著肌膚如雪份氧。 梳的紋絲不亂的頭發(fā)上唯袄,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天,我揣著相機與錄音蜗帜,去河邊找鬼恋拷。 笑死,一個胖子當著我的面吹牛厅缺,可吹牛的內(nèi)容都是我干的蔬顾。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼湘捎,長吁一口氣:“原來是場噩夢啊……” “哼诀豁!你這毒婦竟也來了?” 一聲冷哼從身側響起窥妇,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤舷胜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后活翩,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體烹骨,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡翻伺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了沮焕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吨岭。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖峦树,靈堂內(nèi)的尸體忽然破棺而出未妹,到底是詐尸還是另有隱情,我是刑警寧澤空入,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站族檬,受9級特大地震影響歪赢,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜单料,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一埋凯、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扫尖,春花似錦白对、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至沉颂,卻和暖如春条摸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背铸屉。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工钉蒲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人彻坛。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓顷啼,卻偏偏與公主長得像,于是被迫代替她去往敵國和親昌屉。 傳聞我的和親對象是個殘疾皇子钙蒙,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容