JUC并發(fā)工具之Exchanger源碼解析

原文出處:https://www.zzwzdx.cn

實(shí)現(xiàn)原理

Exchanger(交換者)是用于線程協(xié)作的工具類。Exchanger用于進(jìn)行兩個線程之間的數(shù)據(jù)交換改备。它提供一個同步點(diǎn)唬复,在這個同步點(diǎn)矗积,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange()方法交換數(shù)據(jù)敞咧,當(dāng)一個線程先執(zhí)行exchange()方法后棘捣,它會一直等待第二個線程也執(zhí)行exchange()方法,當(dāng)這兩個線程到達(dá)同步點(diǎn)時休建,這兩個線程就可以交換數(shù)據(jù)了乍恐。

Exchanger的算法核心是通過一個可以交換數(shù)據(jù)的slot和一個可以帶有數(shù)據(jù)item的參與者,在源碼中的定義如下:

for (;;) {
    if (slot is empty) { // offer
        // slot為空時测砂,將item 設(shè)置到Node 中        
        place item in a Node;
        if (can CAS slot from empty to node) {
            // 當(dāng)將node通過CAS交換到slot中時茵烈,掛起線程等待被喚醒
            wait for release;
            // 被喚醒后返回node中匹配到的item
            return matching item in node;
        }
    } else if (can CAS slot from node to empty) { // release
         // 將slot設(shè)置為空
        // 獲取node中的item,將需要交換的數(shù)據(jù)設(shè)置到匹配的item
        get the item in node;
        set matching item in node;
        // 喚醒等待的線程
        release waiting thread;
    }
    // else retry on CAS failure
}

比如有2條線程A和B砌些,A線程交換數(shù)據(jù)時呜投,發(fā)現(xiàn)slot為空,則將需要交換的數(shù)據(jù)放在slot中等待其它線程進(jìn)來交換數(shù)據(jù)存璃,等線程B進(jìn)來宙彪,讀取A設(shè)置的數(shù)據(jù),然后設(shè)置線程B需要交換的數(shù)據(jù)有巧,然后喚醒A線程释漆,原理就是這么簡單。但是當(dāng)多個線程之間進(jìn)行交換數(shù)據(jù)時就會出現(xiàn)問題篮迎,所以Exchanger加入了slot數(shù)組男图。

Exchanger中定義了幾個重要的成員變量,它們分別是:

private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;

participant的作用是為每個線程保留唯一的一個Node節(jié)點(diǎn)。slot為單個槽甜橱,arena為數(shù)組槽逊笆。他們都是Node類型。這里arena存在的意義是當(dāng)有多個參與者使用同一個交換場所時岂傲,會存在嚴(yán)重伸縮性問題难裆。既然單個交換場所存在問題,那么我們就安排多個,也就是數(shù)組arena乃戈。通過數(shù)組arena來安排不同的線程使用不同的slot來降低競爭問題褂痰,并且可以保證最終一定會成對交換數(shù)據(jù)。但是Exchanger不是一來就會生成arena數(shù)組來降低競爭症虑,只有當(dāng)產(chǎn)生競爭時才會生成arena數(shù)組缩歪。那么怎么將Node與當(dāng)前線程綁定呢?Participant 谍憔,Participant 的作用就是為每個線程保留唯一的一個Node節(jié)點(diǎn)匪蝙,它繼承ThreadLocal,同時在Node節(jié)點(diǎn)中記錄在arena中的下標(biāo)index

Node的數(shù)據(jù)結(jié)構(gòu)如下:

@sun.misc.Contended static final class Node {
     // arena的下標(biāo)习贫,多個槽位的時候利用
    int index; 
    // 上一次記錄的Exchanger.bound
    int bound; 
    // 在當(dāng)前bound下CAS失敗的次數(shù)逛球;
    int collides;
    // 用于自旋;
    int hash; 
    // 這個線程的當(dāng)前項(xiàng)苫昌,也就是需要交換的數(shù)據(jù)需忿;
    Object item; 
    //做releasing操作的線程傳遞的項(xiàng);
    volatile Object match; 
    //掛起時設(shè)置線程值蜡歹,其他情況下為null屋厘;
    volatile Thread parked;
}

Exchanger的核心方法為exchange(V x),下面我們就來分析下exchange(V x)方法月而。

exchange(V x)方法

exchange(V x):等待另一個線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷)汗洒,然后將給定的對象傳送給該線程,并接收該線程的對象父款。方法定義如下:

public V exchange(V x) throws InterruptedException {
    Object v;
    // 當(dāng)參數(shù)為null時需要將item設(shè)置為空的對象
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    // 注意到這里的這個表達(dá)式是整個方法的核心
    if ((arena != null ||
            (v = slotExchange(item, false, 0 L)) == null) &&
        ((Thread.interrupted() || // disambiguates null return
            (v = arenaExchange(item, false, 0 L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V) v;
}

仔細(xì)分析上述方法中的if語句溢谤,可以得知:

  • 只有當(dāng)arena 為空時,才執(zhí)行slotExchange(item, false, 0 L)方法憨攒。
  • 當(dāng)arena不為空時世杀,或者(arena為null且slotExchange方法返回null)時,此時線程未中斷肝集,才會執(zhí)行arenaExchange方法;
  • 線程中斷時瞻坝,就直接拋出線程中斷異常。

下面我們再看看slotExchange方法杏瞻,其定義如下:

private final Object slotExchange(Object item, boolean timed, long ns) {
    // 獲取當(dāng)前線程node對象
    Node p = participant.get();
    // 當(dāng)前線程
    Thread t = Thread.currentThread();
    // 若果線程被中斷所刀,就直接返回null
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
    // 自旋
    for (Node q;;) {
        // 將slot值賦給q
        if ((q = slot) != null) {
             // slot 不為null,即表示已有線程已經(jīng)把需要交換的數(shù)據(jù)設(shè)置在slot中了
            // 通過CAS將slot設(shè)置成null
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                // CAS操作成功后捞挥,將slot中的item賦值給對象v浮创,以便返回。
                // 這里也是就讀取之前線程要交換的數(shù)據(jù)
                Object v = q.item;
                // 將當(dāng)前線程需要交給的數(shù)據(jù)設(shè)置在q中的match
                q.match = item;
                 // 獲取被掛起的線程
                Thread w = q.parked;
                if (w != null)
                    // 如果線程不為null砌函,喚醒它
                    U.unpark(w);
                // 返回其他線程給的V
                return v;
            }
            // create arena on contention, but continue until slot null
            // CAS 操作失敗斩披,表示有其它線程競爭溜族,在此線程之前將數(shù)據(jù)已取走
            // NCPU:CPU的核數(shù)
            // bound == 0 表示arena數(shù)組未初始化過,CAS操作bound將其增加SEQ
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                // 初始化arena數(shù)組
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        // 上面分析過垦沉,只有當(dāng)arena不為空才會執(zhí)行slotExchange方法的
        // 所以表示剛好已有其它線程加入進(jìn)來將arena初始化
        else if (arena != null)
            // 這里就需要去執(zhí)行arenaExchange
            return null; // caller must reroute to arenaExchange
        else {
            // 這里表示當(dāng)前線程是以第一個線程進(jìn)來交換數(shù)據(jù)
            // 或者表示之前的數(shù)據(jù)交換已進(jìn)行完畢煌抒,這里可以看作是第一個線程
            // 將需要交換的數(shù)據(jù)先存放在當(dāng)前線程變量p中
            p.item = item;
            // 將需要交換的數(shù)據(jù)通過CAS設(shè)置到交換區(qū)slot
            if (U.compareAndSwapObject(this, SLOT, null, p))
                // 交換成功后跳出自旋
                break;
            // CAS操作失敗,表示有其它線程剛好先于當(dāng)前線程將數(shù)據(jù)設(shè)置到交換區(qū)slot
            // 將當(dāng)前線程變量中的item設(shè)置為null乡话,然后自旋獲取其它線程存放在交換區(qū)slot的數(shù)據(jù)
            p.item = null;
        }
    }

    // await release
    // 執(zhí)行到這里表示當(dāng)前線程已將需要的交換的數(shù)據(jù)放置于交換區(qū)slot中了摧玫,
    // 等待其它線程交換數(shù)據(jù)然后喚醒當(dāng)前線程
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0 L;
    // 自旋次數(shù)
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // 自旋等待直到p.match不為null耳奕,也就是說等待其它線程將需要交換的數(shù)據(jù)放置于交換區(qū)slot
    while ((v = p.match) == null) {
        // 下面的邏輯主要是自旋等待绑青,直到spins遞減到0為止
        if (spins > 0) {
            h ^= h << 1;
            h ^= h >>> 3;
            h ^= h << 10;
            if (h == 0)
                h = SPINS | (int) t.getId();
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                Thread.yield();
        } else if (slot != p)
            spins = SPINS;
        // 此處表示未設(shè)置超時或者時間未超時
        else if (!t.isInterrupted() && arena == null &&
            (!timed || (ns = end - System.nanoTime()) > 0 L)) {
            // 設(shè)置線程t被當(dāng)前對象阻塞
            U.putObject(t, BLOCKER, this);
            // 給p掛機(jī)線程的值賦值
            p.parked = t;
            if (slot == p)
                // 如果slot還沒有被置為null,也就表示暫未有線程過來交換數(shù)據(jù)屋群,需要將當(dāng)前線程掛起
                U.park(false, ns);
            // 線程被喚醒闸婴,將被掛起的線程設(shè)置為null
            p.parked = null;
            // 設(shè)置線程t未被任何對象阻塞
            U.putObject(t, BLOCKER, null);
        // 不是以上條件時(可能是arena已不為null或者超時)    
        } else if (U.compareAndSwapObject(this, SLOT, p, null)) {
             // arena不為null則v為null,其它為超時則v為超市對象TIMED_OUT,并且跳出循環(huán)
            v = timed && ns <= 0 L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 取走match值芍躏,并將p中的match置為null
    U.putOrderedObject(p, MATCH, null);
    // 設(shè)置item為null
    p.item = null;
    p.hash = h;
    // 返回交換值
    return v;
}

再來看arenaExchange方法邪乍,此方法被執(zhí)行時表示多個線程進(jìn)入交換區(qū)交換數(shù)據(jù),arena數(shù)組已被初始化对竣,此方法中的一些處理方式和slotExchange比較類似庇楞,它是通過遍歷arena數(shù)組找到需要交換的數(shù)據(jù)。arenaExchange方法源碼定義如下:

// timed 為true表示設(shè)置了超時時間否纬,ns為>0的值吕晌,反之沒有設(shè)置超時時間
private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena;
    // 獲取當(dāng)前線程中的存放的node
    Node p = participant.get();
    //index初始值0
    for (int i = p.index;;) { // access slot at i
        // 遍歷,如果在數(shù)組中找到數(shù)據(jù)則直接交換并喚醒線程临燃,如未找到則將需要交換給其它線程的數(shù)據(jù)放置于數(shù)組中
        int b, m, c;
        long j; // j is raw array offset
        // 其實(shí)這里就是向右遍歷數(shù)組睛驳,只是用到了元素在內(nèi)存偏移的偏移量
        // q實(shí)際為arena數(shù)組偏移(i + 1) *  128個地址位上的node
        Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        // 如果q不為null,并且CAS操作成功膜廊,將下標(biāo)j的元素置為null
        if (q != null && U.compareAndSwapObject(a, j, q, null)) {
            // 表示當(dāng)前線程已發(fā)現(xiàn)有交換的數(shù)據(jù)乏沸,然后獲取數(shù)據(jù),喚醒等待的線程
            Object v = q.item; // release
            q.match = item;
            Thread w = q.parked;
            if (w != null)
                U.unpark(w);
            return v;
        // q 為null 并且 i 未超過數(shù)組邊界    
        } else if (i <= (m = (b = bound) & MMASK) && q == null) {
             // 將需要給其它線程的item賦予給p中的item
            p.item = item; // offer
            if (U.compareAndSwapObject(a, j, null, p)) {
                // 交換成功
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0 L;
                Thread t = Thread.currentThread(); // wait
                // 自旋直到有其它線程進(jìn)入爪瓜,遍歷到該元素并與其交換蹬跃,同時當(dāng)前線程被喚醒
                for (int h = p.hash, spins = SPINS;;) {
                    Object v = p.match;
                    if (v != null) {
                        // 其它線程設(shè)置的需要交換的數(shù)據(jù)match不為null
                        // 將match設(shè)置null,item設(shè)置為null
                        U.putOrderedObject(p, MATCH, null);
                        p.item = null; // clear for next use
                        p.hash = h;
                        return v;
                    } else if (spins > 0) {
                        h ^= h << 1;
                        h ^= h >>> 3;
                        h ^= h << 10; // xorshift
                        if (h == 0) // initialize hash
                            h = SPINS | (int) t.getId();
                        else if (h < 0 && // approx 50% true
                            (--spins & ((SPINS >>> 1) - 1)) == 0)
                            Thread.yield(); // two yields per wait
                    } else if (U.getObjectVolatile(a, j) != p)
                        // 和slotExchange方法中的類似,arena數(shù)組中的數(shù)據(jù)已被CAS設(shè)置
                       // match值還未設(shè)置铆铆,讓其再自旋等待match被設(shè)置
                        spins = SPINS; // releaser hasn't set match yet
                    else if (!t.isInterrupted() && m == 0 &&
                        (!timed ||
                            (ns = end - System.nanoTime()) > 0 L)) {
                        // 設(shè)置線程t被當(dāng)前對象阻塞
                        U.putObject(t, BLOCKER, this); // emulate LockSupport
                         // 線程t賦值
                        p.parked = t; // minimize window
                        if (U.getObjectVolatile(a, j) == p)
                            // 數(shù)組中對象還相等炬转,表示線程還未被喚醒,喚醒線程
                            U.park(false, ns);
                        p.parked = null;
                         // 設(shè)置線程t未被任何對象阻塞
                        U.putObject(t, BLOCKER, null);
                    } else if (U.getObjectVolatile(a, j) == p &&
                        U.compareAndSwapObject(a, j, p, null)) {
                        // 這里給bound增加加一個SEQ
                        if (m != 0) // try to shrink
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                        p.item = null;
                        p.hash = h;
                        i = p.index >>>= 1; // descend
                        if (Thread.interrupted())
                            return null;
                        if (timed && m == 0 && ns <= 0 L)
                            return TIMED_OUT;
                        break; // expired; restart
                    }
                }
            } else
                // 交換失敗算灸,表示有其它線程更改了arena數(shù)組中下標(biāo)i的元素
                p.item = null; // clear offer
        } else {
            // 此時表示下標(biāo)不在bound & MMASK或q不為null但CAS操作失敗
           // 需要更新bound變化后的值
            if (p.bound != b) { // stale; reset
                p.bound = b;
                p.collides = 0;
                // 反向遍歷
                i = (i != m || m == 0) ? m : m - 1;
            } else if ((c = p.collides) < m || m == FULL ||
                !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                 // 記錄CAS失敗的次數(shù)
                p.collides = c + 1;
                // 循環(huán)遍歷
                i = (i == 0) ? m : i - 1; // cyclically traverse
            } else
                // 此時表示bound值增加了SEQ+1
                i = m + 1; // grow
            // 設(shè)置下標(biāo)
            p.index = i;
        }
    }
}

????看完上面slotExchange方法和arenaExchange方法定義扼劈,我們可以看出Exchanger工具類的實(shí)現(xiàn)還是很復(fù)雜的,雖然Exchanger的使用比較簡單菲驴。


關(guān)注下面公眾號荐吵,回復(fù) 1024 領(lǐng)取最新大廠面試資料

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子先煎,更是在濱河造成了極大的恐慌贼涩,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件薯蝎,死亡現(xiàn)場離奇詭異遥倦,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)占锯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進(jìn)店門袒哥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人消略,你說我怎么就攤上這事堡称。” “怎么了艺演?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵却紧,是天一觀的道長。 經(jīng)常有香客問我胎撤,道長晓殊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任伤提,我火速辦了婚禮巫俺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘飘弧。我一直安慰自己识藤,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布次伶。 她就那樣靜靜地躺著痴昧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪冠王。 梳的紋絲不亂的頭發(fā)上赶撰,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天,我揣著相機(jī)與錄音柱彻,去河邊找鬼豪娜。 笑死,一個胖子當(dāng)著我的面吹牛哟楷,可吹牛的內(nèi)容都是我干的瘤载。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼卖擅,長吁一口氣:“原來是場噩夢啊……” “哼鸣奔!你這毒婦竟也來了墨技?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤挎狸,失蹤者是張志新(化名)和其女友劉穎扣汪,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體锨匆,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡崭别,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了恐锣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片茅主。...
    茶點(diǎn)故事閱讀 39,703評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖侥蒙,靈堂內(nèi)的尸體忽然破棺而出暗膜,到底是詐尸還是另有隱情匀奏,我是刑警寧澤鞭衩,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站娃善,受9級特大地震影響论衍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜聚磺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一坯台、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瘫寝,春花似錦蜒蕾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至暮屡,卻和暖如春撤摸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背褒纲。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工准夷, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人莺掠。 一個月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓衫嵌,卻偏偏與公主長得像,于是被迫代替她去往敵國和親彻秆。 傳聞我的和親對象是個殘疾皇子楔绞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評論 2 353