前言
RxJava 是在今年年初的時候上的車,接觸也快要滿一年了没宾。從最初只知道幾個操作符凌彬,寫寫 Demo 琼梆,或者跟著別人的項目和經(jīng)驗依葫蘆畫瓢偎行,到目前終于有點初窺門徑的地步舰攒。
RxJava 對于 Android 來說诞外,最直觀地便利就在于線程切換羞海。所以本篇內(nèi)容就是學(xué)習(xí) RxJava 是如何實現(xiàn)切換線程环戈。
希望讀者閱讀此篇文章宵膨,是有用過 RxJava 的童鞋犯建。
本章內(nèi)容基于源碼版本
RxJava: 1.2.4
- 文章來源:itsCoder 的 WeeklyBolg 項目
- itsCoder主頁:http://itscoder.com/
- 作者:謝三弟
- 審閱者:
目錄
準備
答案我會放在文章末尾
先來一道開胃菜:
指出下列程序操作符所運行的線程迁酸。
Observable.just() //1
.subscribeOn(Schedulers.newThread())
.map() //2
.subscribeOn(Schedulers.io())
.map() //3
.observeOn(Schedulers.computation())
.map() //4
.observeOn(Schedulers.newThread())
.subscribe() //5
開胃菜就到上面結(jié)束先鱼,如果你能夠清楚明白每個操作運行的線程,說明對于 RxJava 的線程切換的理解很正確胁出。
再具體分析 RxJava 是如何線程切換的型型,希望能清楚以下幾個 RxJava 中名詞的意思。
- Create()
- OnSubscribe
- Operator
如果你特別明白這幾個 RxJava 類/方法的作用全蝶,可以直接跳過看切換這部分闹蒜。
-
Create()
/** * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to * it. */ public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
方法注釋上說明,當訂閱者訂閱之后抑淫,該函數(shù)會返回將會執(zhí)行具體功能的流绷落。操作符進入源碼會發(fā)現(xiàn)他們最終都會調(diào)用到
create()
函數(shù)。 -
OnSubscribe
/** * Invoked when Observable.subscribe is called. * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}
首先我們知道這是一個繼承
Action1
的接口始苇,并且是在Observable.subscribe
流進行訂閱操作后回調(diào)砌烁。而且回顧剛剛create()
源碼中也發(fā)現(xiàn)參數(shù)就是這個OnSubscribe
。Action
的作用就是執(zhí)行其中的call()
方法催式。Observable.OnSubscribe 有點像 Todo List 函喉,里面都是一個一個待處理的事務(wù),并且這個 List 是有序的(這個很關(guān)鍵)荣月。
-
Operator
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> { // cover for generics insanity }
簡單來說它的職責(zé)就是將一個
Subscriber
變成另外一個Subscriber
管呵。
切換
上面知識點是一些小鋪墊,因為后面的內(nèi)容核心其實就是上面幾個類的作用哺窄。
SubscribeOn
追蹤這個方法捐下,核心是在這個類:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
}
我先貼出這個類的账锹,構(gòu)造方法和成員變量,因為很重要坷襟,我們先把前因弄清楚奸柬。
首先我們發(fā)現(xiàn)這個類是實現(xiàn)了 OnSubscribe
接口,之前復(fù)習(xí)到這個的作用就是在該流被訂閱之后執(zhí)行 call()
方法婴程,這里面就是后果廓奕,待會我們來看。
前因其實很簡單档叔,就是傳入兩個參數(shù):
一個是
Scheduler
懂从,調(diào)度器,它的具體實現(xiàn)在Schedulers
里蹲蒲。-
Observable<T> source
這個其實就是當前這個流。public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
接下來看看 call()
核心代碼里做的事情:
// 因為是 OnSubscribe 類侵贵,這里 call() 中傳入的參數(shù)是 Observable.subscribe(s) 中的 s
@Override
public void call(final Subscriber<? super T> subscriber) {
// 根據(jù)傳入的調(diào)度器届搁,創(chuàng)建一個 Worker 對象 inner
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
// 在 Worker 對象 inner 中執(zhí)行(意思就是,在我們指定的調(diào)度器創(chuàng)建的線程中運行)
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
// 對訂閱者包裝
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
······
};
// 這一句位置很關(guān)鍵
// 首先 source 是之前傳入的流(也就是當前流)窍育,在 Worker 內(nèi)部進行了訂閱操作卡睦,所以該流所有操作都執(zhí)行在其中
source.unsafeSubscribe(s);
}
});
}
通過我們指定的調(diào)度器,創(chuàng)建好 Worker 漱抓,之前傳入的流在 Worker 內(nèi)部表锻,對重新包裹的 subscriber 進行訂閱操作。
所以 SubscribeOn()
最關(guān)鍵的地方其實是因為這行代碼在調(diào)度器創(chuàng)建的 Worker 的 call()
中
source.unsafeSubscribe(s);
總結(jié):
subscribeOn
其實是改變了調(diào)用前序列所運行的線程乞娄。
ObserveOn
同樣的方法來分析瞬逊,最終的回調(diào)會到:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
其實看到關(guān)鍵字 lift 和 operator 就大約可以猜到是做什么的了。
接下來我們進入到 OperatorObserveOn
類中:
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
// 省略不必要的代碼
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
// 省略 ···
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
}
我們首先會注意到它是一個 Operator
仪或,并且沒有對上層 Observale 做任何修改和包裝确镊。那么它的作用就是將一個 Subscriber
變成另外一個 Subscriber
。所以接下來我們的首要任務(wù)就是看轉(zhuǎn)換后的 Subscriber
做了什么改變范删。
關(guān)鍵代碼在
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
child 是改變前的 Subscriber
蕾域,最后返回了 parent 。
我們發(fā)現(xiàn) ObserveOnSubscriber
同樣也是一個 Subscriber
類到旦,所以肯定含有 onNext/onError/onComplete
這三個標準方法旨巷,重要的肯定是 onNext
,所以我只貼上了該類三個有關(guān)函數(shù)添忘。
void init() {
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
// 執(zhí)行
schedule();
}
}
});
// recursiveScheduler 這個是構(gòu)造函數(shù)時傳入調(diào)度器創(chuàng)建的 worker
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
// 條件判斷里先將之前流的結(jié)果緩存進隊列
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
// 執(zhí)行
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
// 在當前 worker 上執(zhí)行該類的 call 方法
recursiveScheduler.schedule(this);
}
}
call()
方法有點冗長采呐,做的事情其實很簡單,就是取出我們緩存之前流的所有值昔汉,然后在 Worker 工作線程中傳下去懈万。
總結(jié):
- ObserveOn 不會關(guān)心之前的流的線程
- ObserveOn 會先將之前的流的值緩存起來拴清,然后再在指定的線程上,將緩存推送給后面的
Subscriber
共用時各自的作用域
Observable.just() //1
.subscribeOn(Schedulers.newThread())
.map() //2
.map() //3
.observeOn(Schedulers.computation())
.map() //4
.observeOn(Schedulers.newThread())
.subscribe() //5
如果分析這個流各個操作符的執(zhí)行線程会通,我們先把第一個 subscribeOn()
之前和第一個 observeOn()
之前的 Todo Items 找出來然后求并集:
得到的結(jié)果就是 subscribeOn()
的作用域口予。
之后的線程切換簡單了,遇到 observeOn()
就切換一次涕侈。
思考
為什么subscribeOn
只有第一次調(diào)用生效沪停?
我的理解如下:
subscribeOn
的作用域就是調(diào)用前序列中所有的 Todo List 任務(wù)清單(Observable.OnSubscribe),當我們執(zhí)行 subscribe()
時裳涛,這些任務(wù)清單就會執(zhí)行在 subscribeOn
指定的工作線程木张,而第二個 subscribeOn
早就沒有任務(wù)可做了,所以無法生效端三。
知乎里這段說的比我專業(yè):
正像 StackOverflow 上那段描述的舷礼,整個 Observable 數(shù)據(jù)流工作起來是分為兩個階段(或者說是兩個 lifecycle):upstream 的 subscription-time 和 downstream 的 runtime。
subscription-time 的階段郊闯,是為了發(fā)起和驅(qū)動數(shù)據(jù)流的啟動妻献,在內(nèi)部實現(xiàn)上體現(xiàn)為 OnSubscribe 向上游的逐級調(diào)用(控制流向上游傳遞)。支持 backpressure 的 producer request 也屬于這個階段团赁。除了 producer request 的情況之外育拨,subscription-time 階段一般就是從下游到上游調(diào)用一次就結(jié)束了,最終到達生產(chǎn)者(以最上游的那個 OnSubscribe 來體現(xiàn))欢摄。接下來數(shù)據(jù)流就開始向下游流動了熬丧。
Rxjava 中, subscribeOn 及 observeOn 方法切換線程發(fā)生的位置為什么設(shè)計為不同的怀挠? - 知乎
doOnSubscribe 的例外
我們再改動下開胃菜的代碼:
Observable.just() //1
.subscribeOn(Schedulers.newThread())
.map() //2
.subscribeOn(Schedulers.io())
.map() //3
.observeOn(Schedulers.computation())
.map() //4
.doOnSubscribe() //6
.observeOn(Schedulers.newThread())
.subscribe() //5
只添加了一行.doOnSubscribe() //6
析蝴,也是探討這個操作符執(zhí)行的線程。
public class OperatorDoOnSubscribe<T> implements Operator<T, T> {
private final Action0 subscribe;
public OperatorDoOnSubscribe(Action0 subscribe) {
this.subscribe = subscribe;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// 執(zhí)行我們的 Action
subscribe.call();
// Wrap 里面是包裝成一個新的 Subscriber 返回唆香,不對這個流做任何改變
return Subscribers.wrap(child);
}
}
doOnSubscribe 執(zhí)行的線程其實就是 subscribe.call();
所在的線程嫌变。這里觸發(fā)的時機就是,當我們進行 Observable.subscribe()
時躬它,如果我們沒有在緊接之后SubscribeOn
指定線程腾啥,那么它就會運行在默認線程,然后返回一個新的流冯吓。
關(guān)于 doOnSubscribe()
留一個問題
Observable.just()
.doOnSubscribe() // 1
.doOnSubscribe() // 2
.subscribe()
問題是倘待,對于 1 和 2 的執(zhí)行順序?
在開發(fā)中组贺,我們肯定不會像問題那樣寫代碼凸舵,只是自己在看 doOnSubscribe 源碼的時候,在問自己為什么它在其他操作符之前失尖,拓展到了 RxJava 流的一個執(zhí)行順序啊奄,也是自己想要明白的地方渐苏。所以下次準備探討學(xué)習(xí)。
對了菇夸,老司機說 RxJava 很像洋蔥琼富,一層一層。
進行分析學(xué)習(xí)的時候可以類比幫助理解庄新。
參考
Thomas Nield: RxJava- Understanding observeOn() and subscribeOn()
SubscribeOn 和 ObserveOn |Piasy Blog
答案:
1 newThread
2 newThread
3 newThread
4 computation
5 newThread