導航
memcached源碼分析
memcached源碼分析-網(wǎng)絡模塊
memcached源碼分析-指令解析模塊
memcached源碼分析-哈希表(hashtable)模塊
memcached源碼分析-slab存儲機制
1.前言
Memcached是一個高性能的開源分布式內存對象緩存系統(tǒng)卤妒,說到這里的"高性能"梨水、"緩存"那么不得不提memcached內存是如何管理的节视,這也是memecached核心內容,接下來主要分hashtable模塊篷就、slabs內存分配管理機制、LRU淘汰策略辖众。這一章節(jié)先行分析hashtable模塊楞件。memcached的哈希表是用來保存item*數(shù)據(jù)結構的一段固定大小的內存區(qū)域,這里主要分析的功能點有hash表的段鎖视粮、hash表的增刪改查细办、hash表的動態(tài)拓容。
2.hash表段鎖
memcached是多線程的實現(xiàn)的蕾殴,為了確保多線程操作hash表的安全性笑撞,代碼中使用了鎖。什么是段鎖钓觉,就是有可能會多個key對應一把鎖茴肥,而不是每一個key都對應一把鎖,因為不同的key可能hash出來的值一樣荡灾,那么就都會對應這一把鎖炉爆。
程序中在memcached_thread_init函數(shù)中初始化了鎖
void memcached_thread_init(int nthreads, void *arg) {
//....
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else if (nthreads <= 10) {
power = 13;
} else if (nthreads <= 20) {
power = 14;
} else {
/* 32k buckets. just under the hashpower default. */
power = 15;
}
//段鎖的槽大小要小于hash槽的大小
//個人理解power等于hashpower是可取的堕虹,可以減少鎖沖突,如果power大于hashpower是一種資源的浪費
if (power >= hashpower) {
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
exit(1);
}
//哈希鎖的數(shù)量計算
item_lock_count = hashsize(power);
item_lock_hashpower = power;
//申請hash鎖資源
//哈希鎖不會和hash表一樣進行拓容,如果后期hash拓容芬首,那么會導致越多的hash槽位對應同一把鎖
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror("Can't allocate item locks");
exit(1);
}
//初始化
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
}
鎖的使用實例
//鎖的使用
enum store_item_type store_item(item *item, int comm, conn* c) {
enum store_item_type ret;
uint32_t hv;
//根據(jù)item計算一個對應的hash value
hv = hash(ITEM_key(item), item->nkey);
//根據(jù)hash value 獲取對應的鎖資源,鎖hash槽
item_lock(hv);
//存儲item到hash槽鏈表上
ret = do_store_item(item, comm, c, hv);
//釋放鎖資源
item_unlock(hv);
return ret;
}
3.哈希表的增刪改查
hash表的增刪改查,說到hash表逼裆,我們先介紹一個memcached中的一個重要結構體item郁稍,item結構是用于存儲緩存數(shù)據(jù)的節(jié)點,item內存分配于slab(后面會做詳細分析),而hash表主要作用是記錄item節(jié)點胜宇,我們在后面可通過key快速查取到緩存數(shù)據(jù)
item節(jié)點結構
/**
* Structure for storing items within memcached.
*/
typedef struct _stritem {
/* Protected by LRU locks */
struct _stritem *next;
struct _stritem *prev;
/* Rest are protected by an item lock */
//h_next用于記錄哈希表槽中下一個item節(jié)點的地址
//多線程訪問hash槽節(jié)點耀怜,是通過item lock保證線程安全的
struct _stritem *h_next; /* hash chain next */
rel_time_t time; /* least recent access */
rel_time_t exptime; /* expire time */
int nbytes; /* size of data */
unsigned short refcount;
uint8_t nsuffix; /* length of flags-and-length string */
uint8_t it_flags; /* ITEM_* above */
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
/* this odd type prevents type-punning issues when we do
* the little shuffle to save space when not using CAS. */
union {
uint64_t cas;
char end;
} data[];
/* if it_flags & ITEM_CAS we have 8 bytes CAS */
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
哈希表的初始化
main函數(shù)中調用assoc_init實現(xiàn)hashtable的初始化工作
void assoc_init(const int hashtable_init) {
//根據(jù)hashtable_init計算哈希表的大小
if (hashtable_init) {
hashpower = hashtable_init;
}
//根據(jù)hashpower計算hash表一共有多少個桶槽
//例如hashpower = 16,那么通過hashsize計算可得桶槽大小為1<<16 = 65536
primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
}
STATS_LOCK();
stats_state.hash_power_level = hashpower;
stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
STATS_UNLOCK();
}
哈希表查找
查找過程:
1.根據(jù)key計算一個hash值hv
2.根據(jù)hv獲取段鎖(保證多線程安全訪問)
3.根據(jù)hv到哈希表中找到對應的桶槽
4.遍歷桶槽的的單向鏈表,比較key值桐愉,找到具體的item節(jié)點
item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
item *it;
uint32_t hv;
hv = hash(key, nkey);
//獲取鎖财破,保證多線程訪問hash表的安全性
item_lock(hv);
//根據(jù)key值,取hash表中查找item
it = do_item_get(key, nkey, hv, c, do_update);
//釋放鎖資源
item_unlock(hv);
return it;
}
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
//assoc_find就是具體的hash表查找
item *it = assoc_find(key, nkey, hv);
//...
}
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;
//expanding是用來判斷hash表是否正在拓容
//expanding == true說明hash表正在拓容
//先簡要說明一下拓容:hash表拓容就是分配一張更大的hash表从诲,然后將數(shù)據(jù)從頭到尾將舊的hash內容復制一份到新的hash表
//expand_bucket是標記hash表已經(jīng)拓容到哪個桶槽了
//比如舊的hash表old_hashtble一共65536個桶槽位左痢,已經(jīng)復制了10000個桶槽數(shù)據(jù)到新哈希表primary_hashtable,那么此時
//expand_bucket == 10000
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
//1.哈希表正在拓容
//2.hv & hashmask(hashpower - 1) 大于expand_bucket說明系洛,要查詢的桶槽在old_hashtable處
it = old_hashtable[oldbucket];
} else {
it = primary_hashtable[hv & hashmask(hashpower)];
}
item *ret = NULL;
int depth = 0;
//遍歷桶槽的單向鏈表
while (it) {
//比較具體的key值
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}
向hash表中插入一個新增的item
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;
//...
//這里的解析參考我們的assoc_find分析
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)]
primary_hashtable[hv & hashmask(hashpower)] = it;
}
}
從hash表中刪除一個item節(jié)點
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
//根據(jù)key,nkey,hv查找節(jié)點
//我們要查找key對應的item節(jié)點pnode
//prev->h_next指向pnode
//這里的before就是prev->h_next的變量地址,也就是pnode前驅item節(jié)點h_next指針變量的地址
item **before = _hashitem_before(key, nkey, hv);
if (*before) {
item *nxt;
/* The DTrace probe cannot be triggered as the last instruction
* due to possible tail-optimization by the compiler
*/
MEMCACHED_ASSOC_DELETE(key, nkey);
nxt = (*before)->h_next;
(*before)->h_next = 0; /* probably pointless, but whatever. */
*before = nxt;
return;
}
/* Note: we never actually get here. the callers don't delete things
they can't find. */
assert(*before != 0);
}
/* returns the address of the item pointer before the key. if *item == 0,
the item wasn't found */
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
item **pos;
unsigned int oldbucket;
//這里的解析參考我們的assoc_find分析
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
pos = &old_hashtable[oldbucket];
} else {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}
//比較key
while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}
3.哈希表的拓容
哈希表何時進行拓容呢俊性?memcached做法是向libevent中注冊了一個時鐘回調,定時的去檢測是否需要對哈希表進行拓容操作描扯。hash表的拓容過程則是由memcached單獨申請了一個線程進行拓容操作定页。
檢測拓容回調函數(shù)
int main(){
//...
/* initialise clock event */
clock_handler(0, 0, 0);
//....
}
/* libevent uses a monotonic clock when available for event scheduling. Aside
* from jitter, simply ticking our internal timer here is accurate enough.
* Note that users who are setting explicit dates for expiration times *must*
* ensure their clocks are correct before starting memcached. */
static void clock_handler(const int fd, const short which, void *arg) {
//...
// While we're here, check for hash table expansion.
// This function should be quick to avoid delaying the timer.
//判斷是否需要hash拓容
assoc_start_expand(stats_state.curr_items);
evtimer_set(&clockevent, clock_handler, 0);
event_base_set(main_base, &clockevent);
evtimer_add(&clockevent, &t);
//...
}
void assoc_start_expand(uint64_t curr_items) {
//正在拓容,返回
if (started_expanding)
return;
//curr_items是當前hash表中記錄的item節(jié)點的個數(shù)
//當curr_items大于當前hash表桶槽個數(shù)的1.5倍的時候绽诚,則需要拓容當前的hash表
if (curr_items > (hashsize(hashpower) * 3) / 2 &&
hashpower < HASHPOWER_MAX) {
started_expanding = true;
//信號量觸發(fā)
pthread_cond_signal(&maintenance_cond);
}
}
拓容線程
//hash的拓容memcached獨立的啟動了一個線程進行拓容操作
static void *assoc_maintenance_thread(void *arg) {
mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread) {
int ii = 0;
/* There is only one expansion thread, so no need to global lock. */
//expanding初始化為false典徊,當assoc_start_expand中信號量觸發(fā)將會觸發(fā)assoc_expand的執(zhí)行
//assoc_expand中對expanding置為true
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
unsigned int bucket;
void *item_lock = NULL;
/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
//expand_bucket變量含義解析我們可以參考assoc_find中的解釋
//當前桶槽嘗試加鎖
if ((item_lock = item_trylock(expand_bucket))) {
//for循環(huán)遍歷桶槽下的鏈表,將每個節(jié)點從old_hashtable遷移到primary_hashtable
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
old_hashtable[expand_bucket] = NULL;
//下一個桶槽
expand_bucket++;
//當expand_bucket == old_hashtable尺寸大小恩够,則節(jié)點轉移完畢
if (expand_bucket == hashsize(hashpower - 1)) {
//停止拓容
expanding = false;
free(old_hashtable);
STATS_LOCK();
stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
stats_state.hash_is_expanding = false;
STATS_UNLOCK();
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}
} else {
usleep(10*1000);
}
if (item_lock) {
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
//信號量未觸發(fā)卒落,當前線程將阻塞在這里
//當assoc_start_expand中執(zhí)行pthread_cond_signal,當前線程不在阻塞
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
//拓容前的準備
assoc_expand();
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}
/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
//副本
old_hashtable = primary_hashtable;
//新拓容的hash表
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
//assoc_maintenance_thread函數(shù)將依據(jù)該變量玫鸟,開始進行拓容操作
expanding = true;
expand_bucket = 0;
STATS_LOCK();
stats_state.hash_power_level = hashpower;
stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
stats_state.hash_is_expanding = true;
STATS_UNLOCK();
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}