并發(fā)編程之Future&FutureTask深入解析

點(diǎn)贊再看溯职,養(yǎng)成習(xí)慣呈枉,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章坐搔。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章。

Java線程實(shí)現(xiàn)方式主要有四種:

  • 繼承Thread類
  • 實(shí)現(xiàn)Runnable接口
  • 實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程
  • 使用ExecutorService越败、Callable猖腕、Future實(shí)現(xiàn)有返回結(jié)果的多線程拆祈。

其中前兩種方式線程執(zhí)行完后都沒(méi)有返回值,后兩種是帶返回值的倘感。

Callable 和 Runnable 接口

Runnable接口

// 實(shí)現(xiàn)Runnable接口的類將被Thread執(zhí)行缘屹,表示一個(gè)基本的任務(wù)
public interface Runnable {
    // run方法就是它所有的內(nèi)容,就是實(shí)際執(zhí)行的任務(wù)
    public abstract void run();
}

沒(méi)有返回值

run 方法沒(méi)有返回值侠仇,雖然有一些別的方法也能實(shí)現(xiàn)返回值得效果轻姿,比如編寫日志文件或者修改共享變量等等犁珠,但是不僅容易出錯(cuò),效率也不高互亮。

不能拋出異常

public class RunThrowExceptionDemo {

    /**
     * 普通方法可以在方法簽名中拋出異常
     *
     * @throws IOException
     */
    public void normalMethod() throws IOException {
        throw new IOException();
    }

    class RunnableImpl implements Runnable {

        /**
         * run 方法內(nèi)無(wú)法拋出 checked Exception犁享,除非使用 try catch 進(jìn)行處理
         */
        @Override
        public void run() {
            try {
                throw new IOException();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

可以看到普通方法 normalMethod 可以在方法簽名上拋出異常,這樣上層接口就可以捕獲這個(gè)異常進(jìn)行處理豹休,但是實(shí)現(xiàn) Runnable 接口的類炊昆,run 方法無(wú)法拋出 checked Exception,只能在方法內(nèi)使用 try catch 進(jìn)行處理威根,這樣上層就無(wú)法得知線程中的異常鸳劳。

設(shè)計(jì)導(dǎo)致

其實(shí)這兩個(gè)缺陷主要原因就在于 Runnable 接口設(shè)計(jì)的 run 方法,這個(gè)方法已經(jīng)規(guī)定了 run() 方法的返回類型是 void渠羞,而且這個(gè)方法沒(méi)有聲明拋出任何異常译打。所以,當(dāng)實(shí)現(xiàn)并重寫這個(gè)方法時(shí)留美,我們既不能改返回值類型彰檬,也不能更改對(duì)于異常拋出的描述,因?yàn)樵趯?shí)現(xiàn)方法的時(shí)候谎砾,語(yǔ)法規(guī)定是不允許對(duì)這些內(nèi)容進(jìn)行修改的逢倍。

Runnable 為什么設(shè)計(jì)成這樣?

假設(shè) run() 方法可以返回返回值景图,或者可以拋出異常较雕,也無(wú)濟(jì)于事,因?yàn)槲覀儾](méi)有辦法在外層捕獲并處理挚币,這是因?yàn)檎{(diào)用 run() 方法的類(比如 Thread 類和線程池)是 Java 直接提供的郎笆,而不是我們編寫的。
所以就算它能有一個(gè)返回值忘晤,我們也很難把這個(gè)返回值利用到宛蚓,而 Callable 接口就是為了解決這兩個(gè)問(wèn)題。

Callable接口

public interface Callable<V> {
    //返回接口设塔,或者拋出異常
    V call() throws Exception;
}

可以看到 Callable 和 Runnable 接口其實(shí)比較相識(shí)凄吏,都只有一個(gè)方法,也就是線程任務(wù)執(zhí)行的方法闰蛔,區(qū)別就是 call 方法有返回值痕钢,而且聲明了 throws Exception。

Callable 和 Runnable 的不同之處

  • 方法名 :Callable 規(guī)定的執(zhí)行方法是 call()序六,而 Runnable 規(guī)定的執(zhí)行方法是 run()任连;
  • 返回值 :Callable 的任務(wù)執(zhí)行后有返回值,而 Runnable 的任務(wù)執(zhí)行后是沒(méi)有返回值的例诀;
  • 拋出異常 :call() 方法可拋出異常随抠,而 run() 方法是不能拋出受檢查異常的裁着;

與 Callable 配合的有一個(gè) Future 接口,通過(guò) Future 可以了解任務(wù)執(zhí)行情況拱她,或者取消任務(wù)的執(zhí)行二驰,還可獲取任務(wù)執(zhí)行的結(jié)果,這些功能都是 Runnable 做不到的秉沼,Callable 的功能要比 Runnable 強(qiáng)大桶雀。

Future接口

Future的作用

簡(jiǎn)單來(lái)說(shuō)就是利用線程達(dá)到異步的效果,同時(shí)還可以獲取子線程的返回值唬复。

比如當(dāng)做一定運(yùn)算的時(shí)候矗积,運(yùn)算過(guò)程可能比較耗時(shí),有時(shí)會(huì)去查數(shù)據(jù)庫(kù)敞咧,或是繁重的計(jì)算棘捣,比如壓縮、加密等妄均,在這種情況下柱锹,如果我們一直在原地等待方法返回哪自,顯然是不明智的丰包,整體程序的運(yùn)行效率會(huì)大大降低。

我們可以把運(yùn)算的過(guò)程放到子線程去執(zhí)行壤巷,再通過(guò) Future 去控制子線程執(zhí)行的計(jì)算過(guò)程邑彪,最后獲取到計(jì)算結(jié)果。這樣一來(lái)就可以把整個(gè)程序的運(yùn)行效率提高胧华,是一種異步的思想寄症。

Future的方法

Future 接口一共有5個(gè)方法,源代碼如下:


public interface Future<V> {

  /**
   * 嘗試取消任務(wù)矩动,如果任務(wù)已經(jīng)完成有巧、已取消或其他原因無(wú)法取消,則失敗悲没。
   * 1篮迎、如果任務(wù)還沒(méi)開(kāi)始執(zhí)行,則該任務(wù)不應(yīng)該運(yùn)行
   * 2示姿、如果任務(wù)已經(jīng)開(kāi)始執(zhí)行甜橱,由參數(shù)mayInterruptIfRunning來(lái)決定執(zhí)行該任務(wù)的線程是否應(yīng)該被中斷,這只是終止任務(wù)的一種嘗試栈戳。若mayInterruptIfRunning為true岂傲,則會(huì)立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning為false子檀,則會(huì)返回true且不會(huì)中斷任務(wù)執(zhí)行線程镊掖。
   * 3乃戈、調(diào)用這個(gè)方法后,以后對(duì)isDone方法調(diào)用都返回true堰乔。
   * 4偏化、如果這個(gè)方法返回true,以后對(duì)isCancelled返回true。
   */
    boolean cancel(boolean mayInterruptIfRunning);

   /**
    * 判斷任務(wù)是否被取消了镐侯,如果調(diào)用了cance()則返回true
    */
    boolean isCancelled();

   /**
    * 如果任務(wù)完成侦讨,則返回ture
    * 任務(wù)完成包含正常終止、異常苟翻、取消任務(wù)韵卤。在這些情況下都返回true
    */
    boolean isDone();

   /**
    * 線程阻塞,直到任務(wù)完成崇猫,返回結(jié)果
    * 如果任務(wù)被取消沈条,則引發(fā)CancellationException
    * 如果當(dāng)前線程被中斷,則引發(fā)InterruptedException
    * 當(dāng)任務(wù)在執(zhí)行的過(guò)程中出現(xiàn)異常诅炉,則拋出ExecutionException
    */
    V get() throws InterruptedException, ExecutionException;

   /**
    * 線程阻塞一定時(shí)間等待任務(wù)完成蜡歹,并返回任務(wù)執(zhí)行結(jié)果,如果則超時(shí)則拋出TimeoutException
    */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

get方法(獲取結(jié)果)

get 方法最主要的作用就是獲取任務(wù)執(zhí)行的結(jié)果涕烧,該方法在執(zhí)行時(shí)的行為取決于 Callable 任務(wù)的狀態(tài)月而,可能會(huì)發(fā)生以下 7 種情況。

  • 任務(wù)已經(jīng)執(zhí)行完议纯,執(zhí)行 get 方法可以立刻返回父款,獲取到任務(wù)執(zhí)行的結(jié)果。

  • 任務(wù)還沒(méi)有開(kāi)始執(zhí)行瞻凤,比如我們往線程池中放一個(gè)任務(wù)憨攒,線程池中可能積壓了很多任務(wù),還沒(méi)輪到我去執(zhí)行的時(shí)候阀参,就去 get 了肝集,在這種情況下,相當(dāng)于任務(wù)還沒(méi)開(kāi)始蛛壳,我們?nèi)フ{(diào)用 get 的時(shí)候杏瞻,會(huì)當(dāng)前的線程阻塞,直到任務(wù)完成再把結(jié)果返回回來(lái)炕吸。

  • 任務(wù)正在執(zhí)行中伐憾,但是執(zhí)行過(guò)程比較長(zhǎng),所以我去 get 的時(shí)候赫模,它依然在執(zhí)行的過(guò)程中树肃。這種情況調(diào)用 get 方法也會(huì)阻塞當(dāng)前線程,直到任務(wù)執(zhí)行完返回結(jié)果瀑罗。

  • 任務(wù)執(zhí)行過(guò)程中拋出異常胸嘴,我們?cè)偃フ{(diào)用 get 的時(shí)候雏掠,就會(huì)拋出 ExecutionException 異常,不管我們執(zhí)行 call 方法時(shí)里面拋出的異常類型是什么劣像,在執(zhí)行 get 方法時(shí)所獲得的異常都是 ExecutionException乡话。

  • 任務(wù)被取消了,如果任務(wù)被取消耳奕,我們用 get 方法去獲取結(jié)果時(shí)則會(huì)拋出 CancellationException绑青。

  • 任務(wù)被中斷了,如果任務(wù)被當(dāng)前線程中斷屋群,我們用 get 方法去獲取結(jié)果時(shí)則會(huì)拋出InterruptedException闸婴。

  • 任務(wù)超時(shí),我們知道 get 方法有一個(gè)重載方法芍躏,那就是帶延遲參數(shù)的邪乍,調(diào)用了這個(gè)帶延遲參數(shù)的 get 方法之后,如果 call 方法在規(guī)定時(shí)間內(nèi)正常順利完成了任務(wù)对竣,那么 get 會(huì)正常返回庇楞;但是如果到達(dá)了指定時(shí)間依然沒(méi)有完成任務(wù),get 方法則會(huì)拋出 TimeoutException否纬,代表超時(shí)了吕晌。

參考示例:

package com.niuh.future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future線程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    static class FutureTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(new Random().nextInt(3000));
            return new Random().nextInt(10);
        }
    }
}

isDone方法(判斷是否執(zhí)行完畢)

isDone() 方法,該方法是用來(lái)判斷當(dāng)前這個(gè)任務(wù)是否執(zhí)行完畢了

package com.niuh.future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureIsDoneDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            for (int i = 0; i < 3; i++) {
                Thread.sleep(1000);
                System.out.println("線程是否完成:" + future.isDone());
            }
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future 線程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    static class FutureTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(2000);
            return new Random().nextInt(10);
        }
    }
}

執(zhí)行結(jié)果:

線程是否完成:false
線程是否完成:false
線程是否完成:true
Future 線程返回值:9

可以看到前兩次 isDone 方法的返回結(jié)果是 false烦味,因?yàn)榫€程任務(wù)還沒(méi)有執(zhí)行完成聂使,第三次 isDone 方法的返回結(jié)果是 ture壁拉。

注意:這個(gè)方法返回 true 則代表執(zhí)行完成了谬俄,返回 false 則代表還沒(méi)完成。但返回 true弃理,并不代表這個(gè)任務(wù)是成功執(zhí)行的溃论,比如說(shuō)任務(wù)執(zhí)行到一半拋出了異常。那么在這種情況下痘昌,對(duì)于這個(gè) isDone 方法而言钥勋,它其實(shí)也是會(huì)返回 true 的,因?yàn)閷?duì)它來(lái)說(shuō)辆苔,雖然有異常發(fā)生了算灸,但是這個(gè)任務(wù)在未來(lái)也不會(huì)再被執(zhí)行,它確實(shí)已經(jīng)執(zhí)行完畢了驻啤。所以 isDone 方法在返回 true 的時(shí)候菲驴,不代表這個(gè)任務(wù)是成功執(zhí)行的,只代表它執(zhí)行完畢了骑冗。

我們將上面的示例稍作修改再來(lái)看下結(jié)果赊瞬,修改 FutureTask 代碼如下:

static class FutureTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        throw new Exception("故意拋出異常");
    }
}

執(zhí)行結(jié)果:



雖然拋出了異常先煎,但是 isDone 方法的返回結(jié)果依然是 ture。

這段代碼說(shuō)明了:

  • 即便任務(wù)拋出異常巧涧,isDone 方法依然會(huì)返回 true薯蝎。
  • 雖然拋出的異常是 IllegalArgumentException,但是對(duì)于 get 而言谤绳,它拋出的異常依然是 ExecutionException占锯。
  • 雖然在任務(wù)執(zhí)行到2秒的時(shí)候就拋出了異常,但是真正要等到我們執(zhí)行 get 的時(shí)候缩筛,才看到了異常烟央。

cancel方法(取消任務(wù)的執(zhí)行)

如果不想執(zhí)行某個(gè)任務(wù)了,則可以使用 cancel 方法歪脏,會(huì)有以下三種情況:

  • 第一種情況最簡(jiǎn)單疑俭,那就是當(dāng)任務(wù)還沒(méi)有開(kāi)始執(zhí)行時(shí),一旦調(diào)用 cancel婿失,這個(gè)任務(wù)就會(huì)被正常取消钞艇,未來(lái)也不會(huì)被執(zhí)行,那么 cancel 方法返回 true豪硅。

  • 第二種情況也比較簡(jiǎn)單哩照。如果任務(wù)已經(jīng)完成,或者之前已經(jīng)被取消過(guò)了懒浮,那么執(zhí)行 cancel 方法則代表取消失敗飘弧,返回 false。因?yàn)槿蝿?wù)無(wú)論是已完成還是已經(jīng)被取消過(guò)了砚著,都不能再被取消了次伶。

  • 第三種情況比較特殊,就是這個(gè)任務(wù)正在執(zhí)行稽穆,這個(gè)時(shí)候執(zhí)行 cancel 方法是不會(huì)直接取消這個(gè)任務(wù)的冠王,而是會(huì)根據(jù)我們傳入的參數(shù)做判斷。cancel 方法是必須傳入一個(gè)參數(shù)舌镶,該參數(shù)叫作 mayInterruptIfRunning柱彻,它是什么含義呢?

    • 如果傳入的參數(shù)是 true餐胀,執(zhí)行任務(wù)的線程就會(huì)收到一個(gè)中斷的信號(hào)哟楷,正在執(zhí)行的任務(wù)可能會(huì)有一些處理中斷的邏輯,進(jìn)而停止否灾,這個(gè)比較好理解卖擅。
    • 如果傳入的是 false 則就代表不中斷正在運(yùn)行的任務(wù),也就是說(shuō),本次 cancel 不會(huì)有任何效果磨镶,同時(shí) cancel 方法會(huì)返回 false溃蔫。

參考示例:

package com.niuh.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureCancelDemo {

    static ExecutorService executorService = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        // 當(dāng)任務(wù)還沒(méi)有開(kāi)始執(zhí)行
        // demo1();

        // 如果任務(wù)已經(jīng)執(zhí)行完
        // demo2();

        // 如果任務(wù)正在進(jìn)行中
        demo3();
    }

    private static void demo1() {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new FutureTask());
        }

        Future<String> future = executorService.submit(new FutureTask());
        try {
            boolean cancel = future.cancel(false);
            System.out.println("Future 任務(wù)是否被取消:" + cancel);
            String res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future 線程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    private static void demo2() {
        Future<String> future = executorService.submit(new FutureTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(false);
            System.out.println("Future 任務(wù)是否被取消:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    private static void demo3() {
        Future<String> future = executorService.submit(new FutureInterruptTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(true);
            System.out.println("Future 任務(wù)是否被取消:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    static class FutureTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "正常返回";
        }
    }

    static class FutureInterruptTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("循環(huán)執(zhí)行");
                Thread.sleep(500);
            }
            System.out.println("線程被中斷");
            return "正常返回";
        }
    }
}

這里,我們來(lái)分析下第三種情況(任務(wù)正在進(jìn)行中)琳猫,當(dāng)我們?cè)O(shè)置 true 時(shí)伟叛,線程停止

循環(huán)執(zhí)行
循環(huán)執(zhí)行
Future 任務(wù)是否被取消:true

當(dāng)我們?cè)O(shè)置 false 時(shí),任務(wù)雖然也被取消成功脐嫂,但是線程依然執(zhí)行统刮。

循環(huán)執(zhí)行
循環(huán)執(zhí)行
Future 任務(wù)是否被取消:true
循環(huán)執(zhí)行
循環(huán)執(zhí)行
循環(huán)執(zhí)行
循環(huán)執(zhí)行
......

那么如何選擇傳入 true 還是 false 呢?

  • 傳入 true 適用的情況是账千,明確知道這個(gè)任務(wù)能夠處理中斷侥蒙。
  • 傳入 false 適用于什么情況呢?
    • 如果我們明確知道這個(gè)線程不能處理中斷匀奏,那應(yīng)該傳入 false鞭衩。
    • 我們不知道這個(gè)任務(wù)是否支持取消(是否能響應(yīng)中斷),因?yàn)樵诖蠖鄶?shù)情況下代碼是多人協(xié)作的娃善,對(duì)于這個(gè)任務(wù)是否支持中斷论衍,我們不一定有十足的把握,那么在這種情況下也應(yīng)該傳入 false聚磺。
    • 如果這個(gè)任務(wù)一旦開(kāi)始運(yùn)行坯台,我們就希望它完全的執(zhí)行完畢。在這種情況下瘫寝,也應(yīng)該傳入 false蜒蕾。

需要注意的是,雖然示例中寫了 !Thread.currentThread().isInterrupted() 方法來(lái)判斷中斷焕阿,但是實(shí)際上并不是通過(guò)我們的代碼來(lái)進(jìn)行中斷咪啡,而是 Future#cancel(true) 內(nèi)部調(diào)用 t.interrupt 方法修改線程的狀態(tài)之后,Thread.sleep 會(huì)拋出 InterruptedException 異常捣鲸,線程池中會(huì)執(zhí)行異常的相關(guān)邏輯瑟匆,并退出當(dāng)前任務(wù)闽坡。 sleep 和 interrupt 會(huì)產(chǎn)生意想不到的效果栽惶。

比如我們將 FutureInterruptTask 代碼修改為 while(true) 形式,調(diào)用 cancel(true) 方法線程還是會(huì)被中斷疾嗅。

static class FutureInterruptTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        while (true) {
            System.out.println("循環(huán)執(zhí)行");
            Thread.sleep(500);
        }
    }
}

isCancelled方法(判斷是否被取消)

isCancelled 方法外厂,判斷是否被取消,它和 cancel 方法配合使用代承,比較簡(jiǎn)單汁蝶,可以參考上面的示例。

Callable 和 Future 的關(guān)系

Callable 接口相比于 Runnable 的一大優(yōu)勢(shì)是可以有返回結(jié)果,返回結(jié)果就可以用 Future 類的 get 方法來(lái)獲取 掖棉。因此墓律,F(xiàn)uture 相當(dāng)于一個(gè)存儲(chǔ)器,它存儲(chǔ)了 Callable 的 call 方法的任務(wù)結(jié)果幔亥。

除此之外耻讽,我們還可以通過(guò) Future 的 isDone 方法來(lái)判斷任務(wù)是否已經(jīng)執(zhí)行完畢了,還可以通過(guò) cancel 方法取消這個(gè)任務(wù)帕棉,或限時(shí)獲取任務(wù)的結(jié)果等针肥,總之 Future 的功能比較豐富。

FutureTask

Future只是一個(gè)接口香伴,不能直接用來(lái)創(chuàng)建對(duì)象慰枕,其實(shí)現(xiàn)類是FutureTask,JDK1.8修改了FutureTask的實(shí)現(xiàn)即纲,JKD1.8不再依賴AQS來(lái)實(shí)現(xiàn)具帮,而是通過(guò)一個(gè)volatile變量state以及CAS操作來(lái)實(shí)現(xiàn)。FutureTask結(jié)構(gòu)如下所示:


我們來(lái)看一下 FutureTask 的代碼實(shí)現(xiàn):

public class FutureTask implements RunnableFuture {...}

可以看到低斋,它實(shí)現(xiàn)了一個(gè)接口匕坯,這個(gè)接口叫作 RunnableFuture。

RunnableFuture接口

我們來(lái)看一下 RunnableFuture 接口的代碼實(shí)現(xiàn):

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口拔稳,而 FutureTask 又實(shí)現(xiàn)了 RunnableFuture 接口葛峻,所以 FutureTask 既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值巴比。

FutureTask源碼分析

成員變量

/*
 * 當(dāng)前任務(wù)運(yùn)行狀態(tài)
 * NEW -> COMPLETING -> NORMAL(正常結(jié)束术奖,返回結(jié)果)
 * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結(jié)果)
 * NEW -> CANCELLED(任務(wù)被取消,無(wú)結(jié)果)
 * NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷轻绞,無(wú)結(jié)果)
 */
private volatile int state;
private static final int NEW          = 0; // 新建 0
private static final int COMPLETING   = 1; // 執(zhí)行中 1
private static final int NORMAL       = 2; // 正常 2
private static final int EXCEPTIONAL  = 3; // 異常 3
private static final int CANCELLED    = 4; // 取消 4
private static final int INTERRUPTING = 5; // 中斷中 5
private static final int INTERRUPTED  = 6; // 被中斷 6

/** 將要被執(zhí)行的任務(wù) */
private Callable<V> callable;
/** 存放執(zhí)行結(jié)果采记,用于get()方法獲取結(jié)果,也可能用于get()方法拋出異常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執(zhí)行任務(wù)Callable的線程; */
private volatile Thread runner;
/** 棧結(jié)構(gòu)的等待隊(duì)列政勃,該節(jié)點(diǎn)是棧中最頂層的節(jié)點(diǎn) */
private volatile WaitNode waiters;

為了后面更好的分析FutureTask的實(shí)現(xiàn)唧龄,這里有必要解釋下各個(gè)狀態(tài)。

  • NEW :表示是個(gè)新的任務(wù)或者還沒(méi)被執(zhí)行完的任務(wù)奸远。這是初始狀態(tài)既棺。
  • COMPLETING :任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時(shí)候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒(méi)有保存到outcome字段(outcome字段用來(lái)保存任務(wù)執(zhí)行結(jié)果懒叛,如果發(fā)生異常丸冕,則用來(lái)保存異常原因)的時(shí)候,狀態(tài)會(huì)從NEW變更到COMPLETING薛窥。但是這個(gè)狀態(tài)會(huì)時(shí)間會(huì)比較短胖烛,屬于中間狀態(tài)眼姐。
  • NORMAL :任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會(huì)從COMPLETING轉(zhuǎn)換到NORMAL佩番。這是一個(gè)最終態(tài)众旗。
  • EXCEPTIONAL :任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會(huì)從COMPLETING轉(zhuǎn)換到EXCEPTIONAL趟畏。這是一個(gè)最終態(tài)逝钥。
  • CANCELLED :任務(wù)還沒(méi)開(kāi)始執(zhí)行或者已經(jīng)開(kāi)始執(zhí)行但是還沒(méi)有執(zhí)行完成的時(shí)候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程拱镐,這個(gè)時(shí)候狀態(tài)會(huì)從NEW轉(zhuǎn)化為CANCELLED狀態(tài)艘款。這是一個(gè)最終態(tài)。
  • INTERRUPTING :任務(wù)還沒(méi)開(kāi)始執(zhí)行或者已經(jīng)執(zhí)行但是還沒(méi)有執(zhí)行完成的時(shí)候沃琅,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒(méi)有中斷任務(wù)執(zhí)行線程之前哗咆,狀態(tài)會(huì)從NEW轉(zhuǎn)化為INTERRUPTING。這是一個(gè)中間狀態(tài)益眉。
  • INTERRUPTED :調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會(huì)從INTERRUPTING轉(zhuǎn)換到INTERRUPTED晌柬。這是一個(gè)最終態(tài)。

有一點(diǎn)需要注意的是郭脂,所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成年碘,任務(wù)執(zhí)行異常或者任務(wù)被取消)展鸡。

構(gòu)造方法

// Callable 構(gòu)造方法
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

// Runnable 構(gòu)造方法
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Runnable的構(gòu)造器屿衅,只有一個(gè)目的,就是通過(guò)Executors.callable把入?yún)⑥D(zhuǎn)化為RunnableAdapter莹弊,主要是因?yàn)镃allable的功能比Runnable豐富涤久,Callable有返回值,而Runnable沒(méi)有忍弛。

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

這是一個(gè)典型的適配模型响迂,我們要把 Runnable 適配成 Callable,首先要實(shí)現(xiàn) Callable 的接口细疚,接著在 Callable 的 call 方法里面調(diào)用被適配對(duì)象(Runnable)的方法蔗彤。

內(nèi)部類

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

run方法

/**
 * run方法可以直接被調(diào)用
 * 也可以開(kāi)啟新的線程調(diào)用
 */
public void run() {
    // 狀態(tài)不是任務(wù)創(chuàng)建,或者當(dāng)前任務(wù)已經(jīng)有線程在執(zhí)行了疯兼,直接返回
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Callable 不為空然遏,并且已經(jīng)初始化完成
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //調(diào)用執(zhí)行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;//執(zhí)行失敗
                //通過(guò)CAS算法設(shè)置返回值(COMPLETING)和狀態(tài)值(EXCEPTIONAL)
                setException(ex);
            }
            //執(zhí)行成功通過(guò)CAS(UNSAFE)設(shè)置返回值(COMPLETING)和狀態(tài)值(NORMAL)
            if (ran)
                //將result賦值給outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //將任務(wù)runner設(shè)置為null,避免發(fā)生并發(fā)調(diào)用run()方法
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        //須重新讀取任務(wù)狀態(tài),避免不可達(dá)(泄漏)的中斷
        int s = state;
        //確保cancle(ture)操作時(shí)镇防,運(yùn)行中的任務(wù)能接收到中斷指令
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

  1. run方法是沒(méi)有返回值的啦鸣,通過(guò)給outcome屬性賦值(set(result)),get時(shí)就能從outcome屬性中拿到返回值。
  2. FutureTask 兩種構(gòu)造器来氧,最終都轉(zhuǎn)化成了 Callable,所以在 run 方法執(zhí)行的時(shí)候,只需要執(zhí)行 Callable 的 call 方法即可啦扬,在執(zhí)行 c.call()代碼時(shí)中狂,如果入?yún)⑹?Runnable 的話, 調(diào)用路徑為 c.call() -> RunnableAdapter.call() -> Runnable.run()扑毡,如果入?yún)⑹?Callable 的話胃榕,直接調(diào)用。

setException(Throwable t)方法

//發(fā)生異常時(shí)瞄摊,將返回值設(shè)置到outcome(=COMPLETING)中勋又,并更新任務(wù)狀態(tài)(EXCEPTIONAL)
protected void setException(Throwable t) {
    //調(diào)用UNSAFE類封裝的CAS算法,設(shè)置值
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    //喚醒因等待返回值而阻塞的線程
    finishCompletion();
    }
}

set(V v)方法

//任務(wù)正常完成换帜,將返回值設(shè)置到outcome(=COMPLETING)中楔壤,并更新任務(wù)狀態(tài)(=NORMAL)
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

finishCompletion方法

//移除所有等待線程并發(fā)出信號(hào),調(diào)用done()惯驼,以及將任務(wù)callable清空
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            //循環(huán)喚醒阻塞線程蹲嚣,直到阻塞隊(duì)列為空
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //一直到阻塞隊(duì)列為空,跳出循環(huán)
                if (next == null)
                    break;
                q.next = null; // unlink to help gc   方便gc在適當(dāng)?shù)臅r(shí)候回收
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

handlePossibleCancellationInterrupt方法

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    //自旋等待cancle(true)結(jié)束(中斷結(jié)束)
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
             Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

cancle方法

//取消任務(wù)執(zhí)行
public boolean cancel(boolean mayInterruptIfRunning) {
    //對(duì)NEW狀態(tài)的任務(wù)進(jìn)行中斷祟牲,并根據(jù)參數(shù)設(shè)置state
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //任務(wù)已完成(已發(fā)出中斷或已取消)
        return false;       
    //中斷線程
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//cancel(true)
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //通過(guò)CAS算法隙畜,更新?tīng)顟B(tài)
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //喚醒阻塞線程
        finishCompletion();
    }
    return true;
}

get方法

 /**
 * 獲取執(zhí)行結(jié)果
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //假如任務(wù)還沒(méi)有執(zhí)行完,則阻塞則塞線程说贝,直至任務(wù)執(zhí)行完成(結(jié)果已經(jīng)存放到對(duì)應(yīng)變量中)
        s = awaitDone(false, 0L);
    //返回結(jié)果
    return report(s);
}

/**
 * 獲取任務(wù)執(zhí)行結(jié)果议惰,指定時(shí)間結(jié)束,則超時(shí)返回乡恕,不再阻塞
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

awaitDone方法

/**
 * Awaits completion or aborts on interrupt or timeout.
 * 如英文注釋:等待任務(wù)執(zhí)行完畢或任務(wù)中斷或任務(wù)超時(shí)
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //循環(huán)等待
    for (;;) {
        //線程已經(jīng)中斷换淆,則移除等待任務(wù)
        if (Thread.interrupted()) {
            removeWaiter(q);
            //移除當(dāng)前任務(wù)后,拋出中斷異常
            throw new InterruptedException();
        }

        //任務(wù)已經(jīng)完成几颜,則返回任務(wù)狀態(tài)倍试,并對(duì)當(dāng)前任務(wù)清場(chǎng)處理
        int s = state;
        if (s > COMPLETING) {
            if (q != null) //任務(wù)不為空,則將執(zhí)行線程設(shè)為null蛋哭,避免并發(fā)下重復(fù)執(zhí)行
                q.thread = null;
            return s;
        }
        //設(shè)置結(jié)果县习,很快就能完成,自旋等待
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();  //任務(wù)提前退出
        //正在執(zhí)行或者還沒(méi)開(kāi)始谆趾,則構(gòu)建新的節(jié)點(diǎn)
        else if (q == null)
            q = new WaitNode();
        //判斷是否入隊(duì)躁愿,新節(jié)點(diǎn)一般在下一次循環(huán)入隊(duì)列阻塞
        else if (!queued)
            //沒(méi)有入隊(duì)列,設(shè)置q.next=waiters,并將waiters設(shè)為q
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
        //假如有超時(shí)限制沪蓬,則判斷是否超時(shí)
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //超時(shí)則將任務(wù)節(jié)點(diǎn)從阻塞隊(duì)列中移除彤钟,并返回狀態(tài)
                removeWaiter(q);
                return state;
            }
            //阻塞調(diào)用get方法的線程,有超時(shí)限制
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞調(diào)用get方法的線程跷叉,無(wú)超時(shí)限制
            LockSupport.park(this);
    }
}

removeWaiter方法

//移除任務(wù)節(jié)點(diǎn)
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                    }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                    continue retry;
            }
            break;
        }
    }
}

done()方法

protected void done() { }

默認(rèn)實(shí)現(xiàn)不起任何作用逸雹。子類可以重寫营搅,此方法調(diào)用完成回調(diào)或執(zhí)行。注意:也可以在實(shí)現(xiàn)此方法來(lái)確定此任務(wù)是否已取消梆砸。

Future的使用

FutureTask可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場(chǎng)景转质。通過(guò)傳入Runnable或者Callable的任務(wù)給FutureTask,直接調(diào)用其run方法或者放入線程池執(zhí)行帖世,之后可以在外部通過(guò)FutureTask的get方法異步獲取執(zhí)行結(jié)果休蟹,因此,F(xiàn)utureTask非常適合用于耗時(shí)的計(jì)算日矫,主線程可以在完成自己的任務(wù)后赂弓,再去獲取結(jié)果。另外哪轿,F(xiàn)utureTask還可以確保即使調(diào)用了多次run方法盈魁,它都只會(huì)執(zhí)行一次Runnable或者Callable任務(wù),或者通過(guò)cancel取消FutureTask的執(zhí)行等缔逛。

FutureTask執(zhí)行多任務(wù)計(jì)算的使用場(chǎng)景

利用FutureTask和ExecutorService备埃,可以用多線程的方式提交計(jì)算任務(wù),主線程繼續(xù)執(zhí)行其他任務(wù)褐奴,當(dāng)主線程需要子線程的計(jì)算結(jié)果時(shí)按脚,在異步獲取子線程的執(zhí)行結(jié)果。

package com.niuh.future;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * <p>
 * FutureTask執(zhí)行多任務(wù)計(jì)算的使用場(chǎng)景
 * </p>
 */
public class FutureTaskForMultiCompute {
    public static void main(String[] args) {

        FutureTaskForMultiCompute inst = new FutureTaskForMultiCompute();

        // 創(chuàng)建任務(wù)集合
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

        // 創(chuàng)建線程池
        ExecutorService exec = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            // 傳入Callable對(duì)象創(chuàng)建FutureTask對(duì)象
            FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, "" + i));

            taskList.add(ft);
            
            // 提交給線程池執(zhí)行任務(wù)敦冬,也可以通過(guò)exec.invokeAll(taskList)一次性提交所有任務(wù);
            exec.submit(ft);
        }

        System.out.println("所有計(jì)算任務(wù)提交完畢, 主線程接著干其他事情辅搬!");

        // 開(kāi)始統(tǒng)計(jì)各計(jì)算線程計(jì)算結(jié)果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                //FutureTask的get方法會(huì)自動(dòng)阻塞,直到獲取計(jì)算結(jié)果為止
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 關(guān)閉線程池
        exec.shutdown();
        System.out.println("多任務(wù)計(jì)算后的總結(jié)果是:" + totalResult);

    }

    private class ComputeTask implements Callable<Integer> {

        private Integer result = 0;
        private String taskName = "";

        public ComputeTask(Integer iniResult, String taskName) {
            result = iniResult;
            this.taskName = taskName;
            System.out.println("生成子線程計(jì)算任務(wù): " + taskName);
        }

        public String getTaskName() {
            return this.taskName;
        }

        @Override
        public Integer call() throws Exception {
            // TODO Auto-generated method stub

            for (int i = 0; i < 100; i++) {
                result = +i;
            }
            // 休眠5秒鐘,觀察主線程行為脖旱,預(yù)期的結(jié)果是主線程會(huì)繼續(xù)執(zhí)行堪遂,到要取得FutureTask的結(jié)果是等待直至完成。
            Thread.sleep(5000);
            System.out.println("子線程計(jì)算任務(wù): " + taskName + " 執(zhí)行完成!");
            return result;
        }
    }
}

執(zhí)行結(jié)果:

生成子線程計(jì)算任務(wù): 0
生成子線程計(jì)算任務(wù): 1
生成子線程計(jì)算任務(wù): 2
生成子線程計(jì)算任務(wù): 3
生成子線程計(jì)算任務(wù): 4
生成子線程計(jì)算任務(wù): 5
生成子線程計(jì)算任務(wù): 6
生成子線程計(jì)算任務(wù): 7
生成子線程計(jì)算任務(wù): 8
生成子線程計(jì)算任務(wù): 9
所有計(jì)算任務(wù)提交完畢, 主線程接著干其他事情萌庆!
子線程計(jì)算任務(wù): 0 執(zhí)行完成!
子線程計(jì)算任務(wù): 1 執(zhí)行完成!
子線程計(jì)算任務(wù): 3 執(zhí)行完成!
子線程計(jì)算任務(wù): 4 執(zhí)行完成!
子線程計(jì)算任務(wù): 2 執(zhí)行完成!
子線程計(jì)算任務(wù): 5 執(zhí)行完成!
子線程計(jì)算任務(wù): 7 執(zhí)行完成!
子線程計(jì)算任務(wù): 9 執(zhí)行完成!
子線程計(jì)算任務(wù): 8 執(zhí)行完成!
子線程計(jì)算任務(wù): 6 執(zhí)行完成!
多任務(wù)計(jì)算后的總結(jié)果是:990

FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次

在很多高并發(fā)的環(huán)境下溶褪,往往我們只需要某些任務(wù)只執(zhí)行一次。這種使用情景FutureTask的特性恰能勝任践险。舉一個(gè)例子猿妈,假設(shè)有一個(gè)帶key的連接池,當(dāng)key存在時(shí)巍虫,即直接返回key對(duì)應(yīng)的對(duì)象彭则;當(dāng)key不存在時(shí),則創(chuàng)建連接占遥。對(duì)于這樣的應(yīng)用場(chǎng)景俯抖,通常采用的方法為使用一個(gè)Map對(duì)象來(lái)存儲(chǔ)key和連接池對(duì)應(yīng)的對(duì)應(yīng)關(guān)系,典型的代碼如下面所示:

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @program: 錯(cuò)誤示例
 * @description: 在很多高并發(fā)的環(huán)境下瓦胎,往往我們只需要某些任務(wù)只執(zhí)行一次芬萍。
 * 這種使用情景FutureTask的特性恰能勝任尤揣。舉一個(gè)例子,假設(shè)有一個(gè)帶key的連接池担忧,
 * 當(dāng)key存在時(shí)芹缔,即直接返回key對(duì)應(yīng)的對(duì)象坯癣;當(dāng)key不存在時(shí)瓶盛,則創(chuàng)建連接。對(duì)于這樣的應(yīng)用場(chǎng)景示罗,
 * 通常采用的方法為使用一個(gè)Map對(duì)象來(lái)存儲(chǔ)key和連接池對(duì)應(yīng)的對(duì)應(yīng)關(guān)系惩猫,典型的代碼如下
 * 在例子中,我們通過(guò)加鎖確保高并發(fā)環(huán)境下的線程安全蚜点,也確保了connection只創(chuàng)建一次轧房,然而卻犧牲了性能。
 */
public class FutureTaskConnection1 {
    private static Map<String, Connection> connectionPool = new HashMap<>();
    private static ReentrantLock lock = new ReentrantLock();

    public static Connection getConnection(String key) {
        try {
            lock.lock();
            Connection connection = connectionPool.get(key);
            if (connection == null) {
                Connection newConnection = createConnection();
                connectionPool.put(key, newConnection);
                return newConnection;
            }
            return connection;
        } finally {
            lock.unlock();
        }
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}

在上面的例子中绍绘,我們通過(guò)加鎖確保高并發(fā)環(huán)境下的線程安全奶镶,也確保了connection只創(chuàng)建一次,然而卻犧牲了性能陪拘。改用ConcurrentHash的情況下厂镇,幾乎可以避免加鎖的操作,性能大大提高左刽。

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description: 改用ConcurrentHash的情況下捺信,幾乎可以避免加鎖的操作,性能大大提高欠痴。
 * <p>
 * 但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象迄靠。
 * 為什么呢?因?yàn)閯?chuàng)建Connection是一個(gè)耗時(shí)操作喇辽,假設(shè)多個(gè)線程涌入getConnection方法掌挚,都發(fā)現(xiàn)key對(duì)應(yīng)的鍵不存在,
 * 于是所有涌入的線程都開(kāi)始執(zhí)行conn=createConnection()菩咨,只不過(guò)最終只有一個(gè)線程能將connection插入到map里吠式。
 * 但是這樣以來(lái),其它線程創(chuàng)建的的connection就沒(méi)啥價(jià)值旦委,浪費(fèi)系統(tǒng)開(kāi)銷奇徒。
 */
public class FutureTaskConnection2 {
    private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>();

    public static Connection getConnection(String key) {
        Connection connection = connectionPool.get(key);
        if (connection == null) {
            connection = createConnection();
            //根據(jù)putIfAbsent的返回值判斷是否有線程搶先插入了
            Connection returnConnection = connectionPool.putIfAbsent(key, connection);
            if (returnConnection != null) {
                connection = returnConnection;
            }
        } else {
            return connection;
        }
        return connection;
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

}

但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。
為什么呢缨硝?

因?yàn)閯?chuàng)建Connection是一個(gè)耗時(shí)操作摩钙,假設(shè)多個(gè)線程涌入getConnection方法,都發(fā)現(xiàn)key對(duì)應(yīng)的鍵不存在查辩,于是所有涌入的線程都開(kāi)始執(zhí)行conn=createConnection()胖笛,只不過(guò)最終只有一個(gè)線程能將connection插入到map里网持。但是這樣以來(lái),其它線程創(chuàng)建的的connection就沒(méi)啥價(jià)值长踊,浪費(fèi)系統(tǒng)開(kāi)銷功舀。

這時(shí)最需要解決的問(wèn)題就是當(dāng)key不存在時(shí),創(chuàng)建Connection的動(dòng)作(conn=createConnection();)能放在connectionPool.putIfAbsent()之后執(zhí)行身弊,這正是FutureTask發(fā)揮作用的時(shí)機(jī)辟汰,基于ConcurrentHashMap和FutureTask的改造代碼如下:

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @description: FutureTask在高并發(fā)環(huán)境下確保任務(wù)只執(zhí)行一次
 * 這時(shí)最需要解決的問(wèn)題就是當(dāng)key不存在時(shí),創(chuàng)建Connection的動(dòng)作(conn=createConnection();)
 * 能放在connectionPool.putIfAbsent()之后執(zhí)行阱佛,這正是FutureTask發(fā)揮作用的時(shí)機(jī)帖汞,
 * 基于ConcurrentHashMap和FutureTask的改造代碼如下:
 */
public class FutureTaskConnection3 {
    private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();

    public static Connection getConnection(String key) {
        FutureTask<Connection> connectionFutureTask = connectionPool.get(key);
        try {
            if (connectionFutureTask != null) {
                return connectionFutureTask.get();
            } else {
                Callable<Connection> callable = new Callable<Connection>() {
                    @Override
                    public Connection call() throws Exception {
                        return createConnection();
                    }
                };
                FutureTask<Connection> newTask = new FutureTask<>(callable);
                FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask);
                if (returnFt == null) {
                    connectionFutureTask = newTask;
                    newTask.run();
                }
                return connectionFutureTask.get();
            }
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}

FutureTask任務(wù)執(zhí)行完回調(diào)

FutureTask有一個(gè)方法 void done()會(huì)在每個(gè)線程執(zhí)行完成return結(jié)果時(shí)回調(diào)。
假設(shè)現(xiàn)在需要實(shí)現(xiàn)每個(gè)線程完成任務(wù)執(zhí)行后主動(dòng)執(zhí)行后續(xù)任務(wù)凑术。

package com.niuh.future;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * FutureTask#done()
 */
@Slf4j
public class FutureTaskDemo1 {

    public static void main(String[] args) throws InterruptedException {
        // 月餅生產(chǎn)者
        final Callable<Integer> productor = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("月餅制作中翩蘸。。淮逊。催首。");
                Thread.sleep(5000);
                return (Integer) new Random().nextInt(1000);
            }
        };

        // 月餅消費(fèi)者
        Runnable customer = new Runnable() {
            @Override
            public void run() {
                ExecutorService es = Executors.newCachedThreadPool();
                log.info("老板給我來(lái)一個(gè)月餅");
                for (int i = 0; i < 3; i++) {
                    FutureTask<Integer> futureTask = new FutureTask<Integer>(productor) {
                        @Override
                        protected void done() {
                            super.done();
                            try {
                                log.info(String.format(" 編號(hào)[%s]月餅已打包好", get()));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    es.submit(futureTask);
                }
            }
        };
        new Thread(customer).start();
    }
}

執(zhí)行結(jié)果:

11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老板給我來(lái)一個(gè)月餅
11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中。泄鹏。郎任。。
11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中命满。涝滴。。胶台。
11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月餅制作中歼疮。。诈唬。蛹稍。
11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 -  編號(hào)[804]月餅已打包好
11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 -  編號(hào)[88]月餅已打包好
11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 -  編號(hào)[166]月餅已打包好

參考

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新巧勤,可以公眾號(hào)搜一搜「 一角錢技術(shù) 」第一時(shí)間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star弥奸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末闻镶,一起剝皮案震驚了整個(gè)濱河市薪伏,隨后出現(xiàn)的幾起案子台谊,更是在濱河造成了極大的恐慌,老刑警劉巖八酒,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件空民,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)界轩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門画饥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人浊猾,你說(shuō)我怎么就攤上這事抖甘。” “怎么了葫慎?”我有些...
    開(kāi)封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵衔彻,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我幅疼,道長(zhǎng)米奸,這世上最難降的妖魔是什么昼接? 我笑而不...
    開(kāi)封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任爽篷,我火速辦了婚禮,結(jié)果婚禮上慢睡,老公的妹妹穿的比我還像新娘逐工。我一直安慰自己,他們只是感情好漂辐,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布泪喊。 她就那樣靜靜地躺著,像睡著了一般髓涯。 火紅的嫁衣襯著肌膚如雪袒啼。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天纬纪,我揣著相機(jī)與錄音蚓再,去河邊找鬼。 笑死包各,一個(gè)胖子當(dāng)著我的面吹牛摘仅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播问畅,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼娃属,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了护姆?” 一聲冷哼從身側(cè)響起矾端,我...
    開(kāi)封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎卵皂,沒(méi)想到半個(gè)月后秩铆,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡渐裂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年豺旬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了钠惩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡族阅,死狀恐怖篓跛,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情坦刀,我是刑警寧澤愧沟,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站鲤遥,受9級(jí)特大地震影響沐寺,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜盖奈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一混坞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧钢坦,春花似錦究孕、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至禾酱,卻和暖如春微酬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背颤陶。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工颗管, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人指郁。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓忙上,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親闲坎。 傳聞我的和親對(duì)象是個(gè)殘疾皇子疫粥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345