一尖淘、RXjava介紹
首先看一下Rxjava這個名字,其中java代表java語言著觉,而Rx是什么意思呢村生?Rx是Reactive Extensions的簡寫,翻譯過來就是饼丘,響應式拓展。所以Rxjava的名字的含義就是肄鸽,對java語言的拓展卫病,讓其可以實現(xiàn)對數據的響應式編程。
那么響應的是什么呢帜平?響應的是上游數據的變化。常規(guī)用法是,對數據源進行監(jiān)聽箍邮,然后做出響應茉帅。
RxJava的整體結構是一條鏈,其中有這三個角色媒殉。
- 鏈的上游:生產者 Observable
- 鏈的下游:觀察者 Observer
- 鏈的中間:各個中介節(jié)點担敌,既是下游的Observable,又是上游的Observer
二廷蓉、Rxjava基本使用
Single.just("hfhuaizhi").subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "onSubscribe")
}
override fun onSuccess(t: String) {
Log.e(TAG, "onSuccess:$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError:$e")
}
})
上面這段代碼是對Rxjava簡單的使用全封,其中
- Single 發(fā)出單個數據的被觀察者Observable马昙,只發(fā)送一次,只有Success和Error兩種狀態(tài)刹悴,沒有next行楞,在Rxjava2中新增
- just 被觀察者生產的數據,參數類型是一個泛型土匀,這里傳進去的是一個String
- subscribe 觀察者Observer子房,這里聲明的是SingleObserver,用來對Single中產生的數據進行響應
- SingleObserver
- onSubscribe 訂閱成功后就會回調就轧,一般會在此方法中進行一些初始化操作证杭。其參數類型是Disposable,可以通過調用d.dispose() 取消對Observable的監(jiān)聽妒御,并讓其停止發(fā)送消息解愤。
- onSuccess 接收數據成功后就會回調,只會回調一次乎莉,其參數類型和Observable中just方法傳入的數據類型一致送讲,這里是String類型
- onError 發(fā)生錯誤時回調,參數是Throwable惋啃,包含錯誤信息哼鬓。
運行效果
2021-12-18 13:54:12.450 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 13:54:12.451 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:hfhuaizhi
可以看到首先onSubscribe被調用,表明注冊了觀察者边灭。然后接收數據成功异希,打印出'hfhuaizhi'。 到這里我們就了解了Rxjava最基本的用法存筏,接下來分析一下函數的內部做了什么宠互。
三、Rxjava原理解析
1. just方法分析
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
- 對方法參數進行判空
- 調用
RxJavaPlugins.onAssembly
方法椭坚,其參數是一個SingleJust予跌,構造方法傳入了item- 其中onAssembly方法內部對傳入的參數進行一些處理,然后返回原參數類型善茎,所以接下來分析的過程中會忽略此方法券册,可以簡單認為just方法直接返回了一個SingleJust實例。
// onSingleAssembly 參數默認是空的垂涯,所以這個方法原樣返回了source,當設置onSingleAssembly后烁焙,
// 會先對source進行處理后再返回
public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
final T value;
public SingleJust(T value) {
this.value = value;
}
SingleJust將構造方法傳入的item保存在value字段中。 由上述分析可知耕赘,Single.just
方法會返回一個SingleJust實例骄蝇,所以在我們鏈式調用中的subscribe方法,實際上調用的是SingleJust的subscribe方法
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
// 1. 判空
Objects.requireNonNull(observer, "observer is null");
// 2. 對參數中的observer進行處理后又返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
// 3. 對Observer進行判空
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
// 4. 調用真實注冊方法
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
subscrib方法中主要做了注釋中所寫的四步操作操骡,其中重要的是第4步subscribeActual
九火,這里才是真正做事的赚窃,之前都是數據的校驗,因為我們這個類的實例是SingleJust岔激,所以接下來看一下SingleJust的subscribeActual方法做了什么勒极。
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
可以看到內容十分簡單
- 調用observer的onSubscribe方法,表明訂閱成功虑鼎,參數是Disposable.disposed()返回值
- 調用observer的onSuccess方法辱匿,表明數據回調成功,參數是value炫彩,而value就是通過Single的just函數傳進來的松嘶,通過構造方法傳入SingleJust實例中体谒,因此馏予,這一步的操作就是簡單地將構造方法中傳入的值雳刺,通過observer的onSuccess方法回調給我們定義的觀察者SingleObserver。
這樣就完事了划址,因為之前說過Single.just是最簡單的RxJava使用方式,先調用onSubscribe表明注冊監(jiān)聽限府,然后又緊接著通過onSuccess回調數據夺颤,所以不會有失敗的情況。
2. map方法分析
map是Rxjava中比較常用的用法胁勺,用來實現(xiàn)數據類型的轉換 比如像這樣世澜,我們發(fā)送的數據類型是Integer,接收的數據類型是String署穗,這樣當然是無法直接接收的寥裂,所以需要進行一下轉換,將上游數據發(fā)送的Integer轉換為String案疲,然后由下游接收封恰。
private fun testMap(view: View) {
Single.just(123).map(object : Function<Int, String> {
override fun apply(t: Int): String {
return "$t"
}
}).subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "onSubscribe")
}
override fun onSuccess(t: String) {
Log.e(TAG, "onSuccess:$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError:$e")
}
})
}
打印結果
021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:123
just方法傳入的123是Integer類型,onSuccess處接收的數據是String類型褐啡,通過map進行轉換诺舔。其中map方法傳入的參數是一個Function<T,E>,此類有兩個泛型參數,T代表輸入數據類型备畦,E表示輸出數據類型低飒,這里的輸入數據類型是Integer,返回類型是String懂盐,apply方法中返回了String類型的輸出數據褥赊。
map(object : Function<Int, String> {
override fun apply(t: Int): String {
return "$t"
}
})
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
進入map方法內部,此方法判空后莉恼,返回了SingleMap實例拌喉,其構造方法傳入了當前SingleJust實例和mapper轉換參數,并將其分別保存在source和mapper成員變量中速那。
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
...
}
好,map方法暫且看到這兒司光,我們接下來繼續(xù)分析鏈式調用中的subscribe方法琅坡。
subccribe傳入了一個SingleObserver,和之前分析的類似残家,但是區(qū)別在于調用的不再是SingleJust的subscribe方法榆俺,而是map方法返回的SingleMap的subscribe方法,由之前的分析可知坞淮,此方法調用會在數據的判空后調用到SingleMap的subscribeActual
方法茴晋。 由之前的分析可知,鏈式調用到subscribe方法會調用到SingleMap的subscribeActual
方法
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
...
}
由之前的分析可知回窘,source就是map的上游SingleJust诺擅, 所以在single的實際subscribe方法中會調用其上游的subscribe方法,并傳入了一個封裝好的新的MapSingleObserver啡直,MapSingleObserver的構造方法中第一個參數t烁涌,是下游觀察者,在我們這塊代碼中就是鏈式調用的時候傳入的SingleObserver酒觅。第二個參數是我們在map方法中傳入的數據類型轉換轉換器mapper撮执。 由之前的分析可知,當source舷丹,也就是SingleJust的subscribe方法調用后抒钱,會依次調用其參數傳入的Observer的onSubscribe方法和onSuccess方法,此時參數傳入的Observer就是上面代碼塊里的MapSingleObserver
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
onSubscribe方法原封不動的調用了t.onSubscribe(d);而t就是在MapSingleObserver構造方法傳入的下游觀察者颜凯,也就是SingleObserver實例谋币。這里直接調用了其onSubscribe方法表示注冊監(jiān)聽成功。 onSuccess方法中調用了mapper.apply(value)症概,這個mapper就是我們在map方法中傳入的轉換函數蕾额,這里輸入了Integer數據類型,得到了String類型輸出穴豫,最后調用t.onSuccess回調轉換后的數據凡简,也就是調用我們subscribe方法傳入的實例的onSuccess。
map方法總結
map主要做的就是一個承上啟下精肃,鏈式調用中subscribe方法調用后秤涩,會依次向上調用中間節(jié)點的subscribe方法,直到調用到最初始的沒有上游的Observable,最上層的Observable會在其subscribeActual方法中調用其下游觀察者的onSubscribe和onSuccess/onError司抱,將數據一層一層傳下去筐眷,數據傳遞的過程中,中間節(jié)點可能會對數據進行處理后再接著向下傳习柠,最終傳遞到最底層的Observer匀谣,整個流程如圖所示
圖片含義解釋
最上游的Single就是我們調用Single.just產生的SingleJust照棋,其subscribe方法中會調用onSubscribe()和onSuccess(),向下方觀察者傳遞Integer類型的結果,中間觀察者SingleObserver由map方法創(chuàng)建武翎,其接收到上游傳遞下來的數據后烈炭,將其轉換為String,然后傳遞給下方觀察者宝恶,最后下游收到的數據結果就是String類型符隙。
3. 線程切換
線程切換可以說是RxJava中最常用的操作了,甚至很多人選擇RxJava垫毙,就是因為RxJava可以和方便地實現(xiàn)線程切換霹疫。 線程切換主要用到這兩個函數:
- subscribeOn
- observerOn
private fun testSubscribe(view: View) {
Single.just("hfhuaizhi").subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "onSubscribe")
}
override fun onSuccess(t: String) {
Log.e(TAG, "onSuccess:$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError:$e")
}
})
}
這樣寫,可以實現(xiàn)subscribe調用之前的消息發(fā)送在io線程综芥,observerOn調用之后的Observer回調在android主線程丽蝎,其中AndroidSchedulers類不在Rxjava標準庫中,需要額外引入RxAndroid依賴膀藐。
subscribeOn
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
subscribeOn方法返回一個SingleSubscribeOn實例屠阻,其構造方法中傳入了this(上游被觀察者)和scheduler(線程調度器,我們傳入的是Schedulers.io())额各。 由之前的分析可知栏笆,鏈式調用中最終subscribe方法調用的時候,會由下向上依次調用各個節(jié)點的subscribe方法臊泰,這里我們看一下SingleSubscribeOn這一線程切換的節(jié)點的subscribe方法做了什么,因為SingleSubscribeOn和SingleJust一樣繼承自Single蚜枢,其subscribe方法也是調用到了subscribeActual方法
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
// 上層被觀察對象
this.source = source;
// 線程類型
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
}
- 將observer(下游觀察者)和source(上游被觀察者)封裝進一個新的觀察者SubscribeOnObserver
- 調用下游觀察者的onSubscribe方法
- 調用scheduler的scheduleDirect方法缸逃,參數傳入剛封裝的新的觀察者SubscribeOnObserver實例
- 將parent的task變量替換為由傳入的scheduler生成的Disposable
final SequentialDisposable task;
- 這個task的參數類型是Disposable,之前有提到過厂抽,在Observer的onSubscribe方法中會傳入一個Disposable需频,調用Disposable的dispose()方法后,會取消注冊并讓上游停止發(fā)送任務筷凤,這個Disposable繼承自AtomicReference 實現(xiàn)了Disposable接口,AtomicReference是java里的原子引用類型昭殉,可以線程安全地對對象引用進行修改,類似地還有AtomicInteger等藐守,所以這里的parent.task.replace(f)就是將parent中的task這個disposable線程安全地替換為scheduler創(chuàng)建地這個新的Disposable挪丢,從而可以實現(xiàn)任務的取消。
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
}
接下來分析一下第3步主要做了什么
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
scheduleDirect方法中傳入了一個Runnable類型參數卢厂,因為SubscribeOnObserver類實現(xiàn)了Runnable接口乾蓬,所以可以被當作Runnable傳進去。
因為我們傳入的scheduler參數是由Schedulers.io()方法創(chuàng)建的慎恒,而此方法默認會返回一個IoScheduler
這個Scheduler的注釋寫著任内,會創(chuàng)建并緩存一個線程池撵渡。所以我們知道了scheduleDirect方法會將傳入的Runnable放入一個線程池里執(zhí)行,從而實現(xiàn)任務的異步執(zhí)行,所以接下來我們去看一下SubscribeOnObserver的run方法里做了什么死嗦。
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> downstream;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void run() {
source.subscribe(this);
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
downstream.onSuccess(value);
}
...
}
SubscribeOnObserver的run方法中會調用source.subscribe,并傳入自己(自己也是一個Observer)趋距,由之前分析我們知道source就是我們監(jiān)聽的上游,這里調用了SingleJust的subscribe越除,由之前的分析我們知道subscribe會調用到subscribeActual
节腐,這里做任務的真正執(zhí)行,因此就這樣實現(xiàn)了讓上游任務在異步線程中的執(zhí)行廊敌,上游任務執(zhí)行過后铜跑,會將數據向下傳遞,傳遞到當前SubscribeOnObserver節(jié)點的時候會調用其onSuccess方法骡澈,其調用downstream锅纺,也就是下游觀察者的onSuccess方法,將數據繼續(xù)向下傳遞肋殴,此時數據傳遞的線程也是run方法執(zhí)行的線程囤锉,因為此時并沒有再次對線程進行切換。
observerOn
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
observeOn函數返回了一個SingleObserveOn护锤,也是需要傳入this(上游被觀察者)官地,和scheduler(線程調度器類型,此時我們傳入的是AndroidSchedulers.mainThread())烙懦,由之前分析可知我們此時應該去看SingleObserveOn的subscribeActual方法調用
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
此方法中調用了其上游的subscribe方法驱入,和之前分析的數據流轉過程一致,需要依次調用到最根節(jié)點的subscribe氯析,參數傳入的是封裝后的觀察者ObserveOnSingleObserver亏较,其構造方法中傳入了下游觀察者和線程調度類型,接下來我們看一下當ObserveOnSingleObserver收到上游傳下來的數據后進行了怎樣的操作掩缓。
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
...
}
可以看到在onSuccess方法中調用了scheduler.scheduleDirect(this)雪情,并穿了個this,而且自身實現(xiàn)了runnable接口你辣,由之前分析可知巡通,run方法會在某一時刻被調用。傳入的scheduler是AndroidSchedulers.mainThread()
其返回的是HandlerScheduler
舍哄,其內部封裝了個Handler宴凉,將Runnable 弄到主線程去執(zhí)行。最終結果就是ObserveOnSingleObserver的run方法在主線程中被調用表悬, 其run方法調用了下游觀察者downstream的onSuccess/onError跪解。 由此分析可知,observerOn方法控制此節(jié)點后的被觀察者收到數據時所在的線程,無法影響其上游節(jié)點叉讥。
最后
您的點贊收藏就是對我最大的鼓勵窘行! 歡迎關注我,分享Android干貨图仓,交流Android技術罐盔。 對文章有何見解,或者有何技術問題救崔,歡迎在評論區(qū)一起留言討論惶看!最后給大家分享一些Android相關的視頻教程,感興趣的朋友可以去看看六孵。
【Android源碼解析】Android中高級架構進階學習——百大框架源碼解析Retrofit/OkHttp/Glide/RxJava/EventBus...._嗶哩嗶哩_bilibili
Android流行框架零基礎入門到精通全套教程/熱修復/Glide/插件化/Retrofit/OKHTTP/Gson/組件化/Jetpack/IOC/高德地圖_嗶哩嗶哩_bilibili
Android開發(fā)進階學習—設計思想解讀開源框架 · 已更新至104集(持續(xù)更新中~)_嗶哩嗶哩_bilibili
價值100W+Android實戰(zhàn)項目大全/高級UI/靈動的錦鯉/QQ空間熱修復/插件化框架/組件化框架設計/網絡訪問框架/RXJava/IOC/MVVM/NDK_嗶哩嗶哩_bilibili
Android項目實戰(zhàn)-從0開始手把手實現(xiàn)組件化路由SDK項目實戰(zhàn)_嗶哩嗶哩_bilibili