點(diǎn)贊再看,養(yǎng)成習(xí)慣,搜一搜【一角錢(qián)技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章蝉娜。本文 GitHub org_hejianhui/JavaStudy 已收錄坛善,有我的系列文章。
前言
在介紹線程池之前缓升,我們先回顧下線程的基本知識(shí)吧史。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時(shí)線程池 邮辽,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。
線程
線程是調(diào)度CPU資源的最小單位贸营,線程模型分為KLT模型與ULT模型吨述,JVM使用的是KLT模型,Java線程與OS線程保持 1:1 的映射關(guān)系钞脂,也就是說(shuō)有一個(gè)Java線程也會(huì)在操作系統(tǒng)里有一個(gè)對(duì)應(yīng)的線程揣云。
內(nèi)核線程模型
內(nèi)核線程(KLT):系統(tǒng)內(nèi)核管理線程(KLT),內(nèi)核保存線程的狀態(tài)和上下文信息冰啃,線程阻塞不會(huì)引起進(jìn)程阻塞邓夕。在多處理器系統(tǒng)上,多線程在多處理器上并行運(yùn)行阎毅。線程的創(chuàng)建焚刚、調(diào)度和管理由內(nèi)核完成,效率比ULT要慢扇调,比進(jìn)程操作快矿咕。
用戶線程模型
用戶線程(ULT):用戶程序?qū)崿F(xiàn),不依賴操作系統(tǒng)核心肃拜,應(yīng)用提供創(chuàng)建痴腌、同步雌团、調(diào)度和管理線程的函數(shù)來(lái)控制用戶線程燃领。不需要用戶態(tài)/內(nèi)核態(tài)切換,速度快锦援。內(nèi)核對(duì)ULT無(wú)感知,線程阻塞則進(jìn)程(包括它的所有線程)阻塞。
Java線程生命狀態(tài)
Java線程有多種生命狀態(tài):
- NEW 捎谨,新建
- RUNNABLE ,運(yùn)行
- BLOCKED 区岗,阻塞
- WAITING ,等待
- TIMED_WAITING 毁枯,超時(shí)等待
- TERMINATED慈缔,終結(jié)
狀態(tài)切換如下圖所示:
Java線程實(shí)現(xiàn)方式
Java線程實(shí)現(xiàn)方式主要有四種:
- 繼承Thread類
- 實(shí)現(xiàn)Runnable接口、
- 實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程种玛、
- 使用ExecutorService藐鹤、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程赂韵。
其中前兩種方式線程執(zhí)行完后都沒(méi)有返回值娱节,后兩種是帶返回值的。
繼承Thread類創(chuàng)建線程
Thread類本質(zhì)上是實(shí)現(xiàn)了Runnable接口的一個(gè)實(shí)例祭示,代表一個(gè)線程的實(shí)例肄满。啟動(dòng)線程的唯一方法就是通過(guò)Thread類的start()實(shí)例方法。start()方法是一個(gè)native方法质涛,它將啟動(dòng)一個(gè)新線程稠歉,并執(zhí)行run()方法。這種方式實(shí)現(xiàn)多線程很簡(jiǎn)單汇陆,通過(guò)自己的類直接extend Thread轧抗,并復(fù)寫(xiě)run()方法,就可以啟動(dòng)新線程并執(zhí)行自己定義的run()方法瞬测。例如:
public class MyThread extends Thread {
public void run() {
System.out.println("關(guān)注一角錢(qián)技術(shù)横媚,獲取Java架構(gòu)資料");
}
}
MyThread myThread1 = new MyThread();
MyThread myThread2 = new MyThread();
myThread1.start();
myThread2.start();
實(shí)現(xiàn)Runnable接口創(chuàng)建線程
如果自己的類已經(jīng)extends另一個(gè)類,就無(wú)法直接extends Thread月趟,此時(shí)灯蝴,可以實(shí)現(xiàn)一個(gè)Runnable接口,如下:
// 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行孝宗,表示一個(gè)基本的任務(wù)
public interface Runnable {
// run方法就是它所有的內(nèi)容穷躁,就是實(shí)際執(zhí)行的任務(wù)
public abstract void run();
}
public class MyThread implements Runnable {
public void run() {
System.out.println("關(guān)注一角錢(qián)技術(shù),獲取Java架構(gòu)資料");
}
}
為了啟動(dòng)MyThread因妇,需要首先實(shí)例化一個(gè)Thread问潭,并傳入自己的MyThread實(shí)例:
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
thread.start();
事實(shí)上,當(dāng)傳入一個(gè)Runnable target參數(shù)給Thread后婚被,Thread的run()方法就會(huì)調(diào)用target.run()狡忙,參考JDK源代碼:
public void run() {
if (target != null) {
target.run();
}
}
實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程
Callable接口(也只有一個(gè)方法)定義如下:
public interface Callable<V> {
V call() throws Exception;
}
//Callable同樣是任務(wù),與Runnable接口的區(qū)別在于它接收泛型址芯,同時(shí)它執(zhí)行任務(wù)后帶有返回內(nèi)容
public class SomeCallable<V> implements Callable<V> {
// 相對(duì)于run方法的帶有返回值的call方法
@Override
public V call() throws Exception {
// TODO Auto-generated method stub
return null;
}
}
Callable<V> oneCallable = new SomeCallable<V>();
//由Callable<Integer>創(chuàng)建一個(gè)FutureTask<Integer>對(duì)象:
FutureTask<V> oneTask = new FutureTask<V>(oneCallable);
//注釋:FutureTask<Integer>是一個(gè)包裝器灾茁,它通過(guò)接受Callable<Integer>來(lái)創(chuàng)建窜觉,它同時(shí)實(shí)現(xiàn)了Future和Runnable接口。
//由FutureTask<Integer>創(chuàng)建一個(gè)Thread對(duì)象:
Thread oneThread = new Thread(oneTask);
oneThread.start();
//至此北专,一個(gè)線程就創(chuàng)建完成了禀挫。
使用ExecutorService、Callable拓颓、Future實(shí)現(xiàn)有返回結(jié)果的線程
ExecutorService语婴、Callable、Future三個(gè)接口實(shí)際上都是屬于Executor框架驶睦。返回結(jié)果的線程是在JDK1.5中引入的新特征腻格,有了這種特征就不需要再為了得到返回值而大費(fèi)周折了。而且自己實(shí)現(xiàn)了也可能漏洞百出啥繁。(下部分來(lái)講線程池了)
- 可返回值的任務(wù)必須實(shí)現(xiàn)Callable接口菜职。
- 類似的,無(wú)返回值的任務(wù)必須實(shí)現(xiàn)Runnable接口旗闽。
執(zhí)行Callable任務(wù)后酬核,可以獲取一個(gè)Future的對(duì)象,在該對(duì)象上調(diào)用get就可以獲取到Callable任務(wù)返回的Object了适室。
注意:get方法是阻塞的嫡意,即:線程無(wú)返回結(jié)果,get方法會(huì)一直等待捣辆。
再結(jié)合線程池接口ExecutorService就可以實(shí)現(xiàn)傳說(shuō)中有返回結(jié)果的多線程了蔬螟。
下面提供了一個(gè)完整的有返回結(jié)果的多線程測(cè)試?yán)印4a如下:
package com.niuh.thread.v4;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* <p>
* 使用ExecutorService汽畴、Callable旧巾、Future實(shí)現(xiàn)有返回結(jié)果的線程
* </p>
*/
public class MyThread {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
System.out.println(("----程序開(kāi)始運(yùn)行----"));
Date date1 = new Date();
int taskSize = 5;
// 創(chuàng)建一個(gè)線程池
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
// 創(chuàng)建多個(gè)有返回值的任務(wù)
List<Future> list = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) {
Callable c = new MyCallable(i + " ");
// 執(zhí)行任務(wù)并獲取Future對(duì)象
Future f = pool.submit(c);
// System.out.println(">>>" + f.get().toString());
list.add(f);
}
// 關(guān)閉線程池
pool.shutdown();
// 獲取所有并發(fā)任務(wù)的運(yùn)行結(jié)果
for (Future f : list) {
// 從Future對(duì)象上獲取任務(wù)的返回值,并輸出到控制臺(tái)
System.out.println(">>>" + f.get().toString());
}
Date date2 = new Date();
System.out.println("----程序結(jié)束運(yùn)行----忍些,程序運(yùn)行時(shí)間【"
+ (date2.getTime() - date1.getTime()) + "毫秒】");
}
}
class MyCallable implements Callable<Object> {
private String taskNum;
MyCallable(String taskNum) {
this.taskNum = taskNum;
}
public Object call() throws Exception {
System.out.println(">>>" + taskNum + "任務(wù)啟動(dòng)");
Date dateTmp1 = new Date();
Thread.sleep(1000);
Date dateTmp2 = new Date();
long time = dateTmp2.getTime() - dateTmp1.getTime();
System.out.println(">>>" + taskNum + "任務(wù)終止");
return taskNum + "任務(wù)返回運(yùn)行結(jié)果,當(dāng)前任務(wù)時(shí)間【" + time + "毫秒】";
}
}
協(xié)程
協(xié)程(纖程鲁猩,用戶級(jí)線程),目的是為了追求最大力度的發(fā)揮硬件性能和提升軟件的速度罢坝,協(xié)程基本原理是:在某個(gè)點(diǎn)掛起當(dāng)前的任務(wù)廓握,并且保存棧信息,去執(zhí)行另一個(gè)任務(wù)嘁酿;等完成或達(dá)到某個(gè)條件時(shí)隙券,再還原原來(lái)的棧信息并繼續(xù)執(zhí)行(整個(gè)過(guò)程不需要上下文切換)。
協(xié)程的概念很早就提出來(lái)了闹司,但直到最近幾年才在某些語(yǔ)言(如Lua)中得到廣泛應(yīng)用娱仔。
協(xié)程的目的:當(dāng)我們?cè)谑褂枚嗑€程的時(shí)候,如果存在長(zhǎng)時(shí)間的I/O操作开仰。這個(gè)時(shí)候線程一直處于阻塞狀態(tài)拟枚,如果線程很多的時(shí)候薪铜,會(huì)存在很多線程處于空閑狀態(tài)众弓,造成了資源應(yīng)用不徹底恩溅。相對(duì)的協(xié)程不一樣了,在單線程中多個(gè)任務(wù)來(lái)回執(zhí)行如果出現(xiàn)長(zhǎng)時(shí)間的I/O操作谓娃,讓其讓出目前的協(xié)程調(diào)度脚乡,執(zhí)行下一個(gè)任務(wù)。當(dāng)然可能所有任務(wù)滨达,全部卡在同一個(gè)點(diǎn)上奶稠,但是這只是針對(duì)于單線程而言,當(dāng)所有數(shù)據(jù)正常返回時(shí)捡遍,會(huì)同時(shí)處理當(dāng)前的I/O操作锌订。
Java原生不支持協(xié)程,在純java代碼里需要使用協(xié)程的話需要引入第三方包,如:quasar
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.8.0</version>
<classifier>jdk8</classifier>
</dependency>
線程池
“線程池”画株,顧名思義就是一個(gè)線程緩存辆飘,線程是稀缺資源,如果被無(wú)限制的創(chuàng)建谓传,不僅會(huì)消耗系統(tǒng)資源蜈项,還會(huì)降低系統(tǒng)的穩(wěn)定性,因此 Java 中提供線程池對(duì)線程進(jìn)行統(tǒng)一分配续挟、調(diào)優(yōu)和監(jiān)控紧卒。
線程池介紹
在web開(kāi)發(fā)中,服務(wù)器需要接受并處理請(qǐng)求诗祸,所以會(huì)為一個(gè)請(qǐng)求分配一個(gè)線程來(lái)進(jìn)行處理跑芳。如果每次請(qǐng)求都創(chuàng)建一個(gè)線程的話實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,但是存在一個(gè)問(wèn)題:如果并發(fā)的請(qǐng)求數(shù)量非常多直颅,但每個(gè)線程執(zhí)行的時(shí)間很短聋亡,這樣就會(huì)頻繁的創(chuàng)建和銷毀線程,如此一來(lái)會(huì)大大降低系統(tǒng)的效率际乘∑戮螅可能出現(xiàn)服務(wù)器在為每個(gè)請(qǐng)求創(chuàng)建新線程和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源更多。
那么有沒(méi)有一種辦法使執(zhí)行完一個(gè)任務(wù)脖含,并不被銷毀罪塔,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?
這就是線程池的目的养葵。線程池為線程生命周期的開(kāi)銷和資源不足問(wèn)題提供了解決方案征堪。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷被分?jǐn)偟蕉鄠€(gè)任務(wù)上关拒。
什么時(shí)候使用線程池佃蚜?
- 單個(gè)任務(wù)處理時(shí)間比較短庸娱;
- 需要處理的任務(wù)數(shù)量很大。
線程池優(yōu)勢(shì)
- 重用存在的線程谐算。減少線程黃金熟尉、消亡的開(kāi)銷,提高性能洲脂;
- 提高響應(yīng)速度斤儿。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行恐锦;
- 提高線程的可管理性往果。線程是稀缺資源,如果無(wú)限制的創(chuàng)建一铅,不僅會(huì)消耗系統(tǒng)資源陕贮,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配潘飘、調(diào)優(yōu)和監(jiān)控肮之。
Executor框架
Executor接口是線程池框架中最基礎(chǔ)的部分,定義來(lái)一個(gè)用于執(zhí)行 Runnable 的 execute 方法福也。下面為它的繼承與實(shí)現(xiàn)
ExecutorService接口
從圖中可以看出 Executor 下有一個(gè)重要的子接口 ExecutorService
局骤,其中定義來(lái)線程池的具體行為
- execute(Runnable command):履行Ruannable類型的任務(wù);
- submit(task):可用來(lái)提交Callable或Runnable任務(wù)暴凑,并返回代表此任務(wù)的Future對(duì)象峦甩;
- shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù)现喳;
- shutdownNow():停止所有正在履行的任務(wù)并封閉辦事凯傲;
- isTerminated():測(cè)試是否所有任務(wù)都履行完畢了;
- isShutdown():測(cè)試是否該ExecutorService已被關(guān)閉;
- awaitTermination(long,TimeUnit):接收timeout和TimeUnit兩個(gè)參數(shù)嗦篱,用于設(shè)定超時(shí)時(shí)間及單位冰单。當(dāng)?shù)却^(guò)設(shè)定時(shí)間時(shí),會(huì)監(jiān)測(cè)ExecutorService是否已經(jīng)關(guān)閉灸促,若關(guān)閉則返回true诫欠,否則返回false。一般情況下會(huì)和shutdown方法組合使用;
- invokeAll :作用是等待所有的任務(wù)執(zhí)行完成后統(tǒng)一返回;
- invokeAny :將第一個(gè)得到的結(jié)果作為返回值浴栽,然后立刻終止所有的線程荒叼。如果設(shè)置了超時(shí)時(shí)間,未超時(shí)完成則正常返回結(jié)果典鸡,如果超時(shí)未完成則報(bào)超時(shí)異常被廓。
AbstractExcutorService抽象類
此類的定義并沒(méi)有特殊的意義僅僅是實(shí)現(xiàn)了ExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {
//此方法很簡(jiǎn)單就是對(duì)runnable保證,將其包裝為一個(gè)FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
//包裝callable為FutureTask
//FutureTask其實(shí)就是對(duì)Callable的一個(gè)封裝
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
//提交一個(gè)Runnable類型的任務(wù)
public Future<?> submit(Runnable task) {
//如果為null則拋出NPE
if (task == null) throw new NullPointerException();
//包裝任務(wù)為一個(gè)Future
RunnableFuture<Void> ftask = newTaskFor(task, null);
//將任務(wù)丟給執(zhí)行器萝玷,而此處會(huì)拋出拒絕異常嫁乘,在講述ThreadPoolExecutor的時(shí)候有講述昆婿,不記得的讀者可以去再看看
execute(ftask);
return ftask;
}
//與上方方法相同只不過(guò)指定了返回結(jié)果
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
//與上方方法相同只是換成了callable
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
//執(zhí)行集合tasks結(jié)果是最后一個(gè)執(zhí)行結(jié)束的任務(wù)結(jié)果
//可以設(shè)置超時(shí) timed為true并且nanos是未來(lái)的一個(gè)時(shí)間
//任何一個(gè)任務(wù)完成都將會(huì)返回結(jié)果
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//傳入的任務(wù)集合不能為null
if (tasks == null)
throw new NullPointerException();
//傳入的任務(wù)數(shù)不能是0
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//滿足上面的校驗(yàn)后將任務(wù)分裝到一個(gè)ArrayList中
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//并且創(chuàng)建一個(gè)執(zhí)行器傳入this
//這里簡(jiǎn)單講述他的執(zhí)行原理,傳入this會(huì)使用傳入的this(類型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù)蜓斧,當(dāng)submit提交任務(wù)的時(shí)候回將任務(wù)
//封裝為一個(gè)內(nèi)部的Future并且重寫(xiě)他的done而此方法就是在future完成的時(shí)候調(diào)用的仓蛆,而他的寫(xiě)法則是將當(dāng)前完成的future添加到esc
//維護(hù)的結(jié)果隊(duì)列中
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
//創(chuàng)建一個(gè)執(zhí)行異常,以便后面拋出
ExecutionException ee = null;
//如果開(kāi)啟了超時(shí)則計(jì)算死線時(shí)間如果時(shí)間是0則代表沒(méi)有開(kāi)啟執(zhí)行超時(shí)
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//獲取任務(wù)的迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();
//先獲取迭代器中的第一個(gè)任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器
futures.add(ecs.submit(it.next()));
//前面記錄的任務(wù)數(shù)減一
--ntasks;
//當(dāng)前激活數(shù)為1
int active = 1;
//進(jìn)入死循環(huán)
for (;;) {
//獲取剛才提價(jià)的任務(wù)是否完成如果完成則f不是null否則為null
Future<T> f = ecs.poll();
//如果為null則代表任務(wù)還在繼續(xù)
if (f == null) {
//如果當(dāng)前任務(wù)大于0 說(shuō)明除了剛才的任務(wù)還有別的任務(wù)存在
if (ntasks > 0) {
//則任務(wù)數(shù)減一
--ntasks;
//并且再次提交新的任務(wù)
futures.add(ecs.submit(it.next()));
//當(dāng)前的存活的執(zhí)行任務(wù)加一
++active;
}
//如果當(dāng)前存活任務(wù)數(shù)是0則代表沒(méi)有任務(wù)在執(zhí)行了從而跳出循環(huán)
else if (active == 0)
break;
//如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時(shí)時(shí)間
else if (timed) {
//則設(shè)置指定的超時(shí)時(shí)間獲取
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
//等待執(zhí)行超時(shí)還沒(méi)有獲取到則拋出超時(shí)異常
if (f == null)
throw new TimeoutException();
//否則使用當(dāng)前時(shí)間計(jì)算剩下的超時(shí)時(shí)間用于下一個(gè)循環(huán)使用
nanos = deadline - System.nanoTime();
}
//如果沒(méi)有設(shè)置超時(shí)則直接獲取任務(wù)
else
f = ecs.take();
}
//如果獲取到了任務(wù)結(jié)果f!=null
if (f != null) {
//激活數(shù)減一
--active;
try {
//返回獲取到的結(jié)果
return f.get();
//如果獲取結(jié)果出錯(cuò)則包裝異常
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//如果異常不是null則拋出如果是則創(chuàng)建一個(gè)
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//其他任務(wù)則設(shè)置取消
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
//對(duì)上方方法的封裝
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
//對(duì)上方法的封裝
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
//相對(duì)于上一個(gè)方法執(zhí)行成功任何一個(gè)則返回結(jié)果而此方法是全部執(zhí)行完然后統(tǒng)一返回結(jié)果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
//傳入的任務(wù)集合不能是null
if (tasks == null)
throw new NullPointerException();
//創(chuàng)建一個(gè)集合用來(lái)保存獲取到的執(zhí)行future
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
//任務(wù)是否執(zhí)行完成
boolean done = false;
try {
//遍歷傳入的任務(wù)并且調(diào)用執(zhí)行方法將創(chuàng)建的future添加到集合中
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
//遍歷獲取到的future
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
//如果當(dāng)前任務(wù)沒(méi)有成功則進(jìn)行f.get方法等待此方法執(zhí)行成功法精,如果方法執(zhí)行異扯嗦桑或者被取消將忽略異常
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
//到這一步則代表所有的任務(wù)都已經(jīng)有了確切的結(jié)果
done = true;
//返回任務(wù)結(jié)果集合
return futures;
} finally {
//如果不是true是false 則代表執(zhí)行過(guò)程中被中斷了則需要對(duì)任務(wù)進(jìn)行取消操作痴突,如果正常完成則不會(huì)被取消
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
//與上方方法的區(qū)別在于對(duì)于任務(wù)集合可以設(shè)置超時(shí)時(shí)間
//這里會(huì)針對(duì)差異進(jìn)行講解
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
//計(jì)算設(shè)置時(shí)長(zhǎng)的納秒時(shí)間
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
//計(jì)算最終計(jì)算的確切時(shí)間點(diǎn)搂蜓,運(yùn)行時(shí)長(zhǎng)不能超過(guò)此時(shí)間也就是時(shí)間死線
//這里是個(gè)細(xì)節(jié)future創(chuàng)建的時(shí)間并沒(méi)有算作執(zhí)行時(shí)間
final long deadline = System.nanoTime() + nanos;
//獲取當(dāng)前結(jié)果數(shù)
final int size = futures.size();
//遍歷將任務(wù)進(jìn)行執(zhí)行
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
//并且每次都計(jì)算死線
nanos = deadline - System.nanoTime();
//如果時(shí)間已經(jīng)超過(guò)則返回結(jié)果
if (nanos <= 0L)
return futures;
}
//否則遍歷future確定每次執(zhí)行都獲取到了結(jié)果
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
//如果在等待過(guò)程中已經(jīng)超時(shí)則返回當(dāng)前等待結(jié)合
if (nanos <= 0L)
return futures;
try {
//如果沒(méi)有超過(guò)死線則設(shè)置從future中獲取結(jié)果的時(shí)間如果超過(guò)則會(huì)派出timeout
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
//拋出了異常則會(huì)返回當(dāng)前的列表
return futures;
}
//計(jì)算最新的超時(shí)時(shí)間
nanos = deadline - System.nanoTime();
}
}
//之前的返回都沒(méi)有設(shè)置為true所以在finally中都會(huì)設(shè)置為取消唯獨(dú)正常執(zhí)行完成到此處返回的結(jié)果才是最終的結(jié)果
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
線程池的具體實(shí)現(xiàn)
- ThreadPoolExecutor 默認(rèn)線程池
- ScheduledThreadPoolExecutor 定時(shí)線程池 (下篇再做介紹)
ThreadPoolExecutor
線程池重點(diǎn)屬性
//用來(lái)標(biāo)記線程池狀態(tài)(高3位),線程個(gè)數(shù)(低29位)
//默認(rèn)是RUNNING狀態(tài)辽装,線程個(gè)數(shù)為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程個(gè)數(shù)掩碼位數(shù)帮碰,并不是所有平臺(tái)int類型是32位,所以準(zhǔn)確說(shuō)是具體平臺(tái)下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)才是線程的個(gè)數(shù)拾积,
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程最大個(gè)數(shù)(低29位)000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl 是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段殉挽, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到拓巧,使用了Integer類型來(lái)保存斯碌,高3位保存runState,低29位保存workerCount肛度。COUNT_BITS 就是29傻唾,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常量表示workerCount的上限值承耿,大約是5億冠骄。
ctl相關(guān)方法
- runStateOf:獲取運(yùn)行狀態(tài);
- workerCountOf:獲取活動(dòng)線程數(shù)加袋;
- ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值凛辣。
// 獲取高三位 運(yùn)行狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取低29位 線程個(gè)數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
//計(jì)算ctl新值,線程狀態(tài) 與 線程個(gè)數(shù)
private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池存在5種狀態(tài)
//運(yùn)行中 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//關(guān)閉 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//停止 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//終止 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
使用一個(gè)整形,前3位表示狀態(tài),后29位表示線程容量,也就是說(shuō)線程最多有 個(gè)
前三位 | 狀態(tài) | 十進(jìn)制 |
---|---|---|
111 | RUNNING | -1 |
000 | SHUTDOWN | 0 |
001 | STOP | 1 |
010 | TIDYING | 2 |
011 | TERMINATED | 3 |
也可以看出當(dāng)ctl小于零表示線程池仍在運(yùn)行
RUNNING
- 狀態(tài)說(shuō)明:線程池處在RUNNING狀態(tài)時(shí)职烧,能夠接收新任務(wù)扁誓,以及對(duì)已添加的任務(wù)進(jìn)行處理。
- 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING蚀之。換句話說(shuō)蝗敢,線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài)恬总,并且線程池中的任務(wù)數(shù)為0前普!
SHUTDOWN
- 狀態(tài)說(shuō)明:線程池處在SHUTDOWN狀態(tài)時(shí),不接收新任務(wù)壹堰,但能處理已添加的任務(wù)拭卿。
- 狀態(tài)切換:調(diào)用線程池的shutdown()接口時(shí)骡湖,線程池由RUNNING -> SHUTDOWN。
STOP
- 狀態(tài)說(shuō)明:線程池處在STOP狀態(tài)時(shí)峻厚,不接收新任務(wù)响蕴,不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)惠桃。
- 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時(shí)浦夷,線程池由(RUNNING or SHUTDOWN ) -> STOP。
TIDYING
- 狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已終止辜王,ctl記錄的”任務(wù)數(shù)量”為0劈狐,線程池會(huì)變?yōu)門(mén)IDYING狀態(tài)。當(dāng)線程池變?yōu)門(mén)IDYING狀態(tài)時(shí)呐馆,會(huì)執(zhí)行鉤子函數(shù)terminated()肥缔。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門(mén)IDYING時(shí)汹来,進(jìn)行相應(yīng)的處理续膳;可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。
- 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下收班,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也為空時(shí)坟岔,就會(huì)由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下摔桦,線程池中執(zhí)行的任務(wù)為空時(shí)社付,就會(huì)由STOP -> TIDYING。
TERMINATED
- 狀態(tài)說(shuō)明:線程池徹底終止酣溃,就變成TERMINATED狀態(tài)瘦穆。
- 狀態(tài)切換:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后赊豌,就會(huì)由 TIDYING -> TERMINATED扛或。
進(jìn)入TERMINATED的條件如下:
- 線程池不是RUNNING狀態(tài);
- 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài)碘饼;
- 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空熙兔;
- workerCount為0;
- 設(shè)置TIDYING狀態(tài)成功艾恼。
線程池參數(shù)
corePoolSize
線程池中的核心線程數(shù)住涉,當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)钠绍,直到當(dāng)前線程數(shù)等于corePoolSize舆声;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行媳握;如果執(zhí)行了線程池的prestartAllCoreThreads()方法碱屁,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。
maximumPoolSize
線程池中允許的最大線程數(shù)蛾找。如果當(dāng)前阻塞隊(duì)列滿了娩脾,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù)打毛,前提是當(dāng)前線程數(shù)小于maximumPoolSize柿赊;
keepAliveTim
線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候幻枉,如果這時(shí)沒(méi)有新的任務(wù)提交碰声,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待展辞,直到等待的時(shí)間超過(guò)了keepAliveTime奥邮;
unit
keepAliveTime的單位万牺;
workQueue
用來(lái)保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列罗珍,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:
- 1脚粟、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列覆旱,按FIFO排序任務(wù);
- 2核无、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列扣唱,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene团南;
- 3噪沙、SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作吐根,否則插入操作一直處于阻塞狀態(tài)正歼,吞吐量通常要高于LinkedBlockingQuene;
- 4拷橘、priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列局义;
threadFactory
它是ThreadFactory類型的變量,用來(lái)創(chuàng)建新線程冗疮。默認(rèn)使用Executors.defaultThreadFactory() 來(lái)創(chuàng)建線程萄唇。使用默認(rèn)的ThreadFactory來(lái)創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程术幔,同時(shí)也設(shè)置了線程的名稱另萤。
handler
線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了诅挑,且沒(méi)有空閑的工作線程四敞,如果繼續(xù)提交任務(wù)勾缭,必須采取一種策略處理該任務(wù),線程池提供了4種策略:
- AbortPolicy:直接拋出異常目养,默認(rèn)策略俩由;
- CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);
- DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù)癌蚁,并執(zhí)行當(dāng)前任務(wù)幻梯;
- DiscardPolicy:直接丟棄任務(wù);
上面的4種策略都是ThreadPoolExecutor的內(nèi)部類努释。
當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口碘梢,自定義飽和策略眨补,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)酿雪。
線程池的創(chuàng)建
有四個(gè)構(gòu)造函數(shù)蕾盯,其他三個(gè)都是調(diào)用下面代碼中的這個(gè)構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
線程池監(jiān)控
public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)
public long getCompletedTaskCount() //已完成的任務(wù)數(shù)
public int getPoolSize() //線程池當(dāng)前的線程數(shù)
public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量
線程池原理
核心方法分析
execute方法
execute
方法是提交任務(wù) command 到線程池進(jìn)行執(zhí)行
public void execute(Runnable command) {
//如果任務(wù)為null排龄,則拋出NPE異常
if (command == null)
throw new NullPointerException();
/*
* clt記錄著runState和workerCount
*/
int c = ctl.get();
/*
* workerCountOf方法取出低29位的值遏弱,表示當(dāng)前活動(dòng)的線程數(shù)渣窜;
* 如果當(dāng)前活動(dòng)線程數(shù)小于corePoolSize呀枢,則新建一個(gè)線程放入線程池中霉晕;
* 并把任務(wù)添加到該線程中缕减。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker中的第二個(gè)參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來(lái)判斷還是maximumPoolSize來(lái)判斷雷客;
* 如果為true,根據(jù)corePoolSize來(lái)判斷桥狡;
* 如果為false搅裙,則根據(jù)maximumPoolSize來(lái)判斷
*/
if (addWorker(command, true))
return;
/*
* 如果添加失敗,則重新獲取ctl值
*/
c = ctl.get();
}
/*
* 如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊(duì)列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新獲取ctl值
int recheck = ctl.get();
// 再次判斷線程池的運(yùn)行狀態(tài)裹芝,如果不是運(yùn)行狀態(tài)部逮,由于之前已經(jīng)把command添加到workQueue中了,
// 這時(shí)需要移除該command
// 執(zhí)行過(guò)后通過(guò)handler使用拒絕策略對(duì)該任務(wù)進(jìn)行處理嫂易,整個(gè)方法返回
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 獲取線程池中的有效線程數(shù)兄朋,如果數(shù)量是0,則執(zhí)行addWorker方法
* 這里傳入的參數(shù)表示:
* 1. 第一個(gè)參數(shù)為null炬搭,表示在線程池中創(chuàng)建一個(gè)線程蜈漓,但不去啟動(dòng);
* 2. 第二個(gè)參數(shù)為false宫盔,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize融虽,添加線程時(shí)根據(jù)maximumPoolSize來(lái)判斷;
* 如果判斷workerCount大于0灼芭,則直接返回有额,在workQueue中新增的command會(huì)在將來(lái)的某個(gè)時(shí)刻被執(zhí)行。
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果執(zhí)行到這里,有兩種情況:
* 1. 線程池已經(jīng)不是RUNNING狀態(tài)巍佑;
* 2. 線程池是RUNNING狀態(tài)茴迁,但workerCount >= corePoolSize并且workQueue已滿。
* 這時(shí)萤衰,再次調(diào)用addWorker方法堕义,但第二個(gè)參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize脆栋;
* 如果失敗則拒絕該任務(wù)
*/
else if (!addWorker(command, false))
reject(command);
}
簡(jiǎn)單來(lái)說(shuō)倦卖,在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING時(shí)的執(zhí)行過(guò)程如下:
- 如果 workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)椿争;
- 如果 workerCount > corePoolSize怕膛,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到阻塞隊(duì)列中秦踪;
- 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize褐捻,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)椅邓;
- 如果 workerCount >= maximumPoolSize柠逞,并且線程池內(nèi)的阻塞隊(duì)列已滿,則根據(jù)拒絕策略來(lái)處理該任務(wù)希坚,默認(rèn)的處理方式是直接拋異常边苹。
注意:addWorker(null,false); ,也就是創(chuàng)建一個(gè)線程裁僧,但并沒(méi)有傳入從任務(wù),因?yàn)槿蝿?wù)已經(jīng)被添加到 workQueue中了慕购,所以 worker 在執(zhí)行的時(shí)候聊疲,會(huì)直接從 workQueue 中獲取任務(wù)。所以沪悲,在workerCountof(recheck) == 0 時(shí)執(zhí)行 addWorker(null,false); 也是為了保證線程池在RUNNING狀態(tài)下必須要有一個(gè)線程來(lái)執(zhí)行任務(wù)获洲。
execute方法執(zhí)行流程如下:
addWorker方法
addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行:
- firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù);
- core參數(shù) 為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize殿如,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize贡珊。
代碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//獲取ctl
int c = ctl.get();
// 獲取運(yùn)行狀態(tài)
int rs = runStateOf(c);
/*
* 這個(gè)if判斷
* 如果rs >= SHUTDOWN,則表示此時(shí)不再接收新任務(wù)涉馁;
* 接著判斷以下3個(gè)條件门岔,只要有1個(gè)不滿足,則返回false:
* 1. rs == SHUTDOWN烤送,這時(shí)表示關(guān)閉狀態(tài)寒随,不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)
* 2. firsTask為空
* 3. 阻塞隊(duì)列不為空
*
* 首先考慮rs == SHUTDOWN的情況
* 這種情況下不會(huì)接受新提交的任務(wù),所以在firstTask不為空的時(shí)候會(huì)返回false妻往;
* 然后互艾,如果firstTask為空,并且workQueue也為空讯泣,則返回false纫普,
* 因?yàn)殛?duì)列中已經(jīng)沒(méi)有任務(wù)了,不需要再添加線程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取線程數(shù)
int wc = workerCountOf(c);
// 如果wc超過(guò)CAPACITY好渠,也就是ctl的低29位的最大值(二進(jìn)制是29個(gè)1)局嘁,返回false;
// 這里的core是addWorker方法的第二個(gè)參數(shù)晦墙,如果為true表示根據(jù)corePoolSize來(lái)比較悦昵,
// 如果為false則根據(jù)maximumPoolSize來(lái)比較。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試增加workerCount晌畅,如果成功但指,則跳出第一個(gè)for循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失敗,則重新獲取ctl的值
c = ctl.get(); // Re-read ctl
// 如果當(dāng)前的運(yùn)行狀態(tài)不等于rs抗楔,說(shuō)明狀態(tài)已被改變棋凳,返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據(jù)firstTask來(lái)創(chuàng)建Worker對(duì)象
w = new Worker(firstTask);
// 每一個(gè)Worker對(duì)象都會(huì)創(chuàng)建一個(gè)線程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING狀態(tài);
// 如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且firstTask為null连躏,向線程池中添加線程剩岳。
// 因?yàn)樵赟HUTDOWN時(shí)不會(huì)在添加新的任務(wù),但還是會(huì)執(zhí)行workQueue中的任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個(gè)HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize記錄著線程池中出現(xiàn)過(guò)的最大線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng)線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker類
線程池中的每一個(gè)線程被封裝成一個(gè) Worker 對(duì)象入热,ThreadPool 維護(hù)的其實(shí)就是一組 Worker 對(duì)象拍棕。
/**
* Woker主要維護(hù)著運(yùn)行task的worker的中斷控制信息,以及其他小記錄勺良。這個(gè)類繼承AbstractQueuedSynchronizer
* 而來(lái)簡(jiǎn)化獲取和釋放每一個(gè)任務(wù)執(zhí)行中的鎖绰播。這可以防止中斷那些打算喚醒正在等待其他線程任務(wù)的任務(wù),而不是
* 中斷正在運(yùn)行的任務(wù)尚困。我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的不可重入鎖而不是ReentrantLock蠢箩,因?yàn)槲覀儾幌氘?dāng)其調(diào)用setCorePoolSize
* 這樣的方法的時(shí)候能獲得鎖。
*/
//worker主要是對(duì)進(jìn)行中的任務(wù)進(jìn)行中斷控制事甜,順帶著對(duì)其他行為進(jìn)行記錄
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//正在跑的線程谬泌,如果是null標(biāo)識(shí)factory失敗
final Thread thread;
//初始化一個(gè)任務(wù)以運(yùn)行
Runnable firstTask;
//每個(gè)線程計(jì)數(shù)
volatile long completedTasks;
/**
* 用給定的firstTask和從threadFactory創(chuàng)建
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//主要調(diào)用了runWorker
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
//嘗試獲取鎖
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//嘗試釋放鎖
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker類繼承了AQS,并實(shí)現(xiàn)了Runnable接口逻谦,注意其中的 firstTask 和 thread 屬性:
- firstTask用它來(lái)保存?zhèn)魅氲娜蝿?wù)掌实;
- thread是在調(diào)用構(gòu)造方法時(shí)通過(guò) ThreadFactory 來(lái)創(chuàng)建的線程,是用來(lái)處理任務(wù)的線程跨跨。
在調(diào)用構(gòu)造方法時(shí)潮峦,需要把任務(wù)傳入囱皿,這里通過(guò) getThreadFactory().newThread(this);
來(lái)新建一個(gè)線程,newThread 方法傳入的參數(shù)是 this忱嘹,因?yàn)?Worker 本身繼承了 Runnable 接口嘱腥,也就是一個(gè)線程,所以一個(gè) Worker 對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用 Worker 類中的 run 方法拘悦。
Worker 繼承了 AQS齿兔,使用AQS來(lái)實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用 ReentrantLock 來(lái)實(shí)現(xiàn)呢础米?可以看到 tryAcquire
方法分苇,它是不允許重入的,而 ReentrantLock 是允許重入的:
- lock方法一旦獲取了獨(dú)占鎖屁桑,表示當(dāng)前線程正在執(zhí)行任務(wù)中医寿;
- 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程蘑斧;
- 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài)靖秩,也就是空閑的狀態(tài),說(shuō)明它沒(méi)有在處理任務(wù)竖瘾,這時(shí)可以對(duì)該線程進(jìn)行中斷沟突;
- 線程池在執(zhí)行 shutdown 方法或 tryTerminate 方法時(shí)會(huì)調(diào)用 interruptIdleWorkers 方法來(lái)中斷空閑的線程,interruptIdleWorkers 方法會(huì)使用 tryLock 方法來(lái)判斷線程池中的線程是否是空閑狀態(tài)捕传;
- 之所以設(shè)置為不可重入惠拭,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像 setCorePoolSize 這樣的線程池控制方法時(shí)重新獲取鎖。如果使用ReentrantLock庸论,它是可重入的职辅,這樣如果在任務(wù)中調(diào)用了如 setCorePoolSize 這類線程池控制的方法,會(huì)中斷正在運(yùn)行的線程葡公。
所以罐农,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷催什。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
此外,在構(gòu)造方法中執(zhí)行了 setState(-1);
宰睡,把 state 變量設(shè)置為 -1蒲凶,為什么這樣做呢?是因?yàn)锳QS中默認(rèn)的 state 是0拆内,如果剛剛創(chuàng)建了一個(gè) Worker 對(duì)象旋圆,還沒(méi)有執(zhí)行任務(wù)時(shí),這時(shí)就不應(yīng)該被中斷麸恍,看一下 tryAquire
方法:
protected boolean tryAcquire(int unused) {
//cas修改state灵巧,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
tryAcquire方法是根據(jù)state是否是0來(lái)判斷的搀矫,所以,setState(-1);將state設(shè)置為-1是為了禁止在執(zhí)行任務(wù)前對(duì)線程進(jìn)行中斷刻肄。
既然是AQS,其對(duì)于state的解釋也很關(guān)鍵
state | 含義 |
---|---|
-1 | 初始化狀態(tài),禁止中斷 |
0 | 解鎖狀態(tài) |
1 | 上鎖狀態(tài)(獨(dú)占) |
正因?yàn)槿绱巳壳颍趓unWorker方法中會(huì)先調(diào)用Worker對(duì)象的unlock方法將state設(shè)置為0。
runWorker方法
在Worker類中的run方法調(diào)用了runWorker方法來(lái)執(zhí)行任務(wù)敏弃,runWorker方法的代碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 獲取第一個(gè)任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
// 解鎖,允許interrupt操作
w.unlock(); // allow interrupts
// 是否因?yàn)楫惓M顺鲅h(huán)
boolean completedAbruptly = true;
try {
// 如果task為空卦羡,則通過(guò)getTask來(lái)獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 這兒對(duì)worker進(jìn)行加鎖,是為了達(dá)到下面的目的
// 1. 降低鎖范圍麦到,提升性能
// 2. 保證每個(gè)worker執(zhí)行的任務(wù)是串行的
w.lock();
// 如果線程池處于STOP狀態(tài)就中斷線程
// 如果線程被中斷(清除中斷標(biāo)記),線程池處于STOP狀態(tài),線程沒(méi)有再被中斷
if ((runStateAtLeast(ctl.get(), STOP) || //至少是STOP狀態(tài)
(Thread.interrupted() && //中斷過(guò)(并抹除中斷標(biāo)記)
runStateAtLeast(ctl.get(), STOP))) && //再次檢查
!wt.isInterrupted()) //wt沒(méi)有被中斷
wt.interrupt();
try {
beforeExecute(wt, task); //鉤子方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //鉤子方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里說(shuō)明一下第一個(gè) if判斷
绿饵,目的是:
- 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài)瓶颠;
- 如果不是的話拟赊,則要保證當(dāng)前線程不是中斷狀態(tài);
這里要考慮在執(zhí)行該 if語(yǔ)句
期間可能也執(zhí)行了 shutdownNow
方法粹淋,shutdownNow 方法會(huì)把狀態(tài)設(shè)置為 STOP
吸祟,回顧一下 STOP 狀態(tài):不能接受新任務(wù),也不處理隊(duì)列中的任務(wù)廓啊,會(huì)中斷正在處理任務(wù)的線程欢搜。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入該狀態(tài)谴轮。
總結(jié)一下runWorker方法的執(zhí)行過(guò)程:
- while循環(huán)不斷地通過(guò)getTask()方法獲取任務(wù)炒瘟;
- getTask()方法從阻塞隊(duì)列中取任務(wù);
- 如果線程池正在停止第步,那么要保證當(dāng)前線程是中斷狀態(tài)疮装,否則要保證當(dāng)前線程不是中斷狀態(tài);
- 調(diào)用task.run()執(zhí)行任務(wù)粘都;
- 如果task為null則跳出循環(huán)廓推,執(zhí)行processWorkerExit()方法;
- runWorker方法執(zhí)行完畢翩隧,也代表著Worker中的run方法執(zhí)行完畢樊展,銷毀線程。
這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的堆生,留給子類來(lái)實(shí)現(xiàn)专缠。
completedAbruptly 變量來(lái)表示執(zhí)行任務(wù)過(guò)程中是否出現(xiàn)了異常,在processWorkerExit方法中會(huì)對(duì)該變量的值進(jìn)行判斷淑仆。
getTask方法
getTask方法用來(lái)從阻塞隊(duì)列中取任務(wù)涝婉,代碼如下:
private Runnable getTask() {
// timeOut變量的值表示上次從阻塞隊(duì)列中取任務(wù)時(shí)是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果線程池狀態(tài)rs >= SHUTDOWN,也就是非RUNNING狀態(tài)蔗怠,再進(jìn)行以下判斷:
* 1. rs >= STOP墩弯,線程池是否正在stop吩跋;
* 2. 阻塞隊(duì)列是否為空。
* 如果以上條件滿足渔工,則將workerCount減1并返回null锌钮。
* 因?yàn)槿绻?dāng)前線程池狀態(tài)的值是SHUTDOWN或以上時(shí),不允許再向阻塞隊(duì)列中添加任務(wù)涨缚。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed變量用于判斷是否需要進(jìn)行超時(shí)控制轧粟。
// allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進(jìn)行超時(shí)脓魏;
// wc > corePoolSize兰吟,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
// 對(duì)于超過(guò)核心線程數(shù)量的這些線程茂翔,需要進(jìn)行超時(shí)控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情況是因?yàn)榭赡茉诖朔椒▓?zhí)行階段同時(shí)執(zhí)行了setMaximumPoolSize方法混蔼;
* timed && timedOut 如果為true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制珊燎,并且上次從阻塞隊(duì)列中獲取任務(wù)發(fā)生了超時(shí)
* 接下來(lái)判斷惭嚣,如果有效線程數(shù)量大于1,或者阻塞隊(duì)列是空的悔政,那么嘗試將workerCount減1晚吞;
* 如果減1失敗,則返回重試谋国。
* 如果wc == 1時(shí)槽地,也就說(shuō)明當(dāng)前線程是線程池中唯一的一個(gè)線程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根據(jù)timed來(lái)判斷芦瘾,如果為true捌蚊,則通過(guò)阻塞隊(duì)列的poll方法進(jìn)行超時(shí)控制,如果在keepAliveTime時(shí)間內(nèi)沒(méi)有獲取到任務(wù)近弟,則返回null缅糟;
* 否則通過(guò)take方法,如果這時(shí)隊(duì)列為空祷愉,則take方法會(huì)阻塞直到隊(duì)列不為空窗宦。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,說(shuō)明已經(jīng)超時(shí)二鳄,timedOut設(shè)置為true
timedOut = true;
} catch (InterruptedException retry) {
// 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷迫摔,則設(shè)置timedOut為false并返回循環(huán)重試
timedOut = false;
}
}
}
這里重要的地方是第二個(gè) if判斷
,目的是控制線程池的有效線程數(shù)量泥从。由上文中的分析可以知道,在執(zhí)行 execute 方法時(shí)沪摄,如果當(dāng)前線程池的線程數(shù)量超過(guò)了 corePoolSize 且小于 maximumPoolSize躯嫉,并且workQueue已滿時(shí)纱烘,則可以添加工作線程,但這時(shí)如果超時(shí)沒(méi)有獲取到任務(wù)祈餐,也就是 timedOut 為 true的情況擂啥,說(shuō)明 workQueue 已經(jīng)為空了,也就說(shuō)明了當(dāng)前線程池中不需要那么多線程來(lái)執(zhí)行任務(wù)了帆阳,可以把多余 corePoolSize 數(shù)量的線程銷毀掉哺壶,保持線程數(shù)量在 corePoolSize 即可。
什么時(shí)候會(huì)銷毀蜒谤?
當(dāng)然是runWorker方法執(zhí)行完之后山宾,也就是Worker中的run方法執(zhí)行完,由JVM自動(dòng)回收鳍徽。
getTask方法返回null時(shí)资锰,在runWorker方法中會(huì)跳出while循環(huán),然后會(huì)執(zhí)行 processWorkerExit方法阶祭。
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值為true绷杜,則說(shuō)明線程執(zhí)行時(shí)出現(xiàn)了異常,需要將workerCount減1濒募;
// 如果線程執(zhí)行時(shí)沒(méi)有出現(xiàn)異常鞭盟,說(shuō)明在getTask()方法中已經(jīng)已經(jīng)對(duì)workerCount進(jìn)行了減1操作,這里就不必再減了瑰剃。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//統(tǒng)計(jì)完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
// 從workers中移除齿诉,也就表示著從線程池中移除了一個(gè)工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
tryTerminate();
int c = ctl.get();
/*
* 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時(shí),如果worker是異常結(jié)束培他,那么會(huì)直接addWorker鹃两;
* 如果allowCoreThreadTimeOut=true,并且等待隊(duì)列有任務(wù)舀凛,至少保留一個(gè)worker俊扳;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize猛遍。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此馋记,processWorkerExit執(zhí)行完成之后,工作線程被銷毀懊烤,以上就是整個(gè)工作線程的生命周期梯醒,從 execute 方法開(kāi)始,Worker 使用 ThreadFactory 創(chuàng)建新的工作線程腌紧,runWorker 通過(guò) getTask 獲取任務(wù)茸习,然后執(zhí)行任務(wù),如果 getTask 返回null壁肋,進(jìn)入 oricessWorkerExit 方法号胚,整個(gè)線程結(jié)束籽慢。如下圖所示:
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-aOmgFHaU-1607357995983)(https://imgkr2.cn-bj.ufileos.com/5171fa57-6556-4adb-a6e3-98a1805792e1.jpg?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=d%252Fo2TdoPfih5GlF0LZOsql0wfWQ%253D&Expires=1607443308)]
shutdown方法
調(diào)用關(guān)系
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-sfdnAYMr-1607357995986)(https://imgkr2.cn-bj.ufileos.com/cc964937-ee9d-4c8b-8d99-68b05c96c158.jpg?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=ML48OwlgyVP5kZwQPCm99qp%252FGoU%253D&Expires=1607443292)]
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//權(quán)限檢查
advanceRunState(SHUTDOWN);//設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN
interruptIdleWorkers();//中斷空閑線程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
interruptIdleWorkers
中斷空閑線程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//未被中斷且正在獨(dú)占運(yùn)行
try {
t.interrupt();//中斷
} catch (SecurityException ignore) {
} finally {
w.unlock();//worker解鎖
}
}
if (onlyOne)//如果只中斷一個(gè)
break;
}
} finally {
mainLock.unlock();//解鎖
}
}
tryTerminate
final void tryTerminate() {
for (;;) { // 無(wú)限循環(huán),確保操作成功
// 獲取線程池控制狀態(tài)
int c = ctl.get();
if (isRunning(c) || // 線程池的運(yùn)行狀態(tài)為RUNNING
runStateAtLeast(c, TIDYING) || // 線程池的運(yùn)行狀態(tài)最小要大于TIDYING
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 線程池的運(yùn)行狀態(tài)為SHUTDOWN并且workQueue隊(duì)列不為null
// 不能終止猫胁,直接返回
return;
if (workerCountOf(c) != 0) { // 線程池正在運(yùn)行的worker數(shù)量不為0 // Eligible to terminate
// 僅僅中斷一個(gè)空閑的worker
interruptIdleWorkers(ONLY_ONE);
return;
}
// 獲取線程池的鎖
final ReentrantLock mainLock = this.mainLock;
// 獲取鎖
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比較并設(shè)置線程池控制狀態(tài)為T(mén)IDYING
try {
// 終止箱亿,鉤子函數(shù)
terminated();
} finally {
// 設(shè)置線程池控制狀態(tài)為T(mén)ERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 釋放在termination條件上等待的所有線程
termination.signalAll();
}
return;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);//設(shè)置線程池狀態(tài)為STOP
interruptWorkers();//中斷線程
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Executors類
Executors類,提供了一系列工廠方法用于創(chuàng)建線程池弃秆,返回的線程池都實(shí)現(xiàn)了ExecutorService接口届惋。
關(guān)于Callable的支持
- Callable<Object> callable(Runnable task)
返回 Callable 對(duì)象,調(diào)用它時(shí)可運(yùn)行給定的任務(wù)菠赚。
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
- Callable<T> callable(Runnable task, T result)
返回 Callable 對(duì)象脑豹,調(diào)用它時(shí)可運(yùn)行給定的任務(wù)并返回給定的結(jié)果。callable(task)等價(jià)于callable(task, null)锈至。
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
RunnableAdapter類
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
- Callable callable(final PrivilegedAction action)
返回 Callable 對(duì)象晨缴,調(diào)用它時(shí)可運(yùn)行給定特權(quán)的操作并返回其結(jié)果。
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }
};
}
- callable(PrivilegedExceptionAction action)
和Callable callable(final PrivilegedAction action)其類似峡捡,唯一的區(qū)別在于击碗,前者可拋出異常。
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }
};
}
PrivilegedAction接口
public interface PrivilegedAction<T> {
T run();
}
- Callable privilegedCallable(Callable<T> callable)
返回 Callable 對(duì)象们拙,調(diào)用它時(shí)可在當(dāng)前的訪問(wèn)控制上下文中執(zhí)行給定的 callable 對(duì)象稍途。
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}
- Callable privilegedCallableUsingCurrentClassLoader(Callable<T> callable)
返回 Callable 對(duì)象,調(diào)用它時(shí)可在當(dāng)前的訪問(wèn)控制上下文中砚婆,使用當(dāng)前上下文類加載器作為上下文類加載器來(lái)執(zhí)行給定的 callable 對(duì)象械拍。
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}
關(guān)于ThreadFactory的支持
- 根據(jù)需要?jiǎng)?chuàng)建新線程的對(duì)象。使用線程工廠就無(wú)需再手工編寫(xiě)對(duì) new Thread 的調(diào)用了装盯,從而允許應(yīng)用程序使用特殊的線程子類坷虑、屬性等等。
- Executors對(duì)其提供支持:DefaultThreadFactory和PrivilegedThreadFactory埂奈。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
ThreadFactory為默認(rèn)的defaultThreadFactory
迄损,ThreadFactory定義了兩個(gè):defaultThreadFactory和privilegedThreadFactory,但是也可以自己定義账磺,實(shí)現(xiàn)ThreadFactory接口或者繼承已經(jīng)實(shí)現(xiàn)的這兩個(gè)實(shí)現(xiàn)并且修改對(duì)應(yīng)的代碼即可芹敌。
// 返回用于創(chuàng)建新線程的默認(rèn)線程工廠。
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
// 返回用于創(chuàng)建新線程的線程工廠垮抗,這些新線程與當(dāng)前線程具有相同的權(quán)限氏捞。
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
- DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
從源碼看出DefaultThreadFactory就是創(chuàng)建一個(gè)普通的線程,非守護(hù)線程冒版,優(yōu)先級(jí)為5液茎。
新線程具有可通過(guò) pool-N-thread-M 的 Thread.getName() 來(lái)訪問(wèn)的名稱,其中 N 是此工廠的序列號(hào), M 是此工廠所創(chuàng)建線程的序列號(hào)豁护。
- PrivilegedThreadFactory
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final ClassLoader ccl;
private final AccessControlContext acc;
PrivilegedThreadFactory() {
super();
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
從源碼看出哼凯,PrivilegedThreadFactory extends DefaultThreadFactory從而具有與 defaultThreadFactory() 相同設(shè)置的線程。但增加了兩個(gè)特性:ClassLoader和AccessControlContext楚里,從而使運(yùn)行在此類線程中的任務(wù)具有與當(dāng)前線程相同的訪問(wèn)控制和類加載器。
關(guān)于RejectedExecutionHandler的支持
而默認(rèn)的拒絕策略RejectedExecutionHandler則為 AbortPolicy猎贴,并且所有的拒絕策略都是實(shí)現(xiàn)接口RejectedExecutionHandler
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
五種線程池創(chuàng)建類型
newFixedThreadPool(固定大小線程池)
newFixedThreadPool:創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為 nThreads 的線程池班缎,并且阻塞隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE
,keeyAliveTime=0
說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收她渴。代碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//使用自定義線程創(chuàng)建工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newSingleThreadExecutor(單個(gè)后臺(tái)線程)
newSingleThreadExecutor:創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為1的線程池达址,并且阻塞隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE
,keeyAliveTime=0
說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收趁耗。代碼如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//使用自己的線程工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool(無(wú)界線程池沉唠,可以進(jìn)行自動(dòng)線程回收)
newCachedThreadPool:創(chuàng)建一個(gè)按需創(chuàng)建線程的線程池,初始線程個(gè)數(shù)為 0苛败,最多線程個(gè)數(shù)為 Integer.MAX_VALUE
满葛,并且阻塞隊(duì)列為同步隊(duì)列,keeyAliveTime=60
說(shuō)明只要當(dāng)前線程 60s 內(nèi)空閑則回收罢屈。這個(gè)特殊在于加入到同步隊(duì)列的任務(wù)會(huì)被馬上被執(zhí)行嘀韧,同步隊(duì)列里面最多只有一個(gè)任務(wù)。代碼如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//使用自定義的線程工廠
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newScheduledThreadPool (可調(diào)度)
newScheduledThreadPool:創(chuàng)建一個(gè)定長(zhǎng)線程池缠捌,支持定時(shí)及周期性任務(wù)執(zhí)行锄贷。newScheduledThreadPool 和 其他線程池最大的區(qū)別是使用的阻塞隊(duì)列是 DelayedWorkQueue,而且多了兩個(gè)定時(shí)執(zhí)行的方法scheduleAtFixedRate和scheduleWithFixedDelay
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//使用自定義的線程工廠
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
newWorkStealingPool(并行操作)
newWorkStealingPool:JDK1.8新增newWorkStealingPool曼月,適合使用在很耗時(shí)的操作谊却,但是newWorkStealingPool不是ThreadPoolExecutor的擴(kuò)展,它是新的線程池類ForkJoinPool的擴(kuò)展哑芹,但是都是在統(tǒng)一的一個(gè)Executors類中實(shí)現(xiàn)炎辨,由于能夠合理的使用CPU進(jìn)行對(duì)任務(wù)操作(并行操作),所以適合使用在很耗時(shí)的任務(wù)中绩衷。代碼如下:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
相關(guān)文章
- 并發(fā)編程從操作系統(tǒng)底層工作的整體認(rèn)識(shí)開(kāi)始
- 深入理解Java內(nèi)存模型(JMM)及volatile關(guān)鍵字
- 深入理解CPU緩存一致性協(xié)議(MESI)
- 并發(fā)編程之synchronized深入理解
- 并發(fā)編程之抽象隊(duì)列同步器AQS應(yīng)用Lock詳解
- 阻塞隊(duì)列 — ArrayBlockingQueue源碼分析
- 阻塞隊(duì)列 — LinkedBlockingQueue源碼分析
- 阻塞隊(duì)列 —PriorityBlockingQueue源碼分析
- 阻塞隊(duì)列 — DelayQueue源碼分析
- 阻塞隊(duì)列 — SynchronousQueue源碼分析
- 阻塞隊(duì)列 — LinkedTransferQueue源碼分析
- 阻塞隊(duì)列 — LinkedBlockingDeque源碼分析
- 阻塞隊(duì)列 — DelayedWorkQueue源碼分析
- 并發(fā)編程之Future&FutureTask深入解析
PS:以上代碼提交在 Github :https://github.com/Niuh-Study/niuh-juc-final.git
文章持續(xù)更新蹦魔,可以公眾號(hào)搜一搜「 一角錢(qián)技術(shù) 」第一時(shí)間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄咳燕,歡迎 Star勿决。