前言
以前寫過介紹HashMap的文章,文中提到過HashMap在put的時(shí)候,插入的元素超過了容量(由負(fù)載因子決定)的范圍就會(huì)觸發(fā)擴(kuò)容操作,就是rehash倾鲫,這個(gè)會(huì)重新將原數(shù)組的內(nèi)容重新hash到新的擴(kuò)容數(shù)組中欧引,在多線程的環(huán)境下乌企,存在同時(shí)其他的元素也在進(jìn)行put操作拱层,如果hash值相同,可能出現(xiàn)同時(shí)在同一數(shù)組下用鏈表表示绿渣,造成閉環(huán)朝群,導(dǎo)致在get時(shí)會(huì)出現(xiàn)死循環(huán),所以HashMap是線程不安全的中符。
我們來了解另一個(gè)鍵值存儲(chǔ)集合HashTable潜圃,它是線程安全的,它在所有涉及到多線程操作的都加上了synchronized關(guān)鍵字來鎖住整個(gè)table舟茶,這就意味著所有的線程都在競(jìng)爭(zhēng)一把鎖,在多線程的環(huán)境下堵第,它是安全的吧凉,但是無疑是效率低下的。
其實(shí)HashTable有很多的優(yōu)化空間踏志,鎖住整個(gè)table這么粗暴的方法可以變相的柔和點(diǎn)阀捅,比如在多線程的環(huán)境下,對(duì)不同的數(shù)據(jù)集進(jìn)行操作時(shí)其實(shí)根本就不需要去競(jìng)爭(zhēng)一個(gè)鎖针余,因?yàn)樗麄儾煌琱ash值饲鄙,不會(huì)因?yàn)閞ehash造成線程不安全,所以互不影響圆雁,這就是鎖分離技術(shù)忍级,將鎖的粒度降低,利用多個(gè)鎖來控制多個(gè)小的table伪朽,這就是這篇文章的主角ConcurrentHashMap JDK1.7版本的核心思想轴咱。
ConcurrentHashMap
JDK1.7的實(shí)現(xiàn)
在JDK1.7版本中,ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)是由一個(gè)Segment數(shù)組和多個(gè)HashEntry組成烈涮,如下圖所示:
Segment數(shù)組的意義就是將一個(gè)大的table分割成多個(gè)小的table來進(jìn)行加鎖朴肺,也就是上面的提到的鎖分離技術(shù),而每一個(gè)Segment元素存儲(chǔ)的是HashEntry數(shù)組+鏈表坚洽,這個(gè)和HashMap的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)一樣
初始化
ConcurrentHashMap的初始化是會(huì)通過位與運(yùn)算來初始化Segment的大小戈稿,用ssize來表示,如下所示
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
如上所示讶舰,因?yàn)閟size用位于運(yùn)算來計(jì)算(ssize <<=1)鞍盗,所以Segment的大小取值都是以2的N次方需了,無關(guān)concurrencyLevel的取值,當(dāng)然concurrencyLevel最大只能用16位的二進(jìn)制來表示橡疼,即65536援所,換句話說,Segment的大小最多65536個(gè)欣除,沒有指定concurrencyLevel元素初始化住拭,Segment的大小ssize默認(rèn)為16
每一個(gè)Segment元素下的HashEntry的初始化也是按照位于運(yùn)算來計(jì)算,用cap來表示历帚,如下所示
int cap = 1;
while (cap < c)
cap <<= 1;
如上所示滔岳,HashEntry大小的計(jì)算也是2的N次方(cap <<=1), cap的初始值為1挽牢,所以HashEntry最小的容量為2
put操作
對(duì)于ConcurrentHashMap的數(shù)據(jù)插入谱煤,這里要進(jìn)行兩次Hash去定位數(shù)據(jù)的存儲(chǔ)位置
static class Segment<K,V> extends ReentrantLock implements Serializable {
從上Segment的繼承體系可以看出,Segment實(shí)現(xiàn)了ReentrantLock,也就帶有鎖的功能禽拔,當(dāng)執(zhí)行put操作時(shí)刘离,會(huì)進(jìn)行第一次key的hash來定位Segment的位置,如果該Segment還沒有初始化睹栖,即通過CAS操作進(jìn)行賦值硫惕,然后進(jìn)行第二次hash操作,找到相應(yīng)的HashEntry的位置野来,這里會(huì)利用繼承過來的鎖的特性恼除,在將數(shù)據(jù)插入指定的HashEntry位置時(shí)(鏈表的尾端),會(huì)通過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖曼氛,如果獲取成功就直接插入相應(yīng)的位置豁辉,如果已經(jīng)有線程獲取該Segment的鎖,那當(dāng)前線程會(huì)以自旋的方式去繼續(xù)的調(diào)用tryLock()方法去獲取鎖舀患,超過指定次數(shù)就掛起徽级,等待喚醒。
get操作
ConcurrentHashMap的get操作跟HashMap類似聊浅,只是ConcurrentHashMap第一次需要經(jīng)過一次hash定位到Segment的位置灰追,然后再hash定位到指定的HashEntry,遍歷該HashEntry下的鏈表進(jìn)行對(duì)比狗超,成功就返回弹澎,不成功就返回null。
size操作
計(jì)算ConcurrentHashMap的元素大小是一個(gè)有趣的問題努咐,因?yàn)樗遣l(fā)操作的苦蒿,就是在你計(jì)算size的時(shí)候,他還在并發(fā)的插入數(shù)據(jù)渗稍,可能會(huì)導(dǎo)致你計(jì)算出來的size和你實(shí)際的size有相差(在你return size的時(shí)候佩迟,插入了多個(gè)數(shù)據(jù))团滥,要解決這個(gè)問題,JDK1.7版本用兩種方案报强。
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0)
overflow = true;
} }
if (sum == last) break;
last = sum; } }
finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
第一種方案他會(huì)使用不加鎖的模式去嘗試多次計(jì)算ConcurrentHashMap的size灸姊,最多三次,比較前后兩次計(jì)算的結(jié)果秉溉,結(jié)果一致就認(rèn)為當(dāng)前沒有元素加入力惯,計(jì)算的結(jié)果是準(zhǔn)確的;
第二種方案是如果第一種方案不符合召嘶,他就會(huì)給每個(gè)Segment加上鎖父晶,然后計(jì)算ConcurrentHashMap的size返回。
JDK1.8的實(shí)現(xiàn)
JDK1.8的實(shí)現(xiàn)已經(jīng)摒棄了Segment的概念弄跌,而是直接用Node數(shù)組+鏈表+紅黑樹的數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)甲喝,并發(fā)控制使用Synchronized和CAS來操作,整個(gè)看起來就像是優(yōu)化過且線程安全的HashMap铛只,雖然在JDK1.8中還能看到Segment的數(shù)據(jù)結(jié)構(gòu)埠胖,但是已經(jīng)簡(jiǎn)化了屬性,只是為了兼容舊版本淳玩。
在深入JDK1.8的put和get實(shí)現(xiàn)之前要知道一些常量設(shè)計(jì)和數(shù)據(jù)結(jié)構(gòu)直撤,這些是構(gòu)成ConcurrentHashMap實(shí)現(xiàn)結(jié)構(gòu)的基礎(chǔ),下面看一下基本屬性:
// node數(shù)組最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默認(rèn)初始值凯肋,必須是2的幕數(shù)
private static final int DEFAULT_CAPACITY = 16;
//數(shù)組可能最大值,需要與toArray()相關(guān)方法關(guān)聯(lián)
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并發(fā)級(jí)別汽馋,遺留下來的侮东,為兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 負(fù)載因子
private static final float LOAD_FACTOR = 0.75f;
// 鏈表轉(zhuǎn)紅黑樹閥值,> 8 鏈表轉(zhuǎn)換為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
//樹轉(zhuǎn)鏈表閥值,小于等于6(tranfer時(shí)豹芯,lc悄雅、hc=0兩個(gè)計(jì)數(shù)器分別++記錄原bin、新binTreeNode數(shù)量铁蹈,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1宽闲,help resize的最大線程數(shù)
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中記錄size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED = -1;
// 樹根節(jié)點(diǎn)的hash值
static final int TREEBIN = -2;
// ReservationNode的hash值
static final int RESERVED = -3;
// 可用處理器數(shù)量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數(shù)組
transient volatile Node<K,V>[] table;
/*控制標(biāo)識(shí)符握牧,用來控制table的初始化和擴(kuò)容的操作容诬,不同的值有不同的含義
*當(dāng)為負(fù)數(shù)時(shí):-1代表正在初始化,-N代表有N-1個(gè)線程正在 進(jìn)行擴(kuò)容
*當(dāng)為0時(shí):代表當(dāng)時(shí)的table還沒有被初始化
*當(dāng)為正數(shù)時(shí):表示初始化或者下一次進(jìn)行擴(kuò)容的大小
private transient volatile int sizeCtl;
基本屬性定義了ConcurrentHashMap的一些邊界以及操作時(shí)的一些控制沿腰,下面看一些內(nèi)部的一些結(jié)構(gòu)組成览徒,這些是整個(gè)ConcurrentHashMap整個(gè)數(shù)據(jù)結(jié)構(gòu)的核心。
Node
Node是ConcurrentHashMap存儲(chǔ)結(jié)構(gòu)的基本單元颂龙,繼承于HashMap中的Entry习蓬,用于存儲(chǔ)數(shù)據(jù)纽什,源代碼如下
static class Node<K,V> implements Map.Entry<K,V> {
//鏈表的數(shù)據(jù)結(jié)構(gòu)
final int hash;
final K key;
//val和next都會(huì)在擴(kuò)容時(shí)發(fā)生變化,所以加上volatile來保持可見性和禁止重排序
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允許更新value
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
//用于map中的get()方法躲叼,子類重寫
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
Node數(shù)據(jù)結(jié)構(gòu)很簡(jiǎn)單芦缰,從上可知,就是一個(gè)鏈表枫慷,但是只允許對(duì)數(shù)據(jù)進(jìn)行查找让蕾,不允許進(jìn)行修改。
TreeNode
TreeNode繼承與Node流礁,但是數(shù)據(jù)結(jié)構(gòu)換成了二叉樹結(jié)構(gòu)涕俗,它是紅黑樹的數(shù)據(jù)的存儲(chǔ)結(jié)構(gòu),用于紅黑樹中存儲(chǔ)數(shù)據(jù)神帅,當(dāng)鏈表的節(jié)點(diǎn)數(shù)大于8時(shí)會(huì)轉(zhuǎn)換成紅黑樹的結(jié)構(gòu)再姑,他就是通過TreeNode作為存儲(chǔ)結(jié)構(gòu)代替Node來轉(zhuǎn)換成黑紅樹源代碼如下。
static final class TreeNode<K,V> extends Node<K,V> {
//樹形結(jié)構(gòu)的屬性定義
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; //標(biāo)志紅黑樹的紅節(jié)點(diǎn)
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
//根據(jù)key查找 從根節(jié)點(diǎn)開始找出相應(yīng)的TreeNode找御,
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
**TreeBin**
TreeBin從字面含義中可以理解為存儲(chǔ)樹形結(jié)構(gòu)的容器元镀,而樹形結(jié)構(gòu)就是指TreeNode,所以TreeBin就是封裝TreeNode的容器霎桅,它提供轉(zhuǎn)換黑紅樹的一些條件和鎖的控制栖疑,部分源碼結(jié)構(gòu)如下。
static final class TreeBin<K,V> extends Node<K,V> {
//指向TreeNode列表和根節(jié)點(diǎn)
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 讀寫鎖狀態(tài)
static final int WRITER = 1; // 獲取寫鎖的狀態(tài)
static final int WAITER = 2; // 等待寫鎖的狀態(tài)
static final int READER = 4; // 增加數(shù)據(jù)時(shí)讀鎖的狀態(tài)
/**
* 初始化紅黑樹
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
......
}
介紹了ConcurrentHashMap主要的屬性與內(nèi)部的數(shù)據(jù)結(jié)構(gòu)滔驶,現(xiàn)在通過一個(gè)簡(jiǎn)單的例子以debug的視角看看ConcurrentHashMap的具體操作細(xì)節(jié)遇革。
public class TestConcurrentHashMap{
public static void main(String[] args){
ConcurrentHashMap<String,String> map = new ConcurrentHashMap(); //初始化ConcurrentHashMap
//新增個(gè)人信息
map.put("id","1");
map.put("name","andy");
map.put("sex","男");
//獲取姓名
String name = map.get("name");
Assert.assertEquals(name,"andy");
//計(jì)算大小
int size = map.size();
Assert.assertEquals(size,3);
}
}
我們先通過new ConcurrentHashMap()來進(jìn)行初始化
public ConcurrentHashMap() {
}
由上你會(huì)發(fā)現(xiàn)ConcurrentHashMap的初始化其實(shí)是一個(gè)空實(shí)現(xiàn),并沒有做任何事揭糕,這里后面會(huì)講到萝快,這也是和其他的集合類有區(qū)別的地方,初始化操作并不是在構(gòu)造函數(shù)實(shí)現(xiàn)的著角,而是在put操作中實(shí)現(xiàn)揪漩,當(dāng)然ConcurrentHashMap還提供了其他的構(gòu)造函數(shù),有指定容量大小或者指定負(fù)載因子吏口,跟HashMap一樣奄容,這里就不做介紹了。
put操作
在上面的例子中我們新增個(gè)人信息會(huì)調(diào)用put方法产徊,我們來看下昂勒。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //兩次hash,減少hash沖突舟铜,可以均勻分布
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //對(duì)這個(gè)table進(jìn)行迭代
Node<K,V> f; int n, i, fh;
//這里就是上面構(gòu)造方法沒有進(jìn)行初始化叁怪,在這里進(jìn)行判斷,為null就調(diào)用initTable進(jìn)行初始化深滚,屬于懶漢模式初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果i位置沒有數(shù)據(jù)奕谭,就直接無鎖插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//如果在進(jìn)行擴(kuò)容涣觉,則先進(jìn)行擴(kuò)容操作
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//如果以上條件都不滿足,那就要進(jìn)行加鎖操作血柳,也就是存在hash沖突官册,鎖住鏈表或者紅黑樹的頭結(jié)點(diǎn)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //表示該節(jié)點(diǎn)是鏈表結(jié)構(gòu)
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//這里涉及到相同的key進(jìn)行put就會(huì)覆蓋原先的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) { //插入鏈表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//紅黑樹結(jié)構(gòu)
Node<K,V> p;
binCount = 2;
//紅黑樹結(jié)構(gòu)旋轉(zhuǎn)插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //如果鏈表的長(zhǎng)度大于8時(shí)就會(huì)進(jìn)行紅黑樹的轉(zhuǎn)換
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//統(tǒng)計(jì)size,并且檢查是否需要擴(kuò)容
return null;
}
這個(gè)put的過程很清晰难捌,對(duì)當(dāng)前的table進(jìn)行無條件自循環(huán)直到put成功膝宁,可以分成以下六步流程來概述。
如果沒有初始化就先調(diào)用initTable()方法來進(jìn)行初始化過程
如果沒有hash沖突就直接CAS插入
如果還在進(jìn)行擴(kuò)容操作就先進(jìn)行擴(kuò)容
如果存在hash沖突根吁,就加鎖來保證線程安全员淫,這里有兩種情況,一種是鏈表形式就直接遍歷到尾端插入击敌,一種是紅黑樹就按照紅黑樹結(jié)構(gòu)插入介返,
最后一個(gè)如果該鏈表的數(shù)量大于閾值8,就要先轉(zhuǎn)換成黑紅樹的結(jié)構(gòu)沃斤,break再一次進(jìn)入循環(huán)
如果添加成功就調(diào)用addCount()方法統(tǒng)計(jì)size圣蝎,并且檢查是否需要擴(kuò)容
1、具有1-5工作經(jīng)驗(yàn)的衡瓶,面對(duì)目前流行的技術(shù)不知從何下手徘公,需要突破技術(shù)瓶頸的可以加群。
2哮针、在公司待久了关面,過得很安逸,但跳槽時(shí)面試碰壁十厢。需要在短時(shí)間內(nèi)進(jìn)修等太、跳槽拿高薪的可以加群。
3寿烟、如果沒有工作經(jīng)驗(yàn)澈驼,但基礎(chǔ)非常扎實(shí)辛燥,對(duì)java工作機(jī)制筛武,常用設(shè)計(jì)思想,常用java開發(fā)框架掌握熟練的挎塌,可以加群徘六。
4、覺得自己很牛B榴都,一般需求都能搞定待锈。但是所學(xué)的知識(shí)點(diǎn)沒有系統(tǒng)化嘴高,很難在技術(shù)領(lǐng)域繼續(xù)突破的可以加群竿音。
5. 群號(hào):高級(jí)架構(gòu)群 283943715 備注好信息和屎!
6.阿里Java高級(jí)大牛直播講解知識(shí)點(diǎn),分享知識(shí)春瞬,各位老師多年工作經(jīng)驗(yàn)的梳理和總結(jié)柴信,帶著大家全面、科學(xué)地建立自己的技術(shù)體系和技術(shù)認(rèn)知宽气!