抱著陌生的態(tài)度再看Rxjava(二)

如果你已經(jīng)有了Rxjava1的使用基礎(chǔ),你可以看一下這一篇的大體的提綱,了解就可以鏈接到三堤舒,說不定三都不用看。奠旺。
如果沒有Rxjava1的基礎(chǔ),就 定心點施流,小腳并并攏响疚,坐正了往下看

subscribeOn和observeOn

  • 初見

我們之前在嘗試Observable或者是Flowable的subscribe方法時候瞪醋,有沒有在意IDE自動幫我們彈出的方法里有subscribeOn這個鬼忿晕,于是我又好奇的點開Observable的源碼,搜了下observeOn银受,結(jié)果也有杏糙。

看了下注釋,比較抽象蚓土。按照我們的chinglish直接從方法上來翻譯是在什么地方訂閱,在什么地方觀察赖淤。

按照我們之前的上下游觀點蜀漆,上游的人拋東西下來,下游的人接住咱旱。那么很顯然下游的人肯定是觀察的人确丢,這個毋庸置疑。那么相應(yīng)的上游的人就是訂閱者(實際上訂閱者還是太費解吐限,我們把它理解成拋東西的人)

那么這兩個方法現(xiàn)在按照我們的理解就是在哪里拋東西鲜侥,和在哪里觀察。

OK诸典,我們來看一下這兩個方法的傳參描函,都是Scheduler

點進去一看狐粱。舀寓。。肌蜻。抽象類互墓。。不能直接用了蒋搜。篡撵。判莉。跪了。育谬。

完券盅。。斑司。渗饮。

  • 再探

    就這么結(jié)束了?宿刮?開玩笑;フ尽!

    還記得在第一篇中僵缺,我們不知道什么東西跟Subscriber一起連用的時候我們怎么做來著胡桃!沒錯!磕潮!github翠胰,源碼目錄走起。


    schedulers目錄

    我們在根目錄下發(fā)現(xiàn)schedulers目錄自脯,打開之景,看到Schedulers.java文件,點開來一看

public final class Schedulers {
    @NonNull
    static final Scheduler SINGLE;

    @NonNull
    static final Scheduler COMPUTATION;

    @NonNull
    static final Scheduler IO;

    @NonNull
    static final Scheduler TRAMPOLINE;

    @NonNull
    static final Scheduler NEW_THREAD;
    ...
}

給了我們五個靜態(tài)類膏潮,雖然看了一下锻狗,它似乎又在類中維護了這五個Scheduler的單例,但是畢竟是protected的不能直接給哥們兒拿來用啊焕参。

然后一不小心轻纪,接著往下看的時候發(fā)現(xiàn)了,公有的靜態(tài)方法叠纷,并且返回的還是我們正好需要的Scheduler刻帚,我去:

    //處理io
    public static Scheduler io() 
    //處理復(fù)雜計算
    public static Scheduler computation() 
    //普通的單獨線程
    public static Scheduler single() 
    //新起一個線程
    public static Scheduler newThread()
    //在當前線程中,但是會等到當前線程任務(wù)執(zhí)行完畢之后再去執(zhí)行
    public static Scheduler trampoline()

相應(yīng)的注釋涩嚣,我也差不多備注在方法上崇众。

于是我們對我們的代碼做一定的改動,然后打上相應(yīng)的log

flowable.subscribeOn(Schedulers.single()).observeOn(Schedulers.newThread()).subscribe(subscriber)缓艳;
//當前線程名
Thread.currentThread().getName()

如果你不知道這段代碼是獲得當前線程的名字校摩,那要么出門左轉(zhuǎn)java線程基礎(chǔ),要么ctrl + W阶淘。衙吩。。溪窒。

我們發(fā)現(xiàn)答應(yīng)出來的log并沒有什么問題坤塞,和我們預(yù)料的一樣

02-22 01:05:19.346 31814-31814/org.ding.testmulti E/subscriber: onSubscribe   thread   :    main
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext    :    s  hello flowable1
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext   thread   :    RxNewThreadScheduler-1
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext    :    s  hello flowable2
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext   thread   :    RxNewThreadScheduler-1
02-22 01:05:19.355 31814-4423/org.ding.testmulti E/flowable: thread   :    RxSingleScheduler-1
subscriber就是我們所說的下游的觀察者冯勉,由于我們這里使用到的是flowable和subscriber這一對鴛鴦。需要在onSubscribe方法中去調(diào)用Subscription的request方法摹芙,就很顯然的可以理解灼狰,在onSubscribe方法中實際上我們還沒有開始建立真正的連接,直到request之后我們在onNext浮禾,onError交胚,onComplete中才是真正在observerOn的線程中運行的,可以看到兩個onNext都是newThread沒錯盈电。
 subscribeOn就是上游的線程蝴簇,定義在flowable的subscribe中的方法就是運行在我們定義的singleThread沒錯。
  • 細思
    • 主線程怎么沒有
      主線程是我們使用最最頻繁的線程了匆帚,所有的UI操作都要放在我們的主線程中去進行熬词,那設(shè)想一下,如果我們需要在上游或者是下游做一些UI操作吸重,當然如果我們沒有刻意的去使用SubscribeOnObserverOn互拾,而我們的subscribe方法又正好在主線程中調(diào)用,那沒有問題嚎幸,整個都是在主線程中跑的颜矿,要是我們使用了呢,怎么辦呢嫉晶,Schedulers里面并沒有提供主線程這個東西啊或衡,沒有main這個東西啊

    我又去找github了,目錄翻了一圈车遂,也沒有找到MainThread這個鬼,好了斯辰,這回真放棄舶担,大家再見。彬呻。衣陶。。

    醒了醒神闸氮,再回過頭想想剪况,主線程這個東西,是否是安卓特地適配的呢蒲跨,rxjava中怎么可能會出現(xiàn)android特有的東西呢译断。。或悲。

    • 為我們的rxjava添上Android的模塊

    說時遲孙咪,那時快堪唐。。翎蹈。淮菠。(好老)
    我們立馬前往ReativeX的github庫,看到RxGo荤堪,RxKotlin(java8來了你顫抖么)合陵,再往下,找到了RxAndroid3窝簟拥知!
    點進去,看目錄?苡举庶!

    rxandroid目錄結(jié)構(gòu)

    你沒有看錯,rxandroid里面只有這些揩抡。户侥。。
    還正好出現(xiàn)了我們需要的AndroidSchedulers峦嗤,趕緊吃飽辣條蕊唐,點擊去看看

/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

    還要我說廢話么。烁设。替梨。
    去module setting里面搜搜看這個dependency,注意哦装黑,rxandroid也有適配rxjava1和rxjava2的兩種版本哦副瀑,我們使用的是rxjava2的版本
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    * ######上下游的On關(guān)系
    不知道細心的你有沒有去嘗試過一個問題,只用subscribeOn和只用observerOn會出現(xiàn)什么情況恋谭。
    通過打印日志糠睡,你會發(fā)現(xiàn)

          如果只定義了上游的線程,而沒有定義下游的線程疚颊,那么下游的線程將跟隨上游的線程狈孔;
          如果只定義了下游的線程,那么上游的線程將依然使用當前線程材义。
    
    實際上也很好理解均抽,上游發(fā)生了海嘯洪水必然會影響下游,導(dǎo)致下游也波瀾其掂。而下游起了波瀾油挥,上游該咋地還咋地。

    * ######Schedulers的幾個方法具體區(qū)別
    我們在上面大體的把幾個都解釋了一下,但是具體的什么時候用什么呢喘漏,我們一一來看
      * compute
        還記的上面說的維護了靜態(tài)五個Scheduler么护蝶,我們看到compute對應(yīng)的Scheduler是`ComputationScheduler`,源碼走起翩迈!
     ```
      /**
        * Create a scheduler with pool size equal to the available processor
        * count and using least-recent worker selection policy.
        */
    public ComputationScheduler() {
            this(THREAD_FACTORY);
    }
      ```
  注釋夠明顯了么持灰,翻譯過來大致是說會起一個線程池,大小跟available processor的數(shù)量(就是CPU數(shù)量)相等负饲,并使用最近的工作線程選擇策略堤魁。
   也就是說,compute方法會用把方法放在一個大小等于CPU核數(shù)的線程池中執(zhí)行返十。
     * single
      ```
public SingleScheduler() {
          this(SINGLE_THREAD_FACTORY);
    }
    public SingleScheduler(ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          executor.lazySet(createExecutor(threadFactory));
    }
    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
          return SchedulerPoolFactory.create(threadFactory);
    }
我們看到也是創(chuàng)建了一個線程池妥泉,我們?nèi)reate方法繼續(xù)看,
 public static ScheduledExecutorService create(ThreadFactory factory) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            if (exec instanceof ScheduledThreadPoolExecutor) {
                ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
                POOLS.put(e, exec);
            }
            return exec;
    }
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
              super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

不知道在android面試實用版2中洞坑,大家有否記得盲链,說道的幾個線程池,這里用到到就是計劃線程池迟杂,主要是用來在未來的某一時刻進行執(zhí)行的線程池刽沾,我們看到傳入的是1,也就是說single會讓方法在核心線程為1的線程池中工作排拷。

 * io
   ```

public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}

   使用的一個叫`CachedWorkerPool`的內(nèi)部類侧漓,

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }
     ScheduledThreadPoo線程池,每一個任務(wù)相隔60納秒(好小的樣子)监氢。并且維護了一個任務(wù)隊列

ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}

        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }
   可以不斷的對任務(wù)隊列進行處理布蔗,可以認為這個隊列是無限長的,好了浪腐。纵揍。這邊扯得有點遠。议街。
   也就是說io會讓方法執(zhí)行在一個無數(shù)量上線的線程池

     * newThread
     newThreadScheduler的代碼是最少的骡男,我們直接可以看到
       ```
@NonNull
    @Override
    public Worker createWorker() {
            return new NewThreadWorker(threadFactory);
    }
 每次都會起一個新的線程,也就是說newthread會讓方法都在新的線程中執(zhí)行
  * trampoline
  如注釋傍睹。就是在當前線程中執(zhí)行,并且等當前線程執(zhí)行完了之后再去執(zhí)行犹菱。
  別問我怎么沒有源碼了拾稳,因為我不懂。

 那么綜上所述腊脱,一般我們使用的最多的應(yīng)該就是主線程和io了访得,如果你要使用其他的方法,那么具體任務(wù)具體分析了。

電梯

抱著陌生的態(tài)度再看Rxjava(一)
抱著陌生的態(tài)度再看Rxjava(三)
抱著陌生的態(tài)度再看Rxjava(四)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悍抑,一起剝皮案震驚了整個濱河市鳄炉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌搜骡,老刑警劉巖拂盯,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異记靡,居然都是意外死亡谈竿,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門摸吠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來空凸,“玉大人,你說我怎么就攤上這事寸痢⊙街蓿” “怎么了?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵啼止,是天一觀的道長道逗。 經(jīng)常有香客問我,道長族壳,這世上最難降的妖魔是什么憔辫? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮仿荆,結(jié)果婚禮上贰您,老公的妹妹穿的比我還像新娘。我一直安慰自己拢操,他們只是感情好锦亦,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著令境,像睡著了一般杠园。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上舔庶,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天抛蚁,我揣著相機與錄音,去河邊找鬼惕橙。 笑死瞧甩,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的弥鹦。 我是一名探鬼主播肚逸,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了朦促?” 一聲冷哼從身側(cè)響起膝晾,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎务冕,沒想到半個月后血当,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡洒疚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年歹颓,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片油湖。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡巍扛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出乏德,到底是詐尸還是另有隱情撤奸,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布喊括,位于F島的核電站胧瓜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏郑什。R本人自食惡果不足惜府喳,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蘑拯。 院中可真熱鬧钝满,春花似錦、人聲如沸申窘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剃法。三九已至碎捺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間贷洲,已是汗流浹背收厨。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留优构,地道東北人诵叁。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像俩块,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容