一暴构、Lock 和 Condition
Java 并發(fā)包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題吩坝,這兩者的配合使用巷蚪,相當于 synchronized、wait()类缤、notify() 的使用不撑。
1. Lock 的優(yōu)勢
比起傳統(tǒng)的 synchronized 關(guān)鍵字文兢,Lock 最大的不同(或者說優(yōu)勢)在于:
- 阻塞的線程能夠響應中斷,這樣能夠有機會釋放自己持有的鎖焕檬,避免死鎖
- 支持超時姆坚,如果線程在一定時間內(nèi)未獲取到鎖,不是進入阻塞狀態(tài)实愚,而是拋出異常
- 非阻塞的獲取鎖兼呵,如果未獲取到鎖,不進入阻塞狀態(tài)腊敲,而是直接返回
三種情況分別對應 Lock 的三個方法:void lockInterruptibly()
击喂,boolean tryLock(long time, TimeUnit unit)
,boolean tryLock()
碰辅。
Lock 最常用的一個實現(xiàn)類是 ReentrantLock懂昂,代表可重入鎖,意思是可以反復獲取同一把鎖没宾。
除此之外凌彬,Lock 的構(gòu)造方法可以傳入一個 boolean 值,表示是否是公平鎖循衰。
2. Lock 和 Condition 的使用
前面實現(xiàn)的簡單的阻塞隊列就是使用 Lock 和 Condition 铲敛,現(xiàn)在其含義已經(jīng)非常明確了:
public class BlockingQueue<T> {
private int capacity;
private int size;
//定義鎖和條件
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
/**
* 入隊列
*/
public void enqueue(T data){
lock.lock();
try {
//如果隊列滿了,需要等待会钝,直到隊列不滿
while (size >= capacity){
notFull.await();
}
//入隊代碼伐蒋,省略
//入隊之后,通知隊列已經(jīng)不為空了
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//在finally塊中釋放鎖迁酸,避免死鎖
lock.unlock();
}
}
/**
* 出隊列
*/
public T dequeue(){
lock.lock();
try {
//如果隊列為空咽弦,需要等待,直到隊列不為空
while (size <= 0){
notEmpty.await();
}
//出隊代碼胁出,省略
//出隊列之后型型,通知隊列已經(jīng)不滿了
notFull.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
//實際應該返回出隊數(shù)據(jù)
return null;
}
}
可以看到,Lock 需要手動的加鎖和解鎖全蝶,并且解鎖操作是放在 finally 塊中的闹蒜,這是一種編程范式寺枉,盡量遵守。
二绷落、ReadWriteLock
ReadWriteLock 表示讀寫鎖姥闪,適用于讀多寫少的情況,讀寫鎖一般有幾個特征:
- 讀鎖與讀鎖之間不互斥砌烁,即允許多個線程同時讀變量筐喳。
- 寫鎖與讀鎖之間互斥,一個線程在寫時函喉,不允許讀操作避归。
- 寫鎖與寫鎖之間互斥,只允許 一個線程寫操作管呵。
讀寫鎖減小了鎖的粒度梳毙,在讀多寫少的場景下,對性能的提升較為明顯捐下。ReadWriteLock 的簡單使用示例如下:
public class ReadWriteLockTest {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock =lock.readLock();
private final Lock writeLock =lock.writeLock();
private int value;
//加寫鎖
private void addValue(){
writeLock.lock();
try {
value += 1;
}
finally {
writeLock.unlock();
}
}
//加讀鎖
private int getValue(){
readLock.lock();
try {
return value;
}
finally {
readLock.unlock();
}
}
}
讀寫鎖的升級與降級
Java 中不允許鎖的升級账锹,即加寫鎖時必須釋放讀鎖。
但是允許鎖的降級坷襟,即加讀鎖時奸柬,可以不釋放寫鎖,最后讀鎖和寫鎖一起釋放婴程。
三鸟缕、StampedLock
1. StampedLock 的使用及特點
StampedLock 是 Java 1.8 版本中提供的鎖,主要支持三種鎖模式:寫鎖排抬、悲觀讀鎖、樂觀讀授段。
其中寫鎖和悲觀讀鎖跟 ReadWriteLock 中的寫鎖和讀鎖的概念類似蹲蒲。StampedLock 在使用的時候不一樣,加鎖的時候會返回一個參數(shù)侵贵,解鎖的時候需要傳入這個參數(shù)届搁,示例如下:
public class StampedLockTest {
private final StampedLock lock = new StampedLock();
private int value;
private void addValue(){
long stamp = lock.writeLock();
try {
value += 1;
}
finally {
lock.unlockWrite(stamp);
}
}
}
StampedLock 最主要的特點是支持“樂觀讀”,即當進行讀操作的時候窍育,并不是所有的寫操作都被阻塞卡睦,允許一個線程獲取寫鎖。樂觀讀的使用示例如下:
public class StampedLockTest {
private final StampedLock lock = new StampedLock();
private int value;
private void getValue(){
//樂觀讀漱抓,讀入變量
long stamp = lock.tryOptimisticRead();
int a = value;
//如果驗證失敗
if (!lock.validate(stamp)){
//升級為悲觀讀鎖表锻,繼續(xù)讀入變量
stamp = lock.readLock();
try {
a = value;
}
finally {
lock.unlockRead(stamp);
}
}
}
}
需要注意的是,這里使用 validate() 方法進行驗證乞娄,如果樂觀讀失敗瞬逊,則升級為悲觀讀鎖显歧,繼續(xù)獲取變量。
2. StampedLock 的注意事項
StampedLock 不支持重入确镊,即不可反復獲取同一把鎖士骤。
在使用 StampedLock 的時候,不要調(diào)用中斷操作蕾域。如果需要支持中斷拷肌,可以調(diào)用 readLockInterruptibly 和 writeLockInterruptibly 方法。
四旨巷、Semaphore
Semaphore 表示信號量巨缘,初始化對象的時候,需要傳一個參數(shù)契沫,表示信號量的計數(shù)器值带猴。acquire() 方法將計數(shù)器加 1,release() 方法減 1懈万,這兩個方法都能夠保證原子性拴清。
信號量的簡單示例:
public class SemaphoreTest {
private final Semaphore semaphore = new Semaphore(1);
private int value;
public void addValue() {
try {
semaphore.acquire();
value += 1;
}
catch (InterruptedException e) {
e.printStackTrace();
}
finally {
semaphore.release();
}
}
程序中使用信號量實現(xiàn)了一個線程安全的方法,初始值設(shè)為了 1会通,當多個方法訪問 addValue 方法的時候口予,由于 acquire 方法保證原子性,所以只能有一個線程將計數(shù)器減 1 并進入臨界區(qū)涕侈,另一個線程等待沪停。
一個線程執(zhí)行完后,調(diào)用 release 方法裳涛,計數(shù)器加 1木张,另一個等待的線程被喚醒。
Semaphore 與 Lock 的一個不同點便是信號量允許多個線程同時進入臨界區(qū)端三,例如將初始值設(shè)置的更大一些舷礼。例如下面這個例子:
public class SemaphoreTest {
//初始值 2,表示 2 個線程可同時進入臨界區(qū)
private final Semaphore semaphore = new Semaphore(2);
public void test() {
try {
semaphore.acquire();
System.out.println("線程" + Thread.currentThread().getName() + " 進入臨界區(qū) : " + System.currentTimeMillis());
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
finally {
semaphore.release();
}
}
}
五郊闯、CountDownLatch
CountDownLatch 是一個線程同步的工具妻献,主要實現(xiàn)一個線程等待多個線程的功能。在原始的 Thread 中团赁,可以調(diào)用 join() 方法來等待線程執(zhí)行完畢育拨,而 CountDownLatch 則可以用在線程池中的線程等待。
下面是 CountDownLatch 的使用示例:
public class CountDownLatchTest {
//實際生產(chǎn)中不推薦使用這種創(chuàng)建線程的方式
private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
public void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
threadPool.execute(() -> {
System.out.println("線程1執(zhí)行完畢");
latch.countDown();
});
threadPool.execute(() -> {
System.out.println("線程2執(zhí)行完畢");
latch.countDown();
});
latch.await();
System.out.println("兩個線程都執(zhí)行完畢");
threadPool.shutdown();
}
}
CountDownLatch 的初始值為 2欢摄,線程執(zhí)行完畢則調(diào)用 countDown 方法熬丧,計數(shù)器減 1。減到 0 的時候怀挠,會喚醒主線程繼續(xù)執(zhí)行锹引。
六矗钟、CyclicBarrier
CyclicBarrier 也是一個線程同步工具類,主要實現(xiàn)多個線程之間的互相等待嫌变。
CyclicBarrier 有兩個構(gòu)造函數(shù)吨艇,可以傳一個計數(shù)器的初始值,還可以加上一個 Runnable腾啥,表示計數(shù)器執(zhí)行減到 0 的時候东涡,需要執(zhí)行的回調(diào)方法。
public class CyclicBarrierTest {
private final ExecutorService threadPool = Executors.newFixedThreadPool(2);
private final CyclicBarrier barrier = new CyclicBarrier(2, this::note);
public void print(){
threadPool.execute(() -> {
System.out.println("線程1執(zhí)行完畢");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
threadPool.execute(() -> {
System.out.println("線程2執(zhí)行完畢");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
threadPool.shutdown();
}
public void note(){
System.out.println("兩個線程執(zhí)行完畢");
}
}
示例中設(shè)置 CyclicBarrier 的初始值為 2倘待,線程執(zhí)行完畢調(diào)用 await 方法疮跑,計數(shù)器減 1。print() 方法中的兩個線程執(zhí)行完后凸舵,計數(shù)器減到 0祖娘,就會調(diào)用 note 方法。
七啊奄、ThreadPoolExecutor
1. 線程池的工作原理
由于線程是一種重量級對象渐苏,頻繁的創(chuàng)建和銷毀比較消耗系統(tǒng)資源,因此線程池的優(yōu)勢就顯現(xiàn)出來了菇夸。線程池可有降低資源消耗琼富,因為不用頻繁創(chuàng)建和銷毀線程;提高響應速度庄新,需要執(zhí)行任務時鞠眉,可直接使用線程池中的線程資源;還能夠有效的管理择诈、監(jiān)控線程池中的線程械蹋。
Java 中的線程池的實現(xiàn)是一種很典型的生產(chǎn)者-消費者模式,使用線程的一方是生產(chǎn)者羞芍,主要提供需要執(zhí)行的任務哗戈,線程池是消費者,消費生產(chǎn)者提供的任務涩金。
下面這段代碼能夠幫助理解線程池的實現(xiàn)原理(僅用于幫助理解,實際執(zhí)行結(jié)果有出入):
public class ThreadPool {
//保存任務的阻塞隊列
private BlockingQueue<Runnable> workQueue;
//保存工作線程的列表
private List<WorkThread> threadList = new ArrayList<>();
//構(gòu)造方法
public ThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
//根據(jù)poolSize的數(shù)量創(chuàng)建工作線程暇仲,并執(zhí)行線程
for (int i = 0; i < poolSize; i++) {
WorkThread thread = new WorkThread();
thread.start();
threadList.add(thread);
}
}
//執(zhí)行任務的方法步做,主要是將任務添加到隊列中
public void execute(Runnable task) {
try {
workQueue.put(task);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
//工作線程
class WorkThread extends Thread{
@Override
public void run() {
//循環(huán)取出任務執(zhí)行
while (!workQueue.isEmpty()) {
try {
Runnable task = workQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
上面的代碼注釋很詳細了,主要是使用了一個阻塞隊列奈附,用來存儲生產(chǎn)者的任務全度。然后在構(gòu)造器中創(chuàng)建線程,并循環(huán)從隊列中取出任務執(zhí)行斥滤。
2. Java 中的線程池
Java 中提供了 Executors 這個類來快速創(chuàng)建線程池将鸵,簡單使用示例如下:
Executors.newSingleThreadExecutor();//創(chuàng)建一個線程的線程池
Executors.newFixedThreadPool(5);//創(chuàng)建固定數(shù)量線程
Executors.newCachedThreadPool();//創(chuàng)建可調(diào)整數(shù)量的線程
Executors.newScheduledThreadPool(5);//創(chuàng)建定時任務線程池
但是在《阿里巴巴Java開發(fā)手冊》中勉盅,明確禁止使用 Executors 創(chuàng)建線程池(甚至也不建議使用 Thread 顯式創(chuàng)建線程),主要原因是 Executors 的默認方法都是使用的無界隊列顶掉,在高負載的情況下草娜,很容易導致 OOM(Out Of Memory)。
所以在 Java 中創(chuàng)建線程池的正確姿勢是使用 ThreadPoolExecutor 痒筒,其構(gòu)造函數(shù)有七個:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,//可選
RejectedExecutionHandler handler//可選
) { ...
-
corePoolSize
:線程池中最少的線程數(shù) -
maximumPoolSize
:線程池中創(chuàng)建的最大的線程數(shù) -
keepAliveTime
:表示線程池中線程的活躍時間宰闰,如果線程在這個活躍時間內(nèi)沒有執(zhí)行任務,并且線程數(shù)量超過了 corePoolSize簿透,那么線程池就會回收多余的線程移袍。 -
TimeUnit
:上一個參數(shù)的時間單位 -
workQueue
:保存任務的隊列,為了避免 OOM老充,建議使用有界隊列 -
threadFactory
:可選參數(shù)葡盗,不傳的話就是默認值。也可以自己傳一個實現(xiàn)了 ThreadFactory 接口的類啡浊,表示自定義線程觅够,例如給線程指定名字,線程組等虫啥。 -
handler
:可選參數(shù)蔚约。定義任務的拒絕策略,表示無空閑線程時涂籽,并且隊列中的任務滿了的苹祟,怎么拒絕新的任務。目前的拒絕策略有四種:- AbortPolicy:默認的拒絕策略评雌,拋出 RejectedExecutionException 異常
- CallerRunsPolicy:讓提交任務的線程自己去執(zhí)行這個任務
- DiscardOldestPolicy:丟棄最老的任務树枫,及最先加入隊列中的任務,并添加新的任務
- DiscardPolicy:直接丟棄任務景东,并且不會拋出任何異常
調(diào)用 ThreadPoolExecutor
線程池創(chuàng)建好了之后砂轻,就需要執(zhí)行任務,ThreadPoolExecutor 提供了兩個方法斤吐,一是 execute搔涝,二是 submit。execute 沒有返回值和措,也就是說無法獲取執(zhí)行結(jié)果庄呈。使用示例如下:
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
threadPool.execute(() -> {
System.out.println("In this world");
});
threadPool.shutdown();
}
而 submit 方法有一個 Future 接口的返回值,F(xiàn)uture 接口有五個方法:
- cancle:取消任務
- isCancled:任務是否已取消
- isDone:任務是否已執(zhí)行完
- get:獲取任務執(zhí)行結(jié)果
- get(long timeout, TimeUnit unit):支持超時獲取任務執(zhí)行結(jié)果
下面代碼展示了取消任務的方法:
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
Future<?> future = threadPool.submit(() -> {
System.out.println("I am roseduan");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
future.cancel(false);
threadPool.shutdown();
}
程序的本意是打印語句然后休眠 5 秒派阱,但由于調(diào)用了 cancle 方法 诬留,因此程序直接結(jié)束,不會有任何輸出。
八文兑、FutureTask
FutureTask 也是一個支持獲取任務執(zhí)行結(jié)果的工具類盒刚,F(xiàn)utureTask 實現(xiàn)了 Runnable 和 Future 接口。
所以可以將 FutureTask 作為任務提交給 ThreadPoolExecutor 或者 Thread 執(zhí)行绿贞,并且可以獲取執(zhí)行結(jié)果因块。簡單的使用如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
//創(chuàng)建任務
FutureTask<String> task = new FutureTask<>(() -> "Java and " + "Python");
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, queue);
threadPool.execute(task);
//獲取執(zhí)行結(jié)果
System.out.println(task.get());
threadPool.shutdown();
}
傳給 Thread 作為參數(shù)的使用示例如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
Thread thread = new Thread(task);
thread.start();
System.out.println(task.get());//輸出3
}
九、CompletableFuture
CompletableFuture 是一個異步編程的工具類樟蠕,異步化能夠最大化并行程序的執(zhí)行贮聂,是多線程性能優(yōu)化的基礎(chǔ)。
1. 創(chuàng)建 CompletableFuture 對象
Completable 有四個靜態(tài)方法寨辩,可以用來創(chuàng)建對象:
runAsync(Runnable runnable);//無返回值
runAsync(Runnable runnable, Executor executor);//無返回值吓懈,可指定線程池
supplyAsync(Supplier<U> supplier);//有返回值
supplyAsync(Supplier<U> supplier, Executor executor);//有返回值,可指定線程池
可以看到靡狞,四個方法分為了是否有返回值耻警,和是否自定義線程池。如果不自定義線程池甸怕,那么 CompletableFuture 會使用公共的線程池甘穿,默認創(chuàng)建 CPU 核數(shù)的數(shù)量的線程池,當有多個任務的時候梢杭,還是建議根據(jù)每個任務自定義線程池温兼。
一個簡單的使用示例如下,其中 task3 會等待兩個任務都執(zhí)行完畢:
public static void main(String[] args) {
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
System.out.println("任務1執(zhí)行完畢");
});
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務2執(zhí)行完畢");
});
CompletableFuture<String> task3 = task1.thenCombine(task2, (__, res) -> "兩個任務執(zhí)行完畢");
System.out.println(task3.join());
}
CompletableFuture 實現(xiàn)了 Future 接口武契,因此可以查看任務執(zhí)行的情況募判,并且可以獲取返回值。
2. CompletionStage 接口中的方法
CompletableFuture 還實現(xiàn)了 CompletionStage 接口咒唆。這個接口描述了任務之間的時序關(guān)系届垫,分別有串行、并行全释、聚合三種關(guān)系装处。需要注意的是,并行本就是其所具有的特性浸船,所以不再探討了妄迁,并且聚合關(guān)系又分為了 AND 聚合關(guān)系和 OR 聚合關(guān)系。下面依次介紹串行李命、AND 聚合登淘、OR 聚合這三種關(guān)系。
首先是串行關(guān)系项戴,串行很簡單形帮,一個任務執(zhí)行完后再執(zhí)行另一個任務槽惫,例如下圖:
描述串行關(guān)系的幾個方法是:thenApply周叮、thenAccept辩撑、thenRun、thenCompose仿耽。
thenApply 既支持接收參數(shù)合冀,又能夠支持返回值。
thenAccept 支持接收參數(shù)项贺,但是不支持返回值君躺。
thenRun 既不能接收參數(shù),也不能有返回值开缎。
CompletionStage 中的大部分方法都有帶有 Async 后綴的方法棕叫,表示可能會使用其他的線程來執(zhí)行主體中的內(nèi)容,后面介紹的方法都類似這樣奕删,不再贅述俺泣。
簡單的使用示例如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務1執(zhí)行完畢");
return "Task1";
}).thenApply((s) -> "接收到的參數(shù) : " + s);;
System.out.println(future.get());
}
其次是 AND 匯聚關(guān)系,典型的場景便是一個線程等待兩個線程都執(zhí)行完后再執(zhí)行完残,例如下圖:
描述 AND 聚合關(guān)系的有三個方法:thenCombine伏钠、thenAcceptBoth、runAfterBoth谨设,其是否接收參數(shù)和支持返回值熟掂,和上面的三個方法對應。一個簡單的使用示例如下:
public static void main(String[] args) {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務1執(zhí)行完畢");
return "task1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2執(zhí)行完畢");
return "task2";
});
CompletableFuture<String> task3 = task1.thenCombine(task2, (r,s) -> r + " " + s);
System.out.println(task3.join());
}
任務 1 休眠了 2 秒扎拣,任務 3 會等待前面兩個任務執(zhí)行完成之后再執(zhí)行赴肚。
最后是 OR 聚合關(guān)系,表示線程等待其中一個線程滿足條件之后鹏秋,就可以繼續(xù)執(zhí)行了尊蚁,不用等待全部的線程。
描述 OR 聚合關(guān)系的是 applyToEither侣夷、acceptEither横朋、runAfterEither。使用示例和上面的類似百拓,只需要將方法改一下就是了琴锭,這里不再贅述了。
3. 處理異常
在異步編程中衙传,CompletionStage 接口還提供了幾個可以處理異常的方法决帖,和 try() catch() finally() 類似。
這幾個方法分別是 :
- exceptionally:相當于 catch
- whenComplete:相當于 finally
- handle:相當于 finally 蓖捶,支持返回值
使用示例如下:
public static void main(String[] args) {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
String str = null;
return str.length();
//相當于catch
}).exceptionally((e) -> {
System.out.println("發(fā)生異常");
return 0;
});
//相當于 finally
task.whenComplete((s, r) -> {
System.out.println("執(zhí)行結(jié)束");
});
System.out.println(task.join());
}
十地回、CompletionService
CompletionService 是一個批量執(zhí)行異步任務的工具類,先來看一個例子:
public static void main(String[] args) throws ExecutionException, InterruptedException {
StringBuffer sb = new StringBuffer();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
5, 5,
10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5));
Future<String> task1 = threadPool.submit(() -> {
Thread.sleep(2000);
return "Task1";
});
Future<String> task2 = threadPool.submit(() -> "Task2");
Future<String> task3 = threadPool.submit(() -> "Task3");
sb.append(task1.get());
sb.append(task2.get());
sb.append(task3.get());
}
程序的意思是,依次執(zhí)行三個任務刻像,并將其結(jié)果存儲到 StringBuffer 中畅买,由于 task1 休眠了 2 秒,所以 sb 會在這里阻塞细睡。
由于這三個任務之間沒有關(guān)聯(lián)谷羞,所以等待的消耗完全是沒必要的,解決的辦法便是利用一個阻塞隊列溜徙,先執(zhí)行完的任務將結(jié)果保存在隊列中湃缎,sb 從隊列中取出就行了。
CompletionService 實際上就是將線程池和阻塞隊列的功能整合了起來蠢壹,解決了類似上面的問題嗓违。CompletionService 的實現(xiàn)類是 ExecutorCompletionService,這個類有兩個構(gòu)造方法:
public ExecutorCompletionService(Executor executor) {}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {}
如果不傳一個阻塞隊列图贸,則會使用默認的無界隊列靠瞎。
CompletionService 主要有這幾個方法:
submit() 提交任務、take() 從阻塞隊列中獲取執(zhí)行結(jié)果(如果隊列為空求妹,線程阻塞)乏盐、poll() 也是從隊列中獲取執(zhí)行結(jié)果(如果隊列為空,則返回 null)制恍,另外 poll 還支持超時獲取父能。
使用 CompletionService 改造后的程序示例如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
StringBuffer sb = new StringBuffer();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
5, 5,
10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5));
CompletionService<String> service = new ExecutorCompletionService<>(threadPool);
service.submit(() -> {
Thread.sleep(2000);
return "Task1";
});
service.submit(() -> "Task2");
service.submit(() -> "Task3");
System.out.println(sb.append(service.take().get()).toString());
System.out.println(sb.append(service.take().get()).toString());
System.out.println(sb.append(service.take().get()).toString());
}
十一、Fork/Join
1. Fork/Join 使用
Fork/Join 是一個處理分治任務的計算框架净神,所謂分治何吝,即分而治之,將一個任務分解成子任務鹃唯,求解子任務爱榕,然后將子任務的結(jié)果合并,就得到了最后的結(jié)果坡慌。分治思想的應用十分的廣泛黔酥,例如常見的快速排序、歸并排序洪橘,還有流行的大數(shù)據(jù)計算框架 MapReduce跪者,都應用了分治思想。
Java 中熄求,F(xiàn)ork 對應的是 任務分解渣玲,Join 則表示 子任務的結(jié)果合并。
Fork/Join 主要包含兩個主要的實現(xiàn)類:
一是線程池 ForkJoinPool弟晚,默認會創(chuàng)建 CPU核數(shù)數(shù)量的線程
-
二是 ForkJoinTask忘衍,這是一個抽象類逾苫,主要的方法有 fork() 和 join(),前者表示執(zhí)行子任務枚钓,后者表示阻塞等待子任務的執(zhí)行結(jié)果隶垮。ForkJoinTask 還有兩個子類:
- RecursiveTask
- RecursiveAction
這兩個類也是抽象的,我們需要自定義并繼承這個類秘噪,并覆蓋其 compute 方法。其中 RecursiveTask 有返回值勉耀,而 RecursiveAction 沒有返回值指煎。
下面是一個使用 ForkJoin 的示例,實現(xiàn)了 n 的階乘便斥,注釋寫得比較詳細至壤。
public class ForkJoinTest {
public static void main(String[] args) {
//創(chuàng)建線程池
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
//創(chuàng)建任務
Factorial task = new Factorial(6);
//invoke 方法執(zhí)行任務(還可以使用 execute、submit)枢纠,得到執(zhí)行的結(jié)果
Integer res = forkJoinPool.invoke(task);
System.out.println(res);
}
static class Factorial extends RecursiveTask<Integer> {
private final int n;
Factorial(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n == 0){
return 1;
}
Factorial f = new Factorial(n - 1);
//執(zhí)行子任務
f.fork();
//等待子任務結(jié)果
return n * factorial.join();
}
}
}
2. ForkJoinPool 原理
和普通的線程池類似像街,F(xiàn)orkJoinPool 是一個特殊的線程池,并且也采用的是生產(chǎn)者 - 消費者模式晋渺。跟普通線程池共享一個隊列不同镰绎,F(xiàn)orkJoinPool 其中維護了多個雙端隊列,當一個線程對應的任務隊列為空的時候木西,線程并不會空閑畴栖,而是“竊取”其他隊列的任務執(zhí)行。
由于是雙端隊列八千,正常執(zhí)行任務和“竊取任務”可以從兩端進行出隊吗讶,這樣避免了數(shù)據(jù)競爭。
采用“任務竊取”這種模式恋捆,也是 ForkJoinPool 比普通線程池更加智能的體現(xiàn)照皆。