在上篇中渤早,我們已經討論過如何去實現(xiàn)一個 Map 了,并且也討論了諸多優(yōu)化點瘫俊。在下篇中鹊杖,我們將繼續(xù)討論如何實現(xiàn)一個線程安全的 Map。說到線程安全扛芽,需要從概念開始說起骂蓖。
線程安全就是如果你的代碼塊所在的進程中有多個線程在同時運行,而這些線程可能會同時運行這段代碼川尖。如果每次運行結果和單線程運行的結果是一樣的登下,而且其他的變量的值也和預期的是一樣的,就是線程安全的叮喳。
如果代碼塊中包含了對共享數據的更新操作被芳,那么這個代碼塊就可能是非線程安全的。但是如果代碼塊中類似操作都處于臨界區(qū)之中嘲更,那么這個代碼塊就是線程安全的筐钟。
通常有以下兩類避免競爭條件的方法來實現(xiàn)線程安全:
第一類 —— 避免共享狀態(tài)
- 可重入 Re-entrancy
通常在線程安全的問題中揩瞪,最常見的代碼塊就是函數赋朦。讓函數具有線程安全的最有效的方式就是使其可重入。如果某個進程中所有線程都可以并發(fā)的對函數進行調用李破,并且無論他們調用該函數的實際執(zhí)行情況怎么樣宠哄,該函數都可以產生預期的結果,那么就可以說這個函數是可重入的嗤攻。
如果一個函數把共享數據作為它的返回結果或者包含在它返回的結果中毛嫉,那么該函數就肯定不是一個可重入的函數。任何內含了操作共享數據的代碼的函數都是不可重入的函數妇菱。
為了實現(xiàn)線程安全的函數承粤,把所有代碼都置放于臨界區(qū)中是可行的。但是互斥量的使用總會耗費一定的系統(tǒng)資源和時間闯团,使用互斥量的過程總會存在各種博弈和權衡辛臊。所以請合理使用互斥量保護好那些涉及共享數據操作的代碼。
注意:可重入只是線程安全的充分不必要條件房交,并不是充要條件彻舰。這個反例在下面會講到。
- 線程本地存儲
如果變量已經被本地化,所以每個線程都有自己的私有副本刃唤。這些變量通過子程序和其他代碼邊界保留它們的值隔心,并且是線程安全的,因為這些變量都是每個線程本地存儲的尚胞,即使訪問它們的代碼可能被另一個線程同時執(zhí)行硬霍,依舊是線程安全的。
- 不可變量
對象一旦初始化以后就不能改變笼裳。這意味著只有只讀數據被共享须尚,這也實現(xiàn)了固有的線程安全性∈淘郏可變(不是常量)操作可以通過為它們創(chuàng)建新對象耐床,而不是修改現(xiàn)有對象的方式去實現(xiàn)。 Java楔脯,C#和
Python 中的字符串的實現(xiàn)就使用了這種方法撩轰。
第二類 —— 線程同步
第一類方法都比較簡單,通過代碼改造就可以實現(xiàn)昧廷。但是如果遇到一定要進行線程中共享數據的情況堪嫂,第一類方法就解決不了了。這時候就出現(xiàn)了第二類解決方案木柬,利用線程同步的方法來解決線程安全問題皆串。
今天就從線程同步開始說起。
一. 線程同步理論
在多線程的程序中眉枕,多以共享數據作為線程之間傳遞數據的手段恶复。由于一個進程所擁有的相當一部分虛擬內存地址都可以被該進程中所有線程共享,所以這些共享數據大多是以內存空間作為載體的速挑。如果兩個線程同時讀取同一塊共享內存但獲取到的數據卻不同谤牡,那么程序很容易出現(xiàn)一些 bug。
為了保證共享數據一致性姥宝,最簡單并且最徹底的方法就是使該數據成為一個不變量翅萤。當然這種絕對的方式在大多數情況下都是不可行的。比如函數中會用到一個計數器腊满,記錄函數被調用了幾次套么,這個計數器肯定就不能被設為常量。那這種必須是變量的情況下碳蛋,還要保證共享數據的一致性胚泌,這就引出了臨界區(qū)的概念。
臨界區(qū)的出現(xiàn)就是為了使該區(qū)域只能被串行的訪問或者執(zhí)行疮蹦。臨界區(qū)可以是某個資源诸迟,也可以是某段代碼。保證臨界區(qū)最有效的方式就是利用線程同步機制。
先介紹2種共享數據同步的方法阵苇。
1. 互斥量
在同一時刻壁公,只允許一個線程處于臨界區(qū)之內的約束稱為互斥,每個線程在進入臨界區(qū)之前绅项,都必須先鎖定某個對象紊册,只有成功鎖定對象的線程才能允許進入臨界區(qū),否則就會阻塞快耿。這個對象稱為互斥對象或者互斥量囊陡。
一般我們日常說的互斥鎖就能達到這個目的。
互斥量可以有多個掀亥,它們所保護的臨界區(qū)也可以有多個撞反。先從簡單的說起,一個互斥量和一個臨界區(qū)搪花。
(一) 一個互斥量和一個臨界區(qū)
上圖就是一個互斥量和一個臨界區(qū)的例子遏片。當線程1先進入臨界區(qū)的時候,當前臨界區(qū)處于未上鎖的狀態(tài)撮竿,于是它便先將臨界區(qū)上鎖吮便。線程1獲取到臨界區(qū)里面的值。
這個時候線程2準備進入臨界區(qū)幢踏,由于線程1把臨界區(qū)上鎖了髓需,所以線程2進入臨界區(qū)失敗,線程2由就緒狀態(tài)轉成睡眠狀態(tài)房蝉。線程1繼續(xù)對臨界區(qū)的共享數據進行寫入操作僚匆。
當線程1完成所有的操作以后,線程1調用解鎖操作惨驶。當臨界區(qū)被解鎖以后白热,會嘗試喚醒正在睡眠的線程2敛助。線程2被喚醒以后粗卜,由睡眠狀態(tài)再次轉換成就緒狀態(tài)。線程2準備進入臨界區(qū)纳击,當臨界區(qū)此處處于未上鎖的狀態(tài)续扔,線程2便將臨界區(qū)上鎖。
經過 read焕数、write 一系列操作以后纱昧,最終在離開臨界區(qū)的時候會解鎖。
線程在離開臨界區(qū)的時候堡赔,一定要記得把對應的互斥量解鎖识脆。這樣其他因臨界區(qū)被上鎖而導致睡眠的線程還有機會被喚醒。所以對同一個互斥變量的鎖定和解鎖必須成對的出現(xiàn)。既不可以對一個互斥變量進行重復的鎖定灼捂,也不能對一個互斥變量進行多次的解鎖离例。
如果對一個互斥變量鎖定多次可能會導致臨界區(qū)最終永遠阻塞∠こ恚可能有人會問了宫蛆,對一個未鎖定的互斥變成解鎖多次會出現(xiàn)什么問題呢?
在 Go 1.8 之前的猛,雖然對互斥變量解鎖多次不會引起任何 goroutine 的阻塞耀盗,但是它可能引起一個運行時的恐慌。Go 1.8 之前的版本卦尊,是可以嘗試恢復這個恐慌的叛拷,但是恢復以后,可能會導致一系列的問題岂却,比如重復解鎖操作的 goroutine 會永久的阻塞胡诗。所以 Go 1.8 版本以后此類運行時的恐慌就變成了不可恢復的了。所以對互斥變量反復解鎖就會導致運行時操作,最終程序異常退出。
(二) 多個互斥量和一個臨界區(qū)
在這種情況下张抄,極容易產生線程死鎖的情況拘泞。所以盡量不要讓不同的互斥量所保護的臨界區(qū)重疊。
上圖這個例子中肛炮,一個臨界區(qū)中存在2個互斥量:互斥量 A 和互斥量
B。
線程1先鎖定了互斥量 A ,接著線程2鎖定了互斥量 B二汛。當線程1在成功鎖定互斥量 B 之前永遠不會釋放互斥量 A。同樣拨拓,線程2在成功鎖定互斥量 A 之前永遠不會釋放互斥量 B肴颊。那么這個時候線程1和線程2都因無法鎖定自己需要鎖定的互斥量,都由 ready 就緒狀態(tài)轉換為 sleep 睡眠狀態(tài)渣磷。這是就產生了線程死鎖了婿着。
線程死鎖的產生原因有以下幾種:
- 系統(tǒng)資源競爭
- 進程推薦順序非法
- 死鎖必要條件(必要條件中任意一個不滿足,死鎖都不會發(fā)生)
(1). 互斥條件
(2). 不剝奪條件
(3). 請求和保持條件
(4). 循環(huán)等待條件
- 死鎖必要條件(必要條件中任意一個不滿足,死鎖都不會發(fā)生)
想避免線程死鎖的情況發(fā)生有以下幾種方法可以解決:
- 預防死鎖
(1). 資源有序分配法(破壞環(huán)路等待條件)
(2). 資源原子分配法(破壞請求和保持條件)
- 預防死鎖
- 避免死鎖
銀行家算法
- 避免死鎖
- 檢測死鎖
死鎖定理(資源分配圖化簡法)醋界,這種方法雖然可以檢測竟宋,但是無法預防,檢測出來了死鎖還需要配合解除死鎖的方法才行形纺。
- 檢測死鎖
徹底解決死鎖有以下幾種方法:
- 剝奪資源
- 撤銷進程
- 試鎖定 — 回退
如果在執(zhí)行一個代碼塊的時候丘侠,需要先后(順序不定)鎖定兩個變量,那么在成功鎖定其中一個互斥量之后應該使用試鎖定的方法來鎖定另外一個變量逐样。如果試鎖定第二個互斥量失敗蜗字,就把已經鎖定的第一個互斥量解鎖打肝,并重新對這兩個互斥量進行鎖定和試鎖定。
- 試鎖定 — 回退
如上圖挪捕,線程2在鎖定互斥量 B 的時候闯睹,再試鎖定互斥量 A,此時鎖定失敗担神,于是就把互斥量 B 也一起解鎖楼吃。接著線程1會來鎖定互斥量 A。此時也不會出現(xiàn)死鎖的情況妄讯。
- 固定順序鎖定
這種方式就是讓線程1和線程2都按照相同的順序鎖定互斥量孩锡,都按成功鎖定互斥量1以后才能去鎖定互斥量2 。這樣就能保證在一個線程完全離開這些重疊的臨界區(qū)之前亥贸,不會有其他同樣需要鎖定那些互斥量的線程進入到那里躬窜。
(三) 多個互斥量和多個臨界區(qū)
多個臨界區(qū)和多個互斥量的情況就要看是否會有沖突的區(qū)域,如果出現(xiàn)相互交集的沖突區(qū)域炕置,后進臨界區(qū)的線程就會進入睡眠狀態(tài)荣挨,直到該臨界區(qū)的線程完成任務以后,再被喚醒朴摊。
一般情況下默垄,應該盡量少的使用互斥量。每個互斥量保護的臨界區(qū)應該在合理范圍內并盡量大甚纲。但是如果發(fā)現(xiàn)多個線程會頻繁出入某個較大的臨界區(qū)口锭,并且它們之間經常存在訪問沖突,那么就應該把這個較大的臨界區(qū)劃分的更小一點介杆,并使用不同的互斥量保護起來鹃操。這樣做的目的就是為了讓等待進入同一個臨界區(qū)的線程數變少,從而降低線程被阻塞的概率春哨,并減少它們被迫進入睡眠狀態(tài)的時間荆隘,這從一定程度上提高了程序的整體性能。
在說另外一個線程同步的方法之前赴背,回答一下文章開頭留下的一個疑問:可重入只是線程安全的充分不必要條件椰拒,并不是充要條件。這個反例在下面會講到癞尚。
這個問題最關鍵的一點在于:mutex 是不可重入的耸三。
舉個例子:
在下面這段代碼中,函數 increment_counter 是線程安全的浇揩,但不是可重入的。
#include <pthread.h>
int increment_counter ()
{
static int counter = 0;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutex);
// only allow one thread to increment at a time
++counter;
// store value before any other threads increment it further
int result = counter;
pthread_mutex_unlock(&mutex);
return result;
}
上面的代碼中憨颠,函數 increment_counter 可以在多個線程中被調用胳徽,因為有一個互斥鎖 mutex 來同步對共享變量 counter 的訪問积锅。但是如果這個函數用在可重入的中斷處理程序中,如果在
pthread_mutex_lock(&mutex) 和 pthread_mutex_unlock(&mutex)
之間產生另一個調用函數 increment_counter 的中斷养盗,則會第二次執(zhí)行此函數缚陷,此時由于 mutex 已被 lock,函數會在 pthread_mutex_lock(&mutex) 處阻塞往核,并且由于 mutex 沒有機會被
unlock箫爷,阻塞會永遠持續(xù)下去。簡言之聂儒,問題在于 pthread 的 mutex 是不可重入的虎锚。
解決辦法是設定 PTHREAD_MUTEX_RECURSIVE 屬性。然而對于給出的問題而言衩婚,專門使用一個 mutex 來保護一次簡單的增量操作顯然過于昂貴窜护,因此 c++11 中的 原子變量 提供了一個可使此函數既線程安全又可重入(而且還更簡潔)的替代方案:
#include <atomic>
int increment_counter ()
{
static std::atomic<int> counter(0);
// increment is guaranteed to be done atomically
int result = ++counter;
return result;
}
在 Go 中,互斥量在標準庫代碼包 sync 中的 Mutex 結構體表示的非春。sync.Mutex 類型只有兩個公開的指針方法柱徙,Lock 和 Unlock。前者用于鎖定當前的互斥量奇昙,后者則用于對當前的互斥量進行解鎖护侮。
2. 條件變量
在線程同步的方法中,還有一個可以與互斥量相提并論的同步方法储耐,條件變量概行。
條件變量與互斥量不同,條件變量的作用并不是保證在同一時刻僅有一個線程訪問某一個共享數據弧岳,而是在對應的共享數據的狀態(tài)發(fā)生變化時凳忙,通知其他因此而被阻塞的線程。條件變量總是與互斥變量組合使用的禽炬。
這類問題其實很常見涧卵。先用生產者消費者的例子來舉例。
如果不用條件變量腹尖,只用互斥量柳恐,來看看會發(fā)生什么后果。
生產者線程在完成添加操作之前热幔,其他的生產者線程和消費者線程都無法進行操作乐设。同一個商品也只能被一個消費者消費。
如果只用互斥量绎巨,可能會出現(xiàn)2個問題近尚。
- 生產者線程獲得了互斥量以后,卻發(fā)現(xiàn)商品已滿场勤,無法再添加新的商品了戈锻。于是該線程就會一直等待歼跟。新的生產者也進入不了臨界區(qū),消費者也無法進入格遭。這時候就死鎖了哈街。
- 消費者線程獲得了互斥量以后,卻發(fā)現(xiàn)商品是空的拒迅,無法消費了骚秦。這個時候該線程也是會一直等待。新的生產者和消費者也都無法進入璧微。這時候同樣也死鎖了作箍。
這就是只用互斥量無法解決的問題。在多個線程之間往毡,急需一套同步的機制蒙揣,能讓這些線程都協(xié)作起來。
條件變量就是大家熟悉的 P - V 操作了开瞭。這塊大家應該比較熟悉懒震,所以簡單的過一下。
P 操作就是 wait 操作嗤详,它的意思就是阻塞當前線程个扰,直到收到該條件變量發(fā)來的通知。
V 操作就是 signal 操作葱色,它的意思就是讓該條件變量向至少一個正在等待它通知的線程發(fā)送通知递宅,以表示某個共享數據的狀態(tài)已經變化。
Broadcast 廣播通知苍狰,它的意思就是讓條件變量給正在等待它通知的所有線程發(fā)送通知办龄,以表示某個共享數據的狀態(tài)已經發(fā)生改變。
signal 可以操作多次淋昭,如果操作3次俐填,就代表發(fā)了3次信號通知。如上圖翔忽。
P - V 操作設計美妙之處在于英融,P 操作的次數與 V 操作的次數是相同的。wait 多少次歇式,signal 對應的有多少次驶悟。看上圖材失,這個循環(huán)就是這么的奇妙痕鳍。
生產者消費者問題
這個問題可以形象的描述成像上圖這樣,門衛(wèi)守護著臨界區(qū)的安全豺憔。售票廳記錄著當前 semaphone 的值额获,它也控制著門衛(wèi)是否打開臨界區(qū)够庙。
臨界區(qū)只允許一個線程進入恭应,當已經有一個線程了抄邀,再來一個線程,就會被 lock 住昼榛。售票廳也會記錄當前阻塞的線程數境肾。
當之前的線程離開以后,售票廳就會告訴門衛(wèi)胆屿,允許一個線程進入臨界區(qū)奥喻。
用 P-V 偽代碼來描述生產者消費者:
初始變量:
semaphore mutex = 1; // 臨界區(qū)互斥信號量
semaphore empty = n; // 空閑緩沖區(qū)個數
semaphore full = 0; // 緩沖區(qū)初始化為空
生產者線程:
producer()
{
while(1) {
produce an item in nextp;
P(empty);
P(mutex);
add nextp to buffer;
V(mutex);
V(full);
}
}
消費者線程:
consumer()
{
while(1) {
P(full);
P(mutex);
remove an item from buffer;
V(mutex);
V(empty);
consume the item;
}
}
雖然在生產者和消費者單個程序里面 P,V 并不是成對的非迹,但是整個程序里面 P环鲤,V 還是成對的。
讀者寫者問題——讀者優(yōu)先憎兽,寫者延遲
讀者優(yōu)先冷离,寫進程被延遲。只要有讀者在讀纯命,后來的讀者都可以隨意進來讀西剥。
讀者要先進入 rmutex ,查看 readcount亿汞,然后修改 readcout 的值瞭空,最后再去讀數據。對于每個讀進程都是寫者疗我,都要進去修改 readcount 的值咆畏,所以還要單獨設置一個 rmutex 互斥訪問。
初始變量:
int readcount = 0; // 讀者數量
semaphore rmutex = 1; // 保證更新 readcount 互斥
semaphore wmutex = 1; // 保證讀者和寫著互斥的訪問文件
讀者線程:
reader()
{
while(1) {
P(rmutex); // 準備進入吴裤,修改 readcount旧找,“開門”
if(readcount == 0) { // 說明是第一個讀者
P(wmutex); // 拿到”鑰匙”,阻止寫線程來寫
}
readcount ++;
V(rmutex);
reading;
P(rmutex); // 準備離開
readcount --;
if(readcount == 0) { // 說明是最后一個讀者
V(wmutex); // 交出”鑰匙”嚼摩,讓寫線程來寫
}
V(rmutex); // 離開钦讳,“關門”
}
}
寫者線程:
writer()
{
while(1) {
P(wmutex);
writing;
V(wmutex);
}
}
讀者寫者問題——寫者優(yōu)先总寻,讀者延遲
有寫者寫磺浙,禁止后面的讀者來讀。在寫者前的讀者路克,讀完就走潮秘。只要有寫者在等待琼开,禁止后來的讀者進去讀。
初始變量:
int readcount = 0; // 讀者數量
semaphore rmutex = 1; // 保證更新 readcount 互斥
semaphore wmutex = 1; // 保證讀者和寫著互斥的訪問文件
semaphore w = 1; // 用于實現(xiàn)“寫者優(yōu)先”
讀者線程:
reader()
{
while(1) {
P(w); // 在沒有寫者的時候才能請求進入
P(rmutex); // 準備進入枕荞,修改 readcount柜候,“開門”
if(readcount == 0) { // 說明是第一個讀者
P(wmutex); // 拿到”鑰匙”搞动,阻止寫線程來寫
}
readcount ++;
V(rmutex);
V(w);
reading;
P(rmutex); // 準備離開
readcount --;
if(readcount == 0) { // 說明是最后一個讀者
V(wmutex); // 交出”鑰匙”,讓寫線程來寫
}
V(rmutex); // 離開渣刷,“關門”
}
}
寫者線程:
writer()
{
while(1) {
P(w);
P(wmutex);
writing;
V(wmutex);
V(w);
}
}
哲學家進餐問題
假設有五位哲學家圍坐在一張圓形餐桌旁鹦肿,做以下兩件事情之一:吃飯,或者思考辅柴。吃東西的時候箩溃,他們就停止思考,思考的時候也停止吃東西碌嘀。餐桌中間有一大碗意大利面涣旨,每兩個哲學家之間有一只餐叉。因為用一只餐叉很難吃到意大利面股冗,所以假設哲學家必須用兩只餐叉吃東西霹陡。他們只能使用自己左右手邊的那兩只餐叉。哲學家就餐問題有時也用米飯和筷子而不是意大利面和餐叉來描述止状,因為很明顯烹棉,吃米飯必須用兩根筷子。
初始變量:
semaphore chopstick[5] = {1,1,1,1,1}; // 初始化信號量
semaphore mutex = 1; // 設置取筷子的信號量
哲學家線程:
Pi()
{
do {
P(mutex); // 獲得取筷子的互斥量
P(chopstick[i]); // 取左邊的筷子
P(chopstick[ (i + 1) % 5 ]); // 取右邊的筷子
V(mutex); // 釋放取筷子的信號量
eat;
V(chopstick[i]); // 放回左邊的筷子
V(chopstick[ (i + 1) % 5 ]); // 放回右邊的筷子
think;
}while(1);
}
綜上所述导俘,互斥量可以實現(xiàn)對臨界區(qū)的保護峦耘,并會阻止競態(tài)條件的發(fā)生。條件變量作為補充手段旅薄,可以讓多方協(xié)作更加有效率辅髓。
在 Go 的標準庫中,sync 包里面 sync.Cond 類型代表了條件變量少梁。但是和互斥鎖和讀寫鎖不同的是洛口,簡單的聲明無法創(chuàng)建出一個可用的條件變量,還需要用到 sync.NewCond 函數凯沪。
func NewCond( l locker) *Cond
*sync.Cond 類型的方法集合中有3個方法第焰,即 Wait、Signal 和 Broadcast 妨马。
二. 簡單的線程鎖方案
實現(xiàn)線程安全的方案最簡單的方法就是加鎖了挺举。
先看看 OC 中如何實現(xiàn)一個線程安全的字典吧。
在 Weex 的源碼中烘跺,就實現(xiàn)了一套線程安全的字典湘纵。類名叫 WXThreadSafeMutableDictionary。
/**
* @abstract Thread safe NSMutableDictionary
*/
@interface WXThreadSafeMutableDictionary<KeyType, ObjectType> : NSMutableDictionary
@property (nonatomic, strong) dispatch_queue_t queue;
@property (nonatomic, strong) NSMutableDictionary* dict;
@end
具體實現(xiàn)如下:
- (instancetype)initCommon
{
self = [super init];
if (self) {
NSString* uuid = [NSString stringWithFormat:@"com.taobao.weex.dictionary_%p", self];
_queue = dispatch_queue_create([uuid UTF8String], DISPATCH_QUEUE_CONCURRENT);
}
return self;
}
該線程安全的字典初始化的時候會新建一個并發(fā)的 queue滤淳。
- (NSUInteger)count
{
__block NSUInteger count;
dispatch_sync(_queue, ^{
count = _dict.count;
});
return count;
}
- (id)objectForKey:(id)aKey
{
__block id obj;
dispatch_sync(_queue, ^{
obj = _dict[aKey];
});
return obj;
}
- (NSEnumerator *)keyEnumerator
{
__block NSEnumerator *enu;
dispatch_sync(_queue, ^{
enu = [_dict keyEnumerator];
});
return enu;
}
- (id)copy{
__block id copyInstance;
dispatch_sync(_queue, ^{
copyInstance = [_dict copy];
});
return copyInstance;
}
讀取的這些方法都用 dispatch_sync 梧喷。
- (void)setObject:(id)anObject forKey:(id<NSCopying>)aKey
{
aKey = [aKey copyWithZone:NULL];
dispatch_barrier_async(_queue, ^{
_dict[aKey] = anObject;
});
}
- (void)removeObjectForKey:(id)aKey
{
dispatch_barrier_async(_queue, ^{
[_dict removeObjectForKey:aKey];
});
}
- (void)removeAllObjects{
dispatch_barrier_async(_queue, ^{
[_dict removeAllObjects];
});
}
和寫入相關的方法都用 dispatch_barrier_async。
再看看 Go 用互斥量如何實現(xiàn)一個簡單的線程安全的 Map 吧。
既然要用到互斥量铺敌,那么我們封裝一個包含互斥量的 Map 汇歹。
type MyMap struct {
sync.Mutex
m map[int]int
}
var myMap *MyMap
func init() {
myMap = &MyMap{
m: make(map[int]int, 100),
}
}
再簡單的實現(xiàn) Map 的基礎方法。
func builtinMapStore(k, v int) {
myMap.Lock()
defer myMap.Unlock()
myMap.m[k] = v
}
func builtinMapLookup(k int) int {
myMap.Lock()
defer myMap.Unlock()
if v, ok := myMap.m[k]; !ok {
return -1
} else {
return v
}
}
func builtinMapDelete(k int) {
myMap.Lock()
defer myMap.Unlock()
if _, ok := myMap.m[k]; !ok {
return
} else {
delete(myMap.m, k)
}
}
實現(xiàn)思想比較簡單偿凭,在每個操作前都加上 lock产弹,在每個函數結束 defer 的時候都加上 unlock。
這種加鎖的方式實現(xiàn)的線程安全的字典笔喉,優(yōu)點是比較簡單取视,缺點是性能不高硝皂。文章最后會進行幾種實現(xiàn)方法的性能對比常挚,用數字說話,就知道這種基于互斥量加鎖方式實現(xiàn)的性能有多差了稽物。
在語言原生就自帶線程安全 Map 的語言中奄毡,它們的原生底層實現(xiàn)都不是通過單純的加鎖來實現(xiàn)線程安全的,比如 Java 的 ConcurrentHashMap贝或,Go 1.9 新加的 sync.map吼过。
三. 現(xiàn)代線程安全的 Lock - Free 方案 CAS
在 Java 的 ConcurrentHashMap 底層實現(xiàn)中大量的利用了 volatile,final咪奖,CAS 等 Lock-Free 技術來減少鎖競爭對于性能的影響盗忱。
在 Go 中也大量的使用了原子操作,CAS 是其中之一羊赵。比較并交換即 “Compare And Swap”趟佃,簡稱 CAS。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
CAS 會先判斷參數 addr 指向的被操作值與參數 old 的值是否相等昧捷。如果相當闲昭,相應的函數才會用參數 new 代表的新值替換舊值。否則序矩,替換操作就會被忽略。
這一點與互斥鎖明顯不同跋破,CAS 總是假設被操作的值未曾改變簸淀,并一旦確認這個假設成立,就立即進行值的替換毒返。而互斥鎖的做法就更加謹慎租幕,總是先假設會有并發(fā)的操作修改被操作的值,并需要使用鎖將相關操作放入臨界區(qū)中加以保護饿悬×铗龋可以說互斥鎖的做法趨于悲觀,CAS 的做法趨于樂觀,類似樂觀鎖珠叔。
CAS 做法最大的優(yōu)勢在于可以不創(chuàng)建互斥量和臨界區(qū)的情況下蝎宇,完成并發(fā)安全的值替換操作。這樣大大的減少了線程同步操作對程序性能的影響祷安。當然 CAS 也有一些缺點姥芥,缺點下一章會提到。
接下來看看源碼是如何實現(xiàn)的汇鞭。以下以64位為例凉唐,32位類似。
TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0-25
JMP ·CompareAndSwapUint64(SB)
TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0-25
JMP ·CompareAndSwapUint64(SB)
TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0-25
MOVQ addr+0(FP), BP
MOVQ old+8(FP), AX
MOVQ new+16(FP), CX
LOCK
CMPXCHGQ CX, 0(BP)
SETEQ swapped+24(FP)
RET
上述實現(xiàn)最關鍵的一步就是 CMPXCHG霍骄。
查詢 Intel 的文檔
文檔上說:
比較 eax 和目的操作數(第一個操作數)的值台囱,如果相同,ZF 標志被設置读整,同時源操作數(第二個操作)的值被寫到目的操作數簿训,否則,清
ZF 標志米间,并且把目的操作數的值寫回 eax强品。
于是也就得出了 CMPXCHG 的工作原理:
比較 _old 和 (*__ptr) 的值,如果相同屈糊,ZF 標志被設置的榛,同時
_new 的值被寫到 (*__ptr),否則逻锐,清 ZF 標志夫晌,并且把 (*__ptr) 的值寫回 _old。
在 Intel 平臺下谦去,會用 LOCK CMPXCHG 來實現(xiàn)慷丽,這里的 LOCK 是 CPU 鎖。
Intel 的手冊對 LOCK 前綴的說明如下:
- 確保對內存的讀-改-寫操作原子執(zhí)行鳄哭。在 Pentium 及 Pentium 之前的處理器中要糊,帶有 LOCK 前綴的指令在執(zhí)行期間會鎖住總線,使得其他處理器暫時無法通過總線訪問內存妆丘。很顯然锄俄,這會帶來昂貴的開銷。從 Pentium 4勺拣,Intel Xeon 及 P6 處理器開始奶赠,Intel 在原有總線鎖的基礎上做了一個很有意義的優(yōu)化:如果要訪問的內存區(qū)域(area of memory)在 LOCK 前綴指令執(zhí)行期間已經在處理器內部的緩存中被鎖定(即包含該內存區(qū)域的緩存行當前處于獨占或以修改狀態(tài)),并且該內存區(qū)域被完全包含在單個緩存行(cache line)中药有,那么處理器將直接執(zhí)行該指令毅戈。由于在指令執(zhí)行期間該緩存行會一直被鎖定苹丸,其它處理器無法讀/寫該指令要訪問的內存區(qū)域,因此能保證指令執(zhí)行的原子性苇经。這個操作過程叫做緩存鎖定(cache locking)赘理,緩存鎖定將大大降低 LOCK 前綴指令的執(zhí)行開銷,但是當多處理器之間的競爭程度很高或者指令訪問的內存地址未對齊時扇单,仍然會鎖住總線商模。
- 禁止該指令與之前和之后的讀和寫指令重排序。
- 把寫緩沖區(qū)中的所有數據刷新到內存中蜘澜。
看完描述施流,可以看出,CPU 鎖主要分兩種鄙信,總線鎖和緩存鎖瞪醋。總線鎖用在老的 CPU 中扮碧,緩存鎖用在新的 CPU 中趟章。
所謂總線鎖就是使用 CPU 提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時慎王,其他處理器的請求將被阻塞住,那么該 CPU 可以獨占使用共享內存宏侍±涤伲總線鎖的這種方式,在執(zhí)行期間會鎖住總線谅河,使得其他處理器暫時無法通過總線訪問內存咱旱。所以總線鎖定的開銷比較大,最新的處理器在某些場合下使用緩存鎖定代替總線鎖定來進行優(yōu)化绷耍。
所謂“緩存鎖定”就是如果緩存在處理器緩存行中內存區(qū)域在 LOCK 操作期間被鎖定吐限,當它執(zhí)行鎖操作回寫內存時,處理器不在總線上產生
LOCK#信號褂始,而是修改內部的內存地址诸典,并允許它的緩存一致性機制來保證操作的原子性,因為緩存一致性機制會阻止同時修改被兩個以上處理器緩存的內存區(qū)域數據崎苗,當其他處理器回寫已被鎖定的緩存行的數據時會對緩存行無效狐粱。
有兩種情況處理器無法使用緩存鎖。
第一種情況是胆数,當操作的數據不能被緩存在處理器內部肌蜻,或操作的數據跨多個緩存行(cache line),則處理器會調用總線鎖定必尼。
第二種情況是:有些處理器不支持緩存鎖定蒋搜。一些老的 CPU 就算鎖定的內存區(qū)域在處理器的緩存行中也會調用總線鎖定。
雖然緩存鎖可以大大降低 CPU 鎖的執(zhí)行開銷,但是如果遇到多處理器之間的競爭程度很高或者指令訪問的內存地址未對齊時豆挽,仍然會鎖住總線酸休。所以緩存鎖和總線鎖相互配合,效果更佳祷杈。
綜上斑司,用 CAS 方式來保證線程安全的方式就比用互斥鎖的方式效率要高很多。
四. CAS 的缺陷
雖然 CAS 的效率高但汞,但是依舊存在3大問題宿刮。
1. ABA 問題
線程1準備用 CAS 將變量的值由 A 替換為 B ,在此之前私蕾,線程2將變量的值由 A 替換為 C 僵缺,又由 C 替換為 A,然后線程1執(zhí)行 CAS 時發(fā)現(xiàn)變量的值仍然為 A踩叭,所以 CAS 成功磕潮。但實際上這時的現(xiàn)場已經和最初不同了。圖上也為了分開兩個 A 不同容贝,所以用不同的顏色標記了自脯。最終線程2把 A 替換成了 B 。這就是經典的 ABA 問題斤富。但是這會導致項目出現(xiàn)什么問題呢膏潮?
設想存在這樣一個鏈棧,棧里面存儲了一個鏈表满力,棧頂是 A焕参,A 的 next 指針指向 B闰歪。在線程1中甘磨,要將棧頂元素 A 用 CAS 把它替換成 B。接著線程2來了勺择,線程2將之前包含 A潦嘶,B 元素的鏈表都 pop 出去涩嚣。然后 push 進來一個 A - C - D 鏈表,棧頂元素依舊是 A衬以。這時線程1發(fā)現(xiàn) A 沒有發(fā)生變化缓艳,于是替換成 B。這個時候 B 的 next 其實為 nil看峻。替換完成以后阶淘,線程2操作的鏈表 C - D 這里就與表頭斷開連接了。也就是說線程1 CAS 操作結束互妓,C - D 就被丟失了溪窒,再也找不回來了坤塞。棧中只剩下 B 一個元素了。這很明顯出現(xiàn)了 bug澈蚌。
那怎么解決這種情況呢摹芙?最通用的做法就是加入版本號進行標識。
每次操作都加上版本號宛瞄,這樣就可以完美解決 ABA 的問題了浮禾。
2. 循環(huán)時間可能過長
自旋 CAS 如果長時間不成功,會給 CPU 帶來非常大的執(zhí)行開銷份汗。如果能支持 CPU 提供的 Pause 指令盈电,那么 CAS 的效率能有一定的提升。Pause 指令有兩個作用杯活,第一它可以延遲流水線執(zhí)行指令(de-pipeline)匆帚,使 CPU 不會消耗過多的執(zhí)行資源,延遲的時間取決于具體實現(xiàn)的版本旁钧,在一些處理器上延遲時間是零吸重。第二它可以避免在退出循環(huán)的時候因內存順序沖突(memory order violation)而引起 CPU 流水線被清空(CPU pipeline flush),從而提高 CPU 的執(zhí)行效率歪今。
3. 只能保證一個共享變量的原子操作
CAS 操作只能保證一個共享變量的原子操作嚎幸,但是保證多個共享變量操作的原子性。一般做法可能就考慮利用鎖了彤委。
不過也可以利用一個結構體鞭铆,把兩個變量合并成一個變量。這樣還可以繼續(xù)利用 CAS 來保證原子性操作焦影。
五. Lock - Free 方案舉例
在 Lock - Free方案舉例之前,先來回顧一下互斥量的方案封断。上面我們用互斥量實現(xiàn)了 Go 的線程安全的 Map斯辰。至于這個 Map 的性能如何,接下來對比的時候可以看看數據坡疼。
1. NO Lock - Free 方案
如果不用 Lock - Free 方案也不用簡單的互斥量的方案彬呻,如何實現(xiàn)一個線程安全的字典呢?答案是利用分段鎖的設計柄瑰,只有在同一個分段內才存在競態(tài)關系闸氮,不同的分段鎖之間沒有鎖競爭。相比于對整個
Map 加鎖的設計教沾,分段鎖大大的提高了高并發(fā)環(huán)境下的處理能力蒲跨。
type ConcurrentMap []*ConcurrentMapShared
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // 讀寫鎖,保證進入內部 map 的線程安全
}
分段鎖 Segment 存在一個并發(fā)度授翻。并發(fā)度可以理解為程序運行時能夠同時更新 ConccurentMap 且不產生鎖競爭的最大線程數或悲,實際上就是 ConcurrentMap 中的分段鎖個數孙咪。即數組的長度。
var SHARD_COUNT = 32
如果并發(fā)度設置的過小巡语,會帶來嚴重的鎖競爭問題翎蹈;如果并發(fā)度設置的過大,原本位于同一個 Segment 內的訪問會擴散到不同的 Segment 中男公,CPU cache 命中率會下降荤堪,從而引起程序性能下降。
ConcurrentMap 的初始化就是對數組的初始化枢赔,并且初始化數組里面每個字典澄阳。
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}
ConcurrentMap 主要使用 Segment 來實現(xiàn)減小鎖粒度,把 Map 分割成若干個 Segment糠爬,在 put 的時候需要加讀寫鎖寇荧,get 時候只加讀鎖。
既然分段了执隧,那么針對每個 key 對應哪一個段的邏輯就由一個哈希函數來定揩抡。
func fnv32(key string) uint32 {
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
上面這段哈希函數會根據每次傳入的 string ,計算出不同的哈希值镀琉。
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}
根據哈希值對數組長度取余峦嗤,取出 ConcurrentMap 中的 ConcurrentMapShared。在 ConcurrentMapShared 中存儲對應這個段的 key - value屋摔。
func (m ConcurrentMap) Set(key string, value interface{}) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
上面這段就是 ConcurrentMap 的 set 操作烁设。思路很清晰:先取出對應段內的 ConcurrentMapShared,然后再加讀寫鎖鎖定钓试,寫入 key - value装黑,寫入成功以后再釋放讀寫鎖。
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
// Get item from shard.
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
上面這段就是 ConcurrentMap 的 get 操作弓熏。思路也很清晰:先取出對應段內的 ConcurrentMapShared恋谭,然后再加讀鎖鎖定,讀取 key - value挽鞠,讀取成功以后再釋放讀鎖疚颊。
這里和 set 操作的區(qū)別就在于只需要加讀鎖即可,不用加讀寫鎖信认。
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}
ConcurrentMap 的 Count 操作就是把 ConcurrentMap 數組的每一個分段元素里面的每一個元素都遍歷一遍材义,計算出總數。
func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
go func() {
// 遍歷所有的 shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// 遍歷所有的 key, value 鍵值對.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
}
wg.Wait()
close(ch)
}()
// 生成 keys 數組嫁赏,存儲所有的 key
keys := make([]string, 0, count)
for k := range ch {
keys = append(keys, k)
}
return keys
}
上述是返回 ConcurrentMap 中所有 key 其掂,結果裝在字符串數組中。
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
res = cb(ok, v, value)
shard.items[key] = res
shard.Unlock()
return res
}
上述代碼是 Upsert 操作橄教。如果已經存在了清寇,就更新喘漏。如果是一個新元素,就用 UpsertCb 函數插入一個新的华烟。思路也是先根據 string 找到對應的段翩迈,然后加讀寫鎖。這里只能加讀寫鎖盔夜,因為不管是 update 還是 insert 操作负饲,都需要寫入。讀取 key 對應的 value 值喂链,然后調用 UpsertCb 函數返十,把結果更新到 key 對應的 value 中。最后釋放讀寫鎖即可椭微。
UpsertCb 函數在這里值得說明的是洞坑,這個函數是回調返回待插入到 map 中的新元素。這個函數當且僅當在讀寫鎖被鎖定的時候才會被調用蝇率,因此一定不允許再去嘗試讀取同一個 map 中的其他 key 值迟杂。因為這樣會導致線程死鎖。死鎖的原因是 Go 中 sync.RWLock 是不可重入的本慕。
完整的代碼見concurrent_map.go
這種分段的方法雖然比單純的加互斥量好很多排拷,因為 Segment 把鎖住的范圍進一步的減少了,但是這個范圍依舊比較大锅尘,還能再進一步的減少鎖么监氢?
還有一點就是并發(fā)量的設置,要合理藤违,不能太大也不能太小浪腐。
2. Lock - Free 方案
在 Go 1.9 的版本中默認就實現(xiàn)了一種線程安全的 Map,摒棄了Segment(分段鎖)的概念顿乒,而是啟用了一種全新的方式實現(xiàn)牛欢,利用了 CAS 算法,即 Lock - Free 方案淆游。
采用 Lock - Free 方案以后,能比上一個分案隔盛,分段鎖更進一步縮小鎖的范圍犹菱。性能大大提升。
接下來就讓我們來看看如何用 CAS 實現(xiàn)一個線程安全的高性能 Map 吮炕。
官方是 sync.map 有如下的描述:
這個 Map 是線程安全的腊脱,讀取,插入龙亲,刪除也都保持著常數級的時間復雜度陕凹。多個 goroutines 協(xié)程同時調用 Map 方法也是線程安全的悍抑。該 Map 的零值是有效的,并且零值是一個空的 Map 杜耙。線程安全的 Map 在第一次使用之后搜骡,不允許被拷貝。
這里解釋一下為何不能被拷貝佑女。因為對結構體的復制不但會生成該值的副本记靡,還會生成其中字段的副本。如此一來团驱,本應施加于此的并發(fā)線程安全保護也就失效了摸吠。
作為源值賦給別的變量,作為參數值傳入函數嚎花,作為結果值從函數返回寸痢,作為元素值通過通道傳遞等都會造成值的復制。正確的做法是用指向該類型的指針類型的變量紊选。
Go 1.9 中 sync.map 的數據結構如下:
type Map struct {
mu Mutex
// 并發(fā)讀取 map 中一部分的內容是線程安全的啼止,這是不需要
// read 這部分自身讀取就是線程安全的,因為是原子性的丛楚。但是存儲的時候還是需要 Mutex
// 存儲在 read 中的 entry 在并發(fā)讀取過程中是允許更新的族壳,即使沒有 Mutex 信號量,也是線程安全的趣些。但是更新一個以前刪除的 entry 就需要把值拷貝到 dirty Map 中仿荆,并且必須要帶上 Mutex
read atomic.Value // readOnly
// dirty 中包含 map 中必須要互斥量 mu 保護才能線程安全的部分。為了使 dirty 能快速的轉化成 read map坏平,dirty 中包含了 read map 中所有沒有被刪除的 entries
// 已經刪除過的 entries 不存儲在 dirty map 中拢操。在 clean map 中一個已經刪除的 entry 一定是沒有被刪除過的,并且當新值將要被存儲的時候舶替,它們會被添加到 dirty map 中令境。
// 當 dirty map 為 nil 的時候,下一次寫入的時候會通過 clean map 忽略掉舊的 entries 以后的淺拷貝副本來初始化 dirty map顾瞪。
dirty map[interface{}]*entry
// misses 記錄了 read map 因為需要判斷 key 是否存在而鎖住了互斥量 mu 進行了 update 操作以后的加載次數舔庶。
// 一旦 misses 值大到足夠去復制 dirty map 所需的花費的時候,那么 dirty map 就被提升到未被修改狀態(tài)下的 read map陈醒,下次存儲就會創(chuàng)建一個新的 dirty map惕橙。
misses int
}
在這個 Map 中,包含一個互斥量 mu钉跷,一個原子值 read弥鹦,一個非線程安全的字典 map,這個字典的 key 是 interface{} 類型爷辙,value 是 *entry 類型彬坏。最后還有一個 int 類型的計數器朦促。
先來說說原子值。atomic.Value 這個類型有兩個公開的指針方法栓始,Load 和 Store 务冕。Load 方法用于原子地的讀取原子值實例中存儲的值,它會返回一個 interface{} 類型的結果混滔,并且不接受任何參數洒疚。Store 方法用于原子地在原子值實例中存儲一個值,它接受一個 interface{} 類型的參數而沒有任何結果坯屿。在未曾通過 Store 方法向原子值實例存儲值之前油湖,它的 Load 方法總會返回 nil。
在這個線程安全的字典中领跛,Load 和 Store 的都是一個 readOnly 的數據結構乏德。
// readOnly 是一個不可變的結構體,原子性的存儲在 Map.read 中
type readOnly struct {
m map[interface{}]*entry
// 標志 dirty map 中是否包含一些不在 m 中的 key 吠昭。
amended bool // true if the dirty map contains some key not in m.
}
readOnly 中存儲了一個非線程安全的字典喊括,這個字典和上面 dirty map 存儲的類型完全一致。key 是 interface{} 類型矢棚,value 是 *entry 類型郑什。
// entry 是一個插槽,與 map 中特定的 key 相對應
type entry struct {
p unsafe.Pointer // *interface{}
}
p 指針指向 *interface{} 類型蒲肋,里面存儲的是 entry 的地址蘑拯。如果 p == nil,代表 entry 被刪除了兜粘,并且 m.dirty == nil申窘。如果 p == expunged,代表 entry 被刪除了孔轴,并且 m.dirty 剃法!= nil ,那么 entry 從 m.dirty 中丟失了路鹰。
除去以上兩種情況外碴开,entry 都是有效的粗蔚,并且被記錄在 m.read.m[key] 中攀涵,如果 m.dirty!= nil滤愕,entry 被存儲在 m.dirty[key] 中捌治。
一個 entry 可以通過原子替換操作成 nil 來刪除它驼鹅。當 m.dirty 在下一次被創(chuàng)建讨衣,entry 會被 expunged 指針原子性的替換為 nil吏奸,m.dirty[key] 不對應任何 value黎休。只要 p != expunged浓领,那么一個 entry 就可以通過原子替換操作更新關聯(lián)的 value玉凯。如果 p == expunged,那么一個 entry 想要通過原子替換操作更新關聯(lián)的 value联贩,只能在首次設置 m.dirty[key] = e 以后才能更新 value漫仆。這樣做是為了能在 dirty map 中查找到它。
總結一下泪幌,sync.map 的數據結構如上盲厌。
再看看線程安全的 sync.map 的一些操作。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果 key 對應的 value 不存在祸泪,并且 dirty map 包含 read map 中沒有的 key吗浩,那么開始讀取 dirty map
if !ok && read.amended {
// dirty map 不是線程安全的,所以需要加上互斥鎖
m.mu.Lock()
// 當 m.dirty 被提升的時候没隘,為了防止得到一個虛假的 miss 懂扼,所以此時我們加鎖。
// 如果再次讀取相同的 key 不 miss右蒲,那么這個 key 值就就不值得拷貝到 dirty map 中阀湿。
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 無論 entry 是否存在,記錄這次 miss 瑰妄。
// 這個 key 將會緩慢的被取出陷嘴,直到 dirty map 提升到 read map
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
上述代碼是 Load 操作。返回的是入參 key 對應的 value 值间坐。如果 value 不存在就返回 nil灾挨。dirty map 中會保存一些 read map 里面不存在的 key,那么就要讀取出 dirty map 里面 key 對應的 value眶诈。注意讀取的時候需要加互斥鎖涨醋,因為 dirty map 是非線程安全的。
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
上面這段代碼是記錄 misses 次數的逝撬。只有當 misses 個數大于 dirty map 的長度的時候浴骂,會把 dirty map 存儲到 read map 中。并且把 dirty 置空宪潮,misses 次數也清零溯警。
在看 Store 操作之前,先說一個 expunged 變量狡相。
// expunged 是一個指向任意類型的指針梯轻,用來標記從 dirty map 中刪除的 entry
var expunged = unsafe.Pointer(new(interface{}))
expunged 變量是一個指針,用來標記從 dirty map 中刪除的 entry尽棕。
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
// 從 read map 中讀取 key 失敗或者取出的 entry 嘗試存儲 value 失敗喳挑,直接返回
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
// e 指向的是非 nil 的
if e.unexpungeLocked() {
// entry 先前被刪除了,這就意味著存在一個非空的 dirty map 里面并沒有存儲這個 entry
m.dirty[key] = e
}
// 使用 storeLocked 函數之前,必須保證 e 沒有被清除
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 已經存儲在 dirty map 中了伊诵,代表 e 沒有被清除
e.storeLocked(&value)
} else {
if !read.amended {
// 到這個 else 中就意味著单绑,當前的 key 是第一次被加到 dirty map 中。
// store 之前先判斷一下 dirty map 是否為空曹宴,如果為空搂橙,就把 read map 淺拷貝一次。
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
// 在 dirty 中存儲 value
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
Store 優(yōu)先從 read map 里面去讀取 key 笛坦,然后存儲它的 value区转。如果 entry 是被標記為從 dirty map 中刪除過的,那么還需要重新存儲回 dirty map中版扩。
如果 read map 里面沒有相應的 key废离,就去 dirty map 里面去讀取。dirty map 就直接存儲對應的 value资厉。
最后如何 read map 和 dirty map 都沒有這個 key 值厅缺,這就意味著該 key 是第一次被加入到 dirty map 中。在 dirty map 中存儲這個 key 以及對應的 value宴偿。
// 當 entry 沒有被刪除的情況下去存儲一個 value湘捎。
// 如果 entry 被刪除了,tryStore 方法返回 false窄刘,并且保留 entry 不變
func (e *entry) tryStore(i *interface{}) bool {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
for {
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
}
}
tryStore 函數的實現(xiàn)和 CAS 原理差不多窥妇,它會反復的循環(huán)判斷 entry 是否被標記成了 expunged,如果 entry 經過 CAS 操作成功的替換成了 i娩践,那么就返回 true活翩,反之如果被標記成了 expunged,就返回 false翻伺。
// unexpungeLocked 函數確保了 entry 沒有被標記成已被清除材泄。
// 如果 entry 先前被清除過了,那么在 mutex 解鎖之前吨岭,它一定要被加入到 dirty map 中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
如果 entry 的 unexpungeLocked 返回為 true拉宗,那么就說明 entry 已經被標記成了 expunged,那么它就會經過 CAS 操作把它置為 nil辣辫。
再來看看刪除操作的實現(xiàn)旦事。
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
// 由于 dirty map 是非線程安全的,所以操作前要加鎖
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 刪除 dirty map 中的 key
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
delete 操作的實現(xiàn)比較簡單急灭,如果 read map 中存在 key姐浮,就可以直接刪除,如果不存在 key 并且 dirty map 中有這個 key葬馋,那么就要刪除 dirty map 中的這個 key卖鲤。操作 dirty map 的時候記得先加上鎖進行保護肾扰。
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
刪除 entry 具體的實現(xiàn)如上。這個操作里面都是原子性操作扫尖。循環(huán)判斷 entry 是否為 nil 或者已經被標記成了 expunged白对,如果是這種情況就返回 false,代表刪除失敗换怖。否則就 CAS 操作,將 entry 的 p 指針置為 nil蟀瞧,并返回 true沉颂,代表刪除成功。
至此悦污,關于 Go 1.9 中自帶的線程安全的 sync.map 的實現(xiàn)就分析完了铸屉。官方的實現(xiàn)里面基本沒有用到鎖,互斥量的 lock 也是基于 CAS的切端。read map 也是原子性的彻坛。所以比之前加鎖的實現(xiàn)版本性能有所提升。
究竟 Lock - Free 的性能有多強呢踏枣?接下來做一下性能測試昌屉。
五. 性能對比
性能測試主要針對3個方面,Insert茵瀑,Get间驮,Delete。測試對象主要針對簡單加互斥鎖的原生 Map 马昨,分段加鎖的 Map竞帽,Lock - Free 的 Map 這三種進行性能測試。
性能測試的所有代碼已經放在 github 了鸿捧,地址在這里屹篓,性能測試用的指令是:
go test -v -run=^$ -bench . -benchmem
1. 插入 Insert 性能測試
// 插入不存在的 key (粗糙的鎖)
func BenchmarkSingleInsertAbsentBuiltInMap(b *testing.B) {
myMap = &MyMap{
m: make(map[string]interface{}, 32),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
myMap.BuiltinMapStore(strconv.Itoa(i), "value")
}
}
// 插入不存在的 key (分段鎖)
func BenchmarkSingleInsertAbsent(b *testing.B) {
m := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Set(strconv.Itoa(i), "value")
}
}
// 插入不存在的 key (syncMap)
func BenchmarkSingleInsertAbsentSyncMap(b *testing.B) {
syncMap := &sync.Map{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
syncMap.Store(strconv.Itoa(i), "value")
}
}
測試結果:
BenchmarkSingleInsertAbsentBuiltInMap-4 2000000 857 ns/op 170 B/op 1 allocs/op
BenchmarkSingleInsertAbsent-4 2000000 651 ns/op 170 B/op 1 allocs/op
BenchmarkSingleInsertAbsentSyncMap-4 1000000 1094 ns/op 187 B/op 5 allocs/op
實驗結果是分段鎖的性能最高。這里說明一下測試結果匙奴,-4代表測試用了4核 CPU 堆巧,2000000 代表循環(huán)次數,857 ns/op 代表的是平均每次執(zhí)行花費的時間饥脑,170 B/op 代表的是每次執(zhí)行堆上分配內存總數恳邀,allocs/op 代表的是每次執(zhí)行堆上分配內存次數。
這樣看來灶轰,循環(huán)次數越多谣沸,花費時間越少,分配內存總數越小笋颤,分配內存次數越少乳附,性能就越好内地。下面的性能圖表中去除掉了第一列循環(huán)次數,只花了剩下的3項赋除,所以條形圖越短的性能越好阱缓。以下的每張條形圖的規(guī)則和測試結果代表的意義都和這里一樣,下面就不再贅述了举农。
// 插入存在 key (粗糙鎖)
func BenchmarkSingleInsertPresentBuiltInMap(b *testing.B) {
myMap = &MyMap{
m: make(map[string]interface{}, 32),
}
myMap.BuiltinMapStore("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
myMap.BuiltinMapStore("key", "value")
}
}
// 插入存在 key (分段鎖)
func BenchmarkSingleInsertPresent(b *testing.B) {
m := New()
m.Set("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Set("key", "value")
}
}
// 插入存在 key (syncMap)
func BenchmarkSingleInsertPresentSyncMap(b *testing.B) {
syncMap := &sync.Map{}
syncMap.Store("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
syncMap.Store("key", "value")
}
}
測試結果:
BenchmarkSingleInsertPresentBuiltInMap-4 20000000 74.6 ns/op 0 B/op 0 allocs/op
BenchmarkSingleInsertPresent-4 20000000 61.1 ns/op 0 B/op 0 allocs/op
BenchmarkSingleInsertPresentSyncMap-4 20000000 108 ns/op 16 B/op 1 allocs/op
從圖中可以看出荆针,sync.map 在涉及到 Store 這一項的均比其他兩者的性能差。不管插入不存在的 Key 還是存在的 Key颁糟,分段鎖的性能均是目前最好的航背。
2. 讀取 Get 性能測試
// 讀取存在 key (粗糙鎖)
func BenchmarkSingleGetPresentBuiltInMap(b *testing.B) {
myMap = &MyMap{
m: make(map[string]interface{}, 32),
}
myMap.BuiltinMapStore("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
myMap.BuiltinMapLookup("key")
}
}
// 讀取存在 key (分段鎖)
func BenchmarkSingleGetPresent(b *testing.B) {
m := New()
m.Set("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Get("key")
}
}
// 讀取存在 key (syncMap)
func BenchmarkSingleGetPresentSyncMap(b *testing.B) {
syncMap := &sync.Map{}
syncMap.Store("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
syncMap.Load("key")
}
}
測試結果:
BenchmarkSingleGetPresentBuiltInMap-4 20000000 71.5 ns/op 0 B/op 0 allocs/op
BenchmarkSingleGetPresent-4 30000000 42.3 ns/op 0 B/op 0 allocs/op
BenchmarkSingleGetPresentSyncMap-4 30000000 40.3 ns/op 0 B/op 0 allocs/op
從圖中可以看出,sync.map 在 Load 這一項的性能非常優(yōu)秀棱貌,遠高于其他兩者玖媚。
3. 并發(fā)插入讀取混合性能測試
接下來的實現(xiàn)就涉及到了并發(fā)插入和讀取了。由于分段鎖實現(xiàn)的特殊性婚脱,分段個數會多多少少影響到性能今魔,那么接下來的實驗就會對分段鎖分1,16障贸,32错森,256 這4段進行測試,分別看看性能變化如何惹想,其他兩種線程安全的 Map 不變问词。
由于并發(fā)的代碼太多了,這里就不貼出來了嘀粱,感興趣的同學可以看這里
下面就直接放出測試結果:
并發(fā)插入不存在的 Key 值
BenchmarkMultiInsertDifferentBuiltInMap-4 1000000 2359 ns/op 330 B/op 11 allocs/op
BenchmarkMultiInsertDifferent_1_Shard-4 1000000 2039 ns/op 330 B/op 11 allocs/op
BenchmarkMultiInsertDifferent_16_Shard-4 1000000 1937 ns/op 330 B/op 11 allocs/op
BenchmarkMultiInsertDifferent_32_Shard-4 1000000 1944 ns/op 330 B/op 11 allocs/op
BenchmarkMultiInsertDifferent_256_Shard-4 1000000 1991 ns/op 331 B/op 11 allocs/op
BenchmarkMultiInsertDifferentSyncMap-4 1000000 3760 ns/op 635 B/op 33 allocs/op
從圖中可以看出激挪,sync.map 在涉及到 Store 這一項的均比其他兩者的性能差。并發(fā)插入不存在的 Key锋叨,分段鎖劃分的 Segment 多少與性能沒有關系垄分。
并發(fā)插入存在的 Key 值
BenchmarkMultiInsertSameBuiltInMap-4 1000000 1182 ns/op 160 B/op 10 allocs/op
BenchmarkMultiInsertSame-4 1000000 1091 ns/op 160 B/op 10 allocs/op
BenchmarkMultiInsertSameSyncMap-4 1000000 1809 ns/op 480 B/op 30 allocs/op
從圖中可以看出,sync.map 在涉及到 Store 這一項的均比其他兩者的性能差娃磺。
并發(fā)的讀取存在的 Key 值
BenchmarkMultiGetSameBuiltInMap-4 2000000 767 ns/op 0 B/op 0 allocs/op
BenchmarkMultiGetSame-4 3000000 481 ns/op 0 B/op 0 allocs/op
BenchmarkMultiGetSameSyncMap-4 3000000 464 ns/op 0 B/op 0 allocs/op
從圖中可以看出薄湿,sync.map 在 Load 這一項的性能遠超多其他兩者。
并發(fā)插入讀取不存在的 Key 值
BenchmarkMultiGetSetDifferentBuiltInMap-4 1000000 3281 ns/op 337 B/op 12 allocs/op
BenchmarkMultiGetSetDifferent_1_Shard-4 1000000 3007 ns/op 338 B/op 12 allocs/op
BenchmarkMultiGetSetDifferent_16_Shard-4 500000 2662 ns/op 337 B/op 12 allocs/op
BenchmarkMultiGetSetDifferent_32_Shard-4 1000000 2732 ns/op 337 B/op 12 allocs/op
BenchmarkMultiGetSetDifferent_256_Shard-4 1000000 2788 ns/op 339 B/op 12 allocs/op
BenchmarkMultiGetSetDifferentSyncMap-4 300000 8990 ns/op 1104 B/op 34 allocs/op
從圖中可以看出偷卧,sync.map 在涉及到 Store 這一項的均比其他兩者的性能差豺瘤。并發(fā)插入讀取不存在的 Key,分段鎖劃分的 Segment 多少與性能沒有關系听诸。
并發(fā)插入讀取存在的 Key 值
BenchmarkMultiGetSetBlockBuiltInMap-4 1000000 2095 ns/op 160 B/op 10 allocs/op
BenchmarkMultiGetSetBlock_1_Shard-4 1000000 1712 ns/op 160 B/op 10 allocs/op
BenchmarkMultiGetSetBlock_16_Shard-4 1000000 1730 ns/op 160 B/op 10 allocs/op
BenchmarkMultiGetSetBlock_32_Shard-4 1000000 1645 ns/op 160 B/op 10 allocs/op
BenchmarkMultiGetSetBlock_256_Shard-4 1000000 1619 ns/op 160 B/op 10 allocs/op
BenchmarkMultiGetSetBlockSyncMap-4 500000 2660 ns/op 480 B/op 30 allocs/op
從圖中可以看出坐求,sync.map 在涉及到 Store 這一項的均比其他兩者的性能差。并發(fā)插入讀取存在的 Key晌梨,分段鎖劃分的 Segment 越小桥嗤,性能越好须妻!
4. 刪除 Delete 性能測試
// 刪除存在 key (粗糙鎖)
func BenchmarkDeleteBuiltInMap(b *testing.B) {
myMap = &MyMap{
m: make(map[string]interface{}, 32),
}
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
k := r.Intn(100000000)
myMap.BuiltinMapDelete(strconv.Itoa(k))
}
})
}
// 刪除存在 key (分段鎖)
func BenchmarkDelete(b *testing.B) {
m := New()
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
k := r.Intn(100000000)
m.Remove(strconv.Itoa(k))
}
})
}
// 刪除存在 key (syncMap)
func BenchmarkDeleteSyncMap(b *testing.B) {
syncMap := &sync.Map{}
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
k := r.Intn(100000000)
syncMap.Delete(strconv.Itoa(k))
}
})
}
測試結果:
BenchmarkDeleteBuiltInMap-4 10000000 130 ns/op 8 B/op 1 allocs/op
BenchmarkDelete-4 20000000 76.7 ns/op 8 B/op 1 allocs/op
BenchmarkDeleteSyncMap-4 30000000 45.4 ns/op 8 B/op 0 allocs/op
從圖中可以看出,sync.map 在 Delete 這一項是完美的超過其他兩者的泛领。
六. 總結
本文從線程安全理論基礎開始講了線程安全中一些處理方法荒吏。其中涉及到互斥量和條件變量相關知識。從 Lock 的方案談到了 Lock - Free 的 CAS 相關方案渊鞋。最后針對 Go 1.9 新加的 sync.map 進行了源碼分析和性能測試绰更。
采用了 Lock - Free 方案的 sync.map 測試結果并沒有想象中的那么出色。除了 Load 和 Delete 這兩項遠遠甩開其他兩者锡宋,凡是涉及到 Store 相關操作的性能均低于其他兩者 Map 的實現(xiàn)动知。不過這也是有原因的。
縱觀 Java ConcurrentHashmap 一路的變化:
JDK 6,7 中的 ConcurrentHashmap 主要使用 Segment 來實現(xiàn)減小鎖粒度员辩,把 HashMap 分割成若干個 Segment,在 put 的時候需要鎖住 Segment鸵鸥,get 時候不加鎖奠滑,使用 volatile 來保證可見性,當要統(tǒng)計全局時(比如size)妒穴,首先會嘗試多次計算 modcount 來確定宋税,這幾次嘗試中,是否有其他線程進行了修改操作讼油,如果沒有杰赛,則直接返回 size。如果有矮台,則需要依次鎖住所有的 Segment 來計算乏屯。
JDK 7 中 ConcurrentHashmap 中,當長度過長碰撞會很頻繁瘦赫,鏈表的增改刪查操作都會消耗很長的時間辰晕,影響性能,所以 JDK8 中完全重寫了concurrentHashmap,代碼量從原來的1000多行變成了 6000多行确虱,實現(xiàn)上也和原來的分段式存儲有很大的區(qū)別含友。
JDK 8 的 ConcurrentHashmap 主要設計上的變化有以下幾點:
- 不采用 Segment 而采用 node,鎖住 node 來實現(xiàn)減小鎖粒度校辩。
- 設計了 MOVED 狀態(tài) 當 Resize 的中過程中線程2還在 put 數據窘问,線程2會幫助 resize。
- 使用3個 CAS 操作來確保 node 的一些操作的原子性宜咒,這種方式代替了鎖惠赫。
- sizeCtl 的不同值來代表不同含義,起到了控制的作用荧呐。
可見 Go 1.9 一上來第一個版本就直接摒棄了 Segment 的做法汉形,采取了 CAS 這種 Lock - Free 的方案提高性能纸镊。但是它并沒有對整個字典進行類似 Java 的 Node 的設計。但是整個 sync.map 在 ns/op 概疆,B/op逗威,allocs/op 這三個性能指標上是普通原生非線程安全 Map 的三倍!
不過相信 Google 應該還會繼續(xù)優(yōu)化這部分吧岔冀,畢竟源碼里面還有幾處 TODO 呢凯旭,讓我們一起其他 Go 未來版本的發(fā)展吧,筆者也會一直持續(xù)關注的使套。
(在本篇文章截稿的時候罐呼,筆者又突然發(fā)現(xiàn)了一種分段鎖的 Map 實現(xiàn),性能更高侦高,它具有負載均衡等特點嫉柴,應該是目前筆者見到的性能最好的 Go 語言實現(xiàn)的線程安全的 Map ,關于它的實現(xiàn)源碼分析就只能放在下篇博文單獨寫一篇或者以后有空再分析啦)
Reference:
《Go 并發(fā)實戰(zhàn)編程》
Split-Ordered Lists: Lock-Free Extensible Hash Tables
Semaphores are Surprisingly Versatile
線程安全
JAVA CAS原理深度分析
Java ConcurrentHashMap 總結
GitHub Repo:Halfrost-Field
Follow: halfrost · GitHub