相關(guān)文章
Java并發(fā)編程(一)線程定義赋咽、狀態(tài)和屬性
Java并發(fā)編程(二)同步
Java并發(fā)編程(三)volatile域
Java并發(fā)編程(四)Java內(nèi)存模型
前言
在Java1.5中,并發(fā)編程大師Doug Lea給我們帶來了concurrent包宠哄,而該包中提供的ConcurrentHashMap是線程安全并且高效的HashMap濒募,本節(jié)我們就來研究下ConcurrentHashMap是如何保證線程安全的同時又能高效的操作懈糯。
1.為何用ConcurrentHashMap
在并發(fā)編程中使用HashMap可能會導(dǎo)致死循環(huán)奢方,而使用線程安全的HashTable效率又低下润樱。
線程不安全的HashMap
在多線程環(huán)境下,使用HashMap進(jìn)行put操作會引起死循環(huán)羡棵,導(dǎo)致CPU利用率接近100%壹若,所以在并發(fā)情況下不能使用HashMap,如以下代碼會導(dǎo)致死循環(huán):
final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "moon" + i).start();
}
}
}, "ftf");
t.start();
t.join();
HashMap在并發(fā)執(zhí)行put操作是會引起死循環(huán)皂冰,是因?yàn)槎嗑€程會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu)店展,一旦形成喚醒數(shù)據(jù)結(jié)構(gòu),Entry的next節(jié)點(diǎn)永遠(yuǎn)不為空秃流,就會產(chǎn)生死循環(huán)赂蕴。
效率低下的HashTable
HashTable使用synchronized來保證線程的安全,但是在線程競爭激烈的情況下HashTable的效率非常低下舶胀。當(dāng)一個線程訪問HashTable的同步方法概说,其他方法訪問HashTable的同步方法時,會進(jìn)入阻塞或者輪詢狀態(tài)嚣伐。如果線程1使用put進(jìn)行元素添加糖赔,線程2不但不能用put方法添加于元素同是也無法用get方法來獲取元素,所以競爭越激烈效率越低轩端。
ConcurrentHashMap的鎖分段技術(shù)
HashTable容器在競爭激烈的并發(fā)環(huán)境效率低下的原因是所有訪問HashTable的線程都必須競爭同一把鎖放典,假如容器有多把鎖,每一把鎖用于鎖住容器中一部分?jǐn)?shù)據(jù)基茵,那么多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時奋构,線程間就不會存在鎖競爭,從而可以有效提高并發(fā)訪問率拱层,這就是ConcurrentHashMap的鎖分段技術(shù)弥臼。將數(shù)據(jù)分成一段一段的存儲,然后給每一段數(shù)據(jù)配一把鎖舱呻,當(dāng)一個線程占用鎖訪問其中一段數(shù)據(jù)的時候醋火,其他段的數(shù)據(jù)也能被其他線程訪問。
2.Java1.6的ConcurrentHashMap的結(jié)構(gòu)
首先來看看 Java1.6中ConcurrentHashMap的類圖:
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成箱吕。Segment是一種可重入鎖ReentrantLock芥驳,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數(shù)據(jù)茬高。一個ConcurrentHashMap里包含一個Segment數(shù)組兆旬,Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu)怎栽, 一個Segment里包含一個HashEntry數(shù)組丽猬,每個HashEntry是一個鏈表結(jié)構(gòu)的元素宿饱, 每個Segment守護(hù)者一個HashEntry數(shù)組里的元素,當(dāng)對HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時,必須首先獲得它對應(yīng)的Segment鎖脚祟。
3.java1.8的ConcurrentHashMap源碼分析
重要的內(nèi)部類
從Java1.7 版本開始 ConcurrentHashMap 不再采用 Segment 實(shí)現(xiàn)谬以,而是改用 Node,Node 是一個鏈表的結(jié)構(gòu)由桌,每個節(jié)點(diǎn)可以引用到下一個節(jié)點(diǎn)(next)为黎。
Node類
Node是最核心的內(nèi)部類,包裝了key-value鍵值對行您,所有插入ConcurrentHashMap的數(shù)據(jù)都包裝在這里面铭乾。
它與HashMap中的定義很相似,但是有一些差別它對value和next屬性設(shè)置了volatile同步鎖娃循,它不允許調(diào)用setValue方法直接改變Node的value域炕檩,它增加了find方法輔助map.get()方法。TreeNode類
樹節(jié)點(diǎn)類捌斧,另外一個核心的數(shù)據(jù)結(jié)構(gòu)笛质。 當(dāng)鏈表長度過長的時候,會轉(zhuǎn)換為TreeNode骤星。
但是與HashMap不相同的是经瓷,它并不是直接轉(zhuǎn)換為紅黑樹,而是把這些結(jié)點(diǎn)包裝成TreeNode放在TreeBin對象中洞难,由TreeBin完成對紅黑樹的包裝舆吮。
而且TreeNode在ConcurrentHashMap繼承自Node類,而并非HashMap中的集成自LinkedHashMap.EntryTreeBin
這個類并不負(fù)責(zé)包裝用戶的key队贱、value信息色冀,而是包裝的很多TreeNode節(jié)點(diǎn)。它代替了TreeNode的根節(jié)點(diǎn)柱嫌,也就是說在實(shí)際的ConcurrentHashMap“數(shù)組”中锋恬,存放的是TreeBin對象,而不是TreeNode對象编丘,這是與HashMap的區(qū)別与学。ForwardingNode
一個用于連接兩個table的節(jié)點(diǎn)類。它包含一個nextTable指針嘉抓,用于指向下一張表索守。而且這個節(jié)點(diǎn)的key value next指針全部為null,它的hash值為-1.
這里面定義的find的方法是從nextTable里進(jìn)行查詢節(jié)點(diǎn)抑片,而不是以自身為頭節(jié)點(diǎn)進(jìn)行查找
構(gòu)造函數(shù)
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
Java1.8版本的 ConcurrentHashMap 在構(gòu)造函數(shù)中不會初始化 Node 數(shù)組卵佛,而是第一次 put 操作的時候初始化。
整個 Map 第一次 put 的時候,map 中用于存放數(shù)據(jù)的 Node[] 還是null截汪。
Unsafe與CAS
在ConcurrentHashMap中疾牲,大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實(shí)現(xiàn)無鎖化的修改值的操作衙解,他可以大大降低鎖代理的性能消耗阳柔。這個算法的基本思想就是不斷地去比較當(dāng)前內(nèi)存中的變量值與你指定的一個變量值是否相等,如果相等蚓峦,則接受你指定的修改的值盔沫,否則拒絕你的操作。因?yàn)楫?dāng)前線程中的值已經(jīng)不是最新的值枫匾,你的修改很可能會覆蓋掉其他線程修改的結(jié)果。這一點(diǎn)與樂觀鎖拟淮,SVN的思想是比較類似的干茉。
unsafe代碼塊控制了一些屬性的修改工作,比如最常用的SIZECTL 很泊。 在這一版本的concurrentHashMap中角虫,大量應(yīng)用來的CAS方法進(jìn)行變量、屬性的修改工作委造。 利用CAS進(jìn)行無鎖操作戳鹅,可以大大提高性能。
初始化函數(shù)initTable
調(diào)用ConcurrentHashMap的構(gòu)造方法僅僅是設(shè)置了一些參數(shù)而已昏兆,而整個table的初始化是在向ConcurrentHashMap中插入元素的時候發(fā)生的枫虏。如調(diào)用put、computeIfAbsent爬虱、compute隶债、merge等方法的時候,調(diào)用時機(jī)是檢查table==null跑筝。
初始化方法主要應(yīng)用了關(guān)鍵屬性sizeCtl 如果這個值 < 0死讹,表示其他線程正在進(jìn)行初始化,就放棄這個操作曲梗。
在這也可以看出ConcurrentHashMap的初始化只能由一個線程完成赞警。如果獲得了初始化權(quán)限,就用CAS方法將sizeCtl置為-1虏两,防止其他線程進(jìn)入愧旦。初始化數(shù)組后,將sizeCtl的值改為0.75*n
sizeCtl含義
1.負(fù)數(shù)代表正在進(jìn)行初始化或擴(kuò)容操作
2.-1代表正在初始化
3.-N 表示有N-1個線程正在進(jìn)行擴(kuò)容操作
4.正數(shù)或0代表hash表還沒有被初始化碘举,這個數(shù)值表示初始化或下一次進(jìn)行擴(kuò)容的大小忘瓦,這一點(diǎn)類似于擴(kuò)容閾值的概念。還后面可以看到,它的值始終是當(dāng)前ConcurrentHashMap容量的0.75倍耕皮,這與loadfactor是對應(yīng)的境蜕。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl表示有其他線程正在進(jìn)行初始化操作,把線程掛起凌停。對于table的初始化工作粱年,只能有一個線程在進(jìn)行。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置為-1 表示本線程正在進(jìn)行初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);//相當(dāng)于0.75*n 設(shè)置一個擴(kuò)容的閾值
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
擴(kuò)容方法transfer
支持多線程進(jìn)行擴(kuò)容操作罚拟,并沒有加鎖 台诗,這樣做的目的不僅僅是為了滿足concurrent的要求,而是希望利用并發(fā)處理去減少擴(kuò)容帶來的時間影響赐俗。
單線程擴(kuò)容的大體思想就是遍歷拉队、復(fù)制的過程。首先根據(jù)運(yùn)算得到需要遍歷的次數(shù)i阻逮,然后利用tabAt方法獲得i位置的元素:
- 如果這個位置為空粱快,就在原table中的i位置放入forwardNode節(jié)點(diǎn),這個也是觸發(fā)并發(fā)擴(kuò)容的關(guān)鍵點(diǎn)叔扼;
- 如果這個位置是Node節(jié)點(diǎn)(fh>=0)事哭,如果它是一個鏈表的頭節(jié)點(diǎn),就構(gòu)造一個反序鏈表瓜富,把他們分別放在nextTable的i和i+n的位置上
- 如果這個位置是TreeBin節(jié)點(diǎn)(fh<0)鳍咱,也做一個反序處理,并且判斷是否需要untreefi与柑,把處理的結(jié)果分別放在nextTable的i和i+n的位置上
- 遍歷過所有的節(jié)點(diǎn)以后就完成了復(fù)制工作谤辜,這時讓nextTable作為新的table,并且更新sizeCtl為新容量的0.75倍仅胞,完成擴(kuò)容乳幸。
多線程遍歷節(jié)點(diǎn)拭嫁,處理了一個節(jié)點(diǎn),就把對應(yīng)點(diǎn)的值set為forward,另一個線程看到forward兆沙,就向后繼續(xù)遍歷材鹦,再加上給節(jié)點(diǎn)上鎖的機(jī)制蔗喂,就完成了多線程的控制再姑。這樣交叉就完成了復(fù)制工作。而且還很好的解決了線程安全的問題编整。
/**
* 一個過渡的table表 只有在擴(kuò)容的時候才會使用
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//構(gòu)造一個nextTable對象 它的容量是原來的兩倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//構(gòu)造一個連節(jié)點(diǎn)指針 用于標(biāo)志位
boolean advance = true;//并發(fā)擴(kuò)容的關(guān)鍵屬性 如果等于true 說明這個節(jié)點(diǎn)已經(jīng)處理過
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//這個while循環(huán)體的作用就是在控制i-- 通過i--可以依次遍歷原h(huán)ash表中的節(jié)點(diǎn)
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的節(jié)點(diǎn)都已經(jīng)完成復(fù)制工作 就把nextTable賦值給table 清空臨時對象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//擴(kuò)容閾值設(shè)置為原來容量的1.5倍 依然相當(dāng)于現(xiàn)在容量的0.75倍
return;
}
//利用CAS方法更新這個擴(kuò)容閾值舔稀,在這里面sizectl值減一,說明新加入一個線程參與到擴(kuò)容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍歷到的節(jié)點(diǎn)為空 則放入ForwardingNode指針
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍歷到ForwardingNode節(jié)點(diǎn) 說明這個點(diǎn)已經(jīng)被處理過了 直接跳過 這里是控制并發(fā)擴(kuò)容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//節(jié)點(diǎn)上鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 證明這是一個Node節(jié)點(diǎn)
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是構(gòu)造兩個鏈表 一個是原鏈表 另一個是原鏈表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一個鏈表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一個鏈表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode節(jié)點(diǎn) 表示已經(jīng)處理過該節(jié)點(diǎn)
setTabAt(tab, i, fwd);
//設(shè)置advance為true 返回到上面的while循環(huán)中 就可以執(zhí)行i--操作
advance = true;
}
//對TreeBin對象進(jìn)行處理 與上面的過程類似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//構(gòu)造正序和反序兩個鏈表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果擴(kuò)容后已經(jīng)不再需要tree的結(jié)構(gòu) 反向轉(zhuǎn)換為鏈表結(jié)構(gòu)
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一個鏈表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一個鏈表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode節(jié)點(diǎn) 表示已經(jīng)處理過該節(jié)點(diǎn)
setTabAt(tab, i, fwd);
//設(shè)置advance為true 返回到上面的while循環(huán)中 就可以執(zhí)行i--操作
advance = true;
}
}
}
}
}
}
put函數(shù)
put方法依然沿用HashMap的put方法的思想掌测,根據(jù)hash值計算這個新插入的點(diǎn)在table中的位置i内贮,如果i位置是空的,直接放進(jìn)去,否則進(jìn)行判斷夜郁,如果i位置是樹節(jié)點(diǎn)什燕,按照樹的方式插入新的節(jié)點(diǎn),否則把i插入到鏈表的末尾竞端。ConcurrentHashMap中依然沿用這個思想屎即,有一個最重要的不同點(diǎn)就是ConcurrentHashMap不允許key或value為null值。另外由于涉及到多線程事富,put方法就要復(fù)雜一點(diǎn)技俐。在多線程中可能有以下兩個情況:
- 如果一個或多個線程正在對ConcurrentHashMap進(jìn)行擴(kuò)容操作,當(dāng)前線程也要進(jìn)入擴(kuò)容的操作中统台。這個擴(kuò)容的操作之所以能被檢測到雕擂,是因?yàn)閠ransfer方法中在空結(jié)點(diǎn)上插入forward節(jié)點(diǎn),如果檢測到需要插入的位置被forward節(jié)點(diǎn)占有贱勃,就幫助進(jìn)行擴(kuò)容捂刺。
- 如果檢測到要插入的節(jié)點(diǎn)是非空且不是forward節(jié)點(diǎn),就對這個節(jié)點(diǎn)加鎖募寨,這樣就保證了線程安全。盡管這個有一些影響效率森缠,但是還是會比hashTable的synchronized要好得多拔鹰。
整體流程就是首先定義不允許key或value為null的情況放入 對于每一個放入的值,首先利用spread方法對key的hashcode進(jìn)行一次hash計算贵涵,由此來確定這個值在table中的位置列肢。如果這個位置是空的,那么直接放入宾茂,而且不需要加鎖操作瓷马。
如果這個位置存在結(jié)點(diǎn),說明發(fā)生了hash碰撞跨晴,首先判斷這個節(jié)點(diǎn)的類型欧聘。如果是鏈表節(jié)點(diǎn)(fh>0),則得到的結(jié)點(diǎn)就是hash值相同的節(jié)點(diǎn)組成的鏈表的頭節(jié)點(diǎn)。需要依次向后遍歷確定這個新加入的值所在位置端盆。如果遇到hash值與key值都與新加入節(jié)點(diǎn)是一致的情況怀骤,則只需要更新value值即可。否則依次向后遍歷焕妙,直到鏈表尾插入這個結(jié)點(diǎn)蒋伦。 如果加入這個節(jié)點(diǎn)以后鏈表長度大于8,就把這個鏈表轉(zhuǎn)換成紅黑樹焚鹊。如果這個節(jié)點(diǎn)的類型已經(jīng)是樹節(jié)點(diǎn)的話痕届,直接調(diào)用樹節(jié)點(diǎn)的插入方法進(jìn)行插入新的值。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允許 key或value為null
if (key == null || value == null) throw new NullPointerException();
//計算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 第一次 put 操作的時候初始化,如果table為空的話研叫,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根據(jù)hash值計算出在table里面的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 根據(jù)對應(yīng)的key hash 到具體的索引锤窑,如果該索引對應(yīng)的 Node 為 null,則采用 CAS 操作更新整個 table
// 如果這個位置沒有值 蓝撇,直接放進(jìn)去果复,不需要加鎖
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//當(dāng)遇到表連接點(diǎn)時,需要進(jìn)行整合表的操作
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 結(jié)點(diǎn)上鎖渤昌,只是對鏈表頭結(jié)點(diǎn)作鎖操作
synchronized (f) {
if (tabAt(tab, i) == f) {
//fh > 0 說明這個節(jié)點(diǎn)是一個鏈表的節(jié)點(diǎn) 不是樹的節(jié)點(diǎn)
if (fh >= 0) {
binCount = 1;
//在這里遍歷鏈表所有的結(jié)點(diǎn)
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值和key值相同 則修改對應(yīng)結(jié)點(diǎn)的value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果遍歷到了最后一個結(jié)點(diǎn)虽抄,那么就證明新的節(jié)點(diǎn)需要插入 就把它插入在鏈表尾部
if ((e = e.next) == null) {
// 插入到鏈表尾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果這個節(jié)點(diǎn)是樹節(jié)點(diǎn),就按照樹的方式插入值
else if (f instanceof TreeBin) {
// 如果是紅黑樹結(jié)點(diǎn)独柑,按照紅黑樹的插入
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果這個鏈表結(jié)點(diǎn)達(dá)到了臨界值8迈窟,那么把這個鏈表轉(zhuǎn)換成紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//將當(dāng)前ConcurrentHashMap的元素數(shù)量+1,table的擴(kuò)容是在這里發(fā)生的
addCount(1L, binCount);
return null;
}
協(xié)助擴(kuò)容函數(shù)helpTransfer
這是一個協(xié)助擴(kuò)容的方法忌栅。這個方法被調(diào)用的時候车酣,當(dāng)前ConcurrentHashMap一定已經(jīng)有了nextTable對象,首先拿到這個nextTable對象索绪,調(diào)用上面講到的transfer方法來進(jìn)行擴(kuò)容湖员。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);//計算一個操作校驗(yàn)碼
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
紅黑樹轉(zhuǎn)換
在putVal函數(shù)中,treeifyBin是在鏈表長度達(dá)到一定閾值(8)后轉(zhuǎn)換成紅黑樹的函數(shù)瑞驱。 但是并不是直接轉(zhuǎn)換娘摔,而是進(jìn)行一次容量判斷,如果容量沒有達(dá)到轉(zhuǎn)換的要求唤反,直接進(jìn)行擴(kuò)容操作并返回凳寺;如果滿足條件才將鏈表的結(jié)構(gòu)轉(zhuǎn)換為TreeBin ,這與HashMap不同的是彤侍,它并沒有把TreeNode直接放入紅黑樹肠缨,而是利用了TreeBin這個小容器來封裝所有的TreeNode。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
get方法
給定一個key來確定value的時候盏阶,必須滿足兩個條件 key相同 hash值相同晒奕,對于節(jié)點(diǎn)可能在鏈表或樹上的情況,需要分別去查找名斟。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//計算hash值
int h = spread(key.hashCode());
//根據(jù)hash值確定節(jié)點(diǎn)位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果搜索到的節(jié)點(diǎn)key與傳入的key相同且不為null,直接返回這個節(jié)點(diǎn)
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0 說明這個節(jié)點(diǎn)在樹上 直接尋找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否則遍歷鏈表 找到對應(yīng)的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
參考資料:
《Java并發(fā)編程的藝術(shù)》
ConcurrentHashMap源碼分析(JDK8版本)