轉(zhuǎn):golang實(shí)現(xiàn)線程安全的map

轉(zhuǎn)載自:https://halfrost.com/go_map_chapter_one/

https://halfrost.com/go_map_chapter_two/

Map 是一種很常見的數(shù)據(jù)結(jié)構(gòu),用于存儲(chǔ)一些無序的鍵值對(duì)。在主流的編程語言中,默認(rèn)就自帶它的實(shí)現(xiàn)感憾。C锐涯、C++ 中的 STL 就實(shí)現(xiàn)了 Map懊亡,JavaScript 中也有 Map摇锋,Java 中有 HashMap,Swift 和 Python 中有 Dictionary赶盔,Go 中有 Map,Objective-C 中有 NSDictionary榆浓、NSMutableDictionary于未。

上面這些 Map 都是線程安全的么?答案是否定的陡鹃,并非全是線程安全的烘浦。那如何能實(shí)現(xiàn)一個(gè)線程安全的 Map 呢?想回答這個(gè)問題萍鲸,需要先從如何實(shí)現(xiàn)一個(gè) Map 說起闷叉。

一. 選用什么數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn) Map ?

Map 是一個(gè)非常常用的數(shù)據(jù)結(jié)構(gòu)脊阴,一個(gè)無序的 key/value 對(duì)的集合仔役,其中 Map 所有的 key 都是不同的褐捻,然后通過給定的 key 可以在常數(shù)時(shí)間 O(1) 復(fù)雜度內(nèi)查找狸驳、更新或刪除對(duì)應(yīng)的 value。

要想實(shí)現(xiàn)常數(shù)級(jí)的查找状知,應(yīng)該用什么來實(shí)現(xiàn)呢?讀者應(yīng)該很快會(huì)想到哈希表孽查。確實(shí)饥悴,Map 底層一般都是使用數(shù)組來實(shí)現(xiàn),會(huì)借用哈希算法輔助盲再。對(duì)于給定的 key西设,一般先進(jìn)行 hash 操作,然后相對(duì)哈希表的長(zhǎng)度取模答朋,將 key 映射到指定的地方贷揽。

哈希算法有很多種,選哪一種更加高效呢梦碗?

1. 哈希函數(shù)

MD5 和 SHA1 可以說是目前應(yīng)用最廣泛的 Hash 算法禽绪,而它們都是以 MD4 為基礎(chǔ)設(shè)計(jì)的。

MD4(RFC 1320) 是 MIT 的Ronald L. Rivest 在 1990 年設(shè)計(jì)的洪规,MD 是 Message Digest(消息摘要) 的縮寫印屁。它適用在32位字長(zhǎng)的處理器上用高速軟件實(shí)現(xiàn)——它是基于 32位操作數(shù)的位操作來實(shí)現(xiàn)的。

MD5(RFC 1321) 是 Rivest 于1991年對(duì) MD4 的改進(jìn)版本斩例。它對(duì)輸入仍以512位分組雄人,其輸出是4個(gè)32位字的級(jí)聯(lián),與 MD4 相同念赶。MD5 比 MD4 來得復(fù)雜础钠,并且速度較之要慢一點(diǎn),但更安全叉谜,在抗分析和抗差分方面表現(xiàn)更好旗吁。

SHA1 是由 NIST NSA 設(shè)計(jì)為同 DSA 一起使用的,它對(duì)長(zhǎng)度小于264的輸入正罢,產(chǎn)生長(zhǎng)度為160bit 的散列值阵漏,因此抗窮舉 (brute-force)

性更好。SHA-1 設(shè)計(jì)時(shí)基于和 MD4 相同原理,并且模仿了該算法翻具。

常用的 hash 函數(shù)有 SHA-1履怯,SHA-256,SHA-512裆泳,MD5 叹洲。這些都是經(jīng)典的 hash 算法。在現(xiàn)代化生產(chǎn)中工禾,還會(huì)用到現(xiàn)代的 hash 算法运提。下面列舉幾個(gè)蝗柔,進(jìn)行性能對(duì)比,最后再選其中一個(gè)源碼分析一下實(shí)現(xiàn)過程民泵。

(1) Jenkins Hash 和 SpookyHash

1997年Bob Jenkins在《 Dr. Dobbs Journal》雜志上發(fā)表了一片關(guān)于散列函數(shù)的文章《A hash function for hash Table lookup》癣丧。這篇文章中,Bob 廣泛收錄了很多已有的散列函數(shù)栈妆,這其中也包括了他自己所謂的“l(fā)ookup2”胁编。隨后在2006年,Bob 發(fā)布了lookup3鳞尔。lookup3 即為 Jenkins Hash嬉橙。更多有關(guān) Bob’s 散列函數(shù)的內(nèi)容請(qǐng)參閱維基百科:Jenkins hash function。memcached的 hash 算法寥假,支持兩種算法:jenkins, murmur3市框,默認(rèn)是 jenkins。

2011年 Bob Jenkins 發(fā)布了他自己的一個(gè)新散列函數(shù)

SpookyHash(這樣命名是因?yàn)樗窃谌f圣節(jié)發(fā)布的)糕韧。它們都擁有2倍于 MurmurHash 的速度枫振,但他們都只使用了64位數(shù)學(xué)函數(shù)而沒有32位版本,SpookyHash 給出128位輸出萤彩。

(2) MurmurHash

MurmurHash?是一種非加密哈希函數(shù)蒋得,適用于一般的哈希檢索操作。

Austin Appleby 在2008年發(fā)布了一個(gè)新的散列函數(shù)——MurmurHash乒疏。其最新版本大約是 lookup3 速度的2倍(大約為1 byte/cycle),它有32位和64位兩個(gè)版本饮焦。32位版本只使用32位數(shù)學(xué)函數(shù)并給出一個(gè)32位的哈希值怕吴,而64位版本使用了64位的數(shù)學(xué)函數(shù),并給出64位哈希值县踢。根據(jù)Austin的分析转绷,MurmurHash具有優(yōu)異的性能,雖然 Bob Jenkins 在《Dr. Dobbs article》雜志上聲稱“我預(yù)測(cè) MurmurHash 比起lookup3要弱硼啤,但是我不知道具體值议经,因?yàn)槲疫€沒測(cè)試過它”。MurmurHash能夠迅速走紅得益于其出色的速度和統(tǒng)計(jì)特性谴返。當(dāng)前的版本是MurmurHash3煞肾,Redis、Memcached嗓袱、Cassandra籍救、HBase、Lucene都在使用它渠抹。

下面是用 C 實(shí)現(xiàn) MurmurHash 的版本:

C

uint32_tmurmur3_32(constchar*key,uint32_tlen,uint32_tseed){

staticconstuint32_tc1=0xcc9e2d51;

staticconstuint32_tc2=0x1b873593;

staticconstuint32_tr1=15;

staticconstuint32_tr2=13;

staticconstuint32_tm=5;

staticconstuint32_tn=0xe6546b64;

uint32_thash=seed;

constintnblocks=len/4;

constuint32_t*blocks=(constuint32_t*)key;

inti;

for(i=0;i

uint32_tk=blocks[i];

k*=c1;

k=(k<>(32-r1));

k*=c2;

hash^=k;

hash=((hash<>(32-r2)))*m+n;

}

constuint8_t*tail=(constuint8_t*)(key+nblocks*4);

uint32_tk1=0;

switch(len&3){

case3:

k1^=tail[2]<<16;

case2:

k1^=tail[1]<<8;

case1:

k1^=tail[0];

k1*=c1;

k1=(k1<>(32-r1));

k1*=c2;

hash^=k1;

}

hash^=len;

hash^=(hash>>16);

hash*=0x85ebca6b;

hash^=(hash>>13);

hash*=0xc2b2ae35;

hash^=(hash>>16);

returnhash;

}

(3) CityHash 和 FramHash

這兩種算法都是 Google 發(fā)布的字符串算法蝙昙。

CityHash是2011年 Google 發(fā)布的字符串散列算法闪萄,和 murmurhash 一樣,屬于非加密型 hash 算法奇颠。CityHash 算法的開發(fā)是受到了 MurmurHash 的啟發(fā)败去。其主要優(yōu)點(diǎn)是大部分步驟包含了至少兩步獨(dú)立的數(shù)學(xué)運(yùn)算。現(xiàn)代 CPU 通常能從這種代碼獲得最佳性能烈拒。CityHash 也有其缺點(diǎn):代碼較同類流行算法復(fù)雜圆裕。Google 希望為速度而不是為了簡(jiǎn)單而優(yōu)化,因此沒有照顧較短輸入的特例缺菌。Google發(fā)布的有兩種算法:cityhash64 與 cityhash128葫辐。它們分別根據(jù)字串計(jì)算 64 和 128 位的散列值。這些算法不適用于加密伴郁,但適合用在散列表等處耿战。CityHash 的速度取決于CRC32 指令,目前為SSE 4.2(Intel Nehalem及以后版本)焊傅。

相比 Murmurhash 支持32剂陡、64、128bit狐胎, Cityhash 支持64鸭栖、128、256bit 握巢。

2014年 Google 又發(fā)布了FarmHash晕鹊,一個(gè)新的用于字符串的哈希函數(shù)系列。FarmHash 從 CityHash 繼承了許多技巧和技術(shù)暴浦,是它的后繼溅话。FarmHash 有多個(gè)目標(biāo),聲稱從多個(gè)方面改進(jìn)了 CityHash歌焦。與 CityHash 相比飞几,F(xiàn)armHash 的另一項(xiàng)改進(jìn)是在多個(gè)特定于平臺(tái)的實(shí)現(xiàn)之上提供了一個(gè)接口。這樣独撇,當(dāng)開發(fā)人員只是想要一個(gè)用于哈希表的屑墨、快速健壯的哈希函數(shù),而不需要在每個(gè)平臺(tái)上都一樣時(shí)纷铣,F(xiàn)armHash 也能滿足要求卵史。目前,F(xiàn)armHash 只包含在32关炼、64和128位平臺(tái)上用于字節(jié)數(shù)組的哈希函數(shù)程腹。未來開發(fā)計(jì)劃包含了對(duì)整數(shù)、元組和其它數(shù)據(jù)的支持儒拂。

(4) xxHash

xxHash 是由 Yann Collet 創(chuàng)建的非加密哈希函數(shù)寸潦。它最初用于 LZ4 壓縮算法色鸳,作為最終的錯(cuò)誤檢查簽名的。該 hash 算法的速度接近于 RAM 的極限见转。并給出了32位和64位的兩個(gè)版本∶福現(xiàn)在它被廣泛使用在PrestoDBRocksDB斩箫、MySQL吏砂、ArangoDBPGroonga乘客、Spark這些數(shù)據(jù)庫(kù)中给梅,還用在了Cocos2D踱阿、DolphinCxbx-reloaded這些游戲框架中,

下面這有一個(gè)性能對(duì)比的實(shí)驗(yàn)谋梭。測(cè)試環(huán)境是Open-Source SMHasher program by Austin Appleby疾就,它是在 Windows 7 上通過 Visual C 編譯出來的肩碟,并且它只有唯一一個(gè)線程贸辈。CPU 內(nèi)核是 Core 2 Duo @3.0GHz。

上表里面的 hash 函數(shù)并不是所有的 hash 函數(shù)碰逸,只列舉了一些常見的算法乡小。第二欄是速度的對(duì)比,可以看出來速度最快的是 xxHash 饵史。第三欄是哈希的質(zhì)量满钟,哈希質(zhì)量最高的有5個(gè),全是5星胳喷,xxHash零远、MurmurHash 3a、CityHash64厌蔽、MD5-32、SHA1-32 摔癣。從表里的數(shù)據(jù)看奴饮,哈希質(zhì)量最高,速度最快的還是 xxHash择浊。

(4) memhash

這個(gè)哈希算法筆者沒有在網(wǎng)上找到很明確的作者信息戴卜。只在 Google 的 Go 的文檔上有這么幾行注釋,說明了它的靈感來源:

Go

// Hashing algorithm inspired by

//? xxhash: https://code.google.com/p/xxhash/

// cityhash: https://code.google.com/p/cityhash/

它說 memhash 的靈感來源于 xxhash 和 cityhash琢岩。那么接下來就來看看 memhash 是怎么對(duì)字符串進(jìn)行哈希的投剥。

Go

const(

// Constants for multiplication: four random odd 32-bit numbers.

m1=3168982561

m2=3339683297

m3=832293441

m4=2336365089

)

func memhash(p unsafe.Pointer, seed, s uintptr) uintptr{

ifGOARCH=="386"&&GOOS!="nacl"&&useAeshash{

returnaeshash(p,seed,s)

}

h:=uint32(seed+s*hashkey[0])

tail:

switch{

cases==0:

cases<4:

h^=uint32(*(*byte)(p))

h^=uint32(*(*byte)(add(p,s>>1)))<<8

h^=uint32(*(*byte)(add(p,s-1)))<<16

h=rotl_15(h*m1)*m2

cases==4:

h^=readUnaligned32(p)

h=rotl_15(h*m1)*m2

cases<=8:

h^=readUnaligned32(p)

h=rotl_15(h*m1)*m2

h^=readUnaligned32(add(p,s-4))

h=rotl_15(h*m1)*m2

cases<=16:

h^=readUnaligned32(p)

h=rotl_15(h*m1)*m2

h^=readUnaligned32(add(p,4))

h=rotl_15(h*m1)*m2

h^=readUnaligned32(add(p,s-8))

h=rotl_15(h*m1)*m2

h^=readUnaligned32(add(p,s-4))

h=rotl_15(h*m1)*m2

default:

v1:=h

v2:=uint32(seed*hashkey[1])

v3:=uint32(seed*hashkey[2])

v4:=uint32(seed*hashkey[3])

fors>=16{

v1^=readUnaligned32(p)

v1=rotl_15(v1*m1)*m2

p=add(p,4)

v2^=readUnaligned32(p)

v2=rotl_15(v2*m2)*m3

p=add(p,4)

v3^=readUnaligned32(p)

v3=rotl_15(v3*m3)*m4

p=add(p,4)

v4^=readUnaligned32(p)

v4=rotl_15(v4*m4)*m1

p=add(p,4)

s-=16

}

h=v1^v2^v3^v4

gototail

}

h^=h>>17

h*=m3

h^=h>>13

h*=m4

h^=h>>16

returnuintptr(h)

}

// Note: in order to get the compiler to issue rotl instructions, we

// need to constant fold the shift amount by hand.

//TODO:convince the compiler to issue rotl instructions after inlining.

func rotl_15(x uint32) uint32{

return(x<<15)|(x>>(32-15))

}

m1、m2担孔、m3江锨、m4 是4個(gè)隨機(jī)選的奇數(shù)吃警,作為哈希的乘法因子。

Go

// used in hash{32,64}.go to seed the hash function

varhashkey[4]uintptr

func alginit(){

// Install aes hash algorithm if we have the instructions we need

if(GOARCH=="386"||GOARCH=="amd64")&&

GOOS!="nacl"&&

cpuid_ecx&(1<<25)!=0&&// aes (aesenc)

cpuid_ecx&(1<<9)!=0&&// sse3 (pshufb)

cpuid_ecx&(1<<19)!=0{// sse4.1 (pinsr{d,q})

useAeshash=true

algarray[alg_MEM32].hash=aeshash32

algarray[alg_MEM64].hash=aeshash64

algarray[alg_STRING].hash=aeshashstr

// Initialize with random data so hash collisions will be hard to engineer.

getRandomData(aeskeysched[:])

return

}

getRandomData((*[len(hashkey)*sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])

hashkey[0]|=1// make sure these numbers are odd

hashkey[1]|=1

hashkey[2]|=1

hashkey[3]|=1

}

在這個(gè)初始化的函數(shù)中啄育,初始化了2個(gè)數(shù)組酌心,數(shù)組里面裝的都是隨機(jī)的 hashkey。在 386挑豌、 amd64安券、非 nacl 的平臺(tái)上,會(huì)用 aeshash 氓英。這里會(huì)把隨機(jī)的 key 生成好侯勉,存入到 aeskeysched 數(shù)組中。同理铝阐,hashkey 數(shù)組里面也會(huì)隨機(jī)好4個(gè)數(shù)字址貌。最后都按位與了一個(gè)1,就是為了保證生成出來的隨機(jī)數(shù)都是奇數(shù)饰迹。

接下來舉個(gè)例子芳誓,來看看 memhash 究竟是如何計(jì)算哈希值的。

Go

func main(){

r:=[8]byte{'h','a','l','f','r','o','s','t'}

pp:=memhashpp(unsafe.Pointer(&r),3,7)

fmt.Println(pp)

}

為了簡(jiǎn)單起見啊鸭,這里用筆者的名字為例算出哈希值锹淌,種子簡(jiǎn)單一點(diǎn)設(shè)置成3。

第一步計(jì)算 h 的值赠制。

Go

h:=uint32(seed+s*hashkey[0])

這里假設(shè) hashkey[0] = 1赂摆,那么 h 的值為 3 + 7 * 1 = 10 。由于 s < 8钟些,那么就會(huì)進(jìn)行以下的處理:

Go

cases<=8:

h^=readUnaligned32(p)

h=rotl_15(h*m1)*m2

h^=readUnaligned32(add(p,s-4))

h=rotl_15(h*m1)*m2

readUnaligned32()函數(shù)會(huì)把傳入的 unsafe.Pointer 指針進(jìn)行2次轉(zhuǎn)換烟号,先轉(zhuǎn)成 *uint32 類型,然后再轉(zhuǎn)成 *(*uint32) 類型政恍。

接著進(jìn)行異或操作:

接著第二步 h * m1 = 1718378850 * 3168982561 = 3185867170

由于是32位的乘法汪拥,最終結(jié)果是64位的,高32位溢出篙耗,直接舍棄迫筑。

乘出來的結(jié)果當(dāng)做 rotl_15() 入?yún)ⅰ?/p>

Go

func rotl_15(x uint32) uint32{

return(x<<15)|(x>>(32-15))

}

這個(gè)函數(shù)里面對(duì)入?yún)⑦M(jìn)行了兩次位移操作。

最后將兩次位移的結(jié)果進(jìn)行邏輯或運(yùn)算:

接著再進(jìn)行一次 readUnaligned32() 轉(zhuǎn)換:

轉(zhuǎn)換完再進(jìn)行一次異或宗弯。此時(shí) h = 2615762644脯燃。

然后還要再進(jìn)行一次 rotl_15() 變換。這里就不畫圖演示了蒙保。變換完成以后 h = 2932930721辕棚。

最后執(zhí)行 hash 的最后一步:

Go

h^=h>>17

h*=m3

h^=h>>13

h*=m4

h^=h>>16

先右移17位,然后異或,再乘以m3逝嚎,再右移13位扁瓢,再異或,再乘以m4懈糯,再右移16位涤妒,最后再異或。

通過這樣一系列的操作赚哗,最后就能生成出 hash 值了她紫。最后 h = 1870717864。感興趣的同學(xué)可以算一算屿储。

(5)AES Hash

在上面分析 Go 的 hash 算法的時(shí)候贿讹,我們可以看到它對(duì) CPU 是否支持 AES 指令集進(jìn)行了判斷,當(dāng) CPU 支持 AES 指令集的時(shí)候够掠,它會(huì)選用 AES Hash 算法民褂,當(dāng) CPU 不支持 AES 指令集的時(shí)候,換成 memhash 算法疯潭。

AES 指令集全稱是高級(jí)加密標(biāo)準(zhǔn)指令集(或稱英特爾高級(jí)加密標(biāo)準(zhǔn)新指令赊堪,簡(jiǎn)稱AES-NI)是一個(gè)x86指令集架構(gòu)的擴(kuò)展,用于IntelAMD微處理器竖哩。

利用 AES 實(shí)現(xiàn) Hash 算法性能會(huì)很優(yōu)秀哭廉,因?yàn)樗芴峁┯布铀佟?/p>

具體代碼實(shí)現(xiàn)如下,匯編程序相叁,注釋見下面程序中:

C

// aes hash 算法通過 AES 硬件指令集實(shí)現(xiàn)

TEXT runtime·aeshash(SB),NOSPLIT,$0-32

MOVQ? ? p+0(FP),AX// 把ptr移動(dòng)到data數(shù)據(jù)段中

MOVQ? ? s+16(FP),CX// 長(zhǎng)度

LEAQ? ? ret+24(FP),DX

JMP runtime·aeshashbody(SB)

TEXT runtime·aeshashstr(SB),NOSPLIT,$0-24

MOVQ? ? p+0(FP),AX// 把ptr移動(dòng)到字符串的結(jié)構(gòu)體中

MOVQ8(AX),CX// 字符串長(zhǎng)度

MOVQ(AX),AX// 字符串的數(shù)據(jù)

LEAQ? ? ret+16(FP),DX

JMP runtime·aeshashbody(SB)

最終的 hash 的實(shí)現(xiàn)都在 aeshashbody 中:

C

// AX: 數(shù)據(jù)

// CX: 長(zhǎng)度

// DX: 返回的地址

TEXT runtime·aeshashbody(SB),NOSPLIT,$0-0

// SSE 寄存器中裝填入我們的隨機(jī)數(shù)種子

MOVQ? ? h+8(FP),X0// 每個(gè)table中hash種子有64 位

PINSRW? $4,CX,X0// 長(zhǎng)度占16位

PSHUFHW $0,X0,X0// 壓縮高位字亂序遵绰,重復(fù)長(zhǎng)度4次

MOVO? ? X0,X1// 保存加密前的種子

PXOR? ? runtime·aeskeysched(SB),X0// 對(duì)每一個(gè)處理中的種子進(jìn)行邏輯異或

AESENC? X0,X0// 加密種子

CMPQ? ? CX,$16

? ? JB? aes0to15

? ? JE? aes16

CMPQ? ? CX,$32

? ? JBE aes17to32

CMPQ? ? CX,$64

? ? JBE aes33to64

CMPQ? ? CX,$128

? ? JBE aes65to128

? ? JMP aes129plus

// aes 從 0 - 15

aes0to15:

TESTQ? CX,CX

? ? JE? aes0

ADDQ? ? $16,AX

TESTW? $0xff0,AX

? ? JE? endofpage

//當(dāng)前加載的16位字節(jié)的地址不會(huì)越過一個(gè)頁面邊界,所以我們可以直接加載它增淹。

MOVOU-16(AX),X1

ADDQ? ? CX,CX

MOVQ? ? $masks<>(SB),AX

PAND(AX)(CX*8), X1

final1:?

PXOR? ? X0, X1? // 異或數(shù)據(jù)和種子

AESENC? X1, X1? // 連續(xù)加密3次

AESENC? X1, X1

? ? AESENC? X1, X1

? ? MOVQ? ? X1, (DX)

? ? RET

endofpage:?

// 地址結(jié)尾是1111xxxx椿访。 這樣就可能超過一個(gè)頁面邊界,所以在加載完最后一個(gè)字節(jié)后停止加載虑润。然后使用pshufb將字節(jié)向下移動(dòng)成玫。

MOVOU-32(AX)(CX*1), X1

? ? ADDQ? ? CX, CX

? ? MOVQ? ? $shifts<>(SB), AX

PSHUFB(AX)(CX*8), X1

? ? JMP final1

aes0:?

// 返回輸入的并且已經(jīng)加密過的種子

AESENC? X0, X0

? ? MOVQ? ? X0, (DX)

? ? RET

aes16:?

MOVOU(AX), X1

? ? JMP final1

aes17to32:?

// 開始處理第二個(gè)起始種子

PXOR? ? runtime·aeskeysched+16(SB), X1

? ? AESENC? X1, X1

// 加載要被哈希算法處理的數(shù)據(jù)

MOVOU(AX), X2

? ? MOVOU? -16(AX)(CX*1), X3

// 異或種子

PXOR? ? X0, X2

? ? PXOR? ? X1, X3

// 連續(xù)加密3次

AESENC? X2, X2

? ? AESENC? X3, X3

? ? AESENC? X2, X2

? ? AESENC? X3, X3

? ? AESENC? X2, X2

? ? AESENC? X3, X3

// 拼接并生成結(jié)果

PXOR? ? X3, X2

? ? MOVQ? ? X2, (DX)

? ? RET

aes33to64:?

// 處理第三個(gè)以上的起始種子

MOVO? ? X1, X2

? ? MOVO? ? X1, X3

? ? PXOR? ? runtime·aeskeysched+16(SB), X1

? ? PXOR? ? runtime·aeskeysched+32(SB), X2

? ? PXOR? ? runtime·aeskeysched+48(SB), X3

? ? AESENC? X1, X1

? ? AESENC? X2, X2

? ? AESENC? X3, X3

MOVOU(AX), X4

? ? MOVOU? 16(AX), X5

? ? MOVOU? -32(AX)(CX*1), X6

? ? MOVOU? -16(AX)(CX*1), X7

? ? PXOR? ? X0, X4

? ? PXOR? ? X1, X5

? ? PXOR? ? X2, X6

? ? PXOR? ? X3, X7

? ? AESENC? X4, X4

? ? AESENC? X5, X5

? ? AESENC? X6, X6

? ? AESENC? X7, X7

? ? AESENC? X4, X4

? ? AESENC? X5, X5

? ? AESENC? X6, X6

? ? AESENC? X7, X7

? ? AESENC? X4, X4

? ? AESENC? X5, X5

? ? AESENC? X6, X6

? ? AESENC? X7, X7

? ? PXOR? ? X6, X4

? ? PXOR? ? X7, X5

? ? PXOR? ? X5, X4

? ? MOVQ? ? X4, (DX)

? ? RET

aes65to128:?

// 處理第七個(gè)以上的起始種子

MOVO? ? X1, X2

? ? MOVO? ? X1, X3

? ? MOVO? ? X1, X4

? ? MOVO? ? X1, X5

? ? MOVO? ? X1, X6

? ? MOVO? ? X1, X7

? ? PXOR? ? runtime·aeskeysched+16(SB), X1

? ? PXOR? ? runtime·aeskeysched+32(SB), X2

? ? PXOR? ? runtime·aeskeysched+48(SB), X3

? ? PXOR? ? runtime·aeskeysched+64(SB), X4

? ? PXOR? ? runtime·aeskeysched+80(SB), X5

? ? PXOR? ? runtime·aeskeysched+96(SB), X6

? ? PXOR? ? runtime·aeskeysched+112(SB), X7

? ? AESENC? X1, X1

? ? AESENC? X2, X2

? ? AESENC? X3, X3

? ? AESENC? X4, X4

? ? AESENC? X5, X5

? ? AESENC? X6, X6

? ? AESENC? X7, X7

// 加載數(shù)據(jù)

MOVOU(AX), X8

? ? MOVOU? 16(AX), X9

? ? MOVOU? 32(AX), X10

? ? MOVOU? 48(AX), X11

? ? MOVOU? -64(AX)(CX*1), X12

? ? MOVOU? -48(AX)(CX*1), X13

? ? MOVOU? -32(AX)(CX*1), X14

? ? MOVOU? -16(AX)(CX*1), X15

// 異或種子

PXOR? ? X0, X8

? ? PXOR? ? X1, X9

? ? PXOR? ? X2, X10

? ? PXOR? ? X3, X11

? ? PXOR? ? X4, X12

? ? PXOR? ? X5, X13

? ? PXOR? ? X6, X14

? ? PXOR? ? X7, X15

// 連續(xù)加密3次

AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

? ? AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

? ? AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

// 拼裝結(jié)果

PXOR? ? X12, X8

? ? PXOR? ? X13, X9

? ? PXOR? ? X14, X10

? ? PXOR? ? X15, X11

? ? PXOR? ? X10, X8

? ? PXOR? ? X11, X9

? ? PXOR? ? X9, X8

? ? MOVQ? ? X8, (DX)

? ? RET

aes129plus:?

// 處理第七個(gè)以上的起始種子

MOVO? ? X1, X2

? ? MOVO? ? X1, X3

? ? MOVO? ? X1, X4

? ? MOVO? ? X1, X5

? ? MOVO? ? X1, X6

? ? MOVO? ? X1, X7

? ? PXOR? ? runtime·aeskeysched+16(SB), X1

? ? PXOR? ? runtime·aeskeysched+32(SB), X2

? ? PXOR? ? runtime·aeskeysched+48(SB), X3

? ? PXOR? ? runtime·aeskeysched+64(SB), X4

? ? PXOR? ? runtime·aeskeysched+80(SB), X5

? ? PXOR? ? runtime·aeskeysched+96(SB), X6

? ? PXOR? ? runtime·aeskeysched+112(SB), X7

? ? AESENC? X1, X1

? ? AESENC? X2, X2

? ? AESENC? X3, X3

? ? AESENC? X4, X4

? ? AESENC? X5, X5

? ? AESENC? X6, X6

? ? AESENC? X7, X7

// 逆序開始,從最后的block開始處理拳喻,因?yàn)榭赡軙?huì)出現(xiàn)重疊的情況

MOVOU-128(AX)(CX*1), X8

? ? MOVOU? -112(AX)(CX*1), X9

? ? MOVOU? -96(AX)(CX*1), X10

? ? MOVOU? -80(AX)(CX*1), X11

? ? MOVOU? -64(AX)(CX*1), X12

? ? MOVOU? -48(AX)(CX*1), X13

? ? MOVOU? -32(AX)(CX*1), X14

? ? MOVOU? -16(AX)(CX*1), X15

// 異或種子

PXOR? ? X0, X8

? ? PXOR? ? X1, X9

? ? PXOR? ? X2, X10

? ? PXOR? ? X3, X11

? ? PXOR? ? X4, X12

? ? PXOR? ? X5, X13

? ? PXOR? ? X6, X14

? ? PXOR? ? X7, X15

// 計(jì)算剩余128字節(jié)塊的數(shù)量

? ? DECQ? ? CX

? ? SHRQ? ? $7, CX

aesloop:?

// 加密狀態(tài)

AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

// 在同一個(gè)block塊中加密狀態(tài)梁剔,進(jìn)行異或運(yùn)算

MOVOU(AX), X0

? ? MOVOU? 16(AX), X1

? ? MOVOU? 32(AX), X2

? ? MOVOU? 48(AX), X3

? ? AESENC? X0, X8

? ? AESENC? X1, X9

? ? AESENC? X2, X10

? ? AESENC? X3, X11

? ? MOVOU? 64(AX), X4

? ? MOVOU? 80(AX), X5

? ? MOVOU? 96(AX), X6

? ? MOVOU? 112(AX), X7

? ? AESENC? X4, X12

? ? AESENC? X5, X13

? ? AESENC? X6, X14

? ? AESENC? X7, X15

? ? ADDQ? ? $128, AX

? ? DECQ? ? CX

? ? JNE aesloop

? ? // 最后一步,進(jìn)行3次以上的加密

AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

? ? AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

? ? AESENC? X8, X8

? ? AESENC? X9, X9

? ? AESENC? X10, X10

? ? AESENC? X11, X11

? ? AESENC? X12, X12

? ? AESENC? X13, X13

? ? AESENC? X14, X14

? ? AESENC? X15, X15

? ? PXOR? ? X12, X8

? ? PXOR? ? X13, X9

? ? PXOR? ? X14, X10

? ? PXOR? ? X15, X11

? ? PXOR? ? X10, X8

? ? PXOR? ? X11, X9

? ? PXOR? ? X9, X8

? ? MOVQ? ? X8, (DX)

? ? RET

2. 哈希沖突處理

(1)鏈表數(shù)組法

鏈表數(shù)組法比較簡(jiǎn)單舞蔽,每個(gè)鍵值對(duì)表長(zhǎng)取模,如果結(jié)果相同码撰,用鏈表的方式依次往后插入渗柿。

假設(shè)待插入的鍵值集合是{ 2,3,5朵栖,7颊亮,11,13陨溅,19}终惑,表長(zhǎng) MOD 8。假設(shè)哈希函數(shù)在[0,9)上均勻分布门扇。如上圖雹有。

接下來重點(diǎn)進(jìn)行性能分析:

查找鍵值 k,假設(shè)鍵值 k 不在哈希表中臼寄,h(k) 在 [0霸奕,M) 中均勻分布,即 P(h(k) = i) = 1/M 吉拳。令 Xi 為哈希表 ht[ i ] 中包含鍵值的個(gè)數(shù)质帅。如果 h(k) = i ,則不成功查找 k 的鍵值比較次數(shù)是 Xi留攒,于是:

成功查找的分析稍微復(fù)雜一點(diǎn)煤惩。要考慮添加哈希表的次序,不考慮有相同鍵值的情況炼邀,假設(shè) K = {k1,k2,……kn}魄揉,并且假設(shè)從空哈希表開始按照這個(gè)次序添加到哈希表中。引入隨機(jī)變量汤善,如果 h(ki) = h(kj)什猖,那么 Xij = 1照藻;如果 h(ki) 辞嗡!= h(kj),那么 Xij = 0 兔朦。

由于之前的假設(shè)哈希表是均勻分布的在旱,所以 P(Xij = i) = E(Xij) = 1/M 摇零,這里的 E(X) 表示隨機(jī)變量 X 的數(shù)學(xué)期望。再假設(shè)每次添加鍵值的時(shí)候都是把添加在鏈表末端桶蝎。令 Ci 為查找 Ki 所需的鍵值比較次數(shù)驻仅,由于不能事先確定查找 Ki 的概率,所以假定查找不同鍵值的概率都是相同的登渣,都是 1/n 噪服,則有:

由此我們可以看出,哈希表的性能和表中元素的多少關(guān)系不大胜茧,而和填充因子 α 有關(guān)粘优。如果哈希表長(zhǎng)和哈希表中元素個(gè)數(shù)成正比仇味,則哈希表查找的復(fù)雜度為 O(1) 。

綜上所述雹顺,鏈表數(shù)組的成功與不成功的平均鍵值比較次數(shù)如下:

(2)開放地址法 —— 線性探測(cè)

線性探測(cè)的規(guī)則是 hi = ( h(k) + i ) MOD M丹墨。舉個(gè)例子,i = 1嬉愧,M = 9贩挣。

這種處理沖突的方法,一旦發(fā)生沖突没酣,就把位置往后加1王财,直到找到一個(gè)空的位置。

舉例如下四康,假設(shè)待插入的鍵值集合是{2搪搏,3,5闪金,7疯溺,11,13哎垦,19}囱嫩,線性探測(cè)的發(fā)生沖突以后添加的值為1,那么最終結(jié)果如下:

線性探測(cè)哈希表的性能分析比較復(fù)雜漏设,這里就僅給出結(jié)果墨闲。

(3)開放地址法 —— 平方探測(cè)

線性探測(cè)的規(guī)則是 h0 = h(k) ,hi = ( h0 + i * i ) MOD M郑口。

舉例如下鸳碧,假設(shè)待插入的鍵值集合是{2,3犬性,5瞻离,7,11乒裆,13套利,20},平方探測(cè)的發(fā)生沖突以后添加的值為查找次數(shù)的平方鹤耍,那么最終結(jié)果如下:

平方探測(cè)在線性探測(cè)的基礎(chǔ)上肉迫,加了一個(gè)二次曲線。當(dāng)發(fā)生沖突以后稿黄,不再是加一個(gè)線性的參數(shù)喊衫,而是加上探測(cè)次數(shù)的平方。

平方探測(cè)有一個(gè)需要注意的是杆怕,M的大小有講究族购。如果M不是奇素?cái)?shù)鼻听,那么就可能出現(xiàn)下面這樣的問題,即使哈希表里面還有空的位置联四,但是卻有元素找不到要插入的位置。

舉例撑教,假設(shè) M = 10朝墩,待插入的鍵值集合是{0,1伟姐,4收苏,5,6愤兵,9鹿霸,10},當(dāng)前面6個(gè)鍵值插入哈希表中以后秆乳,10就再也無法插入了懦鼠。

所以在平方探測(cè)中,存在下面這則規(guī)律:

如果 M 為奇素?cái)?shù)屹堰,則下面的 ?M / 2? 位置 h0肛冶,h1,h2 …… h?M/2? 互不相同扯键。其中睦袖,hi = (h0 + i * i ) MOD M。

這面這則規(guī)律可以用反證法證明荣刑。假設(shè) hi = hj馅笙,i > j;0<=i厉亏,j<= ?M/2?董习,則 h0 + i* i = ( h0 + j * j ) MOD M,從而 M 可以整除 ( i + j )( i - j )叶堆。由于 M 為素?cái)?shù)阱飘,并且 0 < i + j,i - j < M虱颗,當(dāng)且僅當(dāng) i = j 的時(shí)候才能滿足沥匈。

上述規(guī)則也就說明了一點(diǎn),只要 M 為奇素?cái)?shù)忘渔,平方探測(cè)至少可以遍歷哈希表一般的位置高帖。所以只要哈希表的填充因子 α <= 1 / 2 ,平方探測(cè)總能找到可插入的位置畦粮。

上述舉的例子散址,之所以鍵值10無法插入乖阵,原因也因?yàn)?α > 1 / 2了,所以不能保證有可插入的位置了预麸。

(4)開放地址法 —— 雙哈希探測(cè)

雙哈希探測(cè)是為了解決聚集的現(xiàn)象瞪浸。無論是線性探測(cè)還是平方探測(cè),如果 h(k1) 和 h(k2) 相鄰吏祸,則它們的探測(cè)序列也都是相鄰的对蒲,這就是所謂的聚集現(xiàn)象。為了避免這種現(xiàn)象贡翘,所以引入了雙哈希函數(shù) h2蹈矮,使得兩次探測(cè)之間的距離為 h2(k)。所以探測(cè)序列為 h0 = h1(k)鸣驱,hi = ( h0 + i * h2(k) ) MOD M 泛鸟。實(shí)驗(yàn)表明,雙哈希探測(cè)的性能類似于隨機(jī)探測(cè)踊东。

關(guān)于雙哈希探測(cè)和平方探測(cè)的平均查找長(zhǎng)度比線性探測(cè)更加困難北滥。所以引入隨機(jī)探測(cè)的概念來近似這兩種探測(cè)。隨機(jī)探測(cè)是指探測(cè)序列 { hi } 在區(qū)間 [0递胧,M]中等概率獨(dú)立隨機(jī)選取碑韵,這樣 P(hi = j) = 1/M 。

假設(shè)探測(cè)序列為 h0缎脾,h1祝闻,……,hi遗菠。在哈希表的 hi 位置為空联喘,在 h0,h1辙纬,……豁遭,hi-1 的位置上哈希表不是空,此次查找的鍵值比較次數(shù)為 i贺拣。令隨機(jī)變量 X 為一次不成功查找所需的鍵值比較次數(shù)蓖谢。由于哈希表的填充因子為 α,所以在一個(gè)位置上哈希表為空值的概率為 1 - α 譬涡,為非空值的概率為 α闪幽,所以 P( X = i ) = α^i * ( 1 - α ) 。

在概率論中涡匀,上述的分布叫幾何分布盯腌。

假定哈希表元素的添加順序?yàn)?{k1,k2陨瘩,…… 腕够,kn}级乍,令 Xi 為當(dāng)哈希表只包含 {k1,k2帚湘,…… 玫荣,ki} 時(shí)候一次不成功查找的鍵值比較次數(shù),注意,這個(gè)時(shí)候哈希表的填充因子為 i/M ,則查找 k(i+1) 的鍵值次數(shù)為 Yi = 1 + Xi陈莽。假定查找任意一個(gè)鍵值的概率為 1/n汗捡,則一次成功查找的平均鍵值比較次數(shù)為:

綜上所述,平方探測(cè)和雙哈希探測(cè)的成功與不成功的平均鍵值比較次數(shù)如下:

總的來說脸侥,在數(shù)據(jù)量非常大的情況下建邓,簡(jiǎn)單的 hash 函數(shù)不可避免不產(chǎn)生碰撞,即使采用了合適的處理碰撞的方法睁枕,依舊有一定時(shí)間復(fù)雜度官边。所以想盡可能的避免碰撞,還是要選擇高性能的 hash 函數(shù)外遇,或者增加 hash 的位數(shù)注簿,比如64位,128位跳仿,256位诡渴,這樣碰撞的幾率會(huì)小很多。

3. 哈希表的擴(kuò)容策略

隨著哈希表裝載因子的變大菲语,發(fā)生碰撞的次數(shù)變得越來也多妄辩,哈希表的性能變得越來越差。對(duì)于單獨(dú)鏈表法實(shí)現(xiàn)的哈希表山上,尚可以容忍眼耀,但是對(duì)于開放尋址法,這種性能的下降是不能接受的佩憾,因此對(duì)于開放尋址法需要尋找一種方法解決這個(gè)問題哮伟。

在實(shí)際應(yīng)用中,解決這個(gè)問題的辦法是動(dòng)態(tài)的增大哈希表的長(zhǎng)度妄帘,當(dāng)裝載因子超過某個(gè)閾值時(shí)增加哈希表的長(zhǎng)度楞黄,自動(dòng)擴(kuò)容。每當(dāng)哈希表的長(zhǎng)度發(fā)生變化之后寄摆,所有 key 在哈希表中對(duì)應(yīng)的下標(biāo)索引需要全部重新計(jì)算谅辣,不能直接從原來的哈希表中拷貝到新的哈希表中。必須一個(gè)一個(gè)計(jì)算原來哈希表中的 key 的哈希值并插入到新的哈希表中婶恼。這種方式肯定是達(dá)不到生產(chǎn)環(huán)境的要求的桑阶,因?yàn)闀r(shí)間復(fù)雜度太高了柏副,O(n),數(shù)據(jù)量一旦大了蚣录,性能就會(huì)很差割择。Redis 想了一種方法,就算是觸發(fā)增長(zhǎng)時(shí)也只需要常數(shù)時(shí)間 O(1) 即可完成插入操作萎河。解決辦法是分多次荔泳、漸進(jìn)式地完成的舊哈希表到新哈希表的拷貝而不是一次拷貝完成。

接下來以 Redis 為例虐杯,來談?wù)勊枪1硎侨绾芜M(jìn)行擴(kuò)容并且不太影響性能的玛歌。

Redis 對(duì)字典的定義如下:

C

/*

* 字典

*

* 每個(gè)字典使用兩個(gè)哈希表,用于實(shí)現(xiàn)漸進(jìn)式 rehash

*/

typedefstructdict{

// 特定于類型的處理函數(shù)

dictType*type;

// 類型處理函數(shù)的私有數(shù)據(jù)

void*privdata;

// 哈希表(2 個(gè))

dictht ht[2];

// 記錄 rehash 進(jìn)度的標(biāo)志擎椰,值為 -1 表示 rehash 未進(jìn)行

intrehashidx;

// 當(dāng)前正在運(yùn)作的安全迭代器數(shù)量

intiterators;

}dict;

從定義上我們可以看到支子,Redis 字典保存了2個(gè)哈希表,哈希表ht[1]就是用來 rehash 的达舒。

在 Redis 中定義了如下的哈希表的數(shù)據(jù)結(jié)構(gòu):

C

/*

* 哈希表

*/

typedefstructdictht{

// 哈希表節(jié)點(diǎn)指針數(shù)組(俗稱桶值朋,bucket)

dictEntry**table;

// 指針數(shù)組的大小

unsignedlongsize;

// 指針數(shù)組的長(zhǎng)度掩碼,用于計(jì)算索引值

unsignedlongsizemask;

// 哈希表現(xiàn)有的節(jié)點(diǎn)數(shù)量

unsignedlongused;

}dictht;

table 屬性是個(gè)數(shù)組巩搏, 數(shù)組的每個(gè)元素都是個(gè)指向 dictEntry 結(jié)構(gòu)的指針昨登。

每個(gè) dictEntry 都保存著一個(gè)鍵值對(duì), 以及一個(gè)指向另一個(gè) dictEntry 結(jié)構(gòu)的指針:

C

/*

* 哈希表節(jié)點(diǎn)

*/

typedefstructdictEntry{

// 鍵

void*key;

// 值

union{

void*val;

uint64_tu64;

int64_ts64;

}v;

// 鏈往后繼節(jié)點(diǎn)

structdictEntry*next;

}dictEntry;

next 屬性指向另一個(gè)?dictEntry 結(jié)構(gòu)贯底, 多個(gè)?dictEntry 可以通過?next 指針串連成鏈表丰辣, 從這里可以看出,?dictht 使用鏈地址法來處理鍵碰撞問題的禽捆。

dictAdd 在每次向字典添加新鍵值對(duì)之前糯俗, 都會(huì)對(duì)哈希表 ht[0] 進(jìn)行檢查, 對(duì)于 ht[0] 的 size 和 used 屬性睦擂, 如果它們之間的比率 ratio = used / size 滿足以下任何一個(gè)條件的話得湘,rehash 過程就會(huì)被激活:

自然 rehash : ratio >= 1 ,且變量 dict_can_resize 為真顿仇。

強(qiáng)制 rehash : ratio 大于變量 dict_force_resize_ratio (目前版本中淘正, dict_force_resize_ratio 的值為 5 )。

假設(shè)當(dāng)前的字典需要擴(kuò)容 rehash臼闻,那么 Redis 會(huì)先設(shè)置字典的 rehashidx 為 0 鸿吆,標(biāo)識(shí)著 rehash 的開始;再為 ht[1]->table 分配空間述呐,大小至少為 ht[0]->used 的兩倍惩淳。

如上圖, ht[1]->table 已經(jīng)分配空間了8個(gè)空間了。

接著思犁,開始 rehash 代虾。將 ht[0]->table 內(nèi)的鍵值移動(dòng)到 ht[1]->table 中,鍵值的移動(dòng)不是一次完成的激蹲,分多次進(jìn)行棉磨。

上圖可以看出來, ht[0] 中的一部分鍵值已經(jīng)遷移到 ht[1] 中了学辱,并且此時(shí)還有新的鍵值插入進(jìn)來乘瓤,是直接插入到 ht[1] 中的,不會(huì)再插入到 ht[0] 中了策泣。保證了 ht[0] 只減不增衙傀。

在 rehash 進(jìn)行的過程中,不斷的有新的鍵值插入進(jìn)來萨咕,也不斷的把 ht[0] 中的鍵值都遷移過來差油,直到 ht[0] 中的鍵值都遷移過來為止。注意 Redis 用的是頭插法任洞,新值永遠(yuǎn)都插在鏈表的第一個(gè)位置,這樣也不用遍歷到鏈表的最后发侵,省去了 O(n) 的時(shí)間復(fù)雜度交掏。進(jìn)行到上圖這種情況,所有的節(jié)點(diǎn)也就遷移完畢了刃鳄。

rehash 在結(jié)束之前會(huì)進(jìn)行清理工作盅弛,釋放 ht[0] 的空間;用 ht[1] 來代替 ht[0] 叔锐,使原來的 ht[1] 成為新的 ht[0] 挪鹏;創(chuàng)建一個(gè)新的空哈希表,并將它設(shè)置為 ht[1] 愉烙;將字典的 rehashidx 屬性設(shè)置為 -1 讨盒,標(biāo)識(shí) rehash 已停止;

最終 rehash 結(jié)束以后情況如上圖步责。如果還下次還需要 rehash 返顺,重復(fù)上述過程即可。這種分多次蔓肯,漸進(jìn)式 rehash 的方式也成就了 Redis 的高性能遂鹊。

值得一提的是,Redis 是支持字典的 reshrink 操作的蔗包。操作步驟就是 rehash 的逆過程秉扑。

二. 紅黑樹優(yōu)化

讀到這里,讀者應(yīng)該已經(jīng)明白了到底用什么方式來控制 map 使得 Hash 碰撞的概率又小调限,哈希桶數(shù)組占用空間又少了吧舟陆,答案就是選擇好的 Hash 算法和增加擴(kuò)容機(jī)制误澳。

Java 在 JDK1.8 對(duì) HashMap 底層的實(shí)現(xiàn)再次進(jìn)行了優(yōu)化。

上圖是來自美團(tuán)博客總結(jié)的吨娜。從這里我們可以發(fā)現(xiàn):

Java 底層初始桶的個(gè)數(shù)是16個(gè)脓匿,負(fù)載因子默認(rèn)是0.75,也就是說當(dāng)鍵值第一次達(dá)到12個(gè)的時(shí)候就會(huì)進(jìn)行擴(kuò)容 resize宦赠。擴(kuò)容的臨界值在64陪毡,當(dāng)超過了64以后,并且沖突節(jié)點(diǎn)為8或者大于8勾扭,這個(gè)時(shí)候就會(huì)觸發(fā)紅黑樹轉(zhuǎn)換毡琉。為了防止底層鏈表過長(zhǎng),鏈表就轉(zhuǎn)換為紅黑樹妙色。

換句話說桅滋,當(dāng)桶的總個(gè)數(shù)沒有到64個(gè)的時(shí)候,即使鏈表長(zhǎng)為8身辨,也不會(huì)進(jìn)行紅黑樹轉(zhuǎn)換丐谋。

如果節(jié)點(diǎn)小于6個(gè),紅黑樹又會(huì)重新退化成鏈表煌珊。

當(dāng)然這里之所以選擇用紅黑樹來進(jìn)行優(yōu)化号俐,保證最壞情況不會(huì)退化成 O(n),紅黑樹能保證最壞時(shí)間復(fù)雜度也為 O(log n)定庵。

在美團(tuán)博客中也提到了吏饿,Java 在 JDK1.8 中還有一個(gè)值得學(xué)習(xí)的優(yōu)化。Java 在 rehash 的鍵值節(jié)點(diǎn)遷移過程中蔬浙,不需要再次計(jì)算一次 hash 計(jì)算猪落!

由于使用了2次冪的擴(kuò)展(指長(zhǎng)度擴(kuò)為原來2倍),所以畴博,元素的位置要么是在原位置笨忌,要么是在原位置再移動(dòng)2次冪的位置【悴。看下圖可以明白這句話的意思洲鸠,n 為 table 的長(zhǎng)度别垮,圖(a)表示擴(kuò)容前的 key1 和 key2 兩種 key 確定索引位置的示例,圖(b)表示擴(kuò)容后 key1 和

key2 兩種 key 確定索引位置的示例,其中 hash1 是 key1 對(duì)應(yīng)的哈希與高位運(yùn)算結(jié)果耳璧。

元素在重新計(jì)算 hash 之后痘昌,因?yàn)?n 變?yōu)?倍节预,那么 n-1 的 mask 范圍在高位多1bit(紅色)惊科,因此新的 index 就會(huì)發(fā)生這樣的變化:

所以在擴(kuò)容以后,就只需要看擴(kuò)容容量以后那個(gè)位上的值為0,還是為1掩完,如果是0噪漾,代表索引不變,如果是1且蓬,代表的是新的索引值等于原來的索引值加上 oldCap 即可欣硼,這樣就不需要再次計(jì)算一次 hash 了。

上圖是把16擴(kuò)容到32的情況恶阴。

三. Go 中 Map 的具體實(shí)現(xiàn)舉例

讀到這里诈胜,讀者對(duì)如何設(shè)計(jì)一個(gè) Map 應(yīng)該有一些自己的想法了。選擇一個(gè)優(yōu)秀的哈希算法冯事,用鏈表 + 數(shù)組 作為底層數(shù)據(jù)結(jié)構(gòu)焦匈,如何擴(kuò)容和優(yōu)化,這些應(yīng)該都有了解了昵仅。讀到這里也許讀者認(rèn)為本篇文章內(nèi)容已經(jīng)過半了缓熟,不過前面這些都是偏理論,接下來也許才到了本文的重點(diǎn)部分 —— 從零開始分析一下完整的 Map 實(shí)現(xiàn)摔笤。

接下來筆者對(duì) Go 中的 Map 的底層實(shí)現(xiàn)進(jìn)行分析够滑,也算是對(duì)一個(gè) Map 的具體實(shí)現(xiàn)和重要的幾個(gè)操作,添加鍵值吕世,刪除鍵值彰触,擴(kuò)容策略進(jìn)行舉例。

Go 的 map 實(shí)現(xiàn)在 /src/runtime/hashmap.go 這個(gè)文件中寞冯。

map 底層實(shí)質(zhì)還是一個(gè) hash table。

先來看看 Go 定義了一些常量晚伙。

Go

const(

// 一個(gè)桶里面最多可以裝的鍵值對(duì)的個(gè)數(shù)吮龄,8對(duì)。

bucketCntBits=3

bucketCnt=1<

// 觸發(fā)擴(kuò)容操作的最大裝載因子的臨界值

loadFactor=6.5

// 為了保持內(nèi)聯(lián)咆疗,鍵 和 值 的最大長(zhǎng)度都是128字節(jié)漓帚,如果超過了128個(gè)字節(jié),就存儲(chǔ)它的指針

maxKeySize=128

maxValueSize=128

// 數(shù)據(jù)偏移應(yīng)該是 bmap 的整數(shù)倍午磁,但是需要正確的對(duì)齊尝抖。

dataOffset=unsafe.Offsetof(struct{

? ? ? ? b bmap

vint64

}{}.v)

// tophash 的一些值

empty=0// 沒有鍵值對(duì)

evacuatedEmpty=1// 沒有鍵值對(duì),并且桶內(nèi)的鍵值被遷移走了迅皇。

evacuatedX=2// 鍵值對(duì)有效昧辽,并且已經(jīng)遷移了一個(gè)表的前半段

evacuatedY=3// 鍵值對(duì)有效,并且已經(jīng)遷移了一個(gè)表的后半段

minTopHash=4// 最小的 tophash

// 標(biāo)記

iterator=1// 當(dāng)前桶的迭代子

oldIterator=2// 舊桶的迭代子

hashWriting=4// 一個(gè)goroutine正在寫入map

sameSizeGrow=8// 當(dāng)前字典增長(zhǎng)到新字典并且保持相同的大小

// 迭代子檢查桶ID的哨兵

noCheck=1<<(8*sys.PtrSize)-1

)

這里值得說明的一點(diǎn)是觸發(fā)擴(kuò)容操作的臨界值6.5是怎么得來的登颓。這個(gè)值太大會(huì)導(dǎo)致overflow buckets過多搅荞,查找效率降低,過小會(huì)浪費(fèi)存儲(chǔ)空間。

據(jù) Google 開發(fā)人員稱咕痛,這個(gè)值是一個(gè)測(cè)試的程序痢甘,測(cè)量出來選擇的一個(gè)經(jīng)驗(yàn)值。

%overflow : 溢出率茉贡,平均一個(gè) bucket 有多少個(gè) 鍵值kv 的時(shí)候會(huì)溢出塞栅。

bytes/entry :

平均存一個(gè) 鍵值kv 需要額外存儲(chǔ)多少字節(jié)的數(shù)據(jù)。

hitprobe :

查找一個(gè)存在的 key 平均查找次數(shù)腔丧。

missprobe :

查找一個(gè)不存在的 key 平均查找次數(shù)放椰。

經(jīng)過這幾組測(cè)試數(shù)據(jù),最終選定 6.5 作為臨界的裝載因子悔据。

接著看看 Go 中 map header 的定義:

Go

typehmapstruct{

countint// map 的長(zhǎng)度

flagsuint8

Buint8// log以2為底庄敛,桶個(gè)數(shù)的對(duì)數(shù) (總共能存 6.5 * 2^B 個(gè)元素)

noverflowuint16// 近似溢出桶的個(gè)數(shù)

hash0uint32// 哈希種子

buckets? ? unsafe.Pointer// 有 2^B 個(gè)桶的數(shù)組. count==0 的時(shí)候,這個(gè)數(shù)組為 nil.

oldbuckets unsafe.Pointer// 舊的桶數(shù)組一半的元素

nevacuateuintptr// 擴(kuò)容增長(zhǎng)過程中的計(jì)數(shù)器

extra*mapextra// 可選字段

}

在 Go 的 map header 結(jié)構(gòu)中科汗,也包含了2個(gè)指向桶數(shù)組的指針藻烤,buckets 指向新的桶數(shù)組,oldbuckets 指向舊的桶數(shù)組头滔。這點(diǎn)和 Redis 字典中也有兩個(gè) dictht 數(shù)組類似怖亭。

hmap 的最后一個(gè)字段是一個(gè)指向 mapextra 結(jié)構(gòu)的指針,它的定義如下:

Go

typemapextrastruct{

overflow[2]*[]*bmap

nextOverflow*bmap

}

如果一個(gè)鍵值對(duì)沒有找到對(duì)應(yīng)的指針坤检,那么就會(huì)把它們先存到溢出桶 overflow 里面兴猩。在 mapextra 中還有一個(gè)指向下一個(gè)可用的溢出桶的指針。

溢出桶 overflow 是一個(gè)數(shù)組早歇,里面存了2個(gè)指向 *bmap 數(shù)組的指針倾芝。overflow[0] 里面裝的是 hmap.buckets 。overflow[1] 里面裝的是 hmap.oldbuckets箭跳。

再看看桶的數(shù)據(jù)結(jié)構(gòu)的定義晨另,bmap 就是 Go 中 map 里面桶對(duì)應(yīng)的結(jié)構(gòu)體類型。

Go

typebmapstruct{

tophash[bucketCnt]uint8

}

桶的定義比較簡(jiǎn)單谱姓,里面就只是包含了一個(gè) uint8 類型的數(shù)組借尿,里面包含8個(gè)元素。這8個(gè)元素存儲(chǔ)的是 hash 值的高8位屉来。

在 tophash 之后的內(nèi)存布局里還有2塊內(nèi)容路翻。緊接著 tophash 之后的是8對(duì) 鍵值 key- value 對(duì)。并且排列方式是 8個(gè) key 和 8個(gè) value 放在一起茄靠。

8對(duì) 鍵值 key- value 對(duì)結(jié)束以后緊接著一個(gè) overflow 指針茂契,指向下一個(gè) bmap。從此也可以看出 Go 中 map是用鏈表的方式處理 hash 沖突的慨绳。

為何 Go 存儲(chǔ)鍵值對(duì)的方式不是普通的 key/value账嚎、key/value莫瞬、key/value……這樣存儲(chǔ)的呢?它是鍵 key 都存儲(chǔ)在一起郭蕉,然后緊接著是 值value 都存儲(chǔ)在一起疼邀,為什么會(huì)這樣呢?

在 Redis 中召锈,當(dāng)使用 REDIS_ENCODING_ZIPLIST 編碼哈希表時(shí)旁振, 程序通過將鍵和值一同推入壓縮列表, 從而形成保存哈希表所需的鍵-值對(duì)結(jié)構(gòu)涨岁,如上圖拐袜。新添加的 key-value 對(duì)會(huì)被添加到壓縮列表的表尾。

這種結(jié)構(gòu)有一個(gè)弊端梢薪,如果存儲(chǔ)的鍵和值的類型不同蹬铺,在內(nèi)存中布局中所占字節(jié)不同的話,就需要對(duì)齊秉撇。比如說存儲(chǔ)一個(gè) map[int64]int8 類型的字典甜攀。

Go 為了節(jié)約內(nèi)存對(duì)齊的內(nèi)存消耗,于是把它設(shè)計(jì)成上圖所示那樣琐馆。

如果 map 里面存儲(chǔ)了上萬億的大數(shù)據(jù)规阀,這里節(jié)約出來的內(nèi)存空間還是比較可觀的。

1. 新建 Map

makemap 新建了一個(gè) Map瘦麸,如果入?yún)?h 不為空谁撼,那么 map 的 hmap 就是入?yún)⒌倪@個(gè) hmap,如果入?yún)?bucket 不為空滋饲,那么這個(gè) bucket 桶就作為第一個(gè)桶厉碟。

Go

func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap{

// hmap 的 size 大小的值非法

ifsz:=unsafe.Sizeof(hmap{});sz>48||sz!=t.hmap.size{

println("runtime: sizeof(hmap) =",sz,", t.hmap.size =",t.hmap.size)

throw("bad hmap size")

}

// 超過范圍的 hint 值都為0

ifhint<0||hint>int64(maxSliceCap(t.bucket.size)){

hint=0

}

// key 值的類型不是 Go 所支持的

if!ismapkey(t.key){

throw("runtime.makemap: unsupported map key type")

}

// 通過編譯器和反射檢車 key 值的 size 是否合法

ift.key.size>maxKeySize&&(!t.indirectkey||t.keysize!=uint8(sys.PtrSize))||

t.key.size<=maxKeySize&&(t.indirectkey||t.keysize!=uint8(t.key.size)){

throw("key size wrong")

}

// 通過編譯器和反射檢車 value 值的 size 是否合法

ift.elem.size>maxValueSize&&(!t.indirectvalue||t.valuesize!=uint8(sys.PtrSize))||

t.elem.size<=maxValueSize&&(t.indirectvalue||t.valuesize!=uint8(t.elem.size)){

throw("value size wrong")

}

// 雖然以下的變量我們不依賴,而且可以在編譯階段檢查下面這些值的合法性屠缭,

// 但是我們還是在這里檢測(cè)箍鼓。

// key 值對(duì)齊超過桶的個(gè)數(shù)

ift.key.align>bucketCnt{

throw("key align too big")

}

// value 值對(duì)齊超過桶的個(gè)數(shù)

ift.elem.align>bucketCnt{

throw("value align too big")

}

// key 值的 size 不是 key 值對(duì)齊的倍數(shù)

ift.key.size%uintptr(t.key.align)!=0{

throw("key size not a multiple of key align")

}

// value 值的 size 不是 value 值對(duì)齊的倍數(shù)

ift.elem.size%uintptr(t.elem.align)!=0{

throw("value size not a multiple of value align")

}

// 桶個(gè)數(shù)太小,無法正確對(duì)齊

ifbucketCnt<8{

throw("bucketsize too small for proper alignment")

}

// 數(shù)據(jù)偏移量不是 key 值對(duì)齊的整數(shù)倍勿她,說明需要在桶中填充 key

ifdataOffset%uintptr(t.key.align)!=0{

throw("need padding in bucket (key)")

}

// 數(shù)據(jù)偏移量不是 value 值對(duì)齊的整數(shù)倍袄秩,說明需要在桶中填充 value

ifdataOffset%uintptr(t.elem.align)!=0{

throw("need padding in bucket (value)")

}

B:=uint8(0)

for;overLoadFactor(hint,B);B++{

}

// 分配內(nèi)存并初始化哈希表

// 如果此時(shí) B = 0阵翎,那么 hmap 中的 buckets 字段稍后分配

// 如果 hint 值很大逢并,初始化這塊內(nèi)存需要一段時(shí)間。

buckets:=bucket

varextra*mapextra

ifB!=0{

varnextOverflow*bmap

// 初始化 bucket 和 nextOverflow

buckets,nextOverflow=makeBucketArray(t,B)

ifnextOverflow!=nil{

extra=new(mapextra)

extra.nextOverflow=nextOverflow

}

}

// 初始化 hmap

ifh==nil{

h=(*hmap)(newobject(t.hmap))

}

h.count=0

h.B=B

h.extra=extra

h.flags=0

h.hash0=fastrand()

h.buckets=buckets

h.oldbuckets=nil

h.nevacuate=0

h.noverflow=0

returnh

}

新建一個(gè) map 最重要的就是分配內(nèi)存并初始化哈希表郭卫,在 B 不為0的情況下砍聊,還會(huì)初始化 mapextra 并且會(huì) buckets 會(huì)被重新生成。

Go

func makeBucketArray(t *maptype, b uint8) (buckets unsafe.Pointer, nextOverflow *bmap){

base:=uintptr(1<

nbuckets:=base

ifb>=4{

nbuckets+=1<<(b-4)

sz:=t.bucket.size*nbuckets

up:=roundupsize(sz)

// 如果申請(qǐng) sz 大小的桶贰军,系統(tǒng)只能返回 up 大小的內(nèi)存空間玻蝌,那么桶的個(gè)數(shù)為 up / t.bucket.size

ifup!=sz{

nbuckets=up/t.bucket.size

}

}

buckets=newarray(t.bucket,int(nbuckets))

// 當(dāng) b > 4 并且計(jì)算出來桶的個(gè)數(shù)與 1 << b 個(gè)數(shù)不等的時(shí)候蟹肘,

ifbase!=nbuckets{

// 此時(shí) nbuckets 比 base 大,那么會(huì)預(yù)先分配 nbuckets - base 個(gè) nextOverflow 桶

nextOverflow=(*bmap)(add(buckets,base*uintptr(t.bucketsize)))

last:=(*bmap)(add(buckets,(nbuckets-1)*uintptr(t.bucketsize)))

last.setoverflow(t,(*bmap)(buckets))

}

returnbuckets,nextOverflow

}

這里的 newarray 就已經(jīng)是 mallocgc 了俯树。

從上述代碼里面可以看出帘腹,只有當(dāng) B >=4 的時(shí)候,makeBucketArray 才會(huì)生成 nextOverflow 指針指向 bmap许饿,從而在 Map 生成 hmap 的時(shí)候才會(huì)生成 mapextra 阳欲。

當(dāng) B = 3 ( B < 4 ) 的時(shí)候,初始化 hmap 只會(huì)生成8個(gè)桶陋率。

當(dāng) B = 4 ( B >= 4 ) 的時(shí)候球化,初始化 hmap 的時(shí)候還會(huì)額外生成 mapextra ,并初始化 nextOverflow瓦糟。mapextra 的 nextOverflow 指針會(huì)指向第16個(gè)桶結(jié)束筒愚,第17個(gè)桶的首地址。第17個(gè)桶(從0開始菩浙,也就是下標(biāo)為16的桶)的 bucketsize - sys.PtrSize 地址開始存一個(gè)指針巢掺,這個(gè)指針指向當(dāng)前整個(gè)桶的首地址。這個(gè)指針就是 bmap 的 overflow 指針芍耘。

當(dāng) B = 5 ( B >= 4 ) 的時(shí)候址遇,初始化 hmap 的時(shí)候還會(huì)額外生成 mapextra ,并初始化 nextOverflow斋竞。這個(gè)時(shí)候就會(huì)生成總共34個(gè)桶了莉钙。同理,最后一個(gè)桶大小減去一個(gè)指針的大小的地址開始存儲(chǔ)一個(gè) overflow 指針免绿。

2. 查找 Key

在 Go 中可帽,如果字典里面查找一個(gè)不存在的 key ,查找不到并不會(huì)返回一個(gè) nil 鳄袍,而是返回當(dāng)前類型的零值绢要。比如,字符串就返回空字符串拗小,int 類型就返回 0 重罪。

Go

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer)unsafe.Pointer{

ifraceenabled&&h!=nil{

// 獲取 caller 的 程序計(jì)數(shù)器 program counter

callerpc:=getcallerpc(unsafe.Pointer(&t))

// 獲取 mapaccess1 的程序計(jì)數(shù)器 program counter

pc:=funcPC(mapaccess1)

racereadpc(unsafe.Pointer(h),callerpc,pc)

raceReadObjectPC(t.key,key,callerpc,pc)

}

ifmsanenabled&&h!=nil{

msanread(key,t.key.size)

}

ifh==nil||h.count==0{

returnunsafe.Pointer(&zeroVal[0])

}

// 如果多線程讀寫,直接拋出異常

// 并發(fā)檢查 go hashmap 不支持并發(fā)訪問

ifh.flags&hashWriting!=0{

throw("concurrent map read and map write")

}

alg:=t.key.alg

// 計(jì)算 key 的 hash 值

hash:=alg.hash(key,uintptr(h.hash0))

m:=uintptr(1)<

// hash % (1<

b:=(*bmap)(add(h.buckets,(hash&m)*uintptr(t.bucketsize)))

// 如果當(dāng)前還存在 oldbuckets 桶

ifc:=h.oldbuckets;c!=nil{

// 當(dāng)前擴(kuò)容不是等量擴(kuò)容

if!h.sameSizeGrow(){

// 如果 oldbuckets 未遷移完成 則找找 oldbuckets 中對(duì)應(yīng)的 bucket(低 B-1 位)

// 否則為 buckets 中的 bucket(低 B 位)

// 把 mask 縮小 1 倍

m>>=1

}

oldb:=(*bmap)(add(c,(hash&m)*uintptr(t.bucketsize)))

if!evacuated(oldb){

// 如果 oldbuckets 桶存在哀九,并且還沒有擴(kuò)容遷移剿配,就在老的桶里面查找 key

b=oldb

}

}

// 取出 hash 值的高 8 位

top:=uint8(hash>>(sys.PtrSize*8-8))

// 如果 top 小于 minTopHash,就讓它加上 minTopHash 的偏移阅束。

// 因?yàn)?0 - minTopHash 這區(qū)間的數(shù)都已經(jīng)用來作為標(biāo)記位了

iftop

top+=minTopHash

}

for{

fori:=uintptr(0);i

// 如果 hash 的高8位和當(dāng)前 key 記錄的不一樣呼胚,就找下一個(gè)

// 這樣比較很高效,因?yàn)橹挥帽容^高8位息裸,不用比較所有的 hash 值

// 如果高8位都不相同蝇更,hash 值肯定不同沪编,但是高8位如果相同,那么就要比較整個(gè) hash 值了

ifb.tophash[i]!=top{

continue

}

// 取出 key 值的方式是用偏移量年扩,bmap 首地址 + i 個(gè) key 值大小的偏移量

k:=add(unsafe.Pointer(b),dataOffset+i*uintptr(t.keysize))

ift.indirectkey{

k=*((*unsafe.Pointer)(k))

}

// 比較 key 值是否相等

ifalg.equal(key,k){

// 如果找到了 key蚁廓,那么取出 value 值

// 取出 value 值的方式是用偏移量,bmap 首地址 + 8 個(gè) key 值大小的偏移量 + i 個(gè) value 值大小的偏移量

v:=add(unsafe.Pointer(b),dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

ift.indirectvalue{

v=*((*unsafe.Pointer)(v))

}

returnv

}

}

// 如果當(dāng)前桶里面沒有找到相應(yīng)的 key 厨幻,那么就去下一個(gè)桶去找

b=b.overflow(t)

// 如果 b == nil纳令,說明桶已經(jīng)都找完了,返回對(duì)應(yīng)type的零值

ifb==nil{

returnunsafe.Pointer(&zeroVal[0])

}

}

}

具體實(shí)現(xiàn)代碼如上克胳,詳細(xì)解釋見代碼平绩。

如上圖,這是一個(gè)查找 key 的全過程漠另。

首先計(jì)算出 key 對(duì)應(yīng)的 hash 值捏雌,hash 值對(duì) B 取余。

這里有一個(gè)優(yōu)化點(diǎn)笆搓。m % n 這步計(jì)算性湿,如果 n 是2的倍數(shù),那么可以省去這一步取余操作满败。

Go

m%n=m&(n-1)

這樣優(yōu)化就可以省去耗時(shí)的取余操作了肤频。這里例子中計(jì)算完取出來是 0010 ,也就是2算墨,于是對(duì)應(yīng)的是桶數(shù)組里面的第3個(gè)桶宵荒。為什么是第3個(gè)桶呢?首地址指向第0個(gè)桶净嘀,往下偏移2個(gè)桶的大小报咳,于是偏移到了第3個(gè)桶的首地址了,具體實(shí)現(xiàn)可以看上述代碼挖藏。

hash 的低 B 位決定了桶數(shù)組里面的第幾個(gè)桶暑刃,hash 值的高8位決定了這個(gè)桶數(shù)組 bmap 里面 key 存在 tophash 數(shù)組的第幾位了。如上圖膜眠,hash 的高8位用來和 tophash 數(shù)組里面的每個(gè)值進(jìn)行對(duì)比岩臣,如果高8位和 tophash[i] 不等,就直接比下一個(gè)宵膨。如果相等架谎,則取出 bmap 里面對(duì)應(yīng)完整的 key,再比較一次柄驻,看是否完全一致狐树。

整個(gè)查找過程優(yōu)先在 oldbucket 里面找(如果存在 lodbucket 的話)焙压,找完再去新 bmap 里面找鸿脓。

有人可能會(huì)有疑問抑钟,為何這里要加入 tophash 多一次比較呢?

tophash 的引入是為了加速查找的野哭。由于它只存了 hash 值的高8位在塔,比查找完整的64位要快很多。通過比較高8位拨黔,迅速找到高8位一致hash 值的索引蛔溃,接下來再進(jìn)行一次完整的比較,如果還一致篱蝇,那么就判定找到該 key 了贺待。

如果找到了 key 就返回對(duì)應(yīng)的 value。如果沒有找到零截,還會(huì)繼續(xù)去 overflow 桶繼續(xù)尋找麸塞,直到找到最后一個(gè)桶,如果還沒有找到就返回對(duì)應(yīng)類型的零值涧衙。

3. 插入 Key

插入 key 的過程和查找 key 的過程大體一致哪工。

Go

func mapassign(t *maptype, h *hmap, key unsafe.Pointer)unsafe.Pointer{

ifh==nil{

panic(plainError("assignment to entry in nil map"))

}

ifraceenabled{

// 獲取 caller 的 程序計(jì)數(shù)器 program counter

callerpc:=getcallerpc(unsafe.Pointer(&t))

// 獲取 mapassign 的程序計(jì)數(shù)器 program counter

pc:=funcPC(mapassign)

racewritepc(unsafe.Pointer(h),callerpc,pc)

raceReadObjectPC(t.key,key,callerpc,pc)

}

ifmsanenabled{

msanread(key,t.key.size)

}

// 如果多線程讀寫,直接拋出異常

// 并發(fā)檢查 go hashmap 不支持并發(fā)訪問

ifh.flags&hashWriting!=0{

throw("concurrent map writes")

}

alg:=t.key.alg

// 計(jì)算 key 值的 hash 值

hash:=alg.hash(key,uintptr(h.hash0))

// 在計(jì)算完 hash 值以后立即設(shè)置 hashWriting 變量的值弧哎,如果在計(jì)算 hash 值的過程中沒有完全寫完雁比,可能會(huì)導(dǎo)致 panic

h.flags|=hashWriting

// 如果 hmap 的桶的個(gè)數(shù)為0,那么就新建一個(gè)桶

ifh.buckets==nil{

h.buckets=newarray(t.bucket,1)

}

again:

// hash 值對(duì) B 取余撤嫩,求得所在哪個(gè)桶

bucket:=hash&(uintptr(1)<

// 如果還在擴(kuò)容中偎捎,繼續(xù)擴(kuò)容

ifh.growing(){

growWork(t,h,bucket)

}

// 根據(jù) hash 值的低 B 位找到位于哪個(gè)桶

b:=(*bmap)(unsafe.Pointer(uintptr(h.buckets)+bucket*uintptr(t.bucketsize)))

// 計(jì)算 hash 值的高 8 位

top:=uint8(hash>>(sys.PtrSize*8-8))

iftop

top+=minTopHash

}

varinserti*uint8

varinsertk unsafe.Pointer

varval unsafe.Pointer

for{

// 遍歷當(dāng)前桶所有鍵值,查找 key 對(duì)應(yīng)的 value

fori:=uintptr(0);i

ifb.tophash[i]!=top{

ifb.tophash[i]==empty&&inserti==nil{

// 如果往后找都沒有找到序攘,這里先記錄一個(gè)標(biāo)記鸭限,方便找不到以后插入到這里

inserti=&b.tophash[i]

// 計(jì)算出偏移 i 個(gè) key 值的位置

insertk=add(unsafe.Pointer(b),dataOffset+i*uintptr(t.keysize))

// 計(jì)算出 val 所在的位置,當(dāng)前桶的首地址 + 8個(gè) key 值所占的大小 + i 個(gè) value 值所占的大小

val=add(unsafe.Pointer(b),dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

}

continue

}

// 依次取出 key 值

k:=add(unsafe.Pointer(b),dataOffset+i*uintptr(t.keysize))

// 如果 key 值是一個(gè)指針两踏,那么就取出改指針對(duì)應(yīng)的 key 值

ift.indirectkey{

k=*((*unsafe.Pointer)(k))

}

// 比較 key 值是否相等

if!alg.equal(key,k){

continue

}

// 如果需要更新败京,那么就把 t.key 拷貝從 k 拷貝到 key

ift.needkeyupdate{

typedmemmove(t.key,k,key)

}

// 計(jì)算出 val 所在的位置,當(dāng)前桶的首地址 + 8個(gè) key 值所占的大小 + i 個(gè) value 值所占的大小

val=add(unsafe.Pointer(b),dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

gotodone

}

ovf:=b.overflow(t)

ifovf==nil{

break

}

b=ovf

}

// 沒有找到當(dāng)前的 key 值梦染,并且檢查最大負(fù)載因子赡麦,如果達(dá)到了最大負(fù)載因子,或者存在很多溢出的桶

if!h.growing()&&(overLoadFactor(int64(h.count),h.B)||tooManyOverflowBuckets(h.noverflow,h.B)){

// 開始擴(kuò)容

hashGrow(t,h)

gotoagain// Growing the table invalidates everything, so try again

}

// 如果找不到一個(gè)空的位置可以插入 key 值

ifinserti==nil{

// all current buckets are full, allocate a new one.

// 意味著當(dāng)前桶已經(jīng)全部滿了帕识,那么就生成一個(gè)新的

newb:=h.newoverflow(t,b)

inserti=&newb.tophash[0]

insertk=add(unsafe.Pointer(newb),dataOffset)

val=add(insertk,bucketCnt*uintptr(t.keysize))

}

// store new key/value at insert position

ift.indirectkey{

// 如果是存儲(chǔ) key 值的指針泛粹,這里就用 insertk 存儲(chǔ) key 值的地址

kmem:=newobject(t.key)

*(*unsafe.Pointer)(insertk)=kmem

insertk=kmem

}

ift.indirectvalue{

// 如果是存儲(chǔ) value 值的指針,這里就用 val 存儲(chǔ) key 值的地址

vmem:=newobject(t.elem)

*(*unsafe.Pointer)(val)=vmem

}

// 將 t.key 從 insertk 拷貝到 key 的位置

typedmemmove(t.key,insertk,key)

*inserti=top

// hmap 中保存的總 key 值的數(shù)量 + 1

h.count++

done:

// 禁止并發(fā)寫

ifh.flags&hashWriting==0{

throw("concurrent map writes")

}

h.flags&^=hashWriting

ift.indirectvalue{

// 如果 value 里面存儲(chǔ)的是指針肮疗,那么取值該指針指向的 value 值

val=*((*unsafe.Pointer)(val))

}

returnval

}

插入 key 的過程中和查找 key 有幾點(diǎn)不同晶姊,需要注意:

1.如果找到要插入的 key ,只需要直接更新對(duì)應(yīng)的 value 值就好了伪货。

2.如果沒有在 bmap 中沒有找到待插入的 key 们衙,這么這時(shí)分幾種情況钾怔。 情況一: bmap 中還有空位,在遍歷 bmap 的時(shí)候預(yù)先標(biāo)記空位蒙挑,一旦查找結(jié)束也沒有找到 key宗侦,就把 key 放到預(yù)先遍歷時(shí)候標(biāo)記的空位上。 情況二:bmap中已經(jīng)沒有空位了忆蚀。這個(gè)時(shí)候 bmap 裝的很滿了矾利。此時(shí)需要檢查一次最大負(fù)載因子是否已經(jīng)達(dá)到了。如果達(dá)到了馋袜,立即進(jìn)行擴(kuò)容操作男旗。擴(kuò)容以后在新桶里面插入 key,流程和上述的一致欣鳖。如果沒有達(dá)到最大負(fù)載因子剑肯,那么就在新生成一個(gè) bmap,并把前一個(gè) bmap 的 overflow 指針指向新的 bmap观堂。

3.在擴(kuò)容過程中让网,oldbucke t是被凍結(jié)的,查找 key 時(shí)會(huì)在 oldbucket 中查找师痕,但不會(huì)在 oldbucket 中插入數(shù)據(jù)溃睹。如果在 oldbucket 是找到了相應(yīng)的key,做法是將它遷移到新 bmap 后加入 evalucated 標(biāo)記胰坟。

其他流程和查找 key 基本一致因篇,這里就不再贅述了。

3. 刪除 Key

Go

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer){

ifraceenabled&&h!=nil{

// 獲取 caller 的 程序計(jì)數(shù)器 program counter

callerpc:=getcallerpc(unsafe.Pointer(&t))

// 獲取 mapdelete 的程序計(jì)數(shù)器 program counter

pc:=funcPC(mapdelete)

racewritepc(unsafe.Pointer(h),callerpc,pc)

raceReadObjectPC(t.key,key,callerpc,pc)

}

ifmsanenabled&&h!=nil{

msanread(key,t.key.size)

}

ifh==nil||h.count==0{

return

}

// 如果多線程讀寫笔横,直接拋出異常

// 并發(fā)檢查 go hashmap 不支持并發(fā)訪問

ifh.flags&hashWriting!=0{

throw("concurrent map writes")

}

alg:=t.key.alg

// 計(jì)算 key 值的 hash 值

hash:=alg.hash(key,uintptr(h.hash0))

// 在計(jì)算完 hash 值以后立即設(shè)置 hashWriting 變量的值竞滓,如果在計(jì)算 hash 值的過程中沒有完全寫完,可能會(huì)導(dǎo)致 panic

h.flags|=hashWriting

bucket:=hash&(uintptr(1)<

// 如果還在擴(kuò)容中吹缔,繼續(xù)擴(kuò)容

ifh.growing(){

growWork(t,h,bucket)

}

// 根據(jù) hash 值的低 B 位找到位于哪個(gè)桶

b:=(*bmap)(unsafe.Pointer(uintptr(h.buckets)+bucket*uintptr(t.bucketsize)))

// 計(jì)算 hash 值的高 8 位

top:=uint8(hash>>(sys.PtrSize*8-8))

iftop

top+=minTopHash

}

for{

// 遍歷當(dāng)前桶所有鍵值商佑,查找 key 對(duì)應(yīng)的 value

fori:=uintptr(0);i

ifb.tophash[i]!=top{

continue

}

k:=add(unsafe.Pointer(b),dataOffset+i*uintptr(t.keysize))

// 如果 k 是指向 key 的指針,那么這里需要取出 key 的值

k2:=k

ift.indirectkey{

k2=*((*unsafe.Pointer)(k2))

}

if!alg.equal(key,k2){

continue

}

// key 的指針置空

ift.indirectkey{

*(*unsafe.Pointer)(k)=nil

}else{

// 清除 key 的內(nèi)存

typedmemclr(t.key,k)

}

v:=unsafe.Pointer(uintptr(unsafe.Pointer(b))+dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

// value 的指針置空

ift.indirectvalue{

*(*unsafe.Pointer)(v)=nil

}else{

// 清除 value 的內(nèi)存

typedmemclr(t.elem,v)

}

// 清空 tophash 里面的值

b.tophash[i]=empty

// map 里面 key 的總個(gè)數(shù)減1

h.count--

gotodone

}

// 如果沒有找到厢塘,那么就繼續(xù)查找 overflow 桶茶没,一直遍歷到最后一個(gè)

b=b.overflow(t)

ifb==nil{

gotodone

}

}

done:

ifh.flags&hashWriting==0{

throw("concurrent map writes")

}

h.flags&^=hashWriting

}

刪除操作主要流程和查找 key 流程也差不多,找到對(duì)應(yīng)的 key 以后晚碾,如果是指針指向原來的 key抓半,就把指針置為 nil。如果是值就清空它所在的內(nèi)存格嘁。還要清理 tophash 里面的值最后把 map 的 key 總個(gè)數(shù)計(jì)數(shù)器減1 笛求。

如果在擴(kuò)容過程中,刪除操作會(huì)在擴(kuò)容以后在新的 bmap 里面刪除。

查找的過程依舊會(huì)一直遍歷到鏈表的最后一個(gè) bmap 桶探入。

4. 增量翻倍擴(kuò)容

這部分算是整個(gè) Map 實(shí)現(xiàn)比較核心的部分了狡孔。我們都知道 Map 在不斷的裝載 Key 值的時(shí)候,查找效率會(huì)變的越來越低新症,如果此時(shí)不進(jìn)行擴(kuò)容操作的話,哈希沖突使得鏈表變得越來越長(zhǎng)响禽,性能也就越來越差徒爹。擴(kuò)容勢(shì)在必行。

但是擴(kuò)容過程中如果阻斷了 Key 值的寫入芋类,在處理大數(shù)據(jù)的時(shí)候會(huì)導(dǎo)致有一段不響應(yīng)的時(shí)間隆嗅,如果用在高實(shí)時(shí)的系統(tǒng)中,那么每次擴(kuò)容都會(huì)卡幾秒侯繁,這段時(shí)間都不能相應(yīng)任何請(qǐng)求胖喳。這種性能明顯是不能接受的。所以要既不影響寫入贮竟,也同時(shí)要進(jìn)行擴(kuò)容丽焊。這個(gè)時(shí)候就應(yīng)該增量擴(kuò)容了。

這里增量擴(kuò)容其實(shí)用途已經(jīng)很廣泛了隘谣,之前舉例的 Redis 就采用的增量擴(kuò)容策略呻畸。

接下來看看 Go 是怎么進(jìn)行增量擴(kuò)容的菇夸。

在 Go 的 mapassign 插入 Key 值、mapdelete 刪除 key 值的時(shí)候都會(huì)檢查當(dāng)前是否在擴(kuò)容中雌贱。

Go

func growWork(t *maptype, h *hmap, bucket uintptr){

// 確保我們遷移了所有 oldbucket

evacuate(t,h,bucket&h.oldbucketmask())

// 再遷移一個(gè)標(biāo)記過的桶

ifh.growing(){

evacuate(t,h,h.nevacuate)

}

}

從這里我們可以看到,每次執(zhí)行一次 growWork 會(huì)遷移2個(gè)桶偿短。一個(gè)是當(dāng)前的桶欣孤,這算是局部遷移,另外一個(gè)是 hmap 里面指向的 nevacuate 的桶昔逗,這算是增量遷移降传。

在插入 Key 值的時(shí)候,如果當(dāng)前在擴(kuò)容過程中勾怒,oldbucket 是被凍結(jié)的搬瑰,查找時(shí)會(huì)先在 oldbucket 中查找,但不會(huì)在oldbucket中插入數(shù)據(jù)控硼。只有在 oldbucket 找到了相應(yīng)的 key泽论,那么將它遷移到新 bucket 后加入 evalucated 標(biāo)記。

在刪除 Key 值的時(shí)候卡乾,如果當(dāng)前在擴(kuò)容過程中翼悴,優(yōu)先查找 bucket,即新桶,找到一個(gè)以后把它對(duì)應(yīng)的 Key鹦赎、Value 都置空谍椅。如果 bucket 里面找不到,才會(huì)去 oldbucket 中去查找古话。

每次插入 Key 值的時(shí)候雏吭,都會(huì)判斷一下當(dāng)前裝載因子是否超過了 6.5,如果達(dá)到了這個(gè)極限陪踩,就立即執(zhí)行擴(kuò)容操作 hashGrow杖们。這是擴(kuò)容之前的準(zhǔn)備工作。

Go

func hashGrow(t *maptype, h *hmap){

// 如果達(dá)到了最大裝載因子肩狂,就需要擴(kuò)容摘完。

// 不然的話,一個(gè)桶后面鏈表跟著一大堆的 overflow 桶

bigger:=uint8(1)

if!overLoadFactor(int64(h.count),h.B){

bigger=0

h.flags|=sameSizeGrow

}

// 把 hmap 的舊桶的指針指向當(dāng)前桶

oldbuckets:=h.buckets

// 生成新的擴(kuò)容以后的桶傻谁,hmap 的 buckets 指針指向擴(kuò)容以后的桶孝治。

newbuckets,nextOverflow:=makeBucketArray(t,h.B+bigger)

flags:=h.flags&^(iterator|oldIterator)

ifh.flags&iterator!=0{

flags|=oldIterator

}

// B 加上新的值

h.B+=bigger

h.flags=flags

// 舊桶指針指向當(dāng)前桶

h.oldbuckets=oldbuckets

// 新桶指針指向擴(kuò)容以后的桶

h.buckets=newbuckets

h.nevacuate=0

h.noverflow=0

ifh.extra!=nil&&h.extra.overflow[0]!=nil{

ifh.extra.overflow[1]!=nil{

throw("overflow is not nil")

}

// 交換 overflow[0] 和 overflow[1] 的指向

h.extra.overflow[1]=h.extra.overflow[0]

h.extra.overflow[0]=nil

}

ifnextOverflow!=nil{

ifh.extra==nil{

// 生成 mapextra

h.extra=new(mapextra)

}

h.extra.nextOverflow=nextOverflow

}

// 實(shí)際拷貝鍵值對(duì)的過程在 evacuate() 中

}

用圖表示出它的流程:

hashGrow 操作算是擴(kuò)容之前的準(zhǔn)備工作,實(shí)際拷貝的過程在 evacuate 中审磁。

hashGrow 操作會(huì)先生成擴(kuò)容以后的新的桶數(shù)組谈飒。新的桶數(shù)組的大小是之前的2倍。然后 hmap 的 buckets 會(huì)指向這個(gè)新的擴(kuò)容以后的桶态蒂,而 oldbuckets 會(huì)指向當(dāng)前的桶數(shù)組步绸。

處理完 hmap 以后,再處理 mapextra吃媒,nextOverflow 的指向原來的 overflow 指針瓤介,overflow 指針置為 null。

到此就做好擴(kuò)容之前的準(zhǔn)備工作了赘那。

Go

func evacuate(t *maptype, h *hmap, oldbucket uintptr){

b:=(*bmap)(add(h.oldbuckets,oldbucket*uintptr(t.bucketsize)))

// 在準(zhǔn)備擴(kuò)容之前桶的個(gè)數(shù)

newbit:=h.noldbuckets()

alg:=t.key.alg

if!evacuated(b){

//TODO:reuse overflow buckets instead of using new ones, if there

// is no iterator using the old buckets.? (If !oldIterator.)

var(

x,y*bmap// 在新桶里面 低位桶和高位桶

xi,yiint// key 和 value 值的索引值分別為 xi 刑桑, yi

xk,yk unsafe.Pointer// 指向 x 和 y 的 key 值的指針

xv,yv unsafe.Pointer// 指向 x 和 y 的 value 值的指針?

)

// 新桶中低位的一些桶

x=(*bmap)(add(h.buckets,oldbucket*uintptr(t.bucketsize)))

xi=0

// 擴(kuò)容以后的新桶中低位的第一個(gè) key 值

xk=add(unsafe.Pointer(x),dataOffset)

// 擴(kuò)容以后的新桶中低位的第一個(gè) key 值對(duì)應(yīng)的 value 值

xv=add(xk,bucketCnt*uintptr(t.keysize))

// 如果不是等量擴(kuò)容

if!h.sameSizeGrow(){

y=(*bmap)(add(h.buckets,(oldbucket+newbit)*uintptr(t.bucketsize)))

yi=0

yk=add(unsafe.Pointer(y),dataOffset)

yv=add(yk,bucketCnt*uintptr(t.keysize))

}

// 依次遍歷溢出桶

for;b!=nil;b=b.overflow(t){

k:=add(unsafe.Pointer(b),dataOffset)

v:=add(k,bucketCnt*uintptr(t.keysize))

// 遍歷 key - value 鍵值對(duì)

fori:=0;i

top:=b.tophash[i]

iftop==empty{

b.tophash[i]=evacuatedEmpty

continue

}

iftop

throw("bad map state")

}

k2:=k

// key 值如果是指針,則取出指針里面的值

ift.indirectkey{

k2=*((*unsafe.Pointer)(k2))

}

useX:=true

if!h.sameSizeGrow(){

// 如果不是等量擴(kuò)容募舟,則需要重新計(jì)算 hash 值祠斧,不管是高位桶 x 中,還是低位桶 y 中

hash:=alg.hash(k2,uintptr(h.hash0))

ifh.flags&iterator!=0{

if!t.reflexivekey&&!alg.equal(k2,k2){

// 如果兩個(gè) key 不相等拱礁,那么他們倆極大可能舊的 hash 值也不相等琢锋。

// tophash 對(duì)要遷移的 key 值也是沒有多大意義的,所以我們用低位的 tophash 輔助擴(kuò)容呢灶,標(biāo)記一些狀態(tài)吴超。

// 為下一個(gè)級(jí) level 重新計(jì)算一些新的隨機(jī)的 hash 值。以至于這些 key 值在多次擴(kuò)容以后依舊可以均勻分布在所有桶中

// 判斷 top 的最低位是否為1

iftop&1!=0{

hash|=newbit

}else{

hash&^=newbit

}

top=uint8(hash>>(sys.PtrSize*8-8))

iftop

top+=minTopHash

}

}

}

useX=hash&newbit==0

}

ifuseX{

// 標(biāo)記低位桶存在 tophash 中

b.tophash[i]=evacuatedX

// 如果 key 的索引值到了桶最后一個(gè)鸯乃,就新建一個(gè) overflow

ifxi==bucketCnt{

newx:=h.newoverflow(t,x)

x=newx

xi=0

xk=add(unsafe.Pointer(x),dataOffset)

xv=add(xk,bucketCnt*uintptr(t.keysize))

}

// 把 hash 的高8位再次存在 tophash 中

x.tophash[xi]=top

ift.indirectkey{

// 如果是指針指向 key 鲸阻,那么拷貝指針指向

*(*unsafe.Pointer)(xk)=k2// copy pointer

}else{

// 如果是指針指向 key ,那么進(jìn)行值拷貝

typedmemmove(t.key,xk,k)// copy value

}

// 同理拷貝 value

ift.indirectvalue{

*(*unsafe.Pointer)(xv)=*(*unsafe.Pointer)(v)

}else{

typedmemmove(t.elem,xv,v)

}

// 繼續(xù)遷移下一個(gè)

xi++

xk=add(xk,uintptr(t.keysize))

xv=add(xv,uintptr(t.valuesize))

}else{

// 這里是高位桶 y,遷移過程和上述低位桶 x 一致鸟悴,下面就不再贅述了

b.tophash[i]=evacuatedY

ifyi==bucketCnt{

newy:=h.newoverflow(t,y)

y=newy

yi=0

yk=add(unsafe.Pointer(y),dataOffset)

yv=add(yk,bucketCnt*uintptr(t.keysize))

}

y.tophash[yi]=top

ift.indirectkey{

*(*unsafe.Pointer)(yk)=k2

}else{

typedmemmove(t.key,yk,k)

}

ift.indirectvalue{

*(*unsafe.Pointer)(yv)=*(*unsafe.Pointer)(v)

}else{

typedmemmove(t.elem,yv,v)

}

yi++

yk=add(yk,uintptr(t.keysize))

yv=add(yv,uintptr(t.valuesize))

}

}

}

// Unlink the overflow buckets & clear key/value to help GC.

ifh.flags&oldIterator==0{

b=(*bmap)(add(h.oldbuckets,oldbucket*uintptr(t.bucketsize)))

// Preserve b.tophash because the evacuation

// state is maintained there.

ift.bucket.kind&kindNoPointers==0{

memclrHasPointers(add(unsafe.Pointer(b),dataOffset),uintptr(t.bucketsize)-dataOffset)

}else{

memclrNoHeapPointers(add(unsafe.Pointer(b),dataOffset),uintptr(t.bucketsize)-dataOffset)

}

}

}

// Advance evacuation mark

ifoldbucket==h.nevacuate{

h.nevacuate=oldbucket+1

// Experiments suggest that 1024 is overkill by at least an order of magnitude.

// Put it in there as a safeguard anyway, to ensure O(1) behavior.

stop:=h.nevacuate+1024

ifstop>newbit{

stop=newbit

}

forh.nevacuate!=stop&&bucketEvacuated(t,h,h.nevacuate){

h.nevacuate++

}

ifh.nevacuate==newbit{// newbit == # of oldbuckets

// Growing is all done. Free old main bucket array.

h.oldbuckets=nil

// Can discard old overflow buckets as well.

// If they are still referenced by an iterator,

// then the iterator holds a pointers to the slice.

ifh.extra!=nil{

h.extra.overflow[1]=nil

}

h.flags&^=sameSizeGrow

}

}

}

上述函數(shù)就是遷移過程最核心的拷貝工作了陈辱。

整個(gè)遷移過程并不難。這里需要說明的是 x 细诸,y 代表的意義沛贪。由于擴(kuò)容以后,新的桶數(shù)組是原來桶數(shù)組的2倍震贵。用 x 代表新的桶數(shù)組里面低位的那一半利赋,用 y 代表高位的那一半。其他的變量就是一些標(biāo)記了屏歹,游標(biāo)和標(biāo)記 key - value 原來所在的位置隐砸。詳細(xì)的見代碼注釋之碗。

上圖中表示了遷移開始之后的過程蝙眶。可以看到舊的桶數(shù)組里面的桶在遷移到新的桶中褪那,并且新的桶也在不斷的寫入新的 key 值幽纷。

一直拷貝鍵值對(duì),直到舊桶中所有的鍵值都拷貝到了新的桶中博敬。

最后一步就是釋放舊桶友浸,oldbuckets 的指針置為 null。到此偏窝,一次遷移過程就完全結(jié)束了收恢。

5. 等量擴(kuò)容

嚴(yán)格意義上這種方式并不能算是擴(kuò)容。但是函數(shù)名是 Grow祭往,姑且暫時(shí)就這么叫吧伦意。

在 go1.8 的版本開始,添加了 sameSizeGrow硼补,當(dāng) overflow buckets 的數(shù)量超過一定數(shù)量 (2^B) 但裝載因子又未達(dá)到 6.5 的時(shí)候驮肉,此時(shí)可能存在部分空的bucket,即 bucket 的使用率低已骇,這時(shí)會(huì)觸發(fā)sameSizeGrow离钝,即 B 不變,但走數(shù)據(jù)遷移流程褪储,將 oldbuckets 的數(shù)據(jù)重新緊湊排列提高 bucket 的利用率卵渴。當(dāng)然在 sameSizeGrow 過程中,不會(huì)觸發(fā) loadFactorGrow鲤竹。

四. Map 實(shí)現(xiàn)中的一些優(yōu)化

讀到這里奖恰,相信讀者心里應(yīng)該很清楚如何設(shè)計(jì)并實(shí)現(xiàn)一個(gè) Map 了吧。包括 Map 中的各種操作的實(shí)現(xiàn)。在探究如何實(shí)現(xiàn)一個(gè)線程安全的 Map 之前瑟啃,先把之前說到個(gè)一些亮點(diǎn)優(yōu)化點(diǎn)论泛,小結(jié)一下。

在 Redis 中蛹屿,采用增量式擴(kuò)容的方式處理哈希沖突屁奏。當(dāng)平均查找長(zhǎng)度超過 5 的時(shí)候就會(huì)觸發(fā)增量擴(kuò)容操作,保證 hash 表的高性能错负。

同時(shí) Redis 采用頭插法坟瓢,保證插入 key 值時(shí)候的性能。

在 Java 中犹撒,當(dāng)桶的個(gè)數(shù)超過了64個(gè)以后折联,并且沖突節(jié)點(diǎn)為8或者大于8,這個(gè)時(shí)候就會(huì)觸發(fā)紅黑樹轉(zhuǎn)換识颊。這樣能保證鏈表在很長(zhǎng)的情況下诚镰,查找長(zhǎng)度依舊不會(huì)太長(zhǎng),并且紅黑樹保證最差情況下也支持 O(log n) 的時(shí)間復(fù)雜度祥款。

Java 在遷移之后有一個(gè)非常好的設(shè)計(jì)清笨,只需要比較遷移之后桶個(gè)數(shù)的最高位是否為0,如果是0刃跛,key 在新桶內(nèi)的相對(duì)位置不變抠艾,如果是1,則加上桶的舊的桶的個(gè)數(shù) oldCap 就可以得到新的位置桨昙。

在 Go 中優(yōu)化的點(diǎn)比較多:

哈希算法選用高效的 memhash 算法 和 CPU AES指令集检号。AES 指令集充分利用 CPU 硬件特性,計(jì)算哈希值的效率超高蛙酪。

key - value 的排列設(shè)計(jì)成 key 放在一起齐苛,value 放在一起,而不是key滤否,value成對(duì)排列脸狸。這樣方便內(nèi)存對(duì)齊,數(shù)據(jù)量大了以后節(jié)約內(nèi)存對(duì)齊造成的一些浪費(fèi)藐俺。

key炊甲,value 的內(nèi)存大小超過128字節(jié)以后自動(dòng)轉(zhuǎn)成存儲(chǔ)一個(gè)指針。

tophash 數(shù)組的設(shè)計(jì)加速了 key 的查找過程欲芹。tophash 也被復(fù)用卿啡,用來標(biāo)記擴(kuò)容操作時(shí)候的狀態(tài)。

用位運(yùn)算轉(zhuǎn)換求余操作菱父,m % n 颈娜,當(dāng) n = 1 << B 的時(shí)候剑逃,可以轉(zhuǎn)換成 m & (1 << B - 1) 。

增量式擴(kuò)容官辽。

等量擴(kuò)容蛹磺,緊湊操作。

Go 1.9 版本以后同仆,Map 原生就已經(jīng)支持線程安全萤捆。(在下一章中重點(diǎn)討論這個(gè)問題)

當(dāng)然 Go 中還有一些需要再優(yōu)化的地方:

在遷移的過程中,當(dāng)前版本不會(huì)重用 overflow 桶俗批,而是直接重新申請(qǐng)一個(gè)新的桶俗或。這里可以優(yōu)化成優(yōu)先重用沒有指針指向的 overflow 桶,當(dāng)沒有可用的了岁忘,再去申請(qǐng)一個(gè)新的辛慰。這一點(diǎn)作者已經(jīng)寫在了 TODO 里面了。

動(dòng)態(tài)合并多個(gè) empty 的桶干像。

當(dāng)前版本中沒有 shrink 操作帅腌,Map 只能增長(zhǎng)而不能收縮。這塊 Redis 有相關(guān)的實(shí)現(xiàn)蝠筑。

線程安全就是如果你的代碼塊所在的進(jìn)程中有多個(gè)線程在同時(shí)運(yùn)行狞膘,而這些線程可能會(huì)同時(shí)運(yùn)行這段代碼揩懒。如果每次運(yùn)行結(jié)果和單線程運(yùn)行的結(jié)果是一樣的什乙,而且其他的變量的值也和預(yù)期的是一樣的,就是線程安全的。

如果代碼塊中包含了對(duì)共享數(shù)據(jù)的更新操作,那么這個(gè)代碼塊就可能是非線程安全的嗦哆。但是如果代碼塊中類似操作都處于臨界區(qū)之中聂儒,那么這個(gè)代碼塊就是線程安全的。

通常有以下兩類避免競(jìng)爭(zhēng)條件的方法來實(shí)現(xiàn)線程安全:

第一類 —— 避免共享狀態(tài)

1.可重入Re-entrancy

通常在線程安全的問題中衡载,最常見的代碼塊就是函數(shù)。讓函數(shù)具有線程安全的最有效的方式就是使其可重入。如果某個(gè)進(jìn)程中所有線程都可以并發(fā)的對(duì)函數(shù)進(jìn)行調(diào)用弃舒,并且無論他們調(diào)用該函數(shù)的實(shí)際執(zhí)行情況怎么樣,該函數(shù)都可以產(chǎn)生預(yù)期的結(jié)果状原,那么就可以說這個(gè)函數(shù)是可重入的聋呢。

如果一個(gè)函數(shù)把共享數(shù)據(jù)作為它的返回結(jié)果或者包含在它返回的結(jié)果中,那么該函數(shù)就肯定不是一個(gè)可重入的函數(shù)颠区。任何內(nèi)含了操作共享數(shù)據(jù)的代碼的函數(shù)都是不可重入的函數(shù)削锰。

為了實(shí)現(xiàn)線程安全的函數(shù),把所有代碼都置放于臨界區(qū)中是可行的毕莱。但是互斥量的使用總會(huì)耗費(fèi)一定的系統(tǒng)資源和時(shí)間器贩,使用互斥量的過程總會(huì)存在各種博弈和權(quán)衡颅夺。所以請(qǐng)合理使用互斥量保護(hù)好那些涉及共享數(shù)據(jù)操作的代碼。

注意:可重入只是線程安全的充分不必要條件蛹稍,并不是充要條件吧黄。這個(gè)反例在下面會(huì)講到。

2.線程本地存儲(chǔ)

如果變量已經(jīng)被本地化唆姐,所以每個(gè)線程都有自己的私有副本稚字。這些變量通過子程序和其他代碼邊界保留它們的值,并且是線程安全的厦酬,因?yàn)檫@些變量都是每個(gè)線程本地存儲(chǔ)的胆描,即使訪問它們的代碼可能被另一個(gè)線程同時(shí)執(zhí)行,依舊是線程安全的仗阅。

3.不可變量

對(duì)象一旦初始化以后就不能改變昌讲。這意味著只有只讀數(shù)據(jù)被共享,這也實(shí)現(xiàn)了固有的線程安全性减噪《坛瘢可變(不是常量)操作可以通過為它們創(chuàng)建新對(duì)象,而不是修改現(xiàn)有對(duì)象的方式去實(shí)現(xiàn)筹裕。 Java醋闭,C#和 Python 中的字符串的實(shí)現(xiàn)就使用了這種方法。

第二類 —— 線程同步

第一類方法都比較簡(jiǎn)單朝卒,通過代碼改造就可以實(shí)現(xiàn)证逻。但是如果遇到一定要進(jìn)行線程中共享數(shù)據(jù)的情況,第一類方法就解決不了了抗斤。這時(shí)候就出現(xiàn)了第二類解決方案囚企,利用線程同步的方法來解決線程安全問題。

今天就從線程同步開始說起瑞眼。

一. 線程同步理論

在多線程的程序中龙宏,多以共享數(shù)據(jù)作為線程之間傳遞數(shù)據(jù)的手段。由于一個(gè)進(jìn)程所擁有的相當(dāng)一部分虛擬內(nèi)存地址都可以被該進(jìn)程中所有線程共享伤疙,所以這些共享數(shù)據(jù)大多是以內(nèi)存空間作為載體的银酗。如果兩個(gè)線程同時(shí)讀取同一塊共享內(nèi)存但獲取到的數(shù)據(jù)卻不同,那么程序很容易出現(xiàn)一些 bug徒像。

為了保證共享數(shù)據(jù)一致性黍特,最簡(jiǎn)單并且最徹底的方法就是使該數(shù)據(jù)成為一個(gè)不變量。當(dāng)然這種絕對(duì)的方式在大多數(shù)情況下都是不可行的厨姚。比如函數(shù)中會(huì)用到一個(gè)計(jì)數(shù)器衅澈,記錄函數(shù)被調(diào)用了幾次,這個(gè)計(jì)數(shù)器肯定就不能被設(shè)為常量谬墙。那這種必須是變量的情況下今布,還要保證共享數(shù)據(jù)的一致性经备,這就引出了臨界區(qū)的概念。

臨界區(qū)的出現(xiàn)就是為了使該區(qū)域只能被串行的訪問或者執(zhí)行部默。臨界區(qū)可以是某個(gè)資源侵蒙,也可以是某段代碼。保證臨界區(qū)最有效的方式就是利用線程同步機(jī)制傅蹂。

先介紹2種共享數(shù)據(jù)同步的方法纷闺。

1. 互斥量

在同一時(shí)刻,只允許一個(gè)線程處于臨界區(qū)之內(nèi)的約束稱為互斥份蝴,每個(gè)線程在進(jìn)入臨界區(qū)之前犁功,都必須先鎖定某個(gè)對(duì)象,只有成功鎖定對(duì)象的線程才能允許進(jìn)入臨界區(qū)婚夫,否則就會(huì)阻塞浸卦。這個(gè)對(duì)象稱為互斥對(duì)象或者互斥量。

一般我們?nèi)粘Uf的互斥鎖就能達(dá)到這個(gè)目的案糙。

互斥量可以有多個(gè)限嫌,它們所保護(hù)的臨界區(qū)也可以有多個(gè)。先從簡(jiǎn)單的說起时捌,一個(gè)互斥量和一個(gè)臨界區(qū)怒医。

(一) 一個(gè)互斥量和一個(gè)臨界區(qū)

上圖就是一個(gè)互斥量和一個(gè)臨界區(qū)的例子。當(dāng)線程1先進(jìn)入臨界區(qū)的時(shí)候奢讨,當(dāng)前臨界區(qū)處于未上鎖的狀態(tài)稚叹,于是它便先將臨界區(qū)上鎖。線程1獲取到臨界區(qū)里面的值禽笑。

這個(gè)時(shí)候線程2準(zhǔn)備進(jìn)入臨界區(qū)入录,由于線程1把臨界區(qū)上鎖了蛤奥,所以線程2進(jìn)入臨界區(qū)失敗佳镜,線程2由就緒狀態(tài)轉(zhuǎn)成睡眠狀態(tài)。線程1繼續(xù)對(duì)臨界區(qū)的共享數(shù)據(jù)進(jìn)行寫入操作凡桥。

當(dāng)線程1完成所有的操作以后蟀伸,線程1調(diào)用解鎖操作。當(dāng)臨界區(qū)被解鎖以后缅刽,會(huì)嘗試喚醒正在睡眠的線程2啊掏。線程2被喚醒以后,由睡眠狀態(tài)再次轉(zhuǎn)換成就緒狀態(tài)衰猛。線程2準(zhǔn)備進(jìn)入臨界區(qū)迟蜜,當(dāng)臨界區(qū)此處處于未上鎖的狀態(tài),線程2便將臨界區(qū)上鎖啡省。

經(jīng)過 read娜睛、write 一系列操作以后髓霞,最終在離開臨界區(qū)的時(shí)候會(huì)解鎖。

線程在離開臨界區(qū)的時(shí)候畦戒,一定要記得把對(duì)應(yīng)的互斥量解鎖方库。這樣其他因臨界區(qū)被上鎖而導(dǎo)致睡眠的線程還有機(jī)會(huì)被喚醒。所以對(duì)同一個(gè)互斥變量的鎖定和解鎖必須成對(duì)的出現(xiàn)障斋。既不可以對(duì)一個(gè)互斥變量進(jìn)行重復(fù)的鎖定纵潦,也不能對(duì)一個(gè)互斥變量進(jìn)行多次的解鎖。

如果對(duì)一個(gè)互斥變量鎖定多次可能會(huì)導(dǎo)致臨界區(qū)最終永遠(yuǎn)阻塞垃环⊙悖可能有人會(huì)問了,對(duì)一個(gè)未鎖定的互斥變成解鎖多次會(huì)出現(xiàn)什么問題呢遂庄?

在 Go 1.8 之前被济,雖然對(duì)互斥變量解鎖多次不會(huì)引起任何 goroutine 的阻塞,但是它可能引起一個(gè)運(yùn)行時(shí)的恐慌涧团。Go 1.8 之前的版本只磷,是可以嘗試恢復(fù)這個(gè)恐慌的,但是恢復(fù)以后泌绣,可能會(huì)導(dǎo)致一系列的問題钮追,比如重復(fù)解鎖操作的 goroutine 會(huì)永久的阻塞。所以 Go 1.8 版本以后此類運(yùn)行時(shí)的恐慌就變成了不可恢復(fù)的了阿迈。所以對(duì)互斥變量反復(fù)解鎖就會(huì)導(dǎo)致運(yùn)行時(shí)操作元媚,最終程序異常退出。

(二) 多個(gè)互斥量和一個(gè)臨界區(qū)

在這種情況下苗沧,極容易產(chǎn)生線程死鎖的情況刊棕。所以盡量不要讓不同的互斥量所保護(hù)的臨界區(qū)重疊。

上圖這個(gè)例子中待逞,一個(gè)臨界區(qū)中存在2個(gè)互斥量:互斥量 A 和互斥量 B甥角。

線程1先鎖定了互斥量 A ,接著線程2鎖定了互斥量 B识樱。當(dāng)線程1在成功鎖定互斥量 B 之前永遠(yuǎn)不會(huì)釋放互斥量 A嗤无。同樣,線程2在成功鎖定互斥量 A 之前永遠(yuǎn)不會(huì)釋放互斥量 B怜庸。那么這個(gè)時(shí)候線程1和線程2都因無法鎖定自己需要鎖定的互斥量当犯,都由 ready 就緒狀態(tài)轉(zhuǎn)換為 sleep 睡眠狀態(tài)。這是就產(chǎn)生了線程死鎖了割疾。

線程死鎖的產(chǎn)生原因有以下幾種:

1.系統(tǒng)資源競(jìng)爭(zhēng)

2.進(jìn)程推薦順序非法

3.死鎖必要條件(必要條件中任意一個(gè)不滿足嚎卫,死鎖都不會(huì)發(fā)生)

(1). 互斥條件

(2). 不剝奪條件

(3). 請(qǐng)求和保持條件

(4). 循環(huán)等待條件

想避免線程死鎖的情況發(fā)生有以下幾種方法可以解決:

1.預(yù)防死鎖

(1). 資源有序分配法(破壞環(huán)路等待條件)

(2). 資源原子分配法(破壞請(qǐng)求和保持條件)

2.避免死鎖

銀行家算法

3.檢測(cè)死鎖

死鎖定理(資源分配圖化簡(jiǎn)法),這種方法雖然可以檢測(cè)宏榕,但是無法預(yù)防拓诸,檢測(cè)出來了死鎖還需要配合解除死鎖的方法才行胸懈。

徹底解決死鎖有以下幾種方法:

1.剝奪資源

2.撤銷進(jìn)程

3.試鎖定 — 回退

如果在執(zhí)行一個(gè)代碼塊的時(shí)候,需要先后(順序不定)鎖定兩個(gè)變量恰响,那么在成功鎖定其中一個(gè)互斥量之后應(yīng)該使用試鎖定的方法來鎖定另外一個(gè)變量趣钱。如果試鎖定第二個(gè)互斥量失敗,就把已經(jīng)鎖定的第一個(gè)互斥量解鎖胚宦,并重新對(duì)這兩個(gè)互斥量進(jìn)行鎖定和試鎖定首有。

如上圖,線程2在鎖定互斥量 B 的時(shí)候枢劝,再試鎖定互斥量 A井联,此時(shí)鎖定失敗,于是就把互斥量 B 也一起解鎖您旁。接著線程1會(huì)來鎖定互斥量 A烙常。此時(shí)也不會(huì)出現(xiàn)死鎖的情況。

4.固定順序鎖定

這種方式就是讓線程1和線程2都按照相同的順序鎖定互斥量鹤盒,都按成功鎖定互斥量1以后才能去鎖定互斥量2 蚕脏。這樣就能保證在一個(gè)線程完全離開這些重疊的臨界區(qū)之前,不會(huì)有其他同樣需要鎖定那些互斥量的線程進(jìn)入到那里侦锯。

(三) 多個(gè)互斥量和多個(gè)臨界區(qū)

多個(gè)臨界區(qū)和多個(gè)互斥量的情況就要看是否會(huì)有沖突的區(qū)域驼鞭,如果出現(xiàn)相互交集的沖突區(qū)域,后進(jìn)臨界區(qū)的線程就會(huì)進(jìn)入睡眠狀態(tài)尺碰,直到該臨界區(qū)的線程完成任務(wù)以后挣棕,再被喚醒。

一般情況下亲桥,應(yīng)該盡量少的使用互斥量洛心。每個(gè)互斥量保護(hù)的臨界區(qū)應(yīng)該在合理范圍內(nèi)并盡量大。但是如果發(fā)現(xiàn)多個(gè)線程會(huì)頻繁出入某個(gè)較大的臨界區(qū)题篷,并且它們之間經(jīng)常存在訪問沖突词身,那么就應(yīng)該把這個(gè)較大的臨界區(qū)劃分的更小一點(diǎn),并使用不同的互斥量保護(hù)起來悼凑。這樣做的目的就是為了讓等待進(jìn)入同一個(gè)臨界區(qū)的線程數(shù)變少偿枕,從而降低線程被阻塞的概率捐迫,并減少它們被迫進(jìn)入睡眠狀態(tài)的時(shí)間焰情,這從一定程度上提高了程序的整體性能断国。

在說另外一個(gè)線程同步的方法之前,回答一下文章開頭留下的一個(gè)疑問:可重入只是線程安全的充分不必要條件渔欢,并不是充要條件。這個(gè)反例在下面會(huì)講到瘟忱。

這個(gè)問題最關(guān)鍵的一點(diǎn)在于:mutex 是不可重入的奥额。

舉個(gè)例子:

在下面這段代碼中苫幢,函數(shù) increment_counter 是線程安全的,但不是可重入的垫挨。

C

#include

intincrement_counter()?

{

staticintcounter=0;

staticpthread_mutex_tmutex=PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_lock(&mutex);

// only allow one thread to increment at a time

++counter;

// store value before any other threads increment it further

intresult=counter;

pthread_mutex_unlock(&mutex);

returnresult;

}

上面的代碼中韩肝,函數(shù) increment_counter 可以在多個(gè)線程中被調(diào)用,因?yàn)橛幸粋€(gè)互斥鎖 mutex 來同步對(duì)共享變量 counter 的訪問九榔。但是如果這個(gè)函數(shù)用在可重入的中斷處理程序中哀峻,如果在 pthread_mutex_lock(&mutex) 和

pthread_mutex_unlock(&mutex) 之間產(chǎn)生另一個(gè)調(diào)用函數(shù) increment_counter 的中斷,則會(huì)第二次執(zhí)行此函數(shù)哲泊,此時(shí)由于 mutex 已被 lock剩蟀,函數(shù)會(huì)在 pthread_mutex_lock(&mutex) 處阻塞,并且由于 mutex 沒有機(jī)會(huì)被 unlock切威,阻塞會(huì)永遠(yuǎn)持續(xù)下去育特。簡(jiǎn)言之,問題在于pthread的 mutex 是不可重入的先朦。

解決辦法是設(shè)定 PTHREAD_MUTEX_RECURSIVE 屬性缰冤。然而對(duì)于給出的問題而言,專門使用一個(gè) mutex 來保護(hù)一次簡(jiǎn)單的增量操作顯然過于昂貴喳魏,因此c++11中的原子變量提供了一個(gè)可使此函數(shù)既線程安全又可重入(而且還更簡(jiǎn)潔)的替代方案:

C

#include

intincrement_counter()?

{

staticstd::atomiccounter(0);

// increment is guaranteed to be done atomically

intresult=++counter;

returnresult;

}

在 Go 中锋谐,互斥量在標(biāo)準(zhǔn)庫(kù)代碼包 sync 中的 Mutex 結(jié)構(gòu)體表示的。sync.Mutex 類型只有兩個(gè)公開的指針方法截酷,Lock 和 Unlock涮拗。前者用于鎖定當(dāng)前的互斥量,后者則用于對(duì)當(dāng)前的互斥量進(jìn)行解鎖迂苛。

2. 條件變量

在線程同步的方法中三热,還有一個(gè)可以與互斥量相提并論的同步方法,條件變量三幻。

條件變量與互斥量不同就漾,條件變量的作用并不是保證在同一時(shí)刻僅有一個(gè)線程訪問某一個(gè)共享數(shù)據(jù),而是在對(duì)應(yīng)的共享數(shù)據(jù)的狀態(tài)發(fā)生變化時(shí)念搬,通知其他因此而被阻塞的線程抑堡。條件變量總是與互斥變量組合使用的。

這類問題其實(shí)很常見朗徊。先用生產(chǎn)者消費(fèi)者的例子來舉例首妖。

如果不用條件變量,只用互斥量爷恳,來看看會(huì)發(fā)生什么后果有缆。

生產(chǎn)者線程在完成添加操作之前,其他的生產(chǎn)者線程和消費(fèi)者線程都無法進(jìn)行操作。同一個(gè)商品也只能被一個(gè)消費(fèi)者消費(fèi)棚壁。

如果只用互斥量杯矩,可能會(huì)出現(xiàn)2個(gè)問題。

1.生產(chǎn)者線程獲得了互斥量以后袖外,卻發(fā)現(xiàn)商品已滿史隆,無法再添加新的商品了。于是該線程就會(huì)一直等待曼验。新的生產(chǎn)者也進(jìn)入不了臨界區(qū)泌射,消費(fèi)者也無法進(jìn)入。這時(shí)候就死鎖了蚣驼。

2.消費(fèi)者線程獲得了互斥量以后魄幕,卻發(fā)現(xiàn)商品是空的,無法消費(fèi)了颖杏。這個(gè)時(shí)候該線程也是會(huì)一直等待纯陨。新的生產(chǎn)者和消費(fèi)者也都無法進(jìn)入。這時(shí)候同樣也死鎖了留储。

這就是只用互斥量無法解決的問題翼抠。在多個(gè)線程之間,急需一套同步的機(jī)制获讳,能讓這些線程都協(xié)作起來阴颖。

條件變量就是大家熟悉的 P - V 操作了。這塊大家應(yīng)該比較熟悉丐膝,所以簡(jiǎn)單的過一下量愧。

P 操作就是 wait 操作,它的意思就是阻塞當(dāng)前線程帅矗,直到收到該條件變量發(fā)來的通知偎肃。

V 操作就是 signal 操作,它的意思就是讓該條件變量向至少一個(gè)正在等待它通知的線程發(fā)送通知浑此,以表示某個(gè)共享數(shù)據(jù)的狀態(tài)已經(jīng)變化累颂。

Broadcast 廣播通知,它的意思就是讓條件變量給正在等待它通知的所有線程發(fā)送通知凛俱,以表示某個(gè)共享數(shù)據(jù)的狀態(tài)已經(jīng)發(fā)生改變紊馏。

signal 可以操作多次,如果操作3次蒲犬,就代表發(fā)了3次信號(hào)通知朱监。如上圖。

P - V 操作設(shè)計(jì)美妙之處在于暖哨,P 操作的次數(shù)與 V 操作的次數(shù)是相同的赌朋。wait 多少次凰狞,signal 對(duì)應(yīng)的有多少次篇裁∨媛看上圖,這個(gè)循環(huán)就是這么的奇妙达布。

生產(chǎn)者消費(fèi)者問題

這個(gè)問題可以形象的描述成像上圖這樣团甲,門衛(wèi)守護(hù)著臨界區(qū)的安全。售票廳記錄著當(dāng)前 semaphone 的值黍聂,它也控制著門衛(wèi)是否打開臨界區(qū)躺苦。

臨界區(qū)只允許一個(gè)線程進(jìn)入,當(dāng)已經(jīng)有一個(gè)線程了产还,再來一個(gè)線程匹厘,就會(huì)被 lock 住。售票廳也會(huì)記錄當(dāng)前阻塞的線程數(shù)脐区。

當(dāng)之前的線程離開以后愈诚,售票廳就會(huì)告訴門衛(wèi),允許一個(gè)線程進(jìn)入臨界區(qū)牛隅。

用 P-V 偽代碼來描述生產(chǎn)者消費(fèi)者:

初始變量:

C

semaphore? mutex=1;// 臨界區(qū)互斥信號(hào)量?

semaphore? empty=n;// 空閑緩沖區(qū)個(gè)數(shù)?

semaphore? full=0;// 緩沖區(qū)初始化為空

生產(chǎn)者線程:

C

producer()

{

while(1){

produce an item in nextp;

P(empty);

P(mutex);

add nextp to buffer;

V(mutex);

V(full);

}

}

消費(fèi)者線程:

C

consumer()

{

while(1){

P(full);

P(mutex);

remove an item from buffer;

V(mutex);

V(empty);

consume the item;

}

}

雖然在生產(chǎn)者和消費(fèi)者單個(gè)程序里面 P炕柔,V 并不是成對(duì)的,但是整個(gè)程序里面 P媒佣,V 還是成對(duì)的匕累。

讀者寫者問題——讀者優(yōu)先,寫者延遲

讀者優(yōu)先默伍,寫進(jìn)程被延遲欢嘿。只要有讀者在讀,后來的讀者都可以隨意進(jìn)來讀也糊。

讀者要先進(jìn)入 rmutex 炼蹦,查看 readcount,然后修改 readcout 的值显设,最后再去讀數(shù)據(jù)框弛。對(duì)于每個(gè)讀進(jìn)程都是寫者,都要進(jìn)去修改 readcount 的值捕捂,所以還要單獨(dú)設(shè)置一個(gè) rmutex 互斥訪問瑟枫。

初始變量:

C

intreadcount=0;// 讀者數(shù)量?

semaphore? rmutex=1;// 保證更新 readcount 互斥?

semaphore? wmutex=1;// 保證讀者和寫著互斥的訪問文件

讀者線程:

C

reader()

{

while(1){

P(rmutex);// 準(zhǔn)備進(jìn)入,修改 readcount指攒,“開門”

if(readcount==0){// 說明是第一個(gè)讀者

P(wmutex);// 拿到”鑰匙”慷妙,阻止寫線程來寫

}

readcount++;

V(rmutex);

reading;

P(rmutex);// 準(zhǔn)備離開

readcount--;

if(readcount==0){// 說明是最后一個(gè)讀者

V(wmutex);// 交出”鑰匙”,讓寫線程來寫

}

V(rmutex);// 離開允悦,“關(guān)門”

}

}

寫者線程:

C

writer()

{

while(1){

P(wmutex);

writing;

V(wmutex);

}

}

讀者寫者問題——寫者優(yōu)先膝擂,讀者延遲

有寫者寫,禁止后面的讀者來讀。在寫者前的讀者架馋,讀完就走狞山。只要有寫者在等待,禁止后來的讀者進(jìn)去讀叉寂。

初始變量:

C

intreadcount=0;// 讀者數(shù)量?

semaphore? rmutex=1;// 保證更新 readcount 互斥?

semaphore? wmutex=1;// 保證讀者和寫著互斥的訪問文件?

semaphore? w=1;// 用于實(shí)現(xiàn)“寫者優(yōu)先”

讀者線程:

C

reader()

{

while(1){

P(w);// 在沒有寫者的時(shí)候才能請(qǐng)求進(jìn)入

P(rmutex);// 準(zhǔn)備進(jìn)入萍启,修改 readcount,“開門”

if(readcount==0){// 說明是第一個(gè)讀者

P(wmutex);// 拿到”鑰匙”屏鳍,阻止寫線程來寫

}

readcount++;

V(rmutex);

V(w);

reading;

P(rmutex);// 準(zhǔn)備離開

readcount--;

if(readcount==0){// 說明是最后一個(gè)讀者

V(wmutex);// 交出”鑰匙”勘纯,讓寫線程來寫

}

V(rmutex);// 離開,“關(guān)門”

}

}

寫者線程:

C

writer()

{

while(1){

P(w);

P(wmutex);

writing;

V(wmutex);

V(w);

}

}

哲學(xué)家進(jìn)餐問題

假設(shè)有五位哲學(xué)家圍坐在一張圓形餐桌旁钓瞭,做以下兩件事情之一:吃飯驳遵,或者思考。吃東西的時(shí)候山涡,他們就停止思考堤结,思考的時(shí)候也停止吃東西。餐桌中間有一大碗意大利面佳鳖,每?jī)蓚€(gè)哲學(xué)家之間有一只餐叉霍殴。因?yàn)橛靡恢徊筒婧茈y吃到意大利面,所以假設(shè)哲學(xué)家必須用兩只餐叉吃東西系吩。他們只能使用自己左右手邊的那兩只餐叉来庭。哲學(xué)家就餐問題有時(shí)也用米飯和筷子而不是意大利面和餐叉來描述,因?yàn)楹苊黠@穿挨,吃米飯必須用兩根筷子月弛。

初始變量:

C

semaphore? chopstick[5]={1,1,1,1,1};// 初始化信號(hào)量?

semaphore? mutex=1;// 設(shè)置取筷子的信號(hào)量

哲學(xué)家線程:

C

Pi()

{

do{

P(mutex);// 獲得取筷子的互斥量

P(chopstick[i]);// 取左邊的筷子

P(chopstick[(i+1)%5]);// 取右邊的筷子

V(mutex);// 釋放取筷子的信號(hào)量

eat;

V(chopstick[i]);// 放回左邊的筷子

V(chopstick[(i+1)%5]);// 放回右邊的筷子

think;

}while(1);

}

綜上所述,互斥量可以實(shí)現(xiàn)對(duì)臨界區(qū)的保護(hù)科盛,并會(huì)阻止競(jìng)態(tài)條件的發(fā)生帽衙。條件變量作為補(bǔ)充手段,可以讓多方協(xié)作更加有效率贞绵。

在 Go 的標(biāo)準(zhǔn)庫(kù)中厉萝,sync 包里面 sync.Cond 類型代表了條件變量。但是和互斥鎖和讀寫鎖不同的是榨崩,簡(jiǎn)單的聲明無法創(chuàng)建出一個(gè)可用的條件變量谴垫,還需要用到 sync.NewCond 函數(shù)。

Go

func NewCond( l locker) *Cond

*sync.Cond 類型的方法集合中有3個(gè)方法母蛛,即 Wait翩剪、Signal 和 Broadcast 。

二. 簡(jiǎn)單的線程鎖方案

實(shí)現(xiàn)線程安全的方案最簡(jiǎn)單的方法就是加鎖了彩郊。

先看看 OC 中如何實(shí)現(xiàn)一個(gè)線程安全的字典吧前弯。

在 Weex 的源碼中蚪缀,就實(shí)現(xiàn)了一套線程安全的字典。類名叫 WXThreadSafeMutableDictionary恕出。

Objective-C

/**

*? @abstract Thread safe NSMutableDictionary

*/

@interfaceWXThreadSafeMutableDictionary :NSMutableDictionary

@property(nonatomic,strong)dispatch_queue_tqueue;

@property(nonatomic,strong)NSMutableDictionary*dict;

@end

具體實(shí)現(xiàn)如下:

Objective-C

-(instancetype)initCommon

{

self=[superinit];

if(self){

NSString*uuid=[NSStringstringWithFormat:@"com.taobao.weex.dictionary_%p",self];

_queue=dispatch_queue_create([uuid UTF8String],DISPATCH_QUEUE_CONCURRENT);

}

returnself;

}

該線程安全的字典初始化的時(shí)候會(huì)新建一個(gè)并發(fā)的 queue询枚。

Objective-C

-(NSUInteger)count

{

__blockNSUIntegercount;

dispatch_sync(_queue,^{

count=_dict.count;

});

returncount;

}

-(id)objectForKey:(id)aKey

{

__blockidobj;

dispatch_sync(_queue,^{

obj=_dict[aKey];

});

returnobj;

}

-(NSEnumerator*)keyEnumerator

{

__blockNSEnumerator*enu;

dispatch_sync(_queue,^{

enu=[_dict keyEnumerator];

});

returnenu;

}

-(id)copy{

__blockidcopyInstance;

dispatch_sync(_queue,^{

copyInstance=[_dictcopy];

});

returncopyInstance;

}

讀取的這些方法都用 dispatch_sync 。

Objective-C

-(void)setObject:(id)anObject forKey:(id)aKey

{

aKey=[aKey copyWithZone:NULL];

dispatch_barrier_async(_queue,^{

_dict[aKey]=anObject;

});

}

-(void)removeObjectForKey:(id)aKey

{

dispatch_barrier_async(_queue,^{

[_dict removeObjectForKey:aKey];

});

}

-(void)removeAllObjects{

dispatch_barrier_async(_queue,^{

[_dict removeAllObjects];

});

}

和寫入相關(guān)的方法都用 dispatch_barrier_async剃根。

再看看 Go 用互斥量如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的線程安全的 Map 吧哩盲。

既然要用到互斥量前方,那么我們封裝一個(gè)包含互斥量的 Map 掂咒。

Go

typeMyMapstruct{

sync.Mutex

mmap[int]int

}

varmyMap*MyMap

func init(){

myMap=&MyMap{

m:make(map[int]int,100),

}

}

再簡(jiǎn)單的實(shí)現(xiàn) Map 的基礎(chǔ)方法秆吵。

Go

func builtinMapStore(k, v int){

myMap.Lock()

defermyMap.Unlock()

myMap.m[k]=v

}

func builtinMapLookup(k int) int{

myMap.Lock()

defermyMap.Unlock()

ifv,ok:=myMap.m[k];!ok{

return-1

}else{

returnv

}

}

func builtinMapDelete(k int){

myMap.Lock()

defermyMap.Unlock()

if_,ok:=myMap.m[k];!ok{

return

}else{

delete(myMap.m,k)

}

}

實(shí)現(xiàn)思想比較簡(jiǎn)單,在每個(gè)操作前都加上 lock,在每個(gè)函數(shù)結(jié)束 defer 的時(shí)候都加上 unlock瞧柔。

這種加鎖的方式實(shí)現(xiàn)的線程安全的字典,優(yōu)點(diǎn)是比較簡(jiǎn)單查近,缺點(diǎn)是性能不高氮兵。文章最后會(huì)進(jìn)行幾種實(shí)現(xiàn)方法的性能對(duì)比,用數(shù)字說話抱慌,就知道這種基于互斥量加鎖方式實(shí)現(xiàn)的性能有多差了逊桦。

在語言原生就自帶線程安全 Map 的語言中,它們的原生底層實(shí)現(xiàn)都不是通過單純的加鎖來實(shí)現(xiàn)線程安全的抑进,比如 Java 的 ConcurrentHashMap强经,Go 1.9 新加的 sync.map。

三. 現(xiàn)代線程安全的 Lock - Free 方案 CAS

在 Java 的 ConcurrentHashMap 底層實(shí)現(xiàn)中大量的利用了 volatile寺渗,final匿情,CAS 等 Lock-Free 技術(shù)來減少鎖競(jìng)爭(zhēng)對(duì)于性能的影響。

在 Go 中也大量的使用了原子操作信殊,CAS 是其中之一炬称。比較并交換即 “Compare And Swap”,簡(jiǎn)稱 CAS涡拘。

Go

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

CAS 會(huì)先判斷參數(shù) addr 指向的被操作值與參數(shù) old 的值是否相等玲躯。如果相當(dāng),相應(yīng)的函數(shù)才會(huì)用參數(shù) new 代表的新值替換舊值鳄乏。否則跷车,替換操作就會(huì)被忽略。

這一點(diǎn)與互斥鎖明顯不同汞窗,CAS 總是假設(shè)被操作的值未曾改變姓赤,并一旦確認(rèn)這個(gè)假設(shè)成立,就立即進(jìn)行值的替換仲吏。而互斥鎖的做法就更加謹(jǐn)慎不铆,總是先假設(shè)會(huì)有并發(fā)的操作修改被操作的值蝌焚,并需要使用鎖將相關(guān)操作放入臨界區(qū)中加以保護(hù)∈某猓可以說互斥鎖的做法趨于悲觀只洒,CAS 的做法趨于樂觀,類似樂觀鎖劳坑。

CAS 做法最大的優(yōu)勢(shì)在于可以不創(chuàng)建互斥量和臨界區(qū)的情況下毕谴,完成并發(fā)安全的值替換操作。這樣大大的減少了線程同步操作對(duì)程序性能的影響距芬。當(dāng)然 CAS 也有一些缺點(diǎn)涝开,缺點(diǎn)下一章會(huì)提到。

接下來看看源碼是如何實(shí)現(xiàn)的框仔。以下以64位為例舀武,32位類似。

C

TEXT ·CompareAndSwapUintptr(SB),NOSPLIT,$0-25

JMP ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0-25

JMP ·CompareAndSwapUint64(SB)

TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0-25

MOVQ? ? addr+0(FP),BP

MOVQ? ? old+8(FP),AX

MOVQnew+16(FP),CX

? ? LOCK

CMPXCHGQ? ? CX,0(BP)

SETEQ? swapped+24(FP)

? ? RET

上述實(shí)現(xiàn)最關(guān)鍵的一步就是 CMPXCHG离斩。

查詢 Intel 的文檔

文檔上說:

比較 eax 和目的操作數(shù)(第一個(gè)操作數(shù))的值银舱,如果相同,ZF 標(biāo)志被設(shè)置跛梗,同時(shí)源操作數(shù)(第二個(gè)操作)的值被寫到目的操作數(shù)寻馏,否則,清 ZF 標(biāo)志核偿,并且把目的操作數(shù)的值寫回 eax诚欠。

于是也就得出了 CMPXCHG 的工作原理:

比較 _old 和 (*__ptr) 的值,如果相同宪祥,ZF 標(biāo)志被設(shè)置聂薪,同時(shí) _new 的值被寫到 (*__ptr),否則蝗羊,清 ZF 標(biāo)志藏澳,并且把 (*__ptr) 的值寫回 _old。

在 Intel 平臺(tái)下耀找,會(huì)用 LOCK CMPXCHG 來實(shí)現(xiàn)翔悠,這里的 LOCK 是 CPU 鎖。

Intel 的手冊(cè)對(duì) LOCK 前綴的說明如下:

1.確保對(duì)內(nèi)存的讀-改-寫操作原子執(zhí)行野芒。在 Pentium 及 Pentium 之前的處理器中蓄愁,帶有 LOCK 前綴的指令在執(zhí)行期間會(huì)鎖住總線,使得其他處理器暫時(shí)無法通過總線訪問內(nèi)存狞悲。很顯然撮抓,這會(huì)帶來昂貴的開銷。從 Pentium 4摇锋,Intel Xeon 及 P6 處理器開始丹拯,Intel 在原有總線鎖的基礎(chǔ)上做了一個(gè)很有意義的優(yōu)化:如果要訪問的內(nèi)存區(qū)域(area of memory)在 LOCK 前綴指令執(zhí)行期間已經(jīng)在處理器內(nèi)部的緩存中被鎖定(即包含該內(nèi)存區(qū)域的緩存行當(dāng)前處于獨(dú)占或以修改狀態(tài))站超,并且該內(nèi)存區(qū)域被完全包含在單個(gè)緩存行(cache line)中,那么處理器將直接執(zhí)行該指令乖酬。由于在指令執(zhí)行期間該緩存行會(huì)一直被鎖定死相,其它處理器無法讀/寫該指令要訪問的內(nèi)存區(qū)域,因此能保證指令執(zhí)行的原子性咬像。這個(gè)操作過程叫做緩存鎖定(cache locking)算撮,緩存鎖定將大大降低 LOCK 前綴指令的執(zhí)行開銷,但是當(dāng)多處理器之間的競(jìng)爭(zhēng)程度很高或者指令訪問的內(nèi)存地址未對(duì)齊時(shí)县昂,仍然會(huì)鎖住總線肮柜。

2.禁止該指令與之前和之后的讀和寫指令重排序。

3.把寫緩沖區(qū)中的所有數(shù)據(jù)刷新到內(nèi)存中七芭。

看完描述素挽,可以看出,CPU 鎖主要分兩種狸驳,總線鎖和緩存鎖∷跞總線鎖用在老的 CPU 中耙箍,緩存鎖用在新的 CPU 中。

所謂總線鎖就是使用 CPU 提供的一個(gè)LOCK#信號(hào)酥馍,當(dāng)一個(gè)處理器在總線上輸出此信號(hào)時(shí)辩昆,其他處理器的請(qǐng)求將被阻塞住,那么該 CPU 可以獨(dú)占使用共享內(nèi)存旨袒≈耄總線鎖的這種方式,在執(zhí)行期間會(huì)鎖住總線砚尽,使得其他處理器暫時(shí)無法通過總線訪問內(nèi)存施无。所以總線鎖定的開銷比較大,最新的處理器在某些場(chǎng)合下使用緩存鎖定代替總線鎖定來進(jìn)行優(yōu)化必孤。

所謂緩存鎖定就是如果緩存在處理器緩存行中內(nèi)存區(qū)域在 LOCK 操作期間被鎖定猾骡,當(dāng)它執(zhí)行鎖操作回寫內(nèi)存時(shí),處理器不在總線上產(chǎn)生 LOCK#信號(hào)敷搪,而是修改內(nèi)部的內(nèi)存地址兴想,并允許它的緩存一致性機(jī)制來保證操作的原子性,因?yàn)榫彺嬉恢滦詸C(jī)制會(huì)阻止同時(shí)修改被兩個(gè)以上處理器緩存的內(nèi)存區(qū)域數(shù)據(jù)赡勘,當(dāng)其他處理器回寫已被鎖定的緩存行的數(shù)據(jù)時(shí)會(huì)對(duì)緩存行無效嫂便。

有兩種情況處理器無法使用緩存鎖。

第一種情況是闸与,當(dāng)操作的數(shù)據(jù)不能被緩存在處理器內(nèi)部毙替,或操作的數(shù)據(jù)跨多個(gè)緩存行(cache line)曼振,則處理器會(huì)調(diào)用總線鎖定。

第二種情況是:有些處理器不支持緩存鎖定蔚龙。一些老的 CPU 就算鎖定的內(nèi)存區(qū)域在處理器的緩存行中也會(huì)調(diào)用總線鎖定冰评。

雖然緩存鎖可以大大降低 CPU 鎖的執(zhí)行開銷,但是如果遇到多處理器之間的競(jìng)爭(zhēng)程度很高或者指令訪問的內(nèi)存地址未對(duì)齊時(shí)木羹,仍然會(huì)鎖住總線甲雅。所以緩存鎖和總線鎖相互配合,效果更佳坑填。

綜上抛人,用 CAS 方式來保證線程安全的方式就比用互斥鎖的方式效率要高很多。

四. CAS 的缺陷

雖然 CAS 的效率高脐瑰,但是依舊存在3大問題妖枚。

1. ABA 問題

線程1準(zhǔn)備用 CAS 將變量的值由 A 替換為 B ,在此之前苍在,線程2將變量的值由 A 替換為 C 绝页,又由 C 替換為 A,然后線程1執(zhí)行 CAS 時(shí)發(fā)現(xiàn)變量的值仍然為 A寂恬,所以 CAS 成功续誉。但實(shí)際上這時(shí)的現(xiàn)場(chǎng)已經(jīng)和最初不同了。圖上也為了分開兩個(gè) A 不同初肉,所以用不同的顏色標(biāo)記了酷鸦。最終線程2把 A 替換成了 B 。這就是經(jīng)典的 ABA 問題牙咏。但是這會(huì)導(dǎo)致項(xiàng)目出現(xiàn)什么問題呢臼隔?

設(shè)想存在這樣一個(gè)鏈棧,棧里面存儲(chǔ)了一個(gè)鏈表妄壶,棧頂是 A摔握,A 的 next 指針指向 B。在線程1中盯拱,要將棧頂元素 A 用 CAS 把它替換成 B盒发。接著線程2來了,線程2將之前包含 A狡逢,B 元素的鏈表都 pop 出去宁舰。然后 push 進(jìn)來一個(gè) A - C - D 鏈表,棧頂元素依舊是 A奢浑。這時(shí)線程1發(fā)現(xiàn) A 沒有發(fā)生變化蛮艰,于是替換成 B。這個(gè)時(shí)候 B 的 next 其實(shí)為 nil雀彼。替換完成以后壤蚜,線程2操作的鏈表 C - D 這里就與表頭斷開連接了即寡。也就是說線程1 CAS 操作結(jié)束,C - D 就被丟失了袜刷,再也找不回來了聪富。棧中只剩下 B 一個(gè)元素了。這很明顯出現(xiàn)了 bug著蟹。

那怎么解決這種情況呢墩蔓?最通用的做法就是加入版本號(hào)進(jìn)行標(biāo)識(shí)。

每次操作都加上版本號(hào)萧豆,這樣就可以完美解決 ABA 的問題了奸披。

2. 循環(huán)時(shí)間可能過長(zhǎng)

自旋 CAS 如果長(zhǎng)時(shí)間不成功,會(huì)給 CPU 帶來非常大的執(zhí)行開銷涮雷。如果能支持 CPU 提供的 Pause 指令阵面,那么 CAS 的效率能有一定的提升。Pause 指令有兩個(gè)作用洪鸭,第一它可以延遲流水線執(zhí)行指令(de-pipeline)样刷,使 CPU 不會(huì)消耗過多的執(zhí)行資源,延遲的時(shí)間取決于具體實(shí)現(xiàn)的版本卿嘲,在一些處理器上延遲時(shí)間是零颂斜。第二它可以避免在退出循環(huán)的時(shí)候因內(nèi)存順序沖突(memory order violation)而引起 CPU 流水線被清空(CPU pipeline flush),從而提高 CPU 的執(zhí)行效率拾枣。

3. 只能保證一個(gè)共享變量的原子操作

CAS 操作只能保證一個(gè)共享變量的原子操作,但是保證多個(gè)共享變量操作的原子性盒让。一般做法可能就考慮利用鎖了梅肤。

不過也可以利用一個(gè)結(jié)構(gòu)體,把兩個(gè)變量合并成一個(gè)變量邑茄。這樣還可以繼續(xù)利用 CAS 來保證原子性操作姨蝴。

五. Lock - Free 方案舉例

在 Lock - Free方案舉例之前,先來回顧一下互斥量的方案肺缕。上面我們用互斥量實(shí)現(xiàn)了 Go 的線程安全的 Map左医。至于這個(gè) Map 的性能如何,接下來對(duì)比的時(shí)候可以看看數(shù)據(jù)同木。

1. NO Lock - Free 方案

如果不用 Lock - Free 方案也不用簡(jiǎn)單的互斥量的方案,如何實(shí)現(xiàn)一個(gè)線程安全的字典呢?答案是利用分段鎖的設(shè)計(jì)岛请,只有在同一個(gè)分段內(nèi)才存在競(jìng)態(tài)關(guān)系沫浆,不同的分段鎖之間沒有鎖競(jìng)爭(zhēng)。相比于對(duì)整個(gè) Map 加鎖的設(shè)計(jì)洲尊,分段鎖大大的提高了高并發(fā)環(huán)境下的處理能力远豺。

Go

typeConcurrentMap[]*ConcurrentMapShared

typeConcurrentMapSharedstruct{

itemsmap[string]interface{}

sync.RWMutex// 讀寫鎖奈偏,保證進(jìn)入內(nèi)部 map 的線程安全

}

分段鎖 Segment 存在一個(gè)并發(fā)度。并發(fā)度可以理解為程序運(yùn)行時(shí)能夠同時(shí)更新 ConccurentMap 且不產(chǎn)生鎖競(jìng)爭(zhēng)的最大線程數(shù)躯护,實(shí)際上就是 ConcurrentMap 中的分段鎖個(gè)數(shù)惊来。即數(shù)組的長(zhǎng)度。

Go

varSHARD_COUNT=32

如果并發(fā)度設(shè)置的過小棺滞,會(huì)帶來嚴(yán)重的鎖競(jìng)爭(zhēng)問題裁蚁;如果并發(fā)度設(shè)置的過大,原本位于同一個(gè) Segment 內(nèi)的訪問會(huì)擴(kuò)散到不同的 Segment 中检眯,CPU cache 命中率會(huì)下降厘擂,從而引起程序性能下降。

ConcurrentMap 的初始化就是對(duì)數(shù)組的初始化锰瘸,并且初始化數(shù)組里面每個(gè)字典刽严。

Go

func New()ConcurrentMap{

m:=make(ConcurrentMap,SHARD_COUNT)

fori:=0;i

m[i]=&ConcurrentMapShared{items:make(map[string]interface{})}

}

returnm

}

ConcurrentMap 主要使用 Segment 來實(shí)現(xiàn)減小鎖粒度,把 Map 分割成若干個(gè) Segment避凝,在 put 的時(shí)候需要加讀寫鎖舞萄,get 時(shí)候只加讀鎖。

既然分段了管削,那么針對(duì)每個(gè) key 對(duì)應(yīng)哪一個(gè)段的邏輯就由一個(gè)哈希函數(shù)來定倒脓。

Go

func fnv32(key string) uint32{

hash:=uint32(2166136261)

constprime32=uint32(16777619)

fori:=0;i

hash*=prime32

hash^=uint32(key[i])

}

returnhash

}

上面這段哈希函數(shù)會(huì)根據(jù)每次傳入的 string ,計(jì)算出不同的哈希值含思。

Go

func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared{

returnm[uint(fnv32(key))%uint(SHARD_COUNT)]

}

根據(jù)哈希值對(duì)數(shù)組長(zhǎng)度取余崎弃,取出 ConcurrentMap 中的 ConcurrentMapShared。在 ConcurrentMapShared 中存儲(chǔ)對(duì)應(yīng)這個(gè)段的 key - value含潘。

Go

func (m ConcurrentMap) Set(key string, value interface{}){

// Get map shard.

shard:=m.GetShard(key)

shard.Lock()

shard.items[key]=value

shard.Unlock()

}

上面這段就是 ConcurrentMap 的 set 操作饲做。思路很清晰:先取出對(duì)應(yīng)段內(nèi)的 ConcurrentMapShared,然后再加讀寫鎖鎖定遏弱,寫入 key - value盆均,寫入成功以后再釋放讀寫鎖。

Go

func (m ConcurrentMap) Get(key string) (interface{}, bool){

// Get shard

shard:=m.GetShard(key)

shard.RLock()

// Get item from shard.

val,ok:=shard.items[key]

shard.RUnlock()

returnval,ok

}

上面這段就是 ConcurrentMap 的 get 操作漱逸。思路也很清晰:先取出對(duì)應(yīng)段內(nèi)的 ConcurrentMapShared泪姨,然后再加讀鎖鎖定,讀取 key - value饰抒,讀取成功以后再釋放讀鎖肮砾。

這里和 set 操作的區(qū)別就在于只需要加讀鎖即可,不用加讀寫鎖循集。

Go

func (m ConcurrentMap) Count() int{

count:=0

fori:=0;i

shard:=m[i]

shard.RLock()

count+=len(shard.items)

shard.RUnlock()

}

returncount

}

ConcurrentMap 的 Count 操作就是把 ConcurrentMap 數(shù)組的每一個(gè)分段元素里面的每一個(gè)元素都遍歷一遍唇敞,計(jì)算出總數(shù)。

Go

func (m ConcurrentMap) Keys() []string{

count:=m.Count()

ch:=make(chanstring,count)

gofunc(){

// 遍歷所有的 shard.

wg:=sync.WaitGroup{}

wg.Add(SHARD_COUNT)

for_,shard:=rangem{

gofunc(shard *ConcurrentMapShared){

// 遍歷所有的 key, value 鍵值對(duì).

shard.RLock()

forkey:=rangeshard.items{

ch<-key

}

shard.RUnlock()

wg.Done()

}(shard)

}

wg.Wait()

close(ch)

}()

// 生成 keys 數(shù)組,存儲(chǔ)所有的 key

keys:=make([]string,0,count)

fork:=rangech{

keys=append(keys,k)

}

returnkeys

}

上述是返回 ConcurrentMap 中所有 key 疆柔,結(jié)果裝在字符串?dāng)?shù)組中咒精。

Go

typeUpsertCbfunc(exist bool, valueInMap interface{}, newValue interface{}) interface{}

func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}){

shard:=m.GetShard(key)

shard.Lock()

v,ok:=shard.items[key]

res=cb(ok,v,value)

shard.items[key]=res

shard.Unlock()

returnres

}

上述代碼是 Upsert 操作。如果已經(jīng)存在了旷档,就更新模叙。如果是一個(gè)新元素,就用 UpsertCb 函數(shù)插入一個(gè)新的鞋屈。思路也是先根據(jù) string 找到對(duì)應(yīng)的段范咨,然后加讀寫鎖。這里只能加讀寫鎖厂庇,因?yàn)椴还苁?update 還是 insert 操作渠啊,都需要寫入。讀取 key 對(duì)應(yīng)的 value 值权旷,然后調(diào)用 UpsertCb 函數(shù)替蛉,把結(jié)果更新到 key 對(duì)應(yīng)的 value 中。最后釋放讀寫鎖即可拄氯。

UpsertCb 函數(shù)在這里值得說明的是躲查,這個(gè)函數(shù)是回調(diào)返回待插入到 map 中的新元素。這個(gè)函數(shù)當(dāng)且僅當(dāng)在讀寫鎖被鎖定的時(shí)候才會(huì)被調(diào)用译柏,因此一定不允許再去嘗試讀取同一個(gè) map 中的其他 key 值镣煮。因?yàn)檫@樣會(huì)導(dǎo)致線程死鎖。死鎖的原因是 Go 中 sync.RWLock 是不可重入的鄙麦。

完整的代碼見concurrent_map.go

這種分段的方法雖然比單純的加互斥量好很多典唇,因?yàn)?Segment 把鎖住的范圍進(jìn)一步的減少了,但是這個(gè)范圍依舊比較大胯府,還能再進(jìn)一步的減少鎖么蚓聘?

還有一點(diǎn)就是并發(fā)量的設(shè)置,要合理盟劫,不能太大也不能太小。

2. Lock - Free 方案

在 Go 1.9 的版本中默認(rèn)就實(shí)現(xiàn)了一種線程安全的 Map与纽,摒棄了 Segment(分段鎖)的概念侣签,而是啟用了一種全新的方式實(shí)現(xiàn),利用了 CAS 算法急迂,即 Lock - Free 方案影所。

采用 Lock - Free 方案以后,能比上一個(gè)分案僚碎,分段鎖更進(jìn)一步縮小鎖的范圍猴娩。性能大大提升。

接下來就讓我們來看看如何用 CAS 實(shí)現(xiàn)一個(gè)線程安全的高性能 Map 。

官方是 sync.map 有如下的描述:

這個(gè) Map 是線程安全的卷中,讀取矛双,插入,刪除也都保持著常數(shù)級(jí)的時(shí)間復(fù)雜度蟆豫。多個(gè) goroutines 協(xié)程同時(shí)調(diào)用 Map 方法也是線程安全的议忽。該 Map 的零值是有效的,并且零值是一個(gè)空的 Map 十减。線程安全的 Map 在第一次使用之后栈幸,不允許被拷貝。

這里解釋一下為何不能被拷貝帮辟。因?yàn)閷?duì)結(jié)構(gòu)體的復(fù)制不但會(huì)生成該值的副本速址,還會(huì)生成其中字段的副本。如此一來由驹,本應(yīng)施加于此的并發(fā)線程安全保護(hù)也就失效了芍锚。

作為源值賦給別的變量,作為參數(shù)值傳入函數(shù)荔棉,作為結(jié)果值從函數(shù)返回闹炉,作為元素值通過通道傳遞等都會(huì)造成值的復(fù)制。正確的做法是用指向該類型的指針類型的變量润樱。

Go 1.9 中 sync.map 的數(shù)據(jù)結(jié)構(gòu)如下:

Go

typeMapstruct{

? ? mu Mutex

// 并發(fā)讀取 map 中一部分的內(nèi)容是線程安全的渣触,這是不需要

// read 這部分自身讀取就是線程安全的,因?yàn)槭窃有缘囊既簟5谴鎯?chǔ)的時(shí)候還是需要 Mutex

// 存儲(chǔ)在 read 中的 entry 在并發(fā)讀取過程中是允許更新的嗅钻,即使沒有 Mutex 信號(hào)量,也是線程安全的店展。但是更新一個(gè)以前刪除的 entry 就需要把值拷貝到 dirty Map 中养篓,并且必須要帶上 Mutex

read atomic.Value// readOnly

// dirty 中包含 map 中必須要互斥量 mu 保護(hù)才能線程安全的部分。為了使 dirty 能快速的轉(zhuǎn)化成 read map赂蕴,dirty 中包含了 read map 中所有沒有被刪除的 entries

// 已經(jīng)刪除過的 entries 不存儲(chǔ)在 dirty map 中柳弄。在 clean map 中一個(gè)已經(jīng)刪除的 entry 一定是沒有被刪除過的,并且當(dāng)新值將要被存儲(chǔ)的時(shí)候概说,它們會(huì)被添加到 dirty map 中碧注。

// 當(dāng) dirty map 為 nil 的時(shí)候,下一次寫入的時(shí)候會(huì)通過 clean map 忽略掉舊的 entries 以后的淺拷貝副本來初始化 dirty map糖赔。

dirtymap[interface{}]*entry

// misses 記錄了 read map 因?yàn)樾枰袛?key 是否存在而鎖住了互斥量 mu 進(jìn)行了 update 操作以后的加載次數(shù)萍丐。

// 一旦 misses 值大到足夠去復(fù)制 dirty map 所需的花費(fèi)的時(shí)候,那么 dirty map 就被提升到未被修改狀態(tài)下的 read map放典,下次存儲(chǔ)就會(huì)創(chuàng)建一個(gè)新的 dirty map逝变。

missesint

}

在這個(gè) Map 中基茵,包含一個(gè)互斥量 mu,一個(gè)原子值 read壳影,一個(gè)非線程安全的字典 map拱层,這個(gè)字典的 key 是 interface{} 類型,value 是 *entry 類型态贤。最后還有一個(gè) int 類型的計(jì)數(shù)器舱呻。

先來說說原子值。atomic.Value 這個(gè)類型有兩個(gè)公開的指針方法悠汽,Load 和 Store 箱吕。Load 方法用于原子地的讀取原子值實(shí)例中存儲(chǔ)的值,它會(huì)返回一個(gè) interface{} 類型的結(jié)果柿冲,并且不接受任何參數(shù)茬高。Store 方法用于原子地在原子值實(shí)例中存儲(chǔ)一個(gè)值,它接受一個(gè) interface{} 類型的參數(shù)而沒有任何結(jié)果假抄。在未曾通過 Store 方法向原子值實(shí)例存儲(chǔ)值之前怎栽,它的 Load 方法總會(huì)返回 nil。

在這個(gè)線程安全的字典中宿饱,Load 和 Store 的都是一個(gè) readOnly 的數(shù)據(jù)結(jié)構(gòu)熏瞄。

Go

// readOnly 是一個(gè)不可變的結(jié)構(gòu)體,原子性的存儲(chǔ)在 Map.read 中

typereadOnlystruct{

mmap[interface{}]*entry

// 標(biāo)志 dirty map 中是否包含一些不在 m 中的 key 谬以。

amendedbool// true if the dirty map contains some key not in m.

}

readOnly 中存儲(chǔ)了一個(gè)非線程安全的字典强饮,這個(gè)字典和上面 dirty map 存儲(chǔ)的類型完全一致。key 是 interface{} 類型为黎,value 是 *entry 類型邮丰。

Go

// entry 是一個(gè)插槽,與 map 中特定的 key 相對(duì)應(yīng)

typeentrystruct{

p unsafe.Pointer// *interface{}

}

p 指針指向 *interface{} 類型铭乾,里面存儲(chǔ)的是 entry 的地址剪廉。如果 p = = nil,代表 entry 被刪除了炕檩,并且 m.dirty = = nil斗蒋。如果 p = = expunged,代表 entry 被刪除了笛质,并且 m.dirty 吹泡!= nil ,那么 entry 從 m.dirty 中丟失了经瓷。

除去以上兩種情況外,entry 都是有效的洞难,并且被記錄在 m.read.m[key] 中舆吮,如果 m.dirty!= nil,entry 被存儲(chǔ)在 m.dirty[key] 中。

一個(gè) entry 可以通過原子替換操作成 nil 來刪除它色冀。當(dāng) m.dirty 在下一次被創(chuàng)建潭袱,entry 會(huì)被 expunged 指針原子性的替換為 nil,m.dirty[key] 不對(duì)應(yīng)任何 value锋恬。只要 p != expunged屯换,那么一個(gè) entry 就可以通過原子替換操作更新關(guān)聯(lián)的 value。如果 p = = expunged与学,那么一個(gè) entry 想要通過原子替換操作更新關(guān)聯(lián)的 value彤悔,只能在首次設(shè)置 m.dirty[key] = e 以后才能更新 value。這樣做是為了能在 dirty map 中查找到它索守。

所以從上面分析可以看出晕窑,read 中的 key 是 readOnly 的(key 的集合不會(huì)變,刪除也只是打一個(gè)標(biāo)記)卵佛,value 的操作全都可以原子完成杨赤,所以這個(gè)結(jié)構(gòu)不用加鎖。dirty 是一個(gè) read 的拷貝截汪,加鎖的操作都在這里疾牲,如增加元素、刪除元素等(dirty上執(zhí)行刪除操作就是真的刪除)具體操作見下面分析衙解。

總結(jié)一下阳柔,sync.map 的數(shù)據(jù)結(jié)構(gòu)如上。

再看看線程安全的 sync.map 的一些操作丢郊。

Go

func (m *Map) Load(key interface{}) (value interface{}, ok bool){

read,_:=m.read.Load().(readOnly)

e,ok:=read.m[key]

// 如果 key 對(duì)應(yīng)的 value 不存在盔沫,并且 dirty map 包含 read map 中沒有的 key,那么開始讀取? dirty map

if!ok&&read.amended{

// dirty map 不是線程安全的枫匾,所以需要加上互斥鎖

m.mu.Lock()

// 當(dāng) m.dirty 被提升的時(shí)候架诞,為了防止得到一個(gè)虛假的 miss ,所以此時(shí)我們加鎖干茉。

// 如果再次讀取相同的 key 不 miss谴忧,那么這個(gè) key 值就就不值得拷貝到 dirty map 中。

read,_=m.read.Load().(readOnly)

e,ok=read.m[key]

if!ok&&read.amended{

e,ok=m.dirty[key]

// 無論 entry 是否存在角虫,記錄這次 miss 炒瘟。

// 這個(gè) key 將會(huì)緩慢的被取出,直到 dirty map 提升到 read map

m.missLocked()

}

m.mu.Unlock()

}

if!ok{

returnnil,false

}

returne.load()

}

上述代碼是 Load 操作挑宠。返回的是入?yún)?key 對(duì)應(yīng)的 value 值团滥。如果 value 不存在就返回 nil。dirty map 中會(huì)保存一些 read map 里面不存在的 key枫虏,那么就要讀取出 dirty map 里面 key 對(duì)應(yīng)的 value妇穴。注意讀取的時(shí)候需要加互斥鎖爬虱,因?yàn)?dirty map 是非線程安全的。

Go

func (m *Map) missLocked(){

m.misses++

ifm.misses

return

}

m.read.Store(readOnly{m:m.dirty})

m.dirty=nil

m.misses=0

}

上面這段代碼是記錄 misses 次數(shù)的腾它。只有當(dāng) misses 個(gè)數(shù)大于 dirty map 的長(zhǎng)度的時(shí)候跑筝,會(huì)把 dirty map 存儲(chǔ)到 read map 中。并且把 dirty 置空瞒滴,misses 次數(shù)也清零曲梗。

在看 Store 操作之前,先說一個(gè) expunged 變量妓忍。

Go

// expunged 是一個(gè)指向任意類型的指針虏两,用來標(biāo)記從 dirty map 中刪除的 entry

varexpunged=unsafe.Pointer(new(interface{}))

expunged 變量是一個(gè)指針,用來標(biāo)記從 dirty map 中刪除的 entry单默。

Go

func (m *Map) Store(key, value interface{}){

read,_:=m.read.Load().(readOnly)

// 從 read map 中讀取 key 失敗或者取出的 entry 嘗試存儲(chǔ) value 失敗碘举,直接返回

ife,ok:=read.m[key];ok&&e.tryStore(&value){

return

}

m.mu.Lock()

read,_=m.read.Load().(readOnly)

ife,ok:=read.m[key];ok{

// e 指向的是非 nil 的

ife.unexpungeLocked(){

// entry 先前被刪除了,這就意味著存在一個(gè)非空的 dirty map 里面并沒有存儲(chǔ)這個(gè) entry

m.dirty[key]=e

}

// 使用 storeLocked 函數(shù)之前搁廓,必須保證 e 沒有被清除

e.storeLocked(&value)

}elseife,ok:=m.dirty[key];ok{

// 已經(jīng)存儲(chǔ)在 dirty map 中了引颈,代表 e 沒有被清除

e.storeLocked(&value)

}else{

if!read.amended{

// 到這個(gè) else 中就意味著,當(dāng)前的 key 是第一次被加到 dirty map 中境蜕。

// store 之前先判斷一下 dirty map 是否為空蝙场,如果為空,就把 read map 淺拷貝一次粱年。

m.dirtyLocked()

m.read.Store(readOnly{m:read.m,amended:true})

}

// 在 dirty 中存儲(chǔ) value

m.dirty[key]=newEntry(value)

}

m.mu.Unlock()

}

Store 優(yōu)先從 read map 里面去讀取 key 售滤,然后存儲(chǔ)它的 value。如果 entry 是被標(biāo)記為從 dirty map 中刪除過的台诗,那么還需要重新存儲(chǔ)回 dirty map中完箩。

如果 read map 里面沒有相應(yīng)的 key,就去 dirty map 里面去讀取拉队。dirty map 就直接存儲(chǔ)對(duì)應(yīng)的 value弊知。

最后如何 read map 和 dirty map 都沒有這個(gè) key 值,這就意味著該 key 是第一次被加入到 dirty map 中粱快。在 dirty map 中存儲(chǔ)這個(gè) key 以及對(duì)應(yīng)的 value秩彤。

Go

// 當(dāng) entry 沒有被刪除的情況下去存儲(chǔ)一個(gè) value。

// 如果 entry 被刪除了事哭,tryStore 方法返回 false漫雷,并且保留 entry 不變

func (e *entry) tryStore(i *interface{}) bool{

p:=atomic.LoadPointer(&e.p)

ifp==expunged{

returnfalse

}

for{

ifatomic.CompareAndSwapPointer(&e.p,p,unsafe.Pointer(i)){

returntrue

}

p=atomic.LoadPointer(&e.p)

ifp==expunged{

returnfalse

}

}

}

tryStore 函數(shù)的實(shí)現(xiàn)和 CAS 原理差不多,它會(huì)反復(fù)的循環(huán)判斷 entry 是否被標(biāo)記成了 expunged鳍咱,如果 entry 經(jīng)過 CAS 操作成功的替換成了 i降盹,那么就返回 true,反之如果被標(biāo)記成了 expunged谤辜,就返回 false澎现。

Go

// unexpungeLocked 函數(shù)確保了 entry 沒有被標(biāo)記成已被清除仅胞。

// 如果 entry 先前被清除過了,那么在 mutex 解鎖之前剑辫,它一定要被加入到 dirty map 中

func (e *entry) unexpungeLocked() (wasExpunged bool){

returnatomic.CompareAndSwapPointer(&e.p,expunged,nil)

}

如果 entry 的 unexpungeLocked 返回為 true,那么就說明 entry 已經(jīng)被標(biāo)記成了 expunged渠欺,那么它就會(huì)經(jīng)過 CAS 操作把它置為 nil妹蔽。

再來看看刪除操作的實(shí)現(xiàn)。

Go

func (m *Map) Delete(key interface{}){

read,_:=m.read.Load().(readOnly)

e,ok:=read.m[key]

if!ok&&read.amended{

// 由于 dirty map 是非線程安全的挠将,所以操作前要加鎖

m.mu.Lock()

read,_=m.read.Load().(readOnly)

e,ok=read.m[key]

if!ok&&read.amended{

// 刪除 dirty map 中的 key

delete(m.dirty,key)

}

m.mu.Unlock()

}

ifok{

e.delete()

}

}

delete 操作的實(shí)現(xiàn)比較簡(jiǎn)單胳岂,如果 read map 中存在 key,就可以直接刪除舔稀,如果不存在 key 并且 dirty map 中有這個(gè) key乳丰,那么就要?jiǎng)h除 dirty map 中的這個(gè) key。操作 dirty map 的時(shí)候記得先加上鎖進(jìn)行保護(hù)内贮。

Go

func (e *entry) delete() (hadValue bool){

for{

p:=atomic.LoadPointer(&e.p)

ifp==nil||p==expunged{

returnfalse

}

ifatomic.CompareAndSwapPointer(&e.p,p,nil){

returntrue

}

}

}

刪除 entry 具體的實(shí)現(xiàn)如上产园。這個(gè)操作里面都是原子性操作。循環(huán)判斷 entry 是否為 nil 或者已經(jīng)被標(biāo)記成了 expunged夜郁,如果是這種情況就返回 false什燕,代表刪除失敗。否則就 CAS 操作竞端,將 entry 的 p 指針置為 nil屎即,并返回 true,代表刪除成功事富。

至此技俐,關(guān)于 Go 1.9 中自帶的線程安全的 sync.map 的實(shí)現(xiàn)就分析完了。官方的實(shí)現(xiàn)里面基本沒有用到鎖统台,互斥量的 lock 也是基于 CAS的雕擂。read map 也是原子性的。所以比之前加鎖的實(shí)現(xiàn)版本性能有所提升饺谬。

如果 read 中的一個(gè) key 被刪除了捂刺,即被打了 expunged 標(biāo)記,然后如果再添加相同的 key募寨,不需要操作 dirty族展,直接恢復(fù)回來就可以了。

如果 dirty 沒有任何數(shù)據(jù)的時(shí)候拔鹰,刪除了 read 中的 key仪缸,同樣也是加上 expunged 標(biāo)記,再添加另外一個(gè)不同的 key列肢,dirtyLocked() 函數(shù)會(huì)把 read 中所有已經(jīng)有的 key-value 全部都拷貝到 dirty 中保存恰画,并再追加上 read 中最后添加上的和之前刪掉不同的 key宾茂。

這就等同于:

當(dāng) dirty 不存在的時(shí)候,read 就是全部 map 的數(shù)據(jù)拴还。

當(dāng) dirty 存在的時(shí)候跨晴,dirty 才是正確的 map 數(shù)據(jù)。

從 sync.map 的實(shí)現(xiàn)中我們可以看出片林,它的思想重點(diǎn)在讀取是無鎖的端盆。字典大部分操作也都是讀取,多個(gè)讀者之間是可以無鎖的费封。當(dāng) read 中的數(shù)據(jù) miss 到一定程度的時(shí)候焕妙,(有點(diǎn)命中緩存的意思),就會(huì)考慮直接把 dirty 替換成 read弓摘,整個(gè)一個(gè)賦值替換就可以了焚鹊。但是在 dirty 重新生成新的時(shí)候,或者說第一個(gè) dirty 生成的時(shí)候韧献,還是需要遍歷整個(gè) read末患,這里會(huì)消耗很多性能,那官方的這個(gè) sync.map 的性能究竟是怎么樣的呢势决?

究竟 Lock - Free 的性能有多強(qiáng)呢阻塑?接下來做一下性能測(cè)試。

五. 性能對(duì)比

性能測(cè)試主要針對(duì)3個(gè)方面果复,Insert陈莽,Get,Delete虽抄。測(cè)試對(duì)象主要針對(duì)簡(jiǎn)單加互斥鎖的原生 Map 走搁,分段加鎖的 Map,Lock - Free 的 Map 這三種進(jìn)行性能測(cè)試迈窟。

性能測(cè)試的所有代碼已經(jīng)放在 github 了私植,地址在這里,性能測(cè)試用的指令是:

Go

gotest-v-run=^$-bench.-benchmem

1. 插入 Insert 性能測(cè)試

Go

// 插入不存在的 key (粗糙的鎖)

func BenchmarkSingleInsertAbsentBuiltInMap(b *testing.B){

myMap=&MyMap{

m:make(map[string]interface{},32),

}

b.ResetTimer()

fori:=0;i

myMap.BuiltinMapStore(strconv.Itoa(i),"value")

}

}

// 插入不存在的 key (分段鎖)

func BenchmarkSingleInsertAbsent(b *testing.B){

m:=New()

b.ResetTimer()

fori:=0;i

m.Set(strconv.Itoa(i),"value")

}

}

// 插入不存在的 key (syncMap)

func BenchmarkSingleInsertAbsentSyncMap(b *testing.B){

syncMap:=&sync.Map{}

b.ResetTimer()

fori:=0;i

syncMap.Store(strconv.Itoa(i),"value")

}

}

測(cè)試結(jié)果:

Go

BenchmarkSingleInsertAbsentBuiltInMap-42000000857ns/op170B/op1allocs/op

BenchmarkSingleInsertAbsent-42000000651ns/op170B/op1allocs/op

BenchmarkSingleInsertAbsentSyncMap-410000001094ns/op187B/op5allocs/op

實(shí)驗(yàn)結(jié)果是分段鎖的性能最高车酣。這里說明一下測(cè)試結(jié)果曲稼,-4代表測(cè)試用了4核 CPU ,2000000 代表循環(huán)次數(shù)湖员,857 ns/op 代表的是平均每次執(zhí)行花費(fèi)的時(shí)間贫悄,170 B/op 代表的是每次執(zhí)行堆上分配內(nèi)存總數(shù),allocs/op 代表的是每次執(zhí)行堆上分配內(nèi)存次數(shù)娘摔。

這樣看來窄坦,循環(huán)次數(shù)越多,花費(fèi)時(shí)間越少,分配內(nèi)存總數(shù)越小鸭津,分配內(nèi)存次數(shù)越少彤侍,性能就越好。下面的性能圖表中去除掉了第一列循環(huán)次數(shù)逆趋,只花了剩下的3項(xiàng)盏阶,所以條形圖越短的性能越好。以下的每張條形圖的規(guī)則和測(cè)試結(jié)果代表的意義都和這里一樣闻书,下面就不再贅述了般哼。

Go

// 插入存在 key (粗糙鎖)

func BenchmarkSingleInsertPresentBuiltInMap(b *testing.B){

myMap=&MyMap{

m:make(map[string]interface{},32),

}

myMap.BuiltinMapStore("key","value")

b.ResetTimer()

fori:=0;i

myMap.BuiltinMapStore("key","value")

}

}

// 插入存在 key (分段鎖)

func BenchmarkSingleInsertPresent(b *testing.B){

m:=New()

m.Set("key","value")

b.ResetTimer()

fori:=0;i

m.Set("key","value")

}

}

// 插入存在 key (syncMap)

func BenchmarkSingleInsertPresentSyncMap(b *testing.B){

syncMap:=&sync.Map{}

syncMap.Store("key","value")

b.ResetTimer()

fori:=0;i

syncMap.Store("key","value")

}

}

測(cè)試結(jié)果:

Go

BenchmarkSingleInsertPresentBuiltInMap-42000000074.6ns/op0B/op0allocs/op

BenchmarkSingleInsertPresent-42000000061.1ns/op0B/op0allocs/op

BenchmarkSingleInsertPresentSyncMap-420000000108ns/op16B/op1allocs/op

從圖中可以看出,sync.map 在涉及到 Store 這一項(xiàng)的均比其他兩者的性能差惠窄。不管插入不存在的 Key 還是存在的 Key,分段鎖的性能均是目前最好的漾橙。

2. 讀取 Get 性能測(cè)試

Go

// 讀取存在 key (粗糙鎖)

func BenchmarkSingleGetPresentBuiltInMap(b *testing.B){

myMap=&MyMap{

m:make(map[string]interface{},32),

}

myMap.BuiltinMapStore("key","value")

b.ResetTimer()

fori:=0;i

myMap.BuiltinMapLookup("key")

}

}

// 讀取存在 key (分段鎖)

func BenchmarkSingleGetPresent(b *testing.B){

m:=New()

m.Set("key","value")

b.ResetTimer()

fori:=0;i

m.Get("key")

}

}

// 讀取存在 key (syncMap)

func BenchmarkSingleGetPresentSyncMap(b *testing.B){

syncMap:=&sync.Map{}

syncMap.Store("key","value")

b.ResetTimer()

fori:=0;i

syncMap.Load("key")

}

}

測(cè)試結(jié)果:

Go

BenchmarkSingleGetPresentBuiltInMap-42000000071.5ns/op0B/op0allocs/op

BenchmarkSingleGetPresent-43000000042.3ns/op0B/op0allocs/op

BenchmarkSingleGetPresentSyncMap-43000000040.3ns/op0B/op0allocs/op

從圖中可以看出杆融,sync.map 在 Load 這一項(xiàng)的性能非常優(yōu)秀,遠(yuǎn)高于其他兩者霜运。

3. 并發(fā)插入讀取混合性能測(cè)試

接下來的實(shí)現(xiàn)就涉及到了并發(fā)插入和讀取了脾歇。由于分段鎖實(shí)現(xiàn)的特殊性,分段個(gè)數(shù)會(huì)多多少少影響到性能淘捡,那么接下來的實(shí)驗(yàn)就會(huì)對(duì)分段鎖分1藕各,16,32焦除,256 這4段進(jìn)行測(cè)試激况,分別看看性能變化如何,其他兩種線程安全的 Map 不變膘魄。

由于并發(fā)的代碼太多了乌逐,這里就不貼出來了,感興趣的同學(xué)可以看這里

下面就直接放出測(cè)試結(jié)果:

并發(fā)插入不存在的 Key 值

Go

BenchmarkMultiInsertDifferentBuiltInMap-410000002359ns/op330B/op11allocs/op

BenchmarkMultiInsertDifferent_1_Shard-410000002039ns/op330B/op11allocs/op

BenchmarkMultiInsertDifferent_16_Shard-410000001937ns/op330B/op11allocs/op

BenchmarkMultiInsertDifferent_32_Shard-410000001944ns/op330B/op11allocs/op

BenchmarkMultiInsertDifferent_256_Shard-410000001991ns/op331B/op11allocs/op

BenchmarkMultiInsertDifferentSyncMap-410000003760ns/op635B/op33allocs/op

從圖中可以看出创葡,sync.map 在涉及到 Store 這一項(xiàng)的均比其他兩者的性能差浙踢。并發(fā)插入不存在的 Key,分段鎖劃分的 Segment 多少與性能沒有關(guān)系灿渴。

并發(fā)插入存在的 Key 值

Go

BenchmarkMultiInsertSameBuiltInMap-410000001182ns/op160B/op10allocs/op

BenchmarkMultiInsertSame-410000001091ns/op160B/op10allocs/op

BenchmarkMultiInsertSameSyncMap-410000001809ns/op480B/op30allocs/op

從圖中可以看出洛波,sync.map 在涉及到 Store 這一項(xiàng)的均比其他兩者的性能差。

并發(fā)的讀取存在的 Key 值

Go

BenchmarkMultiGetSameBuiltInMap-42000000767ns/op0B/op0allocs/op

BenchmarkMultiGetSame-43000000481ns/op0B/op0allocs/op

BenchmarkMultiGetSameSyncMap-43000000464ns/op0B/op0allocs/op

從圖中可以看出骚露,sync.map 在 Load 這一項(xiàng)的性能遠(yuǎn)超多其他兩者蹬挤。

并發(fā)插入讀取不存在的 Key 值

Go

BenchmarkMultiGetSetDifferentBuiltInMap-410000003281ns/op337B/op12allocs/op

BenchmarkMultiGetSetDifferent_1_Shard-410000003007ns/op338B/op12allocs/op

BenchmarkMultiGetSetDifferent_16_Shard-45000002662ns/op337B/op12allocs/op

BenchmarkMultiGetSetDifferent_32_Shard-410000002732ns/op337B/op12allocs/op

BenchmarkMultiGetSetDifferent_256_Shard-410000002788ns/op339B/op12allocs/op

BenchmarkMultiGetSetDifferentSyncMap-43000008990ns/op1104B/op34allocs/op

從圖中可以看出,sync.map 在涉及到 Store 這一項(xiàng)的均比其他兩者的性能差荸百。并發(fā)插入讀取不存在的 Key闻伶,分段鎖劃分的 Segment 多少與性能沒有關(guān)系。

并發(fā)插入讀取存在的 Key 值

Go

BenchmarkMultiGetSetBlockBuiltInMap-410000002095ns/op160B/op10allocs/op

BenchmarkMultiGetSetBlock_1_Shard-410000001712ns/op160B/op10allocs/op

BenchmarkMultiGetSetBlock_16_Shard-410000001730ns/op160B/op10allocs/op

BenchmarkMultiGetSetBlock_32_Shard-410000001645ns/op160B/op10allocs/op

BenchmarkMultiGetSetBlock_256_Shard-410000001619ns/op160B/op10allocs/op

BenchmarkMultiGetSetBlockSyncMap-45000002660ns/op480B/op30allocs/op

從圖中可以看出够话,sync.map 在涉及到 Store 這一項(xiàng)的均比其他兩者的性能差蓝翰。并發(fā)插入讀取存在的 Key光绕,分段鎖劃分的 Segment 越小,性能越好畜份!

4. 刪除 Delete 性能測(cè)試

Go

// 刪除存在 key (粗糙鎖)

func BenchmarkDeleteBuiltInMap(b *testing.B){

myMap=&MyMap{

m:make(map[string]interface{},32),

}

b.RunParallel(func(pb *testing.PB){

r:=rand.New(rand.NewSource(time.Now().Unix()))

forpb.Next(){

// The loop body is executed b.N times total across all goroutines.

k:=r.Intn(100000000)

myMap.BuiltinMapDelete(strconv.Itoa(k))

}

})

}

// 刪除存在 key (分段鎖)

func BenchmarkDelete(b *testing.B){

m:=New()

b.RunParallel(func(pb *testing.PB){

r:=rand.New(rand.NewSource(time.Now().Unix()))

forpb.Next(){

// The loop body is executed b.N times total across all goroutines.

k:=r.Intn(100000000)

m.Remove(strconv.Itoa(k))

}

})

}

// 刪除存在 key (syncMap)

func BenchmarkDeleteSyncMap(b *testing.B){

syncMap:=&sync.Map{}

b.RunParallel(func(pb *testing.PB){

r:=rand.New(rand.NewSource(time.Now().Unix()))

forpb.Next(){

// The loop body is executed b.N times total across all goroutines.

k:=r.Intn(100000000)

syncMap.Delete(strconv.Itoa(k))

}

})

}

測(cè)試結(jié)果:

Go

BenchmarkDeleteBuiltInMap-410000000130ns/op8B/op1allocs/op

BenchmarkDelete-42000000076.7ns/op8B/op1allocs/op

BenchmarkDeleteSyncMap-43000000045.4ns/op8B/op0allocs/op

從圖中可以看出诞帐,sync.map 在 Delete 這一項(xiàng)是完美的超過其他兩者的。

六. 總結(jié)

本文從線程安全理論基礎(chǔ)開始講了線程安全中一些處理方法爆雹。其中涉及到互斥量和條件變量相關(guān)知識(shí)停蕉。從 Lock 的方案談到了 Lock - Free 的 CAS 相關(guān)方案。最后針對(duì) Go 1.9 新加的 sync.map 進(jìn)行了源碼分析和性能測(cè)試钙态。

采用了 Lock - Free 方案的 sync.map 測(cè)試結(jié)果并沒有想象中的那么出色慧起。除了 Load 和 Delete 這兩項(xiàng)遠(yuǎn)遠(yuǎn)甩開其他兩者,凡是涉及到 Store 相關(guān)操作的性能均低于其他兩者 Map 的實(shí)現(xiàn)册倒。不過這也是有原因的蚓挤。

縱觀 Java ConcurrentHashmap 一路的變化:

JDK 6,7 中的 ConcurrentHashmap 主要使用 Segment 來實(shí)現(xiàn)減小鎖粒度,把 HashMap 分割成若干個(gè) Segment驻子,在 put 的時(shí)候需要鎖住 Segment灿意,get 時(shí)候不加鎖,使用 volatile 來保證可見性崇呵,當(dāng)要統(tǒng)計(jì)全局時(shí)(比如size)缤剧,首先會(huì)嘗試多次計(jì)算 modcount 來確定,這幾次嘗試中域慷,是否有其他線程進(jìn)行了修改操作荒辕,如果沒有,則直接返回 size芒粹。如果有兄纺,則需要依次鎖住所有的 Segment 來計(jì)算。

JDK 7 中 ConcurrentHashmap 中化漆,當(dāng)長(zhǎng)度過長(zhǎng)碰撞會(huì)很頻繁估脆,鏈表的增改刪查操作都會(huì)消耗很長(zhǎng)的時(shí)間,影響性能,所以 JDK8 中完全重寫了concurrentHashmap座云,代碼量從原來的1000多行變成了 6000多行疙赠,實(shí)現(xiàn)上也和原來的分段式存儲(chǔ)有很大的區(qū)別。

JDK 8 的 ConcurrentHashmap 主要設(shè)計(jì)上的變化有以下幾點(diǎn):

不采用 Segment 而采用 node朦拖,鎖住 node 來實(shí)現(xiàn)減小鎖粒度圃阳。

設(shè)計(jì)了 MOVED 狀態(tài) 當(dāng) Resize 的中過程中線程2還在 put 數(shù)據(jù),線程2會(huì)幫助 resize璧帝。

使用3個(gè) CAS 操作來確保 node 的一些操作的原子性层坠,這種方式代替了鎖橘洞。

sizeCtl 的不同值來代表不同含義雅镊,起到了控制的作用困檩。

可見 Go 1.9 一上來第一個(gè)版本就直接摒棄了 Segment 的做法舰绘,采取了 CAS 這種 Lock - Free 的方案提高性能。但是它并沒有對(duì)整個(gè)字典進(jìn)行類似 Java 的 Node 的設(shè)計(jì)。但是整個(gè) sync.map 在 ns/op ,B/op变勇,allocs/op 這三個(gè)性能指標(biāo)上是普通原生非線程安全 Map 的三倍!

不過相信 Google 應(yīng)該還會(huì)繼續(xù)優(yōu)化這部分吧贴唇,畢竟源碼里面還有幾處 TODO 呢搀绣,讓我們一起其他 Go 未來版本的發(fā)展吧,筆者也會(huì)一直持續(xù)關(guān)注的戳气。

(在本篇文章截稿的時(shí)候链患,筆者又突然發(fā)現(xiàn)了一種分段鎖的 Map 實(shí)現(xiàn),性能更高瓶您,它具有負(fù)載均衡等特點(diǎn)锣险,應(yīng)該是目前筆者見到的性能最好的 Go 語言實(shí)現(xiàn)的線程安全的 Map ,關(guān)于它的實(shí)現(xiàn)源碼分析就只能放在下篇博文單獨(dú)寫一篇或者以后有空再分析啦)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末览闰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子巷折,更是在濱河造成了極大的恐慌压鉴,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锻拘,死亡現(xiàn)場(chǎng)離奇詭異油吭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)署拟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門婉宰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人推穷,你說我怎么就攤上這事心包。” “怎么了馒铃?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵蟹腾,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我区宇,道長(zhǎng)娃殖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任议谷,我火速辦了婚禮炉爆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己芬首,他們只是感情好赴捞,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著衩辟,像睡著了一般螟炫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上艺晴,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天昼钻,我揣著相機(jī)與錄音,去河邊找鬼封寞。 笑死然评,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的狈究。 我是一名探鬼主播碗淌,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼抖锥!你這毒婦竟也來了亿眠?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤磅废,失蹤者是張志新(化名)和其女友劉穎纳像,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拯勉,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡竟趾,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了宫峦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岔帽。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖导绷,靈堂內(nèi)的尸體忽然破棺而出犀勒,到底是詐尸還是另有隱情,我是刑警寧澤妥曲,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布账蓉,位于F島的核電站,受9級(jí)特大地震影響逾一,放射性物質(zhì)發(fā)生泄漏铸本。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一遵堵、第九天 我趴在偏房一處隱蔽的房頂上張望箱玷。 院中可真熱鬧怨规,春花似錦、人聲如沸锡足。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舶得。三九已至掰烟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間沐批,已是汗流浹背纫骑。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留九孩,地道東北人先馆。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像躺彬,于是被迫代替她去往敵國(guó)和親煤墙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容