2.并發(fā)編程
2.1 線程的基本認識
2.1.1 線程的基本介紹
什么是線程:線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位衫仑。它被包含在進程之中鸯檬,是進程中的實際運作單位。
什么是進程:進程是一個具有一定獨立功能的程序關(guān)于某個數(shù)據(jù)集合的一次運行活動谣沸。它是操作系統(tǒng)動態(tài)執(zhí)行的基本單元玷过,在傳統(tǒng)的操作系統(tǒng)中,進程既是基本的[分配單元]卤橄,也是基本的執(zhí)行單元
為什么會有線程:
1.在多核CPU中绿满,利用多線程可以實現(xiàn)真正意義上的并行執(zhí)行。
2.在一個應(yīng)用進程中窟扑,會存在多個同時執(zhí)行的任務(wù)喇颁,如果其中一個任務(wù)被阻塞,將會引起不依賴該任務(wù)的任務(wù)也被阻塞嚎货。通過對不同任務(wù)創(chuàng)建不同的線程去處理橘霎,可以提升程序處理的實時性。
3.線程可以認為是輕量級的進程殖属,所以線程的創(chuàng)建姐叁、銷毀比進程更快。
2.1.2 線程的應(yīng)用場景
為什么要用多線程:
- 異步執(zhí)行
- 利用多CPU資源實現(xiàn)真正意義上的并行執(zhí)行忱辅。
線程的價值:
線程的應(yīng)用場景:
- 使用多線程實現(xiàn)文件下載七蜘。
- 后臺任務(wù):如定時向大量(100W以上)的用戶發(fā)送郵件。
- 異步處理:記錄日志墙懂。
- 多步驟的任務(wù)處理橡卤,可根據(jù)步驟特征選用不同個數(shù)和特征的線程來協(xié)作處理,多任務(wù)的分割损搬,由一個主線程分割給多個線程完成碧库。
總結(jié):
多線程的本質(zhì)是:合理的利用多核心CPU資源來實現(xiàn)線程的并行處理柜与,來實現(xiàn)同一個進程內(nèi)的多個任務(wù)的并行執(zhí)行,同時基于線程本身的異步執(zhí)行特性嵌灰,提升任務(wù)處理的效率弄匕。
2.1.3 如何在 Java 中應(yīng)用多線程
Java中使用多線程的方式
:
- 繼承Thread類
public class ThreadDemo extends Thread{
@Override
public void run() {
System.out.println("當(dāng)前線程:"+Thread.currentThread().getName());
}
public static void main(String[] args) {
ThreadDemo threadDemo=new ThreadDemo();
//threadDemo.start(); //啟動一個線程
}
}
- 實現(xiàn)Runnable接口
public class RunnableDemo implements Runnable{
@Override
public void run() {
System.out.println("當(dāng)前線程:"+Thread.currentThread().getName());
}
public static void main(String[] args) {
RunnableDemo runnableDemo=new RunnableDemo();
Thread thread=new Thread(runnableDemo);
thread.start();//啟動線程
}
}
- 實現(xiàn)Callable接口
public class CallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("當(dāng)前線程:"+Thread.currentThread().getName());
Thread.sleep(10000);
return "Hello";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService= Executors.newFixedThreadPool(1);
Future<String> future=executorService.submit(new CallableDemo());
//future.get 是一個阻塞方法
System.out.println(Thread.currentThread().getName()+"-"+future.get());
}
}
2.1.4 Java 線程的生命周期
Java 線程從創(chuàng)建到銷毀,一共有6 個狀態(tài):
- NEW:初始狀態(tài)沽瞭,線程被構(gòu)建迁匠,但是還沒有調(diào)用start方法。
- RUNNABLED:運行狀態(tài)驹溃,JAVA線程把操作系統(tǒng)中的就緒和運行兩種狀態(tài)統(tǒng)一稱為“運行中”城丧。
- BLOCKED:阻塞狀態(tài),表示線程進入等待狀態(tài),也就是線程因為某種原因放棄了CPU使用權(quán)豌鹤,阻塞也分為幾種情況亡哄。
- WAITING: 等待狀態(tài)
- TIME_WAITING:超時等待狀態(tài),超時以后自動返回
- TERMINATED:終止?fàn)顟B(tài)布疙,表示當(dāng)前線程執(zhí)行完畢
public class ThreadStatusDemo {
public static void main(String[] args) {
//TIME_WAITING
new Thread(()->{
while(true){
try {
TimeUnit.SECONDS.sleep(100); // 超時等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Time_Wating_Demo").start();
//WAITING
new Thread(()->{
while(true){
synchronized (ThreadStatusDemo.class){
try {
ThreadStatusDemo.class.wait(); //等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"Wating").start();
// 優(yōu)先搶占鎖的進入time_waiting狀態(tài)蚊惯, 沒搶到鎖的進入block狀態(tài)
new Thread(new BlockedDemo(),"Blocked-Demo-01").start();
new Thread(new BlockedDemo(),"Blocked-Demo-02").start();
}
static class BlockedDemo extends Thread{
@Override
public void run(){
synchronized (BlockedDemo.class){
while(true){ //死循環(huán)
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
2.2 線程的基本操作及原理
2.2.1 Thread.join的使用及原理
public class ThreadJonDemo {
private static int x=0;
private static int i=0;
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(()->{
//阻塞操作
i=1;
x=2;
});
Thread t2=new Thread(()->{
i=x+2;
});
//兩個線程的執(zhí)行順序,
t1.start();
t1.join(); //t1線程的執(zhí)行結(jié)果對于t2可見(t1線程一定要比t2線程有限執(zhí)行) --- 阻塞
t2.start();
Thread.sleep(1000);
System.out.println("result:"+i);
}
Thread.join的作用是保證線程執(zhí)行結(jié)果的可見性灵临。
可見性:可以理解為對結(jié)果對后續(xù)代碼可見截型,即意味著,線程執(zhí)行完成以后儒溉,后續(xù)代碼才會開始執(zhí)行菠劝。
Thread.join
的本質(zhì)其實是wait
/notifyall
。它阻塞的被調(diào)用時睁搭, join
所在的線程赶诊,在上述圖片示例中為主線程。
2.2.2 Thread.sleep
Thread.sleep
: 使線程暫停執(zhí)行一段時間园骆,直到等待的時間結(jié)束才恢復(fù)執(zhí)行或在這段時間內(nèi)被中斷舔痪。
Thread.sleep的工作流程:
- 掛起線程并修改其運行狀態(tài)
- 用sleep()提供的參數(shù)來設(shè)置一個定時器。
- 當(dāng)時間結(jié)束锌唾,定時器會觸發(fā)锄码,內(nèi)核收到中斷后修改線程的運行狀態(tài)。
例如線程會被標(biāo)志為就緒而進入就緒隊列等待調(diào)度晌涕。
Thread.Sleep(0) 的意義: 類似Thred.yield.使線程放棄CPU的使用并重新加入競爭中滋捶。
操作系統(tǒng)中,CPU競爭有很多種策略余黎。Unix系統(tǒng)使用的是時間片算法重窟,而Windows則屬于搶占式的。
2.2.3 wait 和 notify的使用
一個線程修改了一個對象的值,而另個線程感知到了變化狼忱,然后進行響應(yīng)的操作。
例如可以通過wait
和notify
來實現(xiàn)阻塞隊列展蒂。
// 生產(chǎn)者
public class Producer implements Runnable{
private Queue<String> bags;
private int size;
public Producer(Queue<String> bags, int size) {
this.bags = bags;
this.size = size;
}
@Override
public void run() {
int i=0;
while(true){
i++;
synchronized (bags){
while(bags.size()==size){
System.out.println("bags已經(jīng)滿了");
//TODO? 阻塞厅翔?
try {
bags.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生產(chǎn)者-生產(chǎn):bag"+i);
bags.add("bag"+i);
//TODO? 喚醒處于阻塞狀態(tài)下的消費者
bags.notifyAll();
}
}
}
}
// 生產(chǎn)者
public class Consumer implements Runnable{
private Queue<String> bags;
private int size;
public Consumer(Queue<String> bags, int size) {
this.bags = bags;
this.size = size;
}
@Override
public void run() {
while(true){
synchronized (bags){
while(bags.isEmpty()){
System.out.println("bags為空");
try {
bags.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String bag=bags.remove();
System.out.println("消費者消費:"+bag);
bags.notifyAll();
}
}
}
}
// 主程序入口
public class WaitNotifyDemo {
public static void main(String[] args) {
// 隊列為共享資源
Queue<String> queue=new LinkedList<>();
int size=10;
Producer producer=new Producer(queue,size);
Consumer consumer=new Consumer(queue,size);
Thread t1=new Thread(producer);
Thread t2=new Thread(consumer);
t1.start();
t2.start();
}
}
為什么wait/notify需要加synchronized乖坠?
- 從剛剛的案例來看,其實wait/notify本質(zhì)上其實是一種條件的競爭刀闷,至少來說熊泵,
wait和notify方法一定是互斥存在的,既然要實現(xiàn)互斥甸昏,那么synchronized就是一
個很好的解決方法戈次。 - wait和notify是用于實現(xiàn)多個線程之間的通信,而通信必然會存在一個通信的
載體筒扒,比如我們小時候玩的游戲,用兩個紙杯绊寻,中間用一根線連接花墩,然后可以實現(xiàn)
比較遠距離的對話。而這根線就是載體澄步,那么在這里也是一樣冰蘑,wait/notify是基
于synchronized來實現(xiàn)通信的。也就是兩者必須要在同一個頻道也就是同一個鎖的
范圍內(nèi)村缸。
2.3.4 Thread.interrupt和 Thread.interrupted
interrupt是類種的一個屬性祠肥,在JVM中維護。
interrupt()
: 中斷一個線程梯皿。當(dāng)其他線程通過調(diào)用當(dāng)前線程的interrupt方法仇箱,表示向當(dāng)前線程打個招呼,告訴他可以中斷線程的執(zhí)行了东羹,至于什么時候中斷剂桥,取決于當(dāng)前線程自己。
interrupted
:對設(shè)置中斷標(biāo)識的線程復(fù)位属提,并且返回當(dāng)前的中斷狀態(tài)权逗。
public static void main(String[] args) throws InterruptedException {
Thread thread=new Thread(()->{
int i = 0;
// isInterrupted只有在循環(huán)內(nèi)才有效
while(!Thread.currentThread().isInterrupted()){
i++;
}
}
});
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt(); //中斷
}
線程處于阻塞狀態(tài)下的情況下(中斷才有意義)
- thread.join
- wait
- Thread.sleep
public static void main(String[] args) throws InterruptedException {
Thread thread=new Thread(()->{
//Thread.currentThread().isInterrupted() 默認是false
//正常的任務(wù)處理..
try {
// 線程阻塞時,調(diào)用interrupt()方法可以使阻塞的線程立馬跑出中斷異常冤议,我們可以捕獲中斷異常來進行后續(xù)的邏輯處理斟薇。
Thread.sleep(10000);
} catch (InterruptedException e) {
//拋出異常來相應(yīng)客戶端的中斷請求
e.printStackTrace();
}
});
thread.start();
Thread.sleep(5000);
//interrupt 這個屬性由false-true
thread.interrupt(); //中斷(友好)
}
2.3 線程的安全分析
2.3.1 并發(fā)編程問題的源頭- - 原子性、可見性恕酸、有序性
如何理解線程安全:當(dāng)多個線程訪問某個對象時堪滨,不管運行時環(huán)境采用何種調(diào)度方式或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或者協(xié)同蕊温,這個類都能表現(xiàn)出正確的行為椿猎,那么就稱這個類是線程安全的惶岭。
線程問題的源頭
CPU為了提高處理速度,減少IO瓶頸的優(yōu)化
- CPU增加了高速緩存犯眠,均衡與內(nèi)存的速度差異
- 操作系統(tǒng)增加進程按灶、線程、以及分時復(fù)用cpu筐咧,均衡cpu與i/o設(shè)備的速度差異
- 編譯程序優(yōu)化指令的執(zhí)行順序鸯旁,使得能夠更加合理的利用緩存
可見性問題的源頭
線程安全問題的本質(zhì):
- 原子性
- 可見性
- 有序性
CPU 高速緩存和內(nèi)存的架構(gòu)設(shè)計,以及指令重排序問題
2.3.2 Java 內(nèi)存模型 -Java 如何解決可見性有序性問題
JVM: Java內(nèi)存模型是一種抽象結(jié)構(gòu)量蕊,它提供了合理的禁用緩存以及禁止重排序
的方法來解決可見性铺罢、有序性問題。
JMM和硬件模型的對應(yīng)簡圖
2.3.3 同步關(guān)鍵字Synchronized
synchronized鎖的范圍
- 對于普通同步方法残炮,鎖是當(dāng)前實例對象韭赘。
- 對于靜態(tài)同步方法,鎖是當(dāng)前類的Class對象势就。
- 對于同步方法塊泉瞻,鎖是Synchonized括號里配置的對象
public class SyncDemo {
//對象鎖(同一個對象有效) this
public synchronized void demo(){
}
public void demo1(){
//TODO
synchronized (this){ //對象鎖
}
//TODO
}
//類級別的鎖 SyncDemo.class
public synchronized static void demo3(){
}
public void demo4(){
//TODO
// 方法快的鎖
synchronized (SyncDemo.class){
}
}
public static void main(String[] args) {
SyncDemo syncDemo=new SyncDemo();
SyncDemo syncDemo1=new SyncDemo();
//無法實現(xiàn)兩個線程的互斥.
//如果訪問demo3的話,那么下面兩個線程會存在互斥
new Thread(()->{ //syncDemo這個實例
syncDemo.demo1();
}).start();
new Thread(()->{ //BLOCKED狀態(tài)
syncDemo1.demo1();//syncDemo1這個實例
}).start();
}
}
synchronized 在指令中會增加monitorenter
指令
2.3.4 volatile關(guān)鍵字分析
volatile是什么苞冯?
volatile
可以用來解決可見性和有序性問題
volatile
會在指令中增加Lock
指令
Lock指令的作用
- 將當(dāng)前處理器緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存袖牙。
- 這個寫回內(nèi)存的操作會使在其他CPU里緩存了該內(nèi)存地址的數(shù)據(jù)無效。
什么情況下需要用到volatile
當(dāng)存在多個線程對同一個共享變量進行修改的時候舅锄,需要增加volatile鞭达,保證數(shù)據(jù)修改的實時可見。
Volatile是如何解決有序性問題的
下圖中皇忿,進入if
后畴蹭,value可能不等于10,這就是有序性問題鳍烁。
CPU層面的內(nèi)存屏障
-
Store Barrier
:強制所有在store屏障指令之前的store指令撮胧,都在該store屏障指令執(zhí)行之前被執(zhí)行,并把store緩沖區(qū)的數(shù)據(jù)都刷到CPU緩存老翘; -
Load Barrier
:強制所有在load屏障指令之后的load指令芹啥,都在該load屏障指令執(zhí)行之后被執(zhí)行,并且一直等到load緩沖區(qū)被該CPU讀完才能執(zhí)行之后的load指令铺峭; -
Full Barrier
:復(fù)合了load和storee屏障的功能墓怀。
CPU層面的內(nèi)存屏障
JVM提供的內(nèi)存屏障
volatile的總結(jié)
本質(zhì)上來說:volatile
實際上是通過內(nèi)存屏障來防止指令重排序以及禁止cpu高速緩存來解決可見性問題。
而Lock
指令卫键,
它本意上是禁止高速緩存解決可見性問題傀履,但實際上在這里,它表示的是一種內(nèi)存屏障的功能。也就是說針對當(dāng)前的硬件環(huán)境钓账,JMM層面采用Lock
指令作為內(nèi)存屏障來解決可見性問題.
2.3.5 final域
final關(guān)鍵字
final
在Java中是一個保留的關(guān)鍵字碴犬,可以聲明成員變量、方法梆暮、類以及本地變量服协。一旦你將引用聲明作final
,你將不能改變這個引用了啦粹。
final域和線程安全有什么關(guān)系偿荷?
對于 final域,編譯器和處理器要遵守兩個重排序規(guī)則:
- 在構(gòu)造函數(shù)內(nèi)對一個final域的寫入唠椭,與隨后把這個被構(gòu)造對象的引用賦值給一
個引用變量跳纳,這兩個操作之間不能重排序。 - 初次讀一個包含final域的對象的引用贪嫂,與隨后初次讀這個final域寺庄,這兩個操
作之間不能重排序。
寫可能執(zhí)行的情況
寫final域重排序規(guī)則
- JMM禁止編譯器把
final
域的寫重排序到構(gòu)造函數(shù)之外力崇。 - 編譯器會在
final
域的寫之后斗塘,構(gòu)造函數(shù)return之前,插入一個StoreStore
屏障餐曹。這個屏障禁止處理器把final域的寫重排序到構(gòu)造函數(shù)之外。
讀的時候可能存在的執(zhí)行情況
讀域的重排序規(guī)則
在一個線程中敌厘,初次讀對象引用與初次讀該對象包含的final
域台猴,JMM禁止處理器重排序這兩個操作,編譯器會在讀final域操作的前面插入一個LoadLoad
屏障俱两。
構(gòu)造器內(nèi)部造成逃逸
溢出帶來的重排序問題
2.3.6 Happens-Before 規(guī)則
什么是Happens-Before饱狂?
Happens-Before是一種可見性規(guī)則,它表達的含義是前面一個操作的結(jié)果對后續(xù)操作是可見的宪彩。
6種Happens-Before規(guī)則:
- 程序順序規(guī)則
-
監(jiān)視器鎖規(guī)則:對一個鎖的解鎖 Happens-Before 于后續(xù)對這個鎖的加鎖休讳。
監(jiān)視器鎖規(guī)則
Volatile變量規(guī)則:對一個volatile域的寫,happens-before于任意后續(xù)對這個volatile域的讀.
傳遞性: 如果A happens-before B尿孔,且B happens-before C俊柔,那么A happens-before C.
start()規(guī)則: 如果線程A執(zhí)行操作
ThreadB.start()
(啟動線程B),那么A線程的ThreadB.start()
操作happens-before于線程B中的任意操作.Join()規(guī)則: 如果線程A執(zhí)行操作
ThreadB.join()
并成功返回活合,那么線程B中的任意操作happens-before于線程A從ThreadB.join()
操作成功返回.
2.3.7 原子類 Atomic-無鎖工具的典范
原子性問題的解決方案:
-
synchronized
雏婶、Lock
J.U.C包下的Atomic
類
Atomic
類的示例:
public class AtomicDemo {
// public static int count=0;
private static AtomicInteger atomicInteger=new AtomicInteger(0);
public static void incr(){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//count++; //count++ (只會由一個線程來執(zhí)行)
atomicInteger.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
for (int i = 0; i < 1000; i++) {
new Thread(AtomicDemo::incr).start();
}
Thread.sleep(2000);
System.out.println("result:"+count);
//System.out.println("result:"+atomicInteger.get());
}
}
Atomic實現(xiàn)原理:
- Unsafe類;
-
CAS:
image.png
2.3.8 ThreadLocal的實現(xiàn)原理
ThreadLocal實現(xiàn)線程隔離
public class ThreadLocalDemo {
private static Integer num = 0;
private static final ThreadLocal<Integer> local = new ThreadLocal<Integer>() {
// 重寫設(shè)置初始值的方法
@Override
protected Integer initialValue() {
return 0;
}
};
public static void main(String[] args) {
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(() -> {
// 希望所有線程拿到初始值0
int num = local.get();
num += 5;
local.set(local.get() + 5);
System.out.println(Thread.currentThread().getName() + "---" + Integer.toString(local.get()));
}, "thread" + i);
}
for (Thread thread : threads) {
thread.start();
}
}
}
2.4.1 發(fā)布與逃逸
-
發(fā)布
發(fā)布的意思是使一個對象能夠被當(dāng)前范圍之外的代碼所使用.
public static HashSet<Person> persons;
public void init() {
person = new HashSet<Person>();
}
不安全發(fā)布
不安全發(fā)布是指某個不應(yīng)該發(fā)布的對象被發(fā)布了.
下面案例中, states是個私有變量白指,但是因為不安全發(fā)布使得其數(shù)值可以被外部對象修改留晚,導(dǎo)致不安全發(fā)布。
發(fā)布溢出(逃逸)
一種錯誤的發(fā)布告嘲,當(dāng)一個對象還沒有構(gòu)造完成時错维,就使它被其他線程所見.
下面實例中奖地,就可能因為指令重排序問題導(dǎo)致this對象逃逸。
2.4.2 安全發(fā)布對象
- 在靜態(tài)初始化函數(shù)中初始化一個對象引用.
public class StaticDemo {
private StaticDemo(){}
private static StaticDemo instance=new StaticDemo();
public static StaticDemo getInstance(){
return instance;
}
}
靜態(tài)初始化之所以安全是因為是在JVM類的初始化階段進行初始化的赋焕,在類被加載后并在被線程使用之前参歹,所有內(nèi)存的寫入操作會自動對所有線程可見。但該規(guī)則只適用于構(gòu)造時的狀態(tài)宏邮,不保證后續(xù)線程使用的安全性泽示。
- 將對象的引用保存到volatile類型的域或者AtomicReference對象中(利用volatile happen-before規(guī)則)
- 將對象的引用保存到一個由鎖保護的域中(讀寫都上鎖)
public class VolatileSyncDemo {
private VolatileSyncDemo(){}
private volatile static VolatileSyncDemo instance=null;
//DCL問題
/**
* instance = new VolatileSyncDemo();
* ->
* 1. memory=allocate()
* 2.
* 3. instance=memory
* ctorInstance(memory)
*
* 1.3.2 (不完整實例)
*/
public static VolatileSyncDemo getInstance(){
if(instance==null){
// 把鎖放入到判斷里,而不是防止方法上蜜氨,防止當(dāng)instance不為null時械筛,音方法加鎖帶來的性能損失。
// 再鎖里再進行一次判斷就可以防止多線程時的多實例問題飒炎。但是會帶來DCL問題埋哟,所以需要加入volatile,防止指令重排序郎汪。
synchronized(VolatileSyncDemo.class) {
if(instance==null) {
instance = new VolatileSyncDemo();
}
}
}
return instance;
}
}
將對象的引用保存到某個正確構(gòu)造對象的final類型域中(初始化安全性)
public class FinalDemo {
private final Map<String,String> states;
public FinalDemo(){
states=new HashMap<>();
states.put("mic","mic");
}
}
對還有final域的對象赤赊,其防止指令重排序的作用能防止對象初始化引用被重排序到構(gòu)造函數(shù)之外。后續(xù)使用的安全性還需要保障.
2.5 J.U.C核心之AQS
2.5.1 重入鎖ReentrantLock的初步認識
什么是鎖
鎖是用來解決多線程并發(fā)訪問共享資源所帶來的數(shù)據(jù)安全性問題的手段煞赢。
對一個共享資源加鎖后抛计,如果有一個線程獲得了鎖,那么其他線程無法訪問這個共享資源
什么是重入鎖照筑?
一個持有鎖的線程吹截,在釋放鎖之前,如果再次訪問加了該同步鎖的其他方法凝危,這個線程不需要再次爭搶鎖波俄,只需要記錄重入次數(shù)。
public class LockDemo {
static Lock lock=new ReentrantLock(); //重入鎖
public static int count=0;
public static void incr(){ //遞增
lock.lock(); //獲得鎖 線程A
try {
Thread.sleep(1);
decr();
count++; //count++ (只會由一個線程來執(zhí)行)
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock(); //釋放鎖
}
}
public static void decr(){ //遞減
lock.lock(); //有需要爭搶鎖 蛾默,線程A (不需要爭搶鎖懦铺,記錄重入次數(shù)即可)
try {
count -= 1;
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock(); //釋放鎖
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(LockDemo::incr).start();
}
Thread.sleep(4000);
System.out.println("result:"+count);
}
}
什么是AQS
AbstractQueuedSynchronizer,類如其名支鸡,抽象的隊列式的同步器冬念,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現(xiàn)都依賴于它牧挣,如常用的ReentrantLock/Semaphore/CountDownLatch...它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)刘急。這里volatile是核心關(guān)鍵詞,具體volatile的語義浸踩,在此不述叔汁。state的訪問方式有三種:
- getState()
- setState()
- compareAndSetState()
AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享据块,多個線程可同時執(zhí)行码邻,如Semaphore/CountDownLatch)。
不同的自定義同步器爭用共享資源的方式也不同另假。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可像屋,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了边篮。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源己莺。只有用到condition才需要去實現(xiàn)它。
- tryAcquire(int):獨占方式戈轿。嘗試獲取資源凌受,成功則返回true,失敗則返回false思杯。
- tryRelease(int):獨占方式胜蛉。嘗試釋放資源,成功則返回true色乾,失敗則返回false誊册。
- tryAcquireShared(int):共享方式。嘗試獲取資源暖璧。負數(shù)表示失敯盖印;0表示成功澎办,但沒有剩余可用資源嘲碱;正數(shù)表示成功,且有剩余資源浮驳。
- tryReleaseShared(int):共享方式悍汛。嘗試釋放資源捞魁,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true至会,否則返回false。
以ReentrantLock為例谱俭,state初始化為0奉件,表示未鎖定狀態(tài)。A線程lock()時昆著,會調(diào)用tryAcquire()獨占該鎖并將state+1县貌。此后,其他線程再tryAcquire()時就會失敗凑懂,直到A線程unlock()到state=0(即釋放鎖)為止煤痕,其它線程才有機會獲取該鎖。當(dāng)然,釋放鎖之前摆碉,A線程自己是可以重復(fù)獲取此鎖的(state會累加)塘匣,這就是可重入的概念。但要注意巷帝,獲取多少次就要釋放多么次忌卤,這樣才能保證state是能回到零態(tài)的。
再以CountDownLatch以例楞泼,任務(wù)分為N個子線程去執(zhí)行驰徊,state也初始化為N(注意N要與線程個數(shù)一致)。這N個子線程是并行執(zhí)行的堕阔,每個子線程執(zhí)行完后countDown()一次棍厂,state會CAS減1。等到所有子線程都執(zhí)行完后(即state=0)印蔬,會unpark()主調(diào)用線程勋桶,然后主調(diào)用線程就會從await()函數(shù)返回,繼續(xù)后余動作侥猬。
一般來說例驹,自定義同步器要么是獨占方法,要么是共享方式退唠,他們也只需實現(xiàn)tryAcquire-tryRelease鹃锈、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現(xiàn)獨占和共享兩種方式瞧预,如ReentrantReadWriteLock屎债。
CountDownLatch是什么?
countdownlatch是一個同步工具類垢油,它允許一個或多個線程一直等待盆驹,直到其他線程的操作執(zhí)行完畢再執(zhí)行。從命名可以解讀到
countdown是倒數(shù)的意思滩愁,類似于我們倒計時的概念躯喇。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//state存儲
CountDownLatch countDownLatch=new CountDownLatch(3);
new Thread(()->{
countDownLatch.countDown(); //倒計時 3-1=2
//修改state=state-1 通過cas設(shè)置到state這個字段上
}).start();
new Thread(()->{
countDownLatch.countDown(); //倒計時 2-1=1
}).start();
new Thread(()->{
countDownLatch.countDown(); //倒計時 1-1=0 ->觸發(fā)喚醒操作
}).start();
countDownLatch.await(); //阻塞主線程
System.out.println("線程執(zhí)行完畢");
}
}
什么是Semaphore
Semaphore 通常我們叫它信號量, 可以用來控制同時訪問特定資源的線程數(shù)量硝枉,通過協(xié)調(diào)各個線程廉丽,以保證合理的使用資源。
acquire()
獲取一個令牌妻味,在獲取到令牌正压、或者被其他線程調(diào)用中斷之前線程一直處于阻塞狀態(tài)。
release()
釋放一個令牌责球,喚醒一個獲取令牌不成功的阻塞線程焦履。
public class SemaphoreDemo {
public static void main(String[] args) {
//當(dāng)前可以獲得的最大許可數(shù)量是5個
//AQS ->state
Semaphore semaphore=new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Car(i,semaphore).start();
}
}
static class Car extends Thread{
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); //獲得一個許可,如果獲取不到許可拓劝,就會被阻塞
System.out.println("第"+num+" 占用一個停車位");
TimeUnit.SECONDS.sleep(2);
System.out.println("第"+num+" 倆車走了");
semaphore.release(); //釋放許可
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
什么是CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是嘉裤,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞凿将,直到最后一個線程到達屏障時,屏障才會開門价脾,所有被屏障攔截的線程才會繼續(xù)工作牧抵。
CyclicBarrier實例:
數(shù)據(jù)加載類,調(diào)用CyclicBarrier
public class DataImportThread extends Thread{
private String path;
private CyclicBarrier cyclicBarrier;
public DataImportThread(String path, CyclicBarrier cyclicBarrier) {
this.path = path;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("開始導(dǎo)入:"+path+" 位置的數(shù)據(jù)");
try {
cyclicBarrier.await(); //阻塞
//TODO
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CycliBarrierDemo實例
public class CycliBarrierDemo extends Thread{
@Override
public void run() {
System.out.println("開始進行數(shù)據(jù)匯總和分析");
}
/**
* 1. parties ,如果因為某種原因?qū)е聸]有足夠多的線程來調(diào)用await,
* 這個時候會導(dǎo)致所有線程都會被阻塞
* 2. await(timeout,unit) 設(shè)置一個超市等待時間.
* 3. reset重置計數(shù),brokenBarrierException
* @param args
*/
public static void main(String[] args) {
//parties=3
CyclicBarrier cyclicBarrier=
new CyclicBarrier(3,new CycliBarrierDemo());
new DataImportThread("path1",cyclicBarrier).start();
new DataImportThread("path2",cyclicBarrier).start();
new DataImportThread("path3",cyclicBarrier).start();
//TODO 希望三個線程執(zhí)行結(jié)束之后荒给,再做一個匯總處理.
}
}
什么是condition
Condition是一個多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個條件(condition)获枝,只有滿足條件時,線程才會被喚醒骇笔。
// condition 等待類
public class ConditionDemoWait extends Thread{
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
/**
* synchronized(class){
* class.wait();
* }
*/
@Override
public void run() {
System.out.println("begin - ConditionDemoWait");
try {
lock.lock();//ThreadA獲得了鎖
condition.await(); //阻塞
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
// condition 喚醒類
public class ConditionDemoNotify extends Thread{
private Lock lock;
private Condition condition;
public ConditionDemoNotify(Lock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
/**
* synchronized(class){
* class.wait();
* }
*/
@Override
public void run() {
System.out.println("begin - ConditionDemoNotify");
lock.lock(); //線程B被阻塞在這個位置省店,由于被線程A喚醒,所以線程B繼續(xù)執(zhí)行
try {
condition.signal();//線程B會執(zhí)行這個方法
System.out.println("end - ConditionDemoNotify");
}finally {
lock.unlock();
}
}
}
// condition 主類
public class CondtionDemo {
public static void main(String[] args) {
Lock lock=new ReentrantLock();
Condition condition=lock.newCondition();
ConditionDemoWait conditionDemoWait=new ConditionDemoWait(lock,condition);
ConditionDemoNotify conditionDemoNotify=new ConditionDemoNotify(lock,condition);
conditionDemoWait.start();
conditionDemoNotify.start();
}
}
2.6 線程調(diào)度之線程池
2.6.1 什么是線程池
提前創(chuàng)建好若干個線程放在一個容器中笨触。如果有任務(wù)需要處理懦傍,則將任務(wù)直接分配給線程池中的線程來執(zhí)行,任務(wù)處理完以后這個線程不會被銷毀芦劣,而是等待后續(xù)分配任務(wù)粗俱。
2.6.2 Java 中提供的線程池
-
newFixedThreadPool
創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù)虚吟,超出的線程會在隊列中等待寸认。
public class ThreadPoolDemo implements Runnable{
public static void main(String[] args) {
ThreadPoolExecutor executorService=(ThreadPoolExecutor) ExecutorsSelf.newFixedThreadPool(3);
executorService.prestartAllCoreThreads(); //可以提前預(yù)熱所有核心線程
for (int i = 0; i < 100; i++) {
executorService.execute(new ThreadPoolDemo());
}
executorService.shutdown();
}
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
-
newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù)串慰,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行
-
newCachedThreadPool
創(chuàng)建一個可緩存線程池偏塞,如果線程池長度超過處理需要,可靈活回收空閑線程邦鲫,若無可回收灸叼,則新建線程。
-
newScheduledThreadPool
創(chuàng)建一個定長線程池掂碱,支持定時及周期性任務(wù)執(zhí)行.
線程池提交任務(wù)的邏輯圖
image-20220308222631001.png
2.6.3 線程池的監(jiān)控
線程池的監(jiān)控一般通過自己實現(xiàn)線程池的類怜姿,并在其中進行監(jiān)控慎冤。
public class ThreadPoolSelf extends ThreadPoolExecutor {
public ThreadPoolSelf(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void shutdown() {
super.shutdown();
}
//任務(wù)執(zhí)行開始
@Override
protected void beforeExecute(Thread t, Runnable r) {
//TODO 通過一個屬性來記錄任務(wù)的開始時間
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//任務(wù)執(zhí)行結(jié)束
System.out.println("初始線程數(shù):"+this.getPoolSize());
System.out.println("核心線程數(shù):"+this.getCorePoolSize());
System.out.println("正在執(zhí)行的任務(wù)數(shù)量:"+this.getActiveCount());
System.out.println("已經(jīng)執(zhí)行的任務(wù)數(shù):"+this.getCompletedTaskCount());
System.out.println("任務(wù)總數(shù)"+this.getTaskCount());
}
}
2.6.4 帶返回值的線程
public class CallableFutureDemo implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Hello Mic");
Thread.sleep(3000); //睡眠3s
return "Mic";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableFutureDemo callableFutureDemo=new CallableFutureDemo();
FutureTask futureTask=new FutureTask(callableFutureDemo);
new Thread(futureTask).start();
//get方法是屬于阻塞方法
System.out.println(futureTask.get());
ExecutorService executorService= Executors.newFixedThreadPool(1);
CallableFutureDemo callableFutureDemo2=new CallableFutureDemo();
FutureTask future=(FutureTask) executorService.submit(callableFutureDemo2);
System.out.println(future.get());
}
}
2.7 多線程并發(fā)拓展
2.7.1 死鎖發(fā)生的條件
- 互斥疼燥,共享資源 X 和 Y 只能被一個線程占用;
- 占有且等待蚁堤,線程 T1 已經(jīng)取得共享資源 X醉者,在等待共享資源 Y 的時候但狭,不釋放共享資源 X;
- 不可搶占撬即,其他線程不能強行搶占線程 T1 占有的資源立磁;
- 循環(huán)等待,線程 T1 等待線程 T2 占有的資源剥槐,線程 T2 等待線程 T1 占有的資源唱歧,就是循環(huán)等待。
四個條件同時滿足時粒竖,產(chǎn)生死鎖颅崩。
pojo
public class Account {
private String accountName;
private int balance; //余額
public Account(String accountName, int balance) {
this.accountName = accountName;
this.balance = balance;
}
public String getAccountName() {return accountName; }
public void setAccountName(String accountName) {this.accountName = accountName; }
public int getBalance() { return balance; }
public void setBalance(int balance) {this.balance = balance; }
public void debit(int amount){this.balance-=amount;}
public void credbit(int amount){this.balance+=amount;}
public class TransferAccount implements Runnable{
private Account fromAccount; //轉(zhuǎn)出賬戶
private Account toAccount; //轉(zhuǎn)入賬戶
private int amount;
public TransferAccount(Account fromAccount, Account toAccount, int amount) {
this.fromAccount = fromAccount;
this.toAccount = toAccount;
this.amount = amount;
}
@Override
public void run() {
while(true){
synchronized (fromAccount){
synchronized (toAccount){
if(fromAccount.getBalance()>=amount){
fromAccount.debit(amount);
toAccount.credbit(amount);
}
}
System.out.println(fromAccount.getAccountName()+"----"+fromAccount.getBalance());
System.out.println(toAccount.getAccountName()+"----"+toAccount.getBalance());
}
}
}
}
程序入口
public class TestMain {
public static void main(String[] args) {
Account fromAccount=new Account("張三",100000);
Account toAccount=new Account("李四",200000);
Allocator allocator=new Allocator(); //統(tǒng)一分配鎖
Thread a=new Thread(new TransferAccount01(fromAccount,toAccount,1,allocator));
Thread b=new Thread(new TransferAccount01(toAccount,fromAccount,2,allocator));
a.start();
b.start();
}
}
此時,該程序因為a b兩個賬戶互相爭搶資源導(dǎo)致死鎖蕊苗。
2.7.2 如何解決死鎖問題
一旦發(fā)生死鎖沿后,一般沒什么好的方法來解決,只能通過重啟應(yīng)用朽砰。所以如果要解決死鎖問題尖滚,最好的方式就是提前規(guī)避。
因為死鎖的四個條件中 互斥瞧柔,共享資源 X 和 Y 只能被一個線程占用 是先決條件漆弄,因此只能用其他三個條件入手。
-
同時申請兩個臨界資源
public class Allocator {private List<Object> list=new ArrayList<>();
/**- 申請資源
- @param from
- @param to
- @return
*/
synchronized boolean apply(Object from,Object to){
if(list.contains(from)||list.contains(to)){
return false;
}else {
list.add(from);
list.add(to);
return true;
}
}
synchronized void free(Object from,Object to){
list.remove(from);
list.remove(to);
}
}
public class TransferAccount implements Runnable{
@Override
public void run() {
while (true) {
// run方法改寫成如下:
if (allocator.apply(fromAccount, toAccount)) { //都會在這個地方去獲得臨界資源
try {
if (fromAccount.getBalance() >= amount) {
fromAccount.debit(amount);
toAccount.credbit(amount);
System.out.println(fromAccount.getAccountName() + "----" + fromAccount.getBalance());
System.out.println(toAccount.getAccountName() + "----" + toAccount.getBalance());
}
} finally {
allocator.free(fromAccount, toAccount);
}
}
}
}
}
此時因為程序會同時去獲取兩個臨界資源造锅,因此不會導(dǎo)致死鎖置逻。
- 當(dāng)搶資源失敗時,釋放已占有資源备绽,破壞不可搶占的條件券坞。
synchronized
天然具有阻塞性,搶占到鎖是就會阻塞肺素,用lock
來進行加鎖恨锚。
public class TransferAccount02 implements Runnable {
private Account fromAccount; //轉(zhuǎn)出賬戶
private Account toAccount; //轉(zhuǎn)入賬戶
private int amount;
Lock fromLock = new ReentrantLock();
Lock toLock = new ReentrantLock();
public TransferAccount02(Account fromAccount, Account toAccount, int amount) {
this.fromAccount = fromAccount;
this.toAccount = toAccount;
this.amount = amount;
}
@Override
public void run() {
while (true) {
// fromLock.tryLock() 返回布爾值,不會阻塞
if (fromLock.tryLock()) {
if (toLock.tryLock()) {
if (fromAccount.getBalance() >= amount) {
fromAccount.debit(amount);
toAccount.credbit(amount);
}
System.out.println(fromAccount.getAccountName() + "----" + fromAccount.getBalance());
System.out.println(toAccount.getAccountName() + "----" + toAccount.getBalance());
}
}
}
}
}
- 破壞循環(huán)等待倍靡,讓加鎖的循序一致猴伶。
@Override
public void run() {
Account left=null;
Account right=null;
if(fromAccount.hashCode()>toAccount.hashCode()){
left=toAccount;
right=fromAccount;
}
while(true){
synchronized (left) {
synchronized (right) {//申請不到資源,就已經(jīng)阻塞
if (fromAccount.getBalance() >= amount) {
fromAccount.debit(amount);
toAccount.credbit(amount);
}
}
System.out.println(fromAccount.getAccountName() + "----" + fromAccount.getBalance());
System.out.println(toAccount.getAccountName() + "----" + toAccount.getBalance());
}
}
}
2.7.3 單機版 MapReduce-Fork-Join
2.7.3.1什么是Fork-Join
Fork/Join框架是Java 7提供的一個用于并行執(zhí)行任務(wù)的框架塌西,是一個把大任務(wù)分割成若干個小任務(wù)他挎,最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
2.7.3.2 Fork-Join 實例
public class ForkJoinDemo extends RecursiveTask<Integer> {
// 分隔的閾值
private final int THREADHOLD = 11;
private int start;
private int end;
public ForkJoinDemo(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start > THREADHOLD) {
int middle = (end + start) /2;
ForkJoinDemo left = new ForkJoinDemo(start, middle);
ForkJoinDemo right = new ForkJoinDemo(middle + 1, end);
left.fork();
right.fork();
int leftResult = left.join();
int rightResult = right.join();
sum = leftResult + rightResult;
} else {
System.out.println("start:" + start + " end:" + end);
for (int i = start; i <= end; ++i) {
sum += i;
}
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1, 100);
Future<Integer> submit = forkJoinPool.submit(forkJoinDemo);
System.out.println(submit.get());
}
}