1 集合特性
對(duì)于集合框架關(guān)注點(diǎn):
- 集合底層實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)是什么 數(shù)組+鏈表+紅黑樹
- 集合中元素是否允許為空 否
- 是否允許重復(fù)的數(shù)據(jù) 否
- 是否有序(這里的有序是指讀取數(shù)據(jù)和存放數(shù)據(jù)的順序是否一致) 否
- 是否線程安全叙凡。 是
2 ConcurrentHashMap分析
ConcurrentHashMap 繼承AbstractMap 并實(shí)現(xiàn)了 ConcurrentMap接口
CAS算法涎跨;unsafe.compareAndSwapInt(this, valueOffset, expect, update);
CAS(Compare And Swap)汪榔,意思是如果valueOffset位置包含的值與expect值相同悦荒,則更新valueOffset位置的值為update春锋,并返回true累贤,否則不更新,返回false屹耐。
與Java8的HashMap有相通之處尸疆,底層依然由“數(shù)組”+鏈表+紅黑樹;
底層結(jié)構(gòu)存放的是TreeBin對(duì)象惶岭,而不是TreeNode對(duì)象仓技;
CAS作為知名無(wú)鎖算法,那ConcurrentHashMap就沒(méi)用鎖了么俗他?當(dāng)然不是脖捻,hash值相同的鏈表的頭結(jié)點(diǎn)還是會(huì)synchronized上鎖。
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
簡(jiǎn)單寫個(gè)使用
ConcurrentHashMap concurrentHashMap=new ConcurrentHashMap();
concurrentHashMap.put("a",123);
接下來(lái)分析下put方法兆衅,代碼太長(zhǎng)地沮,拆開一步一步分析
if (key == null || value == null) throw new NullPointerException();//
說(shuō)明key和value都不能為空,這是和hashMap的區(qū)別
int hash = spread(key.hashCode());//計(jì)算hash值使元素分布的更均勻
int binCount = 0;
for (Node<K,V>[] tab = table;;)//table即為其基本結(jié)構(gòu)
這里為啥有個(gè)for循環(huán)羡亩?
是因?yàn)樵趖able的初始化和casTabAt用到了compareAndSwapInt摩疑、compareAndSwapObject,因?yàn)槿绻渌€程正在修改tab畏铆,那么嘗試就會(huì)失敗雷袋,所以這邊要加一個(gè)for循環(huán),不斷的嘗試
接著看table的基本結(jié)構(gòu)辞居,為靜態(tài)內(nèi)部類
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;//保持內(nèi)存可見(jiàn)性
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
接著分析初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
initTable
sizeCtl:
負(fù)數(shù)代表正在進(jìn)行初始化或擴(kuò)容操作
-1代表正在初始化
-N表示有N-1個(gè)線程正在進(jìn)行擴(kuò)容操作
正數(shù)或0代表hash表還沒(méi)有被初始化楷怒,這個(gè)數(shù)值表示初始化或下一次進(jìn)行擴(kuò)容的大小,類似于擴(kuò)容閾值瓦灶。它的值始終是當(dāng)前ConcurrentHashMap容量的0.75倍鸠删,這與loadfactor是對(duì)應(yīng)的。實(shí)際容量>=sizeCtl,則擴(kuò)容贼陶。
Thread.yield();標(biāo)識(shí)將線程掛起
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)///sizeCtl<0表示其他線程已經(jīng)在初始化了或者擴(kuò)容了刃泡,掛起當(dāng)前線程
Thread.yield(); // 線程掛起
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
sizeCtl默認(rèn)為0巧娱,如果ConcurrentHashMap實(shí)例化時(shí)有傳參數(shù),sizeCtl會(huì)是一個(gè)2的冪次方的值烘贴。所以執(zhí)行第一次put操作的線程會(huì)執(zhí)行Unsafe.compareAndSwapInt方法修改sizeCtl為-1禁添,有且只有一個(gè)線程能夠修改成功,其它線程通過(guò)Thread.yield()讓出CPU時(shí)間片等待table初始化完成
那么怎么解決并發(fā)插入的問(wèn)題呢
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
Doug Lea采用Unsafe.getObjectVolatile來(lái)獲取桨踪,也許有人質(zhì)疑老翘,直接table[index]不可以么,為什么要這么復(fù)雜馒闷?在java內(nèi)存模型中酪捡,我們已經(jīng)知道每個(gè)線程都有一個(gè)工作內(nèi)存叁征,里面存儲(chǔ)著table的副本纳账,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素捺疼,Unsafe.getObjectVolatile可以直接獲取指定內(nèi)存的數(shù)據(jù)疏虫,保證了每次拿到數(shù)據(jù)都是最新的。
如果f為null啤呼,說(shuō)明table中這個(gè)位置第一次插入元素卧秘,利用Unsafe.compareAndSwapObject方法插入Node節(jié)點(diǎn)。
如果CAS成功官扣,說(shuō)明Node節(jié)點(diǎn)已經(jīng)插入翅敌,隨后addCount(1L, binCount)方法會(huì)檢查當(dāng)前容量是否需要進(jìn)行擴(kuò)容。
如果CAS失敗惕蹄,說(shuō)明有其它線程提前插入了節(jié)點(diǎn)蚯涮,自旋重新嘗試在這個(gè)位置插入節(jié)點(diǎn)。
如果f的hash值為-1卖陵,說(shuō)明當(dāng)前f是ForwardingNode節(jié)點(diǎn)遭顶,意味有其它線程正在擴(kuò)容,則一起進(jìn)行擴(kuò)容操作泪蔫。
其余情況把新的Node節(jié)點(diǎn)按鏈表或紅黑樹的方式插入到合適的位置棒旗,這個(gè)過(guò)程采用同步內(nèi)置鎖實(shí)現(xiàn)并發(fā)
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
在節(jié)點(diǎn)f上進(jìn)行同步,節(jié)點(diǎn)插入之前撩荣,再次利用tabAt(tab, i) == f判斷铣揉,防止被其它線程修改。
如果f.hash >= 0餐曹,說(shuō)明f是鏈表結(jié)構(gòu)的頭結(jié)點(diǎn)老速,遍歷鏈表,如果找到對(duì)應(yīng)的node節(jié)點(diǎn)凸主,則修改value橘券,否則在鏈表尾部加入節(jié)點(diǎn)。
如果f是TreeBin類型節(jié)點(diǎn),說(shuō)明f是紅黑樹根節(jié)點(diǎn)旁舰,則在樹結(jié)構(gòu)上遍歷元素锋华,更新或增加節(jié)點(diǎn)。
如果鏈表中節(jié)點(diǎn)數(shù)binCount >= TREEIFY_THRESHOLD(默認(rèn)是8)箭窜,則把鏈表轉(zhuǎn)化為紅黑樹結(jié)構(gòu)毯焕。
總結(jié)下put的流程
理一下put的流程:
- 判空:null直接拋空指針異常;
- hash:計(jì)算h=key.hashcode磺樱;調(diào)用spread計(jì)算hash=(h ^(h >>>16))& HASH_BITS
- 遍歷table若table為空纳猫,則初始化,僅設(shè)置相關(guān)參數(shù)竹捉;
- 計(jì)算當(dāng)前key存放位置芜辕,即table的下標(biāo)i=(n - 1) & hash;
- 若待存放位置為null块差,casTabAt無(wú)鎖插入侵续;
- 若是forwarding nodes(檢測(cè)到正在擴(kuò)容),則helpTransfer(幫助其擴(kuò)容)憨闰;
else(待插入位置非空且不是forward節(jié)點(diǎn)状蜗,即碰撞了),將頭節(jié)點(diǎn)上鎖(保證了線程安全)鹉动; - 區(qū)分鏈表節(jié)點(diǎn)和樹節(jié)點(diǎn)轧坎,分別插入(遇到hash值與key值都與新節(jié)點(diǎn)一致的情況,只需要更新value值即可泽示。否則依次向后遍歷缸血,直到鏈表尾插入這個(gè)結(jié)點(diǎn);
- 若鏈表長(zhǎng)度>8边琉,則treeifyBin轉(zhuǎn)樹(Note:若length<64,直接tryPresize,兩倍table.length;不轉(zhuǎn)樹)
- addCount(1L, binCount)属百。
- put操作共計(jì)兩次hash操作,再利用“與&”操作計(jì)算Node的存放位置变姨。
get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- spread計(jì)算hash值族扰;
- table不為空;
- tabAt(i)處桶位不為空定欧;
- check first渔呵,是則返回當(dāng)前Node的value;否則分別根據(jù)樹砍鸠、鏈表查詢扩氢。