ConcurrentHashMap
在JAVA線程安全的并發(fā)集合有HashTable,而HashTable
是一種通過在方法上使用Synchronize的簡單粗暴的方式進行保證線程的安全燃逻,ConcurrentHashMap則是一個高效的并發(fā)集合蝙云。ConcurrentHashMap不是簡單粗暴的在方法上使用Synchronize,而是通過分段(segment) 鎖的方式進行處理并發(fā)問題在旱。ConcurrentHashMap的鎖機制更類似于我們數(shù)據(jù)庫中的行鎖偷线,而HashTable則是整張表鎖磨确。下圖為ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)。
取模算法
ConcurrentHashMap獲取Segment或者在Segment中獲取Entry都是是通過mod求余來確定當前的Segment或者Entry.而在ConcurrentHashMap不是使用“%”的方式來進行求余值淋昭,而是通過&位運算的方式求余值俐填。但是這種&位運算求余的方式必須是2n
公式:a%(2n)是等價于a&(2n-1)。
System.out.println("345%16="+(345%16)+" 等價于: 345&(16-1)="+(345&(16-1)));
輸出結(jié)果:
345%16=9 等價于: 345&(16-1)=9
ConcurrentHashMap初始化
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//根據(jù)并發(fā)級別進行計算Segment的size和位移量
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//計算key&(2n-1)的值和key左移量
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//16384
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
我們通過源碼的初始化方法進行分析得出:
- ConcurrentHashMap初始化的時候會傳入:
(1)initialCapacity初始容量翔忽,而initialCapacity主要用于計算初始化Segment中的Table的初始化容量英融。
(2) loadFactor負載因子,則是用來計算Table的擴展闊值歇式。當Table達到闊值的時候就會進行動態(tài)擴展驶悟。
(3) concurrencyLevel并發(fā)級別,則是用來確定Segment的大小材失。默認系統(tǒng)設(shè)置16個segment痕鳍。 - 初始化的時候默認先計算segmentshift和sgmentMask,而上面已經(jīng)說過ConcurrentHashMap是通過&運算求余的方式進行確定當前Key值對應(yīng)的位置。那么segmentShift則是(2n-1),sgmentMask則是key左移的值笼呆,ConcurrentHashMap是對key左移sgmentMask后再進行&操作熊响。
- 默認配置下創(chuàng)建1個Segment,而Segment是不可以進行動態(tài)擴容的诗赌。但是Segment中的Table根據(jù)闊值可以進行動態(tài)擴容汗茄。
put
- 創(chuàng)建Segment
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//計算hash值,在hash方法中會對對象中的hashCode進行Hash
int hash = hash(key);
//對hash值進行左位移后铭若,求余
int j = (hash >>> segmentShift) & segmentMask;
//判斷當這個Segment是否為Null,如果為Null創(chuàng)建一個Segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//判斷是否存在segament洪碳,因為不是線程安全的所以需要多次檢測是否存在Segment
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
//初始化Segment中Table的闊值、容量叼屠、負載因子
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
//創(chuàng)建Table
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//判斷是否存在segament瞳腌,因為不是線程安全的所以需要多次檢測是否存在Segment
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//使用CAS方式進行插入Segment
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
通過對源碼的分析我們可以看到因為在初始化ConcurrentHashMap的時候只有一個Segment,當計算出key的hash值后镜雨,若該Hash值沒有對應(yīng)的Segment的時候會去創(chuàng)建一個Segment嫂侍,而因為在創(chuàng)建Segment是非加鎖的方式,因此在創(chuàng)建Segment的時候會多次判斷是否存在Segment冷离,在最后還使用了CAS的方式進行插入到Segment的數(shù)組中吵冒。
CAS的使用的標準范式纯命,在循環(huán)中使用CAS操作西剥。保證CAS一定執(zhí)行。
-
插入值
``final V put(K key, int hash, V value, boolean onlyIfAbsent) { //獲取重入鎖 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; //根據(jù)計算的index獲取Entry HashEntry<K,V> first = entryAt(tab, index); //循環(huán)所有Entry for (HashEntry<K,V> e = first;;) { if (e != null) { K k; //判斷值是否存在相同的Key或者Hash值 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; //判斷是否為不存在插入亿汞。 if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //迭代下一個 e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //判斷是否達到闊值瞭空, if (c > threshold && tab.length < MAXIMUM_CAPACITY) //達到闊值,擴容并且重新計算HASH值 rehash(node); else //如果沒有達到闊值則添加到node的下一個 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { //釋放鎖 unlock(); } return oldValue; }
``
(1)這個方法有兩個出口疗我,一個是 tryLock() 成功了咆畏,循環(huán)終止,另一個就是重試次數(shù)超過了 MAX_SCAN_RETRIES吴裤,進到 lock() 方法旧找,此方法會阻塞等待,直到成功拿到獨占鎖麦牺。
(2)循環(huán)table鏈钮蛛,如果存在HashEntry時,則進行判斷是否需要更新剖膳,如果存相等的Hash和Key魏颓,則判斷onlyIfAbsent是否為False,若為False則更新當前的值并且跳出循環(huán)返回吱晒。因為put方法有putIfAbsent和put公共甸饱,而putIfAbsent則只有當前值不存在時插入,如果存在則返回舊值,而put則是無論任何時候都會更新舊值叹话,并且將舊值返回偷遗。如果不存相等的Hash和Key。判斷當前節(jié)點是否頭節(jié)點驼壶,如果是則設(shè)置為頭節(jié)點鹦肿。然后判斷是否需要對Table進行擴容,如果需要則從新計算hash值并且將新值增加到鏈中辅柴。如果不需要擴容則直接添加到鏈中箩溃。
get
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
計算 hash 值,找到 segment 數(shù)組中的具體位置.根據(jù) hash 找到數(shù)組中具體的位置,循環(huán)鏈查找相等的值并且返回
size
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
在獲取ConcurrentHashMap的size時候碌嘀,是獲取兩次size的值判斷是否一致涣旨,如果不是一致的情況下,再進行加鎖操作股冗。所以在高并發(fā)寫入的情況下避免使用Size方法霹陡,以免由于加鎖導(dǎo)致性能問題。