字典在 Redis 中是一個非常重要的數(shù)據(jù)結構,因為 Redis 本身就是一個鍵值數(shù)據(jù)庫。我們先來回顧下在 Redis 源碼學習之基本數(shù)據(jù)結構 中提到的 Redis 字典實現(xiàn)的一些特點:
- 支持海量
<key, value>
存儲纵寝; - 使用漸進式 Rehash 策略,避免因為需要遷移的 buckets 太多導致阻塞時間過久(Redis 核心處理邏輯是單線程模型);
- 默認使用 SipHash 算法計算鍵的 hash 值驯耻;
- 對于哈希沖突問題发皿,采用了常見的鏈地址法崔慧,且新加入的節(jié)點會插入到鏈表的頭部;
- 字典內部維護了兩張哈希表穴墅,其中第二個哈希表會在擴容期間(Rehash)使用惶室;
- 提供了安全和非安全的迭代器,方便對整個字典進行迭代玄货。
在看了 Redis 字典源碼皇钞,搞懂它的工作原理后,有沒有想要自己實現(xiàn)下呢松捉?所以夹界,本文將介紹如何使用 Go 語言來山寨一個 Redis 字典實現(xiàn),雖然「容貌」有異隘世,但「內核」還是基本一致的可柿。為了簡單起見,我們在實現(xiàn)的時候先不考慮 goroutine 安全問題丙者,焦點放在 Redis 字典實現(xiàn)的核心思想上晾蜘。所以后面的實現(xiàn),都假設只有一個 goroutine 在對字典進行操作南缓。由于 Go 語言自帶 GC和泌,所以使用它來實現(xiàn)就不用煩心內存管理的問題了(在 Redis dict.c
實現(xiàn)中,還有很多代碼是涉及內存申請和釋放的)滥沫,這樣就能讓我們更加容易地理解核心的實現(xiàn)策略侣集。
一點說明
正所謂「入鄉(xiāng)隨俗」嘛,所以在使用 Go 語言實現(xiàn)的字典中兰绣,并沒有照搬原先 Redis 中字典的接口世分,而是提供了一組類似于標準庫 sync.Map
的接口。
另外缀辩,什么樣的 key 可以作為字典的鍵呢臭埋?首先,必須要是方便計算哈希值的臀玄;其次瓢阴,方便進行直接比較。我們知道在 Redis 的字典實現(xiàn)中提供了一組接口健无,供實際使用字典存儲的數(shù)據(jù)類型實現(xiàn)荣恐。之所以這樣做,也是為了更好的擴展性。
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
不過為了簡單起見叠穆,在使用 Go 語言實現(xiàn)時字典時少漆,傳入的 key
和 value
均為 interface{}
類型,并沒有強制的接口實現(xiàn)要求硼被。另外示损,針對 key
將只支持 string
和 int
類型及其變種。這種可以滿足基本的使用場景嚷硫,同時也能夠擁有和 sync.Map
一樣的接口簽名检访。
最后,來看下字典的接口設計:
// Store 向字典中添加新的 key-value
func (d *Dict) Store(key interface{}, value interface{})
// Load 從字典中獲取指定 key 對應的值
func (d *Dict) Load(key interface{}) (value interface{}, ok bool)
// LoadOrStore 用于根據(jù)指定 key 先查找對應值仔掸,如果存在則返回對應值脆贵;
// 否則,會將給定 key-value 存儲到字典中嘉汰,并返回傳入的 value丹禀。
func (d *Dict) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// Delete 刪除指定的 key
func (d *Dict) Delete(key interface{})
// Len 返回字典的元素個數(shù)
func (d *Dict) Len() uint64
// Cap 返回字典的容量
func (d *Dict) Cap() uint64
// Range 模擬 Redis 字典普通迭代器行為,不支持非安全的操作
func (d *Dict) Range(fn func(key, value interface{}) bool)
// RangeSafely 模擬 Redis 字典安全迭代器行為鞋怀,迭代期間不做 rehash 操作
func (d *Dict) RangeSafely(fn func(key, value interface{}) bool)
// Resize 用于調整字典容量(擴容或縮容双泪,但是 rehash 還是漸進式的)
func (d *Dict) Resize() error
// RehashForAWhile 執(zhí)行一段時間的 rehash
func (d *Dict) RehashForAWhile(duration time.Duration)
實現(xiàn)細節(jié)
數(shù)據(jù)結構
type Dict struct {
hashTables []*hashTable
rehashIdx int64
iterators uint64
}
type hashTable struct {
buckets []*entry
size uint64
sizemask uint64
used uint64
}
type entry struct {
key, value interface{}
next *entry
}
字典初始化
// New 實例化一個字典。
func New() *Dict {
return &Dict{
// 初始化的時候密似,準備兩張哈希表焙矛,默認使用哈希表 1
// 在進行擴容時,會將哈希表 1 中的所有元素遷移到
// 哈希表 2残腌。
hashTables: []*hashTable{{}, {}},
rehashIdx: -1,
iterators: 0,
}
}
在哈希表中查找指定的鍵
下面這個函數(shù)將基于指定的 key 計算出對應的 hash 值(使用 SipHash 算法村斟,Redis 字典中默認使用的哈希算法),并且通過查詢哈希表來確定對應的 key 是否存在于字典中抛猫。這個函數(shù)比較重要蟆盹,在后面的 Load
和 Store
函數(shù)中都有應用,下面來看看它的具體實現(xiàn)吧:
// keyIndex 基于指定的 key 獲得對應的 bucket 索引
// 如果 key 已經存在于字典中闺金,則直接返回關聯(lián)的 entry
func (d *Dict) keyIndex(key interface{}) (idx uint64, existed *entry) {
hash := SipHash(key)
for i := 0; i < 2; i++ {
ht := d.hashTables[i]
idx = ht.sizemask & hash
for ent := ht.buckets[idx]; ent != nil; ent = ent.next {
if ent.key == key {
return idx, ent
}
}
if !d.isRehashing() {
break
}
}
// 如果字典處于 rehashing 中逾滥,上面的循環(huán)可以保證最后的 idx 一定位于
// 第二個哈希表,從而保證依賴該接口的地方存儲的新鍵一定進入到新的哈希表
return idx, nil
}
查詢鍵值對
// Load 從字典中加載指定的 key 對應的值败匹。
func (d *Dict) Load(key interface{}) (value interface{}, ok bool) {
if d.isRehashing() {
d.rehashStep()
}
_, existed := d.keyIndex(key)
if existed != nil {
return existed.value, true
}
return nil, false
}
存儲鍵值對
// Store 向字典中添加 key-value寨昙。
func (d *Dict) Store(key interface{}, value interface{}) {
ent, loaded := d.loadOrStore(key, value)
if loaded {
ent.value = value // 直接更新 value 即可
} // 否則,上述函數(shù)調用會自動添加 (key, value) 到字典中
}
// loadOrStore 先嘗試使用 key 查找掀亩,如果查找到則直接返回對應 entry舔哪,
// 否則,會添加新的 entry 到字典中槽棍,同時返回 nil捉蚤,表示之前不存在抬驴。
func (d *Dict) loadOrStore(key, value interface{}) (ent *entry, loaded bool) {
if d.isRehashing() {
d.rehashStep()
}
_ = d.expandIfNeeded() // 這里簡單起見,假設一定是可以擴容成功的缆巧,忽略了錯誤
idx, existed := d.keyIndex(key)
ht := d.hashTables[0]
if d.isRehashing() {
ht = d.hashTables[1]
}
if existed != nil {
return existed, true
} else {
// 否則怎爵,需要在指定 bucket 添加新的 entry
// 對于哈希沖突的情況,采用鏈地址法盅蝗,在插入新的 entry 時,
// 采用頭插法姆蘸,保證最近添加的在最前面
entry := &entry{key: key, value: value, next: ht.buckets[idx]}
ht.buckets[idx] = entry
ht.used++
}
return nil, false
}
刪除鍵值對
刪除操作值得一提的是墩莫,在查找到要刪除的 Entry 后,需要記得調整哈希桶的頭指針逞敷,可能被刪除的 Entry 恰好就是頭節(jié)點狂秦。代碼實現(xiàn)比較簡單,如下:
// Delete 從字典中刪除指定的 key推捐,如果 key 不存在裂问,則什么也
// 不做。
// 實現(xiàn)描述:
// 1. 遍歷哈希表牛柒,定位到對應的 buckets
// 2. 刪除 buckets 中匹配的 entry堪簿。
func (d *Dict) Delete(key interface{}) {
if d.Len() == 0 {
// 不要做無畏的掙扎!
return
}
if d.isRehashing() {
d.rehashStep()
}
hash := SipHash(key)
for i := 0; i < 2; i++ {
ht := d.hashTables[i]
idx := ht.sizemask & hash
var prevEntry *entry
for ent := ht.buckets[idx]; ent != nil; ent = ent.next {
if ent.key == key {
// 此時需要釋放 ent 節(jié)點
if prevEntry != nil {
prevEntry.next = ent.next
} else {
// 說明待釋放的節(jié)點是頭節(jié)點皮壁,需要調整 buckets[idx] 指向下一個節(jié)點
ht.buckets[idx] = ent.next
}
ent.next = nil
ht.used--
return
}
prevEntry = ent
}
if !d.isRehashing() {
break
}
}
}
擴容和縮容
在給字典添加鍵值對時椭更,會調用 loadOrStore
方法,而在該方法內部調用了一次 d.expandIfNeeded()
方法嘗試給字典按需擴容蛾魄。那么虑瀑,字典擴容的時機是什么呢?擴容的策略又是怎樣的呢滴须?且看源碼:
const (
_initialHashtableSize uint64 = 4
)
func (d *Dict) expandIfNeeded() error {
if d.isRehashing() {
// 此時表明擴容已經成功舌狗,正在進行遷移(rehash)
return nil
}
if d.hashTables[0].size == 0 {
// 第一次擴容,需要一定的空間存放新的 keys
return d.resizeTo(_initialHashtableSize)
}
// 否則扔水,根據(jù)負載因子判斷是否需要進行擴容
// 擴容策略簡單粗暴痛侍,至少要是已有元素個數(shù)的二倍
if d.hashTables[0].used == d.hashTables[0].size {
return d.resizeTo(d.hashTables[0].used * 2)
}
return nil
}
func (d *Dict) resizeTo(size uint64) error {
// 這里主要是要保證擴容大小符合要求,至少要比現(xiàn)有元素個數(shù)多
if d.isRehashing() || d.hashTables[0].used > size {
return errors.New("failed to resize")
}
size = d.nextPower(size)
if size == d.hashTables[0].size {
return nil
}
// 準備開始擴容
var ht *hashTable
if d.hashTables[0].size == 0 {
// 第一次執(zhí)行擴容铭污,給 ht[0] 準備好恋日,接下來 keys 可以直接放進來
ht = d.hashTables[0]
} else {
ht = d.hashTables[1]
// 表明需要開始進一步擴容,遷移 ht[0] -> ht[1]
d.rehashIdx = 0
}
ht.size = size
ht.sizemask = size - 1
ht.buckets = make([]*entry, ht.size)
return nil
}
// nextPower 找到匹配 size 的擴容大小
// 2^2 -> 2^3 -> 2^4 -> 2^5 -> ...
func (d *Dict) nextPower(size uint64) uint64 {
if size >= math.MaxUint64 {
return math.MaxUint64
}
i := _initialHashtableSize
for i < size {
i <<= 1 // i*= 2
}
return i
}
我們知道了擴容是何時進行的了嘹狞, 但是看起來并沒有在刪除元素時執(zhí)行縮容操作呢岂膳?那縮容會在什么時候執(zhí)行呢?在 Redis 中磅网,是由字典的使用者來確定縮容的時機的谈截,比如在刪除鍵值對后,或者在 serverCron
中執(zhí)行(具體調用鏈路為:serverCron->databasesCron->tryResizeHashTables->dictResize
)。該方法的實現(xiàn)很簡單簸喂,用 Go 語言表達如下:
// Resize 讓字典擴容或者縮容到一定大小毙死。
// 注意,這里只是會準備好用于擴容的第二個哈希表喻鳄,但真正的遷移還是分散
// 在多次 Rehash 操作中扼倘。
func (d *Dict) Resize() error {
if d.isRehashing() {
return errors.New("dict is rehashing")
}
size := d.hashTables[0].used
if size < _initialHashtableSize {
size = _initialHashtableSize
}
return d.resizeTo(size)
}
漸進式 rehash
漸進式 Rehash 的思想很簡單,就是將大量的工作分成很多步完成除呵。在上面的源碼中可以看到再菊,Load
, Store
, Delete
方法中,都有調用 d.rehashStep()
颜曾,進而又會調用 d.rehash(1)
纠拔。下面我們來看看漸進式 Rehash 是怎么實現(xiàn)的:
// rehash 實現(xiàn)漸進式 Rehash 策略》汉溃基本思想就是稠诲,每次對最多
// steps 個 buckets 進行遷移。另外诡曙,考慮到可能舊的哈希表中
// 會連續(xù)遇到較多的空 buckets臀叙,導致耗費時間不受限制,這里還
// 限定最多遇到 10 * steps 個空 buckets 就退出岗仑。
func (d *Dict) rehash(steps uint64) (finished bool) {
if !d.isRehashing() {
return true
}
maxEmptyBucketsMeets := 10 * steps
src, dst := d.hashTables[0], d.hashTables[1]
for ; steps > 0 && src.used != 0; steps-- {
// 掃描哈希表直到遇到非空的 bucket
for src.buckets[d.rehashIdx] == nil {
d.rehashIdx++
maxEmptyBucketsMeets--
if maxEmptyBucketsMeets <= 0 {
return false
}
}
// 把整個 bucket 上所有的 entry 都遷移走
for ent := src.buckets[d.rehashIdx]; ent != nil; {
next := ent.next
idx := SiphHash(ent.key) & dst.sizemask
ent.next = dst.buckets[idx]
dst.buckets[idx] = ent
src.used--
dst.used++
ent = next
}
src.buckets[d.rehashIdx] = nil // 清空舊的 bucket
d.rehashIdx++
}
// 如果遷移完畢匹耕,需要將 ht[0] 指向遷移后的哈希表
if src.used == 0 {
d.hashTables[0] = dst
d.hashTables[1] = &hashTable{}
d.rehashIdx = -1
return true
}
return false
}
字典迭代器
迭代器的數(shù)據(jù)結構定義如下:
// iterator 實現(xiàn)了一個對字典的迭代器。
// 不過考慮到我們將為字典提供 `Range` 方法荠雕,故該迭代器就不往外暴露了稳其。
type iterator struct {
d *Dict
tableIndex int
safe bool
fingerprint int64
entry *entry
bucketIndex uint64
waitFirstIteration bool
}
func newIterator(d *Dict, safe bool) *iterator {
return &iterator{
d: d,
safe: safe,
waitFirstIteration: true,
}
}
在 Redis 的字典中,提供了兩種類型的迭代器炸卑,分別通過 dictGetIterator
和 dictGetSafeIterator
獲得既鞠。普通迭代器只能執(zhí)行和字典關聯(lián)的 dictNext
方法,不允許執(zhí)行 dictFind
盖文,dictAdd
等操作嘱蛋,這主要是因為這些操作可能會引起 Rehash,從而導致在迭代期間可能會掃描到重復的鍵值對(比如在執(zhí)行 Rehash 期間五续,某些鍵值對被遷移到了新的哈希表洒敏,但是我們是優(yōu)先掃描第一個哈希表,然后再掃描第二個哈希表疙驾,而此時可能會遇到之前掃描過的元素)凶伙。當然,Redis 的普通迭代器是沒法阻止你在迭代期間執(zhí)行不安全的操作的它碎,但是它會通過計算迭代前后字典的指紋信息函荣,并在最后進行比對显押,若指紋不匹配,則無法通過 assert(iter->fingerprint == dictFingerprint(iter->d))
斷言傻挂。
那么安全迭代器又是如何做到可以允許 dictFind
和 dictAdd
等操作執(zhí)行的呢乘碑?其實它是通過阻止字典 rehash
實現(xiàn)的,正因為如此金拒,它才可以放心大膽地掃描哈希表中的 Entries兽肤,而不用擔心遇到重復的 Entries。在上面的代碼中可以看到绪抛,在 Load
轿衔、Store
和 Delete
中都有直接或間接地調用 d.rehashStep()
方法,它的實現(xiàn)如下:
func (d *Dict) rehashStep() {
if d.iterators == 0 {
d.rehash(1)
}
}
最后睦疫,我們來看看迭代器最重要的 next()
方法實現(xiàn),就可以看到安全迭代器和普通迭代器的區(qū)別了:
// next 會依次掃描字典中哈希表的所有 buckets鞭呕,并將其中的 entry 一一返回蛤育。
// 如果字典正在 rehash,那么會在掃描完哈希表 1 后葫松,繼續(xù)掃描哈希表 2瓦糕。需要
// 注意的是,如果在迭代期間腋么,繼續(xù)向字典中添加數(shù)據(jù)可能沒法被掃描到咕娄!
func (it *iterator) next() *entry {
for {
if it.entry == nil {
if it.waitFirstIteration {
// 第一次迭代,要做點特別的事情~
if it.safe {
// 告訴 dict珊擂,有正在運行的安全迭代器圣勒,進而阻止某些操作時的 Rehash 操作
it.d.iterators++
} else {
it.fingerprint = it.d.fingerprint()
}
it.waitFirstIteration = false
}
ht := it.d.hashTables[it.tableIndex]
if it.bucketIndex >= ht.size {
if !it.d.isRehashing() || it.tableIndex != 0 {
return nil
}
// 切換到第二個哈希表繼續(xù)掃描
it.tableIndex = 1
it.bucketIndex = 0
ht = it.d.hashTables[1]
}
it.entry = ht.buckets[it.bucketIndex]
it.bucketIndex++
} else {
it.entry = it.entry.next
}
if it.entry != nil {
return it.entry
}
}
}
func (it *iterator) release() {
if it.safe {
it.d.iterators--
} else {
fp := it.d.fingerprint()
if fp != it.fingerprint {
panic("operations like 'LoadOrStore', 'Load' or 'Delete' are not safe for an unsafe iterator")
}
}
}
使用示例
func main() {
d := dict.New()
d.Store("hello", "world")
d.Store(100, 200)
fmt.Println(d.Load("hello"))
fmt.Println(d.LoadOrStore("language", "Eng"))
d.Range(func(key, value interface{}) bool {
fmt.Println(key, "=>", value)
return true
})
_ = d.Resize()
d.RehashForAWhile(1 * time.Microsecond)
}
總結
好啦,關于 Redis 字典的實現(xiàn)介紹就到此為止啦摧扇。相信看完上面的代碼后圣贸,應該可以了解到 Redis 字典的擴容機制、漸進式 Rehash 策略扛稽,以及哈希沖突解決方案吁峻。完整的實現(xiàn)代碼及其單元測試參見 go-redis-dict。
聲明
- 本文鏈接: http://ifaceless.space/2019/12/17/implement-redis-dict-in-go/
- 版權聲明:本博客所有文章除特別聲明外在张,均采用 CC BY-NC-SA 3.0 許可協(xié)議用含。轉載請注明出處!