singleflight是什么
singleflight是Go官方擴(kuò)展同步包(golang.org/x/sync/singleflight)的一個(gè)庫(kù)慈省,主要用于并發(fā)控制。針對(duì)同一個(gè)key的多個(gè)請(qǐng)求,只需要處理一個(gè)市殷,其余請(qǐng)求等待結(jié)果纫事,以此抑制對(duì)下游的重復(fù)請(qǐng)求。
為什么需要singleflight
對(duì)于讀請(qǐng)求量較大的后臺(tái)服務(wù)览露,為降低存儲(chǔ)層的壓力荧琼,一般會(huì)實(shí)現(xiàn)緩存層。服務(wù)器在收到請(qǐng)求后差牛,首先從緩存獲取數(shù)據(jù)命锄,若緩存未命中才會(huì)查詢存儲(chǔ)層。
若服務(wù)器在短時(shí)間內(nèi)收到大量未命中緩存層的重復(fù)請(qǐng)求偏化,這些請(qǐng)求會(huì)全部查詢存儲(chǔ)層脐恩,給存儲(chǔ)層帶來(lái)較大的壓力,甚至有高負(fù)載的可能侦讨。
singleflight會(huì)對(duì)請(qǐng)求進(jìn)行合并驶冒,相同key的請(qǐng)求只訪問(wèn)一次存儲(chǔ)層,大大減少了對(duì)存儲(chǔ)層的壓力韵卤。
如何使用singleflight
三個(gè)方法
singleflight對(duì)外提供了3個(gè)方法:
- Do:在對(duì)同一個(gè)key多次調(diào)用時(shí)骗污,若第一次的調(diào)用沒(méi)有完成,只會(huì)執(zhí)行一次fn()沈条,其他調(diào)用會(huì)阻塞并等待首次調(diào)用返回需忿。調(diào)用Do函數(shù)需要傳入2個(gè)參數(shù),key用于標(biāo)識(shí)請(qǐng)求蜡歹,重復(fù)請(qǐng)求的key是相同的屋厘;fn()為調(diào)用者需要實(shí)現(xiàn)的業(yè)務(wù)邏輯。返回值有3個(gè)月而,v和err為fn()的返回值擅这,shared表示返回結(jié)果是否是共享的。
- DoChan:作用和Do類似景鼠, 只不過(guò)返回channel仲翎,其中Result結(jié)構(gòu)體由Val、Err和Shared組成铛漓。和Do相比溯香,就是同步和異步的區(qū)別。
- Forget:通知Group刪除傳入的key浓恶,這樣后續(xù)調(diào)用此key時(shí)玫坛,請(qǐng)求不會(huì)阻塞。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
使用示例
下述demo模擬1000個(gè)請(qǐng)求同時(shí)獲取相同的數(shù)據(jù)包晰,即key的值相同湿镀。getData()抽象為訪問(wèn)存儲(chǔ)層的函數(shù)炕吸,函數(shù)內(nèi)部的Sleep用于模擬訪問(wèn)耗時(shí);count記錄調(diào)用getData()函數(shù)的次數(shù)勉痴。
當(dāng)沒(méi)有使用singleflight時(shí)赫模,輸出結(jié)果為total num is 1000,表示每個(gè)請(qǐng)求都調(diào)用了getData()函數(shù)蒸矛;當(dāng)使用singleflight時(shí)瀑罗,輸出結(jié)果為total num is 1,表示只有1個(gè)請(qǐng)求雏掠。
var count int32
func main() {
total := 1000
sg := &singleflight.Group{}
var wg sync.WaitGroup
wg.Add(total)
key := "key"
for i := 0; i < total; i++ {
go func() {
defer wg.Done()
sg.Do(key, func() (interface{}, error) {
res, err := getData(key)
return res, err
})
// getData(key)
}()
}
wg.Wait()
fmt.Printf("total num is %v\n", count)
}
func getData(key string) (interface{}, error) {
atomic.AddInt32(&count, 1)
time.Sleep(10 * time.Millisecond)
return "result", nil
}
源碼分析
本文基于https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight進(jìn)行分析斩祭。
Group結(jié)構(gòu)體
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Group結(jié)構(gòu)體由互斥鎖和map組成,互斥鎖用于保證map的并發(fā)安全乡话;map的key為調(diào)用Do方法傳入的key摧玫,call保存了當(dāng)前調(diào)用對(duì)應(yīng)的信息。
call結(jié)構(gòu)體
type call struct {
wg sync.WaitGroup
val interface{}
err error
forgotten bool
dups int
chans []chan<- Result
}
val和err是調(diào)用fn()函數(shù)的返回值绑青;forgetten用于表示Forget()函數(shù)是否被調(diào)用诬像;dups用于統(tǒng)計(jì)調(diào)用次數(shù);chans是調(diào)用DoChan()函數(shù)時(shí)返回的channel时迫。
Do函數(shù)
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
// map懶加載
if g.m == nil {
g.m = make(map[string]*call)
}
// 若key已經(jīng)存在颅停,則阻塞并等待wg執(zhí)行完畢。當(dāng)wg執(zhí)行完畢時(shí)掠拳,所有的wait都會(huì)被喚醒癞揉。
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
// 首次調(diào)用Do會(huì)Add 1,后續(xù)調(diào)用都會(huì)阻塞于wg.Wait()
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 執(zhí)行業(yè)務(wù)邏輯
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
Do函數(shù)實(shí)現(xiàn)的主要邏輯為溺欧,若傳入的key已經(jīng)在map中喊熟,則阻塞于wg.Wait();若不在map中姐刁,則調(diào)用wg.Add(1)并執(zhí)行業(yè)務(wù)邏輯芥牌。也就是說(shuō),同一個(gè)key的多個(gè)請(qǐng)求聂使,只有首個(gè)請(qǐng)求會(huì)調(diào)用wg.Add(1)壁拉,其余請(qǐng)求都會(huì)調(diào)用wg.Wait()并阻塞于此處。
對(duì)于阻塞在wg.Wait()的請(qǐng)求柏靶,在返回結(jié)果前弃理,還區(qū)分了panic錯(cuò)誤和runtime.Goexit()錯(cuò)誤,這部分邏輯是在這個(gè)版本補(bǔ)充的屎蜓,v0.0.0-20190423版本還沒(méi)有這個(gè)邏輯痘昌。后續(xù)會(huì)介紹為什么需要這個(gè)邏輯。
doCall函數(shù)
doCall()函數(shù)的實(shí)現(xiàn)看上去較為復(fù)雜,而且大部分邏輯是在處理異常辆苔。為了更好地理解為什么需要處理這些異常算灸,先介紹一下v0.0.0-20190423版本的doCall函數(shù)的實(shí)現(xiàn)。
v0.0.0-20190423版本的實(shí)現(xiàn)比較簡(jiǎn)單驻啤,執(zhí)行fn()函數(shù)并調(diào)用wg.Done()菲驴,最后從map中刪除key。
// v0.0.0-20190423版本的doCall
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
if !c.forgotten {
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
此版本存在如下問(wèn)題:若fn()函數(shù)內(nèi)部出現(xiàn)panic街佑,則當(dāng)前goroutine會(huì)立即停止執(zhí)行谢翎,c.wg.Done()無(wú)法被調(diào)用捍靠,且key一直在map中無(wú)法被刪除沐旨,導(dǎo)致相同key的其他請(qǐng)求全部阻塞于c.wg.Wait()。若業(yè)務(wù)邏輯在fn()外部有調(diào)用recover()榨婆,雖然程序不會(huì)直接panic磁携,但最終可能會(huì)因?yàn)樗梨i而發(fā)生錯(cuò)誤。
例如良风,啟動(dòng)2個(gè)協(xié)程調(diào)用group.Do函數(shù)谊迄,請(qǐng)求的key均為"same key"。fn()內(nèi)部會(huì)panic烟央,外部有recover()统诺,因此該panic可以被recover()捕獲。
func main() {
var wg sync.WaitGroup
// singleflight的版本為v0.0.0-20190423024810-112230192c58
group := &singleflight.Group{}
wg.Add(2)
go func() {
DoIt(&wg, group, 1)
}()
go func() {
DoIt(&wg, group, 2)
}()
wg.Wait()
}
func DoIt(wg *sync.WaitGroup, group *singleflight.Group, count int32) {
fmt.Printf("enter DoIt, count is %d\n", count)
defer wg.Done()
defer func() {
if rec := recover(); rec != nil {
//Recoverd panic
fmt.Printf("rec is %d,%v\n", count, rec)
}
}()
key := "same key"
value, err, shared := group.Do(key, func() (_ interface{}, err error) {
fmt.Printf("enter group.Do, count is %d\n", count)
time.Sleep(1000 * time.Millisecond)
panic("panic in singleflight")
})
fmt.Printf("count: %v, value: %v, err: %v, shared: %v\n", count, value, err, shared)
}
執(zhí)行后疑俭,得到如下結(jié)果:
enter DoIt, count is 2
enter group.Do, count is 2
enter DoIt, count is 1
rec is 2,panic in singleflight
fatal error: all goroutines are asleep - deadlock!
從輸出結(jié)果可以看出粮呢,DoIt函數(shù)被調(diào)用了2次,count為2的請(qǐng)求進(jìn)入了group.Do函數(shù)钞艇,隨后發(fā)生了panic啄寡,并被recover住。count為1的請(qǐng)求阻塞于c.wg.Wait()函數(shù)哩照,主協(xié)程也阻塞于自身的wg.Wait()函數(shù)挺物,隨后進(jìn)程因?yàn)榘l(fā)生死鎖而退出。
新版本修復(fù)了此問(wèn)題飘弧,將c.wg.Done()放入defer中執(zhí)行识藤,這樣即使fn()中出現(xiàn)panic,c.wg.Done()也會(huì)被調(diào)用次伶。此外痴昧,新版本還區(qū)分了panic錯(cuò)誤和runtime.Goexit(),主要邏輯如下所示:
// 新版本的doCall
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
c.wg.Done()
// 根據(jù)err類型執(zhí)行對(duì)應(yīng)的邏輯
// ……
}()
func() {
defer func() {
if !normalReturn {
// 若出現(xiàn)panic学少,則返回值不為nil
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
// 若被賦值剪个,說(shuō)明fn()內(nèi)部出現(xiàn)了panic,且panic被捕獲
recovered = true
}
}
新引入的2個(gè)變量normalReturn和recovered用于判斷fn()內(nèi)部是出現(xiàn)了panic還是調(diào)用了runtime.Goexit()。
若fn()內(nèi)部出現(xiàn)panic扣囊,當(dāng)前goroutine會(huì)停止運(yùn)行乎折,并執(zhí)行defer語(yǔ)句,且recover()的返回值不為nil侵歇。由于recover()捕獲了panic錯(cuò)誤骂澄,邏輯會(huì)繼續(xù)向下執(zhí)行,那么recovered會(huì)被賦值為true惕虑。因此坟冲,當(dāng)fn()內(nèi)部出現(xiàn)panic時(shí),normalReturn為false溃蔫,recovered為true健提。
若fn()內(nèi)部調(diào)用runtime.Goexit(),當(dāng)前goroutine會(huì)停止運(yùn)行伟叛,并執(zhí)行defer()語(yǔ)句私痹,且recover()的返回值為nil,并且不會(huì)繼續(xù)執(zhí)行后續(xù)邏輯统刮,因此recovered為false紊遵。綜上,fn()內(nèi)部調(diào)用runtime.Goexit()時(shí)侥蒙,normalReturn為false暗膜,recovered為false。
區(qū)分這兩類場(chǎng)景是為了讓調(diào)用者感知調(diào)用結(jié)果鞭衩。對(duì)于panic錯(cuò)誤学搜,因?yàn)間roup內(nèi)部捕獲了panic,所以需要重新拋出panic醋旦,這樣業(yè)務(wù)側(cè)才能知道fn()內(nèi)部出現(xiàn)了異常恒水;對(duì)于runtime.Goexit(),這是業(yè)務(wù)側(cè)主動(dòng)執(zhí)行的結(jié)果饲齐,因此不需要額外處理钉凌。
Do函數(shù)為什么需要處理異常
之前提到新版本的Do()函數(shù)在調(diào)用c.wg.Wait()后和return之前,補(bǔ)充了對(duì)錯(cuò)誤類型的判斷捂人。這是因?yàn)橄嗤琸ey的請(qǐng)求需要有相同的處理結(jié)果御雕。若第一個(gè)請(qǐng)求出現(xiàn)了panic,則后續(xù)請(qǐng)求也應(yīng)當(dāng)panic滥搭;若第一個(gè)請(qǐng)求內(nèi)部調(diào)用了runtime.Goexit()酸纲,則后續(xù)請(qǐng)求也需要調(diào)用runtime.Goexit()。