前言
Rxjava
由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡潔 & 使用簡單的特點(diǎn),深受各大 Android
開發(fā)者的歡迎。
如果還不了解RxJava潘靖,請看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程
- 今天,我將為大家?guī)?源碼分析:
Rxjava
的訂閱流程蚤蔓,其為Rxjava
使用的基本 & 核心卦溢,希望大家會喜歡。
Carson帶你學(xué)RxJava系列文章秀又,包括 原理单寂、操作符、應(yīng)用場景吐辙、背壓等等宣决,請關(guān)注看文章:Android:這是一份全面 & 詳細(xì)的RxJava學(xué)習(xí)指南
目錄
1. RxJava簡介
此處簡單介紹RxJava
若還不了解RxJava,請看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程
2. 訂閱流程 的使用
2.1 使用步驟
RxJava
的訂閱流程 使用方式 = 基于事件流的鏈?zhǔn)秸{(diào)用昏苏,具體步驟如下:
步驟1:創(chuàng)建被觀察者(Observable)
& 定義需發(fā)送的事件
步驟2:創(chuàng)建觀察者(Observer)
& 定義響應(yīng)事件的行為
步驟3:通過訂閱(subscribe)
連接觀察者和被觀察者
2.2 實(shí)例講解
// RxJava的鏈?zhǔn)讲僮? Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 創(chuàng)建被觀察者(Observable) & 定義需發(fā)送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
// 3. 通過訂閱(subscribe)連接觀察者和被觀察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
}
}
- 運(yùn)行結(jié)果
3. 源碼分析
下面尊沸,我將根據(jù) 使用步驟 進(jìn)行RxJava
的源碼分析:
步驟1:創(chuàng)建被觀察者(Observable)
& 定義需發(fā)送的事件
步驟2:創(chuàng)建觀察者(Observer)
& 定義響應(yīng)事件的行為
步驟3:通過訂閱(subscribe)
連接觀察者和被觀察者
步驟1:創(chuàng)建被觀察者(Observable)& 定義需發(fā)送的事件
- 源碼分析如下
/**
* 使用步驟1:創(chuàng)建被觀察者(Observable)& 定義需發(fā)送的事件
**/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
})
/**
* 源碼分析:Observable.create(new ObservableOnSubscribe<Integer>(){...})
**/
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
...
// 僅貼出關(guān)鍵源碼
return new ObservableCreate<T>(source);
// 創(chuàng)建ObservableCreate類對象 ->>分析1
// 注:傳入source對象(即 我們手動創(chuàng)建的ObservableOnSubscribe對象)
}
/**
* 分析1:new ObservableCreate<T>(source)
**/
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate類 = Observable的子類
...
// 僅貼出關(guān)鍵源碼
final ObservableOnSubscribe<T> source;
// 構(gòu)造函數(shù)
// 傳入了傳入source對象 = 手動創(chuàng)建的ObservableOnSubscribe對象
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
/**
* 重點(diǎn)關(guān)注:復(fù)寫了subscribeActual()
* 作用:訂閱時,通過接口回調(diào) 調(diào)用被觀察者(Observerable) 與 觀察者(Observer)的方法
**/
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1. 創(chuàng)建1個CreateEmitter對象(封裝成1個Disposable對象)
// 作用:發(fā)射事件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2. 調(diào)用觀察者(Observer)的onSubscribe()
// onSubscribe()的實(shí)現(xiàn) = 使用步驟2(創(chuàng)建觀察者(Observer))時復(fù)寫的onSubscribe()
observer.onSubscribe(parent);
try {
// 3. 調(diào)用source對象的subscribe()
// source對象 = 使用步驟1(創(chuàng)建被觀察者(Observable))中創(chuàng)建的ObservableOnSubscribe對象
// subscribe()的實(shí)現(xiàn) = 使用步驟1(創(chuàng)建被觀察者(Observable))中復(fù)寫的subscribe()->>分析2
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
/**
* 分析2:emitter.onNext("1");
* 此處僅講解subscribe()實(shí)現(xiàn)中的onNext()
* onError()捷雕、onComplete()類似椒丧,此處不作過多描述
**/
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
// 僅貼出關(guān)鍵代碼
// onNext()源碼分析
@Override
public void onNext(T t) {
// 注:發(fā)送的事件不可為空
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 若無斷開連接(調(diào)用Disposable.dispose()),則調(diào)用觀察者(Observer)的同名方法 = onNext()
// 觀察者的onNext()的內(nèi)容 = 使用步驟2中復(fù)寫內(nèi)容
if (!isDisposed()) {
observer.onNext(t);
}
}
// onError()救巷、onComplete()類似,此處不作過多描述
// 特別說明:調(diào)用該2方法句柠,最終都會自動調(diào)用dispose()浦译,即斷開觀察者 & 被觀察者的連接
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
- 步驟1總結(jié)
步驟2:創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
- 源碼分析
/**
* 使用步驟2:創(chuàng)建觀察者 & 定義響應(yīng)事件的行為(方法內(nèi)的創(chuàng)建對象代碼)
**/
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
/**
* 源碼分析:Observer類
**/
public interface Observer<T> {
// 注:Observer本質(zhì) = 1個接口
// 接口內(nèi)含4個方法,分別用于 響應(yīng) 對應(yīng)于被觀察者發(fā)送的不同事件
void onSubscribe(@NonNull Disposable d); // 內(nèi)部參數(shù):Disposable 對象溯职,可結(jié)束事件
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
/**
* 特別說明:Subscriber類
* 定義:RxJava 內(nèi)置的一個實(shí)現(xiàn)了 Observer 的抽象類
* 作用:擴(kuò)展Observer 接口 = 新增了2個方法 =
* 1. onStart():在還未響應(yīng)事件前調(diào)用精盅,用于初始化工作
* 2. unsubscribe():用于取消訂閱。在該方法被調(diào)用后谜酒,觀察者將不再接收 & 響應(yīng)事件
* 注:調(diào)用該方法前叹俏,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用僻族;若引用不能及時釋放粘驰,就會出現(xiàn)內(nèi)存泄露
* 使用方式:與Observer使用幾乎相同(實(shí)質(zhì)上,Observer總是會先被轉(zhuǎn)換成Subscriber再使用)
**/
Subscriber<String> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "開始采用subscribe連接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件作出響應(yīng)" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
};
步驟3:通過訂閱(subscribe)連接觀察者和被觀察者
- 源碼分析
/**
* 使用步驟3:通過訂閱(subscribe)連接觀察者和被觀察者 = subscribe()
**/
subscribe(new Observer<Integer>() {
// 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
/**
* 源碼分析:Observable.subscribe(observer)
* 說明:該方法屬于 Observable 類的方法(注:傳入1個 Observer 對象)
**/
@Override
public final void subscribe(Observer<? super T> observer) {
...
// 僅貼出關(guān)鍵源碼
subscribeActual(observer);
// 繼續(xù)往下看:分析1
}
/**
* Observable.subscribeActual(observer)
* 說明:屬于抽象方法述么,由子類實(shí)現(xiàn)蝌数;此處的子類 = 步驟1創(chuàng)建被觀察者(Observable)時創(chuàng)建的ObservableCreate類
* 即 在訂閱時,實(shí)際上是調(diào)用了步驟1創(chuàng)建被觀察者(Observable)時創(chuàng)建的ObservableCreate類里的subscribeActual()
* 此時度秘,你應(yīng)該回頭看上面的步驟1里的subscribeActual()顶伞,應(yīng)該能理解RxJava的整個訂閱流程了。
**/
protected abstract void subscribeActual(Observer<? super T> observer);
-
總結(jié)
4. 源碼總結(jié)
- 在步驟1(創(chuàng)建被觀察者(
Observable
))、步驟2(創(chuàng)建觀察者(Observer
))時唆貌,僅僅只是定義了發(fā)送的事件 & 響應(yīng)事件的行為滑潘; - 只有在步驟3(訂閱時),才開始發(fā)送事件 & 響應(yīng)事件锨咙,真正連接了被觀察者 & 觀察者
- 具體源碼總結(jié)如下
5. 特別注意:涉及多個被觀察者(Observable)
的發(fā)送事件順序
- 具體描述
- 實(shí)例講解
/**
* 存在涉及多個被觀察者(Observable)的情況
**/
// 創(chuàng)建第1個被觀察者(Observable1)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
// 使用flatMap操作符(內(nèi)部會創(chuàng)建第2個被觀察者(Observable2))
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("我是事件" + integer + "拆分后的子事件" + i);
// 通過flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分语卤,再將每個事件轉(zhuǎn)換為一個新的發(fā)送三個String事件
// 最終合并,再發(fā)送給被觀察者
}
return Observable.fromIterable(list);
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(String value) {
Log.d(TAG, "響應(yīng)事件:"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
// 過程講解
// 調(diào)用順序:先回調(diào)Observable2的subscribe(Observer) 蓖租、subscribeActual(Observer)粱侣、再回調(diào)Observable1的subscribe(Observer) 、subscribeActual(Observer)
// Observable的發(fā)送順序 = 先發(fā)送Observable1蓖宦、再發(fā)送Observable2
-
測試結(jié)果
6. 總結(jié)
本文主要對
RxJava2
中 的訂閱流程進(jìn)行了源碼分析Carson帶你學(xué)RxJava系列文章:
入門
Carson帶你學(xué)Android:這是一篇清晰易懂的Rxjava入門教程
Carson帶你學(xué)Android:面向初學(xué)者的RxJava使用指南
Carson帶你學(xué)Android:RxJava2.0到底更新了什么齐婴?
原理
Carson帶你學(xué)Android:圖文解析RxJava原理
Carson帶你學(xué)Android:手把手帶你源碼分析RxJava
使用教程:操作符
Carson帶你學(xué)Android:RxJava操作符教程
Carson帶你學(xué)Android:RxJava創(chuàng)建操作符
Carson帶你學(xué)Android:RxJava功能性操作符
Carson帶你學(xué)Android:RxJava過濾操作符
Carson帶你學(xué)Android:RxJava組合/合并操作符
Carson帶你學(xué)Android:RxJava變換操作符
Carson帶你學(xué)Android:RxJava條件/布爾操作符
實(shí)戰(zhàn)
Carson帶你學(xué)Android:什么時候應(yīng)該使用Rxjava?(開發(fā)場景匯總)
Carson帶你學(xué)Android:RxJava線程控制(含實(shí)例講解)
Carson帶你學(xué)Android:圖文詳解RxJava背壓策略
Carson帶你學(xué)Android:RxJava稠茂、Retrofit聯(lián)合使用匯總(含實(shí)例教程)
Carson帶你學(xué)Android:優(yōu)雅實(shí)現(xiàn)網(wǎng)絡(luò)請求嵌套回調(diào)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請求輪詢(有條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請求輪詢(無條件)
Carson帶你學(xué)Android:網(wǎng)絡(luò)請求出錯重連(結(jié)合Retrofit)
Carson帶你學(xué)Android:合并數(shù)據(jù)源
Carson帶你學(xué)Android:聯(lián)想搜索優(yōu)化
Carson帶你學(xué)Android:功能防抖
Carson帶你學(xué)Android:從磁盤/內(nèi)存緩存中獲取緩存數(shù)據(jù)
Carson帶你學(xué)Android:聯(lián)合判斷
歡迎關(guān)注Carson_Ho的簡書
不定期分享關(guān)于安卓開發(fā)的干貨柠偶,追求短、平睬关、快诱担,但卻不缺深度。