1.ConcurrentHashMap
①為什么要使用ConcurrentHashMap
1)線程不安全的HashMap
多線程環(huán)境下,使用HashMap進(jìn)行put操作會引起死循環(huán)蛤吓,導(dǎo)致CPU利用率接近100%撼唾。
以下代碼會引起死循環(huán)(1.8之前)
final HashMap<String, String> map = new HashMap<>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "ftf" + i).start();
}
}
}, "ftf");
t.start();
t.join();
System.out.println("ok");
HashMap在并發(fā)執(zhí)行put操作時會引起死循環(huán)付鹿,是因為多線程會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu)涂乌,一單形成環(huán)形數(shù)據(jù)結(jié)構(gòu)催享,Entry的next節(jié)點永遠(yuǎn)不為空靠欢,就會產(chǎn)生死循環(huán)獲取Entry弥姻。
1.8之前HashMap在并發(fā)執(zhí)行put操作時,需要擴(kuò)容的時候會出現(xiàn)鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu):1.7擴(kuò)容代碼
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;//這一步被掛起掺涛,就可能出現(xiàn)環(huán)
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}
假設(shè)原來table的a處,是 k1→k2→null疼进,假設(shè)所有的key計算出在新的table中的位置都是b
線程A:
e=k1薪缆;
next=k1.next=k2;
線程B:
e=k1;
next=k1.next=k2拣帽;k1.next=newTable[b]=null疼电;newTable[b]=k1;e=k2减拭;
next=k2.next=null蔽豺;k2.next=newTable[b]=k1;newTable[b]=k2拧粪;e=null修陡;
//現(xiàn)在順序是:k2→k1→null
線程A:
k1.next=newTable[b]=k2;newTable[b]=k1可霎;e=k2魄鸦;
//現(xiàn)在順序是:k1→k2→k1
next=k2.next=k1;k2.next=newTable[b]=k1癣朗;newTable[b]=k2拾因;e=k1;
next=k1.next=k2旷余;k1.next=newTable[b]=k2绢记;newTable[b]=k1;e=k2正卧;
next=k2.next=k1蠢熄;k2.next=newTable[b]=k1;newTable[b]=k2穗酥;e=k1护赊;
.......//形成死循環(huán)
猜測1.8不會形成死循環(huán),這里并不會出現(xiàn)反向更改Node的next引用的情況
else { // preserve order
Node<K,V> loHead = null, loTail = null;//原位置
Node<K,V> hiHead = null, hiTail = null;//新位置
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {//不需要改變位置
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {//需要改變位置
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
2)效率低下的HashTable
HashTable容器使用synchronized來保證線程安全砾跃,在線程競爭激烈的情況下HashTable的效率非常低下骏啰。
3)ConcurrentHashMap的鎖分段技術(shù)(1.8之前)可有效提升并發(fā)訪問率
1.8之前:容器中有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù)抽高,那么當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時判耕,線程間就不會存在鎖競爭,從而可以有效提高并發(fā)訪問率翘骂,這個就是鎖分段技術(shù)壁熄。首先將數(shù)據(jù)分成一段一段地存儲,然后給每一段數(shù)據(jù)配一把鎖碳竟,草丧。
1.8:鎖粒度降低,如果形成鏈表莹桅,以鏈表第一個節(jié)點為synchronized的對象昌执。
②ConcurrentHashMap的結(jié)構(gòu)
在JDK1.7版本中,ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)是由一個Segment數(shù)組和多個HashEntry組成:
JDK1.8的實現(xiàn)已經(jīng)摒棄了Segment的概念,而是直接用Node數(shù)組+鏈表+紅黑樹的數(shù)據(jù)結(jié)構(gòu)實現(xiàn)懂拾,并發(fā)控制使用synchronized和CAS來操作煤禽,整個看起來就像是優(yōu)化過且現(xiàn)場安全的HashMap,雖然在JDK1.8中還能看到Segment的數(shù)據(jù)結(jié)構(gòu),但是已經(jīng)簡化了屬性,只是為了兼容舊版本普气。
常量設(shè)計:
// node數(shù)組最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默認(rèn)初始值,必須是2的幕數(shù)
private static final int DEFAULT_CAPACITY = 16;
//數(shù)組可能最大值选脊,需要與toArray()相關(guān)方法關(guān)聯(lián)
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并發(fā)級別,遺留下來的栗涂,為兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 負(fù)載因子
private static final float LOAD_FACTOR = 0.75f;
// 鏈表轉(zhuǎn)紅黑樹閥值,> 8 鏈表轉(zhuǎn)換為紅黑樹
static final int TREEIFY_THRESHOLD = 8;
//樹轉(zhuǎn)鏈表閥值知牌,小于等于6(tranfer時,lc斤程、hc=0兩個計數(shù)器分別++記錄原bin角寸、新binTreeNode數(shù)量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1忿墅,help resize的最大線程數(shù)
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16扁藕,sizeCtl中記錄size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED = -1;
// 樹根節(jié)點的hash值
static final int TREEBIN = -2;
// ReservationNode的hash值
static final int RESERVED = -3;
// 可用處理器數(shù)量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數(shù)組
transient volatile Node<K,V>[] table;
/*控制標(biāo)識符,用來控制table的初始化和擴(kuò)容的操作疚脐,不同的值有不同的含義
*當(dāng)為負(fù)數(shù)時:-1代表正在初始化亿柑,-N代表有N-1個線程正在 進(jìn)行擴(kuò)容
*當(dāng)為0時:代表當(dāng)時的table還沒有被初始化
*當(dāng)為正數(shù)時:表示初始化或者下一次進(jìn)行擴(kuò)容的大小
private transient volatile int sizeCtl;
內(nèi)部一些數(shù)據(jù)結(jié)構(gòu):
Node是ConcurrentHashMap存儲結(jié)構(gòu)的基本單元,用于存儲數(shù)據(jù)棍弄,數(shù)據(jù)結(jié)構(gòu)就是一個鏈表望薄,但只允許對數(shù)據(jù)進(jìn)行查找,不允許進(jìn)行修改:
static class Node<K,V> implements Map.Entry<K,V> {
//鏈表的數(shù)據(jù)結(jié)構(gòu)
final int hash;
final K key;
//val和next都會在擴(kuò)容時發(fā)生變化呼畸,所以加上volatile來保持可見性和禁止重排序
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//不允許更新value
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
//用于map中的get()方法痕支,子類重寫
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
TreeNode繼承與Node,但是數(shù)據(jù)結(jié)構(gòu)換成了二叉樹結(jié)構(gòu)蛮原,它是紅黑樹的數(shù)據(jù)的存儲結(jié)構(gòu)卧须,用于紅黑樹中存儲數(shù)據(jù),當(dāng)鏈表節(jié)點數(shù)大于8時會轉(zhuǎn)換成紅黑樹的結(jié)構(gòu)儒陨,他就是通過TreeNode作為存儲結(jié)構(gòu)代替Node來轉(zhuǎn)換成紅黑樹花嘶。
static final class TreeNode<K,V> extends Node<K,V> {
//樹形結(jié)構(gòu)的屬性定義
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; //標(biāo)志紅黑樹的紅節(jié)點
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
//根據(jù)key查找 從根節(jié)點開始找出相應(yīng)的TreeNode,
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
TreeBin存儲樹形結(jié)構(gòu)的容器蹦漠,樹形結(jié)構(gòu)就是指TreeNode椭员,所以TreeBin就是封裝TreeNode的容器,它提供轉(zhuǎn)換黑紅樹的一些條件和鎖的控制笛园。
static final class TreeBin<K,V> extends Node<K,V> {
//指向TreeNode列表和根節(jié)點
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 讀寫鎖狀態(tài)
static final int WRITER = 1; // 獲取寫鎖的狀態(tài)
static final int WAITER = 2; // 等待寫鎖的狀態(tài)
static final int READER = 4; // 增加數(shù)據(jù)時讀鎖的狀態(tài)
/**
* 初始化紅黑樹
*/
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
......
}
③ConcurrentHashMap的初始化
JDK1.7 ConcurrentHashMap的初始化會通過位與運算來初始化Segment的大小拆撼,用ssize來標(biāo)識容劳,如下:
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
concurrencyLevel的最大值是65535,這意味著segments數(shù)組的長度最大為65536闸度,對應(yīng)的二進(jìn)制是16位。
④ConcurrentHashMap的操作
參考:https://www.cnblogs.com/study-everyday/p/6430462.html
2.ConcurrentLinkedQueue
并發(fā)編程中蚜印,有時候需要使用線程安全的隊列莺禁。如果要實現(xiàn)一個線程安全的隊列有兩種方式:一種是使用阻塞算法,另一種是使用非阻塞算法窄赋。
使用阻塞算法的隊列哟冬,可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現(xiàn)。
非阻塞的實現(xiàn)方式則可以使用循環(huán)CAS的方式來實現(xiàn)忆绰。
ConcurrentLinkedQueue是使用非阻塞的方式來實現(xiàn)線程安全隊列的浩峡。它是一個基于鏈接節(jié)點的無界線程安全隊列,采用先進(jìn)先出的規(guī)則對節(jié)點進(jìn)行排序错敢,當(dāng)我們添加一個元素的時候翰灾,它會添加到隊列的尾部;當(dāng)我們獲取一個元素時稚茅,它會返回隊列頭部的元素纸淮。
①ConcurrentLinkedQueue的結(jié)構(gòu)
②入隊列
入隊列的過程
public boolean offer(E e) {
checkNotNull(e);
//入隊前,創(chuàng)建一個入隊節(jié)點
final Node<E> newNode = new Node<E>(e);
//死循環(huán)亚享,入隊不成功反復(fù)入隊
for (Node<E> t = tail, p = t;;) {
//創(chuàng)建一個指向tail節(jié)點的引用咽块,用p來標(biāo)識隊列的尾節(jié)點,默認(rèn)情況下等于tail節(jié)點欺税。
Node<E> q = p.next;//獲取p節(jié)點的下一個節(jié)點
if (q == null) {//p節(jié)點是尾節(jié)點
if (p.casNext(null, newNode)) {//設(shè)置p節(jié)點的next節(jié)點為入隊節(jié)點
if (p != t) // t已經(jīng)不是尾節(jié)點了侈沪,t=tail,p經(jīng)過循環(huán)后已經(jīng)改變晚凿。
casTail(t, newNode); // 更新tail節(jié)點亭罪,允許失敗
return true;
}
//另一個線程CAS成功了,重新讀取下一個
}
else if (p == q)
//我們已經(jīng)脫離了隊列晃虫。如果tail沒有變化皆撩,它也已經(jīng)脫離了隊列,在這種情況下哲银,我們需要跳到頭部扛吞,否則跳到尾部
p = (t != (t = tail)) ? t : head;
else
//檢查tail更新
p = (p != t && t != (t = tail)) ? t : q;
}
}
入隊列就是將入隊節(jié)點添加到隊列的尾部。tail節(jié)點不總是尾節(jié)點荆责。入隊方法永遠(yuǎn)返回true滥比,所以不要通過返回值判斷入隊是否成功。
在一個隊列中依次插入4個節(jié)點做院,示例:
- 添加元素1盲泛。隊列更新head節(jié)點的next節(jié)點為元素1節(jié)點濒持。又因為tail節(jié)點默認(rèn)情況下等于head節(jié)點,所以它們的next節(jié)點都指向元素1節(jié)點寺滚。
- 添加元素2柑营。隊列首先設(shè)置元素1節(jié)點的next節(jié)點為元素2節(jié)點,然后更新tail節(jié)點指向元素2節(jié)點村视。(會跳過1節(jié)點)官套。
- 添加元素3。設(shè)置tail節(jié)點的next節(jié)點為元素3節(jié)點蚁孔。
- 添加元素4奶赔。設(shè)置元素3的next節(jié)點為元素4節(jié)點,然后將tail節(jié)點指向元素4節(jié)點杠氢。
③出隊列
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;//獲取p節(jié)點的元素
//p節(jié)點的元素不為空站刑,使用CAS設(shè)置p節(jié)點引用的元素為null,如果CAS成功鼻百,返回p節(jié)點的元素
if (item != null && p.casItem(item, null)) {
if (p != h) // 一次跳躍兩個節(jié)點
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {//如果p的下一個節(jié)點也為空绞旅,說明這個隊列已經(jīng)空了。
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
出隊列的就是從隊列里返回一個幾點元素愕宋,并清空該節(jié)點對元素的引用玻靡。
3.Java中的阻塞隊列
①什么是阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法中贝。
1)支持阻塞的插入方法:意思是當(dāng)隊列滿時囤捻,隊列會阻塞插入元素的線程,知道隊列不滿邻寿。
2)支持阻塞的移除方法:意思是在隊列為空時蝎土,獲取元素的線程會等待隊列變?yōu)榉强铡?/p>
阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是向隊列里添加元素的線程绣否,消費者是從隊列里取元素的線程誊涯。阻塞隊列就是生產(chǎn)者用來存放元素、消費者用來獲取元素的容器蒜撮。
在阻塞隊列不可用時暴构,這兩個附加操作提供了4種處理方式:
- 拋出異常:當(dāng)隊列滿時,如果再往隊列里插入元素段磨,會拋出IllegalStateException(“Queue full”)異常取逾。當(dāng)隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常苹支。(AbstractQueue)
- 返回特殊值:往隊列里插入元素時砾隅,成功返回true。移除方法取出一個元素债蜜,如果沒有則返回null晴埂。
- 一直阻塞:當(dāng)阻塞隊列滿時究反,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程儒洛,知道隊列可用或者響應(yīng)中斷退出精耐。當(dāng)隊列空時,如果消費者線程從隊列里take元素晶丘,隊列會阻塞住消費者線程黍氮,直到隊列不為空。
- 超時退出:當(dāng)阻塞隊列滿時浅浮,如果生產(chǎn)者線程往隊列里插入元素,隊列會阻塞生產(chǎn)者線程一點時間捷枯,如果超過了指定的時間滚秩,生產(chǎn)者線程就會退出。
注意:無界阻塞隊列淮捆,隊列不可能出現(xiàn)滿的情況郁油。
②Java里的阻塞隊列
- ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列攀痊。
- PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列。
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的支持延時獲取元素的無界阻塞隊列苟径。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列蟆盐。
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
1)ArrayBlockingQueue
是一個用數(shù)組實現(xiàn)的有界阻塞隊列遭殉。此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序石挂。用重入鎖ReentrantLock來實現(xiàn)線程訪問隊列的公平性。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2)LinkedBlockingQueue
一個用來鏈表實現(xiàn)的有界阻塞隊列险污。此隊列的默認(rèn)和最大長度為Integer.MAX_VALUE痹愚。
3)PriorityBlockingQueue
一個支持優(yōu)先級的無界阻塞隊列。默認(rèn)情況下元素采取自然順序升序排列蛔糯。也可以自定義類實現(xiàn)compareTo()方法來指定元素排序規(guī)則拯腮,或者初始化PriorityBlockingQueue時,指定構(gòu)造參數(shù)Comparator來對元素進(jìn)行排序渤闷。不能保證同優(yōu)先級元素的順序疾瓮。
4)DelayQueue
一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現(xiàn)飒箭。隊列中的元素必須實現(xiàn)Delayed接口狼电,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素蜒灰。只有在延遲期滿時才能從隊列中提取元素。
適用于以下場景:
- 緩存系統(tǒng)的設(shè)計:可以保存緩存元素的有效期肩碟,使用一個線程循環(huán)查詢DelayQueue强窖,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了削祈。
- 定時任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間翅溺,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如TimerQueue就是使用DelayQueue實現(xiàn)的髓抑。
a.如何實現(xiàn)Delayed接口
參考java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask類的實現(xiàn)咙崎,一共有三步:
-
在對象創(chuàng)建的時候,初始化基本數(shù)據(jù)吨拍。使用time記錄當(dāng)前對象延遲到什么時候可以使用褪猛,使用sequenceNumber來標(biāo)識元素在隊列中的先后順序伊滋。
private static final AtomicLong sequencer = new AtomicLong(0); ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }
?
-
實現(xiàn)getDelay方法笑旺,該方法返回當(dāng)前元素還需要延時多長時間,單位是納秒物舒。當(dāng)time小于當(dāng)前時間時冠胯,getDelay會返回負(fù)數(shù)荠察。
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); }
-
實現(xiàn)compareTo方法來指定元素的順序悉盆。例如,讓延時時間最長的放在隊列的末尾脚翘。
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
b.如何實現(xiàn)延時阻塞隊列
當(dāng)消費者從隊列里獲取元素時来农,如果元素沒有達(dá)到延時時間涩咖,就阻塞當(dāng)前線程檩互。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0) //到時間了
return q.poll();
else if (leader != null) //leader是等待獲取隊列頭部元素的線程不為空,表示已經(jīng)有線程在等待獲取隊列的頭元素。
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;//將當(dāng)前線程設(shè)置成leader
try {
available.awaitNanos(delay);//等待delay時間
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
5)SynchronousQueue
一個不存儲元素的阻塞隊列村刨。每一個put操作必須等待一個take操作嵌牺,否則不能繼續(xù)添加元素逆粹。默認(rèn)采用非公平性策略訪問隊列僻弹。
public SynchronousQueue(boolean fair) {//true則等待的線程會采用先進(jìn)先出的順序訪問隊列
transferer = fair ? new TransferQueue() : new TransferStack();
}
SynchronousQueue負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程。隊列本身不存儲元素卸耘,適合傳遞性場景蚣抗。吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue翰铡。
6)LinkedTransferQueue
一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列网梢。相對于其他阻塞隊列战虏,多了tryTransfer和transfer方法烦感。
a.transfer方法
- 當(dāng)前有消費者正在等待接收元素(消費者使用take方法或帶時間限制的poll方法時)手趣,transfer方法可以把生產(chǎn)者傳入的元素liketransfer(傳輸)給消費者。
- 沒有消費者在等待接收元素中符,transfer方法會將元素采暖費在隊列的tail節(jié)點淀散,并等到該元素被消費者消費了才返回。
關(guān)鍵代碼如下:
Node pred = tryAppend(s, haveData);//試圖把存放當(dāng)前元素的s節(jié)點作為tail節(jié)點亚再。
return awaitMatch(s, pred, e, (how == TIMED), nanos);//讓CPU自旋等待消費者消費元素饲鄙。因為自旋會消耗CPU,所以自旋一定次數(shù)后使用Thread.yield()方法來暫停當(dāng)前正在執(zhí)行的線程轴咱,并執(zhí)行其他線程窖剑。
b.tryTransfer方法
用來試探生產(chǎn)者傳入的元素是否能直接傳遞給消費者西土。如果沒有消費者等待接收元素,則返回false肋乍。
和transfer方法的區(qū)別是無論消費者是否接收,方法立即返回锚烦。
tryTransfer(E e, long timeout, TimeUnit unit)谱煤,試圖把生產(chǎn)者傳入的元素直接傳遞給消費者。但如果沒有消費者消費該元素睹栖,則等待指定的時間再返回,如果超時還沒有消費元素曼氛,返回false舀患,超時時間內(nèi)消費了元素聊浅,返回true。
7)LinkedBlockingDeque
一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列顽冶∑劭梗可以從隊列的兩端插入和移出元素,多線程同時入隊時强重,減少了一半訂單競爭绞呈。相比其他阻塞隊列,多了addFirst竿屹、addLast报强、offerFirst、offerLast拱燃、peekFirst召嘶、peekLast等方法。
以first結(jié)尾的方法非竿,表示插入韧骗、獲取(peek)或移除雙端隊列的第一個元素。以last結(jié)尾的方法表示插入芦缰、獲扔伞(peek)或移除雙端隊列的最后一個元素栖疑。
另外锻霎,插入方法add等同于addLast昂勒,remove等效于removeFirst刁岸。
初始化時可以設(shè)置容量防止其過度膨脹合蔽。另外牲证,雙向阻塞隊列可以運用在“工作竊取”模式中包颁。
③阻塞隊列的實現(xiàn)原理
1)使用通知模式實現(xiàn)漠其。
當(dāng)生產(chǎn)者往滿的隊列里添加元素時會阻塞住生產(chǎn)者,當(dāng)消費者消費了一個隊列中的元素后萄涯,會通知生產(chǎn)者當(dāng)前隊列可用枣察。
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
4.Fork/Join框架
①什么是Fork/Join框架
Fork/Join框架是Java 7 提供的一個用于并行執(zhí)行任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù)蕴掏,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架即供。
②工作竊取算法
工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行于微。
每個線程一個隊列逗嫡,當(dāng)前線程將任務(wù)執(zhí)行完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理株依。干完活的線程與其等著驱证,不如去幫其他線程干活。為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭恋腕,通常會使用雙端隊列抹锄,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行吗坚。
工作竊取算法的優(yōu)點:充分利用線程進(jìn)行并行計算祈远,減少了線程間的競爭。
工作竊取算法的缺點:在某些情況下還是存在競爭商源,比如雙端隊列里只有一個任務(wù)時车份。并且該算法會消耗更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列牡彻。
③Fork/Join框架的設(shè)計
- 分割任務(wù)扫沼。需要有一個fork類來把大任務(wù)分割成子任務(wù),有可能子任務(wù)還是很大庄吼,所以還需要不停地分割缎除,直到分割出的子任務(wù)足夠小。
- 執(zhí)行任務(wù)并合并結(jié)果总寻。分割的子任務(wù)分別放在雙端隊列里器罐,然后幾個啟動線程分別從雙端隊列里獲取任務(wù)執(zhí)行。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊列里渐行,啟動一個線程從隊列里拿數(shù)據(jù)轰坊,然后合并這些數(shù)據(jù)铸董。
Fork/Join框架使用兩個類來完成以上兩件事情:
1)ForkJoinTask:使用Fork/Join框架,先創(chuàng)建一個ForkJoin任務(wù)肴沫。它提供在任務(wù)中執(zhí)行fork和join操作的機(jī)制粟害。一般我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類颤芬,F(xiàn)ork/Join框架提供了以下子類:
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)悲幅。
- RecursiveTask:用于有返回結(jié)果的任務(wù)。
2)ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執(zhí)行站蝠。
任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護(hù)的雙端隊列中汰具,進(jìn)入隊列的頭部。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時沉衣,它會隨機(jī)從其他工作線程的隊列的尾部獲取一個任務(wù)郁副。
④使用Fork/Join框架
示例(計算1+2+3+4):
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;//閾值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任務(wù)足夠小就計算任務(wù)
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//如果任務(wù)大于閾值,就分裂長兩個子任務(wù)計算
int middle = (start + end) /2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執(zhí)行子任務(wù)
leftTask.fork();//執(zhí)行fork方法時豌习,又會進(jìn)入compute方法
rightTask.fork();
//等待子任務(wù)執(zhí)行完存谎,并得到其結(jié)果
Integer leftResult = leftTask.join();//join方法會等待子任務(wù)執(zhí)行完并得到其結(jié)果
Integer rightResult = rightTask.join();
//合并子任務(wù)
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一個計算任務(wù),負(fù)責(zé)計算
CountTask task = new CountTask(1,4);
//執(zhí)行一個任務(wù)
ForkJoinTask<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
⑤Fork/Join框架的異常處理
ForkJoinTask在執(zhí)行的時候可能會拋出異常肥隆,但我們在主線程里沒辦法直接捕獲異常既荚,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被取消了栋艳,并且可以通過ForkJoinTask的getException()方法獲取異常恰聘。
getException()返回Throwable對象,如果任務(wù)被取消了則返回CancellationException吸占。如果任務(wù)沒有完成或者沒有拋出異常則返回null晴叨。
if (result.isCompletedAbnormally()) {
System.out.println(result.getException());
}
⑥Fork/Join框架的實現(xiàn)原理(1.7)
ForkJoinPool由ForkJoinTask數(shù)組和ForkJoinWorkerThread數(shù)組組成,F(xiàn)orkJoinTask數(shù)組負(fù)責(zé)將存放程序提交給ForkJoinPool的任務(wù)矾屯,而ForkJoinWorkerThread數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)兼蕊。
1)ForkJoinTask的fork方法實現(xiàn)原理
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);//將當(dāng)前任務(wù)存放在ForkJoinTask數(shù)組隊列里。
return this;
}
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork();//喚醒或創(chuàng)建一個工作線程來執(zhí)行任務(wù)件蚕。
else if (s == m)
growQueue();
}
}
2)ForkJoinTask的join方法實現(xiàn)原理
join方法主要作用是阻塞當(dāng)前線程并等待獲取結(jié)果孙技。
public final V join() {
//通過doJoin方法得到當(dāng)前任務(wù)的狀態(tài)
//任務(wù)狀態(tài)有4種:已完成(NORMAL)、被取消(CANCELLED)排作、信號(SIGNAL)牵啦、出現(xiàn)異常(EXCEPTIONAL)
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult(); //任務(wù)狀態(tài)是已完成,直接返回任務(wù)結(jié)果
}
private V reportResult() {
int s; Throwable ex;
if ((s = status) == CANCELLED) //任務(wù)狀態(tài)是被取消妄痪,拋出CancellationException
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //任務(wù)狀態(tài)是拋出異常哈雏,則直接拋出對應(yīng)的異常
UNSAFE.throwException(ex);
return getRawResult();
}
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;//任務(wù)執(zhí)行完成,直接返回任務(wù)狀態(tài)
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {//從任務(wù)數(shù)組里取出任務(wù)
try {
completed = exec();//執(zhí)行任務(wù)
} catch (Throwable rex) {
return setExceptionalCompletion(rex);//執(zhí)行出現(xiàn)異常,記錄異常裳瘪,并將任務(wù)狀態(tài)設(shè)置為EXCEPTIONAL
}
if (completed)
return setCompletion(NORMAL);//任務(wù)順利執(zhí)行完成履因,設(shè)置任務(wù)狀態(tài)為NORMAL
}
return w.joinTask(this);
}
else
return externalAwaitDone();
}