[TOC]
在看此文之前建議看下食听,http://www.reibang.com/p/9ee9fa13eeef這篇文章亡驰,只有圖划咐,沒有字
RxJava是什么
一個基于觀察者模式的異步任務(wù)框架
好在哪切威?
好在用
RxJava
做的異步請求更簡明更清晰
舉例
需求:在IO線程上執(zhí)行三個網(wǎng)絡(luò)請求操作分別為query(A)
,query(B)
,query(C)
,且query(B)
依賴于query(A)
返回的結(jié)果勺馆,同樣query(C)
依賴于query(B)
返回的結(jié)果
用android的異步框架得這么寫(偽代碼):
Server server = ...;
server.makeRequest(new Query('A'), new Listener(){
onSuccess(boolean b){
if(b){
server.makeRequest(new new Query('B'), new Listener(){
onSuccess(boolean b){
if(b){
server.makeRequest(new Query('C'), new Listener(){
onSuccess(boolean b){
}
})
}
}
})
}
}
})
用Rxjava只需要這么寫(偽代碼)
Observable.just("A").flatMap((s) -> {
return makeRequest(new Query(s));
}).flatMap((aBoolean) -> {
if(aBoolean) return makeRequest(new Query("B"));
return null;
}).flatMap((aBoolean) -> {
if(aBoolean) return makeRequest(new Query("C"));
return null;
}).subscribeOn(Scheduals.io);
public static Observable<Boolean> makeRequest(Query query){
return Observable.just(query)
.map(new Function<Query, Boolean>() {
@Override
public Boolean apply(Query query) throws Exception {
//TODO
return true;
}
});
}
非常簡潔库物,避免了回調(diào)地獄粘茄,之后會通過分析源碼,去思考能夠避免回調(diào)地獄的原因
準備知識
響應(yīng)式編程
響應(yīng)式編程是一種通過異步和數(shù)據(jù)流來構(gòu)建事物關(guān)系的編程模型
數(shù)據(jù)流
是兩個事物(在這里我們理解為函數(shù))間關(guān)系的橋梁柒瓣,且只有一個方向儒搭,即從上游實體到下游實體搂鲫。舉個例子
f(x1)
與g(x2)
之間如何產(chǎn)生關(guān)系拣挪?x1
做為f的輸入,當f(x1)
生成后會通過數(shù)據(jù)(事件)流通知g(x2)
執(zhí)行媒吗,這里的f(x1)
就是上游實體闸英,g(x2)
就是下游實體锯岖。但如果有這樣的需求,三個獨立的函數(shù)f(x1),f(x2),f(x3)
都完成后再通知g(x2)
甫何?應(yīng)該怎樣去構(gòu)建他們的關(guān)系出吹?就是我們接下來要講用異步的方式去構(gòu)建
異步
數(shù)據(jù)流不能完全構(gòu)建出函數(shù)之間的關(guān)系。如數(shù)據(jù)流一節(jié)所說
f(x1),f(x2),f(x3)
是相互獨立的辙喂,他們之間的關(guān)系是獨立的捶牢。這種獨立的關(guān)系就可以用異步來表示。所以解決上一節(jié)的問題便是讓f(x1),f(x2),f(x3)
在各自的線程中執(zhí)行巍耗,完成后再用數(shù)據(jù)流通知給g(x)
小結(jié)
異步是為區(qū)分無關(guān)的事物秋麸,數(shù)據(jù)流是為了聯(lián)系起有關(guān)的事物。那么如何實現(xiàn)數(shù)據(jù)流傳遞呢炬太?就用到下面的觀察者模式
觀察者模式
觀察者模式面向的需求是:A對象對B對象的某種變化高度敏感灸蟆,當B對象發(fā)生變化時,A對象需要瞬間做出反應(yīng)亲族,一般實現(xiàn)觀察者模式需要有觀察者
Observer
即A對象炒考,有被觀察者Observable
即B對象,在實現(xiàn)的時候B對象需要持有A對象的引用可缚,這樣當B對象發(fā)生變化時,B對象才能通過A對象的引用讓A對象做出反應(yīng)斋枢,android中的典型實現(xiàn)便是監(jiān)聽器事件帘靡,View
是被觀察者,OnClickListener
是觀察者用setOnClickListener()
,讓View
持有OnClickListener
的引用瓤帚,當View
監(jiān)聽到點擊事件時便通知OnClickListener
進行處理描姚。這樣子就簡單的實現(xiàn)了數(shù)據(jù)流從B->A的傳遞。
解決問題的模型
RxJava
可以通過很多操作符(就是RxJava中的一些方法)解決許多問題模型, 盡然它是異步任務(wù)框架戈次,我們就來看看它是怎么處理異步任務(wù)問題模型的轰胁,只解釋其中兩種比較典型的問題模型。
map解決的模型
由模型圖可知朝扼,首先我們需要創(chuàng)建可觀測序列,然后再用觀察者模式去通知它的下游實體
map
操作(其實模型中的虛線基本上是由觀察者模式和異步實現(xiàn)的)霎肯,在Map操作完成后形成了另一個可觀測序列擎颖,在用觀察者模式去通知這個序列依次輸出。這樣的模型可用來解決如下需求:子線程執(zhí)行一個耗時任務(wù)观游,執(zhí)行完成后返回給主線程
通過模型圖可知搂捧,創(chuàng)建操作后需要通知變換操作,這個通知就用觀察者模式實現(xiàn)懂缕。而變換操作是獨立的而且在子線程允跑,所以需要通過異步來實現(xiàn),且變換操作執(zhí)行完成后要通知給主線程的搪柑。所以也要使用觀察者模式
創(chuàng)建操作
如前文所述聋丝,創(chuàng)建操作可以看做是一個函數(shù)
f(x)
,由于f(x)
要通知下游的,所以這里的f(x)
是被觀察者工碾,在RxJava里用Observable
表示被觀察者去發(fā)起通知弱睦。在RxJava中f
為just,假設(shè)這里的輸入x為"A",所以其創(chuàng)建操作為Observable.just('A')
.
變換操作 f
同理這里的變換操作為
map
渊额,需要運行在子線程况木,要用Handler
實現(xiàn)。
通知操作
而子線程的通知操作也要用觀察者模式實現(xiàn)旬迹,其需要引用一個觀察者火惊,這個觀察者需要自己定義,也就是說某個耗時的轉(zhuǎn)換操作在子線程運行完成后奔垦,要發(fā)送到你自己定義的主線程的觀察者中
flatMap解決模型
從模型圖我們可以看到屹耐,F(xiàn)latMap里面的數(shù)據(jù)有兩個特點:
- 數(shù)據(jù)被分成了n個
- 這n個數(shù)據(jù)也在可觀測的序列上
對應(yīng)的 ,它能解決兩個基本需求: - 原始的單個數(shù)據(jù)是集合類宴倍,比如
List<String>
张症,FlatMap
可以把它們變成一個個String
. - 這每個String都在可觀測序列上仓技,所以也能有通知操作的能力
所以對于第一點他能夠簡單遍歷二維數(shù)據(jù),舉個例子:需求是遍歷所有學(xué)生選的課程
final List<Student> list = ...
Disposable disposable = Observable.fromIterable(list)
/*將學(xué)生的課程發(fā)送出去俗他,從學(xué)生實例得到課程實例脖捻,再發(fā)射出去*/
.flatMap(new Function<Student, ObservableSource<Student.Course>>() {
@Override
public ObservableSource<Student.Course> apply(Student student) throws Exception {
//Log.d(TAG,"flatmap student name = "+student.name);
return Observable.fromIterable(student.getCourseList());
}
})
/*接受到學(xué)生的課程*/
.subscribe(new Consumer<Student.Course>() {
@Override
public void accept(Student.Course course) throws Exception {
System.out.printf("Consumer accept course = " + course.getName());
}
});
}
對于第二點,它能夠解決的是兆衅,網(wǎng)絡(luò)請求嵌套的問題地沮。舉個例子(該例子引用自https://blog.csdn.net/jdsjlzx/article/details/51493552):需求是queryA
和 queryB
. 并且queryB的運行依賴于queryA的結(jié)果
RxJava中主要類介紹
Observable
相當于觀察者模式中的被觀察者
Observer
相當于觀察者模式中的觀察者
主要類圖
RxJava中的是如何實現(xiàn)兩個模型的?
先以map
為例
創(chuàng)建可觀測序列Observable
-
just
:Observable observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
創(chuàng)建變換后的可觀測序列
-
map
:
Observable observable2 = observable.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//模擬網(wǎng)絡(luò)請求
Thread.sleep(5000);
return 1;
}
})
創(chuàng)建觀察者Observer
Consumer consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
};
如何關(guān)聯(lián)被觀察者與觀察者羡亩,形成數(shù)據(jù)流摩疑。
在RxJava中,subscribe()這里既是訂閱畏铆,其默認狀態(tài)也發(fā)生了變化. 我們可以用鏈式調(diào)用把他們串起來
observable2.subscribe(consumer);
上述代碼實現(xiàn)了在主線程傳遞序列雷袋,但實際上可以理解為循環(huán)了上述序列,但這只是一個同步的實現(xiàn)辞居。而RxJava是一個異步框架楷怒,能夠很方便的進行線程切換,只需要在合適的位置加上subscrieOn,observeOn即可瓦灶,接著上面的例子
observable2
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribe(consumer);
該例子實現(xiàn)了map
操作在子線程運行鸠删,然后切換回主線程通知觀察者執(zhí)行. 下面深入具體的源碼去分析一下上面這個例子
模型一需求解決方案
需求
請求網(wǎng)絡(luò),有結(jié)果返回主線程
重貼一下上面的代碼
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//模擬網(wǎng)絡(luò)請求
Thread.sleep(5000);
return 1;
}
}) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
源碼分析
- 可觀測序列的創(chuàng)建操作
-
just(item)
:時序圖為
調(diào)用just的類 -> Observable : just(items) Observable -> Observable : fromArray(items) Observable -> RxJavaPlugins : onAssembly(new ObservableFromArray<T>(items))
-
這里的onAssembly
方法解釋如下
new ObservableFromArray<T>(items)
,就是ObservableFromArray
這個被觀察者中保存items
這個數(shù)組
- map()
:存儲了ObservableFromArray
(map的上游實體)的引用,我們用ofa_this
表示,與Function
對象的引用我們用fun1
表示
- subscribeOn()
:存儲了其上游被觀察者ObservableMap
的引用map_this
和IO調(diào)度器
- observeOn()
:存儲了ObservableSubscribeOn
的上游的引用sub_this
和UI線程調(diào)度器
- 思考:為什么當前對象要存儲之前對象所對應(yīng)的Observable
引用贼陶?
> 因為后面需要用到這些引用去訂閱對應(yīng)的觀察者Observer刃泡,如下圖
-
訂閱操作:
- 思考:上游如何通知下游呢?
分析模型一可知碉怔,因為中間有線程切換操作加數(shù)據(jù)轉(zhuǎn)換操作烘贴,所以數(shù)據(jù)流必須流經(jīng)這兩個實體。所以如果想讓頂層通知到最后的底層的話撮胧,必須要經(jīng)過中間層庙楚,讓數(shù)據(jù)流一層一層傳遞。又根據(jù)觀察者模式趴樱,需要下游的observer訂閱上游的observable,才能讓數(shù)據(jù)從上游流向下游馒闷。
-
源碼中的解決方案
上圖步驟1的源碼如下,2叁征,3的原理同1:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //subscribe(observer)中的observer即是LambdaObserver LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } //ObservableSubscribeOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); //ObserveOnObserver存儲了LambdaObserver的實例ls纳账,且在訂閱前操作中存儲的ObserveOnObserver訂閱被觀察者者source source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
-
通知操作:RxJava中訂閱完成后,因為之前已經(jīng)訂閱過捺疼,所以上游可以調(diào)用onNext方法直接通知下游
在mapObserver.onNext()操作中會執(zhí)行如下代碼:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//回調(diào)map中定義的函數(shù)疏虫,mapper即上文存儲的函數(shù)
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//利用之前保存的actual實例調(diào)用其下游的onNext
actual.onNext(v);
}
最后在執(zhí)行LambdaObserver的onNext中執(zhí)行消費者函數(shù):
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
//可以看到回調(diào)了觀察者內(nèi)部定義的函數(shù)
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
- 另一個結(jié)合reforit的例子,一圖以蔽之,其中
bodyObservale
和callExecuteObservale
都是reforit
中的被觀察者卧秘,省略了網(wǎng)絡(luò)請求的創(chuàng)建被觀察者創(chuàng)操作呢袱,代碼如下:
disposable = Network.getGankApi()
.getBeauties(10, page)
.map(new Function<GankBeautyResult, List<Item>>() {
@Override
public List<Item> apply(GankBeautyResult gankBeautyResult) throws Exception {
return null;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
swipeRefreshLayout.setRefreshing(false);
pageTv.setText(getString(R.string.page_with_number, MapFragment.this.page));
adapter.setItems(items);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
對應(yīng)的解釋圖如下:
模型二解決方案
需求
假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,要打印出每個學(xué)生所需要修的所有課程的名稱呢翅敌?
代碼
final List<Student> list = ...
Disposable disposable = Observable.fromIterable(list)
/*將學(xué)生的課程發(fā)送出去羞福,從學(xué)生實例得到課程實例,再發(fā)射出去*/
.flatMap(new Function<Student, ObservableSource<Student.Course>>() {
@Override
public ObservableSource<Student.Course> apply(Student student) throws Exception {
//Log.d(TAG,"flatmap student name = "+student.name);
return Observable.fromIterable(student.getCourseList());
}
})
/*接受到學(xué)生的課程*/
.subscribe(new Consumer<Student.Course>() {
@Override
public void accept(Student.Course course) throws Exception {
System.out.printf("Consumer accept course = " + course.getName());
}
});
}
源碼分析
-
訂閱前的操作
-
訂閱操作
-
通知操作
-
fro_this
通知在訂閱操作訂閱的觀察者MergeObserver
-
MergeObserver
收到通知蚯涮,執(zhí)行訂閱前操作的里面的fun2
治专,代碼如下,其返回的p是一個Observable
類型遭顶,這樣就實現(xiàn)了相當于把轉(zhuǎn)換后的數(shù)據(jù)放入了對應(yīng)的可觀測序列张峰,根據(jù)模型二可知,下一步就是要將每個可觀測序列中的菱形數(shù)據(jù)提出來在放入一個Observable
進行輸出棒旗。這個提出來的過程就是subscribeInner
實現(xiàn)的喘批。可以理解為做了一個map
操作铣揉。這個過程邏輯有點復(fù)雜谤祖,因為涉及到并發(fā)控制,所以略過老速。
@Override public void onNext(T t) { // safeguard against misbehaving sources if (done) { return; } ObservableSource<? extends U> p; try { //執(zhí)行fun2函數(shù) p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); onError(e); return; } if (maxConcurrency != Integer.MAX_VALUE) { synchronized (this) { if (wip == maxConcurrency) { sources.offer(p); return; } wip++; } } //內(nèi)部訂閱 subscribeInner(p); }
-
線程控制
從模型一中我們可以總計如下:
subscribeOn()切換子線程是在訂閱過程中切換的
observerOn()切換成主線程是在通知的過程中
所以上面兩個一個操縱訂閱過程的線程,一個操縱通知過程的線程凸主,猜測可以產(chǎn)生出任何一種你想要的線程切換功能
-
舉個栗子理解一下:
Observable .map // 操作1 .flatMap // 操作2 .subscribeOn(io)//操作3 .map //操作4 .flatMap //操作5 .observeOn(main)//操作6 .map //操作7 .flatMap //操作8 .subscribeOn(io) //操作9 .subscribe(handleData)
上述例子的結(jié)果是1橘券,2,4卿吐,5運行在IO線程中旁舰,7,8運行在主線程中。
上述操作簡單畫圖如下
我們可以發(fā)現(xiàn)嗡官,第一個操作9的線程切換沒有產(chǎn)生效果,所以總結(jié)如下:subscribeOn只能調(diào)用一次箭窜,因為如果有多次,只會有一次有效果衍腥,observeOn()可以多次調(diào)用實現(xiàn)了你想要的線程的多次切換磺樱。
其他操作符
當然還有其他操作符,留給后面繼續(xù)討論找颓,不過掌握了map和flatmap后屯伞,后面的有些操作符應(yīng)該就不會特別難理解菠红,具體的可見官網(wǎng)