今天對(duì)五種常見(jiàn)的java內(nèi)置線(xiàn)程池進(jìn)行講解。
線(xiàn)程使用的demo
public static void cache() {
ExecutorService pool = Executors.newCachedThreadPool();
long start = System.currentTimeMillis();
pool.execute(() -> {
int sum = 0;
for (int i = 0; i < 10; i++) {
sum = (int) Math.sqrt(i * i - 1 + i);
System.out.println(sum);
}
});
System.out.println("cache: " + (System.currentTimeMillis() - start));
}
newCachedThreadPool
- 重用之前的線(xiàn)程
- 適合執(zhí)行許多短期異步任務(wù)的程序顽铸。
- 調(diào)用 execute() 將重用以前構(gòu)造的線(xiàn)程
- 如果沒(méi)有可用的線(xiàn)程嫉到,則創(chuàng)建一個(gè)新線(xiàn)程并添加到池中
- 默認(rèn)為60s未使用就被終止和移除
- 長(zhǎng)期閑置的池將會(huì)不消耗任何資源
源碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通過(guò)源碼可以看出底層調(diào)用的是ThreadPoolExecutor方法奸晴,傳入一個(gè)同步的阻塞隊(duì)列實(shí)現(xiàn)緩存绽诚。
下面說(shuō)一下ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
通過(guò)源碼可以看出,我們可以傳入線(xiàn)程池的核心線(xiàn)程數(shù)(最小線(xiàn)程數(shù))内列,最大線(xiàn)程數(shù)量撵术,保持時(shí)間,時(shí)間單位话瞧,阻塞隊(duì)列這些參數(shù)嫩与,最大線(xiàn)程數(shù)設(shè)置為jvm可用的cpu數(shù)量為最佳實(shí)踐
newWorkStealingPool
- 獲取當(dāng)前可用的線(xiàn)程數(shù)量進(jìn)行創(chuàng)建作為并行級(jí)別
- 使用ForkJoinPool
源碼:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
通過(guò)源碼可以看出底層調(diào)用的是ForkJoinPool線(xiàn)程池
下面說(shuō)一下ForkJoinPool
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
使用一個(gè)無(wú)限隊(duì)列來(lái)保存需要執(zhí)行的任務(wù)寝姿,可以傳入線(xiàn)程的數(shù)量,不傳入划滋,則默認(rèn)使用當(dāng)前計(jì)算機(jī)中可用的cpu數(shù)量饵筑,使用分治法來(lái)解決問(wèn)題,使用fork()和join()來(lái)進(jìn)行調(diào)用
newSingleThreadExecutor
- 在任何情況下都不會(huì)有超過(guò)一個(gè)任務(wù)處于活動(dòng)狀態(tài)
- 與newFixedThreadPool(1)不同是不能重新配置加入線(xiàn)程处坪,使用FinalizableDelegatedExecutorService進(jìn)行包裝
- 能保證執(zhí)行順序根资,先提交的先執(zhí)行。
- 當(dāng)線(xiàn)程執(zhí)行中出現(xiàn)異常同窘,去創(chuàng)建一個(gè)新的線(xiàn)程替換之
源碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newFixedThreadPool
- 創(chuàng)建重用固定數(shù)量線(xiàn)程的線(xiàn)程池玄帕,不能隨時(shí)新建線(xiàn)程
- 當(dāng)所有線(xiàn)程都處于活動(dòng)狀態(tài)時(shí),如果提交了其他任務(wù)想邦,
他們將在隊(duì)列中等待一個(gè)線(xiàn)程可用 - 線(xiàn)程會(huì)一直存在裤纹,直到調(diào)用shutdown
源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool
- 設(shè)定延遲時(shí)間,定期執(zhí)行
- 空閑線(xiàn)程會(huì)進(jìn)行保留
源碼:
return new ScheduledThreadPoolExecutor(corePoolSize);
通過(guò)源碼可以看出底層調(diào)用的是一個(gè)ScheduledThreadPoolExecutor案狠,然后傳入線(xiàn)程數(shù)量
下面來(lái)介紹一下ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
通過(guò)源碼可以看出底層調(diào)用了ThreadPoolExecutor,維護(hù)了一個(gè)延遲隊(duì)列钱雷,可以傳入線(xiàn)程數(shù)量骂铁,傳入延時(shí)的時(shí)間等參數(shù),下面給出一個(gè)demo
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 15; i = i + 5) {
pool.schedule(() -> System.out.println("我被執(zhí)行了罩抗,當(dāng)前時(shí)間" + new Date()), i, TimeUnit.SECONDS);
}
pool.shutdown();
}
執(zhí)行結(jié)果
我被執(zhí)行了拉庵,當(dāng)前時(shí)間Fri Jan 12 11:20:41 CST 2018
我被執(zhí)行了,當(dāng)前時(shí)間Fri Jan 12 11:20:46 CST 2018
我被執(zhí)行了套蒂,當(dāng)前時(shí)間Fri Jan 12 11:20:51 CST 2018
有的小伙伴可能會(huì)用疑問(wèn)钞支,為什么使用schedule()而不使用submit()或者execute()呢,下面通過(guò)源碼來(lái)分析
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
通過(guò)源碼可以發(fā)現(xiàn)這兩個(gè)方法都是調(diào)用的schedule(),而且將延時(shí)時(shí)間設(shè)置為了0操刀,所以想要實(shí)現(xiàn)延時(shí)操作烁挟,需要直接調(diào)用schedule()
下面我們?cè)賮?lái)分析一下submit()和execute()的以及shutdown()和shutdownNow()的區(qū)別
- submit(),提交一個(gè)線(xiàn)程任務(wù)骨坑,可以接受回調(diào)函數(shù)的返回值嗎撼嗓,適用于需要處理返回著或者異常的業(yè)務(wù)場(chǎng)景
- execute(),執(zhí)行一個(gè)任務(wù)欢唾,沒(méi)有返回值
- shutdown()且警,表示不再接受新任務(wù),但不會(huì)強(qiáng)行終止已經(jīng)提交或者正在執(zhí)行中的任務(wù)
- shutdownNow()礁遣,對(duì)于尚未執(zhí)行的任務(wù)全部取消斑芜,正在執(zhí)行的任務(wù)全部發(fā)出interrupt(),停止執(zhí)行
五種線(xiàn)程池的適應(yīng)場(chǎng)景
- newCachedThreadPool:用來(lái)創(chuàng)建一個(gè)可以無(wú)限擴(kuò)大的線(xiàn)程池祟霍,適用于服務(wù)器負(fù)載較輕杏头,執(zhí)行很多短期異步任務(wù)盈包。
- newFixedThreadPool:創(chuàng)建一個(gè)固定大小的線(xiàn)程池,因?yàn)椴捎脽o(wú)界的阻塞隊(duì)列大州,所以實(shí)際線(xiàn)程數(shù)量永遠(yuǎn)不會(huì)變化续语,適用于可以預(yù)測(cè)線(xiàn)程數(shù)量的業(yè)務(wù)中,或者服務(wù)器負(fù)載較重厦画,對(duì)當(dāng)前線(xiàn)程數(shù)量進(jìn)行限制疮茄。
- newSingleThreadExecutor:創(chuàng)建一個(gè)單線(xiàn)程的線(xiàn)程池,適用于需要保證順序執(zhí)行各個(gè)任務(wù)根暑,并且在任意時(shí)間點(diǎn)力试,不會(huì)有多個(gè)線(xiàn)程是活動(dòng)的場(chǎng)景。
- newScheduledThreadPool:可以延時(shí)啟動(dòng)排嫌,定時(shí)啟動(dòng)的線(xiàn)程池畸裳,適用于需要多個(gè)后臺(tái)線(xiàn)程執(zhí)行周期任務(wù)的場(chǎng)景。
- newWorkStealingPool:創(chuàng)建一個(gè)擁有多個(gè)任務(wù)隊(duì)列的線(xiàn)程池淳地,可以減少連接數(shù)怖糊,創(chuàng)建當(dāng)前可用cpu數(shù)量的線(xiàn)程來(lái)并行執(zhí)行,適用于大耗時(shí)的操作颇象,可以并行來(lái)執(zhí)行
以上就是今天所想要分享的內(nèi)容伍伤,由于并發(fā)接觸并不深入,如有錯(cuò)誤遣钳,請(qǐng)聯(lián)系博主
浩瀚星辰中渺小的我們扰魂,卻有著改變世界的夢(mèng)想