此篇博客所有源碼均來自JDK 1.8
前面三篇博客分別介紹了CyclicBarrier蒜埋、CountDownLatch、Semaphore啦吧,現(xiàn)在介紹并發(fā)工具類中的最后一個Exchange绿店。Exchange是最簡單的也是最復雜的,簡單在于API非常簡單铸史,就一個構造方法和兩個exchange()方法鼻疮,最復雜在于它的實現(xiàn)是最復雜的(反正我是看暈了的)。
在API是這么介紹的:可以在對中對元素進行配對和交換的線程的同步點琳轿。每個線程將條目上的某個方法呈現(xiàn)給 exchange 方法判沟,與伙伴線程進行匹配,并且在返回時接收其伙伴的對象崭篡。Exchanger 可能被視為 SynchronousQueue 的雙向形式挪哄。Exchanger 可能在應用程序(比如遺傳算法和管道設計)中很有用。
Exchanger琉闪,它允許在并發(fā)任務之間交換數(shù)據(jù)迹炼。具體來說,Exchanger類允許在兩個線程之間定義同步點颠毙。當兩個線程都到達同步點時斯入,他們交換數(shù)據(jù)結構,因此第一個線程的數(shù)據(jù)結構進入到第二個線程中蛀蜜,第二個線程的數(shù)據(jù)結構進入到第一個線程中刻两。
應用示例
Exchange實現(xiàn)較為復雜,我們先看其怎么使用滴某,然后再來分析其源碼“跄。現(xiàn)在我們用Exchange來模擬生產(chǎn)-消費者問題:
public class ExchangerTest {
static class Producer implements Runnable{
//生產(chǎn)者、消費者交換的數(shù)據(jù)結構
private List<String> buffer;
//步生產(chǎn)者和消費者的交換對象
private Exchanger<List<String>> exchanger;
Producer(List<String> buffer,Exchanger<List<String>> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i = 1 ; i < 5 ; i++){
System.out.println("生產(chǎn)者第" + i + "次提供");
for(int j = 1 ; j <= 3 ; j++){
System.out.println("生產(chǎn)者裝入" + i + "--" + j);
buffer.add("buffer:" + i + "--" + j);
}
System.out.println("生產(chǎn)者裝滿霎奢,等待與消費者交換...");
try {
exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i = 1; i < 5; i++) {
//調(diào)用exchange()與消費者進行數(shù)據(jù)交換
try {
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者第" + i + "次提取");
for (int j = 1; j <= 3 ; j++) {
System.out.println("消費者 : " + buffer.get(0));
buffer.remove(0);
}
}
}
}
public static void main(String[] args){
List<String> buffer1 = new ArrayList<String>();
List<String> buffer2 = new ArrayList<String>();
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
Thread producerThread = new Thread(new Producer(buffer1,exchanger));
Thread consumerThread = new Thread(new Consumer(buffer2,exchanger));
producerThread.start();
consumerThread.start();
}
}
運行結果:
首先生產(chǎn)者Producer户誓、消費者Consumer首先都創(chuàng)建一個緩沖列表,通過Exchanger來同步交換數(shù)據(jù)幕侠。消費中通過調(diào)用Exchanger與生產(chǎn)者進行同步來獲取數(shù)據(jù)帝美,而生產(chǎn)者則通過for循環(huán)向緩存隊列存儲數(shù)據(jù)并使用exchanger對象消費者同步。到消費者從exchanger哪里得到數(shù)據(jù)后橙依,他的緩沖列表中有3個數(shù)據(jù)证舟,而生產(chǎn)者得到的則是一個空的列表硕旗。上面的例子充分展示了消費者-生產(chǎn)者是如何利用Exchanger來完成數(shù)據(jù)交換的。
在Exchanger中女责,如果一個線程已經(jīng)到達了exchanger節(jié)點時漆枚,對于它的伙伴節(jié)點的情況有三種:
- 如果它的伙伴節(jié)點在該線程到達之前已經(jīng)調(diào)用了exchanger方法,則它會喚醒它的伙伴然后進行數(shù)據(jù)交換抵知,得到各自數(shù)據(jù)返回墙基。
- 如果它的伙伴節(jié)點還沒有到達交換點,則該線程將會被掛起刷喜,等待它的伙伴節(jié)點到達被喚醒残制,完成數(shù)據(jù)交換。
- 如果當前線程被中斷了則拋出異常掖疮,或者等待超時了初茶,則拋出超時異常。
實現(xiàn)分析
Exchanger算法的核心是通過一個可交換數(shù)據(jù)的slot浊闪,以及一個可以帶有數(shù)據(jù)item的參與者恼布。源碼中的描述如下:
for (;;) {
if (slot is empty) { // offer
place item in a Node;
if (can CAS slot from empty to node) {
wait for release;
return matching item in node;
}
}
else if (can CAS slot from node to empty) { // release
get the item in node;
set matching item in node;
release waiting thread;
}
// else retry on CAS failure
}
Exchanger中定義了如下幾個重要的成員變量:
private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;
participant的作用是為每個線程保留唯一的一個Node節(jié)點。
slot為單個槽搁宾,arena為數(shù)組槽折汞。他們都是Node類型。在這里可能會感覺到疑惑盖腿,slot作為Exchanger交換數(shù)據(jù)的場景爽待,應該只需要一個就可以了啊翩腐?為何還多了一個Participant 和數(shù)組類型的arena呢鸟款?一個slot交換場所原則上來說應該是可以的,但實際情況卻不是如此栗菜,多個參與者使用同一個交換場所時欠雌,會存在嚴重伸縮性問題。既然單個交換場所存在問題疙筹,那么我們就安排多個,也就是數(shù)組arena禁炒。通過數(shù)組arena來安排不同的線程使用不同的slot來降低競爭問題而咆,并且可以保證最終一定會成對交換數(shù)據(jù)。但是Exchanger不是一來就會生成arena數(shù)組來降低競爭幕袱,只有當產(chǎn)生競爭是才會生成arena數(shù)組暴备。那么怎么將Node與當前線程綁定呢?Participant 们豌,Participant 的作用就是為每個線程保留唯一的一個Node節(jié)點涯捻,它繼承ThreadLocal浅妆,同時在Node節(jié)點中記錄在arena中的下標index。
Node定義如下:
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
- index:arena的下標障癌;
- bound:上一次記錄的Exchanger.bound凌外;
- collides:在當前bound下CAS失敗的次數(shù);
- hash:偽隨機數(shù)涛浙,用于自旋康辑;
- item:這個線程的當前項,也就是需要交換的數(shù)據(jù)轿亮;
- match:做releasing操作的線程傳遞的項疮薇;
- parked:掛起時設置線程值,其他情況下為null我注;
在Node定義中有兩個變量值得思考:bound以及collides按咒。前面提到了數(shù)組area是為了避免競爭而產(chǎn)生的,如果系統(tǒng)不存在競爭問題但骨,那么完全沒有必要開辟一個高效的arena來徒增系統(tǒng)的復雜性胖齐。首先通過單個slot的exchanger來交換數(shù)據(jù),當探測到競爭時將安排不同的位置的slot來保存線程Node嗽冒,并且可以確保沒有slot會在同一個緩存行上呀伙。如何來判斷會有競爭呢?CAS替換slot失敗添坊,如果失敗剿另,則通過記錄沖突次數(shù)來擴展arena的尺寸,我們在記錄沖突的過程中會跟蹤“bound”的值贬蛙,以及會重新計算沖突次數(shù)在bound的值被改變時雨女。這里闡述可能有點兒模糊,不著急阳准,我們先有這個概念氛堕,后面在arenaExchange中再次做詳細闡述。
我們直接看exchange()方法
exchange(V x)
exchange(V x):等待另一個線程到達此交換點(除非當前線程被中斷)野蝇,然后將給定的對象傳送給該線程讼稚,并接收該線程的對象。方法定義如下:
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
這個方法比較好理解:arena為數(shù)組槽绕沈,如果為null锐想,則執(zhí)行slotExchange()方法,否則判斷線程是否中斷乍狐,如果中斷值拋出InterruptedException異常赠摇,沒有中斷則執(zhí)行arenaExchange()方法。整套邏輯就是:如果slotExchange(Object item, boolean timed, long ns)方法執(zhí)行失敗了就執(zhí)行arenaExchange(Object item, boolean timed, long ns)方法,最后返回結果V藕帜。
NULL_ITEM 為一個空節(jié)點烫罩,其實就是一個Object對象而已,slotExchange()為單個slot交換洽故。
slotExchange(Object item, boolean timed, long ns)
private final Object slotExchange(Object item, boolean timed, long ns) {
// 獲取當前線程的節(jié)點 p
Node p = participant.get();
// 當前線程
Thread t = Thread.currentThread();
// 線程中斷贝攒,直接返回
if (t.isInterrupted())
return null;
// 自旋
for (Node q;;) {
//slot != null
if ((q = slot) != null) {
//嘗試CAS替換
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item; // 當前線程的項,也就是交換的數(shù)據(jù)
q.match = item; // 做releasing操作的線程傳遞的項
Thread w = q.parked; // 掛起時設置線程值
// 掛起線程不為null收津,線程掛起
if (w != null)
U.unpark(w);
return v;
}
//如果失敗了饿这,則創(chuàng)建arena
//bound 則是上次Exchanger.bound
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
//如果arena != null,直接返回撞秋,進入arenaExchange邏輯處理
else if (arena != null)
return null;
else {
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
/*
* 等待 release
* 進入spin+block模式
*/
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
return v;
}
程序首先通過participant獲取當前線程節(jié)點Node长捧。檢測是否中斷,如果中斷return null吻贿,等待后續(xù)拋出InterruptedException異常串结。
如果slot不為null,則進行slot消除舅列,成功直接返回數(shù)據(jù)V肌割,否則失敗,則創(chuàng)建arena消除數(shù)組帐要。
如果slot為null把敞,但arena不為null,則返回null榨惠,進入arenaExchange邏輯奋早。
如果slot為null,且arena也為null赠橙,則嘗試占領該slot耽装,失敗重試,成功則跳出循環(huán)進入spin+block(自旋+阻塞)模式期揪。
在自旋+阻塞模式中掉奄,首先取得結束時間和自旋次數(shù)。如果match(做releasing操作的線程傳遞的項)為null凤薛,其首先嘗試spins+隨機次自旋(改自旋使用當前節(jié)點中的hash姓建,并改變之)和退讓。當自旋數(shù)為0后枉侧,假如slot發(fā)生了改變(slot != p)則重置自旋數(shù)并重試引瀑。否則假如:當前未中斷&arena為null&(當前不是限時版本或者限時版本+當前時間未結束):阻塞或者限時阻塞。假如:當前中斷或者arena不為null或者當前為限時版本+時間已經(jīng)結束:不限時版本:置v為null榨馁;限時版本:如果時間結束以及未中斷則TIMED_OUT;否則給出null(原因是探測到arena非空或者當前線程中斷)帜矾。
match不為空時跳出循環(huán)翼虫。
整個slotExchange清晰明了屑柔。
arenaExchange(Object item, boolean timed, long ns)
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
Node p = participant.get();
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
}
else if (i <= (m = (b = bound) & MMASK) && q == null) {
p.item = item; // offer
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
}
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}
首先通過participant取得當前節(jié)點Node,然后根據(jù)當前節(jié)點Node的index去取arena中相對應的節(jié)點node珍剑。前面提到過arena可以確保不同的slot在arena中是不會相沖突的掸宛,那么是怎么保證的呢?我們先看arena的創(chuàng)建:
arena = new Node[(FULL + 2) << ASHIFT];
這個arena到底有多大呢招拙?我們先看FULL 和ASHIFT的定義:
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;
private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff; // 255
假如我的機器NCPU = 8 唧瘾,則得到的是768大小的arena數(shù)組。然后通過以下代碼取得在arena中的節(jié)點:
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
他仍然是通過右移ASHIFT位來取得Node的别凤,ABASE定義如下:
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
U.arrayBaseOffset獲取對象頭長度饰序,數(shù)組元素的大小可以通過unsafe.arrayIndexScale(T[].class) 方法獲取到。這也就是說要訪問類型為T的第N個元素的話规哪,你的偏移量offset應該是arrayOffset+N*arrayScale求豫。也就是說BASE = arrayOffset+ 128 。其次我們再看Node節(jié)點的定義
@sun.misc.Contended static final class Node{
....
}
在Java 8 中我們是可以利用sun.misc.Contended來規(guī)避偽共享的诉稍。所以說通過 << ASHIFT方式加上sun.misc.Contended蝠嘉,所以使得任意兩個可用Node不會再同一個緩存行中。
關于偽共享請參考如下博文:
偽共享(False Sharing)](http://ifeve.com/falsesharing/))
Java8中用sun.misc.Contended避免偽共享(false sharing)](http://blog.csdn.net/aigoogle/article/details/41518369))
我們再次回到arenaExchange()杯巨。取得arena中的node節(jié)點后蚤告,如果定位的節(jié)點q 不為空顽分,且CAS操作成功染突,則交換數(shù)據(jù)怕犁,返回交換的數(shù)據(jù)朋腋,喚醒等待的線程丰滑。
如果q等于null且下標在bound & MMASK范圍之內(nèi)瓜饥,則嘗試占領該位置诊笤,如果成功嫡锌,則采用自旋 + 阻塞的方式進行等待交換數(shù)據(jù)镜会。
如果下標不在bound & MMASK范圍之內(nèi)獲取由于q不為null但是競爭失敗的時候:消除p檬寂。加入bound 不等于當前節(jié)點的bond(b != p.bound),則更新p.bound = b戳表,collides = 0 桶至,i = m或者m - 1。如果沖突的次數(shù)不到m 獲取m 已經(jīng)為最大值或者修改當前bound的值失敗匾旭,則通過增加一次collides以及循環(huán)遞減下標i的值镣屹;否則更新當前bound的值成功:我們令i為m+1即為此時最大的下標。最后更新當前index的值价涝。
Exchanger使用女蜈、原理都比較好理解,但是這個源碼看起來真心有點兒復雜,
是真心難看懂伪窖,但是這種交換的思路Doug Lea在后續(xù)博文中還會提到逸寓,例如SynchronousQueue、LinkedTransferQueue覆山。
最后用一個在網(wǎng)上看到的段子結束此篇博客(http://brokendreams.iteye.com/blog/2253956)竹伸,博主對其做了一點點修改,以便更加符合在1.8環(huán)境下的Exchanger:
其實就是"我"和"你"(可能有多個"我"簇宽,多個"你")在一個叫Slot的地方做交易(一手交錢勋篓,一手交貨),過程分以下步驟:
- 我先到一個叫做Slot的交易場所交易魏割,發(fā)現(xiàn)你已經(jīng)到了譬嚣,那我就嘗試喊你交易,如果你回應了我见妒,決定和我交易那么進入第2步孤荣;如果別人搶先一步把你喊走了,那我就進入第5步须揣。
- 我拿出錢交給你盐股,你可能會接收我的錢,然后把貨給我耻卡,交易結束疯汁;也可能嫌我掏錢太慢(超時)或者接個電話(中斷),TM的不賣了卵酪,走了幌蚊,那我只能再找別人買貨了(從頭開始)。
- 我到交易地點的時候溃卡,你不在溢豆,那我先嘗試把這個交易點給占了(一屁股做凳子上...),如果我成功搶占了單間(交易點)瘸羡,那就坐這兒等著你拿貨來交易漩仙,進入第4步;如果被別人搶座了犹赖,那我只能在找別的地方兒了队他,進入第5步。
- 你拿著貨來了峻村,喊我交易麸折,然后完成交易;也可能我等了好長時間你都沒來粘昨,我不等了垢啼,繼續(xù)找別人交易去窜锯,走的時候我看了一眼,一共沒多少人膊夹,弄了這么多單間(交易地點Slot)衬浑,太TM浪費了捌浩,我喊來交易地點管理員:一共也沒幾個人放刨,搞這么多單間兒干毛,給哥撤一個尸饺!进统。然后再找別人買貨(從頭開始);或者我老大給我打了個電話浪听,不讓我買貨了(中斷)螟碎。
- 我跑去喊管理員,尼瑪迹栓,就一個坑交易個毛啊掉分,然后管理在一個更加開闊的地方開辟了好多個單間,然后我就挨個來看每個單間是否有人克伊。如果有人我就問他是否可以交易酥郭,如果回應了我,那我就進入第2步愿吹。如果我沒有人不从,那我就占著這個單間等其他人來交易,進入第4步犁跪。
6.如果我嘗試了幾次都沒有成功椿息,我就會認為,是不是我TM選的這個單間風水不好坷衍?不行寝优,得換個地兒繼續(xù)(從頭開始);如果我嘗試了多次發(fā)現(xiàn)還沒有成功枫耳,怒了乏矾,把管理員喊來:給哥再開一個單間(Slot),加一個凳子嘉涌,這么多人就這么幾個破凳子夠誰用妻熊!