rxjava 的東西是很多的马澈,難免有理解錯誤的地方,這兩天面試碰到有人問 subscribeOn/observeOn 線程切換的問題弄息,我回答完痊班,面試官明顯不滿意,回來找了找資料摹量,還真是自己理解錯了涤伐,有必要專門寫一篇文章出來馒胆。
例子1 : subscribeOn/observeOn 最簡單使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
}
});
按照之前的理解:
- subscribeOn 是決定 observable 中產(chǎn)生數(shù)據(jù)的方法執(zhí)行在哪個線程
- observeOn 是決定 observer 消費數(shù)據(jù)的方法執(zhí)行在哪個線程
我們看這個最簡單的例子,的確是這樣凝果,那么更復雜的情況呢祝迂。
例子2:subscribeOn/observeOn 連著重復寫,哪個為準
還是以上面那個最簡單的例子來
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
}
});
從結(jié)果來看:
- 多個 subscribeOn 連著寫器净,以第一個為準
- 多個 observeOn 連著寫型雳,以最后一個為準
例子3 :添加多個操作符呢
rxjava 中的操作符基本都會生成一個新的 observable 出來,上下游的關(guān)系就復雜了山害,情況會不會有變化呢纠俭,這個例子就復雜了
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第1次變化" + Thread.currentThread().getName());
return "";
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第2次變化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第3次變化" + Thread.currentThread().getName());
return "";
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第4次變化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
}
});
從結(jié)果看:
- subscribeOn 決定 Observable.create 的執(zhí)行線程,之后再寫 subscribeOn 浪慌,無論是挨著寫冤荆,還是隔著操作符寫都沒有意思
- subscribeOn 決定數(shù)據(jù)源的執(zhí)行線程后,也會當前線程置為這個線程眷射,若無其他設(shè)置匙赞,之后操作符的操作也是在當前線程執(zhí)行,也就是 subscribeOn 指定的線程
- observeOn 不僅僅可以決定 .subscribe 執(zhí)行的線程妖碉,更能夠更改 observeOn 之后書寫的操作符的執(zhí)行線程涌庭,也就是可以切換當前線程。
例子4:用 observeOn 給多個操作符切換線程
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("AA", "數(shù)據(jù)源" + Thread.currentThread().getName());
emitter.onNext("");
}
})
.subscribeOn(Schedulers.computation())
.observeOn( Schedulers.io() )
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第1次變化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第2次變化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第3次變化" + Thread.currentThread().getName());
return "";
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d("AA", "第4次變化" + Thread.currentThread().getName());
return "";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "監(jiān)聽者" + Thread.currentThread().getName());
}
});
從結(jié)果看:
- observeOn 的的確確是可以 rxjava 所在線程
好了來說說原理
因為 observeOn() 指定的是 Subscriber 的線程欧宜,而這個 Subscriber 并不是(嚴格說應該為『不一定是』坐榆,但這里不妨理解為『不是』)subscribe() 參數(shù)中的 Subscriber ,而是 observeOn() 執(zhí)行時的當前 Observable 所對應的 Subscriber 冗茸,即它的直接下級 Subscriber 席镀。換句話說,observeOn() 指定的是它之后的操作所在的線程夏漱。因此如果有多次切換線程的需求豪诲,只要在每個想要切換線程的位置調(diào)用一次 observeOn() 即可。 ——扔物線
摘自:擁抱RxJava(番外篇):關(guān)于RxJava的Tips & Tricks 挂绰,推薦大家去看看原文
我們翻翻源碼呢屎篱,看看能不能簡單的走一下邏輯
- 我們可以看到 map 是生成了一個新的 observable 出來,這個 observable 還有我們的變化數(shù)據(jù)的接口類
- 在這個 新的 observable 里面葵蒂,有上面的 observable 對象引用交播,然后給這個上面的 observable 對象注冊了一個新的觀察者進來
- 這個新的觀察者即是一個 observer,但同時還是一個 observable 践付,這個新的觀察者在數(shù)據(jù)生成方法中接受上一級 observable 發(fā)送過來的數(shù)據(jù)秦士,然后根據(jù)我們傳入的數(shù)據(jù)變換接口對象計算出新的數(shù)據(jù),最后發(fā)送給消費者或是下一級
不是很好理解永高,但是大概應該是這個意思
換個更容易理解的描述:
- subscribeOn 決定的是上游線程隧土,上游切換到哪個線程提针,下游要是不改的話,rxjava 就在這個線程一直跑
- observeOn 決定的是下游線程
- 整個 rxjava 中嚴格說來真正的上游只有一個次洼,那就是產(chǎn)生數(shù)據(jù)的位置关贵,比如 .just / ,create,其他任何變換和操作符卖毁,注冊都是下游揖曾。
- 所以 subscribeOn 只有第一次切換有效,作用范圍也是最小的亥啦,就是 .just / ,create
- 基本上操作符都會生成一個新的 observable 出來炭剪,和之前的 observable 關(guān)聯(lián)(其實也是注冊到之前的 observable)。所以在一個操作的范圍來看翔脱,前一個 observable 發(fā)送數(shù)據(jù)給我奴拦,算是上游,我這個操作符消費數(shù)據(jù)届吁,產(chǎn)生新的 observable 错妖,算是下游
- 所以 observeOn 可以多次切換他之后的操作符的線程