介紹
Redis Zrange 是有序集合( SortedSet ) 提供的一個(gè)命令羡棵,可以返回有序集中指定區(qū)間內(nèi)的成員,而有序集合比較有用的一個(gè)功能就是 "范圍查找" 時(shí)間復(fù)雜度平均是 O[(LogN)+M] M是返回的元素個(gè)數(shù)嗅钻,有序集合底層是通過字典+跳躍表
的方式來實(shí)現(xiàn)的晾腔,我們這里只看這個(gè)跳躍表結(jié)構(gòu)如何實(shí)現(xiàn)范圍查找.
跳躍表
跳躍表可以理解成是可以二分查找的鏈表,因?yàn)樘S表是基于鏈表去實(shí)現(xiàn)的啊犬,而每個(gè)節(jié)點(diǎn)都會(huì)隨機(jī)一個(gè)層數(shù)灼擂,同時(shí)這個(gè)節(jié)點(diǎn)會(huì)根據(jù)當(dāng)前隨機(jī)出的level層數(shù)去創(chuàng)建一個(gè)level數(shù)組,而這個(gè)數(shù)組里面就是保存每一層的對應(yīng)的前驅(qū)指針觉至,所以每一個(gè)節(jié)點(diǎn)的level層數(shù)都不一樣剔应,這樣最后的結(jié)果就是有的節(jié)點(diǎn)在高層有的在低層,但是根據(jù)隨機(jī)算法設(shè)定高層的節(jié)點(diǎn)數(shù)永遠(yuǎn)小于低層语御,全部分散開從高到低形成跨度峻贮,每次查找某個(gè)值也是先在高level層去查找,因?yàn)楦遧evel層的節(jié)點(diǎn)永遠(yuǎn)是最少的且跨度最大的应闯,我們這里具體太詳細(xì)的概念就不做多說了纤控,可以詳細(xì)去了解下這個(gè) 數(shù)據(jù)結(jié)構(gòu)-跳躍表.
上面這張圖大概就是描述redis里面的一個(gè)跳躍表結(jié)構(gòu),score就是分?jǐn)?shù)也就是節(jié)點(diǎn)的value值碉纺,查找的時(shí)候就是利用每一層的跨度從高到低去比較value的大小最終確定結(jié)果船万,也可以理解成從高到低不斷的縮小查找范圍,跟二分查找有點(diǎn)類似骨田,這也就是我們上面說的跳躍表就是一個(gè)可以二分的鏈表耿导,但是可以看到除了score還有一個(gè)span字段,正常的跳躍表是沒有這個(gè)span字段的态贤,而作者為了可以實(shí)現(xiàn)范圍查找(分頁)擴(kuò)展了一個(gè)這個(gè)字段用來記錄從高到低每個(gè)節(jié)點(diǎn)距離下一個(gè)節(jié)點(diǎn)之間的跨度舱呻。
范圍查找(分頁) span 字段
首先因?yàn)槲覀円龅氖欠秶檎遥幌裆厦娴目梢园捶謹(jǐn)?shù)從高到低利用跨度去比較縮小范圍然后最后定位一個(gè)節(jié)點(diǎn)悠汽,我們要做的是 limit 0 10 -- limit 20 10 -- limit 100 10
這樣的效果箱吕,如果不加這個(gè)span字段那么就只能退化成從最低層節(jié)點(diǎn)開始的位置不斷的向后遍歷直到找到我們要的范圍起始位置節(jié)點(diǎn)芥驳,那么最終的時(shí)間復(fù)雜度也就是O(n),然而加上這個(gè)span字段之后茬高,也可以跟查找分?jǐn)?shù)的方式一樣從最高層利用span這個(gè)字段(記錄距離下一個(gè)節(jié)點(diǎn)的距離)晚树,去不斷的累加span值判斷是否小于等于我們的起始位置,如果不等于則判斷是否當(dāng)前就是我們的起始位置雅采,如果也不是那么就降下來一層繼續(xù)累加span,跟查找分?jǐn)?shù)的方式基本一樣.
舉個(gè)栗子:
查找 > zrange key 5,2
那么就從最高層開始計(jì)算慨亲,首先
level4 -- score:0-span:1 + score:10-span:4 == 5
level3 -- ....
level2 -- ....
這樣的話直接計(jì)算出來 score:20 是第5個(gè)節(jié)點(diǎn)婚瓜,那么就確認(rèn)了,也就代表直接定位到了第5個(gè)節(jié)點(diǎn)指針位置刑棵,那么最后就會(huì)在最低層以這個(gè)score:20節(jié)點(diǎn)指針作為開始位置不斷向后獲取2個(gè)節(jié)點(diǎn)然后返回結(jié)束.
所以使用 zrange 這個(gè)命令去范圍獲取,平均的時(shí)間復(fù)雜度就是 O[(LogN)+M],最壞的情況O(n)
下面這段代碼是從 redis 源碼 src/t_zset.c 里面抽出來的巴刻,功能就是創(chuàng)建一個(gè)跳表,為了生成的跳躍表跟上圖一致蛉签,我這里對代碼稍作修改了下胡陪,使每個(gè)節(jié)點(diǎn)層數(shù)及分布跟上圖完全一致,也就是每個(gè)score節(jié)點(diǎn)都固定了level層數(shù)碍舍,不讓它隨機(jī)分配柠座,同時(shí)也可以看下每插入一個(gè)score節(jié)點(diǎn)程序是如何算出span的.
#include <stdio.h>
#include <stdlib.h>
//設(shè)置跳表層數(shù)
#define ZSKIPLIST_MAXLEVEL 4
//跳表節(jié)點(diǎn)結(jié)構(gòu)體
typedef struct zskiplistNode {
double score;
//跳表節(jié)點(diǎn)中 存放的前驅(qū)節(jié)點(diǎn)
struct zskiplistNode *backward;
//保存每個(gè)節(jié)點(diǎn)中每一層指向的后繼節(jié)點(diǎn)數(shù)組
struct zskiplistLevel {
//存放每一層的節(jié)點(diǎn)指針
struct zskiplistNode *forward;
//當(dāng)前節(jié)點(diǎn)與后繼節(jié)點(diǎn)跨過的節(jié)點(diǎn)數(shù)量
unsigned int span;
} level[];
} zskiplistNode;
//跳表結(jié)構(gòu)體
typedef struct zskiplist {
//指向跳表節(jié)點(diǎn)的頭尾指針
struct zskiplistNode *header, *tail;
//跳表節(jié)點(diǎn)的個(gè)數(shù)
unsigned long length;
//當(dāng)前跳表節(jié)點(diǎn)最大層數(shù)
int level;
} zskiplist;
/* 創(chuàng)建跳表節(jié)點(diǎn) */
zskiplistNode *zslCreateNode(int level, double score) {
zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
return zn;
}
/* 初始化跳表-同時(shí)創(chuàng)建一個(gè)初始化頭節(jié)點(diǎn) */
zskiplist * zslCreate(void) {
int j;
zskiplist *zsl;
zsl = malloc(sizeof(*zsl));
//設(shè)置當(dāng)前默認(rèn)層高為1
zsl->level = 1;
//節(jié)點(diǎn)數(shù)量
zsl->length = 0;
//創(chuàng)建頭結(jié)點(diǎn)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0);
//遍歷頭節(jié)點(diǎn)的每一層,初始化值
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
//返回創(chuàng)建的跳表指針
return zsl;
}
/* 往跳表里面插入節(jié)點(diǎn) */
zskiplistNode *zslInsert(zskiplist *zsl, double score) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
//從高到低level層不斷地判斷找到當(dāng)前節(jié)點(diǎn)插入的位置
//注意:這塊我稍作修改了下片橡,跟源碼有點(diǎn)區(qū)別妈经,源碼除了判斷分?jǐn)?shù)還有其他的判斷條件.
// 而我這里只判斷了一個(gè)分?jǐn)?shù)
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward && x->level[i].forward->score < score) {
//記錄當(dāng)前插入的節(jié)點(diǎn)在每一層跳過的節(jié)點(diǎn)距離
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
//這塊是我固定寫死了,為了跟上圖匹配捧书,正常源碼這塊
//全部都是隨機(jī)生成的level
if (score == 20.0 || score == 10.0){
level = 4;
}
if(score == 15.0){
level = 2;
}
if(score == 17.0){
level = 1;
}
if(score == 18.0){
level = 3;
}
//level = ZSKIPLIST_MAXLEVEL;//zslRandomLevel();
//end...
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
//printf("%d-%d-%d-%d\n",rank[3],rank[2],rank[1],rank[0]);
//將當(dāng)前的節(jié)點(diǎn)插入到跳躍表每一層里面
x = zslCreateNode(level,score);
for (i = 0; i < level; i++) {
//更新每一層指針
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
//更新當(dāng)前節(jié)點(diǎn)和指向當(dāng)前節(jié)點(diǎn)(相當(dāng)于后驅(qū)節(jié)點(diǎn))的span值.
//更新當(dāng)前節(jié)點(diǎn)span
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//更新后驅(qū)節(jié)點(diǎn)span
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
//printf("%d--%d\n",x->level[i].span,update[i]->level[i].span);
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
//設(shè)置 后退指針
x->backward = (update[0] == zsl->header) ? NULL : update[0];
//關(guān)聯(lián)頭尾
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
//增加跳表長度
zsl->length++;
return x;
}
/* 隨機(jī)生成level層算法 */
int zslRandomLevel(void) {
int level = 1;
#define ZSKIPLIST_P 0.25
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
//入口
int main(){
//我們利用redis提供的跳表源碼吹泡,模擬插入一些節(jié)點(diǎn)
zskiplist* zsl = zslCreate();
zslInsert(zsl,10.0);
zslInsert(zsl,25.0);
zslInsert(zsl,15.0);
zslInsert(zsl,17.0);
zslInsert(zsl,18.0);
zslInsert(zsl,20.0);
//打印出跳表結(jié)構(gòu)
int i = 0;
zskiplistNode* p = zsl->header;
zskiplistNode* x = p;
for (i = zsl->level-1; i >= 0; i--) {
printf("level%d\t",i+1);
while (x->level[i].forward) {
printf("%.1f-%d\t",x->score,x->level[i].span);
x = x->level[i].forward;
}
printf("%.1f-%d\n",x->score,x->level[i].span);
x = p;
}
}
運(yùn)行結(jié)果-打印出跟上圖一樣的跳表結(jié)構(gòu)
上面的代碼如果有興趣可以直接拿下來修改調(diào)試,然后最后編譯運(yùn)行就可以了.
下面看下 zrange 命令如何去跳躍表-范圍查找的
//zrange 命令底層會(huì)調(diào)用此函數(shù)
void zrangeGenericCommand(redisClient *c, int reverse) {
//.....省略
// 處理ziplist編碼的情況
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
//.....
}
// 處理skiplist編碼的情況
else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
robj *ele;
//根據(jù)迭代對象取出第一個(gè)開始的迭代節(jié)點(diǎn)
if (reverse) {
ln = zsl->tail;
if (start > 0)
//通過傳入的起始位置快速去跳躍表定位這個(gè)起始位置節(jié)點(diǎn)指針
ln = zslGetElementByRank(zsl,llen-start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
// 依次迭代经瓷,取出相應(yīng)的元素
while(rangelen--) {
redisAssertWithInfo(c,zobj,ln != NULL);
// 取出元素值
ele = ln->obj;
addReplyBulk(c,ele);
if (withscores)
addReplyDouble(c,ln->score);
// 根據(jù)迭代方向去的前一個(gè)節(jié)點(diǎn)或者后一個(gè)節(jié)點(diǎn)
ln = reverse ? ln->backward : ln->level[0].forward;
}
}
//......省略
}
zslGetElementByRank() 范圍查找函數(shù)
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
// 記錄當(dāng)前的排位信息
unsigned long traversed = 0;
int i;
x = zsl->header;
//可以看到這塊代碼就是我們上面介紹的根據(jù)span去查找起始位置
//從高level到低level層利用每一層每一個(gè)節(jié)點(diǎn)的span跨度去累加判斷
for (i = zsl->level-1; i >= 0; i--) {
//不斷的累加span然后判斷是否小于等于我們的起始位置
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
//判斷是否傳入的起始位置
if (traversed == rank) {
return x;
}
//如果不等于則按當(dāng)前的節(jié)點(diǎn)位置降低level層數(shù)繼續(xù)查找
}
return NULL;
}
結(jié)束
通過以上代碼分析可以看出來redis有序集合的范圍查找性能還是不錯(cuò)的爆哑,而且還是全部基于內(nèi)存,所以感覺如果想高速分頁的話舆吮,可以把ID全部丟到這個(gè)有序集合里面揭朝,然后每次zrange去范圍獲取就行了.