轉(zhuǎn)載自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x/
前言
最近學(xué)習(xí)了一下RxJava十兢,發(fā)現(xiàn)是個(gè)好東西,有點(diǎn)相見(jiàn)恨晚的感覺(jué)摇庙,一開始學(xué)習(xí)了RxJava 1.x旱物,看了很多國(guó)內(nèi)的博客,有點(diǎn)理解了跟匆,后來(lái)發(fā)現(xiàn)現(xiàn)在都 2.x 了异袄,于是各種搜索,最后發(fā)現(xiàn)Season_zlc寫的系列教程通俗易懂玛臂,非常適合初學(xué)者烤蜕。因此在這里記錄一下學(xué)習(xí)過(guò)程,感謝原作者的無(wú)私分享迹冤,希望在實(shí)戰(zhàn)中提升對(duì) RxJava 的理解讽营。
Gradle 配置
要在Android中使用RxJava2, 先添加Gradle配置,最新的版本可在GitHub找到泡徙。RxJava橱鹏、RxAndroid
1
2
compile "io.reactivex.rxjava2:rxjava:2.0.7"
compile "io.reactivex.rxjava2:rxandroid:2.0.1"
第一式 亢龍有悔
概述
網(wǎng)上也有很多介紹RxJava原理的文章,通常這些文章都從觀察者模式開始,先講觀察者莉兰,被觀察者挑围,訂閱關(guān)系巴拉巴拉一大堆,說(shuō)實(shí)話糖荒,當(dāng)我第一次看到這些文章的時(shí)候已經(jīng)被這些名詞給繞暈了杉辙,用了很長(zhǎng)的時(shí)間才理清楚它們之間的關(guān)系〈范洌可能是我太蠢了蜘矢,境界不夠皇筛,領(lǐng)會(huì)不到那么多高大上的名詞.
今天我用兩根水管代替觀察者和被觀察者, 試圖用通俗易懂的話把它們的關(guān)系解釋清楚, 在這里我將從事件流這個(gè)角度來(lái)說(shuō)明RxJava的基本工作原理啤月。
正文
先假設(shè)有兩根水管:
1.1
上面一根水管為事件產(chǎn)生的水管,叫它上游吧莹妒,下面一根水管為事件接收的水管叫它下游吧红碑。
兩根水管通過(guò)一定的方式連接起來(lái)舞吭,使得上游每產(chǎn)生一個(gè)事件,下游就能收到該事件句喷。注意這里和官網(wǎng)的事件圖是反過(guò)來(lái)的, 這里的事件發(fā)送的順序是先1,后2,后3這樣的順序, 事件接收的順序也是先1,后2,后3的順序, 我覺(jué)得這樣更符合我們普通人的思維, 簡(jiǎn)單明了.
這里的上游和下游就分別對(duì)應(yīng)著RxJava中的Observable和Observer镣典,它們之間的連接就對(duì)應(yīng)著subscribe(),因此這個(gè)關(guān)系用RxJava來(lái)表示就是:
//創(chuàng)建一個(gè)上游 Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//創(chuàng)建一個(gè)下游 Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立連接
observable.subscribe(observer);
這個(gè)運(yùn)行的結(jié)果就是:
12-02 03:37:17.818 4166-4166/zlc.season.rxjava2demo D/TAG: subscribe
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 1
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 2
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: 3
12-02 03:37:17.819 4166-4166/zlc.season.rxjava2demo D/TAG: complete
注意: 只有當(dāng)上游和下游建立連接之后, 上游才會(huì)開始發(fā)送事件. 也就是調(diào)用了 subscribe() 方法之后才開始發(fā)送事件.
把這段代碼連起來(lái)寫就成了RxJava引以為傲的鏈?zhǔn)讲僮鳎?/p>
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
接下來(lái)解釋一下其中兩個(gè)陌生的玩意: ObservableEmitter 和 Disposable.
ObservableEmitter: Emitter 是發(fā)射器的意思唾琼,那就很好猜了兄春,這個(gè)就是用來(lái)發(fā)出事件的,它可以發(fā)出三種類型的事件锡溯,通過(guò)調(diào)用emitter的 onNext(T value)赶舆、 onComplete()和 onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件祭饭。但是芜茵,請(qǐng)注意,并不意味著你可以隨意亂七八糟發(fā)射事件倡蝙,需要滿足一定的規(guī)則:
上游可以發(fā)送無(wú)限個(gè)onNext, 下游也可以接收無(wú)限個(gè)onNext.
當(dāng)上游發(fā)送了一個(gè)onComplete后, 上游onComplete之后的事件將會(huì)繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
當(dāng)上游發(fā)送了一個(gè)onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
上游可以不發(fā)送onComplete或onError.
最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError, 也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError, 反之亦然注: 關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, 并不一定會(huì)導(dǎo)致程序崩潰. 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.
以上幾個(gè)規(guī)則用示意圖表示如下:| Action | 示意圖 || —————– | ———– || 只發(fā)送onNext事件 |
next
|| 發(fā)送onComplete事件 |
complete
|| 發(fā)送onError事件 |
error
|
介紹了ObservableEmitter, 接下來(lái)介紹Disposable, 這個(gè)單詞的字面意思是一次性用品,用完即可丟棄的. 那么在RxJava中怎么去理解它呢, 對(duì)應(yīng)于上面的水管的例子, 我們可以把它理解成兩根管道之間的一個(gè)機(jī)關(guān), 當(dāng)調(diào)用它的 dispose()方法時(shí), 它就會(huì)將兩根管道切斷, 從而導(dǎo)致下游收不到事件.
注意: 調(diào)用dispose()并不會(huì)導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游會(huì)繼續(xù)發(fā)送剩余的事件.
來(lái)看個(gè)例子, 我們讓上游依次發(fā)送1,2,3,complete,4,在下游收到第二個(gè)事件之后, 切斷水管, 看看運(yùn)行結(jié)果:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
i++;
if (i == 2) {
Log.d(TAG, "dispose");
mDisposable.dispose();
Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
運(yùn)行結(jié)果為:
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4
從運(yùn)行結(jié)果我們看到, 在收到onNext 2這個(gè)事件后, 切斷了水管, 但是上游仍然發(fā)送了3, complete, 4這幾個(gè)事件, 而且上游并沒(méi)有因?yàn)榘l(fā)送了onComplete而停止. 同時(shí)可以看到下游的 onSubscribe()方法是最先調(diào)用的.Disposable的用處不止這些, 后面講解到了線程的調(diào)度之后, 我們會(huì)發(fā)現(xiàn)它的重要性. 隨著后續(xù)深入的講解, 我們會(huì)在更多的地方發(fā)現(xiàn)它的身影.另外, subscribe()有多個(gè)重載的方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
最后一個(gè)帶有Observer參數(shù)的我們已經(jīng)使用過(guò)了,這里對(duì)其他幾個(gè)方法進(jìn)行說(shuō)明.
不帶任何參數(shù)的 subscribe()表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么.
帶有一個(gè)Consumer參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒(méi)看見(jiàn), 因此我們?nèi)绻恍枰猳nNext事件可以這么寫:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
其他幾個(gè)方法同理, 這里就不一一解釋了.
第二式 飛龍?jiān)谔?br>
概述
上一節(jié)教程講解了最基本的RxJava2的使用, 在本節(jié)中, 我們將學(xué)習(xí)RxJava強(qiáng)大的線程控制.
正文
還是以之前的例子, 兩根水管:
2.1
正常情況下, 上游和下游是工作在同一個(gè)線程中的, 也就是說(shuō)上游在哪個(gè)線程發(fā)事件, 下游就在哪個(gè)線程接收事件.
在RxJava中, 當(dāng)我們?cè)谥骶€程中去創(chuàng)建一個(gè)上游Observable來(lái)發(fā)送事件, 則這個(gè)上游默認(rèn)就在主線程發(fā)送事件.
當(dāng)我們?cè)谥骶€程去創(chuàng)建一個(gè)下游Observer來(lái)接收事件, 則這個(gè)下游默認(rèn)就在主線程中接收事件, 來(lái)看段代碼:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribe(consumer);
}
在主線程中分別創(chuàng)建上游和下游, 然后將他們連接在一起, 同時(shí)分別打印出它們所在的線程, 運(yùn)行結(jié)果為:
D/TAG: Observable thread is : main
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
這就驗(yàn)證了剛才所說(shuō), 上下游默認(rèn)是在同一個(gè)線程工作.
這樣肯定是滿足不了我們的需求的, 我們更多想要的是這么一種情況, 在子線程中做耗時(shí)的操作, 然后回到主線程中來(lái)操作UI, 用圖片來(lái)描述就是下面這個(gè)圖片:
2.2
在這個(gè)圖中, 我們用黃色水管表示子線程, 深藍(lán)色水管表示主線程.
要達(dá)到這個(gè)目的, 我們需要先改變上游發(fā)送事件的線程, 讓它去子線程中發(fā)送事件, 然后再改變下游的線程, 讓它去主線程接收事件. 通過(guò)RxJava內(nèi)置的線程調(diào)度器可以很輕松的做到這一點(diǎn). 接下來(lái)看一段代碼:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
還是剛才的例子, 只不過(guò)我們太添加了一點(diǎn)東西, 先來(lái)看看運(yùn)行結(jié)果:
D/TAG: Observable thread is : RxNewThreadScheduler-2
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
可以看到, 上游發(fā)送事件的線程的確改變了, 是在一個(gè)叫 RxNewThreadScheduler-2的線程中發(fā)送的事件, 而下游仍然在主線程中接收事件, 這說(shuō)明我們的目的達(dá)成了, 接下來(lái)看看是如何做到的.和上一段代碼相比,這段代碼只不過(guò)是增加了兩行代碼:
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
簡(jiǎn)單的來(lái)說(shuō), subscribeOn() 指定的是上游發(fā)送事件的線程, observeOn() 指定的是下游接收事件的線程.
多次指定上游的線程只有第一次指定的有效, 也就是說(shuō)多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會(huì)被忽略.
多次指定下游的線程是可以的, 也就是說(shuō)每調(diào)用一次observeOn() , 下游的線程就會(huì)切換一次.
在RxJava中, 已經(jīng)內(nèi)置了很多線程選項(xiàng)供我們選擇, 例如有
Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
Schedulers.computation() 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作
Schedulers.newThread() 代表一個(gè)常規(guī)的新線程
AndroidSchedulers.mainThread() 代表Android的主線程
這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng), 在RxJava內(nèi)部使用的是線程池來(lái)維護(hù)這些線程, 所有效率也比較高.
實(shí)踐
對(duì)于我們Android開發(fā)人員來(lái)說(shuō), 經(jīng)常會(huì)將一些耗時(shí)的操作放在后臺(tái), 比如網(wǎng)絡(luò)請(qǐng)求或者讀寫文件,操作數(shù)據(jù)庫(kù)等等,等到操作完成之后回到主線程去更新UI, 有了上面的這些基礎(chǔ), 那么現(xiàn)在我們就可以輕松的去做到這樣一些操作.
下面來(lái)舉幾個(gè)常用的場(chǎng)景.
網(wǎng)絡(luò)請(qǐng)求
Android中有名的網(wǎng)絡(luò)請(qǐng)求庫(kù)就那么幾個(gè), Retrofit能夠從中脫穎而出很大原因就是因?yàn)樗С諶xJava的方式來(lái)調(diào)用, 下面簡(jiǎn)單講解一下它的基本用法.
要使用 Retrofit,先添加Gradle配置,最新的版本可在GitHub找到九串。Retrofit、Gson converter寺鸥、RxJava2 Adapter猪钮、okhttp、logging-interceptor
//retrofit
compile 'com.squareup.retrofit2:retrofit:2.2.0'
//Gson converter
compile 'com.squareup.retrofit2:converter-gson:2.2.0'
//RxJava2 Adapter
compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'
//okhttp
compile 'com.squareup.okhttp3:okhttp:3.6.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.6.0'
隨后定義Api接口:
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
接著創(chuàng)建一個(gè)Retrofit客戶端:
private static Retrofit create() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(9, TimeUnit.SECONDS);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
return new Retrofit.Builder().baseUrl(ENDPOINT)
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
發(fā)起請(qǐng)求就很簡(jiǎn)單了:
Api api = retrofit.create(Api.class);
api.login(request)
.subscribeOn(Schedulers.io()) //在IO線程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
.observeOn(AndroidSchedulers.mainThread()) //回到主線程去處理請(qǐng)求結(jié)果
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(LoginResponse value) {}
@Override
public void onError(Throwable e) {
Toast.makeText(mContext, "登錄失敗", Toast.LENGTH_SHORT).show();
}
@Override
public void onComplete() {
Toast.makeText(mContext, "登錄成功", Toast.LENGTH_SHORT).show();
}
});
看似很完美, 但我們忽略了一點(diǎn), 如果在請(qǐng)求的過(guò)程中Activity已經(jīng)退出了, 這個(gè)時(shí)候如果回到主線程去更新UI, 那么APP肯定就崩潰了, 怎么辦呢, 上一節(jié)我們說(shuō)到了 Disposable , 說(shuō)它是個(gè)開關(guān), 調(diào)用它的dispose()方法時(shí)就會(huì)切斷水管, 使得下游收不到事件, 既然收不到事件, 那么也就不會(huì)再去更新UI了. 因此我們可以在Activity中將這個(gè)Disposable 保存起來(lái), 當(dāng)Activity退出時(shí), 切斷它即可.
那如果有多個(gè)Disposable 該怎么辦呢, RxJava中已經(jīng)內(nèi)置了一個(gè)容器 CompositeDisposable, 每當(dāng)我們得到一個(gè)Disposable時(shí)就調(diào)用 CompositeDisposable.add()將它添加到容器中, 在退出的時(shí)候, 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.
讀寫數(shù)據(jù)庫(kù)
上面說(shuō)了網(wǎng)絡(luò)請(qǐng)求的例子, 接下來(lái)再看看讀寫數(shù)據(jù)庫(kù), 讀寫數(shù)據(jù)庫(kù)也算一個(gè)耗時(shí)的操作, 因此我們也最好放在IO線程里去進(jìn)行, 這個(gè)例子就比較簡(jiǎn)單, 直接上代碼:
public Observable<List<Record>> readAllRecords() {
return Observable.create(new ObservableOnSubscribe<List<Record>>() {
@Override
public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
Cursor cursor = null;
try {
cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
List<Record> result = new ArrayList<>();
while (cursor.moveToNext()) {
result.add(Db.Record.read(cursor));
}
emitter.onNext(result);
emitter.onComplete();
} finally {
if (cursor != null) {
cursor.close();
}
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
好了節(jié)就到這里吧, 后面將會(huì)教大家如何使用RxJava中強(qiáng)大的操作符. 通過(guò)使用這些操作符可以很輕松的做到各種吊炸天的效果.
第三式 見(jiàn)龍?jiān)谔?br>
概述
上一節(jié)講解了線程調(diào)度, 并且舉了兩個(gè)實(shí)際中的例子, 其中有一個(gè)登錄的例子, 不知大家有沒(méi)有想過(guò)這么一個(gè)問(wèn)題, 如果是一個(gè)新用戶, 必須先注冊(cè), 等注冊(cè)成功之后再自動(dòng)登錄該怎么做呢.很明顯, 這是一個(gè)嵌套的網(wǎng)絡(luò)請(qǐng)求, 首先需要去請(qǐng)求注冊(cè), 待注冊(cè)成功回調(diào)了再去請(qǐng)求登錄的接口.我們當(dāng)然可以想當(dāng)然的寫成這樣:
private void login() {
api.login(new LoginRequest())
.subscribeOn(Schedulers.io()) //在IO線程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
.observeOn(AndroidSchedulers.mainThread()) //回到主線程去處理請(qǐng)求結(jié)果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登錄成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登錄失敗", Toast.LENGTH_SHORT).show();
}
});
}
private void register() {
api.register(new RegisterRequest())
.subscribeOn(Schedulers.io()) //在IO線程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
.observeOn(AndroidSchedulers.mainThread()) //回到主線程去處理請(qǐng)求結(jié)果
.subscribe(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
Toast.makeText(MainActivity.this, "注冊(cè)成功", Toast.LENGTH_SHORT).show();
login(); //注冊(cè)成功, 調(diào)用登錄的方法
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "注冊(cè)失敗", Toast.LENGTH_SHORT).show();
}
});
}
這樣的代碼能夠工作, 但不夠優(yōu)雅, 通過(guò)本節(jié)的學(xué)習(xí), 可以讓我們用一種更優(yōu)雅的方式來(lái)解決這個(gè)問(wèn)題.
正文
先來(lái)看看最簡(jiǎn)單的變換操作符map吧.
Map
map是RxJava中最簡(jiǎn)單的一個(gè)變換操作符了, 它的作用就是對(duì)上游發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù), 使得每一個(gè)事件都按照指定的函數(shù)去變化. 用事件圖表示如下:
map
圖中map中的函數(shù)作用是將圓形事件轉(zhuǎn)換為矩形事件, 從而導(dǎo)致下游接收到的事件就變?yōu)榱司匦?用代碼來(lái)表示這個(gè)例子就是:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
在上游我們發(fā)送的是數(shù)字類型, 而在下游我們接收的是String類型, 中間起轉(zhuǎn)換作用的就是map操作符, 運(yùn)行結(jié)果為:
D/TAG: This is result 1
D/TAG: This is result 2
D/TAG: This is result 3
通過(guò)Map, 可以將上游發(fā)來(lái)的事件轉(zhuǎn)換為任意的類型, 可以是一個(gè)Object, 也可以是一個(gè)集合, 如此強(qiáng)大的操作符你難道不想試試?接下來(lái)我們來(lái)看另外一個(gè)廣為人知的操作符flatMap.
FlatMap
flatMap是一個(gè)非常強(qiáng)大的操作符, 先用一個(gè)比較難懂的概念說(shuō)明一下:FlatMap將一個(gè)發(fā)送事件的上游Observable變換為多個(gè)發(fā)送事件的Observables胆建,然后將它們發(fā)射的事件合并后放進(jìn)一個(gè)單獨(dú)的Observable里.這句話比較難以理解, 我們先通俗易懂的圖片來(lái)詳細(xì)的講解一下, 首先先來(lái)看看整體的一個(gè)圖片:
flatMap
先看看上游, 上游發(fā)送了三個(gè)事件, 分別是1,2,3, 注意它們的顏色.中間flatMap的作用是將圓形的事件轉(zhuǎn)換為一個(gè)發(fā)送矩形事件和一個(gè)三角形事件的新的上游Observable.
上游每發(fā)送一個(gè)事件, flatMap都將創(chuàng)建一個(gè)新的水管, 然后發(fā)送轉(zhuǎn)換之后的新的事件, 下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù). 這里需要注意的是, flatMap并不保證事件的順序, 也就是圖中所看到的, 并不是事件1就在事件2的前面. 如果需要保證順序則需要使用concatMap.說(shuō)了原理, 我們還是來(lái)看看實(shí)際中的代碼如何寫吧:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
如代碼所示, 我們?cè)趂latMap中將上游發(fā)來(lái)的每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件的水管, 為了看到flatMap結(jié)果是無(wú)序的,所以加了10毫秒的延時(shí), 來(lái)看看運(yùn)行結(jié)果吧:
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2
concatMap
這里也簡(jiǎn)單說(shuō)一下concatMap吧, 它和flatMap的作用幾乎一模一樣, 只是它的結(jié)果是嚴(yán)格按照上游發(fā)送的順序來(lái)發(fā)送的, 來(lái)看個(gè)代碼吧:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
只是將之前的flatMap改為了concatMap, 其余原封不動(dòng), 運(yùn)行結(jié)果如下:
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
可以看到, 結(jié)果仍然是有序的.
好了關(guān)于RxJava的操作符最基本的使用就講解到這里了, RxJava中內(nèi)置了許許多多的操作符, 這里通過(guò)講解map和flatMap只是起到一個(gè)拋磚引玉的作用, 關(guān)于其他的操作符只要大家按照本文的思路去理解, 再仔細(xì)閱讀文檔, 應(yīng)該是沒(méi)有問(wèn)題的了.
實(shí)踐
如何優(yōu)雅的解決嵌套請(qǐng)求, 只需要用flatMap轉(zhuǎn)換一下就行了.先回顧一下上一節(jié)的請(qǐng)求接口:
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
可以看到登錄和注冊(cè)返回的都是一個(gè)上游Observable, 而我們的flatMap操作符的作用就是把一個(gè)Observable轉(zhuǎn)換為另一個(gè)Observable, 因此結(jié)果就很顯而易見(jiàn)了:
api.register(new RegisterRequest()) //發(fā)起注冊(cè)請(qǐng)求
.subscribeOn(Schedulers.io()) //在IO線程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
.observeOn(AndroidSchedulers.mainThread()) //回到主線程去處理請(qǐng)求注冊(cè)結(jié)果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根據(jù)注冊(cè)的響應(yīng)結(jié)果去做一些操作
}
})
.observeOn(Schedulers.io()) //回到IO線程去發(fā)起登錄請(qǐng)求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主線程去處理請(qǐng)求登錄的結(jié)果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登錄成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登錄失敗", Toast.LENGTH_SHORT).show();
}
});
從這個(gè)例子也可以看到我們切換線程是多么簡(jiǎn)單.下一節(jié)我們將會(huì)學(xué)到 zip 操作符.
第四式 鴻漸于陸
概述
下面學(xué)習(xí)zip這個(gè)操作符, 這個(gè)操作符也是比較牛逼的東西了烤低, 涉及到的東西也比較多, 主要是一些細(xì)節(jié)上的東西太多笆载, 通過(guò)學(xué)習(xí)這個(gè)操作符扑馁,可以為我們下一節(jié)的Backpressure做個(gè)鋪墊.
正文
Zip通過(guò)一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起涯呻,然后發(fā)送這些組合到一起的事件. 它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)腻要。我們?cè)儆猛ㄋ滓锥膱D片來(lái)解釋一下:
zip
從這個(gè)圖中可以看見(jiàn), 這次上游和以往不同的是, 我們有兩根水管了.其中一根水管負(fù)責(zé)發(fā)送圓形事件 , 另外一根水管負(fù)責(zé)發(fā)送三角形事件 , 通過(guò)Zip操作符, 使得圓形事件和三角形事件 合并為了一個(gè)矩形事件.
組合的過(guò)程是分別從 兩根水管里各取出一個(gè)事件 來(lái)進(jìn)行組合, 并且一個(gè)事件只能被使用一次, 組合的順序是嚴(yán)格按照事件發(fā)送的順利 來(lái)進(jìn)行的, 也就是說(shuō)不會(huì)出現(xiàn)圓形1 事件和三角形B 事件進(jìn)行合并, 也不可能出現(xiàn)圓形2 和三角形A 進(jìn)行合并的情況.
最終下游收到的事件數(shù)量 是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量 相同. 這個(gè)也很好理解, 因?yàn)槭菑拿恳桓?里取一個(gè)事件來(lái)進(jìn)行合并, 最少的 那個(gè)肯定就最先取完 , 這個(gè)時(shí)候其他的水管盡管還有事件 , 但是已經(jīng)沒(méi)有足夠的事件來(lái)組合了, 因此下游就不會(huì)收到剩余的事件了.
分析了大概的原理, 我們還是勞逸結(jié)合, 先來(lái)看看實(shí)際中的代碼怎么寫吧:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Log.d(TAG, "emit B");
emitter.onNext("B");
Log.d(TAG, "emit C");
emitter.onNext("C");
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我們分別創(chuàng)建了兩個(gè)上游水管, 一個(gè)發(fā)送1,2,3,4,Complete, 另一個(gè)發(fā)送A,B,C,Complete, 接著用Zip把發(fā)出的事件組合, 來(lái)看看運(yùn)行結(jié)果吧:
D/TAG: onSubscribe
D/TAG: emit 1
D/TAG: emit 2
D/TAG: emit 3
D/TAG: emit 4
D/TAG: emit complete1
D/TAG: emit A
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
結(jié)果似乎是對(duì)的… 但是總感覺(jué)什么地方不對(duì)勁…哪兒不對(duì)勁呢, 為什么感覺(jué)是水管一發(fā)送完了之后, 水管二才開始發(fā)送啊? 到底是不是呢, 我們來(lái)驗(yàn)證一下:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
這次我們?cè)诿堪l(fā)送一個(gè)事件之后加入了一秒鐘的延時(shí), 來(lái)看看運(yùn)行結(jié)果吧, 注意這是個(gè)GIF圖:
zip
好像真的是先發(fā)送的水管一再發(fā)送的水管二呢, 為什么會(huì)有這種情況呢? 因?yàn)槲覀儍筛芏际沁\(yùn)行在同一個(gè)線程里, 同一個(gè)線程里執(zhí)行代碼肯定有先后順序呀.
因此我們來(lái)稍微改一下, 不讓他們?cè)谕粋€(gè)線程, 不知道怎么切換線程的, 請(qǐng)掉頭看前面幾節(jié).
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
好了, 這次我們讓水管都在IO線程里發(fā)送事件, 再來(lái)看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: emit A
D/TAG: emit 1
D/TAG: onNext: 1A
D/TAG: emit B
D/TAG: emit 2
D/TAG: onNext: 2B
D/TAG: emit C
D/TAG: emit 3
D/TAG: onNext: 3C
D/TAG: emit complete2
D/TAG: onComplete
GIF圖:
zip
這下就對(duì)了嘛, 兩根水管同時(shí)開始發(fā)送, 每發(fā)送一個(gè), Zip就組合一個(gè), 再將組合結(jié)果發(fā)送給下游.不對(duì)呀! 可能細(xì)心點(diǎn)的朋友又看出端倪了, 第一根水管明明發(fā)送了四個(gè)數(shù)據(jù)+一個(gè)Complete, 之前明明還有的, 為啥到這里沒(méi)了呢?這是因?yàn)槲覀冎罢f(shuō)了, zip發(fā)送的事件數(shù)量跟上游中發(fā)送事件最少的那一根水管的事件數(shù)量是有關(guān)的, 在這個(gè)例子里我們第二根水管只發(fā)送了三個(gè)事件然后就發(fā)送了Complete, 這個(gè)時(shí)候盡管第一根水管還有事件4 和事件Complete 沒(méi)有發(fā)送, 但是它們發(fā)不發(fā)送還有什么意義呢? 所以本著節(jié)約是美德的思想, 就干脆打斷它的狗腿, 不讓它發(fā)了.至于前面的例子為什么會(huì)發(fā)送, 剛才不是已經(jīng)說(shuō)了是复罐!在!同雄家!一市栗!個(gè)!線咳短!程!里蛛淋!嗎A谩!:趾伞勾效!再問(wèn)老子打死你!有好事的程序員可能又要問(wèn)了叛甫, 那我不發(fā)送Complete呢层宫? 答案是顯然的, 上游會(huì)繼續(xù)發(fā)送事件, 但是下游仍然收不到那些多余的事件. 不信你可以試試.
實(shí)踐
學(xué)習(xí)了Zip的基本用法, 那么它在Android有什么用呢, 其實(shí)很多場(chǎng)景都可以用到Zip. 舉個(gè)例子.比如一個(gè)界面需要展示用戶的一些信息, 而這些信息分別要從兩個(gè)服務(wù)器接口中獲取, 而只有當(dāng)兩個(gè)都獲取到了之后才能進(jìn)行展示, 這個(gè)時(shí)候就可以用Zip了:首先分別定義這兩個(gè)請(qǐng)求接口:
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
接著用Zip來(lái)打包請(qǐng)求:
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
本次的教程就到這里吧.下一節(jié)我們將會(huì)學(xué)到 Flowable 以及理解Backpressure背壓的概念.
第五式 潛龍勿用
概述
這一節(jié)中我們將來(lái)學(xué)習(xí)Backpressure. 我看好多吃瓜群眾早已坐不住了, 別急, 我們先來(lái)回顧一下上一節(jié)講的Zip.
正文
上一節(jié)中我們說(shuō)到Zip可以將多個(gè)上游發(fā)送的事件組合起來(lái)發(fā)送給下游, 那大家有沒(méi)有想過(guò)一個(gè)問(wèn)題, 如果其中一個(gè)水管A發(fā)送事件特別快, 而另一個(gè)水管B發(fā)送事件特別慢, 那就可能出現(xiàn)這種情況, 發(fā)得快的水管A 已經(jīng)發(fā)送了1000個(gè)事件了, 而發(fā)的慢的水管B 才發(fā)一個(gè)出來(lái), 組合了一個(gè)之后水管A 還剩999個(gè)事件, 這些事件需要繼續(xù)等待水管B 發(fā)送事件出來(lái)組合, 那么這么多的事件是放在哪里的呢? 總有一個(gè)地方保存吧? 沒(méi)錯(cuò), Zip給我們的每一根水管都弄了一個(gè)水缸 , 用來(lái)保存這些事件, 用通俗易懂的圖片來(lái)表示就是:
zip
如圖中所示, 其中藍(lán)色的框框就是zip給我們的水缸! 它將每根水管發(fā)出的事件保存起來(lái), 等兩個(gè)水缸都有事件了之后就分別從水缸中取出一個(gè)事件來(lái)組合, 當(dāng)其中一個(gè)水缸是空的時(shí)候就處于等待的狀態(tài).題外話: 大家來(lái)分析一下這個(gè)水缸有什么特點(diǎn)呢? 它是按順序保存的, 先進(jìn)來(lái)的事件先取出來(lái), 這個(gè)特點(diǎn)是不是很熟悉呀? 沒(méi)錯(cuò), 這就是我們熟知的隊(duì)列, 這個(gè)水缸在Zip內(nèi)部的實(shí)現(xiàn)就是用的隊(duì)列, 感興趣的可以翻看源碼查看.好了回到正題上來(lái), 這個(gè)水缸有大小限制嗎? 要是一直往里存會(huì)怎樣? 我們來(lái)看個(gè)例子:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) { //無(wú)限循環(huán)發(fā)事件
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.w(TAG, throwable);
}
});
在這個(gè)例子中, 我們分別創(chuàng)建了兩根水管, 第一根水管用機(jī)器指令的執(zhí)行速度來(lái)無(wú)限循環(huán)發(fā)送事件, 第二根水管隨便發(fā)送點(diǎn)什么, 由于我們沒(méi)有發(fā)送Complete事件, 因此第一根水管會(huì)一直發(fā)事件到它對(duì)應(yīng)的水缸里去, 我們來(lái)看看運(yùn)行結(jié)果是什么樣.運(yùn)行結(jié)果GIF圖:
zip
內(nèi)存占用以斜率為1的直線迅速上漲, 幾秒鐘就300多M , 最終報(bào)出了OOM:
zlc.season.rxjava2demo W/art: Throwing OutOfMemoryError "Failed to allocate a 28 byte allocation with
4194304 free bytes and 8MB until OOM;
zlc.season.rxjava2demo W/art: "main" prio=5 tid=1 Runnable
zlc.season.rxjava2demo W/art: | group="main" sCount=0 dsCount=0 obj=0x75188710 self=0x7fc0efe7ba00
zlc.season.rxjava2demo W/art: | sysTid=32686 nice=0 cgrp=default sched=0/0 handle=0x7fc0f37dc200
zlc.season.rxjava2demo W/art: | state=R schedstat=( 0 0 0 ) utm=948 stm=120 core=1 HZ=100
zlc.season.rxjava2demo W/art: | stack=0x7fff971e8000-0x7fff971ea000 stackSize=8MB
zlc.season.rxjava2demo W/art: | held mutexes= "mutator lock"(shared held)
zlc.season.rxjava2demo W/art: at java.lang.Integer.valueOf(Integer.java:742)
出現(xiàn)這種情況肯定是我們不想看見(jiàn)的, 這里就可以引出我們的Backpressure了, 所謂的Backpressure其實(shí)就是為了控制流量, 水缸存儲(chǔ)的能力畢竟有限, 因此我們還得從源頭去解決問(wèn)題, 既然你發(fā)那么快, 數(shù)據(jù)量那么大, 那我就想辦法不讓你發(fā)那么快唄.那么這個(gè)源頭到底在哪里, 究竟什么時(shí)候會(huì)出現(xiàn)這種情況, 這里只是說(shuō)的Zip這一個(gè)例子, 其他的地方會(huì)出現(xiàn)嗎? 帶著這個(gè)問(wèn)題我們來(lái)探究一下.我們讓事情變得簡(jiǎn)單一點(diǎn), 從一個(gè)單一的Observable說(shuō)起.來(lái)看段代碼:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) { //無(wú)限循環(huán)發(fā)事件
emitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "" + integer);
}
});
這段代碼很簡(jiǎn)單, 上游同樣無(wú)限循環(huán)的發(fā)送事件, 在下游每次接收事件前延時(shí)2秒. 上下游工作在同一個(gè)線程里, 來(lái)看下運(yùn)行結(jié)果:
5.3