原創(chuàng)文章弛作,轉(zhuǎn)載請標(biāo)注出處:《Java并發(fā)編程系列-ConcurrentHashMap 1.8》
一、概述
ConcurrentHashMap是HashMap的線程安全版本洪己,當(dāng)我們在多線程并發(fā)環(huán)境中編程時使用ConcurrentHashMap來代替HashMap务冕。
ConcurrentHashMap底層結(jié)構(gòu)和實現(xiàn)原理基本與HashMap雷同拇囊,只是增加了針對并發(fā)的處理。
ConcurrentHashMap通過對桶位數(shù)組值加鎖的方式來保證并發(fā)下的操作安全性覆享。注意這里不是對桶位加鎖佳遂,而是對桶位上的元素進行加鎖呀潭。
二量没、常量變量解析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
//...
// 桶數(shù)組的最大容量(2的30次方)
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 桶數(shù)組的默認(rèn)容量為16(2的4次方)
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//
// 默認(rèn)的并發(fā)等級,這個值一般等于桶數(shù)組容量杰扫,這個并發(fā)等級,其實就是可以同時支持的最大并發(fā)量
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 負(fù)載因子巍糯,一般不改動
private static final float LOAD_FACTOR = 0.75f;
// 樹形化閾值啸驯,鏈表元素達到8個就嘗試執(zhí)行樹形化
static final int TREEIFY_THRESHOLD = 8;
// 樹退化閾值,樹在擴容時分拆后樹容量達到6時執(zhí)行退化操作祟峦,轉(zhuǎn)化為單向鏈表
static final int UNTREEIFY_THRESHOLD = 6;
// 樹形化容量閾值罚斗,只有在桶數(shù)組容量達到64之后才能執(zhí)行樹形化操作,否則會執(zhí)行擴容
static final int MIN_TREEIFY_CAPACITY = 64;
// 數(shù)據(jù)遷移的最短步長宅楞,也就是分配給每個線程的遷移的區(qū)間最小值為16
private static final int MIN_TRANSFER_STRIDE = 16;
// 用于生成擴容戳記sizeCtr的一個基礎(chǔ)量
private static int RESIZE_STAMP_BITS = 16;
// 輔助擴容的線程的最大數(shù)量针姿。1111111111111111 共16個1,整數(shù)是65535
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;//
// 表示正在進行元素遷移
static final int MOVED = -1;
// 表示樹形化已完成
static final int TREEBIN = -2;
static final int RESERVED = -3; // hash for transient reservations
// 正常節(jié)點的hash值的可用位(共32位厌衙,除首位外均可用)
static final int HASH_BITS = 0x7fffffff;
// 當(dāng)前服務(wù)器的CPU核心數(shù)
static final int NCPU = Runtime.getRuntime().availableProcessors();
// 這一部分主要是為了向前兼容
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};
// 以下幾個變量屬于公共變量全部加了volatile
// volatile可以保證操作的有序性和可見性距淫,無法保證操作的原子性
transient volatile Node<K,V>[] table;// 桶數(shù)組
private transient volatile Node<K,V>[] nextTable;// 擴容時的新桶數(shù)組
private transient volatile long baseCount;//
// 容量控制器,用途很多婶希,一般用于在改變桶數(shù)組容量時作為CAS鎖榕暇。
private transient volatile int sizeCtl;
private transient volatile int transferIndex;//
private transient volatile int cellsBusy;//
private transient volatile CounterCell[] counterCells;//
// 鍵集合、值集合喻杈、Entry實體集合的視圖緩存彤枢,用于快速訪問
private transient KeySetView<K,V> keySet;// 鍵集合緩存
private transient ValuesView<K,V> values;// 值集合緩存
private transient EntrySetView<K,V> entrySet;// 鍵值對集合緩存
// 以下幾個字段都是final的,一旦賦值就不變了筒饰,其賦值就在下面的靜態(tài)塊中
// 這幾個字段保存的是對應(yīng)字段在內(nèi)存中的偏移量缴啡,這個偏移量主要用于CAS操作的時候
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
//...
}
三、靜態(tài)塊解析
3.1 描述
具體見源碼
3.2 源碼
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
//...
static {
try {
// 首先獲取Unsafe實例瓷们,也只有源碼可以以這種方式獲取實例业栅,并沒有對用戶開放
// 很明顯并不推薦用戶自行使用,但是使用反射還是可以獲取到Unsafe實例的
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
// 獲取ConcurrentHashMap實例的字段sizeCtl的內(nèi)存偏移量
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
// 獲取ConcurrentHashMap實例的字段transferIndex的內(nèi)存偏移量
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
// 獲取ConcurrentHashMap實例的字段baseCount的內(nèi)存偏移量
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
// 獲取ConcurrentHashMap實例的字段cellsBusy的內(nèi)存偏移量
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
// 獲取CounterCell的實例字段value的內(nèi)存偏移量
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
//...
}
四谬晕、構(gòu)造器解析
4.1 構(gòu)造器描述
無參構(gòu)造器初始容量采用默認(rèn)的初始容量16碘裕,負(fù)載因子為默認(rèn)的0.75,一旦使用帶參數(shù)的構(gòu)造器自定義了容量或負(fù)載因子攒钳、并發(fā)級別等參數(shù)帮孔,那么就會根據(jù)給定的值進行內(nèi)部換算,得出最優(yōu)的初始容量值夕玩。
集合的實際初始容量和參數(shù)指定的容量一般不同你弦,而是根據(jù)一定的規(guī)則計算出來的。有兩種計算方法燎孟,分別對應(yīng)2號和5號構(gòu)造器中的算法禽作。
2號構(gòu)造器中計算方法類似HashMap中方式,只是在進行二進制轉(zhuǎn)換(調(diào)用tableSizeFor方法)之前還需要經(jīng)過一些基礎(chǔ)計算:給定容量*1.5+1揩页。
5號構(gòu)造器中計算方法也類似HashMap,同樣需要在進行二進制轉(zhuǎn)換之前進行一些計算:給定容量/負(fù)載因子+1
4.2 源碼解析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
//...
// 1-無參構(gòu)造器
public ConcurrentHashMap() {
}
// 2-指定初始容量的構(gòu)造器
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 3-指定Map映射集的構(gòu)造器旷偿,等于將給定的Map集合改造為線程安全的集合
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 4-指定初始容量和負(fù)載因子
// 負(fù)載因子一般最好使用默認(rèn)的0.75,這是一個通過檢驗的空間與時間消耗的折中值
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 5-指定初始容量、負(fù)載因子萍程、并發(fā)等級
// 這個并發(fā)等級幢妄,其實就是可以同時支持的最大并發(fā)量,ConcurrentHashMap采用在
// 數(shù)組位元素加鎖的方式來防止并發(fā)茫负,這種加鎖方式保證針對不同數(shù)組位的操作是可以
// 同時進行的蕉鸳,不存在線程不安全情況,那么也就是說可以同時支持最多數(shù)組容量個線
// 程并發(fā)執(zhí)行忍法,如果給定的并發(fā)等級大于初始容量潮尝,必然導(dǎo)致出錯,必須將容量設(shè)置
// 成大于等于并發(fā)等級的值
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;//sizeCtl初始化為與容量值一樣
}
//...
}
五饿序、功能解析
5.1 添加元素操作
5.1.1 功能描述
ConcurrentHashMap添加新元素與HashMap添加新元素的整體流程是相似的勉失,只是多了針對多線程的處理,同時在hash算法上也做了修改原探。
5.1.2 源碼解析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
//...
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());// 首先通過特定hash算法得出key的hash值
int binCount = 0;// 鏈表長度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 首次添加元素時乱凿,先進行桶數(shù)組初始化操作
tab = initTable();
// 原子的獲取下標(biāo)i處的節(jié)點元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果桶位下標(biāo)i處沒有元素,則直接將新元素置于該桶下標(biāo)位
// tab表示要操作的數(shù)組咽弦,i為要操作的數(shù)組下標(biāo)徒蟆,null為原來的下標(biāo)位元素,
// 最后一個參數(shù)為新的下標(biāo)位元素离唬,這里有種樂觀鎖的概念
// 為防止多線程插值導(dǎo)致問題后专,這里采用CAS來進行原子插入操作
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
// 如果指定桶位下標(biāo)存在元素且其hash值為-1划鸽,則表示有線程正在進行擴容-元素遷移输莺,
}else if ((fh = f.hash) == MOVED)
// 進行輔助遷移,擴容完成之后裸诽,還要繼續(xù)進行元素的添加操作
tab = helpTransfer(tab, f);
else {
// 針對桶位存在鏈表或者樹的情況
V oldVal = null;
// 這里對f加鎖嫂用,f是桶位元素,鏈表頭元素丈冬、樹根元素
synchronized (f) {
// 二次校驗桶位元素還是不是f
if (tabAt(tab, i) == f) {
// 如果hash值大于等于0嘱函,則說明是鏈表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 找到相同的key則執(zhí)行更新value操作
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 尾插法插入新元素到鏈表尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
// 針對紅黑樹結(jié)構(gòu)
}else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 執(zhí)行紅黑樹新增節(jié)點操作
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 如果鏈表長度達到樹化閾值8,執(zhí)行鏈表樹化操作
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 元素添加完成之后的操作埂蕊,增加元素數(shù)量
addCount(1L, binCount);
return null;
}
// 0x7fffffff的二進制為:0111 1111 1111 1111 1111 1111 1111 1111往弓,除第一位為0全為1
// 這里表示為1的位置為可用位,因為1的與操作擁有保留原值的功能蓄氧。
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 對key的hashCode值進行高低位相異或函似,這個與HashMap一樣
// 然后在和0x7fffffff(首位為0,其余為1,共32位)相與,1具有保留原值效果喉童,
// 所以其實最后結(jié)果并未變動撇寞。即使是容量的最大值1<<<30,它也空出了首位只用了后31位
// 相較HashMap的hash算法,多了一步與操作
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
// addCount方法的主要功能就是多線程更新容器元素個數(shù)baseCount
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 原子更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
// 如果check小于等于1,那么僅僅校驗是否存在競爭蔑担,即是否存在多個線程
if (check <= 1)
return;
s = sumCount();
}
// 如果check大于等于0牌废,則需要校驗是否需要進行擴容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// sc是sizeCtl的值,如果其小于0啤握,那么桶數(shù)組要么正在初始化鸟缕,要么正在擴容,
// 在上面排除了table為null的情況排抬,那么這里只能是在進行擴容叁扫,然后當(dāng)前線程開始參與擴容
if (sc < 0) {
// 如果所有區(qū)段都已經(jīng)分配完畢,則不再對新的線程進行擴容分配
// 條件1:(sc >>> RESIZE_STAMP_SHIFT) != rs ==>
// sizeCtr在擴容初始值是(rs << RESIZE_STAMP_SHIFT) + 2)畜埋,
// 而在分配多線程進行擴容的時候莫绣,又會對sizeCtr進行操作,每多一個線程參與擴容悠鞍,
// sizeCtr就是加1对室,每有一個線程完成擴容任務(wù)退出擴容隊列時sizeCtr又會減1,
// 當(dāng)所有的線程都完成擴容之后咖祭,sizeCtr的值又恢復(fù)成初始值了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 當(dāng)前線程參與擴容遷移操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 否則sc大于等于0掩宜,這是sc其實是當(dāng)前的擴容閾值。
// 嘗試將sizeCtl的值更新為(rs << RESIZE_STAMP_SHIFT) + 2)么翰,
// 更新成功的話牺汤,則由當(dāng)前線程開啟擴容操作,這里sizeCtl的值為一個負(fù)值
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
//...
}
5.2 初始化操作
5.2.1 功能描述
與HashMap一樣浩嫌,ConcurrentHashMap的桶數(shù)組初始化也是在首次添加元素的時候完成的檐迟,但是后者比前者多了一個控制參數(shù)sizeCtl。
控制參數(shù)sizeCtl在這里的作用相當(dāng)于鎖码耐,只有獲得鎖的線程才能執(zhí)行桶數(shù)組的初始化追迟,其他線程只能將自己掛起,并不是阻塞骚腥,而是從運行狀態(tài)變成可運行狀態(tài)(Thread.yield())敦间。
sizeCtl默認(rèn)為0,如果構(gòu)造器指定了容量則為實際容量束铭,執(zhí)行初始化時會被修改為-1廓块,初始化完成之后會被設(shè)置為擴容閾值,即容量的0.75(亦即容量*負(fù)載因子)注意們這里計算sizeCtl的時候是寫死的契沫,即使負(fù)載因子被自定為其他值带猴,這里也還是0.75
5.2.2 源碼解析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
//...
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 其他線程正在進行桶數(shù)組初始化,sc會被設(shè)置為-1埠褪,本線程坐等初始化完成浓利。
Thread.yield(); // lost initialization race; just spin
// 否則嘗試將sizeCtl的值原子的更新為-1挤庇,然后進行初始化操作
// 當(dāng)有多個線程同時到達這里的時候,為避免重復(fù)初始化桶數(shù)組贷掖,這里才進行原子更新嫡秕,
// 只有搶到鎖的線程才能執(zhí)行桶數(shù)組初始化,而其他線程因為原子更新失敗而開啟下一循環(huán)苹威,
// 卻發(fā)現(xiàn)sc<0昆咽,線程停止執(zhí)行,變成可運行狀態(tài)牙甫,等待CPU分配時間進行執(zhí)行掷酗。
// 當(dāng)我們用無參構(gòu)造器創(chuàng)建ConcurrentHashMap實例時,sizeCtl應(yīng)為是0窟哺,
// 當(dāng)我們指定容量或者負(fù)載因子時泻轰,sizeCtl為計算出的實際容量值
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 如果sc==0,那么是使用無參構(gòu)造器創(chuàng)建的ConcurrentHashMap且轨,
// 這時需要使用默認(rèn)容量16,否則就要使用通過給定容量計算出來的實際容量sc
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 最后將sc設(shè)置為n的四分之三值即0.75倍的容量浮声,這相當(dāng)于擴容閾值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
//...
}
5.3 擴容操作
5.3.1 功能描述
5.3.1.1 引發(fā)擴容的因素
- 容器某一個桶位的元素數(shù)量達到8個但是容器總元素數(shù)量不足64個的情況,優(yōu)先觸發(fā)擴容操作 旋奢。
- 容器新增元素之后泳挥,元素數(shù)量達到擴容閾值的情況下,要進行擴容至朗。
正是上述兩處位置會觸發(fā)擴容屉符,正對應(yīng)于tryPresize方法和addCount方法。而這兩個方法里面擁有幾乎相同的邏輯來觸發(fā)擴容(transfer(tab, null))锹引,第二個參數(shù)傳null矗钟,代表當(dāng)前線程是首個觸發(fā)擴容的線程,如果第二個參數(shù)不是null粤蝎,則當(dāng)前線程為輔助擴容線程真仲。
5.3.1.2 基礎(chǔ)準(zhǔn)備
- 準(zhǔn)備1:擴容是由sizeCtr來控制的袋马,在擴容的過程中初澎,sizeCtr代表擴容的線程數(shù)量。
- 準(zhǔn)備2:rs和sc多次出現(xiàn)虑凛,其中rs是resizeStamp(n)的值碑宴,是一個極大的負(fù)值,擴容開啟時初始化sizeCtr的時候就是使用這個負(fù)值左移16位后再加上2的結(jié)果桑谍,而sc指的就是sizeCtr的當(dāng)前值延柠。
- 準(zhǔn)備3:ForwardingNode是一個臨時節(jié)點,表示的是完成遷移的桶數(shù)組位節(jié)點
- 準(zhǔn)備4:advanced值表示的是區(qū)間分配的循環(huán)條件锣披,當(dāng)advanced為false則結(jié)束循環(huán)贞间,結(jié)束循環(huán)的因素有很多贿条,包括,當(dāng)期區(qū)段未轉(zhuǎn)移完成或者所有元素均轉(zhuǎn)移完成增热,否則所有元素的遷移都分配完成整以,否則完成了一次新的區(qū)間分配
- 準(zhǔn)備5:transferIndex是一個公共變量,保存的是多個線程分配區(qū)間后剩余未分配區(qū)間的最大數(shù)組位(倒序分配)峻仇,只要它大于0公黑,則還有待分配的區(qū)間需要進行元素轉(zhuǎn)移,這個區(qū)間可能分配給新加入的線程摄咆,也可能分配給完成了一次區(qū)間元素遷移的老線程
- 準(zhǔn)備6:stride是區(qū)間分配步長:單核CPU的情況下,stride直接取舊數(shù)組長度n(也就是只分一個區(qū)間)凡蚜,如果n比16還小,則stride值設(shè)為16吭从,多核CPU情況下朝蜘,stride需要通過計算(n >>> 3) / NCPU(就將n除以8再除以CPU數(shù)量的結(jié)果),如果得出的結(jié)果小于16,那么還是以16為步長涩金,也就是說芹务,stride最小值為16
5.3.1.3 擴容邏輯
當(dāng)?shù)谝粋€線程發(fā)現(xiàn)容器需要擴容,首先會CAS原子更新sizeCtr的值為(rs << RESIZE_STAMP_SHIFT) + 2)鸭廷,這是一個負(fù)值枣抱,然后調(diào)用transfer(tab,null)方法開啟擴容流程辆床,由于該線程屬于觸發(fā)擴容的第一個線程佳晶,新桶數(shù)組并未創(chuàng)建,所以此處傳null讼载。
進入到transfer方法之后轿秧,開始進行元素遷移操作,由于是開啟擴容的線程咨堤,nextTab=null菇篡,所以需要先進行新桶數(shù)組的創(chuàng)建,如果是輔助遷移的線程一喘,那么nextTab已經(jīng)存在驱还,直接傳值即可(其為公共變量)
新桶數(shù)組
創(chuàng)建新桶數(shù)組最主要的就是確認(rèn)桶數(shù)組的容量,這里新桶數(shù)組直接就是舊桶數(shù)組容量的2倍凸克,如果創(chuàng)建失敗议蟆,則將sizeCtr置為Integer的最大值,將sizeCtr置為最大值后將不會再觸發(fā)擴容萎战。
然后咐容,為當(dāng)前線程分配區(qū)間,分配區(qū)間的原理其實就是以bound來劃分?jǐn)?shù)組區(qū)段蚂维,bound指的是當(dāng)前分配區(qū)間的最小桶位下標(biāo),transferIndex指向的是剩余未分配區(qū)間的最大位+1戳粒,其實和最后一次區(qū)間分配的線程的bound值一樣路狮,i指的是當(dāng)前分配區(qū)間的最大桶位下標(biāo),如此一來i和bound就將每個線程的區(qū)間給固定了,通過--i來倒序循環(huán)遷移每一位的元素蔚约,而transferIndex面對的是所有的線程览祖,屬于公共變量,每個線程分配區(qū)間時炊琉,都會更新transferIndex的值展蒂。transferIndex的值逐漸減小,直到變成0代表舊桶數(shù)組的所有位均被分配完畢苔咪。
區(qū)間分配好之后锰悼,然后就是針對區(qū)間內(nèi)的元素進行遷移了,ConcurrentHashMap采用的是倒序遷移(--i):
遷移邏輯
遷移的情況分為兩種团赏,一種是針對單向鏈表箕般,另一種是針對紅黑樹
單向鏈表:類似于HashMap,將長鏈表分拆成為兩個小鏈表,分別遷移到新數(shù)組的原位(低位)與原位+舊桶數(shù)組容量n(高位)的位置舔清。
紅黑樹:也類似于HashMap,將大紅黑樹拆成兩個小樹丝里,如果小樹中元素數(shù)量達到6個以下,則將其退化為單項鏈表体谒,然后再將兩個小樹(或鏈表)分別遷移到新數(shù)組的原位(低位)和原位+舊桶數(shù)組容量n(高位)的位置杯聚。具體分拆邏輯可以參照源碼注釋
i會逐漸減小,直到其等于bound為止抒痒,代表當(dāng)前區(qū)間的元素遷移完畢(或者說當(dāng)前線程的前一任務(wù)完畢)幌绍,這時候要檢查是否還有剩余的區(qū)間需要分配,如果transferIndex>0,則還有未分配的區(qū)間故响,那么為當(dāng)前線程再分配一段區(qū)間傀广,如果transferIndex<=0,則所有區(qū)間均已分配完畢彩届,中斷當(dāng)前線程伪冰。
等到所有的擴容線程均完成了自己所分配的遷移任務(wù)之后,參與擴容的線程逐步減少樟蠕,每減少一個線程贮聂,sizeCtr就會減1,最后一個線程完成擴容之后坯墨,最后sizeCtr恢復(fù)了擴容前的初始值寂汇,然后將finishing置為true,i置為n捣染。
然后執(zhí)行擴容后結(jié)束操作,主要內(nèi)容就是置空nextTable變量停巷,表示退出擴容期耍攘,重置table和sizeCtl榕栏,sizeCtl在非擴容期間為擴容閾值(桶數(shù)組容量*0.75)
輔助遷移
所謂的輔助遷移就是添加元素的線程發(fā)現(xiàn)容器正處于擴容期間,則暫停添加操作蕾各,先輔助擴容扒磁,待擴容完成,再執(zhí)行添加操作式曲。
5.3.2 源碼解析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
//...
// 擴容操作
private final void tryPresize(int size) {
// c表示擴容后的實際容量妨托,當(dāng)給定的加倍的容量size大于等于最大容量的1/2,
// 直接將實際容量設(shè)置為最大容量吝羞,否則通過之前的公式計算實際容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 如果桶數(shù)組還未初始化兰伤,則初始化桶數(shù)組
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;// n取sc和c中的較大值
// 然后將sizeCtl設(shè)置為-1,進行桶數(shù)組初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);// 計算sc=當(dāng)前桶數(shù)組容量*0.75
}
} finally {
// 最后將sizeCtl置為擴容閾值(默認(rèn)的)
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
// 如果計算得到的c(新桶數(shù)組容量)還沒有sc(舊桶數(shù)組容量)大钧排,或者舊桶數(shù)組
// 的容量就已經(jīng)達到或超過桶數(shù)組允許的最大值的話敦腔,這里不再進行擴容操作
break;
// 只有真正的擴容時才會執(zhí)行以下操作
else if (tab == table) {
int rs = resizeStamp(n);// 我們只要知道這里得到的rs是一個負(fù)值
// sc何時小于0:
// 1-初始化桶數(shù)組的時候值為-1
// 2-當(dāng)正在進行擴容的時候,sc的值小于0
if (sc < 0) {
// 這里的sc小于0,表示已經(jīng)開始擴容恨溜,這種情況下符衔,
// 在還存在待擴容區(qū)間的情況下,當(dāng)前線程也加入擴容行列
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
// 這種情況是在不存在擴容區(qū)間的情況下糟袁,不再讓新線程加入擴容行列判族,直接中斷
break;
// sizeCtl+1表示增加一個擴容線程,這里調(diào)用transfer時必然存在nexttable
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 否則的話當(dāng)前線程就是第一個發(fā)起擴容的線程项戴,表示nextTable還不存在五嫂,
// 所以這里講sizeCtl初始化為(rs << RESIZE_STAMP_SHIFT) + 2),
// 并且調(diào)用transfer的時候nextTable傳null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
// 擴容后元素遷移操作
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n表示舊數(shù)組的長度肯尺,stride為處理步長
int n = tab.length, stride;
// 單線程情況下步長stride=n沃缘,
// 多線程情況下,stride=(n >>> 3) / NCPU.(即:n/8/cpu數(shù)量)则吟,這個值最小為16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 發(fā)起遷移的第一個線程調(diào)用該方法時槐臀,nextTab會傳null,
// 剩余不會氓仲,針對首次傳null的情況水慨,這里進行處理:新建擴容數(shù)組
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 數(shù)組加倍擴容
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// transferIndex為遷移位置的控制量,起初其值為桶數(shù)組最高位置下標(biāo)+1(即數(shù)組size)
transferIndex = n;
}
int nextn = nextTab.length;// 新數(shù)組的長度
// 創(chuàng)建一個標(biāo)記節(jié)點fwd敬扛,將hash置為-1(MOVED)晰洒,表示的是處理完成的節(jié)點,后面要用。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;// advance表示的是是否結(jié)束區(qū)間分配循環(huán)啥箭,false表示結(jié)束循環(huán)
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 下面的這個while循環(huán)的作用就是劃分區(qū)域谍珊,其中i指向transferIndex,
// bound指向transferIndex-stride,那么這兩個值就代表了一個小區(qū)間急侥,
// 這個小區(qū)間就是當(dāng)前線程需要進行處理遷移的區(qū)間
while (advance) {
int nextIndex, nextBound;
// 1-bound指向的是舊桶數(shù)組的一個下標(biāo)位砌滞,它和i共同決定了一個區(qū)間侮邀,
// 區(qū)間大小為stride(最小為16)且,指向高位贝润,bound指向低位
// 2-如果當(dāng)前線程的區(qū)間處理完畢绊茧,而還未完成所有元素遷移的情況下,則再次進行區(qū)間分配打掘,
// 如果所有元素都遷移完畢f(xié)inishing=true华畏,則結(jié)束區(qū)間分配
// 3-這里的--i是控制外層for循環(huán)的條件,表示逐個桶位元素的遷移尊蚁,從后到前
if (--i >= bound || finishing)
// 如果--i >= bound亡笑,那么結(jié)束while循環(huán),開始--i桶位元素遷移操作
// --i >= bound表示區(qū)間從后向前的下一位未到達邊界bound枝誊,還可以繼續(xù)進行元素遷移
// finishing表示舊桶數(shù)組所有元素均遷移完畢的情況
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// transferIndex是公共變量况芒,如果它小于等于0,則表示遷移工作已全部完成叶撒,
// 結(jié)束內(nèi)部循環(huán)并將i置為-1绝骚,下一步走下面的結(jié)束邏輯
i = -1;
advance = false;
}
// TRANSFERINDEX位置保存的值是上一線程分區(qū)的bound值,
// 即上一個區(qū)間的bound值祠够,bound值要比i值小stride
// 這里是真正的區(qū)間分配的邏輯压汪,分配完成advance置為false,while循環(huán)得以結(jié)束
// 這里有個三元操作符古瓤,作用是當(dāng)剩余的元素不足16時止剖,直接將剩余全部元素作為一個
// 區(qū)間分配給當(dāng)前線程,將transferIndex更新為0
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 當(dāng)舊桶數(shù)組的所有節(jié)點的元素都遷移完畢之后落君,進行如下結(jié)束邏輯
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 擴容結(jié)束的具體邏輯:nextTable置null穿香,將新的桶數(shù)組作為
// 當(dāng)前桶數(shù)組table,再計算新的閾值,賦值給sizeCtr绎速,為桶數(shù)組容量*0.75
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);// 0.75*n
return;
}
// 原子更新sizeCtl皮获,減1,表示當(dāng)前線程已完成遷移任務(wù)纹冤,表示進行遷移的線程數(shù)量減少了一個
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 遷移元素的線程逐個減少洒宝,這里sizeCtr的值逐漸減小,直到其值等于之前賦的初始值
// (resizeStamp(n) << RESIZE_STAMP_SHIFT-2)的時候萌京,表示所有擴容的線程都完
// 成了擴容雁歌,亦即擴容結(jié)束,隨將finishing置為true知残,將i=n靠瞎,然后再次for循環(huán)的時
// 候進入while循環(huán)后,因為finishing=true,第一個判斷就結(jié)束了较坛,然后再次進入結(jié)束
// 邏輯印蔗,因為i=n而進入邏輯扒最,因為finishing=true而執(zhí)行擴容結(jié)束的具體邏輯
// 所有擴容線程中只有最后一個完成擴容的線程可以保留來進行擴容結(jié)束的邏輯丑勤,之前的
// 所有線程都在下面的這個return結(jié)束了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// 每一個舊桶位的元素遷移完成后,都需要將舊桶數(shù)組位置null吧趣,
// 這里找到這個null桶位之后法竞,將其更新為ForwardingNode節(jié)點,表示已完成遷移
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 如果當(dāng)前節(jié)點是ForwardingNode節(jié)點强挫,則說明該節(jié)點數(shù)據(jù)已遷移完畢岔霸,不再進行處理,跳過俯渤。
advance = true; // already processed(已處理)
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
// 鏈表元素遷移
// 遷移邏輯:先將原鏈表分拆成兩個小鏈表呆细,然后將兩個小鏈表分別置于
// 新桶數(shù)組的對應(yīng)桶位,然后將原桶位置為fwd節(jié)點表示遷移結(jié)束八匠。
// 分拆原理:n為舊桶容量絮爷,我們知道所有的桶容量就是2的次冪,二進制表示
// 的話就是1個1梨树,N個0的二進制值坑夯,將fh(首結(jié)點的hash值)與n進行與操作,
// 結(jié)果只會有兩種抡四,一種就是0柜蜈,一種就是n,分別對應(yīng)新桶數(shù)組的原桶位
// (新桶低位)和原桶位+舊桶容量位(新桶高位)指巡,分別用ln和hn表示
int runBit = fh & n;// n為舊桶容量
Node<K,V> lastRun = f;
// 下面這個循環(huán)的目的是找出鏈表最后一個“與結(jié)果”變化的鏈表節(jié)點lastRun淑履,
// runBit表示最后的變化的“與結(jié)果”,可能是0可能是n藻雪,如果為0秘噪,則將最后這
// 一變化節(jié)點賦值給ln表示低位,否則賦值給hn表示高位阔涉,這樣該節(jié)點之后的
// 節(jié)點將不用再參與后面的分拆缆娃,因為它們的與結(jié)果與lastRun的一樣,要么全
// 是0要么全是n瑰排。所以下面的這個for循環(huán)和if-else其實是為了簡化分拆鏈表
// 的操作贯要,當(dāng)然如果入到極端情況,最后還是發(fā)生變化的情況椭住,那么分拆的時候
// 還是需要全部判斷的崇渗。
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 組裝兩個新的小鏈表
// 組裝鏈表的時候使用的構(gòu)造器最后一個參數(shù)是當(dāng)前節(jié)點的后置節(jié)點,
// 之前找出的ln和hn正好用于此處。
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 最后將組裝好的小鏈表定位到新數(shù)組的指定位置宅广,
// 再將原數(shù)組的指定位置置為fwd表示遷移完畢葫掉。
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 紅黑樹元素遷移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 下面的if-else其實是在組裝高位與低位的兩個雙向鏈表
// 如果當(dāng)前節(jié)點的hash值h與n的與結(jié)果為0,則當(dāng)前節(jié)點位于低位
if ((h & n) == 0) {// 低位
if ((p.prev = loTail) == null)
// 針對第一個與結(jié)果為0的節(jié)點跟狱,將其置為低位首節(jié)點
lo = p;// lo代表低位首節(jié)點
else
// 針對非首個與結(jié)果為0的節(jié)點進行處理
loTail.next = p;
loTail = p;// 循環(huán)的推進器
++lc;// 低位容量
}
else {// 高位
if ((p.prev = hiTail) == null)
// 針對第一個與結(jié)果為1的節(jié)點俭厚,將其置為高位首節(jié)點
hi = p;// hi代表高位首節(jié)點
else
// 針對非首個與結(jié)果為1的節(jié)點進行處理
hiTail.next = p;
hiTail = p;// 循環(huán)的推進器
++hc;// 高位容量
}
}
// 將準(zhǔn)備好的兩個雙向鏈表進行樹化,當(dāng)然如果該位節(jié)點數(shù)量為6個及以下驶臊,
// 則結(jié)構(gòu)會退化為單向鏈表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 最后將組裝好的高位與低位的結(jié)構(gòu)置于新桶數(shù)組的指定位置挪挤,
// 再將舊桶數(shù)組指定位置置為fwd表示遷移結(jié)束
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
// 輔助遷移操作,當(dāng)一個線程準(zhǔn)備執(zhí)行添加元素的操作時發(fā)現(xiàn)正在擴容关翎,那么它就會停止添加操作扛门,
// 并且去輔助擴容操作。其實所謂的輔助擴容就是為當(dāng)前線程分配一段桶位區(qū)間進行元素遷移操作
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// sizeCtl值原子加1纵寝,表示執(zhí)行擴容的線程由多了一個论寨,
// 那么sizeCtl在線程擴容期間表示的就是執(zhí)行擴容的線程的數(shù)量
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
// 該方法的結(jié)果是一個負(fù)值
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
//...
}
5.4 獲取元素操作
5.4.1 功能描述
獲取指定key的值,需要先對key進行hash,找到對應(yīng)的桶位,然后在針對桶位的實際情況采用對應(yīng)的措施:
桶位元素即為目標(biāo)元素爽茴,紅黑樹結(jié)構(gòu)葬凳,單向鏈表結(jié)構(gòu)三種情況
5.4.2 源碼解析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
//...
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {// e為桶位元素
// 針對桶位元素就是目標(biāo)元素的情況
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果頭結(jié)點的 hash 小于 0,說明 正在擴容闹啦,或者該位置是紅黑樹
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 針對鏈表進行處理沮明,獲取指定的值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
static class Node<K,V> implements Map.Entry<K,V> {
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
// 遍歷操作與鏈表一樣,是因為紅黑樹結(jié)構(gòu)其實還是一個雙向鏈表
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
//...
}
5.5 紅黑樹
5.5.1 樹形化操作
5.5.1.1 功能描述
ConcurrentHashMap的紅黑樹操作與HashMap的基本一致窍奋,底層涉及到的都是純粹的紅黑樹數(shù)據(jù)結(jié)構(gòu)的操作荐健,那是固定的。
參考Java基礎(chǔ)系列-HashMap 1.8(二)
5.5.1.2 源碼解析
(略)
六琳袄、總結(jié)
6.1 sizeCtr
- 0:default江场,默認(rèn)值,在使用無參構(gòu)造器構(gòu)造實例時
- -1:初始化桶數(shù)組
- ((rs << RESIZE_STAMP_SHIFT)+2) +N:數(shù)組擴容窖逗,代表線程數(shù)量址否,但有個基數(shù)(rs << RESIZE_STAMP_SHIFT)+2,在此基數(shù)上進行增減碎紊,每有一個線程參加擴容佑附,該值+1,否則減1仗考,擴容結(jié)束會恢復(fù)基數(shù)值音同。
- 0.75*table.length:正常狀態(tài),代表擴容閾值
6.2 hash
- -1:(MOVED)表示正在擴容秃嗜,且當(dāng)前桶位的元素已遷移完畢权均。
- -2:(TREEBIN)表示紅黑樹
參考: