1 線程池簡介
1.1 為什么使用線程池
- 降低系統(tǒng)資源消耗回窘,通過重用已存在的線程,降低線程創(chuàng)建和銷毀造成的消耗市袖;
- 提高系統(tǒng)響應速度啡直,當有任務到達時,通過復用已存在的線程苍碟,無需等待新線程的創(chuàng)建便能立即執(zhí)行酒觅;
- 方便線程并發(fā)數(shù)的管控,因為線程若是無限制的創(chuàng)建微峰,可能會導致內存占用過多而產(chǎn)生
OOM
阐滩,并且會造成cpu
過度切換(cpu
切換線程是有時間成本的(需要保持當前執(zhí)行線程的現(xiàn)場,并恢復要執(zhí)行線程的現(xiàn)場) - 提供更強大的功能县忌,延時定時線程池
1.2 線程池為什么需要使用隊列
因為線程若是無限制的創(chuàng)建掂榔,可能會導致內存
占用過多而產(chǎn)生OOM
,并且會造成cpu
過度切換症杏。
創(chuàng)建線程池的消耗較高或者線程池創(chuàng)建線程需要獲取mainlock
這個全局鎖装获,影響并發(fā)效率,阻塞隊列可以很好的緩沖
1.3 線程池為什么要使用阻塞隊列而不使用非阻塞隊列
阻塞隊列
可以保證任務隊列中沒有任務時阻塞獲取任務的線程厉颤,使得線程進入wait
狀態(tài)穴豫,釋放cpu
資源,當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執(zhí)行。
使得在線程不至于一直占用cpu
資源精肃。(線程執(zhí)行完任務后通過循環(huán)再次從任務隊列中取出任務進行執(zhí)行秤涩,代碼片段如:while (task != null || (task = getTask()) != null) {}
)。
不用阻塞隊列也是可以的司抱,不過實現(xiàn)起來比較麻煩而已筐眷,有好用的為啥不用呢
1.4 如何配置線程池
CPU
密集型任務
盡量使用較小的線程池,一般為CPU
核心數(shù)+1
习柠。 因為CPU
密集型任務使得CPU
使用率很高匀谣,若開過多的線程數(shù),會造成CPU
過度切換IO
密集型任務
可以使用稍大的線程池资溃,一般為2*CPU
核心數(shù)武翎。IO
密集型任務CPU
使用率并不高,因此可以讓CPU
在等待IO
的時候有其他線程去處理別的任務溶锭,充分利用CPU
時間混合型任務
可以將任務分成IO
密集型和CPU
密集型任務宝恶,然后分別用不同的線程池去處理。 只要分完之后兩個任務的執(zhí)行時間相差不大趴捅,那么就會比串行執(zhí)行來的高效
因為如果劃分之后兩個任務執(zhí)行時間有數(shù)據(jù)級的差距卑惜,那么拆分沒有意義。
因為先執(zhí)行完的任務就要等后執(zhí)行完的任務驻售,最終的時間仍然取決于后執(zhí)行完的任務露久,而且還要加上任務拆分與合并的開銷,得不償失
1.5 execute()和submit()方法
-
execute()
欺栗,執(zhí)行一個任務毫痕,沒有返回值 -
submit()
,提交一個線程任務,有返回值
submit(Callable<T> task)
能獲取到它的返回值,通過future.get()
獲然(阻塞
直到任務執(zhí)行完)煌珊。一般使用FutureTask+Callable
配合使用
submit(Runnable task, T result)
能通過傳入的載體result
間接獲得線程的返回值脸秽。
submit(Runnable task)
則是沒有返回值的,就算獲取它的返回值也是null
Future.get()
方法會使取結果的線程進入阻塞狀態(tài)
,直到線程執(zhí)行完成之后,喚醒取結果的線程缸逃,然后返回結果
1.6 Spring線程池
Spring
通過任務執(zhí)行器(TaskExecutor
)來實現(xiàn)多線程和并發(fā)編程,使用ThreadPoolTaskExecutor
實現(xiàn)一個基于線程池的TaskExecutor
厂抽,
還得需要使用@EnableAsync
開啟異步需频,并通過在需要的異步方法那里使用注解@Async
聲明是一個異步任務
Spring
已經(jīng)實現(xiàn)的異常線程池:
-
SimpleAsyncTaskExecutor
:不是真的線程池,這個類不重用線程筷凤,每次調用都會創(chuàng)建一個新的線程昭殉。 -
SyncTaskExecutor
:這個類沒有實現(xiàn)異步調用,只是一個同步操作。只適用于不需要多線程的地方 -
ConcurrentTaskExecutor
:Executor
的適配類挪丢,不推薦使用蹂风。如果ThreadPoolTaskExecutor
不滿足要求時,才用考慮使用這個類 -
SimpleThreadPoolTaskExecutor
:是Quartz的SimpleThreadPool
的類乾蓬。線程池同時被quartz和非quartz使用惠啄,才需要使用此類 -
ThreadPoolTaskExecutor
:最常使用,推薦巢块。 其實質是對java.util.concurrent.ThreadPoolExecutor
的包裝
1.7 @Async調用中的事務處理機制
2 示例
2.1 線程池配置類
package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync //開啟異步操作
public class TaskExecutorConfig implements AsyncConfigurer {
/**
* 通過getAsyncExecutor方法配置ThreadPoolTaskExecutor,獲得一個基于線程池TaskExecutor
*
* @return
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心線程數(shù)
pool.setMaxPoolSize(10);//最大線程數(shù)
pool.setQueueCapacity(25);//線程隊列
pool.initialize();//線程初始化
return pool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
配置類中方法說明:
Spring
中的ThreadPoolExecutor
是借助JDK
并發(fā)包中的java.util.concurrent.ThreadPoolExecutor
來實現(xiàn)的。其中一些值的含義如下:
-
int corePoolSize:
線程池維護線程的最小數(shù)量 -
int maximumPoolSize:
線程池維護線程的最大數(shù)量巧号,線程池中允許的最大線程數(shù)族奢,線程池中的當前線程數(shù)目不會超過該值。如果隊列中任務已滿丹鸿,并且當前線程個數(shù)小于maximumPoolSize
越走,那么會創(chuàng)建新的線程來執(zhí)行任務。 -
long keepAliveTime:
空閑線程的存活時間TimeUnit
-
unit:
時間單位靠欢,現(xiàn)由納秒廊敌,微秒,毫秒门怪,秒 -
BlockingQueue workQueue:
持有等待執(zhí)行的任務隊列 -
RejectedExecutionHandler
handler 線程池的拒絕策略骡澈,是指當任務添加到線程池中被拒絕,而采取的處理措施掷空。
當任務添加到線程池中之所以被拒絕肋殴,可能是由于:第一,線程池異常關閉坦弟。第二护锤,任務數(shù)量超過線程池的最大限制。
Reject
策略預定義有四種:
ThreadPoolExecutor.AbortPolicy
策略酿傍,是默認的策略,處理程序遭到拒絕將拋出運行時RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
策略 ,調用者的線程會執(zhí)行該任務,如果執(zhí)行器已關閉,則丟棄.ThreadPoolExecutor.DiscardPolicy
策略烙懦,不能執(zhí)行的任務將被丟棄.ThreadPoolExecutor.DiscardOldestPolicy
策略,如果執(zhí)行程序尚未關閉赤炒,則位于工作隊列頭部的任務將被刪除氯析,然后重試執(zhí)行程序(如
果再次失敗,則重復此過程)- 自定義策略:當然也可以根據(jù)應用場景需要來實現(xiàn)
RejectedExecutionHandler
接口自定義策略莺褒。如記錄日志或持久化不能處理的任務
2.2 異步方法
@Async
注解可以用在方法上魄鸦,表示該方法是個異步方法,也可以用在類上癣朗,那么表示此類的所有方法都是異步方法
異步方法會自動注入使用ThreadPoolTaskExecutor
作為TaskExecutor
package cn.jzh.thread;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
@Service
public class AsyncTaskService {
/**
*
* @param i
*/
@Async
public void executeAsync(Integer i) throws Exception{
System.out.println("線程ID:" + Thread.currentThread().getId() + "線程名字:" +Thread.currentThread().getName()+"執(zhí)行異步任務:" + i);
}
@Async
public Future<String> executeAsyncPlus(Integer i) throws Exception {
System.out.println("線程ID:" + Thread.currentThread().getId() +"線程名字:" +Thread.currentThread().getName()+ "執(zhí)行異步有返回的任務:" + i);
return new AsyncResult<>("success:"+i);
}
}
2.3 啟動測試
package cn.jzh.thread;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.concurrent.Future;
public class MainApp {
public static void main(String[] args) throws Exception{
System.out.println("主線程id:" + Thread.currentThread().getId() + "開始執(zhí)行調用任務...");
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
AsyncTaskService service = context.getBean(AsyncTaskService.class);
for (int i = 0;i<10;i++){
service.executeAsync(i);
Future<String> result = service.executeAsyncPlus(i);
System.out.println("異步程序執(zhí)行結束拾因,獲取子線程返回內容(會阻塞當前main線程)" + result.get());
}
context.close();
System.out.println("主線程id:" + Thread.currentThread().getId() + "程序結束!!");
}
}
注意:
- 是否影響主線程
如果main
主線程不去獲取子線程的結果(Future.get()
),那么主線程完全可以不阻塞。那么绢记,此時扁达,主線程和子線程完全異步。此功能蠢熄,可以做成類似MQ
消息中間件之類的跪解,消息異步進行發(fā)送 - 判斷是否執(zhí)行完畢
當返回的數(shù)據(jù)類型為Future
類型,其為一個接口签孔。具體的結果類型為AsyncResult
,這個是需要注意的地方叉讥。
調用返回結果的異步方法,判斷是否執(zhí)行完畢時需要使用future.isDone()
來判斷是否執(zhí)行完畢
public void testAsyncAnnotationForMethodsWithReturnType()
throws InterruptedException, ExecutionException {
System.out.println("Invoking an asynchronous method. " + Thread.currentThread().getName());
Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();
while (true) { ///這里使用了循環(huán)判斷饥追,等待獲取結果信息
if (future.isDone()) { //判斷是否執(zhí)行完畢
System.out.println("Result from asynchronous process - " + future.get());
break;
}
System.out.println("Continue doing something else. ");
Thread.sleep(1000);
}
}
這些獲取異步方法的結果信息图仓,是通過不停的檢查Future
的狀態(tài)來獲取當前的異步方法是否執(zhí)行完畢來實現(xiàn)的