并發(fā)編程之Executor線程池原理與源碼解讀

點(diǎn)贊再看,養(yǎng)成習(xí)慣,搜一搜【一角錢(qián)技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章蝉娜。本文 GitHub org_hejianhui/JavaStudy 已收錄坛善,有我的系列文章。

前言

在介紹線程池之前缓升,我們先回顧下線程的基本知識(shí)吧史。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時(shí)線程池 邮辽,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。

線程

線程是調(diào)度CPU資源的最小單位贸营,線程模型分為KLT模型與ULT模型吨述,JVM使用的是KLT模型,Java線程與OS線程保持 1:1 的映射關(guān)系钞脂,也就是說(shuō)有一個(gè)Java線程也會(huì)在操作系統(tǒng)里有一個(gè)對(duì)應(yīng)的線程揣云。

內(nèi)核線程模型

內(nèi)核線程(KLT):系統(tǒng)內(nèi)核管理線程(KLT),內(nèi)核保存線程的狀態(tài)和上下文信息冰啃,線程阻塞不會(huì)引起進(jìn)程阻塞邓夕。在多處理器系統(tǒng)上,多線程在多處理器上并行運(yùn)行阎毅。線程的創(chuàng)建焚刚、調(diào)度和管理由內(nèi)核完成,效率比ULT要慢扇调,比進(jìn)程操作快矿咕。

用戶線程模型

用戶線程(ULT):用戶程序?qū)崿F(xiàn),不依賴操作系統(tǒng)核心肃拜,應(yīng)用提供創(chuàng)建痴腌、同步雌团、調(diào)度和管理線程的函數(shù)來(lái)控制用戶線程燃领。不需要用戶態(tài)/內(nèi)核態(tài)切換,速度快锦援。內(nèi)核對(duì)ULT無(wú)感知,線程阻塞則進(jìn)程(包括它的所有線程)阻塞。

Java線程生命狀態(tài)

Java線程有多種生命狀態(tài):

  • NEW 捎谨,新建
  • RUNNABLE ,運(yùn)行
  • BLOCKED 区岗,阻塞
  • WAITING ,等待
  • TIMED_WAITING 毁枯,超時(shí)等待
  • TERMINATED慈缔,終結(jié)

狀態(tài)切換如下圖所示:


Java線程實(shí)現(xiàn)方式

Java線程實(shí)現(xiàn)方式主要有四種:

  • 繼承Thread類
  • 實(shí)現(xiàn)Runnable接口、
  • 實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程种玛、
  • 使用ExecutorService藐鹤、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程赂韵。

其中前兩種方式線程執(zhí)行完后都沒(méi)有返回值娱节,后兩種是帶返回值的。

繼承Thread類創(chuàng)建線程

Thread類本質(zhì)上是實(shí)現(xiàn)了Runnable接口的一個(gè)實(shí)例祭示,代表一個(gè)線程的實(shí)例肄满。啟動(dòng)線程的唯一方法就是通過(guò)Thread類的start()實(shí)例方法。start()方法是一個(gè)native方法质涛,它將啟動(dòng)一個(gè)新線程稠歉,并執(zhí)行run()方法。這種方式實(shí)現(xiàn)多線程很簡(jiǎn)單汇陆,通過(guò)自己的類直接extend Thread轧抗,并復(fù)寫(xiě)run()方法,就可以啟動(dòng)新線程并執(zhí)行自己定義的run()方法瞬测。例如:

public class MyThread extends Thread {  
  public void run() {  
   System.out.println("關(guān)注一角錢(qián)技術(shù)横媚,獲取Java架構(gòu)資料");  
  }  
}  
 
MyThread myThread1 = new MyThread();  
MyThread myThread2 = new MyThread();  
myThread1.start();  
myThread2.start();

實(shí)現(xiàn)Runnable接口創(chuàng)建線程

如果自己的類已經(jīng)extends另一個(gè)類,就無(wú)法直接extends Thread月趟,此時(shí)灯蝴,可以實(shí)現(xiàn)一個(gè)Runnable接口,如下:

// 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行孝宗,表示一個(gè)基本的任務(wù)
public interface Runnable {
    // run方法就是它所有的內(nèi)容穷躁,就是實(shí)際執(zhí)行的任務(wù)
    public abstract void run();
}
public class MyThread implements Runnable {  
  public void run() {  
    System.out.println("關(guān)注一角錢(qián)技術(shù),獲取Java架構(gòu)資料");  
  }  
}

為了啟動(dòng)MyThread因妇,需要首先實(shí)例化一個(gè)Thread问潭,并傳入自己的MyThread實(shí)例:

MyThread myThread = new MyThread();  
Thread thread = new Thread(myThread);  
thread.start();  

事實(shí)上,當(dāng)傳入一個(gè)Runnable target參數(shù)給Thread后婚被,Thread的run()方法就會(huì)調(diào)用target.run()狡忙,參考JDK源代碼:

public void run() {  
  if (target != null) {  
   target.run();  
  }  
}

實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程

Callable接口(也只有一個(gè)方法)定義如下:

public interface Callable<V> { 
    V call() throws Exception;   
} 
//Callable同樣是任務(wù),與Runnable接口的區(qū)別在于它接收泛型址芯,同時(shí)它執(zhí)行任務(wù)后帶有返回內(nèi)容
public class SomeCallable<V> implements Callable<V> {
    // 相對(duì)于run方法的帶有返回值的call方法
    @Override
    public V call() throws Exception {
        // TODO Auto-generated method stub
        return null;
    }

}
Callable<V> oneCallable = new SomeCallable<V>();   
//由Callable<Integer>創(chuàng)建一個(gè)FutureTask<Integer>對(duì)象:   
FutureTask<V> oneTask = new FutureTask<V>(oneCallable);
//注釋:FutureTask<Integer>是一個(gè)包裝器灾茁,它通過(guò)接受Callable<Integer>來(lái)創(chuàng)建窜觉,它同時(shí)實(shí)現(xiàn)了Future和Runnable接口。
//由FutureTask<Integer>創(chuàng)建一個(gè)Thread對(duì)象:   
Thread oneThread = new Thread(oneTask);   
oneThread.start();   
//至此北专,一個(gè)線程就創(chuàng)建完成了禀挫。

使用ExecutorService、Callable拓颓、Future實(shí)現(xiàn)有返回結(jié)果的線程

ExecutorService语婴、Callable、Future三個(gè)接口實(shí)際上都是屬于Executor框架驶睦。返回結(jié)果的線程是在JDK1.5中引入的新特征腻格,有了這種特征就不需要再為了得到返回值而大費(fèi)周折了。而且自己實(shí)現(xiàn)了也可能漏洞百出啥繁。(下部分來(lái)講線程池了

  • 可返回值的任務(wù)必須實(shí)現(xiàn)Callable接口菜职。
  • 類似的,無(wú)返回值的任務(wù)必須實(shí)現(xiàn)Runnable接口旗闽。

執(zhí)行Callable任務(wù)后酬核,可以獲取一個(gè)Future的對(duì)象,在該對(duì)象上調(diào)用get就可以獲取到Callable任務(wù)返回的Object了适室。

注意:get方法是阻塞的嫡意,即:線程無(wú)返回結(jié)果,get方法會(huì)一直等待捣辆。

再結(jié)合線程池接口ExecutorService就可以實(shí)現(xiàn)傳說(shuō)中有返回結(jié)果的多線程了蔬螟。

下面提供了一個(gè)完整的有返回結(jié)果的多線程測(cè)試?yán)印4a如下:

package com.niuh.thread.v4;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * <p>
 * 使用ExecutorService汽畴、Callable旧巾、Future實(shí)現(xiàn)有返回結(jié)果的線程
 * </p>
 */
public class MyThread {
    
    public static void main(String[] args) throws ExecutionException,
            InterruptedException {

        System.out.println(("----程序開(kāi)始運(yùn)行----"));
        Date date1 = new Date();

        int taskSize = 5;
        // 創(chuàng)建一個(gè)線程池
        ExecutorService pool = Executors.newFixedThreadPool(taskSize);
        // 創(chuàng)建多個(gè)有返回值的任務(wù)
        List<Future> list = new ArrayList<Future>();
        for (int i = 0; i < taskSize; i++) {
            Callable c = new MyCallable(i + " ");
            // 執(zhí)行任務(wù)并獲取Future對(duì)象
            Future f = pool.submit(c);
            // System.out.println(">>>" + f.get().toString());
            list.add(f);
        }
        // 關(guān)閉線程池
        pool.shutdown();

        // 獲取所有并發(fā)任務(wù)的運(yùn)行結(jié)果
        for (Future f : list) {
            // 從Future對(duì)象上獲取任務(wù)的返回值,并輸出到控制臺(tái)
            System.out.println(">>>" + f.get().toString());
        }

        Date date2 = new Date();
        System.out.println("----程序結(jié)束運(yùn)行----忍些,程序運(yùn)行時(shí)間【"
                + (date2.getTime() - date1.getTime()) + "毫秒】");
    }
}

class MyCallable implements Callable<Object> {
    private String taskNum;

    MyCallable(String taskNum) {
        this.taskNum = taskNum;
    }

    public Object call() throws Exception {
        System.out.println(">>>" + taskNum + "任務(wù)啟動(dòng)");
        Date dateTmp1 = new Date();
        Thread.sleep(1000);
        Date dateTmp2 = new Date();
        long time = dateTmp2.getTime() - dateTmp1.getTime();
        System.out.println(">>>" + taskNum + "任務(wù)終止");
        return taskNum + "任務(wù)返回運(yùn)行結(jié)果,當(dāng)前任務(wù)時(shí)間【" + time + "毫秒】";
    }
}

協(xié)程

協(xié)程(纖程鲁猩,用戶級(jí)線程),目的是為了追求最大力度的發(fā)揮硬件性能和提升軟件的速度罢坝,協(xié)程基本原理是:在某個(gè)點(diǎn)掛起當(dāng)前的任務(wù)廓握,并且保存棧信息,去執(zhí)行另一個(gè)任務(wù)嘁酿;等完成或達(dá)到某個(gè)條件時(shí)隙券,再還原原來(lái)的棧信息并繼續(xù)執(zhí)行(整個(gè)過(guò)程不需要上下文切換)。

協(xié)程的概念很早就提出來(lái)了闹司,但直到最近幾年才在某些語(yǔ)言(如Lua)中得到廣泛應(yīng)用娱仔。

協(xié)程的目的:當(dāng)我們?cè)谑褂枚嗑€程的時(shí)候,如果存在長(zhǎng)時(shí)間的I/O操作开仰。這個(gè)時(shí)候線程一直處于阻塞狀態(tài)拟枚,如果線程很多的時(shí)候薪铜,會(huì)存在很多線程處于空閑狀態(tài)众弓,造成了資源應(yīng)用不徹底恩溅。相對(duì)的協(xié)程不一樣了,在單線程中多個(gè)任務(wù)來(lái)回執(zhí)行如果出現(xiàn)長(zhǎng)時(shí)間的I/O操作谓娃,讓其讓出目前的協(xié)程調(diào)度脚乡,執(zhí)行下一個(gè)任務(wù)。當(dāng)然可能所有任務(wù)滨达,全部卡在同一個(gè)點(diǎn)上奶稠,但是這只是針對(duì)于單線程而言,當(dāng)所有數(shù)據(jù)正常返回時(shí)捡遍,會(huì)同時(shí)處理當(dāng)前的I/O操作锌订。

Java原生不支持協(xié)程,在純java代碼里需要使用協(xié)程的話需要引入第三方包,如:quasar

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-core</artifactId>
    <version>0.8.0</version>
    <classifier>jdk8</classifier>
</dependency>

線程池

“線程池”画株,顧名思義就是一個(gè)線程緩存辆飘,線程是稀缺資源,如果被無(wú)限制的創(chuàng)建谓传,不僅會(huì)消耗系統(tǒng)資源蜈项,還會(huì)降低系統(tǒng)的穩(wěn)定性,因此 Java 中提供線程池對(duì)線程進(jìn)行統(tǒng)一分配续挟、調(diào)優(yōu)和監(jiān)控紧卒。

線程池介紹

在web開(kāi)發(fā)中,服務(wù)器需要接受并處理請(qǐng)求诗祸,所以會(huì)為一個(gè)請(qǐng)求分配一個(gè)線程來(lái)進(jìn)行處理跑芳。如果每次請(qǐng)求都創(chuàng)建一個(gè)線程的話實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,但是存在一個(gè)問(wèn)題:如果并發(fā)的請(qǐng)求數(shù)量非常多直颅,但每個(gè)線程執(zhí)行的時(shí)間很短聋亡,這樣就會(huì)頻繁的創(chuàng)建和銷毀線程,如此一來(lái)會(huì)大大降低系統(tǒng)的效率际乘∑戮螅可能出現(xiàn)服務(wù)器在為每個(gè)請(qǐng)求創(chuàng)建新線程和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源更多

那么有沒(méi)有一種辦法使執(zhí)行完一個(gè)任務(wù)脖含,并不被銷毀罪塔,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

這就是線程池的目的养葵。線程池為線程生命周期的開(kāi)銷和資源不足問(wèn)題提供了解決方案征堪。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷被分?jǐn)偟蕉鄠€(gè)任務(wù)上关拒。

什么時(shí)候使用線程池佃蚜?

  • 單個(gè)任務(wù)處理時(shí)間比較短庸娱;
  • 需要處理的任務(wù)數(shù)量很大。

線程池優(yōu)勢(shì)

  • 重用存在的線程谐算。減少線程黃金熟尉、消亡的開(kāi)銷,提高性能洲脂;
  • 提高響應(yīng)速度斤儿。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行恐锦;
  • 提高線程的可管理性往果。線程是稀缺資源,如果無(wú)限制的創(chuàng)建一铅,不僅會(huì)消耗系統(tǒng)資源陕贮,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配潘飘、調(diào)優(yōu)和監(jiān)控肮之。

Executor框架

Executor接口是線程池框架中最基礎(chǔ)的部分,定義來(lái)一個(gè)用于執(zhí)行 Runnable 的 execute 方法福也。下面為它的繼承與實(shí)現(xiàn)


ExecutorService接口

從圖中可以看出 Executor 下有一個(gè)重要的子接口 ExecutorService 局骤,其中定義來(lái)線程池的具體行為

  • execute(Runnable command):履行Ruannable類型的任務(wù);
  • submit(task):可用來(lái)提交Callable或Runnable任務(wù)暴凑,并返回代表此任務(wù)的Future對(duì)象峦甩;
  • shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù)现喳;
  • shutdownNow():停止所有正在履行的任務(wù)并封閉辦事凯傲;
  • isTerminated():測(cè)試是否所有任務(wù)都履行完畢了;
  • isShutdown():測(cè)試是否該ExecutorService已被關(guān)閉;
  • awaitTermination(long,TimeUnit):接收timeout和TimeUnit兩個(gè)參數(shù)嗦篱,用于設(shè)定超時(shí)時(shí)間及單位冰单。當(dāng)?shù)却^(guò)設(shè)定時(shí)間時(shí),會(huì)監(jiān)測(cè)ExecutorService是否已經(jīng)關(guān)閉灸促,若關(guān)閉則返回true诫欠,否則返回false。一般情況下會(huì)和shutdown方法組合使用;
  • invokeAll :作用是等待所有的任務(wù)執(zhí)行完成后統(tǒng)一返回;
  • invokeAny :將第一個(gè)得到的結(jié)果作為返回值浴栽,然后立刻終止所有的線程荒叼。如果設(shè)置了超時(shí)時(shí)間,未超時(shí)完成則正常返回結(jié)果典鸡,如果超時(shí)未完成則報(bào)超時(shí)異常被廓。

AbstractExcutorService抽象類

此類的定義并沒(méi)有特殊的意義僅僅是實(shí)現(xiàn)了ExecutorService接口


public abstract class AbstractExecutorService implements ExecutorService {
    //此方法很簡(jiǎn)單就是對(duì)runnable保證,將其包裝為一個(gè)FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    //包裝callable為FutureTask
    //FutureTask其實(shí)就是對(duì)Callable的一個(gè)封裝
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    //提交一個(gè)Runnable類型的任務(wù)
    public Future<?> submit(Runnable task) {
        //如果為null則拋出NPE
        if (task == null) throw new NullPointerException();
        //包裝任務(wù)為一個(gè)Future
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        //將任務(wù)丟給執(zhí)行器萝玷,而此處會(huì)拋出拒絕異常嫁乘,在講述ThreadPoolExecutor的時(shí)候有講述昆婿,不記得的讀者可以去再看看
        execute(ftask);
        return ftask;
    }

    //與上方方法相同只不過(guò)指定了返回結(jié)果
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    //與上方方法相同只是換成了callable
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    //執(zhí)行集合tasks結(jié)果是最后一個(gè)執(zhí)行結(jié)束的任務(wù)結(jié)果
    //可以設(shè)置超時(shí) timed為true并且nanos是未來(lái)的一個(gè)時(shí)間
    //任何一個(gè)任務(wù)完成都將會(huì)返回結(jié)果
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        //傳入的任務(wù)集合不能為null
        if (tasks == null)
            throw new NullPointerException();
        //傳入的任務(wù)數(shù)不能是0
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        //滿足上面的校驗(yàn)后將任務(wù)分裝到一個(gè)ArrayList中
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        //并且創(chuàng)建一個(gè)執(zhí)行器傳入this
        //這里簡(jiǎn)單講述他的執(zhí)行原理,傳入this會(huì)使用傳入的this(類型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù)蜓斧,當(dāng)submit提交任務(wù)的時(shí)候回將任務(wù)
        //封裝為一個(gè)內(nèi)部的Future并且重寫(xiě)他的done而此方法就是在future完成的時(shí)候調(diào)用的仓蛆,而他的寫(xiě)法則是將當(dāng)前完成的future添加到esc
        //維護(hù)的結(jié)果隊(duì)列中
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            //創(chuàng)建一個(gè)執(zhí)行異常,以便后面拋出
            ExecutionException ee = null;
            //如果開(kāi)啟了超時(shí)則計(jì)算死線時(shí)間如果時(shí)間是0則代表沒(méi)有開(kāi)啟執(zhí)行超時(shí)
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //獲取任務(wù)的迭代器
            Iterator<? extends Callable<T>> it = tasks.iterator();
            //先獲取迭代器中的第一個(gè)任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器
            futures.add(ecs.submit(it.next()));
            //前面記錄的任務(wù)數(shù)減一
            --ntasks;
            //當(dāng)前激活數(shù)為1
            int active = 1;
            //進(jìn)入死循環(huán)
            for (;;) {
                //獲取剛才提價(jià)的任務(wù)是否完成如果完成則f不是null否則為null
                Future<T> f = ecs.poll();
                //如果為null則代表任務(wù)還在繼續(xù)
                if (f == null) {
                    //如果當(dāng)前任務(wù)大于0 說(shuō)明除了剛才的任務(wù)還有別的任務(wù)存在
                    if (ntasks > 0) {
                        //則任務(wù)數(shù)減一
                        --ntasks;
                        //并且再次提交新的任務(wù)
                        futures.add(ecs.submit(it.next()));
                        //當(dāng)前的存活的執(zhí)行任務(wù)加一
                        ++active;
                    }
                    //如果當(dāng)前存活任務(wù)數(shù)是0則代表沒(méi)有任務(wù)在執(zhí)行了從而跳出循環(huán)
                    else if (active == 0)
                        break;
                    //如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時(shí)時(shí)間
                    else if (timed) {
                        //則設(shè)置指定的超時(shí)時(shí)間獲取
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        //等待執(zhí)行超時(shí)還沒(méi)有獲取到則拋出超時(shí)異常
                        if (f == null)
                            throw new TimeoutException();
                        //否則使用當(dāng)前時(shí)間計(jì)算剩下的超時(shí)時(shí)間用于下一個(gè)循環(huán)使用
                        nanos = deadline - System.nanoTime();
                    }
                    //如果沒(méi)有設(shè)置超時(shí)則直接獲取任務(wù)
                    else
                        f = ecs.take();
                }
                //如果獲取到了任務(wù)結(jié)果f!=null
                if (f != null) {
                    //激活數(shù)減一
                    --active;
                    try {
                        //返回獲取到的結(jié)果
                        return f.get();
                        //如果獲取結(jié)果出錯(cuò)則包裝異常
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            //如果異常不是null則拋出如果是則創(chuàng)建一個(gè)
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //其他任務(wù)則設(shè)置取消
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    //對(duì)上方方法的封裝
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
    //對(duì)上方法的封裝
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    //相對(duì)于上一個(gè)方法執(zhí)行成功任何一個(gè)則返回結(jié)果而此方法是全部執(zhí)行完然后統(tǒng)一返回結(jié)果
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        //傳入的任務(wù)集合不能是null
        if (tasks == null)
            throw new NullPointerException();
        //創(chuàng)建一個(gè)集合用來(lái)保存獲取到的執(zhí)行future
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        //任務(wù)是否執(zhí)行完成
        boolean done = false;
        try {
            //遍歷傳入的任務(wù)并且調(diào)用執(zhí)行方法將創(chuàng)建的future添加到集合中
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            //遍歷獲取到的future
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                //如果當(dāng)前任務(wù)沒(méi)有成功則進(jìn)行f.get方法等待此方法執(zhí)行成功法精,如果方法執(zhí)行異扯嗦桑或者被取消將忽略異常
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            //到這一步則代表所有的任務(wù)都已經(jīng)有了確切的結(jié)果
            done = true;
            //返回任務(wù)結(jié)果集合
            return futures;
        } finally {
            //如果不是true是false 則代表執(zhí)行過(guò)程中被中斷了則需要對(duì)任務(wù)進(jìn)行取消操作痴突,如果正常完成則不會(huì)被取消
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
    //與上方方法的區(qū)別在于對(duì)于任務(wù)集合可以設(shè)置超時(shí)時(shí)間
    //這里會(huì)針對(duì)差異進(jìn)行講解
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        //計(jì)算設(shè)置時(shí)長(zhǎng)的納秒時(shí)間
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
            //計(jì)算最終計(jì)算的確切時(shí)間點(diǎn)搂蜓,運(yùn)行時(shí)長(zhǎng)不能超過(guò)此時(shí)間也就是時(shí)間死線
            //這里是個(gè)細(xì)節(jié)future創(chuàng)建的時(shí)間并沒(méi)有算作執(zhí)行時(shí)間
            final long deadline = System.nanoTime() + nanos;
            //獲取當(dāng)前結(jié)果數(shù)
            final int size = futures.size();
            //遍歷將任務(wù)進(jìn)行執(zhí)行
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                //并且每次都計(jì)算死線
                nanos = deadline - System.nanoTime();
                //如果時(shí)間已經(jīng)超過(guò)則返回結(jié)果
                if (nanos <= 0L)
                    return futures;
            }
            //否則遍歷future確定每次執(zhí)行都獲取到了結(jié)果
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    //如果在等待過(guò)程中已經(jīng)超時(shí)則返回當(dāng)前等待結(jié)合
                    if (nanos <= 0L)
                        return futures;
                    try {
                        //如果沒(méi)有超過(guò)死線則設(shè)置從future中獲取結(jié)果的時(shí)間如果超過(guò)則會(huì)派出timeout
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        //拋出了異常則會(huì)返回當(dāng)前的列表
                        return futures;
                    }
                    //計(jì)算最新的超時(shí)時(shí)間
                    nanos = deadline - System.nanoTime();
                }
            }
            //之前的返回都沒(méi)有設(shè)置為true所以在finally中都會(huì)設(shè)置為取消唯獨(dú)正常執(zhí)行完成到此處返回的結(jié)果才是最終的結(jié)果
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}

線程池的具體實(shí)現(xiàn)

  • ThreadPoolExecutor 默認(rèn)線程池
  • ScheduledThreadPoolExecutor 定時(shí)線程池 (下篇再做介紹)

ThreadPoolExecutor

線程池重點(diǎn)屬性

//用來(lái)標(biāo)記線程池狀態(tài)(高3位),線程個(gè)數(shù)(低29位)
//默認(rèn)是RUNNING狀態(tài)辽装,線程個(gè)數(shù)為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//線程個(gè)數(shù)掩碼位數(shù)帮碰,并不是所有平臺(tái)int類型是32位,所以準(zhǔn)確說(shuō)是具體平臺(tái)下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)才是線程的個(gè)數(shù)拾积,
private static final int COUNT_BITS = Integer.SIZE - 3;

//線程最大個(gè)數(shù)(低29位)000 11111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl 是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段殉挽, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到拓巧,使用了Integer類型來(lái)保存斯碌,高3位保存runState,低29位保存workerCount肛度。COUNT_BITS 就是29傻唾,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常量表示workerCount的上限值承耿,大約是5億冠骄。

ctl相關(guān)方法

  • runStateOf:獲取運(yùn)行狀態(tài);
  • workerCountOf:獲取活動(dòng)線程數(shù)加袋;
  • ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值凛辣。
// 獲取高三位 運(yùn)行狀態(tài)
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//獲取低29位 線程個(gè)數(shù)
private static int workerCountOf(int c)  { return c & CAPACITY; }

//計(jì)算ctl新值,線程狀態(tài) 與 線程個(gè)數(shù)
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池存在5種狀態(tài)

//運(yùn)行中 111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
//關(guān)閉 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
//整理 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
//終止 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

使用一個(gè)整形,前3位表示狀態(tài),后29位表示線程容量,也就是說(shuō)線程最多有 2^{30}?1 個(gè)

前三位 狀態(tài) 十進(jìn)制
111 RUNNING -1
000 SHUTDOWN 0
001 STOP 1
010 TIDYING 2
011 TERMINATED 3

也可以看出當(dāng)ctl小于零表示線程池仍在運(yùn)行

RUNNING

  • 狀態(tài)說(shuō)明:線程池處在RUNNING狀態(tài)時(shí)职烧,能夠接收新任務(wù)扁誓,以及對(duì)已添加的任務(wù)進(jìn)行處理。
  • 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING蚀之。換句話說(shuō)蝗敢,線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài)恬总,并且線程池中的任務(wù)數(shù)為0前普!

SHUTDOWN

  • 狀態(tài)說(shuō)明:線程池處在SHUTDOWN狀態(tài)時(shí),不接收新任務(wù)壹堰,但能處理已添加的任務(wù)拭卿。
  • 狀態(tài)切換:調(diào)用線程池的shutdown()接口時(shí)骡湖,線程池由RUNNING -> SHUTDOWN。

STOP

  • 狀態(tài)說(shuō)明:線程池處在STOP狀態(tài)時(shí)峻厚,不接收新任務(wù)响蕴,不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)惠桃。
  • 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時(shí)浦夷,線程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

  • 狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已終止辜王,ctl記錄的”任務(wù)數(shù)量”為0劈狐,線程池會(huì)變?yōu)門(mén)IDYING狀態(tài)。當(dāng)線程池變?yōu)門(mén)IDYING狀態(tài)時(shí)呐馆,會(huì)執(zhí)行鉤子函數(shù)terminated()肥缔。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門(mén)IDYING時(shí)汹来,進(jìn)行相應(yīng)的處理续膳;可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。
  • 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下收班,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也為空時(shí)坟岔,就會(huì)由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下摔桦,線程池中執(zhí)行的任務(wù)為空時(shí)社付,就會(huì)由STOP -> TIDYING。

TERMINATED

  • 狀態(tài)說(shuō)明:線程池徹底終止酣溃,就變成TERMINATED狀態(tài)瘦穆。
  • 狀態(tài)切換:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后赊豌,就會(huì)由 TIDYING -> TERMINATED扛或。

進(jìn)入TERMINATED的條件如下:

  • 線程池不是RUNNING狀態(tài);
  • 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài)碘饼;
  • 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空熙兔;
  • workerCount為0;
  • 設(shè)置TIDYING狀態(tài)成功艾恼。

線程池參數(shù)

corePoolSize

線程池中的核心線程數(shù)住涉,當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)钠绍,直到當(dāng)前線程數(shù)等于corePoolSize舆声;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行媳握;如果執(zhí)行了線程池的prestartAllCoreThreads()方法碱屁,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。

maximumPoolSize

線程池中允許的最大線程數(shù)蛾找。如果當(dāng)前阻塞隊(duì)列滿了娩脾,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù)打毛,前提是當(dāng)前線程數(shù)小于maximumPoolSize柿赊;

keepAliveTim

線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候幻枉,如果這時(shí)沒(méi)有新的任務(wù)提交碰声,核心線程外的線程不會(huì)立即銷毀,而是會(huì)等待展辞,直到等待的時(shí)間超過(guò)了keepAliveTime奥邮;

unit

keepAliveTime的單位万牺;

workQueue

用來(lái)保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列罗珍,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:

  • 1脚粟、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列覆旱,按FIFO排序任務(wù);
  • 2核无、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列扣唱,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene团南;
  • 3噪沙、SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作吐根,否則插入操作一直處于阻塞狀態(tài)正歼,吞吐量通常要高于LinkedBlockingQuene;
  • 4拷橘、priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列局义;

threadFactory

它是ThreadFactory類型的變量,用來(lái)創(chuàng)建新線程冗疮。默認(rèn)使用Executors.defaultThreadFactory() 來(lái)創(chuàng)建線程萄唇。使用默認(rèn)的ThreadFactory來(lái)創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程术幔,同時(shí)也設(shè)置了線程的名稱另萤。

handler

線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了诅挑,且沒(méi)有空閑的工作線程四敞,如果繼續(xù)提交任務(wù)勾缭,必須采取一種策略處理該任務(wù),線程池提供了4種策略:

  1. AbortPolicy:直接拋出異常目养,默認(rèn)策略俩由;
  2. CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);
  3. DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù)癌蚁,并執(zhí)行當(dāng)前任務(wù)幻梯;
  4. DiscardPolicy:直接丟棄任務(wù);

上面的4種策略都是ThreadPoolExecutor的內(nèi)部類努释。



當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口碘梢,自定義飽和策略眨补,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)酿雪。

線程池的創(chuàng)建

有四個(gè)構(gòu)造函數(shù)蕾盯,其他三個(gè)都是調(diào)用下面代碼中的這個(gè)構(gòu)造函數(shù)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

線程池監(jiān)控

public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)
public long getCompletedTaskCount() //已完成的任務(wù)數(shù)
public int getPoolSize() //線程池當(dāng)前的線程數(shù)
public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量

線程池原理

核心方法分析

execute方法

execute 方法是提交任務(wù) command 到線程池進(jìn)行執(zhí)行

public void execute(Runnable command) {
    //如果任務(wù)為null排龄,則拋出NPE異常
    if (command == null)
        throw new NullPointerException();
    /*
     * clt記錄著runState和workerCount
     */
    int c = ctl.get();
    /*
     * workerCountOf方法取出低29位的值遏弱,表示當(dāng)前活動(dòng)的線程數(shù)渣窜;
     * 如果當(dāng)前活動(dòng)線程數(shù)小于corePoolSize呀枢,則新建一個(gè)線程放入線程池中霉晕;
     * 并把任務(wù)添加到該線程中缕减。
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二個(gè)參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來(lái)判斷還是maximumPoolSize來(lái)判斷雷客;
         * 如果為true,根據(jù)corePoolSize來(lái)判斷桥狡;
         * 如果為false搅裙,則根據(jù)maximumPoolSize來(lái)判斷
         */
        if (addWorker(command, true))
            return;
        /*
         * 如果添加失敗,則重新獲取ctl值
         */
        c = ctl.get();
    }
    /*
     * 如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊(duì)列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新獲取ctl值
        int recheck = ctl.get();
        // 再次判斷線程池的運(yùn)行狀態(tài)裹芝,如果不是運(yùn)行狀態(tài)部逮,由于之前已經(jīng)把command添加到workQueue中了,
        // 這時(shí)需要移除該command
        // 執(zhí)行過(guò)后通過(guò)handler使用拒絕策略對(duì)該任務(wù)進(jìn)行處理嫂易,整個(gè)方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 獲取線程池中的有效線程數(shù)兄朋,如果數(shù)量是0,則執(zhí)行addWorker方法
         * 這里傳入的參數(shù)表示:
         * 1. 第一個(gè)參數(shù)為null炬搭,表示在線程池中創(chuàng)建一個(gè)線程蜈漓,但不去啟動(dòng);
         * 2. 第二個(gè)參數(shù)為false宫盔,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize融虽,添加線程時(shí)根據(jù)maximumPoolSize來(lái)判斷;
         * 如果判斷workerCount大于0灼芭,則直接返回有额,在workQueue中新增的command會(huì)在將來(lái)的某個(gè)時(shí)刻被執(zhí)行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 如果執(zhí)行到這里,有兩種情況:
     * 1. 線程池已經(jīng)不是RUNNING狀態(tài)巍佑;
     * 2. 線程池是RUNNING狀態(tài)茴迁,但workerCount >= corePoolSize并且workQueue已滿。
     * 這時(shí)萤衰,再次調(diào)用addWorker方法堕义,但第二個(gè)參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize脆栋;
     * 如果失敗則拒絕該任務(wù)
     */
    else if (!addWorker(command, false))
        reject(command);
}

簡(jiǎn)單來(lái)說(shuō)倦卖,在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING時(shí)的執(zhí)行過(guò)程如下:

  1. 如果 workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)椿争;
  2. 如果 workerCount > corePoolSize怕膛,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添加到阻塞隊(duì)列中秦踪;
  3. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize褐捻,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任務(wù)椅邓;
  4. 如果 workerCount >= maximumPoolSize柠逞,并且線程池內(nèi)的阻塞隊(duì)列已滿,則根據(jù)拒絕策略來(lái)處理該任務(wù)希坚,默認(rèn)的處理方式是直接拋異常边苹。

注意:addWorker(null,false); ,也就是創(chuàng)建一個(gè)線程裁僧,但并沒(méi)有傳入從任務(wù),因?yàn)槿蝿?wù)已經(jīng)被添加到 workQueue中了慕购,所以 worker 在執(zhí)行的時(shí)候聊疲,會(huì)直接從 workQueue 中獲取任務(wù)。所以沪悲,在workerCountof(recheck) == 0 時(shí)執(zhí)行 addWorker(null,false); 也是為了保證線程池在RUNNING狀態(tài)下必須要有一個(gè)線程來(lái)執(zhí)行任務(wù)获洲。

execute方法執(zhí)行流程如下

addWorker方法

addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行:

  • firstTask參數(shù) 用于指定新增的線程執(zhí)行的第一個(gè)任務(wù);
  • core參數(shù) 為true表示在新增線程時(shí)會(huì)判斷當(dāng)前活動(dòng)線程數(shù)是否少于corePoolSize殿如,false表示新增線程前需要判斷當(dāng)前活動(dòng)線程數(shù)是否少于maximumPoolSize贡珊。

代碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //獲取ctl
        int c = ctl.get();
        // 獲取運(yùn)行狀態(tài)
        int rs = runStateOf(c);
        /*
         * 這個(gè)if判斷
         * 如果rs >= SHUTDOWN,則表示此時(shí)不再接收新任務(wù)涉馁;
         * 接著判斷以下3個(gè)條件门岔,只要有1個(gè)不滿足,則返回false:
         * 1. rs == SHUTDOWN烤送,這時(shí)表示關(guān)閉狀態(tài)寒随,不再接受新提交的任務(wù),但卻可以繼續(xù)處理阻塞隊(duì)列中已保存的任務(wù)
         * 2. firsTask為空
         * 3. 阻塞隊(duì)列不為空
         * 
         * 首先考慮rs == SHUTDOWN的情況
         * 這種情況下不會(huì)接受新提交的任務(wù),所以在firstTask不為空的時(shí)候會(huì)返回false妻往;
         * 然后互艾,如果firstTask為空,并且workQueue也為空讯泣,則返回false纫普,
         * 因?yàn)殛?duì)列中已經(jīng)沒(méi)有任務(wù)了,不需要再添加線程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取線程數(shù)
            int wc = workerCountOf(c);
            // 如果wc超過(guò)CAPACITY好渠,也就是ctl的低29位的最大值(二進(jìn)制是29個(gè)1)局嘁,返回false;
            // 這里的core是addWorker方法的第二個(gè)參數(shù)晦墙,如果為true表示根據(jù)corePoolSize來(lái)比較悦昵,
            // 如果為false則根據(jù)maximumPoolSize來(lái)比較。
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增加workerCount晌畅,如果成功但指,則跳出第一個(gè)for循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失敗,則重新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果當(dāng)前的運(yùn)行狀態(tài)不等于rs抗楔,說(shuō)明狀態(tài)已被改變棋凳,返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據(jù)firstTask來(lái)創(chuàng)建Worker對(duì)象
        w = new Worker(firstTask);
        // 每一個(gè)Worker對(duì)象都會(huì)創(chuàng)建一個(gè)線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態(tài);
                // 如果rs是RUNNING狀態(tài)或者rs是SHUTDOWN狀態(tài)并且firstTask為null连躏,向線程池中添加線程剩岳。
                // 因?yàn)樵赟HUTDOWN時(shí)不會(huì)在添加新的任務(wù),但還是會(huì)執(zhí)行workQueue中的任務(wù)
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個(gè)HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄著線程池中出現(xiàn)過(guò)的最大線程數(shù)量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動(dòng)線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker類

線程池中的每一個(gè)線程被封裝成一個(gè) Worker 對(duì)象入热,ThreadPool 維護(hù)的其實(shí)就是一組 Worker 對(duì)象拍棕。

/**
* Woker主要維護(hù)著運(yùn)行task的worker的中斷控制信息,以及其他小記錄勺良。這個(gè)類繼承AbstractQueuedSynchronizer
* 而來(lái)簡(jiǎn)化獲取和釋放每一個(gè)任務(wù)執(zhí)行中的鎖绰播。這可以防止中斷那些打算喚醒正在等待其他線程任務(wù)的任務(wù),而不是
* 中斷正在運(yùn)行的任務(wù)尚困。我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的不可重入鎖而不是ReentrantLock蠢箩,因?yàn)槲覀儾幌氘?dāng)其調(diào)用setCorePoolSize
* 這樣的方法的時(shí)候能獲得鎖。
*/
//worker主要是對(duì)進(jìn)行中的任務(wù)進(jìn)行中斷控制事甜,順帶著對(duì)其他行為進(jìn)行記錄
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        //正在跑的線程谬泌,如果是null標(biāo)識(shí)factory失敗
        final Thread thread;
        //初始化一個(gè)任務(wù)以運(yùn)行
        Runnable firstTask;
        //每個(gè)線程計(jì)數(shù)
        volatile long completedTasks;

        /**
         * 用給定的firstTask和從threadFactory創(chuàng)建
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        //主要調(diào)用了runWorker
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        //嘗試獲取鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //嘗試釋放鎖
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker類繼承了AQS,并實(shí)現(xiàn)了Runnable接口逻谦,注意其中的 firstTask 和 thread 屬性:

  • firstTask用它來(lái)保存?zhèn)魅氲娜蝿?wù)掌实;
  • thread是在調(diào)用構(gòu)造方法時(shí)通過(guò) ThreadFactory 來(lái)創(chuàng)建的線程,是用來(lái)處理任務(wù)的線程跨跨。

在調(diào)用構(gòu)造方法時(shí)潮峦,需要把任務(wù)傳入囱皿,這里通過(guò) getThreadFactory().newThread(this); 來(lái)新建一個(gè)線程,newThread 方法傳入的參數(shù)是 this忱嘹,因?yàn)?Worker 本身繼承了 Runnable 接口嘱腥,也就是一個(gè)線程,所以一個(gè) Worker 對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用 Worker 類中的 run 方法拘悦。

Worker 繼承了 AQS齿兔,使用AQS來(lái)實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用 ReentrantLock 來(lái)實(shí)現(xiàn)呢础米?可以看到 tryAcquire 方法分苇,它是不允許重入的,而 ReentrantLock 是允許重入的:

  1. lock方法一旦獲取了獨(dú)占鎖屁桑,表示當(dāng)前線程正在執(zhí)行任務(wù)中医寿;
  2. 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程蘑斧;
  3. 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài)靖秩,也就是空閑的狀態(tài),說(shuō)明它沒(méi)有在處理任務(wù)竖瘾,這時(shí)可以對(duì)該線程進(jìn)行中斷沟突;
  4. 線程池在執(zhí)行 shutdown 方法或 tryTerminate 方法時(shí)會(huì)調(diào)用 interruptIdleWorkers 方法來(lái)中斷空閑的線程,interruptIdleWorkers 方法會(huì)使用 tryLock 方法來(lái)判斷線程池中的線程是否是空閑狀態(tài)捕传;
  5. 之所以設(shè)置為不可重入惠拭,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像 setCorePoolSize 這樣的線程池控制方法時(shí)重新獲取鎖。如果使用ReentrantLock庸论,它是可重入的职辅,這樣如果在任務(wù)中調(diào)用了如 setCorePoolSize 這類線程池控制的方法,會(huì)中斷正在運(yùn)行的線程葡公。

所以罐农,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷催什。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

此外,在構(gòu)造方法中執(zhí)行了 setState(-1); 宰睡,把 state 變量設(shè)置為 -1蒲凶,為什么這樣做呢?是因?yàn)锳QS中默認(rèn)的 state 是0拆内,如果剛剛創(chuàng)建了一個(gè) Worker 對(duì)象旋圆,還沒(méi)有執(zhí)行任務(wù)時(shí),這時(shí)就不應(yīng)該被中斷麸恍,看一下 tryAquire 方法:

protected boolean tryAcquire(int unused) {
    //cas修改state灵巧,不可重入
    if (compareAndSetState(0, 1)) { 
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

tryAcquire方法是根據(jù)state是否是0來(lái)判斷的搀矫,所以,setState(-1);將state設(shè)置為-1是為了禁止在執(zhí)行任務(wù)前對(duì)線程進(jìn)行中斷刻肄。

既然是AQS,其對(duì)于state的解釋也很關(guān)鍵

state 含義
-1 初始化狀態(tài),禁止中斷
0 解鎖狀態(tài)
1 上鎖狀態(tài)(獨(dú)占)

正因?yàn)槿绱巳壳颍趓unWorker方法中會(huì)先調(diào)用Worker對(duì)象的unlock方法將state設(shè)置為0。

runWorker方法

在Worker類中的run方法調(diào)用了runWorker方法來(lái)執(zhí)行任務(wù)敏弃,runWorker方法的代碼如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 獲取第一個(gè)任務(wù)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 解鎖,允許interrupt操作
    w.unlock(); // allow interrupts
    // 是否因?yàn)楫惓M顺鲅h(huán)
    boolean completedAbruptly = true;
    try {
        // 如果task為空卦羡,則通過(guò)getTask來(lái)獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            // 這兒對(duì)worker進(jìn)行加鎖,是為了達(dá)到下面的目的
            // 1. 降低鎖范圍麦到,提升性能
            // 2. 保證每個(gè)worker執(zhí)行的任務(wù)是串行的
            w.lock();
            // 如果線程池處于STOP狀態(tài)就中斷線程
            // 如果線程被中斷(清除中斷標(biāo)記),線程池處于STOP狀態(tài),線程沒(méi)有再被中斷
            if ((runStateAtLeast(ctl.get(), STOP) || //至少是STOP狀態(tài)
                    (Thread.interrupted() && //中斷過(guò)(并抹除中斷標(biāo)記)
                            runStateAtLeast(ctl.get(), STOP))) && //再次檢查
                    !wt.isInterrupted()) //wt沒(méi)有被中斷
                wt.interrupt();
            try {
                beforeExecute(wt, task); //鉤子方法
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); //鉤子方法
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

這里說(shuō)明一下第一個(gè) if判斷 绿饵,目的是:

  • 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài)瓶颠;
  • 如果不是的話拟赊,則要保證當(dāng)前線程不是中斷狀態(tài);

這里要考慮在執(zhí)行該 if語(yǔ)句 期間可能也執(zhí)行了 shutdownNow 方法粹淋,shutdownNow 方法會(huì)把狀態(tài)設(shè)置為 STOP 吸祟,回顧一下 STOP 狀態(tài):不能接受新任務(wù),也不處理隊(duì)列中的任務(wù)廓啊,會(huì)中斷正在處理任務(wù)的線程欢搜。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時(shí),調(diào)用 shutdownNow() 方法會(huì)使線程池進(jìn)入該狀態(tài)谴轮。

總結(jié)一下runWorker方法的執(zhí)行過(guò)程:

  1. while循環(huán)不斷地通過(guò)getTask()方法獲取任務(wù)炒瘟;
  2. getTask()方法從阻塞隊(duì)列中取任務(wù);
  3. 如果線程池正在停止第步,那么要保證當(dāng)前線程是中斷狀態(tài)疮装,否則要保證當(dāng)前線程不是中斷狀態(tài);
  4. 調(diào)用task.run()執(zhí)行任務(wù)粘都;
  5. 如果task為null則跳出循環(huán)廓推,執(zhí)行processWorkerExit()方法;
  6. runWorker方法執(zhí)行完畢翩隧,也代表著Worker中的run方法執(zhí)行完畢樊展,銷毀線程。

這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的堆生,留給子類來(lái)實(shí)現(xiàn)专缠。
completedAbruptly 變量來(lái)表示執(zhí)行任務(wù)過(guò)程中是否出現(xiàn)了異常,在processWorkerExit方法中會(huì)對(duì)該變量的值進(jìn)行判斷淑仆。

getTask方法

getTask方法用來(lái)從阻塞隊(duì)列中取任務(wù)涝婉,代碼如下:

private Runnable getTask() {
    // timeOut變量的值表示上次從阻塞隊(duì)列中取任務(wù)時(shí)是否超時(shí)
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /*
         * 如果線程池狀態(tài)rs >= SHUTDOWN,也就是非RUNNING狀態(tài)蔗怠,再進(jìn)行以下判斷:
         * 1. rs >= STOP墩弯,線程池是否正在stop吩跋;
         * 2. 阻塞隊(duì)列是否為空。
         * 如果以上條件滿足渔工,則將workerCount減1并返回null锌钮。
         * 因?yàn)槿绻?dāng)前線程池狀態(tài)的值是SHUTDOWN或以上時(shí),不允許再向阻塞隊(duì)列中添加任務(wù)涨缚。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // timed變量用于判斷是否需要進(jìn)行超時(shí)控制轧粟。
        // allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進(jìn)行超時(shí)脓魏;
        // wc > corePoolSize兰吟,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
        // 對(duì)于超過(guò)核心線程數(shù)量的這些線程茂翔,需要進(jìn)行超時(shí)控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /*
         * wc > maximumPoolSize的情況是因?yàn)榭赡茉诖朔椒▓?zhí)行階段同時(shí)執(zhí)行了setMaximumPoolSize方法混蔼;
         * timed && timedOut 如果為true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制珊燎,并且上次從阻塞隊(duì)列中獲取任務(wù)發(fā)生了超時(shí)
         * 接下來(lái)判斷惭嚣,如果有效線程數(shù)量大于1,或者阻塞隊(duì)列是空的悔政,那么嘗試將workerCount減1晚吞;
         * 如果減1失敗,則返回重試谋国。
         * 如果wc == 1時(shí)槽地,也就說(shuō)明當(dāng)前線程是線程池中唯一的一個(gè)線程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            /*
             * 根據(jù)timed來(lái)判斷芦瘾,如果為true捌蚊,則通過(guò)阻塞隊(duì)列的poll方法進(jìn)行超時(shí)控制,如果在keepAliveTime時(shí)間內(nèi)沒(méi)有獲取到任務(wù)近弟,則返回null缅糟;
             * 否則通過(guò)take方法,如果這時(shí)隊(duì)列為空祷愉,則take方法會(huì)阻塞直到隊(duì)列不為空窗宦。
             *
             */
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,說(shuō)明已經(jīng)超時(shí)二鳄,timedOut設(shè)置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷迫摔,則設(shè)置timedOut為false并返回循環(huán)重試
            timedOut = false;
        }
    }
}

這里重要的地方是第二個(gè) if判斷 ,目的是控制線程池的有效線程數(shù)量泥从。由上文中的分析可以知道,在執(zhí)行 execute 方法時(shí)沪摄,如果當(dāng)前線程池的線程數(shù)量超過(guò)了 corePoolSize 且小于 maximumPoolSize躯嫉,并且workQueue已滿時(shí)纱烘,則可以添加工作線程,但這時(shí)如果超時(shí)沒(méi)有獲取到任務(wù)祈餐,也就是 timedOut 為 true的情況擂啥,說(shuō)明 workQueue 已經(jīng)為空了,也就說(shuō)明了當(dāng)前線程池中不需要那么多線程來(lái)執(zhí)行任務(wù)了帆阳,可以把多余 corePoolSize 數(shù)量的線程銷毀掉哺壶,保持線程數(shù)量在 corePoolSize 即可。

什么時(shí)候會(huì)銷毀蜒谤?
當(dāng)然是runWorker方法執(zhí)行完之后山宾,也就是Worker中的run方法執(zhí)行完,由JVM自動(dòng)回收鳍徽。

getTask方法返回null時(shí)资锰,在runWorker方法中會(huì)跳出while循環(huán),然后會(huì)執(zhí)行 processWorkerExit方法阶祭。

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly值為true绷杜,則說(shuō)明線程執(zhí)行時(shí)出現(xiàn)了異常,需要將workerCount減1濒募;
    // 如果線程執(zhí)行時(shí)沒(méi)有出現(xiàn)異常鞭盟,說(shuō)明在getTask()方法中已經(jīng)已經(jīng)對(duì)workerCount進(jìn)行了減1操作,這里就不必再減了瑰剃。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統(tǒng)計(jì)完成的任務(wù)數(shù)
        completedTaskCount += w.completedTasks;
        // 從workers中移除齿诉,也就表示著從線程池中移除了一個(gè)工作線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池
    tryTerminate();
    int c = ctl.get();
    /*
     * 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時(shí),如果worker是異常結(jié)束培他,那么會(huì)直接addWorker鹃两;
     * 如果allowCoreThreadTimeOut=true,并且等待隊(duì)列有任務(wù)舀凛,至少保留一個(gè)worker俊扳;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize猛遍。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此馋记,processWorkerExit執(zhí)行完成之后,工作線程被銷毀懊烤,以上就是整個(gè)工作線程的生命周期梯醒,從 execute 方法開(kāi)始,Worker 使用 ThreadFactory 創(chuàng)建新的工作線程腌紧,runWorker 通過(guò) getTask 獲取任務(wù)茸习,然后執(zhí)行任務(wù),如果 getTask 返回null壁肋,進(jìn)入 oricessWorkerExit 方法号胚,整個(gè)線程結(jié)束籽慢。如下圖所示:
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-aOmgFHaU-1607357995983)(https://imgkr2.cn-bj.ufileos.com/5171fa57-6556-4adb-a6e3-98a1805792e1.jpg?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=d%252Fo2TdoPfih5GlF0LZOsql0wfWQ%253D&Expires=1607443308)]

shutdown方法

調(diào)用關(guān)系
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來(lái)直接上傳(img-sfdnAYMr-1607357995986)(https://imgkr2.cn-bj.ufileos.com/cc964937-ee9d-4c8b-8d99-68b05c96c158.jpg?UCloudPublicKey=TOKEN_8d8b72be-579a-4e83-bfd0-5f6ce1546f13&Signature=ML48OwlgyVP5kZwQPCm99qp%252FGoU%253D&Expires=1607443292)]

shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//權(quán)限檢查
        advanceRunState(SHUTDOWN);//設(shè)置當(dāng)前線程池狀態(tài)為SHUTDOWN
        interruptIdleWorkers();//中斷空閑線程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

interruptIdleWorkers

中斷空閑線程

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();//加鎖
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {//未被中斷且正在獨(dú)占運(yùn)行
                try {
                    t.interrupt();//中斷
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();//worker解鎖
                }
            }
            if (onlyOne)//如果只中斷一個(gè)
                break;
        }
    } finally {
        mainLock.unlock();//解鎖
    }
}

tryTerminate

final void tryTerminate() {
    for (;;) { // 無(wú)限循環(huán),確保操作成功
        // 獲取線程池控制狀態(tài)
        int c = ctl.get();
        if (isRunning(c) || // 線程池的運(yùn)行狀態(tài)為RUNNING
            runStateAtLeast(c, TIDYING) || // 線程池的運(yùn)行狀態(tài)最小要大于TIDYING
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  // 線程池的運(yùn)行狀態(tài)為SHUTDOWN并且workQueue隊(duì)列不為null
            // 不能終止猫胁,直接返回
            return;
        if (workerCountOf(c) != 0) { // 線程池正在運(yùn)行的worker數(shù)量不為0    // Eligible to terminate
            // 僅僅中斷一個(gè)空閑的worker
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // 獲取線程池的鎖
        final ReentrantLock mainLock = this.mainLock;
        // 獲取鎖
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // 比較并設(shè)置線程池控制狀態(tài)為T(mén)IDYING
                try {
                    // 終止箱亿,鉤子函數(shù)
                    terminated();
                } finally {
                    // 設(shè)置線程池控制狀態(tài)為T(mén)ERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 釋放在termination條件上等待的所有線程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 釋放鎖
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

shutdownNow方法

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);//設(shè)置線程池狀態(tài)為STOP
        interruptWorkers();//中斷線程
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

Executors類

Executors類,提供了一系列工廠方法用于創(chuàng)建線程池弃秆,返回的線程池都實(shí)現(xiàn)了ExecutorService接口届惋。


關(guān)于Callable的支持

  • Callable<Object> callable(Runnable task)

返回 Callable 對(duì)象,調(diào)用它時(shí)可運(yùn)行給定的任務(wù)菠赚。

public static Callable<Object> callable(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<Object>(task, null);
}
  • Callable<T> callable(Runnable task, T result)

返回 Callable 對(duì)象脑豹,調(diào)用它時(shí)可運(yùn)行給定的任務(wù)并返回給定的結(jié)果。callable(task)等價(jià)于callable(task, null)锈至。

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

RunnableAdapter類

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable  task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
  • Callable callable(final PrivilegedAction action)

返回 Callable 對(duì)象晨缴,調(diào)用它時(shí)可運(yùn)行給定特權(quán)的操作并返回其結(jié)果。

public static Callable<Object> callable(final PrivilegedAction<?> action) {
    if (action == null)
        throw new NullPointerException();
    return new Callable<Object>() {
        public Object call() { return action.run(); }
    };
}
  • callable(PrivilegedExceptionAction action)

和Callable callable(final PrivilegedAction action)其類似峡捡,唯一的區(qū)別在于击碗,前者可拋出異常。

public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
    if (action == null)
        throw new NullPointerException();
    return new Callable<Object>() {
        public Object call() throws Exception { return action.run(); }
    };
}

PrivilegedAction接口

public interface PrivilegedAction<T> {
    T run();
}
  • Callable privilegedCallable(Callable<T> callable)

返回 Callable 對(duì)象们拙,調(diào)用它時(shí)可在當(dāng)前的訪問(wèn)控制上下文中執(zhí)行給定的 callable 對(duì)象稍途。

public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
    if (callable == null)
        throw new NullPointerException();
    return new PrivilegedCallable<T>(callable);
}
  • Callable privilegedCallableUsingCurrentClassLoader(Callable<T> callable)

返回 Callable 對(duì)象,調(diào)用它時(shí)可在當(dāng)前的訪問(wèn)控制上下文中砚婆,使用當(dāng)前上下文類加載器作為上下文類加載器來(lái)執(zhí)行給定的 callable 對(duì)象械拍。

public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
    if (callable == null)
        throw new NullPointerException();
    return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}

關(guān)于ThreadFactory的支持

  1. 根據(jù)需要?jiǎng)?chuàng)建新線程的對(duì)象。使用線程工廠就無(wú)需再手工編寫(xiě)對(duì) new Thread 的調(diào)用了装盯,從而允許應(yīng)用程序使用特殊的線程子類坷虑、屬性等等。
  2. Executors對(duì)其提供支持:DefaultThreadFactory和PrivilegedThreadFactory埂奈。
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

ThreadFactory為默認(rèn)的defaultThreadFactory迄损,ThreadFactory定義了兩個(gè):defaultThreadFactory和privilegedThreadFactory,但是也可以自己定義账磺,實(shí)現(xiàn)ThreadFactory接口或者繼承已經(jīng)實(shí)現(xiàn)的這兩個(gè)實(shí)現(xiàn)并且修改對(duì)應(yīng)的代碼即可芹敌。

// 返回用于創(chuàng)建新線程的默認(rèn)線程工廠。
public static ThreadFactory defaultThreadFactory() {
    return new DefaultThreadFactory();
}

// 返回用于創(chuàng)建新線程的線程工廠垮抗,這些新線程與當(dāng)前線程具有相同的權(quán)限氏捞。
public static ThreadFactory privilegedThreadFactory() {
    return new PrivilegedThreadFactory();
}
  • DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null)? s.getThreadGroup() :
                             Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

從源碼看出DefaultThreadFactory就是創(chuàng)建一個(gè)普通的線程,非守護(hù)線程冒版,優(yōu)先級(jí)為5液茎。
新線程具有可通過(guò) pool-N-thread-M 的 Thread.getName() 來(lái)訪問(wèn)的名稱,其中 N 是此工廠的序列號(hào), M 是此工廠所創(chuàng)建線程的序列號(hào)豁护。

  • PrivilegedThreadFactory
static class PrivilegedThreadFactory extends DefaultThreadFactory {
    private final ClassLoader ccl;
    private final AccessControlContext acc;

    PrivilegedThreadFactory() {
        super();
        this.ccl = Thread.currentThread().getContextClassLoader();
        this.acc = AccessController.getContext();
        acc.checkPermission(new RuntimePermission("setContextClassLoader"));
    }

    public Thread newThread(final Runnable r) {
        return super.newThread(new Runnable() {
            public void run() {
                AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    public Object run() {
                        Thread.currentThread().setContextClassLoader(ccl);
                        r.run();
                        return null;
                    }
                }, acc);
            }
        });
    }
}

從源碼看出哼凯,PrivilegedThreadFactory extends DefaultThreadFactory從而具有與 defaultThreadFactory() 相同設(shè)置的線程。但增加了兩個(gè)特性:ClassLoader和AccessControlContext楚里,從而使運(yùn)行在此類線程中的任務(wù)具有與當(dāng)前線程相同的訪問(wèn)控制和類加載器。

關(guān)于RejectedExecutionHandler的支持

而默認(rèn)的拒絕策略RejectedExecutionHandler則為 AbortPolicy猎贴,并且所有的拒絕策略都是實(shí)現(xiàn)接口RejectedExecutionHandler

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

五種線程池創(chuàng)建類型

newFixedThreadPool(固定大小線程池)

newFixedThreadPool:創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為 nThreads 的線程池班缎,并且阻塞隊(duì)列長(zhǎng)度為 Integer.MAX_VALUEkeeyAliveTime=0 說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收她渴。代碼如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }
 //使用自定義線程創(chuàng)建工廠
 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
 }

newSingleThreadExecutor(單個(gè)后臺(tái)線程)

newSingleThreadExecutor:創(chuàng)建一個(gè)核心線程個(gè)數(shù)和最大線程個(gè)數(shù)都為1的線程池达址,并且阻塞隊(duì)列長(zhǎng)度為 Integer.MAX_VALUEkeeyAliveTime=0 說(shuō)明只要線程個(gè)數(shù)比核心線程個(gè)數(shù)多并且當(dāng)前空閑則回收趁耗。代碼如下:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
 }

//使用自己的線程工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

newCachedThreadPool(無(wú)界線程池沉唠,可以進(jìn)行自動(dòng)線程回收)

newCachedThreadPool:創(chuàng)建一個(gè)按需創(chuàng)建線程的線程池,初始線程個(gè)數(shù)為 0苛败,最多線程個(gè)數(shù)為 Integer.MAX_VALUE满葛,并且阻塞隊(duì)列為同步隊(duì)列,keeyAliveTime=60 說(shuō)明只要當(dāng)前線程 60s 內(nèi)空閑則回收罢屈。這個(gè)特殊在于加入到同步隊(duì)列的任務(wù)會(huì)被馬上被執(zhí)行嘀韧,同步隊(duì)列里面最多只有一個(gè)任務(wù)。代碼如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

//使用自定義的線程工廠
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
}

newScheduledThreadPool (可調(diào)度)

newScheduledThreadPool:創(chuàng)建一個(gè)定長(zhǎng)線程池缠捌,支持定時(shí)及周期性任務(wù)執(zhí)行锄贷。newScheduledThreadPool 和 其他線程池最大的區(qū)別是使用的阻塞隊(duì)列是 DelayedWorkQueue,而且多了兩個(gè)定時(shí)執(zhí)行的方法scheduleAtFixedRate和scheduleWithFixedDelay

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//使用自定義的線程工廠
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

newWorkStealingPool(并行操作)

newWorkStealingPool:JDK1.8新增newWorkStealingPool曼月,適合使用在很耗時(shí)的操作谊却,但是newWorkStealingPool不是ThreadPoolExecutor的擴(kuò)展,它是新的線程池類ForkJoinPool的擴(kuò)展哑芹,但是都是在統(tǒng)一的一個(gè)Executors類中實(shí)現(xiàn)炎辨,由于能夠合理的使用CPU進(jìn)行對(duì)任務(wù)操作(并行操作),所以適合使用在很耗時(shí)的任務(wù)中绩衷。代碼如下:

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

相關(guān)文章

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新蹦魔,可以公眾號(hào)搜一搜「 一角錢(qián)技術(shù) 」第一時(shí)間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄咳燕,歡迎 Star勿决。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市招盲,隨后出現(xiàn)的幾起案子低缩,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咆繁,死亡現(xiàn)場(chǎng)離奇詭異讳推,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)燃异,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)坞琴,“玉大人,你說(shuō)我怎么就攤上這事究驴。” “怎么了匀伏?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵洒忧,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我够颠,道長(zhǎng)熙侍,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任履磨,我火速辦了婚禮蛉抓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蹬耘。我一直安慰自己芝雪,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布综苔。 她就那樣靜靜地躺著惩系,像睡著了一般。 火紅的嫁衣襯著肌膚如雪如筛。 梳的紋絲不亂的頭發(fā)上堡牡,一...
    開(kāi)封第一講書(shū)人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音杨刨,去河邊找鬼晤柄。 笑死,一個(gè)胖子當(dāng)著我的面吹牛妖胀,可吹牛的內(nèi)容都是我干的芥颈。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼赚抡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼爬坑!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起涂臣,我...
    開(kāi)封第一講書(shū)人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤盾计,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體署辉,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡族铆,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了哭尝。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哥攘。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖刚夺,靈堂內(nèi)的尸體忽然破棺而出献丑,到底是詐尸還是另有隱情,我是刑警寧澤侠姑,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站箩做,受9級(jí)特大地震影響莽红,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜邦邦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一安吁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧燃辖,春花似錦鬼店、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至氏身,卻和暖如春巍棱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蛋欣。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工航徙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人陷虎。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓到踏,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親尚猿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子窝稿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容