簡介
處理大量并發(fā)是 Go 語言的一大優(yōu)勢毡庆。語言內(nèi)置了方便的并發(fā)語法,可以非常方便的創(chuàng)建很多個輕量級的 goroutine 并發(fā)處理任務(wù)烙如。相比于創(chuàng)建多個線程么抗,goroutine 更輕量、資源占用更少亚铁、切換速度更快蝇刀、無線程上下文切換開銷更少。但是受限于資源總量徘溢,系統(tǒng)中能夠創(chuàng)建的 goroutine 數(shù)量也是受限的吞琐。默認每個 goroutine 占用 8KB 內(nèi)存捆探,一臺 8GB 內(nèi)存的機器滿打滿算也只能創(chuàng)建 8GB/8KB = 1000000 個 goroutine,更何況系統(tǒng)還需要保留一部分內(nèi)存運行日常管理任務(wù)站粟,go 運行時需要內(nèi)存運行 gc黍图、處理 goroutine 切換等。使用的內(nèi)存超過機器內(nèi)存容量卒蘸,系統(tǒng)會使用交換區(qū)(swap)雌隅,導致性能急速下降。我們可以簡單驗證一下創(chuàng)建過多 goroutine 會發(fā)生什么:
func main() {
var wg sync.WaitGroup
wg.Add(10000000)
for i := 0; i < 10000000; i++ {
go func() {
time.Sleep(1 * time.Minute)
}()
}
wg.Wait()
}
在我的機器上(8G內(nèi)存)運行上面的程序會報errno 1455
缸沃,即Out of Memory
錯誤恰起,這很好理解。謹慎運行趾牧。
另一方面检盼,goroutine 的管理也是一個問題。goroutine 只能自己運行結(jié)束翘单,外部沒有任何手段可以強制j結(jié)束一個 goroutine吨枉。如果一個 goroutine 因為某種原因沒有自行結(jié)束,就會出現(xiàn) goroutine 泄露哄芜。此外貌亭,頻繁創(chuàng)建 goroutine 也是一個開銷。
鑒于上述原因认臊,自然出現(xiàn)了與線程池一樣的需求圃庭,即 goroutine 池。一般的 goroutine 池自動管理 goroutine 的生命周期失晴,可以按需創(chuàng)建剧腻,動態(tài)縮容。向 goroutine 池提交一個任務(wù)涂屁,goroutine 池會自動安排某個 goroutine 來處理书在。
ants
就是其中一個實現(xiàn) goroutine 池的庫。
快速使用
本文代碼使用 Go Modules拆又。
創(chuàng)建目錄并初始化:
$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants
安裝ants
庫儒旬,使用v2
版本:
$ go get -u github.com/panjf2000/ants/v2
我們接下來要實現(xiàn)一個計算大量整數(shù)和的程序。首先創(chuàng)建基礎(chǔ)的任務(wù)結(jié)構(gòu)帖族,并實現(xiàn)其執(zhí)行任務(wù)方法:
type Task struct {
index int
nums []int
sum int
wg *sync.WaitGroup
}
func (t *Task) Do() {
for _, num := range t.nums {
t.sum += num
}
t.wg.Done()
}
很簡單义矛,就是將一個切片中的所有整數(shù)相加。
然后我們創(chuàng)建 goroutine 池盟萨,注意池使用完后需要手動關(guān)閉,這里使用defer
關(guān)閉:
p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()
func taskFunc(data interface{}) {
task := data.(*Task)
task.Do()
fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}
上面調(diào)用了ants.NewPoolWithFunc()
創(chuàng)建了一個 goroutine 池了讨。第一個參數(shù)是池容量捻激,即池中最多有 10 個 goroutine制轰。第二個參數(shù)為每次執(zhí)行任務(wù)的函數(shù)。當我們調(diào)用p.Invoke(data)
的時候胞谭,ants
池會在其管理的 goroutine 中找出一個空閑的垃杖,讓它執(zhí)行函數(shù)taskFunc
,并將data
作為參數(shù)丈屹。
接著调俘,我們模擬數(shù)據(jù),做數(shù)據(jù)切分旺垒,生成任務(wù)彩库,交給 ants
處理:
const (
DataSize = 10000
DataPerTask = 100
)
nums := make([]int, DataSize, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
task := &Task{
index: i + 1,
nums: nums[i*DataPerTask : (i+1)*DataPerTask],
wg: &wg,
}
tasks = append(tasks, task)
p.Invoke(task)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
隨機生成 10000 個整數(shù),將這些整數(shù)分為 100 份先蒋,每份 100 個骇钦,生成Task
結(jié)構(gòu),調(diào)用p.Invoke(task)
處理竞漾。wg.Wait()
等待處理完成眯搭,然后輸出ants
正在運行的 goroutine 數(shù)量,這時應該是 0业岁。
最后我們將結(jié)果匯總鳞仙,并驗證一下結(jié)果,與直接相加得到的結(jié)果做一個比較:
var sum int
for _, task := range tasks {
sum += task.sum
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
運行:
$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172
確實笔时,任務(wù)完成之后棍好,正在運行的 goroutine 數(shù)量變?yōu)?0。而且我們驗證了糊闽,結(jié)果沒有偏差梳玫。另外需要注意,goroutine 池中任務(wù)的執(zhí)行順序是隨機的右犹,與提交任務(wù)的先后沒有關(guān)系提澎。由上面運行打印的任務(wù)標識我們也能發(fā)現(xiàn)這一點。
函數(shù)作為任務(wù)
ants
支持將一個不接受任何參數(shù)的函數(shù)作為任務(wù)提交給 goroutine 運行念链。由于不接受參數(shù)盼忌,我們提交的函數(shù)要么不需要外部數(shù)據(jù),只需要處理自身邏輯掂墓,否則就必須用某種方式將需要的數(shù)據(jù)傳遞進去谦纱,例如閉包。
提交函數(shù)作為任務(wù)的 goroutine 池使用ants.NewPool()
創(chuàng)建君编,它只接受一個參數(shù)表示池子的容量跨嘉。調(diào)用池子對象的Submit()
方法來提交任務(wù),將一個不接受任何參數(shù)的函數(shù)傳入吃嘿。
最開始的例子可以改寫一下祠乃。增加一個任務(wù)包裝函數(shù)梦重,將任務(wù)需要的參數(shù)作為包裝函數(shù)的參數(shù)。包裝函數(shù)返回實際的任務(wù)函數(shù)亮瓷,該任務(wù)函數(shù)就可以通過閉包訪問它需要的數(shù)據(jù)了:
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {
return func() {
for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {
*sum += num
}
fmt.Printf("task:%d sum:%d\n", i+1, *sum)
wg.Done()
}
}
調(diào)用ants.NewPool(10)
創(chuàng)建 goroutine 池琴拧,同樣池子用完需要釋放,這里使用defer
:
p, _ := ants.NewPool(10)
defer p.Release()
生成模擬數(shù)據(jù)嘱支,切分任務(wù)蚓胸。提交任務(wù)給ants
池執(zhí)行,這里使用taskFuncWrapper()
包裝函數(shù)生成具體的任務(wù)除师,然后調(diào)用p.Submit()
提交:
nums := make([]int, DataSize, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()
匯總結(jié)果沛膳,驗證:
var sum int
for _, partSum := range partSums {
sum += partSum
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
這個程序的功能與最開始的完全相同。
執(zhí)行流程
GitHub 倉庫中有個執(zhí)行流程圖馍盟,我重新繪制了一下:
[圖片上傳失敗...(image-3b5c14-1623109126905)]
執(zhí)行流程如下:
- 初始化 goroutine 池于置;
- 提交任務(wù)給 goroutine 池,檢查是否有空閑的 goroutine:
- 有贞岭,獲取空閑 goroutine
- 無八毯,檢查池中的 goroutine 數(shù)量是否已到池容量上限:
- 已到上限,檢查 goroutine 池是否是非阻塞的:
- 非阻塞瞄桨,直接返回
nil
表示執(zhí)行失敗 - 阻塞话速,等待 goroutine 空閑
- 非阻塞瞄桨,直接返回
- 未到上限,創(chuàng)建一個新的 goroutine 處理任務(wù)
- 已到上限,檢查 goroutine 池是否是非阻塞的:
- 任務(wù)處理完成芯侥,將 goroutine 交還給池泊交,以待處理下一個任務(wù)
選項
ants
提供了一些選項可以定制 goroutine 池的行為。選項使用Options
結(jié)構(gòu)定義:
// src/github.com/panjf2000/ants/options.go
type Options struct {
ExpiryDuration time.Duration
PreAlloc bool
MaxBlockingTasks int
Nonblocking bool
PanicHandler func(interface{})
Logger Logger
}
各個選項含義如下:
-
ExpiryDuration
:過期時間柱查。表示 goroutine 空閑多長時間之后會被ants
池回收 -
PreAlloc
:預分配廓俭。調(diào)用NewPool()/NewPoolWithFunc()
之后預分配worker
(管理一個工作 goroutine 的結(jié)構(gòu)體)切片。而且使用預分配與否會直接影響池中管理worker
的結(jié)構(gòu)唉工。見下面源碼 -
MaxBlockingTasks
:最大阻塞任務(wù)數(shù)量研乒。即池中 goroutine 數(shù)量已到池容量,且所有 goroutine 都處理繁忙狀態(tài)淋硝,這時到來的任務(wù)會在阻塞列表等待雹熬。這個選項設(shè)置的是列表的最大長度。阻塞的任務(wù)數(shù)量達到這個值后谣膳,后續(xù)任務(wù)提交直接返回失敗 -
Nonblocking
:池是否阻塞竿报,默認阻塞。提交任務(wù)時继谚,如果ants
池中 goroutine 已到上限且全部繁忙烈菌,阻塞的池會將任務(wù)添加的阻塞列表等待(當然受限于阻塞列表長度,見上一個選項)。非阻塞的池直接返回失敗 -
PanicHandler
:panic 處理僧界。遇到 panic 會調(diào)用這里設(shè)置的處理函數(shù) -
Logger
:指定日志記錄器
NewPool()
部分源碼:
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
使用預分配時侨嘀,創(chuàng)建loopQueueType
類型的結(jié)構(gòu),反之創(chuàng)建stackType
類型捂襟。這是ants
定義的兩種管理worker
的數(shù)據(jù)結(jié)構(gòu)。
ants
定義了一些With*
函數(shù)來設(shè)置這些選項:
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
這里使用了 Go 語言中非常常見的一種模式欢峰,我稱之為選項模式葬荷,非常方便地構(gòu)造有大量參數(shù),且大部分有默認值或一般不需要顯式設(shè)置的對象纽帖。
我們來驗證幾個選項宠漩。
最大等待隊列長度
ants
池設(shè)置容量之后,如果所有的 goroutine 都在處理任務(wù)懊直。這時提交的任務(wù)默認會進入等待隊列扒吁,WithMaxBlockingTasks(maxBlockingTasks int)
可以設(shè)置等待隊列的最大長度。超過這個長度室囊,提交任務(wù)直接返回錯誤:
func wrapper(i int, wg *sync.WaitGroup) func() {
return func() {
fmt.Printf("hello from task:%d\n", i)
time.Sleep(1 * time.Second)
wg.Done()
}
}
func main() {
p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
defer p.Release()
var wg sync.WaitGroup
wg.Add(8)
for i := 1; i <= 8; i++ {
go func(i int) {
err := p.Submit(wrapper(i, &wg))
if err != nil {
fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()
}
}(i)
}
wg.Wait()
}
上面代碼中雕崩,我們設(shè)置 goroutine 池的容量為 4,最大阻塞隊列長度為 2融撞。然后一個 for 提交 8 個任務(wù)盼铁,期望結(jié)果是:4 個任務(wù)在執(zhí)行,2 個任務(wù)在等待尝偎,2 個任務(wù)提交失敗饶火。運行結(jié)果:
hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2
我們看到提交任務(wù)失敗,打印too many goroutines blocked ...
致扯。
代碼中有 4 點需要注意:
- 提交任務(wù)必須并行進行肤寝。如果是串行提交,第 5 個任務(wù)提交時由于池中沒有空閑的 goroutine 處理該任務(wù)抖僵,
Submit()
方法會被阻塞鲤看,后續(xù)任務(wù)就都不能提交了。也就達不到驗證的目的了 - 由于任務(wù)可能提交失敗裆针,失敗的任務(wù)不會實際執(zhí)行刨摩,所以實際上
wg.Done()
次數(shù)會小于 8。因而在err != nil
分支中我們需要調(diào)用一次wg.Done()
世吨。否則wg.Wait()
會永遠阻塞 - 為了避免任務(wù)執(zhí)行過快澡刹,空出了 goroutine,觀察不到現(xiàn)象耘婚,每個任務(wù)中我使用
time.Sleep(1 * time.Second)
休眠 1s - 由于 goroutine 之間的執(zhí)行順序未顯式同步罢浇,故每次執(zhí)行的順序不確定
由于簡單起見,前面的例子中Submit()
方法的返回值都被我們忽略了。實際開發(fā)中一定不要忽略嚷闭。
非阻塞
ants
池默認是阻塞的攒岛,我們可以使用WithNonblocking(nonblocking bool)
設(shè)置其為非阻塞。非阻塞的ants
池中胞锰,在所有 goroutine 都在處理任務(wù)時灾锯,提交新任務(wù)會直接返回錯誤:
func main() {
p, _ := ants.NewPool(2, ants.WithNonblocking(true))
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 3; i++ {
err := p.Submit(wrapper(i, &wg))
if err != nil {
fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()
}
}
wg.Wait()
}
使用上個例子中的wrapper()
函數(shù),ants
池容量設(shè)置為 2嗅榕。連續(xù)提交 3 個任務(wù)顺饮,期望結(jié)果前兩個任務(wù)正常執(zhí)行,第 3 個任務(wù)提交時返回錯誤:
hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
panic 處理器
一個魯棒性強的庫一定不會忽視錯誤的處理凌那,特別是宕機相關(guān)的錯誤兼雄。在 Go 語言中就是 panic,也被稱為運行時恐慌帽蝶,在程序運行的過程中產(chǎn)生的嚴重性錯誤赦肋,例如索引越界,空指針解引用等励稳,都會觸發(fā) panic佃乘。如果不處理 panic,程序會直接意外退出麦锯,可能造成數(shù)據(jù)丟失的嚴重后果恕稠。
ants
中如果 goroutine 在執(zhí)行任務(wù)時發(fā)生panic
,會終止當前任務(wù)的執(zhí)行扶欣,將發(fā)生錯誤的堆棧輸出到os.Stderr
鹅巍。注意,該 goroutine 還是會被放回池中料祠,下次可以取出執(zhí)行新的任務(wù)骆捧。
func wrapper(i int, wg *sync.WaitGroup) func() {
return func() {
fmt.Printf("hello from task:%d\n", i)
if i%2 == 0 {
panic(fmt.Sprintf("panic from task:%d", i))
}
wg.Done()
}
}
func main() {
p, _ := ants.NewPool(2)
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 2; i++ {
p.Submit(wrapper(i, &wg))
}
time.Sleep(1 * time.Second)
p.Submit(wrapper(3, &wg))
p.Submit(wrapper(5, &wg))
wg.Wait()
}
我們讓偶數(shù)個任務(wù)觸發(fā)panic
。提交兩個任務(wù)髓绽,第二個任務(wù)一定會觸發(fā)panic
敛苇。觸發(fā)panic
之后,我們還可以繼續(xù)提交任務(wù) 3顺呕、5枫攀。注意這里沒有 4,提交任務(wù) 4 還是會觸發(fā)panic
株茶。
上面的程序需要注意 2 點:
-
任務(wù)函數(shù)中
wg.Done()
是在panic
方法之后来涨,如果觸發(fā)了panic
,函數(shù)中的其他正常邏輯就不會再繼續(xù)執(zhí)行了启盛。所以我們雖然wg.Add(3)
蹦掐,但是一共提交了 4 個任務(wù)技羔,其中一個任務(wù)觸發(fā)了panic
,wg.Done()
沒有正確執(zhí)行卧抗。實際開發(fā)中藤滥,我們一般使用defer
語句來確保wg.Done()
一定會執(zhí)行 -
在 for 循環(huán)之后,我添加了一行代碼
time.Sleep(1 * time.Second)
社裆。如果沒有這一行拙绊,后續(xù)的兩條Submit()
方法可以直接執(zhí)行,可能會導致任務(wù)很快就完成了浦马,wg.Wait()
直接返回了时呀,這時panic
的堆棧還沒有輸出。你可以嘗試注釋掉這行代碼運行看看結(jié)果
除了ants
提供的默認 panic 處理器晶默,我們還可以使用WithPanicHandler(paincHandler func(interface{}))
指定我們自己編寫的 panic 處理器。處理器的參數(shù)就是傳給panic
的值:
func panicHandler(err interface{}) {
fmt.Fprintln(os.Stderr, err)
}
p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()
其余代碼與上面的完全相同航攒,指定了panicHandler
后觸發(fā)panic
就會執(zhí)行它磺陡。運行:
hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3
看到輸出了傳給panic
函數(shù)的字符串(第二行輸出)。
默認池
為了方便使用漠畜,很多 Go 庫都喜歡提供其核心功能類型的一個默認實現(xiàn)币他。可以直接通過庫提供的接口調(diào)用憔狞。例如net/http
蝴悉,例如ants
。ants
庫中定義了一個默認的池瘾敢,默認容量為MaxInt32
拍冠。goroutine 池的各個方法都可以直接通過ants
包直接訪問:
// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
}
func Running() int {
return defaultAntsPool.Running()
}
func Cap() int {
return defaultAntsPool.Cap()
}
func Free() int {
return defaultAntsPool.Free()
}
func Release() {
defaultAntsPool.Release()
}
func Reboot() {
defaultAntsPool.Reboot()
}
直接使用:
func main() {
defer ants.Release()
var wg sync.WaitGroup
wg.Add(2)
for i := 1; i <= 2; i++ {
ants.Submit(wrapper(i, &wg))
}
wg.Wait()
}
默認池也需要Release()
。
總結(jié)
本文介紹了 goroutine 池的由來簇抵,并借由ants
庫介紹了基本的使用方法庆杜,和一些細節(jié)。ants
源碼不多碟摆,去掉測試的核心代碼只有 1k 行左右晃财,建議有時間、感興趣的童鞋深入閱讀典蜕。
大家如果發(fā)現(xiàn)好玩断盛、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue??
參考
- ants GitHub:github.com/panjf2000/ants
- Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
我
歡迎關(guān)注我的微信公眾號【GoUpUp】愉舔,共同學習钢猛,一起進步~