ConcurrentHashMap從JDK1.5開始隨java.util.concurrent包一起引入JDK中锣咒,主要為了解決HashMap線程不安全和Hashtable效率不高的問題裤唠。眾所周知洽糟,HashMap在多線程編程中是線程不安全的蔓榄,而Hashtable由于使用了synchronized修飾方法而導(dǎo)致執(zhí)行效率不高阿浓;因此他嚷,在concurrent包中,實(shí)現(xiàn)了ConcurrentHashMap以使在多線程編程中可以使用一個(gè)高性能的線程安全HashMap方案。
ConcurrentHashMap在jdk1.7和jdk1.8中的實(shí)現(xiàn)有所不同筋蓖。JDK1.7之前的ConcurrentHashMap使用分段鎖機(jī)制實(shí)現(xiàn)卸耘,JDK1.8則使用數(shù)組+鏈表+紅黑樹數(shù)據(jù)結(jié)構(gòu)和CAS原子操作實(shí)現(xiàn)ConcurrentHashMap。
JDK7版本
1.分段鎖機(jī)制
Hashtable之所以效率低下主要是因?yàn)槠鋵?shí)現(xiàn)使用了synchronized關(guān)鍵字對(duì)put等操作進(jìn)行加鎖粘咖,而synchronized關(guān)鍵字加鎖是對(duì)整個(gè)對(duì)象進(jìn)行加鎖蚣抗,也就是說在進(jìn)行put等修改Hash表的操作時(shí),鎖住了整個(gè)Hash表瓮下,從而使得其表現(xiàn)的效率低下翰铡。
** 在JDK1.5~1.7版本,Java使用了分段鎖機(jī)制實(shí)現(xiàn)ConcurrentHashMap**
ConcurrentHashMap在對(duì)象中保存了一個(gè)Segment數(shù)組唱捣,即將整個(gè)Hash表劃分為多個(gè)分段两蟀;而每個(gè)Segment元素,即每個(gè)分段則類似于一個(gè)Hashtable震缭;這樣赂毯,在執(zhí)行put操作時(shí)首先根據(jù)hash算法定位到元素屬于哪個(gè)Segment,然后對(duì)Segment加鎖即可拣宰。因此党涕,ConcurrentHashMap在多線程并發(fā)編程中可以實(shí)現(xiàn)多線程put操作。
2. ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)
在ConcurrentHashMap中巡社,定義了一個(gè)Segment<K, V>[]數(shù)組來將Hash表實(shí)現(xiàn)分段存儲(chǔ)膛堤,從而實(shí)現(xiàn)分段加鎖;而一個(gè)Segment元素則與HashMap結(jié)構(gòu)類似晌该,其包含了一個(gè)HashEntry數(shù)組肥荔,用來存儲(chǔ)Key/Value對(duì)。Segment繼承了ReetrantLock朝群,表示Segment是一個(gè)可重入鎖燕耿,因此ConcurrentHashMap通過可重入鎖對(duì)每個(gè)分段進(jìn)行加鎖。
ConcurrentHashMap把容器分為多個(gè) segment(片段) 姜胖,每個(gè)片段有一把鎖誉帅,當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會(huì)存在競(jìng)爭(zhēng)關(guān)系右莱;一個(gè)線程占用鎖訪問一個(gè)segment的數(shù)據(jù)時(shí)蚜锨,并不影響另外的線程訪問其他segment中的數(shù)據(jù)。
Segment的結(jié)構(gòu)與HashMap類似慢蜓,每個(gè)片段對(duì)應(yīng)一個(gè)table數(shù)組和鏈表結(jié)構(gòu)亚再!
一個(gè)Segment里面包含一個(gè)HashEntry數(shù)組,每個(gè)HashEntry是一個(gè)鏈表結(jié)構(gòu)晨抡,當(dāng)對(duì)HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí)针余,必須首先獲得與它對(duì)應(yīng)的Segment鎖饲鄙!
Segment中的元素HashEntry與hashmap中數(shù)組的元素Node區(qū)別僅在于value與下一節(jié)點(diǎn)執(zhí)行next,HashEntry對(duì)這兩個(gè)屬性添加volitie關(guān)鍵字圆雁,確保多線程并發(fā)下get方法的安全性(ConcurrentHashMap在get方法中是不加鎖的)
3 ConcurrentHashMap的初始化
JDK1.7的ConcurrentHashMap的初始化主要分為兩個(gè)部分:一是初始化ConcurrentHashMap忍级,即初始化segment數(shù)組、segmentShift段偏移量和segmentMask段掩碼等伪朽。然后則是初始化每個(gè)segment分段轴咱。接下來,我們將分別介紹這兩部分初始化烈涮。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
//非法判斷朴肺,裝載因子須大于0,數(shù)組容量不得小于0坚洽,并發(fā)數(shù)量要大于0戈稿,否則拋出異常
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//分段數(shù)大于數(shù)組長(zhǎng)度,則取分段數(shù)為數(shù)組長(zhǎng)度(最大分段)
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//sshift++讶舰,ssize*2鞍盗,直到ssize達(dá)到最大并發(fā)數(shù)
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// ordered write of segments[0]
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
由代碼可知,該構(gòu)造函數(shù)需要傳入三個(gè)參數(shù):initialCapacity跳昼、loadFactor般甲、concurrencyLevel,其中鹅颊,concurrencyLevel主要用來初始化segments敷存、segmentShift和segmentMask等;而initialCapacity和loadFactor則主要用來初始化每個(gè)Segment分段堪伍。
根據(jù)ConcurrentHashMap的構(gòu)造方法可知锚烦,在初始化時(shí)創(chuàng)建了兩個(gè)中間變量ssize和sshift,它們都是通過concurrencyLevel計(jì)算得到的帝雇。其中ssize表示了segments數(shù)組的長(zhǎng)度涮俄,為了能通過按位與的散列算法來定位segments數(shù)組的索引,必須保證segments數(shù)組的長(zhǎng)度是2的N次方摊求,所以在初始化時(shí)通過循環(huán)計(jì)算出一個(gè)大于或等于concurrencyLevel(允許并發(fā)數(shù))的最小的2的N次方值來作為數(shù)組的長(zhǎng)度;而sshift表示了計(jì)算ssize時(shí)進(jìn)行移位操作的次數(shù)刘离。
segmentShift用于定位參與散列運(yùn)算的位數(shù)室叉,其等于32減去sshift,使用32是因?yàn)镃oncurrentHashMap的hash()方法返回的最大數(shù)是32位的硫惕;segmentMask是散列運(yùn)算的掩碼茧痕,等于ssize減去1,所以掩碼的二進(jìn)制各位都為1恼除。
(如ssize=“...10000”踪旷,則segmentMask=“..1111”)
4 get
JDK1.7的ConcurrentHashMap的get操作是不加鎖的曼氛,因?yàn)樵诿總€(gè)Segment中定義的HashEntry數(shù)組和在每個(gè)HashEntry中定義的value和next HashEntry節(jié)點(diǎn)都是volatile類型的,volatile類型的變量可以保證其在多線程之間的可見性令野,因此可以被多個(gè)線程同時(shí)讀舀患,從而不用加鎖。而其get操作步驟也比較簡(jiǎn)單气破,定位Segment –> 定位HashEntry –> 通過getObjectVolatile()方法獲取指定偏移量上的HashEntry –> 通過循環(huán)遍歷鏈表獲取對(duì)應(yīng)值聊浅。
定位Segment:(((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE
定位HashEntry:(((tab.length - 1) & h)) << TSHIFT) + TBASE
put
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
同樣的,put方法首先也會(huì)通過hash算法定位到對(duì)應(yīng)的Segment现使,此時(shí)低匙,如果獲取到的Segment為空,則調(diào)用ensureSegment()方法碳锈;否則顽冶,直接調(diào)用查詢到的Segment的put方法插入值,注意此處并沒有用getObjectVolatile()方法讀售碳,而是在ensureSegment()中再用volatile讀操作强重,這樣可以在查詢segments不為空的時(shí)候避免使用volatile讀,提高效率团滥。在ensureSegment()方法中竿屹,首先使用getObjectVolatile()讀取對(duì)應(yīng)Segment,如果還是為空灸姊,則以segments[0]為原型創(chuàng)建一個(gè)Segment對(duì)象拱燃,并將這個(gè)對(duì)象設(shè)置為對(duì)應(yīng)的Segment值并返回。
在Segment的put方法中力惯,首先需要調(diào)用tryLock()方法獲取鎖碗誉,然后通過hash算法定位到對(duì)應(yīng)的HashEntry,然后遍歷整個(gè)鏈表父晶,如果查到key值哮缺,則直接插入元素即可;而如果沒有查詢到對(duì)應(yīng)的key甲喝,則需要調(diào)用rehash()方法對(duì)Segment中保存的table進(jìn)行擴(kuò)容尝苇,擴(kuò)容為原來的2倍,并在擴(kuò)容之后插入對(duì)應(yīng)的元素埠胖。插入一個(gè)key/value對(duì)后糠溜,需要將統(tǒng)計(jì)Segment中元素個(gè)數(shù)的count屬性加1。最后直撤,插入成功之后非竿,需要使用unLock()釋放鎖。
size
ConcurrentHashMap的size操作的實(shí)現(xiàn)方法也非常巧妙谋竖,一開始并不對(duì)Segment加鎖红柱,而是直接嘗試將所有的Segment元素中的count相加承匣,這樣執(zhí)行兩次,然后將兩次的結(jié)果對(duì)比锤悄,如果兩次結(jié)果相等則直接返回韧骗;而如果兩次結(jié)果不同,則再將所有Segment加鎖铁蹈,然后再執(zhí)行統(tǒng)計(jì)得到對(duì)應(yīng)的size值宽闲。
JDK8版本
在JDK1.7之前,ConcurrentHashMap是通過分段鎖機(jī)制來實(shí)現(xiàn)的握牧,所以其最大并發(fā)度受Segment的個(gè)數(shù)限制容诬。因此,在JDK1.8中沿腰,ConcurrentHashMap的實(shí)現(xiàn)原理摒棄了這種設(shè)計(jì)览徒,而是選擇了與HashMap類似的數(shù)組+鏈表+紅黑樹的方式實(shí)現(xiàn),而加鎖則采用CAS和synchronized實(shí)現(xiàn)颂龙。
初始化
JDK1.8的ConcurrentHashMap的初始化過程也比較簡(jiǎn)單习蓬,所有的構(gòu)造方法最終都會(huì)調(diào)用如下這個(gè)構(gòu)造方法。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)// Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
該初始化過程通過指定的初始容量initialCapacity措嵌,加載因子loadFactor和預(yù)估并發(fā)度concurrencyLevel三個(gè)參數(shù)計(jì)算table數(shù)組的初始大小sizeCtl的值躲叼。
可以看到,在構(gòu)造ConcurrentHashMap時(shí)企巢,并不會(huì)對(duì)hash表(Node<K, V>[] table)進(jìn)行初始化枫慷,hash表的初始化是在插入第一個(gè)元素時(shí)進(jìn)行的。在put操作時(shí)浪规,如果檢測(cè)到table為空或其長(zhǎng)度為0時(shí)或听,則會(huì)調(diào)用initTable()方法對(duì)table進(jìn)行初始化操作。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab;
int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
可以看到笋婿,該方法使用一個(gè)循環(huán)實(shí)現(xiàn)table的初始化誉裆;在循環(huán)中,首先會(huì)判斷sizeCtl的值缸濒,如果其小于0足丢,則說明其正在進(jìn)行初始化或擴(kuò)容操作,則不執(zhí)行任何操作庇配,調(diào)用yield()方法使當(dāng)前線程返回等待狀態(tài)斩跌;而如果sizeCtl大于等于0,則使用CAS操作比較sizeCtl的值是否是-1讨永,如果是-1則進(jìn)行初始化滔驶。初始化時(shí)遇革,如果sizeCtl的值為0卿闹,則創(chuàng)建默認(rèn)容量的table揭糕;否則創(chuàng)建大小為sizeCtl的table;然后重置sizeCtl的值為0.75n锻霎,即當(dāng)前table容量的0.75倍著角,并返回創(chuàng)建的table,此時(shí)初始化hash表完成旋恼。
Node鏈表和紅黑樹結(jié)構(gòu)轉(zhuǎn)換
一個(gè)table元素會(huì)根據(jù)其包含的Node節(jié)點(diǎn)數(shù)在鏈表和紅黑樹兩種結(jié)構(gòu)之間切換吏口,因此我們本節(jié)先介紹Node節(jié)點(diǎn)的結(jié)構(gòu)轉(zhuǎn)換的實(shí)現(xiàn)。
首先冰更,在table中添加一個(gè)元素時(shí)产徊,如果添加元素的鏈表節(jié)點(diǎn)個(gè)數(shù)超過8,則會(huì)觸發(fā)鏈表向紅黑樹結(jié)構(gòu)轉(zhuǎn)換蜀细。具體的實(shí)現(xiàn)方法如下:
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b;
int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
該方法首先會(huì)檢查hash表的大小是否大于等于MIN_TREEIFY_CAPACITY舟铜,默認(rèn)值為64,如果小于該值奠衔,則表示不需要轉(zhuǎn)化為紅黑樹結(jié)構(gòu)谆刨,直接將hash表擴(kuò)容即可。
如果當(dāng)前table的長(zhǎng)度大于64归斤,則使用CAS獲取指定的Node節(jié)點(diǎn)痊夭,然后對(duì)該節(jié)點(diǎn)通過synchronized加鎖,由于只對(duì)一個(gè)Node節(jié)點(diǎn)加鎖脏里,因此該操作并不影響其他Node節(jié)點(diǎn)的操作抄淑,因此極大的提高了ConcurrentHashMap的并發(fā)效率。加鎖之后贺拣,便是將這個(gè)Node節(jié)點(diǎn)所在的鏈表轉(zhuǎn)換為TreeBin結(jié)構(gòu)的紅黑樹欧聘。
然后,在table中刪除元素時(shí)员淫,如果元素所在的紅黑樹節(jié)點(diǎn)個(gè)數(shù)小于6合蔽,則會(huì)觸發(fā)紅黑樹向鏈表結(jié)構(gòu)轉(zhuǎn)換。具體實(shí)現(xiàn)如下:
static <K,V> Node<K,V> untreeify(Node<K,V> b) {
Node<K,V> hd = null, tl = null;
for (Node<K,V> q = b; q != null; q = q.next) {
Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
if (tl == null)
hd = p;
else
tl.next = p;
tl = p;
}
return hd;
}
get
通過get獲取hash表中的值時(shí)介返,首先需要獲取key值的hash值拴事。而在JDK1.8的ConcurrentHashMap中通過speed()方法獲取。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
speed()方法將key的hash值進(jìn)行再hash圣蝎,讓hash值的高位也參與hash運(yùn)算刃宵,從而減少哈希沖突。然后再查詢對(duì)應(yīng)的value值徘公。
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh;
K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
查詢時(shí)牲证,首先通過tabAt()方法找到key對(duì)應(yīng)的Node鏈表或紅黑樹,然后遍歷該結(jié)構(gòu)便可以獲取key對(duì)應(yīng)的value值关面。其中坦袍,tabAt()方法主要通過Unsafe類的getObjectVolatile()方法獲取value值十厢,通過volatile讀獲取value值,可以保證value值的可見性捂齐,從而保證其是當(dāng)前最新的值蛮放。
put
JDK1.8的ConcurrentHashMap的put操作實(shí)現(xiàn)方式主要定義在putVal(K key, V value, boolean onlyIfAbsent)中。
put操作大致可分為以下幾個(gè)步驟:
計(jì)算key的hash值奠宜,即調(diào)用speed()方法計(jì)算hash值包颁;
獲取hash值對(duì)應(yīng)的Node節(jié)點(diǎn)位置,此時(shí)通過一個(gè)循環(huán)實(shí)現(xiàn)压真。有以下幾種情況:
1.如果table表為空娩嚼,則首先進(jìn)行初始化操作,初始化之后再次進(jìn)入循環(huán)獲取Node節(jié)點(diǎn)的位置滴肿;
2.如果table不為空待锈,但沒有找到key對(duì)應(yīng)的Node節(jié)點(diǎn),則直接調(diào)用casTabAt()方法插入一個(gè)新節(jié)點(diǎn)嘴高,此時(shí)不用加鎖竿音;
3.如果table不為空,且key對(duì)應(yīng)的Node節(jié)點(diǎn)也不為空拴驮,但Node頭結(jié)點(diǎn)的hash值為MOVED(-1)春瞬,則表示需要擴(kuò)容,此時(shí)調(diào)用helpTransfer()方法進(jìn)行擴(kuò)容套啤;
4.其他情況下宽气,則直接向Node中插入一個(gè)新Node節(jié)點(diǎn),此時(shí)需要對(duì)這個(gè)Node鏈表或紅黑樹通過synchronized加鎖潜沦。
插入元素后萄涯,判斷對(duì)應(yīng)的Node結(jié)構(gòu)是否需要改變結(jié)構(gòu),如果需要?jiǎng)t調(diào)用treeifyBin()方法將Node鏈表升級(jí)為紅黑樹結(jié)構(gòu)唆鸡;
最后涝影,調(diào)用addCount()方法記錄table中元素的數(shù)量。
size
JDK1.8的ConcurrentHashMap中保存元素的個(gè)數(shù)的記錄方法也有不同争占,首先在添加和刪除元素時(shí)燃逻,會(huì)通過CAS操作更新ConcurrentHashMap的baseCount屬性值來統(tǒng)計(jì)元素個(gè)數(shù)。但是CAS操作可能會(huì)失敗臂痕,因此伯襟,ConcurrentHashMap又定義了一個(gè)CounterCell數(shù)組來記錄CAS操作失敗時(shí)的元素個(gè)數(shù)。因此握童,ConcurrentHashMap中元素的個(gè)數(shù)則通過如下方式獲得:
元素總數(shù) = baseCount + sum(CounterCell)
size只能獲取int范圍內(nèi)的ConcurrentHashMap元素個(gè)數(shù)姆怪;而如果hash表中的數(shù)據(jù)過多,超過了int類型的最大值,則推薦使用mappingCount()方法獲取其元素個(gè)數(shù)稽揭。