Java多線程編程簡明教程
緣起
關(guān)于多線程編程的教程汗牛充棟了,比如阿里集團內(nèi)部就有一粟、高鐵等大牛的講座,更不用說有清英同學專門創(chuàng)建了并發(fā)編程網(wǎng)站來專注于這件事情。專門講Java并發(fā)開發(fā)的書籍也是相當豐富了播瞳。
我們舉個例子,典型的一本Java多線程開發(fā)的教材是這樣寫的免糕,上來就是介紹如何創(chuàng)建線程赢乓,然后再講線程安全,線程之間如何做同步和通信石窑,接著才是線程池和執(zhí)行器牌芋,最后是線程安全的數(shù)據(jù)結(jié)構(gòu)。
這樣寫當然從技術(shù)上講是沒問題的松逊,不過問題在于躺屁,門檻太高了。假如讀者的時間短经宏,只看完創(chuàng)建線程這一章就開始照貓畫虎寫多線程的代碼犀暑,于是,多線程競態(tài)的問題在那里等著呢烁兰。假如他多再多看一些耐亏,學會了線程的同步互斥,而且也解決了競態(tài)和死鎖的問題沪斟,他仍然不會想到要使用線程池广辰,在使用容器的時候也不免會遇到一些坑。
但是主之,真的要經(jīng)過這么專業(yè)的訓練才可以寫一個簡單的多任務的么择吊?比如對于很多任務來說,只是需要靜靜地在后臺去執(zhí)行一個任務而己槽奕,在運行過程中干发,并不需要同步訪問共享,最后只是返回一個結(jié)果就好了史翘。對于一些更復雜的問題,可以采用分治法處理冀续,規(guī)劃得當?shù)脑挷⒉粫霈F(xiàn)訪問沖突琼讽。對于前面說的第一種情況,使用JDK 5引入的Future模式已經(jīng)足夠了洪唐,而對于后一種的情況钻蹬,使用JDK 7中引入的Fork-Join框架就可以很好的解決。這兩種模式都是線程安全的凭需,根本不共享狀態(tài)有什么不安全的呢问欠。而學會了這兩個模式之后肝匆,習慣成自然地,將來再學習線程也會習慣性地使用線程池顺献,不會引入創(chuàng)建和銷毀大量線程的消耗旗国。
學會了這兩個模式之后,我們進一步再學習如何避免共享狀態(tài)注整,如何封閉狀態(tài)能曾。如何通過使用安全的數(shù)據(jù)結(jié)構(gòu)進行通信去共享狀態(tài)。
如果以上都實在解決不了問題肿轨,最后再老老實實地學習Java的內(nèi)存模型寿冕,再去實現(xiàn)傳統(tǒng)上一開始就講的創(chuàng)建線程這一套機制。
按照傳統(tǒng)的思路椒袍,講到創(chuàng)建線程的時候驼唱,初學者對于Java內(nèi)存模型,對于在多線程情況下的數(shù)據(jù)結(jié)構(gòu)驹暑,對于線程池這些完全沒有概念玫恳,一切都按照單任務的經(jīng)驗來寫代碼,于是造成了很多問題而不知岗钩。而現(xiàn)在是在不滿足前面的常用模式的情況下才選擇這條路纽窟,讀者已經(jīng)非常清楚他在做什么,要承擔什么樣的風險兼吓,以及如何降低這個風險臂港。
最后,他山之石视搏,可以攻玉审孽。學習借鑒其他語言和模型的成功經(jīng)驗和教訓,可能比只懂線程的同步和互斥有更廣的思路浑娜。
很多書籍的編排已經(jīng)透出了不少新意佑力,比如方騰飛大牛的《Java并發(fā)編程的藝術(shù)》,在講線程之前筋遭,先講一章龐大的Java內(nèi)存模型打颤。這樣在學習線程的時候,對于可見不可見漓滔,執(zhí)行順序等等已經(jīng)有很清楚的概念了编饺。稍嫌不足的是容器出場太晚,如果沒看到這章就開始寫的話响驴,對于獨立變量應該是沒問題了透且,一旦跟容器打交道了,還容易吃虧豁鲤,我就吃過這樣的虧秽誊。而《Java Concurrency in Practice》這點做得就更好一些鲸沮。
我的教程希望能夠,不管在哪一部分看完之后暫時中斷了锅论,學到的體系也是相對完整的讼溺,力爭避免會了線程,但是寫出一堆問題的代碼棍厌。哪怕學習了如何危險還不知道如何寫肾胯,也比不知道危險要好,至少知道要學完才能用耘纱。
Future模式
Future模式5分鐘教程
Future模式用于的場景是敬肚,不急于馬上就拿到結(jié)果的任務,可以放到后臺先做著束析,然后主線程繼續(xù)忙別的去艳馒。等主線程需要用后臺任務的結(jié)果了,再去從Future去拿結(jié)果员寇。如果Future任務已經(jīng)做完了弄慰,當然皆大歡喜,馬上使用結(jié)果蝶锋,代碼邏輯繼續(xù)往下跑陆爽;如果還沒做完,至少等待的時間比一直干等著強扳缕』疟眨總之,穩(wěn)賺不賠躯舔,何樂而不為驴剔。如果這時候那個結(jié)果已經(jīng)不重要了,取消了也沒問題粥庄。
這里面沒有競態(tài)丧失,沒有共享哪來的競態(tài)。同樣惜互,不需要懂鎖是什么東西布讹。
使用Future模式需要4個步驟:
- 構(gòu)造一個Callable接口的實現(xiàn),在其中寫在后臺要實現(xiàn)的邏輯
- 構(gòu)造一個線程池執(zhí)行器训堆,提交執(zhí)行
- 主線程繼續(xù)忙自己的
- 想要用這個Future的時候炒事,調(diào)用FutureTask對象的get()方法去獲取值
我們舉個簡單的例子來看一下:
public class AsyncTaskSimple {
public static class Result implements Callable<String>{
@Override
public String call() throws Exception {
return doRealLogic();
}
private String doRealLogic(){
//Here to do the background logic
return new String("Done");
}
}
public static void main(String[] args) {
FutureTask<String> future = new FutureTask<String>(new Result());
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(future);
someThingToDo();
try {
String s = future.get();
System.out.println("The result is:"+s);
}catch (InterruptedException e){
//Deal with InterruptedExcpeiotn
}catch(ExecutionException ee){
//Deal with ExecutionException
}
}
private static void someThingToDo(){
//Main thread logic
}
}
從上面的例子代碼可以看到,都是填空題蔫慧,邏輯很簡單:
- 需要的返回值是什么類型,就實現(xiàn)Callable<類型>接口权薯,然后在call()方法里實現(xiàn)后臺邏輯姑躲。
- 用Callable接口的實現(xiàn)類去構(gòu)造一個FutureTask對象睡扬。
- 構(gòu)建ExecutorService對象,決定用哪種線程池黍析。
- 提交任務卖怜。
- 讓后臺任務在空中飛一會兒,前臺該干嘛干嘛阐枣。
- 前臺的事兒忙完了马靠,去讀取后臺的結(jié)果。
就這么簡單蔼两,線程是什么甩鳄?鎖是什么?如何防止競態(tài)和死鎖额划,這些統(tǒng)統(tǒng)用不到妙啃。
線程池
關(guān)于線程池,我們只講三種最基本的俊戳,其余的細節(jié)后面詳細講揖赴。
這三種是:只有一個線程,固定數(shù)目的抑胎,按需分配的.
分別對應了Executors類的三個靜態(tài)方法:
- public static ExecutorService newSingleThreadExecutor ():就建一個燥滑,大家排隊按順序來
- public static ExecutorService newFixedThreadPool (int nThreads):定義固定nThreads個大小的線程池。
- public static ExecutorService newCachedThreadPool ():最省事了阿逃,完全由系統(tǒng)自己管理铭拧。能重用就重用,不能重用就建新的線程盆昙。
好羽历,F(xiàn)uture模式就講完了,大家可以上手寫代碼了淡喜。無鎖秕磷、無競態(tài)、使用了線程池炼团。從創(chuàng)建線程開始學的同學們還要學習Java內(nèi)存模型澎嚣,如何同步,如何互斥瘟芝,如何避免死鎖等等易桃,這些我們都暫時不用學,也不需要線程安全的數(shù)據(jù)結(jié)構(gòu)锌俱,因為傳進去的參數(shù)和返回的結(jié)果都是只讀的晤郑,沒有多線程去搶著寫它們。
揭開Android AsyncTask的面紗
下面我們說說Android中叫做AsyncTask的東西,其實就是在Future上做的一個簡要封裝造寝。
我們一起看下磕洪,有Android經(jīng)驗的同學正好借已有的經(jīng)驗來加深一下印象。
定義
public abstract class AsyncTask<Params, Progress, Result> {
這個類有三個泛型參數(shù)诫龙,Params, Progress, Result析显,這里先記住,我們后面會看到它們各起什么作用签赃。
構(gòu)造方法
我們把無關(guān)的代碼省略掉谷异,核心邏輯一共就兩句話,一句是調(diào)用子類的doInBackground方法锦聊,一句是調(diào)用postResult方法將返回值返回歹嘹。
288 public AsyncTask() {
289 mWorker = new WorkerRunnable<Params, Result>() {
290 public Result call() throws Exception {
...
295 Result result = doInBackground(mParams);
...
297 return postResult(result);
298 }
299 };
這個WorkerRunnable是實現(xiàn)了Callable<Result>的一個抽象類.
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
通過這一步,完成了我們前面學習的Callable接口對象的創(chuàng)建括丁。按照我們前面學習的荞下,下一步該構(gòu)造FutureTask對象了,AsyncTask的構(gòu)造方法就是這么做的史飞!用剛才生成的mWorker對象去構(gòu)造mFuture對象尖昏。
這里與前面所講的有一點不同的是,前面我們是處理完了由主線程主動查詢Future是否結(jié)束构资,而AsyncTask是在任務結(jié)束之后抽诉,也就是重載done()方法,在這里面再去調(diào)用get()方法去獲取Future的值吐绵,再將其主動發(fā)出去迹淌。
301 mFuture = new FutureTask<Result>(mWorker) {
302 @Override
303 protected void done() {
304 try {
305 postResultIfNotInvoked(get());
306 } catch (InterruptedException e) {
307 android.util.Log.w(LOG_TAG, e);
308 } catch (ExecutionException e) {
309 throw new RuntimeException("An error occurred while executing doInBackground()",
310 e.getCause());
311 } catch (CancellationException e) {
312 postResultIfNotInvoked(null);
313 }
314 }
315 };
316 }
上面的調(diào)用的get()方法一點新意也沒有,就是mFuture.get()的簡單封裝己单,我們看代碼:
497 public final Result get() throws InterruptedException, ExecutionException {
498 return mFuture.get();
499 }
只有到了將結(jié)果通知出來這個過程是跟Android相關(guān)的唉窃,用到了Android的消息隊列。
318 private void postResultIfNotInvoked(Result result) {
319 final boolean wasTaskInvoked = mTaskInvoked.get();
320 if (!wasTaskInvoked) {
321 postResult(result);
322 }
323 }
324
325 private Result postResult(Result result) {
326 @SuppressWarnings("unchecked")
327 Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
328 new AsyncTaskResult<Result>(this, result));
329 message.sendToTarget();
330 return result;
331 }
executeOnExecutor
我們再看看mFuture是何時被執(zhí)行的纹笼,一句話纹份,是在executeOnExecutor中。這個方法是在主線程中運行的廷痘,所以可以先安全地運行onPreExecute()蔓涧,而onPostExecute()則要是在Handler里面處理了。
587 @MainThread
588 public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
589 Params... params) {
...
604 onPreExecute();
605
606 mWorker.mParams = params;
607 exec.execute(mFuture);
608
609 return this;
610 }
onPostExecute和通知進度
這個都是在Handler中實現(xiàn)的了笋额,結(jié)束了就onPostExecute元暴,還沒完就通知下進度。
656 private static class InternalHandler extends Handler {
657 public InternalHandler() {
658 super(Looper.getMainLooper());
659 }
660
661 @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
662 @Override
663 public void handleMessage(Message msg) {
664 AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
665 switch (msg.what) {
666 case MESSAGE_POST_RESULT:
667 // There is only one result
668 result.mTask.finish(result.mData[0]);
669 break;
670 case MESSAGE_POST_PROGRESS:
671 result.mTask.onProgressUpdate(result.mData);
672 break;
673 }
674 }
675 }
finish方法根據(jù)是否cancel決定調(diào)用哪一個兄猩,正常情況下是onPostExecute
647 private void finish(Result result) {
648 if (isCancelled()) {
649 onCancelled(result);
650 } else {
651 onPostExecute(result);
652 }
653 mStatus = Status.FINISHED;
654 }
Fork-Join模式
說起Fork-Join模式,我們不免聯(lián)想起了Map-Reduce.它們的原理都是分治法,就是將一個大問題劃分成若干個小問題,如果這些小問題之間互相不影響的話,就可以并發(fā)去執(zhí)行. 最后,統(tǒng)一將各小問題的結(jié)果匯總起來,就是這個大問題的結(jié)果.
這個任務最適合處理像一棵樹一樣的問題.
ForkJoinPool
Fork-Join模式不再是只管一個后臺作務,而是有多個任務并發(fā)執(zhí)行. 這時我們前面學到的簡單的線程池執(zhí)行器的功能就顯得不足了.這時候JDK 7開始為我們提供了ForkJoinPool.
ForkJoinPool不但自動計算開多大的線程池合適,而且提供了稱為工作竊取算法的算法來管理這些任務. 如果有的線程空閑, ForkJoinPool會從其它線程的隊列尾中竊取一個任務給空閑線程來運行.而正常的線程是從任務隊列頭中取任務,二者不會有沖突.
RecusiveTask
如同F(xiàn)utureTask一樣,Fork-Join模式也有自己的Task類ForkJoinTask. 不過一般我們都是從ForkJoinTask的子類RecursiveTask來繼承. 通過重載RecursiveTask類的compute方法,來實現(xiàn)Fork-Join的邏輯.
在compute方法里, 要實現(xiàn)兩件事, 顧名思義, Fork-Join就是要先fork出RecursiveTask對象的子任務,然后將它們join在一起.
Fork-Join模式10分鐘速成教程
我們先寫個copy二叉樹結(jié)構(gòu)的簡單任務學習一下如何利用Fork-Join框架來實現(xiàn)功能.
先實現(xiàn)一個最簡單的二叉樹節(jié)點,帶左右孩子,一個字符串吧:
public class BinaryTree {
public static class Node{
public Node leftChild;
public Node rightChild;
public String content;
public Node(String ct){
content = ct;
}
}
然后實現(xiàn)一個RecursiveTask的子類,重載它的compute方法.
public static class NodeCopyTask extends RecursiveTask<Node>{
Node mNode;
public NodeCopyTask(Node node){
mNode = node;
}
@Override
protected Node compute() {
if(mNode==null)
return null;
下面我們開始實現(xiàn)分叉, 對于左右子樹分別fork出一個子任務. 這兩個子任務又會分叉出它的的子任務,直至結(jié)束.
NodeCopyTask taskLeft = new NodeCopyTask(mNode.leftChild);
taskLeft.fork();
NodeCopyTask taskRight = new NodeCopyTask(mNode.rightChild);
taskRight.fork();
fork之后, 任務就在后臺開始運行了. 這時候我們開始構(gòu)造我們的左右子樹的父節(jié)點:
Node node = new Node(mNode.content);
實際問題中一般不會這么簡單.主線任務完成了之后,就是等待子任務交活兒,將它們組裝在一起:
node.leftChild = taskLeft.join();
node.rightChild = taskRight.join();
return node;
}
}
核心功能實現(xiàn)完了,下面我們寫個主函數(shù)讓它運行起來吧. 先構(gòu)造一個被復制的對象.
public static void main(String[] args){
Node node = new Node("Hello,Fork-Join");
node.leftChild = new Node("Left");
node.rightChild = new Node("Right");
下面我們前面介紹的主角之一 - ForkJoinPool粉墨登場. 沒什么復雜的設置,直接new一個就好:
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinPool有了之后, 再創(chuàng)建一個我們的RecursiveTask的對象, 然后調(diào)用ForkJoinPool的submit方法將其提交, 這又是一個Future模式了. 最后我們通過這個FutureTask的get方法獲取結(jié)果就一切OK了.
NodeCopyTask task = new NodeCopyTask(node);
Future<Node> future = forkJoinPool.submit(task);
try {
Node node2 = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
整理一下上面的步驟:
- 實現(xiàn)一個RecursiveTask的子類,重載compute方法實現(xiàn)fork-join邏輯
- 合理劃分任務,調(diào)用遞歸的RecursiveTask子類,fork出每個子任務
- 通過join方法獲取子任務的值,并將它們組合到一起
- 構(gòu)造ForkJoinPool線程池
- 創(chuàng)建第一步的子類的對象,通過Future模式,提交到ForkJoinPool線程中運行
- 獲取Future的值,即可得到Fork-Join的結(jié)果.
總結(jié)一下,把剛才拆散的代碼整合在一起:
public class BinaryTree {
public static class Node{
public Node leftChild;
public Node rightChild;
public String content;
public Node(String ct){
content = ct;
}
}
public static class NodeCopyTask extends RecursiveTask<Node>{
Node mNode;
public NodeCopyTask(Node node){
mNode = node;
}
@Override
protected Node compute() {
if(mNode==null)
return null;
NodeCopyTask taskLeft = new NodeCopyTask(mNode.leftChild);
taskLeft.fork();
NodeCopyTask taskRight = new NodeCopyTask(mNode.rightChild);
taskRight.fork();
Node node = new Node(mNode.content);
node.leftChild = taskLeft.join();
node.rightChild = taskRight.join();
return node;
}
}
public static void main(String[] args){
//TODO: construct a real tree
Node node = new Node("Hello,Fork-Join");
node.leftChild = new Node("Left");
node.rightChild = new Node("Right");
ForkJoinPool forkJoinPool = new ForkJoinPool();
NodeCopyTask task = new NodeCopyTask(node);
Future<Node> future = forkJoinPool.submit(task);
try {
Node nodeNew = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
注意事項
- 現(xiàn)在這個階段,暫時先不要共享內(nèi)存,通過復制成不變的對象傳遞給子任務.返回值也創(chuàng)建新對象,當然可以使用對象池等技術(shù).
- 暫時不要使用跨任務的容器,我們還沒有經(jīng)過相關(guān)的訓練,時機還不成熟.
- 暫時不要使用其他的同步機制,我們的知識儲備暫時還不夠.
- 要注意任務中的異常會接收不到,一定在任務中處理好可能出現(xiàn)的異常. 否則發(fā)生了異常,在主任務中卻收不到,會感到很奇怪.
- 注意I/O操作,建議目前階段在Fork-Join之前將I/O操作提前做好.
盡管有一些限制,但是Fork-Join框架還是給我們帶來了很大的便利. 按照Fork-Join設計好的代碼,在將來計算核數(shù)增加時,會自動給我們的代碼獲得性能提高.
另外需要說明的一點是,如果用并行花得代價大的話, 可以先做一個判斷, 在這樣的情況下保持串行.
不變模式
在結(jié)束這個快餐教程之前,我們得再次強調(diào)一下內(nèi)存共享的風險. 請初學的同學們一定要重視起來.目前我們還沒有學習Java對象模型和容器的安全用法, 所以目前階段最安全的就是不共享任何狀態(tài).
只讀的對象是不會引起線程安全問題的.我們所有的跨任務的數(shù)據(jù)傳遞,暫時都只傳遞不變的對象.
這樣的限制可能會帶來一些不便和一些性能損失.但是,它是線程安全的,對于開發(fā)人員是種投入小見效快的好事情. 如果暫時還不能滿意你的需求,我們會繼續(xù)學習,從此開始,沒有快餐式的速成教程了,我們要經(jīng)過一段非常扎實的訓練.
另一個例子
下面我們看下官方的例子, 通過這個例子我們想說明如果當前線程無事可做, 可以fork一部分, 在當前任務中執(zhí)行另一部分. 如下面所示, 它將一部分fork在f1去執(zhí)行, 另外一部分f2在當前任務中執(zhí)行.
class Fibonacci extends RecursiveTask {
final int n;
Fibonacci(int n) { this.n = n;
Integer compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}}
Android的特別注意事項
請大家注意,Java中的Fork-Join并沒有辦法處理Android的UI線程等問題, 如果需要運行在UI線程, 區(qū)分主線程和工作線程等, 還請參考上節(jié)我們分析AsyncTask中的做法, 該使用Handler的還是要用Handler. 后面我們還會詳情說細節(jié).