設(shè)計一個協(xié)程池

具體的池子

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

const (
    defaultCapacity = 10
    expireSecond    = 2
)

type Pool struct {
    capacity    int32       // 最大同時執(zhí)行的協(xié)程
    running     int32       // 當(dāng)前在執(zhí)行的
    waiting     int32       // 當(dāng)前等待的數(shù)量
    state       int32       // 當(dāng)前池子的狀態(tài)将塑,如果為1脉顿,則關(guān)閉
    lock        sync.Locker // 自旋鎖,實(shí)現(xiàn)了加鎖和解鎖方法
    workers     workerQueue // worker隊列点寥,實(shí)現(xiàn)了一個接口艾疟,不同實(shí)現(xiàn)方式代表不同的隊列
    workerCache sync.Pool   // 存放每個創(chuàng)建出來的worker
    cond        *sync.Cond  // 當(dāng)協(xié)程過多的時候需要阻塞,然后喚醒
}

func newPool(size int32) *Pool {
    if size <= 0 {
        size = defaultCapacity
    }
    p := &Pool{
        capacity: size,
        workers:  newWorkerStack(size),
        lock:     NewSpinLock(),
    }
    p.cond = sync.NewCond(p.lock)
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), 1),
        }
    }
    go p.purgeStableWorkers()
    return p
}

func (p *Pool) submit(task func()) error {
    if w := p.getWorker(); w != nil {
        w.inputFunc(task)
        fmt.Println("放入成功")
        return nil
    }
    fmt.Println("放入失敗")
    return fmt.Errorf("放入失敗")
}

func (p *Pool) getWorker() (w worker) {
    // 這里只有啟動的時候會執(zhí)行敢辩,后續(xù)基本不會執(zhí)行蔽莱,因?yàn)槎际侵苯訌年犃兄蝎@取worker
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }
    //
    p.lock.Lock()
    w = p.workers.detach()
    if w != nil {
        fmt.Println("獲得worker")
        p.lock.Unlock()
        return w
    } else if p.running < p.capacity {
        fmt.Println("啟動")
        p.lock.Unlock()
        spawnWorker()
    } else {
        if p.isClosed() {
            p.lock.Unlock()
            return
        }
        fmt.Println("開始重試")
    retry:
        // 阻塞和重試
        p.addWaiting(1)
        p.cond.Wait()
        p.addWaiting(-1)

        if p.isClosed() {
            p.lock.Unlock()
            return
        }

        if w = p.workers.detach(); w == nil {
            fmt.Println("沒有獲得任務(wù)")
            if p.free() {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }
        fmt.Println("獲得任務(wù)成功")
        p.lock.Unlock()
    }
    return
}

func (p *Pool) addRunning(delta int) {
    atomic.AddInt32(&p.running, int32(delta))
}

func (p *Pool) revertWorker(w *goWorker) bool {
    fmt.Println("恢復(fù)worker")
    if p.capacity < p.running || p.isClosed() {
        p.cond.Broadcast()
        fmt.Println("已關(guān)閉1")
        return false
    }
    p.lock.Lock()
    if p.isClosed() {
        fmt.Println("已關(guān)閉1")
        p.lock.Unlock()
        return false
    }
    w.lastUsed = time.Now().Unix()
    if err := p.workers.insert(w); err != nil {
        p.lock.Unlock()
        return false
    }
    p.cond.Signal()
    p.lock.Unlock()
    return true
}

// isClosed 是否關(guān)閉
func (p *Pool) isClosed() bool {
    return atomic.LoadInt32(&p.state) == CLOSE
}

func (p *Pool) addWaiting(delta int) {
    atomic.AddInt32(&p.waiting, int32(delta))
}

// free 當(dāng)前池子是否有空閑
func (p *Pool) free() bool {
    return p.capacity-p.running > 0
}

func (p *Pool) purgeStableWorkers() {
    //ctx, _ := context.WithCancel(context.Background())
    ticker := time.NewTicker(time.Second * 2)
    defer func() {
        ticker.Stop()
    }()
    //
    for {
        select {
        case <-ticker.C:

        }
        fmt.Println("開始清楚")
        // 開始執(zhí)行主邏輯
        var isDormant bool
        p.lock.Lock()
        stableWorker := p.workers.refresh(time.Now().Unix() - expireSecond)
        fmt.Printf("空閑數(shù)量:%d\n", len(stableWorker))
        n := p.running
        isDormant = n == 0 || len(stableWorker) == int(n)
        p.lock.Unlock()
        for i := range stableWorker {
            stableWorker[i].finish()
            stableWorker[i] = nil
        }
        // 全部空閑
        if isDormant && p.waiting > 0 {
            fmt.Println("全部空閑")
            p.cond.Broadcast()
        }
    }
}

自旋鎖:對worker進(jìn)行操作時需要上鎖,因?yàn)榍衅皇蔷€程安全的

package main

import (
    "runtime"
    "sync"
    "sync/atomic"
)

type spinLock uint32

const maxBackoff = 16

// Lock 加鎖 使用CAS將0換成1 成功了就加鎖成功戚长,否則一直重試盗冷,重試次數(shù)從1增加到16
func (sl *spinLock) Lock() {
    backOff := 1
    for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
        for i := 0; i < backOff; i++ {
            runtime.Gosched()
        }
        if backOff < maxBackoff {
            backOff <<= 1 // 左移1位
        }
    }
}

// Unlock 解鎖,CAS將1改成0
func (sl *spinLock) Unlock() {
    atomic.StoreUint32((*uint32)(sl), 0)
}

func NewSpinLock() sync.Locker {
    return new(spinLock)
}

具體的worker實(shí)現(xiàn)

package main

import (
    "fmt"
)

type goWorker struct {
    // pool who owns this worker.
    pool *Pool

    // task is a job should be done.
    task chan func()

    // lastUsed will be updated when putting a worker back into queue.
    lastUsed int64
}

func (w *goWorker) inputFunc(fn func()) {
    w.task <- fn
}

func (w *goWorker) run() {
    w.pool.addRunning(1)
    fmt.Printf("當(dāng)前運(yùn)行的go:%d\n", w.pool.running)
    go func() {
        defer func() {
            w.pool.addRunning(-1)
            w.pool.workerCache.Put(w)
            w.pool.cond.Signal() // 喚醒一個協(xié)程
            fmt.Println("任務(wù)結(jié)束----------------")
        }()
        //  在此遍歷執(zhí)行task
        for task := range w.task {
            if task == nil {
                return
            }
            task()
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
            //time.Sleep(time.Second * 5)
        }

    }()
}

// finish 往task中放入一個nil历葛,代表本次worker結(jié)束
func (w *goWorker) finish() {
    w.task <- nil
}

func (w *goWorker) lastUsedTime() int64 {
    return w.lastUsed
}

worker接口

package main

// 工作隊列interface

type worker interface {
    run()    // 執(zhí)行任務(wù)
    finish() // 強(qiáng)制當(dāng)前worker完成任務(wù)
    lastUsedTime() int64
    inputFunc(func())
    //inputParam(interface{})
}

type workerQueue interface {
    len() int
    //isEmpty() bool
    insert(worker) error
    detach() worker
    refresh(duration int64) []worker // clean up the stale workers and return them
    //reset()
}

隊列類型的worker

package main

import (
    "fmt"
)

type workerStack struct {
    items  []worker
    expiry []worker
}

func newWorkerStack(size int32) *workerStack {
    return &workerStack{
        items: make([]worker, 0, size),
    }
}

// insert 任務(wù)完成后正塌,將worker放回隊列中
func (ws *workerStack) insert(w worker) error {
    ws.items = append(ws.items, w)
    fmt.Printf("當(dāng)前insert數(shù)量:%d\n", len(ws.items))
    return nil
}

// detach 每次任務(wù)都要先從這里獲取,獲取不到的話就從池中獲取或者循環(huán)從這里獲取
func (ws *workerStack) detach() worker {
    l := len(ws.items)
    if l <= 0 {
        return nil
    }
    w := ws.items[l-1]
    ws.items[l-1] = nil
    ws.items = ws.items[:l-1]
    return w
}

// refresh 將空閑worker回收
func (ws *workerStack) refresh(expireTime int64) []worker {
    n := ws.len()
    if n <= 0 {
        return nil
    }

    index := ws.binarySearch(0, n-1, expireTime)
    ws.expiry = ws.expiry[:0] // 重置
    if index != -1 {
        ws.expiry = append(ws.expiry, ws.items[:index+1]...)
        m := copy(ws.items, ws.items[index+1:]) // 把未過期的worker拷貝到數(shù)組前面
        for i := m; i < n; i++ {
            ws.items[m] = nil
        }
        ws.items = ws.items[:m]
    }
    return ws.expiry
}

func (ws *workerStack) len() int {
    return len(ws.items)
}

func (ws *workerStack) binarySearch(left, right int, expireTime int64) int {
    fmt.Printf("過期時間:%d", expireTime)
    for left <= right {
        mid := (left + right) >> 1
        fmt.Println(ws.items[mid].lastUsedTime())
        if ws.items[mid].lastUsedTime() > expireTime {
            right = mid - 1
        } else {
            left = mid + 1
        }
    }
    fmt.Println(right)
    return right
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末恤溶,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子帜羊,更是在濱河造成了極大的恐慌咒程,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件讼育,死亡現(xiàn)場離奇詭異帐姻,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)奶段,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進(jìn)店門饥瓷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人痹籍,你說我怎么就攤上這事呢铆。” “怎么了蹲缠?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵棺克,是天一觀的道長。 經(jīng)常有香客問我线定,道長娜谊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任斤讥,我火速辦了婚禮纱皆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己派草,他們只是感情好撑帖,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著澳眷,像睡著了一般胡嘿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上钳踊,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天衷敌,我揣著相機(jī)與錄音,去河邊找鬼拓瞪。 笑死缴罗,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的祭埂。 我是一名探鬼主播面氓,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蛆橡!你這毒婦竟也來了舌界?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤泰演,失蹤者是張志新(化名)和其女友劉穎呻拌,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體睦焕,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡藐握,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了垃喊。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猾普。...
    茶點(diǎn)故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖本谜,靈堂內(nèi)的尸體忽然破棺而出初家,到底是詐尸還是另有隱情,我是刑警寧澤耕突,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布笤成,位于F島的核電站,受9級特大地震影響眷茁,放射性物質(zhì)發(fā)生泄漏炕泳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一上祈、第九天 我趴在偏房一處隱蔽的房頂上張望培遵。 院中可真熱鬧浙芙,春花似錦、人聲如沸籽腕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽皇耗。三九已至南窗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間郎楼,已是汗流浹背万伤。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留呜袁,地道東北人敌买。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像阶界,于是被迫代替她去往敵國和親虹钮。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容