概述
rxjava2想必大家都用的很熟練了牙丽,但我們大多數(shù)工程師又有多少是從源碼來深入了解它呢,尤其是在找工作面試中兔魂,想必大家深有體會烤芦,知道怎么用,卻不不知道怎么說入热∨淖兀總的來說就是對原理不了解。
也許你由于工作原因沒時間勺良,也可能是其他原因绰播,總之沒了解,沒關(guān)系我也沒怎么了解尚困。由于打算回家發(fā)展蠢箩,辭掉了現(xiàn)在的工作,自己也很想寫一篇關(guān)于rxjava2的文章事甜,也不用再去寫業(yè)務(wù)方面的代碼谬泌,想想現(xiàn)在的生活狀態(tài),真的很nice逻谦。
由于Android studio默認(rèn)沒有rxjava2的相關(guān)api掌实,所以第一步就是在github中找到rxjava2的庫,當(dāng)我在查找的時候已經(jīng)出來了rxjava3了邦马,有需要了解的可以 點擊了解贱鼻。這里我們只是針對rxjava2,rxjava3可能有一些新的api或什么的 但大多api還是通用的這里就不過多的闡述滋将。
在日常的工作中rxjava使用的最多的就是在網(wǎng)絡(luò)請求中邻悬,但如果你對rxjava運用的比較熟悉的話,它干的事情也是很多的随闽,比如切線程,類型的轉(zhuǎn)換,舉個簡單例子父丰,比如輸入的int類型的數(shù)據(jù),通過rxjava可以轉(zhuǎn)換成String類型掘宪。一個實體類轉(zhuǎn)換成另外一個實體類蛾扇。或者修改類中的成員變量添诉,或者遍歷集合等等都可以用rxjava來實現(xiàn)屁桑。其實這個庫功能是相當(dāng)強大的。我們也不要局限在只是使用在網(wǎng)絡(luò)框架請求上栏赴。
網(wǎng)絡(luò)框架使用的三方庫依賴
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
implementation "com.squareup.retrofit2:retrofit:2.5.0"
implementation "com.squareup.retrofit2:converter-gson:2.4.0"
implementation "com.squareup.retrofit2:adapter-rxjava2:2.4.0"
implementation "me.jessyan:retrofit-url-manager:1.4.0"
implementation "io.reactivex.rxjava2:rxkotlin:2.2.0"
implementation "io.reactivex.rxjava2:rxandroid:2.1.0"
implementation "com.squareup.okhttp3:okhttp:3.14.1"
implementation "com.google.code.gson:gson:2.8.5"
implementation "com.squareup.okhttp3:logging-interceptor:3.11.0"
注意:添加依賴后需要注明依賴來源 maven { url "https://jitpack.io" }
點擊右上方的同步按鈕蘑斧,依賴就算配置好了。
講解rxjava之前,先簡單的搭建下網(wǎng)絡(luò)請求的相關(guān)配置竖瘾,我這里以github 為例
比如使用users/{user}/repos網(wǎng)絡(luò)請求api
public interface Api {
@GET("users/{user}/repos")
Single<List<Repo>> listRepos(@Path("user") String user);
}
這里講解下Single 沟突,我這里使用的是Single,為什么使用它而沒有使用Observable或者是Flowable捕传,我個人覺得因為Single請求返回的只有成功或者失敗兩種狀態(tài)的返回方法惠拭,與網(wǎng)絡(luò)請求失敗或成功一致,因此我這里選擇了Single而不是其他兩種庸论,也有大佬說在網(wǎng)絡(luò)請求中使用Single比較好一點這也是選擇它的原因之一职辅,在實際項目中我這里的也是選擇Single的。
Repo實體類的內(nèi)容比較多我這里就不貼了聂示,主要還是看下網(wǎng)絡(luò)請求OkhttpClient和Retrofit的相關(guān)代碼
主要的地方我講下 其他的最后我會把代碼上傳到github域携,大家可以到github上看下就可以了
subscribeOn和observeOn
相信很多開發(fā)工作者對這兩種該如何使用有點犯暈,不知道這兩種該如何選擇使用鱼喉,甚至在代碼中亂用的現(xiàn)象秀鞭,有時候我看我同事使用的時候也會出現(xiàn)這種的現(xiàn)象。如何使用呢扛禽?其實也是很簡單的锋边。首先它們都是切換線程用的,它們都可以切換到主線程或者子線程编曼。比較通俗的講subscribeOn是對上游進行切換線程使用的豆巨,observeOn是對下游線程進行切換使用的。什么是上游什么又是下游呢掐场?其實也是很簡單的搀矫,我們都知道rxjava是鏈?zhǔn)秸{(diào)用的,這里的上游就是以你切換線程的地方作為分界點刻肄,上方就稱為上游,反之為下游融欧。比如上游需要進行耗時io操作敏弃。那么我們就可以使用subscribeOn(Schedulers.io()) ,當(dāng)然如果你了解了源碼噪馏,你會發(fā)現(xiàn)Schedulers可以選擇的線程方式開始有很多種的麦到,比如說newThread computation或者single等等。默認(rèn)的使用的是computation
computation
用于CPU密集型的計算任務(wù)欠肾,不適合I/O操作
newThread
為每個任務(wù)創(chuàng)建一個新的線程瓶颠。
single
single擁有一個線程單例,所有的任務(wù)都在這一個線程中執(zhí)行刺桃,當(dāng)此線程中有
任務(wù)執(zhí)行時粹淋,它的任務(wù)將會按照先進先出的順序依次執(zhí)行。
先來個簡單的例子
Single<String> stringSingle = Single.just("1");
stringSingle
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
mTextView.setText(s);
}
@Override
public void onError(Throwable e) {
}
});
先講解上面的代碼,首先創(chuàng)建一個單一的事件流桃移,通過訂閱subscribe發(fā)送事件流屋匕,把數(shù)據(jù)發(fā)送到onSuccess
到這里只是簡單的描述了下整個事件流的事件執(zhí)行過程,現(xiàn)在從源碼來了解下它是如何返回的single對象借杰。
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
方法塊的第一行判斷是否為null过吻,第二行onAssembly這個方法其實是一個鉤子,可以點進去看下蔗衡,代碼如下:
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onSingleAssembly是一個全局變量纤虽,它是對你的single進行一個統(tǒng)一的處理,比如增加日志绞惦,但我們這里是不需要使用到它的逼纸。也就是直接返回source而沒有執(zhí)行!=null判斷條件下的代碼塊。這里可以忽略它翩隧。
從上面的代碼可以分析到樊展,其實這里只是創(chuàng)建了一個SingleJust就返回了
我們在點擊SingleJust看下它的源碼長什么樣
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
這段代碼其實也很好理解,就是把發(fā)送的1這個值存儲起來堆生,subscribeActual這個方法专缠,稍后再講解下,接下來我們看下訂閱事件的subscribe
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
方法塊的第一行判斷null就不說了淑仆,第二行其實也是一個hook(鉤子)方法涝婉。點擊去看,你會發(fā)現(xiàn)跟上面的鉤子方法是很像的蔗怠。這個hook在默認(rèn)的情況下也是沒有用到的墩弯。這個方法最核心的代碼 subscribeActual(observer),我們點進去看下寞射,會發(fā)現(xiàn)這行代碼是一個抽象方法渔工。它是在哪里執(zhí)行的呢。其實就是上面講解的SingleJust中還沒有講解到的那個方法桥温。也就是這段代碼引矩,如下:
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
這就把創(chuàng)建的被觀察者和觀察者聯(lián)系起來了。
整個流程圖
有時候我們在開發(fā)的時候也會遇到這樣一種場景侵浸,就是當(dāng)前的activity不存在的情況旺韭,代碼正在執(zhí)行onSuccess方法,如果是在進行UI操作掏觉,由于找不到對應(yīng)的ui控件而直接crash的情況区端。那么我們該怎么做呢?這里就要用到Disposables澳腹。在當(dāng)前activity被onDestory之前阻斷事件流织盼。
在講解Disposable原理前杨何,我們對被觀察者分為后續(xù)延遲性和非后續(xù)非延遲性進行分類。比如上面提到的Single.just就屬于非后續(xù)非延遲性被觀察者悔政,Observable.just就屬于后續(xù)延遲性被觀察者晚吞。
這里給個簡單的例子來分析下Observable的后續(xù)和延遲性
Observable
.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
}
});
通過入口interval找到IntervalObserver,先來分析下這個類
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
public void setResource(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
它繼承至AtomicReference<Disposable>谋国,AtomicReference可以保證原子操作的Disposable類對象的引用槽地。實現(xiàn)了Disposable接口,這里我們就能夠知道它內(nèi)部的操作是可以被打斷的芦瘾,但實際真正打斷的卻是AtomicReference類中的打斷方法捌蚊,也就是DisposableHelper.dispose(this);點進去可以發(fā)現(xiàn)他就是通過真正可以取消的Dispose去執(zhí)行取消的操作。那么它真正的取消的對象又是在哪呢近弟?其實就是上面的 DisposableHelper.setOnce(this, d)缅糟,而它是通過執(zhí)行setResource方法,而這個方法又是在執(zhí)行定時任務(wù)可取消的 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit)得到的祷愉,整個過程就算理順了窗宦,同時他還實現(xiàn)了Runnable,這就保證了被觀察者具有延遲和持續(xù)性的條件二鳄,這里也可以看出它是在子線程中進行計時操作的赴涵。到最后在進行ui操作是需要切換到UI線程執(zhí)行的,那么這里就可以理解為它是通過子線程切換到主線程然后循環(huán)往復(fù)操作的订讼。它的作用就是讓下游處理的數(shù)據(jù)比上游晚髓窜,這樣就可以持續(xù)和數(shù)據(jù)的穩(wěn)定。
我們都知道rxjava有很多的操作符欺殿,接下來講解下rxjava比較常見的操作符寄纵。
Map操作符
map字面意思是映射,通過映射操作符返回的結(jié)果通過map做下轉(zhuǎn)換脖苏。比如說事件源是一個int類型我們希望轉(zhuǎn)換成String類型程拭,這個我們就可以用map操作符來實現(xiàn),比如:
Single<Integer> stringSingle = Single.just(1);
stringSingle
.map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
return String.valueOf(s);
}
})
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
mTextView.setText(s);
}
@Override
public void onError(Throwable e) {
}
});
接著從源碼解讀map是如何實現(xiàn)類型轉(zhuǎn)換的
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
onAssembly上面已經(jīng)說過了,這里我們直接分析下SingleMap
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
對source和mapper兩個參數(shù)直接賦值棍潘,source指的是事件來源哺壶,來源指的Single<Integer>,mapper指的Function蜒谤,它相當(dāng)于一個轉(zhuǎn)換器。
接著看下面的代碼
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
了解到map把最上層single包裹在內(nèi)部至扰,通過內(nèi)部的上層的Single來訂閱事件并向下傳遞事件鳍徽。這里的范型t就是我們最外層看到的new Function<Integer, String>()
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
關(guān)鍵代碼 v = ObjectHelper.requireNonNull(mapper.apply(value)
它除了判斷mapper.apply(value)是否為空外還對其進行了賦值操作。返回的R指的是轉(zhuǎn)換過后的值敢课。
通過源碼能夠了解到阶祭,從最外層鏈?zhǔn)秸{(diào)用讓我們直接想到構(gòu)建模式绷杜,但從源碼可以看到內(nèi)部都是通過new 新創(chuàng)建的
執(zhí)行流程如下
rxjava2中的操作符其實大部分都是這樣的。講到這大部分都講完了濒募,如果讀者覺得還有些沒講到的可以在評論區(qū)補充鞭盟,講的不到位的或者有誤的還望訂正
為了讓大家更好的理解,個人覺得有些從網(wǎng)上找來圖片能夠更好的解釋一些問題瑰剃,如有侵權(quán)齿诉,請聯(lián)系我,我會第一時間刪掉晌姚。后續(xù)還會講到retrofit源碼解析以及自定義view的使用及實戰(zhàn)都會在這個項目中使用
代碼傳送門
??????創(chuàng)作不易粤剧,你的鼓勵就是我最大的創(chuàng)作動力 歡迎star 點贊??