目錄
1.并行與并發(fā)
2.進(jìn)程與線程
---- 2.1 進(jìn)程
---- 2.2 線程
---- 2.3 進(jìn)程與線程的區(qū)別
---- 2.4 線程調(diào)度
3.線程的創(chuàng)建方式
---- 3.1 繼承方式
---- 3.2 實(shí)現(xiàn)方式
---- 3.3 匿名內(nèi)部類方式
---- 3.4 繼承和實(shí)現(xiàn)的區(qū)別
---- 3.5 線程資源在內(nèi)存中的分布
---- 3.6 線程池方式
---- ---- 3.6.1 什么是線程池
---- ---- 3.6.2 線程池的使用
---- ---- 3.3.3 線程池的種類
4.高并發(fā)與線程安全
---- 4.1 悲觀鎖方式解決線程安全問(wèn)題
---- ---- 4.1.2 問(wèn)題代碼示例
---- ---- 4.1.3 synchronized 關(guān)鍵字 (悲觀鎖)
---- ---- ---- 4.1.3.1 同步代碼塊
---- ---- ---- 4.1.3.2 同步方法
---- ---- 4.1.4 Lock
---- ---- 4.1.5 死鎖
---- 4.2 樂(lè)觀鎖的方式解決線程安全問(wèn)題
---- ---- 4.2.1 不可見性
---- ---- 4.2.2 無(wú)序性
---- ---- 4.2.3 原子性
---- ---- 4.2.4 原子類
---- ---- 4.2.5 CAS (樂(lè)觀鎖)
---- ---- 4.2.6 原子操作引發(fā)的 ABA 問(wèn)題
---- 4.3 樂(lè)觀鎖與悲觀鎖的區(qū)別
---- 4.4 并發(fā)工具包
---- ---- 4.4.1 并發(fā)容器
---- ---- ---- CopyOnWriteArrayList
---- ---- ---- CopyOnWriteArraySet
---- ---- ---- ConcurrentHashMap
---- ---- 4.4.2 并發(fā)工具類
---- ---- ---- CountDownLatch
---- ---- ---- CyclicBarrier
---- ---- ---- Semaphore
---- ---- ---- Exchanger
5.線程的狀態(tài)
---- 5.1 產(chǎn)者消費(fèi)者案例一
---- 5.1 產(chǎn)者消費(fèi)者案例二
1. 并行與并發(fā)
- 并行: 指多個(gè)事件在同一時(shí)刻發(fā)生;
- 并發(fā): 指多個(gè)事件在同一個(gè)時(shí)間段內(nèi)發(fā)生;
在操作系統(tǒng)中, 這裝了多個(gè)程序, 并發(fā)的是在一段時(shí)間內(nèi)看起來(lái)有多個(gè)程序同時(shí)運(yùn)行, 實(shí)際上是由CPU調(diào)度的分時(shí)交替運(yùn)行; 如果CPU的調(diào)度時(shí)間比較長(zhǎng), 那么這個(gè)過(guò)程就會(huì)比較清晰, 在一段時(shí)間內(nèi)只有一個(gè)線程在運(yùn)行;
2. 進(jìn)程與線程
2.1 進(jìn)程
進(jìn)程是程序的一次執(zhí)行過(guò)程衙荐,是系統(tǒng)運(yùn)行程序的基本單位;系統(tǒng)運(yùn)行一個(gè)程序即是一個(gè)進(jìn)程,從創(chuàng)建溜徙、運(yùn)行到消亡的過(guò)程;每個(gè)進(jìn)程都有一個(gè)獨(dú)立的內(nèi)存空間曲伊,一個(gè)應(yīng)用程序可以同時(shí)運(yùn)行多個(gè)進(jìn)程首昔;
- 每個(gè)進(jìn)程都有一個(gè)獨(dú)立的內(nèi)存空間, 進(jìn)程與進(jìn)程之間互不影響;
- 一個(gè)應(yīng)用程序可以有多個(gè)進(jìn)程;
2.1 線程
線程是進(jìn)程中一段程序的不同執(zhí)行路線流程,是進(jìn)程中的一個(gè)執(zhí)行單元拯田,負(fù)責(zé)當(dāng)前進(jìn)程中程序的執(zhí)行;
- 線程是擁有資源和獨(dú)立運(yùn)行的最小單位;
- 每個(gè)線程都有單獨(dú)的內(nèi)存空間;
- 一個(gè)進(jìn)程之后, 可以有多個(gè)線程共享進(jìn)程資源;
一個(gè) Java 程序其實(shí)就是一個(gè)進(jìn)程, 而一個(gè)進(jìn)程一次只能執(zhí)行一條線程, 所以 Java 只有高并發(fā)甩十;
2.3 進(jìn)程與線程的區(qū)別
- 進(jìn)程: 有獨(dú)立的內(nèi)存空間, 進(jìn)程中的數(shù)據(jù)存放空間是獨(dú)立的(棧和堆);
- 一個(gè) Java 程序至少有兩個(gè)線程, main線程和GC(垃圾回收)線程 ;
- 線程: 堆空間是共享的, 棿樱空間是獨(dú)立的, 線程通信與轉(zhuǎn)換消耗的資源比進(jìn)程小很多;
2.4 線程調(diào)度
-
分時(shí)調(diào)度
所有線程輪流使用 CPU 的使用權(quán), 平均分配每個(gè)線程占用 CPU 的時(shí)間 (這個(gè)時(shí)間會(huì)非常非常的短);
-
搶占式調(diào)度
優(yōu)先讓優(yōu)先級(jí)高的線程使用 CPU, 如果線程的優(yōu)先級(jí)相同, 那么會(huì)隨機(jī)選擇一個(gè)(線程隨機(jī)性);
Java使用的為搶占式調(diào)度;
3.線程的創(chuàng)建方式
3.1 繼承方式
// Java 中 Thread 表示一個(gè)線程
// Runnable接口表示執(zhí)行體函數(shù)的回調(diào), 而Thread實(shí)現(xiàn)了Runnable接口
public class MyThread extends Thread {
// 線程執(zhí)行體
@Override
public void run() {
}
}
public class ThreadTest {
public static void main(String[] args) {
new MyThread().start();
}
}
3.2 實(shí)現(xiàn)方式
// Runnable接口的回調(diào)就是線程執(zhí)行體
public class MyRunnable implements Runnable {
// 線程執(zhí)行體
@Override
public void run() {
}
}
public class ThreadTest {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
new Thread(runnable).start();
}
}
3.3 匿名內(nèi)部類方式
public class ThreadTest {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
new Thread(runnable).start();
}
}
3.4 繼承和實(shí)現(xiàn)的區(qū)別
- 實(shí)現(xiàn)的方式將線程 (Thread) 和任務(wù) (Runnable的run方法中的內(nèi)容) 分開了, 而繼承是綁在了一起
- Runnable 中的資源可以被多個(gè)線程 (Thread) 共享
- 實(shí)現(xiàn)解耦操作, 增強(qiáng)程序的健壯性和擴(kuò)展性
- Java 中線程池只能支持實(shí)現(xiàn)的Runnable 或 Callable 類線程, 不支持繼承Thread的類
3.5 線程資源在內(nèi)存中的分布
- 一個(gè) Java 程序至少有兩個(gè)線程, 一個(gè)的 Main 方法所在的線程, 另一個(gè)是垃圾回收線程(GC);
- 當(dāng)線程啟動(dòng)時(shí), 會(huì)在棧內(nèi)存中開辟出一塊獨(dú)立空間 (每個(gè)線程都會(huì)有一塊獨(dú)立的棧空間);
- 此空間中保存 基本數(shù)據(jù)類型的對(duì)象和引用數(shù)據(jù)類型對(duì)象的引用 (堆空間中的地址), 也就是說(shuō)基本所有的 線程都共享堆空間;
- 線程中的 靜態(tài)變量會(huì)保存在方法區(qū) (也叫靜態(tài)區(qū), 包含整個(gè)程序中永遠(yuǎn)唯一的元素, 也就是 class 和 static變量); (data segment 與 code segment , 前面存放靜態(tài)變量或字符串常量, 后者存放類中的方法 )
- 當(dāng)線程中的方法被調(diào)用時(shí), 該 方法會(huì)入此線程所在的棧 空間;
- Main 線程是最后出棧的, 基本代表程序的結(jié)束;
3.6 線程池方式
3.6.1 什么是線程池
線程池是 Java 提供的為我們管理和使用線程對(duì)象的池, 使用線程池不必再關(guān)心線程的頻繁創(chuàng)建與回收等等;
Java 里面線程池的頂級(jí)接口是 Executor, 但是嚴(yán)格意義上講 Executor 并不是一個(gè)線程池, 而只是一個(gè)執(zhí)行線程的工具; 真正的線程池接口是 ExecutorService;
3.6.2 線程池的使用
示例一:
public class ThreadTest {
public static void main(String[] args) {
// 創(chuàng)建線程池,并指定線程池中初始化線程的個(gè)數(shù)
ExecutorService es = Executors.newFixedThreadPool(3);
// 提交無(wú)返回值的任務(wù),并執(zhí)行任務(wù)
Future<?> submit1 = es.submit(new Runnable() {
@Override
public void run() {
}
});
// 提交有返回值的任務(wù),并執(zhí)行任務(wù)
Future<String> submit = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "yiGeSiren";
}
});
try {
System.out.println(submit1.get());
System.out.println(submit.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
示例二:
public class ThreadTest {
public static void main(String[] args) {
// 創(chuàng)建線程池,并指定線程池中初始化線程的個(gè)數(shù)
ExecutorService es = Executors.newFixedThreadPool(3);
Callable<String> runnable = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName()+
"開始: 實(shí)現(xiàn)Callable接口的任務(wù)...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+
"結(jié)束:實(shí)現(xiàn)Callable接口的任務(wù)...");
return "tiGeSiRen";
}
};
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
es.submit(runnable);
}
}
對(duì)于上述代碼線程池中的線程來(lái)說(shuō), 當(dāng)任務(wù)多于線程個(gè)數(shù)時(shí), 任務(wù)會(huì)等待, 直到有空閑的線程從線程池中釋放出來(lái);
3.6.2 線程池的種類
-
1. newCachedThreadPool
創(chuàng)建一個(gè)可緩存的線程池, 如果線程池長(zhǎng)度超過(guò)處理需要, 會(huì)終止并從緩存中移除那些已有 60 秒鐘未被使用的線程;
如果線程池長(zhǎng)度小于任務(wù)數(shù), 則會(huì)將已完成任務(wù)的線程加入緩存, 然后從緩存中取出線程去執(zhí)行新的任務(wù);
如果緩存中沒有空閑進(jìn)程則會(huì)創(chuàng)建一個(gè)新的線程;public class ThreadTest { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Callable<String> runnable = new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName()+" 開始: 實(shí)現(xiàn)Callable接口的任務(wù)..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } // System.out.println(Thread.currentThread().getName()+" 結(jié)束:實(shí)現(xiàn)Callable接口的任務(wù)..."); return "tiGeSiRen"; } }; es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.submit(runnable); es.shutdown(); } }
打印結(jié)果
pool-1-thread-6 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-4 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-7 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-5 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-1 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-3 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...從上面的打印結(jié)果可以看出創(chuàng)建了七個(gè)線程對(duì)象;
-
2. newFixedThreadPool
指定線程個(gè)數(shù)的線程池, 也就是說(shuō)當(dāng)任務(wù)數(shù)大于線程個(gè)數(shù)時(shí), 任務(wù)只能等待線程空閑出來(lái);
-
3. newScheduledThreadPool
創(chuàng)建一個(gè)指定線程個(gè)數(shù)的線程池, 支持定時(shí)及執(zhí)行周期性任務(wù);
public class ThreadTest { public static void main(String[] args) { ScheduledExecutorService es = Executors.newScheduledThreadPool(2); Callable<String> runnable = new Callable<String>() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " 開始: 實(shí)現(xiàn)Callable接口的任務(wù)..."); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "tiGeSiRen"; } }; es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.schedule(runnable, 2000, TimeUnit.MILLISECONDS); es.shutdown(); } }
pool-1-thread-1 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-1 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實(shí)現(xiàn)Callable接口的任務(wù)...
pool-1-thread-2 開始: 實(shí)現(xiàn)Callable接口的任務(wù)... -
4. newSingleThreadExecutor
創(chuàng)建一個(gè)單線程的線程池; 這個(gè)線程池只有一個(gè)線程在工作, 也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù);
如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束, 那么會(huì)有一個(gè)新的線程來(lái)替代它, 此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行;
4 高并發(fā)與線程安全
- 高并發(fā): 是指在某個(gè)時(shí)間點(diǎn)上, 有大量的用戶(線程)同時(shí)訪問(wèn)同一資源;
- 線程安全: 在某個(gè)時(shí)間點(diǎn)上, 當(dāng)大量用戶(線程)訪問(wèn)同一資源時(shí), 由于多線程運(yùn)行機(jī)制的原因, 可能會(huì)導(dǎo)致被訪問(wèn)的資源出現(xiàn)"數(shù)據(jù)污染"的問(wèn)題;
4.1 悲觀鎖方式的解決線程安全問(wèn)題
4.1.2 問(wèn)題代碼示例
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"線程一").start();
new Thread(myRunnable,"線程二").start();
new Thread(myRunnable,"線程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 100;
@Override
public void run() {
while (true) {
if (tickets < 1) {
break;
}
System.out.println(Thread.currentThread().getName() + "正在出售第" +
tickets + "張票");
tickets--;
}
}
}
上面的代碼會(huì)執(zhí)行的結(jié)果會(huì)有幾個(gè)問(wèn)題:
- 多條線程出現(xiàn)賣重復(fù)票
- 會(huì)出現(xiàn)賣負(fù)數(shù)票
- 有些票沒有出售
出現(xiàn)這些問(wèn)題的原因, 是因?yàn)樵?Java 中線程是搶占式調(diào)度, 所以當(dāng)線程在執(zhí)行任務(wù)的時(shí)候, 會(huì)被其他線程打斷;
解決方式一: 使用 synchronized 關(guān)鍵字加鎖
解決方式二: 使用 Lock 加鎖
解決方式三: 使用 volatile 關(guān)鍵字加 原子類
4.1.3 synchronized 關(guān)鍵字 (悲觀鎖)
synchronized 可以作用在 方法中的某個(gè)區(qū)塊 或 函數(shù) 上, 表示只對(duì)這個(gè)區(qū)塊的資源或函數(shù)進(jìn)行互斥訪問(wèn); 也就是說(shuō), 當(dāng)有線程訪問(wèn)到這個(gè)區(qū)塊或函數(shù)時(shí), 其它的線程只能先等待;
需要注意:
- 鎖對(duì)象可以是任意對(duì)象;
- 但是同步線程加鎖的對(duì)象要一致;
- 同步函數(shù)的鎖對(duì)象; 如果這個(gè)函數(shù)是靜態(tài)函數(shù), 那么鎖對(duì)象是這個(gè)函數(shù)所在的 類.class; 如果這個(gè)函數(shù)是非靜態(tài)函數(shù), 那么鎖對(duì)象是調(diào)用這個(gè)函數(shù)的對(duì)象, 也就是 this ;
4.1.3.1 同步代碼塊
使用同步代碼塊方式, 解決 4.1.2 代碼示例中的問(wèn)題; 代碼如下:
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"線程一").start();
new Thread(myRunnable,"線程二").start();
new Thread(myRunnable,"線程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 50;
@Override
public void run() {
while (true) {
synchronized (this){
if (tickets < 1) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第"
+ tickets + "張票");
tickets--;
}
}
}
}
4.1.3.2 同步方法
使用同步方法方式, 解決 4.1.2 代碼示例中的問(wèn)題; 代碼如下:
public class ThreadTest {
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
new Thread(myRunnable,"線程一").start();
new Thread(myRunnable,"線程二").start();
new Thread(myRunnable,"線程三").start();
}
}
public class MyRunnable implements Runnable {
int tickets = 50;
@Override
public void run() {
while (true) {
if (sellTickets()) {
break;
}
}
}
public synchronized boolean sellTickets() {
if (tickets < 1) {
return true;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" + tickets
+ "張票");
tickets--;
return false;
}
}
4.1.4 Lock
Lock 也稱同步鎖, 是一個(gè)接口, 其實(shí)現(xiàn)類為 ReentrantLock ;
使用 Lock 鎖解決 4.1.2 代碼示例中的問(wèn)題; 代碼如下:
public class MyRunnable implements Runnable {
Lock lock = new ReentrantLock();
int tickets = 50;
@Override
public void run() {
while (true) {
// 加鎖
lock.lock();
if (tickets < 1) {
// 循環(huán)退出 也要解鎖
lock.unlock();
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在出售第" +
tickets + "張票");
tickets--;
// 解鎖
lock.unlock();
}
}
}
4.1.5 死鎖
- 什么是死鎖
多線程程序中, 使用了多把鎖, 造成線程之間相互等待;- 產(chǎn)生死鎖的條件
- 有多個(gè)線程;
- 有多把鎖
- 有同步代碼塊嵌套
- 案例:
線程A : 需要獲取A鎖, 再獲取B鎖, 才能執(zhí)行里面的代碼
線程B : 需要獲取B鎖, 再獲取A鎖, 才能執(zhí)行里面的代碼
代碼示例:
public class ThreadTest {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized ("鎖A"){
System.out.println("線程A獲取到了鎖A,等待獲取鎖B");
synchronized ("鎖B"){
System.out.println("線程A獲取到了鎖A,鎖B 開始執(zhí)行里面的代碼");
}
}
}
},"線程A").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized ("鎖B"){
System.out.println("線程B獲取到了鎖B,等待獲取鎖A");
synchronized ("鎖A"){
System.out.println("線程B獲取到了鎖B,鎖A 開始執(zhí)行里面的代碼");
}
}
}
},"線程B").start();
}
}
打印結(jié)果:
線程A獲取到了鎖A,等待獲取鎖B
線程B獲取到了鎖B,等待獲取鎖A (程序到這里是沒有結(jié)束的)
4.2 樂(lè)觀鎖的方式解決線程安全問(wèn)題
使用樂(lè)觀鎖方式解決線程安全問(wèn)題, 需要解決不可見性, 無(wú)序性和非原子操作的問(wèn)題;
4.2.1 不可見性
代碼示例:
public class ThreadTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
// 在子線程中修改 flag 變量為True
thread.start();
while (true){
// 檢測(cè)到線程中 flag 變量為 true 之后結(jié)否死循環(huán)
if (MyThread.flag){
System.out.println("結(jié)束 Main 線程中的死循環(huán)!");
break;
}
}
}
}
public class MyThread extends Thread {
static boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("修改了flag變量為true");
}
}
期望結(jié)果:
子線程進(jìn)行睡眠3秒, 主線程一直死循環(huán);
子線程醒了之后,修改flag=true,子線程結(jié)束,主線程結(jié)束;
實(shí)際結(jié)果:
子線程進(jìn)入睡眠3秒,主線程一直死循環(huán);
子線程醒了之后,修改flag=true,子線程結(jié)束,但是主線程結(jié)束不了;
原因:
子線程對(duì)共享變量flag的值進(jìn)行了修改,而主線程沒有看見(沒有獲取到修改后的值);
為什么主線程獲取不到子線程對(duì)共享變量flag修改后的值
- Java 內(nèi)存模型 (Java Memory Modle) JMM 描述了 Java 程序中各種變量的訪問(wèn)規(guī)則 (靜態(tài)共享變量) , 以及 JVM 在將變量 讀取到內(nèi)存 和將變量 存儲(chǔ)到內(nèi)存 這樣的底層細(xì)節(jié);
- 線程中的 靜態(tài)變量會(huì)保存在方法區(qū) (也叫靜態(tài)區(qū), 包含整個(gè)程序中永遠(yuǎn)唯一的元素, 也就是 class 和 static變量); (data segment 與 code segment , 前面存放靜態(tài)變量或字符串常量, 后者存放類中的方法 )
- 不同的線程會(huì)在棧區(qū)中有 各自的工作空間;
- 當(dāng)線程訪問(wèn)共享靜態(tài)變量時(shí), 線程會(huì)將這個(gè)變量 拷備一份到自己的工作區(qū) (也就是說(shuō), 在上面代碼中獲取到的變量值是在線程獨(dú)立的棧區(qū)), 進(jìn)行讀寫的操作;
- 但是不會(huì)將在棧區(qū)操作后的變量值 寫回主內(nèi)存(也就是靜態(tài)區(qū));
volatile關(guān)鍵字可以解決線程中的可見性問(wèn)題
public class MyThread extends Thread {
// 當(dāng)共享變量被volatile修飾,會(huì)強(qiáng)制讓線程每次獲取變量的值都從主內(nèi)存(方法區(qū)也就靜態(tài)區(qū))中去獲取;
volatile static boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("修改了flag變量為true");
}
}
4.2.2 無(wú)序性
無(wú)序性是指 內(nèi)存訪問(wèn)順序與得到的字節(jié)碼順序 不一樣 或者 字節(jié)碼順序與實(shí)際的執(zhí)行順序 不一樣; 是由 JIT 動(dòng)態(tài)編譯 (會(huì)重排序) 優(yōu)化的原因造成的, 靜態(tài)編譯是不會(huì)有這樣的問(wèn)題產(chǎn)生的;
volatile 關(guān)鍵字修飾變量可以禁止變量相關(guān)的指令重排序;
http://www.reibang.com/p/119ffdcef55a
https://baijiahao.baidu.com/s?id=1662251623172268398&wfr=spider&for=pc
4.2.3 原子性
原子性是指在一次或多次操作中, 要么全部執(zhí)行不被中斷, 要么全部不執(zhí)行;
代碼示例如下:
public class ThreadTest {
public static void main(String[] args) {
// 子線程1000000次自增
new MyThread().start();
// main線程1000000次自增
for (int i = 0; i < 1000000; i++) {
MyThread.a++;
}
System.out.println("Main線程的1000000次自增結(jié)束了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a的值為" + MyThread.a);
}
}
public class MyThread extends Thread {
volatile static int a = 0;
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
a++;
}
System.out.println("子線程的1000000次自增結(jié)束了");
}
}
期望打印結(jié)果
Main線程的1000000次自增結(jié)束了
子線程的1000000次自增結(jié)束了
a的值為2000000
實(shí)際打印結(jié)果
Main線程的1000000次自增結(jié)束了
子線程的1000000次自增結(jié)束了
a的值為1803604(實(shí)際上小于2000000)
原因: 兩個(gè)線程對(duì)a的自增操作, 產(chǎn)生了覆蓋效果; 就是這樣一種情況main線程自增一次之后等于1, 而子線程自增加之后也等于1, 而實(shí)際上是等于2了;
并且Volatile不能解決原子性問(wèn)題, 實(shí)際上就是兩個(gè)或兩個(gè)以上的線程同時(shí)對(duì)主內(nèi)存的變量進(jìn)行了寫的操作, 正確的應(yīng)該是當(dāng)多個(gè)線程同時(shí)對(duì)主內(nèi)存的變量時(shí)行寫操作時(shí), 只有一個(gè)線程可以進(jìn)行寫操作, 在等待當(dāng)前線程寫完之后其它的線程需要重要讀取變量的值;
4.2.4 原子類
在 java.util.concurrent.atomic 包下提供了一系的具有 原子性的原子類 API ;
它們可以保證對(duì)“變量”操作的:原子性侣监、有序性鸭轮、可見性; 其工作原理是基于 CAS 機(jī)制;
以 AtomicInteger 為例驗(yàn)證上面4.2.3示例代碼中的原子性問(wèn)題:
public class ThreadTest {
public static void main(String[] args) {
// 子線程1000000次自增
new MyThread().start();
// main線程1000000次自增
for (int i = 0; i < 1000000; i++) {
MyThread.a.addAndGet(1);
}
System.out.println("Main線程的1000000次自增結(jié)束了");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a的值為" + MyThread.a);
}
}
public class MyThread extends Thread {
static AtomicInteger a = new AtomicInteger(0);
@Override
public void run() {
for (int i = 0; i < 1000000; i++) {
a.addAndGet(1);
}
System.out.println("子線程的1000000次自增結(jié)束了");
}
}
Main線程的1000000次自增結(jié)束了
子線程的1000000次自增結(jié)束了
a的值為2000000
4.2.5 CAS (樂(lè)觀鎖)
CAS (Compare and Swap/Set) 比較并交換, CAS 的算法過(guò)程是: 它包含3個(gè)參數(shù)CAS(V,E,N) ;
- V 表示要更新的變量(內(nèi)存值), E表示預(yù)期值(舊的), N表示新值;
- 當(dāng)且僅當(dāng) V 值等于 E 值是, 才會(huì)將 N 值覆給 V 值;
- 如果 V 值不等于 E 值, 表示當(dāng)前有其它線程正在進(jìn)行操作, 那么當(dāng)前線程則什么都不做, 返回 V 的真實(shí)值;
- 當(dāng)多個(gè)線程同時(shí)使用 CAS 操作一個(gè)變量時(shí), 只有一個(gè)會(huì)勝出, 并成功更新, 其余均會(huì)失敗; 失敗的線程不會(huì)被掛起, 僅是被告知失敗, 并且允許再次嘗試, 當(dāng)然也允許失敗的線程放棄操作;
4.2.6 原子操作引發(fā)的 ABA 問(wèn)題
4.3 樂(lè)觀鎖與悲觀鎖的區(qū)別
- 樂(lè)觀鎖: 樂(lè)觀思想, 認(rèn)為讀多寫少, 遇到并發(fā)寫的可能性低; 每次去拿數(shù)據(jù)的時(shí)候都認(rèn)為別人不會(huì)修改, 所以不會(huì)上鎖, 但是在更新的時(shí)候會(huì)判斷一下有么有其它線程在此期間去更新這個(gè)數(shù)據(jù), 采取在寫入數(shù)據(jù)的時(shí)候加鎖的操作, 也就是說(shuō)在 寫入數(shù)據(jù) 時(shí)候, 其它線程是可以進(jìn)行讀操作 的;
- 悲觀鎖: 悲觀思想, 認(rèn)為寫多, 遇到并發(fā)寫的可能性高; 每次去拿數(shù)據(jù)的時(shí)候都認(rèn)為別人會(huì)修改, 所以每次在讀寫數(shù)據(jù)的時(shí)候都會(huì)上鎖, 這樣別人想讀寫這個(gè)數(shù)據(jù)就會(huì)block直到拿到鎖;
4.4 并發(fā)工具包
java.util.concurrent 下提供的一系列的處理并發(fā)問(wèn)題的容器和工具類
4.4.1 并發(fā)容器
CopyOnWriteArrayList
與 ArrayList 相關(guān)操作一致, 不同的是 CopyOnWriteArrayList 就線程安全的, 其內(nèi)部源碼使用了同步代碼塊的方式, 并且 new 了一個(gè) Object 對(duì)象作為鎖;
CopyOnWriteArraySet
與 CopyOnWriteArrayList 的相關(guān)操作一致, 是依賴 CopyOnWriteArrayList 實(shí)現(xiàn)的, 額外加了兩個(gè)函數(shù) addIfAbsent() 和 addAllAbsent() 來(lái)保證元素的唯 一性, 元素已經(jīng)在列表中, 就不往里面添加了;
ConcurrentHashMap
并發(fā)基于分段鎖思想;
4.4.2 并發(fā)工具類
CountDownLatch
利用這個(gè)類可以實(shí)現(xiàn)類似計(jì)數(shù)器的功能; 比如: 線程C 只有在 線程A 和線程B 走完 之后才能執(zhí)行; 示例示碼如下:
public class ThreadTest {
public static void main(String[] args) {
// 初始計(jì)數(shù)值為 2
CountDownLatch countDownLatch = new CountDownLatch(2);
// 線程c
new Thread(new Runnable() {
@Override
public void run() {
try {
// 當(dāng)基數(shù)值減為0時(shí), 等待的線程C 被喚醒執(zhí)行
countDownLatch.await();
Thread.sleep(3000);
System.out.println("線程C執(zhí)行完畢");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程c").start();
// 線程 A
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程A執(zhí)行完畢");
// 每次調(diào)用 countDown 函數(shù)計(jì)數(shù)值都會(huì)減一
countDownLatch.countDown();
}
}, "線程A").start();
// 線程 B
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程B執(zhí)行完畢");
// 每次調(diào)用 countDown 函數(shù)計(jì)數(shù)值都會(huì)減一
countDownLatch.countDown();
}
}, "線程B").start();
}
}
CyclicBarrier
通過(guò)它可以實(shí)現(xiàn)讓一組線程等待至某個(gè)狀態(tài)之后再全部同時(shí)執(zhí)行, 當(dāng)所有等待線程都被釋放以后,CyclicBarrier 可以被重用;
示例:公司召集5名員工開會(huì)橄霉,等5名員工都到了窃爷,會(huì)議開始。
?---------創(chuàng)建5個(gè)員工線程姓蜂,1個(gè)開會(huì)線程按厘,幾乎同時(shí)啟動(dòng),使用 CyclicBarrier 保證5名員工線程全部執(zhí)行后钱慢,再執(zhí)行開會(huì)線程;
---------如果是再有5個(gè)員工線程, 重新進(jìn)入, 也可以重新開始開會(huì) ( CyclicBarrier 可被重復(fù)利用);
public class ThreadTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
// 當(dāng)達(dá)到解放出阻塞線程的解開條件時(shí),會(huì)回調(diào)此函數(shù)
@Override
public void run() {
System.out.println("可以開始開會(huì)了,會(huì)議內(nèi)容是......");
}
});
MyRunnable myRunnable = new MyRunnable(cyclicBarrier);
new Thread(myRunnable,"員工一").start();
new Thread(myRunnable,"員工二").start();
new Thread(myRunnable,"員工三").start();
new Thread(myRunnable,"員工四").start();
new Thread(myRunnable,"員工五").start();
new Thread(myRunnable,"員工六").start();
new Thread(myRunnable,"員工七").start();
new Thread(myRunnable,"員工八").start();
new Thread(myRunnable,"員工九").start();
new Thread(myRunnable,"員工十").start();
}
}
public class MyRunnable implements Runnable {
CyclicBarrier cyclicBarrier;
public MyRunnable(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
if (cyclicBarrier != null) {
System.out.println(Thread.currentThread().getName()+":到達(dá)了會(huì)議室");
try {
// 阻塞線程繼續(xù)執(zhí)行,直到達(dá)到放開阻塞的條件
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
// 阻塞放開之后,線程繼續(xù)往下執(zhí)行
System.out.println(Thread.currentThread().getName()+":離開了會(huì)議室");
}
}
}
Semaphore
字面意思為信號(hào)量逮京,Semaphore 可以控制 同時(shí)訪問(wèn)的線程個(gè)數(shù);
案例: 教室同時(shí)只允許至多4名學(xué)生進(jìn)來(lái)上課, 出去一名, 才能再進(jìn)來(lái)一名; 示例代碼如下:
public class CLassRoom {
// 最大同時(shí)訪問(wèn)的線程個(gè)數(shù)為4
Semaphore sp = new Semaphore(4);
public void into(){
// 獲得許可
try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(Thread.currentThread().getName()+"正在聽課3秒鐘");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"聽課完成");
// 釋放許可
sp.release();
}
}
public class MyRunnable implements Runnable {
ClassRoom cr;
public MyRunnable(ClassRoom cr) {
this.cr = cr;
}
@Override
public void run() {
cr.into();// 進(jìn)入教室
}
}
public class ThreadTest {
public static void main(String[] args) {
CLassRoom cLassRoom = new CLassRoom();
MyRunnable myRunnable = new MyRunnable(cLassRoom);
new Thread(myRunnable,"學(xué)生一").start();
new Thread(myRunnable,"學(xué)生二").start();
new Thread(myRunnable,"學(xué)生三").start();
new Thread(myRunnable,"學(xué)生四").start();
new Thread(myRunnable,"學(xué)生五").start();
new Thread(myRunnable,"學(xué)生六").start();
new Thread(myRunnable,"學(xué)生七").start();
new Thread(myRunnable,"學(xué)生八").start();
new Thread(myRunnable,"學(xué)生九").start();
new Thread(myRunnable,"學(xué)生十").start();
}
}
Exchanger
是一個(gè)用于線程間協(xié)作的工具類; Exchanger用于進(jìn)行 線程間的數(shù)據(jù)交換;
示例代碼:
public class ThreadA extends Thread {
Exchanger<String> exchanger;
public ThreadA(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String s = exchanger.exchange("ThreadA");
System.out.println("線程A接收到了線程B交換的數(shù)據(jù): "+ s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
Exchanger<String> exchanger;
public ThreadB(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}
@Override
public void run() {
try {
String s = exchanger.exchange("ThreadB");
System.out.println("線程B接收到了線程A交換的數(shù)據(jù): "+ s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadTest {
public static void main(String[] args) {
Exchanger<String> stringExchanger = new Exchanger<>();
new ThreadA(stringExchanger).start();
new ThreadB(stringExchanger).start();
}
}
打印結(jié)果:
線程A接收到了線程B交換的數(shù)據(jù): ThreadB
線程B接收到了線程A交換的數(shù)據(jù): ThreadA
5 線程的狀態(tài)
- 新建(new)
- 運(yùn)行(runnable)
- 終止(teminated)
- 鎖阻塞(blocked)
- 無(wú)線等待(waiting)
- 限時(shí)等待(time waiting)
線程狀態(tài)之間的關(guān)系:
運(yùn)行與鎖阻塞: 線程未搶到鎖對(duì)象就會(huì)阻塞, 搶到鎖對(duì)象就會(huì)進(jìn)入運(yùn)行狀態(tài);
運(yùn)行與無(wú)線等待: 運(yùn)行中的線程調(diào)用鎖對(duì)象的 wait() 函數(shù), 會(huì)進(jìn)入無(wú)限等待狀態(tài)等待被喚醒; 如果無(wú)限等待中的線程被喚醒但 沒有搶到鎖, 那么會(huì)進(jìn)入阻塞狀態(tài); 如果無(wú)限等待中的線程被喚醒并且 搶到了鎖, 那么會(huì)進(jìn)入運(yùn)行狀態(tài);
運(yùn)行與限時(shí)等待第一種情況: 運(yùn)行中的線程調(diào)用鎖對(duì)象的 wait(數(shù)字) 函數(shù), 會(huì)進(jìn)入無(wú)限等待狀態(tài)等待被喚醒;
如果時(shí)間到了, 但沒有搶到鎖, 會(huì)進(jìn)入阻塞狀態(tài);
如果時(shí)間到了, 并且搶到了鎖, 會(huì)進(jìn)入運(yùn)行狀態(tài);
如果時(shí)間沒到, 卻被喚醒, 搶到了鎖就會(huì)進(jìn)行運(yùn)行狀態(tài), 沒有搶到鎖就會(huì)進(jìn)入阻塞狀態(tài);
運(yùn)行與限時(shí)等待第二種情況: 運(yùn)行中的線程調(diào)用 sleep() 方法, 由于調(diào)用 sleep() 方法并不像調(diào)用 wait() 方法那樣會(huì)釋放鎖, 所以線程應(yīng)該是屬于限時(shí)等待狀態(tài), 但是時(shí)間到了就會(huì)直接進(jìn)行運(yùn)行狀態(tài);
代碼示例
public class ThreadTest {
public static void main(String[] args) {
Object object = new Object();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (object){
try {
System.out.println("線程一進(jìn)入無(wú)限等待狀態(tài)");
object.wait();
System.out.println("線程一被線程二喚醒進(jìn)行運(yùn)行狀態(tài)");
Thread.sleep(2000);
System.out.println("線程一執(zhí)行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"線程一").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (object){
try {
System.out.println("線程二準(zhǔn)備喚醒線程一");
Thread.sleep(2000);
object.notify();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"線程二").start();
}
}
5.1 生產(chǎn)者消費(fèi)者案例一
public class ThreadA extends Thread {
Product product;
String name;
public ThreadA(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag) {
try {
System.out.println(name + "進(jìn)入無(wú)限等待");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (!product.flag) {
product.flag = true;
System.out.println(name + "正在生產(chǎn)商品");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.notify();
}
}
}
}
}
public class ThreadB extends Thread {
Product product;
String name;
public ThreadB(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (!product.flag) {
try {
System.out.println("------" + name + "進(jìn)入無(wú)限等待");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (product.flag) {
product.flag = false;
System.out.println("------" + name + "正在消費(fèi)商品");
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.notify();
}
}
}
}
}
public class Product {
public boolean flag = false;
}
public class ThreadTest2 {
public static void main(String[] args) {
Product product = new Product();
new ThreadA(product, "生產(chǎn)者線程1").start();
new ThreadB(product, "消費(fèi)者線程1").start();
}
}
5.1 生產(chǎn)者消費(fèi)者案例二
public class Product {
public int num = 0;
public boolean flag2 = false;
}
public class ThreadTest2 {
public static void main(String[] args) {
Product product = new Product();
new ThreadA(product, "生產(chǎn)者線程1").start();
new ThreadA(product, "生產(chǎn)者線程2").start();
new ThreadA(product, "生產(chǎn)者線程3").start();
new ThreadA(product, "生產(chǎn)者線程4").start();
new ThreadA(product, "生產(chǎn)者線程5").start();
new ThreadA(product, "生產(chǎn)者線程6").start();
new ThreadA(product, "生產(chǎn)者線程7").start();
new ThreadA(product, "生產(chǎn)者線程8").start();
new ThreadA(product, "生產(chǎn)者線程9").start();
new ThreadB(product, "消費(fèi)者線程1").start();
new ThreadB(product, "消費(fèi)者線程2").start();
new ThreadB(product, "消費(fèi)者線程3").start();
new ThreadB(product, "消費(fèi)者線程4").start();
new ThreadB(product, "消費(fèi)者線程5").start();
new ThreadB(product, "消費(fèi)者線程6").start();
new ThreadB(product, "消費(fèi)者線程7").start();
new ThreadB(product, "消費(fèi)者線程8").start();
new ThreadB(product, "消費(fèi)者線程9").start();
}
}
public class ThreadA extends Thread {
Product product;
String name;
public ThreadA(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (!product.flag2) {
if (product.num < 10) {
product.num++;
System.out.println(name + "正在生產(chǎn)第" + product.num + "商品");
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (product.num == 10) {
product.flag2 = true;
product.notifyAll();
System.out.println(name + "進(jìn)入無(wú)限等待");
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
}
public class ThreadB extends Thread {
Product product;
String name;
public ThreadB(Product product, String name) {
this.product = product;
this.name = name;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag2) {
if (product.num <= 10) {
System.out.println("------" + name + "正在消費(fèi)第" + product.num + "商品");
product.num--;
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (product.num == 0) {
product.flag2 = false;
product.notifyAll();
System.out.println("------" + name + "進(jìn)入無(wú)限等待");
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
}