RxJava如何結(jié)合觀察者與鏈?zhǔn)教幚?/h1>
Author: Dorae
Date: 2018年12月3日17:10:31
轉(zhuǎn)載請注明出處
一性芬、概述
Author: Dorae
Date: 2018年12月3日17:10:31
轉(zhuǎn)載請注明出處
首先問自己幾個(gè)問題慨蓝,如果非常清楚這幾個(gè)問題的目的與答案,那么恭喜你竹伸,不用繼續(xù)往下看了-_-泥栖。
- RxJava是干什么的簇宽;
- 鏈?zhǔn)秸{(diào)用中當(dāng)存在數(shù)個(gè)Observable.subscribeOn()時(shí)的處理方式;
- 鏈?zhǔn)秸{(diào)用中當(dāng)存在數(shù)個(gè)Observable.observeOn()時(shí)的處理方式吧享;
- 數(shù)據(jù)是如何經(jīng)過操作符進(jìn)行的處理魏割。
回顧
觀察者模式
如圖1-1所示
[圖片上傳失敗...(image-f38e70-1545098102748)]
圖 1-1
Java8的stream
二、RxJava是什么
一款為了簡化異步調(diào)用钢颂,且功能比較全面的框架钞它。
三、RxJava如何結(jié)合了觀察者模式與鏈?zhǔn)教幚?/h2>
參見Java8中的sink鏈殊鞭,在RxJava中同樣實(shí)現(xiàn)了鏈?zhǔn)教幚碓舛狻H绱a片段code1-1所示,我們對其結(jié)構(gòu)進(jìn)行分析:
code1-1
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Dorae");
}
})
.filter(e -> e.contains("o"))
.map(e -> "AfterMap: " + e)
.filter(e -> e.contains("D"))
.subscribe(new Observer<String>() {
@Override
public void onNext(@NonNull String o) {
System.out.println("觀察者 onNext: " + o);
}
@Override
public void onSubscribe(Disposable d) {
System.out.println("觀察者onSubscribe: " + d + "### " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1操灿、首先看下其輸出結(jié)果:
觀察者onSubscribe: io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver@52d455b8### main
觀察者 onNext: AfterMap: Dorae
2锯仪、現(xiàn)在來對如何輸出這段結(jié)果進(jìn)行分析
首先了解下RxJava中的幾種基本角色:
- Observable,是RxJava描述的事件流趾盐,可以說其與Observer構(gòu)成了RxJava的基礎(chǔ)庶喜。在鏈?zhǔn)秸{(diào)用中,事件從創(chuàng)建到加工到最后被Observer接受谤碳,其實(shí)就是由一條Observerable鏈串起來的溃卡。
- Observer溢豆,RxJava中的訂閱者蜒简,也就是需要對事件進(jìn)行響應(yīng)的一個(gè)角色。其實(shí)除了我們通常自己實(shí)現(xiàn)的一個(gè)Observer漩仙,在鏈中的每一步都會(huì)產(chǎn)生一個(gè)Observer搓茬,然后這些Observer構(gòu)成一條鏈,最終完成整個(gè)鏈的計(jì)算队他。
- ObservableOnSubscribe卷仑,整個(gè)事件流的源頭,通常需要我們自己實(shí)現(xiàn)麸折,其依賴一個(gè)Emitter锡凝。
- Emitter,可以將其理解為觸發(fā)器垢啼,推動(dòng)整個(gè)流程的運(yùn)轉(zhuǎn)窜锯。
- Scheduler,這個(gè)其實(shí)不用太過關(guān)心芭析,RxJava用其封裝了Thread锚扎,用于完成線程切換等任務(wù)。
是不是感覺上邊的一堆廢話非衬倨簦枯燥驾孔?先上一張RxJava的核心結(jié)構(gòu),如圖3-1所示。
圖 3-1
現(xiàn)在我們再來看看code1-1翠勉,其最終形成的Observable鏈如圖3-2所示妖啥,每次調(diào)用map、filter等操作对碌,都會(huì)生成一個(gè)新對象迹栓,并且保持了一個(gè)對上游的引用(用于生成Observer鏈)。
[圖片上傳失敗...(image-e79420-1545098102748)]
圖 3-2
Observer鏈如圖3-3所示俭缓,整個(gè)事件流程由CreateEmitter觸發(fā)克伊,最終交由我們的實(shí)現(xiàn)Observer$1處理。
[圖片上傳失敗...(image-da5c92-1545098102748)]
圖 3-3
看了上邊幾張圖之后华坦,是不是感覺清晰了很多愿吹?那么讓我們進(jìn)一步看下Rxjava如何完成了一鍵線程切換。
四惜姐、RxJava如何實(shí)現(xiàn)線程切換
通常我們使用RxJava的線程切換功能時(shí)犁跪,只需要在調(diào)用鏈中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述歹袁,其實(shí)就是一個(gè)包裝了ThreadPool的調(diào)度器坷衍。那么我們先來看下相關(guān)源碼。
1条舔、subscribeOn
如代碼code4-1所示枫耳,為subscribeOn的核心代碼。很明顯孟抗,其中在新線程中只是簡單的直接調(diào)用了source迁杨,也就是說這里之后的所有操作均在一個(gè)新線程中進(jìn)行,和單線程并沒有什么區(qū)別凄硼。
code 4-1
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>() {
@Override
public void subscribeActual(final Observer<? super T> observer) {
scheduler.createWorker().schedule(new SubscribeTask() {
@Override
public void run() {
source.subscribe(e);
}
});
}
};
}
2铅协、observeOn
如代碼段code4-2所示,為observeOn的核心邏輯摊沉,可以看出其在訂閱階段(生成Observer鏈的階段)還是在當(dāng)前線程執(zhí)行狐史,只有觸發(fā)之后,到了ObserverOn的Observer的節(jié)點(diǎn)時(shí)才會(huì)真正的切換到新線程说墨。
code 4-2
public final Observable<T> observeOn(Scheduler scheduler) {
return new ObservableOnSubscribe<T>() {
@Override
public void subscribeActual(@NonNull Observer<Object> e) {
source.subscribe(new Observer<T>() {
@Override
public void onNext(T var1) {
scheduler.createWorker().schedule(new Runnable() {
@Override
public void run() {
e.onNext(var1);
}
});
}
});
}
};
}
多次Observable.subscribeOn()骏全、多次Observable.observeOn()會(huì)發(fā)生什么
通過上述code4-1、code4-2的分析婉刀,是不是可以推斷出當(dāng)多次subscribeOn時(shí)會(huì)發(fā)生什么吟温?沒錯(cuò),雖然每次subscribeOn都會(huì)產(chǎn)生一次線程切換突颊,但是真正起作用的只有最開始的一次subscribeOn鲁豪,也就相當(dāng)于只在最初的位置調(diào)用了subscribeOn潘悼;對于observeOn也是類似,每次都會(huì)產(chǎn)生新線程爬橡,但是每次都會(huì)產(chǎn)生一定的影響治唤,也就是每個(gè)線程都承擔(dān)了一部分工作。
小結(jié)
通過本文糙申,我們可以簡要了解到RxJava的基本原理宾添,但是對于其豐富的api還需要在實(shí)踐中進(jìn)行磨合。但是柜裸,RxJava既然作為一個(gè)異步框架缕陕,其必然有一定的局限,比如其切換線程時(shí)無法阻塞當(dāng)前線程(這種對于Android等需要渲染或者網(wǎng)絡(luò)IO的需求來說非常適用)疙挺,但是對于常見的服務(wù)端業(yè)務(wù)來說扛邑,還需要額外引入阻塞當(dāng)前線程的操作(因?yàn)榇蟛糠值膕erver代碼還是單線程模型),倘若完全不用線程切換在服務(wù)端強(qiáng)行引入铐然,可能會(huì)得不償失蔬崩。個(gè)人更推薦Java8的CompletableFuture。