ThreadPoolExecutor 優(yōu)先級(jí)的線程池

本文使用 ThreadPoolExecutor 實(shí)現(xiàn)一個(gè) 帶優(yōu)先級(jí)的線程池


最近做一個(gè)PPT轉(zhuǎn)PDF的功能, 調(diào)用 office 的另存為, 時(shí)間較長, (大約2S轉(zhuǎn)一個(gè)文件), 而且只能單線程來跑, 項(xiàng)目要求批量轉(zhuǎn)好并發(fā)郵件, 如果用戶手動(dòng)點(diǎn)擊的生成PDF則應(yīng)該盡快生成, 不能等批量轉(zhuǎn)好后再讓用戶下載.所以就實(shí)現(xiàn)了一個(gè)有優(yōu)先級(jí)的線程池任務(wù)隊(duì)列. 其實(shí)正常的實(shí)現(xiàn)方式是使用優(yōu)先級(jí)隊(duì)列(java.util.PriorityQueue / java.util.concurrent.PriorityBlockingQueue)這種方式?jīng)]辦法同步的獲取結(jié)果, 編程上有點(diǎn)復(fù)雜, java.util.concurrent.ThreadPoolExecutor 可以 public <T> Future<T> submit(Callable<T> task); 使用Future.get(), 阻塞線程, 等待結(jié)果, 來實(shí)現(xiàn)同步調(diào)用.

public class PriorityThreadPoolExecutor extends ThreadPoolExecutor;

實(shí)現(xiàn)方法很簡單, 繼承 ThreadPoolExecutor 使用 PriorityBlockingQueue 優(yōu)先級(jí)隊(duì)列. PriorityBlockingQueue 有個(gè)坑就是.

Operations on this class make no guarantees about the ordering of elements with equal priority.

*如果優(yōu)先級(jí)相同,不能確定順序. *

實(shí)際測(cè)試下來的結(jié)果是, 如果優(yōu)先級(jí)相同則執(zhí)行順序跟插入順序相反, 這就尷尬了, 著還是FIFO隊(duì)列嗎? 官網(wǎng)給了解決方式.對(duì)每一個(gè)隊(duì)列元素編號(hào), 照抄就可以了. 限制就是隊(duì)列歷史總個(gè)數(shù)不能超過 Long 個(gè). 實(shí)現(xiàn)一個(gè)Comparable 的類

class PriorityRunnable<E extends Comparable<? super E>> implements Runnable, Comparable<PriorityRunnable<E>>;

重載線程池的添加任務(wù)的方法, 追加一個(gè)參數(shù). 如果使用基類的方法, 優(yōu)先級(jí)為 0 .

    public void execute(Runnable command, int priority);
    public <T> Future<T> submit(Callable<T> task, int priority);
    public <T> Future<T> submit(Runnable task, T result, int priority);
    public Future<?> submit(Runnable task, int priority);

最終代碼如下

package wang.lcs.sys.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

    private static final Logger log = LoggerFactory.getLogger(PriorityThreadPoolExecutor.class);

    private ThreadLocal<Integer> local = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return 0;
        }
    };

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
    }

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
    }

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
    }

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
    }

    protected static PriorityBlockingQueue getWorkQueue() {
        return new PriorityBlockingQueue();
    }

    @Override
    public void execute(Runnable command) {
        int priority = local.get();
        try {
            this.execute(command, priority);
        } finally {
            local.set(0);
        }
    }

    public void execute(Runnable command, int priority) {
        super.execute(new PriorityRunnable(command, priority));
    }

    public <T> Future<T> submit(Callable<T> task, int priority) {
        local.set(priority);
        return super.submit(task);
    }

    public <T> Future<T> submit(Runnable task, T result, int priority) {
        local.set(priority);
        return super.submit(task, result);
    }

    public Future<?> submit(Runnable task, int priority) {
        local.set(priority);
        return super.submit(task);
    }

    protected static class PriorityRunnable<E extends Comparable<? super E>> implements Runnable, Comparable<PriorityRunnable<E>> {
        private final static AtomicLong seq = new AtomicLong();
        private final long seqNum;
        Runnable run;
        private int priority;

        public PriorityRunnable(Runnable run, int priority) {
            seqNum = seq.getAndIncrement();
            this.run = run;
            this.priority = priority;
        }

        public int getPriority() {
            return priority;
        }

        public void setPriority(int priority) {
            this.priority = priority;
        }

        public Runnable getRun() {
            return run;
        }

        @Override
        public void run() {
            this.run.run();
        }

        @Override
        public int compareTo(PriorityRunnable<E> other) {
            int res = 0;
            if (this.priority == other.priority) {
                if (other.run != this.run) {// ASC
                    res = (seqNum < other.seqNum ? -1 : 1);
                }
            } else {// DESC
                res = this.priority > other.priority ? -1 : 1;
            }
            return res;
        }
    }
}

下面是測(cè)試用例

package wang.lcs.sys.util;

import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

public class PriorityThreadPoolExecutorTest {

    @Test
    public void testDefault() throws InterruptedException, ExecutionException {
        PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1, 1000, 1, TimeUnit.MINUTES);

        Future[] futures = new Future[20];
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < futures.length; i++) {
            int index = i;
            futures[i] = pool.submit(new Callable() {
                @Override
                public Object call() throws Exception {
                    Thread.sleep(10);
                    buffer.append(index + ", ");
                    return null;
                }
            });
        }
        // 等待所有任務(wù)結(jié)束
        for (int i = 0; i < futures.length; i++) {
            futures[i].get();
        }
        System.out.println(buffer);
        assertEquals("0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ", buffer.toString());
    }

    @Test
    public void testSamePriority() throws InterruptedException, ExecutionException {
        PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1, 1000, 1, TimeUnit.MINUTES);

        Future[] futures = new Future[10];
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < futures.length; i++) {
            futures[i] = pool.submit(new TenSecondTask(i, 1, buffer), 1);
        }
        // 等待所有任務(wù)結(jié)束
        for (int i = 0; i < futures.length; i++) {
            futures[i].get();
        }
        System.out.println(buffer);
        assertEquals("01@00, 01@01, 01@02, 01@03, 01@04, 01@05, 01@06, 01@07, 01@08, 01@09, ", buffer.toString());
    }

    @Test
    public void testRandomPriority() throws InterruptedException, ExecutionException {
        PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1, 1000, 1, TimeUnit.MINUTES);

        Future[] futures = new Future[20];
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < futures.length; i++) {
            int r = (int) (Math.random() * 100);
            futures[i] = pool.submit(new TenSecondTask(i, r, buffer), r);
        }
        // 等待所有任務(wù)結(jié)束
        for (int i = 0; i < futures.length; i++) {
            futures[i].get();
        }

        buffer.append("01@00");
        System.out.println(buffer);
        String[] split = buffer.toString().split(", ");
        // 從 2 開始, 因?yàn)榍懊娴娜蝿?wù)可能已經(jīng)開始
        for (int i = 2; i < split.length - 1; i++) {
            String s = split[i].split("@")[0];
            assertTrue(Integer.valueOf(s) >= Integer.valueOf(split[i + 1].split("@")[0]));
        }
    }

    public static class TenSecondTask<T> implements Callable<T> {
        private StringBuffer buffer;
        int index;
        int priority;

        public TenSecondTask(int index, int priority, StringBuffer buffer) {
            this.index = index;
            this.priority = priority;
            this.buffer = buffer;
        }

        @Override
        public T call() throws Exception {
            Thread.sleep(10);
            buffer.append(String.format("%02d@%02d", this.priority, index)).append(", ");
            return null;
        }
    }
}

需要說明的是: 使用了 ThreadLocal 類, 減少代碼的復(fù)制粘貼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末再来,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子盈厘,更是在濱河造成了極大的恐慌春缕,老刑警劉巖盗胀,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異锄贼,居然都是意外死亡票灰,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門宅荤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來屑迂,“玉大人,你說我怎么就攤上這事冯键∪桥危” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵惫确,是天一觀的道長手报。 經(jīng)常有香客問我,道長改化,這世上最難降的妖魔是什么掩蛤? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮陈肛,結(jié)果婚禮上揍鸟,老公的妹妹穿的比我還像新娘。我一直安慰自己燥爷,他們只是感情好蜈亩,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著前翎,像睡著了一般稚配。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上港华,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天道川,我揣著相機(jī)與錄音,去河邊找鬼。 笑死冒萄,一個(gè)胖子當(dāng)著我的面吹牛臊岸,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播尊流,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼帅戒,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了崖技?” 一聲冷哼從身側(cè)響起逻住,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎迎献,沒想到半個(gè)月后瞎访,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡吁恍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年扒秸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冀瓦。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡伴奥,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出咕幻,到底是詐尸還是另有隱情渔伯,我是刑警寧澤顶霞,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布肄程,位于F島的核電站,受9級(jí)特大地震影響选浑,放射性物質(zhì)發(fā)生泄漏蓝厌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一古徒、第九天 我趴在偏房一處隱蔽的房頂上張望拓提。 院中可真熱鬧,春花似錦隧膘、人聲如沸代态。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蹦疑。三九已至,卻和暖如春萨驶,著一層夾襖步出監(jiān)牢的瞬間歉摧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叁温,地道東北人再悼。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像膝但,于是被迫代替她去往敵國和親冲九。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容