1. ConcurrentSkipListMap 簡(jiǎn)介
ConcurrentSkipListMap 一個(gè)并發(fā)安全, 基于 skip list 實(shí)現(xiàn)有序存儲(chǔ)的Map. OK 我們回憶一下 Java 中常用 的Map HashMap, TreeMap, ConcurrentHashMap, 還有就是我們今天的主角 ConcurrentSkipListMap.
ConcurrentSkipListMap 與 ConcurrentHashMap 同在 concurrent 包下, 雖然 ConcurrentSkipListMap 比 ConcurrentHashMap 多用存儲(chǔ)空間(用空間換時(shí)間), 但它有著ConcurrentHashMap不能比擬的優(yōu)點(diǎn): 有序數(shù)據(jù)存儲(chǔ), 基于的就是 skip list.
2. 跳表(skip list)
跳躍列表(skip list) 是一種隨機(jī)話的數(shù)據(jù)結(jié)構(gòu), 基于上下左右都是鏈表的數(shù)據(jù)結(jié)構(gòu), 其效率可以比擬 二叉查找樹; 基本上 skip list 是在鏈表的上層增加多層索引鏈表(增加是隨機(jī)的)實(shí)現(xiàn)的, 所以查找中根據(jù)索引層進(jìn)行跳躍快速查找, 因此得名;
如圖:
上面這張圖來自 wiki, 圖中有添加的過程, 但是結(jié)構(gòu)沒有細(xì)節(jié), 我結(jié)合自己的理解, 也畫了一張: (圖片右擊在新的窗口中看, 簡(jiǎn)書的圖片預(yù)覽不能真實(shí)展現(xiàn)圖, 暈!)
從這張圖中我們可以得出 ConcurrentSkipListMap 的幾個(gè)特點(diǎn):
- ConcurrentSkipListMap 的節(jié)點(diǎn)主要由 Node, Index, HeadIndex 構(gòu)成;
- ConcurrentSkipListMap 的數(shù)據(jù)結(jié)構(gòu)橫向縱向都是鏈表
- 最下面那層鏈表是Node層(數(shù)據(jù)節(jié)點(diǎn)層), 上面幾層都是Index層(索引)
- 從縱向鏈表來看, 最左邊的是 HeadIndex 層, 右邊的都是Index 層, 且每層的最底端都是對(duì)應(yīng)Node, 縱向上的索引都是指向最底端的Node(這一點(diǎn)圖片為了美觀就沒體現(xiàn))
了解了數(shù)據(jù)結(jié)構(gòu), 我們直接來看源碼
2. 內(nèi)部節(jié)點(diǎn)類 Node
static final class Node<K, V>{
final K key; // key 是 final 的, 說明節(jié)點(diǎn)一旦定下來, 除了刪除, 不然不會(huì)改動(dòng) key 了
volatile Object value; // 對(duì)應(yīng)的 value
volatile Node<K, V> next; // 下一個(gè)節(jié)點(diǎn)
// 構(gòu)造函數(shù)
public Node(K key, Object value, Node<K, V> next) {
this.key = key;
this.value = value;
this.next = next;
}
/**
* 創(chuàng)建一個(gè)標(biāo)記節(jié)點(diǎn)(通過 this.value = this 來標(biāo)記)
* 這個(gè)標(biāo)記節(jié)點(diǎn)非常重要: 有了它, 就能對(duì)鏈表中間節(jié)點(diǎn)進(jìn)行同時(shí)刪除了插入
* ps: ConcurrentLinkedQueue 只能在頭上, 尾端進(jìn)行插入, 中間進(jìn)行刪除
*/
public Node(Node<K, V> next) {
this.key = null;
this.value = this;
this.next = next;
}
/**
* CAS 操作設(shè)置 Value
*/
boolean casValue(Object cmp, Object val){
return unsafe.compareAndSwapObject(this, valueOffset, cmp, val);
}
/**
* CAS 操作設(shè)置 next
*/
boolean casNext(Node<K, V> cmp, Node<K, V> val){
return unsafe.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 檢測(cè)是否為標(biāo)記節(jié)點(diǎn)
*/
boolean isMarker(){
return value == this;
}
/**
* 檢測(cè)是否為 鏈表最左下角的 BASE_HEADER 節(jié)點(diǎn)
*/
boolean isBaseHeader(){
return value == BASE_HEADER;
}
/**
* 對(duì)節(jié)點(diǎn)追加一個(gè)標(biāo)記節(jié)點(diǎn), 為最終的刪除做準(zhǔn)備
*/
boolean appendMarker(Node<K, V> f){
return casNext(f, new Node<K, V>(f));
}
/**
* Help out a deletion by appending marker or unlinking from
* predecessor. This called during traversals when value
* field seen to be null
*
* helpDelete 方法, 這個(gè)方法要么追加一個(gè)標(biāo)記節(jié)點(diǎn), 要么進(jìn)行刪除操作
*/
void helpDelete(Node<K, V> b, Node<K, V> f){
/**
* Rechecking links and then doing only one of the
* help-out stages per call tends to minimize CAS
* interference among helping threads
*/
if(f == next && this == b.next){
if(f == null || f.value != f){ // 還沒有對(duì)刪除的節(jié)點(diǎn)進(jìn)行節(jié)點(diǎn) marker
casNext(f, new Node<K, V>(f));
}else{
b.casNext(this, f.next); // 刪除 節(jié)點(diǎn) b 與 f.next 之間的節(jié)點(diǎn)
}
}
}
/**
* 校驗(yàn)數(shù)據(jù)
*/
V getValidValue(){
Object v = value;
if(v == this || v == BASE_HEADER){
return null;
}
V vv = (V)v;
return vv;
}
/**
* Creates and returns a new SimpleImmutableEntry holding current
* mapping if this node holds a valid value, else null.
*
* @return new entry or null
*/
AbstractMap.SimpleImmutableEntry<K, V> createSnapshot(){
Object v = value;
if(v == null || v == this || v == BASE_HEADER){
return null;
}
V vv = (V) v;
return new AbstractMap.SimpleImmutableEntry<K, V>(key, vv);
}
// UNSAFE mechanics
private static final Unsafe unsafe;
private static final long valueOffset;
private static final long nextOffset;
static {
try {
unsafe = UnSafeClass.getInstance();
Class<?> k = Node.class;
valueOffset = unsafe.objectFieldOffset(k.getDeclaredField("value"));
nextOffset = unsafe.objectFieldOffset(k.getDeclaredField("next"));
}catch (Exception e){
throw new Error(e);
}
}
}
從上面的代碼中我們大體知道 Node 有以下幾個(gè)特點(diǎn):
- this.value = this 的標(biāo)記節(jié)點(diǎn)
- 刪除分為2步, 把 non-null 變成 null, 在進(jìn)行添加標(biāo)記節(jié)點(diǎn) (helpDelete方法中)
3. 索引節(jié)點(diǎn) Index
static class Index<K, V>{
final Node<K, V> node; // 索引指向的節(jié)點(diǎn), 縱向上所有索引指向鏈表最下面的節(jié)點(diǎn)
final Index<K, V> down; // 下邊level層的 Index
volatile Index<K, V> right; // 右邊的 Index
/**
* Creates index node with given values
* @param node
* @param down
* @param right
*/
public Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
this.node = node;
this.down = down;
this.right = right;
}
/**
* compareAndSet right field
* @param cmp
* @param val
* @return
*/
final boolean casRight(Index<K, V> cmp, Index<K, V> val){
return unsafe.compareAndSwapObject(this, rightOffset, cmp, val);
}
/**
* Returns true if the node this indexes has been deleted.
* @return true if indexed node is known to be deleted
*/
final boolean indexesDeletedNode(){
return node.value == null;
}
/**
* Tries to CAS newSucc as successor. To minimize races with
* unlink that may lose this index node, if the node being
* indexed is known to be deleted, it doesn't try to link in
*
* @param succ the expecteccurrent successor
* @param newSucc the new successor
* @return true if successful
*/
/**
* 在 index 本身 和 succ 之間插入一個(gè)新的節(jié)點(diǎn) newSucc
* @param succ
* @param newSucc
* @return
*/
final boolean link(Index<K, V> succ, Index<K, V> newSucc){
Node<K, V> n = node;
newSucc.right = succ;
return n.value != null && casRight(succ, newSucc);
}
/**
* Tries to CAS field to skip over apparent successor
* succ. Fails (forcing a retravesal by caller) if this node
* is known to be deleted
* @param succ the expected current successor
* @return true if successful
*/
/**
* 將當(dāng)前的節(jié)點(diǎn) index 設(shè)置其的 right 為 succ.right 等于刪除 succ 節(jié)點(diǎn)
* @param succ
* @return
*/
final boolean unlink(Index<K, V> succ){
return node.value != null && casRight(succ, succ.right);
}
// Unsafe mechanics
private static final Unsafe unsafe;
private static final long rightOffset;
static {
try{
unsafe = UnSafeClass.getInstance();
Class<?> k = Index.class;
rightOffset = unsafe.objectFieldOffset(k.getDeclaredField("right"));
}catch (Exception e){
throw new Error(e);
}
Index 特點(diǎn):
- 縱向鏈表都指向同個(gè) Node
- Index 有兩個(gè)表 常用的方法 link, unlink 方法, 后邊會(huì)提及到的
4. 頭索引節(jié)點(diǎn) HeadIndex
/**
* Nodes heading each level keep track of their level.
*/
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
HeadIndex 沒什么特別, 只是增加一個(gè) level 屬性用來標(biāo)示索引層級(jí); 注意所有的 HeadIndex 都指向同一個(gè) Base_header 節(jié)點(diǎn);
OK, 下面我們來看看 ConcurrentSkipListMap 的主要方法 doPut, doGet, doRemove
5. 數(shù)據(jù)增加方法 doPut()
/**
* Main insetion method. Adds element if not present, or
* replaces value if present and onlyIfAbsent is false.
*
* @param key the key
* @param value the values that must be associated with key
* @param onlyIfAbstsent if should not insert if already present
* @return the old value, or null if newly inserted
*/
private V doPut(K key, V value, boolean onlyIfAbstsent){
Node<K, V> z; // adde node
if(key == null){
throw new NullPointerException();
}
Comparator<? super K> cmp = comparator;
outer:
for(;;){
// 0.
for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 將 key 對(duì)應(yīng)的前繼節(jié)點(diǎn)找到, b 為前繼節(jié)點(diǎn), n是前繼節(jié)點(diǎn)的next, 若沒發(fā)生 條件競(jìng)爭(zhēng), 最終 key在 b 與 n 之間 (找到的b在 base_level 上)
if(n != null){ // 2. n = null時(shí) b 是鏈表的最后一個(gè)節(jié)點(diǎn), key 直接插到 b 之后 (調(diào)用 b.casNext(n, z))
Object v; int c;
Node<K, V> f = n.next; // 3 獲取 n 的右節(jié)點(diǎn)
if(n != b.next){ // 4. 條件競(jìng)爭(zhēng)(另外一個(gè)線程在b之后插入節(jié)點(diǎn), 或直接刪除節(jié)點(diǎn)n), 則 break 到位置 0, 重新
break ;
}
if((v = n.value) == null){ // 4. 若 節(jié)點(diǎn)n已經(jīng)刪除, 則 調(diào)用 helpDelete 進(jìn)行幫助刪除 (詳情見 helpDelete), 則 break 到位置 0, 重新來
n.helpDelete(b, f);
break ;
}
if(b.value == null || v == n){ // 5. 節(jié)點(diǎn)b被刪除中 ,則 break 到位置 0, 調(diào)用 findPredecessor 幫助刪除 index 層的數(shù)據(jù), 至于 node 層的數(shù)據(jù) 會(huì)通過 helpDelete 方法進(jìn)行刪除
break ;
}
if((c = cpr(cmp, key, n.key)) > 0){ // 6. 若 key 真的 > n.key (在調(diào)用 findPredecessor 時(shí)是成立的), 則進(jìn)行 向后走
b = n;
n = f;
continue ;
}
if(c == 0){ // 7. 直接進(jìn)行賦值
if(onlyIfAbstsent || n.casValue(v, value)){
V vv = (V) v;
return vv;
}
break ; // 8. cas 競(jìng)爭(zhēng)條件失敗 重來
}
// else c < 0; fall through
}
// 9. 到這邊時(shí) n.key > key > b.key
z = new Node<K, V> (key, value, n);
if(!b.casNext(n, z)){
break ; // 10. cas競(jìng)爭(zhēng)條件失敗 重來
}
break outer; // 11. 注意 這里 break outer 后, 上面的 for循環(huán)不會(huì)再執(zhí)行, 而后執(zhí)行下面的代碼, 這里是break 不是 continue outer, 這兩者的效果是不一樣的
}
}
int rnd = KThreadLocalRandom.nextSecondarySeed();
if((rnd & 0x80000001) == 0){ // 12. 判斷是否需要添加level
int level = 1, max;
while(((rnd >>>= 1) & 1) != 0){
++level;
}
// 13. 上面這段代碼是獲取 level 的, 我們這里只需要知道獲取 level 就可以 (50%的幾率返回0尉剩,25%的幾率返回1,12.5%的幾率返回2...最大返回31毅臊。)
Index<K, V> idx = null;
HeadIndex<K, V> h = head;
if(level <= (max = h.level)){ // 14. 初始化 max 的值, 若 level 小于 max , 則進(jìn)入這段代碼 (level 是 1-31 之間的隨機(jī)數(shù))
for(int i = 1; i <= level; ++i){
idx = new Index<K, V>(z, idx, null); // 15 添加 z 對(duì)應(yīng)的 index 數(shù)據(jù), 并將它們組成一個(gè)上下的鏈表(index層是上下左右都是鏈表)
}
}
else{ // 16. 若 level > max 則只增加一層 index 索引層
level = max + 1; // 17. 跳表新的 level 產(chǎn)生
Index<K, V>[] idxs = (Index<K, V>[])new Index<?, ?>[level + 1];
for(int i = 1; i <= level; ++i){
idxs[i] = idx = new Index<K, V>(z, idx, null);
}
for(;;){
h = head;
int oldLevel = h.level; // 18. 獲取老的 level 層
if(level <= oldLevel){ // 19. 另外的線程進(jìn)行了index 層增加操作, 所以 不需要增加 HeadIndex 層數(shù)
break;
}
HeadIndex<K, V> newh = h;
Node<K, V> oldbase = h.node; // 20. 這里的 oldbase 就是BASE_HEADER
for(int j = oldLevel+1; j <= level; ++j){ // 21. 這里其實(shí)就是增加一層的 HeadIndex (level = max + 1)
newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j); // 22. idxs[j] 就是上面的 idxs中的最高層的索引
}
if(casHead(h, newh)){ // 23. 這只新的 headIndex
h = newh; // 24. 這里的 h 變成了 new HeadIndex
idx = idxs[level = oldLevel]; // 25. 這里的 idx 上從上往下第二層的 index 節(jié)點(diǎn) level 也變成的 第二
break;
}
}
}
// find insertion points and splice in
splice:
for(int insertionLevel = level;;){ // 26. 這時(shí)的 level 已經(jīng)是 第二高的 level(若上面 步驟19 條件競(jìng)爭(zhēng)失敗, 則多出的 index 層其實(shí)是無用的, 因?yàn)?那是 調(diào)用 Index.right 是找不到它的)
int j = h.level;
for(Index<K, V> q = h, r = q.right, t = idx;;){ // 27. 初始化對(duì)應(yīng)的數(shù)據(jù)
if(q == null || t == null){ // 28. 節(jié)點(diǎn)都被刪除 直接 break出去
break splice;
}
if(r != null){
Node<K, V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if(n.value == null){ // 29. 老步驟, 幫助index 的刪除
if(!q.unlink(r)){
break ;
}
r = q.right; // 30. 向右進(jìn)行遍歷
continue ;
}
if(c > 0){ // 31. 向右進(jìn)行遍歷
q = r;
r = r.right;
continue ;
}
}
// 32.
// 代碼運(yùn)行到這里, 說明 key < n.key
// 第一次運(yùn)行到這邊時(shí), j 是最新的 HeadIndex 的level j > insertionLevel 非常用可能, 而下面又有 --j, 所以終會(huì)到 j == insertionLevel
if(j == insertionLevel){
if(!q.link(r, t)){ // 33. 將 index t 加到 q 與 r 中間, 若條件競(jìng)爭(zhēng)失敗的話就重試
break ; // restrt
}
if(t.node.value == null){ // 34. 若這時(shí) node 被刪除, 則開始通過 findPredecessor 清理 index 層, findNode 清理 node 層, 之后直接 break 出去, doPut調(diào)用結(jié)束
findNode(key);
break splice;
}
if(--insertionLevel == 0){ // 35. index 層添加OK理茎, --1 為下層插入 index 做準(zhǔn)備
break splice;
}
}
/**
* 下面這行代碼其實(shí)是最重要的, 理解這行代碼, 那 doPut 就差不多了
* 1). --j 要知道 j 是 newhead 的level, 一開始一定 > insertionLevel的, 通過 --1 來為下層操作做準(zhǔn)備 (j 是 headIndex 的level)
* 2). 通過 19. 21, 22 步驟, 個(gè)人認(rèn)為 --j >= insertionLevel 是橫成立, 而 --j 是必須要做的
* 3) j 經(jīng)過幾次--1管嬉, 當(dāng)出現(xiàn) j < level 時(shí)說明 (j+1) 層的 index已經(jīng)添加成功, 所以處理下層的 index
*/
if(--j >= insertionLevel && j < level){
t = t.down;
}
/** 到這里時(shí), 其實(shí)有兩種情況
* 1) 還沒有一次index 層的數(shù)據(jù)插入
* 2) 已經(jīng)進(jìn)行 index 層的數(shù)據(jù)插入, 現(xiàn)在為下一層的插入做準(zhǔn)備
*/
q = q.down; // 從 index 層向下進(jìn)行查找
r = q.right;
}
}
}
return null;
}
doPut方法分析:
整個(gè)doPut方法看起來有點(diǎn)嚇人, 但沒事,我們將這個(gè)方法進(jìn)行分割:
- 獲取前繼節(jié)點(diǎn)(步驟1 findPredecessor), 進(jìn)行節(jié)點(diǎn)的插入 (步驟 10 b.casNext(n, z))
- 準(zhǔn)備 Index 層的數(shù)據(jù) (idxs[i] = idx = new Index<K, V>(z, idx, null)), 準(zhǔn)備 HeadIndex的數(shù)據(jù), 進(jìn)行插入(步驟 23)
- 循環(huán)遍歷Index層, 進(jìn)行索引層的插入(步驟 33 q.link(r, t))
還迷糊, 沒事, 我們繼續(xù)細(xì)分拆解講:
6. doPut() 增加數(shù)據(jù)過程
6.1. ConcurrentSkipListMap在新建時(shí)的初始狀態(tài)如圖:
初始時(shí), 只存在 HeadIndex 和 Base_Header 節(jié)點(diǎn)
6.2. 進(jìn)行節(jié)點(diǎn)添加
6.2.1. 添加 key=1, value = A 節(jié)點(diǎn), 結(jié)果如圖:
涉及doPut操作:
1. doPut步驟1 尋找前繼節(jié)點(diǎn), 這時(shí)返回的 b = BaseHeader, n = null, 所以直接到 doPut 步驟 9
2. doPut步驟9, 直接 CAS操作設(shè)置next節(jié)點(diǎn)
3. 這里假設(shè) 步驟 12 中獲取的 level 是0(要知道獲得0的概率是很大的, 這個(gè)函數(shù)返回的最大值也就31, 也就是說, 最多有31層的索引)
4. 所以這時(shí) idx = null, 直接到步驟 28 break 出去皂林,操作結(jié)束
這里為了理解上的便利, 我們?cè)偬砑右粋€(gè)節(jié)點(diǎn), 最終效果圖如下:
6.2.2. 再次添加 key=3, value = C 節(jié)點(diǎn), 結(jié)果如圖:
這次增加了索引層 index 1
涉及doPut操作:
1. doPut步驟1 尋找前繼節(jié)點(diǎn), 這時(shí)返回的 b = node2, n = null, 所以直接到 doPut 步驟 9
2. doPut步驟9, 直接 CAS操作設(shè)置next節(jié)點(diǎn)
3. 進(jìn)入步驟 12, 假設(shè)我們獲取到 level = 1, 則步驟14 中 level <= max(max = 1)成立, 初始化一個(gè) idx
4. 最終找到要插入index位置, 進(jìn)行l(wèi)ink操作, 步驟 33
ps: 這時(shí)再put節(jié)點(diǎn) key=4 value = D (情形和 Node1, Node2 一樣), 最終結(jié)果:
6.2.3. 再次添加 key=5, value = E 節(jié)點(diǎn), 結(jié)果如圖:
這時(shí)發(fā)現(xiàn)索引層增加了一層
我們來看 doPut 操作:
1. doPut步驟1 尋找前繼節(jié)點(diǎn), 這時(shí)返回的 b = node2, n = null, 所以直接到 doPut 步驟 9
2. doPut步驟9, 直接 CAS操作設(shè)置next節(jié)點(diǎn)
3. 進(jìn)入步驟 12, 假設(shè)我們獲取到 level = 25, 則步驟14 中 level <= max(max = 1)不成立, 則進(jìn)入 步驟16, 這里我們發(fā)現(xiàn), 只要 level > max, 只是在原來的 max + 1, 就是指增加一層的索引
4. 進(jìn)行 indx 鏈表的初始化, 一共兩個(gè)鏈表節(jié)點(diǎn) (index是縱向的鏈表)
5. 步驟21, 其實(shí)這時(shí)只是在原來的 HeadIndex 的縱向鏈表上增加一個(gè)新節(jié)點(diǎn)
6. 步驟 23 CAS 新的 HeadIndex, 這里有個(gè)注意點(diǎn) h = newh, 而index是第二高的 Index, 為什么呢? 因?yàn)椴襟E 22中已經(jīng)將最高層的 HadeIndex與Index橫向連接起來了(奇妙吧, 一會(huì)橫向一會(huì)縱向)
7. 步驟 26, 這里的 insertionLevel 是第二高的 level, 而下面的 j 則是最高層的 level
8. 最后就是步驟30中的link操作(j > insertionLevel, 所以先 j-- , 再for loop, 接著就滿足了 )
我們?cè)诓迦雮€(gè) key = 11, value = w (步驟和 node1, node2 一樣, 這里省略了)
最終如圖:
6.2.4. 再次添加 key=8, value = W 節(jié)點(diǎn), 結(jié)果如圖:
這里和添加 key= 5 差不多, 唯一的區(qū)別就是 獲取的前繼節(jié)點(diǎn).right != null 而已, 所以不說了
總結(jié): 其實(shí)doPut方法就是獲取key對(duì)應(yīng)的前繼節(jié)點(diǎn), 然后cas設(shè)置next值, 隨后 生成隨機(jī) level(0-31之間), 若新的 level <= oldMaxLevel 則增加對(duì)應(yīng)的索引層, 若level > oldMaxLevel, 則 HeadIndex 也會(huì)隨之增加索引層;
上面doPut時(shí)每次都會(huì)調(diào)用 findPredecessor 來獲取前繼節(jié)點(diǎn), 那我們就看這個(gè)方法
7. findPredecessor() 尋找前繼節(jié)點(diǎn)
總體思路是: 從矩形鏈表的左上角的 HeadIndex 索引開始, 先向右, 遇到 null, 或 > key 時(shí)向下, 重復(fù)向右向下找, 一直找到 對(duì)應(yīng)的前繼節(jié)點(diǎn)(前繼節(jié)點(diǎn)就是小于 key 的最大節(jié)點(diǎn))
/**
* Returns a base-level node with key strictly less than given key,
* or the base-level header if there is no such node. Also
* unlinks indexes to deleted nodes found along the way. Callers
* rely on this side-effect of clearing indices to deleted nodes
* @param key the key
* @return a predecessor of the key
*/
private Node<K, V> findPredecessor(Object key, Comparator<? super K> cmp){
if(key == null)
throw new NullPointerException(); // don't postpone errors
for(;;){
for(Index<K, V> q = head, r = q.right, d;;){ // 1. 初始化數(shù)據(jù) q 是head, r 是 最頂層 h 的右Index節(jié)點(diǎn)
if(r != null){ // 2. 對(duì)應(yīng)的 r = null, 則進(jìn)行向下查找
Node<K, V> n = r.node;
K k = n.key;
if(n.value == null){ // 3. n.value = null 說明 節(jié)點(diǎn)n 正在刪除的過程中
if(!q.unlink(r)){ // 4. 在 index 層直接刪除 r 節(jié)點(diǎn), 若條件競(jìng)爭(zhēng)發(fā)生直接進(jìn)行break 到步驟1 , 重新從 head 節(jié)點(diǎn)開始查找
break; // restart
}
r = q.right; //reread r // 5. 刪除 節(jié)點(diǎn)r 成功, 獲取新的 r 節(jié)點(diǎn), 回到步驟 2 (還是從這層索引開始向右遍歷, 直到 r == null)
continue;
}
if(cpr(cmp, key, k) > 0){ // 6. 若 r.node.key < 參數(shù)key, 則繼續(xù)向右遍歷, continue 到 步驟 2處, 若 r.node.key > 參數(shù)key 直接跳到 步驟 7
q = r;
r = r.right;
continue;
}
}
if((d = q.down) == null){ // 7. 到這邊時(shí), 已經(jīng)到跳表的數(shù)據(jù)層, q.node < key < r的 或q.node < key 且 r == null; 所以直接返回 q.node
return q.node;
}
q = d; // 8 未到數(shù)據(jù)層, 進(jìn)行重新賦值向下走 (為什么向下走呢? 回過頭看看 跳表, 原來 上層的index 一般都是比下層的 index 個(gè)數(shù)少的)
r = d.right;
}
}
}
若尋找的過程不是很清楚, 直接看一下最上邊的那張圖
7. doGet() 獲取節(jié)點(diǎn)對(duì)應(yīng)的值
整個(gè)過程:
- 尋找 key 的前繼節(jié)點(diǎn) b (這時(shí)b.next = null || b.next > key, 則說明不存key對(duì)應(yīng)的 Node)
- 接著就判斷 b, b.next 與 key之間的關(guān)系(其中有些 helpDelete操作)
直接看代碼:
/**
* Gets value for key. Almost the same as findNode, but returns
* the found value (to avoid retires during ret-reads)
*
* 這個(gè) doGet 方法比較簡(jiǎn)單
* @param key the key
* @return the value, or null if absent
*/
private V doGet(Object key){
if(key == null){
throw new NullPointerException();
}
Comparator<? super K> cmp = comparator;
outer:
for(;;){
for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 獲取 key 的前繼節(jié)點(diǎn) b, 其實(shí)這時(shí) n.key >= key
Object v; int c;
if(n == null){ // 2. n == null 說明 key 對(duì)應(yīng)的 node 不存在 所以直接 return null
break outer;
}
Node<K, V> f = n.next;
if(n != b.next){ // 3. 有另外的線程修改數(shù)據(jù), 重新來
break ;
}
if((v = n.value) == null){ // 4. n 是被刪除了的節(jié)點(diǎn), 進(jìn)行helpDelete 后重新再來
n.helpDelete(b, f);
break ;
}
if(b.value == null || v == n){ // 5. b已經(jīng)是刪除了的節(jié)點(diǎn), 則 break 后再來
break ;
}
if((c = cpr(cmp, key, n.key)) == 0){ // 6. 若 n.key = key 直接返回結(jié)果, 這里返回的結(jié)果有可能是 null
V vv = (V) v;
return vv;
}
if(c < 0){ // 7. c < 0說明不存在 key 的node 節(jié)點(diǎn)
break outer;
}
// 8. 運(yùn)行到這一步時(shí), 其實(shí)是 在調(diào)用 findPredecessor 后又有節(jié)點(diǎn)添加到 節(jié)點(diǎn)b的后面所致
b = n;
n = f;
}
}
return null;
}
8. doRemove() 刪除節(jié)點(diǎn)
整個(gè)刪除個(gè) ConcurrentSkipListMap 里面 nonBlockingLinkedList 實(shí)現(xiàn)的一大亮點(diǎn), 為什么呢? 因?yàn)檫@個(gè) nonBlockingLinkedList 同時(shí)支持并發(fā)安全的從鏈表中間添加/刪除操作, 而 ConcurrentLinkedQueue 只支持并發(fā)安全的從鏈表中間刪除;
刪除操作:
- 尋找對(duì)應(yīng)的節(jié)點(diǎn)
- 給節(jié)點(diǎn)的 value 至 null, node.value = null
- 將 node 有增加一個(gè)標(biāo)記節(jié)點(diǎn) (this.value = this 還記得哇, 不記得的直接看 node 類)
- 通過 CAS 直接將 K對(duì)應(yīng)的Node和標(biāo)記節(jié)點(diǎn)一同刪除
直接來看代碼:
/**
* Main deletion method. Locates node, nulls value, appends a
* deletion marker, unlinks predecessor, removes associated index
* nodes, and possibly reduces head index level
*
* Index nodes are cleared out simply by calling findPredecessor.
* which unlinks indexes to deleted nodes found along path to key,
* which will include the indexes to this node. This is node
* unconditionally. We can't check beforehand whether there are
* indexes hadn't been inserted yet for this node during initial
* search for it, and we'd like to ensure lack of garbage
* retention, so must call to be sure
*
* @param key the key
* @param value if non-null, the value that must be
* associated with key
* @return the node, or null if not found
*/
final V doRemove(Object key, Object value){
if(key == null){
throw new NullPointerException();
}
Comparator<? super K> cmp = comparator;
outer:
for(;;){
for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 獲取對(duì)應(yīng)的前繼節(jié)點(diǎn) b
Object v; int c;
if(n == null){ // 2. 節(jié)點(diǎn) n 被刪除 直接 return null 返回 , 因?yàn)槔碚撋?b.key < key < n.key
break outer;
}
Node<K, V> f = n.next;
if(n != b.next){ // 3. 有其他線程在 節(jié)點(diǎn)b 后增加數(shù)據(jù), 重來
break ;
}
if((v = n.value) == null){ // 4. 節(jié)點(diǎn) n 被刪除, 調(diào)用 helpDelete 后重來
n.helpDelete(b, f);
break ;
}
if(b.value == null || v == n){ // 5. 節(jié)點(diǎn) b 刪除, 重來 調(diào)用findPredecessor時(shí)會(huì)對(duì) b節(jié)點(diǎn)對(duì)應(yīng)的index進(jìn)行清除, 而b借點(diǎn)吧本身會(huì)通過 helpDelete 來刪除
break ;
}
if((c = cpr(cmp, key, n.key)) < 0){ // 6. 若n.key < key 則說明 key 對(duì)應(yīng)的節(jié)點(diǎn)就不存在, 所以直接 return
break outer;
}
if(c > 0){ // 7. c>0 出現(xiàn)在 有其他線程在本方法調(diào)用findPredecessor后又在b 后增加節(jié)點(diǎn), 所以向后遍歷
b = n;
n = f;
continue ;
}
if(value != null && !value.equals(v)){ // 8. 若 前面的條件為真, 則不進(jìn)行刪除 (調(diào)用 doRemove 時(shí)指定一定要滿足 key value 都相同, 具體看 remove 方法)
break outer;
}
if(!n.casValue(v, null)){ // 9. 進(jìn)行數(shù)據(jù)的刪除
break ;
}
if(!n.appendMarker(f) || !b.casNext(n, f)){ // 10. 進(jìn)行 marker 節(jié)點(diǎn)的追加, 這里的第二個(gè) cas 不一定會(huì)成功, 但沒關(guān)系的 (第二個(gè) cas 是刪除 n節(jié)點(diǎn), 不成功會(huì)有 helpDelete 進(jìn)行刪除)
findNode(key); // 11. 對(duì) key 對(duì)應(yīng)的index 進(jìn)行刪除
}
else{
findPredecessor(key, cmp); //12. 對(duì) key 對(duì)應(yīng)的index 進(jìn)行刪除 10進(jìn)行操作失敗后通過 findPredecessor 進(jìn)行index 的刪除
if(head.right == null){
tryReduceLevel(); // 13. 進(jìn)行headIndex 對(duì)應(yīng)的index 層的刪除
}
}
V vv = (V) v;
return vv;
}
}
return null;
}
整個(gè)doRemove代碼比較簡(jiǎn)單,我就不說了..... 但問題來了, 為什么要?jiǎng)h除時(shí)增加 marker 節(jié)點(diǎn), 這到底什么作用?
先給大家一個(gè)結(jié)論: 沒有這個(gè)marker節(jié)點(diǎn)會(huì)有可能多刪除節(jié)點(diǎn), marker的存在保證之間節(jié)點(diǎn)一邊刪除一邊插入數(shù)據(jù) 是安全的
9. marker 節(jié)點(diǎn)的作用
如圖 demo1(沒有maker節(jié)點(diǎn)進(jìn)行刪除插入操作):
有三個(gè)鏈接在一起的Node (node 有三個(gè)屬性 key, value, next, 且所有操作都是 cas)
* +------+ +------+ +------+
* ... | A |------>| B |----->| C | ...
* +------+ +------+ +------+
Thread 1 | Thread 2 |
---|---|
if B.value == null 判斷B的value是否為null | |
B.value = null | |
A.casNext(B, B.next) | |
Node D = new Node(C); B.casNext(C, D); |
步驟:
1. Thread 2 準(zhǔn)備在B的后面插入一個(gè)節(jié)點(diǎn) D, 它先判斷 B.value == null, 發(fā)現(xiàn) B 沒被刪除(假設(shè) value = null 是刪除)
2. Thread1 對(duì) B 節(jié)點(diǎn)進(jìn)行刪除 B.value = null
3. Thread 1 直接設(shè)置 A的next是 B 的next (A.casNext(B, B.next)) 成功將節(jié)點(diǎn) B 刪除
4. 這時(shí) Thread 2 new 一個(gè)節(jié)點(diǎn) D, 直接設(shè)置 B.casNext(C, D) 成功
5. 最終效果, 因?yàn)楣?jié)點(diǎn)B被刪除掉, 所以節(jié)點(diǎn)D也沒有插入進(jìn)去(沒插進(jìn)去指不能從以A節(jié)點(diǎn)為head, 通過 next() 方法獲取到節(jié)點(diǎn)D)
原因: 若隊(duì)列在刪除的過程中沒有引入 maker 節(jié)點(diǎn), 有可能導(dǎo)致剛剛插入的節(jié)點(diǎn)無緣無故的消失
下面是一個(gè)例子:
原本: 有3各節(jié)點(diǎn) A, B, C 組成的隊(duì)列
A -> B -> C (A.next = B, B.next = C)
現(xiàn)在有兩個(gè)線程 (thread1, thread2) 進(jìn)行操作, thread1 進(jìn)行刪除節(jié)點(diǎn)B, thread2 在節(jié)點(diǎn)B后插入節(jié)點(diǎn)D
進(jìn)行操作前:
+------+ +------+ +------+
| A |------>| B |----->| C |
+------+ +------+ +------+
進(jìn)行操作后:
D 插入到節(jié)點(diǎn)B之后成功了, 可是隊(duì)列的 頭結(jié)點(diǎn)是 A, 通過A.next().next().next()..... 方法
無法訪問到節(jié)點(diǎn)D(D節(jié)點(diǎn)已經(jīng)丟失了)
+------+ +------+
| B |----->| D |
+------+ +------+
|
V
+------+ +------+
| A |------>----->----->| C |
+------+ +------+
如圖 demo2(有 marker 節(jié)點(diǎn)):
有三個(gè)鏈接在一起的Node (node 有三個(gè)屬性 key, value, next, 且所有操作都是 cas)
* +------+ +------+ +------+
* ... | A |------>| B |----->| C | ...
* +------+ +------+ +------+
Thread 1 | Thread 2 |
---|---|
if B.value == null 判斷B的value是否為null | |
B.value = null | |
M = new MarkerNode(C); B.caseNext(C, M); | |
A.casNext(B, B.next) | |
Node D = new Node(); B.casNext(C, D); |
步驟:
1. Thread 2 準(zhǔn)備在B的后面插入一個(gè)節(jié)點(diǎn) D, 它先判斷 B.value == null, 發(fā)現(xiàn) B 沒被刪除(假設(shè) value = null 是刪除)
2. Thread1 對(duì) B 節(jié)點(diǎn)進(jìn)行刪除 B.value = null
3. Thread 1 在B的后面追加一個(gè) MarkerNode M
4. Thread 1 將 B 與 M 一起刪除
5. 這時(shí)你會(huì)發(fā)現(xiàn) Thread 2 的 B.casNext(C, D) 發(fā)生的可能 :
1) 在Thread 1 設(shè)置 marker 節(jié)點(diǎn)前操作, 則B.casNext(C, D) 成功, B 與 marker 也一起刪除
2) 在Thread 1 設(shè)置maker之后, 刪除 b與marker之前, 則B.casNext(C, D) 操作失敗(b.next 變成maker了), 所以在代碼中加個(gè) loop 循環(huán)嘗試
3) 在Thread 1 刪除 B, marker 之后, 則B.casNext(C, D) 失敗(b.next變成maker, B.casNext(C,D) 操作失敗, 在 loop 中重新進(jìn)行嘗試插入)
6. 最終結(jié)論 maker 節(jié)點(diǎn)的存在致使 非阻塞鏈表能實(shí)現(xiàn)中間節(jié)點(diǎn)的刪除和插入同時(shí)安全進(jìn)行(反過來就是若沒有marker節(jié)點(diǎn), 有可能剛剛插入的數(shù)據(jù)就丟掉了)
10. ConcurrentSkipListMap 總結(jié)
整個(gè)代碼分析過來, 發(fā)現(xiàn)了很多有意思的東西, skip list, nonblockingLinkedList 等等!
ConcurrentSkipListMap 是一個(gè)基于 Skip List 實(shí)現(xiàn)的并發(fā)安全, 非阻塞讀/寫/刪除 的 Map, 最重要的是 它的value是有序存儲(chǔ)的, 而且其內(nèi)部是由縱橫鏈表組成, 在進(jìn)行java開發(fā)中有一定的運(yùn)用場(chǎng)景!
參考:
wiki 定義 Skip List
High Performance Dynamic Lock-Free Hash Tables and List-Based Sets