1. new Thread的弊端
- 每次都使用new Thread()性能很差。
- 線程缺乏統(tǒng)一管理浙炼。如線程數(shù)的管理弯屈。
2. 線程池
一種線程使用模式资厉。線程池維護著多個線程湘捎,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)消痛。這避免了在處理短時間任務(wù)時創(chuàng)建與銷毀線程的代價秩伞。
一般來說纱新,對于有N個CPU的主機(或N個核心)脸爱,線程池大小應(yīng)如下設(shè)置:
- 如果是CPU密集型應(yīng)用,線程池大小為N+1族檬。
- 如果是IO密集型應(yīng)用单料,線程池大小為2N+1扫尖。
3. 線程池的優(yōu)勢
- 重用存在的線程,省去線程的創(chuàng)建銷毀過程沉颂,性能佳。
- 有效控制最大并發(fā)線程數(shù)塞关。提高了使用率并避免了競爭帆赢。
- 定時執(zhí)行怠益,定期執(zhí)行蜻牢,單線程抢呆,并發(fā)控制等功能。
4. Executors
Java通過Executors類提供四種線程池恳邀。創(chuàng)建方法為靜態(tài)方式創(chuàng)建谣沸。
4.1. ExecutorService
繼承了Executor類,在其基礎(chǔ)上進行具體的擴展秉版。
4.2. ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService類的子樹上的類,是ExecutorService類提供的四個主要線程池方法的實現(xiàn)類滚停,其完整構(gòu)造器包括以下參數(shù):
- corePoolSize:線程池中核心線程數(shù)的最大數(shù)值。核心線程:線程池新建線程的時候,如果當(dāng)前線程總數(shù)小于corePoolSize问词,則新建的是核心線程激挪,如果超過corePoolSize,則新建的是非核心線程锋喜。核心線程默認情況下會一直存活在線程池中嘿般,即使這個核心線程閑置炉奴。如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那么核心線程如果不干活(閑置狀態(tài))的話砸逊,超過一定時間(時長下面參數(shù)決定)师逸,就會被銷毀掉。
- maximumPoolSize:池中允許的最大線程總數(shù)员辩〉旎縣城總數(shù)=核心線程+非核心線程。非核心線程在閑置時會被銷毀弃甥。
- keepAliveTime:非核心線程閑置超時時長淆攻,若allowCoreThreadTimeOut這個屬性為true,核心線程也會被影響伞芹。
- unit:keepAliveTime參數(shù)的時間單位唱较。
- workQueue:執(zhí)行前用于保持任務(wù)的隊列南缓。此隊列僅保持由 execute方法提交的 Runnable任務(wù)收捣,可在Java容器:Stack童漩,Queue侧馅,PriorityQueue和BlockingQueue一文中查詢济欢。
- threadFactory:執(zhí)行程序創(chuàng)建新線程時使用的工廠,可用于定義創(chuàng)建現(xiàn)成的方式,一般無用蔚舀。
- handler:由于超出線程范圍和隊列容量而使執(zhí)行被阻塞時所使用的處理程序缅叠,存在默認方案鸥鹉。
(繼承關(guān)系Executor-ExecutorService-AbstractExecutorService-ThreadPoolExecutor)
而接下來介紹的幾種方法,其實即是預(yù)定義的ThreadPoolExecutor喇聊。
4.3. CachedThreadPool
創(chuàng)建一個可緩存線程池邻遏,線程池長度超過處理需要時垂寥,可靈活回收空閑線程律杠,若無可回收線程則新建線程讼撒。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}
其實現(xiàn)代碼如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可見,該方法中所有線程均由SynchronousQueue管理,且不設(shè)置線程數(shù)量上限担映。對于SynchronousQueue朋魔,每個插入線程必須等待另一線程的對應(yīng)移除操作。(即該隊列沒有容量哨苛,僅試圖取得元素時元素才存在)因而祝懂,該方法實現(xiàn)了隔躲,如果有線程空閑组力,則使用空閑線程進行操作家夺,否則就會創(chuàng)建新線程日川。
4.4. FixedThreadPool
創(chuàng)建一個定長線程池傀蓉,可以控制最大并發(fā)數(shù),超出的線程會在隊列中等待。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
其實現(xiàn)代碼如下:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
可見該方法讓keepAliveTime為0,即限制了線程數(shù)必須小于等于corePoolSize。而多出的線程則會被無界隊列所存儲,在其中排隊。
4.5. ScheduledThreadPool
創(chuàng)建一個定長線程池窗看,相對于FixedThreadPool,它支持周期性執(zhí)行和延期執(zhí)行。
延期3秒執(zhí)行
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
每三秒隔一秒執(zhí)行
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
和FixedThreadPool的最大不同是页滚,它采用一個DelayedWorkQueue去控制線程,該隊列僅有到期時才能取出元素。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
4.6. SingleThreadExecuter
創(chuàng)建一個單線程線程池讥脐,只會用唯一的工作線程執(zhí)行任務(wù)枪蘑,保證所有任務(wù)按FIFO赦役,LIFO的優(yōu)先級執(zhí)行。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
在實現(xiàn)上,其相當(dāng)于一個線程數(shù)為1的FixedThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.7. ForkJoinPool
ForkJoinPool是JDK7中引用的特殊的新的線程池,其核心思想類似于MapReduce误窖,將大任務(wù)拆分成多個小任務(wù)(fork)丙唧,再將多個小任務(wù)匯集到結(jié)果上(join)北苟,同時妆档,他通過繼承了AbstractExecutorService
獲得了基礎(chǔ)的線程池功能,可以像普通線程池一樣配置。其工作模式如圖:
普通的線程池中每個任務(wù)都由單獨的線程處理,如果出現(xiàn)一個耗時比較大的任務(wù)糜俗,可能出現(xiàn)線程池中只有一個線程在進行這個任務(wù)啤挎,其他線程卻空閑著伙判,所謂“一核有難,八核圍觀”,造成了CPU負載不均衡邮利。ForkJoinPool為解決這種問題提出方庭,在ForkJoinPool中龄减,引入了工作竊取算法希停,其核心思想為:
- 每個線程有自己的工作隊列(WorkQueue)宠能,該隊列是一個雙向鏈表(Java的WorkQueue結(jié)構(gòu)中用ArrayList實現(xiàn))
- 隊列所有者線程可以調(diào)用雙鏈表的push/pop(取頭取尾根據(jù)模式?jīng)Q定)方法,其他線程可以調(diào)用該隊列poll(取尾取頭根據(jù)模式?jīng)Q定)方法羞延,push/poll/pop均引入CAS伴箩,為原子操作。
- 劃分的子任務(wù)調(diào)用fork時彤恶,會把任務(wù)push到自己的隊列中。
- 默認情況,工作線程從自己的隊列pop任務(wù)并執(zhí)行虹菲。
- 自己隊列為空损合,線程隨機從另一線程poll任務(wù)并執(zhí)行胳嘲。
隊列結(jié)構(gòu)
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
*/
public class WorkQueue {
volatile int source; // source queue id, or sentinel
int id; // pool index, mode, tag
int base; // index of next slot for poll
int top; // index of next slot for push
volatile int phase; // versioned, negative: queued, 1: locked
int stackPred; // pool stack (ctl) predecessor link
int nsteals; // number of steals
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
//......
}
Java8中在Executors里也加入了新增ForkJoinPool的方法径玖,讓它像普通線程池一樣工作匿垄,創(chuàng)建的ForJoinPool任務(wù)是FIFO的滴劲。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
該方法也可以帶參數(shù),決定parallelism。
此外贾虽,還可以使用ForkJoinPool內(nèi)部已經(jīng)初始化好的commonPool:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
當(dāng)然蟆技,你可以直接調(diào)用構(gòu)造方法來創(chuàng)建ForkJoinPool谐丢,其完整參數(shù)如下右蕊,解釋見注釋贪磺。
public ForkJoinPool(int parallelism,//并行化層數(shù)制妄,默認為可用CPU處理器數(shù)振愿。
ForkJoinWorkerThreadFactory factory,//threadFactory,前文提過毁兆,無視薇组。
UncaughtExceptionHandler handler,//handler,前文提過菩暗,無視。
boolean asyncMode,//控制workQueue工作模式佑稠,若為true讶坯,則任務(wù)FIFO從base取任務(wù),默認為false婉烟,任務(wù)LIFO,從top取任務(wù)昙衅。
int corePoolSize,//核心線程數(shù)而涉,通常和parallelism數(shù)量一致材原。設(shè)置較大可以降低動態(tài)開銷余蟹,如果任務(wù)中經(jīng)常有阻塞,建議設(shè)置為小值兼搏,比如默認值0裳朋。
int maximumPoolSize,//最大線程數(shù)
int minimumRunnable,//允許的最小的不被join操作阻塞的線程數(shù)鲤嫡。默認值為1惕耕。
Predicate<? super ForkJoinPool> saturate,//若不為空則可能創(chuàng)建超過最大線程數(shù)的線程數(shù)。
long keepAliveTime,//非核心線程閑置時長挤安。
TimeUnit unit)//keepAliveTime的單位。
由于構(gòu)造器重載围肥,多個參數(shù)可缺省怨愤。
根據(jù)工作模式不同撰洗,WorkQueue取元素模型如下:
當(dāng)不把ForkJoinPool作為簡單線程池使用時猪勇,使用ForkJoinPool泣刹,需要構(gòu)建ForkJoinTask對象到ForJoinPool中外冀,F(xiàn)orkJoinTask有三個核心方法:
- fork():用于任務(wù)分治,調(diào)用子任務(wù)fork()可以將任務(wù)放到線程池異步調(diào)用脑沿。
- join():調(diào)用子任務(wù)的join()方法等待返回的結(jié)果,不受中斷機制影響措近。join()會拋出異常,若不需要可以使用quietlyJoin()并用getExecption()或getRawResult()自己處理異常和結(jié)果凰浮。
- invoke():在當(dāng)前線程同步執(zhí)行該任務(wù)菜拓,該方法不受中斷機制影響。
ForkJoinTask實現(xiàn)了Future接口贱鄙,內(nèi)部維護四個狀態(tài)并提供查詢API
- isCancelled() => CANCELLED
- isCompletedAbnormally => status < NORMAL => CANCELLED || EXCEPTIONAL
- isCompletedNormally => NORMAL
- isDone() => status<0 => NORMAL || CANCELLED || EXCEPTIONAL
通常情況我們使用ForkJoinTask的兩個子類
- RecursiveAction:沒有返回值的任務(wù)
- RecursiveTask:有返回值的任務(wù)
兩個例子,轉(zhuǎn)載自ForkJoinPool入門篇
使用RecursiveAction
public class RecursiveActionTest {
static class Sorter extends RecursiveAction {
public static void sort(long[] array) {
ForkJoinPool.commonPool().invoke(new Sorter(array, 0, array.length));
}
private final long[] array;
private final int lo, hi;
private Sorter(long[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
private static final int THRESHOLD = 1000;
protected void compute() {
// 數(shù)組長度小于1000直接排序
if (hi - lo < THRESHOLD)
Arrays.sort(array, lo, hi);
else {
int mid = (lo + hi) >>> 1;
// 數(shù)組長度大于1000梦湘,將數(shù)組平分為兩份
// 由兩個子任務(wù)進行排序
Sorter left = new Sorter(array, lo, mid);
Sorter right = new Sorter(array, mid, hi);
invokeAll(left, right);
// 排序完成后合并排序結(jié)果
merge(lo, mid, hi);
}
}
private void merge(int lo, int mid, int hi) {
long[] buf = Arrays.copyOfRange(array, lo, mid);
for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
if (k == hi || buf[i] < array[k]) {
array[j] = buf[i++];
} else {
array[j] = array[k++];
}
}
}
}
public static void main(String[] args) {
long[] array = new Random().longs(100_0000).toArray();
Sorter.sort(array);
System.out.println(Arrays.toString(array));
}
}
使用RecurisiveTask
public class RecursiveTaskTest {
static class Sum extends RecursiveTask<Long> {
public static long sum(int[] array) {
return ForkJoinPool.commonPool().invoke(new Sum(array, 0, array.length));
}
private final int[] array;
private final int lo, hi;
private Sum(int[] array, int lo, int hi) {
this.array = array;
this.lo = lo;
this.hi = hi;
}
private static final int THRESHOLD = 600;
@Override
protected Long compute() {
if (hi - lo < THRESHOLD) {
return sumSequentially();
} else {
int middle = (lo + hi) >>> 1;
Sum left = new Sum(array, lo, middle);
Sum right = new Sum(array, middle, hi);
right.fork();
long leftAns = left.compute();
long rightAns = right.join();
// 注意subTask2.fork要在subTask1.compute之前
// 因為這里的subTask1.compute實際上是同步計算的
return leftAns + rightAns;
}
}
private long sumSequentially() {
long sum = 0;
for (int i = lo; i < hi; i++) {
sum += array[i];
}
return sum;
}
}
public static void main(String[] args) {
int[] array = IntStream.rangeClosed(1, 100_0000).toArray();
Long sum = Sum.sum(array);
System.out.println(sum);
}
}