1 概述
ConcurrentHashMap和HashMap一樣都是基于散列的容器蔫饰,ConcurrentHashMap可以認(rèn)為是一種線程安全HashMap荷憋,它使用了一中完全不同的加鎖策略提高并發(fā)性和伸縮性扛或。
ConcurrentHashMap并不是將每個方法在同一個鎖上同步并使得每次只能有一個線程訪問容器,而是使用一種粒度更細(xì)的加鎖機(jī)制來實現(xiàn)更大程度的共享鼻百,這種機(jī)制稱為“分段鎖”岗照。
2 靜態(tài)結(jié)構(gòu)
2.1 ConcurrentHashMap主要構(gòu)件:
ConcurrentHashMap中主要實體類就是三個:
- ConcurrentHashMap(整個Hash表)
- Segment(桶)
- HashEntry(節(jié)點(diǎn))
對應(yīng)下面的圖可以看出之間的關(guān)系,靜態(tài)內(nèi)部類HashEntry用來封裝映射表的鍵值對亭病,Segment充當(dāng)鎖的角色埋市,每個 Segment 對象維護(hù)散列表的若干個桶。每個桶是由若干個 HashEntry 對象鏈接起來的鏈表命贴。一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數(shù)組道宅。
下面是ConcurrentHashMap類的聲明:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable
其中實現(xiàn)了ConcurrentMap接口,定義如下:
public interface ConcurrentMap<K, V> extendsMap<K, V> {
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);
}
其中定義了4個常用的復(fù)合操作:
V putIfAbsent(K key, V value);
如果沒有這個key胸蛛,則放入這個key-value污茵,返回null,否則返回key對應(yīng)的value葬项。
boolean remove(Object key, Object value);
移除key和對應(yīng)的value泞当,如果key對應(yīng)的不是value,移除失敗
boolean replace(K key, V oldValue, V newValue);
替代key對應(yīng)的值民珍,僅當(dāng)當(dāng)前值為舊值
V replace(K key, V value);
替代key對應(yīng)的值襟士,只要當(dāng)前有值
2.2 Segment 類
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;
transient int count; //本segment中HashEntry的個數(shù)
transient int modCount; //table被更新的次數(shù)
transient int threshold; //當(dāng)table中的HashEntry個數(shù)超過該值后,需要table再散列
final float loadFactor; //裝載因子
segment結(jié)構(gòu)示意圖:
Segment是一個可重入鎖嚷量,每個Segment就相當(dāng)于一個HashMap陋桂,是一個數(shù)組和鏈表結(jié)構(gòu),維持了一個HashEntry數(shù)組蝶溶。
2.3 HashEntry
Map內(nèi)的鍵值對是用hashEntry類封裝的嗜历,其中hash,key抖所,next域都被聲明為final屬性梨州,value被聲明為volatile類型。下面是HashEntry源碼:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
田轧。暴匠。。
}
3 動態(tài)算法
3.1 init-初始化流程
ConcurrentHashMap初始化:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1; //ssize為segment個數(shù)(默認(rèn)16)
while (ssize < concurrencyLevel) {//找到最佳匹配參數(shù)(不小于concurrencyLevel的最小2次冪)
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; //總?cè)萘?segment個數(shù) = 每個segment容量
if (c * ssize < initialCapacity) //確保segment個數(shù)*segment容量大于總的容量
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; //找到不小于c的最小2次冪傻粘,cap最終為每個segment的容量
while (cap < c)
cap <<= 1;
// create segments and segments[0]
//這里區(qū)別于JDK1.6區(qū)別的地方每窖,只創(chuàng)建出segments[0]帮掉,并且引入了UNSAFE類的putOrderedObject()方法,這個方法特點(diǎn)是低延時岛请,速度快。而1.6中循環(huán)segments數(shù)組警绩,為每個元素創(chuàng)建segment對象崇败。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
為什么segments數(shù)組大小必須是2的n次方?
為了能通過按位與操作來定位segments數(shù)組的索引值肩祥。
segmentShift和segmentMask干什么用滴?
我們知道基于散列的容器是通過元素的hashCode值來確定元素在容器中的索引的后室,那么ConcurrentHashMap中定位一個元素至少需要兩步:
- 定位segment
- 定位HashEntry
segmentShift和segmentMask的作用就是用來定位segment元素的,如下:
//若ssize=16混狠,即segments數(shù)組大小為16岸霹,那么sshift=4
this.segmentShift = 32 - sshift;//segmentShift=32-4=28
this.segmentMask = ssize - 1;//掩碼segmentMask=15,對應(yīng)二進(jìn)制1111,每位上都是1
((hash >>> segmentShift) & segmentMask)//定位segmetns
(hash & (tab.length-1))//定位HashEntry
3.2 service-常用操作分析
3.2.1.get(Object key)
ConcurrentHashMap的get方法主要步驟如下:
- 通過key的hash值将饺,確定所屬的segment的索引贡避。
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
默認(rèn)情況下segmentShift=28,segmentMask=15予弧,u是hash后的值刮吧,為32位,h >>> segmentShift) & segmentMask)即讓hash值得高4位參與到運(yùn)算掖蛤。
- 通過UNSAFE類的getObjectVolatile方法獲取segment杀捻。
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
- 通過hash值確定HashEntry數(shù)組位置,遍歷鏈表找出entry
for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if((k = e.key) == key || (e.hash == h && key.equals(k)))
returne.value;
}
下面是get方法整體源碼:
publicV get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null&&
(tab = s.table) != null) {
for(HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if((k = e.key) == key || (e.hash == h && key.equals(k)))
returne.value;
}
}
return null;
}
性能分析:
可以看到整個get方法執(zhí)行過程中并沒有進(jìn)行同步操作蚓庭,get操作無需加鎖時間復(fù)雜度O(1)
3.2.2.put(K key, V value)
put操作步驟:
//ConcurrentHashMap的put源碼
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)//value不允許為null
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;//定位segment元素
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);//第一次訪問segment時致讥,創(chuàng)建segment對象
return s.put(key, hash, value, false);//調(diào)用segment的put方法
}
//下面是segment中的put方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock()加鎖,不成功則執(zhí)行scanAndLockForPut
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
//下面for循環(huán)是進(jìn)行元素定位即更新操作
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//key已存在,通過onlyIfAbsent決定是否更新值
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//key不存在器赞,構(gòu)造node節(jié)點(diǎn)垢袱,通過UNSAFE類插入節(jié)點(diǎn),注意:這里插入是頭插法
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);//插入鏈表頭node.next = first;
int c = count + 1;
//是否需要擴(kuò)容港柜,注意:只對當(dāng)前segment擴(kuò)容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
需要注意的點(diǎn):
- 插入新節(jié)點(diǎn)的時候惶桐,是頭插法。
- 如果count > threshold需要擴(kuò)容,那么只對當(dāng)前segment擴(kuò)容潘懊。
- 若元素已存在姚糊,不一定更新,只有當(dāng)onlyIfAbsent=false的時候才更新授舟。
在put方法中救恨,首先進(jìn)行加鎖操作,tryLock失敗后并不會按照常規(guī)思路阻塞當(dāng)前線程释树,而是執(zhí)行scanAndLockForPut方法肠槽,下面重點(diǎn)分析下這個方法做了什么工作擎淤?
//Segment類中的常量MAX_SCAN_RETRIES:最大掃描次數(shù),在scanAndLockForPut方法中會用到
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//scanAndLockForPut的源碼:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//定位HashEntry數(shù)組位置秸仙,獲取第一個節(jié)點(diǎn)
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1;//掃描次數(shù)
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null) {
if (node == null) // 構(gòu)造新節(jié)點(diǎn)
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // 首節(jié)點(diǎn)有變動嘴拢,更新first,重新掃描
retries = -1;
}
}
return node;
}
scanAndLockForPut的作用:
- 當(dāng)前線程獲取不到鎖的時候并沒有閑著寂纪,而是查找key是否已經(jīng)存在席吴,如果當(dāng)前鏈表中沒有查到,則新建一個HashEntry對象捞蛋。
- 新建HashEntry節(jié)點(diǎn)后孝冒,當(dāng)retries <= MAX_SCAN_RETRIES時,不斷通過tryLock嘗試獲取鎖拟杉,retries > MAX_SCAN_RETRIES,則調(diào)用lock(),此時若還獲取不到鎖庄涡,那么當(dāng)前線程就被阻塞,這點(diǎn)類似于自旋鎖搬设。
- 在檢索key的時候穴店,別的線程可能正在對segment進(jìn)行修改,所以要做如下檢查:
//每間隔一次循環(huán)拿穴,檢查一次first節(jié)點(diǎn)是否改變
if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) {
e = first = f; // 首節(jié)點(diǎn)有變動迹鹅,更新first,重新掃描
retries = -1;
}
通過scanAndLockForPut方法贞言,當(dāng)前線程就可以在即使獲取不到segment鎖的情況下斜棚,完成潛在需要添加節(jié)點(diǎn)的實例化工作,當(dāng)獲取鎖后该窗,就可以直接將該節(jié)點(diǎn)插入鏈表即可弟蚀。還實現(xiàn)了類似于自旋鎖的功能,防止執(zhí)行put操作的線程頻繁阻塞酗失,這些優(yōu)化都提升了put操作的性能义钉。
3.2.3.全局操作 size()/isEmpty()/clear()
由于分段技術(shù)的緣故,ConcurrentHashMap的全局操作(size/isEmpty/clear)就會變得復(fù)雜
size操作
要統(tǒng)計ConcurrentHashMap的元素個數(shù)规肴,就要將所有segment的元素個數(shù)求和捶闸。直接累加?肯定不行拖刃,在并發(fā)環(huán)境中删壮,這樣得到的結(jié)果并不準(zhǔn)確。對所有segment加鎖再求和兑牡?這樣做結(jié)果肯定正確央碟,但是這違背了ConcurrentHashMap設(shè)計的初衷,在并發(fā)環(huán)境中要有良好的變現(xiàn)均函。ConcurrentHashMap中size方法實現(xiàn)選擇了一個折中方案:
先嘗試兩次不對Segment加鎖方式來統(tǒng)計count值亿虽,如果統(tǒng)計過程中count發(fā)生變化了菱涤,再加鎖。如果沒有改變洛勉,則返回size粘秆。
isEmpty操作
isEmpty操作會對segment對象的count進(jìn)行檢查,如果count!=0,那么直接返回false,同時對所有segment的modCount變量進(jìn)行統(tǒng)計收毫,統(tǒng)計兩次的結(jié)果如果一致攻走,那么表示容器在此之間沒有發(fā)生變化,那么返回true.
clear操作
clear操作將容器中的所有元素移除牛哺,通過遍歷segments數(shù)組陋气,將每個segment對象中元素清除劳吠,源碼如下:
public void clear() {
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> s = segmentAt(segments, j);
if (s != null)
s.clear();
}
}
調(diào)用clear方法后引润,容器一定被清空了嗎?
考慮這么一種情況痒玩,當(dāng)A線程執(zhí)行clear方法時淳附,已經(jīng)將segments[0]對象清空了,此時B線程執(zhí)行put(key,value)方法蠢古,如果key散列到segments[0]上奴曙,那么A執(zhí)行完后容器中還有元素!所以clear方法是弱一致的
ConcurrentHashMap弱一致性問題?
上面分析clear方法的若一致性草讶,其實size()/isEmpty()/clear()都是在無鎖清空下執(zhí)行的洽糟,這帶來的后果就是數(shù)據(jù)的不一致。所以在選擇容器的時候要注意堕战,如果非常強(qiáng)調(diào)全局方法的實時性和一致性坤溃,那么ConcurrentHashMap并不是一個好選擇。