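This is the third post in this series. The first is "Golang Concurrency's Three Axes, Part 1: Channels for Communication and Synchronization", and the second is "Golang Concurrency's Three Axes, Part 2: Goroutine Pools for Concurrency".
The previous posts described the age of handicraft workshops and the industrial age; now we enter the information age.
The Evils of Capitalism
The industrial age described in the previous post was really capitalism, which came into the world dripping blood from every pore. If you don't believe it, look at the code from the previous post: functions such as gen create a pile of tasks and throw them at the downstream workers, without caring how long the processing takes, whether the workers are at it late into the night, or whether they end up 996.ICU.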
func BenchmarkCPUPool(b *testing.B) {
	channum := 100
	gonum := runtime.NumCPU()
	for i := 0; i < b.N; i++ {
		f := func(w *Work) {
			if v, ok := w.input.(float64); ok {
				cpubound(v)
			}
		}
		list := benchmarkList()
		c := genPoolChanBuffer(list)
		p := InitPool(channum, gonum, f)
		p.RunWorker()
		p.FeedWorker(c)
		p.Wait()
	}
}
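The People Are the Masters
Thanks to Chairman Mao, after liberation we entered socialism and gained the labor union, an organization that cares for the people. The people's union loves the people and the people's union is powerful: it can give everyone time off at the right moment so they can rest. In program terms, that means pressing the sacred ctrl+c.
Below is the crudest model: the union calls a holiday and everyone happily drops the work in hand and goes off to play. But some jobs are abandoned half-done, which is really quite unprofessional: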
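import (
	"fmt"
	"log"
	"testing"
	"time"
)

// WorkPool is the queue shared by the producer and the workers.
var WorkPool chan work

type work struct {
	sth string
}

// Working simulates a job that takes two seconds.
func (w work) Working() {
	log.Printf("start %s", w.sth)
	time.Sleep(2 * time.Second)
	log.Printf("end %s", w.sth)
}

func RunWorkerSimple() {
	workers := 2
	for i := 0; i < workers; i++ {
		go HandleWorkerSimple()
	}
}

// HandleWorkerSimple takes jobs forever; nothing can tell it to stop.
func HandleWorkerSimple() {
	for w := range WorkPool {
		w.Working()
	}
}

func TestWorkerSimple(t *testing.T) {
	WorkPool = make(chan work)
	go RunWorkerSimple()
	go func() {
		// benchmarkList comes from the earlier posts in this series.
		list := benchmarkList()
		for _, l := range list {
			WorkPool <- work{fmt.Sprint(l)}
		}
	}()
	// Block forever; only ctrl+c ends the test.
	select {}
}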
The result clearly shows that jobs 2 and 3 were thrown away before they finished, and every job that had not yet started was abandoned.
C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerSimple
2019/04/08 22:56:11 start 1
2019/04/08 22:56:11 start 0
2019/04/08 22:56:13 end 0
2019/04/08 22:56:13 start 2
2019/04/08 22:56:13 end 1
2019/04/08 22:56:13 start 3
^Csignal: interrupt
FAIL _/Users/baixiao/Go/src/github.com/baixiaoustc/go_concurrency 2.790s
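Doing Right by the Job
The people's workers serve the people, so even though the union protects workers' rights, the work still has to be finished properly. In program terms, after receiving the ctrl+c interrupt, each worker should finish the job it has in hand, and the BOSS (the main goroutine) should save the jobs remaining in the queue (for example, by writing them to a database or a file) so they can be picked up again at work tomorrow.
The code becomes much more complicated: it has to watch for the system interrupt signal, wait for all workers to finish what they have in hand, and finally save the remaining jobs. Two chans are needed for communication: stopChan tells the workers it is quitting time, and stoppedChan tells the BOSS once all workers have finished, after which the BOSS saves the remaining work.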
// Additional imports needed by this snippet: "os", "os/signal", "sync".

func (w work) Saving() {
	log.Printf("save %s", w.sth)
}

func RunWorkerHold(stop, stopped chan struct{}) {
	var wg sync.WaitGroup
	workers := 2
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go HandleWorkerHold(stop, &wg)
	}
	wg.Wait()
	stopped <- struct{}{}
}

func HandleWorkerHold(stop chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case w, ok := <-WorkPool:
			if !ok {
				// The producer closed the queue: nothing is left to do.
				return
			}
			w.Working()
		case <-stop:
			log.Println("worker: caller has told us to stop")
			return
		}
	}
}

func TestWorkerHold(t *testing.T) {
	WorkPool = make(chan work)
	stopChan := make(chan struct{})
	stoppedChan := make(chan struct{})
	go RunWorkerHold(stopChan, stoppedChan)
	go func() {
		// Close the queue once every job has been handed over; otherwise
		// the range loop at the end of the test would block forever.
		defer close(WorkPool)
		list := benchmarkList()
		for _, l := range list {
			WorkPool <- work{fmt.Sprint(l)}
		}
	}()
	// listen for C-c
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
	log.Println("main: received C-c - shutting down")
	// tell the goroutines to stop
	log.Println("main: telling workers to stop")
	close(stopChan)
	// and wait for them to reply back
	<-stoppedChan
	log.Println("main: workers has told us they've finished")
	// the BOSS saves whatever the producer still has to hand over
	for w := range WorkPool {
		w.Saving()
	}
}
The output is not pasted in full here; in the end, every remaining job was saved:
C02S259EFVH3:go_concurrency baixiao$ go test -run TestWorkerHold
2019/04/08 23:32:02 start 1
2019/04/08 23:32:02 start 0
2019/04/08 23:32:04 end 1
2019/04/08 23:32:04 start 2
2019/04/08 23:32:04 end 0
2019/04/08 23:32:04 start 3
^C2019/04/08 23:32:05 main: received C-c - shutting down
2019/04/08 23:32:05 main: telling workers to stop
2019/04/08 23:32:06 end 2
2019/04/08 23:32:06 worker: caller has told us to stop
2019/04/08 23:32:06 end 3
2019/04/08 23:32:06 start 4
2019/04/08 23:32:08 end 4
2019/04/08 23:32:08 start 5
2019/04/08 23:32:10 end 5
2019/04/08 23:32:10 worker: caller has told us to stop
2019/04/08 23:32:10 main: workers has told us they've finished
2019/04/08 23:32:10 save 6
2019/04/08 23:32:10 save 7
2019/04/08 23:32:10 save 8
2019/04/08 23:32:10 save 9
2019/04/08 23:32:10 save 10
2019/04/08 23:32:10 save 11
2019/04/08 23:32:10 save 12
2019/04/08 23:32:10 save 13
2019/04/08 23:32:10 save 14
2019/04/08 23:32:10 save 15
...
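Note, however, that the workers do not all stop the moment close(stopChan) runs. In the output above, one worker went on to finish work 4 and work 5 before it took the exit path. The reason is that when several chans in a select are ready at the same time, one of them is chosen at random, so there is some probability that a worker keeps picking up work.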
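One common way to reduce this effect is to check the stop channel on its own before competing for work. The following is a minimal sketch, not the code from the repository: drainUntilStopped and its int job type are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// drainUntilStopped is a hypothetical worker loop. It checks stop by itself
// at the top of every iteration, so an already closed stop channel always
// wins over pending jobs. It returns how many jobs it handled.
func drainUntilStopped(jobs <-chan int, stop <-chan struct{}) (handled int) {
	for {
		// Non-blocking priority check: leave at once if stop is closed.
		select {
		case <-stop:
			return handled
		default:
		}
		// Otherwise block until a job or the stop signal arrives.
		select {
		case <-stop:
			return handled
		case _, ok := <-jobs:
			if !ok {
				return handled // queue closed, nothing left
			}
			handled++
		}
	}
}

func main() {
	jobs := make(chan int, 3)
	for i := 0; i < 3; i++ {
		jobs <- i
	}
	stop := make(chan struct{})
	close(stop) // stop is requested before the worker starts
	fmt.Println(drainUntilStopped(jobs, stop)) // prints 0
}
```

A small window remains: if stop is closed between the two select statements while a job is ready, the worker may still take that one job, but the next iteration's priority check then ends the loop.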
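Going Further
The pattern above still has flaws. What if a worker has apprentices of its own (that is, it starts new goroutines)? What if the BOSS does not want to spend too long saving the remaining work at the end? What if the database writes done while saving should also be controllable (the database/sql package, for example, provides the relevant support)?
At last our protagonist takes the stage: golang provides the context pattern so that goroutines can exit efficiently and safely. There are plenty of tutorials online, so there is no need to go into detail; here are just the main functions: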
// Returns a Context together with a cancel function; calling cancel cancels the returned context.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// Returns a Context with a deadline; it is cancelled automatically when the deadline arrives, or earlier if cancel is called.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
// Returns a Context with a timeout; like WithDeadline, but it takes a duration instead of a point in time.
// Equivalent to WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
Finally, our code evolves as follows. Note that the SavingDB() method is only illustrative pseudo-code:
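import (
	"context"
	"database/sql"
	"errors"
)

// db is a hypothetical handle, assumed to be opened elsewhere with sql.Open.
var db *sql.DB

// SavingDB is fake, just an example of where ctx is passed to database/sql.
func (w work) SavingDB(ctx context.Context) {
	log.Printf("save %s", w.sth)
	stmt := "select name from db.table"
	conn, err := db.Conn(ctx)
	if err != nil {
		log.Printf("get connection: %v", err)
		return
	}
	defer conn.Close()
	rows, err := conn.QueryContext(ctx, stmt)
	if err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			log.Println("query canceled")
		}
		return
	}
	defer rows.Close()
	var name string
	for rows.Next() {
		if err := rows.Scan(&name); err != nil {
			log.Printf("scan: %v", err)
			return
		}
	}
	// rows.Err reports why iteration stopped, which can be a cancelled ctx.
	if err := rows.Err(); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		log.Println("scan canceled")
	}
}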
func RunWorkerContext(ctx context.Context, stopped chan struct{}) {
	var wg sync.WaitGroup
	workers := 2
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go HandleWorkerContext(ctx, &wg)
	}
	wg.Wait()
	stopped <- struct{}{}
}

func HandleWorkerContext(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case w := <-WorkPool:
			w.Working()
		case <-ctx.Done():
			log.Println("worker: caller has told us to stop")
			return
		}
	}
}

func TestWorkerContext(t *testing.T) {
	WorkPool = make(chan work)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stoppedChan := make(chan struct{})
	go RunWorkerContext(ctx, stoppedChan)
	go func() {
		list := benchmarkList()
		for _, l := range list {
			WorkPool <- work{fmt.Sprint(l)}
		}
	}()
	// listen for C-c
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
	log.Println("main: received C-c - shutting down")
	// tell the goroutines to stop
	log.Println("main: telling workers to stop")
	cancel()
	// and wait for them to reply back
	<-stoppedChan
	log.Println("main: workers has told us they've finished")
	// Derive the save deadline from a fresh root context: ctx has already
	// been cancelled, so a context derived from it would be done at once.
	ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 100*time.Microsecond)
	defer cancelTimeout()
	for {
		select {
		case w := <-WorkPool:
			w.SavingDB(ctxTimeout)
		case <-ctxTimeout.Done():
			log.Println("main: can't wait any more")
			return
		}
	}
}
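Calling cancel() tells every goroutine under that context.Context to begin exiting, and a ctx with a timeout can then be started to drive the saving of the remaining work, so every step of the process is under control.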
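Doesn't this look like the flat management of a modern company, where the BOSS directs every employee directly and raises everyone's efficiency?
All the code can be found at https://github.com/baixiaoustc/go_concurrency/blob/master/third_post_test.go.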