import "sync"
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
//同一個(gè)對象多次同時(shí)多次調(diào)用這個(gè)邏輯的時(shí)候,可以使用其中的一個(gè)去執(zhí)行
func (g *Group) Do(key string, fn func()(interface{},error)) (interface{}, error ){
g.mu.Lock() //加鎖保護(hù)存放key的map,因?yàn)橐l(fā)執(zhí)行
if g.m == nil { //lazing make 方式建立
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { //如果map中已經(jīng)存在對這個(gè)key的處理那就等著吧
g.mu.Unlock() //解鎖少办,對map的操作已經(jīng)完畢
c.wg.Wait()
return c.val,c.err //map中只有一份key,所以只有一個(gè)c
}
c := new(call) //創(chuàng)建一個(gè)工作單元申鱼,只負(fù)責(zé)處理一種key
c.wg.Add(1)
g.m[key] = c //將key注冊到map中
g.mu.Unlock() //map的操做完成,解鎖
c.val, c.err = fn()//第一個(gè)注冊者去執(zhí)行
c.wg.Done()
g.mu.Lock()
delete(g.m,key) //對map進(jìn)行操作云头,需要枷鎖
g.mu.Unlock()
return c.val, c.err //給第一個(gè)注冊者返回結(jié)果
}
測試函數(shù)
func TestDoCopy(t *testing.T) {
var g Group
c := make(chan string)
var calls int32
fn := func()(interface{},error) {
atomic.AddInt32(&calls,1)
return <-c , nil
}
var wg sync.WaitGroup
for i:= 0; i< 10 ;i++ {
wg.Add(1)
go func(){
v, err := g.Do("key",fn)
if err != nil {
t.Errorf("exec fn error =%v", err)
}
if v.(string) != "bar" {
t.Errorf("got %q; want %q", v, "bar")
}
wg.Done()
}()
}
time.Sleep(1000* time.Millisecond)
c <- "bar"
wg.Wait()
if got := atomic.LoadInt32(&calls); got != 1 {
t.Errorf("number of calls = %d; want 1", got)
}
return
}