實例代碼
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
//規(guī)定幾步
CountDownLatch countDownLatch = new CountDownLatch(3);//構(gòu)造函數(shù)
executorService.submit(()->{
System.out.println("飯煮好了");
countDownLatch.countDown();
});
executorService.submit(()->{
System.out.println("菜炒好了");
countDownLatch.countDown();
});
executorService.submit(()->{
System.out.println("湯煮好了");
countDownLatch.countDown();
});
countDownLatch.await();
//只有上面規(guī)定幾步都執(zhí)行完成了才會執(zhí)行下面語句武鲁,否則繼續(xù)等待
System.out.println("出來吃飯了");
}
構(gòu)造函數(shù)new CountDownLatch(3)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync代碼繼承AQS
private static final class Sync extends AbstractQueuedSynchronizer {
//將AQS的state設(shè)置為count
Sync(int count) {
setState(count);
}
}
countDownLatch.countDown()方法
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//判斷是否達到臨界點达舒,能否釋放鎖
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//主要是利用cas獲取
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/*
* 釋放鎖
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
countDownLatch.await();
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//判斷state是否大于0牡直,如果其他的countdown方法已經(jīng)執(zhí)行完了這邊會返回1橄维,這個時候這邊不會進入該方法武契,也不會中斷線程钾虐,直接執(zhí)行下面的業(yè)務(wù)邏輯
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加一個waiter梳码,設(shè)置模式為SHARED(包含初始化waiter隊列,可參考AQS文章)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//找到頭結(jié)點
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
//如果state為0則返回1(getState() == 0) ? 1 : -1
if (r >= 0) {
//當(dāng)state為0智什,則表示waiter可以執(zhí)行了
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//停止該線程,喚醒線程也是這邊喚醒的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}