為了解決線程安全問題欧穴,同時(shí)又為了照顧效率的問題,java從1.5就有了ConcurrentHashMap。從而代替了HashTable锥忿。1.7的ConcurrentHashMap 是用了多個(gè)(默認(rèn)16)ReentrantLock來解決線程安全問題的虚吟。1.8的ConcurrentHashMap 則用了cas來解決的寸认。
如果想讀ConcurrentHashMap源碼的話,一定要具備以下知識(shí)點(diǎn)稍味。
1.volatile關(guān)鍵字废麻。
2.unsafe的cas方法
在閱讀ConcurrentHashMap之前,如果讀過HashMap (JDK1.8)的源碼模庐,也會(huì)有一定的幫助烛愧。
java.util.HashMap(JDK1.8)
下邊只是一些比較簡單的源碼,有很多我也不是很清楚掂碱,歡迎大家溝通怜姿。
initTable
在map(在這里指ConcurrentHashMap,下邊不在贅述)添加的時(shí)候疼燥,一定會(huì)初始化這個(gè)Node數(shù)組沧卢,但是因?yàn)槎嗑€程的原因,不能有多個(gè)線程同時(shí)去初始化醉者,所以這里用while來監(jiān)控但狭,如果有別的線程正在擴(kuò)容,則用yield方法去重新競爭cpu資源撬即。
private final Node<K,V>[] initTable() {
//sizeCtl 負(fù)數(shù)代表正在進(jìn)行初始化或擴(kuò)容操作
//(-1代表正在初始化立磁,-N 表示有N-1個(gè)線程正在進(jìn)行擴(kuò)容操作。0代表hash表還沒有被初始化剥槐,正數(shù)表示初始化或下一次進(jìn)行擴(kuò)容的大小唱歧,這一點(diǎn)類似于擴(kuò)容閾值的概念)
Node<K,V>[] tab; int sc;
//開始初始化
//因?yàn)閠able是volatile修飾的,如果table不為空了粒竖。直接跳出循環(huán)颅崩。
while ((tab = table) == null || tab.length == 0) {
//進(jìn)入此判斷,表示已經(jīng)有線程正在初始化或者擴(kuò)容了蕊苗。
if ((sc = sizeCtl) < 0)
//線程從運(yùn)行狀態(tài)變?yōu)榫途w狀態(tài)沿后。
Thread.yield(); // lost initialization race; just spin
//通過unsafe類的cas把sizeCtl更換為-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//todo 暫時(shí)想不通為什么sc會(huì)大于0 ,多這個(gè)判斷的意義是什么岁歉,得运。因?yàn)閟c并不是volatile修飾膝蜈。上邊只有賦值,賦值完馬上判斷
//table大小默認(rèn)值為16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//創(chuàng)建node數(shù)組
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//更新閾值 16 - 4 = 12. 是16 * 0.75
sc = n - (n >>> 2);
}
} finally {
//把sc賦值到sizeCtl上熔掺。
sizeCtl = sc;
}
break;
}
}
return tab;
}
helpTransfer
在map添加元素的時(shí)候饱搏,如果此時(shí)map正在擴(kuò)容。此時(shí)不會(huì)阻塞置逻,而會(huì)進(jìn)入helpTransfer這個(gè)方法推沸, 如果看過hashmap擴(kuò)容源碼,則知道擴(kuò)容的時(shí)候會(huì)有數(shù)組的copy動(dòng)作券坞。此時(shí)這里會(huì)多線程進(jìn)行拷貝鬓催。這個(gè)很NB。
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//先通過此時(shí)tab的大小計(jì)算一個(gè)校檢碼
int rs = resizeStamp(tab.length);
//用while監(jiān)控是否擴(kuò)容完畢恨锚。
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//如果已經(jīng)擴(kuò)容完畢宇驾,或者不能在擴(kuò)容了。則直接跳出
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//幫助擴(kuò)容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
transfer
這個(gè)擴(kuò)容方法猴伶,我自己看了會(huì)课舍,沒有看太懂,我是從這篇文章中摘取的擴(kuò)容方法的解析他挎。
ConcurrentHashMap
/**
* 一個(gè)過渡的table表 只有在擴(kuò)容的時(shí)候才會(huì)使用
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//構(gòu)造一個(gè)nextTable對象 它的容量是原來的兩倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//構(gòu)造一個(gè)連節(jié)點(diǎn)指針 用于標(biāo)志位
boolean advance = true;//并發(fā)擴(kuò)容的關(guān)鍵屬性 如果等于true 說明這個(gè)節(jié)點(diǎn)已經(jīng)處理過
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//這個(gè)while循環(huán)體的作用就是在控制i-- 通過i--可以依次遍歷原h(huán)ash表中的節(jié)點(diǎn)
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的節(jié)點(diǎn)都已經(jīng)完成復(fù)制工作 就把nextTable賦值給table 清空臨時(shí)對象nextTable
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//擴(kuò)容閾值設(shè)置為原來容量的1.5倍 依然相當(dāng)于現(xiàn)在容量的0.75倍
return;
}
//利用CAS方法更新這個(gè)擴(kuò)容閾值筝尾,在這里面sizectl值減一,說明新加入一個(gè)線程參與到擴(kuò)容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍歷到的節(jié)點(diǎn)為空 則放入ForwardingNode指針
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍歷到ForwardingNode節(jié)點(diǎn) 說明這個(gè)點(diǎn)已經(jīng)被處理過了 直接跳過 這里是控制并發(fā)擴(kuò)容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//節(jié)點(diǎn)上鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 證明這是一個(gè)Node節(jié)點(diǎn)
if (fh >= 0) {
int runBit = fh & n;
//以下的部分在完成的工作是構(gòu)造兩個(gè)鏈表 一個(gè)是原鏈表 另一個(gè)是原鏈表的反序排列
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一個(gè)鏈表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一個(gè)鏈表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode節(jié)點(diǎn) 表示已經(jīng)處理過該節(jié)點(diǎn)
setTabAt(tab, i, fwd);
//設(shè)置advance為true 返回到上面的while循環(huán)中 就可以執(zhí)行i--操作
advance = true;
}
//對TreeBin對象進(jìn)行處理 與上面的過程類似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//構(gòu)造正序和反序兩個(gè)鏈表
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果擴(kuò)容后已經(jīng)不再需要tree的結(jié)構(gòu) 反向轉(zhuǎn)換為鏈表結(jié)構(gòu)
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一個(gè)鏈表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一個(gè)鏈表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode節(jié)點(diǎn) 表示已經(jīng)處理過該節(jié)點(diǎn)
setTabAt(tab, i, fwd);
//設(shè)置advance為true 返回到上面的while循環(huán)中 就可以執(zhí)行i--操作
advance = true;
}
}
}
}
}
}
putVal
接下來就可以看put方法了办桨。 put方法會(huì)直接調(diào)用putVal方法筹淫,他比put方法多一個(gè)onlyIfAbsent參數(shù)。
該參數(shù)的意思為是否只在key為空的時(shí)候添加呢撞。(或者理解為是否覆蓋value)
如果上邊那些方法看明白了损姜。HashMap的put方法看過,那么這個(gè)還是很簡單的殊霞。
//添加方法薛匪。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//concurrentHashMap不允許key或value為空
if (key == null || value == null) throw new NullPointerException();
//獲取key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
//開始循環(huán),并且賦值tab數(shù)組脓鹃。成功后才會(huì)跳出循環(huán)
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//首次添加會(huì)進(jìn)入此判斷。
if (tab == null || (n = tab.length) == 0)
//初始化node數(shù)組
//因?yàn)檫@是一個(gè)死循環(huán)古沥,初始化后瘸右,會(huì)再次循環(huán)。
tab = initTable();
//通過key的hash值來尋找位置岩齿,如果當(dāng)前位置為空太颤,則進(jìn)入判斷
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//cas直接放入,如果放入成功盹沈。然后跳出循環(huán)龄章。沒有成功繼續(xù)循環(huán)吃谣。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果正在擴(kuò)容。則幫助擴(kuò)容
else if ((fh = f.hash) == MOVED)
//多線程擴(kuò)容做裙,返回新的tab
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//當(dāng)該位置已經(jīng)有值 則上鎖岗憋。
synchronized (f) {
//判斷是該值是否已經(jīng)改變
if (tabAt(tab, i) == f) {
//如果f的hashcode大于0.則該節(jié)點(diǎn)是連邊。
if (fh >= 0) {
binCount = 1;
//循環(huán)判斷锚贱,如果hash值一樣仔戈,并且equals也想等,則覆蓋拧廊,如果e已經(jīng)是最后一個(gè)元素监徘,則鏈下去
//在這里每循環(huán)一次,就會(huì)++binCount.為下邊是否變成樹做準(zhǔn)備
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果該節(jié)點(diǎn)是樹吧碾。則用樹的方式往下鏈
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//如果bincount大于8 則該位置變?yōu)榧t黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//增加當(dāng)前的容量凰盔,在增加的時(shí)候 也會(huì)根據(jù)容量來擴(kuò)容。
addCount(1L, binCount);
return null;
}
get
這個(gè)方法沒什么特殊的倦春。跟hashmap的getnode方法差不多户敬,就是找位置然后是樹就從樹中取,是鏈表就從鏈表中取
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//計(jì)算hash值
int h = spread(key.hashCode());
//如果當(dāng)前tab沒問題并且該位置有值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果該位置的第一個(gè)就是該值溅漾、就直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果是樹山叮。則從樹中尋找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果是鏈表添履,則next下去找屁倔。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}