背景:
我們有一個用go做的項目乒验,其中用到了zmq4進行通信愚隧,一個簡單的rpc過程,早期遠端是使用一個map去做ip和具體socket的映射锻全。
問題
大概是這樣
struct SocketMap {
sync.Mutex
sockets map[string]*zmq4.Socket
}
然后調(diào)用的時候的代碼大概就是這樣的:
func (pushList *SocketMap) push(ip string, data []byte) {
pushList.Lock()
defer pushList.UnLock()
socket := pushList.sockets[string]
if socket == nil {
socket := zmq4.NewSocket()
//do some initial operation like connect
pushList.sockets[ip] = socket
}
socket.Send(data)
}
相信大家都能看出問題:當push被并發(fā)訪問的時候(事實上push會經(jīng)常被并發(fā)訪問)奸攻,由于這把大鎖的存在蒜危,同時只能有一個協(xié)程在臨界區(qū)工作,效率是會被大大降低的睹耐。
解決方案:會帶來crash的優(yōu)化
所以我們決定使用sync.Map來替代這個設(shè)計辐赞,然后出了第一版代碼,寫的非常簡單硝训,只做了簡單的替換:
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
var socket *zmq4.Socket
socketInter, ok = pushList.sockets.Load(ip)
if !ok {
socket = zmq4.NewSocket()
//do some initial operation like connect
pushList.sockets.Store(ip, socket)
} else {
socket = socketInter.(*zmq4.Socket)
}
socket.Send(data)
}
乍一看似乎沒什么問題响委?但是跑起來總是爆炸,然后一看log窖梁,提示有個非法地址赘风。后來在github上才看到,zmq4.Socket不是線程安全的纵刘。上面的代碼恰恰會造成多個線程同時拿到socket實例邀窃,然后就crash了。
解決方案2: 加一把鎖也擋不住的沖突
然后怎么辦呢假哎?看來也只能加鎖了瞬捕,不過這次加鎖不能加到整個map上,否則還會有性能問題舵抹,那就考慮減小鎖的粒度吧肪虎,使用鎖包裝socket。這個時候我們的代碼也就呼之欲出了:
struct SocketMutex{
sync.Mutex
socket *zmq4.Socket
}
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
var socket *SocketMutex
socketInter, ok = pushList.sockets.Load(ip)
if !ok {
socket = &{
socket: zmq4.NewSocket()
}
//do some initial operation like connect
pushList.sockets.Store(ip, newSocket)
} else {
socket = socketInter.(*SocketMutex)
}
socket.Lock()
defer socket.Unlock()
socket.socket.Send(data)
}
但是這樣還是有問題惧蛹,相信經(jīng)驗比較豐富的老哥一眼就能看出來扇救,問題處在socketInter, ok = pushList.sockets.Load(ip)
這行代碼上,如果map中沒有這個值香嗓,且有多個協(xié)程同時訪問到這行代碼迅腔,顯然這幾個協(xié)程的ok都會置為false,然后都進入第一個if代碼塊靠娱,創(chuàng)建多個socket實例钾挟,并且爭相覆蓋原有值。
單純解決這個問題也很簡單饱岸,就是使用sync.Map.LoadOrStore(key interface{}, value interface{}) (v interface{}, loaded bool)
這個api,來原子地去做讀寫徽千。
然而這還沒完苫费,我們的寫入新值的操作不光是調(diào)用一個api創(chuàng)建socket就完了,還要有一系列的初始化操作双抽,我們必須保證在初始化完成之前百框,其他通過Load拿到這個實例的協(xié)程無法真正訪問socket實例。
這時候顯然sync.Map自帶的機制已經(jīng)無法解決這個問題了牍汹,那么我們必須尋求其他的手段铐维,要么鎖柬泽,要么就sync.WaitGroup或者whatever的其他什么東西。
解決方案3: 閉包帶來的神奇體驗
后來經(jīng)大佬指點嫁蛇,我在encoder.go中看到了這么一段代碼:
346 func typeEncoder(t reflect.Type) encoderFunc {
347 if fi, ok := encoderCache.Load(t); ok {
348 return fi.(encoderFunc)
349 }
350
351 // To deal with recursive types, populate the map with an
352 // indirect func before we build it. This type waits on the
353 // real func (f) to be ready and then calls it. This indirect
354 // func is only used for recursive types.
355 var (
356 wg sync.WaitGroup
357 f encoderFunc
358 )
359 wg.Add(1)
360 fi, loaded := encoderCache.LoadOrStore(t, encoderFunc(func(e *encodeState, v reflect.Value, opts encOpts) {
361 wg.Wait()
362 f(e, v, opts)
363 }))
364 if loaded {
365 return fi.(encoderFunc)
366 }
367
368 // Compute the real encoder and replace the indirect func with it.
369 f = newTypeEncoder(t, true)
370 wg.Done()
371 encoderCache.Store(t, f)
372 return f
373 }
豁然開朗锨并,我們可以在sync.Map中存放一個閉包函數(shù),然后在閉包函數(shù)中等待本地的sync.WaitGroup完成再返回實例睬棚。于是最終的代碼也就成型了第煮。
struct SocketMutex{
sync.Mutex
socket *zmq4.Socket
}
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
type SocketFunc func()*SocketMutex
var (
socket *SocketMutex
w sync.WaitGroup
)
socket = &SocketMutex {
socket : zmq4.NewSocket()
}
w.Add(1)
socketf, ok = pushList.sockets.LoadOrStore(ip, SocketFunc(func()*SocketMutex) {
w.Wait()
return socket
})
if !ok {
socket = &{
socket: zmq4.NewSocket()
}
//do some initial operation like connect
w.Done()
} else {
socket = socketInter.(*SockeFunc)()
}
socket.Lock()
defer socket.Unlock()
socket.socket.Send(data)
}
總結(jié):
并發(fā)代碼中的競爭問題,每一行代碼的重入性都要深思熟慮啊抑党。
總的來說要保持以下幾個準則:
(1) 不可重入訪問的系統(tǒng)資源包警,如socketfd, filefd,signalfd(事實上大多數(shù)這種系統(tǒng)資源都是不可重入的)等底靠,在使用無鎖結(jié)構(gòu)的容器害晦、讀寫鎖封裝的容器時,需要給每個資源單獨加鎖或者使用其他手段保證系統(tǒng)資源在臨界區(qū)受到有效保護暑中。
(2)如果有讀取壹瘟,如果為空則寫入的邏輯,需要使用能提供原子性保證的LoadOrSave調(diào)用痒芝,或者沒有的話俐筋,自己實現(xiàn)也要保證讀取和寫入過程整體的原子性;防止并發(fā)訪問Load調(diào)用時严衬,多個線程都返回否而創(chuàng)建多個實例澄者,然后在Save的時候又互相覆蓋∏肓眨——這個原則不光對成員是系統(tǒng)資源的時候生效粱挡,如果存放的是其他東西也同樣適用。
(3)如果資源創(chuàng)建完畢俄精,還需要其他的初始化過程询筏,則可以考慮在容器內(nèi)放置閉包,初始化過程使用sync.WaitGroup保護竖慧,在閉包中調(diào)用Wait方法等待初始化完成再給其他線程返回初始化好的實例嫌套。而初始化過程完成后,可以置換閉包函數(shù)圾旨,不再調(diào)用Wait方法踱讨,來減少可能的開銷。