線(xiàn)程切換
只要使用RxJava肯定對(duì)下面的代碼特別熟悉
Observable.from(list)
.subcribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1(){
public void call(Object obj){
}
});
其中subscribeOn()可以把事件發(fā)生的線(xiàn)程切換到io線(xiàn)程白群,observeOn()可以把處理事件的線(xiàn)程切換到Android應(yīng)用程序主線(xiàn)程。
那他是怎么做到這么簡(jiǎn)潔的切換呢茫舶?
Rxjava的所有變換都基于一個(gè)lift模型虫埂,我們接下來(lái)介紹一下這個(gè)模型房资。
首先回顧一下Observable通知Subscriber的原理队塘。
在生成Observable的時(shí)候我們會(huì)傳入一個(gè)OnSubscribe的實(shí)例,在發(fā)生訂閱關(guān)系subscribe
方法中觉吭,OnSubscribe實(shí)例的call(Subscriber)
方法中就會(huì)調(diào)用傳入的Subscriber的的相關(guān)方法腾供,從而實(shí)現(xiàn)通知,消息發(fā)送鲜滩。
流程大概如下
Observable—(實(shí)例化時(shí))—>OnSubscribe—(訂閱關(guān)系發(fā)生subscribe時(shí)) —>Subscriber
那首先考慮一下我們?cè)趺磳?shí)現(xiàn)在Observable中發(fā)送的是 圓形
事件伴鳖,但是在Subscriber中接收到 方形
事件并處理呢。
如果Observable和Subscriber的事件都不一樣徙硅,這都不能發(fā)生訂閱關(guān)系的榜聂,因?yàn)樵诰幾g檢查的時(shí)候就無(wú)法通過(guò)。
RxJava采用的方法是提供一個(gè)變換方法lift(Operater)
闷游,該方法返回一個(gè)Observable對(duì)象峻汉。該Observable對(duì)象會(huì)發(fā)送方形
事件,這樣就可以用這個(gè)新的Observable對(duì)象來(lái)訂閱原始的Subscriber了脐往。
在新生成的Obaservable對(duì)象的時(shí)候休吠,我們也會(huì)生成該Observable對(duì)應(yīng)的OnSubscribe對(duì)象,并實(shí)現(xiàn)新的OnSubscribe對(duì)象的call(Subscriber)
方法业簿。由于我們用新生成的Observable對(duì)象去訂閱原始的Subscriber,所以新生成的OnSubscribe的call方法中的參數(shù)就是原始的Subscriber了瘤礁。
接下來(lái)就是lift(Operator)
中Operator接口發(fā)揮作用的時(shí)候了,這個(gè)Operator接口的call方法實(shí)現(xiàn)把一個(gè)Subscriber變換成另一個(gè)Subscrber的功能梅尤。
在這里就是把原始的響應(yīng)方形
事件的Subscriber轉(zhuǎn)換成響應(yīng)圓形
事件的Subscriber柜思,這樣就可以調(diào)用原始的OnSubscribe的call方法,把圓形
事件變成發(fā)送給這個(gè)新的響應(yīng)圓形事件的Subscriber.
這樣新的Observable中就同時(shí)包含了:
- 原始的OnSubscribe(它能發(fā)送
圓形
事件) - 新的Subscriber(它能接受
圓形
事件) - 原始的Subscriber(它能接受
方形
事件)
這樣巷燥,在發(fā)生訂閱關(guān)系時(shí)赡盘,原始的Observable(原始的OnSubscribe)會(huì)發(fā)送圓形
事件給新的Subscriber,新的Subscriber在處理的時(shí)候,就把這個(gè)事件轉(zhuǎn)換一下傳遞給原始的Subscriber缰揪。
基本流程就是這樣陨享,Observable.Operator就是實(shí)現(xiàn)新老Subscriber關(guān)聯(lián)的紐帶。
借用扔物線(xiàn)的snippet
//這個(gè)lift的作用就是把只發(fā)送圓形事件的Observable轉(zhuǎn)換成發(fā)送方形事件的Observable
public <Rectangle> Observable<Rectangle> lift(Operator<? extends Rectangle, ? super Round> operator) {
return Observable.create(new OnSubscribe<Rectangle>() {
@Override
//這個(gè)參數(shù)subscriber是原始的subscriber,只接受方形事件
public void call(Subscriber subscriber) {
//這個(gè)newSubscriber是operator生成的新的Subscriber抛姑,它可以接收?qǐng)A形事件
//新的subscriber會(huì)調(diào)用原始的sunscriber的相關(guān)方法
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//這個(gè)onSubscribe是原始的OnSubscribe赞厕,它發(fā)送圓形事件
//所以可以用新生成的newSubscriber來(lái)接收
onSubscribe.call(newSubscriber);
}
});
}
記不住以上內(nèi)容也沒(méi)有關(guān)系,只要記住在變換中定硝,會(huì)成成一個(gè)新的Observable和Subscriber就可以了皿桑,我們?cè)谧儞Q完之后所進(jìn)行的操作都是針對(duì)新生成的Observable和新Subscriber。
經(jīng)過(guò)變換之后蔬啡,我們擁有兩個(gè)Observable(OnSubscribe),也擁有兩個(gè)Subscriber.
那我們想要切換事件處理的線(xiàn)程怎么辦呢诲侮?我們可以在Operator中生成新的Subscriber的時(shí)候進(jìn)行處理,在newSubscriber和原始subscriber進(jìn)行映射的時(shí)候進(jìn)行切換星爪,所以可以知道observeOn()切換的是所有它下游的線(xiàn)程浆西。
所以如果我們想要切換事件發(fā)生的線(xiàn)程粉私,會(huì)怎么辦呢顽腾?根據(jù)上面的代碼,可以知道诺核,只需要讓最后的onSubscribe.call(newSubscriber)
運(yùn)行在新線(xiàn)程就可以了抄肖。從這句代碼也可以看出,subscribeOn()方法切換的是它的上游線(xiàn)程窖杀,這種線(xiàn)程切換一直會(huì)影響到最原始的observable漓摩。
但是如果在一條鏈?zhǔn)秸{(diào)用中出現(xiàn)了多個(gè)subscribeOn()方法,由于鏈?zhǔn)秸{(diào)用最上游的第一個(gè)subscribeOn方法會(huì)直接影響到最原始的observable入客,而在接下來(lái)的的鏈?zhǔn)秸{(diào)用中消息的發(fā)送是有newSubscriber來(lái)控制的管毙,所以第二個(gè)subscribeOn方法不會(huì)影響線(xiàn)程的切換。
多說(shuō)無(wú)益桌硫,直接上代碼
Observable.just("hello","world","rxjava","rxandroid")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此處由于受到下面第一個(gè)subscribeOn的影響夭咬,輸出RxComputationScheduler-1
Log.v("chicodong","thread 1 is: "+Thread.currentThread().getName());
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.computation())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
//此處由于上一個(gè)subscribeOn切換過(guò)線(xiàn)程,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的铆隘,所以仍然輸出RxComputationScheduler-1
Log.v("chicodong","thread 2 is: "+Thread.currentThread().getName());
return s.length();
}
})
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
//此處由于上一個(gè)subscribeOn切換過(guò)線(xiàn)程卓舵,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的,所以仍然輸出RxComputationScheduler-1
Log.v("chicodong","thread 3 is: "+Thread.currentThread().getName());
return integer+100;
}
})
//此處的第二個(gè)subscribeOn相當(dāng)于沒(méi)有起作用
.subscribeOn(Schedulers.io())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
//此處由于上一個(gè)subscribeOn切換過(guò)線(xiàn)程膀钠,新生成的subscriber是在RxComputationScheduler-1發(fā)送事件的掏湾,所以仍然輸出RxComputationScheduler-1
Log.v("chicodong","thread 4 is: "+Thread.currentThread().getName());
return integer.toString();
}
})
.observeOn(Schedulers.newThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此處受到obServeOn的影響,輸出RxNewThreadScheduler-1
Log.v("chicodong","thread 5 is: "+Thread.currentThread().getName());
return s+" lalala";
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此處受到第二個(gè)observeOn的影響肿嘲,輸出main
Log.v("chicodong","thread 6 is: "+Thread.currentThread().getName());
return s.toUpperCase();
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//此處受到第二個(gè)observeOn的影響融击,輸出main
Log.v("chicodong","final result is "+s+" thread is "+Thread.currentThread().getName());
}
});
雖然多余一個(gè)subscribeOn對(duì)于線(xiàn)程切換沒(méi)有影響,但是它可以在事件還沒(méi)有發(fā)生時(shí)起作用雳窟,最常見(jiàn)的就是doOnSubscribe()方法了
Observable.from(list)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0(){
public void call(){
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidScheduler.mainThread())
.subscribe(new Action1(){
public void call(Object obj){
}
});
上面的代碼中第二個(gè)subscribeOn對(duì)于線(xiàn)程切換沒(méi)有影響尊浪,但是卻可以使doOnSubscribe()運(yùn)行在主線(xiàn)程中。