RxJava入門筆記

特別注意

下文中的所有 [Observable] 都是指可觀察者對(duì)象(被觀察者)這個(gè)概念,在 RxJava 中有一個(gè) [Observable] 的實(shí)現(xiàn),它的名字叫做 Observable.class,一個(gè)是概念一個(gè)是實(shí)現(xiàn),下面文章看的時(shí)候還請(qǐng)?zhí)貏e區(qū)分~

前言

RxJavaReactiveXJava 上的一個(gè)實(shí)現(xiàn).官網(wǎng)為:

http://reactivex.io/

筆者是一個(gè)做 Android 的開(kāi)發(fā)者,RxJava 從入門到現(xiàn)在的水平已經(jīng)過(guò)了兩年的時(shí)間.其實(shí)這東西入門真的挺難的.很多人看了幾篇介紹的博客就覺(jué)得自己會(huì)了.其實(shí)并不是這樣的.因?yàn)楹芏嗳丝戳酥笠廊徊幻靼變?nèi)在的很多東西,比如:

  • 線程切換
    • 切換訂閱線程
    • 信號(hào)分發(fā)線程
  • 操作符常用的有哪些,怎么用
  • 何為熱 Observable 和冷 Observable
  • 如何利用 RxJava 寫(xiě)多線程并發(fā)
  • 什么時(shí)候去使用 RxJava2 中的幾種 [Observable]
    • Observable
    • Flowable
    • Single
    • Completable
    • Maybe
  • 如何用RxJava 去搭建整體風(fēng)格是響應(yīng)式的 App 架構(gòu)

所以筆者寫(xiě)這個(gè)文章就是想分享和大家討論以上的情況.下面的內(nèi)容會(huì)陸陸續(xù)續(xù)的講解到以上的內(nèi)容, 但是順序不一定是上述的順序, 還請(qǐng)大家專心一點(diǎn). 我想讓大家真正的認(rèn)識(shí)RxJava,而并不是像現(xiàn)在 Android 大多使用的情況一樣,只是和 Rretrofit 結(jié)合一下做一下網(wǎng)絡(luò)請(qǐng)求.對(duì)于那些不支持 RxJava 的庫(kù)或者場(chǎng)景就不知道如何去設(shè)計(jì)成響應(yīng)式.好了,廢話就說(shuō)到這里了,下面我會(huì)對(duì) RxJava 整體做一個(gè)簡(jiǎn)單的介紹和用它到底能做什么

RxJava 到底是什么

官方翻譯

RxJava 是一個(gè)基于觀察者的庫(kù),利用可觀察的序列然后去編寫(xiě)基于異步的程序. 并且提供了大量的操作符可以讓您不再關(guān)注多線程和線程切換.讓你專注于業(yè)務(wù)流程的編寫(xiě)

通俗易懂的解釋

先羅列一下 RxJava 帶來(lái)的好處

  • 不再過(guò)多關(guān)注寫(xiě)多線程
  • 統(tǒng)一了異步和同步的代碼寫(xiě)法
  • 提供了方便移動(dòng)端小伙幫經(jīng)常要用的線程切換
  • 提供了大量的操作符幫助我們完成各種場(chǎng)景的需求
  • 消除了 Callback 回調(diào)地獄
  • 讓整個(gè)流程是一條線的寫(xiě)下來(lái),代碼邏輯清晰

RxJava 的壞處

  • 入門難度大!
  • 發(fā)生錯(cuò)誤需要了解比較深入的人才能很好的解決
  • 需要團(tuán)隊(duì)的人員對(duì) RxJava 的了解基本在一個(gè)水平, 不然容易

一個(gè)小故事了解 RxJava

RxJava 的世界中, EveryThing 都是 [Observable](被觀察者). 利用一個(gè)小故事來(lái)解釋什么是 RxJava 世界中的 !![Observable]!!(被觀察者)

有一個(gè)小伙叫做小金子,他在他想喝的時(shí)候會(huì)去超市買一箱牛奶.

方式1:每次想喝的時(shí)候我去到超市,和老板說(shuō),老板,給我送一箱牛奶,這是我住的地址,老板讓一個(gè)小伙 A 拿起牛奶就當(dāng)當(dāng)當(dāng)?shù)耐易〉牡胤娇?br> 方式2:在我第一次發(fā)現(xiàn)這個(gè)超市的時(shí)候,我就和超市老板說(shuō),老板,你給我一個(gè)電話好嗎,以后每次我想喝的時(shí)候打你電話告訴你住的地址,你就給我送一箱牛奶,老板每次收到我的電話都會(huì)叫小伙 A 當(dāng)當(dāng)當(dāng)?shù)耐易〉牡胤娇?/p>

方式3:我自己去超市買牛奶,自己把牛奶扛回來(lái),整個(gè)過(guò)程自己完成(這種是全程同步完成的,不在這次講解的范圍)

兩種方式都可以完成我的需求,但是不知道你們發(fā)現(xiàn)兩者的區(qū)別了沒(méi)有.

  • 第一種方式需要我每次想喝牛奶的時(shí)候都跑過(guò)去和老板說(shuō)了之后就會(huì)觸發(fā)送牛奶的操作
  • 第二種操作在我去過(guò)超市之后不會(huì)立馬觸發(fā)之后的操作,而是等到我后面的打了電話,才會(huì)觸發(fā)送牛奶的操作,而且我每打一次電話,就會(huì)觸發(fā)一次送牛奶的操作

那到底區(qū)別是什么呢黎比?

  • 第一種方式的觸發(fā)時(shí)機(jī)是去過(guò)超市之后,只要你去過(guò)超市就會(huì)觸發(fā)
  • 第二種方式讓一件事情或者一個(gè)動(dòng)作的觸發(fā)滯后**了, 需要你至少一次去過(guò)超市拿到電話號(hào)碼,并且后續(xù)打電話給超市老板

簡(jiǎn)單的可以把上面的描述分為兩個(gè)事情:

  • 描述要做的事情
  • 觸發(fā)做這件事情的時(shí)機(jī)

第一種方式就是把兩者結(jié)合到一起了

第二種方式是先和老板描述了要做的事情, 至于什么時(shí)候觸發(fā)完全是看后續(xù)的心情

所以 RxJava 簡(jiǎn)單的說(shuō)就是做了一件上述的事情,讓所有的動(dòng)作或者事情都可以滯后的發(fā)生,并且上面我們拿到的超市老板的電話,在 RxJava 中就是 [Observable](被觀察者,能夠?qū)ν?strong>發(fā)射信號(hào)(對(duì)應(yīng)上面的例子中就是小伙 A 送牛奶)). 當(dāng)你打電話給超市老板的時(shí)候,實(shí)際上就是對(duì)一個(gè) !![Observable]!!(被觀察者) 訂閱 的過(guò)程,當(dāng)你訂閱之后小伙 A 送牛奶

上面解釋了 RxJava 在上述的例子中扮演了什么角色,但是你們還不清楚,這么做到底帶來(lái)了什么好處

再看兩個(gè)場(chǎng)景,一個(gè)使用 RxJava 去完成這個(gè)邏輯(登錄緊接著獲取訂單信息),另一個(gè)不使用

// 使用原生寫(xiě)法
login(new CallBack(){
    void onSuccess(final User user){
        handler.post(new Runnable() {
            @Override
            public void run() {
                // 更新 User 信息
                getOrderDetail(new CallBack(){
                    handler.post(new Runnable() {
                        @Override
                        public void run() {
                            // 更新 Order 信息
                        }
                    });
                });
            }
        });
    }
});
// 使用 RxJava 
login()
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSuccess(new Consumer<User>() {
        @Override
        public void accept(User user) throws Exception {
            // 更新 user 信息
        }
    })
    .observeOn(Schedulers.io())
    .flatMap(new Function<User, SingleSource<Order>>() {
        @Override
        public SingleSource<Order> apply(User user) throws Exception {
            return getOrderDetail();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSuccess(new Consumer<Order>() {
        @Override
        public void accept(Order order) throws Exception {
            // 更新 order 信息
        }
    })
    // 訂閱
    .subscribe();       

兩段代碼實(shí)現(xiàn)同樣的功能,表面上就能看出來(lái) RxJava 方式有以下幾個(gè)優(yōu)點(diǎn)

  • 代碼隨著邏輯的復(fù)雜依然保持清爽
  • 切換線程的方式是那么的簡(jiǎn)單明了

你不容易發(fā)現(xiàn)的優(yōu)點(diǎn)還有

  • 當(dāng)你訂閱之后能拿到一個(gè)可取消的對(duì)象,可用于本次流程的取消(原生寫(xiě)法虎谢。均驶。颓影。很難做到)
  • [Observable](被觀察者) 內(nèi)置很多的操作符,可以讓你輕松的完成工作,而不必關(guān)心內(nèi)在的實(shí)現(xiàn)

其實(shí)最終的效果和 Callback 的方式?jīng)]有什么實(shí)質(zhì)性的差別,無(wú)非就是 RxJava 讓你的代碼寫(xiě)起來(lái)更加的清爽、擺脫了 Callback 嵌套盾鳞、更方便的線程切換犬性、大量的內(nèi)置操作符的支持你平常的工作

而使用 RxJava 方式完成上述工作用一個(gè)小故事來(lái)說(shuō)如下:

  1. 小金子去到超市,要了一個(gè)老板的電話,打這個(gè)電話告訴老板地址就會(huì)送牛奶
  2. 我拿著第一步拿到的電話號(hào)碼來(lái)到冰激凌店,告訴冰激凌老板 超市老板的電話號(hào)碼 和我要做牛奶冰激凌的意向,我成功的拿到了冰激凌店的電話
  3. 最后當(dāng)我想吃牛奶冰激凌的時(shí)候,我就打電話給冰激凌店,冰激凌店就會(huì)打電話給超市要求送一箱牛奶到冰激凌店用于加工,我只要在家等待 冰激凌店 給我送做好的冰激凌就好了
  4. 以后我只要想吃牛奶冰激凌,就可以打電話冰激凌店,而不需要我每次從告訴超市老板我要買牛奶開(kāi)始

以上我希望我已經(jīng)能講明白 RxJava 到底是一個(gè)什么東西了,此時(shí)你再回過(guò)頭看下 RxJava 的定義,應(yīng)該能更理解一些了吧

RxJava 的幾個(gè)重要概念

signal(信號(hào))

在很多很多的文章中,這個(gè)詞叫法很多,比如:元素腾仅、Item乒裆、信號(hào)等等,這些說(shuō)的其實(shí)都是一個(gè)東西,說(shuō)的都是
ObServable 可能會(huì)發(fā)射出來(lái)的東西

Observable(被觀察者)

這里指的是可觀察者(被觀察者)這個(gè)概念推励,表示這個(gè)對(duì)象是可以被觀察的鹤耍。
理論上被觀察者可以是任何一個(gè)對(duì)象,everything
在你觀察的時(shí)候验辞,你可以收到它發(fā)射出來(lái)的信號(hào).在 RxJava 中有五種基本實(shí)現(xiàn).
因?yàn)檫@個(gè) Observable 名詞和 Observable 實(shí)現(xiàn)類重名了,所以之后的文章,我都會(huì)用 被觀察者 來(lái)表示它,而之后出現(xiàn)的 ObServable 都表示具體的 ObServable 實(shí)現(xiàn)類

  • Observable 描述可以 (發(fā)射 N 個(gè)信號(hào) + 結(jié)束信號(hào)) 或者 一個(gè)錯(cuò)誤
  • Flowable 描述可以 (發(fā)射 N 個(gè)信號(hào) + 結(jié)束信號(hào)) 或者 一個(gè)錯(cuò)誤, 在 Observable 的基礎(chǔ)上有背壓的實(shí)現(xiàn).
  • Single 描述可以發(fā)射 1 個(gè)信號(hào)或者一個(gè)錯(cuò)誤
  • Completable 描述可以發(fā)射 1 個(gè)完成信號(hào)或者一個(gè)錯(cuò)誤
  • Maybe 描述 可能 (發(fā)射 1 個(gè)信號(hào) + 結(jié)束信號(hào)) 或者 一個(gè)錯(cuò)誤

介紹了這五種操作符, 建議大家平時(shí)真的要注意區(qū)分, 不要什么場(chǎng)景都使用 Observable, 下面簡(jiǎn)單的教大家區(qū)分一下使用場(chǎng)景:

  • Single<T>
    • 方法原本是返回一個(gè)對(duì)象的, 包括集合類型,比如返回的是 List<User>, 那么轉(zhuǎn)化為 Single<List<User>>
    • Retrofit 請(qǐng)求接口的返回值
  • Observable
    • 想給調(diào)用者多個(gè)結(jié)果的
    • 方法原本是通過(guò)傳入 Callback 然后方法內(nèi)部調(diào)用 Callback 通知調(diào)用者的
  • Flowable
    • 在使用 Observable 的時(shí)候有背壓的時(shí)候使用 Flowable
  • Completable
    • 方法沒(méi)返回值的數(shù)據(jù), 比如返回的是 void
    • 執(zhí)行一段不需要結(jié)果的代碼, 也適合封裝成 Completable
  • Maybe 基本用不到......., 不在闡述

Observer(觀察者)

表示觀察者稿黄,是去觀察被觀察者的對(duì)象。一個(gè)很直觀的場(chǎng)景就是 Android 中的監(jiān)聽(tīng)點(diǎn)擊事件跌造,下面的 Button 就是被觀察者杆怕,listener 就是觀察者 button 就是一個(gè) 被觀察者

button.setOnClickListener(listener);

RxJava 一個(gè)簡(jiǎn)單原理圖和解釋

<img src="https://xiaojinzi.oss-cn-shanghai.aliyuncs.com/blogImages/20190624165127.png" width=400px height=400px />

看不懂沒(méi)關(guān)系,下面會(huì)用大白話來(lái)解釋清楚

首先有 AA,BB,CC 三個(gè)人, AA 是一個(gè)賣汽車的銷售員,BB 是奔馳4s店,CC 是一個(gè)奔馳汽車的提供商.而帥氣的我 小金子 是一個(gè)窮逼,準(zhǔn)備買車!,分為兩個(gè)過(guò)程

  • subscribe(訂閱)過(guò)程
    • 今天小金子來(lái)到店里面,和 AA 銷售員說(shuō)我要全款買輛車(就是這么霸氣),給我送到 xxx 地址.(小金子訂閱了 AA 銷售員的賣車服務(wù),對(duì)應(yīng) Subscriber 訂閱 [Observable])
    • AA 銷售員收到我的請(qǐng)求,首先先告訴了我一聲她已經(jīng)收到了我的訂車請(qǐng)求(對(duì)應(yīng) onSubscribe 方法)
    • AA 銷售員然后向 BB 4s店申請(qǐng)一個(gè)車(AA 訂閱 BB,對(duì)應(yīng) Subscriber 訂閱 [Observable])
    • BB 4s店告訴 AA 銷售員已經(jīng)收到你的請(qǐng)求 (對(duì)應(yīng) onSubscribe 方法)
    • BB 4s店向 CC 供應(yīng)商發(fā)出一個(gè)調(diào)用車輛請(qǐng)求(BB 訂閱 CC,對(duì)應(yīng) Subscriber 訂閱 [Observable])
    • CC 供應(yīng)商告訴 BB 4s店已經(jīng)收到你的請(qǐng)求 (對(duì)應(yīng) onSubscribe 方法)
  • signal信號(hào) 發(fā)射過(guò)程
    • CC 供應(yīng)商將準(zhǔn)備好的車送到了 BB 4s店
    • BB 4s店將收到的車送到了 AA 銷售員處
    • AA 銷售員將收到的車送到了 小金子 留下的 xxx 地址
    • 小金子 在 xxx 地址處收到了 AA 銷售員送過(guò)來(lái)的車

上面的例子我簡(jiǎn)明扼要的說(shuō)明了 RxJava 整個(gè)流程是如何執(zhí)行的,

  • 故事中每一個(gè)角色(除了小金子)都是一個(gè) [Observable], 描述了這個(gè)角色能提供什么樣的數(shù)據(jù)給訂閱者訂閱
  • 故事中每一個(gè)角色(除了頂層的 CC 供應(yīng)商)都內(nèi)置了一個(gè) Observer, 用于訂閱上游的 [Observable]
  • 故事中有兩個(gè)流程(所以為什么會(huì)有兩個(gè)切換線程的操作符 subscribeOnobserveOn)
    • 從后往前的一個(gè)訂閱的流程
    • 從前往后的一個(gè)信號(hào)發(fā)射流程

RxJava 線程切換原理

上面我們簡(jiǎn)單的描述了一下 RxJava 的原理,我們可以清楚一個(gè)事實(shí)那就是,每一個(gè)流程都會(huì)分為兩個(gè)過(guò)程

  • subscribe(訂閱)過(guò)程
  • signal信號(hào) 發(fā)射過(guò)程

所以這就對(duì)應(yīng)了 RxJava 為什么會(huì)有兩個(gè)切換線程的操作符,假設(shè)我們的業(yè)務(wù)流程從上而下應(yīng)該是

CC 供應(yīng)商 —> BB 4s店 —> AA 銷售員 —> 小金子

  • subscribeOn 是用于切換訂閱過(guò)程的線程,訂閱的過(guò)程和業(yè)務(wù)流程相反

    • 小金子 —> AA 銷售員 —> BB 4s店 —> CC 供應(yīng)商
  • observeOn 是用于切換 signal 發(fā)射的線程, signal信號(hào) 發(fā)射過(guò)程和業(yè)務(wù)流程一致

    • CC 供應(yīng)商 —> BB 4s店 —> AA 銷售員 —> 小金子

你會(huì)發(fā)現(xiàn) observeOn 切換線程的方向是和業(yè)務(wù)代碼的書(shū)寫(xiě)流程是一致的,所以很多人對(duì) observeOn 切換線程的大致理解是蒙對(duì)的,但是很多人對(duì) subscribeOn 操作符是一臉懵逼

這里簡(jiǎn)單的寫(xiě)了一段代碼,代碼做的事情很簡(jiǎn)單簡(jiǎn)單,最頂層的 [Observable] 是用于發(fā)送一個(gè) hello 字符串,然后 map 操作符在原有的基礎(chǔ)上拼接上 world,最終完成 hello world 這個(gè)信號(hào)的發(fā)射,然后多個(gè)地方被我插入了兩個(gè)切換線程的操作符

假設(shè)當(dāng)前線程是 MainThread


// 創(chuàng)建Observable(創(chuàng)建 Observable 的過(guò)程在當(dāng)前線程MainThread)
Single<String> singleObservable = Single.just("hello")
            // 這句話讓信號(hào)發(fā)射的線程切換到 Thread-3, 并用 Thread-3 線程繼續(xù)信號(hào)的發(fā)射
        .observeOn(Thread-3線程調(diào)度器)
            // 這句話讓訂閱的線程切換到 Thread-2,并用 Thread-2 線程繼續(xù)訂閱上游的 [Observable]
            .subscribeOn(Thread-2線程調(diào)度器)
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                // 因?yàn)檫@里是信號(hào)的發(fā)射流程,所以這里的線程取決上游的最近的一個(gè) observeOn 操作符
                // 所以這里的線程是 Thread-3 而不是 Thread-2
                return s + " world";
            }
        })
            .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                // 這是訂閱流程的 Callback, 所以和下游的聲明的最近的一個(gè) subscribeOn 操作符
                // 你可以做一些事情,或者立馬取消訂閱, 通過(guò) disposable.dispose()
            }
        })
            // 這句話讓信號(hào)發(fā)射的線程切換到 Thread-4, 并用 Thread-4 線程繼續(xù)信號(hào)的發(fā)射
        .observeOn(Thread-4線程調(diào)度器)
            // 這句話讓訂閱的線程切換到 Thread-1,并用 Thread-1 線程繼續(xù)訂閱上游的 [Observable]
        .subscribeOn(Thread-1線程調(diào)度器); 

// 創(chuàng)建 Observer
SingleObserver<String> singleObserver = new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        // 這是訂閱流程的 OnSubscribe Callback, 取決于 Observer 在什么線程上訂閱 Observable
        // 所以這個(gè)地方的線程是 MainThread
        System.out.println("onSubscribe");
    }

    @Override
    public void onSuccess(String result) {
        // 因?yàn)檫@里是信號(hào)的發(fā)射流程,所以這里的線程取決上游的最近的一個(gè) observeOn 操作符
        // 所以這里是 Thread-4 線程
        System.out.println("onSuccess:" + result);
    }

    @Override
    public void onError(Throwable e) {
        // 因?yàn)檫@里是信號(hào)的發(fā)射流程,所以這里的線程取決上游的最近的一個(gè) observeOn 操作符
        // 所以這里是 Thread-4 線程
        System.out.println("onError");
    }
};

// singleObserver 訂閱 singleObservable, 在 MainThread 上
singleObservable.subscribe(singleObserver);

RxJava 常用的操作符有哪些

這里推薦一個(gè) rx 思想的操作符可視化的一個(gè)網(wǎng)站, 還是一個(gè)信號(hào)可移動(dòng)的

http://rxmarbles.com/

幾乎常用的操作符在上面都有, 我這里給大家稍微解釋下, 教大家如何去看

這里舉例看一個(gè) merge 操作符

image
  • 水平方向 表示時(shí)間的過(guò)去

  • 每一條線表示一個(gè) [Observable]

  • 每一個(gè)小球表示某個(gè) [Observable] 會(huì)發(fā)射的信號(hào)

  • 最底下的 [Observable] 表示隨著時(shí)間接受到的信號(hào)

  • 每條線條最后的豎直的線表示一個(gè) [Observable] 完成發(fā)射信號(hào)的時(shí)間點(diǎn)

  • 每一個(gè)操作符使用了之后還能返回一個(gè) [Observable] , 用于繼續(xù)使用操作符. 只不過(guò) [Observable] 中的信號(hào)類型可能會(huì)發(fā)生變化

了解了圖中各個(gè)部分的意義, 就可以來(lái)看 merge 操作符到底干了什么了

我們從圖中可以看到, 第一個(gè)和第二個(gè) [Observable] 發(fā)射的信號(hào)最終都在第三個(gè) [Observable] 中了. 并且是按照時(shí)間的順序的.

所以 merge 操作符是合并 N 個(gè) [Observable] 發(fā)射的信號(hào), 所有的信號(hào)按照發(fā)射的時(shí)間排列.

那你可能會(huì)問(wèn)了, 那 merge 操作符有啥用?請(qǐng)記住一句話, 在 RxJava 的世界中, 萬(wàn)物都是 [Observable], 所以可完成的事情很多,最常用的可以是如下使用:

  • N個(gè)無(wú)關(guān)聯(lián)的請(qǐng)求并發(fā)請(qǐng)求服務(wù)器
  • N 個(gè)相同的任務(wù)并發(fā)的執(zhí)行, 比如并發(fā)的執(zhí)行圖片的上傳,最終訂閱者拿到 N 的圖片的網(wǎng)絡(luò)地址

所以 merge 操作符是可以并發(fā)的去完成一些事情的, 從這里你可以感受到你沒(méi)有用到多線程,但是卻完成了多線程的工作. 而你用 RxJava 僅僅要做的只不過(guò)是每一個(gè)要讓 merge 合并的 [Observable] 切換訂閱線程到一個(gè)獨(dú)立的線程或者一個(gè) IO 線程即可.

我們繼續(xù)看一個(gè)操作符!

Map 操作符

image

我們可以看到 Map 操作符針做的事情特別簡(jiǎn)單, 就是改變每一個(gè)發(fā)射的信號(hào), 從上圖中我們看得出, 上圖的 Map(x ==> 10 * x) 的具體實(shí)現(xiàn)是把每一個(gè)信號(hào)都 * 10, 所以使用了就可以讓每一個(gè)信號(hào)都乘以 10 的效果.

Concat 操作符

image

有了上面兩個(gè)例子,相信這個(gè)操作符也不是什么問(wèn)題了.

我們可以看到, concat 操作符可以把 N 個(gè) [Observable] 發(fā)射的信號(hào)按照訂閱 [Observable] 的順序給排列

這非常符合我們要做一些有順序的事情

比如你要做兩件毫不相干的事情, 但是卻有順序, 那么 concat 再適合不過(guò)了.

如果你要做的事情不僅有順序,而且前后之間還有聯(lián)系, 那么情使用 FlatMap

FlatMap 操作符

FlatMap 可以把一個(gè)信號(hào)轉(zhuǎn)化為另一個(gè) [Observable], 這個(gè)操作符可以很好的做

一個(gè)邏輯和另一個(gè)邏輯的銜接.

比如請(qǐng)求 A 接口成功了之后, 需要使用 A 接口返回的數(shù)據(jù)繼續(xù)請(qǐng)求 B 接口.

更多的操作符

其他更多的操作符希望大家多去 http://rxmarbles.com/ 網(wǎng)站了解下

并且 RxJava 庫(kù)的每一個(gè)操作符方法的注釋上都有一個(gè)這個(gè)操作符的原理圖, 比如 filter 操作符

image

“Hot” and “Cold” Observables

下文中的

When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.

以上是官方文檔中對(duì)熱 Observable 和冷 Observable 的一個(gè)解釋. 英文還行的同學(xué)其實(shí)對(duì)上面的這段話應(yīng)該不難理解, 大概的意思就是:

一個(gè)冷 [Observable] , 不管訂閱者何時(shí)來(lái)訂閱它, 它都會(huì)從頭到尾的發(fā)射所有的信號(hào)給這個(gè)訂閱者. 而我們上述的的篇幅介紹的所有內(nèi)容都是屬于冷 [Observable] .

而一個(gè)熱的 [Observable] 可能發(fā)射信號(hào)當(dāng)它創(chuàng)建的時(shí)候, 當(dāng)訂閱者訂閱了這個(gè) 熱的 [Observable] 有可能只能收到訂閱關(guān)系建立之后的信號(hào). 熱 [Observable] 的行為取決于熱 [Observable] 的實(shí)現(xiàn)

作者建議一種方法, 可以十分方便的區(qū)分出來(lái)這個(gè) [Observable] 是一個(gè)熱的還是冷的

[Observable] 信號(hào)的產(chǎn)生是否和訂閱者的訂閱有關(guān). 請(qǐng)注意看前面的幾個(gè)字 "信號(hào)的產(chǎn)生"

因?yàn)橐粋€(gè)熱 [Observable] 信號(hào)的產(chǎn)生往往和訂閱者無(wú)關(guān), 比如你關(guān)注的一個(gè)博主發(fā)布了一篇文章, 而你作為關(guān)注者收到了, 這個(gè)博主發(fā)布這個(gè)文章不會(huì)因?yàn)槟闶欠耜P(guān)注它而收到影響! 而如果你之前沒(méi)有關(guān)注這個(gè)博主, 那么很可惜你就收不到這篇文章的推送了.

一個(gè)冷 [Observable] 信號(hào)是在訂閱者訂閱自己之后產(chǎn)生, 每一個(gè)訂閱者訂閱都會(huì)重新產(chǎn)生一份發(fā)射給訂閱者.

從上面的一個(gè)解釋可以看出, 熱 [Observable] 才是真的體現(xiàn)觀察者模式的 ,在我看來(lái)冷的 [Observable] 只是一個(gè) "假的" 觀察者模式.

我們上面所有的例子, 其實(shí)都是一個(gè)冷 [Observable] 的一個(gè)使用, 比如:

  • 請(qǐng)求 Retrofit 接口, Single<User>
  • 調(diào)用 Room 數(shù)據(jù)庫(kù)提供的 Single<User>
  • 一些流程代碼封裝的 [Observable] 的使用
  • …….

[Observable]RxJava 中的實(shí)現(xiàn)類為 Subject, 它既是一個(gè) [Observable] 也是一個(gè) Observer , 它有多個(gè)子類, 用于實(shí)現(xiàn)熱 [Observable] 不同的行為

  • 訂閱者訂閱了之后只能收到訂閱之后的信號(hào) <==> PublishSubject.class
  • 訂閱者訂閱了之后也能收到全部的信號(hào) <==> ReplaySubject.class
  • 訂閱者訂閱了之后只能收到最近發(fā)射的一個(gè)信號(hào), 沒(méi)發(fā)射則收不到 <==> BehaviorSubject.class
  • 訂閱者訂閱了之后只能收到最后一個(gè)信號(hào) <==> AsyncSubject.class
  • …...

那熱 [Observable] 到底在哪里使用壳贪?可以是以下幾個(gè)場(chǎng)景

  • 全局的靜態(tài)變量 A, 你需要做到修改了它, 通知那些對(duì)這個(gè)變量感興趣的地方, 你普通代碼寫(xiě)無(wú)非就是修改了之后發(fā)送一個(gè)廣播之類的, 而現(xiàn)在你就可以設(shè)計(jì)這個(gè)變量為一個(gè) Subject<A>, 你只需發(fā)射, 關(guān)心此數(shù)據(jù)的地方在你發(fā)射之前就已經(jīng)訂閱了它. 而你只要發(fā)射一個(gè)新的值, 他們就可以收到. 這和你關(guān)注別人公眾號(hào)就能收到文章一樣
  • 描述數(shù)據(jù)和數(shù)據(jù)之間的關(guān)系. 比如你想做一件事情是讓數(shù)據(jù) A 變化了之后能影響到 B, 其實(shí)做這件事情最好的辦法就是設(shè)計(jì)數(shù)據(jù)A 為發(fā)射信號(hào) A 的 [Observable] , 那么你只需要寫(xiě)一段代碼去訂閱 A, 然后收到信號(hào)之后修改 B. 而你不用 RxJava 實(shí)現(xiàn)是很惡心的, 因?yàn)槟阋诿恳粋€(gè)修改 A 數(shù)據(jù)的地方都要記得去修改 B
  • 作為 MVVM 框架實(shí)現(xiàn)的一部分, 這個(gè)下文會(huì)講到

RxJavaMVVM 是什么關(guān)系

RxJava 其實(shí)是可以看成是一個(gè) MVVM 的框架陵珍,它可以讓您的項(xiàng)目整體架構(gòu)是一個(gè) MVVM 的風(fēng)格. 這得益于 RxJava 中的熱 [Observable], 它可以描述任何一個(gè)數(shù)據(jù)和其他數(shù)據(jù)的一個(gè)關(guān)系. 可以組建出一個(gè)關(guān)系網(wǎng), 描述了哪個(gè)數(shù)據(jù)的變化會(huì)影響到哪個(gè)數(shù)據(jù), 利用觀察者的模式去寫(xiě)項(xiàng)目.

MVVM 只是一個(gè)概念, 是 Model-View-ViewModel的簡(jiǎn)寫(xiě), 具體的解釋, 可以進(jìn)入 百度百科 進(jìn)行了解 , 在 Android 中, 大家對(duì) MVVM 的理解可能就局限于 DataBinding 這個(gè)框架, 認(rèn)為它就是 MVVM.

DataBinding

這里還是要描述清楚, MVVM 只是一個(gè)概念, 它的實(shí)現(xiàn)中有一個(gè)叫做 DataBinding 的框架. DataBinding 框架幫助你實(shí)現(xiàn)了視圖中的控件和 ViewModel 中的數(shù)據(jù)進(jìn)行了一個(gè)雙向的綁定, 另外它也有提供數(shù)據(jù)的觀察者模式, 比如 ObservableField, 利用諸如 ObservableField 的類對(duì)真實(shí)的數(shù)據(jù)進(jìn)行包裝, 你就可以描述數(shù)據(jù)和數(shù)據(jù)之間的關(guān)系. 比如 B數(shù)據(jù) 的改變是因?yàn)?A數(shù)據(jù). 然后我們就可以利用 ObservableField 提供的監(jiān)聽(tīng)方法設(shè)置監(jiān)聽(tīng)然后拿到數(shù)據(jù) A 之后去影響 B.

為什么 RxJava 也可以是一個(gè) MVVM 框架

我們上面說(shuō)過(guò) RxJava 的熱 [Observable], 如果我們不手動(dòng)讓熱[Observable] 結(jié)束, 它可以永遠(yuǎn)都處于待發(fā)射信號(hào)的狀態(tài)(也就是沒(méi)有完成), 而我們的 App 在沒(méi)有被殺死之前, 其實(shí)很多時(shí)候就是在描述數(shù)據(jù)和數(shù)據(jù)撑碴、數(shù)據(jù)和視圖之間的關(guān)系. 而 RxJava[Observable] 正好可以勝任這件事情, 而且可以做的非常出色.下面我從簡(jiǎn)單的幾個(gè)例子中說(shuō)明 RxJavaMVVM 方面的好處:

用觀察者模式去描述數(shù)據(jù)之間的關(guān)系

MVP 模式下, 如果多個(gè)流程會(huì)影響到同一個(gè)視圖, 我們通常都是每一個(gè)流程的代碼中會(huì)去調(diào)用 View 層的接口達(dá)到我們的目的. 示意圖如下

graph LR
        A流程 --> A數(shù)據(jù)
        A1流程 --> A數(shù)據(jù)
    A流程 --> T視圖的顯示
    A1流程 --> T視圖的顯示
    B流程 --> B數(shù)據(jù)
    B流程 --> T視圖的顯示
    其他流程 --> 其他流程數(shù)據(jù)
    其他流程 --> T視圖的顯示

我們可以看到 T視圖 的顯示的代碼嵌入在每一個(gè)流程中, 當(dāng)以后維護(hù)的時(shí)候, 假如我其他地方一個(gè)地方修改到了 A產(chǎn)物, 我們都得記得去通知 T視圖 顯示, 那么很明顯我們的設(shè)計(jì)是失敗的, 因?yàn)槲覀兒罄m(xù)的代碼其實(shí)就是想修改 A產(chǎn)物 即可, 而我們還需關(guān)系 T視圖 的顯示. 很明顯不利于維護(hù)

而用熱 Observable 是怎么樣的呢?

graph TB
        A流程 --> A數(shù)據(jù)
        A1流程 --> A數(shù)據(jù)
    B流程 --> B數(shù)據(jù)
    其他流程 --> 其他流程數(shù)據(jù)
    A數(shù)據(jù) --> T視圖的顯示
    B數(shù)據(jù) --> T視圖的顯示
    其他流程數(shù)據(jù) --> T視圖的顯示

首先我們可以看見(jiàn)流程變清晰的, 每一個(gè)流程只會(huì)專注于修改自己流程上產(chǎn)生的數(shù)據(jù). 而我們還可以看到各個(gè)數(shù)據(jù)都指向了 T視圖 的顯示, 這里是因?yàn)?T視圖 的顯示去訂閱了 A數(shù)據(jù)朝墩、B數(shù)據(jù)其他數(shù)據(jù), 當(dāng)其中任何一個(gè)數(shù)據(jù)變化都能導(dǎo)致 T視圖 的變化. 很明顯這種方式后期維護(hù)的時(shí)候更加的容易并且不容易出錯(cuò).當(dāng)添加了一個(gè) A11 流程也是修改 A數(shù)據(jù) 的, 那么再也不需要關(guān)心 A數(shù)據(jù) 的修改還需要通知 T視圖 這件事了.

我們這種場(chǎng)景對(duì)應(yīng)的熱 ObservableBehaviorSubject 這個(gè)實(shí)現(xiàn)類, 示例代碼如下:

BehaviorSubject<A數(shù)據(jù)> aBehaviorSubject = BehaviorSubject.create();
BehaviorSubject<B數(shù)據(jù)> bBehaviorSubject = BehaviorSubject.create();

// 構(gòu)造函數(shù)
xxx(){
  aBehaviorSubject
      // 中間你還可以添加很多很多的數(shù)據(jù)錯(cuò)誤的操作符去寫(xiě)你的流程
        .subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            // 通知 T視圖
            view.xxx();
        }
      });
  bBehaviorSubject
      // 中間你還可以添加很多很多的數(shù)據(jù)錯(cuò)誤的操作符去寫(xiě)你的流程
        .subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            // 通知 T視圖
            view.xxx();
        }
      });
}

感覺(jué)代碼量上漲了醉拓?哈哈,確實(shí)是的, 但是我們要關(guān)注的是代碼的一個(gè)維護(hù)性, 如果使用 java8 代碼量會(huì)少很多.

上述有些人可以還是沒(méi)有多大的感覺(jué), 就覺(jué)得這只是把之前流程中的一些代碼抽取出來(lái)寫(xiě)而已.

但是請(qǐng)大家注意, 上述兩張圖表達(dá)的編碼方式和維護(hù)性其實(shí)完成不一樣. 圖一更加的專注數(shù)據(jù)的處理過(guò)程, 而圖2更專注于數(shù)據(jù)和數(shù)據(jù)伟姐、數(shù)據(jù)和視圖之間的關(guān)系. 當(dāng)你描述完這些關(guān)系之后, 你寫(xiě)代碼會(huì)更加的清晰明了并且不易出錯(cuò). 而且有一句話是最好的注釋其實(shí)是你的代碼, 用 RxJavaObservable 去描述數(shù)據(jù)和數(shù)據(jù)、數(shù)據(jù)和視圖之間的關(guān)系在代碼上就會(huì)有體現(xiàn), 而不是原始的方式你需要去研究各個(gè)的流程代碼之后才能得到 xxx 和 xxx 之間的一個(gè)關(guān)系.

基于第一點(diǎn)我們?cè)賮?lái)一個(gè)經(jīng)典例子(多表單驗(yàn)證)

我們都做過(guò)類似于登錄注冊(cè)的界面, 各個(gè)的影響關(guān)系的關(guān)系如下

graph LR
    用戶輸入name --> name輸入框
    name輸入框 --> check方法
        check方法 --> 按鈕是否可用
        點(diǎn)擊清除密碼圖標(biāo) --> password輸入框
        password輸入框 --> 清除密碼圖標(biāo)是否顯示
        用戶輸入password --> password輸入框
        password輸入框 --> check方法

代碼很簡(jiǎn)單, 很多人第一反應(yīng)就是去設(shè)置 name輸入框pass輸入框的文本改變監(jiān)聽(tīng), 然后在監(jiān)聽(tīng)方法中調(diào)用同一個(gè)方法 check(), 去檢查所有的輸入框是否滿足需求, 然后決定按鈕是否可用.

換句話說(shuō)就是上述的任何一個(gè)影響到 按鈕是否可用 的流程你都要調(diào)用檢查全部控件是否滿足條件的方法.這種方式其實(shí)是對(duì)的, 但是卻讓人寫(xiě)代碼不愉快

如果通過(guò) RxJava 來(lái)構(gòu)建他們的關(guān)系, 關(guān)系圖如下. 關(guān)系圖沒(méi)有變多少, 但是內(nèi)在的實(shí)現(xiàn)變化很大

graph LR
            用戶輸入name --> name輸入框
      name輸入框 --> ViewModel中的nameObservable
      點(diǎn)擊清除密碼圖標(biāo) --> password輸入框
      用戶輸入password --> password輸入框
      password輸入框 --> ViewModel中的passwordObservable
      ViewModel中的passwordObservable --> 清除密碼圖標(biāo)是否顯示
      ViewModel中的nameObservable --> 按鈕是否可用
      ViewModel中的passwordObservable --> 按鈕是否可用

配合操作符 combineLatest, 可以讓代碼變得非常的舒服

N 個(gè) [Observable], 任何一個(gè) [Observable] 發(fā)射一個(gè)信號(hào)都會(huì)產(chǎn)生一個(gè)結(jié)合信號(hào), 組合每一個(gè) [Observable] 最后一個(gè)信號(hào). 效果圖如下, 這個(gè)正好符合我們上述的多個(gè)輸入框的值影響到同一個(gè)視圖的場(chǎng)景.

image
// 名稱的 [Observable]
BehaviorSubject<String> nameBehaviorSubject = BehaviorSubject.create();
// 密碼的 [Observable]
BehaviorSubject<String> passwordBehaviorSubject = BehaviorSubject.create();
// 構(gòu)造函數(shù)
LoginViewModel() {
// 可以合并 N 個(gè) [Observable], 這里就賬號(hào)和密碼的數(shù)據(jù)
// 當(dāng)賬號(hào)和密碼的數(shù)據(jù)其中一個(gè)發(fā)生變化都會(huì)觸發(fā)一次組合合并出一個(gè)新的信號(hào)
    Observable
    .combineLatest(nameBehaviorSubject,passwordBehaviorSubject,....)
        .combineLatest(observable, observable, 
               (BiFunction<String, String, Boolean>) (name, pass) -> {
                             // 對(duì) name 和 pass 一頓判斷之后得出一個(gè) boolean
                       return result;
               }
    })
    .subscribe(aBoolean -> {
      // 拿到這個(gè) boolean 就可以去控制 按鈕是否可用啦
    })
}

// 以下我沒(méi)有用 databinding 的雙向綁定實(shí)現(xiàn), 用了最普通的代碼

//  Activity 監(jiān)聽(tīng) name 輸入框變化調(diào)用此方法
public void setName(String name) {
    nameBehaviorSubject.onNext(name);
}

//  Activity 監(jiān)聽(tīng) password 輸入框變化調(diào)用此方法
public void setPassword(String password) {
    passwordBehaviorSubject.onNext(password);
}

// Activity 初始化的時(shí)候就調(diào)用此方法訂閱 name Observable
// 請(qǐng)注意訂閱的這個(gè) Observable 是一個(gè)熱 [Observable], 不會(huì)因?yàn)槟愕挠嗛喍|發(fā)信號(hào)的產(chǎn)生
public Observable getNameObservable(){
    return nameBehaviorSubject
                        // 去重, 防止死循環(huán)
                        .distinct();
}

// Activity 初始化的時(shí)候就調(diào)用此方法訂閱 password Observable
// 請(qǐng)注意訂閱的這個(gè) Observable 是一個(gè)熱 [Observable], 不會(huì)因?yàn)槟愕挠嗛喍|發(fā)信號(hào)的產(chǎn)生
public Observable getPasswordObservable(){
    return passwordBehaviorSubject
                        // 去重, 防止死循環(huán)
                        .distinct();
}

用熱 [Observable] 代替靜態(tài)變量和廣播

我們 App 中經(jīng)常會(huì)有靜態(tài)變量, 它是一個(gè)好東西, 但是有一個(gè)很明顯的問(wèn)題是:

一個(gè)靜態(tài)變量的值的改變有時(shí)候一些地方是關(guān)心的, 是需要迫切的知道這個(gè)靜態(tài)變量已經(jīng)被改變了.

這個(gè)問(wèn)題其實(shí)可以讓訪問(wèn)這個(gè)變量的方式改變一下, 靜態(tài)變量不再是 public, 而是一個(gè) private 的, 為這個(gè)變量添加 get set 方法, 讓別的地方可以設(shè)置監(jiān)聽(tīng)器來(lái)監(jiān)聽(tīng)這個(gè)變量的值. 然后在 set 方法中我們可以發(fā)送一個(gè)廣播或者調(diào)用監(jiān)聽(tīng)器的回調(diào)通知外部. 這是完全可行的, 并且是易于維護(hù)的.

上面的優(yōu)化方式使用 RxJava 的熱 [Observable] 更為方便. 你只需設(shè)計(jì)原有的靜態(tài)變量為

// 熱 [Observable]
BehaviorSubject<String> xxxBehaviorSubject = BehaviorSubject.create();

別的地方如果關(guān)心這個(gè)變量只需訂閱即可, 如果后面需要改變變量的值, 只需執(zhí)行下面的代碼發(fā)射一個(gè)新的信號(hào)即可

xxxBehaviorSubject.onNext("newString");

這樣子所有關(guān)心此數(shù)據(jù)的地方都會(huì)收到通知, 這種方式不僅簡(jiǎn)寫(xiě)了代碼, 而且代替了廣播和少設(shè)計(jì)了監(jiān)聽(tīng)器.

這種屬于比較深入的使用 RxJava 了, 市面上很多的博客其實(shí)都是抄來(lái)抄去, 講的都是操作符怎么使用. 其實(shí)沒(méi)啥意思. 很多博客的比喻還都是錯(cuò)誤的, 就好比一篇博客中拿 開(kāi)關(guān)燈泡 之間的關(guān)系來(lái)舉例, 說(shuō)的都是冷 [Observable]的內(nèi)容, 但是這個(gè)例子其實(shí)是一個(gè)熱 [Observable]的典型代表. 你覺(jué)得你燈泡訂閱了開(kāi)關(guān) 和 開(kāi)關(guān)產(chǎn)生 開(kāi)關(guān) 的信號(hào)有關(guān)嗎亿卤?

開(kāi)關(guān)產(chǎn)生信號(hào)和我們?nèi)耸裁慈ビ|發(fā)有關(guān)系, 和燈泡的訂閱無(wú)關(guān)!熱冷 [Observable] 的我小金子說(shuō)的最典型的兩個(gè)例子為:

  • 老板給我一箱牛奶
  • 老板, 這是我的電話號(hào)碼, 以后你店進(jìn)一次牛奶, 就給我送一箱

這兩種不用我說(shuō)誰(shuí)是冷的誰(shuí)是熱的了吧愤兵?自己好好體會(huì)去吧.

如何利用 RxJava 寫(xiě)多線程并發(fā)

很多人其實(shí)用 RxJava 根本不會(huì)寫(xiě)并發(fā), 也不知道從哪里看. 其實(shí)這和我們平常寫(xiě)代碼是一樣的, 只不過(guò)要轉(zhuǎn)化為 RxJava 的方式. 為什么要用 RxJava 的方式?因?yàn)樗峁┑牟僮鞣芎?jiǎn)單呀, 我們根本不用例會(huì)底層是如何實(shí)現(xiàn)的, 也不用管線程之間的消息的傳遞, 更不用管如果丟棄操作.下面這塊我就簡(jiǎn)明扼要的說(shuō)關(guān)鍵點(diǎn), 讓大家知道如何使用 RxJava 寫(xiě)并發(fā)!

假設(shè)現(xiàn)在有一個(gè)需求, 我們有 10 個(gè)文件, 我們需要把它上傳到服務(wù)器, 按順序 拿到文件的地址

需求很簡(jiǎn)單. 但是不用 RxJava 會(huì)很惡心, 因?yàn)槟阈枰h(huán)做這些事情, 并且還要?jiǎng)?chuàng)建子線程, 最惡心的是你還要管理每一個(gè)子線程成功與否, 都要計(jì)數(shù), 最終都回來(lái)了之后才能進(jìn)行排序和下一步的處理

那用 RxJava 怎么做排吴?請(qǐng)記住一點(diǎn), 任何代碼都要學(xué)會(huì)拆分, 首先我們的 10 個(gè)文件我們可以先考慮一個(gè)文件是怎么做的, 這個(gè)很簡(jiǎn)單, 順手就寫(xiě)好了, 很簡(jiǎn)單:

// 這里描述了一個(gè)上傳的操作, 入?yún)⑹且粋€(gè)文件的本地地址, 返回值是一個(gè) Single<String>
// 如果成功就拿到一個(gè) url 地址, 如果失敗就收到 error 回調(diào)
Single<String> upload(String filePath){
        return Single.just(filePath)
                .map(path -> new File(path))
                // 做一個(gè)文件是否存在檢查
                .doOnSuccess(file -> {
                    // 如果不存在就拋異常
                    if (!file.exists()) {
                        throw new FileNotFoundException(file.getPath());
                    }
                })
                // 信號(hào)轉(zhuǎn)換為另一個(gè) [Observable]
                .flatMap((Function<File, SingleSource<? extends String>>) file -> {
                    // 調(diào)用網(wǎng)絡(luò)請(qǐng)求
                    return Xxx.upload(file);
                });
    }

我們接下去寫(xiě), 如果讓 10 個(gè)都異步的跑起來(lái). 我們定義一個(gè)類

// 任務(wù)上傳的類, 方便 RxJava 流程處理的時(shí)候一直用一個(gè)類
// 不然你中途數(shù)據(jù)類型變換一定會(huì)用到成員變量的
class FileUploadTask {
        public String filePath;
        public int index;
        public String resultUrl;
        public FileUploadTask(String filePath, int index) {
            this.filePath = filePath;
            this.index = index;
        }
}
        String[] files = new String[]{"xxx", "xxx", "xxx"};
                // 轉(zhuǎn)化為 N 個(gè)任務(wù)類
        FileUploadTask[] tasks = new FileUploadTask[files.length];
        for (int i = 0; i < files.length; i++) {
            tasks[i] = new FileUploadTask(files[i], i);
        }
        Observable.fromArray(tasks)
                // 這一步的操作至關(guān)重要, flatMap 操作符能讓一個(gè)信號(hào)轉(zhuǎn)化為一個(gè)新的 [Observable]
                    // 然后讓這個(gè) [Observable] 在另一個(gè)線程上執(zhí)行即可
                .flatMapSingle((Function<FileUploadTask, 
                                SingleSource<FileUploadTask>>) task -> {
                  
                    final FileUploadTask currentTask = task;
                    return upload(currentTask.filePath)
                            // 切換訂閱的線程為一個(gè)新的線程
                            // 為什么用 subscribeOn 可以看上面切換線程的介紹
                            // !!!!!!!!!!!!! 這個(gè)切換是最重要的 !!!!!!!!!!!!!
                            .subscribeOn(Schedulers.newThread())
                            .map(url -> {
                                currentTask.resultUrl = url;
                                return currentTask;
                            });

                })
                    // 為什么要排序秆乳?因?yàn)槎嗑€程會(huì)打亂數(shù)據(jù)會(huì)來(lái)的順序, 如果你不在意順序, 那就不要排序
                // 這里的排序是倒序還是正序我忘了, 不影響大局, 如果你們發(fā)現(xiàn)錯(cuò)了, o1 和 o2 記得換一下
                .sorted((o1, o2) -> o1.index - o2.index)
                // 只要 url 結(jié)果
                .map(task -> task.resultUrl)
                // 所有信號(hào)收集成為一個(gè) List
                .toList()
                // 切換信號(hào)發(fā)射的線程到主線程
                .observeOn(AndroidSchedulers.mainThread())
                // 切換訂閱的線程為 IO 線程
                .subscribeOn(Schedulers.io())
                .subscribe(urls -> {
                    // 這里拿到的就是所有上傳好的 urls
                }, throwable -> {
                    // 就是就是其中發(fā)生了錯(cuò)誤
                });

上面的代碼我寫(xiě)了一個(gè)典型的例子, 希望大家能有所理解. 上面的使用 flatMap 操作符讓新的 [Observable] 運(yùn)行在新線程上, 其實(shí)還可以先創(chuàng)建多 N 個(gè) [Observable] , 利用上面說(shuō)過(guò)的 merge 操作符.

                String[] files = new String[]{"xxx", "xxx", "xxx"};
                // 先創(chuàng)建多 N 個(gè)上傳任務(wù)的 Observable
        Observable<FileUploadTask>[] uploadTaskObservables = new Observable[files.length];
        for (int i = 0; i < files.length; i++) {
            final FileUploadTask fileUploadTask = new FileUploadTask(files[i], i);
            uploadTaskObservables[i] = upload(files[i])
                    .map(url -> {
                        fileUploadTask.resultUrl = url;
                        return fileUploadTask;
                    })
                    // 切換訂閱的線程為一個(gè)新的線程
                    // !!!!!!!!!!!!! 這個(gè)切換是最重要的 !!!!!!!!!!!!!
                    .subscribeOn(Schedulers.newThread())
                    // 轉(zhuǎn)化為 Observable, 只有 Observable 才能使用 merge
                    .toObservable();
        }
                // 利用 merge 操作符
        Observable.mergeArray(uploadTaskObservables)
                    // 為什么要排序?因?yàn)槎嗑€程會(huì)打亂數(shù)據(jù)會(huì)來(lái)的順序, 如果你不在意順序, 那就不要排序
                // 這里的排序是倒序還是正序我忘了, 不影響大局, 如果你們發(fā)現(xiàn)錯(cuò)了, o1 和 o2 記得換一下
                .sorted((o1, o2) -> o1.index - o2.index)
                // 只要 url 結(jié)果
                .map(task -> task.resultUrl)
                // 切換信號(hào)發(fā)射的線程到主線程
                .observeOn(AndroidSchedulers.mainThread())
                // 切換訂閱的線程為 IO 線程
                .subscribeOn(Schedulers.io())
                .subscribe(urls -> {
                    // 這里拿到的就是所有上傳好的 urls
                }, throwable -> {
                    // 就是就是其中發(fā)生了錯(cuò)誤
                });

利用這種也是可以完成的, 好啦, 多線程的演示就到這里結(jié)束吧, 希望你們會(huì)喜歡. 如果覺(jué)得不錯(cuò)請(qǐng)點(diǎn)贊或者評(píng)論!!!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末钻哩,一起剝皮案震驚了整個(gè)濱河市屹堰,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌街氢,老刑警劉巖扯键,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異珊肃,居然都是意外死亡荣刑,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門伦乔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)厉亏,“玉大人,你說(shuō)我怎么就攤上這事烈和“唬” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵斥杜,是天一觀的道長(zhǎng)虱颗。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蔗喂,這世上最難降的妖魔是什么忘渔? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮缰儿,結(jié)果婚禮上畦粮,老公的妹妹穿的比我還像新娘。我一直安慰自己乖阵,他們只是感情好宣赔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著瞪浸,像睡著了一般儒将。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上对蒲,一...
    開(kāi)封第一講書(shū)人閱讀 51,125評(píng)論 1 297
  • 那天钩蚊,我揣著相機(jī)與錄音贡翘,去河邊找鬼。 笑死砰逻,一個(gè)胖子當(dāng)著我的面吹牛瓤狐,可吹牛的內(nèi)容都是我干的文狱。 我是一名探鬼主播雕擂,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼贰健,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了刚操?” 一聲冷哼從身側(cè)響起闸翅,我...
    開(kāi)封第一講書(shū)人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎赡茸,沒(méi)想到半個(gè)月后缎脾,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡占卧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年遗菠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片华蜒。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡辙纬,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出叭喜,到底是詐尸還是另有隱情贺拣,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布捂蕴,位于F島的核電站譬涡,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏啥辨。R本人自食惡果不足惜涡匀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望溉知。 院中可真熱鬧陨瘩,春花似錦、人聲如沸级乍。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)玫荣。三九已至甚淡,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間捅厂,已是汗流浹背贯卦。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工底挫, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人脸侥。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像盈厘,于是被迫代替她去往敵國(guó)和親睁枕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容