一坦康、背景
? 在JDK1.5
版本中引入了一個線程安全高性能Map類—ConcurrentHashMap
揖赴,主要為了解決HashMap
線程不安全和Hashtable
效率不高的問題。眾所周知竭钝,HashMap
在多線程編程中是線程不安全的梨撞,而Hashtable
由于使用了synchronized
修飾方法而導(dǎo)致執(zhí)行效率不高雹洗;因此,ConcurrentHashMap
的出現(xiàn)是為了在保證線程安全的前提下卧波,提高性能时肿。
二、設(shè)計猜想
? 在探究源碼之前港粱,首先我們先嘗試用自己的邏輯來設(shè)計下ConcurrentHashMap<K,V>
的數(shù)據(jù)結(jié)構(gòu)螃成。在我們熟悉的數(shù)據(jù)結(jié)構(gòu)中有 一種hash
表的數(shù)據(jù)結(jié)構(gòu)符合ConcurrentHashMap
的存儲要求
[散列表](Hash table,也叫哈希表)啥容,是根據(jù)關(guān)鍵碼值(Key value)而直接進(jìn)行訪問的數(shù)據(jù)結(jié)構(gòu)锈颗。也就是 說,它通過把關(guān)鍵碼值映射到表中一個位置來訪問記錄咪惠,以加快查找的速度击吱。這個映射函數(shù)叫做散列函數(shù), 存放記錄的數(shù)組叫做散列表遥昧。
在設(shè)計ConcurrentHashMap
中需要關(guān)注的幾個方面:
- 由于數(shù)組大小覆醇、類型一旦確定,編譯炭臭、運行時都不可修改永脓,必須對
ConcurrentHashMap
的數(shù)組動態(tài)擴(kuò)容。 - 通過Key計算得到的
Hash
值常摧,必須保證元素都能正確的被保存落午,需要考慮hash
沖突的處理溃斋。
三梗劫、設(shè)計思路
Hashtable
效率不高的主要原因是容器操作過程中對整個對象進(jìn)行加鎖梳侨,其他線程的任何操作都會被阻塞。解決方式就是從“減少鎖的范圍”割坠、“引入非阻塞鎖”或引入”讀寫鎖“等方面進(jìn)行優(yōu)化彼哼,針對以上的思路可以進(jìn)行如下改進(jìn):
- 將
Hashtable
整張表拆分成一張張的小表敢朱,操作中只有某個小表受影響,而其他小表可以實現(xiàn)高性能操作孝常。 - 小表可以實現(xiàn)非阻塞鎖的”讀寫“鎖蚓哩,在操作小表時岸梨,可以減少其他線程的等待時間曹阔。
我們暫稱以上的設(shè)計為“分段鎖”技術(shù)赃份。
Map的底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組+鏈表漓库,基于上面的拆表思路,將上面的小表替換成鏈表的頭節(jié)點即數(shù)組的某個元素彪薛。基于以上的方案易遣,可以降低方案一的復(fù)雜度和空間使用率豆茫。
JDK中提供的ConcurrentHashMap
在1.7和1.8的版本中正是基于這個改進(jìn)思路進(jìn)行設(shè)計的侨歉。
四、分段鎖技術(shù)
? ConcurrentHashMap
通過持有一個Segment[]
數(shù)組對象來實現(xiàn)鎖分段技術(shù)幽邓,其中每一個Segment
對象通過繼承ReentrantLock
實現(xiàn)持有非阻塞的重入鎖火脉。實則一個Segment
對象可以理解為一個HashMap
牵舵,不同在于Segment
持有鎖倦挂,而HashMap
不持有鎖畸颅。Segment
中持有一個HashEntry[]
數(shù)組,HashEntry
是數(shù)據(jù)節(jié)點對象方援。
ConcurrentHashMap
結(jié)構(gòu)如下圖:
4.1 名詞術(shù)語
字段 | 說明 | 備注 |
---|---|---|
concurrencyLevel | 并發(fā)級別 | |
segmentShift | 段偏移量 | |
segmentMask | 段掩碼 |
4.2 源碼剖析
4.2.1 容器初始化
? 容器初始化主要是為完成Segment[]
數(shù)組對象的初始化肯骇,Segment[]
數(shù)組的大小又依賴于ssize
的值笛丙,ssize
是通過concurrencyLevel
計算得到骨稿,其中ssize應(yīng)該滿足2的N次方(power-of-two sizes)姜钳,所以ssize >=concurrencyLevel哥桥,此處對于Segment[]數(shù)組的大小的約束是為了通過按位與的散列算法來定位Segments
數(shù)組的索引辙浑。(以下省略無關(guān)代碼)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
int ssize = 1;
while (ssize < concurrencyLevel)
ssize <<= 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
}
4.2.2 設(shè)置值
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
-
ConcurrentHashMap
的value不能為空,否則拋出NPE異常拟糕。 - 獲取key相應(yīng)的
hash
值判呕,用于計算操作元素所在的segment[]
數(shù)組的索引下標(biāo)j。 - 通過索引拿到相應(yīng)的
segment
對象,進(jìn)行put
操作
計算Key的hash值
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String))
return sun.misc.Hashing.stringHash32((String) k);
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
ConcurrentHashMap
中對key的處理采用的是變體的Wang/Jenkins
哈希算法送滞。此算法特點:雪崩性和可逆性侠草。
計算Segment數(shù)組下標(biāo)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
int sshift = 0,ssize = 1;
while (ssize < concurrencyLevel){
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
}
public V put(K key, V value) {
int j = (hash >>> segmentShift) & segmentMask;
}
? j指待操作的Segment[]
數(shù)組的下標(biāo),而其中的segmentShift
和segmentMask
是容器初始化時完成的賦值犁嗅。其中的ssize
等于Segment[]
數(shù)組的長度边涕,而sshift
滿足2^sshif=ssize
公式。下標(biāo)j的值域范圍為[0,ssize-1]
,由于ssize=2^sshif
功蜓,那么下標(biāo)j可以用1個sshift
位的二進(jìn)制數(shù)字表示园爷。假如:ssize為16,那么sshift=4霞赫,j的值域為[0,15]腮介,而[0000b,1111b]
就是j的值域;則求key在Segment[]的下標(biāo)j端衰,就是求key對應(yīng)的一個散列的4位二進(jìn)制數(shù)值叠洗。而ConcurrentHashMap的源碼求下標(biāo)j的方式非常簡單,就是取key的hash值的高4位旅东。因此灭抑,求key散列到長度為ssize的Segment數(shù)組的下標(biāo)j,就是求key的hash值的高sshift位抵代。
初始化Segment對象
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
- 利用
UNSAFE
原子操作從Segment[]
數(shù)組中嘗試獲取對應(yīng)Segment
對象腾节,沒有則進(jìn)行初始化,有則直接返回荤牍。 -
Segment
對象的初始化以Segment[]
數(shù)組中頭節(jié)點的參數(shù)和UNSAFE
原子操作完成初始化案腺。
存放元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//嘗試獲取Segment的Lock,獲取成功返回null康吵,否則非阻塞重試3次劈榨,超過3次則阻塞等待鎖,返回對應(yīng)的鏈表節(jié)點晦嵌。
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//遞歸從鏈表頭切換到后續(xù)節(jié)點
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
- 首先嘗試獲取
Segment
的Lock同辣,Segment
持有的是ReentrantLock
非阻塞鎖,如果失敗則重試獲取 - 獲取鏈表的頭結(jié)點惭载,
Segment
持有的HashEntry[]
數(shù)組操作的是頭結(jié)點開始:- 如果頭節(jié)點e不為空旱函,如果key相同,則更新值描滔,否則執(zhí)行
e = e.next
進(jìn)行鏈表遍歷 - 如果頭節(jié)點為空棒妨,則新增節(jié)點node,滿足
node.next=first
含长。
- 如果頭節(jié)點e不為空旱函,如果key相同,則更新值描滔,否則執(zhí)行
4.2.3 容器擴(kuò)容
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
4.2.4 獲取值
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab = s.table)!=null) {
for (HashEntry<K,V> e=(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
- 獲取key相應(yīng)的
hash
值靶衍,通過計算得到索引u
,從而定位到Segment[]
數(shù)組中的Segment
對象茎芋。 - 在通過計算得到
HashEntry[]
數(shù)組中的HashEntry
對象,再通過循環(huán)遍歷鏈表蜈出,最終定位到數(shù)據(jù)田弥。
五、行鎖技術(shù)
5.1 源碼剖析
5.2.1 容器初始化
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
……………………
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
在進(jìn)行初始化容器時铡原,會設(shè)置兩個參數(shù)偷厦,initialCapacity
表示容器數(shù)組的容量商叹,loadFactor
表示容器的負(fù)載因子。其中tableSizeFor
會對initialCapacity
參數(shù)值進(jìn)行二進(jìn)制的位左移只泼,使得容量是2的N次方剖笙。
5.2.2 設(shè)置值
計算Key的hash值
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
其中 hash(key) 方法就是計算key的hash值, (h = key.hashCode()) ^ (h >>> 16) 使用高位16位和低16位異或運算得到Hash值请唱,主要為了使hash分布盡可能的均勻弥咪。
[圖片上傳失敗...(image-237caa-1576651270043)]
初始化容器數(shù)組
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- 通過sc變量來控制初始化過程,
sc=-1
在進(jìn)行容器初始化十绑,其他線程循環(huán)等待并讓出CPU(Thread.yield()
) - 初始化
Node[]
數(shù)組對象聚至,并返回
存放元素
final V putVal(K key, V value, boolean onlyIfAbsent) {
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
- 獲取容器中
Node[]
數(shù)組的頭結(jié)點,如果頭結(jié)點為空本橙,則構(gòu)造新節(jié)點扳躬,并添加到該頭結(jié)點上(CAS)。 - 如果不是頭節(jié)點甚亭,則使用
synchronized
鎖住頭節(jié)點贷币,循環(huán)進(jìn)行鏈表尾部添加
5.2.3 容器擴(kuò)容
https://www.cnblogs.com/menghuantiancheng/p/10462370.html
5.2.4 獲取值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) >0&&(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 獲取key相應(yīng)的
hash
值,定位到Node[]數(shù)組的索引位亏狰,依次循環(huán)遍歷鏈表役纹,最終定位到數(shù)據(jù)。
參考文檔:
https://www.cnblogs.com/weiweiblog/archive/2018/09/09/9611271.html