簡介
ConcurrentHashMap是java.util.concurrent
包下的重要成員,也是Java并發(fā)容器中使用較為頻繁的励堡。
ConcurrentHashMap不是采用和HashTable一樣的方法聊倔,在訪問操作的時候使用synchronized
保證只有一個線程能操作茶鹃,而通過一種更細(xì)粒度的鎖來實現(xiàn)鎖住部分哈希表科侈,從而提高并發(fā)性能戚长。
但是在使用上ConcurrentHashMap是可以與HashTable互操作的批钠,它遵循HashTable的規(guī)范宇植,也就是說在Java5之后,隨著ConcurrentHashMap的引入埋心,想要提高性能的話可以把使用HashTable的地方替換成ConcurrentHashMap指郁。可以說拷呆,ConcurrentHashMap的到來標(biāo)志之HashTable的過時闲坎。
如果性能不是考慮的因素,只需要保證Map的線程安全的話茬斧,還可以通過Java的集合工具類獲取一個經(jīng)過包裝的SynchronizedMap
腰懂,原理就是通過包裝器模式(也叫裝飾模式)把所有對原Map的操作進(jìn)行一次synchronized
包裝:
Map m = Collections.synchronizedMap(new HashMap())
理解ConcurrentHashMap的前提
由于涉及到并發(fā)編程,也就是說我們要理解ConcurrentHashMap的時候需要知道一些Java中提供的并發(fā)編程類庫和知識體系项秉,下面我主要大概介紹下ConcurrentHashMap需要提前知道的知識點绣溜,不做具體深入。
volatile
關(guān)鍵字
Java中使用volatile
關(guān)鍵字來保證變量的修改能夠被其他線程知道娄蔼,讀和寫一個volatile變量都有全局的排序(也就是不會被重排序怖喻,Java內(nèi)存允許編譯器和處理器對指令重排序),volatile變量不會被緩存在寄存器或者對其他處理器不可見的地方岁诉。
ReentrantLock
ReentrantLock是Java中用于并發(fā)編程的一個可重入的互斥鎖锚沸,通常稱為再入鎖。Java5之后提供的鎖實現(xiàn)涕癣,通過lock()
實現(xiàn)加鎖哗蜈,通過unlock()
實現(xiàn)鎖釋放。
相對于synchronized
同步鎖坠韩,支持公平鎖和非公平鎖來實現(xiàn)一定程度的公平性距潘,代碼書寫也相對靈活。
Unsafe
JDK中提供了類sun.misc.Unsafe
來支持任意內(nèi)存地址位置的數(shù)據(jù)讀寫和一些CAS(compareAndSwap)原子操作只搁,Unsafe不對用戶開放绽昼,只對Java核心類庫開放,通過判斷調(diào)用者是否為Bootstrap類加載器加載的類來判斷须蜗,可用通過反射獲取該實例硅确,但是建議在不得已的情況下不要使用Unsafe目溉。
CAS
CAS(compare and swap,比較并交換)是實現(xiàn)原子操作的一種菱农,可用于在多線程編程中實現(xiàn)不被打斷的數(shù)據(jù)交換操作缭付,從而避免多線程同時改寫某一數(shù)據(jù)時由于執(zhí)行順序不確定性以及中斷的不可預(yù)知性產(chǎn)生的數(shù)據(jù)不一致問題。 該操作通過將內(nèi)存中的值與指定數(shù)據(jù)進(jìn)行比較循未,當(dāng)數(shù)值一樣時將內(nèi)存中的數(shù)據(jù)替換為新的值陷猫。
ConcurrentHashMap分析
ConcurrentHashMap隨著Java的升級過程中也一直在升級演變,所以這里主要分析大概架構(gòu)以及對比Java7和Java8中ConcurrentHashMap的變化的妖。
ConcurrentHashMap結(jié)構(gòu)
ConcurrentHashMap的大致結(jié)構(gòu)如下绣檬,通過Segment對Map進(jìn)行分段保存,當(dāng)操作的時候只需要對對應(yīng)的Segment進(jìn)行鎖定即可(Segment自身繼承了ReentrantLock)嫂粟,不用像HashTable一樣對整個鏈表鎖定娇未,進(jìn)而大大提高性能。
ConcurrentHashMap In Java7
幾個重要常量
// 如果沒有指定Map大小星虹,默認(rèn)為16
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 如果沒有指定默認(rèn)的加載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默認(rèn)的并發(fā)等級零抬,也就是Segment數(shù)組的大小
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 該Map的最大容量,即2的30次方
static final int MAXIMUM_CAPACITY = 1 << 30;
// Segment中HashEntry數(shù)組的最小長度宽涌,必須是2的倍數(shù)平夜,最小是2為了避免懶加載構(gòu)造之后下次調(diào)用會立即擴容
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// Segment數(shù)組的默認(rèn)最長長度(2的16次方),必須是2的倍數(shù)卸亮,Segment最大是`1<<24`(2的24次方)
static final int MAX_SEGMENTS = 1 << 16; //比較保守
// 在非同步操作的情況下的重試次數(shù)(size,containsValue)忽妒,避免在哈希表在連續(xù)修改的情況下出現(xiàn)無限重試
static final int RETRIES_BEFORE_LOCK = 2;
創(chuàng)建&初始化
- ConcurrentHashMap構(gòu)造函數(shù)
// 三個參數(shù)的構(gòu)造函數(shù)(初始化大小,加載因子兼贸,并發(fā)等級)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
// 兩個參數(shù)的構(gòu)造函數(shù)(初始化大小段直,加載因子)
public ConcurrentHashMap(int initialCapacity, float loadFactor)
// 一個參數(shù)的構(gòu)造函數(shù)(初始化大小)
public ConcurrentHashMap(int initialCapacity)
// 無參構(gòu)造函數(shù)
public ConcurrentHashMap()
// 通過其他Map創(chuàng)建
public ConcurrentHashMap(Map<? extends K, ? extends V> m)
其中寝受,需要關(guān)心的三個參數(shù)的構(gòu)造函數(shù),其他構(gòu)造函數(shù)都是自定義+默認(rèn)值(上述常量)最終調(diào)用三個參數(shù)的構(gòu)造函數(shù)創(chuàng)建
- 初始化
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 檢查
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 并發(fā)等級不能超過MAX_SEGMENTS
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 根據(jù)concurrencyLevel計算Segment的長度罕偎,必須是2的倍數(shù)
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 保存segmentShift和segmentMask用于索引Segment
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
// 初始大小不能超過MAXIMUM_CAPACITY
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 計算每個Segment大小很澄,初始化大小除以segment數(shù)組長度
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// 計算每個Segment初始化HashEntry大小,最小為MIN_SEGMENT_TABLE_CAPACITY
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 創(chuàng)建Segment數(shù)組颜及,和第一個Segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
// 使用UNSAFE寫入Segment數(shù)組
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
其中甩苛,為何在初始化期間創(chuàng)建第一個Segment是因為后面的Segment的創(chuàng)建都是以segments[0]作為模板創(chuàng)建的
get操作
get操作只是獲取操作,這里沒有任何同步代碼或者鎖俏站,支持對數(shù)據(jù)的可見性
public V get(Object key) {
// 手動集成訪問方法以減少開銷
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 獲取hash值
int h = hash(key);
// 通過位運算計算散列所在Segment
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 通過UNSAFE獲取對應(yīng)的Segment讯蒲,getObjectVolatile方法能保證其可見性
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 遍歷獲取到對應(yīng)的值
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
// 找不到返回空
return null;
}
put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//和get操作一樣定位到Segment
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
//通過Segment的put操作保存
return s.put(key, hash, value, false);
}
所以,重點的邏輯在Segment的put方法中:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 無論如何獲取鎖肄扎,首先嘗試獲取鎖墨林,如果獲取不到通過scanAndLockForPut獲取鎖
// scanAndLockForPut會通過while循環(huán)獲取鎖赁酝,直到獲取到鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//通過hash值定位到對應(yīng)鏈表即HashEntry
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);//定位到具體HashEntry
for (HashEntry<K,V> e = first;;) {
if (e != null) {
//如果已經(jīng)存在相同key,更新value
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果不存在key則新增一個元素
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//如果已經(jīng)超過閾值則執(zhí)行rehash進(jìn)行擴容,然后保存
//注意旭等,擴容只是在Segment中擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
size操作
ConcurrentHashMap中的size是一個比較有意思的操作酌呆,因為獲取Map大小是一個比較常用的方法,如果不加鎖獲取可能會不準(zhǔn)確因為同時可能有其他線程在操作map搔耕,如果加鎖獲取將會比較耗時隙袁,這里采用了通過指定重試次數(shù)(RETRIES_BEFORE_LOCK)(2次)
的方法來獲取大小弃榨;具體就是在在重試次數(shù)內(nèi)比較兩次獲取的size菩收,如果一直則認(rèn)為是準(zhǔn)確值,如果不一致進(jìn)行鎖定然后再獲取size
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
//如果達(dá)到重試次數(shù)鲸睛,鎖定
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
//計算size
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//比較兩次獲取的結(jié)果是否一致
if (sum == last)
break;
last = sum;
}
} finally {
//如果鎖定娜饵,則釋放鎖
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
ConcurrentHashMap在Java8新變化
ConcurrentHashMap在Java8中的已經(jīng)發(fā)生比較大的變化,基本的結(jié)構(gòu)和Java8的HashMap差不多腊凶,同樣的當(dāng)鏈表長度超過8的時候使用紅黑樹划咐;使用volatile
保證可見性,Segment已經(jīng)被取消钧萍,保留Segment只是為了反序列化的兼容褐缠,初始化改為懶加載,并發(fā)控制精確每個鏈表风瘦,使用synchronize
同步鎖代替再入鎖队魏,使用CAS操作等。
數(shù)據(jù)結(jié)構(gòu)
put操作
get操作就是類似HashMap的檢索比較万搔,這里重點關(guān)注put操作胡桨,看看在Java8中是如何實現(xiàn)并發(fā)操作的,其他操作暫不展開
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果是第一次put瞬雹,進(jìn)行初始化(懶加載)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果鏈表為空昧谊,使用CAS進(jìn)行無鎖put
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//如果處于擴容階段,
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//這里在鏈表/樹的第一個元素加鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
//如果是鏈表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//如果是紅黑樹
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//如果是長度達(dá)到TREEIFY_THRESHOLD(默認(rèn)是8)酗捌,執(zhí)行樹化操作
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
懶加載初始化
這里可以看到使用CAS操作呢诬,實現(xiàn)table的初始化,并且使用雙重檢查(double-check)
保證重復(fù)初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//進(jìn)入循環(huán)檢查
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
//如果正在初始化胖缤,等待
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//進(jìn)入初始化操作
try {
//再次檢查
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
總結(jié)
可以看到尚镰,隨之Java平臺的升級,ConcurrentHashMap也是逐漸演變哪廓,但是都是為了更好的達(dá)到ConcurrentHashMap的設(shè)計目標(biāo):
- 在最小化實現(xiàn)更新操作的同時保持并發(fā)的可讀性
- 是空間消耗到達(dá)和HashMap差不多或者更好
ConcurrentHashMap還有續(xù)續(xù)多多的實現(xiàn)細(xì)節(jié)狗唉,需要仔細(xì)閱讀源碼才能理解,并且涉及到許多底層操作的實現(xiàn)涡真,能力有限分俯,暫時能理解的就這么多肾筐,后續(xù)希望繼續(xù)深入理解。