Java的線程既是工作單元,也是執(zhí)行機制。JDK 5開始测柠,把工作單元與執(zhí)行機制分離開來。工作單元包括Runnable和Callable缘滥,而執(zhí)行機制由Executor框架提供轰胁。
1.Executor框架簡介
①Executor框架的兩級調(diào)度模型
java.lang.Thread被一對一映射為本地操作系統(tǒng)線程。Java線程啟動時會創(chuàng)建一個本地操作系統(tǒng)線程朝扼;當該Java線程終止時赃阀,這個操作系統(tǒng)線程也會被回收。操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的CPU吟税。
在上層凹耙,Java多線程程序通常把應用分解為若干個任務姿现,然后使用用戶級的調(diào)度器(Executor框架)將這些任務映射為固定數(shù)量的線程肠仪;在底層,操作系統(tǒng)內(nèi)核將這些線程映射到硬件處理器上备典。
②Executor框架的結(jié)構(gòu)與成員
1)Executor框架的結(jié)構(gòu)
- 任務异旧。包括被執(zhí)行任務需要實現(xiàn)的接口Runnable和Callable接口。
- 任務的執(zhí)行提佣。包括任務執(zhí)行機制的核心接口Executor吮蛹,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現(xiàn)了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)拌屏。
- 一步計算的結(jié)果潮针。包括接口Future和實現(xiàn)Future接口的FutureTask類。
- Executor:是一個接口倚喂,是Executor框架的基礎每篷。
- ThreadPoolExecutor:是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務端圈。
- ScheduledThreadPoolExecutor:是一個實現(xiàn)類焦读,可以在給定的延遲后運行命令,或者定期執(zhí)行命令。比Timer更靈活,功能更強大渣磷。
- Future接口和實現(xiàn)接口的FutureTask類:代表異步計算的結(jié)果域那。
- Runnable接口和Callable接口的實現(xiàn)類:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行辱姨。
2)Executor框架的成員
-
ThreadPoolExecutor
newFixedThreadPool:適用于為了滿足資源管理的需求产还,而需要限制當前線程數(shù)量的應用場景施禾,它適用于負載比較重的服務器祷嘶。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newSingleThreadExecutor適用于需要保證順序地執(zhí)行各個任務吠冤;并且在任意時間點浑彰,不會有多個線程是活動的應用場景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newCachedThreadPool是大小無界的線程池拯辙,適用于執(zhí)行很多的短期異步任務的小程序郭变,或者是負載較輕的服務器。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
-
ScheduleThreadPoolExecutor
newScheduledThreadPool:包含若干個線程的ScheduledThreadPoolExecutor涯保。適用于需要多個后臺線程執(zhí)行周期任務诉濒,同時為了滿足資源管理的需求而需要限制后臺線程的數(shù)量的應用場景。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
newSingleThreadScheduledExecutor:只包含一個線程的ScheduledThreadPoolExecutor夕春。適用于需要單個后臺線程執(zhí)行周期任務未荒,同時需要保證順序地執(zhí)行各個任務的應用場景。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
-
Future
<T> Future<T> submit(Callable<T> task) <T> Future<T> submit(Runnable task, T result) Future<?> submit(Runnable task)
-
Runnable和Callable
public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
2.ScheduleThreadPoolExecutor詳解
繼承自ThreadPoolExecutor及志。主要用來在給定的延遲后執(zhí)行運行任務片排,或者定期執(zhí)行任務。功能與Timer類似速侈,但功能更強大率寡、更靈活。Timer對應的是單個后臺線程倚搬,而ScheduleThreadPoolExecutor可以在構(gòu)造函數(shù)中指定多個對應的后臺線程數(shù)冶共。
①ScheduleThreadPoolExecutor的運行機制
當調(diào)用scheduleAtFixedRate方法或者scheduleWithFixedDelay方法時,會向ScheduleThreadPoolExecutor的DelayedWorkQueue添加了一個實現(xiàn)了RunnableScheduledFuture接口的ScheduledFutureTask每界。
線程池中的線程從DelayedWorkQueue中獲取ScheduledFutureTask捅僵,然后執(zhí)行任務。
ScheduleThreadPoolExecutor為了實現(xiàn)周期性的執(zhí)行任務眨层,對ThreadPoolExecutor做了如下的修改庙楚。
- 使用DelayedWorkQueue作為任務隊列。
- 獲取任務的方式不同趴樱。
- 執(zhí)行周期任務后馒闷,增加了額外的處理。
②ScheduleThreadPoolExecutor的實現(xiàn)
ScheduledFutureTask主要包含3個成員變量:
- long time;//表示這個任務將要被執(zhí)行的具體時間伊佃。
- long sequenceNumber;//表示這個任務被添加到ScheduleThreadPoolExecutor中的序號窜司。
- long period;//表示任務執(zhí)行的間隔周期。
- 從DelayWorkQueue中獲取已到期的ScheduleFutureTask(DelayWorkQueue.take())航揉。
- 線程1執(zhí)行這個ScheduleFutureTask塞祈。
- 線程1修改ScheduleFutureTask的time變量為下次將要執(zhí)行的時間。
- 線程1把這個修改time之后的ScheduleFutureTask放回DelayWorkQueue中(DelayWorkQueue.add())帅涂。
public RunnableScheduledFuture<?> take() throws InterruptedException {
//獲取鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//獲取周期任務
RunnableScheduledFuture<?> first = queue[0];
//任務數(shù)組為空
if (first == null)
available.await();//等待
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);//獲取頭部元素议薪,將最后一個元素放到第一位尤蛮,并自上而下添加到其堆排序點。
//頭部元素的time時間比當前時間大
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//到condition中等待到time時間
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();//釋放鎖
}
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();//1 獲取鎖
try {
int i = size;
if (i >= queue.length)
grow();//調(diào)整堆數(shù)組大小
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);//2.1 在底部添加到堆排序點
}
if (queue[0] == e) {
leader = null;
available.signal();//2.2 通知
}
} finally {
lock.unlock();//3 釋放鎖
}
return true;
}
3.FutureTask詳解
①FutureTask簡介
實現(xiàn)了Future接口和Runnable接口斯议,可以交給Executor執(zhí)行产捞,也可以由調(diào)用線程直接執(zhí)行(FutureTask.run())。
FutureTask.get():未啟動和已啟動狀態(tài)時調(diào)用哼御,會導致線程阻塞坯临。已完成狀態(tài)時,調(diào)用線程立即返回結(jié)果或拋出異常恋昼。
FutureTask.cancel():未啟動狀態(tài)時看靠,調(diào)用將導致此任務永遠不會被執(zhí)行。已啟動狀態(tài)時液肌,cancel(true)將以中斷執(zhí)行此任務線程的方式來視圖停止任務挟炬;cancel(false)將不會對正在執(zhí)行此任務的線程產(chǎn)生任何影響(讓正在執(zhí)行的任務運行完成)。已完成狀態(tài)時嗦哆,執(zhí)行cancel返回false谤祖。
②FutureTask的使用
當一個線程需要等待另一個線程把某個任務執(zhí)行完成后才能繼續(xù)執(zhí)行,此時可以使用FutureTask老速。
假設有多個線程執(zhí)行若干個任務粥喜,每個任務最多只能被執(zhí)行一次,當多個線程視圖同時執(zhí)行同一個任務時烁峭,只允許一個線程執(zhí)行任務容客,其他線程需要等待這個任務執(zhí)行完成后才能繼續(xù)執(zhí)行秕铛。示例代碼:
private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();//每個任務只執(zhí)行一次约郁,多個線程可以獲取到執(zhí)行的結(jié)果
private String executionTask(final String taskName) throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); //1.1 執(zhí)行完1.3后執(zhí)行 2.1
if (future == null) {
FutureTask<String> futureTask = new FutureTask<>(() -> taskName);//1.2
//如果存在taskName,不更改value但两,返回舊值鬓梅,如果不存在,put
future = taskCache.putIfAbsent(taskName, futureTask);//1.3
if (future == null) {//put成功了
future = futureTask;
futureTask.run();//1.4執(zhí)行任務
}
}
try {
return future.get();//1.5 2.2
} catch (CancellationException e) {
taskCache.remove(taskName, future);//當taskName和future有映射關系的時候谨湘,才移除
}
}
}
當兩個線程試圖同時執(zhí)行同一個任務時绽快,如果Thread1執(zhí)行了1.3后Thread2執(zhí)行2.1,那么接下來Thread2將在2.2等待紧阔,知道Thread1執(zhí)行完成1.4后Thread2才能從2.2返回坊罢。