Go實(shí)現(xiàn)并發(fā)扇入救斑,批量扇出功能

背景

有上萬臺邊緣機(jī)器童本,每臺都會有多個(gè)agent客戶端,并且每個(gè)agent都會同時(shí)向中心系統(tǒng)上報(bào)數(shù)據(jù)脸候,由于上報(bào)數(shù)據(jù)頻繁穷娱,并發(fā)量也大绑蔫,每個(gè)agent都頻繁和中心建立連接,導(dǎo)致中心壓力非常大泵额,所以需要對此進(jìn)行優(yōu)化配深,對每臺機(jī)器上的agent上報(bào)數(shù)據(jù)做聚合,批量進(jìn)行上報(bào)嫁盲,減少邊緣和中心的上報(bào)頻率篓叶,從而減輕中心壓力。

方案

  • 邊緣增加一個(gè)batch中間件羞秤,所有agent上報(bào)的數(shù)據(jù)由原來直接向中心上報(bào)變?yōu)橄騜atch中間件上報(bào)
  • batch中間件會聚合請求缸托,再分批向中心上報(bào)
  • 上報(bào)的方式也由原來的同步也改為異步,agent只把數(shù)據(jù)提交到batch就結(jié)束

核心就是實(shí)現(xiàn)一個(gè)數(shù)據(jù)并發(fā)扇入瘾蛋,分批扇出的功能:

image.png

為了能優(yōu)化分批的效果俐镐,batch中間件會有一個(gè)緩存隊(duì)列,agent提交的數(shù)據(jù)會先放入到隊(duì)列中哺哼,然后消費(fèi)端從隊(duì)列中取出數(shù)據(jù)佩抹,分批上報(bào)到中心系統(tǒng),由于agent和batch中間件屬于本地通訊取董,提交數(shù)據(jù)又是先放到隊(duì)列匹摇,所以隊(duì)列中的數(shù)據(jù)就容易堆積,達(dá)到分批的前提條件甲葬,有堆積數(shù)據(jù)廊勃,我就可以開始分批處理,完整邏輯如下:

  1. 隊(duì)列中沒數(shù)據(jù)時(shí)等待
  2. 有數(shù)據(jù)時(shí)经窖,不斷從隊(duì)列中取出數(shù)據(jù)坡垫,當(dāng)數(shù)量達(dá)到分批數(shù)量時(shí),調(diào)用中心接口上報(bào)画侣。
  3. 數(shù)量沒達(dá)到分批數(shù)量冰悠,但是隊(duì)列中又沒有數(shù)據(jù)時(shí),也是上報(bào)配乱。
  4. 上報(bào)結(jié)束后溉卓,又跳到2步驟,當(dāng)都沒數(shù)據(jù)時(shí)搬泥,就會跳到1步驟繼續(xù)監(jiān)聽

注:因?yàn)榧词箶?shù)據(jù)沒達(dá)到批次數(shù)量桑寨,當(dāng)隊(duì)列沒數(shù)據(jù)時(shí)還是會上報(bào),所以當(dāng)上報(bào)數(shù)據(jù)少忿檩,且都是間隔上報(bào)尉尾,也沒有并發(fā),這時(shí)就容易出現(xiàn)一個(gè)批次就只有一個(gè)的情況燥透,分批的就沒什么效果沙咏。

接下來我們開始實(shí)現(xiàn)核心分批功能:

/**
 * @Title 分批處理器
 * @Description 將in中的數(shù)據(jù)取出分批放入out辨图。適合in有大量數(shù)據(jù)并發(fā)或快速寫入,需要分批處理的場景
 * @Author hyman
 * @Date 2022-03-05
 **/
package batch


func New(in <-chan []byte, size int) <-chan [][]byte {
    if size < 2 {
        panic("nonsense! batch size less then 2")
    }
    out := make(chan [][]byte)
    go func() {
        defer close(out)
        loopBatch(in, out, size)
    }()
    return out
}

func loopBatch(in <-chan []byte, out chan<- [][]byte, size int) {
    var batch [][]byte
    for d := range in {
        batch = append(batch, d)
        more := true // 默認(rèn)認(rèn)為還有數(shù)據(jù)
        for more {
            select {
            default:
                more = false // in 沒有數(shù)據(jù)
            case d, ok := <-in:
                if ok {
                    batch = append(batch, d)
                } else {
                    more = false // in關(guān)閉肢藐,且隊(duì)列里沒有數(shù)據(jù)
                }
            }
            l := len(batch)
            // more = false沒數(shù)據(jù)故河,同時(shí)batch中也沒數(shù)據(jù),則break到外層for等待數(shù)據(jù)
            if !more && l == 0 {
                break
            }
            // 如果more = true隊(duì)列可能還有數(shù)據(jù)吆豹,當(dāng)batch里的數(shù)量小于批次數(shù)量忧勿,則繼續(xù)內(nèi)層for嘗試從in取數(shù)據(jù)
            if more && l < size {
                continue
            }
            // 當(dāng)in沒數(shù)據(jù)或數(shù)量達(dá)到分批上限時(shí),發(fā)送給out
            out <- batch
            batch = [][]byte{} // 重置
        }
    }
}

核心代碼不到四十行瞻讽,使用一個(gè):in <-chan []byte帶緩沖的chan來接收數(shù)據(jù)鸳吸,使用一個(gè)out的chan作為批量輸出,只需在外層再簡單包裹一層業(yè)務(wù)邏輯速勇,就可以滿足分批需求晌砾。

核心的組裝批次的邏輯放在了一個(gè)協(xié)程中,巧妙的使用兩個(gè)for來讀取in烦磁,實(shí)現(xiàn)沒數(shù)據(jù)時(shí)阻塞和有數(shù)據(jù)時(shí)分批上報(bào):

  • 第一個(gè)for用來監(jiān)聽in是否有數(shù)據(jù)养匈,當(dāng)隊(duì)列無數(shù)據(jù)時(shí)會pending,in被關(guān)閉時(shí)就自動結(jié)束for都伪,協(xié)程結(jié)束呕乎。
  • 第二個(gè)for巧妙用到了select的default功能,可以在不阻塞的情況下判斷隊(duì)列是否還有數(shù)據(jù)陨晶,實(shí)現(xiàn)組裝分批數(shù)據(jù)猬仁。

測試

func TestBatchHttp(t *testing.T) {
    var in = make(chan []byte, 10000)
    // 模擬結(jié)束請求
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if b, err := ioutil.ReadAll(r.Body); err == nil {
            in <- b
        }
        w.Write([]byte("success"))
    }))
    var wg sync.WaitGroup


    n :=time.Now()
    // 模擬并發(fā)上報(bào)100次
    for i := 0; i < 100; i++ {
        wg.Add(1)
        _i:=i
        go func() {
            body, _ := json.Marshal(map[string]string{"say": "hello " + strconv.Itoa(_i)})
            resp, err := http.Post(srv.URL, "application/json", bytes.NewReader(body))
            if err != nil {
                t.Fatal(err)
            }
            defer resp.Body.Close()
            if resp.StatusCode != http.StatusOK {
                t.Fatal(resp.Status)
            }
        }()
    }
    t.Log("post cost time = ", time.Now().Sub(n))
    // 5個(gè)一批
    bats := batch.New(in, 5)
    // 分批上報(bào)的協(xié)程數(shù)
    batchWorker := 1
    for i := 0; i < batchWorker; i++ {
        go func() {
            for dd := range bats {
                // 分批調(diào)用中心接口上報(bào)數(shù)據(jù)
                for _, d := range dd {
                    fmt.Print(string(d))
                    wg.Done()
                }
                fmt.Println()
                time.Sleep(10* time.Millisecond) // 模擬消費(fèi)耗時(shí)

            }
        }()
    }
    wg.Wait()
}

打印結(jié)果:

 batch_test.go:61: post cost time =  240.333μs
{"say":"hello 92"}
{"say":"hello 9"}
{"say":"hello 10"}{"say":"hello 79"}{"say":"hello 98"}{"say":"hello 0"}{"say":"hello 80"}
{"say":"hello 99"}{"say":"hello 93"}{"say":"hello 4"}{"say":"hello 8"}{"say":"hello 5"}
{"say":"hello 11"}{"say":"hello 47"}{"say":"hello 12"}{"say":"hello 7"}{"say":"hello 81"}
{"say":"hello 6"}{"say":"hello 48"}{"say":"hello 13"}{"say":"hello 2"}{"say":"hello 49"}
{"say":"hello 14"}{"say":"hello 75"}{"say":"hello 77"}{"say":"hello 51"}{"say":"hello 50"}
{"say":"hello 74"}{"say":"hello 73"}{"say":"hello 27"}{"say":"hello 52"}{"say":"hello 76"}
{"say":"hello 72"}{"say":"hello 28"}{"say":"hello 41"}{"say":"hello 53"}{"say":"hello 94"}
{"say":"hello 54"}{"say":"hello 15"}{"say":"hello 95"}{"say":"hello 16"}{"say":"hello 56"}
{"say":"hello 61"}{"say":"hello 58"}{"say":"hello 43"}{"say":"hello 85"}{"say":"hello 84"}
{"say":"hello 89"}{"say":"hello 78"}{"say":"hello 96"}{"say":"hello 1"}{"say":"hello 82"}
{"say":"hello 26"}{"say":"hello 57"}{"say":"hello 3"}{"say":"hello 46"}{"say":"hello 83"}
{"say":"hello 40"}{"say":"hello 55"}{"say":"hello 29"}{"say":"hello 65"}{"say":"hello 33"}
{"say":"hello 60"}{"say":"hello 42"}{"say":"hello 30"}{"say":"hello 62"}{"say":"hello 34"}
{"say":"hello 64"}{"say":"hello 66"}{"say":"hello 88"}{"say":"hello 91"}{"say":"hello 35"}
{"say":"hello 38"}{"say":"hello 39"}{"say":"hello 87"}{"say":"hello 18"}{"say":"hello 59"}
{"say":"hello 68"}{"say":"hello 19"}{"say":"hello 63"}{"say":"hello 20"}{"say":"hello 21"}
{"say":"hello 86"}{"say":"hello 17"}{"say":"hello 24"}{"say":"hello 70"}{"say":"hello 69"}
{"say":"hello 90"}{"say":"hello 22"}{"say":"hello 71"}{"say":"hello 25"}{"say":"hello 44"}
{"say":"hello 45"}{"say":"hello 36"}{"say":"hello 23"}{"say":"hello 37"}{"say":"hello 32"}
{"say":"hello 31"}{"say":"hello 67"}{"say":"hello 97"}
--- PASS: TestBatchHttp (0.26s)
PASS

100個(gè)請求數(shù)據(jù)都有正常打印,并且大部分?jǐn)?shù)量都符合預(yù)期先誉,但是前兩行只有1個(gè)湿刽,并沒有到達(dá)分批數(shù)量,這是為什么褐耳?诈闺!

原因是剛開始分批速度大于提交速度,取出1個(gè)數(shù)據(jù)時(shí)隊(duì)列就沒有數(shù)據(jù)铃芦,所以第一批也就只有1個(gè)數(shù)據(jù)雅镊,第二批也是一樣,由于批次消費(fèi)就只有開一個(gè)協(xié)程刃滓,且每次消費(fèi)都time.Sleep(10* time.Millisecond)仁烹,消費(fèi)過慢,于是數(shù)據(jù)逐漸堆積注盈,滿足批次數(shù)量晃危,后面打印就正常叙赚。

PS:要驗(yàn)證這一說法老客,可以在”bats := batch.New(in, 5)“前面增加”time.Sleep(10* time.Millisecond)“讓隊(duì)列里數(shù)據(jù)有足夠數(shù)據(jù)酬荞,這時(shí)打印就會符合預(yù)期淘菩。

這里的batchWorker只有1個(gè),也就是說是向中心是串行上報(bào),這個(gè)可以根據(jù)實(shí)際需求修改太伊,通過開啟多個(gè)協(xié)程來提高上報(bào)速度。

但是巍膘,有時(shí)候反而不滿足需求缨硝,這樣做會并發(fā)上報(bào)數(shù)據(jù),增加中心壓力哲嘲,同時(shí)由于消費(fèi)過快贪薪,反而使隊(duì)列中的數(shù)量就不容易達(dá)到分批數(shù)量,分批效果就減少眠副,只是加快隊(duì)列消費(fèi)而已画切。

總結(jié)

Go的chan功能非常強(qiáng)大,各種并發(fā)處理都能看到他的身影囱怕,也是Go推薦的方式:

Do not communicate by sharing memory; instead, share memory by communicating.

靈活借助chan并發(fā)安全霍弹,既能阻塞又能select default非阻塞的特性,可以優(yōu)雅實(shí)現(xiàn)很多并發(fā)控制的需求

我的博客:Go實(shí)現(xiàn)并發(fā)扇入娃弓,批量扇出功能 | 藝術(shù)碼農(nóng)的小棧:https://itart.cn/blogs/2022/practice/go-chan-aggregate-batch.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末典格,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子台丛,更是在濱河造成了極大的恐慌耍缴,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挽霉,死亡現(xiàn)場離奇詭異私恬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)炼吴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進(jìn)店門本鸣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人硅蹦,你說我怎么就攤上這事荣德。” “怎么了童芹?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵涮瞻,是天一觀的道長。 經(jīng)常有香客問我假褪,道長署咽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮宁否,結(jié)果婚禮上窒升,老公的妹妹穿的比我還像新娘。我一直安慰自己慕匠,他們只是感情好饱须,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著台谊,像睡著了一般蓉媳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上锅铅,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天酪呻,我揣著相機(jī)與錄音,去河邊找鬼盐须。 笑死号杠,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丰歌。 我是一名探鬼主播姨蟋,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼立帖!你這毒婦竟也來了眼溶?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤晓勇,失蹤者是張志新(化名)和其女友劉穎堂飞,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绑咱,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡绰筛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了描融。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铝噩。...
    茶點(diǎn)故事閱讀 38,724評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖窿克,靈堂內(nèi)的尸體忽然破棺而出骏庸,到底是詐尸還是另有隱情,我是刑警寧澤年叮,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布具被,位于F島的核電站,受9級特大地震影響只损,放射性物質(zhì)發(fā)生泄漏一姿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望叮叹。 院中可真熱鬧艾栋,春花似錦、人聲如沸衬横。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蜂林。三九已至,卻和暖如春拇泣,著一層夾襖步出監(jiān)牢的瞬間噪叙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工霉翔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留睁蕾,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓债朵,卻偏偏與公主長得像子眶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子序芦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容