1,為什么要用線程池,優(yōu)勢
(1)降低資源消耗同波,通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗略贮。
(2) 提高響應(yīng)速度兄淫,當任務(wù)到達時绑莺,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行哈误。
(3)? 提高線程的可管理性哩至,線程是稀缺資源,如果無限制的創(chuàng)建蜜自,不僅會消耗系統(tǒng)資源菩貌,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配重荠,調(diào)優(yōu)和監(jiān)控箭阶。
1.1常用方式
那java中是怎樣實現(xiàn)的線程池呢?是通過Executor框架實現(xiàn)的,該框架中用到了Executor仇参,Executors嘹叫,ExecutorService,ThreadPoolExecutor這幾個接口或類诈乒,它們都是JUC包下的罩扇。 java.util.concurrent.Executors類是Executor的輔助類,類似于java中操作數(shù)組的輔助類java.util.Arrays怕磨,以及操作集合的java.util.Collections類
1.2:Executors類中的主要三個方法
線程安全的隊列:staticQueue queue = new ConcurrentLinkedQueue<String>();
(1) 創(chuàng)建一個定長線程池喂饥,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中的等待肠鲫,它創(chuàng)建的線程池corePoolSize和maximnumPoolSize是相等的员帮,它使用的是LinkedBlockingQueue;
源碼如下:
? public static ExecutorService newFixedThreadPool(int nThreads) {
? ? ? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
? ? }
(2)Executors#newSingleThreadExecutor
? ? 創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù)滩届,保證所有任務(wù)按照指定順序執(zhí)行集侯,它將corePoolSize和maximnumPoolSize都設(shè)置為1,它也使用的是LinkedBlockingQueue;
源碼:
public static ExecutorService newSingleThreadExecutor() {
? ? ? ? return new FinalizableDelegatedExecutorService
? ? ? ? ? ? (new ThreadPoolExecutor(1, 1,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>()));
? ? }
(3)Executors#newCachedThreadPool
創(chuàng)建一個可緩存線程池帜消,如果線程池長度超過處理需要棠枉,可靈活回收空閑線程,若無可回收泡挺,則新建線程辈讶。,它將corePoolSize設(shè)置為0娄猫,將maximnumPoolSize設(shè)置為Integer.MAX_VALUE贱除,它使用的是SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運行媳溺,當前線程空閑超過60秒月幌,就銷毀線程;
源碼:
public static ExecutorService newCachedThreadPool() {
? ? ? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());
? ? }
2,線程池的重要參數(shù):
源碼:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
參數(shù):
corePoolSize
? ? 線程池中的常駐核心線程數(shù),在創(chuàng)建了線程池后悬蔽,當有請求任務(wù)來之后扯躺,就會安排池中的線程去執(zhí)行請求任務(wù),近似理解為今日當值線程蝎困,當線程池中的線程數(shù)目達到corePoolSize后录语,就會把到達的任務(wù)放到緩存隊列當中。
maximumPoolSize
? ? 線程池能夠容納同時執(zhí)行的最大線程數(shù)禾乘,此值必須大于等于1澎埠。
keepAliveTime
? ? 多余的空閑線程的存活時間,當前線程池數(shù)量超過corePoolSize時始藕,當空閑時間達到keepAliveTime值時蒲稳,多余空閑線程會被銷毀直到只剩下corePoolSize個線程為止氮趋。
unit
? ??keepAliveTime的單位。
workQueue
? ? 任務(wù)隊列江耀,被提交但尚未被執(zhí)行的任務(wù)凭峡。
threadFactory
? ? 表示生成線程池中工作線程的線程工廠,用于創(chuàng)建線程一般用默認的即可决记。
handler
? ? 拒絕策略摧冀,表示當隊列滿了,再也塞不下新任務(wù)了系宫,同時索昂,工作線程大于等于線程池的最大線程數(shù),無法繼續(xù)為新任務(wù)服務(wù)扩借,這時候我們就需要拒絕策略機制合理的處理這個問題椒惨,默認會拋異常, 那拒絕策略有哪些呢潮罪,我們繼續(xù)往下看康谆。
JDK內(nèi)置的接口:RejectedExcutionHandle
AbortPolicy(默認)
? ? 直接拋出java.util.concurrent.RejectedExecutionException異常阻止系統(tǒng)正常運行,這種方式顯然是不友好的嫉到。
CallerRunsPolicy
? ? "調(diào)用者運行"一種調(diào)節(jié)機制沃暗,該策略既不會拋棄任務(wù),也不會拋出異常何恶,而是將某些任務(wù)回退到調(diào)用者孽锥,從而降低新任務(wù)的流量。
DiscardOldestPolicy
? ? 拋棄隊列中等待最久的任務(wù)细层,然后把當前任務(wù)加入隊列中嘗試再次提交當前任務(wù)惜辑。
DiscardPolicy
? ? 直接丟棄任務(wù),不予任何處理也不拋出異常疫赎。如果允許任務(wù)丟失盛撑,這是最好的一種解決方案。
? ? 具體選擇哪一種的拒絕策略捧搞,也是看自己的系統(tǒng)需求了;
3,底層工作原理
(1).在創(chuàng)建了線程池后歉眷,等待提交過來的任務(wù)請求
?(2).當調(diào)用execute()方法添加一個請求任務(wù)時翻翩,線程池會做如下判斷
? ? ? ? ? ? 2.1 如果正在運行的線程數(shù)量小于corePoolSize却紧,那么馬上創(chuàng)建線程運行這個任務(wù)
? ? ? ? ? ? 2.2 如果正在運行的線程數(shù)量大于或等于corePoolSize荞雏,那么將這個任務(wù)放入隊列
? ? ? ? ? ? 2.3 如果這時候隊列滿了且正在運行的線程數(shù)量還小于maximumPoolSize轴合,那么還是要創(chuàng)建非核心線程立刻運行這個任務(wù)
? ? ? ? ? ? 2.4 如果隊列滿了且正在運行的線程數(shù)量大于或等于maximumPoolSize创坞,那么線程池會啟動飽和拒絕策略來執(zhí)行
? ? (3). 當一個線程完成任務(wù)時,它會從隊列中取下一個任務(wù)來執(zhí)行
? ? (4). 當一個線程無事可做超過一定的時間(keepAliveTime)時受葛,線程池會判斷
? ? ? ? ? ? 4.1 如果當前運行的線程數(shù)大于corePoolSize题涨,那么這個線程就被停掉
? ? ? ? ? ? 4.2 所以線程池的所有任務(wù)完成后它最終會收縮到corePoolSize的大小
創(chuàng)建線程池時偎谁,配置多少線程數(shù)是合理的:
(1)CPU密集型:CPU核數(shù)+1個線程的線程池(CPU密集任務(wù)只有在真正的多核CPU上才可能得到加速)
(2)IO密集型:O密集型時,大部分線程都阻塞纲堵,故需要多配置線程數(shù)巡雨,CPU核數(shù)/1-阻塞系數(shù)?阻塞系數(shù)在0.8至0.9之間。例如4核席函,取個樂觀值0.9铐望,可達到40個線程左右
------------------------------------------------------------------------
--------------------------------
阿里巴巴開發(fā)手冊上:出自于生產(chǎn)時間來說:
1,線程資源必須通過線程池提供,不能夠在應(yīng)用中自行創(chuàng)建線程;
2,線程池不允許使用Executors去創(chuàng)建,而是使用ThreadPoolExecutor的方式,
這樣可以明確線程池的規(guī)則,規(guī)避資源耗盡的風險;
---------------------------
SpringBoot 自定義線程池:
1,application.yml配置:
task:
pool:
corePoolSize:5#設(shè)置核心線程數(shù)
maxPoolSize:20#設(shè)置最大線程數(shù)
keepAliveSeconds:300#設(shè)置線程活躍時間(秒)
queueCapacity:50#設(shè)置隊列容量
2,線程池配置屬性類:
importorg.springframework.boot.context.properties.ConfigurationProperties;
/**
* 線程池配置屬性類
*/
@ConfigurationProperties(prefix ="task.pool")
publicclassTaskThreadPoolConfig{
privateintcorePoolSize;
privateintmaxPoolSize;
privateintkeepAliveSeconds;
privateintqueueCapacity;
? ? ...getter and setter methods...
}
3,啟動類上加上異步支持:
@EnableAsync
@EnableConfigurationProperties({TaskThreadPoolConfig.class} )// 開啟配置屬性支持
4,自定義線程池:
/**
* 創(chuàng)建線程池配置類
*/
@Configuration
public class TaskExecutePool {
? ? @Autowired
? ? private TaskThreadPoolConfig config;
? ? /**
? ? * 1.這種形式的線程池配置是需要在使用的方法上面@Async("taskExecutor"),
? ? * 2.如果在使用的方法上面不加該注解那么spring就會使用默認的線程池
? ? * 3.所以如果加@Async注解但是不指定使用的線程池,又想自己定義線程池那么就可以重寫spring默認的線程池
? ? * 4.所以第二個方法就是重寫默認線程池
? ? * 注意:完全可以把線程池的參數(shù)寫到配置文件中
? ? */
? ? @Bean
? ? public Executor taskExecutor() {
? ? ? ? ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
? ? ? ? //核心線程池大小
? ? ? ? executor.setCorePoolSize(config.getCorePoolSize());
? ? ? ? //最大線程數(shù)
? ? ? ? executor.setMaxPoolSize(config.getMaxPoolSize());
? ? ? ? //隊列容量
? ? ? ? executor.setQueueCapacity(config.getQueueCapacity());
? ? ? ? //活躍時間
? ? ? ? executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
? ? ? ? //線程名字前綴
? ? ? ? executor.setThreadNamePrefix("TaskExecutePool-");
? ? ? ? // setRejectedExecutionHandler:當pool已經(jīng)達到max size的時候茂附,如何處理新任務(wù)
? ? ? ? // CallerRunsPolicy:不在新線程中執(zhí)行任務(wù)正蛙,而是由調(diào)用者所在的線程來執(zhí)行
? ? ? ? executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
? ? ? ? // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
? ? ? ? executor.setWaitForTasksToCompleteOnShutdown(true);
? ? ? ? executor.initialize();
? ? ? ? return executor;
? ? }
}
測試:
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qijx
*/
@Api(description = "測試控制類11111")
@RestController
@RequestMapping("/threadPoolController1")
public class ThreadPoolController1 {
? ? ? ? @Autowired
? ? ? ? private ThreadPoolService1 threadPoolService;
? ? ? ? @ApiOperation(value = "測試方法")
? ? ? ? @ResponseBody
? ? ? ? @RequestMapping(value = "/test",method = RequestMethod.GET)
? ? ? ? public String threadPoolTest() {
? ? ? ? ? ? threadPoolService.executeAsync();
? ? ? ? ? ? return "hello word!";
? ? ? ? }
}
第二種方法:重寫springboot線程池:
**
* 原生(Spring)異步任務(wù)線程池裝配類,實現(xiàn)AsyncConfigurer重寫他的兩個方法营曼,這樣在使用默認的
*? 線程池的時候就會使用自己重寫的
*/
@Slf4j
@Configuration
public class NativeAsyncTaskExecutePool implements AsyncConfigurer{
? ? //注入配置類
? ? @Autowired
? ? TaskThreadPoolConfig config;
? ? @Override
? ? public Executor getAsyncExecutor() {
? ? ? ? ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
? ? ? ? //核心線程池大小
? ? ? ? executor.setCorePoolSize(config.getCorePoolSize());
? ? ? ? //最大線程數(shù)
? ? ? ? executor.setMaxPoolSize(config.getMaxPoolSize());
? ? ? ? //隊列容量
? ? ? ? executor.setQueueCapacity(config.getQueueCapacity());
? ? ? ? //活躍時間
? ? ? ? executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
? ? ? ? //線程名字前綴
? ? ? ? executor.setThreadNamePrefix("NativeAsyncTaskExecutePool-");
? ? ? ? // setRejectedExecutionHandler:當pool已經(jīng)達到max size的時候乒验,如何處理新任務(wù)
? ? ? ? // CallerRunsPolicy:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來執(zhí)行
? ? ? ? executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
? ? ? ? // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池
? ? ? ? executor.setWaitForTasksToCompleteOnShutdown(true);
? ? ? ? executor.initialize();
? ? ? ? return executor;
? ? }
? ? /**
? ? *? 異步任務(wù)中異常處理
? ? * @return
? ? */
? ? @Override
? ? public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
? ? ? ? return new AsyncUncaughtExceptionHandler() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
? ? ? ? ? ? ? ? log.error("=========================="+arg0.getMessage()+"=======================", arg0);
? ? ? ? ? ? ? ? log.error("exception method:"+arg1.getName());
? ? ? ? ? ? }
? ? ? ? };
? ? }
}
測試:
/**
* @author qijx
*/
@Service
public class ThreadPoolService2 {
? ? private static final Logger logger = LoggerFactory.getLogger(ThreadPoolService2.class);
? ? /**
? ? * @Async該注解不需要在指定任何bean
? ? */
? ? @Async
? ? public void executeAsync() {
? ? ? ? logger.info("start executeAsync");
? ? ? ? try {
? ? ? ? ? ? System.out.println("當前運行的線程名稱:" + Thread.currentThread().getName());
? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? logger.info("end executeAsync");
? ? }
}
------------------------------------