引言
簡(jiǎn)單闡述RxJava流程源碼,RxJava有以下三種流程,向下遞增挽绩。
- Observable->Observer
- Observable->Operator->Observer
- Observable->Operator->Scheduler->Observer
簡(jiǎn)單流程
先看一下RxJava簡(jiǎn)單的代碼:
Observable.create(new Observable.OnSubscribe<Boolean>() {
@Override
public void call(Subscriber<? super Boolean> s) {
s.onNext(true);
s.onCompleted();
}
}).subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
_log("On complete");
_progress.setVisibility(View.INVISIBLE);
}
@Override
public void onError(Throwable e) {
Timber.e(e, "Error in RxJava Demo concurrency");
_log(String.format("Boo! Error %s", e.getMessage()));
_progress.setVisibility(View.INVISIBLE);
}
@Override
public void onNext(Boolean bool) {
_log(String.format("onNext with return value \"%b\"", bool));
}
});
據(jù)上:Obervable create傳入Observable.OnSubscribe作為參數(shù)膛壹。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對(duì)象中,它的作用相當(dāng)于一個(gè)計(jì)劃表唉堪,當(dāng)Observable被訂閱的時(shí)候模聋,即當(dāng)Obervable.subscribe(observer)的時(shí)候,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)唠亚。
看下subscribe method 源碼
將源碼簡(jiǎn)化一下就是這樣:
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();//可選的準(zhǔn)備方法
onSubscribe.call(subscriber);//開始執(zhí)行計(jì)劃表
return subscriber;//返回Subscription 方便unsubscribe()
}
你會(huì)發(fā)現(xiàn)觸發(fā)Observable的時(shí)間是subscribe產(chǎn)生訂閱關(guān)系的時(shí)候链方。
另外你會(huì)看到subscribe中參數(shù)是Subscriber類型,Subscriber是實(shí)現(xiàn)Oberver的抽象類。
帶有Operator的流程
這里我們說的Operator是一種抽象的概念,上述簡(jiǎn)單流程中我們是手寫計(jì)劃表OnSubscribe,而這里我們不需要,我們只需要調(diào)用just,map,等轉(zhuǎn)換操作,這些操作內(nèi)部幫我們實(shí)現(xiàn)了OnSubscribe計(jì)劃表,我將這些操作稱之為Operator.
我們看一下帶有Operator的簡(jiǎn)單代碼:
Observable.just(true).map(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean) {
_log("Within Observable");
_doSomeLongOperation_thatBlocksCurrentThread();
return aBoolean;
}
}).subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
_log("On complete");
_progress.setVisibility(View.INVISIBLE);
}
@Override
public void onError(Throwable e) {
Timber.e(e, "Error in RxJava Demo concurrency");
_log(String.format("Boo! Error %s", e.getMessage()));
_progress.setVisibility(View.INVISIBLE);
}
@Override
public void onNext(Boolean bool) {
_log(String.format("onNext with return value \"%b\"", bool));
}
});
據(jù)上:和第一種簡(jiǎn)單流程的代碼相比,subscribe之后的代碼是一致的,不同的是subscribe之前的代碼趾撵。兩個(gè)方法just和map method,通過源碼分析下功能侄柔。
- just method
just function:將傳入的參數(shù)發(fā)送給Subscriber訂閱者。
你可以根據(jù)上面ScalarSynchronousObservable的構(gòu)造函數(shù),重寫OnSubscribe計(jì)劃表看出來占调。
- map method
將上面代碼簡(jiǎn)化就是:
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);//通過Operator創(chuàng)建一個(gè)新的Subscriber,通過OperatorMap將新舊Subscriber建立一種關(guān)系暂题。
newSubscriber.onStart();//準(zhǔn)備操作
onSubscribe.call(newSubscriber);//onSubscribe計(jì)劃表喚醒新的Subscriber,新的Subscriber會(huì)聯(lián)系subscribe訂閱的Subscriber(訂閱者)。
}
});
}
借助拋物線文章中的圖抽象表現(xiàn)出來:
再帶上Scheduler的流程
Scheduler function:指明Observable和Observer是運(yùn)行在哪個(gè)線程中究珊。
- subscribeOn指定的是Observable(被觀察者)所在線程
- observeOn指定的是Observer(觀察者)所在線程
我們?cè)趲в蠴perator的基礎(chǔ)上加上Scheduler.
Observable.just(true).map(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean aBoolean) {
_log("Within Observable");
_doSomeLongOperation_thatBlocksCurrentThread();
return aBoolean;
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()).subscribe(...)
observeOn與subscribeOn也是通過lift變換原理實(shí)現(xiàn)線程切換, observeOn切換線程是發(fā)生在lift方式中的內(nèi)建的Subscriber中,subscribeOn切換線程是發(fā)生在OnSubscribe計(jì)劃表中薪者。
調(diào)度器源碼我也沒太搞明白,以后有機(jī)會(huì)補(bǔ)上。不過調(diào)度器作為RxJava的一大特性,在Android編程中使用非常方便。
總結(jié)
- RxJava最核心流程是Observable->Oberver 當(dāng)subscribe訂閱的時(shí)候,不管增加怎樣復(fù)雜的變化,一定是OnSubscribe計(jì)劃表來通知觀察者,被觀察者發(fā)生變化。
- RxJava靈活的特性主要在兩個(gè)方面,數(shù)據(jù)序列的變化機(jī)制和線程切換成畦。