本文主要是介紹java中線程同步的幾種常用方式击你。
CountDownLatch
從字面上理解剃氧,CountDownLatch是一個(gè)同步等待的鎖敏储,根據(jù)官方的注釋可以看出這其實(shí)是一個(gè)同步執(zhí)行工具類。
先看一下官方注釋的前兩段
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* -- the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
翻譯一下就是:
/**
* CountDownLatch是一個(gè)朋鞍,允許一個(gè)或多個(gè)線程已添,
* 等待其他線程中執(zhí)行的一組操作完成的,同步輔助工具滥酥。
*
* CountDownLatch用給定的計(jì)數(shù)進(jìn)行初始化更舞。
* 當(dāng)線程點(diǎn)用await方法后被阻塞,直到當(dāng)前計(jì)數(shù)由于其他線程調(diào)用countDown()方法而達(dá)到零坎吻,
* 此后所有等待線程被放缆蝉,并且任何后續(xù)調(diào)用await立即返回。
* 這是一次性的操作瘦真,計(jì)數(shù)無(wú)法重置刊头。
* 如果您需要重置計(jì)數(shù)的版本,請(qǐng)考慮使用CyclicBarrier诸尽。
* /
解釋的很清楚原杂,不在贅述,接著看一下官方提供的偽代碼案例
官方案例一
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
這個(gè)案例很直白的說(shuō)明了您机,CountDownLatch可以讓多個(gè)線程同時(shí)初始化完成后等待穿肄,直到主線程要求他們開(kāi)始執(zhí)行為止,并且當(dāng)主線程調(diào)用await()之后阻塞直到所有的線程調(diào)用countDown()將計(jì)數(shù)減為0际看,主線程再次喚醒執(zhí)行后序操作咸产。
當(dāng)然這樣還有一些其他的注意點(diǎn),譬如子線程被中斷或者子線程的耗時(shí)操作很長(zhǎng)導(dǎo)致主線程一直阻塞等問(wèn)題仲闽。
官方案例二
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
}
這個(gè)案例是說(shuō)锐朴,當(dāng)一個(gè)問(wèn)題需要被分成n份進(jìn)行處理時(shí),將他們用線程池來(lái)執(zhí)行蔼囊,并讓主線程等待焚志。當(dāng)然官方注釋里還說(shuō)了,如果需要反復(fù)用這種形式來(lái)執(zhí)行一些問(wèn)題時(shí)可以考慮使用CyclicBarrier來(lái)代替CountDownLatch畏鼓,因?yàn)镃ountDownLatch是一次性的計(jì)數(shù)器無(wú)法重置酱酬。
CyclicBarrier
字面意思:可循環(huán)使用的柵欄。主要的作用也是讓指定個(gè)數(shù)的線程到達(dá)目標(biāo)位置后進(jìn)入等到狀態(tài)云矫,等所有的線程都到到目標(biāo)位置后同時(shí)開(kāi)始執(zhí)行膳沽。
構(gòu)造方法有2個(gè)
- CyclicBarrier(int parties),其中parties指等待的線程數(shù)目,當(dāng)await線程數(shù)達(dá)到parties時(shí)挑社,線程同時(shí)開(kāi)始執(zhí)行陨界。
- CyclicBarrier(int parties, Runnable barrierAction),第二個(gè)參數(shù)指所有線程達(dá)到后執(zhí)行的操作痛阻。
通過(guò)第二個(gè)構(gòu)造方法也可以實(shí)現(xiàn)CountDownLatch功能菌瘪,當(dāng)然這不是CyclicBarrier的目的
再來(lái)看一下到達(dá)目標(biāo)位置時(shí)的等待方法,有2個(gè)重載方法
- await()阱当,這個(gè)沒(méi)什么可說(shuō)的俏扩,到達(dá)指定位置后等待
- await(long timeout, TimeUnit unit),這個(gè)指到到指定位置后等待一段時(shí)間弊添,如果超時(shí)則繼續(xù)執(zhí)行后序操作录淡。
現(xiàn)在來(lái)看2個(gè)例子說(shuō)明一下使用CyclicBarrier可能出現(xiàn)的問(wèn)題
CyclicBarrier例一
public class CyclicBarrierTest {
public static void main(String[] args) {
try {
final int Num = 5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(Num);
for (int i = 0; i < Num - 1; i++) {
new Thread(new RunnableOne(cyclicBarrier)).start();
}
Thread thread = new Thread(new RunnableTwo(cyclicBarrier));
thread.start();
Thread.sleep(2000);
thread.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static class RunnableOne implements Runnable {
CyclicBarrier mCyclicBarrier;
RunnableOne(CyclicBarrier cyclicBarrier) {
mCyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println("wait in barrier");
mCyclicBarrier.await();
System.out.println("finish");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
private static class RunnableTwo implements Runnable {
CyclicBarrier mCyclicBarrier;
RunnableTwo(CyclicBarrier cyclicBarrier) {
mCyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println("wait in barrier");
Thread.sleep(5000);
mCyclicBarrier.await();
System.out.println("finish");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
打印結(jié)果如下:
wait in barrier
wait in barrier
wait in barrier
wait in barrier
wait in barrier
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.jxx.myjavatest.CyclicBarrierTest$RunnableTwo.run(CyclicBarrierTest.java:65)
at java.lang.Thread.run(Thread.java:748)
這個(gè)例子的意圖也很簡(jiǎn)單,啟動(dòng)4個(gè)RunnableOne油坝,隨后啟動(dòng)1個(gè)RunnableTwo嫉戚,在所有線程都await()之前其中一個(gè)線程被中斷了,因?yàn)闆](méi)有都await()成功澈圈,其他4個(gè)線程就一直阻塞彼水。
這就提醒我們,要在拋出異常后及時(shí)處理极舔,至少也要讓其他線程能正常執(zhí)行下去。
CyclicBarrier例二
public class CyclicBarrierTest {
public static void main(String[] args) {
final int Num = 5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(Num);
for (int i = 0; i < Num - 1; i++) {
new Thread(new RunnableOne(cyclicBarrier)).start();
}
Thread thread = new Thread(new RunnableTwo(cyclicBarrier));
thread.start();
}
private static class RunnableOne implements Runnable {
CyclicBarrier mCyclicBarrier;
RunnableOne(CyclicBarrier cyclicBarrier) {
mCyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println("wait in barrier");
Thread.sleep(5000);
mCyclicBarrier.await();
System.out.println("finish");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
private static class RunnableTwo implements Runnable {
CyclicBarrier mCyclicBarrier;
RunnableTwo(CyclicBarrier cyclicBarrier) {
mCyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println("wait in barrier");
mCyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
System.out.println("finish");
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
e.printStackTrace();
}
}
}
}
打印如下:
wait in barrier
wait in barrier
wait in barrier
wait in barrier
wait in barrier
java.util.concurrent.TimeoutException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.jxx.myjavatest.CyclicBarrierTest$RunnableTwo.run(CyclicBarrierTest.java:61)
at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
at java.lang.Thread.run(Thread.java:748)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
at com.jxx.myjavatest.CyclicBarrierTest$RunnableOne.run(CyclicBarrierTest.java:40)
at java.lang.Thread.run(Thread.java:748)
這里模擬了一個(gè)await()超時(shí)的異常链瓦,可以看到在拋出異常后需要我們自己處理后期的事物拆魏。同時(shí)某一個(gè)線程拋出超時(shí)異常后,其他線程再次到達(dá)會(huì)拋出BrokenBarrierException異常慈俯,防止繼續(xù)等待渤刃。
Semaphore
其實(shí)Semaphore不該放到這里講,因?yàn)镾emaphore類似于Lock的存在贴膘,是對(duì)資源或者線程的一種控制卖子,但是這篇博文主要講了線程的等待喚起,信號(hào)量放這里講問(wèn)題也不大刑峡。
官方的說(shuō)法是信號(hào)量通常用來(lái)限制線程的數(shù)量洋闽,而不是控制訪問(wèn)一些(物理或邏輯)資源。用法也非常簡(jiǎn)單突梦,使用前先acquire()獲取許可诫舅,在獲取許可過(guò)程中,是線程是被阻塞的宫患,使用完畢release()許可即可刊懈。這點(diǎn)類似于Lock,不同的是Semaphore的acquire()可以被允許多次。
Semaphore有兩個(gè)構(gòu)造方法虚汛,可以指定Semaphore獲取是公平的還是非公平的匾浪,默認(rèn)是非公平
看這里,舉個(gè)栗子:
public class SemaphoreTest {
public static void main(String[] args) {
CountDownLatch startLatch = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyRunnable(startLatch, semaphore)).start();
}
startLatch.countDown();
}
private static class MyRunnable implements Runnable {
final CountDownLatch mCountDownLatch;
final Semaphore mSemaphore;
MyRunnable(CountDownLatch countDownLatch, Semaphore semaphore) {
mCountDownLatch = countDownLatch;
mSemaphore = semaphore;
}
@Override
public void run() {
try {
mCountDownLatch.await();
mSemaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquire success");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mSemaphore.release();
}
}
}
}
打印如下:
Thread-0 acquire success
Thread-1 acquire success
Thread-9 acquire success
Thread-3 acquire success
Thread-2 acquire success
Thread-4 acquire success
Thread-6 acquire success
Thread-7 acquire success
Thread-5 acquire success
Thread-8 acquire success
可以看出這是默認(rèn)的非公平鎖的情況卷哩,再來(lái)看一下公平鎖的情況
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3, true);
for (int i = 0; i < 10; i++) {
new Thread(new MyRunnable(semaphore)).start();
}
}
private static class MyRunnable implements Runnable {
final Semaphore mSemaphore;
MyRunnable(Semaphore semaphore) {
mSemaphore = semaphore;
}
@Override
public void run() {
try {
mSemaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquire success");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mSemaphore.release();
}
}
}
}
打印如下
Thread-0 acquire success
Thread-1 acquire success
Thread-2 acquire success
Thread-3 acquire success
Thread-4 acquire success
Thread-5 acquire success
Thread-6 acquire success
Thread-7 acquire success
Thread-8 acquire success
Thread-9 acquire success
當(dāng)然這里肯定有讀者想了蛋辈,直接將Semaphore置為true公平鎖的情況就好了,何必去掉CountDownLatch呢殉疼。
這里需要注意下梯浪,雖然你Semaphore是公平,但是CountDownLatch到點(diǎn)之后喚起線程的順序是隨機(jī)的瓢娜,并不一定就是線程入隊(duì)的順序喚起挂洛。
線程的join()
jion方法的作用是讓主線程阻塞等待子線程完成,當(dāng)然有幾個(gè)前提條件眠砾,下面細(xì)說(shuō)虏劲。
join方法有三個(gè)重載的版本
- final void join(); //一直等待到j(luò)oin的線程執(zhí)行完畢
- final synchronized void join(long millis); //等待指定時(shí)間后繼續(xù)執(zhí)行
- final synchronized void join(long millis, int nanos); 同上,時(shí)間處理了一下
第一個(gè)和第三個(gè)最后其實(shí)調(diào)用的都是第二個(gè)重載方法褒颈,我們來(lái)看一下源碼
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
直接看最后的while循環(huán)柒巫,可以看到,調(diào)用這個(gè)方法谷丸,其實(shí)是調(diào)用Object提供的wait(long timeout)讓主線程阻塞而已堡掏。有幾個(gè)注意點(diǎn)
- 子線程如果已經(jīng)銷毀,則直接跳過(guò)等待
- join(long millis) 是一個(gè)同步方位刨疼,意味著要想調(diào)用此方法需要先獲取到子線程的實(shí)例對(duì)象鎖
來(lái)看一個(gè)例子泉唁,驗(yàn)證一下第二點(diǎn):
public class JoinTest {
public static void main(String[] args) {
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(6000);
System.out.println("4---" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
try {
System.out.println("1---" + System.currentTimeMillis());
new Thread(new MyRunnable(thread)).start();
System.out.println("2---" + System.currentTimeMillis());
thread.join(2000);
System.out.println("3---" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finish " + System.currentTimeMillis());
}
private static class MyRunnable implements Runnable {
final Object mObject;
MyRunnable(Object object) {
mObject = object;
}
@Override
public void run() {
synchronized (mObject) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
打印如下:
1---1525529803445
2---1525529803446
3---1525529807449
finish 1525529807449
4---1525529809445
可以很清晰的看到,打印完1之后立即打印了2揩慕,但是2和3之間打相差了4秒亭畜,原因就在join之前需要先獲取thread的鎖對(duì)象,但是需要MyRunnable釋放鎖之后才能執(zhí)行迎卤。
總結(jié)
好了拴鸵,又到總結(jié)的時(shí)間了。
- CountDownLatch相對(duì)于CyclicBarrier側(cè)重點(diǎn)是蜗搔,等待其他線程操作完成后主線程在繼續(xù)后續(xù)的操作
- CyclicBarrier相對(duì)于CountDownLatch側(cè)重點(diǎn)是劲藐,所有的線程操作完成后等待一起繼續(xù)后續(xù)操作。
- CountDownLatch不能重置狀態(tài)樟凄,CyclicBarrier可以重置后多次利用
- CountDownLatch和CyclicBarrier拋出異常后都需要妥善處理
- Semaphore于Lock類似瘩燥,主要用于線程的訪問(wèn)控制,構(gòu)造時(shí)可以指定是否是公平競(jìng)爭(zhēng)
- thread.join()主要是讓主線程等待子線程執(zhí)行完畢不同,有個(gè)注意點(diǎn)就是join()執(zhí)行之前需要獲取到子線程的實(shí)例對(duì)象鎖厉膀。