跳表(跳躍表)是一種數(shù)據(jù)結構拣挪,改進自鏈表畦攘,用于存儲有序的數(shù)據(jù)届吁,跳躍表通過空間換時間的方法來提高數(shù)據(jù)檢索的速度。早些在學校的數(shù)據(jù)結構課程中并沒有接觸過跳表筋讨。第一次接觸是在了解Redis的有序集合的底層實現(xiàn)的時候尚揣,畢竟Redis的五種數(shù)據(jù)結構是面試中的常見題,即使沒有實際使用過按摘,也需要提前去了解一下。
但是最近在工作中不一樣了纫谅,由于是從事游戲開發(fā)炫贤,經(jīng)常會有排行榜的需求需要實現(xiàn),而這大多數(shù)通過跳表來實現(xiàn)付秕,所以有必要來深入了解一下跳表并簡略的閱讀一下ConcurrentSkipListMap
的源碼兰珍。前面也提到了Redis中的有序集合底層就是跳表,所以其也是開發(fā)排行榜功能時可以考慮使用的询吴。有時候呢掠河,我們需要排序的東西不需要持久化或者為了更高的效率,是直接在內(nèi)存中進行排序猛计,畢竟訪問redis是有網(wǎng)絡io開銷的唠摹,那么使用到的數(shù)據(jù)結構就是jdk中的ConcurrentSkipListMap
(支持同步)或者TreeMap
(不支持同步,性能更好).
不過這次源碼重點在于跳躍表奉瘤,不在于并發(fā)的控制勾拉。
跳躍表
通過上面的介紹,我們了解到跳躍表的應用場景大概是這樣的:
- 有序
- 頻繁插入刪除
- 頻繁的查找
對于有序來說盗温,數(shù)組和普通鏈表都可以通過排序算法來實現(xiàn)藕赞,在排序復雜度上不相上下。鏈表在插入和刪除上性能較好卖局,數(shù)組在查找上性能較好斧蜕,所以都不能滿足我們的要求。跳躍表則是在插入刪除和查找的性能上做了折中砚偶,復雜度為log(n)批销。
跳表結構如下所示:
為了更好的支持插入和刪除洒闸,所以采用鏈表的形式,可以看到圖片中最下面一行是一個有序的鏈表风钻。但如果只是一個單一的鏈表顷蟀,查找時復雜度為O(n),性能太差骡技。如何優(yōu)化呢鸣个?
在有序數(shù)組中,我們查找時用的是二分查找布朦,一次查找可以排除一半元素的遍歷囤萤。在數(shù)組中之所以可以用二分查找,是因為我們能夠快速的以O(1)的復雜度定位到中間的位置是趴,但是鏈表只能是O(N)涛舍。所以跳躍表采取空間換時間的方式,既然你找不到中間點唆途,或者三分之一點等中間位置富雅,那么我可以通過多增加一個節(jié)點來指向中間位置,這樣你也能夠快速的定位到中間的位置肛搬,然后一定程度的減少你遍歷元素的個數(shù)没佑,提高效率。圖中有多層温赔,相鄰的兩層蛤奢,采用的都是這樣的思想。
這個圖一目了然陶贼,很容易就可以讓大家了解跳表的思想啤贩。至于我們應該添加多少層額外的鏈表,給什么位置的節(jié)點添加索引才能更好的優(yōu)化檢索和插入的效率拜秧,就是我希望通過閱讀源碼找尋的.
節(jié)點
通過上圖痹屹,可以發(fā)現(xiàn)有兩種節(jié)點類型,第一種是最下層的枉氮,與普通鏈表相似志衍,第二種是除了最后一層以外的其他索引層節(jié)點,有兩個指針嘲恍。
但是在jdk源碼中還存在一種節(jié)點,是索引層的頭節(jié)點雄驹,還維護了其層數(shù)信息佃牛,下面先給出源碼注釋中的跳表樣例。
* Head nodes Index nodes
* +-+ right +-+ +-+
* |2|---------------->| |--------------------->| |->null
* +-+ +-+ +-+
* | down | |
* v v v
* +-+ +-+ +-+ +-+ +-+ +-+
* |1|----------->| |->| |------>| |----------->| |------>| |->null
* +-+ +-+ +-+ +-+ +-+ +-+
* v | | | | |
* Nodes next v v v v v
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
* | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
* +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+ +-+
*
接著我們來分別看下它們在源碼中是如何定義的医舆。
// 記住了俘侠,只關注跳表象缀,不去關注并發(fā)!R佟央星!水平有限,不誤導大家
// Node是最底層的鏈表的節(jié)點惫东,包括鍵值對和指向下一個節(jié)點的指針
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
// 至于為什么需要兩個構造函數(shù)莉给,后面源碼會有解釋
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
Node(Node<K,V> next) {
this.key = null;
this.value = this;
this.next = next;
}
// ...配套method
}
// 索引節(jié)點結構
// 存儲了兩個指針,分別指向右邊和下邊的節(jié)點
// 索引節(jié)點的value為鏈表節(jié)點
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
// ...配套method
}
// 索引層的頭節(jié)點結構
// 在索引節(jié)點的基礎上添加了表示層數(shù)的level變量
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
源碼閱讀小技巧
JDK中集合包下的代碼廉沮,一個集合差不多有幾千行颓遏,想要從上往下逐行看完是不現(xiàn)實的,需要能快速的定位到我們想看的方法滞时。如果是HashMap
這種常用的數(shù)據(jù)結構叁幢,我們經(jīng)常使用,對它有哪些方法非常的了解坪稽,就可以通過搜索方法名字來定位代碼的位置曼玩。但是像今天這種ConcurrentSkipListMap
比較不常用的類,在不了解它大多數(shù)方法的時候窒百,可以通過idea提供的功能幫我們快速定位黍判。打開下圖中的show members
就可以讓類現(xiàn)實它所有的方法和內(nèi)部類,就可以靠猜名字快速定位到源碼的位置贝咙。
[圖片上傳失敗...(image-61db2d-1568213866130)]
構造函數(shù)
了解完內(nèi)部鏈表節(jié)點的實現(xiàn)情況样悟,現(xiàn)在按照老規(guī)矩,從構造函數(shù)開始閱讀源碼庭猩。
// Compartor接口用來指定key的排序規(guī)則
public ConcurrentSkipListMap() {
this.comparator = null;
initialize();
}
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
this.comparator = comparator;
initialize();
}
// 還有兩個傳入map和sortedMap的構造函數(shù)
private void initialize() {
keySet = null;// 內(nèi)部類
entrySet = null;// 內(nèi)部類
values = null;// 內(nèi)部類
descendingMap = null;// 內(nèi)部類
// private static final Object BASE_HEADER = new Object();
// 從注釋給出的圖來看窟她,這個head應該是一直處于第一層的頭節(jié)點
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
構造函數(shù)好像沒啥,那么接著往下看吧
put()
// 與一般的map一樣蔼水,通過put插入鍵值對震糖,,key趴腋,value不能為空
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
// doPut
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // 要被添加的Node
if (key == null)
throw new NullPointerException();
// key的比較方法
Comparator<? super K> cmp = comparator;
outer: for (;;) { // 因為cas操作可能失敗吊说,套了個無限循環(huán)
// findPrecessor返回小于key但最接近key的節(jié)點,不存在則為鏈表的頭節(jié)點优炬,下面也會介紹
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// b--->n--->f
if (n != b.next)
break;
if ((v = n.value) == null) { // n is deleted
// 簡單來說就是b.next = f颁井,但是考慮了cas
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果key大于n.key
if ((c = cpr(cmp, key, n.key)) > 0) {
// 說明有并發(fā)插入,繼續(xù)向后遍歷一個節(jié)點
b = n;
n = f;
continue;
}
// 如果key相等
if (c == 0) {
// 沒有指定putIfAbsent的話蠢护,通過cas替換value并返回新的value
// 指定了putIfabsent則返回原有value值
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; //cas失敗雅宾,重試
}
// else c < 0;則位置正確,插入就行
}
// 創(chuàng)建新的Node節(jié)點
z = new Node<K,V>(key, value, n);
// 更新鏈表b->n->f ==> b->z->n->f
if (!b.casNext(n, z))
break;
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed();
// 0x80000001轉(zhuǎn)換為二進制1000...0001
// 看起來似乎添加層數(shù)是有一定隨機性的
// rnd為0xxx...xxx0時可以進入
if ((rnd & 0x80000001) == 0) {
int level = 1, max;
// 有多少個1葵硕,level遞增多少次
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) {
// 如果level比現(xiàn)在的層數(shù)小眉抬,則在新增加的節(jié)點z上
// 建立level個索引節(jié)點贯吓,忘記了可以回上面看看索引節(jié)點和其他節(jié)點區(qū)別
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else {
// 如果level大于層數(shù),則level設為層數(shù)+1
level = max + 1;
// 構造索引節(jié)點數(shù)組
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
//為新建的節(jié)點z創(chuàng)造level個索引節(jié)點
//下標從1開始
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
//用于判斷并發(fā)修改蜀变,不考慮
//正確情況下該分支的level>當前層數(shù)
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)
//為每層生成一個headIndex
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//更新最上層的headIndex指針
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
splice: for (int insertionLevel = level;;) {
int j = h.level;
// h為最新的頭節(jié)點
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
// r為前面的idxs[x]
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
// 沒并發(fā)的情況下悄谐,idxs數(shù)組里都是新增的key
// c應該=0
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
// j開始時為新跳表層數(shù)
//insertionLevel為舊跳表,經(jīng)過后面的幾個j--才會進入該分支
if (j == insertionLevel) {
//將t插入q,r之間
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
//向下一層
q = q.down;
r = q.right;
}
}
}
return null;
}
// findPredecessor
// 回想一下前面那個跳表的結構库北,該方法就是根據(jù)key爬舰,先從head往右找,然后往下找
//然后再往右再往下贤惯,知道找到比key小的節(jié)點
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException();
for (;;) {
for (Index<K,V> q = head, r = q.right, d;;) {
// 從head頭節(jié)點點向后遍歷
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
//說明被刪除了洼专,更新q.next
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 失敗了就從下一層開始
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
put函數(shù)介紹完了,最后一段更新跳表的操作還是有些亂和難以理解...
get()
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//findPredecessor找到離key最近的孵构,小于key的node
// 沒有并發(fā)的情況下屁商,要么是n,要么沒有要找的key
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
//key與n.key相同
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
//并發(fā)導致key>n.key颈墅,說明findPredecessor找的位置不對
//往后位移一個節(jié)點蜡镶,重新開始
b = n;
n = f;
}
}
return null;
}
get方法還是比較簡單的,findPrecessor
方法找到的就是小于key但是最接近key的節(jié)點恤筛,所以key如果存在官还,必然是findPrecessor
找到的節(jié)點的下一個節(jié)點,而代碼中為了考慮并發(fā)帶來的修改毒坛,還要做很多其他的判斷望伦。
remove()
接下來看下刪除的代碼。
public V remove(Object key) {
return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// findPrecessor找到比key小但是最近的node
//不考慮并發(fā)的話煎殷,如果存在key那就是n=b.next
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
//考慮一些并發(fā)修改的問題
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果key<n屯伞,說明key不存在,退出最外層循環(huán)
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
//c>0說明findPre方法找到的節(jié)點過期了豪直,重新找
if (c > 0) {
b = n;
n = f;
continue;
}
// c==0
if (value != null && !value.equals(v))
break outer;
//通過cas將節(jié)點n的value設為null劣摇,就是key對應的節(jié)點
//findPrecessor方法會在讀到value為null的值時進行刪除
if (!n.casValue(v, null))
break;
//n添加刪除標志并且更新b.next指針
//appendMarker創(chuàng)建一個key為null,value為自己的節(jié)點
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
//刪除成功并且更新b.next后進入該分支
//findPredecessor會在value==null時,更新next指針弓乙,實現(xiàn)刪除
findPredecessor(key, cmp); // clean index
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
findPredecessor()
到這里插入末融、讀取和刪除都介紹完了,這三個方法都調(diào)用了findPredecessor
暇韧,可以說這個方法是跳表的核心了勾习。所以這里拿出來再看一下該方法。
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException();
for (;;) {
//q為第一層的head節(jié)點
//r=q.right說明先向右遍歷
for (Index<K,V> q = head, r = q.right, d;;) {
//向右遍歷的時候懈玻,右邊的節(jié)點不為空
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
//在remove方法中巧婶,n.casValue(v, null)將節(jié)點的值置空
//置空后,在這里進行索引節(jié)點的刪除
if (n.value == null) {
//unlink操作為將原本的q->r->r.next
//轉(zhuǎn)換為q->r.next
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
//如果r的節(jié)點沒被刪除,就執(zhí)行到這里
//如果key大于當前的k粹舵,則向后移一個節(jié)點
//head索引節(jié)點也向后移一個節(jié)點
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
//如果索引節(jié)點的右邊沒有節(jié)點了,則向下移動
//向下移動后如果還是索引層骂倘,則繼續(xù)向后
//如果是節(jié)點眼滤,直接返回了
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
總結
本文介紹了跳表的特點和使用場景,并對ConcurrentSkipListMap
的源碼進行了簡略的分析历涝,但由于并發(fā)的存在诅需,使得在理解跳表上增加了一些不必要的難度,同時其生成索引層的代碼也比較晦澀難懂荧库,可以通過維基百科等更簡單的途徑來了解其原理和源碼堰塌。