數(shù)據(jù)結(jié)構(gòu)
ConcurrentHashMap 實現(xiàn)并發(fā)操作的原理
使用了鎖分段技術(shù):ConcurrentHashMap持有一組鎖(segment[]),并將數(shù)據(jù)盡可能分散在不同的鎖段中(即装畅,每個鎖只會控制部分的數(shù)據(jù)HashEntry[])靠娱。這樣如果寫操作的數(shù)據(jù)分布在不同的鎖中,那么寫操作將可并行操作掠兄。因此來實現(xiàn)一定數(shù)量(即像云,鎖數(shù)量)并發(fā)線程修改。
同時通過Unsafe.putOrderedObject蚂夕、UNSAFE.getObjectVolatile(??這兩個方法很重要苫费,下文會介紹)來操作segment[]、HashEntry[]的元素使得在提升了性能的情況下在并發(fā)環(huán)境下依舊能獲取到最新的數(shù)據(jù)双抽,同時HashEntry的value為volatile屬性百框,從而實現(xiàn)不加鎖的進行并發(fā)的讀操作,并且對該并發(fā)量并無限制牍汹。
注意铐维,中不使用volatile的屬性來實現(xiàn)segment[]和HashEntry[]在多線程間的可見性柬泽。因為如果是修改操作读恃,則在釋放鎖的時候就會將當前線程緩存中的數(shù)據(jù)寫到主存中佣渴,所以就無需在修改操作的過程中因修改volatile屬性字段而頻繁的寫線程內(nèi)存數(shù)據(jù)到主存中。
源碼解析
重要屬性
//散列映射表的默認初始容量為 16五垮。
static final int DEFAULT_INITIAL_CAPACITY = 16;
//散列映射表的默認裝載因子為 0.75睬棚,用于表示segment中包含的HashEntry元素的個數(shù)與HashEntry[]數(shù)組長度的比值第煮。當某個segment中包含的HashEntry元素的個數(shù)超過了HashEntry[]數(shù)組的長度與裝載因子的乘積時,將觸發(fā)擴容操作抑党。
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//散列表的默認并發(fā)級別為 16包警。該值表示segment[]數(shù)組的長度,也就是鎖的總數(shù)底靠。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//散列表的最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//segment中HashEntry[]數(shù)組最小長度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//散列表的最大段數(shù)害晦,也就是segment[]數(shù)組的最大長度
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
//在執(zhí)行size()和containsValue(value)操作時,ConcurrentHashMap的做法是先嘗試 RETRIES_BEFORE_LOCK 次( 即,2次 )通過不鎖住segment的方式來統(tǒng)計暑中、查詢各個segment壹瘟,如果2次執(zhí)行過程中,容器的modCount發(fā)生了變化鳄逾,則再采用加鎖的方式來操作所有的segment
static final int RETRIES_BEFORE_LOCK = 2;
//segmentMask用于定位segment在segment[]數(shù)組中的位置稻轨。segmentMask是與運算的掩碼,等于segment[]數(shù)組size減1雕凹,掩碼的二進制各個位的值都是1( 因為澄者,數(shù)組長度總是2^N )。
final int segmentMask;
//segmentShift用于決定H(key)參與segmentMask與運算的位數(shù)(高位)请琳,這里指在從segment[]數(shù)組定位segment:通過key的哈希結(jié)果的高位與segmentMask進行與運算哈希的結(jié)果粱挡。(詳見下面舉例)
final int segmentShift;
//Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對象能充當鎖的角色俄精。
final ConcurrentHashMap.Segment<K, V>[] segments;
重要對象
- ConcurrentHashMap.Segment
Segment 類繼承于 ReentrantLock 類询筏,從而使得 Segment 對象能充當鎖的角色,并且是一個可重入鎖竖慧。每個 Segment 對象維護其包含的若干個桶(即嫌套,HashEntry[])。
//最大自旋次數(shù)圾旨,若是單核則為1踱讨,多核則為64。該字段用于scanAndLockForPut砍的、scanAndLock方法
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* table 是由 HashEntry 對象組成的數(shù)組
* 如果散列時發(fā)生碰撞痹筛,碰撞的 HashEntry 對象就以鏈表的形式鏈接成一個鏈表
* table 數(shù)組的元素代表散列映射表的一個桶
* 每個 table 守護整個 ConcurrentHashMap 數(shù)據(jù)總數(shù)的一部分
* 如果并發(fā)級別為 16,table 則維護 ConcurrentHashMap 數(shù)據(jù)總數(shù)的 1/16
*/
transient volatile HashEntry<K,V>[] table;
//segment中HashEntry的總數(shù)。 PS:注意JDK 7中該字段不是volatile的
transient int count;
//segment中數(shù)據(jù)被更新的次數(shù)
transient int modCount;
//當table中包含的HashEntry元素的個數(shù)超過本變量值時帚稠,觸發(fā)table的擴容
transient int threshold;
//裝載因子
final float loadFactor;
- ConcurrentHashMap.HashEntry
HashEntry封裝了key-value對谣旁,是一個單向鏈表結(jié)構(gòu),每個HashEntry節(jié)點都維護著next HashEntry節(jié)點的引用滋早。
static final class HashEntry<K,V>
final int hash;
final K key;
volatile V value;
//HashEntry鏈表中的下一個entry榄审。PS:JDK 7中該字段不是final的,意味著該字段可修改杆麸,而且也確實在remove方法中對該地段進行了修改
volatile HashEntry<K,V> next;
構(gòu)造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
a) 限制并發(fā)等級最大為MAX_SEGMENTS搁进,即2^16。
b) 計算真實的并發(fā)等級ssize昔头,必須是2的N次方饼问,即 ssize( actual_concurrency_level ) >= concurrency_level。
舉例:concurrencyLevel等于14减细,15或16,ssize都會等于16赢笨,即容器里鎖的個數(shù)也是16未蝌。
Q:為什么數(shù)組的長度都需要設(shè)計成2^N次方了?
A:這是因為元素在數(shù)組中的定位主要是通過H(key) & (數(shù)組長度 - 1)方式實現(xiàn)的茧妒,這樣我們稱(數(shù)組長度 - 1)為element_mask萧吠。那么假設(shè)有一個長度為16和長度為15的數(shù)組,他們element_mask分別為15和14桐筏。即array_16_element_mask = 15(二進制”1111”);array_15_element_mask = 14(二進制”1110”)纸型。你會發(fā)現(xiàn)所以和”1110”進行與操作結(jié)果的最后一位都是0,這就導(dǎo)致數(shù)組的’0001’梅忌、’0011’狰腌、’1001’、’0101’牧氮、’1101’琼腔、’0111’位置都無法存放數(shù)據(jù),這就導(dǎo)致了數(shù)組空間的浪費踱葛,以及數(shù)據(jù)沒有得到更好的分散丹莲。而使用array_16_element_mask = 15(二進制”1111”)則不會有該問題,數(shù)據(jù)可以分散到數(shù)組個每個索引位置尸诽。
c) sshift表示在通過H(key)來定位segment的index時甥材,參與到segmentMask掩碼與運算的H(key)高位位數(shù)。
d) 計算每個Segment中HashEntry[]數(shù)組的長度性含,根據(jù)數(shù)據(jù)均勻分配到各個segment的HashEntry[]中洲赵,并且數(shù)組長度必須是2的N次方的思路來獲取。注意,HashEntry[]數(shù)組的長度最小為2板鬓。
e) 創(chuàng)建一個Segment對象悲敷,將新建的Segment對象放入Segment[]數(shù)組中index為0的位置。這里只先構(gòu)建了Segnemt[]數(shù)組的一個元素俭令,則其他index的元素在被使用到時通過ensureSegment(index)方法來構(gòu)建后德。
重要方法
- segment的定位
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
通過key的二次哈希運算后再進行移位和與運算得到key在segment[]數(shù)組中所對應(yīng)的segment
a) hash(key)
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
??這里之所以需要將key.hashCode再進行一次hash計算,是為了減少哈希沖突抄腔,使元素能夠均勻的分布在不同的Segment上瓢湃,從而提高容器的存取效率。
b) 取hash(key)結(jié)果的(32 - segmentShift)位數(shù)的高位和segmentMask掩碼進行與運算赫蛇。(其實绵患,與運算時,就是“hash(key)的高segmentMask(十進制值)位"于“segmentMask的二進制值”進行與操作悟耘,此時進行與操作的兩個數(shù)的有效二進制位數(shù)是一樣的了落蝙。)
c) 將b)的結(jié)果j進行 (j << SSHIFT) + SBASE 以得到key在segement[]數(shù)組中的位置
舉例:假如哈希的質(zhì)量差到極點,那么所有的元素都在一個Segment中暂幼,不僅存取元素緩慢筏勒,分段鎖也會失去意義。我做了一個測試旺嬉,不通過再哈希而直接執(zhí)行哈希計算管行。
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
計算后輸出的哈希值全是15,通過這個例子可以發(fā)現(xiàn)如果不進行再哈希邪媳,哈希沖突會非常嚴重捐顷,因為只要低位一樣,無論高位是什么數(shù)雨效,其哈希值總是一樣迅涮。我們再把上面的二進制數(shù)據(jù)進行再哈希后結(jié)果如下,為了方便閱讀徽龟,不足32位的高位補了0逗柴,每隔四位用豎線分割下。
0100 | 0111 | 0110 | 0111 | 1101 | 1010 | 0100 | 1110 |
---|---|---|---|---|---|---|---|
1111 | 0111 | 0100 | 0011 | 0000 | 0001 | 1011 | 1000 |
0111 | 0111 | 0110 | 1001 | 0100 | 0110 | 0011 | 1110 |
1000 | 0011 | 0000 | 0000 | 1100 | 1000 | 0001 | 1010 |
可以發(fā)現(xiàn)每一位的數(shù)據(jù)都散列開了顿肺,通過這種再哈希能讓數(shù)字的每一位都能參加到哈希運算當中戏溺,從而減少哈希沖突。
- HashEntry定位
int h = hash(key);
((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
主要通過對key進行二次hash運算屠尊,再講哈希結(jié)果和HashEntry[]的長度掩碼進行與運算得到key所對應(yīng)的HashEntry在數(shù)組中的索引旷祸。HashEntry的定位和Segment的定位方式很像,但是HashEntry少了將hash(key)的結(jié)果進行掩碼取高位后再與數(shù)組長度與操作讼昆,而是直接將hash(key)的結(jié)果和數(shù)組長度的掩碼進行與操作托享。其目的是避免兩次哈希后的值一樣,導(dǎo)致元素雖然在Segment里散列開了,但是卻沒有在HashEntry里散列開( 也就是說闰围,如果Segment和HashEntry的定位方式一樣赃绊,那么到同一個Segment的key都會落到該segment中的同一個HashEntry了 )。
Unsafe類中的putOrderedObject羡榴、getObjectVolatile方法
getObjectVolatile:使用volatile讀的語義獲取數(shù)據(jù)碧查,也就是通過getObjectVolatile獲取數(shù)據(jù)時回去主存中獲取最新的數(shù)據(jù)放到線程的緩存中,這能保證正確的獲取最新的數(shù)據(jù)校仑。
putOrderedObject:為了控制特定條件下的指令重排序和內(nèi)存可見性問題忠售,Java編譯器使用一種叫內(nèi)存屏障(Memory Barrier,或叫做內(nèi)存柵欄迄沫,Memory Fence)的CPU指令來禁止指令重排序稻扬。java中volatile寫入使用了內(nèi)存屏障中的LoadStore屏障規(guī)則,對于這樣的語句Load1; LoadStore; Store2羊瘩,在Store2及后續(xù)寫入操作被刷出前泰佳,保證Load1要讀取的數(shù)據(jù)被讀取完畢。volatile的寫所插入的storeLoad是一個耗時的操作尘吗,因此出現(xiàn)了一個對volatile寫的升級版本逝她,利用lazySet方法進行性能優(yōu)化,在實現(xiàn)上對volatile的寫只會在之前插入StoreStore屏障摇予,對于這樣的語句Store1; StoreStore; Store2汽绢,在Store2及后續(xù)寫入操作執(zhí)行前吗跋,保證Store1的寫入操作對其它處理器可見侧戴,也就是按順序的寫入。UNSAFE.putOrderedObject正是提供了這樣的語義跌宛,避免了寫寫指令重排序酗宋,但不保證內(nèi)存可見性,因此讀取時需借助volatile讀保證可見性疆拘。ensureSegment(k)
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
根據(jù)計算得到的index從segment[]數(shù)組中獲取segment蜕猫,如果segment不存在,則創(chuàng)建一個segment并通過CAS算法放入segment[]數(shù)組中哎迄。這里的獲取和插入分別通過UNSAGE.getObjectVolatile(??保證獲取segment[]最新數(shù)據(jù))和UNSAFE.cmpareAndSwapObject(??保證原子性的將新建的segment插入segment[]數(shù)組回右,并使其他線程可見)實現(xiàn),并不直接對segment[]數(shù)組操作漱挚。
- HashEntry<K,V> scanAndLockForPut(K key, int hash, V value)
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在put操作嘗試加鎖沒成功時翔烁,不是直接進入等待狀態(tài),而是調(diào)用??scanAndLockForPut()方法旨涝,該方法實現(xiàn)了:
a) 首次進入該方法蹬屹,重試次數(shù)retries初始值為-1。
b) 若retries為-1,則判斷查詢key對應(yīng)的HashEntry節(jié)點鏈中是否已經(jīng)存在了該節(jié)點慨默,如果還沒則預(yù)先創(chuàng)建一個新節(jié)點贩耐。然后將retries=0;
c) 然后嘗試MAX_SCAN_RETRIES次獲取鎖( 自旋鎖 )厦取,如果依舊沒能成功獲得鎖潮太,則進入等待狀態(tài)(互斥鎖)。
JDK7嘗試使用自旋鎖來提升性能蒜胖,好處在于:自旋鎖當前的線程不會掛起消别,而是一直處于running狀態(tài),這樣一旦能夠獲得鎖時就key在不進行上下文切換的情況下獲取到鎖台谢。
d) 如果在嘗試MAX_SCAN_RETRIES次獲取鎖的過程寻狂,key對應(yīng)的entry發(fā)送了變化,則將嘗試次數(shù)重置為-1朋沮,從第b)步驟重新開始
- void scanAndLock(Object key, int hash)
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
在replace蛇券、remove操作嘗試加鎖沒成功時,不是直接進入等待狀態(tài)樊拓,而是調(diào)用??scanAndLock()方法纠亚。該方法是實現(xiàn)和scanAndLockForPut()差不了多少,主要的區(qū)別在于scanAndLockForPut()方法在key對應(yīng)entry不存在時是不會去創(chuàng)建一個HashEntry對象的筋夏。
- V get(Object key)
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在JDK 7中g(shù)et的實現(xiàn)原理已經(jīng)和JDK 6不同了蒂胞,JDK 6通過volatile實現(xiàn)多線程間內(nèi)存的可見性。而JDK 7為了提升性能条篷,用UNSAFE.getObjectVolatile(...)來獲取segment[]數(shù)組和HashEntry[]數(shù)組中對應(yīng)index的最新值骗随。同時值得說明的是,當volatile引用一個數(shù)組時赴叹,數(shù)組中的元素是不具有volatile特性的鸿染,所以,也需要通過UNSAFE.getObjectVolatile(…)來獲取數(shù)組中真實的數(shù)據(jù)乞巧。
- put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
a) 通過key算出對應(yīng)的segment在segment[]中的位置涨椒,如果對應(yīng)的segment不存在,則創(chuàng)建绽媒。
b) 將key蚕冬、value插入到segment中對應(yīng)的HashEntry中
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
a) 嘗試獲得鎖,如果失敗是辕,則調(diào)用scanAndLockForPut(...)通過自旋等待的方式獲得鎖囤热。注意,這里鎖操作鎖的只是當前這個segment免糕,而不會影響segment[]數(shù)組中其他的segment對象的寫操作赢乓。這是ConcurrentHashMap實現(xiàn)并發(fā)寫操作的精髓所在忧侧。通過分段鎖來支持一定并發(fā)量的寫操作,并通過volatile以及UNSAFE.getObjectVolatile牌芋、UNSAFE.putOrderedObject來實現(xiàn)不加鎖的讀操作蚓炬,也就是支持任何并發(fā)量的讀操作。
b) 計算key應(yīng)插入的HashEntry在HashEntry[]數(shù)組的index躺屁,并通過UNSAFE.getObjectVolatile(...)方式獲取最新的到HashEntry對象
c) 判斷HashEntry鏈中是否已經(jīng)存在該key了肯夏,如果存在則將key的value替換成新值,并將modCount加1
d) 如果HashEntry鏈中不存在該key犀暑,則將key-value出入到HashEntry鏈頭處驯击,并將count加1,但此時count還未更新到segment中耐亏。
e) 如果在count加1后發(fā)現(xiàn)目前HashEntry鏈表長度以及達到了閾值并且HashEntry的鏈表長度小于限制的最大長度徊都,則會進行HashEntry的擴容操作。注意广辰,在JDK 7中是確定當前put操作是會加入一個新節(jié)點情況下才會觸發(fā)擴容操作暇矫,而在JDK 6中,可能存在put操作只是替換一個已經(jīng)存在的key的value值的情況下也會觸發(fā)擴容操作择吊。
f) 如果count加1未觸發(fā)閾值李根,則通過UNSAFE.putOrderedObject(…)方式將最新的HashEntry更新到HashEntry[]數(shù)組中。
g) 更新segment中的modCount几睛、count值
h) 釋放鎖房轿。釋放鎖的操作會將當前線程緩存里的數(shù)據(jù)寫到主存中。
- rehash(HashEntry<K,V> node)
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
當HashEntry的數(shù)量達到閾值時就會觸發(fā)HashEntry[]數(shù)組的擴容操作
a) 創(chuàng)建new HashEntry[]數(shù)組所森,new HashEntry[]數(shù)組的容量為old HashEntry的2倍
b) 設(shè)置新的閾值
c) 將old HashEntry[]數(shù)組中的內(nèi)容放入new HashEntry[]中囱持,這并不是盲目的將元素一一取出然后計算元素在new HashEntry的位置,然后插入必峰。這里Doug Lea做了一些優(yōu)化洪唐。
如果old HashEntry[]數(shù)組的元素HashEntry鏈表钻蹬,若該HashEntry鏈表的頭節(jié)點不存在next節(jié)點吼蚁,即說明該HashEntry鏈表是個單節(jié)點,則直接將HashEntry插入到new HashEntry[]數(shù)組對應(yīng)的位置中问欠。
因為new HashEntry[]的length是old HashEntry[]的2倍肝匆,所以對應(yīng)的new sizeMask比old sizeMask多了old HashEntry[] length的大小( 比如,old_HashEntry_array_length為8顺献,則old sizeMask為’0000 0111’旗国;new_HashEntry_array_length為16,則new sizeMask為’0000 1111’)注整。所以元素在new HashEntry[]的new index要么和old index一樣能曾,要么就是old_index + old_HashEntry_array_length度硝。因此我們可通過對節(jié)點的復(fù)用來減少不必要的節(jié)點創(chuàng)建,通過計算每個HashEntry鏈表中每個entry的new index值寿冕,如果存在從某個entry開始到該HashEntry鏈表末尾的所有entrys蕊程,它們的new index值都一樣,那么就該entry直接插入到new HashEntry[newIndex]中驼唱,當然最壞的請求就是該entry就是HashEntry鏈的最后一個entry藻茂。然后只需重建HashEntry中該entry之前的到鏈表頭的entry節(jié)點,分別將新構(gòu)建的entry插入到new HashEntry[]中玫恳。
再者辨赐,經(jīng)統(tǒng)計,在使用默認閾值的情況下京办,一般只有1/6的節(jié)點需要重新構(gòu)建最后將當前操作新構(gòu)建的節(jié)點加入到new HashEntry[]數(shù)組中
d) old HashEntry如果沒有其他讀線程操作引用時掀序,將會盡快被垃圾回收。
e) 擴容操作因為要重新構(gòu)建正整個HashEntry[]數(shù)組惭婿,所以不需要通過UNSAFE.putOrderedObject(...)方式將元素插入一個已經(jīng)存在的HashEntry[]中森枪,而是直接通過索引操作插入到new HashEntry[]數(shù)組就好,最后我們會將new HashEntry[]直接賦值給volatile tables字段审孽,這樣就可以保證new HashEntry[]對其他線程可見了remove操作
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
a) 根據(jù)key計算出該key對應(yīng)的segment在segment[]數(shù)組中的index县袱,并獲取該segment。
b) 將key從該segment中移除
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
a) 嘗試獲得鎖佑力,如果失敗則調(diào)用scanAndLock(...)通過自旋等待的方式獲得鎖式散。
b) 獲取key鎖對應(yīng)的HashEntry鏈表,并在該HashEntry中找到key對應(yīng)entry節(jié)點
c) 如果key對應(yīng)的節(jié)點是在HashEntry鏈表頭打颤,則直接將key的next節(jié)點通過UNSAFE.putOrderedObject的方式這是為對HashEntry[]數(shù)組中對應(yīng)的位置暴拄,即使得next節(jié)點稱為成為鏈表頭。
d) 如果key不是HashEntry的鏈表頭節(jié)點编饺,則將key的前一個節(jié)點的next節(jié)點修改為key的next節(jié)點乖篷。額,這么說太繞了透且,舉個例子吧~
key對應(yīng)的節(jié)點:current_HashEntry撕蔼;current_HashEntry的前一個節(jié)點:pre_HashEntry;current_HashEntry的下一個節(jié)點:next_HashEntry
刪除前:
pre_HashEntry.next ——> current_HashEntry
current_HashEntry.next ——> next_HashEntry
刪除后:
pre_HashEntry.next ——> next_HashEntry
e) 修改segment屬性:modCount加1秽誊,count減1
f) 釋放鎖
- size()
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
a) 會先嘗試RETRIES_BEFORE_LOCK次( 即2次 )不加鎖的情況下鲸沮,將segment[]數(shù)組中每個segment的count累加,同時也會將每個segment的modCount進行累加锅论。如果兩次不加鎖的操作后讼溺,modCountSum值是一樣的,這說明在這兩次累加segmentcount的過程中ConcurrentHashMap沒有發(fā)生結(jié)構(gòu)性變化最易,那么就直接返回累加的count值
b) 如果在兩次累加segment的count操作期間ConcurrentHashMap發(fā)生了結(jié)構(gòu)性改變怒坯,則會通過將所有的segment都加鎖炫狱,然后重新進行count的累加操作。在完成count的累加操作后剔猿,釋放所有的鎖毕荐。最后返回累加的count值。
c) 注意艳馒,如果累加的count值大于了Integer.MAX_VALUE憎亚,則返回Integer.MAX_VALUE。
弱一致性
相對于HashMap的fast-fail弄慰,ConcurrentHashMap的迭代器并不會拋出ConcurrentModificationException異常第美。這是由于ConcurrentHashMap的讀行為是弱一致性的。
也就是說陆爽,在同時對一個segment進行讀線程和寫線程操作時什往,并不保證寫操作的行為能被并行允許的讀線程所感知。
比如慌闭,當一個寫線程和讀線程并發(fā)的對同一個key進行操作時:寫線程在操作一個put操作别威,如果這個時候put的是一個已經(jīng)存在的key值,則會替換該key對應(yīng)的value值驴剔,因為value是volatile屬性的省古,所以該替換操作時能立即被讀線程感知的。但如果此時的put是要新插入一個entry丧失,則存在兩種情況:①在寫線程通過UNSAFE.putOrderedObject方式將新entry插入到HashEntry鏈表后豺妓,讀線程才通過UNSAFE.getObjectVolatile來獲取對應(yīng)的HashEntry鏈表,那么這個時候讀線程是能夠獲取到這個新插入的entry的布讹;②反之琳拭,如果讀線程的UNSAFE.getObjectVolatile操作在寫線程的UNSAFE.putOrderedObject之前,則就無法感知到這個新加入的entry了描验。
其實在大多數(shù)并發(fā)的業(yè)務(wù)邏輯下白嘁,我們是允許這樣的弱一致性存在的。如果你的業(yè)務(wù)邏輯不允許這樣的弱一致性存在的膘流,你可以考慮對segment中的HashEntry鏈表的讀操作加鎖絮缅,或者將segment改造成讀寫鎖模式。但這都將大大降低ConcurrentHashMap的性能并且使得你的程序變得復(fù)雜且難以維護睡扬∶蓑迹或許你該考慮使用其他的存儲模型代替ConcurrentHashMap黍析。
后記
雖然JDK 8已經(jīng)出來很久了卖怜,但是我還是花了很多時間在JDK 7的ConcurrentHashMap上,一個很重要的原因是阐枣,我認為ConcurrentHashMap在并發(fā)模式下的設(shè)計思想是很值得我們深究和學(xué)習的马靠,無論是jdk7相對于jdk6的各種細節(jié)和性能上的優(yōu)化奄抽,還是jdk8的大改造都是對并發(fā)編程各種模式很好的學(xué)習。文章還有很多可以繼續(xù)深入挖掘的點甩鳄,希望在后期的學(xué)習中能夠繼續(xù)完善~
參考
http://www.infoq.com/cn/articles/ConcurrentHashMap/
http://www.blogjava.net/DLevin/archive/2013/10/18/405030.html
https://my.oschina.net/7001/blog/896587