在實(shí)際生產(chǎn)環(huán)境中,線程的數(shù)量必須得到控制.大量創(chuàng)建線程對(duì)系統(tǒng)性能是有傷害的.
為了避免系統(tǒng)頻繁的創(chuàng)建和銷毀線程,我們可以讓創(chuàng)建的線程進(jìn)行復(fù)用,線程池中,總有那么幾個(gè)活躍線程,當(dāng)需要使用線程時(shí),可以從池子中隨便拿一個(gè)空閑線程,當(dāng)完成工作時(shí),不是馬上關(guān)閉線程,而是將這個(gè)線程放回線程池,方便下次再用.
通過(guò)Executors 可以創(chuàng)建特定功能的線程池,以下是平時(shí)主要用的幾種:
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
- newFixedThreadPool(int nThreads)方法
/**
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
根據(jù)JDK里面注釋可以得到: 該方法返回一個(gè)固定線程數(shù)量的線程池,
當(dāng)有新的任務(wù)提交,線程池中如果有空閑線程,則立即執(zhí)行.如果所有線程都在活動(dòng)狀態(tài),則新的任務(wù)會(huì)被暫存的在一個(gè)任務(wù)隊(duì)列中,等到線程空閑時(shí),就處理任務(wù)隊(duì)列中的任務(wù).
- newSingleThreadExecutor()方法
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
改方法傳回一個(gè)只有一個(gè)線程的線程池,多余的任務(wù)提交后將會(huì)保存到任務(wù)隊(duì)列中,待線程空閑,按先入先出的順序執(zhí)行隊(duì)列中的任務(wù).
- newCachedThreadPool()方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
改方法返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)量的線程池.線程池的線程數(shù)量不確定,如果有空閑線程,就用空閑線程,如果所有線程都在工作,又有新的任務(wù)提交,則會(huì)創(chuàng)建新的線程處理任務(wù).
簡(jiǎn)單介紹線程池的用法
public class ThreadPoolDemo {
public static class Mytask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+ "---" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Mytask mytask = new Mytask();
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
service.submit(mytask);
}
}
}
得到運(yùn)行結(jié)果
根據(jù)結(jié)果可以看到
10個(gè)任務(wù)分為兩批執(zhí)行,第一批和第二批的線程ID正好是一致的,而且前五個(gè)和后五個(gè)任務(wù)的時(shí)間正好相差一秒鐘.
- 線程池的內(nèi)部實(shí)現(xiàn):
根據(jù)上面三個(gè)簡(jiǎn)單線程池的代碼表面分析,都是使用ThreadPoolExecutor實(shí)現(xiàn).現(xiàn)在分析一下ThreadPoolExecutor;
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize : 線程池中的線程數(shù)量
- maximumPoolSize : 線程池中可以容納的最大線程數(shù)
- keepAliveTime : 超過(guò)指定corePoolSize 的線程的存活時(shí)間.
- unit : keepAliveTime的時(shí)間單位
- workQueue : 沒(méi)有被提交的任務(wù)所存放的任務(wù)隊(duì)列.
- ThreadFactory: 線程工廠
- RejectedExecutionHandler : 拒絕策略, 當(dāng)任務(wù)來(lái)不及處理,如何拒絕任務(wù).
workQueue 沒(méi)有被提交的任務(wù)所存放的任務(wù)隊(duì)列 ,它是BlockingQueue接口的具體實(shí)現(xiàn),下面是對(duì)不同線程池的不同分析
通過(guò)看源碼可以得到:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool 該線程池的開(kāi)始線程數(shù)量和最大線程數(shù)量是一開(kāi)始就設(shè)置好的,并且使用了LinkedBlockingQueue 存放沒(méi)有被執(zhí)行的線程, LinkedBlockingQueue 是基于鏈表的實(shí)現(xiàn),適合做無(wú)界隊(duì)列,當(dāng)任務(wù)提交非常頻繁的時(shí)候, LinkedBlockingQueue 可能迅速膨脹.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool 該線程池的corePoolSize 是0, maximumPoolSize 為無(wú)窮大,意味著當(dāng)沒(méi)有任務(wù)提交時(shí),線程池里面沒(méi)有線程,當(dāng)有任務(wù)提交時(shí),會(huì)檢查是否有空閑線程,如果有的話就用空閑線程,如果沒(méi)有空閑線程,就會(huì)將任務(wù)加入 SynchronousQueue 隊(duì)列,
SynchronousQueue 是一種直接提交的隊(duì)列,它沒(méi)有容量,如果想要插入一個(gè)新的元素,就必須刪除一個(gè)元素,所以提交給他的任務(wù)不會(huì)真實(shí)的保存,而是直接扔給線程池執(zhí)行,當(dāng)任務(wù)執(zhí)行完畢后,由于corePoolSize = 0,所以空閑線程又會(huì)在指定的時(shí)間內(nèi)被回收.所以如果任務(wù)太多而當(dāng)任務(wù)執(zhí)行不夠快時(shí),他會(huì)開(kāi)啟等量的線程,這樣做線程池可能會(huì)開(kāi)啟大量的線程(后果自己想)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor 只是簡(jiǎn)單的將線程池線程數(shù)量設(shè)置為1,其余的和newFixedThreaPool 沒(méi)啥區(qū)別!