- 一合武、線程狀態(tài)轉(zhuǎn)換
- 二服傍、使用線程
- 三纬纪、基礎(chǔ)線程機(jī)制
- 四、中斷
- 五珊楼、互斥同步
- 六怪得、線程之間的協(xié)作
- 七滚朵、J.U.C - AQS
- 八通砍、J.U.C - 其它組件
- 九玛臂、線程不安全示例
- 十、Java 內(nèi)存模型
- 十一封孙、線程安全
- 十二迹冤、鎖優(yōu)化
- 十三、多線程開發(fā)良好的實踐
-
參考資料
一虎忌、線程狀態(tài)轉(zhuǎn)換
<div align="center"> <img src="../pics//ace830df-9919-48ca-91b5-60b193f593d2.png" width=""/> </div>
新建(New)
創(chuàng)建后尚未啟動泡徙。
可運(yùn)行(Runnable)
可能正在運(yùn)行,也可能正在等待 CPU 時間片膜蠢。
包含了操作系統(tǒng)線程狀態(tài)中的 Running 和 Ready锋勺。
阻塞(Blocking)
等待獲取一個排它鎖,如果其線程釋放了鎖就會結(jié)束此狀態(tài)狡蝶。
無限期等待(Waiting)
等待其它線程顯式地喚醒,否則不會被分配 CPU 時間片贮勃。
進(jìn)入方法 | 退出方法 |
---|---|
沒有設(shè)置 Timeout 參數(shù)的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
沒有設(shè)置 Timeout 參數(shù)的 Thread.join() 方法 | 被調(diào)用的線程執(zhí)行完畢 |
LockSupport.park() 方法 | - |
限期等待(Timed Waiting)
無需等待其它線程顯式地喚醒贪惹,在一定時間之后會被系統(tǒng)自動喚醒。
調(diào)用 Thread.sleep() 方法使線程進(jìn)入限期等待狀態(tài)時寂嘉,常常用“使一個線程睡眠”進(jìn)行描述奏瞬。
調(diào)用 Object.wait() 方法使線程進(jìn)入限期等待或者無限期等待時,常常用“掛起一個線程”進(jìn)行描述泉孩。
睡眠和掛起是用來描述行為硼端,而阻塞和等待用來描述狀態(tài)。
阻塞和等待的區(qū)別在于寓搬,阻塞是被動的珍昨,它是在等待獲取一個排它鎖。而等待是主動的句喷,通過調(diào)用 Thread.sleep() 和 Object.wait() 等方法進(jìn)入镣典。
進(jìn)入方法 | 退出方法 |
---|---|
Thread.sleep() 方法 | 時間結(jié)束 |
設(shè)置了 Timeout 參數(shù)的 Object.wait() 方法 | 時間結(jié)束 / Object.notify() / Object.notifyAll() |
設(shè)置了 Timeout 參數(shù)的 Thread.join() 方法 | 時間結(jié)束 / 被調(diào)用的線程執(zhí)行完畢 |
LockSupport.parkNanos() 方法 | - |
LockSupport.parkUntil() 方法 | - |
死亡(Terminated)
可以是線程結(jié)束任務(wù)之后自己結(jié)束,或者產(chǎn)生了異常而結(jié)束唾琼。
二兄春、使用線程
有三種使用線程的方法:
- 實現(xiàn) Runnable 接口;
- 實現(xiàn) Callable 接口锡溯;
- 繼承 Thread 類赶舆。
實現(xiàn) Runnable 和 Callable 接口的類只能當(dāng)做一個可以在線程中運(yùn)行的任務(wù)哑姚,不是真正意義上的線程,因此最后還需要通過 Thread 來調(diào)用芜茵⌒鹆浚可以說任務(wù)是通過線程驅(qū)動從而執(zhí)行的。
實現(xiàn) Runnable 接口
需要實現(xiàn) run() 方法夕晓。
通過 Thread 調(diào)用 start() 方法來啟動線程宛乃。
public class MyRunnable implements Runnable {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start();
}
實現(xiàn) Callable 接口
與 Runnable 相比,Callable 可以有返回值蒸辆,返回值通過 FutureTask 進(jìn)行封裝征炼。
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
繼承 Thread 類
同樣也是需要實現(xiàn) run() 方法,因為 Thread 類也實現(xiàn)了 Runable 接口躬贡。
public class MyThread extends Thread {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
實現(xiàn)接口 VS 繼承 Thread
實現(xiàn)接口會更好一些谆奥,因為:
- Java 不支持多重繼承,因此繼承了 Thread 類就無法繼承其它類拂玻,但是可以實現(xiàn)多個接口酸些;
- 類可能只要求可執(zhí)行就行,繼承整個 Thread 類開銷過大檐蚜。
三魄懂、基礎(chǔ)線程機(jī)制
Executor
Executor 管理多個異步任務(wù)的執(zhí)行,而無需程序員顯式地管理線程的生命周期闯第。這里的異步是指多個任務(wù)的執(zhí)行互不干擾市栗,不需要進(jìn)行同步操作。
主要有三種 Executor:
- CachedThreadPool:一個任務(wù)創(chuàng)建一個線程咳短;
- FixedThreadPool:所有任務(wù)只能使用固定大小的線程填帽;
- SingleThreadExecutor:相當(dāng)于大小為 1 的 FixedThreadPool。
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable());
}
executorService.shutdown();
}
Daemon
守護(hù)線程是程序運(yùn)行時在后臺提供服務(wù)的線程咙好,不屬于程序中不可或缺的部分篡腌。
當(dāng)所有非守護(hù)線程結(jié)束時,程序也就終止勾效,同時會殺死所有守護(hù)線程嘹悼。
main() 屬于非守護(hù)線程。
使用 setDaemon() 方法將一個線程設(shè)置為守護(hù)線程层宫。
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setDaemon(true);
}
sleep()
Thread.sleep(millisec) 方法會休眠當(dāng)前正在執(zhí)行的線程绘迁,millisec 單位為毫秒。
sleep() 可能會拋出 InterruptedException卒密,因為異常不能跨線程傳播回 main() 中缀台,因此必須在本地進(jìn)行處理。線程中拋出的其它異常也同樣需要在本地進(jìn)行處理哮奇。
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
yield()
對靜態(tài)方法 Thread.yield() 的調(diào)用聲明了當(dāng)前線程已經(jīng)完成了生命周期中最重要的部分膛腐,可以切換給其它線程來執(zhí)行睛约。該方法只是對線程調(diào)度器的一個建議,而且也只是建議具有相同優(yōu)先級的其它線程可以運(yùn)行哲身。
public void run() {
Thread.yield();
}
四辩涝、中斷
一個線程執(zhí)行完畢之后會自動結(jié)束,如果在運(yùn)行過程中發(fā)生異常也會提前結(jié)束勘天。
InterruptedException
通過調(diào)用一個線程的 interrupt() 來中斷該線程怔揩,如果該線程處于阻塞、限期等待或者無限期等待狀態(tài)脯丝,那么就會拋出 InterruptedException商膊,從而提前結(jié)束該線程。但是不能中斷 I/O 阻塞和 synchronized 鎖阻塞宠进。
對于以下代碼晕拆,在 main() 中啟動一個線程之后再中斷它,由于線程中調(diào)用了 Thread.sleep() 方法材蹬,因此會拋出一個 InterruptedException实幕,從而提前結(jié)束線程,不執(zhí)行之后的語句堤器。
public class InterruptExample {
private static class MyThread1 extends Thread {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new MyThread1();
thread1.start();
thread1.interrupt();
System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at InterruptExample.lambda$main$0(InterruptExample.java:5)
at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
interrupted()
如果一個線程的 run() 方法執(zhí)行一個無限循環(huán)昆庇,并且沒有執(zhí)行 sleep() 等會拋出 InterruptedException 的操作,那么調(diào)用線程的 interrupt() 方法就無法使線程提前結(jié)束闸溃。
但是調(diào)用 interrupt() 方法會設(shè)置線程的中斷標(biāo)記整吆,此時調(diào)用 interrupted() 方法會返回 true。因此可以在循環(huán)體中使用 interrupted() 方法來判斷線程是否處于中斷狀態(tài)圈暗,從而提前結(jié)束線程。
public class InterruptExample {
private static class MyThread2 extends Thread {
@Override
public void run() {
while (!interrupted()) {
// ..
}
System.out.println("Thread end");
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new MyThread2();
thread2.start();
thread2.interrupt();
}
Thread end
Executor 的中斷操作
調(diào)用 Executor 的 shutdown() 方法會等待線程都執(zhí)行完畢之后再關(guān)閉裕膀,但是如果調(diào)用的是 shutdownNow() 方法员串,則相當(dāng)于調(diào)用每個線程的 interrupt() 方法。
以下使用 Lambda 創(chuàng)建線程昼扛,相當(dāng)于創(chuàng)建了一個匿名內(nèi)部線程寸齐。
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
}
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
如果只想中斷 Executor 中的一個線程,可以通過使用 submit() 方法來提交一個線程抄谐,它會返回一個 Future<?> 對象渺鹦,通過調(diào)用該對象的 cancel(true) 方法就可以中斷線程。
Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);
五蛹含、互斥同步
Java 提供了兩種鎖機(jī)制來控制多個線程對共享資源的互斥訪問毅厚,第一個是 JVM 實現(xiàn)的 synchronized,而另一個是 JDK 實現(xiàn)的 ReentrantLock浦箱。
synchronized
1. 同步一個代碼塊
public void func() {
synchronized (this) {
// ...
}
}
它只作用于同一個對象吸耿,如果調(diào)用兩個對象上的同步代碼塊祠锣,就不會進(jìn)行同步。
對于以下代碼咽安,使用 ExecutorService 執(zhí)行了兩個線程伴网,由于調(diào)用的是同一個對象的同步代碼塊,因此這兩個線程會進(jìn)行同步妆棒,當(dāng)一個線程進(jìn)入同步語句塊時澡腾,另一個線程就必須等待。
public class SynchronizedExample {
public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
對于以下代碼糕珊,兩個線程調(diào)用了不同對象的同步代碼塊动分,因此這兩個線程就不需要同步。從輸出結(jié)果可以看出放接,兩個線程交叉執(zhí)行刺啦。
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
2. 同步一個方法
public synchronized void func () {
// ...
}
它和同步代碼塊一樣,作用于同一個對象纠脾。
3. 同步一個類
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}
作用于整個類玛瘸,也就是說兩個線程調(diào)用同一個類的不同對象上的這種同步語句,也會進(jìn)行同步苟蹈。
public class SynchronizedExample {
public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
4. 同步一個靜態(tài)方法
public synchronized static void fun() {
// ...
}
作用于整個類糊渊。
ReentrantLock
ReentrantLock 是 java.util.concurrent(J.U.C)包中的鎖。
public class LockExample {
private Lock lock = new ReentrantLock();
public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 確保釋放鎖慧脱,從而避免發(fā)生死鎖渺绒。
}
}
}
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
比較
1. 鎖的實現(xiàn)
synchronized 是 JVM 實現(xiàn)的,而 ReentrantLock 是 JDK 實現(xiàn)的菱鸥。
2. 性能
新版本 Java 對 synchronized 進(jìn)行了很多優(yōu)化宗兼,例如自旋鎖等,synchronized 與 ReentrantLock 大致相同氮采。
3. 等待可中斷
當(dāng)持有鎖的線程長期不釋放鎖的時候殷绍,正在等待的線程可以選擇放棄等待,改為處理其他事情鹊漠。
ReentrantLock 可中斷主到,而 synchronized 不行。
4. 公平鎖
公平鎖是指多個線程在等待同一個鎖時躯概,必須按照申請鎖的時間順序來依次獲得鎖登钥。
synchronized 中的鎖是非公平的,ReentrantLock 默認(rèn)情況下也是非公平的娶靡,但是也可以是公平的牧牢。
5. 鎖綁定多個條件
一個 ReentrantLock 可以同時綁定多個 Condition 對象。
使用選擇
除非需要使用 ReentrantLock 的高級功能,否則優(yōu)先使用 synchronized结执。這是因為 synchronized 是 JVM 實現(xiàn)的一種鎖機(jī)制度陆,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持献幔。并且使用 synchronized 不用擔(dān)心沒有釋放鎖而導(dǎo)致死鎖問題懂傀,因為 JVM 會確保鎖的釋放。
六蜡感、線程之間的協(xié)作
當(dāng)多個線程可以一起工作去解決某個問題時蹬蚁,如果某些部分必須在其它部分之前完成,那么就需要對線程進(jìn)行協(xié)調(diào)郑兴。
join()
在線程中調(diào)用另一個線程的 join() 方法犀斋,會將當(dāng)前線程掛起,而不是忙等待情连,直到目標(biāo)線程結(jié)束叽粹。
對于以下代碼,雖然 b 線程先啟動却舀,但是因為在 b 線程中調(diào)用了 a 線程的 join() 方法虫几,b 線程會等待 a 線程結(jié)束才繼續(xù)執(zhí)行,因此最后能夠保證 a 線程的輸出先于 b 線程的輸出挽拔。
public class JoinExample {
private class A extends Thread {
@Override
public void run() {
System.out.println("A");
}
}
private class B extends Thread {
private A a;
B(A a) {
this.a = a;
}
@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B");
}
}
public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
}
public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
}
A
B
wait() notify() notifyAll()
調(diào)用 wait() 使得線程等待某個條件滿足辆脸,線程在等待時會被掛起,當(dāng)其他線程的運(yùn)行使得這個條件滿足時螃诅,其它線程會調(diào)用 notify() 或者 notifyAll() 來喚醒掛起的線程啡氢。
它們都屬于 Object 的一部分,而不屬于 Thread术裸。
只能用在同步方法或者同步控制塊中使用倘是,否則會在運(yùn)行時拋出 IllegalMonitorStateExeception。
使用 wait() 掛起期間袭艺,線程會釋放鎖搀崭。這是因為,如果沒有釋放鎖匹表,那么其它線程就無法進(jìn)入對象的同步方法或者同步控制塊中门坷,那么就無法執(zhí)行 notify() 或者 notifyAll() 來喚醒掛起的線程宣鄙,造成死鎖袍镀。
public class WaitNotifyExample {
public synchronized void before() {
System.out.println("before");
notifyAll();
}
public synchronized void after() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after");
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
before
after
wait() 和 sleep() 的區(qū)別
- wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態(tài)方法冻晤;
- wait() 會釋放鎖苇羡,sleep() 不會。
await() signal() signalAll()
java.util.concurrent 類庫中提供了 Condition 類來實現(xiàn)線程之間的協(xié)調(diào)鼻弧,可以在 Condition 上調(diào)用 await() 方法使線程等待设江,其它線程調(diào)用 signal() 或 signalAll() 方法喚醒等待的線程锦茁。相比于 wait() 這種等待方式,await() 可以指定等待的條件叉存,因此更加靈活码俩。
使用 Lock 來獲取一個 Condition 對象。
public class AwaitSignalExample {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void before() {
lock.lock();
try {
System.out.println("before");
condition.signalAll();
} finally {
lock.unlock();
}
}
public void after() {
lock.lock();
try {
condition.await();
System.out.println("after");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AwaitSignalExample example = new AwaitSignalExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}
before
after
七歼捏、J.U.C - AQS
java.util.concurrent(J.U.C)大大提高了并發(fā)性能稿存,AQS 被認(rèn)為是 J.U.C 的核心。
CountdownLatch
用來控制一個線程等待多個線程瞳秽。
維護(hù)了一個計數(shù)器 cnt瓣履,每次調(diào)用 countDown() 方法會讓計數(shù)器的值減 1,減到 0 的時候练俐,那些因為調(diào)用 await() 方法而在等待的線程就會被喚醒袖迎。
<div align="center"> <img src="../pics//CountdownLatch.png" width=""/> </div>
public class CountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
run..run..run..run..run..run..run..run..run..run..end
CyclicBarrier
用來控制多個線程互相等待,只有當(dāng)多個線程都到達(dá)時腺晾,這些線程才會繼續(xù)執(zhí)行燕锥。
和 CountdownLatch 相似,都是通過維護(hù)計數(shù)器來實現(xiàn)的丘喻。但是它的計數(shù)器是遞增的脯宿,每次執(zhí)行 await() 方法之后,計數(shù)器會加 1泉粉,直到計數(shù)器的值和設(shè)置的值相等连霉,等待的所有線程才會繼續(xù)執(zhí)行。和 CountdownLatch 的另一個區(qū)別是嗡靡,CyclicBarrier 的計數(shù)器可以循環(huán)使用跺撼,所以它才叫做循環(huán)屏障。
下圖應(yīng)該從下往上看才正確讨彼。
<div align="center"> <img src="../pics//CyclicBarrier.png" width=""/> </div>
public class CyclicBarrierExample {
public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..
Semaphore
Semaphore 就是操作系統(tǒng)中的信號量歉井,可以控制對互斥資源的訪問線程數(shù)。
<div align="center"> <img src="../pics//Semaphore.png" width=""/> </div>
以下代碼模擬了對某個服務(wù)的并發(fā)請求哈误,每次只能有 3 個客戶端同時訪問哩至,請求總數(shù)為 10。
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
2 1 2 2 2 2 2 1 2 2
八蜜自、J.U.C - 其它組件
FutureTask
在介紹 Callable 時我們知道它可以有返回值菩貌,返回值通過 Future<V> 進(jìn)行封裝。FutureTask 實現(xiàn)了 RunnableFuture 接口重荠,該接口繼承自 Runnable 和 Future<V> 接口箭阶,這使得 FutureTask 既可以當(dāng)做一個任務(wù)執(zhí)行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask 可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景仇参。當(dāng)一個計算任務(wù)需要執(zhí)行很長時間嘹叫,那么就可以用 FutureTask 來封裝這個任務(wù),主線程在完成自己的任務(wù)之后再去獲取結(jié)果诈乒。
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});
Thread computeThread = new Thread(futureTask);
computeThread.start();
Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
other task is running...
4950
BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現(xiàn):
- FIFO 隊列 :LinkedBlockingQueue罩扇、ArrayBlockingQueue(固定長度)
- 優(yōu)先級隊列 :PriorityBlockingQueue
提供了阻塞的 take() 和 put() 方法:如果隊列為空 take() 將阻塞,直到隊列中有內(nèi)容怕磨;如果隊列為滿 put() 將阻塞暮蹂,直到隊列有空閑位置。
使用 BlockingQueue 實現(xiàn)生產(chǎn)者消費(fèi)者問題
public class ProducerConsumer {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("produce..");
}
}
private static class Consumer extends Thread {
@Override
public void run() {
try {
String product = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
}
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
ForkJoin
主要用于并行計算中癌压,和 MapReduce 原理類似仰泻,都是把大的計算任務(wù)拆分成多個小任務(wù)并行計算。
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任務(wù)足夠小則直接計算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任務(wù)
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
ForkJoin 使用 ForkJoinPool 來啟動滩届,它是一個特殊的線程池集侯,線程數(shù)量取決于 CPU 核數(shù)。
public class ForkJoinPool extends AbstractExecutorService
ForkJoinPool 實現(xiàn)了工作竊取算法來提高 CPU 的利用率帜消。每個線程都維護(hù)了一個雙端隊列棠枉,用來存儲需要執(zhí)行的任務(wù)。工作竊取算法允許空閑的線程從其它線程的雙端隊列中竊取一個任務(wù)來執(zhí)行泡挺。竊取的任務(wù)必須是最晚的任務(wù)辈讶,避免和隊列所屬線程發(fā)生競爭。例如下圖中娄猫,Thread2 從 Thread1 的隊列中拿出最晚的 Task1 任務(wù)贱除,Thread1 會拿出 Task2 來執(zhí)行,這樣就避免發(fā)生競爭媳溺。但是如果隊列中只有一個任務(wù)時還是會發(fā)生競爭月幌。
<div align="center"> <img src="../pics//15b45dc6-27aa-4519-9194-f4acfa2b077f.jpg" width=""/> </div>
九、線程不安全示例
如果多個線程對同一個共享數(shù)據(jù)進(jìn)行訪問而不采取同步操作的話悬蔽,那么操作的結(jié)果是不一致的扯躺。
以下代碼演示了 1000 個線程同時對 cnt 執(zhí)行自增操作,操作結(jié)束之后它的值為 997 而不是 1000蝎困。
public class ThreadUnsafeExample {
private int cnt = 0;
public void add() {
cnt++;
}
public int get() {
return cnt;
}
}
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
ThreadUnsafeExample example = new ThreadUnsafeExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
997
十录语、Java 內(nèi)存模型
Java 內(nèi)存模型試圖屏蔽各種硬件和操作系統(tǒng)的內(nèi)存訪問差異,以實現(xiàn)讓 Java 程序在各種平臺下都能達(dá)到一致的內(nèi)存訪問效果禾乘。
主內(nèi)存與工作內(nèi)存
處理器上的寄存器的讀寫的速度比內(nèi)存快幾個數(shù)量級澎埠,為了解決這種速度矛盾,在它們之間加入了高速緩存盖袭。
加入高速緩存帶來了一個新的問題:緩存一致性失暂。如果多個緩存共享同一塊主內(nèi)存區(qū)域,那么多個緩存的數(shù)據(jù)可能會不一致鳄虱,需要一些協(xié)議來解決這個問題弟塞。
<div align="center"> <img src="../pics//68778c1b-15ab-4826-99c0-3b4fd38cb9e9.png" width=""/> </div>
所有的變量都存儲在主內(nèi)存中堡僻,每個線程還有自己的工作內(nèi)存吸重,工作內(nèi)存存儲在高速緩存或者寄存器中席楚,保存了該線程使用的變量的主內(nèi)存副本拷貝娱仔。
線程只能直接操作工作內(nèi)存中的變量撵幽,不同線程之間的變量值傳遞需要通過主內(nèi)存來完成而克。
<div align="center"> <img src="../pics//47358f87-bc4c-496f-9a90-8d696de94cee.png" width=""/> </div>
內(nèi)存間交互操作
Java 內(nèi)存模型定義了 8 個操作來完成主內(nèi)存和工作內(nèi)存的交互操作厢呵。
<div align="center"> <img src="../pics//536c6dfd-305a-4b95-b12c-28ca5e8aa043.png" width=""/> </div>
- read:把一個變量的值從主內(nèi)存?zhèn)鬏數(shù)焦ぷ鲀?nèi)存中
- load:在 read 之后執(zhí)行纳鼎,把 read 得到的值放入工作內(nèi)存的變量副本中
- use:把工作內(nèi)存中一個變量的值傳遞給執(zhí)行引擎
- assign:把一個從執(zhí)行引擎接收到的值賦給工作內(nèi)存的變量
- store:把工作內(nèi)存的一個變量的值傳送到主內(nèi)存中
- write:在 store 之后執(zhí)行建车,把 store 得到的值放入主內(nèi)存的變量中
- lock:作用于主內(nèi)存的變量
- unlock
內(nèi)存模型三大特性
1. 原子性
Java 內(nèi)存模型保證了 read扩借、load、use缤至、assign潮罪、store、write领斥、lock 和 unlock 操作具有原子性嫉到,例如對一個 int 類型的變量執(zhí)行 assign 賦值操作,這個操作就是原子性的月洛。但是 Java 內(nèi)存模型允許虛擬機(jī)將沒有被 volatile 修飾的 64 位數(shù)據(jù)(long何恶,double)的讀寫操作劃分為兩次 32 位的操作來進(jìn)行,即 load嚼黔、store细层、read 和 write 操作可以不具備原子性。
有一個錯誤認(rèn)識就是唬涧,int 等原子性的變量在多線程環(huán)境中不會出現(xiàn)線程安全問題今艺。前面的線程不安全示例代碼中,cnt 變量屬于 int 類型變量爵卒,1000 個線程對它進(jìn)行自增操作之后虚缎,得到的值為 997 而不是 1000。
為了方便討論钓株,將內(nèi)存間的交互操作簡化為 3 個:load实牡、assign、store轴合。
下圖演示了兩個線程同時對 cnt 變量進(jìn)行操作创坞,load、assign受葛、store 這一系列操作整體上看不具備原子性题涨,那么在 T1 修改 cnt 并且還沒有將修改后的值寫入主內(nèi)存偎谁,T2 依然可以讀入該變量的值「俣拢可以看出巡雨,這兩個線程雖然執(zhí)行了兩次自增運(yùn)算,但是主內(nèi)存中 cnt 的值最后為 1 而不是 2席函。因此對 int 類型讀寫操作滿足原子性只是說明 load铐望、assign、store 這些單個操作具備原子性茂附。
<div align="center"> <img src="../pics//ef8eab00-1d5e-4d99-a7c2-d6d68ea7fe92.png" width=""/> </div>
AtomicInteger 能保證多個線程修改的原子性正蛙。
<div align="center"> <img src="../pics//952afa9a-458b-44ce-bba9-463e60162945.png" width=""/> </div>
使用 AtomicInteger 重寫之前線程不安全的代碼之后得到以下線程安全實現(xiàn):
public class AtomicExample {
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
public int get() {
return cnt.get();
}
}
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicExample example = new AtomicExample(); // 只修改這條語句
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1000
除了使用原子類之外,也可以使用 synchronized 互斥鎖來保證操作的原子性营曼。它對應(yīng)的內(nèi)存間交互操作為:lock 和 unlock乒验,在虛擬機(jī)實現(xiàn)上對應(yīng)的字節(jié)碼指令為 monitorenter 和 monitorexit。
public class AtomicSynchronizedExample {
private int cnt = 0;
public synchronized void add() {
cnt++;
}
public synchronized int get() {
return cnt;
}
}
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
AtomicSynchronizedExample example = new AtomicSynchronizedExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1000
2. 可見性
可見性指當(dāng)一個線程修改了共享變量的值蒂阱,其它線程能夠立即得知這個修改徊件。Java 內(nèi)存模型是通過在變量修改后將新值同步回主內(nèi)存,在變量讀取前從主內(nèi)存刷新變量值來實現(xiàn)可見性的蒜危。
主要有有三種實現(xiàn)可見性的方式:
- volatile
- synchronized虱痕,對一個變量執(zhí)行 unlock 操作之前,必須把變量值同步回主內(nèi)存辐赞。
- final部翘,被 final 關(guān)鍵字修飾的字段在構(gòu)造器中一旦初始化完成,并且沒有發(fā)生 this 逃逸(其它線程通過 this 引用訪問到初始化了一半的對象)响委,那么其它線程就能看見 final 字段的值新思。
對前面的線程不安全示例中的 cnt 變量使用 volatile 修飾,不能解決線程不安全問題赘风,因為 volatile 并不能保證操作的原子性夹囚。
3. 有序性
有序性是指:在本線程內(nèi)觀察,所有操作都是有序的邀窃。在一個線程觀察另一個線程荸哟,所有操作都是無序的,無序是因為發(fā)生了指令重排序瞬捕。
在 Java 內(nèi)存模型中鞍历,允許編譯器和處理器對指令進(jìn)行重排序,重排序過程不會影響到單線程程序的執(zhí)行肪虎,卻會影響到多線程并發(fā)執(zhí)行的正確性劣砍。
volatile 關(guān)鍵字通過添加內(nèi)存屏障的方式來禁止指令重排,即重排序時不能把后面的指令放到內(nèi)存屏障之前扇救。
也可以通過 synchronized 來保證有序性刑枝,它保證每個時刻只有一個線程執(zhí)行同步代碼香嗓,相當(dāng)于是讓線程順序執(zhí)行同步代碼。
先行發(fā)生原則
上面提到了可以用 volatile 和 synchronized 來保證有序性装畅。除此之外靠娱,JVM 還規(guī)定了先行發(fā)生原則,讓一個操作無需控制就能先于另一個操作完成洁灵。
主要有以下這些原則:
1. 單一線程原則
Single Thread rule
在一個線程內(nèi),在程序前面的操作先行發(fā)生于后面的操作掺出。
<div align="center"> <img src="../pics//single-thread-rule.png" width=""/> </div>
2. 管程鎖定規(guī)則
Monitor Lock Rule
一個 unlock 操作先行發(fā)生于后面對同一個鎖的 lock 操作徽千。
<div align="center"> <img src="../pics//monitor-lock-rule.png" width=""/> </div>
3. volatile 變量規(guī)則
Volatile Variable Rule
對一個 volatile 變量的寫操作先行發(fā)生于后面對這個變量的讀操作。
<div align="center"> <img src="../pics//volatile-variable-rule.png" width=""/> </div>
4. 線程啟動規(guī)則
Thread Start Rule
Thread 對象的 start() 方法調(diào)用先行發(fā)生于此線程的每一個動作汤锨。
<div align="center"> <img src="../pics//thread-start-rule.png" width=""/> </div>
5. 線程加入規(guī)則
Thread Join Rule
Thread 對象的結(jié)束先行發(fā)生于 join() 方法返回双抽。
<div align="center"> <img src="../pics//thread-join-rule.png" width=""/> </div>
6. 線程中斷規(guī)則
Thread Interruption Rule
對線程 interrupt() 方法的調(diào)用先行發(fā)生于被中斷線程的代碼檢測到中斷事件的發(fā)生,可以通過 interrupted() 方法檢測到是否有中斷發(fā)生闲礼。
7. 對象終結(jié)規(guī)則
Finalizer Rule
一個對象的初始化完成(構(gòu)造函數(shù)執(zhí)行結(jié)束)先行發(fā)生于它的 finalize() 方法的開始牍汹。
8. 傳遞性
Transitivity
如果操作 A 先行發(fā)生于操作 B,操作 B 先行發(fā)生于操作 C柬泽,那么操作 A 先行發(fā)生于操作 C慎菲。
十一、線程安全
線程安全定義
一個類在可以被多個線程安全調(diào)用時就是線程安全的锨并。
線程安全分類
線程安全不是一個非真即假的命題露该,可以將共享數(shù)據(jù)按照安全程度的強(qiáng)弱順序分成以下五類:不可變、絕對線程安全第煮、相對線程安全解幼、線程兼容和線程對立。
1. 不可變
不可變(Immutable)的對象一定是線程安全的包警,無論是對象的方法實現(xiàn)還是方法的調(diào)用者撵摆,都不需要再采取任何的線程安全保障措施,只要一個不可變的對象被正確地構(gòu)建出來害晦,那其外部的可見狀態(tài)永遠(yuǎn)也不會改變特铝,永遠(yuǎn)也不會看到它在多個線程之中處于不一致的狀態(tài)。
不可變的類型:
- final 關(guān)鍵字修飾的基本數(shù)據(jù)類型壹瘟;
- String
- 枚舉類型
- Number 部分子類苟呐,如 Long 和 Double 等數(shù)值包裝類型,BigInteger 和 BigDecimal 等大數(shù)據(jù)類型俐筋。但同為 Number 的子類型的原子類 AtomicInteger 和 AtomicLong 則并非不可變的牵素。
對于集合類型,可以使用 Collections.unmodifiableXXX() 方法來獲取一個不可變的集合澄者。
public class ImmutableExample {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
unmodifiableMap.put("a", 1);
}
}
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at ImmutableExample.main(ImmutableExample.java:9)
Collections.unmodifiableXXX() 先對原始的集合進(jìn)行拷貝笆呆,需要對集合進(jìn)行修改的方法都直接拋出異常请琳。
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
多線程環(huán)境下,應(yīng)當(dāng)盡量使對象成為不可變赠幕,來滿足線程安全俄精。
2. 絕對線程安全
不管運(yùn)行時環(huán)境如何,調(diào)用者都不需要任何額外的同步措施榕堰。
3. 相對線程安全
相對的線程安全需要保證對這個對象單獨的操作是線程安全的竖慧,在調(diào)用的時候不需要做額外的保障措施,但是對于一些特定順序的連續(xù)調(diào)用逆屡,就可能需要在調(diào)用端使用額外的同步手段來保證調(diào)用的正確性圾旨。
在 Java 語言中,大部分的線程安全類都屬于這種類型魏蔗,例如 Vector砍的、HashTable、Collections 的 synchronizedCollection() 方法包裝的集合等莺治。
對于下面的代碼廓鞠,如果刪除元素的線程刪除了一個元素,而獲取元素的線程試圖訪問一個已經(jīng)被刪除的元素谣旁,那么就會拋出 ArrayIndexOutOfBoundsException床佳。
public class VectorUnsafeExample {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 100; i++) {
vector.add(i);
}
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
});
executorService.execute(() -> {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
});
executorService.shutdown();
}
}
}
Exception in thread "Thread-159738" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 3
at java.util.Vector.remove(Vector.java:831)
at VectorUnsafeExample.lambda$main$0(VectorUnsafeExample.java:14)
at VectorUnsafeExample$$Lambda$1/713338599.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
如果要保證上面的代碼能正確執(zhí)行下去,就需要對刪除元素和獲取元素的代碼進(jìn)行同步榄审。
executorService.execute(() -> {
synchronized (vector) {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
executorService.execute(() -> {
synchronized (vector) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
});
4. 線程兼容
線程兼容是指對象本身并不是線程安全的夕土,但是可以通過在調(diào)用端正確地使用同步手段來保證對象在并發(fā)環(huán)境中可以安全地使用,我們平常說一個類不是線程安全的瘟判,絕大多數(shù)時候指的是這一種情況怨绣。Java API 中大部分的類都是屬于線程兼容的,如與前面的 Vector 和 HashTable 相對應(yīng)的集合類 ArrayList 和 HashMap 等拷获。
5. 線程對立
線程對立是指無論調(diào)用端是否采取了同步措施篮撑,都無法在多線程環(huán)境中并發(fā)使用的代碼。由于 Java 語言天生就具備多線程特性匆瓜,線程對立這種排斥多線程的代碼是很少出現(xiàn)的赢笨,而且通常都是有害的,應(yīng)當(dāng)盡量避免驮吱。
線程安全的實現(xiàn)方法
1. 互斥同步
synchronized 和 ReentrantLock茧妒。
2. 非阻塞同步
互斥同步最主要的問題就是進(jìn)行線程阻塞和喚醒所帶來的性能問題,因此這種同步也稱為阻塞同步左冬。
互斥同步屬于一種悲觀的并發(fā)策略桐筏,總是認(rèn)為只要不去做正確的同步措施,那就肯定會出現(xiàn)問題拇砰。論共享數(shù)據(jù)是否真的會出現(xiàn)競爭梅忌,它都要進(jìn)行加鎖(這里討論的是概念模型狰腌,實際上虛擬機(jī)會優(yōu)化掉很大一部分不必要的加鎖)、用戶態(tài)核心態(tài)轉(zhuǎn)換牧氮、維護(hù)鎖計數(shù)器和檢查是否有被阻塞的線程需要喚醒等操作琼腔。
隨著硬件指令集的發(fā)展,我們可以使用基于沖突檢測的樂觀并發(fā)策略:先進(jìn)行操作踱葛,如果沒有其它線程爭用共享數(shù)據(jù)丹莲,那操作就成功了,否則采取補(bǔ)償措施(不斷地重試尸诽,直到成功為止)甥材。這種樂觀的并發(fā)策略的許多實現(xiàn)都不需要把線程掛起,因此這種同步操作稱為非阻塞同步逊谋。
樂觀鎖需要操作和沖突檢測這兩個步驟具備原子性擂达,這里就不能再使用互斥同步來保證了土铺,只能靠硬件來完成胶滋。
硬件支持的原子性操作最典型的是:比較并交換(Compare-and-Swap,CAS)悲敷。CAS 指令需要有 3 個操作數(shù)究恤,分別是內(nèi)存地址 V、舊的預(yù)期值 A 和新值 B后德。當(dāng)執(zhí)行操作時部宿,只有當(dāng) V 的值等于 A,才將 V 的值更新為 B瓢湃。
J.U.C 包里面的整數(shù)原子類 AtomicInteger理张,其中的 compareAndSet() 和 getAndIncrement() 等方法都使用了 Unsafe 類的 CAS 操作。
以下代碼使用了 AtomicInteger 執(zhí)行了自增的操作绵患。
private AtomicInteger cnt = new AtomicInteger();
public void add() {
cnt.incrementAndGet();
}
以下代碼是 incrementAndGet() 的源碼雾叭,它調(diào)用了 unsafe 的 getAndAddInt() 。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
以下代碼是 getAndAddInt() 源碼落蝙,var1 指示內(nèi)存地址织狐,var2 指示舊值,var4 指示操作需要加的數(shù)值筏勒,這里為 1移迫。通過 getIntVolatile(var1, var2) 得到舊的預(yù)期值,通過調(diào)用 compareAndSwapInt() 來進(jìn)行 CAS 比較管行,如果 var2==var5厨埋,那么就更新內(nèi)存地址為 var1 的變量為 var5+var4【枨辏可以看到 getAndAddInt() 在一個循環(huán)中進(jìn)行揽咕,發(fā)生沖突的做法是不斷的進(jìn)行重試悲酷。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
ABA :如果一個變量初次讀取的時候是 A 值,它的值被改成了 B亲善,后來又被改回為 A设易,那 CAS 操作就會誤認(rèn)為它從來沒有被改變過。
J.U.C 包提供了一個帶有標(biāo)記的原子引用類 AtomicStampedReference 來解決這個問題蛹头,它可以通過控制變量值的版本來保證 CAS 的正確性顿肺。大部分情況下 ABA 問題不會影響程序并發(fā)的正確性,如果需要解決 ABA 問題渣蜗,改用傳統(tǒng)的互斥同步可能會比原子類更高效屠尊。
3. 無同步方案
要保證線程安全,并不是一定就要進(jìn)行同步耕拷,兩者沒有因果關(guān)系讼昆。同步只是保證共享數(shù)據(jù)爭用時的正確性的手段,如果一個方法本來就不涉及共享數(shù)據(jù)骚烧,那它自然就無須任何同步措施去保證正確性浸赫,因此會有一些代碼天生就是線程安全的。
(一)可重入代碼(Reentrant Code)
這種代碼也叫做純代碼(Pure Code)赃绊,可以在代碼執(zhí)行的任何時刻中斷它既峡,轉(zhuǎn)而去執(zhí)行另外一段代碼(包括遞歸調(diào)用它本身),而在控制權(quán)返回后碧查,原來的程序不會出現(xiàn)任何錯誤运敢。
可重入代碼有一些共同的特征,例如不依賴存儲在堆上的數(shù)據(jù)和公用的系統(tǒng)資源忠售、用到的狀態(tài)量都由參數(shù)中傳入传惠、不調(diào)用非可重入的方法等。
(二)棧封閉
多個線程訪問同一個方法的局部變量時稻扬,不會出現(xiàn)線程安全問題卦方,因為局部變量存儲在棧中,屬于線程私有的腐螟。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
}
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
}
100
100
(三)線程本地存儲(Thread Local Storage)
如果一段代碼中所需要的數(shù)據(jù)必須與其他代碼共享愿汰,那就看看這些共享數(shù)據(jù)的代碼是否能保證在同一個線程中執(zhí)行。如果能保證乐纸,我們就可以把共享數(shù)據(jù)的可見范圍限制在同一個線程之內(nèi)衬廷,這樣,無須同步也能保證線程之間不出現(xiàn)數(shù)據(jù)爭用的問題汽绢。
符合這種特點的應(yīng)用并不少見吗跋,大部分使用消費(fèi)隊列的架構(gòu)模式(如“生產(chǎn)者-消費(fèi)者”模式)都會將產(chǎn)品的消費(fèi)過程盡量在一個線程中消費(fèi)完,其中最重要的一個應(yīng)用實例就是經(jīng)典 Web 交互模型中的“一個請求對應(yīng)一個服務(wù)器線程”(Thread-per-Request)的處理方式,這種處理方式的廣泛應(yīng)用使得很多 Web 服務(wù)端應(yīng)用都可以使用線程本地存儲來解決線程安全問題跌宛。
可以使用 java.lang.ThreadLocal 類來實現(xiàn)線程本地存儲功能酗宋。
對于以下代碼,thread1 中設(shè)置 threadLocal 為 1疆拘,而 thread2 設(shè)置 threadLocal 為 2蜕猫。過了一段時間之后,thread1 讀取 threadLocal 依然是 1哎迄,不受 thread2 的影響回右。
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
1
為了理解 ThreadLocal,先看以下代碼:
public class ThreadLocalExample1 {
public static void main(String[] args) {
ThreadLocal threadLocal1 = new ThreadLocal();
ThreadLocal threadLocal2 = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal1.set(1);
threadLocal2.set(1);
});
Thread thread2 = new Thread(() -> {
threadLocal1.set(2);
threadLocal2.set(2);
});
thread1.start();
thread2.start();
}
}
它所對應(yīng)的底層結(jié)構(gòu)圖為:
<div align="center"> <img src="../pics//3646544a-cb57-451d-9e03-d3c4f5e4434a.png" width=""/> </div>
每個 Thread 都有一個 ThreadLocal.ThreadLocalMap 對象漱挚,Thread 類中就定義了 ThreadLocal.ThreadLocalMap 成員翔烁。
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
當(dāng)調(diào)用一個 ThreadLocal 的 set(T value) 方法時,先得到當(dāng)前線程的 ThreadLocalMap 對象旨涝,然后將 ThreadLocal->value 鍵值對插入到該 Map 中蹬屹。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
get() 方法類似。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
ThreadLocal 從理論上講并不是用來解決多線程并發(fā)問題的白华,因為根本不存在多線程競爭慨默。在一些場景 (尤其是使用線程池) 下,由于 ThreadLocal.ThreadLocalMap 的底層數(shù)據(jù)結(jié)構(gòu)導(dǎo)致 ThreadLocal 有內(nèi)存泄漏的情況衬鱼,盡可能在每次使用 ThreadLocal 后手動調(diào)用 remove()业筏,以避免出現(xiàn) ThreadLocal 經(jīng)典的內(nèi)存泄漏甚至是造成自身業(yè)務(wù)混亂的風(fēng)險憔杨。
十二鸟赫、鎖優(yōu)化
這里的鎖優(yōu)化主要是指虛擬機(jī)對 synchronized 的優(yōu)化。
自旋鎖
互斥同步的進(jìn)入阻塞狀態(tài)的開銷都很大消别,應(yīng)該盡量避免抛蚤。在許多應(yīng)用中,共享數(shù)據(jù)的鎖定狀態(tài)只會持續(xù)很短的一段時間寻狂。自旋鎖的思想是讓一個線程在請求一個共享數(shù)據(jù)的鎖時執(zhí)行忙循環(huán)(自旋)一段時間岁经,如果在這段時間內(nèi)能獲得鎖,就可以避免進(jìn)入阻塞狀態(tài)蛇券。
自選鎖雖然能避免進(jìn)入阻塞狀態(tài)從而減少開銷缀壤,但是它需要進(jìn)行忙循環(huán)操作占用 CPU 時間,它只適用于共享數(shù)據(jù)的鎖定狀態(tài)很短的場景纠亚。
在 JDK 1.6 中引入了自適應(yīng)的自旋鎖塘慕。自適應(yīng)意味著自旋的次數(shù)不再固定了,而是由前一次在同一個鎖上的自旋次數(shù)及鎖的擁有者的狀態(tài)來決定蒂胞。
鎖消除
鎖消除是指對于被檢測出不可能存在競爭的共享數(shù)據(jù)的鎖進(jìn)行消除图呢。
鎖消除主要是通過逃逸分析來支持,如果堆上的共享數(shù)據(jù)不可能逃逸出去被其它線程訪問到,那么就可以把它們當(dāng)成私有數(shù)據(jù)對待蛤织,也就可以將它們的鎖進(jìn)行消除赴叹。
對于一些看起來沒有加鎖的代碼,其實隱式的加了很多鎖指蚜。例如下面的字符串拼接代碼就隱式加了鎖:
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}
String 是一個不可變的類乞巧,編譯器會對 String 的拼接自動優(yōu)化。在 JDK 1.5 之前摊鸡,會轉(zhuǎn)化為 StringBuffer 對象的連續(xù) append() 操作:
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}
每個 append() 方法中都有一個同步塊摊欠。虛擬機(jī)觀察變量 sb,很快就會發(fā)現(xiàn)它的動態(tài)作用域被限制在 concatString() 方法內(nèi)部柱宦。也就是說些椒,sb 的所有引用永遠(yuǎn)不會“逃逸”到 concatString() 方法之外,其他線程無法訪問到它掸刊,因此可以進(jìn)行消除免糕。
鎖粗化
如果一系列的連續(xù)操作都對同一個對象反復(fù)加鎖和解鎖,頻繁的加鎖操作就會導(dǎo)致性能損耗忧侧。
上一節(jié)的示例代碼中連續(xù)的 append() 方法就屬于這類情況石窑。如果虛擬機(jī)探測到由這樣的一串零碎的操作都對同一個對象加鎖,將會把加鎖的范圍擴(kuò)展(粗化)到整個操作序列的外部蚓炬。對于上一節(jié)的示例代碼就是擴(kuò)展到第一個 append() 操作之前直至最后一個 append() 操作之后松逊,這樣只需要加鎖一次就可以了。
輕量級鎖
JDK 1.6 引入了偏向鎖和輕量級鎖肯夏,從而讓鎖擁有了四個狀態(tài):無鎖狀態(tài)(unlocked)经宏、偏向鎖狀態(tài)(biasble)、輕量級鎖狀態(tài)(lightweight locked)和重量級鎖狀態(tài)(inflated)驯击。
以下是 HotSpot 虛擬機(jī)對象頭的內(nèi)存布局烁兰,這些數(shù)據(jù)被稱為 mark word。其中 tag bits 對應(yīng)了五個狀態(tài)徊都,這些狀態(tài)在右側(cè)的 state 表格中給出沪斟,應(yīng)該注意的是 state 表格不是存儲在對象頭中的。除了 marked for gc 狀態(tài)暇矫,其它四個狀態(tài)已經(jīng)在前面介紹過了主之。
<div align="center"> <img src="../pics//bb6a49be-00f2-4f27-a0ce-4ed764bc605c.png" width="600"/> </div>
下圖左側(cè)是一個線程的虛擬機(jī)棧,其中有一部分稱為 Lock Record 的區(qū)域李根,這是在輕量級鎖運(yùn)行過程創(chuàng)建的槽奕,用于存放鎖對象的 Mark Word。而右側(cè)就是一個鎖對象朱巨,包含了 Mark Word 和其它信息史翘。
<div align="center"> <img src="../pics//051e436c-0e46-4c59-8f67-52d89d656182.png" width="500"/> </div>
輕量級鎖是相對于傳統(tǒng)的重量級鎖而言,它使用 CAS 操作來避免重量級鎖使用互斥量的開銷。對于絕大部分的鎖琼讽,在整個同步周期內(nèi)都是不存在競爭的必峰,因此也就不需要都使用互斥量進(jìn)行同步,可以先采用 CAS 操作進(jìn)行同步钻蹬,如果 CAS 失敗了再改用互斥量進(jìn)行同步吼蚁。
當(dāng)嘗試獲取一個鎖對象時,如果鎖對象標(biāo)記為 0 01问欠,說明鎖對象的鎖未鎖定(unlocked)狀態(tài)肝匆。此時虛擬機(jī)在當(dāng)前線程棧中創(chuàng)建 Lock Record,然后使用 CAS 操作將對象的 Mark Word 更新為 Lock Record 指針顺献。如果 CAS 操作成功了旗国,那么線程就獲取了該對象上的鎖,并且對象的 Mark Word 的鎖標(biāo)記變?yōu)?00注整,表示該對象處于輕量級鎖狀態(tài)能曾。
<div align="center"> <img src="../pics//baaa681f-7c52-4198-a5ae-303b9386cf47.png" width="500"/> </div>
如果 CAS 操作失敗了,虛擬機(jī)首先會檢查對象的 Mark Word 是否指向當(dāng)前線程的虛擬機(jī)棧肿轨,如果是的話說明當(dāng)前線程已經(jīng)擁有了這個鎖對象寿冕,那就可以直接進(jìn)入同步塊繼續(xù)執(zhí)行,否則說明這個鎖對象已經(jīng)被其他線程線程搶占了椒袍。如果有兩條以上的線程爭用同一個鎖驼唱,那輕量級鎖就不再有效,要膨脹為重量級鎖驹暑。
偏向鎖
偏向鎖的思想是偏向于讓第一個獲取鎖對象的線程玫恳,這個線程在之后獲取該鎖就不再需要進(jìn)行同步操作,甚至連 CAS 操作也不再需要岗钩。
當(dāng)鎖對象第一次被線程獲得的時候纽窟,進(jìn)入偏向狀態(tài)肖油,標(biāo)記為 1 01兼吓。同時使用 CAS 操作將線程 ID 記錄到 Mark Word 中,如果 CAS 操作成功森枪,這個線程以后每次進(jìn)入這個鎖相關(guān)的同步塊就不需要再進(jìn)行任何同步操作视搏。
當(dāng)有另外一個線程去嘗試獲取這個鎖對象時,偏向狀態(tài)就宣告結(jié)束县袱,此時撤銷偏向(Revoke Bias)后恢復(fù)到未鎖定狀態(tài)或者輕量級鎖狀態(tài)浑娜。
<div align="center"> <img src="../pics//390c913b-5f31-444f-bbdb-2b88b688e7ce.jpg" width="600"/> </div>
十三、多線程開發(fā)良好的實踐
給線程起個有意義的名字式散,這樣可以方便找 Bug筋遭。
縮小同步范圍,例如對于 synchronized,應(yīng)該盡量使用同步塊而不是同步方法漓滔。
多用同步類少用 wait() 和 notify()编饺。首先,CountDownLatch, CyclicBarrier, Semaphore 和 Exchanger 這些同步類簡化了編碼操作响驴,而用 wait() 和 notify() 很難實現(xiàn)對復(fù)雜的控制流透且;其次,這些同步類是由最好的企業(yè)編寫和維護(hù)豁鲤,在后續(xù)的 JDK 中還會不斷優(yōu)化和完善秽誊,使用這些更高等級的同步工具你的程序可以不費(fèi)吹灰之力獲得優(yōu)化。
多用并發(fā)集合少用同步集合琳骡,例如應(yīng)該使用 ConcurrentHashMap 而不是 Hashtable锅论。
使用本地變量和不可變類來保證線程安全。
使用線程池而不是直接創(chuàng)建 Thread 對象楣号,這是因為創(chuàng)建線程代價很高棍厌,線程池可以有效地利用有限的線程來啟動任務(wù)。
使用 BlockingQueue 實現(xiàn)生產(chǎn)者消費(fèi)者問題竖席。
參考資料
- BruceEckel. Java 編程思想: 第 4 版 [M]. 機(jī)械工業(yè)出版社, 2007.
- 周志明. 深入理解 Java 虛擬機(jī) [M]. 機(jī)械工業(yè)出版社, 2011.
- Threads and Locks
- 線程通信
- Java 線程面試題 Top 50
- BlockingQueue
- thread state java
- CSC 456 Spring 2012/ch7 MN
- Java - Understanding Happens-before relationship
- 6? Thread Synchronization
- How is Java's ThreadLocal implemented under the hood?
- Concurrent
- JAVA FORK JOIN EXAMPLE
- 聊聊并發(fā)(八)——Fork/Join 框架介紹
- Eliminating SynchronizationRelated Atomic Operations with Biased Locking and Bulk Rebiasing