10-Executor框架

Java的線程既是工作單元,也是執(zhí)行機制。JDK 5開始测柠,把工作單元與執(zhí)行機制分離開來。工作單元包括Runnable和Callable缘滥,而執(zhí)行機制由Executor框架提供轰胁。

1.Executor框架簡介

①Executor框架的兩級調(diào)度模型

java.lang.Thread被一對一映射為本地操作系統(tǒng)線程。Java線程啟動時會創(chuàng)建一個本地操作系統(tǒng)線程朝扼;當該Java線程終止時赃阀,這個操作系統(tǒng)線程也會被回收。操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的CPU吟税。

在上層凹耙,Java多線程程序通常把應用分解為若干個任務姿现,然后使用用戶級的調(diào)度器(Executor框架)將這些任務映射為固定數(shù)量的線程肠仪;在底層,操作系統(tǒng)內(nèi)核將這些線程映射到硬件處理器上备典。

②Executor框架的結(jié)構(gòu)與成員

1)Executor框架的結(jié)構(gòu)
  • 任務异旧。包括被執(zhí)行任務需要實現(xiàn)的接口Runnable和Callable接口。
  • 任務的執(zhí)行提佣。包括任務執(zhí)行機制的核心接口Executor吮蛹,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現(xiàn)了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)拌屏。
  • 一步計算的結(jié)果潮针。包括接口Future和實現(xiàn)Future接口的FutureTask類。
  • Executor:是一個接口倚喂,是Executor框架的基礎每篷。
  • ThreadPoolExecutor:是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務端圈。
  • ScheduledThreadPoolExecutor:是一個實現(xiàn)類焦读,可以在給定的延遲后運行命令,或者定期執(zhí)行命令。比Timer更靈活,功能更強大渣磷。
  • Future接口和實現(xiàn)接口的FutureTask類:代表異步計算的結(jié)果域那。
  • Runnable接口和Callable接口的實現(xiàn)類:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行辱姨。
2)Executor框架的成員
  • ThreadPoolExecutor

    newFixedThreadPool:適用于為了滿足資源管理的需求产还,而需要限制當前線程數(shù)量的應用場景施禾,它適用于負載比較重的服務器祷嘶。

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    

newSingleThreadExecutor適用于需要保證順序地執(zhí)行各個任務吠冤;并且在任意時間點浑彰,不會有多個線程是活動的應用場景。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

newCachedThreadPool是大小無界的線程池拯辙,適用于執(zhí)行很多的短期異步任務的小程序郭变,或者是負載較輕的服務器。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
  • ScheduleThreadPoolExecutor

    newScheduledThreadPool:包含若干個線程的ScheduledThreadPoolExecutor涯保。適用于需要多個后臺線程執(zhí)行周期任務诉濒,同時為了滿足資源管理的需求而需要限制后臺線程的數(shù)量的應用場景。

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    

    newSingleThreadScheduledExecutor:只包含一個線程的ScheduledThreadPoolExecutor夕春。適用于需要單個后臺線程執(zhí)行周期任務未荒,同時需要保證順序地執(zhí)行各個任務的應用場景。

        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    
  • Future

    <T> Future<T> submit(Callable<T> task)
    <T> Future<T> submit(Runnable task, T result)
    Future<?> submit(Runnable task)
    
  • Runnable和Callable

        public static Callable<Object> callable(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<Object>(task, null);
        }
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

2.ScheduleThreadPoolExecutor詳解

繼承自ThreadPoolExecutor及志。主要用來在給定的延遲后執(zhí)行運行任務片排,或者定期執(zhí)行任務。功能與Timer類似速侈,但功能更強大率寡、更靈活。Timer對應的是單個后臺線程倚搬,而ScheduleThreadPoolExecutor可以在構(gòu)造函數(shù)中指定多個對應的后臺線程數(shù)冶共。

①ScheduleThreadPoolExecutor的運行機制

  • 當調(diào)用scheduleAtFixedRate方法或者scheduleWithFixedDelay方法時,會向ScheduleThreadPoolExecutor的DelayedWorkQueue添加了一個實現(xiàn)了RunnableScheduledFuture接口的ScheduledFutureTask每界。

  • 線程池中的線程從DelayedWorkQueue中獲取ScheduledFutureTask捅僵,然后執(zhí)行任務。

ScheduleThreadPoolExecutor為了實現(xiàn)周期性的執(zhí)行任務眨层,對ThreadPoolExecutor做了如下的修改庙楚。

  • 使用DelayedWorkQueue作為任務隊列。
  • 獲取任務的方式不同趴樱。
  • 執(zhí)行周期任務后馒闷,增加了額外的處理。

②ScheduleThreadPoolExecutor的實現(xiàn)

ScheduledFutureTask主要包含3個成員變量:

  • long time;//表示這個任務將要被執(zhí)行的具體時間伊佃。
  • long sequenceNumber;//表示這個任務被添加到ScheduleThreadPoolExecutor中的序號窜司。
  • long period;//表示任務執(zhí)行的間隔周期。
  • 從DelayWorkQueue中獲取已到期的ScheduleFutureTask(DelayWorkQueue.take())航揉。
  • 線程1執(zhí)行這個ScheduleFutureTask塞祈。
  • 線程1修改ScheduleFutureTask的time變量為下次將要執(zhí)行的時間。
  • 線程1把這個修改time之后的ScheduleFutureTask放回DelayWorkQueue中(DelayWorkQueue.add())帅涂。
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            //獲取鎖
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //獲取周期任務
                    RunnableScheduledFuture<?> first = queue[0];
                    //任務數(shù)組為空
                    if (first == null)
                        available.await();//等待
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0) 
                            return finishPoll(first);//獲取頭部元素议薪,將最后一個元素放到第一位尤蛮,并自上而下添加到其堆排序點。
                        //頭部元素的time時間比當前時間大
                        first = null; // don't retain ref while waiting
                        if (leader != null) 
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread; 
                            try {
                                //到condition中等待到time時間
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();//釋放鎖
            }
        }
        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();//1 獲取鎖
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//調(diào)整堆數(shù)組大小
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);//2.1 在底部添加到堆排序點
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();//2.2 通知
                }
            } finally {
                lock.unlock();//3 釋放鎖
            }
            return true;
        }

3.FutureTask詳解

①FutureTask簡介

實現(xiàn)了Future接口和Runnable接口斯议,可以交給Executor執(zhí)行产捞,也可以由調(diào)用線程直接執(zhí)行(FutureTask.run())。

FutureTask.get():未啟動和已啟動狀態(tài)時調(diào)用哼御,會導致線程阻塞坯临。已完成狀態(tài)時,調(diào)用線程立即返回結(jié)果或拋出異常恋昼。

FutureTask.cancel():未啟動狀態(tài)時看靠,調(diào)用將導致此任務永遠不會被執(zhí)行。已啟動狀態(tài)時液肌,cancel(true)將以中斷執(zhí)行此任務線程的方式來視圖停止任務挟炬;cancel(false)將不會對正在執(zhí)行此任務的線程產(chǎn)生任何影響(讓正在執(zhí)行的任務運行完成)。已完成狀態(tài)時嗦哆,執(zhí)行cancel返回false谤祖。

②FutureTask的使用

當一個線程需要等待另一個線程把某個任務執(zhí)行完成后才能繼續(xù)執(zhí)行,此時可以使用FutureTask老速。

假設有多個線程執(zhí)行若干個任務粥喜,每個任務最多只能被執(zhí)行一次,當多個線程視圖同時執(zhí)行同一個任務時烁峭,只允許一個線程執(zhí)行任務容客,其他線程需要等待這個任務執(zhí)行完成后才能繼續(xù)執(zhí)行秕铛。示例代碼:

    private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();//每個任務只執(zhí)行一次约郁,多個線程可以獲取到執(zhí)行的結(jié)果
    private String executionTask(final String taskName) throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName);    //1.1   執(zhí)行完1.3后執(zhí)行  2.1
            if (future == null) {
                FutureTask<String> futureTask = new FutureTask<>(() -> taskName);//1.2
                //如果存在taskName,不更改value但两,返回舊值鬓梅,如果不存在,put
                future = taskCache.putIfAbsent(taskName, futureTask);//1.3
                if (future == null) {//put成功了
                    future = futureTask;
                    futureTask.run();//1.4執(zhí)行任務
                }
            }
            try {
                return future.get();//1.5 2.2
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);//當taskName和future有映射關系的時候谨湘,才移除
            }
        }
    }

當兩個線程試圖同時執(zhí)行同一個任務時绽快,如果Thread1執(zhí)行了1.3后Thread2執(zhí)行2.1,那么接下來Thread2將在2.2等待紧阔,知道Thread1執(zhí)行完成1.4后Thread2才能從2.2返回坊罢。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市擅耽,隨后出現(xiàn)的幾起案子活孩,更是在濱河造成了極大的恐慌,老刑警劉巖乖仇,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件憾儒,死亡現(xiàn)場離奇詭異询兴,居然都是意外死亡,警方通過查閱死者的電腦和手機起趾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門诗舰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人训裆,你說我怎么就攤上這事眶根。” “怎么了边琉?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵汛闸,是天一觀的道長。 經(jīng)常有香客問我艺骂,道長诸老,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任钳恕,我火速辦了婚禮别伏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘忧额。我一直安慰自己厘肮,他們只是感情好,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布睦番。 她就那樣靜靜地躺著类茂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪托嚣。 梳的紋絲不亂的頭發(fā)上巩检,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音示启,去河邊找鬼兢哭。 笑死,一個胖子當著我的面吹牛夫嗓,可吹牛的內(nèi)容都是我干的迟螺。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼舍咖,長吁一口氣:“原來是場噩夢啊……” “哼矩父!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起排霉,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤窍株,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體夹姥,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡杉武,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了辙售。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片轻抱。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖旦部,靈堂內(nèi)的尸體忽然破棺而出祈搜,到底是詐尸還是另有隱情,我是刑警寧澤士八,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布容燕,位于F島的核電站,受9級特大地震影響婚度,放射性物質(zhì)發(fā)生泄漏蘸秘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一蝗茁、第九天 我趴在偏房一處隱蔽的房頂上張望醋虏。 院中可真熱鬧,春花似錦哮翘、人聲如沸颈嚼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽阻课。三九已至,卻和暖如春艰匙,著一層夾襖步出監(jiān)牢的瞬間限煞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工旬薯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留晰骑,地道東北人适秩。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓绊序,卻偏偏與公主長得像,于是被迫代替她去往敵國和親秽荞。 傳聞我的和親對象是個殘疾皇子骤公,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 在Java中,使用線程來執(zhí)行異步任務扬跋。Java線程的創(chuàng)建于銷毀需要一定的開銷阶捆,如果我們?yōu)槊恳粋€任務創(chuàng)建一個新線程來...
    伊凡的一天閱讀 333評論 0 1
  • 一 用線程池好處 限制和管理資源(執(zhí)行任務)。 維護統(tǒng)計信息,如已完成任務數(shù)洒试。 1)重復利用已創(chuàng)建線程降低線程創(chuàng)建...
    hedgehog1112閱讀 828評論 0 0
  • 為什么要用線程池 關于為什么要使用多線程倍奢,請參考【多線程與并發(fā)】:線程的創(chuàng)建、狀態(tài)垒棋、方法中的最后一點卒煞。 那為什么要...
    maxwellyue閱讀 550評論 0 1
  • Executor框架是在Java5中引入的,可以通過該框架來控制線程的啟動叼架,執(zhí)行畔裕,關閉,簡化并發(fā)編程乖订。Execut...
    加大裝益達閱讀 3,737評論 0 3
  • 猜你 如同揭秘宇宙 我智慧不夠 不猜 宇宙失去意義 此前的探索 淪為史前的黑洞 ——《猜宇宙》
    段童閱讀 244評論 1 2