前言
工作中難免會(huì)使用線程池溪猿。對(duì)線程池的使用要格外的小心,說(shuō)不定某天就出現(xiàn)了難搞的生產(chǎn)問(wèn)題(OOM)讲弄。每次在使用的時(shí)候依痊,我都會(huì)網(wǎng)上找找資料,今天我就自己全部整理了一篇瓶摆,不足或錯(cuò)誤之處性宏,希望大家看完后多多補(bǔ)充,提提意見(jiàn)书斜。
1、為什么要使用多線程
我們使用多線程的本質(zhì)是為了提升程序的性能荐吉。程序的性能我們可以用2個(gè)指標(biāo)來(lái)度量:
延遲:發(fā)出請(qǐng)求到收到響應(yīng)這個(gè)過(guò)程的時(shí)間;延遲越短穿撮,意味著程序執(zhí)行得越快瞧哟,性能也就越好。
吞吐量:在單位時(shí)間內(nèi)能處理請(qǐng)求的數(shù)量咧党;吞吐量越大陨亡,意味著程序能處理的請(qǐng)求越多,性能也就越好蛙埂。
同等條件下遮糖,延遲越短,吞吐量越大屡江。但是由于它們隸屬不同的維度(一個(gè)是時(shí)間維度赛不,一個(gè)是空間維度),并不能互相轉(zhuǎn)換文黎。
我們所謂提升性能殿较,從度量的角度,主要是降低延遲抓艳,提高吞吐量。
要想“降低延遲儡首,提高吞吐量”偏友,對(duì)應(yīng)的方法呢,基本上有兩個(gè)方向氛濒,一個(gè)方向是優(yōu)化算法鹅髓,另一個(gè)方向是將硬件的性能發(fā)揮到極致。前者屬于算法范疇骗奖,后者則是和并發(fā)編程息息相關(guān)了醒串。那計(jì)算機(jī)主要有哪些硬件呢?主要是兩類(lèi):一個(gè)是 I/O仰挣,一個(gè)是 CPU缠沈。簡(jiǎn)言之,在并發(fā)編程領(lǐng)域香椎,提升性能本質(zhì)上就是提升硬件的利用率禽篱,再具體點(diǎn)來(lái)說(shuō)躺率,就是提升 I/O 的利用率和 CPU 的利用率万矾。
在單核時(shí)代,多線程主要就是用來(lái)平衡 CPU 和 I/O 設(shè)備的后添。如果程序只有 CPU 計(jì)算薪丁,而沒(méi)有 I/O 操作的話馅精,多線程不但不會(huì)提升性能洲敢,還會(huì)使性能變得更差茄蚯,原因是增加了線程切換的成本。但是在多核時(shí)代壮不,這種純計(jì)算型的程序也可以利用多線程來(lái)提升性能皱碘。為什么呢?因?yàn)槔枚嗪丝梢越档晚憫?yīng)時(shí)間家凯。
2如失、創(chuàng)建線程幾種方式
繼承Thread類(lèi):Thread是類(lèi),有單繼承的局限性掂之。
實(shí)現(xiàn)Runnable接口:任務(wù)和線程分開(kāi)脆丁,不能返回執(zhí)行結(jié)果。
實(shí)現(xiàn)Callable接口:利用FutureTask執(zhí)行任務(wù)跟压,能取到執(zhí)行結(jié)果歼培。
package com.top.test.threads;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class ThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Thread a = new A();
Thread b = new Thread(new B());
FutureTask<String> futureTask = new FutureTask<>(new C());
Thread c = new Thread(futureTask);
a.start();
b.start();
c.start();
System.out.println(futureTask.get());
}
}
class A extends Thread {
@Override
public void run() {
System.out.println("繼承Thread的線程任務(wù)");
}
}
class B implements Runnable {
@Override
public void run() {
System.out.println("實(shí)現(xiàn)Runnable的線程任務(wù)");
}
}
class C implements Callable<String> {
@Override
public String call() throws Exception {
return "實(shí)現(xiàn)Callable的線程任務(wù)";
}
}
但是我們工作中一般不這樣來(lái)創(chuàng)建線程躲庄。原因:雖然在 Java 語(yǔ)言中創(chuàng)建線程看上去就像創(chuàng)建一個(gè)對(duì)象一樣簡(jiǎn)單,只需要 new Thread() 就可以了笋庄,但實(shí)際上創(chuàng)建線程遠(yuǎn)不是創(chuàng)建一個(gè)對(duì)象那么簡(jiǎn)單。創(chuàng)建對(duì)象菌仁,僅僅是在 JVM 的堆里分配一塊內(nèi)存而已哆键;而創(chuàng)建一個(gè)線程,卻需要調(diào)用操作系統(tǒng)內(nèi)核的 API闪盔,然后操作系統(tǒng)要為線程分配一系列的資源辱士,這個(gè)成本就很高了,所以線程是一個(gè)重量級(jí)的對(duì)象异赫,應(yīng)該避免頻繁創(chuàng)建和銷(xiāo)毀头岔。
那應(yīng)該怎樣創(chuàng)建線程呢?你應(yīng)該立刻想到了用線程池靠抑。利用線程池把資源池化适掰,使得線程資源能服用,可以避免頻繁地創(chuàng)建和銷(xiāo)毀载城。
3费就、生產(chǎn)者-消費(fèi)者模式
線程池的設(shè)計(jì)采用了生產(chǎn)者-消費(fèi)者模式。線程池的使用方是生產(chǎn)者垦搬,線程池本身是消費(fèi)者艳汽,阻塞隊(duì)列來(lái)存儲(chǔ)要處理的任務(wù)对雪。簡(jiǎn)單畫(huà)了個(gè)圖:
4、Java中的線程池
從jdk1.5版本開(kāi)始馋艺,在java.uitl.concurrent包下面定義定義了一些列與并發(fā)相關(guān)的類(lèi),其中線程池最核心的一個(gè)類(lèi)是ThreadPoolExecutor碱鳞。
查看ThreadPoolExecutor的源碼踱蛀,看下基本的繼承關(guān)系:
public class ThreadPoolExecutor extends AbstractExecutorService {
…
}
public abstract class AbstractExecutorService implements ExecutorService {
…
}
public interface ExecutorService extends Executor {
…
}
public interface Executor {
void execute(Runnable command);
}
我們可以看出率拒,Executor接口中定義了execute方法,execute是用來(lái)執(zhí)行我們提交的任務(wù)的猬膨。
但是類(lèi)ThreadPoolExecutor源碼注釋中,是推薦我們使用類(lèi)Executors的工程方法來(lái)創(chuàng)建線程池的:
* <p>To be useful across a wide range of contexts, this class
* provides many adjustable parameters and extensibility
* hooks. However, programmers are urged to use the more convenient
* {@link Executors} factory methods {@link
* Executors#newCachedThreadPool} (unbounded thread pool, with
* automatic thread reclamation), {@link Executors#newFixedThreadPool}
* (fixed size thread pool) and {@link
* Executors#newSingleThreadExecutor} (single background thread), that
* preconfigure settings for the most common usage
* scenarios.
看下源碼勃痴,Executors提供四種線程池,分別為:
- newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池劣领,如果線程池長(zhǎng)度超過(guò)處理需要污它,可靈活回收空閑線程,若無(wú)可回收德澈,則新建線程固惯。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù)镇辉,超出的線程會(huì)在隊(duì)列中等待贴捡。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行屹逛。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù)罕模,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
對(duì)于上面創(chuàng)建四種線程池蒿讥,我這里就不一一寫(xiě)例子了抛腕,網(wǎng)上多的是例子。
既然JDK提供了這么好的工具類(lèi)侥钳,我們是不是就肯定選擇它呢柄错?并不是,在阿里開(kāi)發(fā)手冊(cè)中有這樣一條:
需要阿里開(kāi)發(fā)手冊(cè)的同學(xué)给猾,可在公眾號(hào)“Java尖子生”颂跨,回復(fù)“alibaba”領(lǐng)取。
看來(lái)池颈,最終都有可能導(dǎo)致OOM钓丰,而 OOM 會(huì)導(dǎo)致所有請(qǐng)求都無(wú)法處理,這是致命問(wèn)題琢歇。所以強(qiáng)烈建議使用有界隊(duì)列并執(zhí)行最大線程數(shù)梦鉴。
5、自定義線程池ThreadPoolExecutor
從上面的源碼魄宏,我們也看到的四種線程池的最終方式也是調(diào)用的ThreadPoolExecutor的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
ThreadPoolExecutor 的構(gòu)造函數(shù)非常復(fù)雜存筏,現(xiàn)在我們自己通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建線程池塔次,那么我們就有必要詳細(xì)了解下ThreadPoolExecutor的構(gòu)造函數(shù)的每一個(gè)參數(shù)。
5.1 corePoolSize
核心池(核心線程數(shù))的大小藕溅。在創(chuàng)建了線程池后,默認(rèn)情況下汁掠,線程池中并沒(méi)有任何線程集币,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法乞榨,從這2個(gè)方法的名字就可以看出当娱,是預(yù)創(chuàng)建線程的意思,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程跨细。默認(rèn)情況下,在創(chuàng)建了線程池后震叙,線程池中的線程數(shù)為0散休,當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù)匣砖,當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后昏滴,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列(阻塞隊(duì)列)當(dāng)中。
核心線程數(shù)的選擇很重要拂共,那創(chuàng)建多少個(gè)線程是合理的呢姻几?
對(duì)于 CPU 密集型的計(jì)算場(chǎng)景势告,理論上“線程的數(shù)量 =CPU 核數(shù)”就是最合適的咱台。不過(guò)在工程上俭驮,線程的數(shù)量一般會(huì)設(shè)置為“CPU 核數(shù) +1”,這樣的話遗遵,當(dāng)線程因?yàn)榕紶柕膬?nèi)存頁(yè)失效或其他原因?qū)е伦枞麜r(shí)逸嘀,這個(gè)額外的線程可以頂上,從而保證 CPU 的利用率崭倘。
對(duì)于 I/O 密集型計(jì)算場(chǎng)景,最佳的線程數(shù)是與程序中 CPU 計(jì)算和 I/O 操作的耗時(shí)比相關(guān)的登澜,我們可以總結(jié)出這樣一個(gè)公式【理論值】:
最佳線程數(shù) =CPU 核數(shù) * [ 1 +(I/O 耗時(shí) / CPU 耗時(shí))]
但實(shí)際【經(jīng)驗(yàn)值】告訴我們應(yīng)該為:2 * CPU 的核數(shù) + 1
I/O 耗時(shí)和 CPU 耗時(shí)的比值是一個(gè)關(guān)鍵參數(shù)飘庄,不幸的是這個(gè)參數(shù)是未知的,而且是動(dòng)態(tài)變化的谴仙,所以工程上碾盐,我們要估算這個(gè)參數(shù),然后做各種不同場(chǎng)景下的壓測(cè)來(lái)驗(yàn)證我們的估計(jì)掀虎。不過(guò)工程上付枫,原則還是將硬件的性能發(fā)揮到極致,所以壓測(cè)時(shí)二打,我們需要重點(diǎn)關(guān)注 CPU掂榔、I/O 設(shè)備的利用率和性能指標(biāo)(響應(yīng)時(shí)間症杏、吞吐量)之間的關(guān)系瑞信。
工作中都是按照邏輯核數(shù)來(lái)的,理論值和經(jīng)驗(yàn)值只是提供個(gè)指導(dǎo)走芋,實(shí)際上還是要靠壓測(cè)E琐辍8嚷亍状植!
5.2 maximumPoolSize
線程池最大線程數(shù),表示在線程池中最多能創(chuàng)建多少個(gè)線程振定。當(dāng)阻塞隊(duì)列裝滿時(shí)肉拓,繼續(xù)提交任務(wù),會(huì)創(chuàng)建救急(非核心)線程來(lái)處理卑惜。注意: 不是先前創(chuàng)建的線程是核心線程驻售,后面創(chuàng)建的線程是非核心線程,線程是沒(méi)有核心非核心的概念的欺栗。
5.3 不同場(chǎng)景下提交任務(wù),線程池的表現(xiàn)形式
【其中workerThread表示工作線程數(shù)】消请。
當(dāng)workerThread等于0時(shí)瘤旨,提交任務(wù),創(chuàng)建線程
當(dāng)workerThread小于corePoolSize時(shí)因宇,提交任務(wù),創(chuàng)建線程察滑,不管其他工作線程是不是閑置的。
當(dāng)workerThread大于等于corePoolSize 且 workerThread小于maxinumPoolsize時(shí)户盯,將任務(wù)添加到隊(duì)列中饲化,當(dāng)隊(duì)列滿了后,創(chuàng)建線程硫眨。當(dāng)一個(gè)線程完成任務(wù)時(shí)巢块,它會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行。
當(dāng)workerThread等于maxinumPoolsize時(shí)姥闭,提交任務(wù),既不能加入隊(duì)列,也不能創(chuàng)建新的線程越走,將RejectedExecutionHandler的rejectedExecution方法執(zhí)行拒絕策略。
下面是ThreadPoolExecutor 的execute()方法的源碼及注釋:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
5.4 keepAliveTime和unit
keepAliveTime表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下庭敦,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用伞广,直到線程池中的線程數(shù)不大于corePoolSize疼电,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime蔽豺,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize沧侥。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí)癣朗,keepAliveTime參數(shù)也會(huì)起作用旺罢,直到線程池中的線程數(shù)為0;當(dāng)一個(gè)線程無(wú)事可做正卧,超過(guò)一定的時(shí)間(keepAliveTime)時(shí)跪解,線程池會(huì)判斷,如果當(dāng)前運(yùn)行的線程數(shù)大于 corePoolSize惠遏,那么這個(gè)線程就被停掉节吮。所以線程池的所有任務(wù)完成后判耕,它最終會(huì)收縮到 corePoolSize 的大小。
unit:參數(shù)keepAliveTime的時(shí)間單位帚豪,有7種取值草丧,在TimeUnit類(lèi)中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
5.5 workQueue
一個(gè)阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)烛亦,這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響煤禽,一般來(lái)說(shuō)岖赋,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
線程池的排隊(duì)策略與BlockingQueue有關(guān)。
關(guān)于隊(duì)列选脊、阻塞隊(duì)列,后面我會(huì)寫(xiě)一篇文章祈争。
5.6 threadFactory
通過(guò)這個(gè)參數(shù)你可以自定義如何創(chuàng)建線程角寸,例如你可以給線程指定一個(gè)有意義的名字。
給線程賦予一個(gè)有意義的名字很重要沮峡,我這里整理了幾種方法:
方法一:自己實(shí)現(xiàn)ThreadFactory并制定給線程池,在實(shí)現(xiàn)的ThreadFactory中設(shè)定計(jì)數(shù)和調(diào)用Thread.setName邢疙。
方法二:guava的ThreadFactoryBuilder.setNameFormat可以指定一個(gè)前綴望薄,使用%d表示序號(hào);例如:
ThreadFactory myThreadFactory =(new ThreadFactoryBuilder()).setNameFormat("my-pool-thread-%d").build();
ThreadFactory是JDK自帶的颁虐,下面我們?cè)敿?xì)看下如何用ThreadFactory來(lái)實(shí)現(xiàn)卧须。
ThreadFactory是一個(gè)只有一個(gè)newThread方法的接口:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
我們利用這個(gè)方法我們可以做什么呢?可以給線程命名笋籽,查看線程數(shù)椭员,指定是否守護(hù)線程,設(shè)置線程優(yōu)先級(jí)等等隘击。
下面是我寫(xiě)的完整例子,你可以拿這個(gè)例子做其他場(chǎng)景的測(cè)試(對(duì)shutdown等方法不太懂的竭贩,往下看):
package com.top.test.threads;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TestThreadFactory {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 100,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
new CustomThreadFactory(false, 1), new ThreadPoolExecutor.AbortPolicy());
System.out.println("開(kāi)始往線程池中提交任務(wù)");
// 可以把10改成其他值來(lái)測(cè)試其他場(chǎng)景哦-_-
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {
// 可以在此睡眠一下
/*try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("我是子線程:" + Thread.currentThread().getName());
});
}
System.out.println("一共往線程池提交了10個(gè)任務(wù)");
System.out.println("線程池是否已關(guān)閉:" + threadPool.isShutdown());
// 發(fā)起關(guān)閉線程池
threadPool.shutdown();
System.out.println("線程池是否已關(guān)閉:" + threadPool.isShutdown());
System.out.println("線程池是否已終止:" + threadPool.isTerminated());
threadPool.awaitTermination(1, TimeUnit.SECONDS);
if (threadPool.isTerminated()) {
System.out.println("線程池是否已終止:" + threadPool.isTerminated());
} else {
System.out.println("線程池未終止(超時(shí))就退出");
}
}
}
class CustomThreadFactory implements ThreadFactory {
// 原子類(lèi) CAS 保證線程安全
private AtomicInteger threadCount = new AtomicInteger();
private boolean isDaemon;
private int priority;
public CustomThreadFactory(boolean isDaemon, int priority) {
this.isDaemon = isDaemon;
this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("My_Custom_Thread_" + threadCount.incrementAndGet());
thread.setDaemon(isDaemon);
thread.setPriority(priority);
return thread;
}
}
執(zhí)行結(jié)果:
Connected to the target VM, address: '127.0.0.1:53514', transport: 'socket'
開(kāi)始往線程池中提交任務(wù)
一共往線程池提交了10個(gè)任務(wù)
線程池是否已關(guān)閉:false
線程池是否已關(guān)閉:true
線程池是否已終止:false
我是子線程:My_Custom_Thread_1
我是子線程:My_Custom_Thread_1
我是子線程:My_Custom_Thread_1
我是子線程:My_Custom_Thread_2
我是子線程:My_Custom_Thread_3
我是子線程:My_Custom_Thread_4
我是子線程:My_Custom_Thread_5
我是子線程:My_Custom_Thread_6
我是子線程:My_Custom_Thread_7
我是子線程:My_Custom_Thread_8
線程池是否已終止:true
Disconnected from the target VM, address: '127.0.0.1:53514', transport: 'socket'
Process finished with exit code 0
5.6 Handler
通過(guò)這個(gè)參數(shù)你可以自定義任務(wù)的拒絕策略留量。如果線程池中所有的線程都在忙碌楼熄,并且工作隊(duì)列也滿了(前提是工作隊(duì)列是有界隊(duì)列),那么此時(shí)提交任務(wù)可岂,線程池就會(huì)拒絕接收。
至于拒絕的策略稚茅,你可以通過(guò) handler 這個(gè)參數(shù)來(lái)指定平斩。ThreadPoolExecutor 已經(jīng)提供了以下 4 種策略。
- CallerRunsPolicy:提交任務(wù)的線程自己去執(zhí)行該任務(wù)欺税。
- AbortPolicy:默認(rèn)的拒絕策略,會(huì) throws RejectedExecutionException晚凿。
- DiscardPolicy:直接丟棄任務(wù)瘦馍,沒(méi)有任何異常拋出。
- DiscardOldestPolicy:丟棄最老的任務(wù),其實(shí)就是把最早進(jìn)入工作隊(duì)列的任務(wù)丟棄扛吞,然后把新任務(wù)加入到工作隊(duì)列。
注意:使用有界隊(duì)列滥比,當(dāng)任務(wù)過(guò)多時(shí),線程池會(huì)觸發(fā)執(zhí)行拒絕策略濒持,線程池默認(rèn)的拒絕策略會(huì) throw RejectedExecutionException 這是個(gè)運(yùn)行時(shí)異常寺滚,對(duì)于運(yùn)行時(shí)異常編譯器并不強(qiáng)制 catch 它,所以開(kāi)發(fā)人員很容易忽略官套。因此默認(rèn)拒絕策略要慎重使用。如果線程池處理的任務(wù)非常重要奶赔,建議自定義自己的拒絕策略;并且在實(shí)際工作中另伍,自定義的拒絕策略往往和降級(jí)策略配合使用。下面我給出一個(gè)完整的示例代碼:
public static class CallerRunsEnhancedPolicy implements RejectedExecutionHandler {
public CallerRunsEnhancedPolicy() {}
/**
* Executes task r in the caller's thread, unless the executor has been shut down, in which case the task is
* discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一條日志
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortEnhancedPolicy implements RejectedExecutionHandler {
public AbortEnhancedPolicy() {}
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一條日志
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}
public static class RetryPolicy implements RejectedExecutionHandler {
public RetryPolicy() {}
/**
* 提醒并進(jìn)入后補(bǔ)線程池排隊(duì)執(zhí)行
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一條日志
if (!e.isShutdown()) {
// 可以在此加一條日志
try {
// ThreadPool.getAlternateExecutor()返回的是一個(gè)線程池摆尝,這個(gè)線程池應(yīng)該是個(gè)全局的線程池玻靡,最好只初始化一次,保證其線程的安全性(可以用單例模式來(lái)初始化)
ThreadPool.getAlternateExecutor().execute(r);
} catch (Exception ex) {
// 可以在此加一條日志
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()
+ "; failure retry execute task in " + ThreadPool.getAlternateExecutor().toString());
}
}
}
}
public static class WaitPolicy implements RejectedExecutionHandler {
public WaitPolicy() {}
/**
* 提醒并等待囤捻,直到加入線程池隊(duì)列中
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 可以在此加一條日志
if (!e.isShutdown()) {
// 可以在此加一條日志
try {
e.getQueue().put(r);
// 可以在此加一條日志
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
// 可以在此加一條日志
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()
+ "; waiting put task to pool interrupted");
} catch (Exception ex) {
// 可以在此加一條日志
throw new RejectedExecutionException(
"Task " + r.toString() + " rejected from " + e.toString() + "; failure put task to pool");
}
}
}
}
下面是ThreadPool的定義:
public final class ThreadPool {
public static final int LOGICAL_CPU_CORE_COUNT = Runtime.getRuntime().availableProcessors();
public static final int MAX_POOLSIZE_THRESHOLD = 2000;
private static Object alternateExecutorLock = new Object();
private static volatile ListenableThreadPoolExecutor alternateExecutor = null;
public static ThreadPoolExecutor getAlternateExecutor() {
synchronized (alternateExecutorLock) {
if (alternateExecutor == null) {
initAlternateExecutor();
} else if (alternateExecutor.isShutdown()) {
boolean term = false;
while (!term) {
try {
term = alternateExecutor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
initAlternateExecutor();
}
return alternateExecutor;
}
}
private static void initAlternateExecutor() {
int threadCount =
getRecommendedThreadPoolSizeByTaskIntensiveType(TaskIntensiveType.CPU, MAX_POOLSIZE_THRESHOLD);
int maxThreadCount = getMaxThreadPoolSizeByTaskIntensiveType(TaskIntensiveType.CPU, MAX_POOLSIZE_THRESHOLD);
alternateExecutor = newThreadPool(threadCount, maxThreadCount, 60L, maxThreadCount * 3,
new WaitPolicy(), (new ThreadFactoryBuilder()).setNameFormat("alternate-pool-thread-%d").build());
alternateExecutor.allowCoreThreadTimeOut(true);
}
public static int getRecommendedThreadPoolSizeByTaskIntensiveType(TaskIntensiveType taskIntensive,
int poolSizeThreshold) {
int poolSize;
switch (taskIntensive) {
case IO:
poolSize = 2 * LOGICAL_CPU_CORE_COUNT + 1; // NOSONAR
break;
case CPU:
default:
poolSize = LOGICAL_CPU_CORE_COUNT + 1;
break;
}
if (poolSize > poolSizeThreshold) {
return poolSizeThreshold;
}
return poolSize;
}
public static int getMaxThreadPoolSizeByTaskIntensiveType(TaskIntensiveType taskIntensive, int poolSizeThreshold) {
int poolSize;
switch (taskIntensive) {
case IO:
poolSize = 2 * LOGICAL_CPU_CORE_COUNT * 5 + 1; // NOSONAR
break;
case CPU:
default:
poolSize = LOGICAL_CPU_CORE_COUNT * 5 + 1;
break;
}
if (poolSize > poolSizeThreshold) {
return poolSizeThreshold;
}
return poolSize;
}
public enum TaskIntensiveType {
IO(0), CPU(1);
private int value;
private TaskIntensiveType(int value) {
this.value = value;
}
public static TaskIntensiveType valueOf(int value) {
for (TaskIntensiveType oneVal : TaskIntensiveType.values()) {
if (oneVal.value == value) {
return oneVal;
}
}
return null;
}
public int getValue() {
return value;
}
}
6视哑、ThreadPoolExecutor常見(jiàn)方法
6.1 execute()
方法實(shí)際上是Executor中聲明的方法誊涯,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法跪呈,通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù)取逾,交由線程池去執(zhí)行。
6.2 submit()
方法是在ExecutorService中聲明的方法误阻,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn)旭愧,在ThreadPoolExecutor中并沒(méi)有對(duì)其進(jìn)行重寫(xiě),這個(gè)方法也是用來(lái)向線程池提交任務(wù)的儒洛,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果琅锻,去看submit()方法的實(shí)現(xiàn)唐含,會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過(guò)它利用了Future來(lái)獲取任務(wù)執(zhí)行結(jié)果沫浆。submit可以提交Runnable和 Callable。
6.3 6.3 shutdown()
將線程池狀態(tài)置為SHUTDOWN淮捆。平滑的關(guān)閉ExecutorService本股,當(dāng)此方法被調(diào)用時(shí),ExecutorService停止接收新的任務(wù)并且等待已經(jīng)提交的任務(wù)(包含提交正在執(zhí)行和提交未執(zhí)行)執(zhí)行完成拄显。當(dāng)所有提交任務(wù)執(zhí)行完畢,線程池即被關(guān)閉棘街。所以手動(dòng)調(diào)用shotdown方法,可以不必?fù)?dān)心存在剩余任務(wù)沒(méi)有執(zhí)行的情況承边。
6.4 shutdownNow()
將線程池狀態(tài)置為STOP。跟shutdown()一樣博助,先停止接收外部提交的任務(wù),忽略隊(duì)列里等待的任務(wù)富岳,嘗試將正在跑的任務(wù)interrupt中斷,返回未執(zhí)行的任務(wù)列表蚁飒。
對(duì)于那些正在執(zhí)行的task脖镀,并不能保證他們就一定會(huì)直接停止執(zhí)行狼电,或許他們會(huì)暫停,或許會(huì)執(zhí)行直到完成强窖,但是ExecutorService會(huì)盡力關(guān)閉所有正在運(yùn)行的task。
6.5 awaitTermination(long timeout, TimeUnit unit)
awaitTermination方法接收timeout和TimeUnit兩個(gè)參數(shù)翅溺,用于設(shè)定超時(shí)時(shí)間及單位。當(dāng)?shù)却^(guò)設(shè)定時(shí)間時(shí)咙崎,會(huì)監(jiān)測(cè)ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true褪猛,否則返回false。一般情況下會(huì)和shutdown方法組合使用碳却。
第一個(gè)參數(shù)指定的是時(shí)間,第二個(gè)參數(shù)指定的是時(shí)間單位(當(dāng)前是秒)昼浦。返回值類(lèi)型為boolean型筒主。
如果等待的時(shí)間超過(guò)指定的時(shí)間,但是線程池中的線程運(yùn)行完畢物舒,那么awaitTermination()返回true。執(zhí)行分線程已結(jié)束火诸。
如果等待的時(shí)間超過(guò)指定的時(shí)間,但是線程池中的線程未運(yùn)行完畢置蜀,那么awaitTermination()返回false悉盆。不執(zhí)行分線程已結(jié)束。
如果等待時(shí)間沒(méi)有超過(guò)指定時(shí)間焕盟,等待!
可以用awaitTermination()方法來(lái)判斷線程池中是否有繼續(xù)運(yùn)行的線程灼卢。
下面是ThreadPoolExecutor的awaitTermination源碼:
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
// 等待時(shí)長(zhǎng)
long nanos = unit.toNanos(timeout);
//加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// condition條件等待
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
6.6 其它方法
isShutdown():線程池是否已關(guān)閉
isTerminated:線程池是否已終止
7来农、線程池是否需要關(guān)閉呢?
這里說(shuō)下兩種情況下的線程池:全局線程池沃于、局部線程池海诲。
全局線程池:其實(shí)我們工作中使用線程池都是全局的特幔,當(dāng)然我們也可能為不同業(yè)務(wù)建立不同的全局線程池。那全局線程池是否需要關(guān)閉呢敬辣?分兩種場(chǎng)景:(一)如果每次在程序的結(jié)尾都去關(guān)閉線程池零院,那每次有新的任務(wù)進(jìn)來(lái),都要重新建立一個(gè)線程池告抄,這樣難免也太消耗資源了,而且原本這就是一個(gè)全局的龄糊。(二)如果一個(gè)線程池在某段時(shí)間內(nèi)處理了大量的任務(wù),創(chuàng)建了大量的線程炫惩,這個(gè)時(shí)間段之后可能一下子也沒(méi)有新的任務(wù)進(jìn)來(lái)阿浓,那這個(gè)線程池需要關(guān)閉嘛 ?我個(gè)人理解也是沒(méi)必要的芭毙,我們通過(guò)參數(shù)keepAliveTime可以給線程設(shè)置在沒(méi)有任務(wù)處理時(shí)的生存時(shí)間,并且調(diào)用allowCoreThreadTimeOut(boolean)方法粘咖,這樣充分減少資源的浪費(fèi)。
局部線程池:這種線程池工作中也該盡量避免使用瓮下。如果每個(gè)請(qǐng)求進(jìn)來(lái)都創(chuàng)建一個(gè)新的線程池钝域,當(dāng)請(qǐng)求多的時(shí)候,線程池增加网梢,那難免會(huì)有OOM赂毯。如果你真的使用了拣宰,那請(qǐng)記住一定要調(diào)用shutdown()來(lái)關(guān)閉烦感。