在JUC這個線程同步工具包下,有幾個比較游戲的類签夭,Semaphore齐邦、CountdownLatch和CyclicBarrier,你都用過嗎第租?下面我們就來簡單介紹下他們的用法措拇,并且提供些簡單的代碼示例,方便大家理解慎宾。
一丐吓、簡介
-
Semaphore:通常翻譯成
信號量
,用來控制共享變量可以同時被線程訪問的數(shù)量趟据。通過構(gòu)造方法指定計數(shù)券犁,線程使用
acquire()
方法獲取許可,當達到執(zhí)行計數(shù)后汹碱,其他線程將不能再次獲取粘衬,并進入阻塞,知道獲取許可的線程執(zhí)行1release()1釋放許可。 -
CountdownLatch:常被稱作
門栓
, 用來進行線程的同步協(xié)作稚新,等待所有線程到達后泼舱,在執(zhí)行后續(xù)操作。通過構(gòu)造方法指定線程數(shù)量枷莉,主線程使用
await()
進行等待線程到達,工作線程使用countDown()
進行報到尺迂,也就是讓計數(shù)減一笤妙。 -
CyclicBarrier:常被稱為
柵欄
,用來進行線程的同步協(xié)作噪裕,等待達到預設的計數(shù)蹲盘,在執(zhí)行后續(xù)操作。通過構(gòu)造方法指定計數(shù)膳音,線程使用
await()
方法進行同步等待召衔,當線程等待數(shù)達到計數(shù)值時繼續(xù)執(zhí)行。
二祭陷、使用案例
2.1 Semaphore
共十個線程苍凛,設置兩個信號量:
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("線程" + Thread.currentThread().getName() + "占用時間:" + LocalDateTime.now());
Thread.sleep(2000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
結(jié)果如下,每兩個線程的執(zhí)行時間是相同的兵志,即每次只允許兩個線程同時執(zhí)行:
線程Thread-2占用時間:2022-02-16T09:56:25.589
線程Thread-1占用時間:2022-02-16T09:56:25.589
線程Thread-0占用時間:2022-02-16T09:56:27.593
線程Thread-4占用時間:2022-02-16T09:56:27.593
線程Thread-5占用時間:2022-02-16T09:56:29.595
線程Thread-3占用時間:2022-02-16T09:56:29.595
線程Thread-8占用時間:2022-02-16T09:56:31.603
線程Thread-9占用時間:2022-02-16T09:56:31.603
線程Thread-6占用時間:2022-02-16T09:56:33.615
線程Thread-7占用時間:2022-02-16T09:56:33.615
2.2 CountdownLatch
設置數(shù)值為10醇蝴,10個線程,只有當10個線程全部到達后想罕,主線程才會繼續(xù)執(zhí)行:
public static void main(String[] args) throws InterruptedException {
// 使用倒計數(shù)門閂器 悠栓,迫使主線程進入等待 ;設置門栓的值為10
CountDownLatch latch = new CountDownLatch(10);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
//門栓值減1
latch.countDown();
System.out.println("當前門栓值:" + latch.getCount());
}
}).start();
//阻塞主線程按价,等門栓值為0惭适,主線程執(zhí)行
latch.await();
System.out.println("主線程執(zhí)行。楼镐。癞志。");
}
結(jié)果如下:
當前門栓值:9
當前門栓值:8
當前門栓值:7
當前門栓值:6
當前門栓值:5
當前門栓值:4
當前門栓值:3
當前門栓值:2
當前門栓值:1
當前門栓值:0
主線程執(zhí)行。框产。今阳。
2.3 CyclicBarrier
設置計數(shù)值為6,1個主線程茅信,5個工作線程盾舌,當6個線程全部到達后,才會繼續(xù)執(zhí)行:
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "準備就緒");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "到達");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "準備開始");
cyclicBarrier.await();
}
結(jié)果:
Thread-0準備就緒
Thread-2準備就緒
Thread-3準備就緒
Thread-1準備就緒
Thread-4準備就緒
main準備開始
Thread-2到達
Thread-3到達
Thread-0到達
Thread-1到達
Thread-4到達
三蘸鲸、原理
3.1 Semaphore
首先看下類圖:
僅僅包含我們常見的三個內(nèi)部類:Sync妖谴,F(xiàn)airSync,NonfairSync。Sync是AQS的子類膝舅。
3.1.1 構(gòu)造方法
直接看最底層嗡载,我們設置的計數(shù)被設置成state:
Sync(int permits) {
setState(permits);
}
3.1.2 acquire()方法
下面簡單分析其源碼,首先來看acquire()方法:
獲取AQS當中的可中斷共享鎖:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AQS方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
嘗試獲取共享鎖tryAcquireShared()仍稀,前面學習中提到過洼滚,默認是非公平鎖:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
真正獲取鎖的邏輯:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 獲取狀態(tài)
int available = getState();
int remaining = available - acquires;
// 如果小于0,獲取失敗, 進入 doAcquireSharedInterruptibly
if (remaining < 0 ||
// 如果cas成功技潘,返回正數(shù)遥巴,表示成功
compareAndSetState(available, remaining))
return remaining;
}
}
doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 添加到等待隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (; ; ) {
// 獲取前面的節(jié)點
final Node p = node.predecessor();
if (p == head) {
// 如果是頭結(jié)點,嘗試獲取許可
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 設置上一個節(jié)點 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.1.3 release()方法
走的是Sync的釋放共享鎖方法
public void release() {
sync.releaseShared(1);
}
AQS的方法:
public final boolean releaseShared(int arg) {
// 嘗試釋放鎖享幽,Semaphore的方法
if (tryReleaseShared(arg)) {
// 釋放铲掐,AQS的方法,處理隊列和狀態(tài)相關內(nèi)容
doReleaseShared();
return true;
}
return false;
}
Semaphore自己實現(xiàn)的方法,循環(huán)值桩,知道成功為止:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 使用cas
if (compareAndSetState(current, next))
return true;
}
}
3.2 CountdownLatch
類圖如下:
只有一個內(nèi)部類Sync摆霉。Sync繼承自AQS。
3.2.1 構(gòu)造方法:
直接點進去看最后面奔坟,如下:
Sync(int count) {
setState(count);
}
state被設置為我們指定的值携栋。
3.2.2 await() 方法
同樣使用的是Sync的方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
不同之處是其自己實現(xiàn)的tryAcquireShared方法:
protected int tryAcquireShared(int acquires) {
// 構(gòu)造設置的值肯定是大于0,此處一定是-1咳秉,所以會阻塞
return (getState() == 0) ? 1 : -1;
}
3.2.3 countDown()方法
也是通過AQS的方法如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
直接看其自己實現(xiàn)的tryReleaseShared方法刻两,
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 每一次countDown就將 state - 1
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3.3 CyclicBarrier
3.3.1 構(gòu)造方法
直接看底層構(gòu)造,parties 就是我們設置的線程數(shù)量滴某,初始化時count與parties 相等:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
3.3.2 await()方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
看看其dowait()方法磅摹,很長,包含各種策略霎奢,主要看中文注釋的重點位置就行了户誓,建議寫代碼跟蹤一下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 使用的ReetrantLock
final ReentrantLock lock = this.lock;
// 上鎖,防止多線程造成并發(fā)問題
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 總線程數(shù)-1
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 喚醒所有等待中的線程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 循環(huán)幕侠,直到觸發(fā)帝美、中斷晤硕、中斷或超時
for (;;) {
try {
// 默認是false
if (!timed)
// conditiont條件隊列 的await()
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
// 返回剩余計數(shù)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 釋放鎖
lock.unlock();
}
}
當所有的index減少為0時,會走次方法nextGeneration()舞箍,此方法主要的作用就是更新柵欄狀態(tài),并且喚醒所有等待的線程疏橄。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
上面的流程有幾個點:
- 上鎖lock
- 減去計數(shù)index
- 如果index為0占拍,就執(zhí)行nextGeneration喚醒所有等待的線程略就,并重置狀態(tài)晃酒。
- 釋放鎖unlock
3.4 簡單總結(jié)
- Semaphore、CountdownLatch和CyclicBarrier都是在JUC下的贝次,用于線程同步的工具類崔兴。
- Semaphore、CountdownLatch的核心還是AQS蛔翅,而CyclicBarrier則不是敲茄。
- Semaphore搁宾、CountdownLatch的狀態(tài)修改都是基于CAS(比較并替換)倔幼,而CyclicBarrier使用了ReentrantLock。
- 雖說CyclicBarrier沒有直接使用AQS的子類损同,但是其使用的ReentrantLock仍然是通過AQS實現(xiàn)的。
- AQS是JUC下的核心膏燃。
關于上面的三個類,就簡單介紹完了组哩,我們在工作當中其實很容易記混,希望本文可以給你帶來一點幫助蛛砰,讓你能夠在項目當中正確的使用它們。