golang并發(fā)三板斧系列之三:context用于退出

這是本系列文章的第三篇,第一篇在此golang并發(fā)三板斧系列之一:channel用于通信和同步颠区,第二篇在此golang并發(fā)三板斧系列之二:goroutine池用于并發(fā)

前文描述了手工作坊的時代和工業(yè)時代毕莱,現(xiàn)在我們進入信息時代。

萬惡的資本主義

前文描述的工業(yè)時代其實是資本主義朋截,來到世間的每個毛孔都在滴血磨澡。不信你看前文的代碼,gen之類的函數(shù)創(chuàng)建了一堆任務之后就扔給下游的works處理了质和,也不管他們要處理多久稳摄,是不是加班到深夜,是不是996ICU饲宿。

func BenchmarkCPUPool(b *testing.B) {
    channum := 100
    gonum := runtime.NumCPU()
    for i := 0; i < b.N; i++ {
        f := func(w *Work) {
            if v, ok := w.input.(float64); ok {
                cpubound(v)
            }
        }
        list := benchmarkList()
        c := genPoolChanBuffer(list)

        p := InitPool(channum, gonum, f)
        p.RunWorker()
        p.FeedWorker(c)
        p.Wait()
    }
}

人民當家作主

感謝毛主席厦酬,解放之后我們進入了社會主義,我們有了工會這個關愛人民的組織瘫想。人民的工會愛人民仗阅,人民的工會力量大,工會可以在合適的時候給大家放假国夜,讓大家休息减噪,對應到程序中就是按下了神圣的ctrl+c

如下是最粗暴的模型车吹,工會一喊放假筹裕,大家都放下手上的工作開心的玩耍了。但是有的工作做了一半就放棄了窄驹,這確實是很沒有職業(yè)操守的:

var WorkPool chan work

type work struct {
    sth string
}

func (w work) Working() {
    log.Printf("start %s", w.sth)

    time.Sleep(2 * time.Second)

    log.Printf("end %s", w.sth)
}

func RunWorkerSimple() {
    workers := 2
    for i := 0; i < workers; i++ {
        go HandleWorkerSimple()
    }
}

func HandleWorkerSimple() {
    for {
        select {
        case work := <-WorkPool:
            work.Working()
        }
    }

    return
}

func TestWorkerSimple(t *testing.T) {
    WorkPool = make(chan work)
    go RunWorkerSimple()

    go func() {
        list := benchmarkList()
        for _, l := range list {
            WorkPool <- work{fmt.Sprint(l)}
        }
    }()

    select {}
}

很明顯從結果可以看出朝卒,有工作編號為2和3的沒有完成就被扔掉了,其余沒有啟動的工作都放棄了乐埠。

C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerSimple
2019/04/08 22:56:11 start 1
2019/04/08 22:56:11 start 0
2019/04/08 22:56:13 end 0
2019/04/08 22:56:13 start 2
2019/04/08 22:56:13 end 1
2019/04/08 22:56:13 start 3
^Csignal: interrupt
FAIL    _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency    2.790s

要對得起這份工作

人民的工人為人民抗斤,所以即便工會保障了工人的權益囚企,該做好的工作還是要認真做完啊。對應到程序中瑞眼,收到ctrl+c中斷后龙宏,每個worker應該完成手上正在做的工作,并且由BOSS(主goroutine)把剩余隊列中的工作保存起來(比如寫數(shù)據(jù)庫或者文件)伤疙,留待明天上班繼續(xù)做烦衣。

代碼寫起來就復雜多了,要監(jiān)控系統(tǒng)的中斷信號掩浙,要等待所有worker處理完手上的事情花吟,最后再把剩余的事情保存起來。需要用兩個chan來通信厨姚,stopChan 用于通知workers下班啦衅澈,stoppedChan 用于所有worker處理完之后告知BOSS,再由BOSS保存剩余的工作谬墙。

func (w work) Saving() {
    log.Printf("save %s", w.sth)
}

func RunWorkerHold(stop, stopped chan struct{}) {
    var wg sync.WaitGroup
    workers := 2
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go HandleWorkerHold(stop, &wg)
    }
    wg.Wait()
    stopped <- struct{}{}
}

func HandleWorkerHold(stop chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case work := <-WorkPool:
            work.Working()
        case <-stop:
            log.Println("worker: caller has told us to stop")
            return
        }
    }

    return
}

func TestWorkerHold(t *testing.T) {
    WorkPool = make(chan work)
    stopChan := make(chan struct{})
    stoppedChan := make(chan struct{})
    go RunWorkerHold(stopChan, stoppedChan)

    go func() {
        list := benchmarkList()
        for _, l := range list {
            WorkPool <- work{fmt.Sprint(l)}
        }
    }()

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    log.Println("main: received C-c - shutting down")

    // tell the goroutine to stop
    log.Println("main: telling workers to stop")
    close(stopChan)
    // and wait for them to reply back
    <-stoppedChan
    log.Println("main: workers has told us they've finished")

    for work := range WorkPool {
        work.Saving()
    }

    return
}

這里沒有把打印貼完今布,最終的結果是所有工作都save好了的:

C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerHold
2019/04/08 23:32:02 start 1
2019/04/08 23:32:02 start 0
2019/04/08 23:32:04 end 1
2019/04/08 23:32:04 start 2
2019/04/08 23:32:04 end 0
2019/04/08 23:32:04 start 3
^C2019/04/08 23:32:05 main: received C-c - shutting down
2019/04/08 23:32:05 main: telling workers to stop
2019/04/08 23:32:06 end 2
2019/04/08 23:32:06 worker: caller has told us to stop
2019/04/08 23:32:06 end 3
2019/04/08 23:32:06 start 4
2019/04/08 23:32:08 end 4
2019/04/08 23:32:08 start 5
2019/04/08 23:32:10 end 5
2019/04/08 23:32:10 worker: caller has told us to stop
2019/04/08 23:32:10 main: workers has told us they've finished
2019/04/08 23:32:10 save 6
2019/04/08 23:32:10 save 7
2019/04/08 23:32:10 save 8
2019/04/08 23:32:10 save 9
2019/04/08 23:32:10 save 10
2019/04/08 23:32:10 save 11
2019/04/08 23:32:10 save 12
2019/04/08 23:32:10 save 13
2019/04/08 23:32:10 save 14
2019/04/08 23:32:10 save 15
。部默。造虎。

但是值得注意的是,不是close(stopChan)一執(zhí)行份蝴,馬上所有的worker都能結束工作了。如上其中一個worker在接著完成work4和work5之后才走了退出流程婚夫,是因為對于select來講署鸡,如果多個chan都準備好了的話,是隨機選擇其中一個时捌,所以會有概率一直接著work的撒穷。

更進一步

上面的模式還是有缺陷裆熙,如果worker下面還有徒弟怎么辦(又新開了goroutine)禽笑?最后BOSS在做剩余工作的保存時也不想耽誤太久怎么辦蛤奥?保存工作寫數(shù)據(jù)庫也想受控制(比如database/sql包提供了相應支持)怎么辦?

終于我們的主角登場了蟀伸,golang提供了context模式用于解決goroutine的高效且安全退出問題缅刽,教程在網(wǎng)上很多了,不用細講迟蜜,只貼一下主要函數(shù):

// 帶cancel返回值的Context啡省,一旦cancel被調用,即取消該創(chuàng)建的context

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// 帶有效期cancel返回值的Context畦戒,即必須到達指定時間點調用的cancel方法才會被執(zhí)行

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

// 帶超時時間cancel返回值的Context结序,類似Deadline,前者是時間點配喳,后者為時間間隔
// 相當于WithDeadline(parent, time.Now().Add(timeout)).

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

最后進化到我們的代碼凳干,注意SavingDB()方法只是一個偽代碼示意:

import "database/sql"

//fake, just a example
func (w work) SavingDB(ctx context.Context) {
    log.Printf("save %s", w.sth)

    stmt := "select name from db.table"
    db := sql.DB{}
    conn, _ := db.Conn(ctx)
    rows, err := conn.QueryContext(ctx, stmt)
    if err != nil {
        if err == context.DeadlineExceeded {
            // context canceled
        }
        return
    }

    var name string
    for rows.Next() {
        if err := rows.Scan(&name); err != nil {
            if err == context.DeadlineExceeded {
                log.Println("scan canceled")
            }
        }
    }
}

func RunWorkerContext(ctx context.Context, stopped chan struct{}) {
    var wg sync.WaitGroup
    workers := 2
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go HandleWorkerContext(ctx, &wg)
    }
    wg.Wait()
    stopped <- struct{}{}
}

func HandleWorkerContext(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case work := <-WorkPool:
            work.Working()
        case <-ctx.Done():
            log.Println("worker: caller has told us to stop")
            return
        }
    }

    return
}

func TestWorkerContext(t *testing.T) {
    WorkPool = make(chan work)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    stoppedChan := make(chan struct{})
    go RunWorkerContext(ctx, stoppedChan)

    go func() {
        list := benchmarkList()
        for _, l := range list {
            WorkPool <- work{fmt.Sprint(l)}
        }
    }()

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    log.Println("main: received C-c - shutting down")

    // tell the goroutine to stop
    log.Println("main: telling workers to stop")
    cancel()
    // and wait for them to reply back
    <-stoppedChan
    log.Println("main: workers has told us they've finished")

    ctxTimeout, cancelTimeout := context.WithTimeout(ctx, 100*time.Microsecond)
    defer cancelTimeout()
    for {
        select {
        case work := <-WorkPool:
            work.SavingDB(ctxTimeout)
        case <-ctxTimeout.Done():
            log.Println("main: cann't wait any more")
            return
        }
    }

    return
}

通過cancel()方法通知該context.Context其下的所有goroutine進入退出流程涧团,并可以啟動帶timeout的ctx開始保存工作流程泌绣,所有流程都是受控的预厌。


這像不像是現(xiàn)代企業(yè)的扁平化管理,BOSS直接控制所有員工苗沧,提升所有的工作效率?


所有代碼都在https://github.com/baixiaoustc/go_concurrency/blob/master/third_post_test.go中能找到待逞。

原文載于golang并發(fā)三板斧系列之三:context用于退出

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末识樱,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子当犯,更是在濱河造成了極大的恐慌割疾,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件驰凛,死亡現(xiàn)場離奇詭異担扑,居然都是意外死亡,警方通過查閱死者的電腦和手機胚宦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門枢劝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來卜壕,“玉大人,你說我怎么就攤上這事鹤盒≌旄保” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵尺碰,是天一觀的道長。 經(jīng)常有香客問我洛心,道長两曼,這世上最難降的妖魔是什么玻驻? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任璧瞬,我火速辦了婚禮,結果婚禮上嗤锉,老公的妹妹穿的比我還像新娘。我一直安慰自己奥额,他們只是感情好访诱,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布触菜。 她就那樣靜靜地躺著,像睡著了一般涡相。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上切威,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天牢屋,我揣著相機與錄音,去河邊找鬼烙无。 笑死遍尺,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的迂苛。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼就漾,長吁一口氣:“原來是場噩夢啊……” “哼念搬!你這毒婦竟也來了?” 一聲冷哼從身側響起朗徊,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤爷恳,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后温亲,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡袖外,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年在刺,在試婚紗的時候發(fā)現(xiàn)自己被綠了头镊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡颖杏,死狀恐怖留储,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情获讳,我是刑警寧澤活喊,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站帅矗,受9級特大地震影響,放射性物質發(fā)生泄漏累颂。R本人自食惡果不足惜凛俱,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瘦棋。 院中可真熱鬧暖哨,春花似錦、人聲如沸篇裁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽黍聂。三九已至身腻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間嘀趟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工牛隅, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留酌泰,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓默伍,卻偏偏與公主長得像,于是被迫代替她去往敵國和親际插。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容