Java集合框架中的Map類型的數(shù)據(jù)結(jié)構(gòu)是非線程安全, 在多線程環(huán)境中使用時(shí)需要手動(dòng)進(jìn)行線程同步. 因此在java.util.concurrent包中提供了一個(gè)線程安全版本的Map類型數(shù)據(jù)結(jié)構(gòu): ConcurrentMap. 本篇文章主要關(guān)注ConcurrentMap接口以及它的Hash版本的實(shí)現(xiàn)ConcurrentHashMap.
要實(shí)現(xiàn)線程安全,就需要加鎖, HashTable就是線程安全的, 但是HashTable對(duì)整張表加鎖的做法非常消耗性能, ConcurrentMap的做法簡(jiǎn)單來(lái)說(shuō), 就是把哈希表分成若干段, 對(duì)其中某一段操作時(shí), 只鎖住這一段, 其他段可以不受影響. 如下圖所示:
整個(gè)ConcurrentMap由一個(gè)segment數(shù)組組成(即segments), 數(shù)組中每一個(gè)segment是一張哈希表, 哈希表中存放的是一張hashentry鏈表.
本文主要分析以下兩個(gè)過(guò)程:
1.ConcurrentMap的構(gòu)造方法
2.put(K key, V value)
ConcurrentMap的構(gòu)造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel);
public ConcurrentHashMap(int initialCapacity, float loadFactor);
public ConcurrentHashMap(int initialCapacity);
public ConcurrentHashMap();
public ConcurrentHashMap(Map<? extends K, ? extends V> m);
這5種構(gòu)造方法, 最終都會(huì)使用第一個(gè)構(gòu)造函數(shù)來(lái)初始化ConcurrentHashMap.
第一個(gè)構(gòu)造函數(shù)的代碼:
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
其中concurrencyLevel是實(shí)例化以后就不可改變的, 它決定了segments數(shù)組的大小, 同時(shí)決定了這個(gè)ConcurrentHashMap的并發(fā)能力.segmentShift和segmentMask與hash算法相關(guān).loadFactor參數(shù)決定了Segment在何時(shí)擴(kuò)容.
ConcurrentHashMap的put方法
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
過(guò)程如下:
1.通過(guò)第一次哈希算法,計(jì)算當(dāng)前key屬于哪一個(gè)segment;
2.如果映射到的segment為空, 則執(zhí)行3, 否則執(zhí)行4;
3.借助UNSAFE類, 使用線程安全的方式, 構(gòu)建一個(gè)新的segment, 并且放入segments中相應(yīng)的位置;
4.調(diào)用segment中的put方法, 將key, value對(duì)放入segment中.
從中可以看到, 最終存儲(chǔ)key,value對(duì)的是segment, 因此只要對(duì)需要插入鍵值對(duì)的segment上鎖就可以保證線程安全.
segment的put方法
首先看segment的定義, 它是ConcurrentHashMap的內(nèi)部類, 該類繼承了ReentrantLock.
static final class Segment<K,V> extends ReentrantLock implements Serializable {}
查看它的構(gòu)造方法:
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
Segment的主要構(gòu)成是一張哈希表(存儲(chǔ)hashentry)和threshold(當(dāng)哈希表尺寸大于threshold時(shí)擴(kuò)容).
下面是它的put方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
put方法的主要流程如下:
1.嘗試獲取當(dāng)前segment的鎖,若成功則執(zhí)行3, 若失敗則執(zhí)行2;
2.根據(jù)key在hashtable中搜索hashentry, 并且獲取鎖(此處會(huì)進(jìn)行阻塞操作);
3.根據(jù)key在hashtable中搜索hashentry,如果當(dāng)前位置已經(jīng)存在值, 則使用鏈接的方式解決沖突;
4.如果當(dāng)前尺寸超過(guò)了threshold,則執(zhí)行rehash(將segment中的所有鍵值對(duì)重新hash).
總結(jié)
ConcurrentHashMap的實(shí)現(xiàn),主要原理就是使用segments將原本唯一的hashtable分段, 增加并發(fā)能力. ConcurrentHashMap中還有比較重要的一點(diǎn)就是使用的兩次哈希算法(第一次哈希算法找到segment, 第二次哈希算法找到hashentry), 這兩次哈希算法要求能盡量將元素均勻的分布到不同的segment的hashtable中, ConcurrentHashMap使用的是single-word Wang/Jenkins哈希算法的一個(gè)變種, 關(guān)于哈希算法, 可以看此處(http://burtleburtle.net/bob/hash/doobs.html) 進(jìn)一步了解.