相關(guān)的HashMap
java里面有HashMap滤淳、HashTable 和 ConcurrentHashMap术健。其中HashTable 和 ConcurrentHashMap是線程安全的缓艳,但是原理不一樣亲桥。HashTable使用synchronized來鎖住整張Hash表來實現(xiàn)線程安全昼牛,即每次鎖住整張表讓線程獨占虽缕,這樣會造成并發(fā)量很低台妆。
初識ConcurrentHashMap
ConcurrentHashMap允許多個修改操作并發(fā)進行础锐,其關(guān)鍵在于使用了鎖分離技術(shù)
扶叉。它使用了多個鎖來控制對hash表的不同部分進行的修改勿锅。ConcurrentHashMap內(nèi)部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的Hashtable枣氧,它們有自己的鎖溢十。只要多個修改操作發(fā)生在不同的段上,它們就可以并發(fā)進行达吞。
有些方法需要跨段张弛,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段吞鸭,操作完畢后寺董,又按順序釋放所有段的鎖。這里“按順序”是很重要的瞒大,否則極有可能出現(xiàn)死鎖螃征。
ConcurrentHashMap原理
本文章所有分析基于jdk1.7
原理圖
原理分析
ConcurrentHashMap使用分段鎖技術(shù),將數(shù)據(jù)分成一段一段的存儲透敌,然后給每一段數(shù)據(jù)配一把鎖盯滚,當一個線程占用鎖訪問其中一個段數(shù)據(jù)的時候,其他段的數(shù)據(jù)也能被其他線程訪問酗电,能夠?qū)崿F(xiàn)真正的并發(fā)訪問
數(shù)據(jù)結(jié)構(gòu)
- Segment
從內(nèi)部結(jié)構(gòu)圖中可以看到魄藕,ConcurrentHashMap內(nèi)部分為很多個Segment,每一個Segment擁有一把鎖撵术,然后每個Segment(繼承ReentrantLock)
Segment的聲明如下
static final class Segment<K,V> extends ReentrantLock implements Serializable
Segment繼承了ReentrantLock背率,表明每個segment都可以當做一個鎖。segment里面是HashEntry的數(shù)組(ReentrantLock的使用可以參考文章)這樣對每個segment中的數(shù)據(jù)需要同步操作的話都是使用每個segment容器對象自身的鎖來實現(xiàn)嫩与。只有對全局需要改變時鎖定的是所有的segment寝姿。
- HashEntry
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
常用操作
1 初始化
先看一下初始化代碼
/**
* Creates a new, empty map with the specified initial
* capacity, load factor and concurrency level.
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of elements per
* bin exceeds this threshold.
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation performs internal sizing
* to try to accommodate this many threads.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive.
*/
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
傳入的參數(shù)有initialCapacity,loadFactor划滋,concurrencyLevel這三個饵筑。
initialCapacity
表示新創(chuàng)建的這個ConcurrentHashMap的初始容量,也就是上面的結(jié)構(gòu)圖中的Entry數(shù)量处坪。默認值為static final int DEFAULT_INITIAL_CAPACITY = 16;
loadFactor
表示負載因子根资,就是當ConcurrentHashMap中的元素個數(shù)大于loadFactor * 最大容量時就需要rehash,擴容同窘。默認值為static final float DEFAULT_LOAD_FACTOR = 0.75f;
concurrencyLevel
表示并發(fā)級別玄帕,這個值用來確定Segment的個數(shù),Segment的個數(shù)是大于等于concurrencyLevel的第一個2的n次方的數(shù)想邦。比如裤纹,如果concurrencyLevel為12,13丧没,14服傍,15,16這些數(shù)骂铁,則Segment的數(shù)目為16(2的4次方)。默認值為static final int DEFAULT_CONCURRENCY_LEVEL = 16;罩抗。理想情況下ConcurrentHashMap的真正的并發(fā)訪問量能夠達到concurrencyLevel拉庵,因為有concurrencyLevel個Segment,假如有concurrencyLevel個線程需要訪問Map套蒂,并且需要訪問的數(shù)據(jù)都恰好分別落在不同的Segment中钞支,則這些線程能夠無競爭地自由訪問(因為他們不需要競爭同一把鎖)茫蛹,達到同時訪問的效果。這也是為什么這個參數(shù)起名為“并發(fā)級別”的原因烁挟。
初始化的一些動作:
- 驗證參數(shù)的合法性婴洼,如果不合法,直接拋出異常撼嗓。
- concurrencyLevel也就是Segment的個數(shù)不能超過規(guī)定的最大Segment的個數(shù)柬采,默認值為static final int MAX_SEGMENTS = 1 << 16;,如果超過這個值且警,設(shè)置為這個值粉捻。
- 然后使用循環(huán)找到大于等于concurrencyLevel的第一個2的n次方的數(shù)ssize,這個數(shù)就是Segment數(shù)組的大小斑芜,并記錄一共向左按位移動的次數(shù)sshift肩刃,并令segmentShift = 32 - sshift,并且segmentMask的值等于ssize - 1杏头,segmentMask的各個二進制位都為1盈包,目的是之后可以通過key的hash值與這個值做&運算確定Segment的索引。
- 檢查給的容量值是否大于允許的最大容量值醇王,如果大于該值呢燥,設(shè)置為該值厦画。最大容量值為static final int MAXIMUM_CAPACITY = 1 << 30;。
然后計算每個Segment平均應該放置多少個元素力试,這個值c是向上取整的值。比如初始容量為15排嫌,Segment個數(shù)為4,則每個Segment平均需要放置4個元素淳地。 - 最后創(chuàng)建一個Segment實例,將其當做Segment數(shù)組的第一個元素颇象。
2伍伤、put操作
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
操作步驟如下:
- 判斷value是否為null遣钳,如果為null,直接拋出異常姐直。
- key通過一次hash運算得到一個hash值蒋畜。(這個hash運算下文詳說)
- 將得到hash值向右按位移動segmentShift位,然后再與segmentMask做&運算得到segment的索引j插龄。
在初始化的時候我們說過segmentShift的值等于32-sshift佣渴,例如concurrencyLevel等于16辛润,則sshift等于4,則segmentShift為28真椿,segmentMask為15乎澄,對應的二進制為0000 0000 0000 0000 0000 0000 0000 1111
置济。hash值是一個32位的整數(shù)解恰,將其向右移動28位就變成這個樣子:
0000 0000 0000 0000 0000 0000 0000 xxxx
护盈,然后再用這個值與segmentMask做&運算羞酗,也就是取最后四位的值。這個值確定Segment的索引胸竞。 - 使用Unsafe的方式從Segment數(shù)組中獲取該索引對應的Segment對象卫枝。
- 向這個Segment對象中put值讹挎,這個put操作也基本是一樣的步驟(通過&運算獲取HashEntry的索引,然后set)痒谴。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
3、get操作
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
操作步驟為:
- 和put操作一樣,先通過key進行兩次hash確定應該去哪個Segment中取數(shù)據(jù)漱贱。
- 使用Unsafe獲取對應的Segment夭委,然后再進行一次&運算得到HashEntry鏈表的位置株灸,然后從鏈表頭開始遍歷整個鏈表(因為Hash可能會有碰撞,所以用一個鏈表保存)逐抑,如果找到對應的key屹蚊,則返回對應的value值汹粤,如果鏈表遍歷完都沒有找到對應的key,則說明Map中不包含該key冯丙,返回null遭京。
- 值得注意的是哪雕,get操作是不需要加鎖的(如果value為null,會調(diào)用readValueUnderLock利虫,只有這個步驟會加鎖)糠惫,通過前面提到的volatile和final來確保數(shù)據(jù)安全。
4巢价、 size操作
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
size操作與put和get操作最大的區(qū)別在于,size操作需要遍歷所有的Segment才能算出整個Map的大小备燃,而put和get都只關(guān)心一個Segment并齐。假設(shè)我們當前遍歷的Segment為SA,那么在遍歷SA過程中其他的Segment比如SB可能會被修改唁奢,于是這一次運算出來的size值可能并不是Map當前的真正大小麻掸。所以一個比較簡單的辦法就是計算Map大小的時候所有的Segment都Lock住赐纱,不能更新(包含put,remove等等)數(shù)據(jù)诚隙,計算完之后再Unlock久又。這是普通人能夠想到的方案效五,但是牛逼的作者還有一個更好的Idea:先給3次機會畏妖,不lock所有的Segment,遍歷所有Segment半夷,累加各個Segment的大小得到整個Map的大小,如果某相鄰的兩次計算獲取的所有Segment的更新的次數(shù)(每個Segment都有一個modCount變量淘邻,這個變量在Segment中的Entry被修改時會加一列荔,通過這個值可以得到每個Segment的更新操作的次數(shù))是一樣的枚尼,說明計算過程中沒有更新操作署恍,則直接返回這個值蜻直。如果這三次不加鎖的計算過程中Map的更新次數(shù)有變化概而,則之后的計算先對所有的Segment加鎖,再遍歷所有Segment計算Map大小王悍,最后再解鎖所有Segment
舉個例子
一個Map有4個Segment压储,標記為S1源譬,S2,S3刮刑,S4雷绢,現(xiàn)在我們要獲取Map的size厚脉。計算過程是這樣的:第一次計算傻工,不對S1孵滞,S2坊饶,S3殴蓬,S4加鎖染厅,遍歷所有的Segment,假設(shè)每個Segment的大小分別為1孤页,2涩馆,3魂那,4,更新操作次數(shù)分別為:2鲜结,2轻腺,3划乖,1琴庵,則這次計算可以得到Map的總大小為1+2+3+4=10,總共更新操作次數(shù)為2+2+3+1=8儿礼;第二次計算蚊夫,不對S1,S2,S3,S4加鎖懦尝,遍歷所有Segment,假設(shè)這次每個Segment的大小變成了2伍绳,2乍桂,3睹酌,4,更新次數(shù)分別為3闯传,2,3字币,1洗出,因為兩次計算得到的Map更新次數(shù)不一致(第一次是8翩活,第二次是9)則可以斷定這段時間Map數(shù)據(jù)被更新,則此時應該再試一次冗荸;第三次計算蚌本,不對S1隘梨,S2轴猎,S3,S4加鎖锐峭,遍歷所有Segment只祠,假設(shè)每個Segment的更新操作次數(shù)還是為3,2熊杨,3晶府,1钻趋,則因為第二次計算和第三次計算得到的Map的更新操作的次數(shù)是一致的蛮位,就能說明第二次計算和第三次計算這段時間內(nèi)Map數(shù)據(jù)沒有被更新失仁,此時可以直接返回第三次計算得到的Map的大小。最壞的情況:第三次計算得到的數(shù)據(jù)更新次數(shù)和第二次也不一樣控轿,則只能先對所有Segment加鎖再計算最后解鎖茬射。
注意事項
- ConcurrentHashMap中的key和value值都不能為null在抛,HashMap中key可以為null萧恕,HashTable中key不能為null廊鸥。
- ConcurrentHashMap是線程安全的類并不能保證使用了ConcurrentHashMap的操作都是線程安全的惰说!
- ConcurrentHashMap的get操作不需要加鎖,put操作需要加鎖