Go語言無緩沖通道創(chuàng)建協(xié)程池
這些協(xié)程池通常用于并發(fā)執(zhí)行一組任務,最終組合起來完成某個功能。在這種情況下嚣镜,使用無緩沖通道要比使用緩沖通道好,因為既不需要任務隊列橘蜜,也不需要一組協(xié)程配合執(zhí)行菊匿,并且方便知道什么時候協(xié)程池正在執(zhí)行任務,如果協(xié)程池中的所有協(xié)程都在忙计福,無法處理新的任務跌捆,也能及時通過通道通知調(diào)用者(分配給無緩沖通道的任務未處理會阻塞后續(xù)分配)。另外象颖,使用無緩沖通道不會有任務在隊列中丟失或卡住佩厚,所有任務都會被處理。
注:以上是來自https://xueyuanjun.com/post/22061
實現(xiàn)下列實例:
// worker/worker.go
pcakge worker
import (
"sync"
)
type Worker interface {
Task()
}
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
func New(maxGoroutines int) *Pool {
p := Pool {
work:make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines;i ++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
return &p
}
func (p *Pool) Run (w Worker) {
p.work <- w
}
func (p *Pool) ShutDown() {
close(p.work)
p.wg.Wait()
}
// main.go
package main
var langs = []string{
"Golang",
"PHP",
"JavaScript"
"Python",
"Java",
}
type langPrinter struct {
lang string
}
func (m *langPrinter) Task() {
log.Println(m.lang)
time.Sleep(time.Second)
}
func main() {
var setnum = 3
p := worker,New(3)
var wg syn c.WaitGroup
wg.Add(setnum * len(langs))
for i := 0; i < setnum; i++ {
lp := langPrinter{lang}
go func() {
p.Run(&lp)
wg.Donw()
}
}
wg.Wait()
p.ShutDown()
}
以上是無緩存創(chuàng)建協(xié)程池
現(xiàn)在我會對緩存池進行深入分析说订!
先對 worker.go 進行分析抄瓦!
package worker
import (
"sync"
)
// 創(chuàng)建公共的接口,這個接口是執(zhí)行任務邏輯的陶冷。
type Worker interface {
Task()
}
// 協(xié)程池 struct
type Pool struct {
work chan Worker 協(xié)程池
wg sync.WaitGroup 原子容器
}
// 實例化模塊方法钙姊!開辟協(xié)程池,任務傳遞的相關功能埂伦!
func New(maxGoroutines int) * Pool {
// 開辟協(xié)程池煞额,對其進行實例化
p := Pool{
work:make(chan Worker),
}
// 添加原子計數(shù)器
p.wg.Add(maxGoroutines)
// 一次處理任務個數(shù),當 maxGoroutines = 3 沾谜。說明并發(fā)一次處理3個任務
for i := 0; i < maxGoroutines; i++ {
// 并發(fā)操作
go func() {
// 對 p.work (協(xié)程池) 進行遍歷膊毁,把所有任務進行分發(fā)!
// 當 p.work 沒有任務的時候类早,會進行阻塞
for w := range p.work {
w.Task()
}
// 對原子進行遞減
p.wg.Done()
}()
}
return &p
}
// 任務寫入方法媚媒!
func (p *Pool) Run (w Worker) {
// 把任務寫入 p.work 的 chan 里面!
p.work <- w
}
// 銷毀任務方法
func (p *Pool) ShutDown() {
// 把 p.work 協(xié)程池進行銷毀
close(p.work)
// 等待所有 goroutine 執(zhí)行完畢
p.wg.Wait()
}
package main
import (
"golang_no_buff/worker"
"log"
"sync"
"time"
)
var langs = []string{
"Golang",
"PHP",
"JavaScript",
"Python",
"Java",
}
type langPrinter struct {
lang string
}
// 對 langPrinter 綁定Task()方法涩僻,執(zhí)行 langPrinter 的任務邏輯操作缭召!
func (m *langPrinter) Task() {
log.Println(m.lang)
time.Sleep(time.Second)
}
func main() {
// 所執(zhí)行任務數(shù)
var setnum = 3
p := worker.New(3)
// 創(chuàng)建原子計數(shù)隊列
var wg sync.WaitGroup
// 添加原子數(shù)!3 * langs個數(shù)(5) = 15
wg.Add(setnum * len(langs))
// 對其循環(huán)3次
for i := 0; i < setnum; i++ {
// 循環(huán) 3 次 , 每一個 langs 有一個goroutine進行處理逆日,當
for _, lang := range langs {
// 把 key 寫到 langPrinter 里面去
lp := langPrinter{lang}
// 對其實現(xiàn)并發(fā)操作
go func() {
// p.Run() 是把數(shù)據(jù)寫入 p.work 方法
p.Run(&lp)
// 對原子計數(shù)進行遞減
wg.Done()
}()
}
}
// 等待所有線程執(zhí)行
wg.Wait()
// 把任務進行銷毀
p.ShutDown()
}
上面基本邏輯我以注釋進行寫入嵌巷,里面原理需要自己體會!