實(shí)現(xiàn)原理
Exchanger(交換者)是用于線程協(xié)作的工具類。Exchanger用于進(jìn)行兩個線程之間的數(shù)據(jù)交換改备。它提供一個同步點(diǎn)唬复,在這個同步點(diǎn)矗积,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange()方法交換數(shù)據(jù)敞咧,當(dāng)一個線程先執(zhí)行exchange()方法后棘捣,它會一直等待第二個線程也執(zhí)行exchange()方法,當(dāng)這兩個線程到達(dá)同步點(diǎn)時休建,這兩個線程就可以交換數(shù)據(jù)了乍恐。
Exchanger的算法核心是通過一個可以交換數(shù)據(jù)的slot和一個可以帶有數(shù)據(jù)item的參與者,在源碼中的定義如下:
for (;;) {
if (slot is empty) { // offer
// slot為空時测砂,將item 設(shè)置到Node 中
place item in a Node;
if (can CAS slot from empty to node) {
// 當(dāng)將node通過CAS交換到slot中時茵烈,掛起線程等待被喚醒
wait for release;
// 被喚醒后返回node中匹配到的item
return matching item in node;
}
} else if (can CAS slot from node to empty) { // release
// 將slot設(shè)置為空
// 獲取node中的item,將需要交換的數(shù)據(jù)設(shè)置到匹配的item
get the item in node;
set matching item in node;
// 喚醒等待的線程
release waiting thread;
}
// else retry on CAS failure
}
比如有2條線程A和B砌些,A線程交換數(shù)據(jù)時呜投,發(fā)現(xiàn)slot為空,則將需要交換的數(shù)據(jù)放在slot中等待其它線程進(jìn)來交換數(shù)據(jù)存璃,等線程B進(jìn)來宙彪,讀取A設(shè)置的數(shù)據(jù),然后設(shè)置線程B需要交換的數(shù)據(jù)有巧,然后喚醒A線程释漆,原理就是這么簡單。但是當(dāng)多個線程之間進(jìn)行交換數(shù)據(jù)時就會出現(xiàn)問題篮迎,所以Exchanger加入了slot數(shù)組男图。
Exchanger中定義了幾個重要的成員變量,它們分別是:
private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;
participant的作用是為每個線程保留唯一的一個Node節(jié)點(diǎn)。slot為單個槽甜橱,arena為數(shù)組槽逊笆。他們都是Node類型。這里arena存在的意義是當(dāng)有多個參與者使用同一個交換場所時岂傲,會存在嚴(yán)重伸縮性問題难裆。既然單個交換場所存在問題,那么我們就安排多個,也就是數(shù)組arena乃戈。通過數(shù)組arena來安排不同的線程使用不同的slot來降低競爭問題褂痰,并且可以保證最終一定會成對交換數(shù)據(jù)。但是Exchanger不是一來就會生成arena數(shù)組來降低競爭症虑,只有當(dāng)產(chǎn)生競爭時才會生成arena數(shù)組缩歪。那么怎么將Node與當(dāng)前線程綁定呢?Participant 谍憔,Participant 的作用就是為每個線程保留唯一的一個Node節(jié)點(diǎn)匪蝙,它繼承ThreadLocal,同時在Node節(jié)點(diǎn)中記錄在arena中的下標(biāo)index
Node的數(shù)據(jù)結(jié)構(gòu)如下:
@sun.misc.Contended static final class Node {
// arena的下標(biāo)习贫,多個槽位的時候利用
int index;
// 上一次記錄的Exchanger.bound
int bound;
// 在當(dāng)前bound下CAS失敗的次數(shù)逛球;
int collides;
// 用于自旋;
int hash;
// 這個線程的當(dāng)前項(xiàng)苫昌,也就是需要交換的數(shù)據(jù)需忿;
Object item;
//做releasing操作的線程傳遞的項(xiàng);
volatile Object match;
//掛起時設(shè)置線程值蜡歹,其他情況下為null屋厘;
volatile Thread parked;
}
Exchanger的核心方法為exchange(V x)
,下面我們就來分析下exchange(V x)
方法月而。
exchange(V x)方法
exchange(V x):等待另一個線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷)汗洒,然后將給定的對象傳送給該線程,并接收該線程的對象父款。方法定義如下:
public V exchange(V x) throws InterruptedException {
Object v;
// 當(dāng)參數(shù)為null時需要將item設(shè)置為空的對象
Object item = (x == null) ? NULL_ITEM : x; // translate null args
// 注意到這里的這個表達(dá)式是整個方法的核心
if ((arena != null ||
(v = slotExchange(item, false, 0 L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0 L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V) v;
}
仔細(xì)分析上述方法中的if語句溢谤,可以得知:
- 只有當(dāng)arena 為空時,才執(zhí)行slotExchange(item, false, 0 L)方法憨攒。
- 當(dāng)arena不為空時世杀,或者(arena為null且slotExchange方法返回null)時,此時線程未中斷肝集,才會執(zhí)行arenaExchange方法;
- 線程中斷時瞻坝,就直接拋出線程中斷異常。
下面我們再看看slotExchange方法杏瞻,其定義如下:
private final Object slotExchange(Object item, boolean timed, long ns) {
// 獲取當(dāng)前線程node對象
Node p = participant.get();
// 當(dāng)前線程
Thread t = Thread.currentThread();
// 若果線程被中斷所刀,就直接返回null
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
// 自旋
for (Node q;;) {
// 將slot值賦給q
if ((q = slot) != null) {
// slot 不為null,即表示已有線程已經(jīng)把需要交換的數(shù)據(jù)設(shè)置在slot中了
// 通過CAS將slot設(shè)置成null
if (U.compareAndSwapObject(this, SLOT, q, null)) {
// CAS操作成功后捞挥,將slot中的item賦值給對象v浮创,以便返回。
// 這里也是就讀取之前線程要交換的數(shù)據(jù)
Object v = q.item;
// 將當(dāng)前線程需要交給的數(shù)據(jù)設(shè)置在q中的match
q.match = item;
// 獲取被掛起的線程
Thread w = q.parked;
if (w != null)
// 如果線程不為null砌函,喚醒它
U.unpark(w);
// 返回其他線程給的V
return v;
}
// create arena on contention, but continue until slot null
// CAS 操作失敗斩披,表示有其它線程競爭溜族,在此線程之前將數(shù)據(jù)已取走
// NCPU:CPU的核數(shù)
// bound == 0 表示arena數(shù)組未初始化過,CAS操作bound將其增加SEQ
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
// 初始化arena數(shù)組
arena = new Node[(FULL + 2) << ASHIFT];
}
// 上面分析過垦沉,只有當(dāng)arena不為空才會執(zhí)行slotExchange方法的
// 所以表示剛好已有其它線程加入進(jìn)來將arena初始化
else if (arena != null)
// 這里就需要去執(zhí)行arenaExchange
return null; // caller must reroute to arenaExchange
else {
// 這里表示當(dāng)前線程是以第一個線程進(jìn)來交換數(shù)據(jù)
// 或者表示之前的數(shù)據(jù)交換已進(jìn)行完畢煌抒,這里可以看作是第一個線程
// 將需要交換的數(shù)據(jù)先存放在當(dāng)前線程變量p中
p.item = item;
// 將需要交換的數(shù)據(jù)通過CAS設(shè)置到交換區(qū)slot
if (U.compareAndSwapObject(this, SLOT, null, p))
// 交換成功后跳出自旋
break;
// CAS操作失敗,表示有其它線程剛好先于當(dāng)前線程將數(shù)據(jù)設(shè)置到交換區(qū)slot
// 將當(dāng)前線程變量中的item設(shè)置為null乡话,然后自旋獲取其它線程存放在交換區(qū)slot的數(shù)據(jù)
p.item = null;
}
}
// await release
// 執(zhí)行到這里表示當(dāng)前線程已將需要的交換的數(shù)據(jù)放置于交換區(qū)slot中了摧玫,
// 等待其它線程交換數(shù)據(jù)然后喚醒當(dāng)前線程
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0 L;
// 自旋次數(shù)
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
// 自旋等待直到p.match不為null耳奕,也就是說等待其它線程將需要交換的數(shù)據(jù)放置于交換區(qū)slot
while ((v = p.match) == null) {
// 下面的邏輯主要是自旋等待绑青,直到spins遞減到0為止
if (spins > 0) {
h ^= h << 1;
h ^= h >>> 3;
h ^= h << 10;
if (h == 0)
h = SPINS | (int) t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
} else if (slot != p)
spins = SPINS;
// 此處表示未設(shè)置超時或者時間未超時
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0 L)) {
// 設(shè)置線程t被當(dāng)前對象阻塞
U.putObject(t, BLOCKER, this);
// 給p掛機(jī)線程的值賦值
p.parked = t;
if (slot == p)
// 如果slot還沒有被置為null,也就表示暫未有線程過來交換數(shù)據(jù)屋群,需要將當(dāng)前線程掛起
U.park(false, ns);
// 線程被喚醒闸婴,將被掛起的線程設(shè)置為null
p.parked = null;
// 設(shè)置線程t未被任何對象阻塞
U.putObject(t, BLOCKER, null);
// 不是以上條件時(可能是arena已不為null或者超時)
} else if (U.compareAndSwapObject(this, SLOT, p, null)) {
// arena不為null則v為null,其它為超時則v為超市對象TIMED_OUT,并且跳出循環(huán)
v = timed && ns <= 0 L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
// 取走match值芍躏,并將p中的match置為null
U.putOrderedObject(p, MATCH, null);
// 設(shè)置item為null
p.item = null;
p.hash = h;
// 返回交換值
return v;
}
再來看arenaExchange方法邪乍,此方法被執(zhí)行時表示多個線程進(jìn)入交換區(qū)交換數(shù)據(jù),arena數(shù)組已被初始化对竣,此方法中的一些處理方式和slotExchange比較類似庇楞,它是通過遍歷arena數(shù)組找到需要交換的數(shù)據(jù)。arenaExchange方法源碼定義如下:
// timed 為true表示設(shè)置了超時時間否纬,ns為>0的值吕晌,反之沒有設(shè)置超時時間
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
// 獲取當(dāng)前線程中的存放的node
Node p = participant.get();
//index初始值0
for (int i = p.index;;) { // access slot at i
// 遍歷,如果在數(shù)組中找到數(shù)據(jù)則直接交換并喚醒線程临燃,如未找到則將需要交換給其它線程的數(shù)據(jù)放置于數(shù)組中
int b, m, c;
long j; // j is raw array offset
// 其實(shí)這里就是向右遍歷數(shù)組睛驳,只是用到了元素在內(nèi)存偏移的偏移量
// q實(shí)際為arena數(shù)組偏移(i + 1) * 128個地址位上的node
Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
// 如果q不為null,并且CAS操作成功膜廊,將下標(biāo)j的元素置為null
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
// 表示當(dāng)前線程已發(fā)現(xiàn)有交換的數(shù)據(jù)乏沸,然后獲取數(shù)據(jù),喚醒等待的線程
Object v = q.item; // release
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);
return v;
// q 為null 并且 i 未超過數(shù)組邊界
} else if (i <= (m = (b = bound) & MMASK) && q == null) {
// 將需要給其它線程的item賦予給p中的item
p.item = item; // offer
if (U.compareAndSwapObject(a, j, null, p)) {
// 交換成功
long end = (timed && m == 0) ? System.nanoTime() + ns : 0 L;
Thread t = Thread.currentThread(); // wait
// 自旋直到有其它線程進(jìn)入爪瓜,遍歷到該元素并與其交換蹬跃,同時當(dāng)前線程被喚醒
for (int h = p.hash, spins = SPINS;;) {
Object v = p.match;
if (v != null) {
// 其它線程設(shè)置的需要交換的數(shù)據(jù)match不為null
// 將match設(shè)置null,item設(shè)置為null
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
return v;
} else if (spins > 0) {
h ^= h << 1;
h ^= h >>> 3;
h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int) t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
} else if (U.getObjectVolatile(a, j) != p)
// 和slotExchange方法中的類似,arena數(shù)組中的數(shù)據(jù)已被CAS設(shè)置
// match值還未設(shè)置铆铆,讓其再自旋等待match被設(shè)置
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0 L)) {
// 設(shè)置線程t被當(dāng)前對象阻塞
U.putObject(t, BLOCKER, this); // emulate LockSupport
// 線程t賦值
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
// 數(shù)組中對象還相等炬转,表示線程還未被喚醒,喚醒線程
U.park(false, ns);
p.parked = null;
// 設(shè)置線程t未被任何對象阻塞
U.putObject(t, BLOCKER, null);
} else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
// 這里給bound增加加一個SEQ
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0 L)
return TIMED_OUT;
break; // expired; restart
}
}
} else
// 交換失敗算灸,表示有其它線程更改了arena數(shù)組中下標(biāo)i的元素
p.item = null; // clear offer
} else {
// 此時表示下標(biāo)不在bound & MMASK或q不為null但CAS操作失敗
// 需要更新bound變化后的值
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
// 反向遍歷
i = (i != m || m == 0) ? m : m - 1;
} else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
// 記錄CAS失敗的次數(shù)
p.collides = c + 1;
// 循環(huán)遍歷
i = (i == 0) ? m : i - 1; // cyclically traverse
} else
// 此時表示bound值增加了SEQ+1
i = m + 1; // grow
// 設(shè)置下標(biāo)
p.index = i;
}
}
}
????看完上面slotExchange
方法和arenaExchange
方法定義扼劈,我們可以看出Exchanger工具類的實(shí)現(xiàn)還是很復(fù)雜的,雖然Exchanger的使用比較簡單菲驴。
關(guān)注下面公眾號荐吵,回復(fù)
1024
領(lǐng)取最新大廠面試資料