此文部分內容來自 https://javadoop.com/post/hashmap
Hashmap
多線程死循環(huán)
主要是多線程同時put時,如果同時觸發(fā)了rehash操作座每,會導致HashMap中的鏈表中出現(xiàn)循環(huán)節(jié)點圾旨,進而使得后面get的時候,會死循環(huán)惹谐。
ConcurrentHashmap JDK7
ConcurrentHashMap允許多個修改(寫)操作并發(fā)進行持偏,其關鍵在于使用了鎖分段技術,它使用了不同的鎖來控制對哈希表的不同部分進行的修改(寫)氨肌,而 ConcurrentHashMap 內部使用段(Segment)來表示這些不同的部分鸿秆。
segmentMask; // 用于定位段,大小等于segments數(shù)組的大小減 1怎囚,是不可變的
segmentShift; // 用于定位段卿叽,大小等于32(hash值的位數(shù))減去對segments的大小取以2為底的對數(shù)值,是不可變的
1 Segment
Segment 類繼承于 ReentrantLock 類恳守,從而使得 Segment 對象能充當鎖的角色考婴。每個 Segment 對象用來守護它的成員對象 table 中包含的若干個桶。table 是一個由 HashEntry 對象組成的鏈表數(shù)組催烘,table 數(shù)組的每一個數(shù)組成員就是一個桶沥阱。
segment中的字段
volatile int count 是用于計數(shù)每個segment管理的所有 table 數(shù)組包含的 HashEntry 對象的個數(shù),count 不是concurrenthashmap的全局變量的原因是 當需要更新count的時候不需要鎖住整個CHM颗圣。
volatile HashEntry<K,V>[] table 鏈表數(shù)組
2 HashEntry
==CHM中不需要對讀加鎖的原因== 其一是HashEntry內 key hash next這些字段都時final的喳钟,所以hashentry對象基本是不可變的 另一個原因則是 value域也被volatile修飾所以可以確保讀線程讀到最后一次更新后的值。
next是final意味著不能從hash鏈的中間和尾部添加刪除節(jié)點在岂,所以所有節(jié)點的修改只能從頭部開始奔则,對于put操作也是頭插法,而remove()刪除節(jié)點的時候也是需要重新new這個節(jié)點前面的所有節(jié)點并刪除原來的節(jié)點再把前部的最后一個節(jié)點的next指向刪除節(jié)點的后一個節(jié)點蔽午。
3 concurrentHashMap的構造函數(shù)
主要有三個參數(shù) 初始容量易茬、負載因子 和 并發(fā)級別,默認分別為16及老,0.15,16 依據(jù)這三個來初始化segments數(shù)組(長度為2的n次冪)抽莱,段偏移量 segmentShift(32-n),段掩碼segmentMask(2^n-1 二進制為n個1)骄恶,每個segment食铐。
4 concurrentHashMap的數(shù)據(jù)結構
5 定位操作
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
讓key的高n位和段掩碼做與操作。就可以具體定位到桶位僧鲁。因為 segmentshift是 32-n
并發(fā)存取
線程對映射表做讀操作的時候不需要加鎖虐呻,而對容器做結構型的修改操作(put remove)則需要加鎖
put操作
==ConcurrentHashMap不同于HashMap象泵,它既不允許key值為null,也不允許value值為null斟叼。==
當我們向ConcurrentHashMap中put一個Key/Value對時偶惠,首先會獲得Key的哈希值并對其再次哈希,然后根據(jù)最終的hash值定位到這條記錄所應該插入的段朗涩。(見上面的第5點)
JDK7 put源碼
//主流程
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 計算 key 的 hash 值
int hash = hash(key);
// 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j
// hash 是 32 位忽孽,無符號右移 segmentShift(28) 位,剩下低 4 位谢床,
// 然后和 segmentMask(15) 做一次與操作兄一,也就是說 j 是 hash 值的最后 4 位,也就是槽的數(shù)組下標
int j = (hash >>> segmentShift) & segmentMask;
// 剛剛說了萤悴,初始化的時候初始化了 segment[0]瘾腰,但是其他位置還是 null,
// ensureSegment(j) 對 segment[j] 進行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往該 segment 寫入前覆履,需要先獲取該 segment 的獨占鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 這個是 segment 內部的數(shù)組
HashEntry<K,V>[] tab = table;
// 再利用 hash 值蹋盆,求應該放置的數(shù)組下標
int index = (tab.length - 1) & hash;
// first 是數(shù)組該位置處的鏈表的表頭
HashEntry<K,V> first = entryAt(tab, index);
// 下面這串 for 循環(huán)雖然很長,不過也很好理解硝全,想想該位置沒有任何元素和已經存在一個鏈表這兩種情況
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆蓋舊值
e.value = value;
++modCount;
}
break;
}
// 繼續(xù)順著鏈表走
e = e.next;
}
else {
// node 到底是不是 null栖雾,這個要看獲取鎖的過程,不過和這里都沒有關系伟众。
// 如果不為 null析藕,那就直接將它設置為鏈表表頭;如果是null凳厢,初始化并設置為鏈表表頭账胧。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超過了該 segment 的閾值,這個 segment 需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴容后面也會具體分析
else
// 沒有達到閾值先紫,將 node 放到數(shù)組 tab 的 index 位置治泥,
// 其實就是將新的節(jié)點設置成原鏈表的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
獲取寫入鎖
node = tryLock() ? null : scanAndLockForPut(key, hash, value)根據(jù)tryLock()快速獲取這個segment的獨占鎖,如果獲取失敗 就進入到 scanAndLockForPut(key,hash,value)來獲取鎖該方法的源碼如下
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循環(huán)獲取鎖
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 進到這里說明數(shù)組該位置的鏈表是空的遮精,沒有任何元素
// 當然居夹,進到這里的另一個原因是 tryLock() 失敗,所以該槽存在并發(fā)本冲,不一定是該位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 順著鏈表往下走
e = e.next;
}
// 重試次數(shù)如果超過 MAX_SCAN_RETRIES(單核1多核64)准脂,那么不搶了,進入到阻塞隊列等待鎖
// lock() 是阻塞方法檬洞,直到獲取鎖后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 這個時候是有大問題了狸膏,那就是有新的元素進到了鏈表,成為了新的表頭
// 所以這邊的策略是添怔,相當于重新進入 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
這個方法有兩個出口湾戳,一個是 tryLock() 成功了闷板,循環(huán)終止,另一個就是重試次數(shù)超過了 MAX_SCAN_RETRIES院塞,進到 lock() 方法,此方法會阻塞等待性昭,直到成功拿到獨占鎖拦止。
實質上這個方法的功能是獲取segment的獨占鎖,然后實例化node
rehash
segment數(shù)組不能擴容 糜颠,擴容是針對于HashEntry[]
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
// 方法參數(shù)上的 node 是這次擴容后汹族,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 創(chuàng)建新數(shù)組
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩碼其兴,如從 16 擴容到 32顶瞒,那么 sizeMask 為 31,對應二進制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍歷原數(shù)組元旬,老套路榴徐,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是鏈表的第一個元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 計算應該放置在新數(shù)組中的位置,
// 假設原數(shù)組長度為 16匀归,e 在 oldTable[3] 處坑资,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 該位置處只有一個元素,那比較好辦
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是鏈表表頭
HashEntry<K,V> lastRun = e;
// idx 是當前鏈表的頭結點 e 的新位置
int lastIdx = idx;
// 下面這個 for 循環(huán)會找到一個 lastRun 節(jié)點穆端,這個節(jié)點之后的所有元素是將要放到一起的
// 尋找k值相同的子鏈袱贮,該子鏈尾節(jié)點與父鏈的尾節(jié)點必須是同一個
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 將 lastRun 及其之后的所有節(jié)點組成的這個鏈表放到 lastIdx 這個位置
newTable[lastIdx] = lastRun;
// 下面的操作是處理 lastRun 之前的節(jié)點,
// 這些節(jié)點可能分配在另一個鏈表中体啰,也可能分配到上面的那個鏈表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 將新來的 node 放到新數(shù)組中剛剛的 兩個鏈表之一 的 頭部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
擴容后鍵值對的新位置要么和原位置一樣攒巍,要么就是原位+舊數(shù)組的長度
上面的代碼先找出擴容前后需要轉移的點,先執(zhí)行轉移荒勇,再把該鏈條上剩下的節(jié)點轉移柒莉,這么寫是起到了復用的效果,官方注釋中也說了 at the default threshold, only about one-sixth of them need cloning when a table doubles 在默認閾值下只有大約1/6的節(jié)點需要被克隆枕屉。
get比較簡單不需要加鎖步驟是
- 計算hash值找到segment數(shù)組的位置
- segment中根據(jù)hash找到hashEntry數(shù)組中具體的位置對應的鏈表
- 遍歷鏈表找到具體的KV
源碼如下
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 1. hash 值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 2. 根據(jù) hash 找到對應的 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 3. 找到segment 內部數(shù)組相應位置的鏈表常柄,遍歷
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
Q:如果在執(zhí)行get的時候在同一個segment中發(fā)生了put或remove操作呢?
1.put操作的線程安全性的體現(xiàn)
- 使用CAS在初始化segment數(shù)組
- 鏈表中添加節(jié)點使用頭插法搀擂,如果此時get遍歷鏈表已經到了中間就不會對get操作有影響西潘,如果put happens-before get 需要get到剛剛插入到的頭節(jié)點,就依賴setEntryAt方法中使用的 UNSAFE.putOrderedObject.
- 擴容哨颂,創(chuàng)建了新的數(shù)組如果這個時候有get,get先行就在舊table上查詢喷市,put先行,put的操作可見性保證就是table使用了volatile關鍵字
transient volatile HashEntry<K,V>[] table;
- remove操作的線程安全性
remove操作會更改鏈表的結構威恼,如果remove的是get已經遍歷過的節(jié)點品姓,則不會發(fā)生問題寝并。
如果 remove 先破壞了一個節(jié)點,分兩種情況考慮腹备。 1衬潦、如果此節(jié)點是頭結點,那么需要將頭結點的 next 設置為數(shù)組該位置的元素植酥,table 雖然使用了 volatile 修飾镀岛,但是 volatile 并不能提供數(shù)組內部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數(shù)組友驮,請看方法 setEntryAt漂羊。2、如果要刪除的節(jié)點不是頭結點卸留,它會將要刪除節(jié)點的后繼節(jié)點接到前驅節(jié)點中走越,這里的并發(fā)保證就是 next 屬性是 volatile 的。
ConcurrentHashmap JDK8
類似于8中的HashMap的改進,ConcurrentHashMap也引入了紅黑樹耻瑟。