[toc]
- Posted by 微博@Yangsc_o
- 原創(chuàng)文章菱魔,版權(quán)聲明:自由轉(zhuǎn)載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0
前言
最近從java轉(zhuǎn)到go,來(lái)公司第一個(gè)開(kāi)發(fā)工作就是對(duì)一個(gè)資源請(qǐng)求去重復(fù)桩卵,最終發(fā)現(xiàn)這個(gè)singleflight這個(gè)好東西,分享一下产禾。
singleflight使用場(chǎng)景
- 緩存擊穿:緩存在某個(gè)時(shí)間點(diǎn)過(guò)期的時(shí)候衫生,恰好在這個(gè)時(shí)間點(diǎn)對(duì)這個(gè)Key有大量的并發(fā)請(qǐng)求過(guò)來(lái),這些請(qǐng)求發(fā)現(xiàn)緩存過(guò)期一般都會(huì)從后端DB加載數(shù)據(jù)并回設(shè)到緩存斯撮,這個(gè)時(shí)候大并發(fā)的請(qǐng)求可能會(huì)瞬間把后端DB壓垮。
- 絕大多數(shù)公司都是這么用的
- 請(qǐng)求資源去重復(fù)
- 我們的用法宗挥,需要改動(dòng)一行代碼透敌。
singleflight 簡(jiǎn)介
singleflight
在 golang.org/x/sync/singleflight
項(xiàng)目下话瞧,對(duì)外提供了以下幾個(gè)方法
//Do方法,傳入key,以及回調(diào)函數(shù)桨仿,如果key相同拉庵,fn方法只會(huì)執(zhí)行一次,同步等待
//返回值v:表示fn執(zhí)行結(jié)果
//返回值err:表示fn的返回的err
//返回值shared:表示是否是真實(shí)fn返回的還是從保存的map[key]返回的,也就是共享的
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
//DoChan方法類(lèi)似Do方法警没,只是返回的是一個(gè)chan
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
//設(shè)計(jì)Forget 控制key關(guān)聯(lián)的值是否失效,默認(rèn)以上兩個(gè)方法只要fn方法執(zhí)行完成后大州,內(nèi)部維護(hù)的fn的值也刪除(即并發(fā)結(jié)束后就失效了)
func (g *Group) Forget(key string)
singleflight的使用
從singleflight的test探尋最簡(jiǎn)單用法
func TestDo(t *testing.T) {
var g Group
// key 可以理解資源的id
v, err, _ := g.Do("key", func() (interface{}, error) {
// do what you want
return "bar", nil
})
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
t.Errorf("Do = %v; want %v", got, want)
}
if err != nil {
t.Errorf("Do error = %v", err)
}
}
驗(yàn)證并發(fā)重復(fù)請(qǐng)求
func process(g *Group, t *testing.T, ch chan int, key string) {
for count := 0; count < 10; count++ {
v, err, shared := g.Do(key, func() (interface{}, error) {
time.Sleep(1000 * time.Millisecond)
return "bar", nil
})
t.Log("v = ", v, " err = ", err, " shared =", shared, " ch :", ch, "g ", len(g.m))
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
t.Errorf("Do = %v; want %v", got, want)
}
if err != nil {
t.Errorf("Do error = %v", err)
}
}
ch <- 1
}
func TestDo1(t *testing.T) {
var g Group
channels := make([]chan int, 10)
key := "key"
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go process(&g, t, channels[i], key)
}
for i, ch := range channels {
<-ch
fmt.Println("routine ", i, "quit!")
}
}
- 結(jié)果
singleflight的原理
call
call 用來(lái)表示一個(gè)正在執(zhí)行或已完成的函數(shù)調(diào)用。
// call is an in-flight or completed singleflight.Do call
type call struct {
wg sync.WaitGroup
// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
//val和err用來(lái)記錄fn發(fā)放執(zhí)行的返回值
val interface{}
err error
// forgotten indicates whether Forget was called with this call's key
// while the call was still in flight.
// 用來(lái)標(biāo)識(shí)fn方法執(zhí)行完成之后結(jié)果是否立馬刪除還是保留在singleflight中
forgotten bool
// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
//dups 用來(lái)記錄fn方法執(zhí)行的次數(shù)
dups int
//用來(lái)記錄DoChan中調(diào)用次數(shù)以及需要返回的數(shù)據(jù)
chans []chan<- Result
}
Group
Group 可以看做是任務(wù)的分類(lèi)并徘。
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
Do 函數(shù)
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
// 設(shè)置forgotten = true, doCall時(shí) 不再調(diào)用delete(g.m, key)
// c.forgotten = true
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
if !c.forgotten {
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
在Do方法中是通過(guò)waitgroup來(lái)控制的初斑,主要流程如下:
- 在Group中設(shè)置了一個(gè)map辛润,如果key不存在,則實(shí)例化call(用來(lái)保存值信息)见秤,并將key=>call的對(duì)應(yīng)關(guān)系存入map中通過(guò)mutex保證了并發(fā)安全
- 如果已經(jīng)在調(diào)用中則key已經(jīng)存在map砂竖,則wg.Wait
- 在fn執(zhí)行結(jié)束之后(在doCall方法中執(zhí)行)執(zhí)行wg.Done
- 卡在第2步的方法得到執(zhí)行,返回結(jié)果
其他的DoChan方法也是類(lèi)似的邏輯鹃答,只是返回的是一個(gè)chan乎澄。