導讀
??????Channel
是Golang實現(xiàn)并發(fā)編程非常重要的組成部分,Channel
是一種內(nèi)建的核心數(shù)據(jù)類型卿操,需要使用make函數(shù)初始化警检,包括無緩沖的Channel(unbuffered Channel) 和有緩沖的Channel(buffered Channel)兩種孙援。無緩沖的Channel(unbuffered Channel) 主要用于goroutine之間的同步,有緩沖的Channel(buffered Channel)主要用于異步通信扇雕、控制goroutine并發(fā)數(shù)量拓售。
Unbuffered := make(chan int) // Unbuffered channel of integer type
buffered := make(chan int, 10) // Buffered channel of integer type
場景
??????在我們的日常開發(fā)工作中,時常有需要控制并發(fā)的場景镶奉,如控制訪問一個接口的并發(fā)础淤,運維系統(tǒng)初始化機器服務(wù)的數(shù)量等。下面介紹一下如何使用有緩沖的Channel(buffered Channel)實現(xiàn)控制并發(fā)數(shù)量腮鞍。
package main
import (
"flag"
"fmt"
"time"
)
// Fake a long and difficult work.
func DoWork() {
time.Sleep(5000 * time.Millisecond)
}
func main() {
maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 5, "the number of goroutines that are allowed to run concurrently")
nbJobs := flag.Int("nbJobs", 100, "the number of jobs that we need to do")
flag.Parse()
// Dummy channel to coordinate the number of concurrent goroutines.
// This channel should be buffered otherwise we will be immediately blocked
// when trying to fill it.
concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
// The done channel indicates when a single goroutine has
// finished its job.
done := make(chan bool)
// The waitForAllJobs channel allows the main program
// to wait until we have indeed done all the jobs.
waitForAllJobs := make(chan bool)
// Collect all the jobs, and since the job is finished, we can
// release another spot for a goroutine.
go func() {
for i := 0; i < *nbJobs; i++ {
<-done
}
// We have collected all the jobs, the program
// can now terminate
waitForAllJobs <- true
}()
// Try to start nbJobs jobs
for i := 1; i <= *nbJobs; i++ {
fmt.Printf("ID: %v: waiting to launch!\n", i)
// Try to receive from the concurrentGoroutines channel. When we have something,
// it means we can start a new goroutine because another one finished.
// Otherwise, it will block the execution until an execution
// spot is available.
concurrentGoroutines <- struct{}{}
fmt.Printf("ID: %v: it's my turn!\n", i)
go func(id int) {
DoWork()
fmt.Printf("ID: %v: all done!\n", id)
done <- true
<-concurrentGoroutines
}(i)
}
// Wait for all jobs to finish
<-waitForAllJobs
}
這里代碼源自Github值骇,我對其做了一些改動莹菱,讓執(zhí)行流程更容易理解移国、輸出結(jié)果更加明顯,下面解釋一下這段程序:
- 使用
flag
包創(chuàng)建命令行參數(shù)道伟,可以自己指定并發(fā)的數(shù)量maxNbConcurrentGoroutines
和總共任務(wù)數(shù)量nbJobs
maxNbConcurrentGoroutines := flag.Int("maxNbConcurrentGoroutines", 5, "the number of goroutines that are allowed to run concurrently")
nbJobs := flag.Int("nbJobs", 100, "the number of jobs that we need to do")
flag.Parse()
運行一個并發(fā)為10迹缀,總數(shù)為100的示例:
go run limit_concurrency.go -maxNbConcurrentGoroutines 10 -nbJobs 100
- 創(chuàng)建
concurrentGoroutines
作為控制并發(fā)的帶緩沖的Channel,done
用于記錄一個goroutine完成,waitForAllJobs
用于阻塞main函數(shù)蜜徽,等待所有g(shù)oroutine完成祝懂。
concurrentGoroutines := make(chan struct{}, *maxNbConcurrentGoroutines)
done := make(chan bool)
waitForAllJobs := make(chan bool)
- 創(chuàng)建一個收集所有g(shù)oroutine運行狀態(tài)的goroutine,當所有g(shù)oroutine完成后向
waitForAllJobs
發(fā)送“完成信號”拘鞋。
go func() {
for i := 0; i < *nbJobs; i++ {
<-done
}
waitForAllJobs <- true
}()
- 這里我們叫它任務(wù)創(chuàng)建著砚蓬,或者叫生成者也可以,首先會向
concurrentGoroutines
寫入空的struct盆色,因為concurrentGoroutines
的buffer是10灰蛙,所以這里不會阻塞,直到for循環(huán)執(zhí)行10次隔躲,將buffer填滿摩梧,同時的也創(chuàng)建了10個goroutine用于執(zhí)行我們的任務(wù),當任務(wù)執(zhí)行完畢(這里都暫定執(zhí)行5s)宣旱,向done
發(fā)送true通知第三步的goroutine我執(zhí)行完了仅父,接著讀取concurrentGoroutines
,“釋放”一個空間浑吟,讓其他goroutine可以進來繼續(xù)執(zhí)行笙纤,但是怎么都不會超出buffer
的個數(shù),等所有任務(wù)執(zhí)行完组力,waitForAllJobs
收到了第三步gorotine發(fā)送的信號省容,整個程序結(jié)束,這就實現(xiàn)了控制并發(fā)忿项。
for i := 1; i <= *nbJobs; i++ {
fmt.Printf("ID: %v: waiting to launch!\n", i)
concurrentGoroutines <- struct{}{}
fmt.Printf("ID: %v: it's my turn!\n", i)
go func(id int) {
DoWork()
fmt.Printf("ID: %v: all done!\n", id)
done <- true
<-concurrentGoroutines
}(i)
}
<-waitForAllJobs
好了蓉冈,小伙伴們城舞,快運行下看看結(jié)果吧,Let's ready to Go :)