java Executors 類提供了四種線程池敛熬,分別為:
newCachedThreadPool 創(chuàng)建一個(gè)可緩存線程池池摧,如果線程池長度超過處理需要荔泳,可靈活回收空閑線程,若無可回收尊流,則新建線程。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
newFixedThreadPool 創(chuàng)建一個(gè)定長線程池灯帮,可控制線程最大并發(fā)數(shù)崖技,超出的線程會(huì)在隊(duì)列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool 創(chuàng)建一個(gè)定長線程池钟哥,支持定時(shí)及周期性任務(wù)執(zhí)行迎献。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù)腻贰,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行吁恍。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SynchronousQueue
SynchronousQueue 是無界的,是一種無緩沖的等待隊(duì)列播演,但是由于該 Queue 本身的特性冀瓦,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加;
可以認(rèn)為 SynchronousQueue 是一個(gè)緩存值為 1 的阻塞隊(duì)列写烤,但是 isEmpty()方法永遠(yuǎn)返回是true翼闽,remainingCapacity() 方法永遠(yuǎn)返回是 0,remove() 和 removeAll() 方法永遠(yuǎn)返回是 false劝评,iterator() 方法永遠(yuǎn)返回空绸吸,peek() 方法永遠(yuǎn)返回null。
聲明一個(gè) SynchronousQueue 有兩種不同的方式芽隆,它們之間有著不太一樣的行為询微。公平模式和非公平模式的區(qū)別:如果采用公平模式:SynchronousQueue 會(huì)采用公平鎖崖瞭,并配合一個(gè)FIFO隊(duì)列來阻塞多余的生產(chǎn)者和消費(fèi)者,從而體系整體的公平策略撑毛;
但如果是非公平模式(SynchronousQueue 默認(rèn)):SynchronousQueue 采用非公平鎖书聚,同時(shí)配合一個(gè)LIFO隊(duì)列來管理多余的生產(chǎn)者和消費(fèi)者,而后一種模式代态,如果生產(chǎn)者和消費(fèi)者的處理速度有差距寺惫,則很容易出現(xiàn)饑渴的情況,即可能有某些生產(chǎn)者或者是消費(fèi)者的數(shù)據(jù)永遠(yuǎn)都得不到處理蹦疑。
LinkedBlockingQueue
LinkedBlockingQueue 是無界的西雀,是一個(gè)無界緩存的等待隊(duì)列。
基于鏈表的阻塞隊(duì)列歉摧,內(nèi)部維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由鏈表構(gòu)成)艇肴。
當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù)叁温,并緩存在隊(duì)列內(nèi)部再悼,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí)(LinkedBlockingQueue 可以通過構(gòu)造函數(shù)指定該值)膝但,才會(huì)阻塞生產(chǎn)者隊(duì)列冲九,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒跟束,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理莺奸。
LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來控制數(shù)據(jù)同步冀宴,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù)灭贷,以此來提高整個(gè)隊(duì)列的并發(fā)性能。
DelayedWorkQueue
DelayedWorkQueue 是無界的優(yōu)先級(jí)隊(duì)列略贮。用數(shù)組儲(chǔ)存數(shù)據(jù)甚疟,元素個(gè)數(shù)超過數(shù)組長度,就會(huì)調(diào)用grow()方法逃延,進(jìn)行數(shù)組擴(kuò)容览妖。
內(nèi)部用對(duì)實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列,將新元素添加到優(yōu)先級(jí)隊(duì)列中對(duì)應(yīng)的位置揽祥,通過siftUp方法黄痪,保證按照元素的優(yōu)先級(jí)排序。
如果新插入的元素是隊(duì)列頭盔然,即更換了隊(duì)列頭桅打,那么就要喚醒正在等待獲取任務(wù)的線程是嗜。這些線程可能是因?yàn)樵?duì)列頭元素的延時(shí)時(shí)間沒到,而等待的挺尾。
保證添加到隊(duì)列中的任務(wù)鹅搪,會(huì)按照任務(wù)的延時(shí)時(shí)間進(jìn)行排序,延時(shí)時(shí)間少的任務(wù)首先被獲取遭铺。
使用示例:
newCachedThreadPool:
package Demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
newFixedThreadPool
package Demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
newScheduledThreadPool
package Demo;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
newSingleThreadExecutor
package Demo;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}