Hash算法
1. Memcached Hash介紹
? 我們在前面的文章中已經(jīng)介紹過了Memcached的內(nèi)存管理方式,LRU的策略邓梅。由于Memcached的數(shù)據(jù)存儲方式基本上是基于雙向鏈表來實現(xiàn)的围苫,而鏈表實現(xiàn)的最大好處在于可以快速的進行增刪改冲杀,但其最大的不足在于其數(shù)據(jù)的獲取只能通過遍歷鏈表的方式來進行公罕。而Memcached使用了Hash算法來進行數(shù)據(jù)的快速讀取。
2. Hash算法
? Memcached的Hash算法原理上非常簡單韧拒。我們用下面的圖來說明。
? 這個數(shù)據(jù)結(jié)構(gòu)跟我們熟知的HashMap是一致的十性,數(shù)據(jù)hash到不同的桶中叛溢,當(dāng)Hash發(fā)生沖突的時候,采用了鏈表來記錄相同Hash值的數(shù)據(jù)劲适。使用Hash算法最重要的一點是如何解決Hash沖突楷掉,Memcached采用的鏈表來解決Hash沖突是較為基本的方式。這種方式的缺陷是當(dāng)數(shù)據(jù)量增多霞势,Hash沖突增多時烹植,會發(fā)生鏈表過長的情況。Memcached在這種情況下愕贡,會采用擴大桶數(shù)量的方式來優(yōu)化草雕。Memcached的Hash算法本身并不復(fù)雜,這里也不再花大篇幅來介紹其Hash算法固以。
3. 源代碼分析
? 首先我們來看看Memcached的Hash算法:
unsigned int hashpower = HASHPOWER_DEFAULT;
/* 這里的hash算法采用的還是按位與的方式來定位Bucket墩虹,1<<(n)表示hash桶的數(shù)量 */
#define hashsize(n) ((ub4)1<<(n))
/* 這里是Hash的掩碼,數(shù)據(jù)的hash值與掩碼取與操作可以定位到唯一的Hash桶 */
#define hashmask(n) (hashsize(n)-1)
? 下面我們來看看Memcached的增刪查找操作:
/* hash列表中Item元素的查找 */
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;
/* 這一步是找到hash的桶號 */
if (expanding &&
/* 在Hash列表進行rehash的時候,是按照桶號順序進行的败晴,所以如果該桶號>=目前正在處理的桶號時浓冒,意味著該數(shù)據(jù)還是舊Hash表中*/
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket];
} else {
it = primary_hashtable[hv & hashmask(hashpower)];
}
item *ret = NULL;
int depth = 0;
/* 這一步是Hash沖突列表的遍歷查找 */
while (it) {
/* Item值匹配的標準:
1. key的長度相等
2. key值相等
*/
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}
/* 該方法是插入操作,該Key值必須是不存在才行 */
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;
/* 這一步是找到該數(shù)據(jù)應(yīng)存儲的桶號 */
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}
pthread_mutex_lock(&hash_items_counter_lock);
hash_items++;
/* 進行rehash的條件判斷尖坤,滿足rehash的條件如下:
1. 目前不是正處在rehash中
2. hash表中的所有數(shù)據(jù)量>hash表容量的1.5倍
*/
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_start_expand();
}
pthread_mutex_unlock(&hash_items_counter_lock);
MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
return 1;
}
/* hash表中元素的刪除 */
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
/* 指針的指針稳懒,要刪除元素的地址指針*/
item **before = _hashitem_before(key, nkey, hv);
if (*before) {
item *nxt;
pthread_mutex_lock(&hash_items_counter_lock);
hash_items--;
pthread_mutex_unlock(&hash_items_counter_lock);
/* The DTrace probe cannot be triggered as the last instruction
* due to possible tail-optimization by the compiler
*/
MEMCACHED_ASSOC_DELETE(key, nkey, hash_items);
nxt = (*before)->h_next;
(*before)->h_next = 0; /* probably pointless, but whatever. */
*before = nxt;
return;
}
/* Note: we never actually get here. the callers don't delete things
they can't find. */
assert(*before != 0);
}
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
item **pos;
unsigned int oldbucket;
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
pos = &old_hashtable[oldbucket];
} else {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}
while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}
? 在看過了Memcached Hash表中數(shù)據(jù)的增刪查,下面來看看Hash表的擴容實現(xiàn):
/* 該方法只是Hash擴容的初始化方法 */
static void assoc_expand(void) {
old_hashtable = primary_hashtable;
/* 從這里可以看出慢味,Hash擴容的方式是重新申請兩倍大小的Hash表*/
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = true;
expand_bucket = 0;
STATS_LOCK();
stats.hash_power_level = hashpower;
stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
stats.hash_is_expanding = 1;
STATS_UNLOCK();
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}
static volatile int do_run_maintenance_thread = 1;
#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
/* ReHash的線程任務(wù) */
static void *assoc_maintenance_thread(void *arg) {
mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread) {
int ii = 0;
/* There is only one expansion thread, so no need to global lock. */
/* 這里的hash_bulk_move標記一次rehash的桶的最小個數(shù)*/
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
int bucket;
void *item_lock = NULL;
/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
/*這里對整個桶進行加鎖*/
if ((item_lock = item_trylock(expand_bucket))) {
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
/* 已經(jīng)處理掉的桶置為NULL */
old_hashtable[expand_bucket] = NULL;
expand_bucket++;
/* rehash完成的標記 */
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
free(old_hashtable);
STATS_LOCK();
stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
stats.hash_is_expanding = 0;
STATS_UNLOCK();
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}
} else {
usleep(10*1000);
}
if (item_lock) {
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand();
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}