并發(fā)工具CyclicBarrier源碼分析及應用

原創(chuàng)申明:本文由公眾號【猿燈塔】原創(chuàng),轉載請說明出處標注

今天呢!燈塔君跟大家講:? ? ? ? ? ? ? ? ? ? ??

并發(fā)工具CyclicBarrier源碼分析及應用

一.CyclicBarrier簡介

1.簡介

CyclicBarrier是一個同步的輔助類榄融,允許一組線程相互之間等待袱箱,達到一個共同點,再繼續(xù)執(zhí)行痊硕。 CyclicBarrier(循環(huán)屏障) 直譯為可循環(huán)使用(Cyclic)的屏障(Barrier)拌喉。它可以讓一組線程到 達一個屏障(同步點)時被阻塞速那,直到最后一個線程到達屏障時,屏障才會開門尿背,所有被屏障攔截的線程才 會繼續(xù)工作琅坡。

JDK中的描述:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. CyclicBarrier是一個同步輔助類,它允許一組線程相互等待直到所有線程都到達一個公共的屏障點残家。 在程序中有固定數(shù)量的線程,這些線程有時候必須等待彼此售躁,這種情況下坞淮,使用CyclicBarrier很有幫 助。這個屏障之所以用循環(huán)修飾陪捷,是因為在所有的線程釋放彼此之后回窘,這個屏障是可以重新使用的

2.運行機制

二.CyclicBarrier結構圖

三.CyclicBarrier方法說明

1.CyclicBarrier(parties)

初始化相互等待的線程數(shù)量的構造方法?

2.CyclicBarrier(parties,RunnablebarrierAction)

初始化相互等待的線程數(shù)量的構造方法以及屏障線程的構造方法 屏障線程的運行時機:等待的線程數(shù)量 = parties,CyclicBarrier打開屏障之前 舉例:在分組計算中市袖,每個線程負責一部分計算啡直,最終這些線程計算結束之后,交由屏障線程進行匯總計算

3苍碟、getParties()?

獲取CyclicBarrier打開屏障的線程數(shù)量酒觅,也成為方數(shù)?

4、getNumberWaiting()?

獲取真在CyclicBarrier上等待的線程數(shù)量

5微峰、await()

在CyclicBarrier上進行阻塞等待舷丹,直到發(fā)生以下情形之一:?

a.在CyclicBarrier上等待的線程數(shù)量達到parties,則所有線程被釋放蜓肆,繼續(xù)執(zhí)行颜凯。?

b.當前線程被中斷谋币,則拋出InterruptedException異常,并停止等待症概,繼續(xù)執(zhí)行蕾额。?

c.其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常彼城,并停止等待诅蝶,繼續(xù)執(zhí)行。?

d.其他等待的線程超時精肃,則當前線程拋出BrokenBarrierException異常秤涩,并停止等待,繼續(xù)執(zhí)行司抱。?

e.其他線程調用CyclicBarrier.reset()方法筐眷,則當前線程拋出BrokenBarrierException異常,并 停止等待习柠,繼續(xù)執(zhí)行匀谣。?

6资溃、await(timeout,TimeUnit)

在CyclicBarrier上進行限時的阻塞等待,直到發(fā)生以下情形之一:?

a.在CyclicBarrier上等待的線程數(shù)量達到parties溶锭,則所有線程被釋放,繼續(xù)執(zhí)行趴捅。?

b.當前線程被中斷,則拋出InterruptedException異常拱绑,并停止等待,繼續(xù)執(zhí)行猎拨。?

c.當前線程等待超時,則拋出TimeoutException異常红省,并停止等待,繼續(xù)執(zhí)行类腮。?

d.其他等待的線程被中斷,則當前線程拋出BrokenBarrierException異常,并停止等待针饥,繼續(xù)執(zhí)行。?

e.其他等待的線程超時需频,則當前線程拋出BrokenBarrierException異常丁眼,并停止等待,繼續(xù)執(zhí)行昭殉。?

f.其他線程調用CyclicBarrier.reset()方法苞七,則當前線程拋出BrokenBarrierException異常,并 停止等待挪丢,繼續(xù)執(zhí)行蹂风。?

7、isBroken

獲取是否破損標志位broken的值乾蓬,此值有以下幾種情況:?

a.CyclicBarrier初始化時惠啄,broken=false,表示屏障未破損任内。?

b.如果正在等待的線程被中斷撵渡,則broken=true,表示屏障破損死嗦。?

c.如果正在等待的線程超時趋距,則broken=true,表示屏障破損越除。?

d.如果有線程調用CyclicBarrier.reset()方法节腐,則broken=false,表示屏障回到未破損狀態(tài)摘盆。?

8铜跑、reset?

使得CyclicBarrier回歸初始狀態(tài),直觀來看它做了兩件事: a.如果有正在等待的線程骡澈,則會拋出BrokenBarrierException異常,且這些線程停止等待掷空,繼續(xù)執(zhí)行肋殴。 b.將是否破損標志位broken置為false。

四.源碼分析

首先看一下CyclicBarrier內部聲明的一些屬性?

/**用于保護屏障入口的鎖*/

private final ReentrantLock lock = new?

ReentrantLock();?

/**線程等待條件 */?

private final Condition trip = lock.newCondition();

?/** 記錄等待的線程數(shù) */?

private final int parties;?

/**所有線程到達屏障點后坦弟,首先執(zhí)行的命令?

*/ private final Runnable barrierCommand;?

private Generation generation = new Generation();?

/**實際中仍在等待的線程數(shù)护锤,每當有一個線程到達屏障點,

count值就會減一酿傍;當一次新的運算開始后烙懦,?

count的值被重置為parties*/?

private int count;

其中,Generation是CyclicBarrier的一個靜態(tài)內部類

它只有一個boolean類型的屬性赤炒,具體代碼如下:

private static class Generation { Generation() {} // prevent access constructor creation boolean broken; // initially false }

當使用構造方法創(chuàng)建CyclicBarrier實例的時候

就是給上面這些屬性賦值

//創(chuàng)建一個CyclicBarrier實例亏较,parties指定參與相互等待的線程數(shù)雪情,?

//barrierAction指定當所有線程到達屏障點之后巡通,

首先執(zhí)行的操作宴凉,該操作由最后一個進入屏障點線程執(zhí)行.?

public CyclicBarrier(int parties,?

Runnable barrierAction)?{?

if (parties <= 0) throw new IllegalArgumentException();?

this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

//創(chuàng)建一個CyclicBarrier實例弥锄,parties指定

?參與相互等待的線程數(shù)?

public CyclicBarrier(int parties) { this(parties, null); }

CyclicBarrier.await方法調用CyclicBarrier.dowait()

每次調用await()都會使計數(shù)器-1叉讥,當減少到0 時就會

喚醒所有的線程 图仓,當調用await()方法時救崔,當前線程已經

到達屏障點六孵,當前線程阻塞進入休眠狀態(tài)

//該方法被調用時表示當前線程已經到達屏障點劫窒,當前線程阻塞進入休眠狀態(tài)?

//直到所有線程都到達屏障點拆座,當前線程才會被喚醒?

public int await() throws InterruptedException, BrokenBarrierException { try {return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }

當前線程已經到達屏障點孕索,當前線程阻塞進入休眠狀態(tài)

//該方法被調用時表示當前線程已經到達屏障點.當前? ?線程阻塞進入休眠狀態(tài)?

//在timeout指定的超時時間內搞旭,等待其他參與線程

? 到達屏障點?

//如果超出指定的等待時間則拋TimeoutException異常肄渗,如果該時間小于等于零,則此方法根本不會等待?

public int await(long timeout, TimeUnit unit)?

throws InterruptedException, BrokenBarrierException,?

TimeoutException { return dowait(true, unit.toNanos(timeout)); }

dowait()方法

private int dowait(boolean timed, long nanos)?

throws InterruptedException, BrokenBarrierException, TimeoutException?

{ //使用獨占資源鎖控制多線程并發(fā)進入這段代碼

final ReentrantLock lock = this.lock;?

//獨占鎖控制線程并發(fā)訪問?

lock.lock();?

try {final Generation g = generation;?

if (g.broken) throw new BrokenBarrierException();?

//檢查當前線程是否被中斷?

if (Thread.interrupted()) {?

//如果當前線程被中斷會做以下三件事?

//1.打翻當前柵欄?

//2.喚醒攔截的所有線程?

//3.拋出中斷異常?

breakBarrier();?

throw new InterruptedException(); }

//每調用一次await()方法,計數(shù)器就減一?

int index = --count;?

//計數(shù)器的值減為0則需喚醒所有線程并轉換到下一代?

if (index == 0) { // tripped boolean ranAction = false;?

try {

//如果在創(chuàng)建CyclicBarrier實例時設置了barrierAction翁垂,則先執(zhí)行?

barrierAction inal Runnable command = barrierCommand;

?if (command != null)?

? ? ?command.run();?

? ? ?ranAction = true;?

//當所有參與的線程都到達屏障點沿猜,為喚醒所有處于

? 休眠狀態(tài)的線程做準備工作?

//需要注意的是啼肩,喚醒所有阻塞線程不是在這里 nextGeneration();?

return 0;?

} finally {?

//確保在任務未成功執(zhí)行時能將所有線程喚醒?

if (!ranAction)

?breakBarrier();?

}?

}

// loop until tripped, broken,?

interrupted, or timed out?

//如果計數(shù)器不為0則執(zhí)行此循環(huán)

for (;;)?

{ try {

//根據(jù)傳入的參數(shù)來決定是定時等待還是非定時等待

if (!timed)?

//讓當前執(zhí)行的線程阻塞祈坠,處于休眠狀態(tài) trip.await();?

else if (nanos > 0L)?

//讓當前執(zhí)行的線程阻塞赦拘,在超時時間內處于休眠狀態(tài)?

nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {?

//若當前線程在等待期間被中斷則打翻柵欄喚醒其他線程

if (g == generation && ! g.broken) { breakBarrier();?

throw ie; }?

else {?

// We're about to finish waiting even?

? ?if we had not

// been interrupted, so this interrupt?

? ?is deemed to?

// "belong" to subsequent execution. Thread.currentThread().interrupt(); } }

//如果線程因為打翻柵欄操作而被喚醒則拋出異常?

if (g.broken) throw new BrokenBarrierException();?

if (g != generation) return index;?

//如果線程因為時間到了而被喚醒則打翻柵欄

? 并拋出異常?

if (timed && nanos <= 0L) {?

breakBarrier();?

throw new TimeoutException(); }?

}?

}?

finally { lock.unlock(); } }

每次調用await方法都會使內部的計數(shù)器臨時變量-1

當減少到0時躺同,就會調用nextGeneration方法

private void nextGeneration() {?

// signal completion of last generation trip.signalAll();?

// set up next generation?

count = parties; generation = new Generation(); }

在這里喚醒所有阻塞的線程?

提醒:在聲明CyclicBarrier的時候還可以傳一個Runnable的實現(xiàn)類蹋艺,當計數(shù)器減少到0時捎谨,

會執(zhí)行該 實現(xiàn)類?到這里CyclicBarrier的

實現(xiàn)原理基本已經都清楚了下面來深入源碼分析

一下線程阻塞代碼 trip.await()和線程喚醒trip.signalAll()的實現(xiàn)侍芝。

//await()是AQS內部類ConditionObject中的方法 public final void await() throws InterruptedException {?

//如果線程中斷拋異常?

if (Thread.interrupted())?

? ?throw new InterruptedException();?

//新建Node節(jié)點,并將新節(jié)點加入到Condition

? 等待隊列中?

//Condition等待隊列是AQS內部類

ConditionObject實現(xiàn)的凶赁,ConditionObject

有兩個屬性虱肄,

分別是firstWaiter和lastWaiter

都是Node類型?

//firstWaiter和lastWaiter分別用于代表Condition等待隊列的頭結點和尾節(jié)點 Node node = addConditionWaiter();?

//釋放獨占鎖咏窿,讓其它線程可以獲取到dowait()

? 方法中的獨占鎖?

int savedState = fullyRelease(node);?

int interruptMode = 0;?

//檢測此節(jié)點是否在資源等待隊列(AQS同步隊列)中集嵌, //如果不在,說明此線程還沒有競爭資源鎖的權利怜珍,

此線程繼續(xù)阻塞酥泛,直到檢測到此節(jié)點在 資源等待隊列上(AQS同步隊列)中

//這里出現(xiàn)了兩個等待隊列嫌拣,分別是Condition等待

? 隊列和AQS資源鎖等待隊列(或者說是 同步隊列) //Condition等待隊列是等待被喚醒的線程隊列

?AQS資源鎖等待隊列是等待獲取資源鎖 的隊列?

while (!isOnSyncQueue(node))?

{?

//阻塞當前線程异逐,當前線程進入休眠狀態(tài),可以看到

? 這里使用LockSupport.park阻 塞當前線程 LockSupport.park(this);?

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }

if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode?

= REINTERRUPT;?

if (node.nextWaiter != null)?

// clean up if cancelled unlinkCancelledWaiters();?

if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

//addConditionWaiter()是AQS內部類ConditionObject中的方法?

private Node addConditionWaiter()?

{ Node t = lastWaiter;?

// 將condition等待隊列中,節(jié)點狀態(tài)不是

CONDITION的節(jié)點箩祥,從condition等待隊列中移除

if (t != null && t.waitStatus != Node.CONDITION)?

{ unlinkCancelledWaiters();?

t = lastWaiter; }

//以下操作是用此線程構造一個節(jié)點,并將之加入到condition等待隊列尾部?

Node node = new Node(Thread.currentThread(), Node.CONDITION);?

if (t == null)?

firstWaiter = node;?

else

t.nextWaiter = node;?

lastWaiter = node; return node; }

//signalAll是AQS內部類ConditionObject中的方法

public final void signalAll() {

?if (!isHeldExclusively())?

throw new IllegalMonitorStateException(); //Condition等待隊列的頭結點

Node first = firstWaiter;?

if (first != null) doSignalAll(first); }

private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do {Node next = first.nextWaiter; first.nextWaiter = null;

//將Condition等待隊列中的Node節(jié)點按之前順序

都轉移到了AQS同步隊列中 transferForSignal(first);?

first = next;?

} while (first != null); }

final boolean transferForSignal

(Node node) {?

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))?

return false;?

//這里將Condition等待隊列中的Node節(jié)點插入到AQS同步隊列的尾部?

Node p = enq(node);?

int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

//ReentrantLock#unlock()方法?

public void unlock() {?

//Sync是ReentrantLock的內部類,繼承自AbstractQueuedSynchronizer捐凭,

它是 ReentrantLock中公平鎖和非公平鎖的基礎實現(xiàn) sync.release(1); }

public final boolean release(int arg) {?

//釋放鎖?

if (tryRelease(arg)) {?

//AQS同步隊列頭結點?

Node h = head; if (h != null && h.waitStatus != 0)?

//喚醒節(jié)點中的線程?

unparkSuccessor(h); return true; }

return false; }

private void unparkSuccessor(Node node)?

{ int ws = node.waitStatus;?

if (ws < 0) compareAndSetWaitStatus(node, ws, 0);?

Node s = node.next;?

if (s == null || s.waitStatus > 0)?

{ s = null; for (Node t = tail;?

t != null && t != node; t = t.prev)?

if (t.waitStatus <= 0) s = t; }

if (s != null) //喚醒阻塞線程 LockSupport.unpark(s.thread); }

365天干貨不斷,可以微信搜索「猿燈塔」第一時間閱讀茁肠,回復【資料】【面試】【簡歷】有我準備的一線大廠面試資料和簡歷模板

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末垦梆,一起剝皮案震驚了整個濱河市托猩,隨后出現(xiàn)的幾起案子京腥,更是在濱河造成了極大的恐慌,老刑警劉巖他宛,帶你破解...
    沈念sama閱讀 211,348評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件堕汞,死亡現(xiàn)場離奇詭異讯检,居然都是意外死亡人灼,警方通過查閱死者的電腦和手機顾翼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評論 2 385
  • 文/潘曉璐 我一進店門灸芳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拜姿,“玉大人蕊肥,你說我怎么就攤上這事∨” “怎么了赔硫?”我有些...
    開封第一講書人閱讀 156,936評論 0 347
  • 文/不壞的土叔 我叫張陵爪膊,是天一觀的道長。 經常有香客問我惊完,道長小槐,這世上最難降的妖魔是什么凿跳? 我笑而不...
    開封第一講書人閱讀 56,427評論 1 283
  • 正文 為了忘掉前任控嗜,我火速辦了婚禮疆栏,結果婚禮上壁顶,老公的妹妹穿的比我還像新娘若专。我一直安慰自己蝴猪,他們只是感情好自阱,可當我...
    茶點故事閱讀 65,467評論 6 385
  • 文/花漫 我一把揭開白布动壤。 她就那樣靜靜地躺著琼懊,像睡著了一般哼丈。 火紅的嫁衣襯著肌膚如雪醉旦。 梳的紋絲不亂的頭發(fā)上桨啃,一...
    開封第一講書人閱讀 49,785評論 1 290
  • 那天,我揣著相機與錄音丧慈,去河邊找鬼逃默。 笑死完域,一個胖子當著我的面吹牛,可吹牛的內容都是我干的凹耙。 我是一名探鬼主播使兔,決...
    沈念sama閱讀 38,931評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼泽艘!你這毒婦竟也來了匹涮?” 一聲冷哼從身側響起然低,我...
    開封第一講書人閱讀 37,696評論 0 266
  • 序言:老撾萬榮一對情侶失蹤带兜,失蹤者是張志新(化名)和其女友劉穎刚照,沒想到半個月后喧兄,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,141評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡恭理,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,483評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遭赂。...
    茶點故事閱讀 38,625評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡循诉,死狀恐怖,靈堂內的尸體忽然破棺而出撇他,到底是詐尸還是另有隱情茄猫,我是刑警寧澤,帶...
    沈念sama閱讀 34,291評論 4 329
  • 正文 年R本政府宣布困肩,位于F島的核電站划纽,受9級特大地震影響,放射性物質發(fā)生泄漏锌畸。R本人自食惡果不足惜勇劣,卻給世界環(huán)境...
    茶點故事閱讀 39,892評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望潭枣。 院中可真熱鬧比默,春花似錦命咐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至轧葛,卻和暖如春焰雕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工约郁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓艘绍,卻偏偏與公主長得像这敬,于是被迫代替她去往敵國和親始衅。 傳聞我的和親對象是個殘疾皇子蝙茶,可洞房花燭夜當晚...
    茶點故事閱讀 43,492評論 2 348

推薦閱讀更多精彩內容