簡介
redisDb作為整個redis緩存存儲的核心。保存著我們客戶端所有的需要的緩存數據吝梅。來一起了解下投储。
數據結構
typedef struct redisDb {
dict *dict; /* The keyspace for this DB *///保持數據的dict
dict *expires; /* Timeout of keys with a timeout set *///保持key的過期信息
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*///一些同步的keys
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
初始化
初始化在initServer里面
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
我們也需要關注下創(chuàng)建各個dict的時候的dicttpe,這塊對內存那塊理解需要
/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictObjectDestructor /* val destructor */
};
/* Db->expires */
dictType keyptrDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL /* val destructor */
};
/* Keylist hash table type has unencoded redis objects as keys and
* lists as values. It's used for blocking operations (BLPOP) and to
* map swapped keys to a list of clients waiting for this keys to be loaded. */
dictType keylistDictType = {
dictObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictObjKeyCompare, /* key compare */
dictObjectDestructor, /* key destructor */
dictListDestructor /* val destructor */
};
/* Generic hash table type where keys are Redis Objects, Values
* dummy pointers. */
dictType objectKeyPointerValueDictType = {
dictEncObjHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
dictObjectDestructor, /* key destructor */
NULL /* val destructor */
};
基本操作
查詢
對于我們的db來說提供的功能就是查詢昧港,插入崖技,覆蓋等基本操作逻住。在db層其實并不關心對于插入數據的type,在db看來每一個插入的數據都是redisobject.這樣也可以統(tǒng)一進行管理钟哥。一般是在數據成功被查詢出來之后。后面的 邏輯自己去做類型判斷瞎访。
redis對每一個鍵提供了Expire功能腻贰。因為提供了這個過期機制。所以我們的查詢肯定就是都會需要來判斷這個鍵值對有米有過期装诡。米有過期的數據才是有效的數據银受。
Expire
Expire定義在struct redisDb里面定義為dict *expires;所以可以看出來這就是一個簡單的hashtable來保持了一份過期信息
setExpire
1.首先必須保證我們的dict里面設置的key存在
2.查詢出原來保存expire的信息践盼。沒有的話就創(chuàng)建一個信息
3.設置新的expire的信息
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr);//首先查詢一下這個我們設置的這個key在dict里面是否存在鸦采。不存在設置個expire就感覺是搞笑卅
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde));//有的話返回原來保持的過期信息沒有就創(chuàng)建一個新的
dictSetSignedIntegerVal(de,when);//設置val為過期時間
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
getExpire
這個方法其實就是直接讀取hashtable的保存的信息
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key->ptr)) == NULL) return -1;//在expires里面查找 key
/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
return dictGetSignedIntegerVal(de);//獲取integerval
}
removeExpire
刪除的時候有個強制要求就是你調用這個方法代表這個過期時間必須存在
int removeExpire(redisDb *db, robj *key) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);//必須存在這個key的過期信息
return dictDelete(db->expires,key->ptr) == DICT_OK;//調用刪除 因為只是一個數字信息不存在什么異步的情況
}
expireIfNeeded
這個方法主要是檢查過期時間,如果過期的話就刪除咕幻。
如果是loading狀態(tài)渔伯,就不管過不過期啥的
如果是script的訪問。他是使用lua_time_start來計算是否過期肄程。
如果是子節(jié)點锣吼,過期了也不管。只管返回是不是過期就行了蓝厌。否則的話就需要做刪除操作玄叠。
最后通知salve和aof文件
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);//獲取expire
mstime_t now;
if (when < 0) return 0; /* No expire for this key */ //小于0代表沒有這個key的expire信息
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;// server.loading 不能exipre
/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();//lua將使用lua開始的時間作為now 防止用著用著被干掉了
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when; //作為是一個子節(jié)點就只做不叫不做更改 now>when 返回1 now<when 返回0
/* Return when this key has not expired */
if (now <= when) return 0; //還沒有過期返回0
/* Delete the key */
server.stat_expiredkeys++; //更新stat_expiredkeys
propagateExpire(db,key,server.lazyfree_lazy_expire);//把消息發(fā)送到aof file和salve
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id); //notify
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : //同步或者異步刪除
dbSyncDelete(db,key);
}
expire就到這里。不過這里的話都是被動刪除拓提。當我們的內存過期的時候读恃。我們需要把它釋放掉。不釋放的話代态,會占著內存寺惫。我們這里看到的是我們被動的查詢的時候來做的刪除。所以程序里面還會有一個主動刪除的機制蹦疑。會在后面寫西雀。
查詢
lookupKey
這個方法屬于最低級的查詢api了,他提供查詢功能歉摧。并且根據標志位來確定是不是更新LFU或者lru
需要注意的是這個方法沒有判斷是不是expire艇肴,所以需要配合expireIfNeeded來用的
robj *lookupKey(redisDb *db, robj *key, int flags) { //查詢key
dictEntry *de = dictFind(db->dict,key->ptr);//調用dict的查詢方法
if (de) {//如果存在
robj *val = dictGetVal(de);//獲取value
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.rdb_child_pid == -1 && //看下有沒有saving 進程和是否需要更改lru
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}
lookupKeyReadWithFlags
這個函數因為加了expire操作,所以有點特別叁温。因為為了主從一致性再悼,在salve里面即使發(fā)現過期key,也是不會刪除的券盅。這個刪除需要master主動來通知帮哈。所以在遇到查詢的時候,如果不是master的鏈接锰镀,或者這個請求只是個readonly的請求娘侍】校可以安全的返回個null沒啥毛病。但是當是masetr的請求并且不是readonly的請求憾筏,就需要原樣的返回嚎杨。比如這個請求就是來刪除他的喃,你給他返回不存在氧腰,不科學卅枫浙。
在最后的時候更新miss和hit
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) { //如果這個玩意存在 并且被刪除了
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) return NULL;
//進入的情況就是就是這個玩意是slave 他不得去刪除這個key的
/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
*
* However, if the command caller is not the master, and as additional
* safety measure, the command invoked is a read-only command, we can
* safely return NULL here, and provide a more consistent behavior
* to clients accessign expired values in a read-only fashion, that
* will say the key as non exisitng.
*
* Notably this covers GETs when slaves are used to scale reads. */
//對于非master 或者不readonly的commnad請求可以安全的返回null
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY) //這個commnad 不是master 并且不是readonly就給他返回null
{
return NULL;
} //是master 的非readonly 肯定就要干事情
}
val = lookupKey(db,key,flags);//查
if (val == NULL)//更新命中和未命中
server.stat_keyspace_misses++;
else
server.stat_keyspace_hits++;
return val;
}
其他幾個查詢函數
robj *lookupKeyRead(redisDb *db, robj *key) {//這個值是判斷有米有
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
expireIfNeeded(db,key);//write 先進行expire
return lookupKey(db,key,LOOKUP_NONE);
}
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) addReply(c,reply);
return o;
}
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyWrite(c->db, key);
if (!o) addReply(c,reply);
return o;
}
添加覆蓋
添加或者覆蓋總的說來就是set。這些函數必須的注意的就是古拴,調用的時候必須要保證是添加箩帚,還到底是覆蓋。必須有明確原來key是否存在
void dbAdd(redisDb *db, robj *key, robj *val) {//增加
sds copy = sdsdup(key->ptr);//copy key
int retval = dictAdd(db->dict, copy, val);//dict add
serverAssertWithInfo(NULL,key,retval == DICT_OK);//程序會被終止在這個keyexist
if (val->type == OBJ_LIST) signalListAsReady(db, key);// 更新ready
if (server.cluster_enabled) slotToKeyAdd(key);
}
void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);//必須找到
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
robj *old = dictGetVal(de);
int saved_lru = old->lru;
dictReplace(db->dict, key->ptr, val);//replace
val->lru = saved_lru;
/* LFU should be not only copied but also updated
* when a key is overwritten. */
updateLFU(val);
} else {
dictReplace(db->dict, key->ptr, val);// replace
}
}
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) { //判斷存在不
dbAdd(db,key,val);//直接add
} else {
dbOverwrite(db,key,val); //覆蓋寫
}
incrRefCount(val);//增加引用
removeExpire(db,key);//移除expire
signalModifiedKey(db,key);//通知修改
}
刪除
對于后面版本的redis提供了異步刪除的工作黄痪。這個異步刪除和我們前面異步刪除key value沒什么本質上的區(qū)別
dbSyncDelete
這個玩意很簡單直接干就行了
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//直接刪除刪除expire
if (dictDelete(db->dict,key->ptr) == DICT_OK) {//直接刪除dict
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
dbSyncDelete
這個函數就是判斷下value的長度紧帕,看下是不是符合異步刪除的條件這個條件的定義是#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//刪除expire
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->dict,key->ptr);//從dict里面移除 但是不釋放內存
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);//判斷value的長度
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
* Note that if the object is shared, to reclaim it now it is not
* possible. This rarely happens, however sometimes the implementation
* of parts of the Redis core may call incrRefCount() to protect
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount(). */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {//這個個數還是有點多 并且沒有其他引用了
atomicIncr(lazyfree_objects,1);//增加lazyfree_objects
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);//添加異步任務
dictSetVal(db->dict,de,NULL);//設置val 為null
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);//釋放dictEntry 其中如果val沒有被異步釋放也會在這里釋放
if (server.cluster_enabled) slotToKeyDel(key);//slot
return 1;
} else {
return 0;
}
}
dbDelete
就是調用刪除
int dbDelete(redisDb *db, robj *key) {
return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
清空數據庫
清空數據庫其實和刪除沒啥區(qū)別。就是分一個同步刪除和異步刪除桅打。本質上就是清空hashtable
emptyDbAsync
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires;//取出expires 和 dict
db->dict = dictCreate(&dbDictType,NULL);// 清空的時候把原來的重新創(chuàng)建過
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);//創(chuàng)建任務
}
emptyDb
清空的時候-1代表全部是嗜。其他的代表具體的db
long long emptyDb(int dbnum, int flags, void(callback)(void*)) { //清空數據庫
int j, async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) {// -1 代表全部 其他代表 清空的id
errno = EINVAL;
return -1;
}
for (j = 0; j < server.dbnum; j++) {
if (dbnum != -1 && dbnum != j) continue;
removed += dictSize(server.db[j].dict);
if (async) {
emptyDbAsync(&server.db[j]);//異步清空
} else {
dictEmpty(server.db[j].dict,callback);//清空dict
dictEmpty(server.db[j].expires,callback);//清空expire
}
}
if (server.cluster_enabled) {
if (async) {
slotToKeyFlushAsync();
} else {
slotToKeyFlush();
}
}
if (dbnum == -1) flushSlaveKeysWithExpireList();
return removed;
}
總結
我們粗略的看了下redisDb⊥ξ玻看了dict和expires鹅搪。
我們可以發(fā)現其實在db層面上是完全沒有關心存儲的類型的。所以我們的對于類型方面的東西需要在上層處理遭铺。
db還有幾部分沒有關注到的blocking_key這些和slot這些丽柿。這里我會在后面的對應的特性去關注。現在的關注點就是把它看成一個單純的kv緩存掂僵,米有什么主從航厚,集群啥的。