作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-12-13】
更新日志
日期 | 更新內(nèi)容 | 備注 |
---|---|---|
2017-12-13 | RxJava學(xué)習(xí)筆記系列 | 系列筆記 (一) |
2017-12-15 | RxJava 學(xué)習(xí)筆記 (二) | 增加系列筆記(二) |
2017-12-15 21:36 | 考慮到RxJava很大程度上用于android開發(fā)中炭玫,而我自身不是移動開發(fā)者性穿,所以暫時將RxJava學(xué)習(xí)筆記系列掛起蠢络,在未來需要使用RxJava的時候再繼續(xù)學(xué)習(xí)椿猎,并且結(jié)合實際的應(yīng)用來學(xué)習(xí)會收獲更多 | 掛起 |
導(dǎo)入
其實在很早以前就接觸過RxJava碰纬,并且當(dāng)時學(xué)習(xí)RxJava還有一個產(chǎn)出:JSwitcher抬旺,這是一個基于RxJava的實驗性框架暖庄,對于該框架的介紹可以參考下面的描述:
JSwitcher is a Convenient tool to switch schedule base on RxJava, and Jswitcher also implement a sample Version Observer/Observable, you can learn how RxJava works from the sample codes. it's easy to switch to Another schedule from current schedule. you just need to care about your bussiness, using 'switchTo' Operator to switch to the longing schedlue when you want to do the work on the suitable schedule. There are some especial schedules for you, like I/O Bound Schedule, Cpu Bound Schedule, And Single Schedule, etc, if you want to create an extra schedule by yourself, it's ok for JSwitcher, and it's very easy to do this .And the most important thing is the jswitch support 'chain operator', that means you can switch to a schedule, then fit on this schedule some works, then you can do switch operator continue from current position, or you can just fit another work on current schedule, and jswitcher has terminal operator like 'waitAndShutdown', after do the operator, you can not do 'chain operator' anymore. and the jswitcher will wait some time and shutdown all of schedule.
該框架將RxJava的核心部分抽離出來并做了一些簡化處理聊替,說到這里,需要提及一下培廓,將一個復(fù)雜框架中的某部分抽象出來看似很簡單惹悄,但是實際操作起來還是有一些困難的,并且在實際操作的過程中為了不涉及過多外圍的內(nèi)容時常需要簡化肩钠,就是將一些依賴外圍的核心部分中的某些內(nèi)容拋棄泣港,但是最為主要的骨架不能丟掉,這樣操作下來會對整個框架有一定的了解价匠。如果上面的描述激起了你的興趣当纱,可以實際去閱讀JSwitcher框架代碼,也可以作為快速入門RxJava的學(xué)習(xí)材料踩窖,但是該框架存在一些不確定性以及一些待研究正確性的點坡氯,所以不宜在實際項目中應(yīng)用。
JSwitcher的核心功能是實現(xiàn)線程池的切換洋腮,并且支持按任務(wù)性質(zhì)(I/O廉沮,Compute)來劃分線程池,切換到合適的線程池可以提交任務(wù)徐矩,具體的使用可以參考下面的例子:
SwitcherFitter.switcherFitter()
.switchToIoSchedule() //switch to i/o bound schedule
.switchToSingleSchedule() //switch to single schedule
.fit(normalRunner, future1, true) //do the normal runner at current schedule
.switchToComputeSchedule() // switch to cpu bound schedule
.fit(normalRunner, future2, true) // do
.fit(timeoutRunner, future3, true) // do
.switchToSingleSchedule() //switch
.switchToSingleSchedule() //switch
.fit(timeoutRunner, future4, true) //do
.awaitFuturesCompletedOrTimeout(100,
completableFutures, timeoutFutures, 10) //wait for the future
.switchToComputeSchedule() //switch
.fit(() -> {
System.out.println("i am a tester->" + Thread.currentThread().getName());
}) // do the stupid work
.waitAndShutdown(1000); //wait and shutdown !
關(guān)于JSwitcher的設(shè)計滞时,可以參考下面的圖片:
本文作為學(xué)習(xí)RxJava的學(xué)習(xí)筆記的第一篇文章,會從RxJava的一些核心概念出發(fā)滤灯,并且從實際的例子來梳理RxJava的實現(xiàn)原理坪稽,當(dāng)然曼玩,為了閱讀的流暢性,每一篇文章不會涉及太多的內(nèi)容窒百。需要說明的一點是黍判,本文乃至本系列的所有文章都是基于RxJava2,RxJava目前有兩個版本篙梢,一個是RxJava1顷帖,一個是RxJava2,據(jù)說兩個版本間的差別還是很大的渤滞,介于我的學(xué)習(xí)都是基于RxJava2的贬墩,并且沒有接觸過RxJava1,所以本系列文章不會涉及RxJava1與RxJava2的對比內(nèi)容妄呕,所有內(nèi)容都是基于RxJava2的陶舞。
Observer和Observable
學(xué)習(xí)RxJava之前,你需要了解什么是Reactive绪励,我的理解是應(yīng)該要和傳統(tǒng)的代碼進行對比學(xué)習(xí)肿孵,我們一般寫代碼都是命令式的,我們希望做什么就做什么疏魏,比如我們想下載一張圖片停做,然后判斷圖片是否下載成功,如果成功了就展示出來大莫,如果沒有下載成功則使用兜底圖片進行展示雅宾,如果沒有兜底圖片則不展示。下面是這個功能的偽代碼實現(xiàn):
Image img = EntryDownloadHelper.downloadImageByUrl(url, timeout)
if img is null
then
if FALLBACK_IMG != null
then img = FALLBACK_IMG
if img != null
then
ShowEntryHelper.showImage(img, height, weight)
看起來很熟悉并且很容易理解葵硕,那什么是Reactive的呢?如果使用RxJava來重寫上面的代碼贯吓,則代碼看起來像下面這樣:
String imgUrl = "xxx";
Image img = null;
Image FALLBACK_IMG = "xxx";
int timeout = 1000;
int height = 100;
int weight = 200;
Observable.create(new ObservableOnSubscribe<Image>() {
public void subscribe(ObservableEmitter<Image> e) throws Exception {
if (imgUrl == null || imgUrl.isEmpty()) {
e.onNext(FALLBACK_IMG);
} else {
img = EntryDownloadHelper.downloadImageByUrl(imgUrl, timeout);
if (img == null) {
e.onNext(FALLBACK_IMG);
} else {
e.onNext(img);
}
}
e. onComplete();
}
}).subscribe(new Observer<Image>() {
public void onSubscribe(Disposable disposable) {
}
public void onNext(Image s) {
if (s != null) {
ShowEntryHelper.showImage(img, height, weight);
}
}
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
public void onComplete() {
}
});
這只是一個簡單的小例子懈凹,并沒有什么使用價值,并且需要說明的一點是悄谐,RxJava更適合用于移動應(yīng)用的開發(fā)介评,所以如果是做移動開發(fā)的話,學(xué)習(xí)RxJava的價值會更大爬舰,但是在一些其他的開發(fā)過程中也會使用到RxJava们陆。
在上面的例子中,出現(xiàn)了兩個比較關(guān)鍵的對象情屹,ObServer和Observable坪仇,RxJava在實現(xiàn)Reactive的時候使用了觀察者設(shè)計模式,Observable是被觀察者垃你,可以叫數(shù)據(jù)源椅文,也可以叫做生產(chǎn)者喂很,反正就是負(fù)責(zé)生產(chǎn)數(shù)據(jù),并且將數(shù)據(jù)推送出去的東西皆刺,而ObServer是觀察者對象少辣,它會綁定到一個Observable上,并且觀察Observable的行為羡蛾,當(dāng)ObServable觸發(fā)事件的時候漓帅,ObServer會接收到事件,并且對相應(yīng)的事件作出相應(yīng)痴怨。所以可以將ObServer叫做事件的接收者忙干,也可以叫做事件的消費者。有了觀察者和被觀察者腿箩,需要將兩個角色聯(lián)系起來豪直,也就是上面所說到的將Observer綁定到Observable上,這個時候就需要使用Observable的subscribe方法珠移,叫做訂閱弓乙,下面會詳細講解Observable是如何將事件傳遞給Observer的。
學(xué)習(xí)一個新技術(shù)最開始需要做的就是寫一個demo钧惧,并且運行起來暇韧,然后再繼續(xù)學(xué)習(xí)下去。下面首先寫一個RxJava的demo浓瞪,下面的分析將會基于該demo:
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("test");
e.onComplete();
}
}).subscribe(new Observer<String>() {
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}
public void onNext(String s) {
System.out.println("onNext:" + s);
}
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
public void onComplete() {
System.out.println("onComplete:");
}
});
首先需要創(chuàng)建一個Observable懈玻,可以使用Observable的靜態(tài)方法create,當(dāng)然可以直接new一個Observable對象乾颁,并且實現(xiàn)Observable的方法來實現(xiàn)涂乌,就像下面這樣:
Observable<String> observable = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("ok");
observer.onComplete();
}
};
現(xiàn)在,Observable已經(jīng)有了英岭,下面就需要在該Observable上綁定一個Observer湾盒,就像上面的例子一樣,使用Observable的subscribe方法诅妹,需要說明的一點是罚勾,可以在Observable做非常豐富的聚合操作,可以對Observable進行一系列聚合操作(比如map吭狡,filter等操作)之后再綁定Observer尖殃,但是本文不會涉及這些操作的內(nèi)容,這些內(nèi)容將在下一篇該系列的文章中出現(xiàn)划煮。
目前Observable有六個subscribe方法供Observer選擇:
- public final Disposable subscribe()
- public final Disposable subscribe(Consumer<? super T> onNext)
- public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
- public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
- public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
- public final void subscribe(Observer<? super T> observer)
可以選擇這六個中的任意一個來綁定Observer送丰,本文以一個看起來較為簡單的subscribe方法來分析,也就是上面例子中使用的版本:
- public final void subscribe(Observer<? super T> observer)
下面展示了該方法的詳細實現(xiàn)細節(jié):
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看起來代碼很多弛秋,但是核心代碼就一句:subscribeActual(observer)蚪战,然后:
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic.
* <p>There is no need to call any of the plugin hooks on the current Observable instance or
* the Subscriber.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
再看一下new一個Observable的代碼:
Observable<String> observable = new Observable<String>() {
@Override
protected void subscribeActual(Observer<? super String> observer) {
// XXX
}
};
也就是說牵现,subscribe方法中會調(diào)用Observable的subscribeActual方法,并且將subscribe的參數(shù)(也就是綁定到該Observable的Observer)傳遞給subscribeActual邀桑,然后瞎疼,我們在subscribeActual方法里面對subscribeActual的參數(shù)observer的操作實際上就是直接調(diào)用了Observer的方法,所以O(shè)bserver當(dāng)然會對響應(yīng)相應(yīng)的事件壁畸。
這個理解起來不太困難贼急,下面看一下使用Observable的create靜態(tài)方法來創(chuàng)建Observable的時候是怎么講一個Observer綁定到一個create出來的Observable上的,回頭看下面的代碼:
Observable.create(new ObservableOnSubscribe<String>() {
public void subscribe(ObservableEmitter<String> e) throws Exception {
// XXX
}
}).subscribe(new Observer<String>() {
// XXX
});
這個看起來好像不能像上面那種情況一樣理解捏萍,因為create的參數(shù)是new一個ObservableOnSubscribe對象太抓,現(xiàn)在先來看一下create方法的具體實現(xiàn)細節(jié):
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
可以看到,create方法返回的是一個ObservableCreate對象令杈,并且將我們的Observable對象傳遞給了ObservableCreate走敌,這里使用了包裝模式,將Observable包裝成了ObservableCreate對象逗噩。在ObservableCreate類中找到了subscribeActual的實現(xiàn)掉丽,而這個subscribeActual正是實現(xiàn)了Observable的subscribeActual。所以包裝需要包裝徹底啊异雁。下面是ObservableCreate類的subscribeActual的具體實現(xiàn):
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在subscribeActual內(nèi)部捶障,又對Observer做了一次包裝,將Observer對象包裝成了CreateEmitter對象纲刀,為什么呢项炼?因為在create方法的參數(shù)中我們new的Observable是一個ObservableOnSubscribe類型的對象,而ObservableOnSubscribe的subscribe的參數(shù)需要是CreateEmitter類型的示绊,那我們new出來的ObservableOnSubscribe到哪去了呢锭部?看下面的構(gòu)造函數(shù):
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
可以看到,我們new出來的ObservableOnSubscribe被保存在source字段中面褐,在來看ObservableCreate類的subscribeActual方法拌禾,其中有關(guān)鍵的一句話:source.subscribe(parent),source是Observable盆耽,parent是Observer,只是Observer和Observable都是被包裝了一層的扼菠。如果想具體了解到底是怎么包裝的摄杂,可以參考CreateEmitter類,也可以借助這個機會學(xué)習(xí)一下包裝模式循榆,還是比較有用的析恢。
本文是對RxJava學(xué)習(xí)筆記系列的第一篇文章,內(nèi)容淺顯易懂秧饮,沒有涉及太多的內(nèi)容映挂,主要分析了一下RxJava中的兩個重要的對象泽篮,Observable和Observer,并且梳理了一下一個Observer是如何綁定到一個Observable上的柑船,當(dāng)然帽撑,這是學(xué)習(xí)RxJava的基礎(chǔ)內(nèi)容,如果對這一部分內(nèi)容都不清楚的話鞍时,還需要繼續(xù)學(xué)習(xí)一下亏拉,本文涉及到兩個設(shè)計模式,一個是觀察者模式逆巍,一個是包裝模式及塘,結(jié)合具體的例子來看還是很好理解的。本文開頭還介紹了一下JSwitcher锐极,對于學(xué)習(xí)RxJava還是比較有幫助的笙僚。下面簡單做一下RxJava學(xué)習(xí)筆記系列的文章計劃:
- 《RxJava學(xué)習(xí)筆記 (一)》 : 了解RxJava中的Observable和Observer,并且明白如何實現(xiàn)訂閱
- 《RxJava學(xué)習(xí)筆記 (二)》 : RxJava中Observable豐富的聚合操作支持的學(xué)習(xí)筆記
- 《RxJava學(xué)習(xí)筆記 (三)》 : RxJava2中的線程切換學(xué)習(xí)筆記
- 《RxJava學(xué)習(xí)筆記 (四)》 : RxJava Flowable學(xué)習(xí)
暫時定這幾部分內(nèi)容灵再,在總結(jié)過程中如果發(fā)現(xiàn)還有什么內(nèi)容需要補充的時候會進行補充更新肋层。