ExecutorService – 10個技巧和竅門

ExecutorService抽象從java5一直持續(xù)到現(xiàn)在氧卧。我們在這里討論2004桃笙,簡單提醒一下:java5和java6將不會被支持,java7won’t be in half a year沙绝。我提出這個問題的原因是因為很多java程序員仍然不能完全理解ExecutorService的工作原理搏明。有很多地方需要了解,今天我想分享一些鮮為人知的特性和實踐闪檬。然而這篇文章針對中級程序員星著,沒有特別牛逼的。

1粗悯,命名線程池

我不能強調(diào)這一點虚循。在dump一個運行的jvm的所有線程或者調(diào)試的時候,默認(rèn)的線程池命名模式是pool-N-thread-M,其中N代表線程池序列號(每次你創(chuàng)建一個新的線程池样傍,全局N計數(shù)增加)横缔,M是線程池里面線程的序列號。例如pool-2-thread-3表示在jvm進程生命周期里面創(chuàng)建的第二個線程池的第三個線程衫哥。參閱:Executors.defaultThreadFactory()茎刚。不是很具描述性。JDK使正確命名線程變得稍微有的復(fù)雜因為命名策略隱藏在ThreadFactory撤逢。幸運的是Guava(google開源的一組工具類集合)工具包有個幫助類來做這件事情:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
//通過ThreadFactoryBuilder這個類來設(shè)置線程池的名稱并返回一個ThreadFactory
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("Orders-%d")
        .setDaemon(true)
        .build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

默認(rèn)情況下線程池創(chuàng)建非守護線程膛锭,取決于你是否需要這種類型的線程。

2蚊荣,根據(jù)上下文切換名稱

這是一個我從supercharged-jstack-how-to-debug-your-servers-at-100mph學(xué)到的技巧初狰,一旦我們記住線程的名稱,我們可以在運行隨時修改他們互例!這是有意義的因為線程dump顯示了類和方法名稱奢入,而不是參數(shù)和本地變量。通過調(diào)整線程名稱來保留一些基本的事物標(biāo)識符媳叨,我們可以容易跟蹤哪個消息/記錄/查詢等很慢或者引起了死鎖俊马。例如:

private void process(String messageId) {
    executorService.submit(() -> {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + messageId);
        try {
            //這里是業(yè)務(wù)邏輯...
        } finally {
            currentThread.setName(oldName);
        }
    });
}

在try-finally代碼塊里面當(dāng)前線程命名為Processing-WHATEVER-MESSAGE-ID-IS丁存。當(dāng)追蹤經(jīng)過這個系統(tǒng)的消息的時候這可能會帶來便利。

3柴我,明確和安全的關(guān)閉線程池

在客戶線程(待提交運行的任務(wù))和線程池(執(zhí)行任務(wù)的線程)之間有個任務(wù)對列解寝。當(dāng)你的應(yīng)用程序關(guān)閉的時候,你必須關(guān)心兩間事情:排隊的任務(wù)如何處理以及已經(jīng)在線程池里面的任務(wù)怎么運行(稍后會詳細(xì)介紹)艘儒。令人驚訝的是很多開發(fā)者并沒有正確地或有意識地關(guān)閉線程池聋伦。有兩種技術(shù):讓所有排隊的任務(wù)執(zhí)行(通過shutdown()這個方法)或者從隊列刪除他們(通過shutdownNow()這個方法)-完全取決于你的實際情況。例如如果我們想提交一堆任務(wù)并且想當(dāng)它們都完成了才結(jié)束界睁,這個情況可以使用shutdown():

private void sendAllEmails(List<String> emails) throws InterruptedException {
    emails.forEach(email ->
            executorService.submit(() ->
                    sendEmail(email)));
    executorService.shutdown();
    final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
    log.debug("All e-mails were sent so far? {}", done);

在這個場景觉增,我們發(fā)送一堆郵件,每一個郵件的發(fā)送在線程池里面作為一個單獨的任務(wù)翻斟。在提交這些任務(wù)之后我們關(guān)閉線程池以至于它不再接收新的任務(wù)逾礁。這時候我們等待最長一分鐘,直到這些任務(wù)都完成访惜。然而一些任務(wù)仍然未結(jié)束嘹履,awaitTermination()會返回false。此外债热,未結(jié)束的任務(wù)會繼續(xù)執(zhí)行砾嫉。我知道趕時髦的人這樣做:

emails.parallelStream().forEach(this::sendEmail);

稱我為老式的,但是我喜歡控制并發(fā)線程的數(shù)量窒篱。不要緊焕刮,一個優(yōu)雅替代shutdown()的是shutdownNow():

final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());

這次所有排隊任務(wù)會被丟棄返回。已經(jīng)執(zhí)行的任務(wù)任然可以繼續(xù)執(zhí)行墙杯。

4配并,謹(jǐn)慎處理中斷

Future接口的鮮為人知的功能是取消。查看之前的文章InterruptedException and interrupting threads explained

5,監(jiān)控隊列長度并且讓隊列有界

大小不正確的線程池可能導(dǎo)致緩慢高镐,不穩(wěn)定以及內(nèi)存溢出溉旋。如果你配置太少的線程,隊列會堆積避消,耗費很多內(nèi)存低滩。另一方面太多的線程會減慢整個系統(tǒng)召夹,因為過多的線程上下文切換 - 并導(dǎo)致和之前相當(dāng)?shù)陌Y狀岩喷。查看隊列的深度并保持有界很重要,因此超載的線程暫時拒絕新的任務(wù)监憎。

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
        0L, TimeUnit.MILLISECONDS,
        queue);

上面的代碼等同于Executors.newFixedThreadPool(n)纱意。而不是使用默認(rèn)的無界LinkedBlockingQueue我們使用容量大小為100的ArrayBlockingQueue。這意味著如果一百個任務(wù)已經(jīng)在隊列里面了(并且n個正在執(zhí)行)新的任務(wù)會被拒絕返回RejectedExecutionException鲸阔。此外偷霉,由于隊列現(xiàn)在可以在外部使用迄委,我們可以定期調(diào)用size()并將其放在logs / JMX /任何您使用的監(jiān)視機制中。

6类少,記得處理異常

下面代碼片段的結(jié)果是叙身?

executorService.submit(() -> {
    System.out.println(1 / 0);
});

我被上面的代碼坑過很多次了,它不會打印任何東西硫狞。沒有 java.lang.ArithmeticException: / by zero 整個的標(biāo)注信轿,什么都沒有。線程池只是吞掉整個異常残吩,好像它從未發(fā)生過一樣财忽。如果它是一個從頭開始創(chuàng)建的線程,UncaughtExceptionHandler 可以工作泣侮。但是使用線程池必須更加小心即彪。如果你提交一個Runnable(像上面沒有任何結(jié)果)你需要用try catche包括代碼主體,并且打印日志活尊。如果你提交一個Callable,確保你總是使用阻塞get()取消引用它來重新拋出異常:

final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();

有趣的是即使spring框架使用@Async提交了整個bug,參閱:SPR-8995和SPR-12090隶校。

7,監(jiān)控在隊列里面的等待時間

監(jiān)控工作隊列深度是一方面酬凳。然而在追蹤單個事物/任務(wù)問題的時候很有必要看下在提交任務(wù)和實際執(zhí)行之間花了多少時間惠况。該持續(xù)時間應(yīng)該優(yōu)先為0(當(dāng)線程池里面有空閑線程的時候),然而它會增長當(dāng)任務(wù)必須排隊的時候宁仔。此外稠屠,如果線程池沒有一個固定的線程數(shù),運行新的任務(wù)需要創(chuàng)建新的線程翎苫,也會消耗短暫的時間权埠。為了清楚的監(jiān)控這個指標(biāo),用與此類似的東西包轉(zhuǎn)原來的ExecutorService:

public class WaitTimeMonitoringExecutorService implements ExecutorService {
    private final ExecutorService target;
    public WaitTimeMonitoringExecutorService(ExecutorService target) {
        this.target = target;
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        final long startTime = System.currentTimeMillis();
        return target.submit(() -> {
                    final long queueDuration = System.currentTimeMillis() - startTime;
                    log.debug("Task {} spent {}ms in queue", task, queueDuration);
                    return task.call();
                }
        );
    }
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return submit(() -> {
            task.run();
            return result;
        });
    }
    @Override
    public Future<?> submit(Runnable task) {
        return submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                task.run();
                return null;
            }
        });
    }
    //...
}

這不是一個完整的實現(xiàn)煎谍,但你得到了基本的思想攘蔽。當(dāng)我們像線程池提交任務(wù)的時候,我們立即測量開始時間呐粘,一旦任務(wù)被選擇并執(zhí)行我們就停止測量满俗。不要被源代碼中的startTime和queueDuration非常接近而迷惑。事實上作岖,這兩行是在不同的線程中進行評估的唆垃,可能是幾毫秒甚至幾秒,例如:

Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue

8痘儡,保留客戶端堆棧

如今辕万,反應(yīng)式編程似乎引起了很多關(guān)注。 Reactive manifesto, reactive streams, RxJava (just released 1.0!), Clojure agents, scala.rx… 他們都很好用,但堆棧跟蹤不再是你的朋友渐尿,它們至多是無用的醉途。例如,在提交給線程池的任務(wù)中發(fā)生異常:

java.lang.NullPointerException: null
    at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
    at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
    at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

我們?nèi)菀椎匕l(fā)下在76行MyTask threw NPE砖茸,但是我們不知道誰提交了這個任務(wù)隘擎,因為堆棧跟蹤只顯示Thread和ThreadPoolExecutor。我們可以在技術(shù)上瀏覽源代碼凉夯,希望找到一個創(chuàng)建MyTask的地方嵌屎。但是沒有線程(更不用說事件驅(qū)動,響應(yīng)式恍涂, actor-ninja-programming)我們可以立即看到全貌宝惰。如果我們可以保留客戶端代碼的堆棧并且顯示它,例如如果失敗了再沧?這個想法并不新鮮尼夺,例如Hazelcast from owner node to client code。這就是在出現(xiàn)故障時保持客戶端堆棧跟蹤的天真支持:

public class ExecutorServiceWithClientTrace implements ExecutorService {
    protected final ExecutorService target;
    public ExecutorServiceWithClientTrace(ExecutorService target) {
        this.target = target;
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }
    private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
        return () -> {
            try {
                return task.call();
            } catch (Exception e) {
                log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
                throw e;
            }
        };
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return tasks.stream().map(this::submit).collect(toList());
    }
    //...
}

這次在出現(xiàn)故障的時候我們可以檢索提交任務(wù)的地方所有的堆棧和線程的名稱炒瘸。與之前看到的標(biāo)準(zhǔn)異常相比淤堵,它更有價值:

Exception java.lang.NullPointerException in task submitted from thrad main here:
java.lang.Exception: Client stack trace
    at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
    at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
    at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

9,優(yōu)先CompletableFuture

在Java 8中引入了更強大的CompletableFuture。請盡可能使用它顷扩。ExecutorService未擴展為支持這種增強的抽象拐邪,因此您必須自己處理它。代替:

final Future<BigDecimal> future = 
    executorService.submit(this::calculate);

這樣做:

final CompletableFuture<BigDecimal> future = 
    CompletableFuture.supplyAsync(this::calculate, executorService);

CompletableFuture擴展了Future隘截,所以一切都像以前一樣工作扎阶。 但是,API的更高級消費者將真正欣賞CompletableFuture提供的擴展功能婶芭。

10东臀,同步隊列

SynchronousQueue是一個有趣的BlockingQueue,它不是真正的隊列犀农。 它本身甚至都不是數(shù)據(jù)結(jié)構(gòu)惰赋。 最好將其解釋為容量為0的隊列。引用JavaDoc:
”each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]

Synchronous queues are similar to rendezvous channels used in CSP and Ada.“

這個怎么和線程池關(guān)聯(lián)上呢呵哨?嘗試將SynchronousQueue與ThreadPoolExecutor一起使用:

BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
        0L, TimeUnit.MILLISECONDS,
        queue);

我們創(chuàng)建了一個帶有兩個線程的線程池赁濒,并在它前面有一個SynchronousQueue。因為SynchronousQueue本質(zhì)上是一個容量為0的隊列孟害,所以如果有可用的空閑線程拒炎,這樣的ExecutorService將只接受新任務(wù)。 如果所有線程都忙纹坐,新任務(wù)將立即被拒絕枝冀,永遠(yuǎn)不會等待。 當(dāng)在后臺處理必須立即開始或被丟棄時耘子,這個執(zhí)行方式是被期待的果漾。

就是這樣,我希望你找到至少一個有趣的功能谷誓!

最后英文原文

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末绒障,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子捍歪,更是在濱河造成了極大的恐慌户辱,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件糙臼,死亡現(xiàn)場離奇詭異庐镐,居然都是意外死亡,警方通過查閱死者的電腦和手機变逃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門必逆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人揽乱,你說我怎么就攤上這事名眉。” “怎么了凰棉?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵损拢,是天一觀的道長。 經(jīng)常有香客問我撒犀,道長福压,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任或舞,我火速辦了婚禮隧膏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嚷那。我一直安慰自己胞枕,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布魏宽。 她就那樣靜靜地躺著腐泻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪队询。 梳的紋絲不亂的頭發(fā)上派桩,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天,我揣著相機與錄音蚌斩,去河邊找鬼铆惑。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的员魏。 我是一名探鬼主播丑蛤,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼撕阎!你這毒婦竟也來了受裹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤虏束,失蹤者是張志新(化名)和其女友劉穎棉饶,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體镇匀,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡照藻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了汗侵。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岩梳。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖晃择,靈堂內(nèi)的尸體忽然破棺而出冀值,到底是詐尸還是另有隱情,我是刑警寧澤宫屠,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布列疗,位于F島的核電站,受9級特大地震影響浪蹂,放射性物質(zhì)發(fā)生泄漏抵栈。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一坤次、第九天 我趴在偏房一處隱蔽的房頂上張望古劲。 院中可真熱鬧,春花似錦缰猴、人聲如沸产艾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽闷堡。三九已至,卻和暖如春疑故,著一層夾襖步出監(jiān)牢的瞬間杠览,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工纵势, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留踱阿,地道東北人管钳。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像软舌,于是被迫代替她去往敵國和親才漆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,786評論 2 345