Future/FutureTask源碼分析

一. 關(guān)于Future/FutureTask

Future/FutureTask/ExecutorService相關(guān)類圖:


FutureTask.png

0x01: Future

下面是Future的Doc文檔

A {@code Future} represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method {@code get} when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the {@code cancel} method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a {@code Future} for the sake of cancellability but not provide a usable result, you can declare types of the form {@code Future<?>} and return {@code null} as a result of the underlying task.

簡單的說, Future表示一個異步計算的結(jié)果. 異步計算是在其他線程進(jìn)行的, 因此異步計算的結(jié)果, 有可能有值, 也有可能沒有值. 于是, Future就提供了一些方法來處理這種未知狀態(tài):

a. isDone() 異步任務(wù)是否完成, 即否有結(jié)果
b. get() 獲取異步任務(wù)結(jié)果, 如果異步任務(wù)未完成, 此方法會一直阻塞, 直到異步方法完成 或 任務(wù)被取消 (調(diào)用了cancel()方法)
c. cancel() 取消異步任務(wù), 如果異步任務(wù)已經(jīng)完成, 那么取消失敗(即cancel()方法返回false)
d. isCancelled() 查詢異步任務(wù)是否已被取消

簡單用法:

interface ArchiveSearcher { 
    String search(String target); 
}

class App {

  ExecutorService executor = ...
  ArchiveSearcher searcher = ...

  void showSearch(final String target) throws InterruptedException {
    Future<String> future = executor.submit(new Callable<String>() {
        public String call() {
            return searcher.search(target);
        }});

    displayOtherThings(); // do other things while searching
    try {
      displayText(future.get()); // use future
    } catch (ExecutionException ex) { 
        cleanup(); 
        return; 
    }
  }

}

用Future主要是為了獲取異步計算的結(jié)果. 例如在Android中, 你會把網(wǎng)絡(luò)請求放在子線程中去執(zhí)行, 而請求的結(jié)果會拿到UI線程中來使用. 這時候就可以使用Future. 關(guān)鍵代碼如下:

interface NetworkService {
    ModelXxx requestXxx();
}

// 注意: 這是一個阻塞的方法, 不能在UI線程中直接調(diào)用
public ModelXxx xx() {
    final NetworkService networkService = ...
    ExecutorService executorService = ...

    Future<ModelXxx> future = executorService.submit(new Callable<ModelXxx>() {
        @Override
        public ModelXxx call() throws Exception {
            return networkService.requestXxx();
        }
    });

    ModelXxx modelXxx = future.get(); //此處可能阻塞
}

void bindDataToUi(ModelXxx model) {
    // do something with model ...
}

Future使用起來并不方便, 因為異步任務(wù)什么時完成我們并不知曉, 除非你用isDone()方法去查詢是否完成. 如果要異步任務(wù)完成后主動通知我們, 那么該如何做呢? 這個問題留到后面說.

0x02: FutureTask

下面是FutureTask的Doc文檔:

A cancellable asynchronous computation. This class provides a base implementation of {@link Future}, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the {@code get} methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using {@link #runAndReset}).

a. 簡單來說: FutureTask就是一個可取消的異步任務(wù).
b. 把FutureTask的名字拆開來看, FutureTask是Future(異步任務(wù)結(jié)果)和Task(異步任務(wù))的集合. 因此, 我們可以直接把FutureTask扔給ExecutorService去執(zhí)行, 然后又可以獲取計算結(jié)果 (Future的特性FutureTask都有).
c. FutureTask的基本用法和步驟如下:
public void someBiz() {
        // 1. init services
        final String serviceUrl = ...
        final NetworkService networkService = ...
        final ExecutorService executorService = ...

        // 2. make task
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return networkService.request(serviceUrl);
            }
        });
        
        // 3. submit task
        executorService.submit(futureTask);
        
        // 4. get result
        String result = futureTask.get();
        
        // 5. do something with result
        // ...
    }
d. 使用FutureTask執(zhí)行異步任務(wù)有個問題: 就是異步任務(wù)執(zhí)行完了并不會通知調(diào)用方. 但是, FutureTask已經(jīng)有支持異步任務(wù)執(zhí)行完畢就立刻通知調(diào)用方的基礎(chǔ). FutureTask有一個done() 方法, 此方法即是異步任務(wù)調(diào)用完畢的回調(diào)方法, 這個方法執(zhí)行時已經(jīng)可以獲取到結(jié)果數(shù)據(jù)了. 我們可以擴展FutureTask并重寫done()來支持異步任務(wù)執(zhí)行完成后的回調(diào)通知.

二. Future/FutureTask實現(xiàn)解析

a. Future的創(chuàng)建

先看看Future是如何產(chǎn)生的(java.util.concurrent.AbstractExecutorService類中):

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

當(dāng)我們提交一個異步任務(wù) (Callable<T>) 時, 會調(diào)用submit()方法, submit()方法內(nèi)部會調(diào)用newTask()方法創(chuàng)建一個FutureTask. 我們已經(jīng)知道, FutureTask是Future的實現(xiàn), 又是Runnable的實現(xiàn). 因此, 它既可以執(zhí)行又可以獲取結(jié)果.

b. FutureTask的執(zhí)行邏輯

創(chuàng)建FutureTask時, 會把我們提交的任務(wù) (Callbable)傳遞給FutureTask. 其實FutureTask執(zhí)行時, 會委托給傳進(jìn)來的Callable. 基本邏輯如下:

public class FutureTask<V> implements RunnableFuture<V> {
    // 用來存儲 "用戶提供的有實在業(yè)務(wù)邏輯的" 任務(wù)
    private Callable<V> callable;
    
    // 用來保存異步計算的結(jié)果
    private Object outcome;

    public FutureTask(Callable<V> callable) {
        if (callable == null) throw new NullPointerException();
        // 保存外部傳來的任務(wù), 待會在run()方法中調(diào)用
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public void run() {
        // 省略其他代碼 ...

        Callable<V> c = callable;

        //FutureTak的執(zhí)行邏輯委托給用戶提供的真正任務(wù)
        V result = c.call(); 

        // 設(shè)置異步任務(wù)結(jié)果
        set(result);
    }

    // 其他代碼省略 ...
}
c. FutureTask是如何保存計算結(jié)果的

可以看到, FutureTask實現(xiàn)了RunnableFuture接口, 此接口是Runnable接口和Future接口的結(jié)合. 自然FutureTask是要被當(dāng)成任務(wù)來在線程中執(zhí)行的( 線程內(nèi)部執(zhí)行的一般都是Runnable ). FutureTask內(nèi)部用了一個Object成員outcome來存儲異步任務(wù)的結(jié)果. run() 方法調(diào)用用戶傳過來的Callbable的call()方法并產(chǎn)生一個運算結(jié)果, 此時會調(diào)用set()方法來將計算結(jié)果存儲到成員outcome中. 下面就來看看FutureTask是如何將計算結(jié)果設(shè)置到outcome成員中的:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

非常簡單, 直接賦值就好了. 需要注意的是, 賦值之前有一個任務(wù)狀態(tài)的切換, 這個切換會同步. 因此設(shè)置完后, 其他線程就可以獲取到結(jié)果了.

d. 任務(wù)執(zhí)行完的收尾工作 (任務(wù)完成的回調(diào), 資源回收等)

我們注意到, 賦值后會調(diào)用一個finishCompletion()方法, , 那么就來看看此方法:

private void finishCompletion() {
        // 省略無關(guān)代碼 ... 
        
        done();
        callable = null;        // to reduce footprint
    }

其實就是做一些收尾工作, 將callbable置null等 (callable中可能會持有一些資源).
此方法里面還調(diào)用了done()方法, 此方法其實就是異步任務(wù)執(zhí)行完的回調(diào), 下面來看看此方法:

protected void done() { }

空的實現(xiàn)!! 且方法為protected, 這就是為擴展而存在的方法.
回到前面提的問題: **異步任務(wù)執(zhí)行完畢后如何主動通知調(diào)用者? **
其實我們只要擴展FutureTask, 再給擴展對象設(shè)置一個回調(diào)對象, 然后重寫done()方法, 在done()方法內(nèi)調(diào)用回調(diào)對象就可以了.
具體我們可以來看看Guava的com.google.common.util.concurrent.ListenableFutureTask:

public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {

  //存儲異步任務(wù)完成后的回調(diào)以及回調(diào)所在的Executor
  private final ExecutionList executionList = new ExecutionList(); 

  // 其他代碼省略 ...

  //添加異步任務(wù)完成后的回調(diào)和回調(diào)所在的Executor
  @Override
  public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
  }

  @Override
  protected void done() {
    // 異步任務(wù)完成后回調(diào)
    executionList.execute();
  }
}

需要注意的是FutureTaskdone()方法是在Worker線程中執(zhí)行的, 一般我們獲取結(jié)果是在其他線程, 因此需要把計算結(jié)果挪到指定的線程中去. 因此不僅需要指定任務(wù)完成的回調(diào), 還需要指定任務(wù)完成的回調(diào)所在的線程.

e. FutureTask計算結(jié)果同步問題

既然FutureTask是在Worker線程中執(zhí)行的, 那么其他線程獲取計算結(jié)果, 就會存在同步問題. 那么FutureTask是如何來同步計算結(jié)果的呢?

jdk1.8 FutureTask源碼中, 保存計算結(jié)果的成員變量聲明是這樣的:
private Object outcome; // non-volatile, protected by state reads/writes
可以看到后面的說明, 此變量是non-volatile的, 同步是用state字段的讀寫來保證的. 由于用了不公開的API, 邏輯也比較復(fù)雜, 具體就不多說了, 自己看源碼吧 --

三. 自定義FutureTask

a. 下面是對FutureTask的簡單擴展 (獲取網(wǎng)絡(luò)圖片)
import android.support.annotation.NonNull;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

public class ListenableFuture<V> extends FutureTask<V> {

    AtomicReference<Listener> mListenerRef;

    public ListenableFuture(@NonNull Callable<V> callable) {
        super(callable);
        mListenerRef = new AtomicReference<>();
    }

    public ListenableFuture(@NonNull Runnable runnable, V result) {
        super(runnable, result);
    }


    @Override
    protected void done() {
        final Listener<V> listener = mListenerRef.get();
        if(listener != null) {
            try {
                onSuccess(listener, get());
            } catch (InterruptedException e) {
                onFailed(listener, e);
            } catch (ExecutionException e) {
                onFailed(listener, e);
            }
            mListenerRef.set(null);
        }
    }

    private void onSuccess(final Listener<V> listener, final V result) {
        MainHandler.post(new Runnable() {
            @Override
            public void run() {
                listener.onSuccess(result);
            }
        });
    }

    private void onFailed(final Listener<V> listener, final Throwable e) {
        MainHandler.post(new Runnable() {
            @Override
            public void run() {
                listener.onFailed(e);
            }
        });
    }

    public void setListener(Listener<V> listener) {
        mListenerRef.compareAndSet(mListenerRef.get(), listener);
    }

    public interface Listener<V> {
        void onSuccess(V v);
        void onFailed(Throwable e);
    }
}

工具類MainHandler.java

import android.os.Handler;
import android.os.Looper;

public final class MainHandler {

    private static Handler mainHandler;

    private MainHandler() {
        //no instance
    }

    public static void post(Runnable task) {
        getMainHandler().post(task);
    }

    public static Handler getMainHandler() {
        if(mainHandler == null) {
            mainHandler = new Handler(Looper.getMainLooper());
        }
        return mainHandler;
    }
}

ListenableFuture的使用方法如下:

import com.stone.demo.R;

import android.Manifest;
import android.content.pm.PackageManager;
import android.graphics.Bitmap;
import android.graphics.BitmapFactory;
import android.os.Bundle;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.v4.app.ActivityCompat;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.ImageView;
import android.widget.Toast;

import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class MainActivity extends AppCompatActivity {
    ExecutorService service;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_future_main);

        service = Executors.newFixedThreadPool(4);

        findViewById(R.id.btn).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                loadImage();
            }
        });
        checkPermission();
    }

    private void checkPermission() {
        int result = ActivityCompat.checkSelfPermission(this, Manifest.permission.WRITE_EXTERNAL_STORAGE);
        if(!(result == PackageManager.PERMISSION_GRANTED)) {
            ActivityCompat.requestPermissions(this, new String[] {Manifest.permission.WRITE_EXTERNAL_STORAGE}, 0x10);
        }
    }


    @Override
    public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions, @NonNull int[] grantResults) {
        if(requestCode == 0x10) {
            if(!(grantResults[0] == PackageManager.PERMISSION_GRANTED)) {
                Toast.makeText(this, "您拒絕了訪問磁盤", Toast.LENGTH_SHORT).show();
            }
        }
    }

    private void loadImage() {
        ListenableFuture<Bitmap> future = new ListenableFuture<Bitmap>(new Callable<Bitmap>() {
            @Override
            public Bitmap call() throws Exception {
                OkHttpClient client = new OkHttpClient();
                Request request = new Request.Builder().url("http://img06.tooopen.com/images/20161214/tooopen_sy_190570171299.jpg").build();
                Call call = client.newCall(request);
                Response response = call.execute();
                InputStream inputStream = null;
                if(response != null && response.body() != null && (inputStream = response.body().byteStream()) != null) {
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    inputStream.close();
                    return bitmap;
                }
                return null;
            }
        });

        future.setListener(new ListenableFuture.Listener<Bitmap>() {
            @Override
            public void onSuccess(Bitmap bitmap) {
                if(bitmap != null) {
                    ImageView.class.cast(findViewById(R.id.img)).setImageBitmap(bitmap);
                } else {
                    Toast.makeText(MainActivity.this, "圖片加載失敗", Toast.LENGTH_SHORT).show();
                }
            }

            @Override
            public void onFailed(Throwable e) {
                Toast.makeText(MainActivity.this, "圖片加載失敗", Toast.LENGTH_SHORT).show();
            }
        });

        service.submit(future);
    }
}

布局文件非常簡單, 上面一個Button, 下面一個ImageView, 就不貼出來了, 效果圖如下:

screen.png

如有錯誤之處, 歡迎指正 ~~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市句惯,隨后出現(xiàn)的幾起案子耿币,更是在濱河造成了極大的恐慌嵌屎,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件晰甚,死亡現(xiàn)場離奇詭異娇未,居然都是意外死亡徽缚,警方通過查閱死者的電腦和手機贩汉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門驱富,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人匹舞,你說我怎么就攤上這事褐鸥。” “怎么了赐稽?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵叫榕,是天一觀的道長。 經(jīng)常有香客問我姊舵,道長晰绎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任括丁,我火速辦了婚禮荞下,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘史飞。我一直安慰自己尖昏,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布构资。 她就那樣靜靜地躺著抽诉,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蚯窥。 梳的紋絲不亂的頭發(fā)上掸鹅,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天塞帐,我揣著相機與錄音拦赠,去河邊找鬼。 笑死葵姥,一個胖子當(dāng)著我的面吹牛荷鼠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播榔幸,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼允乐,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了削咆?” 一聲冷哼從身側(cè)響起牍疏,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拨齐,沒想到半個月后鳞陨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡瞻惋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年厦滤,在試婚紗的時候發(fā)現(xiàn)自己被綠了援岩。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡掏导,死狀恐怖享怀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情趟咆,我是刑警寧澤添瓷,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站值纱,受9級特大地震影響仰坦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜计雌,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一悄晃、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧凿滤,春花似錦妈橄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至反番,卻和暖如春沙热,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背罢缸。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工篙贸, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人枫疆。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓爵川,卻偏偏與公主長得像,于是被迫代替她去往敵國和親息楔。 傳聞我的和親對象是個殘疾皇子寝贡,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容