ExecutorService抽象從java5一直持續(xù)到現(xiàn)在氧卧。我們在這里討論2004桃笙,簡單提醒一下:java5和java6將不會被支持,java7won’t be in half a year沙绝。我提出這個問題的原因是因為很多java程序員仍然不能完全理解ExecutorService的工作原理搏明。有很多地方需要了解,今天我想分享一些鮮為人知的特性和實踐闪檬。然而這篇文章針對中級程序員星著,沒有特別牛逼的。
1粗悯,命名線程池
我不能強調(diào)這一點虚循。在dump一個運行的jvm的所有線程或者調(diào)試的時候,默認(rèn)的線程池命名模式是pool-N-thread-M,其中N代表線程池序列號(每次你創(chuàng)建一個新的線程池样傍,全局N計數(shù)增加)横缔,M是線程池里面線程的序列號。例如pool-2-thread-3表示在jvm進程生命周期里面創(chuàng)建的第二個線程池的第三個線程衫哥。參閱:Executors.defaultThreadFactory()茎刚。不是很具描述性。JDK使正確命名線程變得稍微有的復(fù)雜因為命名策略隱藏在ThreadFactory撤逢。幸運的是Guava(google開源的一組工具類集合)工具包有個幫助類來做這件事情:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
//通過ThreadFactoryBuilder這個類來設(shè)置線程池的名稱并返回一個ThreadFactory
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Orders-%d")
.setDaemon(true)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
默認(rèn)情況下線程池創(chuàng)建非守護線程膛锭,取決于你是否需要這種類型的線程。
2蚊荣,根據(jù)上下文切換名稱
這是一個我從supercharged-jstack-how-to-debug-your-servers-at-100mph學(xué)到的技巧初狰,一旦我們記住線程的名稱,我們可以在運行隨時修改他們互例!這是有意義的因為線程dump顯示了類和方法名稱奢入,而不是參數(shù)和本地變量。通過調(diào)整線程名稱來保留一些基本的事物標(biāo)識符媳叨,我們可以容易跟蹤哪個消息/記錄/查詢等很慢或者引起了死鎖俊马。例如:
private void process(String messageId) {
executorService.submit(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Processing-" + messageId);
try {
//這里是業(yè)務(wù)邏輯...
} finally {
currentThread.setName(oldName);
}
});
}
在try-finally代碼塊里面當(dāng)前線程命名為Processing-WHATEVER-MESSAGE-ID-IS丁存。當(dāng)追蹤經(jīng)過這個系統(tǒng)的消息的時候這可能會帶來便利。
3柴我,明確和安全的關(guān)閉線程池
在客戶線程(待提交運行的任務(wù))和線程池(執(zhí)行任務(wù)的線程)之間有個任務(wù)對列解寝。當(dāng)你的應(yīng)用程序關(guān)閉的時候,你必須關(guān)心兩間事情:排隊的任務(wù)如何處理以及已經(jīng)在線程池里面的任務(wù)怎么運行(稍后會詳細(xì)介紹)艘儒。令人驚訝的是很多開發(fā)者并沒有正確地或有意識地關(guān)閉線程池聋伦。有兩種技術(shù):讓所有排隊的任務(wù)執(zhí)行(通過shutdown()這個方法)或者從隊列刪除他們(通過shutdownNow()這個方法)-完全取決于你的實際情況。例如如果我們想提交一堆任務(wù)并且想當(dāng)它們都完成了才結(jié)束界睁,這個情況可以使用shutdown():
private void sendAllEmails(List<String> emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.debug("All e-mails were sent so far? {}", done);
在這個場景觉增,我們發(fā)送一堆郵件,每一個郵件的發(fā)送在線程池里面作為一個單獨的任務(wù)翻斟。在提交這些任務(wù)之后我們關(guān)閉線程池以至于它不再接收新的任務(wù)逾礁。這時候我們等待最長一分鐘,直到這些任務(wù)都完成访惜。然而一些任務(wù)仍然未結(jié)束嘹履,awaitTermination()會返回false。此外债热,未結(jié)束的任務(wù)會繼續(xù)執(zhí)行砾嫉。我知道趕時髦的人這樣做:
emails.parallelStream().forEach(this::sendEmail);
稱我為老式的,但是我喜歡控制并發(fā)線程的數(shù)量窒篱。不要緊焕刮,一個優(yōu)雅替代shutdown()的是shutdownNow():
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());
這次所有排隊任務(wù)會被丟棄返回。已經(jīng)執(zhí)行的任務(wù)任然可以繼續(xù)執(zhí)行墙杯。
4配并,謹(jǐn)慎處理中斷
Future接口的鮮為人知的功能是取消。查看之前的文章InterruptedException and interrupting threads explained
5,監(jiān)控隊列長度并且讓隊列有界
大小不正確的線程池可能導(dǎo)致緩慢高镐,不穩(wěn)定以及內(nèi)存溢出溉旋。如果你配置太少的線程,隊列會堆積避消,耗費很多內(nèi)存低滩。另一方面太多的線程會減慢整個系統(tǒng)召夹,因為過多的線程上下文切換 - 并導(dǎo)致和之前相當(dāng)?shù)陌Y狀岩喷。查看隊列的深度并保持有界很重要,因此超載的線程暫時拒絕新的任務(wù)监憎。
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
上面的代碼等同于Executors.newFixedThreadPool(n)纱意。而不是使用默認(rèn)的無界LinkedBlockingQueue我們使用容量大小為100的ArrayBlockingQueue。這意味著如果一百個任務(wù)已經(jīng)在隊列里面了(并且n個正在執(zhí)行)新的任務(wù)會被拒絕返回RejectedExecutionException鲸阔。此外偷霉,由于隊列現(xiàn)在可以在外部使用迄委,我們可以定期調(diào)用size()并將其放在logs / JMX /任何您使用的監(jiān)視機制中。
6类少,記得處理異常
下面代碼片段的結(jié)果是叙身?
executorService.submit(() -> {
System.out.println(1 / 0);
});
我被上面的代碼坑過很多次了,它不會打印任何東西硫狞。沒有 java.lang.ArithmeticException: / by zero 整個的標(biāo)注信轿,什么都沒有。線程池只是吞掉整個異常残吩,好像它從未發(fā)生過一樣财忽。如果它是一個從頭開始創(chuàng)建的線程,UncaughtExceptionHandler 可以工作泣侮。但是使用線程池必須更加小心即彪。如果你提交一個Runnable(像上面沒有任何結(jié)果)你需要用try catche包括代碼主體,并且打印日志活尊。如果你提交一個Callable,確保你總是使用阻塞get()取消引用它來重新拋出異常:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//below will throw ExecutionException caused by ArithmeticException
division.get();
有趣的是即使spring框架使用@Async提交了整個bug,參閱:SPR-8995和SPR-12090隶校。
7,監(jiān)控在隊列里面的等待時間
監(jiān)控工作隊列深度是一方面酬凳。然而在追蹤單個事物/任務(wù)問題的時候很有必要看下在提交任務(wù)和實際執(zhí)行之間花了多少時間惠况。該持續(xù)時間應(yīng)該優(yōu)先為0(當(dāng)線程池里面有空閑線程的時候),然而它會增長當(dāng)任務(wù)必須排隊的時候宁仔。此外稠屠,如果線程池沒有一個固定的線程數(shù),運行新的任務(wù)需要創(chuàng)建新的線程翎苫,也會消耗短暫的時間权埠。為了清楚的監(jiān)控這個指標(biāo),用與此類似的東西包轉(zhuǎn)原來的ExecutorService:
public class WaitTimeMonitoringExecutorService implements ExecutorService {
private final ExecutorService target;
public WaitTimeMonitoringExecutorService(ExecutorService target) {
this.target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
final long startTime = System.currentTimeMillis();
return target.submit(() -> {
final long queueDuration = System.currentTimeMillis() - startTime;
log.debug("Task {} spent {}ms in queue", task, queueDuration);
return task.call();
}
);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}
@Override
public Future<?> submit(Runnable task) {
return submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
task.run();
return null;
}
});
}
//...
}
這不是一個完整的實現(xiàn)煎谍,但你得到了基本的思想攘蔽。當(dāng)我們像線程池提交任務(wù)的時候,我們立即測量開始時間呐粘,一旦任務(wù)被選擇并執(zhí)行我們就停止測量满俗。不要被源代碼中的startTime和queueDuration非常接近而迷惑。事實上作岖,這兩行是在不同的線程中進行評估的唆垃,可能是幾毫秒甚至幾秒,例如:
Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
8痘儡,保留客戶端堆棧
如今辕万,反應(yīng)式編程似乎引起了很多關(guān)注。 Reactive manifesto, reactive streams, RxJava (just released 1.0!), Clojure agents, scala.rx… 他們都很好用,但堆棧跟蹤不再是你的朋友渐尿,它們至多是無用的醉途。例如,在提交給線程池的任務(wù)中發(fā)生異常:
java.lang.NullPointerException: null
at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
我們?nèi)菀椎匕l(fā)下在76行MyTask threw NPE砖茸,但是我們不知道誰提交了這個任務(wù)隘擎,因為堆棧跟蹤只顯示Thread和ThreadPoolExecutor。我們可以在技術(shù)上瀏覽源代碼凉夯,希望找到一個創(chuàng)建MyTask的地方嵌屎。但是沒有線程(更不用說事件驅(qū)動,響應(yīng)式恍涂, actor-ninja-programming)我們可以立即看到全貌宝惰。如果我們可以保留客戶端代碼的堆棧并且顯示它,例如如果失敗了再沧?這個想法并不新鮮尼夺,例如Hazelcast from owner node to client code。這就是在出現(xiàn)故障時保持客戶端堆棧跟蹤的天真支持:
public class ExecutorServiceWithClientTrace implements ExecutorService {
protected final ExecutorService target;
public ExecutorServiceWithClientTrace(ExecutorService target) {
this.target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
return () -> {
try {
return task.call();
} catch (Exception e) {
log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);
throw e;
}
};
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return tasks.stream().map(this::submit).collect(toList());
}
//...
}
這次在出現(xiàn)故障的時候我們可以檢索提交任務(wù)的地方所有的堆棧和線程的名稱炒瘸。與之前看到的標(biāo)準(zhǔn)異常相比淤堵,它更有價值:
Exception java.lang.NullPointerException in task submitted from thrad main here:
java.lang.Exception: Client stack trace
at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9,優(yōu)先CompletableFuture
在Java 8中引入了更強大的CompletableFuture。請盡可能使用它顷扩。ExecutorService未擴展為支持這種增強的抽象拐邪,因此您必須自己處理它。代替:
final Future<BigDecimal> future =
executorService.submit(this::calculate);
這樣做:
final CompletableFuture<BigDecimal> future =
CompletableFuture.supplyAsync(this::calculate, executorService);
CompletableFuture擴展了Future隘截,所以一切都像以前一樣工作扎阶。 但是,API的更高級消費者將真正欣賞CompletableFuture提供的擴展功能婶芭。
10东臀,同步隊列
SynchronousQueue是一個有趣的BlockingQueue,它不是真正的隊列犀农。 它本身甚至都不是數(shù)據(jù)結(jié)構(gòu)惰赋。 最好將其解釋為容量為0的隊列。引用JavaDoc:
”each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.“
這個怎么和線程池關(guān)聯(lián)上呢呵哨?嘗試將SynchronousQueue與ThreadPoolExecutor一起使用:
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
我們創(chuàng)建了一個帶有兩個線程的線程池赁濒,并在它前面有一個SynchronousQueue。因為SynchronousQueue本質(zhì)上是一個容量為0的隊列孟害,所以如果有可用的空閑線程拒炎,這樣的ExecutorService將只接受新任務(wù)。 如果所有線程都忙纹坐,新任務(wù)將立即被拒絕枝冀,永遠(yuǎn)不會等待。 當(dāng)在后臺處理必須立即開始或被丟棄時耘子,這個執(zhí)行方式是被期待的果漾。
就是這樣,我希望你找到至少一個有趣的功能谷誓!
最后英文原文