RxJava2.0從入門到放棄(二)

前言

RxJava2.0從入門到放棄(一)中簡單介紹了我對RxJava的理解以及RxJava最基本的一個(gè)寫法惜姐。這一部分繼續(xù)講講RxJava最重要的一個(gè)環(huán)節(jié)關(guān)于線程得調(diào)度。(本文案例用kotlin來做案例椿息。)

至于為什么說是最重要的载弄?RxJava在github是這么定義自己的RxJava is a Java VM implementation of [Reactive Extensions](http://reactivex.io/): a library for composing asynchronous and event-based programs by using observable sequences. 也就是說RxJava是一個(gè)專注于用來解決異步以及事件驅(qū)動(dòng)的庫。

講解之前我們先拋出一個(gè)問題吧:

先從IO線程獨(dú)讀一個(gè)文件夾撵颊,
再把文件夾里面的png圖片篩選出來宇攻,
然后在主線程中把這些圖片加載在UI上。

面對這樣一個(gè)需求該怎么處理倡勇。用Thread應(yīng)該差不多這樣實(shí)現(xiàn)

final File[] files = file.listFiles();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for(File f:files) {
                    if(f.getName().endsWith(".png")){
                       final Bitmap bitmap = transFileToBitmap(f);
                       runOnUiThread(new Runnable() {
                           @Override
                           public void run() {
                               updateUI(bitmap);
                           }
                       });
                    }
                }
            }
        }).start();

后面再來用RxJava來實(shí)現(xiàn)一下這個(gè)需求逞刷。

正文

RxJava整體的線程調(diào)度涉及到三個(gè)關(guān)鍵點(diǎn)分別是subscribeOn observeOn Scheduler

RxJava在不指定線程的情況下妻熊,RxJava保持者線程不變的原則夸浅。也就是說『上游』在哪個(gè)線程上創(chuàng)建事件,『下游』就是在哪個(gè)線程上處理事件扔役,『上游』和『下游』線程保持一致帆喇。

用代碼來驗(yàn)證下:

 Observable.create<Int> { e ->
            for (i in 0..5) {
                Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
                Log.e(TAG, "observable  $i")
                e.onNext(i)
            }
        }
                .subscribe { int ->
                    Log.e(TAG, "onNext  $int")
                    Log.e(TAG, "subscribe thread ${Thread.currentThread().name}")
                }

輸出結(jié)果是這樣:

08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  0
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  0
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  1
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  1
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  2
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  2
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  3
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  3
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  4
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  4
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main
08-23 17:55:33.635 19473 19473 E RxTag   : Observable thread main
08-23 17:55:33.635 19473 19473 E RxTag   : observable  5
08-23 17:55:33.635 19473 19473 E RxTag   : onNext  5
08-23 17:55:33.635 19473 19473 E RxTag   : subscribe thread main

可以看到所有的運(yùn)行都是在main線程運(yùn)行的,可以驗(yàn)證:

RxJava在不指定線程的情況下,『上游』和『下游』線程保持一致亿胸。

如果指定線程的話該怎么做坯钦?
在RxJava中可以分別通過 subscribeOn()observerOn()這兩個(gè)方法來指定『上游』事件產(chǎn)生的線程以及『下游』事件響應(yīng)的線程。
具體怎么做我們來看代碼:

Observable.create<Int> { e ->
            for (i in 0..2) {
                Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
                Log.e(TAG, "observable  $i")
                e.onNext(i)
            }
        }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { int ->
                    Log.e(TAG, "onNext  $int")
                    Log.e(TAG, "subscribe thread ${Thread.currentThread().name}")
                }

輸出結(jié)果是:

08-24 11:02:28.614 19473 21708 E RxTag   : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.614 19473 21708 E RxTag   : observable  0
08-24 11:02:28.614 19473 21708 E RxTag   : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.615 19473 21708 E RxTag   : observable  1
08-24 11:02:28.615 19473 21708 E RxTag   : Observable thread RxCachedThreadScheduler-1
08-24 11:02:28.615 19473 21708 E RxTag   : observable  2
08-24 11:02:28.982 19473 19473 E RxTag   : onNext  0
08-24 11:02:28.982 19473 19473 E RxTag   : subscribe thread main
08-24 11:02:28.982 19473 19473 E RxTag   : onNext  1
08-24 11:02:28.983 19473 19473 E RxTag   : subscribe thread main
08-24 11:02:28.983 19473 19473 E RxTag   : onNext  2
08-24 11:02:28.983 19473 19473 E RxTag   : subscribe thread main

『上游』事件產(chǎn)生在RxCachedThreadScheduler-1這個(gè)線程侈玄,『下游』事件響應(yīng)的onNext()在main線程婉刀。

那么我們關(guān)注下Scheduler的幾個(gè)線程名稱:

  • Schedulers.trampoline() : 相當(dāng)于不指定線程。直接在之前的線程運(yùn)行序仙,依賴于調(diào)用操作的線程突颊。
  • Schedulers.io():(讀寫文件、讀寫數(shù)據(jù)庫潘悼、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler律秃。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池治唤,可以重用空閑的線程棒动,因此多數(shù)情況下 io() 比 newThread() 更有效率;
  • Schedulers.newThread(): 總是啟用新線程,并在新線程中執(zhí)行操作肝劲;
  • Schedulers.single(): 啟用一個(gè)線程池大小為1的線程池迁客,相當(dāng)于(newScheduledThreadPool(1))郭宝,重復(fù)利用這個(gè)線程;
  • Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算掷漱,即不會(huì)被 I/O 等操作限制性能的操作粘室,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池卜范,大小為 CPU 核數(shù)衔统。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU海雪。

在RxAndroid中就會(huì)有這么一個(gè)線程:

  • AndroidSchedulers.mainThread():運(yùn)行在Android主線程中锦爵。main UI線程。

那么在Android中最簡單的異步或者請求網(wǎng)絡(luò)就這么寫了:

Observable.create<String> { e ->
            //請求網(wǎng)絡(luò)奥裸,返回一個(gè)String
            val str:String = api.getString()
            e.onNext(str)
        }.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    str ->   
                  //獲得String险掀,更新到UI
                    updateUI(str)
                },{
                    error->
                    //連接錯(cuò)誤,提示錯(cuò)誤信息
                    netWorkError(error.message)
                })

回顧文章最開始提出的問題我們就可以這么的用 RxJava實(shí)現(xiàn)出來:

Observable.fromArray(f.listFiles())
                .filter(new Predicate<File>() {
                    @Override
                    public boolean test(File file) throws Exception {
                        return file.getName().endsWith(".png");
                    }
                })
                .map(new Function<File, Bitmap>() {
                    @Override
                    public Bitmap apply(File file) throws Exception {
                        return getBitMapFromFile(file);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        updateUI(bitmap);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        showError(throwable.getMessage());
                    }
                });
    }

但是有兩點(diǎn)需要注意:

  • subscribeOn 講『上游』事件的發(fā)射切換到 Scheduler 所定義的線程湾宙, 如果多次調(diào)用 subscribeOn(),那么只有第一個(gè) subscribeOn 操作有效 樟氢;
  • observeOn 指定 observeOn 后續(xù)操作所在線程。也就是說 可以多次調(diào)用observeOn 可以多次切換接下來『下游』事件處理的線程 侠鳄;

舉個(gè)栗子吧:

    Observable.create<Int> { emitter ->
            for (i in 0..2) {
                Log.e(TAG, "Observable thread ${Thread.currentThread().name}")
                Log.e(TAG, "observable  $i")
                emitter.onNext(i)
            }
        }
                .subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.computation())
                .doOnNext(consumer())
                .observeOn(Schedulers.io())
                .doOnNext(consumer())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(consumer())
    }

     private fun consumer(): Consumer<Int> {
        return Consumer { i ->
            Log.e(TAG, "onNext thread ${Thread.currentThread().name}")
            Log.e(TAG, "onNext $i")
        }
    }

看一看輸出結(jié)果就證明之前的注意點(diǎn):

08-24 14:54:41.670 26674 27052 E RxTag   : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag   : observable  0
08-24 14:54:41.670 26674 27052 E RxTag   : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag   : observable  1
08-24 14:54:41.670 26674 27052 E RxTag   : Observable thread RxCachedThreadScheduler-6
08-24 14:54:41.670 26674 27052 E RxTag   : observable  2
08-24 14:54:41.672 26674 26880 E RxTag   : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag   : onNext 0
08-24 14:54:41.688 26674 26880 E RxTag   : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag   : onNext 1
08-24 14:54:41.688 26674 26880 E RxTag   : onNext thread RxComputationThreadPool-2
08-24 14:54:41.688 26674 26880 E RxTag   : onNext 2
08-24 14:54:41.691 26674 27054 E RxTag   : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.691 26674 27054 E RxTag   : onNext 0
08-24 14:54:41.697 26674 27054 E RxTag   : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.697 26674 27054 E RxTag   : onNext 1
08-24 14:54:41.698 26674 27054 E RxTag   : onNext thread RxCachedThreadScheduler-7
08-24 14:54:41.698 26674 27054 E RxTag   : onNext 2
08-24 14:54:42.058 26674 26674 E RxTag   : onNext thread main
08-24 14:54:42.058 26674 26674 E RxTag   : onNext 0
08-24 14:54:42.058 26674 26674 E RxTag   : onNext thread main
08-24 14:54:42.059 26674 26674 E RxTag   : onNext 1
08-24 14:54:42.059 26674 26674 E RxTag   : onNext thread main
08-24 14:54:42.059 26674 26674 E RxTag   : onNext 2

我們調(diào)用了兩次 subscribeOn()分別是 io()和newThread(),但是輸出結(jié)果就只有在RxCachedThreadScheduler -6線程中埠啃。但是每次調(diào)用doOnNext()都切換了一個(gè)線程,也就是說可以隨時(shí)隨地切換事件的處理線程伟恶。

總結(jié)

線程的調(diào)度就到這結(jié)束了碴开,把握好subscribeOn observableOn 以及scheduler 的切換,就能隨心所欲的進(jìn)行切換線程切換啦博秫。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末潦牛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子台盯,更是在濱河造成了極大的恐慌罢绽,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件静盅,死亡現(xiàn)場離奇詭異,居然都是意外死亡寝殴,警方通過查閱死者的電腦和手機(jī)蒿叠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蚣常,“玉大人市咽,你說我怎么就攤上這事〉治茫” “怎么了施绎?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵溯革,是天一觀的道長。 經(jīng)常有香客問我谷醉,道長致稀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任俱尼,我火速辦了婚禮抖单,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘遇八。我一直安慰自己矛绘,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布刃永。 她就那樣靜靜地躺著货矮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪斯够。 梳的紋絲不亂的頭發(fā)上次屠,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天,我揣著相機(jī)與錄音雳刺,去河邊找鬼劫灶。 笑死,一個(gè)胖子當(dāng)著我的面吹牛掖桦,可吹牛的內(nèi)容都是我干的本昏。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼枪汪,長吁一口氣:“原來是場噩夢啊……” “哼涌穆!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起雀久,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤宿稀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后赖捌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體祝沸,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年越庇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了罩锐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡卤唉,死狀恐怖涩惑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情桑驱,我是刑警寧澤竭恬,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布蹦掐,位于F島的核電站被廓,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜标沪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一肺孤、第九天 我趴在偏房一處隱蔽的房頂上張望汉柒。 院中可真熱鬧琅坡,春花似錦、人聲如沸亭螟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽预烙。三九已至墨微,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間扁掸,已是汗流浹背翘县。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谴分,地道東北人锈麸。 一個(gè)月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像忘伞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子沙兰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容