本文作者:王一飛摆尝,叩丁狼高級(jí)講師多艇。原創(chuàng)文章逻恐,轉(zhuǎn)載請(qǐng)注明出處。
接上篇墩蔓,本篇主要講解ConcurrentHashMap類在并發(fā)環(huán)境下的使用梢莽。慣例萧豆,我們先來看下ConcurrentHashMap實(shí)現(xiàn)原理:
jdk7以前跟以后(jdk8)實(shí)現(xiàn)原理不一樣奸披,所以我們分2個(gè)版本研究,先看jdk7版
jdk7版本
ConcurrentHashMap和HashMap設(shè)計(jì)思路差不多涮雷,但是為支持并發(fā)操作阵面,做了一定的改進(jìn),ConcurrentHashMap引入Segment 的概念洪鸭,目的是將map拆分成多個(gè)Segment(默認(rèn)16個(gè))样刷。操作ConcurrentHashMap細(xì)化到操作某一個(gè)Segment。在多線程環(huán)境下览爵,不同線程操作不同的Segment置鼻,他們互不影響,這便可實(shí)現(xiàn)并發(fā)操作蜓竹。
Segment 字面翻譯是一段箕母,部分的意思,但我們更多稱之為”槽”俱济。Segment 繼承 ReentrantLock類 嘶是,當(dāng)我們對(duì)ConcurrentHashMap并發(fā)操作時(shí),只要鎖住一個(gè) segment蛛碌,其他剩余的Segment依然可以操作聂喇。這樣只要保證每個(gè) Segment 是線程安全的,我們就實(shí)現(xiàn)了全局的線程安全蔚携。
具體結(jié)構(gòu):
上圖可看出ConcurrentHashMap的大體結(jié)構(gòu)希太,ConcurrentHashMap由一個(gè)Segment[]數(shù)組組成,數(shù)組元素是一個(gè)數(shù)組+鏈表的結(jié)構(gòu)酝蜒。
抽取部分源碼:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
final Segment<K,V>[] segments;
...
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
}
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
從源碼上看跛十,我們可以看到Segment就類似一個(gè)小型的hashMap,ConcurrentHashMap就是HashMap集合秕硝。那么我們就來看下put添加操作:
ConcurrentHashMap
public V put(K key, V value) {
Segment<K,V> s;
//計(jì)算hash key值
int hash = hash(key);
//通過hash key值計(jì)算出存入Segment的位置
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
//初始化Segment
s = ensureSegment(j);
//添加
return s.put(key, hash, value, false);
}
Segment:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//segment操作加鎖芥映,使用嘗試獲取鎖方式洲尊。如果獲取失敗,進(jìn)入scanAndLockForPut方法
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//擴(kuò)容奈偏, 這是后續(xù)做解釋
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//釋放鎖
unlock();
}
return oldValue;
}
上面幾個(gè)方法坞嘀,ConcurrentHashMap在進(jìn)行put操作時(shí),先通過key找到承載的Segment對(duì)象位置惊来,然后競(jìng)爭(zhēng)操作Segment的獨(dú)占鎖丽涩,以確保操作線程。獲取鎖方式很簡(jiǎn)單裁蚁,就是tryLock()矢渊,如果獲取鎖失敗,執(zhí)行scanAndLockForPut方法
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // 迭代次數(shù)
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//超過迭代次數(shù)枉证,阻塞
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
scanAndLockForPut 實(shí)現(xiàn)也比較簡(jiǎn)單矮男,循環(huán)調(diào)用tryLock,多次獲取室谚,如果循環(huán)次數(shù)retries 次數(shù)大于事先設(shè)置定好的MAX_SCAN_RETRIES毡鉴,就執(zhí)行l(wèi)ock() 方法,此方法會(huì)阻塞等待秒赤,一直到成功拿到Segment鎖為止猪瞬。
//循環(huán)次數(shù),單核為1入篮, 多核為64.
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
解決上篇場(chǎng)景3問題:
回到上一篇<<并發(fā)中的Map>>提到并發(fā)場(chǎng)景3中map擴(kuò)容問題陈瘦,ConcurrentHashMap的擴(kuò)容跟HashMap有點(diǎn)不同, ConcurrentHashMap的Segment槽是固定的16個(gè)潮售,不變的
final Segment<K,V>[] segments;
而ConcurrentHashMap的擴(kuò)容講的是Segment中的HashEntry數(shù)組擴(kuò)容痊项。當(dāng)HashEntry達(dá)到某個(gè)臨界點(diǎn)后,會(huì)擴(kuò)容2為之前的2倍饲做, 原理跟HashMap擴(kuò)容類似线婚。
現(xiàn)在我們,在看會(huì)put方法中一個(gè)if分支
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//擴(kuò)容盆均, 這是后續(xù)做解釋
rehash(node);
else
rehash方法就是HashEntry擴(kuò)展邏輯塞弊。當(dāng)線程執(zhí)行到rehash方法時(shí),表示當(dāng)前線程已經(jīng)獲取到到當(dāng)前Segment的鎖對(duì)象泪姨,這就表示rehash方法的執(zhí)行是線程安全游沿,不會(huì)存在并發(fā)問題。
ConcurrentHashMap的remove方法跟put方法操作一樣肮砾,先獲取segement對(duì)象后再操作诀黍,這里就不重復(fù)了。那么我們來看下get操作:
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
//1:計(jì)算key的hash值
int h = hash(key);
//2:確定在segment的位置仗处,得到HashEntry數(shù)組
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//3:得到數(shù)據(jù)鏈表眯勾,迭代枣宫,查找key對(duì)應(yīng)的value值
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
細(xì)心的朋友會(huì)發(fā)現(xiàn)get方法并沒有獲取鎖的操作,這時(shí)就得討論下執(zhí)行g(shù)et操作線程安全問題啦吃环。
1:一線程執(zhí)行put也颤,另一個(gè)線程執(zhí)行g(shù)et
ConcurrentHashMap約定新添的節(jié)點(diǎn)是在鏈表的表頭, 所以如果先執(zhí)行g(shù)et郁轻,后執(zhí)行put翅娶, get操作已經(jīng)遍歷到鏈表中間了, 不會(huì)影響put的安全執(zhí)行好唯。如果先執(zhí)行put竭沫,這時(shí)候,就必須保證剛剛插入的表頭節(jié)點(diǎn)能被讀取骑篙,ConcurrentHashMap使用的UNSAFE.putOrderedObject賦值方式保證蜕提。
2:一個(gè)線程執(zhí)行put,并在擴(kuò)容操作期間替蛉, 另一個(gè)線程執(zhí)行g(shù)et
ConcurrentHashMap擴(kuò)容是新創(chuàng)建了HashEntry數(shù)組贯溅,然后進(jìn)行遷移數(shù)據(jù)拄氯,最后面將 newTable賦值給oldTable躲查。如果 get 先執(zhí)行,那么就是在oldTable 上做查詢操作译柏,不發(fā)送線程安全問題镣煮;而如果put 先執(zhí)行,那么 put 操作的可見性保證就是 oldTable使用了 volatile 關(guān)鍵字即可鄙麦。
transient volatile HashEntry<K,V>[] table;
3:一線程執(zhí)行remove典唇,另一個(gè)線程執(zhí)行g(shù)et
ConcurrentHashMap的刪除分2種情況, 1>刪除節(jié)點(diǎn)在鏈表表頭胯府。那操作節(jié)點(diǎn)就是HashEntry數(shù)組元素了介衔,雖然HashEntry[] table 使用了volatile修飾, 但是骂因, volatile并保證數(shù)據(jù)內(nèi)部元素的操作可見性炎咖,所以只能使用UNSAFE 來操作元素。2>刪除節(jié)點(diǎn)中標(biāo)中間寒波, 那么好辦乘盼, 只需要保證節(jié)點(diǎn)中的next屬性是volatile修飾即可
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
總結(jié):get方法之所以不需要加鎖,原因比較簡(jiǎn)單俄烁,get為只讀操作绸栅,不會(huì)改動(dòng)map數(shù)據(jù)結(jié)構(gòu),所以在操作過程中页屠,只需要保證涉及讀取數(shù)據(jù)的屬性為線程可見即可粹胯,也即使用volatile修飾蓖柔。
jdk8版本
jdk8版本的ConcurrentHashMap相對(duì)于jdk7版本,發(fā)送了很大改動(dòng)风纠,jdk8直接拋棄了Segment的設(shè)計(jì)渊抽,采用了較為輕捷的Node + CAS + Synchronized設(shè)計(jì),保證線程安全议忽。
看上圖ConcurrentHashMap的大體結(jié)構(gòu)懒闷,一個(gè)node數(shù)組,默認(rèn)為16栈幸,可以自動(dòng)擴(kuò)展愤估,擴(kuò)展速度為0.75
private static finalint DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
每一個(gè)節(jié)點(diǎn),掛載一個(gè)鏈表速址,當(dāng)鏈表掛載數(shù)據(jù)大于8時(shí)玩焰,鏈表自動(dòng)轉(zhuǎn)換成紅黑樹
static final int TREEIFY_THRESHOLD = 8;
部分代碼:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
transient volatile Node<K,V>[] table;
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
}
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
}
Node
ConcurrentHashMap核心內(nèi)部類,它包裝了key-value鍵值對(duì)芍锚,所有插入ConcurrentHashMap的數(shù)據(jù)都包裝在這里面昔园。
TreeNode
樹節(jié)點(diǎn)類,當(dāng)數(shù)據(jù)鏈表長(zhǎng)度大于8時(shí)并炮,會(huì)轉(zhuǎn)換為TreeNode默刚。注意判帮,此時(shí)的TreeNode并不是紅黑樹對(duì)象蹄殃,它并不是直接轉(zhuǎn)換為紅黑樹,而是把這些結(jié)點(diǎn)包裝成TreeNode放在TreeBin對(duì)象中退敦,由TreeBin完成對(duì)紅黑樹的包裝伍俘。
TreeBin
TreeNode節(jié)點(diǎn)的包裝對(duì)象邪锌,可以認(rèn)為是紅黑樹對(duì)象。它代替了TreeNode的根節(jié)點(diǎn)癌瘾,ConcurrentHashMap的node“數(shù)組”中觅丰,存放就是TreeBin對(duì)象,而不是TreeNode對(duì)象妨退。
來看下jdk8版本ConcurrentHashMap 的put操作
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//一個(gè)死循環(huán)妇萄,目的,并發(fā)情況下碧注,也可以保障安全添加成功
//原理:cas算法的循環(huán)比較嚣伐,直至成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//第一次添加,先初始化node數(shù)組
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//計(jì)算出table[i]無節(jié)點(diǎn)萍丐,創(chuàng)建節(jié)點(diǎn)
//casTabAt : 底層使用Unsafe.compareAndSwapObject 原子操作table[i]位置轩端,如果為null,則添加新建的node節(jié)點(diǎn)逝变,跳出循環(huán)基茵,反之奋构,再循環(huán)進(jìn)入執(zhí)行添加操作
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
//如果當(dāng)前處于拓展?fàn)顟B(tài),返回拓展后的tab拱层,然后再進(jìn)入循環(huán)執(zhí)行添加操作
tab = helpTransfer(tab, f);
else {
//鏈表中或紅黑樹中追加節(jié)點(diǎn)
V oldVal = null;
//使用synchronized 對(duì) f 對(duì)象加鎖弥臼, 這個(gè)f = tabAt(tab, i = (n - 1) & hash) :table[i] 的node對(duì)象,并發(fā)環(huán)境保證線程操作安全
//此處注意: 這里沒有ReentrantLock根灯,因?yàn)閖dk1.8對(duì)synchronized 做了優(yōu)化径缅,其執(zhí)行性能已經(jīng)跟ReentrantLock不相上下。
synchronized (f) {
if (tabAt(tab, i) == f) {
//鏈表上追加節(jié)點(diǎn)
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//紅黑樹上追加節(jié)點(diǎn)
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//節(jié)點(diǎn)數(shù)大于臨界值烙肺,轉(zhuǎn)換成紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
從put源碼可看纳猪,JDK8版本更多使用的cas編程方式控制線程安全, 必要時(shí)也會(huì)使用synchronized 代碼塊保證線程安全桃笙。
最后氏堤,再看會(huì)ConcurrentHashMap的get方法:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
//獲取table[i] 的node元素
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
//確保多線程可見,并且保證獲取到是內(nèi)存中最新的table[i] 元素值
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
get源碼也沒有加鎖操作搏明,操作原理跟jdk1.7版本一樣鼠锈,這里就不累贅了。
回到上篇中
場(chǎng)景1:多線程復(fù)合操作時(shí)是否能保證線程安全
答案是不能星著,原因: ConcurrentHashMap 使用鎖分離(jdk7)/cas(jdk8)方式保證并發(fā)環(huán)境下购笆,添加/刪除操作安全,但這進(jìn)針對(duì)的是單個(gè)put 或者 remove方法强饮,如果多個(gè)方法配合復(fù)合使用由桌,依然需要額外加鎖为黎。
場(chǎng)景2:多線程同時(shí)添加相同hash 碼值時(shí)轉(zhuǎn)換成紅黑樹時(shí)邮丰,是否存在并發(fā)問題
答案是可以保證線程安全,原因:ConcurrentHashMap 鏈表轉(zhuǎn)換成紅黑樹時(shí)铭乾,對(duì)轉(zhuǎn)換方法做加鎖防護(hù)了
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
最后對(duì)ConcurrentHashMap 來個(gè)大總結(jié):
1剪廉、get方法不加鎖;
2炕檩、put斗蒋、remove方法要使用鎖
jdk7使用鎖分離機(jī)制(Segment分段加鎖)
jdk8使用cas + synchronized 實(shí)現(xiàn)鎖操作
3、Iterator對(duì)象的使用笛质,運(yùn)行一邊更新泉沾,一遍遍歷(可以根據(jù)原理自己拓展)
4、復(fù)合操作妇押,無法保證線程安全跷究,需要額外加鎖保證
5、并發(fā)環(huán)境下敲霍,ConcurrentHashMap 效率較Collections.synchronizedMap()更高
想獲取更多技術(shù)視頻俊马,請(qǐng)前往叩丁狼官網(wǎng):http://www.wolfcode.cn/openClassWeb_listDetail.html