實(shí)現(xiàn)原理
ConcurrentHashMap使用分段鎖技術(shù),將數(shù)據(jù)分成一段一段的存儲(chǔ),然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個(gè)線程占用鎖訪問其中一個(gè)段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問抬闷,能夠?qū)崿F(xiàn)真正的并發(fā)訪問。如下圖是ConcurrentHashMap的內(nèi)部結(jié)構(gòu)圖:
?
從圖中可以看到耕突,ConcurrentHashMap內(nèi)部分為很多個(gè)Segment笤成,每一個(gè)Segment擁有一把鎖,然后每個(gè)Segment(繼承ReentrantLock)下面包含很多個(gè)HashEntry列表數(shù)組有勾。對(duì)于一個(gè)key疹启,需要經(jīng)過三次(為什么要hash三次下文會(huì)詳細(xì)講解)hash操作,才能最終定位這個(gè)元素的位置蔼卡,這三次hash分別為:
對(duì)于一個(gè)key喊崖,先進(jìn)行一次hash操作,得到hash值h1雇逞,也即h1 = hash1(key)荤懂;
將得到的h1的高幾位進(jìn)行第二次hash,得到hash值h2塘砸,也即h2 = hash2(h1高幾位)节仿,通過h2能夠確定該元素的放在哪個(gè)Segment;
將得到的h1進(jìn)行第三次hash掉蔬,得到hash值h3廊宪,也即h3 = hash3(h1)矾瘾,通過h3能夠確定該元素放置在哪個(gè)HashEntry。
初始化
構(gòu)造函數(shù)的源碼如下:
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
傳入的參數(shù)有initialCapacity箭启,loadFactor壕翩,concurrencyLevel這三個(gè)。
initialCapacity表示新創(chuàng)建的這個(gè)ConcurrentHashMap的初始容量傅寡,也就是上面的結(jié)構(gòu)圖中的Entry數(shù)量放妈。默認(rèn)值為static final int DEFAULT_INITIAL_CAPACITY = 16;
loadFactor表示負(fù)載因子,就是當(dāng)ConcurrentHashMap中的元素個(gè)數(shù)大于loadFactor * 最大容量時(shí)就需要rehash荐操,擴(kuò)容芜抒。默認(rèn)值為static final float DEFAULT_LOAD_FACTOR = 0.75f;
concurrencyLevel表示并發(fā)級(jí)別,這個(gè)值用來確定Segment的個(gè)數(shù)托启,Segment的個(gè)數(shù)是大于等于concurrencyLevel的第一個(gè)2的n次方的數(shù)宅倒。比如,如果concurrencyLevel為12驾中,13唉堪,14,15肩民,16這些數(shù),則Segment的數(shù)目為16(2的4次方)链方。默認(rèn)值為static final int DEFAULT_CONCURRENCY_LEVEL = 16;持痰。理想情況下ConcurrentHashMap的真正的并發(fā)訪問量能夠達(dá)到concurrencyLevel,因?yàn)橛衏oncurrencyLevel個(gè)Segment祟蚀,假如有concurrencyLevel個(gè)線程需要訪問Map工窍,并且需要訪問的數(shù)據(jù)都恰好分別落在不同的Segment中,則這些線程能夠無競爭地自由訪問(因?yàn)樗麄儾恍枰偁幫话焰i)前酿,達(dá)到同時(shí)訪問的效果患雏。這也是為什么這個(gè)參數(shù)起名為“并發(fā)級(jí)別”的原因。
初始化的一些動(dòng)作:
驗(yàn)證參數(shù)的合法性罢维,如果不合法淹仑,直接拋出異常。
concurrencyLevel也就是Segment的個(gè)數(shù)不能超過規(guī)定的最大Segment的個(gè)數(shù)肺孵,默認(rèn)值為static final int MAX_SEGMENTS = 1 << 16;匀借,如果超過這個(gè)值,設(shè)置為這個(gè)值平窘。
然后使用循環(huán)找到大于等于concurrencyLevel的第一個(gè)2的n次方的數(shù)ssize吓肋,這個(gè)數(shù)就是Segment數(shù)組的大小,并記錄一共向左按位移動(dòng)的次數(shù)sshift瑰艘,并令segmentShift = 32 - sshift是鬼,并且segmentMask的值等于ssize - 1肤舞,segmentMask的各個(gè)二進(jìn)制位都為1,目的是之后可以通過key的hash值與這個(gè)值做&運(yùn)算確定Segment的索引。
檢查給的容量值是否大于允許的最大容量值莹菱,如果大于該值寻馏,設(shè)置為該值。最大容量值為static final int MAXIMUM_CAPACITY = 1 << 30;杖爽。
然后計(jì)算每個(gè)Segment平均應(yīng)該放置多少個(gè)元素,這個(gè)值c是向上取整的值紫皇。比如初始容量為15慰安,Segment個(gè)數(shù)為4,則每個(gè)Segment平均需要放置4個(gè)元素聪铺。
最后創(chuàng)建一個(gè)Segment實(shí)例化焕,將其當(dāng)做Segment數(shù)組的第一個(gè)元素。
put操作铃剔,put操作的源碼如下:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
操作步驟如下:
判斷value是否為null撒桨,如果為null,直接拋出異常键兜。
key通過一次hash運(yùn)算得到一個(gè)hash值凤类。(這個(gè)hash運(yùn)算下文詳說)
將得到hash值向右按位移動(dòng)segmentShift位,然后再與segmentMask做&運(yùn)算得到segment的索引j普气。
在初始化的時(shí)候我們說過segmentShift的值等于32-sshift谜疤,例如concurrencyLevel等于16,則sshift等于4现诀,則segmentShift為28夷磕。hash值是一個(gè)32位的整數(shù),將其向右移動(dòng)28位就變成這個(gè)樣子:
0000 0000 0000 0000 0000 0000 0000 xxxx仔沿,然后再用這個(gè)值與segmentMask做&運(yùn)算坐桩,也就是取最后四位的值。這個(gè)值確定Segment的索引封锉。
使用Unsafe的方式從Segment數(shù)組中獲取該索引對(duì)應(yīng)的Segment對(duì)象绵跷。
向這個(gè)Segment對(duì)象中put值,這個(gè)put操作也基本是一樣的步驟(通過&運(yùn)算獲取HashEntry的索引烘浦,然后set)抖坪。
get操作,get操作的源碼如下:
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>)UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
操作步驟為:
和put操作一樣闷叉,先通過key進(jìn)行兩次hash確定應(yīng)該去哪個(gè)Segment中取數(shù)據(jù)擦俐。
使用Unsafe獲取對(duì)應(yīng)的Segment,然后再進(jìn)行一次&運(yùn)算得到HashEntry鏈表的位置握侧,然后從鏈表頭開始遍歷整個(gè)鏈表(因?yàn)镠ash可能會(huì)有碰撞蚯瞧,所以用一個(gè)鏈表保存)嘿期,如果找到對(duì)應(yīng)的key,則返回對(duì)應(yīng)的value值埋合,如果鏈表遍歷完都沒有找到對(duì)應(yīng)的key备徐,則說明Map中不包含該key,返回null甚颂。
size操作
size操作與put和get操作最大的區(qū)別在于蜜猾,size操作需要遍歷所有的Segment才能算出整個(gè)Map的大小,而put和get都只關(guān)心一個(gè)Segment振诬。假設(shè)我們當(dāng)前遍歷的Segment為SA蹭睡,那么在遍歷SA過程中其他的Segment比如SB可能會(huì)被修改,于是這一次運(yùn)算出來的size值可能并不是Map當(dāng)前的真正大小赶么。所以一個(gè)比較簡單的辦法就是計(jì)算Map大小的時(shí)候所有的Segment都Lock住肩豁,不能更新(包含put,remove等等)數(shù)據(jù)辫呻,計(jì)算完之后再Unlock清钥。這是普通人能夠想到的方案,但是牛逼的作者還有一個(gè)更好的Idea:先給3次機(jī)會(huì)放闺,不lock所有的Segment祟昭,遍歷所有Segment,累加各個(gè)Segment的大小得到整個(gè)Map的大小雄人,如果某相鄰的兩次計(jì)算獲取的所有Segment的更新的次數(shù)(每個(gè)Segment都有一個(gè)modCount變量从橘,這個(gè)變量在Segment中的Entry被修改時(shí)會(huì)加一,通過這個(gè)值可以得到每個(gè)Segment的更新操作的次數(shù))是一樣的础钠,說明計(jì)算過程中沒有更新操作,則直接返回這個(gè)值叉谜。如果這三次不加鎖的計(jì)算過程中Map的更新次數(shù)有變化旗吁,則之后的計(jì)算先對(duì)所有的Segment加鎖,再遍歷所有Segment計(jì)算Map大小停局,最后再解鎖所有Segment很钓。源代碼如下:
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0) overflow = true;
}
}
if (sum == last) break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
舉個(gè)例子:
一個(gè)Map有4個(gè)Segment,標(biāo)記為S1董栽,S2码倦,S3,S4锭碳,現(xiàn)在我們要獲取Map的size袁稽。計(jì)算過程是這樣的:第一次計(jì)算,不對(duì)S1擒抛,S2推汽,S3补疑,S4加鎖,遍歷所有的Segment歹撒,假設(shè)每個(gè)Segment的大小分別為1莲组,2,3暖夭,4锹杈,更新操作次數(shù)分別為:2,2迈着,3竭望,1,則這次計(jì)算可以得到Map的總大小為1+2+3+4=10寥假,總共更新操作次數(shù)為2+2+3+1=8市框;第二次計(jì)算,不對(duì)S1,S2,S3,S4加鎖糕韧,遍歷所有Segment枫振,假設(shè)這次每個(gè)Segment的大小變成了2,2萤彩,3粪滤,4,更新次數(shù)分別為3雀扶,2杖小,3,1愚墓,因?yàn)閮纱斡?jì)算得到的Map更新次數(shù)不一致(第一次是8予权,第二次是9)則可以斷定這段時(shí)間Map數(shù)據(jù)被更新,則此時(shí)應(yīng)該再試一次浪册;第三次計(jì)算扫腺,不對(duì)S1,S2村象,S3笆环,S4加鎖,遍歷所有Segment厚者,假設(shè)每個(gè)Segment的更新操作次數(shù)還是為3躁劣,2,3库菲,1账忘,則因?yàn)榈诙斡?jì)算和第三次計(jì)算得到的Map的更新操作的次數(shù)是一致的,就能說明第二次計(jì)算和第三次計(jì)算這段時(shí)間內(nèi)Map數(shù)據(jù)沒有被更新,此時(shí)可以直接返回第三次計(jì)算得到的Map的大小闪萄。最壞的情況:第三次計(jì)算得到的數(shù)據(jù)更新次數(shù)和第二次也不一樣梧却,則只能先對(duì)所有Segment加鎖再計(jì)算最后解鎖。
關(guān)于hash
術(shù)語英文解釋
哈希算法hash algorithm是一種將任意內(nèi)容的輸入轉(zhuǎn)換成相同長度輸出的加密方式败去,其輸出被稱為哈希值放航。
哈希表hash table根據(jù)設(shè)定的哈希函數(shù)H(key)和處理沖突方法將一組關(guān)鍵字映象到一個(gè)有限的地址區(qū)間上,并以關(guān)鍵字在地址區(qū)間中的象作為記錄在表中的存儲(chǔ)位置圆裕,這種表稱為哈希表或散列广鳍,所得存儲(chǔ)位置稱為哈希地址或散列地址。
hash的源代碼:
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
這里用到了Wang/Jenkins hash算法的變種吓妆,主要的目的是為了減少哈希沖突赊时,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率行拢。假如哈希的質(zhì)量差到極點(diǎn)祖秒,那么所有的元素都在一個(gè)Segment中,不僅存取元素緩慢舟奠,分段鎖也會(huì)失去意義竭缝。
舉個(gè)簡單的例子:
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
這些數(shù)字得到的hash值都是一樣的,全是15沼瘫,所以如果不進(jìn)行第一次預(yù)hash抬纸,發(fā)生沖突的幾率還是很大的,但是如果我們先把上例中的二進(jìn)制數(shù)字使用hash()函數(shù)先進(jìn)行一次預(yù)hash耿戚,得到的結(jié)果是這樣的:
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010
可以看到每一位的數(shù)據(jù)都散開了湿故,并且ConcurrentHashMap中是使用預(yù)hash值的高位參與運(yùn)算的。比如之前說的先將hash值向右按位移動(dòng)28位膜蛔,再與15做&運(yùn)算坛猪,得到的結(jié)果都別為:4,15皂股,7砚哆,8,沒有沖突屑墨!
注意事項(xiàng)
ConcurrentHashMap中的key和value值都不能為null。
ConcurrentHashMap的整個(gè)操作過程中大量使用了Unsafe類來獲取Segment/HashEntry纷铣,這里Unsafe的主要作用是提供原子操作卵史。Unsafe這個(gè)類比較恐怖,破壞力極強(qiáng)搜立,一般場景不建議使用以躯,如果有興趣可以到這里做詳細(xì)的了解Java中鮮為人知的特性
ConcurrentHashMap是線程安全的類并不能保證使用了ConcurrentHashMap的操作都是線程安全的!
注:本文章在csdn上也有,都是本人書寫忧设,請(qǐng)勿轉(zhuǎn)載刁标!