如果你已經(jīng)有了Rxjava1的使用基礎(chǔ),你可以看一下這一篇的大體的提綱,了解就可以鏈接到三堤舒,說不定三都不用看。奠旺。
如果沒有Rxjava1的基礎(chǔ),就 定心點施流,小腳并并攏响疚,坐正了往下看。
subscribeOn和observeOn
-
初見
我們之前在嘗試Observable
或者是Flowable
的subscribe方法時候瞪醋,有沒有在意IDE自動幫我們彈出的方法里有subscribeOn
這個鬼忿晕,于是我又好奇的點開Observable的源碼,搜了下observeOn
银受,結(jié)果也有杏糙。
看了下注釋,比較抽象蚓土。按照我們的chinglish直接從方法上來翻譯是在什么地方訂閱,在什么地方觀察赖淤。
按照我們之前的上下游觀點蜀漆,上游的人拋東西下來,下游的人接住咱旱。那么很顯然下游的人肯定是觀察的人确丢,這個毋庸置疑。那么相應(yīng)的上游的人就是訂閱者(實際上訂閱者還是太費解吐限,我們把它理解成拋東西的人)
那么這兩個方法現(xiàn)在按照我們的理解就是在哪里拋東西鲜侥,和在哪里觀察。
OK诸典,我們來看一下這兩個方法的傳參描函,都是Scheduler
。
點進去一看狐粱。舀寓。。肌蜻。抽象類互墓。。不能直接用了蒋搜。篡撵。判莉。跪了。育谬。
完券盅。。斑司。渗饮。
-
再探
就這么結(jié)束了?宿刮?開玩笑;フ尽!
還記得在第一篇中僵缺,我們不知道什么東西跟Subscriber一起連用的時候我們怎么做來著胡桃!沒錯!磕潮!github翠胰,源碼目錄走起。
我們在根目錄下發(fā)現(xiàn)schedulers目錄自脯,打開之景,看到Schedulers.java文件,點開來一看
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
...
}
給了我們五個靜態(tài)類膏潮,雖然看了一下锻狗,它似乎又在類中維護了這五個Scheduler的單例,但是畢竟是protected的不能直接給哥們兒拿來用啊焕参。
然后一不小心轻纪,接著往下看的時候發(fā)現(xiàn)了,公有的靜態(tài)方法叠纷,并且返回的還是我們正好需要的Scheduler刻帚,我去:
//處理io
public static Scheduler io()
//處理復(fù)雜計算
public static Scheduler computation()
//普通的單獨線程
public static Scheduler single()
//新起一個線程
public static Scheduler newThread()
//在當前線程中,但是會等到當前線程任務(wù)執(zhí)行完畢之后再去執(zhí)行
public static Scheduler trampoline()
相應(yīng)的注釋涩嚣,我也差不多備注在方法上崇众。
于是我們對我們的代碼做一定的改動,然后打上相應(yīng)的log
flowable.subscribeOn(Schedulers.single()).observeOn(Schedulers.newThread()).subscribe(subscriber)缓艳;
//當前線程名
Thread.currentThread().getName()
如果你不知道這段代碼是獲得當前線程的名字校摩,那要么出門左轉(zhuǎn)java線程基礎(chǔ),要么ctrl + W阶淘。衙吩。。溪窒。
我們發(fā)現(xiàn)答應(yīng)出來的log并沒有什么問題坤塞,和我們預(yù)料的一樣
02-22 01:05:19.346 31814-31814/org.ding.testmulti E/subscriber: onSubscribe thread : main
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext : s hello flowable1
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext thread : RxNewThreadScheduler-1
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext : s hello flowable2
02-22 01:05:19.355 31814-4422/org.ding.testmulti E/subscriber: onNext thread : RxNewThreadScheduler-1
02-22 01:05:19.355 31814-4423/org.ding.testmulti E/flowable: thread : RxSingleScheduler-1
subscriber就是我們所說的下游的觀察者冯勉,由于我們這里使用到的是flowable和subscriber這一對鴛鴦。需要在onSubscribe方法中去調(diào)用Subscription的request方法摹芙,就很顯然的可以理解灼狰,在onSubscribe方法中實際上我們還沒有開始建立真正的連接,直到request之后我們在onNext浮禾,onError交胚,onComplete中才是真正在observerOn的線程中運行的,可以看到兩個onNext都是newThread沒錯盈电。
subscribeOn就是上游的線程蝴簇,定義在flowable的subscribe中的方法就是運行在我們定義的singleThread沒錯。
-
細思
-
主線程怎么沒有
主線程是我們使用最最頻繁的線程了匆帚,所有的UI操作都要放在我們的主線程中去進行熬词,那設(shè)想一下,如果我們需要在上游或者是下游做一些UI操作吸重,當然如果我們沒有刻意的去使用SubscribeOn
和ObserverOn
互拾,而我們的subscribe方法又正好在主線程中調(diào)用,那沒有問題嚎幸,整個都是在主線程中跑的颜矿,要是我們使用了呢,怎么辦呢嫉晶,Schedulers里面并沒有提供主線程這個東西啊或衡,沒有main
這個東西啊
我又去找github了,目錄翻了一圈车遂,也沒有找到MainThread這個鬼,好了斯辰,這回真放棄舶担,大家再見。彬呻。衣陶。。
醒了醒神闸氮,再回過頭想想剪况,主線程這個東西,是否是安卓特地適配的呢蒲跨,rxjava中怎么可能會出現(xiàn)android特有的東西呢译断。。或悲。
-
為我們的rxjava添上Android的模塊
說時遲孙咪,那時快堪唐。。翎蹈。淮菠。(好老)
我們立馬前往ReativeX的github庫,看到RxGo荤堪,RxKotlin(java8來了你顫抖么)合陵,再往下,找到了RxAndroid3窝簟拥知!
點進去,看目錄?苡举庶!
你沒有看錯,rxandroid里面只有這些揩抡。户侥。。
還正好出現(xiàn)了我們需要的AndroidSchedulers
峦嗤,趕緊吃飽辣條蕊唐,點擊去看看 -
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
還要我說廢話么。烁设。替梨。
去module setting里面搜搜看這個dependency,注意哦装黑,rxandroid也有適配rxjava1和rxjava2的兩種版本哦副瀑,我們使用的是rxjava2的版本
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
* ######上下游的On關(guān)系
不知道細心的你有沒有去嘗試過一個問題,只用subscribeOn和只用observerOn會出現(xiàn)什么情況恋谭。
通過打印日志糠睡,你會發(fā)現(xiàn)
如果只定義了上游的線程,而沒有定義下游的線程疚颊,那么下游的線程將跟隨上游的線程狈孔;
如果只定義了下游的線程,那么上游的線程將依然使用當前線程材义。
實際上也很好理解均抽,上游發(fā)生了海嘯洪水必然會影響下游,導(dǎo)致下游也波瀾其掂。而下游起了波瀾油挥,上游該咋地還咋地。
* ######Schedulers的幾個方法具體區(qū)別
我們在上面大體的把幾個都解釋了一下,但是具體的什么時候用什么呢喘漏,我們一一來看
* compute
還記的上面說的維護了靜態(tài)五個Scheduler么护蝶,我們看到compute對應(yīng)的Scheduler是`ComputationScheduler`,源碼走起翩迈!
```
/**
* Create a scheduler with pool size equal to the available processor
* count and using least-recent worker selection policy.
*/
public ComputationScheduler() {
this(THREAD_FACTORY);
}
```
注釋夠明顯了么持灰,翻譯過來大致是說會起一個線程池,大小跟available processor的數(shù)量(就是CPU數(shù)量)相等负饲,并使用最近的工作線程選擇策略堤魁。
也就是說,compute方法會用把方法放在一個大小等于CPU核數(shù)的線程池中執(zhí)行返十。
* single
```
public SingleScheduler() {
this(SINGLE_THREAD_FACTORY);
}
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}
我們看到也是創(chuàng)建了一個線程池妥泉,我們?nèi)reate方法繼續(xù)看,
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
不知道在android面試實用版2中洞坑,大家有否記得盲链,說道的幾個線程池,這里用到到就是計劃線程池迟杂,主要是用來在未來的某一時刻進行執(zhí)行的線程池刽沾,我們看到傳入的是1,也就是說single會讓方法在核心線程為1的線程池中工作排拷。
* io
```
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
使用的一個叫`CachedWorkerPool`的內(nèi)部類侧漓,
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
ScheduledThreadPoo線程池,每一個任務(wù)相隔60納秒(好小的樣子)监氢。并且維護了一個任務(wù)隊列
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
可以不斷的對任務(wù)隊列進行處理布蔗,可以認為這個隊列是無限長的,好了浪腐。纵揍。這邊扯得有點遠。议街。
也就是說io會讓方法執(zhí)行在一個無數(shù)量上線的線程池
* newThread
newThreadScheduler的代碼是最少的骡男,我們直接可以看到
```
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
每次都會起一個新的線程,也就是說newthread會讓方法都在新的線程中執(zhí)行
* trampoline
如注釋傍睹。就是在當前線程中執(zhí)行,并且等當前線程執(zhí)行完了之后再去執(zhí)行犹菱。
別問我怎么沒有源碼了拾稳,因為我不懂。
那么綜上所述腊脱,一般我們使用的最多的應(yīng)該就是主線程和io了访得,如果你要使用其他的方法,那么具體任務(wù)具體分析了。
電梯
抱著陌生的態(tài)度再看Rxjava(一)
抱著陌生的態(tài)度再看Rxjava(三)
抱著陌生的態(tài)度再看Rxjava(四)