Doug Lea
大神在j.u.c
包下給我們提供了一個(gè)適用于多線程并發(fā)環(huán)境使用的集合類ConcurrentHashMap
觅彰。而如果在多線程環(huán)境累提,不考慮任何線程安全的防范的話皱碘,使用HashMap
會(huì)帶來諸多問題财剖。
HashMap的并發(fā)問題
HashMap
集合是非線程安全幌羞,在多線程環(huán)境下容易出現(xiàn)問題寸谜。HashMap
在數(shù)據(jù)更新的時(shí)候,會(huì)帶來很多問題:
- 數(shù)據(jù)丟失
多線程環(huán)境下属桦,如兩個(gè)線程同時(shí)在一個(gè)bucket
下put
元素熊痴,有可能會(huì)造成元素寫入被覆蓋,從而丟失數(shù)據(jù)地啰。
- fail-fast機(jī)制
當(dāng)多個(gè)線程對(duì)同一個(gè)集合進(jìn)行操作時(shí)候愁拭,就會(huì)觸發(fā)fail-fast
機(jī)制并拋出ConcurrentModificationException
。
- 死循環(huán)
當(dāng)HashMap
中table
的大小不夠時(shí)亏吝,如果兩個(gè)線程同時(shí)進(jìn)行resize
的操作岭埠。如果某一bucket
下元素是用鏈表記錄,在resize
過程中蔚鸥,鏈表在多線程的環(huán)境下有可能會(huì)形成閉合回鏈惜论,get
請(qǐng)求就會(huì)造成死循環(huán),使得CPU
飆升止喷。詳細(xì)可以看看這篇博文:疫苗:JAVA HASHMAP的死循環(huán)馆类。不過JDK1.8
中已經(jīng)改寫resize
方法,應(yīng)該不會(huì)出現(xiàn)這種問題弹谁,但這并不是我們可以在多線程環(huán)境下使用HashMap
理由乾巧。
...
線程安全類 HashTable
HashTable
是線程安全的句喜,底層是通過synchronized
來保證線程安全。當(dāng)多線程競(jìng)爭(zhēng)激烈的時(shí)候沟于,沒有獲得鎖的線程都將會(huì)阻塞咳胃。synchronized
修飾所有針對(duì)HashTable
集合的操作。這樣一旦有線程獲得鎖旷太,其他的線程都只能等待鎖的釋放展懈,然后再去競(jìng)爭(zhēng)鎖,這樣一來HashTable
的效率必定會(huì)受到影響供璧。
ConcurrentHashMap 鎖機(jī)制
相對(duì)于低效的HashTable
存崖,ConcurrentHashMap
在鎖機(jī)制層面上做了優(yōu)化。鎖優(yōu)化的思路一般有一下幾種:
- 減少鎖持有時(shí)間
- 減小鎖粒度
- 鎖分離
- 鎖粗化
- 鎖消除
JDK1.6 ConcurrentHashMap
ConcurrentHashMap
中存儲(chǔ)的元素是通過靜態(tài)內(nèi)部類HashEntry
封裝實(shí)現(xiàn)的睡毒。
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
}
其中value
字段被聲明為 volatile
型来惧,保證其在內(nèi)存中可見性。key
演顾,hash
和 next
都被聲明為 final
型违寞。ConcurrentHashMap
存儲(chǔ)數(shù)據(jù)根據(jù)key
的hash
值將數(shù)據(jù)元素散列到哈希表中每一個(gè)bucket
中。當(dāng)發(fā)生哈希碰撞時(shí)候偶房,會(huì)將元素封裝成HashEntry
構(gòu)成鏈表趁曼。由于next
是final
類型的,鏈表中添加元素都將從表頭添加棕洋。
減小鎖粒度-分段鎖
在JDK1.7
中挡闰,ConcurrentHashMap
在鎖優(yōu)化過程通過減小鎖粒度的實(shí)現(xiàn)了對(duì)集合的高效并發(fā)操作。ConcurrentHashMap
包含一個(gè)靜態(tài)內(nèi)部類Segment
掰盘,是用來充當(dāng)鎖的角色摄悯。
每個(gè)Segment
將維護(hù)若干個(gè)bucket
。而鎖只針對(duì)Segment
而不是整張表愧捕,從而實(shí)現(xiàn)減小鎖的粒度奢驯。
Segment類
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// 包含HashEntry的個(gè)數(shù)
transient volatile int count;
// table 更新次數(shù)
transient int modCount;
// table resize 閾值
transient int threshold;
// HashEntry數(shù)組用于存儲(chǔ)元素
transient volatile HashEntry<K,V>[] table;
// 加載因子
final float loadFactor;
// 構(gòu)造函數(shù)
Segment(int initialCapacity, float lf) {
loadFactor = lf;
setTable(HashEntry.<K,V>newArray(initialCapacity));
}
void setTable(HashEntry<K,V>[] newTable) {
threshold = (int)(newTable.length * loadFactor);
table = newTable;
}
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
}
ConcurrentHashMap 初始化
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if(!(loadFactor > 0) || initialCapacity < 0 ||
concurrencyLevel <= 0)
throw new IllegalArgumentException();
if(concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while(ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if(c * ssize < initialCapacity)
++c;
int cap = 1;
while(cap < c)
cap <<= 1;
for(int i = 0; i < this.segments.length; ++i) {
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
ConcurrentHashMap
在初始化的過程中,創(chuàng)建了segments
數(shù)組次绘。ConcurrentHashMap
的結(jié)構(gòu)示意圖如下所示:
put 方法
public V put(K key, V value) {
// 不允許value為null
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
// 根據(jù)哈希值找到在segments數(shù)組中對(duì)一個(gè)的segment
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
// segment.put 加鎖瘪阁,鎖對(duì)象是segment而非整個(gè)table
lock();
try {
int c = count;
// 動(dòng)態(tài)擴(kuò)容
if (c++ > threshold)
rehash();
// 找出table中key index處的元素
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
// key處已有值,根據(jù)onlyIfAbsent覺得是否需要覆蓋
if (!onlyIfAbsent) {
e.value = value;
}
else {
// 元素封裝成 HashEntry邮偎,添加至表頭
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c;
}
return oldValue;
} finally {
// 釋放鎖
unlock();
}
}
segment
是繼承ReentrantLock
管跺。put
操作在開始之前會(huì)通過調(diào)用lock
獲取鎖,添加元素完畢后禾进,調(diào)用unlock
釋放鎖豁跑。從此可以看出,ConcurrentHashMap
在鎖方面優(yōu)化點(diǎn)之一泻云,引入segment
艇拍,將鎖分成N
段狐蜕,每次操作集合,只會(huì)鎖住對(duì)應(yīng)的segment
而非整張表卸夕,減小鎖粒度馏鹤,支持一定數(shù)量并發(fā)寫入,提升了并發(fā)效率娇哆。
讀寫鎖分離-完全并發(fā)讀
get 方法
ConcurrentHashMap
中的讀操作如get
方法是沒有加鎖的。在更新操作中勃救,最后都會(huì)更新count
變量碍讨。count
是volatile
類型,在不加鎖的前提下蒙秒,也可以保證被準(zhǔn)確讀取勃黍。而在讀的時(shí)候也會(huì)去首先判斷count
的值。如果寫入過程讀取值晕讲,就要加鎖等待其他操作釋放鎖之后再去讀取覆获。
V get(Object key, int hash) {
if(count != 0) {
HashEntry<K,V> e = getFirst(hash);
while(e != null) {
if(e.hash == hash && key.equals(e.key)) {
V v = e.value;
if(v != null)
return v;
// 寫入完成前讀取需加鎖
return readValueUnderLock(e);
}
e = e.next;
}
}
return null;
}
在ConcurrentHashMap
中,在不加鎖的前提下可以成功讀取值瓢省,這種讀寫分離鎖的實(shí)現(xiàn)弄息,減少了請(qǐng)求獲取鎖的頻次,使得并發(fā)效率進(jìn)一步提高勤婚。
JDK1.8 ConcurrentHashMap
在JDK1.8
中摹量,ConcurrentHashMap
的實(shí)現(xiàn)不再使用Segment
做鎖分段方法。新版本中ConcurrentHashMap
采用底層的CPU
的CAS
指令和synchronized
來實(shí)現(xiàn)鎖機(jī)制馒胆。數(shù)據(jù)存儲(chǔ)和HashMap
一致缨称,采用數(shù)組、鏈表和紅黑樹實(shí)現(xiàn)祝迂。
sizeCtl 變量
private transient volatile int sizeCtl;
sizeCtl
是控制標(biāo)識(shí)符睦尽,不同的值代表不同的意義。
-
-1
表示正在初始化 -
-N
表示N-1
個(gè)線程正在進(jìn)行擴(kuò)容 -
0
代表尚未初始化 -
>0
擴(kuò)容閾值
table 初始化
table
初始化是在put
操作過程中進(jìn)行的型雳〉狈玻可以從源碼角度看一下initTable
是如何保證在多線程環(huán)境下,只會(huì)初始化一次纠俭。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// sizeCtl 小于0表明有其他線程正在操作table 初始化或者擴(kuò)容宁玫,當(dāng)前線程讓出CPU
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 通過CAS機(jī)制講更新sizeCtl為-1,保證線程安全柑晒。
try {
if ((tab = table) == null || tab.length == 0) {
// table 初始化
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
CAS
(Compare and Swap
)欧瘪,從字面意思來看就是比較并交換。CAS
有3個(gè)操作數(shù)匙赞,原始值V
佛掖,預(yù)期值A
妖碉,要修改的值B
,當(dāng)且僅當(dāng)原始值V
等于預(yù)期值A
的時(shí)候芥被,才會(huì)將V
修改為B
欧宜。Java
中通過sun.misc.Unsafe
類調(diào)用JNI
代碼來實(shí)現(xiàn)CPU
的CAS
指令。
這里通過借助CAS
實(shí)現(xiàn)了區(qū)別于內(nèi)部悲觀獨(dú)占鎖synchronized
的樂觀鎖來實(shí)現(xiàn)ConcurrentHashMap
的并發(fā)安全拴魄。
put 方法
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
-
key
或value
為空,拋出NP
異常冗茸,表明ConcurrentHashMap
不允許key
或value
為空 - 調(diào)用
spread
方法計(jì)算出key
的哈希值 - 遍歷
table
- 如果
table
為空,進(jìn)行初始化工作 - 當(dāng)前
index
沒有其他元素匹中,調(diào)用casTabAt
通過CAS
更新元素值 - 檢測(cè)到其他線程正在擴(kuò)容夏漱,會(huì)調(diào)用
helpTransfer
方法協(xié)助其調(diào)用 - 當(dāng)發(fā)生哈希碰撞,無論是鏈表還是紅黑樹顶捷,添加元素的操作都需要上鎖
synchronized
- 如果
get 方法
ConcurrentHashMap
的get
方法挂绰,沒有上鎖,表明ConcurrentHashMap
在讀操作上是支持完全并發(fā)的服赎。效率層面不受加鎖機(jī)制的影響葵蒂。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
總結(jié)
在多線程并發(fā)環(huán)境下,HashMap
是肯定不能用重虑。我們要選擇適用于多線程高并發(fā)場(chǎng)景的集合類践付。
ConcurrentHashMap
支持完全并發(fā)讀操作,從效率上來說是優(yōu)于HashTable
缺厉,但由于ConcurrentHashMap
在讀操作中存在弱一致性荔仁,所以還是需要結(jié)合場(chǎng)景來決定是否用ConcurrentHashMap
替代HashTable
。