線(xiàn)程池及線(xiàn)程調(diào)度

背景

文章通過(guò)接口層表象來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)版且穩(wěn)定的線(xiàn)程調(diào)度庫(kù)刀荒,給予一個(gè)臺(tái)階首懈,當(dāng)你讀完文章的末尾看成,希望你有一探RxJava欲望與信心华畏。

目標(biāo)

  1. TaskScheduler.executeMain(...); //主線(xiàn)程, 執(zhí)行任務(wù)
  2. TaskScheduler.executeTask(...); //子線(xiàn)程, 線(xiàn)程池執(zhí)行任務(wù)
  3. TaskScheduler.executeSingle(...); //子線(xiàn)程, 單線(xiàn)程執(zhí)行任務(wù)
  4. TaskScheduler.create(...); //任務(wù)調(diào)度

項(xiàng)目

設(shè)計(jì)

  1. .func(...).func(...).func(...)...順序流執(zhí)行
  2. .observeOn(...)線(xiàn)程切換

效果圖

        TaskScheduler.create(new Task<List<String>>() {
            @Override
            public List<String> run() {
                ...do something in io thread
                return new ArrayList<>();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Function<List<String>, String>() {
                    @Override
                    public String apply(@NonNull List<String> strings) throws Exception {
                        ...do something in new thread, such as time-consuming, map conversion, etc.
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, Boolean>() {
                    @Override
                    public Boolean apply(@NonNull String s) throws Exception {
                        ...do something in io thread, such as time-consuming, map conversion, etc.
                        return true;
                    }
                })
                ...
                .observeOn(Schedulers.mainThread())
                .subscribe(new Observer<Boolean>() {
                    @Override
                    public void onNext(@NonNull Boolean result) {
                        ...do something in main thread
                    }

                    @Override
                    public void onError(Throwable e) {
                        ...do something in main thread
                    }
                });

分析

  1. 線(xiàn)程
  2. 線(xiàn)程切換
  3. 任務(wù)調(diào)度

1. 線(xiàn)程

public class TaskManager {
    private static TaskManager ins;

    private Handler mainHandler;
    private ExecutorService cachedThreadPool;
    private ExecutorService singleThreadExecutor;

    private TaskManager() {
        mainHandler = new Handler(Looper.getMainLooper());
        cachedThreadPool = Executors.newCachedThreadPool();
        singleThreadExecutor = Executors.newSingleThreadExecutor();
    }

    static TaskManager getIns() {
        if (ins == null) {
            synchronized (TaskManager.class) {
                if (ins == null) {
                    ins = new TaskManager();
                }
            }
        }
        return ins;
    }

    /**
     * Execute sync task in main thread
     */
    void executeMain(Runnable runnable) { mainHandler.post(runnable); }

    /**
     * Execute async task in cached thread pool
     */
    void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }

    /**
     * Execute async task in single thread pool
     */
    void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }

    /**
     * Execute async task in a new thread
     */
    void executeNew(Runnable runnable) { new Thread(runnable).start(); }
}

線(xiàn)程切換的方法:拋runnable到相應(yīng)線(xiàn)程鹏秋,由線(xiàn)程來(lái)調(diào)度執(zhí)行runnable,runnable中的方法即在相應(yīng)線(xiàn)程中執(zhí)行亡笑。
如無(wú)這樣的顯式切換線(xiàn)程侣夷,代碼流(無(wú)論多少次方法遞歸調(diào)用)將在當(dāng)前線(xiàn)程一直執(zhí)行下去。同一線(xiàn)程仑乌,代碼總是順序的執(zhí)行百拓。

Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());

通過(guò)這行代碼可以打印出當(dāng)前在那一個(gè)線(xiàn)程琴锭。主線(xiàn)程的getName是main。

        new Thread(() -> {
                // Code block 1
                Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                ...
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        // Code block 2
                        Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                        ...
                    }
                });
        }).start();

這是一個(gè)通常的代碼形式
Code block 1處在一個(gè)子線(xiàn)程中執(zhí)行代碼耐版,通過(guò)new Handler(Looper.getMainLooper()).post(...)向主線(xiàn)程拋入一個(gè)runnable祠够,runnable進(jìn)入主線(xiàn)程消息隊(duì)列压汪,然后等主線(xiàn)程消息隊(duì)列取出該runnable執(zhí)行時(shí)粪牲,Code line 2處代碼即在主線(xiàn)程中執(zhí)行。
Code block 1與Code block 2在時(shí)間上并行執(zhí)行止剖。線(xiàn)程池同理腺阳。

public class TaskScheduler<T> {
    public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }

    public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }

    public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }

    ...
}

通過(guò)單例簡(jiǎn)單包裝,實(shí)現(xiàn)目標(biāo)1穿香、2亭引、3

2. 線(xiàn)程切換

    /**
     * Switch thread
     * scheduler 線(xiàn)程枚舉,int類(lèi)型: defaultThread皮获、newThread焙蚓、io、mainThread
     */
    public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
        if (scheduler == NEW_THREAD) {
            new Thread(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            }).start();
            return;
        } else if (scheduler == IO) {
            TaskScheduler.executeTask(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            });
            return;
        } else if (scheduler == MAIN_THREAD) {
            if (!isMainThread()) {
                TaskScheduler.executeMain(() -> {
                        if (runnable != null) {
                            runnable.run();
                        }
                });
                return;
            }
        }
        if (runnable != null) {
            runnable.run();
        }
    }

    public static boolean isMainThread() {
        return Looper.getMainLooper().getThread() == Thread.currentThread();
    }

3. 任務(wù)調(diào)度

3.1 開(kāi)始前的準(zhǔn)備

我們先來(lái)定義3個(gè)接口

interface.png

然后是2個(gè)對(duì)應(yīng)的包裝類(lèi)洒宝,后面會(huì)用到

Task -> TaskEmitter
Function -> FunctionEmitter

public class Emitter {
    public int scheduler;
}
public class TaskEmitter<T> extends Emitter {
    public Task<T> task;

    public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
        this.task = task;
        this.scheduler = scheduler;
    }
}
public class FunctionEmitter<T, R> extends Emitter {
    public Function<? super T, ? extends R> function;

    public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
        this.function = function;
        this.scheduler = scheduler;
    }
}

3.2 Create

開(kāi)始前购公,我們知道一些開(kāi)源庫(kù)如Glide,慣用.with(...)形式雁歌,這種方式實(shí)質(zhì):靜態(tài)方法 + return new Instance()宏浩,
這里我們也用這種模式來(lái)開(kāi)始create(...)。

實(shí)現(xiàn)分三步走

Step 1: Create

    public static <T> TaskScheduler<T> create(final Task<T> task) {
        TaskScheduler<T> schedulers = new TaskScheduler<T>();
        schedulers.task = task;
        return schedulers;
    }

創(chuàng)建TaskScheduler實(shí)例靠瞎,持有 源任務(wù)task

    public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
        this.subscribeScheduler = scheduler;
        return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
    }

指定 源任務(wù)task 執(zhí)行所在線(xiàn)程比庄,丟棄當(dāng)前TaskScheduler實(shí)例。
源任務(wù)task乏盐、 線(xiàn)程枚舉 注入TaskEmitter后佳窑,返回新的實(shí)例TaskObserve,后續(xù)邏輯全由TaskObserve處理

Step 2: TaskObserve中間件

public static class TaskObserve<T> {
        private TaskEmitter taskEmitter;
        private List<FunctionEmitter> emitters;
        private int observeOnScheduler = Schedulers.defaultThread();

        TaskObserve(TaskEmitter<T> taskEmitter) {
            this.taskEmitter = taskEmitter;
            this.emitters = new ArrayList<>();
        }

        ...
}

TaskObserve: 中間件父能,初始和map轉(zhuǎn)換時(shí)生成华嘹,包含以下成員
taskEmitter: 源任務(wù)
emitters: 轉(zhuǎn)換隊(duì)列,map轉(zhuǎn)換時(shí)遞增
observeOnScheduler: 線(xiàn)程枚舉法竞,observeOn觀察者所在線(xiàn)程耙厚,可重復(fù)調(diào)用,當(dāng)然只保留最后一次指定的線(xiàn)程

        TaskObserve(TaskObserve middle) {
            this.taskEmitter = middle.taskEmitter;
            this.observeOnScheduler = middle.observeOnScheduler;
            this.emitters = middle.emitters;
        }

        public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
            this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
            return new TaskObserve<TR>(this);
        }

map轉(zhuǎn)換時(shí)岔霸,將 轉(zhuǎn)換體Function 薛躬、當(dāng)前 線(xiàn)程枚舉 observeOnScheduler注入 FunctionEmitter ,添加到 轉(zhuǎn)換隊(duì)列呆细。
返回新的實(shí)例TaskObserve型宝,丟棄當(dāng)前TaskObserve實(shí)例八匠,新實(shí)例線(xiàn)程枚舉observeOnScheduler默認(rèn)為默認(rèn)線(xiàn)程

Step 3: Subscribe,才是開(kāi)始E亢ā@媸鳌!

核心思想

  1. 先執(zhí)行 源任務(wù) 岖寞,返回值
  2. 遞歸從 轉(zhuǎn)換隊(duì)列 取出 FunctionEmitter (含有轉(zhuǎn)換體抡四、線(xiàn)程枚舉),Schedulers.switchThread(...)指定線(xiàn)程執(zhí)行仗谆,轉(zhuǎn)換返回值
  3. 轉(zhuǎn)換隊(duì)列 執(zhí)行盡指巡,提交任務(wù),任務(wù)結(jié)束
        public void subscribe(final Observer<T> callback) {
            // 指定源任務(wù)線(xiàn)程枚舉
            Schedulers.switchThread(taskEmitter.scheduler, () -> {
                    try {
                        // 執(zhí)行源任務(wù)
                        Object t = taskEmitter.task.run();
                        // 轉(zhuǎn)換隊(duì)列是否為空
                        if (assertInterrupt(t)) {
                            // 轉(zhuǎn)換隊(duì)列空隶垮,提交本次任務(wù)藻雪,任務(wù)結(jié)束
                            submit(t, callback);
                            return;
                        }
                        // 轉(zhuǎn)換隊(duì)列不為空,繼續(xù)轉(zhuǎn)換
                        apply(t, emitters, callback);
                    } catch (Throwable e) {
                        // 任務(wù)流拋出異常狸吞,即時(shí)中斷勉耀,任務(wù)結(jié)束
                        error(e, callback);
                    }
            });
        }
        private boolean assertInterrupt(Object emitter) throws Exception {
            if (emitter == null) {
                // 轉(zhuǎn)換返回值,不能為NullLF1愠狻!
                throw new RuntimeException("Apply output must not be null!");
            }
            return emitters.size() <= 0;
        }

assertInterrupt判斷當(dāng)前轉(zhuǎn)換隊(duì)列暖侨,是否執(zhí)行盡了

Step 3 - 1: Apply轉(zhuǎn)換隊(duì)列轉(zhuǎn)換

        private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
            // 依次從轉(zhuǎn)換隊(duì)列取出FunctionEmitter椭住,然后移除
            final FunctionEmitter<E, F> f = emitters.get(0);
            emitters.remove(f);
            // 指定當(dāng)前轉(zhuǎn)換線(xiàn)程枚舉
            Schedulers.switchThread(f.scheduler, () -> {
                    try {
                        // 轉(zhuǎn)換,返回轉(zhuǎn)換值
                        Object emitter = f.function.apply(o);
                        // 轉(zhuǎn)換隊(duì)列是否為空
                        if (assertInterrupt(emitter)) {
                            // 轉(zhuǎn)換隊(duì)列空字逗,提交本次任務(wù)京郑,任務(wù)結(jié)束
                            submit(emitter, callback);
                            return;
                        }
                        // 轉(zhuǎn)換隊(duì)列不為空,繼續(xù)轉(zhuǎn)換
                        apply(emitter, emitters, callback);
                    } catch (Throwable e) {
                        // 任務(wù)流拋出異常葫掉,即時(shí)中斷些举,任務(wù)結(jié)束
                        error(e, callback);
                    }
            });
        }

Step 3 - 2: Submit提交

        private <S> void submit(final Object result, final Observer<S> callback) {
            // 指定當(dāng)前轉(zhuǎn)換線(xiàn)程枚舉,即當(dāng)前中間件線(xiàn)程枚舉observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    try {
                        if (callback != null) {
                            // 成功俭厚,任務(wù)結(jié)束
                            callback.onNext((S) result);
                        }
                    } catch (Throwable e) {
                        error(e, callback);
                    }
            });
        }

        private <S> void error(final Throwable e, final Observer<S> callback) {
            // 指定當(dāng)前轉(zhuǎn)換線(xiàn)程枚舉户魏,即當(dāng)前中間件線(xiàn)程枚舉observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    if (callback != null) {
                        // 出錯(cuò),任務(wù)結(jié)束
                        callback.onError(e);
                    }
            });
        }

小結(jié):

泛型: java泛型屬于類(lèi)型擦除挪挤,無(wú)論T叼丑、F還是R...,最終都是Object扛门。
設(shè)計(jì): 這里的 任務(wù)流 實(shí)現(xiàn)方式為遞歸嵌套調(diào)用鸠信。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市论寨,隨后出現(xiàn)的幾起案子星立,更是在濱河造成了極大的恐慌爽茴,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绰垂,死亡現(xiàn)場(chǎng)離奇詭異室奏,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)劲装,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)胧沫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人酱畅,你說(shuō)我怎么就攤上這事琳袄〗。” “怎么了纺酸?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)址否。 經(jīng)常有香客問(wèn)我餐蔬,道長(zhǎng),這世上最難降的妖魔是什么佑附? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任樊诺,我火速辦了婚禮,結(jié)果婚禮上音同,老公的妹妹穿的比我還像新娘词爬。我一直安慰自己,他們只是感情好权均,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布顿膨。 她就那樣靜靜地躺著,像睡著了一般叽赊。 火紅的嫁衣襯著肌膚如雪恋沃。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天必指,我揣著相機(jī)與錄音囊咏,去河邊找鬼。 笑死塔橡,一個(gè)胖子當(dāng)著我的面吹牛梅割,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播葛家,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼户辞,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了惦银?” 一聲冷哼從身側(cè)響起咆课,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤末誓,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后书蚪,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體喇澡,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年殊校,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了晴玖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡为流,死狀恐怖呕屎,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情敬察,我是刑警寧澤秀睛,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站莲祸,受9級(jí)特大地震影響蹂安,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜锐帜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一毫玖、第九天 我趴在偏房一處隱蔽的房頂上張望脱吱。 院中可真熱鬧切油,春花似錦庆杜、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至语泽,卻和暖如春贸典,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背踱卵。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工廊驼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惋砂。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓妒挎,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親西饵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子酝掩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)眷柔,斷路器期虾,智...
    卡卡羅2017閱讀 134,601評(píng)論 18 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 171,522評(píng)論 25 707
  • 作為一本被眾人追捧原朝,被眾多知識(shí)型大咖認(rèn)可的暢銷(xiāo)書(shū),提起想要閱讀它的心镶苞,并不難喳坠, 但真正開(kāi)始讀下去,卻也不是件容易的...
    小夭生活館閱讀 1,527評(píng)論 0 2
  • 歲末豈無(wú)雪茂蚓,有意故來(lái)遲壕鹉。 寒夜燈帳里,斟酌古人詩(shī)聋涨。 【2015年1月24日】 ?
    d03e056874dc閱讀 214評(píng)論 0 0
  • 劉 娜 焦點(diǎn)解決網(wǎng)絡(luò)初級(jí)九期 駐馬店 2018~05~30 堅(jiān)持分享第95天 今天在天中晚報(bào)公眾微信號(hào)上看...
    洋帆起航閱讀 107評(píng)論 0 0