線程的基本操作
用new關(guān)鍵字創(chuàng)建一個線程對象搀罢,然后調(diào)用它的start()啟動線程即可嫉嘀。
Thread thread = new Thread();
thread.start();
線程有個run()方法,start()會創(chuàng)建一個新的線程并讓這個線程執(zhí)行run()方法魄揉。
//這種run()方法調(diào)用不能新建一個線程亏娜,而是在當(dāng)前線程中調(diào)用run()方法蕴坪,
//將run方法只是作為一個普通的方法調(diào)用。
Thread thread1 = new Thread();
thread1.run();
start方法是啟動一個線程夹抗,run方法只會在當(dāng)前線程中串行的執(zhí)行run方法中的代碼杰标。
Thread thread = new Thread(){
@Override
public void run() {
System.out.println("xxxxxxx");
}
};
thread.start();
通過繼承Thread類兵怯,然后重寫run方法,來自定義一個線程腔剂。
java中剛好提供了Runnable接口來自定義一個線程媒区。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Thread類有一個非常重要的構(gòu)造方法:
public Thread(Runnable target)
Thread的run方法:
@Override
public void run() {
if (target != null) {
target.run();
}
}
實現(xiàn)Runnable接口是比較常見的做法,也是推薦的做法掸犬。
Thread thread2 =new Thread(new Runnable() {
@Override
public void run() {
}
});
thread2.start();
線程狀態(tài) Thread
6個狀態(tài)定義:java.lang.Thread.State
1.NEW : 尚未啟動的線程的線程狀態(tài)
2.RUNNABLE :可運行線程的線程狀態(tài)袜漩,等待cpu調(diào)度
3.BLOCKED:線程阻塞等待監(jiān)視器鎖定的線程狀態(tài),處在synchronized同步代碼塊或方法中被阻塞湾碎。
4.WAITING:等待線程的線程狀態(tài)宙攻,如 不帶超時的方式:Object.wait,Thread.join,LockSupport.park進入這個狀態(tài)
5.TIMED_WAITING:具有指定等待時間的等待線程的此案陳狀態(tài),如 帶超時方式;Thread.sleep,Object.wait,Thread.join,LockSupport.parkNanos,LockSupport.parkUntil進入這個狀態(tài)
6.TERMINATED:終止線程的線程狀態(tài)介褥。線程正常完成執(zhí)行或出現(xiàn)異常
線程終止
正確的線程中止 interrupter
如果目標(biāo)線程在調(diào)用Object class的wait()座掘,wait(long)或wait(long,int)方法递惋,join(),join(long,int),或sleep(long,int)方法是被阻塞,那么Interrupt會生效溢陪,該線程的中斷狀態(tài)將被清除萍虽,拋出InterrupttedException異常。
如果目標(biāo)線程是被I/O或者NIO中的Channel鎖阻塞形真,同樣杉编,I/O操作會被中斷或者返回特殊異常值,達到終止線程的目的没酣。
如果以上條件都不滿足王财,則會設(shè)置此線程的中斷狀態(tài)。
正確的線程中止 標(biāo)志位
在代碼邏輯中裕便,增加一個判斷绒净,用來控制。
Thread提供了3個與線程中斷有關(guān)的方法:
public void interrupt()//中斷線程
public boolean isInterrupted()//判斷線程是否中斷
public static boolean interrupted()//判斷線程是否被中斷偿衰,并清除當(dāng)前中斷狀態(tài)
public static void main(String[] args) throws InterruptedException {
Thread thread2 =new Thread(){
@Override
public void run() {
while (true){
if (this.interrupted()){
break;
}
}
}
};
thread2.start();
TimeUnit.SECONDS.sleep(1);
thread2.interrupt();//中斷
}
如果一個線程調(diào)用了sleep方法挂疆,一直處于休眠狀態(tài),通過變量控制下翎,不可以中斷線程缤言。此時只能使用線程提供的interrupt方法來中斷線程了。
Thread thread2 =new Thread(){
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
this.interrupt();//拋異常视事,發(fā)出終止線程
e.printStackTrace();
}
if (this.isInterrupted()){
break;
}
}
}
};
thread2.setName("thread2");
thread2.start();
TimeUnit.SECONDS.sleep(1);
thread2.interrupt();//中斷
}
sleep方法由于中斷而拋出異常之后胆萧,線程的中斷標(biāo)志會被清除(置為false),所以在異常中需要執(zhí)行this.interrupt()方法俐东,將中斷標(biāo)志位置為true.
線程通信
通信方式:
1)文件共享
2)網(wǎng)絡(luò)共享
3)共享變量
4)jdk提供的線程協(xié)調(diào)API
線程協(xié)作 JDK API
多線程協(xié)作的典型場景:生產(chǎn)者 - 消費者 (線程的阻塞跌穗,線程喚醒)
等待(wait)和通知(notify)
為了支持 多線程 之間的協(xié)作,JDK提供了兩個非常重要的方法:等待wait()方法和通知notify()方法虏辫。在Object類中定義的蚌吸。這意味著所有的對象都可以調(diào)用者兩個方法。java.lang.Object
public final void wait() throws InterruptedException
public final native void notify();//隨機喚醒
當(dāng)在一個對象實例上調(diào)用wait()方法后砌庄,當(dāng)前線程就會在這個對象上等待羹唠。
比如在線程A中,調(diào)用了obj.wait()方法娄昆,那么線程A就會停止繼續(xù)執(zhí)行佩微,轉(zhuǎn)為等待狀態(tài)。等待到什么時候結(jié)束呢萌焰?線程A會一直等到其他線程調(diào)用obj.notify()方法為止喊衫,這時,obj對象成為了多個線程之間的有效通信手段杆怕。
如果一個線程調(diào)用了object.wait()方法族购,那么它就會進出object對象的等待隊列壳贪。這個隊列中,可能會有多個線程寝杖,因為系統(tǒng)可能運行多個線程同時等待某一個對象违施。
當(dāng)object.notify()方法被調(diào)用時,它就會從這個隊列中 隨機 選擇一個線程瑟幕,并將其喚醒磕蒲。這個選擇是不公平.
Object獨享還有一個nofiyAll()方法,它和notify()方法的功能類似只盹,不同的是辣往,它會喚醒在這個等待隊列中所有等待的線程,而不是隨機選擇一個殖卑。
Object.wait()方法并不能隨便調(diào)用站削。它必須包含在對應(yīng)的synchronize語句匯總,無論是wait()方法或者notify()方法都需要首先獲取目標(biāo)獨享的一個監(jiān)視器孵稽。
Object.wait()方法和Thread.sleeep()方法都可以讓現(xiàn)場等待若干時間许起。除wait()方法可以被喚醒外,另外一個主要的區(qū)別就是wait()方法會釋放目標(biāo)對象的鎖菩鲜,而Thread.sleep()方法不會釋放鎖园细。
park/unpark機制
線程調(diào)用park則等待“許可”,unpark方法為指定線程
提供“許可permit”接校。
不要求 park和unpark方法的調(diào)用順序猛频。
調(diào)了幾次park,就得調(diào)幾次unpark
線程封閉
數(shù)據(jù)都被封閉在各自的線程之中蛛勉,就不需要同步鹿寻,這種通過將數(shù)據(jù)封閉在線程中而避免使用同步的技術(shù)即線程封閉。
ThreadLocal
ThreadLocal是java中的一種特殊的變量董习,每個線程都有一個ThreadLocal就是每個線程都擁有了自己的一個變量,競爭條件被徹底消除了爱只,在并發(fā)模式下是絕對安全的變量皿淋。
用法:
ThreadLocal<T> var = new ThreadLocal<T>();
會自動在每一個線程上創(chuàng)建一個T的副本,副本之間彼此獨立恬试,互不影響窝趣。可以用ThreadLocal存儲一些參數(shù)训柴,以便在線程中多個方法中使用哑舒,用來代替方法傳參的做法。
棧封閉
局部變量的固有屬性之一即是封閉在線程中幻馁,它們位于執(zhí)行線程的棧中洗鸵,其它線程無法訪問這個棧越锈。
線程池及原理
volatile修飾共享變量
java幫我們提供了這樣的方法,使用volatile修飾共享變量膘滨,被volatile修改的變量有以下特點:
1.線程中讀取的時候甘凭,每次讀取都會去主內(nèi)存中讀取共享變量最新的值,然后將其復(fù)制到工作內(nèi)存
2.線程中修改了工作內(nèi)存中變量的副本火邓,修改之后會立即刷新到主內(nèi)存
volatile解決了共享變量在多線程中可見性的問題丹弱,可見性是指一個線程對共享變量的修改,對于另一個線程來說是否是可以看到的铲咨。
volatile不能保證線程安全躲胳,只能保證被修飾變量的內(nèi)存可見性,如果對該變量執(zhí)行的是非原子性操作依舊線程不安全纤勒。
什么是線程池?
如果系統(tǒng)能夠提前為我們創(chuàng)建好線程坯苹,我們需要的時候直接拿來使用,用完之后不是直接將其關(guān)閉踊东,而是將其返回到線程中中北滥,給其他需要這使用,這樣直接節(jié)省了創(chuàng)建和銷毀的時間闸翅,提升了系統(tǒng)的性能再芋。
線程池實現(xiàn)原理
當(dāng)向線程池提交一個任務(wù)之后,線程池的處理流程如下:
- 判斷是否達到核心線程數(shù)坚冀,若未達到济赎,則直接創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù),否則進入下個流程
- 線程池中的工作隊列是否已滿记某,若未滿司训,則將任務(wù)丟入工作隊列中先存著等待處理,否則進入下個流程
- 是否達到最大線程數(shù)液南,若未達到壳猜,則創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù),否則交給線程池中的飽和策略進行處理滑凉。
image.png
jdk中提供了線程池的具體實現(xiàn)统扳,實現(xiàn)類是:java.util.concurrent.ThreadPoolExecutor
,主要構(gòu)造方法:ThreadPoolExecutor
類型 | 類名 | 描述 |
---|---|---|
接口 | Executor | 最上層的接口畅姊,定義了執(zhí)行任務(wù)的 execute |
接口 | ExecutorService | 繼承Executor接口咒钟,擴展Callable,Tutrue若未,關(guān)閉方法 |
接口 | ScheduledExecutorService | 繼承ExecutorService朱嘴,增加定時任務(wù)相關(guān)方法 |
實現(xiàn)類 | ThreadPoolExecutor | 基礎(chǔ),標(biāo)準(zhǔn)的線程池實現(xiàn) |
實現(xiàn)類 | ScheduledThreadPoolExecutor | 繼承ThreadPoolExecutor粗合,實現(xiàn)ScheduledExecutorService定時任務(wù)的方法 |
ExecutorService
//優(yōu)雅關(guān)閉線程萍嬉,之前提交的任務(wù)繼續(xù)執(zhí)行乌昔,但不接受新的任務(wù)
void shutdown();
//嘗試停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理帚湘,并返回等待執(zhí)行任務(wù)的列表
List<Runnable> shutdownNow();
//如果關(guān)閉后所有任務(wù)都已完成玫荣,則返回true
boolean isTerminated();
//監(jiān)測線程池是否關(guān)閉,直到所有任務(wù)完成執(zhí)行大诸,或超時發(fā)生捅厂,或當(dāng)前線程被中斷
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交一個用于執(zhí)行Callable返回任務(wù),并返回一個Future资柔,用獲取Callable執(zhí)行結(jié)果
<T> Future<T> submit(Callable<T> task);
//提交一個運行任務(wù)執(zhí)行焙贷,并返回一個Future, 執(zhí)行結(jié)果為傳入result
<T> Future<T> submit(Runnable task, T result);
//提交一個運行任務(wù)執(zhí)行贿堰,并返回一個Future辙芍, 執(zhí)行結(jié)果null
Future<?> submit(Runnable task);
//執(zhí)行給定的任務(wù)集合,執(zhí)行完畢后返回結(jié)果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//執(zhí)行給定的任務(wù)集合羹与,執(zhí)行完畢或超時故硅,返回結(jié)果,其它任務(wù)終止
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//執(zhí)行給定的任務(wù),任意一個執(zhí)行成功纵搁,則返回結(jié)果吃衅,其它終止
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//執(zhí)行給定的任務(wù),任意一個執(zhí)行成功或超時,則返回結(jié)果腾誉,其它終止
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
ScheduledExecutorService
//創(chuàng)建并執(zhí)行一個一次性任務(wù)徘层,過了延遲時間會被執(zhí)行
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個一次性任務(wù),過了延遲時間會被執(zhí)行
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性任務(wù)利职,過了給定初始延遲時間趣效,會第一次被執(zhí)行,執(zhí)行過程發(fā)生異常猪贪,那么任務(wù)終止
//一次性任務(wù)執(zhí)行超過了周期時間跷敬,下一次任務(wù)會等到該任務(wù)執(zhí)行結(jié)束后,立刻執(zhí)行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性任務(wù)热押,過了給定初始延遲時間西傀,第一次被執(zhí)行,后續(xù)以給定的周期時間執(zhí)行
//執(zhí)行過程中發(fā)生異常楞黄,那么任務(wù)就停止池凄。
//一次性任務(wù)執(zhí)行超過了周期時間抡驼,下一次任務(wù)在該任務(wù)執(zhí)行結(jié)束的時間基礎(chǔ)上鬼廓,計算執(zhí)行延時
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
corePoolSize:核心線程大小,當(dāng)提交一個任務(wù)到線程池時致盟,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù)碎税,即使有其他空閑線程可以處理任務(wù)也會創(chuàng)新線程尤慰,等到工作的線程數(shù)大于核心線程數(shù)時就不會在創(chuàng)建了。如果調(diào)用了線程池的prestartAllCoreThreads
方法雷蹂,線程池會提前把核心線程都創(chuàng)造好伟端,并啟動
maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了匪煌,并且以創(chuàng)建的線程數(shù)小于最大線程數(shù)责蝠,則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。如果我們使用了無界隊列萎庭,那么所有的任務(wù)會加入隊列霜医,這個參數(shù)就沒有什么效果了
keepAliveTime:線程池的工作線程空閑后,保持存活的時間驳规。如果沒有任務(wù)處理了肴敛,有些線程會空閑,空閑的時間超過了這個值吗购,會被回收掉医男。如果任務(wù)很多,并且每個任務(wù)的執(zhí)行時間比較短捻勉,避免線程重復(fù)創(chuàng)建和回收镀梭,可以調(diào)大這個時間,提高線程的利用率
unit:keepAliveTIme的時間單位贯底,可以選擇的單位有天丰辣、小時、分鐘禽捆、毫秒笙什、微妙、千分之一毫秒和納秒胚想。類型是一個枚舉java.util.concurrent.TimeUnit
琐凭,這個枚舉也經(jīng)常使用,有興趣的可以看一下其源碼
workQueue:工作隊列浊服,用于緩存待處理任務(wù)的阻塞隊列统屈,常見的有4種,本文后面有介紹
threadFactory:線程池中創(chuàng)建線程的工廠牙躺,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字
handler:飽和策略愁憔,當(dāng)線程池?zé)o法處理新來的任務(wù)了,那么需要提供一種策略處理提交的新任務(wù)孽拷,默認(rèn)有4種策略
調(diào)用線程池的execute方法處理任務(wù)吨掌,執(zhí)行execute方法的過程:
- 判斷線程池中運行的線程數(shù)是否小于corepoolsize,是:則創(chuàng)建新的線程來處理任務(wù),否:執(zhí)行下一步
- 試圖將任務(wù)添加到workQueue指定的隊列中膜宋,如果無法添加到隊列窿侈,進入下一步
- 判斷線程池中運行的線程數(shù)是否小于
maximumPoolSize
,是:則新增線程處理當(dāng)前傳入的任務(wù)秋茫,否:將任務(wù)傳遞給handler
對象rejectedExecution
方法處理
線程池的使用步驟:
- 調(diào)用構(gòu)造方法創(chuàng)建線程池
- 調(diào)用線程池的方法處理任務(wù)
- 關(guān)閉線程池
/**
* ThreadPoolExecutor 線程池
*
*/
public class ThreadPoolExecutorDemo {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i=0;i<10;i++){
int j = i;
String taskName = "任務(wù)"+j;
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+taskName+"處理完畢");
});
}
executor.shutdown();
}
}
線程池中常見5種工作隊列
任務(wù)太多的時候史简,工作隊列用于暫時緩存待處理的任務(wù),jdk中常見的5種阻塞隊列:
ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列肛著,此隊列按照先進先出原則對元素進行排序
LinkedBlockingQueue:是一個基于鏈表結(jié)構(gòu)的阻塞隊列圆兵,此隊列按照先進先出排序元素,吞吐量通常要高于ArrayBlockingQueue枢贿。靜態(tài)工廠方法Executors.newFixedThreadPool
使用了這個隊列衙傀。
SynchronousQueue :一個不存儲元素的阻塞隊列,每個插入操作必須等到另外一個線程調(diào)用移除操作萨咕,否則插入操作一直處理阻塞狀態(tài)统抬,吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool
使用這個隊列
PriorityBlockingQueue:優(yōu)先級隊列危队,進入隊列的元素按照優(yōu)先級會進行排序
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("開始");
for (int i=0;i<50;i++){
int j = i;
String taskName = "任務(wù)"+j;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName()+" 處理 "+taskName);
try {
//模擬任務(wù)內(nèi)部處理耗時
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("結(jié)束");
executorService.shutdown();
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
使用上面的方式創(chuàng)建線程池需要注意聪建,如果需要處理的任務(wù)比較耗時,會導(dǎo)致新來的任務(wù)都會創(chuàng)建新的線程進行處理茫陆,可能會導(dǎo)致創(chuàng)建非常多的線程金麸,最終耗盡系統(tǒng)資源,觸發(fā)OOM簿盅。
PriorityBlockingQueue優(yōu)先級隊列的線程池
static class Task implements Runnable,Comparable<Task>{
private int i;
private String name;
public Task(int i,String name){
this.i = i;
this.name=name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 處理 "+this.name);
}
@Override
public int compareTo(Task o) {
return Integer.compare(o.i,this.i);
}
}
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(1,1,60L,
TimeUnit.SECONDS,new PriorityBlockingQueue<>());
for (int i=0;i<10;i++){
int j = i;
String taskName = " 任務(wù) "+j;
executorService.execute(new Task(i,taskName));
}
for (int i=100;i>=90;i--){
int j = i;
String taskName = " 任務(wù) "+j;
executorService.execute(new Task(i,taskName));
}
executor.shutdown();
}
除了第一個任務(wù)挥下,其他任務(wù)按照優(yōu)先級高低按順序處理。原因在于:創(chuàng)建線程池的時候使用了優(yōu)先級隊列桨醋,進入隊列中的任務(wù)會進行排序午笛,任務(wù)的先后順序由Task中的i變量決定驶冒。向PriorityBlockingQueue
加入元素的時候,內(nèi)部會調(diào)用代碼中Task的compareTo
方法決定元素的先后順序。
自定義創(chuàng)建線程的工廠
給線程池中線程起一個有意義的名字我碟,在系統(tǒng)出現(xiàn)問題的時候官研,通過線程堆棧信息可以更容易發(fā)現(xiàn)系統(tǒng)中問題所在藻治。自定義創(chuàng)建工廠需要實現(xiàn)java.util.concurrent.ThreadFactory
接口中的Thread newThread(Runnable r)
方法载矿,參數(shù)為傳入的任務(wù),需要返回一個工作線程虫蝶。
static AtomicInteger threadNum = new AtomicInteger(1);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
r -> {
Thread thread = new Thread(r);
thread.setName("自定義線程--"+threadNum.getAndIncrement());
return thread;
});
for (int i=0;i<5;i++){
String taskName = "任務(wù)--" + i;
executor.execute(() ->{
System.out.println(Thread.currentThread().getName()+" , 處理: "+taskName);
});
}
executor.shutdown();
}
4種常見飽和策略
當(dāng)線程池中隊列已滿章咧,并且線程池已達到最大線程數(shù),線程池會將任務(wù)傳遞給飽和策略進行處理能真。這些策略都實現(xiàn)了RejectedExecutionHandler
接口赁严。接口中有個方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
r:需要執(zhí)行的任務(wù)
executor:當(dāng)前線程池對象
JDK中提供了4種常見的飽和策略:
AbortPolicy:直接拋出異常
CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運行任務(wù)调限,即隨丟來的任務(wù),由他自己去處理
DiscardOldestPolicy:丟棄隊列中最老的一個任務(wù)误澳,即丟棄隊列頭部的一個任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)
DiscardPolicy:不處理秦躯,直接丟棄掉忆谓,方法內(nèi)部為空
自定義飽和策略
需要實現(xiàn)RejectedExecutionHandler
接口。任務(wù)無法處理的時候踱承,我們想記錄一下日志倡缠,我們需要自定義一個飽和策略,示例代碼:
static class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+", 處理="+this.name);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{name="+name+"}";
}
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,
60L,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r,rexecutor) -> {
//自定義飽和策略
//記錄一下無法處理的任務(wù)
System.out.println("無法處理的任務(wù): "+r.toString());
});
for (int i=0;i<5;i++){
executor.execute(new Task(" 任務(wù)-"+i));
}
executor.shutdown();
}
輸出結(jié)果中可以看到有3個任務(wù)進入了飽和策略中茎活,記錄了任務(wù)的日志昙沦,對于無法處理多任務(wù),我們最好能夠記錄一下载荔,讓開發(fā)人員能夠知道盾饮。任務(wù)進入了飽和策略,說明線程池的配置可能不是太合理懒熙,或者機器的性能有限丘损,需要做一些優(yōu)化調(diào)整。
擴展線程池
ThreadPoolExecutor
內(nèi)部提供了幾個方法beforeExecute
工扎、afterExecute
徘钥、terminated
,可以由開發(fā)人員自己去這些方法肢娘〕蚀。看一下線程池內(nèi)部的源碼:
beforeExecute:任務(wù)執(zhí)行之前調(diào)用的方法,有2個參數(shù)橱健,第1個參數(shù)是執(zhí)行任務(wù)的線程而钞,第2個參數(shù)是任務(wù)
protected void beforeExecute(Thread t, Runnable r) { }
afterExecute:任務(wù)執(zhí)行完成之后調(diào)用的方法,2個參數(shù)拘荡,第1個參數(shù)表示任務(wù)笨忌,第2個參數(shù)表示任務(wù)執(zhí)行時的異常信息,如果無異常俱病,第二個參數(shù)為null
protected void afterExecute(Runnable r, Throwable t) { }
terminated:線程池最終關(guān)閉之后調(diào)用的方法官疲。所有的工作線程都退出了,最終線程池會退出亮隙,退出時調(diào)用該方法
protected void terminated() { }
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
10,60L,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r,executors) -> {
//自定義飽和策略
//記錄一下無法處理的任務(wù)
System.out.println("無法處理的任務(wù)-" + r.toString());
}){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(System.currentTimeMillis()+","+t.getName()+",開始執(zhí)行任務(wù):" + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",r任務(wù):" + r.toString()+",執(zhí)行完畢");
}
@Override
protected void terminated() {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName() + ",關(guān)閉線程池");
}
};
for (int i= 0;i<10;i++){
executor.execute(new Task("任務(wù)-"+i));
}
TimeUnit.SECONDS.sleep(2);
executor.shutdown();
}
JUC中的Executors框架
Excecutor框架主要包含3部分的內(nèi)容:
- 任務(wù)相關(guān)的:包含被執(zhí)行的任務(wù)要實現(xiàn)的接口:Runnable接口或Callable接口
- 任務(wù)的執(zhí)行相關(guān)的:包含任務(wù)執(zhí)行機制的核心接口Executor途凫,以及繼承自
Executor
的ExecutorService
接口。Executor框架中有兩個關(guān)鍵的類實現(xiàn)了ExecutorService接口(ThreadPoolExecutor
和ScheduleThreadPoolExecutor
) - 異步計算結(jié)果相關(guān)的:包含接口Future和實現(xiàn)Future接口的FutureTask類
Executors框架包括:
- Executor
- ExecutorService
- ThreadPoolExecutor
- Executors
- Future
- Callable
- FutureTask
- CompletableFuture
- CompletionService
- ExecutorCompletionService
Executors類
Executors類溢吻,提供了一系列工廠方法用于創(chuàng)建線程池维费,返回的線程池都實現(xiàn)了ExecutorService接口果元。常用的方法有:
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {}
創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作犀盟,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)而晒。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它阅畴。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行倡怎。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù),任務(wù)如果比較多贱枣,單線程如果處理不過來监署,會導(dǎo)致隊列堆滿,引發(fā)OOM纽哥。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {}
創(chuàng)建固定大小的線程池钠乏。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小春塌。線程池的大小一旦達到最大值就會保持不變晓避,在提交新任務(wù),任務(wù)將會進入等待隊列中等待只壳。如果某個線程因為執(zhí)行異常而結(jié)束够滑,那么線程池會補充一個新線程。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù)吕世,任務(wù)如果比較多彰触,如果處理不過來,會導(dǎo)致隊列堆滿命辖,引發(fā)OOM况毅。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {}
創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求尔艇。
在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供,
使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則尔许,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池,避免資源耗盡的風(fēng)險终娃。
Future味廊、Callable接口
Future
接口定義了操作異步任務(wù)執(zhí)行一些方法,如獲取異步任務(wù)的執(zhí)行結(jié)果棠耕、取消任務(wù)的執(zhí)行余佛、判斷任務(wù)是否被取消、判斷任務(wù)執(zhí)行是否完畢等窍荧。
Callable
接口中定義了需要有返回的任務(wù)需要實現(xiàn)的方法辉巡。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
比如主線程讓一個子線程去執(zhí)行任務(wù),子線程可能比較耗時蕊退,啟動子線程開始執(zhí)行任務(wù)后郊楣,主線程就去做其他事情了憔恳,過了一會才去獲取子任務(wù)的執(zhí)行結(jié)果。
//獲取異步任務(wù)執(zhí)行結(jié)果
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> result = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",開始");
TimeUnit.SECONDS.sleep(5);
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",結(jié)束");
return 10;
}
});
System.out.println(System.currentTimeMillis() +" "+ Thread.currentThread().getName());
try {//get()阻塞等待結(jié)果返回
System.out.println(System.currentTimeMillis() +" "+ Thread.currentThread().getName()+", result="+result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
代碼中創(chuàng)建了一個線程池净蚤,調(diào)用線程池的submit
方法執(zhí)行任務(wù)钥组,submit參數(shù)為Callable
接口:表示需要執(zhí)行的任務(wù)有返回值,submit方法返回一個Future
對象今瀑,F(xiàn)uture相當(dāng)于一個憑證程梦,可以在任意時間拿著這個憑證去獲取對應(yīng)任務(wù)的執(zhí)行結(jié)果(調(diào)用其get
方法),代碼中調(diào)用了result.get()
方法之后放椰,此方法會阻塞當(dāng)前線程直到任務(wù)執(zhí)行結(jié)束。
超時獲取異步任務(wù)執(zhí)行結(jié)果
可能任務(wù)執(zhí)行比較耗時愉粤,比如耗時1分鐘砾医,我最多只能等待10秒,如果10秒還沒返回衣厘,我就去做其他事情了如蚜。
剛好get有個超時的方法,聲明如下:
java.util.concurrent.Future;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
Future
其他方法介紹一下
cancel:取消在執(zhí)行的任務(wù)影暴,參數(shù)表示是否對執(zhí)行的任務(wù)發(fā)送中斷信號错邦,方法聲明如下:
boolean cancel(boolean mayInterruptIfRunning);
isCancelled:用來判斷任務(wù)是否被取消
isDone:判斷任務(wù)是否執(zhí)行完畢。
Future型宙、Callable接口需要結(jié)合ExecutorService來使用撬呢,需要有線程池的支持。
FutureTask類
FutureTask除了實現(xiàn)Future接口妆兑,還實現(xiàn)了Runnable接口魂拦,因此FutureTask可以交給Executor執(zhí)行,也可以交給線程執(zhí)行執(zhí)行(Thread有個Runnable的構(gòu)造方法)搁嗓,FutureTask表示帶返回值結(jié)果的任務(wù)芯勘。
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(System.currentTimeMillis() + "開始" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
System.out.println(System.currentTimeMillis() + " 結(jié)束 " + Thread.currentThread().getName());
return 10;
}
});
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName());
new Thread(futureTask).start();//線程啟動Futuretask
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName());
System.out.println(System.currentTimeMillis() +" "+ Thread.currentThread().getName()+",result:"+futureTask.get());
}
線程池的submit方法返回的Future實際類型正是FutureTask對象
CompletionService接口
java.util.concurrent.CompletionService
CompletionService相當(dāng)于一個執(zhí)行任務(wù)的服務(wù),通過submit丟任務(wù)給這個服務(wù)腺逛,服務(wù)內(nèi)部去執(zhí)行任務(wù)荷愕,可以通過服務(wù)提供的一些方法獲取服務(wù)中已經(jīng)完成的任務(wù)。
接口內(nèi)的幾個方法:
Future<V> submit(Callable<V> task);
用于向服務(wù)中提交有返回結(jié)果的任務(wù)棍矛,并返回Future對象
Future<V> take() throws InterruptedException;
從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù)安疗,如果獲取不到,會一致阻塞到有返回值為止够委。此方法會響應(yīng)線程中斷茂契。
從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù),如果獲取不到慨绳,會一致阻塞到有返回值為止掉冶。此方法會響應(yīng)線程中斷真竖。
Future<V> poll();
通過submit向內(nèi)部提交任意多個任務(wù),通過take方法可以獲取已經(jīng)執(zhí)行完成的任務(wù)厌小,如果獲取不到將等待恢共。
ExecutorCompletionService 類
ExecutorCompletionService類是CompletionService接口的具體實現(xiàn);
ExecutorCompletionService創(chuàng)建的時候會傳入一個線程池璧亚,調(diào)用submit方法傳入需要執(zhí)行的任務(wù)讨韭,任務(wù)由內(nèi)部的線程池來處理;ExecutorCompletionService內(nèi)部有個阻塞隊列癣蟋,任意一個任務(wù)完成之后透硝,會將任務(wù)的執(zhí)行結(jié)果(Future類型)放入阻塞隊列中,然后其他線程可以調(diào)用它take疯搅、poll方法從這個阻塞隊列中獲取一個已經(jīng)完成的任務(wù)濒生,獲取任務(wù)返回結(jié)果的順序和任務(wù)執(zhí)行完成的先后順序一致,所以最先完成的任務(wù)會先返回幔欧。
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
構(gòu)造方法需要傳入一個Executor對象罪治,這個對象表示任務(wù)執(zhí)行器,所有傳入的任務(wù)會被這個執(zhí)行器執(zhí)行礁蔗。
completionQueue
是用來存儲任務(wù)結(jié)果的阻塞隊列觉义,默認(rèn)用采用的是LinkedBlockingQueue
,也支持開發(fā)自己設(shè)置浴井。通過submit傳入需要執(zhí)行的任務(wù)晒骇,任務(wù)執(zhí)行完成之后,會放入completionQueue
中磺浙。
執(zhí)行一批任務(wù)厉碟,然后消費執(zhí)行結(jié)果
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<Integer>> list = new ArrayList<>();
int taskCount = 5;
for (int i = taskCount;i>0;i--){
int j=i*2;
list.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(j);
return j;
}
});
}
soive(executorService,list,new Consumer<Integer>(){
@Override
public void accept(Integer integer) {
System.out.println(System.currentTimeMillis()+" = "+integer);
}
});
executorService.shutdown();
}
private static <T> void soive(ExecutorService executorService, List<Callable<T>> solvers, Consumer<T> consumer) throws InterruptedException, ExecutionException {
CompletionService<T> ecs = new ExecutorCompletionService<T>(executorService);
for (Callable<T> s:solvers){
ecs.submit(s);//提交任務(wù)
}
int n = solvers.size();
for (int i = 0;i<n;i++){
T r = ecs.take().get();
if (r !=null){
consumer.accept(r);//消費任務(wù)
}
}
}