具體的池子
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// Pool-wide defaults.
const (
// defaultCapacity is the capacity newPool falls back to when the requested
// size is zero or negative.
defaultCapacity = 10
// expireSecond is subtracted from the current Unix time (in seconds) in
// purgeStableWorkers to obtain the cutoff handed to workerQueue.refresh.
expireSecond = 2
)
// Pool is a goroutine pool that limits how many workers run at the same time
// and keeps idle workers in a queue so they can be reused.
type Pool struct {
capacity int32 // maximum number of workers allowed to run concurrently
running int32 // number of workers currently running; updated through addRunning
waiting int32 // number of getWorker callers currently blocked on cond; updated through addWaiting
state int32 // pool state; isClosed compares it with CLOSE (the original note says 1 means closed)
lock sync.Locker // guards workers and is the Locker behind cond; newPool installs a spin lock here
workers workerQueue // queue of idle workers; the interface allows different queue implementations
workerCache sync.Pool // recycles goWorker values; newPool sets its New function
cond *sync.Cond // blocks getWorker callers when the pool is full and wakes them up later
}
// newPool builds a Pool that runs at most size workers concurrently and starts
// the background goroutine that purges idle workers.
//
// A size that is zero or negative is replaced by defaultCapacity. The returned
// pool uses a spin lock, a stack-based worker queue, and a worker cache whose
// New function creates goWorker values bound to this pool with a task channel
// of buffer size 1.
func newPool(size int32) *Pool {
	capacity := size
	if capacity <= 0 {
		capacity = defaultCapacity
	}

	stack := newWorkerStack(capacity)
	spin := NewSpinLock()
	pool := &Pool{
		capacity: capacity,
		workers:  stack,
		lock:     spin,
		cond:     sync.NewCond(spin),
	}

	// Workers are created lazily, the first time the cache has nothing to hand out.
	pool.workerCache.New = func() interface{} {
		fresh := new(goWorker)
		fresh.pool = pool
		fresh.task = make(chan func(), 1)
		return fresh
	}

	go pool.purgeStableWorkers()
	return pool
}
// submit hands task to a worker obtained from getWorker.
//
// It returns nil once the task has been sent to the worker, and a non-nil
// error when getWorker yields no worker. A status line is printed in both cases.
func (p *Pool) submit(task func()) error {
	w := p.getWorker()
	if w == nil {
		fmt.Println("放入失敗")
		return fmt.Errorf("放入失敗")
	}

	w.inputFunc(task)
	fmt.Println("放入成功")
	return nil
}
// getWorker returns a worker that is ready to receive a task, or nil when the
// pool is found closed while the caller would otherwise have to wait.
//
// Order of attempts, all decided while holding p.lock:
//  1. reuse an idle worker from the queue;
//  2. if fewer than capacity workers are running, start a new one;
//  3. otherwise block on p.cond until woken, then retry 1 and 2.
//
// The running counter is changed with atomic operations outside p.lock (see
// addRunning), so it is read with an atomic load here rather than a plain read.
func (p *Pool) getWorker() (w worker) {
	// spawnWorker takes a worker from the cache and starts its goroutine.
	// Original author's note: this mostly runs while the pool is warming up;
	// afterwards workers normally come straight from the queue.
	spawnWorker := func() {
		w = p.workerCache.Get().(*goWorker)
		w.run()
	}

	p.lock.Lock()
	if w = p.workers.detach(); w != nil {
		fmt.Println("獲得worker")
		p.lock.Unlock()
		return w
	}
	if atomic.LoadInt32(&p.running) < p.capacity {
		fmt.Println("啟動")
		p.lock.Unlock()
		spawnWorker()
		return w
	}
	if p.isClosed() {
		p.lock.Unlock()
		return nil
	}

	fmt.Println("開始重試")
	for {
		// Block until a worker is returned, a worker exits, or the purge
		// goroutine broadcasts. cond.Wait releases p.lock while waiting and
		// re-acquires it before returning.
		p.addWaiting(1)
		p.cond.Wait()
		p.addWaiting(-1)

		if p.isClosed() {
			p.lock.Unlock()
			return nil
		}
		if w = p.workers.detach(); w != nil {
			fmt.Println("獲得任務(wù)成功")
			p.lock.Unlock()
			return w
		}

		fmt.Println("沒有獲得任務(wù)")
		if p.free() {
			// No idle worker, but there is room to start another one.
			p.lock.Unlock()
			spawnWorker()
			return w
		}
	}
}
// addRunning atomically adds delta (which may be negative) to the running
// worker counter.
func (p *Pool) addRunning(delta int) {
	step := int32(delta)
	atomic.AddInt32(&p.running, step)
}
// revertWorker puts w back into the idle queue after it has finished a task.
//
// It returns true when the worker was queued, in which case one waiter on
// p.cond is signalled. It returns false when the worker should exit instead:
// more workers are running than the capacity allows, the pool is closed, or
// the queue rejects the worker. In the first two cases every waiter is woken
// with a broadcast before returning.
//
// The running counter is modified atomically outside p.lock (see addRunning),
// so it is read with an atomic load.
func (p *Pool) revertWorker(w *goWorker) bool {
	fmt.Println("恢復(fù)worker")
	if atomic.LoadInt32(&p.running) > p.capacity || p.isClosed() {
		p.cond.Broadcast()
		fmt.Println("已關(guān)閉1")
		return false
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	// Re-check under the lock: the pool may have been closed since the first check.
	if p.isClosed() {
		fmt.Println("已關(guān)閉1")
		return false
	}

	// Stamp the worker so refresh can tell how long it has been idle.
	w.lastUsed = time.Now().Unix()
	if err := p.workers.insert(w); err != nil {
		return false
	}

	// Wake one blocked getWorker caller; p.lock is still held at this point.
	p.cond.Signal()
	return true
}
// isClosed reports whether the pool state, read atomically, equals CLOSE.
func (p *Pool) isClosed() bool {
	current := atomic.LoadInt32(&p.state)
	return current == CLOSE
}
// addWaiting atomically adds delta (which may be negative) to the counter of
// blocked getWorker callers.
func (p *Pool) addWaiting(delta int) {
	step := int32(delta)
	atomic.AddInt32(&p.waiting, step)
}
// free reports whether the pool has room for another running worker, i.e.
// whether fewer than capacity workers are currently running.
//
// The running counter is modified atomically outside the pool lock (see
// addRunning), so it is read with an atomic load. Comparing directly also
// avoids the intermediate subtraction.
func (p *Pool) free() bool {
	return atomic.LoadInt32(&p.running) < p.capacity
}
// purgeStableWorkers periodically removes idle workers from the queue and
// tells them to exit. newPool starts it in its own goroutine.
//
// On every tick it asks the queue for the workers whose last use is at least
// expireSecond seconds old, sends each of them the finish signal, and, when no
// worker is running or every running worker was just expired, broadcasts on
// p.cond so that blocked getWorker callers can start fresh workers.
//
// NOTE(review): the loop has no exit condition (the ticker channel is never
// closed), so this goroutine runs for the lifetime of the process.
func (p *Pool) purgeStableWorkers() {
	ticker := time.NewTicker(expireSecond * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		fmt.Println("開始清楚")

		p.lock.Lock()
		stableWorker := p.workers.refresh(time.Now().Unix() - expireSecond)
		fmt.Printf("空閑數(shù)量:%d\n", len(stableWorker))
		// running and waiting are modified atomically outside p.lock, so they
		// are read with atomic loads.
		n := atomic.LoadInt32(&p.running)
		isDormant := n == 0 || len(stableWorker) == int(n)
		p.lock.Unlock()

		// Signal the expired workers outside the lock; finish sends on each
		// worker's task channel.
		for i := range stableWorker {
			stableWorker[i].finish()
			stableWorker[i] = nil // drop the reference held by the reused expiry buffer
		}

		// Every worker is gone (or about to be) while callers are still
		// waiting: wake them all so they can spawn new workers.
		if isDormant && atomic.LoadInt32(&p.waiting) > 0 {
			fmt.Println("全部空閑")
			p.cond.Broadcast()
		}
	}
}
自旋鎖:對worker進行操作時需要上鎖,因為切片不是線程安全的
package main
import (
"runtime"
"sync"
"sync/atomic"
)
// spinLock is a lock word manipulated with atomic operations:
// 0 means unlocked, 1 means locked.
type spinLock uint32

// maxBackoff is the upper bound for the number of scheduler yields performed
// between two acquisition attempts.
const maxBackoff = 16

// Lock acquires the lock by atomically swapping the word from 0 to 1.
// After every failed attempt it yields the processor; the number of yields
// starts at 1 and doubles until it reaches maxBackoff.
func (sl *spinLock) Lock() {
	word := (*uint32)(sl)
	for yields := 1; !atomic.CompareAndSwapUint32(word, 0, 1); {
		for remaining := yields; remaining > 0; remaining-- {
			runtime.Gosched()
		}
		if yields < maxBackoff {
			yields *= 2
		}
	}
}

// Unlock releases the lock by atomically storing 0 into the word.
func (sl *spinLock) Unlock() {
	word := (*uint32)(sl)
	atomic.StoreUint32(word, 0)
}

// NewSpinLock returns a new, unlocked spin lock as a sync.Locker.
func NewSpinLock() sync.Locker {
	var sl spinLock
	return &sl
}
具體的worker實現
package main
import (
"fmt"
)
// goWorker is a pooled worker: it receives functions over its task channel
// and executes them in the goroutine started by run.
type goWorker struct {
// pool is the Pool that owns this worker.
pool *Pool
// task carries the functions to execute; receiving a nil function makes the
// run loop exit.
task chan func()
// lastUsed is the Unix time in seconds, set by Pool.revertWorker when the
// worker is put back into the queue.
lastUsed int64
}
// inputFunc hands fn to the worker by sending it on the task channel.
// The send blocks while the channel cannot accept another value.
func (w *goWorker) inputFunc(fn func()) {
	tasks := w.task
	tasks <- fn
}
// run registers the worker as running and starts the goroutine that executes
// the functions received on w.task.
//
// The goroutine stops when the channel is closed, when it receives a nil
// function, or when the pool refuses to take the worker back after a task.
// On exit it decrements the running counter, returns the worker to the
// pool's worker cache and signals one waiter on the pool's condition variable.
func (w *goWorker) run() {
	w.pool.addRunning(1)
	fmt.Printf("當(dāng)前運(yùn)行的go:%d\n", w.pool.running)

	// retire undoes the bookkeeping done above once the goroutine ends.
	retire := func() {
		w.pool.addRunning(-1)
		w.pool.workerCache.Put(w)
		w.pool.cond.Signal() // wake one blocked getWorker caller
		fmt.Println("任務(wù)結(jié)束----------------")
	}

	go func() {
		defer retire()

		// Execute tasks one after another.
		for {
			task, open := <-w.task
			if !open || task == nil {
				return
			}
			task()
			if !w.pool.revertWorker(w) {
				return
			}
		}
	}()
}
// finish sends a nil function on the task channel, which tells the worker's
// run loop to stop.
func (w *goWorker) finish() {
	var stop func() // nil on purpose: the run loop treats nil as the exit signal
	w.task <- stop
}
// lastUsedTime returns the value of the worker's lastUsed field.
func (w *goWorker) lastUsedTime() int64 {
	stamp := w.lastUsed
	return stamp
}
worker接口
package main
// worker is the interface through which the pool and the worker queue handle
// a worker.
type worker interface {
run() // start executing tasks
finish() // force this worker to finish
lastUsedTime() int64 // time the worker was last used; compared with the refresh cutoff
inputFunc(func()) // hand a function to the worker
//inputParam(interface{})
}
// workerQueue is the container of idle workers used by the pool.
type workerQueue interface {
len() int // number of workers currently held
//isEmpty() bool
insert(worker) error // put a worker back into the queue
detach() worker // take a worker out of the queue; nil when there is none
refresh(duration int64) []worker // clean up the stale workers and return them
//reset()
}
隊列類型的worker
package main
import (
"fmt"
)
// workerStack keeps idle workers in a slice used as a stack: insert appends
// at the end and detach removes from the end.
type workerStack struct {
	// items holds the idle workers in insertion order.
	items []worker
	// expiry is a scratch buffer reused by refresh for its return value.
	expiry []worker
}

// newWorkerStack returns an empty stack whose backing array has room for
// size workers.
func newWorkerStack(size int32) *workerStack {
	return &workerStack{
		items: make([]worker, 0, size),
	}
}

// insert puts w on top of the stack once its task is done. It always
// returns nil.
func (ws *workerStack) insert(w worker) error {
	ws.items = append(ws.items, w)
	fmt.Printf("當(dāng)前insert數(shù)量:%d\n", len(ws.items))
	return nil
}

// detach removes and returns the most recently inserted worker, or nil when
// the stack is empty.
func (ws *workerStack) detach() worker {
	l := len(ws.items)
	if l <= 0 {
		return nil
	}
	w := ws.items[l-1]
	ws.items[l-1] = nil // drop the reference held by the backing array
	ws.items = ws.items[:l-1]
	return w
}

// refresh removes every worker whose lastUsedTime is not later than
// expireTime and returns them. The returned slice aliases an internal buffer
// that is overwritten by the next call.
//
// The remaining workers are moved to the front of items, and every vacated
// slot behind them is set to nil so the backing array does not keep stale
// references to workers that are no longer in the stack.
func (ws *workerStack) refresh(expireTime int64) []worker {
	n := ws.len()
	if n <= 0 {
		return nil
	}

	index := ws.binarySearch(0, n-1, expireTime)
	ws.expiry = ws.expiry[:0] // reset, keeping the capacity
	if index != -1 {
		ws.expiry = append(ws.expiry, ws.items[:index+1]...)
		m := copy(ws.items, ws.items[index+1:]) // move the unexpired workers to the front
		for i := m; i < n; i++ {
			ws.items[i] = nil
		}
		ws.items = ws.items[:m]
	}
	return ws.expiry
}

// len returns the number of idle workers in the stack.
func (ws *workerStack) len() int {
	return len(ws.items)
}

// binarySearch returns the index of the last worker in items[left..right]
// whose lastUsedTime is not later than expireTime, or left-1 (that is -1 for
// a search starting at 0) when there is none. It prints the values it
// inspects.
//
// NOTE(review): this presumes items is ordered by ascending lastUsedTime,
// which holds if workers are inserted in the order they were last used.
func (ws *workerStack) binarySearch(left, right int, expireTime int64) int {
	fmt.Printf("過期時間:%d", expireTime)
	for left <= right {
		mid := (left + right) >> 1
		fmt.Println(ws.items[mid].lastUsedTime())
		if ws.items[mid].lastUsedTime() > expireTime {
			right = mid - 1
		} else {
			left = mid + 1
		}
	}
	fmt.Println(right)
	return right
}