一. ConcurrentHashMap
ConcurrentHashMap是線程安全且高效的HashMap巷懈。
- HashMap線程不安全;
- HashTable使用synchronized保證線程安全秕噪,但是效率非常低下离熏;
- ConcurrentHashMap使用鎖分段技術(shù)提升并發(fā)訪問率。
注意:
ConcurrentHashMap的線程安全指的是其內(nèi)部的方法線程安全(如get、put等),即單獨(dú)調(diào)用某個(gè)方法線程安全的。
當(dāng)代碼塊包含多個(gè)操作時(shí)狡逢,此代碼塊并不是線程安全的,因?yàn)椴荒鼙WC整個(gè)代碼塊是互斥執(zhí)行的拼卵,因此仍然需要同步處理(加鎖奢浑、double check等)。
1. ConcurrentHashMap的結(jié)構(gòu)
JDK1.7版本
HashMap由HashEntry數(shù)組組成腋腮,HashEntry是鏈表結(jié)構(gòu)雀彼。
ConcurrentHashMap由Segment數(shù)組組成壤蚜,Segment由HashEntry數(shù)組組成,HashEntry是鏈表結(jié)構(gòu)徊哑。
2. ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通過initialCapacity袜刷、loadFactor和concurrencyLevel等幾個(gè)參數(shù)來初始化segment數(shù)組、段偏移量segmentShift莺丑、段掩碼segmentMask和每個(gè)segment里的HashEntry數(shù)組來實(shí)現(xiàn)的著蟹。
3. 定位Segment
ConcurrentHashMap使用分段鎖Segment來保護(hù)不同段的數(shù)據(jù),在插入和獲取元素的時(shí)候梢莽,必須先通過散列算法定位到Segment萧豆。
散列算法的目的:是元素盡可能均勻的分布在不同的Segment上,從而提高容器的存取效率昏名。
4. ConcurrentHashMap的操作
(1) get操作
- 對(duì)key的hashCode進(jìn)行一次再散列得到一個(gè)新的散列值涮雷;
- 根據(jù)新的散列值,利用散列運(yùn)算定位到Segment轻局;
- 再通過散列算法定位到元素洪鸭;
整個(gè)get過程不需要加鎖,除非讀到的值為空才會(huì)加鎖重讀嗽交。
(2) put操作
- 定位到Segment卿嘲;
- 判斷是否需要對(duì)Segment
(3) size操作
嘗試兩次不鎖住Segment直接統(tǒng)計(jì)各Segment大小的方法颂斜,如果兩次結(jié)果不相同夫壁,則采用加鎖的方式來統(tǒng)計(jì)所有Segment的大小。
二. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列沃疮,采用FIFO的規(guī)則對(duì)節(jié)點(diǎn)進(jìn)行排序盒让。
隊(duì)列前后分別用head節(jié)點(diǎn)和tail節(jié)點(diǎn)來定位,中間的節(jié)點(diǎn)通過next關(guān)聯(lián)起來司蔬。
1. 入隊(duì)列
入隊(duì)列就是將入隊(duì)節(jié)點(diǎn)添加到隊(duì)列的尾部邑茄。
- 將入隊(duì)節(jié)點(diǎn)設(shè)置成當(dāng)前隊(duì)列尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn);
- 更新tail節(jié)點(diǎn)俊啼,如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)不為空肺缕,則將入隊(duì)節(jié)點(diǎn)設(shè)置為tail節(jié)點(diǎn),如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)為空授帕,則將入隊(duì)節(jié)點(diǎn)設(shè)置為tail節(jié)點(diǎn)的next節(jié)點(diǎn)同木。
因此:tail節(jié)點(diǎn)并不一定就是尾節(jié)點(diǎn),尾節(jié)點(diǎn)可能是tail節(jié)點(diǎn)跛十,也可能是tail節(jié)點(diǎn)的next節(jié)點(diǎn)彤路。
2. 出隊(duì)列
出隊(duì)列就是從隊(duì)列中返回一個(gè)節(jié)點(diǎn)元素,并清空該節(jié)點(diǎn)對(duì)元素的引用芥映。
- 獲取head節(jié)點(diǎn)的元素洲尊,判斷是否為空
- 如果為空远豺,表示另一個(gè)線程已經(jīng)進(jìn)行了一次出隊(duì)列操作將該節(jié)點(diǎn)的元素取走,如果不為空坞嘀,則使用CAS將head節(jié)點(diǎn)設(shè)置為null并返回head節(jié)點(diǎn)的元素躯护。
三. 阻塞隊(duì)列
阻塞隊(duì)列的主要使用場景:生產(chǎn)者和消費(fèi)者。
四種處理方式:
處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
拋出異常:隊(duì)列滿時(shí)丽涩,繼續(xù)插入數(shù)據(jù)則拋出IllegalStateException榛做;隊(duì)列空時(shí),從隊(duì)列取數(shù)據(jù)則拋出NoSuchElementException内狸。
返回特殊值:插入元素時(shí)检眯,成功返回true;移除元素時(shí)昆淡,如果沒有則返回null锰瘸。
一直阻塞:隊(duì)列滿時(shí),繼續(xù)插入數(shù)據(jù)會(huì)一直阻塞生產(chǎn)者線程直到隊(duì)列可用或響應(yīng)中斷退出昂灵;隊(duì)列空時(shí)避凝,從隊(duì)列取數(shù)據(jù)會(huì)阻塞消費(fèi)者線程直到隊(duì)列不空。
超時(shí)退出:隊(duì)列滿時(shí)眨补,繼續(xù)插入數(shù)據(jù)會(huì)阻塞生產(chǎn)者線程一段時(shí)間管削,超過指定時(shí)間則退出。
1. Java里的阻塞隊(duì)列
(1) ArrayBlockingQueue
用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列撑螺,F(xiàn)IFO含思。
(2) LinkedBlockingQueue
用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,默認(rèn)和最大長度為Integer.MAX_VALUE甘晤,F(xiàn)IFO含潘。
(3) PriorityBlockingQueue
支持優(yōu)先級(jí)的無界阻塞隊(duì)列。
(4) DelayQueue
支持延時(shí)獲取元素的無界阻塞隊(duì)列线婚,在創(chuàng)建元素時(shí)可指定延時(shí)遏弱,只有延遲期滿才能從隊(duì)列中提取元素。
(5) SynchronousQueue
不存儲(chǔ)元素的阻塞隊(duì)列塞弊,每一個(gè)put操作必須等待一個(gè)take操作漱逸,否則不能繼續(xù)添加元素。
(6) LinkedTransferQueue
由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊(duì)列游沿,主要多了tryTransfer和transfer方法饰抒。
transfer方法:如果當(dāng)前有消費(fèi)者正在等待接收元素,transfer方法可以把生產(chǎn)者傳入的元素立刻傳輸給消費(fèi)者奏候;如果沒有消費(fèi)者在等待接收元素循集,transfer方法將元素存放在隊(duì)列tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回蔗草。
tryTransfer方法:如果當(dāng)前有消費(fèi)者正在等待接收元素咒彤,tryTransfer方法可以把生產(chǎn)者傳入的元素立刻傳輸給消費(fèi)者疆柔;如果沒有消費(fèi)者在等待接收元素,tryTransfer方法將元素存放在隊(duì)列tail節(jié)點(diǎn)镶柱,并返回false旷档。
(7) LinkedBlockingDeque
由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,可以從隊(duì)列的兩端插入和移除元素歇拆。
2. 阻塞隊(duì)列的實(shí)現(xiàn)原理
使用等待 / 通知模式實(shí)現(xiàn)鞋屈。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final Condition notEmpty;
private final Condition notFull;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
}
當(dāng)生產(chǎn)者往滿的隊(duì)列中添加元素時(shí)會(huì)阻塞生產(chǎn)者(await),當(dāng)消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后故觅,會(huì)通知生產(chǎn)者當(dāng)前隊(duì)列可用(signal)厂庇。
當(dāng)消費(fèi)者從空的隊(duì)列中取元素時(shí)會(huì)阻塞消費(fèi)者(await),當(dāng)生產(chǎn)者添加了一個(gè)元素后输吏,會(huì)通知消費(fèi)者當(dāng)前隊(duì)列可用(signal)权旷。
四. Fork/Join 框架
Fork/Join 框架是Java 7 提供的一個(gè)用于并行執(zhí)行任務(wù)的框架,把大任務(wù)分割成若干個(gè)小任務(wù)贯溅,最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果拄氯。
- 分割任務(wù);
- 執(zhí)行任務(wù)并合并結(jié)果它浅。