前言
上一篇內(nèi)容寫了Java
中線程池的實(shí)現(xiàn)原理及源碼分析嫌褪,說好的是實(shí)實(shí)在在的大滿足,想通過一篇文章讓大家對(duì)線程池有個(gè)透徹的了解,但是文章寫完總覺得還缺點(diǎn)什么?
上篇文章只提到線程提交的execute()
方法,并沒有講解線程提交的submit()
方法芒珠,submit()
有一個(gè)返回值,可以獲取線程執(zhí)行的結(jié)果Future<T>
缨伊,這一講就那深入學(xué)習(xí)下submit()
和FutureTask
實(shí)現(xiàn)原理。
使用場(chǎng)景&示例
使用場(chǎng)景
我能想到的使用場(chǎng)景就是在并行計(jì)算的時(shí)候进宝,例如一個(gè)方法中調(diào)用methodA()刻坊、methodB()
,我們可以通過線程池異步去提交方法A党晋、B谭胚,然后在主線程中獲取組裝方法A徐块、B計(jì)算后的結(jié)果,能夠大大提升方法的吞吐量灾而。
使用示例
/**
* @author wangmeng
* @date 2020/5/28 15:30
*/
public class FutureTaskTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newCachedThreadPool();
System.out.println("====執(zhí)行FutureTask線程任務(wù)====");
Future<String> futureTask = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("FutureTask執(zhí)行業(yè)務(wù)邏輯");
Thread.sleep(2000);
System.out.println("FutureTask業(yè)務(wù)邏輯執(zhí)行完畢胡控!");
return "歡迎關(guān)注: 一枝花算不算浪漫!";
}
});
System.out.println("====執(zhí)行主線程任務(wù)====");
Thread.sleep(1000);
boolean flag = true;
while(flag){
if(futureTask.isDone() && !futureTask.isCancelled()){
System.out.println("FutureTask異步任務(wù)執(zhí)行結(jié)果:" + futureTask.get());
flag = false;
}
}
threadPool.shutdown();
}
}
上面的使用很簡(jiǎn)單绰疤,submit()
內(nèi)部傳遞的實(shí)際上是個(gè)Callable
接口铜犬,我們自己實(shí)現(xiàn)其中的call()
方法舞终,我們通過futureTask
既可以獲取到具體的返回值轻庆。
submit()實(shí)現(xiàn)原理
submit()
是也是提交任務(wù)到線程池,只是它可以獲取任務(wù)返回結(jié)果敛劝,返回結(jié)果是通過FutureTask
來實(shí)現(xiàn)的余爆,先看下ThreadPoolExecutor
中代碼實(shí)現(xiàn):
public class ThreadPoolExecutor extends AbstractExecutorService {
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
}
提交任務(wù)還是執(zhí)行execute()
方法,只是task
被包裝成了FutureTask
夸盟,也就是在excute()
中啟動(dòng)線程后會(huì)執(zhí)行FutureTask.run()
方法蛾方。
再來具體看下它執(zhí)行的完整鏈路圖:
上圖可以看到,執(zhí)行任務(wù)并返回執(zhí)行結(jié)果的核心邏輯實(shí)在FutureTask
中上陕,我們以FutureTask.run/get
兩個(gè)方法為突破口桩砰,一點(diǎn)點(diǎn)剖析FutureTask
的實(shí)現(xiàn)原理。
FutureTask源碼初探
先看下FutureTask
中部分屬性:
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
}
- state
當(dāng)前task狀態(tài)释簿,共有7中類型亚隅。
NEW: 當(dāng)前任務(wù)尚未執(zhí)行
COMPLETING: 當(dāng)前任務(wù)正在結(jié)束,尚未完全結(jié)束庶溶,一種臨界狀態(tài)
NORMAL:當(dāng)前任務(wù)正常結(jié)束
EXCEPTIONAL: 當(dāng)前任務(wù)執(zhí)行過程中發(fā)生了異常煮纵。
CANCELLED: 當(dāng)前任務(wù)被取消
INTERRUPTING: 當(dāng)前任務(wù)中斷中..
INTERRUPTED: 當(dāng)前任務(wù)已中斷
- callble
用戶提交任務(wù)傳遞的Callable,自定義call方法偏螺,實(shí)現(xiàn)業(yè)務(wù)邏輯
- outcome
任務(wù)結(jié)束時(shí)行疏,outcome保存執(zhí)行結(jié)果或者異常信息。
- runner
當(dāng)前任務(wù)被線程執(zhí)行期間套像,保存當(dāng)前任務(wù)的線程對(duì)象引用
- waiters
因?yàn)闀?huì)有很多線程去get當(dāng)前任務(wù)的結(jié)果酿联,所以這里使用了一種stack數(shù)據(jù)結(jié)構(gòu)來保存
FutureTask.run()實(shí)現(xiàn)原理
我們已經(jīng)知道在線程池runWorker()
中最終會(huì)調(diào)用到FutureTask.run()
方法中,我們就來看下它的執(zhí)行原理吧:
具體代碼如下:
public class FutureTask<V> implements RunnableFuture<V> {
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}
首先是判斷FutureTask
中state
狀態(tài)夺巩,必須是NEW
才可以繼續(xù)執(zhí)行货葬。
然后通過CAS
修改runner
引用為當(dāng)前線程。
接著執(zhí)行用戶自定義的call()
方法劲够,將返回結(jié)果設(shè)置到result中震桶,result可能為正常返回也可能為異常信息。這里主要是調(diào)用set()/setException()
FutureTask.set()實(shí)現(xiàn)原理
set()
方法的實(shí)現(xiàn)很簡(jiǎn)單征绎,直接看下代碼:
public class FutureTask<V> implements RunnableFuture<V> {
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
}
將call()
返回的數(shù)據(jù)賦值給全局變量outcome
上蹲姐,然后修改state
狀態(tài)為NORMAL
磨取,最后調(diào)用finishCompletion()
來做掛起線程的喚醒操作,這個(gè)方法等到get()
后面再來講解柴墩。
FutureTask.get()實(shí)現(xiàn)原理
接著看下代碼:
public class FutureTask<V> implements RunnableFuture<V> {
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
}
如果FutureTask
中state
為NORMAL
或者COMPLETING
忙厌,說明當(dāng)前任務(wù)并沒有執(zhí)行完成,調(diào)用get()
方法會(huì)被阻塞江咳,具體的阻塞邏輯在awaitDone()
方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
這個(gè)方法可以說是FutureTask
中最核心的方法了逢净,一步步來分析:
如果timed
不為空,這說明指定nanos
時(shí)間還未返回結(jié)果歼指,線程就會(huì)退出爹土。
q
是一個(gè)WaitNode
對(duì)象,是將當(dāng)前引用線程封裝在一個(gè)stack
數(shù)據(jù)結(jié)構(gòu)中踩身,WaitNode
對(duì)象屬性如下:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
接著判斷當(dāng)前線程是否中斷胀茵,如果中斷則拋出中斷異常。
下面就進(jìn)入一輪輪的if... else if...
判斷邏輯挟阻,我們還是采用分支的方式去分析琼娘。
分支一:if (s > COMPLETING) {
此時(shí)get()
方法已經(jīng)有結(jié)果了,無論是正常返回的結(jié)果附鸽,還是異常脱拼、中斷、取消等坷备,此時(shí)直接返回state
狀態(tài)熄浓,然后執(zhí)行report()
方法。
分支二:else if (s == COMPLETING)
條件成立击你,說明當(dāng)前任務(wù)接近完成狀態(tài)玉组,這里讓當(dāng)前線程再釋放cpu
,進(jìn)行下一輪搶占cpu
丁侄。
分支三:else if (q == null)
第一次自旋執(zhí)行惯雳,WaitNode
還沒有初始化,初始化q=new WaitNode();
分支四:else if (!queued){
queued
代表當(dāng)前線程是否入棧鸿摇,如果沒有入棧則進(jìn)行入棧操作石景,順便將全局變量waiters
指向棧頂元素。
分支五/六:LockSupport.park
如果設(shè)置了超時(shí)時(shí)間拙吉,則使用parkNanos
來掛起當(dāng)前線程潮孽,否則使用park()
經(jīng)過這么一輪自旋循環(huán)后,如果執(zhí)行call()
還沒有返回結(jié)果筷黔,那么調(diào)用get()
方法的線程都會(huì)被掛起往史。
被掛起的線程會(huì)等待run()
返回結(jié)果后依次喚醒,具體的執(zhí)行邏輯在finishCompletion()
中佛舱。
最終stack
結(jié)構(gòu)中數(shù)據(jù)如下:
FutureTask.finishCompletion()實(shí)現(xiàn)原理
具體實(shí)現(xiàn)代碼如下:
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
done();
callable = null;
}
代碼實(shí)現(xiàn)很簡(jiǎn)單椎例,看過get()
方法后挨决,我們知道所有調(diào)用get()
方法的線程,在run()
還沒有返回結(jié)果前订歪,都會(huì)保存到一個(gè)有WaitNode
構(gòu)成的statck
數(shù)據(jù)結(jié)構(gòu)中脖祈,而且每個(gè)線程都會(huì)被掛起。
這里是遍歷waiters
棧頂元素刷晋,然后依次查詢起next
節(jié)點(diǎn)進(jìn)行喚醒盖高,喚醒后的節(jié)點(diǎn)接著會(huì)往后調(diào)用report()
方法。
FutureTask.report()實(shí)現(xiàn)原理
具體代碼如下:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
這個(gè)方法很簡(jiǎn)單眼虱,因?yàn)閳?zhí)行到了這里喻奥,說明當(dāng)前state
狀態(tài)肯定大于COMPLETING
,判斷如果是正常返回蒙幻,那么返回outcome
數(shù)據(jù)映凳。
如果state
是取消狀態(tài)胆筒,拋出CancellationException
異常邮破。
如果狀態(tài)都不滿足,則說明執(zhí)行中出現(xiàn)了差錯(cuò)仆救,直接拋出ExecutionException
FutureTask.cancel()實(shí)現(xiàn)原理
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
cancel()
方法的邏輯很簡(jiǎn)單抒和,就是修改state
狀態(tài)為CANCELLED
,然后調(diào)用finishCompletion()
來喚醒等待的線程彤蔽。
這里如果mayInterruptIfRunning
摧莽,就會(huì)先中斷當(dāng)前線程,然后再去喚醒等待的線程顿痪。
總結(jié)
FutureTask
的實(shí)現(xiàn)原理其實(shí)很簡(jiǎn)單镊辕,每個(gè)方法基本上都畫了一個(gè)簡(jiǎn)單的流程圖來方便立即。
后面還打算分享一個(gè)BlockingQueue
相關(guān)的源碼解讀蚁袭,這樣線程池也可以算是完結(jié)了征懈。
在這之前可能會(huì)先分享一個(gè)SpringCloud
常見配置代碼分析、最佳實(shí)踐等手冊(cè)揩悄,方便工作中使用卖哎,也是對(duì)之前看過的源碼一種總結(jié)。敬請(qǐng)期待删性!
歡迎關(guān)注: