線程池的拒絕策略示例

Java的線程池中涩拙,如果不斷往線程池提交任務际长,最終會發(fā)生什么?
如果work queue是一個有界隊列吃环,隊列放滿,線程數量達到maxsize洋幻,且沒有空閑線程時郁轻,再往線程池提交任務會觸發(fā)線程池的拒絕策略。

線程池有哪些拒絕策略呢文留?

1. AbortPolicy 丟棄并拋出異常-- 默認策略

定義
    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread is submitted.");
            }};
                
            threadPool.submit(task);
        }
    }
}
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Thread is submitted.
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6bc7c054 rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 7]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at demo.multithread.ThreadPoolRejectPolicyTest.main(ThreadPoolRejectPolicyTest.java:21)

2. DiscardPolicy 直接丟棄任務

定義
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
示例

一共提交8個任務好唯,其中有一個默默被丟棄。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Thread is submitted.");
                    try {
                        Thread.sleep(60000l);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }};
                
            threadPool.submit(task);
        }
        
        int threadCount = ((ThreadPoolExecutor)threadPool).getPoolSize();
        System.out.println("threadCount " + threadCount);
        
        int threadCntInWorkQueue = ((ThreadPoolExecutor)threadPool).getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
                
        threadPool.shutdown();
    }
}
Thread is submitted.
threadCount 2
threadCntInWorkQueue 5
Thread is submitted.

3. DiscardOldestPolicy 丟棄最舊的任務燥翅,再試著處理

定義
    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
示例

線程池1個核心線程骑篙,max線程數為2,work queue大小為5.
可以看到森书,提交8個任務后靶端,第2個任務被丟棄了。因為第2個任務是oldest凛膏,第一個被放進queue的任務杨名。

package demo.multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
                
            threadPool.submit(task);
        }
        
        int threadCount = ((ThreadPoolExecutor)threadPool).getPoolSize();
        System.out.println("threadCount " + threadCount);
        
        int threadCntInWorkQueue = ((ThreadPoolExecutor)threadPool).getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
                
        threadPool.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskId;
    
    MyTask(int i) {
        taskId = i;
    }

    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        try {
            Thread.sleep(1000l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Running thread - id 0
threadCount 2
threadCntInWorkQueue 5
Running thread - id 6
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7

4. CallerRunsPolicy 由調用者線程執(zhí)行任務

用這種拒絕策略時要注意,主線程既需要負責創(chuàng)建線程猖毫,又需要執(zhí)行任務台谍,會造成性能問題。

定義
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
示例

在輸出中吁断,能看出趁蕊,主線程號為1,而提交的任務中仔役,其中一個任務(最后一個被提交的任務)就是由主線程來執(zhí)行的掷伙。

package demo.multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
        
        System.out.println("Main thread is " + Thread.currentThread().getId());
        
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
                
            threadPool.submit(task);
        }
        
        int threadCount = ((ThreadPoolExecutor)threadPool).getPoolSize();
        System.out.println("threadCount " + threadCount);
        
        int threadCntInWorkQueue = ((ThreadPoolExecutor)threadPool).getQueue().size();
        System.out.println("threadCntInWorkQueue " + threadCntInWorkQueue);
                
        threadPool.shutdown();
    }
}

class MyTask implements Runnable {
    private int taskId;
    
    MyTask(int i) {
        taskId = i;
    }

    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        System.out.println("The thread id is " + Thread.currentThread().getId());
        try {
            Thread.sleep(1000l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Main thread is 1
Running thread - id 0
The thread id is 9
Running thread - id 7
The thread id is 1
Running thread - id 6
The thread id is 10
Running thread - id 1
The thread id is 9
threadCount 2
threadCntInWorkQueue 3
Running thread - id 2
The thread id is 10
Running thread - id 3
The thread id is 9
Running thread - id 4
The thread id is 10
Running thread - id 5
The thread id is 9

5. 自定義拒絕策略

了解了前四種拒絕策略,發(fā)現:
abort又兵,discard炎咖,discardOldest都會丟棄任務;
callerRun雖然執(zhí)行了任務,但是會影響主線程性能乘盼。

若將work queue設置為無界隊列升熊,或者將maxsize設置為最大整數,都有可能造成out of memory绸栅。

那么可以通過自定義拒絕策略级野,讓后進來的task阻塞住,有資源了再處理粹胯。這樣可以讓每一個任務都得到執(zhí)行蓖柔。

package demo.multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRejectPolicyTest {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, 
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new CustomRejectPolicy());
                
        for (int i = 0; i < 8; i++) {
            Runnable task = new MyTask(i);
                
            threadPool.submit(task);
        }
                
        threadPool.shutdown();
    }
}

class CustomRejectPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
}

class MyTask implements Runnable {
    private int taskId;
    
    MyTask(int i) {
        taskId = i;
    }

    @Override
    public void run() {
        System.out.println("Running thread - id " + taskId);
        try {
            Thread.sleep(1000l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getTaskId() {
        return taskId;
    }

    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
}
Running thread - id 0
Running thread - id 6
Running thread - id 1
Running thread - id 2
Running thread - id 3
Running thread - id 4
Running thread - id 5
Running thread - id 7
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市风纠,隨后出現的幾起案子况鸣,更是在濱河造成了極大的恐慌,老刑警劉巖竹观,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件镐捧,死亡現場離奇詭異,居然都是意外死亡臭增,警方通過查閱死者的電腦和手機懂酱,發(fā)現死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來誊抛,“玉大人列牺,你說我怎么就攤上這事∞智裕” “怎么了瞎领?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長随夸。 經常有香客問我默刚,道長,這世上最難降的妖魔是什么逃魄? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任荤西,我火速辦了婚禮,結果婚禮上伍俘,老公的妹妹穿的比我還像新娘邪锌。我一直安慰自己,他們只是感情好癌瘾,可當我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布觅丰。 她就那樣靜靜地躺著,像睡著了一般妨退。 火紅的嫁衣襯著肌膚如雪妇萄。 梳的紋絲不亂的頭發(fā)上蜕企,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機與錄音冠句,去河邊找鬼轻掩。 笑死,一個胖子當著我的面吹牛懦底,可吹牛的內容都是我干的唇牧。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼聚唐,長吁一口氣:“原來是場噩夢啊……” “哼丐重!你這毒婦竟也來了?” 一聲冷哼從身側響起杆查,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤扮惦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后亲桦,有當地人在樹林里發(fā)現了一具尸體崖蜜,經...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年烙肺,在試婚紗的時候發(fā)現自己被綠了纳猪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片氧卧。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡桃笙,死狀恐怖,靈堂內的尸體忽然破棺而出沙绝,到底是詐尸還是另有隱情搏明,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布闪檬,位于F島的核電站星著,受9級特大地震影響,放射性物質發(fā)生泄漏粗悯。R本人自食惡果不足惜虚循,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望样傍。 院中可真熱鬧横缔,春花似錦、人聲如沸衫哥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撤逢。三九已至膛锭,卻和暖如春粮坞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背初狰。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工莫杈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人跷究。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓姓迅,卻偏偏與公主長得像,于是被迫代替她去往敵國和親俊马。 傳聞我的和親對象是個殘疾皇子丁存,可洞房花燭夜當晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內容