golang中啟動一個協(xié)程不會消耗太多資源,有人認(rèn)為可以不用協(xié)程池豫缨。但是當(dāng)訪問量增大時谒兄,可能造成內(nèi)存消耗完摔桦,程序崩潰。于是寫了一個協(xié)程池的Demo承疲。
Demo中有worker和job邻耕。worker是一個協(xié)程,在worker中完成一個job燕鸽。Jobs是一個channel
赊豌,使用Jobs記錄job。當(dāng)生成一個新任務(wù)绵咱,就發(fā)送到Jobs中碘饼。程序啟動時,首先啟動3個worker協(xié)程悲伶,每個協(xié)程都嘗試從Jobs中接收job艾恼。如果Jobs中沒有job,worker協(xié)程就等待麸锉。
基本邏輯如下:
- Jobs管道存放job钠绍,Results管道存放結(jié)果。
- 程序一啟動花沉,啟動3個worker協(xié)程柳爽,等待從Jobs管道中取數(shù)據(jù)媳握。
- 向Jobs管道中發(fā)送3個數(shù)據(jù)。
- 關(guān)閉Jobs管道磷脯。
- worker協(xié)程從Jobs管道中接收到數(shù)據(jù)以后蛾找,執(zhí)行程序,把結(jié)果放到Results管道中赵誓。然后繼續(xù)等待打毛。
- 當(dāng)Jobs管道中沒有數(shù)據(jù),并且Results有3個數(shù)據(jù)時俩功。退出主程序幻枉。
代碼如下:
package main
import (
"fmt"
"time"
)
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
Results <- true
}
}
}()
}
const channelLength = 3
var (
Jobs chan int
Results chan bool
)
func main() {
Jobs = make(chan int, channelLength)
Results = make(chan bool, channelLength)
// Start worker goroutines
for i:= 0; i < channelLength; i++ {
worker(i)
}
// Send to channel
time.Sleep(time.Second)
for j := 0; j < channelLength; j++ {
Jobs <- j
}
close(Jobs)
for len(Jobs) != 0 || len(Results) != channelLength {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Complete main")
}
運行結(jié)果如下:
Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started job 2
worker 2 started job 0
worker 0 started job 1
worker 0 finished job 1
Waiting for job...
worker 0 started job 0
worker 2 finished job 0
Waiting for job...
worker 2 started job 0
worker 1 finished job 2
Waiting for job...
worker 1 started job 0
Complete main
這個程序出現(xiàn)問題了,bug在哪里诡蜓?
開始的3次熬甫,協(xié)程運行都是正常。
worker 1 started job 2
worker 2 started job 0
worker 0 started job 1
worker 0 finished job 1
worker 2 finished job 0
worker 1 finished job 2
根據(jù)設(shè)計蔓罚,向Jobs管道中發(fā)送3個數(shù)據(jù)以后罗珍,就關(guān)閉了管道。此后脚粟,協(xié)程不應(yīng)該再從Jobs管道中接收到數(shù)據(jù)覆旱。
for j := 0; j < channelLength; j++ {
jobs <- j
}
close(jobs)
實際運行中,協(xié)程接收完3個數(shù)據(jù)以后核无,worker還能不斷的從Jobs管道中接收到數(shù)據(jù)扣唱。與設(shè)計不符。
worker 0 started job 0
worker 2 started job 0
worker 1 started job 0
開始以為問題出在worker()中团南,j := <- job
噪沙,只有當(dāng)job中有返回,才會打印worker started
吐根。但是后面的job id都是0正歼,說明沒有向jobs管道中發(fā)送新數(shù)據(jù)。
for {
fmt.Println("Waiting for job...")
select {
case j := <-Jobs :
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
Results <- true
}
}
研究向Jobs管道發(fā)送數(shù)據(jù)的代碼拷橘,突發(fā)奇想局义,把close(Jobs)
注釋掉,看看如何冗疮。
for j := 0; j < channelLength; j++ {
Jobs <- j
}
//close(Jobs)
程序居然正常了萄唇。
Waiting for job...
Waiting for job...
Waiting for job...
worker 1 started job 0
worker 0 started job 2
worker 2 started job 1
worker 1 finished job 0
worker 0 finished job 2
Waiting for job...
Waiting for job...
worker 2 finished job 1
Waiting for job...
Complete main
原來問題出在close()
上,馬上查注釋术幔。close()
是在sender
中調(diào)用另萤,當(dāng)管道中最后一個數(shù)據(jù)被接收以后,就關(guān)閉管道。此時四敞,不能再向管道中發(fā)送數(shù)據(jù)泛源。否則會報錯panic: send on closed channel
。
使用x, ok := <-c
可以判斷一個管道是否關(guān)閉忿危,如果管道已經(jīng)關(guān)閉达箍,ok
的值為false
。
管道關(guān)閉以后癌蚁,并且管道中的數(shù)據(jù)被接收完以后幻梯,居然還能從管道中接收到數(shù)據(jù)0
兜畸。于是就造成了后續(xù)協(xié)程接收到job 0
的問題努释。
// The close built-in function closes a channel, which must be either
// bidirectional or send-only. It should be executed only by the sender,
// never the receiver, and has the effect of shutting down the channel after
// the last sent value is received. After the last value has been received
// from a closed channel c, any receive from c will succeed without
// blocking, returning the zero value for the channel element. The form
// x, ok := <-c
// will also set ok to false for a closed channel.
func close(c chan<- Type)
如果要使用close,應(yīng)該怎么做
管道不用時咬摇,close()
管道是個好習(xí)慣伐蒂。此時,應(yīng)該怎么解決這個問題呢肛鹏?首先要在協(xié)程中檢查接收到的數(shù)據(jù)逸邦,j:=<-jobs
,判斷j
是否為0
在扰。如果Jobs
中存放的是非指針數(shù)據(jù)
缕减,不能分辨0
是真正的0值
,還是close以后接收到的0
芒珠。因此需要在Jobs
管道中存放指針桥狡。管道打開時,接收的都是非nil
指針皱卓。close以后才返回0
裹芝,也就是nil
指針。
修改程序娜汁。新生成一個機(jī)構(gòu)體Job嫂易。
type Job struct {
JobId int
}
Jobs保存指向Job的指針。
Jobs chan *Job
func main() {
Jobs = make(chan *Job, channelLength)
...
for j := 0; j < channelLength; j++ {
Jobs <- &Job{JobId:j}
}
close(Jobs)
...
}
在worker協(xié)程中掐禁,從管道取出Job指針以后怜械,判斷指針是否為nil。如果為nil傅事,說明管道已經(jīng)關(guān)閉宫盔,協(xié)程退出。
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
if j == nil {
fmt.Println("Close the worker", id)
return
}
fmt.Println("worker", id, "started job", j.JobId)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j.JobId)
Results <- true
}
}
}()
}
運行結(jié)果達(dá)到預(yù)期享完。
Waiting for job...
Waiting for job...
Waiting for job...
worker 0 started job 0
worker 1 started job 1
worker 2 started job 2
worker 2 finished job 2
worker 0 finished job 0
Waiting for job...
Waiting for job...
Close the worker 2
Close the worker 0
worker 1 finished job 1
Waiting for job...
Close the worker 1
Complete main
附上最終的代碼灼芭。
package main
import (
"fmt"
"time"
)
type Job struct {
JobId int
}
func worker(id int) {
go func() {
for {
fmt.Println("Waiting for job...")
select {
// Receive from channel
case j := <-Jobs :
if j == nil {
fmt.Println("Close the worker", id)
return
}
fmt.Println("worker", id, "started job", j.JobId)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j.JobId)
Results <- true
}
}
}()
}
const channelLength = 3
var (
Jobs chan *Job
Results chan bool
)
func main() {
Jobs = make(chan *Job, channelLength)
Results = make(chan bool, channelLength)
// Start worker goroutines
for i:= 0; i < channelLength; i++ {
worker(i)
}
// Send to channel
time.Sleep(time.Second)
for j := 0; j < channelLength; j++ {
Jobs <- &Job{JobId:j}
}
close(Jobs)
for len(Jobs) != 0 || len(Results) != channelLength {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("Complete main")
}