某徒弟每日工作就是把數(shù)據(jù)庫里上十萬條數(shù)據(jù)取出來進行一些操作(更新字段救赐、檢查鏈接狀態(tài)等),把 Go 當 PHP 寫,一個 for 循環(huán)怯屉,一兩個小時過去了才能出結(jié)果(可能他就是想這么摸魚吧)易阳。他說并發(fā)編程容易寫錯附较,需求又急:),幸好我之前寫過一點潦俺,整一個 demo 給他參考一下拒课。
WaitGroup 和 數(shù)據(jù)庫分頁
package main
import (
"fmt"
"sync"
"time"
)
func main() {
start := time.Now()
var wg sync.WaitGroup
// 總條數(shù)徐勃,一般從數(shù)據(jù)庫 COUNT 出來
count := 8823
// 每頁處理的條數(shù)
pageSize := 1000
// 總頁數(shù) 向上取整
page := (count + pageSize - 1) / pageSize
// 每頁開一個 goroutine
for i := 0; i < page; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// 計算當前頁的偏移量
offset := i * pageSize
// demo 拼接 Sql 然后 按ID順序 查出數(shù)據(jù)遍歷處理 記錄日志方便知道清洗的位置
fmt.Println("sql 里的 limit ", offset, ",", pageSize)
}(i)
}
wg.Wait()
end := time.Since(start)
fmt.Println("總共花了", end)
}
數(shù)據(jù)量更大,不能直接載入機器內(nèi)存
package main
import (
"fmt"
"sync"
"time"
)
func main() {
start := time.Now()
// 大量數(shù)據(jù)一次開多個 goroutine 全部取出數(shù)據(jù)放到機器內(nèi)存里早像,數(shù)據(jù)庫和內(nèi)存都可能會崩
// 采用同步加異步的方式處理
// 外部循環(huán)同步僻肖,處理完再開始下一輪 內(nèi)部循環(huán)并發(fā)執(zhí)行
// 總條數(shù)
count := 121231
// 外部循環(huán)每次處理的條數(shù),考慮機器內(nèi)存可以適當調(diào)整
pageOutSize := 10000
// 外部循環(huán)次數(shù)
pageOut := (count + pageOutSize - 1) / pageOutSize
for i := 0; i < pageOut; i++ {
// 內(nèi)循環(huán)
var wg sync.WaitGroup
// 內(nèi)循環(huán)每頁處理的條數(shù)
pageInnerSize := 1000
// 內(nèi)部循環(huán)需要處理的總條數(shù)
innerCount := pageOutSize
if i == pageOut-1 {
// 最后一頁了 只需要處理剩下的條數(shù)即可
innerCount = count - pageOutSize*(pageOut-1)
}
// 內(nèi)循環(huán)的總頁數(shù)卢鹦,每頁開啟一個 goroutine
pageInner := (innerCount + pageInnerSize - 1) / pageInnerSize
for j := 0; j < pageInner; j++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
// 計算偏移量臀脏,需要考慮外部循環(huán)的輪次
offset := i*pageOutSize + j*pageInnerSize
// demo 拼接 Sql 然后 按ID順序 查出數(shù)據(jù)遍歷處理 記錄日志方便知道清洗的位置
fmt.Println("sql 里的 limit ", offset, ",", pageInnerSize)
}(j)
}
wg.Wait()
}
end := time.Since(start)
fmt.Println("總共花了", end)
}
如果遇到錯誤需要終止執(zhí)行,可以考慮將 WaitGroup 換成 errgroup冀自,看具體需求揉稚。不足之處歡迎留言指正:)