RxJava 的 Subject

streams everywhere.png

Subject 是一種特殊的存在

在前面一篇文章Cold Observable 和 Hot Observable中擅耽,曾經(jīng)介紹過 Subject 既是 Observable 又是 Observer(Subscriber)次企。官網(wǎng)稱 Subject 可以看成是一個橋梁或者代理娃磺。

Subject的分類

Subject包含四種類型分別是AsyncSubject仰坦、BehaviorSubject迈勋、ReplaySubject和PublishSubject勉耀。

1. AsyncSubject

Observer會接收AsyncSubject的onComplete()之前的最后一個數(shù)據(jù)盼铁。

AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("asyncSubject1");
        subject.onNext("asyncSubject2");
        subject.onComplete();
        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("asyncSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("asyncSubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("asyncSubject:complete");  //輸出 asyncSubject onComplete
            }
        });

        subject.onNext("asyncSubject3");
        subject.onNext("asyncSubject4");

執(zhí)行結(jié)果:

asyncSubject:asyncSubject2
asyncSubject:complete

改一下代碼粗蔚,將subject.onComplete()放在最后。

        AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("asyncSubject1");
        subject.onNext("asyncSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("asyncSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("asyncSubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("asyncSubject:complete");  //輸出 asyncSubject onComplete
            }
        });

        subject.onNext("asyncSubject3");
        subject.onNext("asyncSubject4");
        subject.onComplete();

執(zhí)行結(jié)果:

asyncSubject:asyncSubject4
asyncSubject:complete

注意饶火,subject.onComplete()必須要調(diào)用才會開始發(fā)送數(shù)據(jù)鹏控,否則Subscriber將不接收任何數(shù)據(jù)。

2. BehaviorSubject

Observer會接收到BehaviorSubject被訂閱之前的最后一個數(shù)據(jù)肤寝,再接收訂閱之后發(fā)射過來的數(shù)據(jù)当辐。如果BehaviorSubject被訂閱之前沒有發(fā)送任何數(shù)據(jù),則會發(fā)送一個默認數(shù)據(jù)鲤看。

        BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("behaviorSubject:"+s); 
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("behaviorSubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("behaviorSubject:complete");  //輸出 behaviorSubject onComplete
            }
        });

        subject.onNext("behaviorSubject2");
        subject.onNext("behaviorSubject3");

執(zhí)行結(jié)果:

behaviorSubject:behaviorSubject1
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3

在這里缘揪,behaviorSubject1是默認值。因為執(zhí)行了

BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");

稍微改一下代碼义桂,在subscribe()之前找筝,再發(fā)射一個事件。

        BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
        subject.onNext("behaviorSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("behaviorSubject:"+s);  //輸出asyncSubject:asyncSubject3
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("behaviorSubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("behaviorSubject:complete");  //輸出 behaviorSubject onComplete
            }
        });

        subject.onNext("behaviorSubject3");
        subject.onNext("behaviorSubject4");

執(zhí)行結(jié)果:

behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
behaviorSubject:behaviorSubject4

這次丟棄了默認值慷吊,而發(fā)射behaviorSubject2袖裕。
因為BehaviorSubject 每次只會發(fā)射調(diào)用subscribe()方法之前的最后一個事件和調(diào)用subscribe()方法之后的事件。

BehaviorSubject還可以緩存最近一次發(fā)出信息的數(shù)據(jù)溉瓶。

3. ReplaySubject

ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者急鳄,無論它們是何時訂閱的。

        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("replaySubject1");
        subject.onNext("replaySubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("replaySubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("replaySubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("replaySubject:complete");  //輸出 replaySubject onComplete
            }
        });

        subject.onNext("replaySubject3");
        subject.onNext("replaySubject4");

執(zhí)行結(jié)果:

replaySubject:replaySubject1
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4

稍微改一下代碼嚷闭,將create()改成createWithSize(1)只緩存訂閱前最后發(fā)送的1條數(shù)據(jù)

        ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
        subject.onNext("replaySubject1");
        subject.onNext("replaySubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("replaySubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("replaySubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("replaySubject:complete");  //輸出 replaySubject onComplete
            }
        });

        subject.onNext("replaySubject3");
        subject.onNext("replaySubject4");

執(zhí)行結(jié)果:

replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4

這個執(zhí)行結(jié)果跟BehaviorSubject是一樣的攒岛。但是從并發(fā)的角度來看,ReplaySubject 在處理并發(fā) subscribe() 和 onNext() 時會更加復(fù)雜胞锰。

ReplaySubject除了可以限制緩存數(shù)據(jù)的數(shù)量和還能限制緩存的時間灾锯。使用createWithTime()即可。

4. PublishSubject

Observer只接收PublishSubject被訂閱之后發(fā)送的數(shù)據(jù)嗅榕。

        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("publicSubject1");
        subject.onNext("publicSubject2");
        subject.onComplete();

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("publicSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("publicSubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("publicSubject:complete");  //輸出 publicSubject onComplete
            }
        });

        subject.onNext("publicSubject3");
        subject.onNext("publicSubject4");

執(zhí)行結(jié)果:

publicSubject:complete

因為subject在訂閱之前顺饮,已經(jīng)執(zhí)行了onComplete()方法吵聪,所以無法發(fā)射數(shù)據(jù)。稍微改一下代碼兼雄,將onComplete()方法放在最后吟逝。

        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("publicSubject1");
        subject.onNext("publicSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("publicSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("publicSubject onError");  //不輸出(異常才會輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("publicSubject:complete");  //輸出 publicSubject onComplete
            }
        });

        subject.onNext("publicSubject3");
        subject.onNext("publicSubject4");
        subject.onComplete();

執(zhí)行結(jié)果:

publicSubject:publicSubject3
publicSubject:publicSubject4
publicSubject:complete

最后,一句話總結(jié)一下四個Subject的特性赦肋。

Subject 發(fā)射行為
AsyncSubject 不論訂閱發(fā)生在什么時候,只會發(fā)射最后一個數(shù)據(jù)
BehaviorSubject 發(fā)送訂閱之前一個數(shù)據(jù)和訂閱之后的全部數(shù)據(jù)
ReplaySubject 不論訂閱發(fā)生在什么時候佃乘,都發(fā)射全部數(shù)據(jù)
PublishSubject 發(fā)送訂閱之后全部數(shù)據(jù)

可能錯過的事件

Subject 作為一個Observable時,可以不停地調(diào)用onNext()來發(fā)送事件趣避,直到遇到onComplete()才會結(jié)束。

PublishSubject<String> subject = PublishSubject.create();
        subject.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                },new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("completed");
                    }
                });
        subject.onNext("Foo");
        subject.onNext("Bar");
        subject.onComplete();

執(zhí)行的結(jié)果:

Foo
Bar
completed

如果程帕,使用 subsribeOn 操作符將 subject 切換到IO線程,再使用 Thread.sleep(2000) 讓主線程休眠2秒愁拭。

 PublishSubject<String> subject = PublishSubject.create();
        subject.subscribeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                },new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("completed");
                    }
                });
        subject.onNext("Foo");
        subject.onNext("Bar");
        subject.onComplete();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

這時,其執(zhí)行的結(jié)果變?yōu)椋?/p>

completed

為何會缺少打印Foo和Bar妆绞?

因為,subject 發(fā)射元素的線程被指派到了 IO 線程括饶,此時 IO 線程正在初始化還沒起來,subject 發(fā)射前這兩個元素Foo来涨、Bar還在主線程中图焰,主線程的這兩個元素往 IO 線程轉(zhuǎn)發(fā)的過程中由于 IO 線程還沒有起來,所以就被丟棄了蹦掐。此時技羔,無論Thread睡了多少秒,F(xiàn)oo卧抗、Bar都不會被打印出來藤滥。

其實,解決辦法也很簡單社裆,將subject改成使用Observable.create()來替代拙绊,它允許為每個訂閱者精確控制事件的發(fā)送,這樣就不會缺少打印Foo和Bar。

使用PublishSubject來實現(xiàn)簡化的RxBus

下面的代碼是一個簡化版本的Event Bus标沪,在這里使用了PublishSubject榄攀。因為事件總線是基于發(fā)布/訂閱模式實現(xiàn)的,如果某一事件在多個Activity/Fragment中被訂閱的話金句,在App的任意地方一旦發(fā)布該事件檩赢,則多個訂閱的地方都能夠同時收到這一事件(在這里,訂閱事件的Activity/Fragment不能被destory违寞,一旦被destory就不能收到事件)贞瞒,這很符合Hot Observable的特性。所以趁曼,我們使用PublishSubject憔狞,考慮到多線程的情況,還需要使用 Subject 的 toSerialized() 方法彰阴。

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class RxBus {

    private final Subject<Object> mBus;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object obj) {
        mBus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}

在這里Subject的toSerialized(),使用SerializedSubject包裝了原先的Subject拍冠。

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }

這個版本的Event Bus比較簡單尿这,并沒有考慮到背壓的情況,因為在 RxJava2.x 中 Subject 已經(jīng)不再支持背壓了庆杜。如果要增加背壓的處理射众,可以使用Processor,我們需要將 PublishSubject 改成 PublishProcessor晃财,對應(yīng)的 Observable 也需要改成 Flowable叨橱。

使用BehaviorSubject來實現(xiàn)預(yù)加載

預(yù)加載可以很好的提高程序的用戶體驗。
每當用戶處于弱網(wǎng)絡(luò)時断盛,打開一個App可能出現(xiàn)一片空白或者一直在loading罗洗,那用戶一定會很煩躁伙菜。此時贩绕,如果能夠預(yù)先加載一些數(shù)據(jù)淑倾,例如上一次打開App時保存的數(shù)據(jù)征椒,這樣不至于會損傷App的用戶體驗。

下面是借助 BehaviorSubject 的特性來實現(xiàn)一個簡單的預(yù)加載類RxPreLoader脱茉。

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.BehaviorSubject;

/**
 * Created by Tony Shen on 2017/6/2.
 */

public class RxPreLoader<T> {

    //能夠緩存訂閱之前的最新數(shù)據(jù)
    private  BehaviorSubject<T> mData;
    private Disposable disposable;

    public RxPreLoader(T defaultValue) {

        mData = BehaviorSubject.createDefault(defaultValue);
    }

    /**
     * 發(fā)送事件
     * @param object
     */
    public void publish(T object) {
        mData.onNext(object);
    }

    /**
     * 訂閱事件
     * @param onNext
     * @return
     */
    public  Disposable subscribe(Consumer onNext) {
        disposable = mData.subscribe(onNext);
        return disposable;
    }

    /**
     * 反訂閱
     *
     */
    public void dispose() {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
            disposable = null;
        }
    }

    /**
     * 獲取緩存數(shù)據(jù)的Subject
     *
     * @return
     */
    public BehaviorSubject<T> getCacheDataSubject() {
        return mData;
    }

    /**
     * 直接獲取最近的一個數(shù)據(jù)
     *
     * @return
     */
    public T getLastCacheData() {
        return mData.getValue();
    }
}

可以考慮在基類的Activity/Fragment中也實現(xiàn)一個類似的RxPreLoader。

總結(jié)

RxJava 的 Subject 是一種特殊的存在溉躲,它的靈活性在使用時也會伴隨著風(fēng)險锻梳,沒有用好它的話會錯過事件,并且使用時還要小心 Subject 不是線程安全的辩块。當然很多開源框架都在使用Subject废亭,例如大名鼎鼎的RxLifecycle使用了BehaviorSubject豆村。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末掌动,一起剝皮案震驚了整個濱河市粗恢,隨后出現(xiàn)的幾起案子撬统,更是在濱河造成了極大的恐慌恋追,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嗅绸,死亡現(xiàn)場離奇詭異鱼鸠,居然都是意外死亡,警方通過查閱死者的電腦和手機愉昆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門跛溉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芳室,“玉大人堪侯,你說我怎么就攤上這事荔仁》α海” “怎么了?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長质蕉。 經(jīng)常有香客問我模暗,道長兑宇,這世上最難降的妖魔是什么粱坤? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任枚驻,我火速辦了婚禮株旷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘梯嗽。我一直安慰自己沽损,他們只是感情好缠俺,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布壹士。 她就那樣靜靜地躺著,像睡著了一般唯笙。 火紅的嫁衣襯著肌膚如雪崩掘。 梳的紋絲不亂的頭發(fā)上苞慢,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天挽放,我揣著相機與錄音蔓纠,去河邊找鬼腿倚。 笑死,一個胖子當著我的面吹牛暂筝,可吹牛的內(nèi)容都是我干的乖杠。 我是一名探鬼主播澄成,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼菲饼!你這毒婦竟也來了列赎?” 一聲冷哼從身側(cè)響起包吝,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤诗越,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后块促,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體竭翠,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡斋扰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年褥实,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哥艇。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡貌踏,死狀恐怖祖乳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蜒秤,我是刑警寧澤汁咏,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布攘滩,位于F島的核電站漂问,受9級特大地震影響女揭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜磷仰,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一芒划、第九天 我趴在偏房一處隱蔽的房頂上張望欧穴。 院中可真熱鬧,春花似錦拼苍、人聲如沸疮鲫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽燕侠。三九已至绢彤,卻和暖如春蜓耻,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背饶氏。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工嚷往, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人籍琳。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓趋急,卻偏偏與公主長得像势誊,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子查近,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容