ConcurrentHashMap通過(guò)將完整的表分成若干個(gè)segment的方式實(shí)現(xiàn)鎖分離农尖,每個(gè)segment都是一個(gè)獨(dú)立的線程安全的Hash表姜挺,當(dāng)需要操作數(shù)據(jù)時(shí)绝骚,HashMap通過(guò)Key的hash值和segment數(shù)量來(lái)路由到某個(gè)segment伟桅,剩下的操作交給segment來(lái)完成。
下面來(lái)看下segment的實(shí)現(xiàn):
segment是一類特殊的hash表箱沦,繼承了ReentrantLock類實(shí)現(xiàn)鎖功能
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/*
1.segment的讀操作不需要加鎖稼病,但需要volatile讀
2.當(dāng)進(jìn)行擴(kuò)容時(shí)(調(diào)用reHash方法)绕沈,需要拷貝原始數(shù)據(jù)肮韧,在拷貝數(shù)據(jù)上操作融蹂,保證在擴(kuò)容完成前讀操作仍可以在原始數(shù)據(jù)上進(jìn)行旺订。
3.只有引起數(shù)據(jù)變化的操作需要加鎖。
4.scanAndLock(刪除超燃、替換)/scanAndLockForPut(新增)兩個(gè)方法提供了獲取鎖的途徑区拳,是通過(guò)自旋鎖實(shí)現(xiàn)的。
5.在等待獲取鎖的過(guò)程中意乓,兩個(gè)方法都會(huì)對(duì)目標(biāo)數(shù)據(jù)進(jìn)行查找樱调,每次查找都會(huì)與上次查找的結(jié)果對(duì)比,雖然查找結(jié)果不會(huì)被調(diào)用它的方法使用届良,但是這樣做可以減少后續(xù)操作可能的cache miss笆凌。
*/
private static final long serialVersionUID = 2249069246763182397L;
/*
自旋鎖的等待次數(shù)上限,多處理器時(shí)64次伙窃,單處理器時(shí)1次菩颖。
每次等待都會(huì)進(jìn)行查詢操作样漆,當(dāng)?shù)却螖?shù)超過(guò)上限時(shí)为障,不再自旋,調(diào)用lock方法等待獲取鎖放祟。
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/*
segment中的hash表鳍怨,與hashMap結(jié)構(gòu)相同,表中每個(gè)元素都是一個(gè)鏈表跪妥。
*/
transient volatile HashEntry<K,V>[] table;
/*
表中元素個(gè)數(shù)
*/
transient int count;
/*
記錄數(shù)據(jù)變化操作的次數(shù)鞋喇。
這一數(shù)值主要為Map的isEmpty和size方法提供同步操作檢查,這兩個(gè)方法沒(méi)有為全表加鎖眉撵。
在統(tǒng)計(jì)segment.count前后侦香,都會(huì)統(tǒng)計(jì)segment.modCount,如果前后兩次值發(fā)生變化纽疟,可以判斷在統(tǒng)計(jì)count期間有segment發(fā)生了其它操作罐韩。
*/
transient int modCount;
/*
容量閾值,超過(guò)這一數(shù)值后segment將進(jìn)行擴(kuò)容污朽,容量變?yōu)樵瓉?lái)的兩倍散吵。
threshold = loadFactor*table.length
*/
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
/*
onlyIfAbsent:若為true,當(dāng)key已經(jīng)有對(duì)應(yīng)的value時(shí)蟆肆,不進(jìn)行替換矾睦;
若為false,即使key已經(jīng)有對(duì)應(yīng)的value炎功,仍進(jìn)行替換枚冗。
關(guān)于put方法,很重要的一點(diǎn)是segment最大長(zhǎng)度的問(wèn)題:
代碼 c > threshold && tab.length < MAXIMUM_CAPACITY 作為是否需要擴(kuò)容的判斷條件蛇损。
擴(kuò)容條件是node總數(shù)超過(guò)閾值且table長(zhǎng)度小于MAXIMUM_CAPACITY也就是2的30次冪赁温。
由于擴(kuò)容都是容量翻倍肛宋,所以tab.length最大值就是2的30次冪。此后束世,即使node總數(shù)超過(guò)了閾值酝陈,也不會(huì)擴(kuò)容了。
由于table[n]對(duì)應(yīng)的是一個(gè)鏈表毁涉,鏈表內(nèi)元素個(gè)數(shù)理論上是無(wú)限的沉帮,所以segment的node總數(shù)理論上也是無(wú)上限的。
ConcurrentHashMap的size()方法考慮到了這個(gè)問(wèn)題贫堰,當(dāng)計(jì)算結(jié)果超過(guò)Integer.MAX_VALUE時(shí)穆壕,直接返回Integer.MAX_VALUE.
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock判斷是否已經(jīng)獲得鎖.
//如果沒(méi)有獲得,調(diào)用scanAndLockForPut方法自旋等待獲得鎖其屏。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//計(jì)算key在表中的下標(biāo)
int index = (tab.length - 1) & hash;
//獲取鏈表的第一個(gè)node
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
//鏈表下一個(gè)node不為空喇勋,比較key值是否相同。
//相同的偎行,根據(jù)onlyIfAbsent決定是否替換已有的值
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//鏈表遍歷到最后一個(gè)node川背,仍沒(méi)有找到key值相同的.
//此時(shí)應(yīng)當(dāng)生成新的node,將node的next指向鏈表表頭蛤袒,這樣新的node將處于鏈表的【表頭】位置
if (node != null)
//scanAndLockForPut當(dāng)且僅當(dāng)hash表中沒(méi)有該key值時(shí)
//才會(huì)返回新的node熄云,此時(shí)node不為null
node.setNext(first);
else
//node為null,表明scanAndLockForPut過(guò)程中找到了key值相同的node
//可以斷定在等待獲取鎖的過(guò)程中妙真,這個(gè)node被刪除了缴允,此時(shí)需要新建一個(gè)node
node = new HashEntry<K,V>(hash, key, value, first);
//添加新的node涉及到擴(kuò)容,當(dāng)node數(shù)量超過(guò)閾值時(shí)珍德,調(diào)用rehash方法進(jìn)行擴(kuò)容练般,并將新的node加入對(duì)應(yīng)鏈表表頭;
//沒(méi)有超過(guò)閾值锈候,直接加入鏈表表頭薄料。
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
/*
hash表容量翻倍,將需要添加的node添加到擴(kuò)容后的表中晴及。
hash表默認(rèn)初始長(zhǎng)度為16都办,實(shí)際長(zhǎng)度總是2的n次冪。
設(shè)當(dāng)前table長(zhǎng)度為S虑稼,根據(jù)key的hash值計(jì)算table中下標(biāo)index的公式:
擴(kuò)容前:oldIndex = (S-1)&hash
擴(kuò)容后:newIndex = (S<<1-1)&hash
擴(kuò)容前后下標(biāo)變化:newIndex-oldIndex = S&hash
所以琳钉,擴(kuò)容前后node所在鏈表在table中的下標(biāo)要么不變,要么右移S蛛倦。
根據(jù)本方法官方注釋說(shuō)明歌懒,大約六分之一的node需要移動(dòng)。
對(duì)于每個(gè)鏈表溯壶,處理方法如下:
步驟一:對(duì)于鏈表中的每個(gè)node及皂,計(jì)算node和node.next的新下標(biāo)甫男,如果它們不相等,記錄最后一次出現(xiàn)這種情況時(shí)的node.next验烧,記為nodeSpecial板驳。
這一部分什么意思呢,假設(shè)table[n]所在的鏈表共有6個(gè)node碍拆,計(jì)算它們的新下標(biāo):
情況1:若計(jì)算結(jié)果為0:n,1:n+S,2:n,3:n+S,4:n,5:n若治,那么我們記錄的特殊node編號(hào)為4;
情況2:若計(jì)算結(jié)果為0:n,1:n+S,2:n,3:n+S,4:n+S,5:n+S感混,那么我們記錄的特殊node編號(hào)為3端幼;
情況3:若計(jì)算結(jié)果為0:n,1:n,2:n,3:n,4:n,5:n,特殊node為0;
情況4:若計(jì)算結(jié)果為0:n+S,1:n+S,2:n+S,3:n+S,4:n+S,5:n+S,特殊node為0。
很重要的一點(diǎn)弧满,由于新下標(biāo)只可能是n或n+S婆跑,因此這兩個(gè)位置的鏈表中不會(huì)出現(xiàn)來(lái)自其它鏈表的node。
對(duì)于情況3庭呜,令table[n]=node0滑进,進(jìn)入步驟三;
對(duì)于情況4疟赊,令table[n+S]=node0郊供,進(jìn)入步驟三峡碉;
對(duì)于情況1近哟,令table[n]=node4,進(jìn)入步驟二鲫寄;
對(duì)于情況2吉执,令table[n+S]=node3,進(jìn)入步驟二。
步驟二:從node0遍歷至nodeSpecial的前一個(gè)node地来,對(duì)于每一個(gè)node戳玫,計(jì)算它的index值,令node.next=table[index],table[index]=node,也就是說(shuō)未斑,將node放于對(duì)應(yīng)鏈表的表頭咕宿。
步驟三:計(jì)算需要插入的node的下標(biāo)index,同樣令node.next=table[index],table[index]=node,將node插入鏈表表頭蜡秽。
通過(guò)三步完成了鏈表的擴(kuò)容和新node的插入府阀。
在理解這一部分代碼的過(guò)程中,牢記三點(diǎn):
1.調(diào)用rehash方法的前提是已經(jīng)獲得了鎖芽突,所以擴(kuò)容過(guò)程中不存在其他線程修改數(shù)據(jù)试浙;
2.新的下標(biāo)只有兩種情況,原始下標(biāo)n或者新下標(biāo)n+S寞蚌;
3.通過(guò)2可以推出田巴,原表中不在同一鏈表的node钠糊,在新表中仍不會(huì)出現(xiàn)在同一鏈表中。
*/
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
//拷貝table壹哺,所有操作都在oldTable上進(jìn)行抄伍,不會(huì)影響無(wú)需獲得鎖的讀操作
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;//容量翻倍
threshold = (int)(newCapacity * loadFactor);//更新閾值
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;//新的table下標(biāo),定位鏈表
if (next == null)
//鏈表只有一個(gè)node管宵,直接賦值
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
//這里獲取特殊node
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
//步驟一中的table[n]賦值過(guò)程
newTable[lastIdx] = lastRun;
// 步驟二逝慧,遍歷剩余node,插入對(duì)應(yīng)表頭
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
//步驟三啄糙,處理需要插入的node
int nodeIndex = node.hash & sizeMask;
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
//將擴(kuò)容后的hashTable賦予table
table = newTable;
}
/*
put方法調(diào)用本方法獲取鎖笛臣,通過(guò)自旋鎖等待其他線程釋放鎖。
變量retries記錄自旋鎖循環(huán)次數(shù)隧饼,當(dāng)retries超過(guò)MAX_SCAN_RETRIES時(shí)沈堡,不再自旋,調(diào)用lock方法等待鎖釋放燕雁。
變量first記錄hash計(jì)算出的所在鏈表的表頭node诞丽,每次循環(huán)結(jié)束,重新獲取表頭node拐格,與first比較僧免,如果發(fā)生變化,說(shuō)明在自旋期間捏浊,有新的node插入了鏈表懂衩,retries計(jì)數(shù)重置。
自旋過(guò)程中金踪,會(huì)遍歷鏈表浊洞,如果發(fā)現(xiàn)不存在對(duì)應(yīng)key值的node,創(chuàng)建一個(gè)胡岔,這個(gè)新node可以作為返回值返回法希。
根據(jù)官方注釋,自旋過(guò)程中遍歷鏈表是為了緩存預(yù)熱靶瘸,減少hash表經(jīng)常出現(xiàn)的cache miss
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; //自旋次數(shù)計(jì)數(shù)器
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null) {
//鏈表為空或者遍歷至鏈表最后一個(gè)node仍沒(méi)有找到匹配
if (node == null)
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
//比較first與新獲得的鏈表表頭node是否一致苫亦,如果不一致,說(shuō)明該鏈表別修改過(guò)怨咪,自旋計(jì)數(shù)重置
e = first = f;
retries = -1;
}
}
return node;
}
/*
remove,replace方法會(huì)調(diào)用本方法獲取鎖屋剑,通過(guò)自旋鎖等待其他線程釋放鎖。
與scanAndLockForPut機(jī)制相似惊暴。
*/
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
/*
刪除key-value都匹配的node饼丘,刪除過(guò)程很簡(jiǎn)單:
1.根據(jù)hash計(jì)算table下標(biāo)index。
2.根據(jù)index定位鏈表辽话,遍歷鏈表node肄鸽,如果存在node的key值和value值都匹配卫病,刪除該node。
3.令node的前一個(gè)節(jié)點(diǎn)pred的pred.next = node.next典徘。
*/
final V remove(Object key, int hash, Object value) {
//獲得鎖
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
/*
找到hash表中key-oldValue匹配的node蟀苛,替換為newValue,替換過(guò)程與replace方法類似,不再贅述了逮诲。
*/
final boolean replace(K key, int hash, V oldValue, V newValue) {
if (!tryLock())
scanAndLock(key, hash);
boolean replaced = false;
try {
HashEntry<K,V> e;
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
if (oldValue.equals(e.value)) {
e.value = newValue;
++modCount;
replaced = true;
}
break;
}
}
} finally {
unlock();
}
return replaced;
}
final V replace(K key, int hash, V value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V> e;
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
e.value = value;
++modCount;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
/*
清空segment帜平,將每個(gè)鏈表置為空,count置為0梅鹦,剩下的工作交給GC裆甩。
*/
final void clear() {
lock();
try {
HashEntry<K,V>[] tab = table;
for (int i = 0; i < tab.length ; i++)
setEntryAt(tab, i, null);
++modCount;
count = 0;
} finally {
unlock();
}
}
}
以上就是ConcurrentHashMap中關(guān)于segment部分的源碼∑胨簦可以看出嗤栓,關(guān)于修改操作的線程安全已經(jīng)被封裝在segment中,甚至在ConcurrentHashMap中都不需要關(guān)心鎖的問(wèn)題箍邮≤运В可以說(shuō),segment是整個(gè)ConcurrentHashMap的核心和關(guān)鍵锭弊。