并發(fā)容器
HashMap的并發(fā)版本ConcurrentHashMap
ConcurrentSkipListMap 和 ConcurrentSkipListSet
LinkedList的并發(fā)版本ConcurrentLinkedQueue
寫時(shí)復(fù)制容器
CopyOnWriteArrayList
CopyOnWriteArraySet
阻塞隊(duì)列
常用阻塞隊(duì)列
·ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列羡蛾。
·LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列欠气。
·PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列刷晋。
·DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列游昼。
·SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列错维。
·LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列。
·LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
看ConcurrentHashMap之前先順便說說HashMap
HashMap又分jdk1.7和1.7之后版本
1.7的hashMap結(jié)構(gòu)是數(shù)組+鏈表,這里找?guī)讉€(gè)比較有意思的地方分析下
1.7的hashMap在并發(fā)下的三個(gè)bug數(shù)據(jù)丟失,數(shù)據(jù)重復(fù),死循環(huán)
hash沖突
hash沖突解決方法:1,開放尋址(開放尋址法的核心是如果出現(xiàn)了散列沖突,就重新探測(cè)一個(gè)空閑位置诱桂,將其插入。當(dāng)我們往散列表中插入數(shù)據(jù)時(shí)呈昔,如果某個(gè)數(shù)據(jù)經(jīng)過散列函數(shù)散列之后挥等,存儲(chǔ)位置已經(jīng)被占用了,我們就從當(dāng)前位置開始堤尾,依次往后查找触菜,看是否有空閑位置,直到找到為止哀峻。);
2,再散列,3鏈地址法(HashMap就是)
這里HashMap的構(gòu)造方法里默認(rèn)的數(shù)組Entry<K,V>[] table長(zhǎng)度是16(不會(huì)new,在put的時(shí)候才會(huì)去new,這個(gè)是導(dǎo)致第一個(gè)bug的原因),
第一個(gè)bug數(shù)據(jù)丟失,因?yàn)樵趐ut的時(shí)候才會(huì)new Entry<K,V>[],并發(fā)情況下就可能new多個(gè)..然后put到哪去了呢...
這里再說說第二個(gè)bug
多線程下put同一個(gè)元素,假設(shè)put的時(shí)候都前面的條件都通過了,我們看看createEntry()方法
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
往這個(gè)table[bucketIndex]賦值的時(shí)候線程1先賦值了table[i] = tooko,線程2進(jìn)來先拿到e=線程1tooko,新建一個(gè)Entry他的next又是線程1tooko,就變成了了table[i]=線程2tooko, 線程2tooko-->線程1tooko
第二個(gè)bug數(shù)據(jù)重復(fù)
再看看擴(kuò)容,擴(kuò)容的條件是(size >= threshold) && (null != table[bucketIndex]) threshold是在put和擴(kuò)容的時(shí)候去算的為容量加載因子,容量是16加載因子0.75的話就是12;
也就是說默認(rèn)情況下,第一次擴(kuò)容是元素的數(shù)量大于等于12并且,這個(gè)下標(biāo)沒有元素的時(shí)候進(jìn)行擴(kuò)容,所以即便是map里面已經(jīng)有七八十個(gè)元素了,也可能不回去擴(kuò)容,極限情況是剛好第十二個(gè)元素put的時(shí)候去擴(kuò)容
這個(gè)newTable的length是table的length2
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;①
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];②
newTable[i] = e;
e = next;
}
}
}
看看這兩步
e.next = newTable[i];
newTable[i] = e;
這兩步操作鏈表的我們用table里的nana-->konoha來分析下,假設(shè)舊table里table[3]上是nana-->konoha
這里第一次循環(huán)拿到nana后nana-->null,newTable[3]=nana;
第二次循環(huán)拿到konoha后,konoha-->nana,newTable[3]=konoha
所以在新的newTable里我們的nana-->konoha變成了konoha-->nana,就是說newTable[i]會(huì)把table[i]里的鏈表翻轉(zhuǎn)
這里如果是在多線程情況下線程1剛好進(jìn)到while循環(huán)里的①步驟拿到了e=nana,線程2把nana-->konoha變成了konoha-->nana;完了,nana-->konoha-->nana,線程1在while里出不去了
這就是HashMap的并發(fā)下的bug之一死循環(huán)
我們看看里面其他有意思的東西
table長(zhǎng)度默認(rèn)是16即便是有參構(gòu)造去賦值也會(huì)變成大于等于這個(gè)值的2的次方,比如有參構(gòu)造傳的30,那么會(huì)通過inflateTable(30)變成32,
inflateTable(16)里面有個(gè)特別流弊的位運(yùn)算取n的大于等于2的次方數(shù)
Integer.highestOneBit((number - 1) << 1);
public static int highestOneBit(int i) {
// HD, Figure 3-1
i |= (i >> 1);
i |= (i >> 2);
i |= (i >> 4);
i |= (i >> 8);
i |= (i >> 16);
return i - (i >>> 1);
}
這個(gè)算法我收藏下,嘿嘿嘿~(這個(gè)算法在1.7的ConcurrentHashMap在通過并發(fā)級(jí)別算Segment[]的長(zhǎng)度的時(shí)候用了另一種實(shí)現(xiàn),當(dāng)然只是用位運(yùn)算就實(shí)現(xiàn)了更厲害些)
int ssize = 1;
while (ssize < number) {
ssize <<= 1;
}
為什么一定要是2的次方呢,因?yàn)槿∧5牟僮?a % (2^n) 等價(jià)于 a & (2^n-1),位運(yùn)算效率高于模運(yùn)算,所以這里選擇2的次方作為table的length.
比如說一個(gè)Entry的key為"konoha",hash后的值是19,table的length是16, 19 & (16-1) = 3,相當(dāng)于19%16=3;
那么key為"konoha"的Entry就放在table[3]下另一個(gè)Entry的key為"miu"hash后是20,算出的索引就是4就放在table[4]下,
如果此時(shí)一個(gè)Entry的key為"nana"hash后是35,算出的index也是3,這個(gè)時(shí)候就用到鏈表了,table[3]下是key為"nana"的Entry,key為"nana"的Entry的next就是key為"konoha"的Entry(注意這里是頭插法,后來的掛頭上,舊的掛新的next上,所以就是table[3]=nana,nana-->konoha)
ok,hashMap就簡(jiǎn)單說說三個(gè)并發(fā)bug,還有為什么數(shù)組的長(zhǎng)度要是2的n次方
我們?cè)诳纯?.8的HashMap
首先在結(jié)構(gòu)上發(fā)生了變化,由數(shù)組+鏈表變成了數(shù)組+鏈表+紅黑樹
如果鏈表長(zhǎng)度到達(dá)閥值(默認(rèn)是8,注釋上說根據(jù)泊松分布8的時(shí)候桶里的數(shù)據(jù)已經(jīng)很少了),就會(huì)將鏈表轉(zhuǎn)換成紅黑樹
我們看看put方法
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
當(dāng)遍歷的時(shí)候發(fā)現(xiàn)table[i]的元素大于等于7(TREEIFY_THRESHOLD - 1)的時(shí)候開始使用紅黑樹treeifyBin(tab, hash),擴(kuò)容的時(shí)候發(fā)現(xiàn)newTable[i]上的元素小于6(hc <= UNTREEIFY_THRESHOLD)的時(shí)候從紅黑樹轉(zhuǎn)回鏈表;
普通的二叉樹會(huì)寫,紅黑樹太麻煩了后面再去研究
我們發(fā)現(xiàn)1.8里是先添加進(jìn)去之后再去檢測(cè)擴(kuò)容,1.7里是擴(kuò)容后再去添加
還有1.8里擴(kuò)容時(shí)的鏈表移到newTable的時(shí)候不會(huì)翻轉(zhuǎn)了
ConcruuentHashMap
1.7中Segment[i]= new HashEntry[],HashEntry[i]-->鏈表,即數(shù)組[數(shù)組[]]+鏈表
就是說和Segment[]數(shù)組中有每個(gè)下標(biāo)下有一個(gè)或者多個(gè)HashEntry[]數(shù)組,然后HashEntry的對(duì)象上面掛鏈表
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
在ConcurrentHashMap的構(gòu)造函數(shù)里面 會(huì)通過并發(fā)級(jí)別計(jì)算出Segment[]的長(zhǎng)度,比如說默認(rèn)并發(fā)級(jí)別是16 DEFAULT_CONCURRENCY_LEVEL = 16;那么Segment[]就是大于16的2的n次冪,剛好是16.然后用容量initialCapacity一除,在取最大2的n次冪,算出每個(gè)Segment里面的HashEntry[cap]的長(zhǎng)度
通過上面的構(gòu)造方法可以發(fā)現(xiàn)和hashMap不同,ConcurrentHashMap在初始化的時(shí)候就把Segment[]new出來了,并且還用CAS操作(UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0])給Segment[0]也賦值一個(gè)HashEntry[cap];
我們?cè)倏纯磒ut方法
先根據(jù)key的hash值在Segment數(shù)組中找到相應(yīng)的位置i如果Segment[i]里面還未初始化,則通過CAS進(jìn)行賦值,有就直接取到Segment,接著執(zhí)行Segment對(duì)象的put方法,這個(gè)Segment是繼承了ReentrantLock的,關(guān)于ReentrantLock我們知道的,在上一篇AQS里我們就分析過
他這里沒有直接用lock(),Segment的put方法進(jìn)來后先trylock(),拿到鎖了就繼續(xù)執(zhí)行,沒有拿到就scanAndLockForPut()
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在scanAndLockForPut方法中,while循環(huán)執(zhí)行tryLock()方法嘗試獲取鎖,在多處理器環(huán)境下,重復(fù)次數(shù)為64,單處理器重復(fù)次數(shù)為1,當(dāng)執(zhí)行tryLock()方法的次數(shù)超過上限時(shí),則執(zhí)行l(wèi)ock()方法掛起線程;
當(dāng)上一個(gè)線程執(zhí)行完插入操作時(shí),會(huì)通過finally{}的unlock()方法釋放鎖,釋放時(shí)喚醒等待線程繼續(xù)執(zhí)行 ;
和HashMap不一樣,ConcurrentMap的size()不是直接取的size,而是去統(tǒng)計(jì)的
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
先采用不加鎖的方式,連續(xù)計(jì)算元素的個(gè)數(shù),如果前后兩次計(jì)算結(jié)果相同,則說明計(jì)算出來的元素個(gè)數(shù)是準(zhǔn)確的,直接返回;
如果前后兩次計(jì)算結(jié)果都不同,則給每個(gè)Segment進(jìn)行加鎖,再計(jì)算一次元素的個(gè)數(shù);
看看1.8的ConcurrentHashMap,去掉了Segment[];結(jié)構(gòu)和1.8的HashMap一樣,數(shù)組+鏈表+紅黑樹
也是在put的時(shí)候去new Node[],但是他用了CAS去保證多線程下不會(huì)new多個(gè)Node[]
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)//如果容量小于0,說明有線程進(jìn)入了下面的else if,然后該線程yield()讓出執(zhí)行權(quán)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS把容量改為-1;
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//new出Node[]
table = tab = nt;
sc = n - (n >>> 2);//把sizeCtl置為容量的3/4(0.75f)應(yīng)該是要當(dāng)做擴(kuò)容時(shí)的閾值用
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put的時(shí)候
如果相應(yīng)位置的Node還未初始化,則通過CAS插入相應(yīng)的數(shù)據(jù);
如果相應(yīng)位置的Node不為空,且當(dāng)前該節(jié)點(diǎn)不處于移動(dòng)狀態(tài),則對(duì)該節(jié)點(diǎn)加synchronized鎖,如果該節(jié)點(diǎn)的hash不小于0,則遍歷鏈表更新節(jié)點(diǎn)或插入新節(jié)點(diǎn)
如果該節(jié)點(diǎn)是TreeBin類型的節(jié)點(diǎn),說明是紅黑樹結(jié)構(gòu),則通過putTreeVal方法往紅黑樹中插入節(jié)點(diǎn)
如果binCount不為0,說明put操作對(duì)數(shù)據(jù)產(chǎn)生了影響,如果當(dāng)前鏈表的個(gè)數(shù)達(dá)到8個(gè),則通過treeifyBin方法轉(zhuǎn)化為紅黑樹,如果oldVal不為空,說明是一次更新操作,沒有對(duì)元素個(gè)數(shù)產(chǎn)生影響,則直接返回舊值;
size();
1.8的size有記錄baseCount,put和remove的時(shí)候都會(huì)通過CAS去更新,除此之外還有CounterCell[],這個(gè)放沒來得及更新baseCount的記錄遍歷出來累加
關(guān)于ConcurrentSkipListMap 和 ConcurrentSkipListSet,這兩個(gè)和HashMap,HashSet差不多,HashSet就是通過HashMap實(shí)現(xiàn)的,只是所有的Value都是同一個(gè)object
ConcurrentSkipListMap 就是跳表,類似二叉樹,put的時(shí)候會(huì)構(gòu)建多級(jí)索引
https://juejin.im/post/5cf8c50ff265da1b76389223
Queue:https://juejin.im/post/5dee37ad51882512657ba41f
總結(jié):
HashMap本來就是線程不安全的,并發(fā)條件下建議使用ConcurrentHashMap,注意下1.7死循環(huán)的原因,1.7和1.8的結(jié)構(gòu)不同.
ConcruuentHashMap注意1.7和1.8的結(jié)構(gòu):1.7多了Sement[],鎖的是Sement,沒有紅黑樹;