? ? 前文介紹了使用CountDownLatch來實現(xiàn)線程間同步,但是CountDownLatch的計數(shù)器是一次性的耕驰,當計數(shù)器值減到0之后再調用await或countdown方法就會立刻返回黎茎。今天講解的CyclicBarrier是一種可重置的線程間同步堡称,當指定個數(shù)的線程全部到達了一個狀態(tài)后再全部同時執(zhí)行,并重置CyclicBarrier翻翩。
? ? 下面通過一個代碼示例介紹CountDownLatch的使用檐盟,下面的示例中啟動兩個線程A褂萧、B,并在A遵堵、B線程都完成各自的任務之后箱玷,對A怨规、B線程完成的任務數(shù)進行統(tǒng)計陌宿。
任務處理類MyRunnable?:
class MyRunnable implements Runnable {
? ? private CyclicBarrier cyclicBarrier = null;
? ? private int threadId;
? ? private int taskCount;
? ? private LinkedBlockingQueue<Integer> taskSummaryQueue = null;
? ? public MyRunnable(CyclicBarrier cyclicBarrier, int threadId, int taskCount, LinkedBlockingQueue<Integer> taskSummaryQueue) {
? ? ? ? this.cyclicBarrier = cyclicBarrier;
? ? ? ? this.threadId = threadId;
? ? ? ? this.taskCount = taskCount;
? ? ? ? this.taskSummaryQueue = taskSummaryQueue;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? String threadName = Thread.currentThread().getName();
? ? ? ? if (cyclicBarrier != null) {
? ? ? ? ? ? try {
//執(zhí)行任務
? ? ? ? ? ? ? ? for (int i = 0; i < taskCount; i++) {
? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + " thread id:" + threadId + "到達屏障處");
//到達屏障處,阻塞直到到達屏障處的線程個數(shù)等于CyclicBarrier 中的parties值波丰。
? ? ? ? ? ? ? ? int await = cyclicBarrier.await();
? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + "thread:" + threadId + " await:" + await);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + " thread:" + threadId + " InterruptedException");
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? } catch (BrokenBarrierException e) {
? ? ? ? ? ? ? ? System.out.println("thread name:" + threadName + " thread:" + threadId + " BrokenBarrierException");
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
main函數(shù)
public static void main(String[] args) {
? ? ThreadGroup group = new ThreadGroup("myThreadGroup");
//創(chuàng)建一個多線程安全的隊列壳坪,用來統(tǒng)計完成的任務數(shù)。
? ? LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
//創(chuàng)建回旋屏障CyclicBarrier 掰烟,并指定barrierAction爽蝴,barrierAction會在最后一個到達屏障處的線程執(zhí)行。
? ? CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? String threadName = Thread.currentThread().getName();
? ? ? ? ? ? int finishedTask = 0;
//匯總完成任務數(shù)
? ? ? ? ? ? for (Integer integer : queue) {
? ? ? ? ? ? ? ? if (integer != null) {
? ? ? ? ? ? ? ? ? ? finishedTask += integer;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? System.out.println("barrierAction in thread name:" + threadName + " finished task:" + finishedTask);
? ? ? ? }
? ? });
? ? Thread thread1 = new Thread(group,new MyRunnable(cyclicBarrier, 1,5,queue), "thread1");
? ? thread1.start();
? ? try {
? ? ? ? Thread.sleep(10);
? ? } catch (InterruptedException e) {
? ? ? ? e.printStackTrace();
? ? }
? ? Thread thread2 = new Thread(group, new MyRunnable(cyclicBarrier, 2,10,queue), "thread2");
? ? thread2.start();
? ? try {
? ? ? ? Thread.sleep(10);
? ? } catch (InterruptedException e) {
? ? ? ? e.printStackTrace();
? ? }
}
執(zhí)行結果:
thread name:thread1 thread id:1到達屏障處
thread name:thread2 thread id:2到達屏障處
barrierAction in thread name:thread2 finished task:15
thread name:thread2thread:2 屏障之后
thread name:thread1thread:1 屏障之后
? ? 下面對CyclicBarrier內部原理進行剖析纫骑。
CyclicBarrier內部的成員變量如下:
//獨占鎖蝎亚,用來操作計數(shù)器。
private final ReentrantLock lock =new ReentrantLock();
//條件變量先馆,阻塞到達屏障處的線程发框,直到計數(shù)器為0。
private final Condition trip =lock.newCondition();
//重置之后會將計數(shù)器值重置為該值煤墙。
private final int parties;
//當計數(shù)器值為0之后會觸發(fā)該runnable梅惯。
private final Runnable barrierCommand;
//年代變量,當重置仿野、計數(shù)器減到0或拋出異常铣减,年代變量會重新創(chuàng)建進入下一個年代岛心。
private Generation generation =new Generation();
//計數(shù)器役拴,當線程調用awit到達屏障點時會進行減一。
private int count;
int await()
? ? 調用await方法則表示調用線程到達屏障處振湾,阻塞直到parties個線程到達屏障處球涛。如果當前線程不是最后一個到達屏障處的線程則會進行阻塞魄梯,當有如下情況發(fā)生當前線程才會返回:
1.最后一個線程到達屏障處。
2.其他線程調用了當前線程的中斷標志宾符。
3.其他到達屏障處的線程被設置了中斷標志酿秸。
4.其他到達屏障處阻塞的線程到了阻塞過期時間。
5.其他線程調用了CyclicBarrier的rest方法魏烫。
public int await() throws InterruptedException, BrokenBarrierException {
? ? try {
? ? ? ? return dowait(false, 0L);
? ? } catch (TimeoutException toe) {
? ? ? ? throw new Error(toe); // cannot happen
? ? }
}
int dowait(boolean timed, long nanos)
private int dowait(boolean timed, long nanos)
? ? throws InterruptedException, BrokenBarrierException,
? ? ? ? ? TimeoutException {
//(1)獲取獨占鎖辣苏。
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
? ? ? ? final Generation g = generation;
? ? ? ? if (g.broken)
? ? ? ? ? ? throw new BrokenBarrierException();
? ? ? ? if (Thread.interrupted()) {
? ? ? ? ? ? breakBarrier();
? ? ? ? ? ? throw new InterruptedException();
? ? ? ? }
//(2)計數(shù)器減1肝箱。
? ? ? ? int index = --count;
//(3)如果計數(shù)器為0,則說明最后一個線程到達了屏障處稀蟋,則調用barrierAction后喚醒之前到達屏障處阻塞的線程煌张。
? ? ? ? if (index == 0) {? // tripped
? ? ? ? ? ? boolean ranAction = false;
? ? ? ? ? ? try {
//(4)調用barrierAction對應的runnable。
? ? ? ? ? ? ? ? final Runnable command = barrierCommand;
? ? ? ? ? ? ? ? if (command != null)
? ? ? ? ? ? ? ? ? ? command.run();
? ? ? ? ? ? ? ? ranAction = true;
//(5)喚醒之前到達屏障處阻塞的線程退客,并重置計數(shù)器和年代骏融。為什么先喚醒在重置呢?因為即使調用了條件變量的signalAll萌狂,但是當前線程還沒有釋放獨占鎖档玻,其他阻塞的線程也不能被實際喚醒。所以重置操作放在signalAll之后沒有問題茫藏。
? ? ? ? ? ? ? ? nextGeneration();
? ? ? ? ? ? ? ? return 0;
? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? if (!ranAction)
? ? ? ? ? ? ? ? ? ? breakBarrier();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? // (6)循環(huán)直到最后一個線程到達屏障處误趴、CyclicBarrier被broker、當前線程被中斷或阻塞超時务傲。
? ? ? ? for (;;) {
? ? ? ? ? ? try {
//(7)調用條件變量的await方法進行阻塞凉当。
? ? ? ? ? ? ? ? if (!timed)
? ? ? ? ? ? ? ? ? ? trip.await();
? ? ? ? ? ? ? ? else if (nanos > 0L)
? ? ? ? ? ? ? ? ? ? nanos = trip.awaitNanos(nanos);
? ? ? ? ? ? } catch (InterruptedException ie) {
//(8)如果當前線程被其他線程設置了中斷標志,則將CyclicBarrier設置為broker并喚醒其他在屏障點阻塞的線程售葡,這些線程會拋出BrokenBarrierException異常看杭。
? ? ? ? ? ? ? ? if (g == generation && ! g.broken) {
? ? ? ? ? ? ? ? ? ? breakBarrier();
? ? ? ? ? ? ? ? ? ? throw ie;
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // We're about to finish waiting even if we had not
? ? ? ? ? ? ? ? ? ? // been interrupted, so this interrupt is deemed to
? ? ? ? ? ? ? ? ? ? // "belong" to subsequent execution.
? ? ? ? ? ? ? ? ? ? Thread.currentThread().interrupt();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? if (g.broken)
? ? ? ? ? ? ? ? throw new BrokenBarrierException();
//(9)如果年代不一致則正常返回。
? ? ? ? ? ? if (g != generation)
? ? ? ? ? ? ? ? return index;
//(10)當前線程阻塞超時了則拋出TimeoutException異常挟伙,將CyclicBarrier設置為broker并喚醒其他在屏障點阻塞的線程楼雹,這些線程會拋出BrokenBarrierException異常。
? ? ? ? ? ? if (timed && nanos <= 0L) {
? ? ? ? ? ? ? ? breakBarrier();
? ? ? ? ? ? ? ? throw new TimeoutException();
? ? ? ? ? ? }
? ? ? ? }
? ? } finally {
//(11)釋放鎖
? ? ? ? lock.unlock();
? ? }
}
void nextGeneration()
? ? 喚醒所有在屏障點出阻塞的線程像寒,并重置CyclicBarrier烘豹,只有在獲取獨占鎖的情況下才能調用此方法。
private void nextGeneration() {
? ? // signal completion of last generation
? ? trip.signalAll();
? ? // set up next generation
? ? count = parties;
? ? generation = new Generation();
}
void breakBarrier()?
? ? 設置CyclicBarrier為broken狀態(tài)诺祸,重置計數(shù)器并喚醒阻塞在屏障點的線程携悯,這些線程會拋出BrokenBarrierException異常。
private void breakBarrier() {
? ? generation.broken = true;
? ? count = parties;
? ? trip.signalAll();
}
void reset()?
? ? 重置CyclicBarrier筷笨。
public void reset() {
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
? ? ? ? breakBarrier();? // break the current generation
? ? ? ? nextGeneration(); // start a new generation
? ? } finally {
? ? ? ? lock.unlock();
? ? }
}
? ??CyclicBarrier通過獨占鎖+條件變量+計數(shù)器+年代變量來實現(xiàn)可重復使用的線程間同步器憔鬼。
? ? 今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚胃夏,所有歡迎提任何問題以及改善方法轴或。