多線程并發(fā)編程20-線程同步之CyclicBarrier

? ? 前文介紹了使用CountDownLatch來實現(xiàn)線程間同步,但是CountDownLatch的計數(shù)器是一次性的耕驰,當計數(shù)器值減到0之后再調用await或countdown方法就會立刻返回黎茎。今天講解的CyclicBarrier是一種可重置的線程間同步堡称,當指定個數(shù)的線程全部到達了一個狀態(tài)后再全部同時執(zhí)行,并重置CyclicBarrier翻翩。

? ? 下面通過一個代碼示例介紹CountDownLatch的使用檐盟,下面的示例中啟動兩個線程A褂萧、B,并在A遵堵、B線程都完成各自的任務之后箱玷,對A怨规、B線程完成的任務數(shù)進行統(tǒng)計陌宿。

任務處理類MyRunnable?

class MyRunnable implements Runnable {

? ? private CyclicBarrier cyclicBarrier = null;

? ? private int threadId;

? ? private int taskCount;

? ? private LinkedBlockingQueue<Integer> taskSummaryQueue = null;

? ? public MyRunnable(CyclicBarrier cyclicBarrier, int threadId, int taskCount, LinkedBlockingQueue<Integer> taskSummaryQueue) {

? ? ? ? this.cyclicBarrier = cyclicBarrier;

? ? ? ? this.threadId = threadId;

? ? ? ? this.taskCount = taskCount;

? ? ? ? this.taskSummaryQueue = taskSummaryQueue;

? ? }

? ? @Override

? ? public void run() {

? ? ? ? String threadName = Thread.currentThread().getName();

? ? ? ? if (cyclicBarrier != null) {

? ? ? ? ? ? try {

//執(zhí)行任務

? ? ? ? ? ? ? ? for (int i = 0; i < taskCount; i++) {

? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + " thread id:" + threadId + "到達屏障處");

//到達屏障處,阻塞直到到達屏障處的線程個數(shù)等于CyclicBarrier 中的parties值波丰。

? ? ? ? ? ? ? ? int await = cyclicBarrier.await();

? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + "thread:" + threadId + " await:" + await);

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + " thread:" + threadId + " InterruptedException");

? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? } catch (BrokenBarrierException e) {

? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + " thread:" + threadId + " BrokenBarrierException");

? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? }

? ? ? ? }

? ? }

}

main函數(shù)

public static void main(String[] args) {

? ? ThreadGroup group = new ThreadGroup("myThreadGroup");

//創(chuàng)建一個多線程安全的隊列壳坪,用來統(tǒng)計完成的任務數(shù)。

? ? LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

//創(chuàng)建回旋屏障CyclicBarrier 掰烟,并指定barrierAction爽蝴,barrierAction會在最后一個到達屏障處的線程執(zhí)行。

? ? CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {

? ? ? ? @Override

? ? ? ? public void run() {

? ? ? ? ? ? String threadName = Thread.currentThread().getName();

? ? ? ? ? ? int finishedTask = 0;

//匯總完成任務數(shù)

? ? ? ? ? ? for (Integer integer : queue) {

? ? ? ? ? ? ? ? if (integer != null) {

? ? ? ? ? ? ? ? ? ? finishedTask += integer;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? System.out.println("barrierAction in thread name:" + threadName + " finished task:" + finishedTask);

? ? ? ? }

? ? });

? ? Thread thread1 = new Thread(group,new MyRunnable(cyclicBarrier, 1,5,queue), "thread1");

? ? thread1.start();

? ? try {

? ? ? ? Thread.sleep(10);

? ? } catch (InterruptedException e) {

? ? ? ? e.printStackTrace();

? ? }

? ? Thread thread2 = new Thread(group, new MyRunnable(cyclicBarrier, 2,10,queue), "thread2");

? ? thread2.start();

? ? try {

? ? ? ? Thread.sleep(10);

? ? } catch (InterruptedException e) {

? ? ? ? e.printStackTrace();

? ? }

}

執(zhí)行結果:

thread name:thread1 thread id:1到達屏障處

thread name:thread2 thread id:2到達屏障處

barrierAction in thread name:thread2 finished task:15

thread name:thread2thread:2 屏障之后

thread name:thread1thread:1 屏障之后

? ? 下面對CyclicBarrier內部原理進行剖析纫骑。

CyclicBarrier內部的成員變量如下:

//獨占鎖蝎亚,用來操作計數(shù)器。

private final ReentrantLock lock =new ReentrantLock();

//條件變量先馆,阻塞到達屏障處的線程发框,直到計數(shù)器為0。

private final Condition trip =lock.newCondition();

//重置之后會將計數(shù)器值重置為該值煤墙。

private final int parties;

//當計數(shù)器值為0之后會觸發(fā)該runnable梅惯。

private final Runnable barrierCommand;

//年代變量,當重置仿野、計數(shù)器減到0或拋出異常铣减,年代變量會重新創(chuàng)建進入下一個年代岛心。

private Generation generation =new Generation();

//計數(shù)器役拴,當線程調用awit到達屏障點時會進行減一。

private int count;

int await()

? ? 調用await方法則表示調用線程到達屏障處振湾,阻塞直到parties個線程到達屏障處球涛。如果當前線程不是最后一個到達屏障處的線程則會進行阻塞魄梯,當有如下情況發(fā)生當前線程才會返回:

1.最后一個線程到達屏障處。

2.其他線程調用了當前線程的中斷標志宾符。

3.其他到達屏障處的線程被設置了中斷標志酿秸。

4.其他到達屏障處阻塞的線程到了阻塞過期時間。

5.其他線程調用了CyclicBarrier的rest方法魏烫。

public int await() throws InterruptedException, BrokenBarrierException {

? ? try {

? ? ? ? return dowait(false, 0L);

? ? } catch (TimeoutException toe) {

? ? ? ? throw new Error(toe); // cannot happen

? ? }

}

int dowait(boolean timed, long nanos)

private int dowait(boolean timed, long nanos)

? ? throws InterruptedException, BrokenBarrierException,

? ? ? ? ? TimeoutException {

//(1)獲取獨占鎖辣苏。

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

? ? ? ? final Generation g = generation;

? ? ? ? if (g.broken)

? ? ? ? ? ? throw new BrokenBarrierException();

? ? ? ? if (Thread.interrupted()) {

? ? ? ? ? ? breakBarrier();

? ? ? ? ? ? throw new InterruptedException();

? ? ? ? }

//(2)計數(shù)器減1肝箱。

? ? ? ? int index = --count;

//(3)如果計數(shù)器為0,則說明最后一個線程到達了屏障處稀蟋,則調用barrierAction后喚醒之前到達屏障處阻塞的線程煌张。

? ? ? ? if (index == 0) {? // tripped

? ? ? ? ? ? boolean ranAction = false;

? ? ? ? ? ? try {

//(4)調用barrierAction對應的runnable。

? ? ? ? ? ? ? ? final Runnable command = barrierCommand;

? ? ? ? ? ? ? ? if (command != null)

? ? ? ? ? ? ? ? ? ? command.run();

? ? ? ? ? ? ? ? ranAction = true;

//(5)喚醒之前到達屏障處阻塞的線程退客,并重置計數(shù)器和年代骏融。為什么先喚醒在重置呢?因為即使調用了條件變量的signalAll萌狂,但是當前線程還沒有釋放獨占鎖档玻,其他阻塞的線程也不能被實際喚醒。所以重置操作放在signalAll之后沒有問題茫藏。

? ? ? ? ? ? ? ? nextGeneration();

? ? ? ? ? ? ? ? return 0;

? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? if (!ranAction)

? ? ? ? ? ? ? ? ? ? breakBarrier();

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // (6)循環(huán)直到最后一個線程到達屏障處误趴、CyclicBarrier被broker、當前線程被中斷或阻塞超時务傲。

? ? ? ? for (;;) {

? ? ? ? ? ? try {

//(7)調用條件變量的await方法進行阻塞凉当。

? ? ? ? ? ? ? ? if (!timed)

? ? ? ? ? ? ? ? ? ? trip.await();

? ? ? ? ? ? ? ? else if (nanos > 0L)

? ? ? ? ? ? ? ? ? ? nanos = trip.awaitNanos(nanos);

? ? ? ? ? ? } catch (InterruptedException ie) {

//(8)如果當前線程被其他線程設置了中斷標志,則將CyclicBarrier設置為broker并喚醒其他在屏障點阻塞的線程售葡,這些線程會拋出BrokenBarrierException異常看杭。

? ? ? ? ? ? ? ? if (g == generation && ! g.broken) {

? ? ? ? ? ? ? ? ? ? breakBarrier();

? ? ? ? ? ? ? ? ? ? throw ie;

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? // We're about to finish waiting even if we had not

? ? ? ? ? ? ? ? ? ? // been interrupted, so this interrupt is deemed to

? ? ? ? ? ? ? ? ? ? // "belong" to subsequent execution.

? ? ? ? ? ? ? ? ? ? Thread.currentThread().interrupt();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? if (g.broken)

? ? ? ? ? ? ? ? throw new BrokenBarrierException();

//(9)如果年代不一致則正常返回。

? ? ? ? ? ? if (g != generation)

? ? ? ? ? ? ? ? return index;

//(10)當前線程阻塞超時了則拋出TimeoutException異常挟伙,將CyclicBarrier設置為broker并喚醒其他在屏障點阻塞的線程楼雹,這些線程會拋出BrokenBarrierException異常。

? ? ? ? ? ? if (timed && nanos <= 0L) {

? ? ? ? ? ? ? ? breakBarrier();

? ? ? ? ? ? ? ? throw new TimeoutException();

? ? ? ? ? ? }

? ? ? ? }

? ? } finally {

//(11)釋放鎖

? ? ? ? lock.unlock();

? ? }

}

void nextGeneration()

? ? 喚醒所有在屏障點出阻塞的線程像寒,并重置CyclicBarrier烘豹,只有在獲取獨占鎖的情況下才能調用此方法。

private void nextGeneration() {

? ? // signal completion of last generation

? ? trip.signalAll();

? ? // set up next generation

? ? count = parties;

? ? generation = new Generation();

}

void breakBarrier()?

? ? 設置CyclicBarrier為broken狀態(tài)诺祸,重置計數(shù)器并喚醒阻塞在屏障點的線程携悯,這些線程會拋出BrokenBarrierException異常。

private void breakBarrier() {

? ? generation.broken = true;

? ? count = parties;

? ? trip.signalAll();

}

void reset()?

? ? 重置CyclicBarrier筷笨。

public void reset() {

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

? ? ? ? breakBarrier();? // break the current generation

? ? ? ? nextGeneration(); // start a new generation

? ? } finally {

? ? ? ? lock.unlock();

? ? }

}

? ??CyclicBarrier通過獨占鎖+條件變量+計數(shù)器+年代變量來實現(xiàn)可重復使用的線程間同步器憔鬼。

? ? 今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚胃夏,所有歡迎提任何問題以及改善方法轴或。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市仰禀,隨后出現(xiàn)的幾起案子照雁,更是在濱河造成了極大的恐慌,老刑警劉巖答恶,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件饺蚊,死亡現(xiàn)場離奇詭異萍诱,居然都是意外死亡,警方通過查閱死者的電腦和手機污呼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門裕坊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人燕酷,你說我怎么就攤上這事籍凝。” “怎么了苗缩?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵饵蒂,是天一觀的道長。 經(jīng)常有香客問我挤渐,道長苹享,這世上最難降的妖魔是什么双絮? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任浴麻,我火速辦了婚禮,結果婚禮上囤攀,老公的妹妹穿的比我還像新娘软免。我一直安慰自己,他們只是感情好焚挠,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布膏萧。 她就那樣靜靜地躺著,像睡著了一般蝌衔。 火紅的嫁衣襯著肌膚如雪榛泛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天噩斟,我揣著相機與錄音曹锨,去河邊找鬼。 笑死剃允,一個胖子當著我的面吹牛沛简,可吹牛的內容都是我干的。 我是一名探鬼主播斥废,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼椒楣,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了牡肉?” 一聲冷哼從身側響起捧灰,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎统锤,沒想到半個月后毛俏,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吩屹,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年拧抖,在試婚紗的時候發(fā)現(xiàn)自己被綠了煤搜。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡唧席,死狀恐怖擦盾,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情淌哟,我是刑警寧澤迹卢,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站徒仓,受9級特大地震影響腐碱,放射性物質發(fā)生泄漏。R本人自食惡果不足惜掉弛,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一症见、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧殃饿,春花似錦谋作、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至奈惑,卻和暖如春吭净,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背肴甸。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工寂殉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人雷滋。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓不撑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親晤斩。 傳聞我的和親對象是個殘疾皇子焕檬,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內容