如何設(shè)計并實現(xiàn)一個線程安全的 Map 澜掩?(上篇)

Map 是一種很常見的數(shù)據(jù)結(jié)構(gòu),用于存儲一些無序的鍵值對杖挣。在主流的編程語言中肩榕,默認(rèn)就自帶它的實現(xiàn)。C程梦、C++ 中的 STL 就實現(xiàn)了 Map点把,JavaScript 中也有 Map橘荠,Java 中有 HashMap,Swift 和 Python 中有 Dictionary郎逃,Go 中有 Map哥童,Objective-C 中有 NSDictionary、NSMutableDictionary褒翰。

上面這些 Map 都是線程安全的么贮懈?答案是否定的,并非全是線程安全的优训。那如何能實現(xiàn)一個線程安全的 Map 呢朵你?想回答這個問題,需要先從如何實現(xiàn)一個 Map 說起揣非。

一. 選用什么數(shù)據(jù)結(jié)構(gòu)實現(xiàn) Map 抡医?

Map 是一個非常常用的數(shù)據(jù)結(jié)構(gòu),一個無序的 key/value 對的集合早敬,其中 Map 所有的 key 都是不同的忌傻,然后通過給定的 key 可以在常數(shù)時間 O(1) 復(fù)雜度內(nèi)查找、更新或刪除對應(yīng)的 value搞监。

要想實現(xiàn)常數(shù)級的查找水孩,應(yīng)該用什么來實現(xiàn)呢?讀者應(yīng)該很快會想到哈希表琐驴。確實俘种,Map 底層一般都是使用數(shù)組來實現(xiàn),會借用哈希算法輔助绝淡。對于給定的 key宙刘,一般先進(jìn)行 hash 操作,然后相對哈希表的長度取模够委,將 key 映射到指定的地方荐类。

哈希算法有很多種怖现,選哪一種更加高效呢茁帽?

1. 哈希函數(shù)

MD5 和 SHA1 可以說是目前應(yīng)用最廣泛的 Hash 算法,而它們都是以 MD4 為基礎(chǔ)設(shè)計的屈嗤。

MD4(RFC 1320) 是 MIT 的Ronald L. Rivest 在 1990 年設(shè)計的潘拨,MD 是 Message Digest(消息摘要) 的縮寫。它適用在32位字長的處理器上用高速軟件實現(xiàn)——它是基于 32位操作數(shù)的位操作來實現(xiàn)的饶号。
MD5(RFC 1321) 是 Rivest 于1991年對 MD4 的改進(jìn)版本铁追。它對輸入仍以512位分組,其輸出是4個32位字的級聯(lián)茫船,與 MD4 相同琅束。MD5 比 MD4 來得復(fù)雜扭屁,并且速度較之要慢一點,但更安全涩禀,在抗分析和抗差分方面表現(xiàn)更好料滥。

SHA1 是由 NIST NSA 設(shè)計為同 DSA 一起使用的,它對長度小于264的輸入艾船,產(chǎn)生長度為160bit 的散列值葵腹,因此抗窮舉 (brute-force)
性更好。SHA-1 設(shè)計時基于和 MD4 相同原理,并且模仿了該算法屿岂。

常用的 hash 函數(shù)有 SHA-1践宴,SHA-256,SHA-512爷怀,MD5 阻肩。這些都是經(jīng)典的 hash 算法。在現(xiàn)代化生產(chǎn)中运授,還會用到現(xiàn)代的 hash 算法磺浙。下面列舉幾個,進(jìn)行性能對比徒坡,最后再選其中一個源碼分析一下實現(xiàn)過程撕氧。

(1) Jenkins Hash 和 SpookyHash

1997年 Bob Jenkins 在《 Dr. Dobbs Journal》雜志上發(fā)表了一片關(guān)于散列函數(shù)的文章《A hash function for hash Table lookup》。這篇文章中喇完,Bob 廣泛收錄了很多已有的散列函數(shù)伦泥,這其中也包括了他自己所謂的“l(fā)ookup2”。隨后在2006年锦溪,Bob 發(fā)布了 lookup3不脯。lookup3 即為 Jenkins Hash。更多有關(guān) Bob’s 散列函數(shù)的內(nèi)容請參閱維基百科:Jenkins hash function刻诊。memcached的 hash 算法防楷,支持兩種算法:jenkins, murmur3,默認(rèn)是 jenkins则涯。

2011年 Bob Jenkins 發(fā)布了他自己的一個新散列函數(shù)
SpookyHash(這樣命名是因為它是在萬圣節(jié)發(fā)布的)复局。它們都擁有2倍于 MurmurHash 的速度,但他們都只使用了64位數(shù)學(xué)函數(shù)而沒有32位版本粟判,SpookyHash 給出128位輸出亿昏。

(2) MurmurHash

MurmurHash 是一種非加密哈希函數(shù),適用于一般的哈希檢索操作档礁。
Austin Appleby 在2008年發(fā)布了一個新的散列函數(shù)——MurmurHash角钩。其最新版本大約是 lookup3 速度的2倍(大約為1 byte/cycle),它有32位和64位兩個版本。32位版本只使用32位數(shù)學(xué)函數(shù)并給出一個32位的哈希值递礼,而64位版本使用了64位的數(shù)學(xué)函數(shù)惨险,并給出64位哈希值。根據(jù)Austin的分析脊髓,MurmurHash具有優(yōu)異的性能平道,雖然 Bob Jenkins 在《Dr. Dobbs article》雜志上聲稱“我預(yù)測 MurmurHash 比起lookup3要弱,但是我不知道具體值供炼,因為我還沒測試過它”一屋。MurmurHash能夠迅速走紅得益于其出色的速度和統(tǒng)計特性。當(dāng)前的版本是MurmurHash3袋哼,Redis冀墨、Memcached、Cassandra涛贯、HBase诽嘉、Lucene都在使用它。

下面是用 C 實現(xiàn) MurmurHash 的版本:


uint32_t murmur3_32(const char *key, uint32_t len, uint32_t seed) {
        static const uint32_t c1 = 0xcc9e2d51;
        static const uint32_t c2 = 0x1b873593;
        static const uint32_t r1 = 15;
        static const uint32_t r2 = 13;
        static const uint32_t m = 5;
        static const uint32_t n = 0xe6546b64;

        uint32_t hash = seed;

        const int nblocks = len / 4;
        const uint32_t *blocks = (const uint32_t *) key;
        int i;
        for (i = 0; i < nblocks; i++) {
                uint32_t k = blocks[i];
                k *= c1;
                k = (k << r1) | (k >> (32 - r1));
                k *= c2;

                hash ^= k;
                hash = ((hash << r2) | (hash >> (32 - r2))) * m + n;
        }

        const uint8_t *tail = (const uint8_t *) (key + nblocks * 4);
        uint32_t k1 = 0;

        switch (len & 3) {
        case 3:
                k1 ^= tail[2] << 16;
        case 2:
                k1 ^= tail[1] << 8;
        case 1:
                k1 ^= tail[0];

                k1 *= c1;
                k1 = (k1 << r1) | (k1 >> (32 - r1));
                k1 *= c2;
                hash ^= k1;
        }

        hash ^= len;
        hash ^= (hash >> 16);
        hash *= 0x85ebca6b;
        hash ^= (hash >> 13);
        hash *= 0xc2b2ae35;
        hash ^= (hash >> 16);

        return hash;
}


(3) CityHash 和 FramHash

這兩種算法都是 Google 發(fā)布的字符串算法弟翘。

CityHash 是2011年 Google 發(fā)布的字符串散列算法虫腋,和 murmurhash 一樣,屬于非加密型 hash 算法稀余。CityHash 算法的開發(fā)是受到了 MurmurHash 的啟發(fā)悦冀。其主要優(yōu)點是大部分步驟包含了至少兩步獨立的數(shù)學(xué)運算。現(xiàn)代 CPU 通常能從這種代碼獲得最佳性能睛琳。CityHash 也有其缺點:代碼較同類流行算法復(fù)雜盒蟆。Google 希望為速度而不是為了簡單而優(yōu)化,因此沒有照顧較短輸入的特例师骗。Google發(fā)布的有兩種算法:cityhash64 與 cityhash128历等。它們分別根據(jù)字串計算 64 和 128 位的散列值。這些算法不適用于加密辟癌,但適合用在散列表等處寒屯。CityHash 的速度取決于CRC32 指令,目前為SSE 4.2(Intel Nehalem及以后版本)黍少。

相比 Murmurhash 支持32寡夹、64蛋褥、128bit兄一, Cityhash 支持64、128、256bit 农渊。

2014年 Google 又發(fā)布了 FarmHash,一個新的用于字符串的哈希函數(shù)系列。FarmHash 從 CityHash 繼承了許多技巧和技術(shù)砸紊,是它的后繼传于。FarmHash 有多個目標(biāo),聲稱從多個方面改進(jìn)了 CityHash醉顽。與 CityHash 相比沼溜,F(xiàn)armHash 的另一項改進(jìn)是在多個特定于平臺的實現(xiàn)之上提供了一個接口。這樣游添,當(dāng)開發(fā)人員只是想要一個用于哈希表的系草、快速健壯的哈希函數(shù),而不需要在每個平臺上都一樣時唆涝,F(xiàn)armHash 也能滿足要求找都。目前,F(xiàn)armHash 只包含在32廊酣、64和128位平臺上用于字節(jié)數(shù)組的哈希函數(shù)能耻。未來開發(fā)計劃包含了對整數(shù)、元組和其它數(shù)據(jù)的支持亡驰。

(4) xxHash

xxHash 是由 Yann Collet 創(chuàng)建的非加密哈希函數(shù)晓猛。它最初用于 LZ4 壓縮算法,作為最終的錯誤檢查簽名的凡辱。該 hash 算法的速度接近于 RAM 的極限戒职。并給出了32位和64位的兩個版本。現(xiàn)在它被廣泛使用在PrestoDB透乾、RocksDB帕涌、MySQLArangoDB续徽、PGroonga蚓曼、Spark 這些數(shù)據(jù)庫中,還用在了 Cocos2D钦扭、Dolphin纫版、Cxbx-reloaded 這些游戲框架中,

下面這有一個性能對比的實驗客情。測試環(huán)境是 Open-Source SMHasher program by Austin Appleby 其弊,它是在 Windows 7 上通過 Visual C 編譯出來的,并且它只有唯一一個線程膀斋。CPU 內(nèi)核是 Core 2 Duo @3.0GHz梭伐。

上表里面的 hash 函數(shù)并不是所有的 hash 函數(shù),只列舉了一些常見的算法仰担。第二欄是速度的對比糊识,可以看出來速度最快的是 xxHash 。第三欄是哈希的質(zhì)量,哈希質(zhì)量最高的有5個赂苗,全是5星愉耙,xxHash、MurmurHash 3a拌滋、CityHash64朴沿、MD5-32、SHA1-32 败砂。從表里的數(shù)據(jù)看赌渣,哈希質(zhì)量最高,速度最快的還是 xxHash昌犹。

(4) memhash

這個哈希算法筆者沒有在網(wǎng)上找到很明確的作者信息坚芜。只在 Google 的 Go 的文檔上有這么幾行注釋,說明了它的靈感來源:


// Hashing algorithm inspired by
//   xxhash: https://code.google.com/p/xxhash/
// cityhash: https://code.google.com/p/cityhash/

它說 memhash 的靈感來源于 xxhash 和 cityhash祭隔。那么接下來就來看看 memhash 是怎么對字符串進(jìn)行哈希的货岭。



const (
    // Constants for multiplication: four random odd 32-bit numbers.
    m1 = 3168982561
    m2 = 3339683297
    m3 = 832293441
    m4 = 2336365089
)

func memhash(p unsafe.Pointer, seed, s uintptr) uintptr {
    if GOARCH == "386" && GOOS != "nacl" && useAeshash {
        return aeshash(p, seed, s)
    }
    h := uint32(seed + s*hashkey[0])
tail:
    switch {
    case s == 0:
    case s < 4:
        h ^= uint32(*(*byte)(p))
        h ^= uint32(*(*byte)(add(p, s>>1))) << 8
        h ^= uint32(*(*byte)(add(p, s-1))) << 16
        h = rotl_15(h*m1) * m2
    case s == 4:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
    case s <= 8:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2
    case s <= 16:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, 4))
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-8))
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2
    default:
        v1 := h
        v2 := uint32(seed * hashkey[1])
        v3 := uint32(seed * hashkey[2])
        v4 := uint32(seed * hashkey[3])
        for s >= 16 {
            v1 ^= readUnaligned32(p)
            v1 = rotl_15(v1*m1) * m2
            p = add(p, 4)
            v2 ^= readUnaligned32(p)
            v2 = rotl_15(v2*m2) * m3
            p = add(p, 4)
            v3 ^= readUnaligned32(p)
            v3 = rotl_15(v3*m3) * m4
            p = add(p, 4)
            v4 ^= readUnaligned32(p)
            v4 = rotl_15(v4*m4) * m1
            p = add(p, 4)
            s -= 16
        }
        h = v1 ^ v2 ^ v3 ^ v4
        goto tail
    }
    h ^= h >> 17
    h *= m3
    h ^= h >> 13
    h *= m4
    h ^= h >> 16
    return uintptr(h)
}

// Note: in order to get the compiler to issue rotl instructions, we
// need to constant fold the shift amount by hand.
// TODO: convince the compiler to issue rotl instructions after inlining.
func rotl_15(x uint32) uint32 {
    return (x << 15) | (x >> (32 - 15))
}


m1、m2疾渴、m3千贯、m4 是4個隨機(jī)選的奇數(shù),作為哈希的乘法因子搞坝。


// used in hash{32,64}.go to seed the hash function
var hashkey [4]uintptr

func alginit() {
    // Install aes hash algorithm if we have the instructions we need
    if (GOARCH == "386" || GOARCH == "amd64") &&
        GOOS != "nacl" &&
        cpuid_ecx&(1<<25) != 0 && // aes (aesenc)
        cpuid_ecx&(1<<9) != 0 && // sse3 (pshufb)
        cpuid_ecx&(1<<19) != 0 { // sse4.1 (pinsr{d,q})
        useAeshash = true
        algarray[alg_MEM32].hash = aeshash32
        algarray[alg_MEM64].hash = aeshash64
        algarray[alg_STRING].hash = aeshashstr
        // Initialize with random data so hash collisions will be hard to engineer.
        getRandomData(aeskeysched[:])
        return
    }
    getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
    hashkey[0] |= 1 // make sure these numbers are odd
    hashkey[1] |= 1
    hashkey[2] |= 1
    hashkey[3] |= 1
}

在這個初始化的函數(shù)中搔谴,初始化了2個數(shù)組,數(shù)組里面裝的都是隨機(jī)的 hashkey桩撮。在 386敦第、 amd64、非 nacl 的平臺上店量,會用 aeshash 芜果。這里會把隨機(jī)的 key 生成好,存入到 aeskeysched 數(shù)組中融师。同理右钾,hashkey 數(shù)組里面也會隨機(jī)好4個數(shù)字。最后都按位與了一個1旱爆,就是為了保證生成出來的隨機(jī)數(shù)都是奇數(shù)舀射。

接下來舉個例子,來看看 memhash 究竟是如何計算哈希值的怀伦。


func main() {
    r := [8]byte{'h', 'a', 'l', 'f', 'r', 'o', 's', 't'}
    pp := memhashpp(unsafe.Pointer(&r), 3, 7)
    fmt.Println(pp)
}

為了簡單起見脆烟,這里用筆者的名字為例算出哈希值,種子簡單一點設(shè)置成3房待。

第一步計算 h 的值邢羔。


h := uint32(seed + s*hashkey[0])

這里假設(shè) hashkey[0] = 1驼抹,那么 h 的值為 3 + 7 * 1 = 10 。由于 s < 8张抄,那么就會進(jìn)行以下的處理:


    case s <= 8:
        h ^= readUnaligned32(p)
        h = rotl_15(h*m1) * m2
        h ^= readUnaligned32(add(p, s-4))
        h = rotl_15(h*m1) * m2

readUnaligned32()函數(shù)會把傳入的 unsafe.Pointer 指針進(jìn)行2次轉(zhuǎn)換砂蔽,先轉(zhuǎn)成 *uint32 類型洼怔,然后再轉(zhuǎn)成 *(*uint32) 類型署惯。

接著進(jìn)行異或操作:

接著第二步 h * m1 = 1718378850 * 3168982561 = 3185867170

由于是32位的乘法,最終結(jié)果是64位的镣隶,高32位溢出极谊,直接舍棄。

乘出來的結(jié)果當(dāng)做 rotl_15() 入?yún)ⅰ?/p>


func rotl_15(x uint32) uint32 {
    return (x << 15) | (x >> (32 - 15))
}


這個函數(shù)里面對入?yún)⑦M(jìn)行了兩次位移操作安岂。

最后將兩次位移的結(jié)果進(jìn)行邏輯或運算:

接著再進(jìn)行一次 readUnaligned32() 轉(zhuǎn)換:

轉(zhuǎn)換完再進(jìn)行一次異或轻猖。此時 h = 2615762644。

然后還要再進(jìn)行一次 rotl_15() 變換域那。這里就不畫圖演示了咙边。變換完成以后 h = 2932930721。

最后執(zhí)行 hash 的最后一步:


    h ^= h >> 17
    h *= m3
    h ^= h >> 13
    h *= m4
    h ^= h >> 16

先右移17位次员,然后異或败许,再乘以m3,再右移13位淑蔚,再異或市殷,再乘以m4,再右移16位刹衫,最后再異或醋寝。

通過這樣一系列的操作,最后就能生成出 hash 值了带迟。最后 h = 1870717864音羞。感興趣的同學(xué)可以算一算。

(5)AES Hash

在上面分析 Go 的 hash 算法的時候仓犬,我們可以看到它對 CPU 是否支持 AES 指令集進(jìn)行了判斷嗅绰,當(dāng) CPU 支持 AES 指令集的時候,它會選用 AES Hash 算法婶肩,當(dāng) CPU 不支持 AES 指令集的時候办陷,換成 memhash 算法。

AES 指令集全稱是高級加密標(biāo)準(zhǔn)指令集(或稱英特爾高級加密標(biāo)準(zhǔn)新指令律歼,簡稱AES-NI)是一個 x86指令集架構(gòu) 的擴(kuò)展民镜,用于 IntelAMD微處理器

利用 AES 實現(xiàn) Hash 算法性能會很優(yōu)秀险毁,因為它能提供硬件加速制圈。

具體代碼實現(xiàn)如下们童,匯編程序,注釋見下面程序中:


// aes hash 算法通過 AES 硬件指令集實現(xiàn)
TEXT runtime·aeshash(SB),NOSPLIT,$0-32
    MOVQ    p+0(FP), AX // 把ptr移動到data數(shù)據(jù)段中
    MOVQ    s+16(FP), CX    // 長度
    LEAQ    ret+24(FP), DX
    JMP runtime·aeshashbody(SB)

TEXT runtime·aeshashstr(SB),NOSPLIT,$0-24
    MOVQ    p+0(FP), AX // 把ptr移動到字符串的結(jié)構(gòu)體中
    MOVQ    8(AX), CX   // 字符串長度
    MOVQ    (AX), AX    // 字符串的數(shù)據(jù)
    LEAQ    ret+16(FP), DX
    JMP runtime·aeshashbody(SB)

最終的 hash 的實現(xiàn)都在 aeshashbody 中:


// AX: 數(shù)據(jù)
// CX: 長度
// DX: 返回的地址
TEXT runtime·aeshashbody(SB),NOSPLIT,$0-0
    // SSE 寄存器中裝填入我們的隨機(jī)數(shù)種子
    MOVQ    h+8(FP), X0         // 每個table中hash種子有64 位
    PINSRW  $4, CX, X0          // 長度占16位
    PSHUFHW $0, X0, X0          // 壓縮高位字亂序鲸鹦,重復(fù)長度4次
    MOVO    X0, X1              // 保存加密前的種子
    PXOR    runtime·aeskeysched(SB), X0 // 對每一個處理中的種子進(jìn)行邏輯異或
    AESENC  X0, X0              // 加密種子

    CMPQ    CX, $16
    JB  aes0to15
    JE  aes16
    CMPQ    CX, $32
    JBE aes17to32
    CMPQ    CX, $64
    JBE aes33to64
    CMPQ    CX, $128
    JBE aes65to128
    JMP aes129plus

// aes 從 0 - 15
aes0to15:
    TESTQ   CX, CX
    JE  aes0

    ADDQ    $16, AX
    TESTW   $0xff0, AX
    JE  endofpage

    //當(dāng)前加載的16位字節(jié)的地址不會越過一個頁面邊界慧库,所以我們可以直接加載它。
    MOVOU   -16(AX), X1
    ADDQ    CX, CX
    MOVQ    $masks<>(SB), AX
    PAND    (AX)(CX*8), X1
final1:
    PXOR    X0, X1  // 異或數(shù)據(jù)和種子
    AESENC  X1, X1  // 連續(xù)加密3次
    AESENC  X1, X1
    AESENC  X1, X1
    MOVQ    X1, (DX)
    RET

endofpage:
    // 地址結(jié)尾是1111xxxx馋嗜。 這樣就可能超過一個頁面邊界齐板,所以在加載完最后一個字節(jié)后停止加載。然后使用pshufb將字節(jié)向下移動葛菇。
    MOVOU   -32(AX)(CX*1), X1
    ADDQ    CX, CX
    MOVQ    $shifts<>(SB), AX
    PSHUFB  (AX)(CX*8), X1
    JMP final1

aes0:
    // 返回輸入的并且已經(jīng)加密過的種子
    AESENC  X0, X0
    MOVQ    X0, (DX)
    RET

aes16:
    MOVOU   (AX), X1
    JMP final1

aes17to32:
    // 開始處理第二個起始種子
    PXOR    runtime·aeskeysched+16(SB), X1
    AESENC  X1, X1
    
    // 加載要被哈希算法處理的數(shù)據(jù)
    MOVOU   (AX), X2
    MOVOU   -16(AX)(CX*1), X3

    // 異或種子
    PXOR    X0, X2
    PXOR    X1, X3

    // 連續(xù)加密3次
    AESENC  X2, X2
    AESENC  X3, X3
    AESENC  X2, X2
    AESENC  X3, X3
    AESENC  X2, X2
    AESENC  X3, X3

    // 拼接并生成結(jié)果
    PXOR    X3, X2
    MOVQ    X2, (DX)
    RET

aes33to64:
    // 處理第三個以上的起始種子
    MOVO    X1, X2
    MOVO    X1, X3
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    AESENC  X1, X1
    AESENC  X2, X2
    AESENC  X3, X3
    
    MOVOU   (AX), X4
    MOVOU   16(AX), X5
    MOVOU   -32(AX)(CX*1), X6
    MOVOU   -16(AX)(CX*1), X7

    PXOR    X0, X4
    PXOR    X1, X5
    PXOR    X2, X6
    PXOR    X3, X7
    
    AESENC  X4, X4
    AESENC  X5, X5
    AESENC  X6, X6
    AESENC  X7, X7
    
    AESENC  X4, X4
    AESENC  X5, X5
    AESENC  X6, X6
    AESENC  X7, X7
    
    AESENC  X4, X4
    AESENC  X5, X5
    AESENC  X6, X6
    AESENC  X7, X7

    PXOR    X6, X4
    PXOR    X7, X5
    PXOR    X5, X4
    MOVQ    X4, (DX)
    RET

aes65to128:
    // 處理第七個以上的起始種子
    MOVO    X1, X2
    MOVO    X1, X3
    MOVO    X1, X4
    MOVO    X1, X5
    MOVO    X1, X6
    MOVO    X1, X7
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    PXOR    runtime·aeskeysched+64(SB), X4
    PXOR    runtime·aeskeysched+80(SB), X5
    PXOR    runtime·aeskeysched+96(SB), X6
    PXOR    runtime·aeskeysched+112(SB), X7
    AESENC  X1, X1
    AESENC  X2, X2
    AESENC  X3, X3
    AESENC  X4, X4
    AESENC  X5, X5
    AESENC  X6, X6
    AESENC  X7, X7

    // 加載數(shù)據(jù)
    MOVOU   (AX), X8
    MOVOU   16(AX), X9
    MOVOU   32(AX), X10
    MOVOU   48(AX), X11
    MOVOU   -64(AX)(CX*1), X12
    MOVOU   -48(AX)(CX*1), X13
    MOVOU   -32(AX)(CX*1), X14
    MOVOU   -16(AX)(CX*1), X15

    // 異或種子
    PXOR    X0, X8
    PXOR    X1, X9
    PXOR    X2, X10
    PXOR    X3, X11
    PXOR    X4, X12
    PXOR    X5, X13
    PXOR    X6, X14
    PXOR    X7, X15

    // 連續(xù)加密3次
    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15

    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15

    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15

    // 拼裝結(jié)果
    PXOR    X12, X8
    PXOR    X13, X9
    PXOR    X14, X10
    PXOR    X15, X11
    PXOR    X10, X8
    PXOR    X11, X9
    PXOR    X9, X8
    MOVQ    X8, (DX)
    RET

aes129plus:
    // 處理第七個以上的起始種子
    MOVO    X1, X2
    MOVO    X1, X3
    MOVO    X1, X4
    MOVO    X1, X5
    MOVO    X1, X6
    MOVO    X1, X7
    PXOR    runtime·aeskeysched+16(SB), X1
    PXOR    runtime·aeskeysched+32(SB), X2
    PXOR    runtime·aeskeysched+48(SB), X3
    PXOR    runtime·aeskeysched+64(SB), X4
    PXOR    runtime·aeskeysched+80(SB), X5
    PXOR    runtime·aeskeysched+96(SB), X6
    PXOR    runtime·aeskeysched+112(SB), X7
    AESENC  X1, X1
    AESENC  X2, X2
    AESENC  X3, X3
    AESENC  X4, X4
    AESENC  X5, X5
    AESENC  X6, X6
    AESENC  X7, X7
    
    // 逆序開始甘磨,從最后的block開始處理,因為可能會出現(xiàn)重疊的情況
    MOVOU   -128(AX)(CX*1), X8
    MOVOU   -112(AX)(CX*1), X9
    MOVOU   -96(AX)(CX*1), X10
    MOVOU   -80(AX)(CX*1), X11
    MOVOU   -64(AX)(CX*1), X12
    MOVOU   -48(AX)(CX*1), X13
    MOVOU   -32(AX)(CX*1), X14
    MOVOU   -16(AX)(CX*1), X15

    // 異或種子
    PXOR    X0, X8
    PXOR    X1, X9
    PXOR    X2, X10
    PXOR    X3, X11
    PXOR    X4, X12
    PXOR    X5, X13
    PXOR    X6, X14
    PXOR    X7, X15
    
    // 計算剩余128字節(jié)塊的數(shù)量
    DECQ    CX
    SHRQ    $7, CX
    
aesloop:
    // 加密狀態(tài)
    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15

    // 在同一個block塊中加密狀態(tài)眯停,進(jìn)行異或運算
    MOVOU   (AX), X0
    MOVOU   16(AX), X1
    MOVOU   32(AX), X2
    MOVOU   48(AX), X3
    AESENC  X0, X8
    AESENC  X1, X9
    AESENC  X2, X10
    AESENC  X3, X11
    MOVOU   64(AX), X4
    MOVOU   80(AX), X5
    MOVOU   96(AX), X6
    MOVOU   112(AX), X7
    AESENC  X4, X12
    AESENC  X5, X13
    AESENC  X6, X14
    AESENC  X7, X15

    ADDQ    $128, AX
    DECQ    CX
    JNE aesloop

    // 最后一步济舆,進(jìn)行3次以上的加密
    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15
    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15
    AESENC  X8, X8
    AESENC  X9, X9
    AESENC  X10, X10
    AESENC  X11, X11
    AESENC  X12, X12
    AESENC  X13, X13
    AESENC  X14, X14
    AESENC  X15, X15

    PXOR    X12, X8
    PXOR    X13, X9
    PXOR    X14, X10
    PXOR    X15, X11
    PXOR    X10, X8
    PXOR    X11, X9
    PXOR    X9, X8
    MOVQ    X8, (DX)
    RET


2. 哈希沖突處理

(1)鏈表數(shù)組法

鏈表數(shù)組法比較簡單,每個鍵值對表長取模莺债,如果結(jié)果相同滋觉,用鏈表的方式依次往后插入。

假設(shè)待插入的鍵值集合是{ 2齐邦,3椎侠,5,7侄旬,11肺蔚,13,19}儡羔,表長 MOD 8宣羊。假設(shè)哈希函數(shù)在[0,9)上均勻分布。如上圖汰蜘。

接下來重點進(jìn)行性能分析:

查找鍵值 k仇冯,假設(shè)鍵值 k 不在哈希表中,h(k) 在 [0族操,M) 中均勻分布苛坚,即 P(h(k) = i) = 1/M 。令 Xi 為哈希表 ht[ i ] 中包含鍵值的個數(shù)色难。如果 h(k) = i 泼舱,則不成功查找 k 的鍵值比較次數(shù)是 Xi,于是:

成功查找的分析稍微復(fù)雜一點枷莉。要考慮添加哈希表的次序娇昙,不考慮有相同鍵值的情況,假設(shè) K = {k1,k2,……kn}笤妙,并且假設(shè)從空哈希表開始按照這個次序添加到哈希表中冒掌。引入隨機(jī)變量噪裕,如果 h(ki) = h(kj),那么 Xij = 1股毫;如果 h(ki) 膳音!= h(kj),那么 Xij = 0 铃诬。

由于之前的假設(shè)哈希表是均勻分布的祭陷,所以 P(Xij = i) = E(Xij) = 1/M ,這里的 E(X) 表示隨機(jī)變量 X 的數(shù)學(xué)期望氧急。再假設(shè)每次添加鍵值的時候都是把添加在鏈表末端颗胡。令 Ci 為查找 Ki 所需的鍵值比較次數(shù)毫深,由于不能事先確定查找 Ki 的概率吩坝,所以假定查找不同鍵值的概率都是相同的,都是 1/n 哑蔫,則有:

由此我們可以看出钉寝,哈希表的性能和表中元素的多少關(guān)系不大,而和填充因子 α 有關(guān)闸迷。如果哈希表長和哈希表中元素個數(shù)成正比嵌纲,則哈希表查找的復(fù)雜度為 O(1) 。

綜上所述腥沽,鏈表數(shù)組的成功與不成功的平均鍵值比較次數(shù)如下:

(2)開放地址法 —— 線性探測

線性探測的規(guī)則是 hi = ( h(k) + i ) MOD M逮走。舉個例子,i = 1今阳,M = 9师溅。

這種處理沖突的方法,一旦發(fā)生沖突盾舌,就把位置往后加1墓臭,直到找到一個空的位置。

舉例如下妖谴,假設(shè)待插入的鍵值集合是{2窿锉,3,5膝舅,7嗡载,11,13仍稀,19}洼滚,線性探測的發(fā)生沖突以后添加的值為1,那么最終結(jié)果如下:

線性探測哈希表的性能分析比較復(fù)雜琳轿,這里就僅給出結(jié)果判沟。

(3)開放地址法 —— 平方探測

線性探測的規(guī)則是 h0 = h(k) 耿芹,hi = ( h0 + i * i ) MOD M。

舉例如下挪哄,假設(shè)待插入的鍵值集合是{2吧秕,3,5迹炼,7砸彬,11,13斯入,20}砂碉,平方探測的發(fā)生沖突以后添加的值為查找次數(shù)的平方,那么最終結(jié)果如下:

平方探測在線性探測的基礎(chǔ)上刻两,加了一個二次曲線增蹭。當(dāng)發(fā)生沖突以后,不再是加一個線性的參數(shù)磅摹,而是加上探測次數(shù)的平方滋迈。

平方探測有一個需要注意的是,M的大小有講究户誓。如果M不是奇素數(shù)饼灿,那么就可能出現(xiàn)下面這樣的問題,即使哈希表里面還有空的位置帝美,但是卻有元素找不到要插入的位置碍彭。

舉例,假設(shè) M = 10悼潭,待插入的鍵值集合是{0庇忌,1,4女责,5漆枚,6,9抵知,10}墙基,當(dāng)前面6個鍵值插入哈希表中以后,10就再也無法插入了刷喜。

所以在平方探測中残制,存在下面這則規(guī)律:

如果 M 為奇素數(shù),則下面的 ?M / 2? 位置 h0掖疮,h1初茶,h2 …… h?M/2? 互不相同。其中浊闪,hi = (h0 + i * i ) MOD M恼布。

這面這則規(guī)律可以用反證法證明螺戳。假設(shè) hi = hj,i > j折汞;0<=i倔幼,j<= ?M/2?,則 h0 + i* i = ( h0 + j * j ) MOD M爽待,從而 M 可以整除 ( i + j )( i - j )损同。由于 M 為素數(shù),并且 0 < i + j鸟款,i - j < M膏燃,當(dāng)且僅當(dāng) i = j 的時候才能滿足瓦胎。

上述規(guī)則也就說明了一點恩掷,只要 M 為奇素數(shù),平方探測至少可以遍歷哈希表一般的位置壶冒。所以只要哈希表的填充因子 α <= 1 / 2 富俄,平方探測總能找到可插入的位置禁炒。

上述舉的例子,之所以鍵值10無法插入霍比,原因也因為 α > 1 / 2了,所以不能保證有可插入的位置了暴备。

(4)開放地址法 —— 雙哈希探測

雙哈希探測是為了解決聚集的現(xiàn)象悠瞬。無論是線性探測還是平方探測,如果 h(k1) 和 h(k2) 相鄰涯捻,則它們的探測序列也都是相鄰的浅妆,這就是所謂的聚集現(xiàn)象。為了避免這種現(xiàn)象障癌,所以引入了雙哈希函數(shù) h2凌外,使得兩次探測之間的距離為 h2(k)。所以探測序列為 h0 = h1(k)涛浙,hi = ( h0 + i * h2(k) ) MOD M 康辑。實驗表明,雙哈希探測的性能類似于隨機(jī)探測轿亮。

關(guān)于雙哈希探測和平方探測的平均查找長度比線性探測更加困難疮薇。所以引入隨機(jī)探測的概念來近似這兩種探測。隨機(jī)探測是指探測序列 { hi } 在區(qū)間 [0我注,M]中等概率獨立隨機(jī)選取按咒,這樣 P(hi = j) = 1/M 。

假設(shè)探測序列為 h0但骨,h1励七,……智袭,hi。在哈希表的 hi 位置為空掠抬,在 h0补履,h1,……剿另,hi-1 的位置上哈希表不是空箫锤,此次查找的鍵值比較次數(shù)為 i。令隨機(jī)變量 X 為一次不成功查找所需的鍵值比較次數(shù)雨女。由于哈希表的填充因子為 α谚攒,所以在一個位置上哈希表為空值的概率為 1 - α ,為非空值的概率為 α氛堕,所以 P( X = i ) = α^i * ( 1 - α ) 馏臭。

在概率論中,上述的分布叫幾何分布讼稚。

假定哈希表元素的添加順序為 {k1括儒,k2,…… 锐想,kn}帮寻,令 Xi 為當(dāng)哈希表只包含 {k1,k2赠摇,…… 固逗,ki} 時候一次不成功查找的鍵值比較次數(shù),注意藕帜,這個時候哈希表的填充因子為 i/M 烫罩,則查找 k(i+1) 的鍵值次數(shù)為 Yi = 1 + Xi。假定查找任意一個鍵值的概率為 1/n洽故,則一次成功查找的平均鍵值比較次數(shù)為:

綜上所述贝攒,平方探測和雙哈希探測的成功與不成功的平均鍵值比較次數(shù)如下:

總的來說,在數(shù)據(jù)量非常大的情況下时甚,簡單的 hash 函數(shù)不可避免不產(chǎn)生碰撞隘弊,即使采用了合適的處理碰撞的方法,依舊有一定時間復(fù)雜度撞秋。所以想盡可能的避免碰撞长捧,還是要選擇高性能的 hash 函數(shù),或者增加 hash 的位數(shù)吻贿,比如64位串结,128位,256位,這樣碰撞的幾率會小很多肌割。

3. 哈希表的擴(kuò)容策略

隨著哈希表裝載因子的變大卧蜓,發(fā)生碰撞的次數(shù)變得越來也多,哈希表的性能變得越來越差把敞。對于單獨鏈表法實現(xiàn)的哈希表弥奸,尚可以容忍,但是對于開放尋址法奋早,這種性能的下降是不能接受的盛霎,因此對于開放尋址法需要尋找一種方法解決這個問題。

在實際應(yīng)用中耽装,解決這個問題的辦法是動態(tài)的增大哈希表的長度愤炸,當(dāng)裝載因子超過某個閾值時增加哈希表的長度,自動擴(kuò)容掉奄。每當(dāng)哈希表的長度發(fā)生變化之后规个,所有 key 在哈希表中對應(yīng)的下標(biāo)索引需要全部重新計算,不能直接從原來的哈希表中拷貝到新的哈希表中姓建。必須一個一個計算原來哈希表中的 key 的哈希值并插入到新的哈希表中诞仓。這種方式肯定是達(dá)不到生產(chǎn)環(huán)境的要求的,因為時間復(fù)雜度太高了速兔,O(n)墅拭,數(shù)據(jù)量一旦大了,性能就會很差憨栽。Redis 想了一種方法帜矾,就算是觸發(fā)增長時也只需要常數(shù)時間 O(1) 即可完成插入操作。解決辦法是分多次屑柔、漸進(jìn)式地完成的舊哈希表到新哈希表的拷貝而不是一次拷貝完成。

接下來以 Redis 為例珍剑,來談?wù)勊枪1硎侨绾芜M(jìn)行擴(kuò)容并且不太影響性能的掸宛。

Redis 對字典的定義如下:


/*
 * 字典
 *
 * 每個字典使用兩個哈希表,用于實現(xiàn)漸進(jìn)式 rehash
 */
typedef struct dict {
    // 特定于類型的處理函數(shù)
    dictType *type;
    // 類型處理函數(shù)的私有數(shù)據(jù)
    void *privdata;
    // 哈希表(2 個)
    dictht ht[2];
    // 記錄 rehash 進(jìn)度的標(biāo)志招拙,值為 -1 表示 rehash 未進(jìn)行
    int rehashidx;
    // 當(dāng)前正在運作的安全迭代器數(shù)量
    int iterators;
} dict;


從定義上我們可以看到唧瘾,Redis 字典保存了2個哈希表,哈希表ht[1]就是用來 rehash 的别凤。

在 Redis 中定義了如下的哈希表的數(shù)據(jù)結(jié)構(gòu):


/*
 * 哈希表
 */
typedef struct dictht {
    // 哈希表節(jié)點指針數(shù)組(俗稱桶饰序,bucket)
    dictEntry **table;
    // 指針數(shù)組的大小
    unsigned long size;
    // 指針數(shù)組的長度掩碼,用于計算索引值
    unsigned long sizemask;
    // 哈希表現(xiàn)有的節(jié)點數(shù)量
    unsigned long used;

} dictht;


table 屬性是個數(shù)組规哪, 數(shù)組的每個元素都是個指向 dictEntry 結(jié)構(gòu)的指針求豫。

每個 dictEntry 都保存著一個鍵值對, 以及一個指向另一個 dictEntry 結(jié)構(gòu)的指針:

/*
 * 哈希表節(jié)點
 */
typedef struct dictEntry {
    // 鍵
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;
    // 鏈往后繼節(jié)點
    struct dictEntry *next;

} dictEntry;

next 屬性指向另一個 dictEntry 結(jié)構(gòu), 多個 dictEntry 可以通過 next 指針串連成鏈表蝠嘉, 從這里可以看出最疆, dictht 使用鏈地址法來處理鍵碰撞問題的。

dictAdd 在每次向字典添加新鍵值對之前蚤告, 都會對哈希表 ht[0] 進(jìn)行檢查努酸, 對于 ht[0] 的 size 和 used 屬性, 如果它們之間的比率 ratio = used / size 滿足以下任何一個條件的話杜恰,rehash 過程就會被激活:

自然 rehash : ratio >= 1 获诈,且變量 dict_can_resize 為真。
強(qiáng)制 rehash : ratio 大于變量 dict_force_resize_ratio (目前版本中心褐, dict_force_resize_ratio 的值為 5 )舔涎。

假設(shè)當(dāng)前的字典需要擴(kuò)容 rehash,那么 Redis 會先設(shè)置字典的 rehashidx 為 0 檬寂,標(biāo)識著 rehash 的開始终抽;再為 ht[1]->table 分配空間,大小至少為 ht[0]->used 的兩倍桶至。

如上圖昼伴, ht[1]->table 已經(jīng)分配空間了8個空間了。

接著镣屹,開始 rehash 圃郊。將 ht[0]->table 內(nèi)的鍵值移動到 ht[1]->table 中,鍵值的移動不是一次完成的女蜈,分多次進(jìn)行持舆。

上圖可以看出來, ht[0] 中的一部分鍵值已經(jīng)遷移到 ht[1] 中了伪窖,并且此時還有新的鍵值插入進(jìn)來逸寓,是直接插入到 ht[1] 中的,不會再插入到 ht[0] 中了覆山。保證了 ht[0] 只減不增竹伸。

在 rehash 進(jìn)行的過程中,不斷的有新的鍵值插入進(jìn)來簇宽,也不斷的把 ht[0] 中的鍵值都遷移過來勋篓,直到 ht[0] 中的鍵值都遷移過來為止。注意 Redis 用的是頭插法魏割,新值永遠(yuǎn)都插在鏈表的第一個位置譬嚣,這樣也不用遍歷到鏈表的最后,省去了 O(n) 的時間復(fù)雜度钞它。進(jìn)行到上圖這種情況拜银,所有的節(jié)點也就遷移完畢了殊鞭。

rehash 在結(jié)束之前會進(jìn)行清理工作,釋放 ht[0] 的空間盐股;用 ht[1] 來代替 ht[0] 钱豁,使原來的 ht[1] 成為新的 ht[0] ;創(chuàng)建一個新的空哈希表疯汁,并將它設(shè)置為 ht[1] 牲尺;將字典的 rehashidx 屬性設(shè)置為 -1 ,標(biāo)識 rehash 已停止幌蚊;

最終 rehash 結(jié)束以后情況如上圖谤碳。如果還下次還需要 rehash ,重復(fù)上述過程即可溢豆。這種分多次蜒简,漸進(jìn)式 rehash 的方式也成就了 Redis 的高性能。

值得一提的是漩仙,Redis 是支持字典的 reshrink 操作的搓茬。操作步驟就是
rehash 的逆過程。

二. 紅黑樹優(yōu)化

讀到這里队他,讀者應(yīng)該已經(jīng)明白了到底用什么方式來控制 map 使得
Hash 碰撞的概率又小卷仑,哈希桶數(shù)組占用空間又少了吧,答案就是選擇好的 Hash 算法和增加擴(kuò)容機(jī)制麸折。

Java 在 JDK1.8 對 HashMap 底層的實現(xiàn)再次進(jìn)行了優(yōu)化锡凝。

上圖是來自美團(tuán)博客總結(jié)的。從這里我們可以發(fā)現(xiàn):

Java 底層初始桶的個數(shù)是16個垢啼,負(fù)載因子默認(rèn)是0.75窜锯,也就是說當(dāng)鍵值第一次達(dá)到12個的時候就會進(jìn)行擴(kuò)容 resize。擴(kuò)容的臨界值在64芭析,當(dāng)超過了64以后锚扎,并且沖突節(jié)點為8或者大于8,這個時候就會觸發(fā)紅黑樹轉(zhuǎn)換馁启。為了防止底層鏈表過長工秩,鏈表就轉(zhuǎn)換為紅黑樹。

換句話說进统,當(dāng)桶的總個數(shù)沒有到64個的時候,即使鏈表長為8浪听,也不會進(jìn)行紅黑樹轉(zhuǎn)換螟碎。

如果節(jié)點小于6個,紅黑樹又會重新退化成鏈表迹栓。

當(dāng)然這里之所以選擇用紅黑樹來進(jìn)行優(yōu)化掉分,保證最壞情況不會退化成
O(n),紅黑樹能保證最壞時間復(fù)雜度也為 O(log n)。

在美團(tuán)博客中也提到了酥郭,Java 在 JDK1.8 中還有一個值得學(xué)習(xí)的優(yōu)化华坦。Java 在 rehash 的鍵值節(jié)點遷移過程中,不需要再次計算一次 hash 計算不从!

由于使用了2次冪的擴(kuò)展(指長度擴(kuò)為原來2倍)惜姐,所以,元素的位置要么是在原位置椿息,要么是在原位置再移動2次冪的位置歹袁。看下圖可以明白這句話的意思寝优,n 為 table 的長度条舔,圖(a)表示擴(kuò)容前的 key1 和
key2 兩種 key 確定索引位置的示例,圖(b)表示擴(kuò)容后 key1 和
key2 兩種 key 確定索引位置的示例乏矾,其中 hash1 是 key1 對應(yīng)的哈希與高位運算結(jié)果孟抗。

元素在重新計算 hash 之后,因為 n 變?yōu)?倍钻心,那么 n-1 的 mask 范圍在高位多1bit(紅色)凄硼,因此新的 index 就會發(fā)生這樣的變化:

所以在擴(kuò)容以后,就只需要看擴(kuò)容容量以后那個位上的值為0扔役,還是為1帆喇,如果是0,代表索引不變亿胸,如果是1坯钦,代表的是新的索引值等于原來的索引值加上 oldCap 即可,這樣就不需要再次計算一次 hash 了侈玄。

上圖是把16擴(kuò)容到32的情況婉刀。

三. Go 中 Map 的具體實現(xiàn)舉例

讀到這里,讀者對如何設(shè)計一個 Map 應(yīng)該有一些自己的想法了序仙。選擇一個優(yōu)秀的哈希算法突颊,用鏈表 + 數(shù)組 作為底層數(shù)據(jù)結(jié)構(gòu),如何擴(kuò)容和優(yōu)化潘悼,這些應(yīng)該都有了解了律秃。讀到這里也許讀者認(rèn)為本篇文章內(nèi)容已經(jīng)過半了,不過前面這些都是偏理論治唤,接下來也許才到了本文的重點部分 —— 從零開始分析一下完整的 Map 實現(xiàn)棒动。

接下來筆者對 Go 中的 Map 的底層實現(xiàn)進(jìn)行分析,也算是對一個 Map 的具體實現(xiàn)和重要的幾個操作宾添,添加鍵值船惨,刪除鍵值柜裸,擴(kuò)容策略進(jìn)行舉例。

Go 的 map 實現(xiàn)在 /src/runtime/hashmap.go 這個文件中粱锐。

map 底層實質(zhì)還是一個 hash table疙挺。

先來看看 Go 定義了一些常量。



const (
    // 一個桶里面最多可以裝的鍵值對的個數(shù)怜浅,8對铐然。
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits

    // 觸發(fā)擴(kuò)容操作的最大裝載因子的臨界值
    loadFactor = 6.5

    // 為了保持內(nèi)聯(lián),鍵 和 值 的最大長度都是128字節(jié)海雪,如果超過了128個字節(jié)锦爵,就存儲它的指針
    maxKeySize   = 128
    maxValueSize = 128

    // 數(shù)據(jù)偏移應(yīng)該是 bmap 的整數(shù)倍,但是需要正確的對齊奥裸。
    dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)

    // tophash 的一些值
    empty          = 0 // 沒有鍵值對
    evacuatedEmpty = 1 // 沒有鍵值對险掀,并且桶內(nèi)的鍵值被遷移走了。
    evacuatedX     = 2 // 鍵值對有效湾宙,并且已經(jīng)遷移了一個表的前半段
    evacuatedY     = 3 // 鍵值對有效樟氢,并且已經(jīng)遷移了一個表的后半段
    minTopHash     = 4 // 最小的 tophash

    // 標(biāo)記
    iterator     = 1 // 當(dāng)前桶的迭代子
    oldIterator  = 2 // 舊桶的迭代子
    hashWriting  = 4 // 一個goroutine正在寫入map
    sameSizeGrow = 8 // 當(dāng)前字典增長到新字典并且保持相同的大小

    // 迭代子檢查桶ID的哨兵
    noCheck = 1<<(8*sys.PtrSize) - 1
)

這里值得說明的一點是觸發(fā)擴(kuò)容操作的臨界值6.5是怎么得來的。這個值太大會導(dǎo)致overflow buckets過多侠鳄,查找效率降低埠啃,過小會浪費存儲空間。

據(jù) Google 開發(fā)人員稱伟恶,這個值是一個測試的程序碴开,測量出來選擇的一個經(jīng)驗值。

%overflow :
溢出率博秫,平均一個 bucket 有多少個 鍵值kv 的時候會溢出潦牛。

bytes/entry :
平均存一個 鍵值kv 需要額外存儲多少字節(jié)的數(shù)據(jù)。

hitprobe :
查找一個存在的 key 平均查找次數(shù)挡育。

missprobe :
查找一個不存在的 key 平均查找次數(shù)巴碗。

經(jīng)過這幾組測試數(shù)據(jù),最終選定 6.5 作為臨界的裝載因子即寒。

接著看看 Go 中 map header 的定義:



type hmap struct {
    count     int // map 的長度
    flags     uint8
    B         uint8  // log以2為底橡淆,桶個數(shù)的對數(shù) (總共能存 6.5 * 2^B 個元素)
    noverflow uint16 // 近似溢出桶的個數(shù)
    hash0     uint32 // 哈希種子

    buckets    unsafe.Pointer // 有 2^B 個桶的數(shù)組. count==0 的時候,這個數(shù)組為 nil.
    oldbuckets unsafe.Pointer // 舊的桶數(shù)組一半的元素
    nevacuate  uintptr        // 擴(kuò)容增長過程中的計數(shù)器

    extra *mapextra // 可選字段
}


在 Go 的 map header 結(jié)構(gòu)中母赵,也包含了2個指向桶數(shù)組的指針逸爵,buckets 指向新的桶數(shù)組,oldbuckets 指向舊的桶數(shù)組凹嘲。這點和 Redis 字典中也有兩個 dictht 數(shù)組類似痊银。

hmap 的最后一個字段是一個指向 mapextra 結(jié)構(gòu)的指針,它的定義如下:


type mapextra struct {
    overflow [2]*[]*bmap
    nextOverflow *bmap
}

如果一個鍵值對沒有找到對應(yīng)的指針施绎,那么就會把它們先存到溢出桶
overflow 里面溯革。在 mapextra 中還有一個指向下一個可用的溢出桶的指針。

溢出桶 overflow 是一個數(shù)組谷醉,里面存了2個指向 *bmap 數(shù)組的指針致稀。overflow[0] 里面裝的是 hmap.buckets 。overflow[1] 里面裝的是 hmap.oldbuckets俱尼。

再看看桶的數(shù)據(jù)結(jié)構(gòu)的定義抖单,bmap 就是 Go 中 map 里面桶對應(yīng)的結(jié)構(gòu)體類型。



type bmap struct {
    tophash [bucketCnt]uint8
}

桶的定義比較簡單遇八,里面就只是包含了一個 uint8 類型的數(shù)組矛绘,里面包含8個元素。這8個元素存儲的是 hash 值的高8位刃永。

在 tophash 之后的內(nèi)存布局里還有2塊內(nèi)容货矮。緊接著 tophash 之后的是8對 鍵值 key- value 對。并且排列方式是 8個 key 和 8個 value 放在一起斯够。

8對 鍵值 key- value 對結(jié)束以后緊接著一個 overflow 指針囚玫,指向下一個 bmap。從此也可以看出 Go 中 map是用鏈表的方式處理 hash 沖突的读规。

為何 Go 存儲鍵值對的方式不是普通的 key/value抓督、key/value、key/value……這樣存儲的呢束亏?它是鍵 key 都存儲在一起铃在,然后緊接著是 值value 都存儲在一起趣避,為什么會這樣呢戈擒?

在 Redis 中拧廊,當(dāng)使用 REDIS_ENCODING_ZIPLIST 編碼哈希表時虹茶, 程序通過將鍵和值一同推入壓縮列表盏混, 從而形成保存哈希表所需的鍵-值對結(jié)構(gòu)峰搪,如上圖愕难。新添加的 key-value 對會被添加到壓縮列表的表尾嵌洼。

這種結(jié)構(gòu)有一個弊端赖捌,如果存儲的鍵和值的類型不同祝沸,在內(nèi)存中布局中所占字節(jié)不同的話,就需要對齊越庇。比如說存儲一個 map[int64]int8 類型的字典罩锐。

Go 為了節(jié)約內(nèi)存對齊的內(nèi)存消耗,于是把它設(shè)計成上圖所示那樣卤唉。

如果 map 里面存儲了上萬億的大數(shù)據(jù)涩惑,這里節(jié)約出來的內(nèi)存空間還是比較可觀的。

1. 新建 Map

makemap 新建了一個 Map桑驱,如果入?yún)?h 不為空竭恬,那么 map 的 hmap 就是入?yún)⒌倪@個 hmap跛蛋,如果入?yún)?bucket 不為空,那么這個 bucket 桶就作為第一個桶痊硕。



func makemap(t *maptype, hint int64, h *hmap, bucket unsafe.Pointer) *hmap {
    // hmap 的 size 大小的值非法
    if sz := unsafe.Sizeof(hmap{}); sz > 48 || sz != t.hmap.size {
        println("runtime: sizeof(hmap) =", sz, ", t.hmap.size =", t.hmap.size)
        throw("bad hmap size")
    }

    // 超過范圍的 hint 值都為0
    if hint < 0 || hint > int64(maxSliceCap(t.bucket.size)) {
        hint = 0
    }

    // key 值的類型不是 Go 所支持的
    if !ismapkey(t.key) {
        throw("runtime.makemap: unsupported map key type")
    }

    // 通過編譯器和反射檢車 key 值的 size 是否合法
    if t.key.size > maxKeySize && (!t.indirectkey || t.keysize != uint8(sys.PtrSize)) ||
        t.key.size <= maxKeySize && (t.indirectkey || t.keysize != uint8(t.key.size)) {
        throw("key size wrong")
    }
    // 通過編譯器和反射檢車 value 值的 size 是否合法
    if t.elem.size > maxValueSize && (!t.indirectvalue || t.valuesize != uint8(sys.PtrSize)) ||
        t.elem.size <= maxValueSize && (t.indirectvalue || t.valuesize != uint8(t.elem.size)) {
        throw("value size wrong")
    }

    // 雖然以下的變量我們不依賴赊级,而且可以在編譯階段檢查下面這些值的合法性,
    // 但是我們還是在這里檢測岔绸。

    // key 值對齊超過桶的個數(shù)
    if t.key.align > bucketCnt {
        throw("key align too big")
    }
    // value 值對齊超過桶的個數(shù)
    if t.elem.align > bucketCnt {
        throw("value align too big")
    }
    // key 值的 size 不是 key 值對齊的倍數(shù)
    if t.key.size%uintptr(t.key.align) != 0 {
        throw("key size not a multiple of key align")
    }
    // value 值的 size 不是 value 值對齊的倍數(shù)
    if t.elem.size%uintptr(t.elem.align) != 0 {
        throw("value size not a multiple of value align")
    }
    // 桶個數(shù)太小理逊,無法正確對齊
    if bucketCnt < 8 {
        throw("bucketsize too small for proper alignment")
    }
    // 數(shù)據(jù)偏移量不是 key 值對齊的整數(shù)倍,說明需要在桶中填充 key
    if dataOffset%uintptr(t.key.align) != 0 {
        throw("need padding in bucket (key)")
    }
    // 數(shù)據(jù)偏移量不是 value 值對齊的整數(shù)倍盒揉,說明需要在桶中填充 value
    if dataOffset%uintptr(t.elem.align) != 0 {
        throw("need padding in bucket (value)")
    }

    B := uint8(0)
    for ; overLoadFactor(hint, B); B++ {
    }

    // 分配內(nèi)存并初始化哈希表
    // 如果此時 B = 0晋被,那么 hmap 中的 buckets 字段稍后分配
    // 如果 hint 值很大,初始化這塊內(nèi)存需要一段時間刚盈。
    buckets := bucket
    var extra *mapextra
    if B != 0 {
        var nextOverflow *bmap
        // 初始化 bucket 和 nextOverflow 
        buckets, nextOverflow = makeBucketArray(t, B)
        if nextOverflow != nil {
            extra = new(mapextra)
            extra.nextOverflow = nextOverflow
        }
    }

    // 初始化 hmap
    if h == nil {
        h = (*hmap)(newobject(t.hmap))
    }
    h.count = 0
    h.B = B
    h.extra = extra
    h.flags = 0
    h.hash0 = fastrand()
    h.buckets = buckets
    h.oldbuckets = nil
    h.nevacuate = 0
    h.noverflow = 0

    return h
}


新建一個 map 最重要的就是分配內(nèi)存并初始化哈希表羡洛,在 B 不為0的情況下,還會初始化 mapextra 并且會 buckets 會被重新生成扁掸。


func makeBucketArray(t *maptype, b uint8) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := uintptr(1 << b)
    nbuckets := base
    if b >= 4 {
        nbuckets += 1 << (b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        // 如果申請 sz 大小的桶翘县,系統(tǒng)只能返回 up 大小的內(nèi)存空間,那么桶的個數(shù)為 up / t.bucket.size
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }
    buckets = newarray(t.bucket, int(nbuckets))
    // 當(dāng) b > 4 并且計算出來桶的個數(shù)與 1 << b 個數(shù)不等的時候谴分,
    if base != nbuckets {
        // 此時 nbuckets 比 base 大锈麸,那么會預(yù)先分配 nbuckets - base 個 nextOverflow 桶
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}


這里的 newarray 就已經(jīng)是 mallocgc 了。

從上述代碼里面可以看出牺蹄,只有當(dāng) B >=4 的時候忘伞,makeBucketArray 才會生成 nextOverflow 指針指向 bmap,從而在 Map 生成 hmap 的時候才會生成 mapextra 沙兰。

當(dāng) B = 3 ( B < 4 ) 的時候氓奈,初始化 hmap 只會生成8個桶。

當(dāng) B = 4 ( B >= 4 ) 的時候鼎天,初始化 hmap 的時候還會額外生成 mapextra 舀奶,并初始化 nextOverflow。mapextra 的 nextOverflow 指針會指向第16個桶結(jié)束斋射,第17個桶的首地址育勺。第17個桶(從0開始,也就是下標(biāo)為16的桶)的 bucketsize - sys.PtrSize 地址開始存一個指針罗岖,這個指針指向當(dāng)前整個桶的首地址涧至。這個指針就是 bmap 的 overflow 指針。

當(dāng) B = 5 ( B >= 4 ) 的時候桑包,初始化 hmap 的時候還會額外生成 mapextra 南蓬,并初始化 nextOverflow。這個時候就會生成總共34個桶了。同理赘方,最后一個桶大小減去一個指針的大小的地址開始存儲一個 overflow 指針烧颖。

2. 查找 Key

在 Go 中,如果字典里面查找一個不存在的 key 蒜焊,查找不到并不會返回一個 nil 倒信,而是返回當(dāng)前類型的零值。比如泳梆,字符串就返回空字符串,int 類型就返回 0 榜掌。



func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if raceenabled && h != nil {
        // 獲取 caller 的 程序計數(shù)器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 獲取 mapaccess1 的程序計數(shù)器 program counter
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size) 
    }
    if h == nil || h.count == 0 {
        return unsafe.Pointer(&zeroVal[0])
    }
    // 如果多線程讀寫优妙,直接拋出異常
    // 并發(fā)檢查 go hashmap 不支持并發(fā)訪問
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
    alg := t.key.alg
    // 計算 key 的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))
    m := uintptr(1)<<h.B - 1
    // hash % (1<<B - 1) 求出 key 在哪個桶
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    // 如果當(dāng)前還存在 oldbuckets 桶
    if c := h.oldbuckets; c != nil {
        // 當(dāng)前擴(kuò)容不是等量擴(kuò)容
        if !h.sameSizeGrow() {
            // 如果 oldbuckets 未遷移完成 則找找 oldbuckets 中對應(yīng)的 bucket(低 B-1 位)
            // 否則為 buckets 中的 bucket(低 B 位)
            // 把 mask 縮小 1 倍
            m >>= 1
        }
        
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        if !evacuated(oldb) {
            // 如果 oldbuckets 桶存在,并且還沒有擴(kuò)容遷移憎账,就在老的桶里面查找 key 
            b = oldb
        }
    }
    // 取出 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    // 如果 top 小于 minTopHash套硼,就讓它加上 minTopHash 的偏移。
    // 因為 0 - minTopHash 這區(qū)間的數(shù)都已經(jīng)用來作為標(biāo)記位了
    if top < minTopHash {
        top += minTopHash
    }
    for {
        for i := uintptr(0); i < bucketCnt; i++ {
            // 如果 hash 的高8位和當(dāng)前 key 記錄的不一樣胞皱,就找下一個
            // 這樣比較很高效邪意,因為只用比較高8位,不用比較所有的 hash 值
            // 如果高8位都不相同反砌,hash 值肯定不同雾鬼,但是高8位如果相同,那么就要比較整個 hash 值了
            if b.tophash[i] != top {
                continue
            }
            // 取出 key 值的方式是用偏移量宴树,bmap 首地址 + i 個 key 值大小的偏移量
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey {
                k = *((*unsafe.Pointer)(k))
            }
            // 比較 key 值是否相等
            if alg.equal(key, k) {
                // 如果找到了 key策菜,那么取出 value 值
                // 取出 value 值的方式是用偏移量,bmap 首地址 + 8 個 key 值大小的偏移量 + i 個 value 值大小的偏移量
                v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                if t.indirectvalue {
                    v = *((*unsafe.Pointer)(v))
                }
                return v
            }
        }
        // 如果當(dāng)前桶里面沒有找到相應(yīng)的 key 酒贬,那么就去下一個桶去找
        b = b.overflow(t)
        // 如果 b == nil又憨,說明桶已經(jīng)都找完了,返回對應(yīng)type的零值
        if b == nil {
            return unsafe.Pointer(&zeroVal[0])
        }
    }
}


具體實現(xiàn)代碼如上锭吨,詳細(xì)解釋見代碼蠢莺。

如上圖,這是一個查找 key 的全過程零如。

首先計算出 key 對應(yīng)的 hash 值躏将,hash 值對 B 取余。

這里有一個優(yōu)化點埠况。m % n 這步計算耸携,如果 n 是2的倍數(shù),那么可以省去這一步取余操作辕翰。


m % n = m & ( n - 1 ) 


這樣優(yōu)化就可以省去耗時的取余操作了夺衍。這里例子中計算完取出來是 0010 ,也就是2喜命,于是對應(yīng)的是桶數(shù)組里面的第3個桶沟沙。為什么是第3個桶呢河劝?首地址指向第0個桶,往下偏移2個桶的大小矛紫,于是偏移到了第3個桶的首地址了赎瞎,具體實現(xiàn)可以看上述代碼。

hash 的低 B 位決定了桶數(shù)組里面的第幾個桶颊咬,hash 值的高8位決定了這個桶數(shù)組 bmap 里面 key 存在 tophash 數(shù)組的第幾位了务甥。如上圖,hash 的高8位用來和 tophash 數(shù)組里面的每個值進(jìn)行對比喳篇,如果高8位和 tophash[i] 不等敞临,就直接比下一個。如果相等麸澜,則取出 bmap 里面對應(yīng)完整的 key挺尿,再比較一次,看是否完全一致炊邦。

整個查找過程優(yōu)先在 oldbucket 里面找(如果存在 lodbucket 的話)编矾,找完再去新 bmap 里面找。

有人可能會有疑問馁害,為何這里要加入 tophash 多一次比較呢窄俏?

tophash 的引入是為了加速查找的。由于它只存了 hash 值的高8位蜗细,比查找完整的64位要快很多裆操。通過比較高8位,迅速找到高8位一致hash 值的索引炉媒,接下來再進(jìn)行一次完整的比較踪区,如果還一致,那么就判定找到該 key 了吊骤。

如果找到了 key 就返回對應(yīng)的 value缎岗。如果沒有找到,還會繼續(xù)去 overflow 桶繼續(xù)尋找白粉,直到找到最后一個桶传泊,如果還沒有找到就返回對應(yīng)類型的零值。

3. 插入 Key

插入 key 的過程和查找 key 的過程大體一致鸭巴。


func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
    if raceenabled {
        // 獲取 caller 的 程序計數(shù)器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 獲取 mapassign 的程序計數(shù)器 program counter
        pc := funcPC(mapassign)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled {
        msanread(key, t.key.size)
    }
    // 如果多線程讀寫眷细,直接拋出異常
    // 并發(fā)檢查 go hashmap 不支持并發(fā)訪問
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
    alg := t.key.alg
    // 計算 key 值的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))

    // 在計算完 hash 值以后立即設(shè)置 hashWriting 變量的值,如果在計算 hash 值的過程中沒有完全寫完鹃祖,可能會導(dǎo)致 panic
    h.flags |= hashWriting

    // 如果 hmap 的桶的個數(shù)為0溪椎,那么就新建一個桶
    if h.buckets == nil {
        h.buckets = newarray(t.bucket, 1)
    }

again:
    // hash 值對 B 取余,求得所在哪個桶
    bucket := hash & (uintptr(1)<<h.B - 1)
    // 如果還在擴(kuò)容中,繼續(xù)擴(kuò)容
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 根據(jù) hash 值的低 B 位找到位于哪個桶
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    // 計算 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }

    var inserti *uint8
    var insertk unsafe.Pointer
    var val unsafe.Pointer
    for {
        // 遍歷當(dāng)前桶所有鍵值校读,查找 key 對應(yīng)的 value
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == empty && inserti == nil {
                    // 如果往后找都沒有找到沼侣,這里先記錄一個標(biāo)記,方便找不到以后插入到這里
                    inserti = &b.tophash[i]
                    // 計算出偏移 i 個 key 值的位置
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    // 計算出 val 所在的位置歉秫,當(dāng)前桶的首地址 + 8個 key 值所占的大小 + i 個 value 值所占的大小
                    val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                }
                continue
            }
            // 依次取出 key 值
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 如果 key 值是一個指針蛾洛,那么就取出改指針對應(yīng)的 key 值
            if t.indirectkey {
                k = *((*unsafe.Pointer)(k))
            }
            // 比較 key 值是否相等
            if !alg.equal(key, k) {
                continue
            }
            // 如果需要更新,那么就把 t.key 拷貝從 k 拷貝到 key
            if t.needkeyupdate {
                typedmemmove(t.key, k, key)
            }
            // 計算出 val 所在的位置雁芙,當(dāng)前桶的首地址 + 8個 key 值所占的大小 + i 個 value 值所占的大小
            val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
            goto done
        }
        ovf := b.overflow(t)
        if ovf == nil {
            break
        }
        b = ovf
    }

    // 沒有找到當(dāng)前的 key 值轧膘,并且檢查最大負(fù)載因子,如果達(dá)到了最大負(fù)載因子兔甘,或者存在很多溢出的桶
    if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        // 開始擴(kuò)容
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }
    // 如果找不到一個空的位置可以插入 key 值
    if inserti == nil {
        // all current buckets are full, allocate a new one.
        // 意味著當(dāng)前桶已經(jīng)全部滿了扶供,那么就生成一個新的
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        val = add(insertk, bucketCnt*uintptr(t.keysize))
    }

    // store new key/value at insert position
    if t.indirectkey {
        // 如果是存儲 key 值的指針,這里就用 insertk 存儲 key 值的地址
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectvalue {
        // 如果是存儲 value 值的指針裂明,這里就用 val 存儲 key 值的地址
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(val) = vmem
    }
    // 將 t.key 從 insertk 拷貝到 key 的位置
    typedmemmove(t.key, insertk, key)
    *inserti = top
    // hmap 中保存的總 key 值的數(shù)量 + 1
    h.count++

done:
    // 禁止并發(fā)寫
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectvalue {
        // 如果 value 里面存儲的是指針,那么取值該指針指向的 value 值
        val = *((*unsafe.Pointer)(val))
    }
    return val
}


插入 key 的過程中和查找 key 有幾點不同太援,需要注意:

    1. 如果找到要插入的 key 闽晦,只需要直接更新對應(yīng)的 value 值就好了。
    1. 如果沒有在 bmap 中沒有找到待插入的 key 提岔,這么這時分幾種情況仙蛉。
      情況一: bmap 中還有空位,在遍歷 bmap 的時候預(yù)先標(biāo)記空位碱蒙,一旦查找結(jié)束也沒有找到 key荠瘪,就把 key 放到預(yù)先遍歷時候標(biāo)記的空位上。
      情況二:bmap中已經(jīng)沒有空位了赛惩。這個時候 bmap 裝的很滿了哀墓。此時需要檢查一次最大負(fù)載因子是否已經(jīng)達(dá)到了。如果達(dá)到了喷兼,立即進(jìn)行擴(kuò)容操作篮绰。擴(kuò)容以后在新桶里面插入 key,流程和上述的一致季惯。如果沒有達(dá)到最大負(fù)載因子吠各,那么就在新生成一個 bmap,并把前一個 bmap 的 overflow 指針指向新的 bmap勉抓。
    1. 在擴(kuò)容過程中贾漏,oldbucke t是被凍結(jié)的,查找 key 時會在
      oldbucket 中查找藕筋,但不會在 oldbucket 中插入數(shù)據(jù)纵散。如果在
      oldbucket 是找到了相應(yīng)的key,做法是將它遷移到新 bmap 后加入
      evalucated 標(biāo)記。

其他流程和查找 key 基本一致困食,這里就不再贅述了边翁。

3. 刪除 Key


func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    if raceenabled && h != nil {
        // 獲取 caller 的 程序計數(shù)器 program counter
        callerpc := getcallerpc(unsafe.Pointer(&t))
        // 獲取 mapdelete 的程序計數(shù)器 program counter
        pc := funcPC(mapdelete)
        racewritepc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
    if h == nil || h.count == 0 {
        return
    }
    // 如果多線程讀寫,直接拋出異常
    // 并發(fā)檢查 go hashmap 不支持并發(fā)訪問
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }

    alg := t.key.alg
    // 計算 key 值的 hash 值
    hash := alg.hash(key, uintptr(h.hash0))

    // 在計算完 hash 值以后立即設(shè)置 hashWriting 變量的值硕盹,如果在計算 hash 值的過程中沒有完全寫完符匾,可能會導(dǎo)致 panic
    h.flags |= hashWriting

    bucket := hash & (uintptr(1)<<h.B - 1)
    // 如果還在擴(kuò)容中,繼續(xù)擴(kuò)容
    if h.growing() {
        growWork(t, h, bucket)
    }
    // 根據(jù) hash 值的低 B 位找到位于哪個桶
    b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
    // 計算 hash 值的高 8 位
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    for {
        // 遍歷當(dāng)前桶所有鍵值瘩例,查找 key 對應(yīng)的 value
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // 如果 k 是指向 key 的指針啊胶,那么這里需要取出 key 的值
            k2 := k
            if t.indirectkey {
                k2 = *((*unsafe.Pointer)(k2))
            }
            if !alg.equal(key, k2) {
                continue
            }
            // key 的指針置空
            if t.indirectkey {
                *(*unsafe.Pointer)(k) = nil
            } else {
            // 清除 key 的內(nèi)存
                typedmemclr(t.key, k)
            }
            v := unsafe.Pointer(uintptr(unsafe.Pointer(b)) + dataOffset + bucketCnt*uintptr(t.keysize) + i*uintptr(t.valuesize))
            // value 的指針置空
            if t.indirectvalue {
                *(*unsafe.Pointer)(v) = nil
            } else {
            // 清除 value 的內(nèi)存
                typedmemclr(t.elem, v)
            }
            // 清空 tophash 里面的值
            b.tophash[i] = empty
            // map 里面 key 的總個數(shù)減1
            h.count--
            goto done
        }
        // 如果沒有找到,那么就繼續(xù)查找 overflow 桶垛贤,一直遍歷到最后一個
        b = b.overflow(t)
        if b == nil {
            goto done
        }
    }

done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
}

刪除操作主要流程和查找 key 流程也差不多焰坪,找到對應(yīng)的 key 以后,如果是指針指向原來的 key聘惦,就把指針置為 nil某饰。如果是值就清空它所在的內(nèi)存。還要清理 tophash 里面的值最后把 map 的 key 總個數(shù)計數(shù)器減1 善绎。

如果在擴(kuò)容過程中黔漂,刪除操作會在擴(kuò)容以后在新的 bmap 里面刪除。

查找的過程依舊會一直遍歷到鏈表的最后一個 bmap 桶禀酱。

4. 增量翻倍擴(kuò)容

這部分算是整個 Map 實現(xiàn)比較核心的部分了炬守。我們都知道 Map 在不斷的裝載 Key 值的時候,查找效率會變的越來越低剂跟,如果此時不進(jìn)行擴(kuò)容操作的話减途,哈希沖突使得鏈表變得越來越長,性能也就越來越差曹洽。擴(kuò)容勢在必行鳍置。

但是擴(kuò)容過程中如果阻斷了 Key 值的寫入,在處理大數(shù)據(jù)的時候會導(dǎo)致有一段不響應(yīng)的時間衣洁,如果用在高實時的系統(tǒng)中墓捻,那么每次擴(kuò)容都會卡幾秒,這段時間都不能相應(yīng)任何請求坊夫。這種性能明顯是不能接受的砖第。所以要既不影響寫入,也同時要進(jìn)行擴(kuò)容环凿。這個時候就應(yīng)該增量擴(kuò)容了梧兼。

這里增量擴(kuò)容其實用途已經(jīng)很廣泛了,之前舉例的 Redis 就采用的增量擴(kuò)容策略智听。

接下來看看 Go 是怎么進(jìn)行增量擴(kuò)容的羽杰。

在 Go 的 mapassign 插入 Key 值渡紫、mapdelete 刪除 key 值的時候都會檢查當(dāng)前是否在擴(kuò)容中。


func growWork(t *maptype, h *hmap, bucket uintptr) {
    // 確保我們遷移了所有 oldbucket
    evacuate(t, h, bucket&h.oldbucketmask())

    // 再遷移一個標(biāo)記過的桶
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

從這里我們可以看到考赛,每次執(zhí)行一次 growWork 會遷移2個桶惕澎。一個是當(dāng)前的桶,這算是局部遷移颜骤,另外一個是 hmap 里面指向的 nevacuate 的桶唧喉,這算是增量遷移。

在插入 Key 值的時候忍抽,如果當(dāng)前在擴(kuò)容過程中八孝,oldbucket 是被凍結(jié)的,查找時會先在 oldbucket 中查找鸠项,但不會在oldbucket中插入數(shù)據(jù)干跛。只有在 oldbucket 找到了相應(yīng)的 key,那么將它遷移到新 bucket 后加入 evalucated 標(biāo)記祟绊。

在刪除 Key 值的時候楼入,如果當(dāng)前在擴(kuò)容過程中,優(yōu)先查找 bucket牧抽,即新桶浅辙,找到一個以后把它對應(yīng)的 Key、Value 都置空阎姥。如果 bucket 里面找不到,才會去 oldbucket 中去查找鸽捻。

每次插入 Key 值的時候呼巴,都會判斷一下當(dāng)前裝載因子是否超過了 6.5,如果達(dá)到了這個極限御蒲,就立即執(zhí)行擴(kuò)容操作 hashGrow衣赶。這是擴(kuò)容之前的準(zhǔn)備工作。



func hashGrow(t *maptype, h *hmap) {
    // 如果達(dá)到了最大裝載因子厚满,就需要擴(kuò)容府瞄。
    // 不然的話,一個桶后面鏈表跟著一大堆的 overflow 桶
    bigger := uint8(1)
    if !overLoadFactor(int64(h.count), h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
    // 把 hmap 的舊桶的指針指向當(dāng)前桶
    oldbuckets := h.buckets
    // 生成新的擴(kuò)容以后的桶碘箍,hmap 的 buckets 指針指向擴(kuò)容以后的桶遵馆。
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)

    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // B 加上新的值
    h.B += bigger
    h.flags = flags
    // 舊桶指針指向當(dāng)前桶
    h.oldbuckets = oldbuckets
    // 新桶指針指向擴(kuò)容以后的桶
    h.buckets = newbuckets
    h.nevacuate = 0
    h.noverflow = 0

    if h.extra != nil && h.extra.overflow[0] != nil {
        if h.extra.overflow[1] != nil {
            throw("overflow is not nil")
        }
        // 交換 overflow[0] 和 overflow[1] 的指向
        h.extra.overflow[1] = h.extra.overflow[0]
        h.extra.overflow[0] = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            // 生成 mapextra
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }

    // 實際拷貝鍵值對的過程在 evacuate() 中
}


用圖表示出它的流程:

hashGrow 操作算是擴(kuò)容之前的準(zhǔn)備工作,實際拷貝的過程在 evacuate 中丰榴。

hashGrow 操作會先生成擴(kuò)容以后的新的桶數(shù)組货邓。新的桶數(shù)組的大小是之前的2倍。然后 hmap 的 buckets 會指向這個新的擴(kuò)容以后的桶四濒,而 oldbuckets 會指向當(dāng)前的桶數(shù)組换况。

處理完 hmap 以后职辨,再處理 mapextra,nextOverflow 的指向原來的 overflow 指針戈二,overflow 指針置為 null舒裤。

到此就做好擴(kuò)容之前的準(zhǔn)備工作了。


func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    // 在準(zhǔn)備擴(kuò)容之前桶的個數(shù)
    newbit := h.noldbuckets()
    alg := t.key.alg
    if !evacuated(b) {
        // TODO: reuse overflow buckets instead of using new ones, if there
        // is no iterator using the old buckets.  (If !oldIterator.)

        var (
            x, y   *bmap          // 在新桶里面 低位桶和高位桶
            xi, yi int            // key 和 value 值的索引值分別為 xi 觉吭, yi 
            xk, yk unsafe.Pointer // 指向 x 和 y 的 key 值的指針 
            xv, yv unsafe.Pointer // 指向 x 和 y 的 value 值的指針  
        )
        // 新桶中低位的一些桶
        x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        xi = 0
        // 擴(kuò)容以后的新桶中低位的第一個 key 值
        xk = add(unsafe.Pointer(x), dataOffset)
        // 擴(kuò)容以后的新桶中低位的第一個 key 值對應(yīng)的 value 值
        xv = add(xk, bucketCnt*uintptr(t.keysize))
        // 如果不是等量擴(kuò)容
        if !h.sameSizeGrow() {
            y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            yi = 0
            yk = add(unsafe.Pointer(y), dataOffset)
            yv = add(yk, bucketCnt*uintptr(t.keysize))
        }
        // 依次遍歷溢出桶
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            v := add(k, bucketCnt*uintptr(t.keysize))
            // 遍歷 key - value 鍵值對
            for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
                top := b.tophash[i]
                if top == empty {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
                // key 值如果是指針腾供,則取出指針里面的值
                if t.indirectkey {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                useX := true
                if !h.sameSizeGrow() {
                    // 如果不是等量擴(kuò)容,則需要重新計算 hash 值亏栈,不管是高位桶 x 中台腥,還是低位桶 y 中
                    hash := alg.hash(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 {
                        if !t.reflexivekey && !alg.equal(k2, k2) {
                            // 如果兩個 key 不相等,那么他們倆極大可能舊的 hash 值也不相等绒北。
                            // tophash 對要遷移的 key 值也是沒有多大意義的黎侈,所以我們用低位的 tophash 輔助擴(kuò)容,標(biāo)記一些狀態(tài)闷游。
                            // 為下一個級 level 重新計算一些新的隨機(jī)的 hash 值峻汉。以至于這些 key 值在多次擴(kuò)容以后依舊可以均勻分布在所有桶中
                            // 判斷 top 的最低位是否為1
                            if top&1 != 0 {
                                hash |= newbit
                            } else {
                                hash &^= newbit
                            }
                            top = uint8(hash >> (sys.PtrSize*8 - 8))
                            if top < minTopHash {
                                top += minTopHash
                            }
                        }
                    }
                    useX = hash&newbit == 0
                }
                if useX {
                    // 標(biāo)記低位桶存在 tophash 中
                    b.tophash[i] = evacuatedX
                    // 如果 key 的索引值到了桶最后一個,就新建一個 overflow
                    if xi == bucketCnt {
                        newx := h.newoverflow(t, x)
                        x = newx
                        xi = 0
                        xk = add(unsafe.Pointer(x), dataOffset)
                        xv = add(xk, bucketCnt*uintptr(t.keysize))
                    }
                    // 把 hash 的高8位再次存在 tophash 中
                    x.tophash[xi] = top
                    if t.indirectkey {
                        // 如果是指針指向 key 脐往,那么拷貝指針指向
                        *(*unsafe.Pointer)(xk) = k2 // copy pointer
                    } else {
                        // 如果是指針指向 key 休吠,那么進(jìn)行值拷貝
                        typedmemmove(t.key, xk, k) // copy value
                    }
                    // 同理拷貝 value
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(xv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, xv, v)
                    }
                    // 繼續(xù)遷移下一個
                    xi++
                    xk = add(xk, uintptr(t.keysize))
                    xv = add(xv, uintptr(t.valuesize))
                } else {
                    // 這里是高位桶 y,遷移過程和上述低位桶 x 一致业簿,下面就不再贅述了
                    b.tophash[i] = evacuatedY
                    if yi == bucketCnt {
                        newy := h.newoverflow(t, y)
                        y = newy
                        yi = 0
                        yk = add(unsafe.Pointer(y), dataOffset)
                        yv = add(yk, bucketCnt*uintptr(t.keysize))
                    }
                    y.tophash[yi] = top
                    if t.indirectkey {
                        *(*unsafe.Pointer)(yk) = k2
                    } else {
                        typedmemmove(t.key, yk, k)
                    }
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(yv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, yv, v)
                    }
                    yi++
                    yk = add(yk, uintptr(t.keysize))
                    yv = add(yv, uintptr(t.valuesize))
                }
            }
        }
        // Unlink the overflow buckets & clear key/value to help GC.
        if h.flags&oldIterator == 0 {
            b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
            // Preserve b.tophash because the evacuation
            // state is maintained there.
            if t.bucket.kind&kindNoPointers == 0 {
                memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            } else {
                memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            }
        }
    }

    // Advance evacuation mark
    if oldbucket == h.nevacuate {
        h.nevacuate = oldbucket + 1
        // Experiments suggest that 1024 is overkill by at least an order of magnitude.
        // Put it in there as a safeguard anyway, to ensure O(1) behavior.
        stop := h.nevacuate + 1024
        if stop > newbit {
            stop = newbit
        }
        for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
            h.nevacuate++
        }
        if h.nevacuate == newbit { // newbit == # of oldbuckets
            // Growing is all done. Free old main bucket array.
            h.oldbuckets = nil
            // Can discard old overflow buckets as well.
            // If they are still referenced by an iterator,
            // then the iterator holds a pointers to the slice.
            if h.extra != nil {
                h.extra.overflow[1] = nil
            }
            h.flags &^= sameSizeGrow
        }
    }
}


上述函數(shù)就是遷移過程最核心的拷貝工作了瘤礁。

整個遷移過程并不難。這里需要說明的是 x 梅尤,y 代表的意義柜思。由于擴(kuò)容以后,新的桶數(shù)組是原來桶數(shù)組的2倍巷燥。用 x 代表新的桶數(shù)組里面低位的那一半赡盘,用 y 代表高位的那一半。其他的變量就是一些標(biāo)記了缰揪,游標(biāo)和標(biāo)記 key - value 原來所在的位置陨享。詳細(xì)的見代碼注釋。

上圖中表示了遷移開始之后的過程钝腺∨坠茫可以看到舊的桶數(shù)組里面的桶在遷移到新的桶中,并且新的桶也在不斷的寫入新的 key 值艳狐。

一直拷貝鍵值對途戒,直到舊桶中所有的鍵值都拷貝到了新的桶中。

最后一步就是釋放舊桶僵驰,oldbuckets 的指針置為 null喷斋。到此唁毒,一次遷移過程就完全結(jié)束了。

5. 等量擴(kuò)容

嚴(yán)格意義上這種方式并不能算是擴(kuò)容星爪。但是函數(shù)名是 Grow浆西,姑且暫時就這么叫吧。

在 go1.8 的版本開始顽腾,添加了 sameSizeGrow近零,當(dāng) overflow buckets
的數(shù)量超過一定數(shù)量 (2^B) 但裝載因子又未達(dá)到 6.5 的時候,此時可能存在部分空的bucket抄肖,即 bucket 的使用率低久信,這時會觸發(fā)sameSizeGrow,即 B 不變漓摩,但走數(shù)據(jù)遷移流程裙士,將 oldbuckets 的數(shù)據(jù)重新緊湊排列提高 bucket 的利用率。當(dāng)然在 sameSizeGrow 過程中管毙,不會觸發(fā) loadFactorGrow腿椎。

四. Map 實現(xiàn)中的一些優(yōu)化

讀到這里,相信讀者心里應(yīng)該很清楚如何設(shè)計并實現(xiàn)一個 Map 了吧夭咬。包括 Map 中的各種操作的實現(xiàn)啃炸。在探究如何實現(xiàn)一個線程安全的 Map 之前,先把之前說到個一些亮點優(yōu)化點卓舵,小結(jié)一下南用。

在 Redis 中,采用增量式擴(kuò)容的方式處理哈希沖突掏湾。當(dāng)平均查找長度超過 5 的時候就會觸發(fā)增量擴(kuò)容操作训枢,保證 hash 表的高性能。

同時 Redis 采用頭插法忘巧,保證插入 key 值時候的性能。

在 Java 中睦刃,當(dāng)桶的個數(shù)超過了64個以后砚嘴,并且沖突節(jié)點為8或者大于8,這個時候就會觸發(fā)紅黑樹轉(zhuǎn)換涩拙。這樣能保證鏈表在很長的情況下际长,查找長度依舊不會太長,并且紅黑樹保證最差情況下也支持 O(log n) 的時間復(fù)雜度兴泥。

Java 在遷移之后有一個非常好的設(shè)計工育,只需要比較遷移之后桶個數(shù)的最高位是否為0,如果是0搓彻,key 在新桶內(nèi)的相對位置不變如绸,如果是1嘱朽,則加上桶的舊的桶的個數(shù) oldCap 就可以得到新的位置。

在 Go 中優(yōu)化的點比較多:

  1. 哈希算法選用高效的 memhash 算法 和 CPU AES指令集怔接。AES 指令集充分利用 CPU 硬件特性,計算哈希值的效率超高。
  2. key - value 的排列設(shè)計成 key 放在一起冰单,value 放在一起肖抱,而不是key,value成對排列瓦侮。這樣方便內(nèi)存對齊艰赞,數(shù)據(jù)量大了以后節(jié)約內(nèi)存對齊造成的一些浪費。
  3. key肚吏,value 的內(nèi)存大小超過128字節(jié)以后自動轉(zhuǎn)成存儲一個指針方妖。
  4. tophash 數(shù)組的設(shè)計加速了 key 的查找過程。tophash 也被復(fù)用须喂,用來標(biāo)記擴(kuò)容操作時候的狀態(tài)吁断。
  5. 用位運算轉(zhuǎn)換求余操作,m % n 坞生,當(dāng) n = 1 << B 的時候仔役,可以轉(zhuǎn)換成 m & (1<<B - 1) 。
  6. 增量式擴(kuò)容是己。
  7. 等量擴(kuò)容又兵,緊湊操作。
  8. Go 1.9 版本以后卒废,Map 原生就已經(jīng)支持線程安全沛厨。(在下一章中重點討論這個問題)

當(dāng)然 Go 中還有一些需要再優(yōu)化的地方:

  1. 在遷移的過程中,當(dāng)前版本不會重用 overflow 桶摔认,而是直接重新申請一個新的桶逆皮。這里可以優(yōu)化成優(yōu)先重用沒有指針指向的 overflow 桶,當(dāng)沒有可用的了参袱,再去申請一個新的电谣。這一點作者已經(jīng)寫在了 TODO 里面了。
  2. 動態(tài)合并多個 empty 的桶抹蚀。
  3. 當(dāng)前版本中沒有 shrink 操作剿牺,Map 只能增長而不能收縮。這塊 Redis 有相關(guān)的實現(xiàn)环壤。

Reference:
《算法與數(shù)據(jù)結(jié)構(gòu)》
《Redis 設(shè)計與實現(xiàn)》
xxHash
字符串hash函數(shù)
General Purpose Hash Function Algorithms
Java 8系列之重新認(rèn)識HashMap

GitHub Repo:Halfrost-Field

Follow: halfrost · GitHub

Source: https://halfrost.com/go_map_chapter_one/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末晒来,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子郑现,更是在濱河造成了極大的恐慌湃崩,老刑警劉巖荧降,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異竹习,居然都是意外死亡誊抛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門整陌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拗窃,“玉大人,你說我怎么就攤上這事泌辫∷婵洌” “怎么了?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵震放,是天一觀的道長宾毒。 經(jīng)常有香客問我,道長殿遂,這世上最難降的妖魔是什么诈铛? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮墨礁,結(jié)果婚禮上幢竹,老公的妹妹穿的比我還像新娘。我一直安慰自己恩静,他們只是感情好焕毫,可當(dāng)我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著驶乾,像睡著了一般邑飒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上级乐,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天疙咸,我揣著相機(jī)與錄音,去河邊找鬼风科。 笑死撒轮,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丐重。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼杆查,長吁一口氣:“原來是場噩夢啊……” “哼扮惦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起亲桦,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤崖蜜,失蹤者是張志新(化名)和其女友劉穎浊仆,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體豫领,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡抡柿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了等恐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洲劣。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖课蔬,靈堂內(nèi)的尸體忽然破棺而出囱稽,到底是詐尸還是另有隱情,我是刑警寧澤二跋,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布战惊,位于F島的核電站,受9級特大地震影響扎即,放射性物質(zhì)發(fā)生泄漏吞获。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一谚鄙、第九天 我趴在偏房一處隱蔽的房頂上張望各拷。 院中可真熱鬧,春花似錦襟锐、人聲如沸撤逢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蚊荣。三九已至,卻和暖如春莫杈,著一層夾襖步出監(jiān)牢的瞬間互例,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工筝闹, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留媳叨,地道東北人。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓关顷,卻偏偏與公主長得像糊秆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子议双,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容