RxJava2的使用與原理淺析(一)

前言

首先,感謝以下作者分享自己對(duì)于RxJava的理解
給 Android 開(kāi)發(fā)者的 RxJava 詳解
針對(duì)RxJava1.×起便,入門必備
RxJava2 只看這一篇文章就夠了 - 掘金
RxJava2的API大全蒜埋,適合需要使用對(duì)應(yīng)APID時(shí)作為資料查看

本文設(shè)計(jì)源碼以及介紹均是基于RxJava2鹰服,示例均是為了示例而示例齿风,項(xiàng)目中要使用RxJava,首先在gradle中添加依賴:

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

一您朽、RxJava2簡(jiǎn)介

什么是RxJava

A library for composing asynchronous and event-based programs by using observable sequences.
一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的狂丝、基于事件的程序的庫(kù)

引用RxJava Github Readme中的介紹,可能看起來(lái)仍然讓人感到費(fèi)解哗总,我的理解是:
使用觀察者模式几颜,采用鏈?zhǔn)骄幊蹋谑录膶?shí)現(xiàn)異步的庫(kù)
到這里讯屈,沒(méi)有使用過(guò)RxJava的話蛋哭,一定還是不知道這個(gè)庫(kù)究竟是做什么的,沒(méi)關(guān)系涮母,我們慢慢道來(lái)谆趾。首先我們需要知道RxJava究竟解決了什么問(wèn)題。

RxJava解決了什么問(wèn)題

即使沒(méi)有使用過(guò)叛本,一定也聽(tīng)說(shuō)過(guò)RxJava沪蓬,RxJava應(yīng)用如此廣泛,那么它究竟解決了什么問(wèn)題呢来候?一個(gè)詞:異步跷叉,但是同樣是異步,可以使用的工具有很多营搅,我們?yōu)槭裁雌褂肦xJava呢云挟?

為什么使用RxJava

我們知道,Android中實(shí)現(xiàn)異步的方式有很多转质,Handler园欣、AsyncTask、runOnUiThread等休蟹,為什么我們就要使用RxJava來(lái)實(shí)現(xiàn)異步呢沸枯?換言之日矫,RxJava相比其他實(shí)現(xiàn)異步的方法有什么優(yōu)勢(shì)呢?最大的優(yōu)勢(shì)就是邏輯清晰辉饱。

假設(shè)有這樣一個(gè)需求:界面上有一個(gè)自定義的視圖 imageCollectorView 搬男,它的作用是顯示多張圖片拣展,并能使用 add(Bitmap) 方法來(lái)任意增加顯示的圖片∨碚樱現(xiàn)在需要程序?qū)⒁粋€(gè)給出的目錄數(shù)組 File[] folders 中每個(gè)目錄下的 png 圖片都加載出來(lái)并顯示在 imageCollectorView 中。需要注意的是备埃,由于讀取圖片的這一過(guò)程較為耗時(shí)姓惑,需要放在后臺(tái)執(zhí)行,而圖片的顯示則必須在 UI 線程執(zhí)行按脚。常用的實(shí)現(xiàn)方式有多種于毙,我這里貼出其中一種:

new Thread(() -> {
            for (File folder : folders) {
                File[] files = folder.listFiles();
                for (File file : files) {
                    if (file.getName().endsWith(".png")) {
                        final Bitmap bitmap = getBitmapFromFile(file);
                        runOnUiThread(() -> imageCollectorView.add(bitmap));
                    }
                }
            }
        }).start();

而如果使用RxJava,實(shí)現(xiàn)方式是這樣的:

Observable.fromArray(folders)
                .flatMap((Function<File, ObservableSource<File>>) folder -> Observable.fromArray(folder.listFiles()))
                .filter(file -> file.getName().endsWith("png"))
                .map((Function<File, Bitmap>) file -> getBitmapFromFile(file))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(bitmap -> imageCollectorView.add(bitmap));

一定有人會(huì)覺(jué)得辅搬,這完全沒(méi)有簡(jiǎn)潔呀唯沮,哪里看出來(lái)簡(jiǎn)潔了,上面所說(shuō)的簡(jiǎn)潔只是邏輯簡(jiǎn)潔堪遂,而非代碼簡(jiǎn)潔溶褪。

觀察一下,你就會(huì)發(fā)現(xiàn)猿妈,使用RxJava吹菱,是一條鏈?zhǔn)秸{(diào)用下來(lái)的鳍刷,完全沒(méi)有任何嵌套,這種優(yōu)勢(shì)在需求變得復(fù)雜之后俯抖,優(yōu)勢(shì)會(huì)更加明顯输瓜,比如我們現(xiàn)在增加一條需求前痘,只需要加載前十張圖片担忧,不使用RxJava要怎么去做呢瓶盛?相信大家都會(huì)做示罗,不妨實(shí)現(xiàn)去看看蚜点,而如果是使用RxJava绍绘,修改后的實(shí)現(xiàn)如下:

Observable.fromArray(folders)
                .flatMap((Function<File, ObservableSource<File>>) folder -> Observable.fromArray(folder.listFiles()))
                .filter(file -> file.getName().endsWith("png"))
                .map((Function<File, Bitmap>) file -> getBitmapFromFile(file))
                .take(LOAD_IMAGE_NUM)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(bitmap -> imageCollectorView.add(bitmap));

可以看到陪拘,僅僅只增加了一行代碼纤壁,且未新增任何嵌套酌媒。

所以RxJava最大的優(yōu)勢(shì)就是簡(jiǎn)潔,無(wú)論邏輯如何復(fù)雜喇辽,總是一條鏈?zhǔn)秸{(diào)用的簡(jiǎn)潔拭荤。

二舅世、基本使用

在分析RxJava的原理之前雏亚,我們首先要了解一下觀察者模式罢低,如果已經(jīng)熟知觀察者模式,可以跳過(guò)這段宜岛。

觀察者模式

先讓我們想象一下生活中一個(gè)場(chǎng)景:

讀者(觀察者)發(fā)現(xiàn)一個(gè)微信公眾號(hào)(被觀察者)很不錯(cuò)萍倡,希望長(zhǎng)期關(guān)注列敲,于是便關(guān)注(注冊(cè))了這個(gè)微信公眾號(hào)戴而,當(dāng)這個(gè)微信公眾號(hào)發(fā)布了一篇新的文章所意,關(guān)注了該微信公眾號(hào)的讀者就會(huì)接收到通知(被觀察者發(fā)送通知給注冊(cè)了的觀察者)扁眯,如果讀者不再關(guān)注(注銷)該微信公眾號(hào)翅帜,就再不會(huì)接收到通知了

以上就是一個(gè)典型的觀察者模式的流程涝滴,生活中像這樣的場(chǎng)景還有很多歼疮,那么什么是觀察者模式呢韩脏?

概念

引用《Head First設(shè)計(jì)模式》一書中的原話

觀察者模式定義了對(duì)象之間一對(duì)多的依賴赡矢,這樣一來(lái)吹散,當(dāng)該對(duì)象改變狀態(tài)時(shí),它的所有依賴者會(huì)受到通知并自動(dòng)更新
——Head First設(shè)計(jì)模式

以下是觀察者模式UML圖


觀察者模式UML圖

我們可以看到,觀察者模式首先定義了兩個(gè)接口類:Observable(被觀察者)画饥、Observer(觀察者)抖甘,
Observable接口中包含三個(gè)方法

  • registerObserver()注冊(cè)
  • removeObserver()注銷
  • notifyObservers()發(fā)送通知

Observer接口中包含一個(gè)方法

  • update()觀察者接收到通知后的行為

以上兩個(gè)接口单山,很簡(jiǎn)單,在這里我就不貼代碼了昼接,那么實(shí)現(xiàn)類要怎么做呢慢睡?

以下是實(shí)現(xiàn)類的代碼
Observable實(shí)現(xiàn)類:

public class ConcreteObservable implements Observable {
    private static final String TAG = ConcreteObservable.class.getSimpleName();
    List<Observer> observers;

    public ConcreteObservable(){
        observers = new ArrayList<>();
    }
    @Override
    public void register(Observer observer) {
        if (observer == null) {
            throw new NullPointerException("observer cannot be null");
        }
        if (observers.contains(observer)) {
            Log.d("TAG", "you have registered this");
        } else {
            Log.d("TAG", observer.toString() + " register");
            observers.add(observer);
        }
    }

    @Override
    public void removeObserver(Observer observer) {
        if (observer == null) {
            throw new NullPointerException("observer cannot be null");
        }
        if (observers.contains(observer)) {
            Log.d("TAG", observer.toString() + " remove");
            observers.remove(observer);
        } else {
            Log.d("TAG", "you have not registered this");
        }
    }

    @Override
    public void notifyObservers() {
        for (Observer observer : observers) {
            observer.update();
        }
    }

Observer實(shí)現(xiàn)類:

public class TomObserver implements Observer {
    private static final String TAG = TomObserver.class.getSimpleName();
    @Override
    public void update() {
        Log.d("TAG", "notify Tom");
    }

    @Override
    public String toString() {
        return "Tom";
    }
}
public class KittyObserver implements Observer {
    @Override
    public void update() {
        Log.d("TAG", "notify Kitty");
    }

    @Override
    public String toString() {
        return "Kitty";
    }
}

使用:

public class MainActivity extends AppCompatActivity {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Observer tomObserver = new TomObserver(), kittyObserver = new KittyObserver();
        Observable observable = new ConcreteObservable();
        observable.register(tomObserver);
        observable.register(kittyObserver);
        observable.notifyObservers();
        observable.removeObserver(tomObserver);
        observable.notifyObservers();
    }
}

打印日志

D/TAG: Tom register
D/TAG: Kitty register
D/TAG: notify Tom
D/TAG: notify Kitty
D/TAG: Tom remove
D/TAG: notify Kitty

觀察者模式在RxJava中的應(yīng)用

RxJava中有四個(gè)概念棕硫,Observable(被觀察者)、Observer(觀察者)纬纪、subscribe(訂閱)包各、事件问畅,
Observer和Observable通過(guò)subscribe方法實(shí)現(xiàn)訂閱护姆,從而Observable發(fā)送事件時(shí)签则,Observer可以收到相關(guān)通知渐裂。

與標(biāo)準(zhǔn)觀察者模式不同的是钠惩,RxJava除了onNext(對(duì)應(yīng)標(biāo)準(zhǔn)觀察者模式的notifyObservers())事件以外篓跛,還有onError()和onComplete()愧沟,當(dāng)Observable發(fā)送完一系列onNext()事件后,必須調(diào)用onComplete()標(biāo)志著事件序列的結(jié)束盖奈,中間如果出現(xiàn)錯(cuò)誤钢坦,便會(huì)走到onError()事件爹凹,這兩個(gè)事件均只會(huì)被調(diào)用一次镶殷,且是相互排斥的批钠,即走到了onComplete()就不會(huì)走到onError()埋心,反之亦然拷呆。

基本實(shí)現(xiàn)

Observable(被觀察者)

Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Hello, observer");
            emitter.onComplete();
        });

可以看到通過(guò)Observable.create()方法創(chuàng)建了Observable對(duì)象茬斧,方法中傳入了ObservableOnSubscribe<T>對(duì)象项秉,這個(gè)類中使用了泛型娄蔼,這里的泛型就對(duì)應(yīng)著onNext()事件傳入的參數(shù)類型岁诉,以上例子這里的數(shù)據(jù)類型是String涕癣,在傳入的ObservableOnSubscribe<T>對(duì)象中坠韩,重寫了subscribe方法只搁,上述例子發(fā)送了一個(gè)onNext()事件须蜗,又發(fā)送了一個(gè)onComplete()事件明肮,告訴Observer事件序列結(jié)束,不會(huì)再有新的onNext()事件循未。
當(dāng)然的妖,Observable對(duì)象并不止這一種創(chuàng)建方式嫂粟,RxJava提供了很多相關(guān)的API星虹,例如fromArray()宽涌、just()等等卸亮,暫且按下不表兼贸。

Observer(觀察者)

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("TAG", "onSubscribe");
            }

            @Override
            public void onNext(String o) {
                Log.d("TAG", o);
            }

            @Override
            public void onError(Throwable e) {
                Log.d("TAG", "onError");
            }

            @Override
            public void onComplete() {
                Log.d("TAG", "onComplete");
            }
        };

創(chuàng)建Observer對(duì)象需要重寫四個(gè)方法,可以看到其中三個(gè)重寫的方法剛好對(duì)應(yīng)三個(gè)事件很澄,可以猜測(cè)甩苛,Observable發(fā)送的事件應(yīng)該是交由對(duì)應(yīng)的方法處理,但是多出來(lái)的onSubscribe()方法是做什么的呢?onSubscribe()方法是在Observable發(fā)送事件前被調(diào)用的赁酝,稍后會(huì)說(shuō)到這點(diǎn)酌呆。

當(dāng)然搔耕,RxJava中的觀察者對(duì)象并不止Observer這一個(gè)類弃榨,RxJava還提供了一些其他的類供實(shí)現(xiàn)鲸睛,例如Consumer等腊凶。

subscribe(訂閱)

observable.subscribe(observer);

這一步就非常簡(jiǎn)單钧萍,但是會(huì)發(fā)現(xiàn)這里有個(gè)奇怪的地方风瘦,正常流程應(yīng)該是觀察者訂閱被觀察者万搔,但是這里卻是被觀察者訂閱觀察者,顯然酗捌,observer.subscribe(observable)這樣的寫法更加容易讓讀者接受

過(guò)如果把 API 設(shè)計(jì)成 observer.subscribe(observable) / subscriber.subscribe(observable) 胖缤,雖然更加符合思維邏輯哪廓,但對(duì)流式 API 的設(shè)計(jì)就造成影響了涡真,比較起來(lái)明顯是得不償失的哆料。

以上引用扔物線的一段話

RxJava中線程控制

有一些讀者一定好奇剧劝,說(shuō)了半天讥此,好像也沒(méi)有看到實(shí)現(xiàn)了異步呀萄喳,別著急他巨,這就介紹RxJava是如何在這個(gè)鏈?zhǔn)骄幊讨袑?shí)現(xiàn)異步的染突。

我們看一下下面的示例代碼

Observable
                .create((ObservableOnSubscribe<String>) emitter -> {
                    //模擬網(wǎng)絡(luò)訪問(wèn)流程
                    Log.d("TAG", "subscribe " + Thread.currentThread().getName());
                    Thread.sleep(200);
                    emitter.onNext("json ");
                    emitter.onComplete();
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d("TAG", "onSubscribe " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d("TAG", s + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("TAG", "onError " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("TAG", "onComplete " + Thread.currentThread().getName());
                    }
                });

以下是日志

D/TAG: onSubscribe main
D/TAG: subscribe RxCachedThreadScheduler-1
D/TAG: json main
D/TAG: onComplete main

我們發(fā)現(xiàn)也榄,Observable發(fā)送事件的方法是運(yùn)行在子線程的甜紫,而Observer處理事件的方法均是運(yùn)行在主線程(UI線程)囚霸,而我們僅僅只是增加了兩行代碼拓型,就實(shí)現(xiàn)了異步編程岩睁,是不是很方便呢捕儒?但是有一個(gè)例外刘莹,onSubscribe()方法根據(jù)日志打印点弯,是運(yùn)行在主線程的抢肛,那么它真的就是和其他的事件一樣捡絮,由于增加了兩行代碼福稳,運(yùn)行在了主線程嗎的圆?我們讓這個(gè)任務(wù)在子線程運(yùn)行越妈,再看看打印的日志

D/TAG: onSubscribe Thread-4
D/TAG: subscribe RxCachedThreadScheduler-1
D/TAG: json main
D/TAG: onComplete main

由此我們知道,onSubscribe()既不是運(yùn)行在發(fā)送事件的線程,也不是運(yùn)行在處理事件的線程娱节,而是運(yùn)行在當(dāng)前線程肄满,具體這是為什么呢稠歉,我們后面會(huì)介紹〈ィ現(xiàn)在我們先來(lái)看一下勺疼,增加的兩行代碼究竟是什么意思呢执庐?

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

subscribeOn(Scheduler scheduler)方法定義了發(fā)送事件運(yùn)行的線程轨淌,而observeOn(Scheduler scheduler)方法則是指定處理事件運(yùn)行的線程递鹉,RxJava中也指定了一些可以直接使用的Scheduler

  • AndroidSchedulers.mainThread()
    Android主線程,即UI線程
  • Schedulers.newThread()
    創(chuàng)建一個(gè)新的線程
  • Schedulers.io()
    用于 IO 密集型任務(wù)窜觉,如果異步阻塞 IO 操作禀挫。
  • Schedulers.computation()
    用于使用計(jì)算任務(wù)语婴,如事件循環(huán)和回調(diào)處理
  • Schedulers.trampoline()
    當(dāng)前線程
  • Schedulers.single()
    單線程砰左,如果線程中有正在處理的任務(wù),新來(lái)的任務(wù)會(huì)進(jìn)入等待隊(duì)列
  • Schedulers.from(Executor executor)
    指定一個(gè)線程池

常用Api

創(chuàng)建操作符

1. create()
有什么用
創(chuàng)建一個(gè)Observable
怎么用

Observable
                .create((ObservableOnSubscribe<String>) emitter -> {
                    emitter.onNext("1");
                    emitter.onNext("2");
                    emitter.onNext("3");
                    emitter.onComplete();
                })
                .subscribe((s -> Log.d(TAG, "onNext: " + s)));

日志打印

D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3

2. just()
有什么用
傳入不超過(guò)十個(gè)對(duì)象僻造,依次發(fā)送
怎么用

Observable.just(1, 3, 5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

日志打印

D/MainActivity: onNext: 1
D/MainActivity: onNext: 3
D/MainActivity: onNext: 5
D/MainActivity: onComplete

3. fromArray()
有什么用
與just()類似竹挡,只不過(guò)fromArray()可以傳入超過(guò)10個(gè)item立膛,也可以傳入數(shù)組揪罕、集合等
怎么用

Integer[] array = {1, 3, 5};
        Observable.fromArray(array)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

日志打印

D/MainActivity: onNext: 1
D/MainActivity: onNext: 3
D/MainActivity: onNext: 5
D/MainActivity: onComplete

注意:這里的數(shù)組必須是對(duì)象數(shù)組,而不能是基本類型數(shù)組
4. fromCallable()
有什么用
這里的 Callable 是 java.util.concurrent 中的 Callable旧巾,Callable 和 Runnable 的用法基本一致耸序,只是它會(huì)返回一個(gè)結(jié)果值,這個(gè)結(jié)果值就是發(fā)給觀察者的鲁猩。
怎么用

Observable.fromCallable(() -> 1)
                .subscribe(integer -> Log.d(TAG, "accept" + integer));

日志打印

D/MainActivity: accept: 1

5. timer()
有什么用
指定時(shí)間倒計(jì)時(shí)結(jié)束后,會(huì)發(fā)送一個(gè)0L的值給觀察者
怎么用

Observable.timer(5, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

日志打印

2019-05-20 21:59:42.912 5248-5248/com.danc.calendartext D/MainActivity: onSubscribe
2019-05-20 21:59:47.918 5248-5302/com.danc.calendartext D/MainActivity: onNext: 0

可以看到廓握,onSubscribe之后隔了5s中搅窿,接收到onNext事件
6. interval()
有什么用
每隔指定時(shí)間發(fā)送一個(gè)事件,這個(gè)事件包含一個(gè)數(shù)字隙券,從0開(kāi)始男应,每次自增1
怎么用

Observable.interval(0, 1, TimeUnit.SECONDS)
                .subscribe(aLong -> {
                    Log.d(TAG, "計(jì)時(shí)(s): " + aLong);
                });

日志打印

2019-05-20 22:03:29.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 0
2019-05-20 22:03:30.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 1
2019-05-20 22:03:31.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 2
2019-05-20 22:03:32.787 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 3
2019-05-20 22:03:33.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 4
2019-05-20 22:03:34.786 5375-5420/com.danc.calendartext D/MainActivity: 計(jì)時(shí)(s): 5

轉(zhuǎn)化操作符

1. map()
有什么用
將被觀察者發(fā)送的數(shù)據(jù)類型轉(zhuǎn)換為其他類型
怎么用
以下實(shí)例將Integer類型轉(zhuǎn)換為String

Observable.just(1, 3, 5)
                .map(integer -> "--" + integer)
                .subscribe(s -> Log.d(TAG, "accept: " + s));

日志打印

2019-05-20 22:05:58.079 5494-5494/com.danc.calendartext D/MainActivity: accept: --1
2019-05-20 22:05:58.079 5494-5494/com.danc.calendartext D/MainActivity: accept: --3
2019-05-20 22:05:58.079 5494-5494/com.danc.calendartext D/MainActivity: accept: --5

2. flatMap()
有什么用
這個(gè)方法可以將事件序列中的元素進(jìn)行整合加工,返回一個(gè)新的被觀察者娱仔。
怎么用
flatMap() 其實(shí)與 map() 類似沐飘,但是 flatMap() 返回的是一個(gè) Observerable。現(xiàn)在用一個(gè)例子來(lái)說(shuō)明 flatMap() 的用法牲迫。
假設(shè)一個(gè)有一個(gè) Person 類耐朴,這個(gè)類的定義如下:

public class Person {
    private String name;
    private List<Plan> planList = new ArrayList<>();
    public Person(String name, List<Plan> planList) {
        this.name = name;
        this.planList = planList;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public List<Plan> getPlanList() {
        return planList;
    }
    public void setPlanList(List<Plan> planList) {
        this.planList = planList;
    }
}

Person 類有一個(gè) name 和 planList 兩個(gè)變量,分別代表的是人名和計(jì)劃清單盹憎。
Plan 類的定義如下:

public class Plan {

    private String time;
    private String content;
    private List<String> actionList = new ArrayList<>();

    public Plan(String time, String content) {
        this.time = time;
        this.content = content;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public List<String> getActionList() {
        return actionList;
    }

    public void setActionList(List<String> actionList) {
        this.actionList = actionList;
    }
}

現(xiàn)在有一個(gè)需求就是要將 Person 集合中的每個(gè)元素中的 Plan 的 action 打印出來(lái)筛峭。
首先用 map() 來(lái)實(shí)現(xiàn)這個(gè)需求看看:

Observable.fromIterable(personList)
.map(new Function < Person, List < Plan >> () {
    @Override
    public List < Plan > apply(Person person) throws Exception {
        return person.getPlanList();
    }
})
.subscribe(new Observer < List < Plan >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Plan > plans) {
        for (Plan plan: plans) {
            List < String > planActionList = plan.getActionList();
            for (String action: planActionList) {
                Log.d(TAG, "==================action " + action);
            }
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

可以看到 onNext() 用了嵌套 for 循環(huán)來(lái)實(shí)現(xiàn),如果代碼邏輯復(fù)雜起來(lái)的話陪每,可能需要多重循環(huán)才可以實(shí)現(xiàn)影晓。
現(xiàn)在看下使用 flatMap() 實(shí)現(xiàn):

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        return Observable.fromIterable(person.getPlanList());
    }
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Plan plan) throws Exception {
        return Observable.fromIterable(plan.getActionList());
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================action: " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

從代碼可以看出,只需要兩個(gè) flatMap() 就可以完成需求檩禾,并且代碼邏輯非常清晰挂签。

組合操作符

1. zip()
有什么用
將兩個(gè)被觀察者事件整合發(fā)送給觀察者,事件數(shù)量與兩個(gè)觀察這種事件數(shù)量較小的相同
怎么用

Observable.zip(Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS)
                        .map(aLong -> {
                            String s1 = "A" + aLong;
                            Log.d(TAG, "===================A 發(fā)送的事件 " + s1);
                            return s1;
                        }).subscribeOn(Schedulers.io()),
                Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
                        .map(aLong -> {
                            String s2 = "B" + aLong;
                            Log.d(TAG, "===================B 發(fā)送的事件 " + s2);
                            return s2;
                        }).subscribeOn(Schedulers.io()),
                (s, s2) -> {
                    String res = s + s2;
                    return res;
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "===================onSubscribe ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "===================onNext " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "===================onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "===================onComplete ");
                    }
                });

日志打优尾:

2019-05-20 22:19:10.293 6183-6183/com.danc.calendartext D/MainActivity: ===================onSubscribe 
2019-05-20 22:19:11.308 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A1
2019-05-20 22:19:11.308 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B1
2019-05-20 22:19:11.308 6183-6209/com.danc.calendartext D/MainActivity: ===================onNext A1B1
2019-05-20 22:19:12.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B2
2019-05-20 22:19:13.307 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A2
2019-05-20 22:19:13.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A2B2
2019-05-20 22:19:13.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B3
2019-05-20 22:19:14.308 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B4
2019-05-20 22:19:15.306 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A3
2019-05-20 22:19:15.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A3B3
2019-05-20 22:19:15.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B5
2019-05-20 22:19:16.307 6183-6209/com.danc.calendartext D/MainActivity: ===================B 發(fā)送的事件 B6
2019-05-20 22:19:17.307 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A4
2019-05-20 22:19:17.308 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A4B4
2019-05-20 22:19:19.307 6183-6210/com.danc.calendartext D/MainActivity: ===================A 發(fā)送的事件 A5
2019-05-20 22:19:19.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onNext A5B5
2019-05-20 22:19:19.307 6183-6210/com.danc.calendartext D/MainActivity: ===================onComplete 

功能操作符

1. delay()
有什么用
訂閱后竹握,延遲一段時(shí)間發(fā)送事件
怎么用

Observable.just(1, 3, 5)
                .delay(2, TimeUnit.SECONDS)
                .subscribe(integer -> Log.d(TAG, "accept: " + integer));

打印日志

2019-05-20 22:24:53.879 6430-6430/com.danc.calendartext D/MainActivity: onSubscribe
2019-05-20 22:24:55.881 6430-6475/com.danc.calendartext D/MainActivity: accept: 1
2019-05-20 22:24:55.881 6430-6475/com.danc.calendartext D/MainActivity: accept: 3
2019-05-20 22:24:55.882 6430-6475/com.danc.calendartext D/MainActivity: accept: 5

從日志可以暗處,訂閱之后延遲2s開(kāi)始發(fā)送事件

2. doOnNext()
有什么用
每次收到onNext()事件辆飘,先執(zhí)行doOnNext()
怎么用

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        })
                .doOnNext(integer -> Log.d(TAG, "doOnNext " + integer))
                .subscribe(new Observer < Integer > () {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

日志打印

D/MainActivity: onSubscribe 
D/MainActivity: doOnNext 1
D/MainActivity: onNext 1
D/MainActivity: doOnNext 2
D/MainActivity: onNext 2
D/MainActivity: doOnNext 3
D/MainActivity: onNext 3
D/MainActivity: onComplete 

想要查看更多的RxJava2的API,可以看下面這篇文章。這篇文章更像是查閱的資料驹闰,只需要通覽了解一遍即可晨雳,熟悉常用的API,其余有個(gè)印象紧卒,需要的時(shí)候再查閱
RxJava2 只看這一篇文章就夠了 - 掘金

生命周期管理

RxJava固然很好用侥衬,但是使用不當(dāng),很容易造成內(nèi)存泄漏跑芳。

比如轴总,使用RxJava發(fā)布一個(gè)訂閱后,當(dāng)Activity被finish博个,此時(shí)訂閱邏輯還未完成怀樟,如果沒(méi)有及時(shí)取消訂閱,就會(huì)導(dǎo)致Activity無(wú)法被回收盆佣,從而引發(fā)內(nèi)存泄漏往堡。

目前,針對(duì)以上這種情況共耍,主流的有兩種解決方案:

  1. 通過(guò)封裝虑灰,手動(dòng)控制每一次訂閱,在合適的時(shí)機(jī)取消訂閱
  2. 使用RxLifecycle痹兜,關(guān)注Fragment穆咐、Activity的生命周期,在onDestroy()生命周期中自動(dòng)取消訂閱

手動(dòng)實(shí)現(xiàn)

以下是基于MVP架構(gòu)封裝的
protected CompositeDisposable compositeDisposable;

    @UiThread
    public void attachView(V view) {
        mView = view;
        compositeDisposable = new CompositeDisposable();
    }
    @UiThread
    public void detachView() {
        mView = null;
        if (compositeDisposable != null) {
            compositeDisposable.clear();
            compositeDisposable = null;
        }
    }

如何使用
在每次發(fā)生訂閱關(guān)系時(shí)字旭,將返回的Disposable實(shí)例保存在CompositeDisposable中对湃,在onDestroy()生命周期解除所有的訂閱關(guān)系

RxLifecycle

1. 添加依賴
在項(xiàng)目中添加以下依賴

implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.1'
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.1'

2. 綁定生命周期
RxLifecycle提供了大量的Activity、Fragment以供繼承

RxLifecycle支持的Component
谐算、
代碼如下:

public class MainActivity extends RxAppCompatActivity {
      ...
      ...
}

使用bindToLifecycle()
以Activity為例熟尉,在Activity中使用bindToLifecycle()方法,完成Observable發(fā)布的事件和當(dāng)前的組件綁定洲脂,實(shí)現(xiàn)生命周期同步斤儿。從而實(shí)現(xiàn)當(dāng)前組件生命周期結(jié)束時(shí),自動(dòng)取消對(duì)Observable訂閱恐锦,代碼如下:

public class MainActivity extends RxAppCompatActivity {
     
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 當(dāng)執(zhí)行onDestory()時(shí)往果, 自動(dòng)解除訂閱
        Observable.interval(1, TimeUnit.SECONDS)
            .doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    Log.i(TAG, "Unsubscribing subscription from onCreate()");
                }
            })
            .compose(this.<Long>bindToLifecycle())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long num) throws Exception {
                    Log.i(TAG, "Started in onCreate(), running until onDestory(): " + num);
                }
            });
    }
}

下一篇文章將針對(duì)訂閱流程以及線程管理的源碼進(jìn)行分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市一铅,隨后出現(xiàn)的幾起案子陕贮,更是在濱河造成了極大的恐慌,老刑警劉巖潘飘,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件肮之,死亡現(xiàn)場(chǎng)離奇詭異掉缺,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)戈擒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門眶明,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人筐高,你說(shuō)我怎么就攤上這事搜囱。” “怎么了柑土?”我有些...
    開(kāi)封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵蜀肘,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我稽屏,道長(zhǎng)扮宠,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任诫欠,我火速辦了婚禮涵卵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘荒叼。我一直安慰自己轿偎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開(kāi)白布被廓。 她就那樣靜靜地躺著坏晦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嫁乘。 梳的紋絲不亂的頭發(fā)上昆婿,一...
    開(kāi)封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音蜓斧,去河邊找鬼仓蛆。 笑死,一個(gè)胖子當(dāng)著我的面吹牛挎春,可吹牛的內(nèi)容都是我干的看疙。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼直奋,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼能庆!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起脚线,我...
    開(kāi)封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤搁胆,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體渠旁,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡攀例,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了一死。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肛度。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖投慈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情冠骄,我是刑警寧澤伪煤,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站凛辣,受9級(jí)特大地震影響抱既,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜扁誓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一防泵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蝗敢,春花似錦捷泞、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至讶泰,卻和暖如春咏瑟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背痪署。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工码泞, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人狼犯。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓余寥,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親辜王。 傳聞我的和親對(duì)象是個(gè)殘疾皇子劈狐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354