golang workerpool 源碼閱讀

今天讀了一下 fasthttp 的源碼沛简,其中讀到了 workpool 孕蝉,做了一些注釋嚣崭。

package fasthttp

import (
    "net"
    "runtime"
    "strings"
    "sync"
    "time"
)

// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type workerPool struct {
    // Function for serving server connections.
    // It must leave c unclosed.
    WorkerFunc func(c net.Conn) error //注冊(cè)的conn 處理函數(shù)

    MaxWorkersCount int //最大的工作協(xié)程數(shù)

    LogAllErrors bool

    MaxIdleWorkerDuration time.Duration //協(xié)程最大的空閑時(shí)間浅辙,超過(guò)了就清理掉裕菠,其實(shí)就是退出協(xié)程函數(shù) ,退出 go

    Logger Logger

    lock         sync.Mutex
    workersCount int  //當(dāng)前的工作協(xié)程數(shù)
    mustStop     bool //workpool 停止標(biāo)記

    ready []*workerChan //準(zhǔn)備工作的協(xié)程竖螃,記當(dāng)時(shí)還在空閑的協(xié)程

    stopCh chan struct{} //workpool 停止信號(hào)

    workerChanPool sync.Pool //避免每次頻繁分配workerChan淑廊,使用pool
}

type workerChan struct { //工作協(xié)程
    lastUseTime time.Time
    ch          chan net.Conn // 帶緩沖區(qū) chan 處理完了一個(gè)conn 通過(guò)for range 再處理下一個(gè),都在一個(gè)協(xié)程里面
}

func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch) //定時(shí)清理掉協(xié)程  (workerChan)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

func (wp *workerPool) Stop() {
    if wp.stopCh == nil {
        panic("BUG: workerPool wasn't started")
    }
    close(wp.stopCh) //停止
    wp.stopCh = nil

    // Stop all the workers waiting for incoming connections.
    // Do not wait for busy workers - they will stop after
    // serving the connection and noticing wp.mustStop = true.
    wp.lock.Lock()
    ready := wp.ready
    for i, ch := range ready { //清空
        ch.ch <- nil  //使用nil 值來(lái)關(guān)閉特咆,而不是close,因?yàn)?ch 是池化了的季惩,會(huì)循環(huán)使用,所以不能close
        ready[i] = nil
    }
    wp.ready = ready[:0]
    wp.mustStop = true
    wp.lock.Unlock()
}

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
    if wp.MaxIdleWorkerDuration <= 0 {
        return 10 * time.Second
    }
    return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
    // 傳入scratch 腻格,要淘汰的ch, 避免每次分配
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

    // Clean least recently used workers if they didn't serve connections
    // for more than maxIdleWorkerDuration.
    currentTime := time.Now()

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++ //過(guò)期的ch 個(gè)數(shù)
    }
    *scratch = append((*scratch)[:0], ready[:i]...) //淘汰的ch,放到scratch
    if i > 0 {
        m := copy(ready, ready[i:]) //把需要保留的ch画拾,平移到前面,并且?guī)紫乱A舻臄?shù)量 m
        for i = m; i < n; i++ {
            ready[i] = nil //把ready 后面的ch淘汰 賦值nil
        }
        wp.ready = ready[:m] //保留的ch到ready
    }
    wp.lock.Unlock()

    // Notify obsolete workers to stop.
    // This notification must be outside the wp.lock, since ch.ch
    // may be blocking and may consume a lot of time if many workers
    // are located on non-local CPUs.
    tmp := *scratch
    for i, ch := range tmp { //淘汰的ch 賦值nil
        ch.ch <- nil
        tmp[i] = nil
    }
}

func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh() //獲取一個(gè)協(xié)程
    if ch == nil {
        return false
    }
    ch.ch <- c //傳入 conn 到協(xié)程
    return true
}

var workerChanCap = func() int {
    // Use blocking workerChan if GOMAXPROCS=1.
    // This immediately switches Serve to WorkerFunc, which results
    // in higher performance (under go1.5 at least).
    if runtime.GOMAXPROCS(0) == 1 {
        return 0
    }

    // Use non-blocking workerChan if GOMAXPROCS>1,
    // since otherwise the Serve caller (Acceptor) may lag accepting
    // new connections if WorkerFunc is CPU-bound.
    return 1
}()

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan //ch 是一個(gè)conn chan 阻塞的荒叶,通過(guò)for range 不停的處理不同的conn,可以看做是一個(gè)協(xié)程碾阁,不停的處理不同的鏈接
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++ //沒(méi)有可用的了需要 new
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n] //獲取ch 并且ready - 1
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get() //new 一個(gè),這里的new 其實(shí)是在 pool 拿一個(gè)workerChan些楣,從這里可以看出基本上只要是頻繁要分配的變量,都使用pool
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() { //新建一個(gè)協(xié)程處理
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch) //歸還 workerChan
        }()
    }
    return ch
}

func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = CoarseTimeNow() //更新最后使用這個(gè)協(xié)程的時(shí)間
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false //如果停止了宪睹,則上層 停止協(xié)程
    }
    wp.ready = append(wp.ready, ch) //歸還 ch 到ready,這里很巧妙愁茁,這樣 getch 的時(shí)候就又可以把新的conn放到這個(gè)協(xié)程處理
    wp.lock.Unlock()
    return true
}

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch { //不停的獲取 ch(阻塞的chan) ,處理不同的conn,在一個(gè)協(xié)程里面
        if c == nil { //接受到nil 值 就break
            break
        }

        if err = wp.WorkerFunc(c); err != nil && err != errHijacked { //注冊(cè)的conn鏈接處理函數(shù)
            errStr := err.Error()
            if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
                strings.Contains(errStr, "reset by peer") ||
                strings.Contains(errStr, "i/o timeout")) {
                wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
            }
        }
        if err != errHijacked {
            c.Close()
        }
        c = nil //釋放conn

        if !wp.release(ch) {
            break
        } //如果stop 了就 break
    }

    wp.lock.Lock()
    wp.workersCount-- //釋放此協(xié)程
    wp.lock.Unlock()
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末亭病,一起剝皮案震驚了整個(gè)濱河市鹅很,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌罪帖,老刑警劉巖促煮,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邮屁,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡菠齿,警方通過(guò)查閱死者的電腦和手機(jī)佑吝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)绳匀,“玉大人芋忿,你說(shuō)我怎么就攤上這事〖部茫” “怎么了戈钢?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)是尔。 經(jīng)常有香客問(wèn)我殉了,道長(zhǎng),這世上最難降的妖魔是什么拟枚? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任薪铜,我火速辦了婚禮,結(jié)果婚禮上梨州,老公的妹妹穿的比我還像新娘痕囱。我一直安慰自己,他們只是感情好暴匠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布鞍恢。 她就那樣靜靜地躺著,像睡著了一般每窖。 火紅的嫁衣襯著肌膚如雪帮掉。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,772評(píng)論 1 290
  • 那天窒典,我揣著相機(jī)與錄音蟆炊,去河邊找鬼。 笑死瀑志,一個(gè)胖子當(dāng)著我的面吹牛涩搓,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播劈猪,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼昧甘,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了战得?” 一聲冷哼從身側(cè)響起充边,我...
    開(kāi)封第一講書(shū)人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎常侦,沒(méi)想到半個(gè)月后浇冰,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體贬媒,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年肘习,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了际乘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡井厌,死狀恐怖蚓庭,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情仅仆,我是刑警寧澤器赞,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站墓拜,受9級(jí)特大地震影響港柜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜咳榜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一夏醉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧涌韩,春花似錦畔柔、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至雇毫,卻和暖如春玄捕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背棚放。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工枚粘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人飘蚯。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓馍迄,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親局骤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子柬姚,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 171,754評(píng)論 25 707
  • 鄧文茂,二十五歲庄涡,是個(gè)初出茅廬的小作家,剛寫(xiě)了幾篇小文章就被市里的文學(xué)社團(tuán)看中搬设。一連幾個(gè)月在市里的期刊登載發(fā)表下來(lái)...
    半目翅閱讀 268評(píng)論 0 0
  • 要問(wèn)今年暑期最火而且還會(huì)繼續(xù)火下去的慢綜藝,那肯定是湖南衛(wèi)視的《中餐廳》泣洞。如果這個(gè)暑假你沒(méi)有看過(guò)《中餐廳》那你真的...
    股權(quán)幫閱讀 312評(píng)論 0 0