前言
-
Rxjava
由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡潔 & 使用簡單的特點匣屡,深受各大Android
開發(fā)者的歡迎坠敷。
-
本文主要:
- 面向 剛接觸
Rxjava
的初學(xué)者 - 提供了一份 清晰、簡潔铃剔、易懂的
Rxjava
入門教程
涵蓋 基本介紹循榆、原理 & 具體使用等
- 解決的是初學(xué)者不理解
Rxjava
原理 & 不懂得如何使用的問題
- 面向 剛接觸
希望你們會喜歡析恢。
- 本文主要基于
Rxjava 2.0
- 如果讀者還沒學(xué)習(xí)過
Rxjava 1.0
也沒關(guān)系,因為Rxjava 2.0
只是在Rxjava 1.0
上增加了一些新特性秧饮,本質(zhì)原理 & 使用基本相同
目錄
1. 定義
-
RxJava
在GitHub
的介紹:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻譯:RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的映挂、基于事件的程序的庫
- 總結(jié):
RxJava
是一個 基于事件流泽篮、實現(xiàn)異步操作的庫
2. 作用
實現(xiàn)異步操作
類似于
Android
中的AsyncTask
、Handler
作用
3. 特點
由于 RxJava
的使用方式是:基于事件流的鏈?zhǔn)秸{(diào)用柑船,所以使得 RxJava
:
- 邏輯簡潔
- 實現(xiàn)優(yōu)雅
- 使用簡單
更重要的是帽撑,隨著程序邏輯的復(fù)雜性提高,它依然能夠保持簡潔 & 優(yōu)雅
4. 原理
4.1 生活例子引入
- 我用一個生活例子引入 & 講解
Rxjava
原理: 顧客到飯店吃飯
4.2 Rxjava原理介紹
Rxjava
原理 基于 一種擴(kuò)展的觀察者模式Rxjava
的擴(kuò)展觀察者模式中有4個角色:
角色 | 作用 | 類比 |
---|---|---|
被觀察者(Observable) | 產(chǎn)生事件 | 顧客 |
觀察者(Observer) | 接收事件鞍时,并給出響應(yīng)動作 | 廚房 |
訂閱(Subscribe) | 連接 被觀察者 & 觀察者 | 服務(wù)員 |
事件(Event) | 被觀察者 & 觀察者 溝通的載體 | 菜式 |
- 具體原理
請結(jié)合上述 顧客到飯店吃飯 的生活例子理解:
即RxJava
原理可總結(jié)為:被觀察者 (Observable)
通過 訂閱(Subscribe)
按順序發(fā)送事件 給觀察者 (Observer)
油狂, 觀察者(Observer)
按順序接收事件 & 作出對應(yīng)的響應(yīng)動作。具體如下圖:
至此寸癌,RxJava
原理講解完畢专筷。
5. 基本使用
- 本文只關(guān)注
RxJava
的基本使用,更深入的RxJava
使用請繼續(xù)關(guān)注Carson_Ho的RxJava系列 -
Rxjava
的使用方式有兩種:- 分步驟實現(xiàn):該方法主要為了深入說明
Rxjava
的原理 & 使用蒸苇,主要用于演示說明 - 基于事件流的鏈?zhǔn)秸{(diào)用:主要用于實際使用
- 分步驟實現(xiàn):該方法主要為了深入說明
5.1 方式1:分步驟實現(xiàn)
5.1.1 使用步驟
5.1.2 步驟詳解
步驟1:創(chuàng)建被觀察者 (Observable
)& 生產(chǎn)事件
- 即 顧客入飯店 - 坐下餐桌 - 點菜
- 具體實現(xiàn)
// 1\. 創(chuàng)建被觀察者 Observable 對象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// create() 是 RxJava 最基本的創(chuàng)造事件序列的方法
// 此處傳入了一個 OnSubscribe 對象參數(shù)
// 當(dāng) Observable 被訂閱時磷蛹,OnSubscribe 的 call() 方法會自動被調(diào)用,即事件序列就會依照設(shè)定依次被觸發(fā)
// 即觀察者會依次調(diào)用對應(yīng)事件的復(fù)寫方法從而響應(yīng)事件
// 從而實現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞溪烤,即觀察者模式
// 2\. 在復(fù)寫的subscribe()里定義需要發(fā)送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通過 ObservableEmitter類對象產(chǎn)生事件并通知觀察者
// ObservableEmitter類介紹
// a. 定義:事件發(fā)射器
// b. 作用:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
<--擴(kuò)展:RxJava 提供了其他方法用于 創(chuàng)建被觀察者對象Observable -->
// 方法1:just(T...):直接將傳入的參數(shù)依次發(fā)送出來
Observable observable = Observable.just("A", "B", "C");
// 將會依次調(diào)用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
// 方法2:from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組 / Iterable 拆分成具體對象后味咳,依次發(fā)送出來
String[] words = {"A", "B", "C"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
步驟2:創(chuàng)建觀察者 (Observer
)并 定義響應(yīng)事件的行為
- 即 開廚房 - 確定對應(yīng)菜式
- 發(fā)生的事件類型包括:
Next
事件、Complete
事件 &Error
事件檬嘀。具體如下:
- 具體實現(xiàn)
<--方式1:采用Observer 接口 -->
// 1\. 創(chuàng)建觀察者 (Observer )對象
Observer<Integer> observer = new Observer<Integer>() {
// 2\. 創(chuàng)建對象時通過對應(yīng)復(fù)寫對應(yīng)事件方法 從而 響應(yīng)對應(yīng)事件
// 觀察者接收事件前槽驶,默認(rèn)最先調(diào)用復(fù)寫 onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 當(dāng)被觀察者生產(chǎn)Next事件 & 觀察者接收到時,會調(diào)用該復(fù)寫方法 進(jìn)行響應(yīng)
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件作出響應(yīng)" + value);
}
// 當(dāng)被觀察者生產(chǎn)Error事件& 觀察者接收到時鸳兽,會調(diào)用該復(fù)寫方法 進(jìn)行響應(yīng)
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
// 當(dāng)被觀察者生產(chǎn)Complete事件& 觀察者接收到時掂铐,會調(diào)用該復(fù)寫方法 進(jìn)行響應(yīng)
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
};
<--方式2:采用Subscriber 抽象類 -->
// 說明:Subscriber類 = RxJava 內(nèi)置的一個實現(xiàn)了 Observer 的抽象類,對 Observer 接口進(jìn)行了擴(kuò)展
// 1\. 創(chuàng)建觀察者 (Observer )對象
Subscriber<String> subscriber = new Subscriber<Integer>() {
// 2\. 創(chuàng)建對象時通過對應(yīng)復(fù)寫對應(yīng)事件方法 從而 響應(yīng)對應(yīng)事件
// 觀察者接收事件前揍异,默認(rèn)最先調(diào)用復(fù)寫 onSubscribe()
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "開始采用subscribe連接");
}
// 當(dāng)被觀察者生產(chǎn)Next事件 & 觀察者接收到時全陨,會調(diào)用該復(fù)寫方法 進(jìn)行響應(yīng)
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件作出響應(yīng)" + value);
}
// 當(dāng)被觀察者生產(chǎn)Error事件& 觀察者接收到時,會調(diào)用該復(fù)寫方法 進(jìn)行響應(yīng)
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
// 當(dāng)被觀察者生產(chǎn)Complete事件& 觀察者接收到時衷掷,會調(diào)用該復(fù)寫方法 進(jìn)行響應(yīng)
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
};
<--特別注意:2種方法的區(qū)別辱姨,即Subscriber 抽象類與Observer 接口的區(qū)別 -->
// 相同點:二者基本使用方式完全一致(實質(zhì)上,在RxJava的 subscribe 過程中戚嗅,Observer總是會先被轉(zhuǎn)換成Subscriber再使用)
// 不同點:Subscriber抽象類對 Observer 接口進(jìn)行了擴(kuò)展雨涛,新增了兩個方法:
// 1\. onStart():在還未響應(yīng)事件前調(diào)用,用于做一些初始化工作
// 2\. unsubscribe():用于取消訂閱懦胞。在該方法被調(diào)用后替久,觀察者將不再接收 & 響應(yīng)事件
// 調(diào)用該方法前,先使用 isUnsubscribed() 判斷狀態(tài)医瘫,確定被觀察者Observable是否還持有觀察者Subscriber的引用渣叛,如果引用不能及時釋放碉碉,就會出現(xiàn)內(nèi)存泄露
步驟3:通過訂閱(Subscribe
)連接觀察者和被觀察者
- 即 顧客找到服務(wù)員 - 點菜 - 服務(wù)員下單到廚房 - 廚房烹調(diào)
- 具體實現(xiàn)
observable.subscribe(observer);
// 或者 observable.subscribe(subscriber)哀蘑;
- 擴(kuò)展說明
<-- Observable.subscribe(Subscriber) 的內(nèi)部實現(xiàn) -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 步驟1中 觀察者 subscriber抽象類復(fù)寫的方法,用于初始化工作
onSubscribe.call(subscriber);
// 通過該調(diào)用丈攒,從而回調(diào)觀察者中的對應(yīng)方法從而響應(yīng)被觀察者生產(chǎn)的事件
// 從而實現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
// 同時也看出:Observable只是生產(chǎn)事件嫂便,真正的發(fā)送事件是在它被訂閱的時候念赶,即當(dāng) subscribe() 方法執(zhí)行時
}
5.2 方式2:優(yōu)雅的實現(xiàn)方法 - 基于事件流的鏈?zhǔn)秸{(diào)用
- 上述的實現(xiàn)方式是為了說明
Rxjava
的原理 & 使用 - 在實際應(yīng)用中,會將上述步驟&代碼連在一起怖竭,從而更加簡潔锥债、更加優(yōu)雅,即所謂的
RxJava
基于事件流的鏈?zhǔn)秸{(diào)用
// RxJava的鏈?zhǔn)讲僮? Observable.create(new ObservableOnSubscribe<Integer>() {
// 1\. 創(chuàng)建被觀察者 & 生產(chǎn)事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2\. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 3\. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
}
}
注:整體方法調(diào)用順序:觀察者.onSubscribe()> 被觀察者.subscribe()> 觀察者.onNext()>觀察者.onComplete()
這種 基于事件流的鏈?zhǔn)秸{(diào)用痊臭,使得RxJava
:
- 邏輯簡潔
- 實現(xiàn)優(yōu)雅
- 使用簡單
更重要的是哮肚,隨著程序邏輯的復(fù)雜性提高,它依然能夠保持簡潔 & 優(yōu)雅广匙。所以允趟,一般建議使用這種基于事件流的鏈?zhǔn)秸{(diào)用方式實現(xiàn)RxJava
。
特別注意
RxJava 2.x
提供了多個函數(shù)式接口 鸦致,用于實現(xiàn)簡便式的觀察者模式潮剪。具體如下:
以 Consumer
為例:實現(xiàn)簡便式的觀察者模式
Observable.just("hello").subscribe(new Consumer<String>() {
// 每次接收到Observable的事件都會調(diào)用Consumer.accept()
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
6. 實例說明
我將用一個實際工程實例來演示 Rxjava
的使用
6.1 方式1:分步驟實現(xiàn)
步驟1:加入依賴
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
步驟2:直接在MainActivity.java
中實現(xiàn)下述步驟
- 創(chuàng)建被觀察者
(Observable )
& 生產(chǎn)事件 - 創(chuàng)建觀察者
(Observer )
并 定義響應(yīng)事件的行為 - 通過訂閱
(Subscribe)
連接觀察者和被觀察者
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 步驟1:創(chuàng)建被觀察者 Observable & 生產(chǎn)事件
// 即 顧客入飯店 - 坐下餐桌 - 點菜
// 1\. 創(chuàng)建被觀察者 Observable 對象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 2\. 在復(fù)寫的subscribe()里定義需要發(fā)送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通過 ObservableEmitter類對象產(chǎn)生事件并通知觀察者
// ObservableEmitter類介紹
// a. 定義:事件發(fā)射器
// b. 作用:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 步驟2:創(chuàng)建觀察者 Observer 并 定義響應(yīng)事件行為
// 即 開廚房 - 確定對應(yīng)菜式
Observer<Integer> observer = new Observer<Integer>() {
// 通過復(fù)寫對應(yīng)方法來 響應(yīng) 被觀察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
};
// 步驟3:通過訂閱(subscribe)連接觀察者和被觀察者
// 即 顧客找到服務(wù)員 - 點菜 - 服務(wù)員下單到廚房 - 廚房烹調(diào)
observable.subscribe(observer);
- 測試結(jié)果
6.2 方式2:基于事件流的鏈?zhǔn)秸{(diào)用方式
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// RxJava的流式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1\. 創(chuàng)建被觀察者 & 生產(chǎn)事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2\. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 3\. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
});
}
}
- 測試效果
實現(xiàn)效果同上
- Demo 下載地址
Carson_Ho的Github地址 = RxJava2系列:基礎(chǔ)使用
7. 額外說明
7.1 被觀察者 Observable的subscribe()具備多個重載的方法
public final Disposable subscribe() {}
// 表示觀察者不對被觀察者發(fā)送的事件作出任何響應(yīng)(但被觀察者還是可以繼續(xù)發(fā)送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件 & Error事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件、Error事件 & Complete事件作出響應(yīng)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示觀察者只對被觀察者發(fā)送的Next事件分唾、Error事件 抗碰、Complete事件 & onSubscribe事件作出響應(yīng)
public final void subscribe(Observer<? super T> observer) {}
// 表示觀察者對被觀察者發(fā)送的任何事件都作出響應(yīng)
7.2 可采用 Disposable.dispose() 切斷觀察者 與 被觀察者 之間的連接
- 即觀察者 無法繼續(xù) 接收 被觀察者的事件,但被觀察者還是可以繼續(xù)發(fā)送事件
- 具體使用
// 主要在觀察者 Observer中 實現(xiàn)
Observer<Integer> observer = new Observer<Integer>() {
// 1\. 定義Disposable類變量
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
// 2\. 對Disposable類變量賦值
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"作出響應(yīng)" );
if (value == 2) {
// 設(shè)置在接收到第二個事件后切斷觀察者和被觀察者的連接
mDisposable.dispose();
Log.d(TAG, "已經(jīng)切斷了連接:" + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應(yīng)");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應(yīng)");
}
};
- 效果圖
8. 總結(jié)
本文轉(zhuǎn)載自此處:http://www.reibang.com/p/a406b94f3188
作者:Carson_Ho
來源:簡書
著作權(quán)歸作者所有绽乔。