RxJava2 系列-3:使用 Subject

imgtrip-com-2559b11dd61c0946ae8868b27395dcce13569748.jpg

在這篇文章中袋马,我們會(huì)先分析一下 RxJava2 中的 Subject ;然后,我們會(huì)使用 Subject 制作一個(gè)類(lèi)似于 EventBus 的全局的通信工具损话。

在了解本篇文章的內(nèi)容之前,你需要先了解 RxJava2 中的一些基本的用法槽唾,比如 Observable 以及背壓的概念丧枪,你可以參考我的其他兩篇文章來(lái)獲取這部分內(nèi)容:《RxJava2 系列 (1):一篇的比較全面的 RxJava2 方法總結(jié)》《RxJava2 系列 (2):背壓和Flowable》

1庞萍、Subject

1.1 Subject 的兩個(gè)特性

Subject 可以同時(shí)代表 Observer 和 Observable拧烦,允許從數(shù)據(jù)源中多次發(fā)送結(jié)果給多個(gè)觀察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 之外钝计,所有的方法都是線程安全的恋博。此外,你還可以使用 toSerialized() 方法私恬,也就是轉(zhuǎn)換成串行的债沮,將這些方法設(shè)置成線程安全的。

如果你已經(jīng)了解了 Observable 和 Observer 本鸣,那么也許直接看 Subject 的源碼定義會(huì)更容易理解:

public abstract class Subject<T> extends Observable<T> implements Observer<T> {

    // ...
}

從上面看出疫衩,Subject 同時(shí)繼承了 Observable 和 Observer 兩個(gè)接口,說(shuō)明它既是被觀察的對(duì)象永高,同時(shí)又是觀察對(duì)象隧土,也就是可以生產(chǎn)、可以消費(fèi)命爬、也可以自己生產(chǎn)自己消費(fèi)曹傀。所以,我們可以項(xiàng)下面這樣來(lái)使用它饲宛。這里我們用到的是該接口的一個(gè)實(shí)現(xiàn) PublishSubject :

public static void main(String...args) {
    PublishSubject<Integer> subject = PublishSubject.create();
    subject.subscribe(System.out::println);

    Executor executor = Executors.newFixedThreadPool(5);
    Disposable disposable = Observable.range(1, 5).subscribe(i ->
            executor.execute(() -> {
                try {
                    Thread.sleep(i * 200);
                    subject.onNext(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
}

根據(jù)程序的執(zhí)行結(jié)果皆愉,程序在第200, 400, 600, 800, 1000毫秒依次輸出了1到5的數(shù)字。

在這里,我們用 PublishSubject 創(chuàng)建了一個(gè)主題并對(duì)其監(jiān)聽(tīng)幕庐,然后在線程當(dāng)中又通知該主題內(nèi)容變化久锥,整個(gè)過(guò)程我們都只操作了 PublishSubject 一個(gè)對(duì)象。顯然异剥,使用 Subject 我們可以達(dá)到對(duì)一個(gè)指定類(lèi)型的值的結(jié)果進(jìn)行監(jiān)聽(tīng)的目的——我們把值改變之后對(duì)應(yīng)的邏輯寫(xiě)在 subscribe() 方法中瑟由,然后每次調(diào)用 onNext() 等方法通知結(jié)果之后就可以自動(dòng)調(diào)用 subscribe() 方法進(jìn)行更新操作。

同時(shí)冤寿,因?yàn)?Subject 實(shí)現(xiàn)了 Observer 接口歹苦,并且在 Observable 等的 subscribe() 方法中存在一個(gè)以 Observer 作為參數(shù)的方法(如下),所以督怜,Subject 也是可以作為消費(fèi)者來(lái)對(duì)事件進(jìn)行消費(fèi)的殴瘦。

public final void subscribe(Observer<? super T> observer) 

以上就是 Subject 的兩個(gè)主要的特性。

1.2 Subject 的實(shí)現(xiàn)類(lèi)

在 RxJava2 号杠,Subject 有幾個(gè)默認(rèn)的實(shí)現(xiàn)蚪腋,下面我們對(duì)它們之間的區(qū)別做簡(jiǎn)單的說(shuō)明:

  1. AsyncSubject:只有當(dāng) Subject 調(diào)用 onComplete 方法時(shí),才會(huì)將 Subject 中的最后一個(gè)事件傳遞給所有的 Observer姨蟋。
  2. BehaviorSubject:該類(lèi)有創(chuàng)建時(shí)需要一個(gè)默認(rèn)參數(shù)屉凯,該默認(rèn)參數(shù)會(huì)在 Subject 未發(fā)送過(guò)其他的事件時(shí),向注冊(cè)的 Observer 發(fā)送芬探;新注冊(cè)的 Observer 不會(huì)收到之前發(fā)送的事件神得,這點(diǎn)和 PublishSubject 一致。
  3. PublishSubject:不會(huì)改變事件的發(fā)送順序偷仿;在已經(jīng)發(fā)送了一部分事件之后注冊(cè)的 Observer 不會(huì)收到之前發(fā)送的事件哩簿。
  4. ReplaySubject:無(wú)論什么時(shí)候注冊(cè) Observer 都可以接收到任何時(shí)候通過(guò)該 Observable 發(fā)射的事件。
  5. UnicastSubject:只允許一個(gè) Observer 進(jìn)行監(jiān)聽(tīng)酝静,在該 Observer 注冊(cè)之前會(huì)將發(fā)射的所有的事件放進(jìn)一個(gè)隊(duì)列中节榜,并在 Observer 注冊(cè)的時(shí)候一起通知給它。

對(duì)比 PublishSubject 和 ReplaySubject别智,它們的區(qū)別在于新注冊(cè)的 Observer 是否能夠收到在它注冊(cè)之前發(fā)送的事件宗苍。這個(gè)類(lèi)似于 EventBus 中的 StickyEvent 即黏性事件,為了說(shuō)明這一點(diǎn)薄榛,我們準(zhǔn)備了下面兩段代碼:

private static void testPublishSubject() throws InterruptedException {
    PublishSubject<Integer> subject = PublishSubject.create();
    subject.subscribe(i -> System.out.print("(1: " + i + ") "));

    Executor executor = Executors.newFixedThreadPool(5);
    Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
        try {
            Thread.sleep(i * 200);
            subject.onNext(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }));

    Thread.sleep(500);
    subject.subscribe(i -> System.out.print("(2: " + i + ") "));

    Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}

private static void testReplaySubject() throws InterruptedException {
    ReplaySubject<Integer> subject = ReplaySubject.create();
    subject.subscribe(i -> System.out.print("(1: " + i + ") "));

    Executor executor = Executors.newFixedThreadPool(5);
    Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
        try {
            Thread.sleep(i * 200);
            subject.onNext(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }));

    Thread.sleep(500);
    subject.subscribe(i -> System.out.print("(2: " + i + ") "));

    Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}

它們的輸出結(jié)果依次是

PublishSubject的結(jié)果:(1: 1) (1: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
ReplaySubject的結(jié)果: (1: 1) (1: 2) (2: 1) (2: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)

從上面的結(jié)果對(duì)比中讳窟,我們可以看出前者與后者的區(qū)別在于新注冊(cè)的 Observer 并沒(méi)有收到在它注冊(cè)之前發(fā)送的事件。試驗(yàn)的結(jié)果與上面的敘述是一致的敞恋。

其他的測(cè)試代碼這不一并給出了丽啡,詳細(xì)的代碼可以參考Github - Java Advanced

2硬猫、用 RxJava 打造 EventBus

2.1 打造 EventBus

清楚了 Subject 的概念之后补箍,讓我們來(lái)做一個(gè)實(shí)踐——用 RxJava 打造 EventBus改执。

我們先考慮用一個(gè)全局的 PublishSubject 來(lái)解決這個(gè)問(wèn)題沦疾,當(dāng)然穴墅,這意味著我們發(fā)送的事件不是黏性事件碰镜。不過(guò)古劲,沒(méi)關(guān)系,只要這種實(shí)現(xiàn)方式搞懂了颁褂,用 ReplaySubject 做一個(gè)發(fā)送黏性事件的 EventBus 也非難事席里。

考慮一下签则,如果要實(shí)現(xiàn)這個(gè)功能我們需要做哪些準(zhǔn)備:

  1. 我們需要發(fā)送事件并能夠正確地接收到事件遥诉。要實(shí)現(xiàn)這個(gè)目的并不難后豫,因?yàn)?Subject 本身就具有發(fā)送和接收兩個(gè)能力,作為全局的之后就具有了全局的注冊(cè)和通知的能力突那。因此,不論你在什么位置發(fā)送了事件构眯,任何訂閱的地方都能收到該事件愕难。
  2. 首先,我們要在合適的位置對(duì)事件進(jìn)行監(jiān)聽(tīng)惫霸,并在合適的位置取消事件的監(jiān)聽(tīng)猫缭。如果我們沒(méi)有在適當(dāng)?shù)臅r(shí)機(jī)釋放事件,會(huì)不會(huì)造成內(nèi)存泄漏呢壹店?這還是有可能的猜丹。所以,我們需要對(duì)注冊(cè)監(jiān)聽(tīng)的觀察者進(jìn)行記錄硅卢,并提供注冊(cè)和取消注冊(cè)的方法射窒,給它們?cè)谥付ǖ纳芷谥羞M(jìn)行調(diào)用。

好了将塑,首先是全局的 Subject 的問(wèn)題脉顿,我們可以實(shí)現(xiàn)一個(gè)靜態(tài)的或者單例的 Subject。這里我們選擇使用后者点寥,所以艾疟,我們需要一個(gè)單例的方式來(lái)使用 Subject:

public class RxBus {

private static volatile RxBus rxBus;

private final Subject<Object> subject = PublishSubject.create().toSerialized();

public static RxBus getRxBus() {
    if (rxBus == null) {
        synchronized (RxBus.class) {
            if(rxBus == null) {
                rxBus = new RxBus();
            }
        }
    }
    return rxBus;
}

}

這里我們應(yīng)用了 DCL 的單例模式提供一個(gè)單例的 RxBus,對(duì)應(yīng)一個(gè)唯一的 Subject. 這里我們用到了 Subject 的toSerialized()敢辩,我們上面已經(jīng)提到過(guò)它的作用蔽莱,就是用來(lái)保證 onNext() 等方法的線程安全性。

另外戚长,因?yàn)?Observalbe 本身是不支持背壓的盗冷,所以,我們還需要將該 Observable 轉(zhuǎn)換成 Flowable 來(lái)實(shí)現(xiàn)背壓的效果:

public <T> Flowable<T> getObservable(Class<T> type){
    return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}

這里我們用到的背壓的策略是BackpressureStrategy.BUFFER历葛,它會(huì)緩存發(fā)射結(jié)果正塌,直到有消費(fèi)者訂閱了它嘀略。而這里的ofType()方法的作用是用來(lái)過(guò)濾發(fā)射的事件的類(lèi)型,只有指定類(lèi)型的事件會(huì)被發(fā)布乓诽。

然后帜羊,我們需要記錄訂閱者的信息以便在適當(dāng)?shù)臅r(shí)機(jī)取消訂閱,這里我們用一個(gè)Map<String, CompositeDisposable>類(lèi)型的哈希表來(lái)解決鸠天。這里的CompositeDisposable用來(lái)存儲(chǔ) Disposable讼育,從而達(dá)到一個(gè)訂閱者對(duì)應(yīng)多個(gè) Disposable 的目的。CompositeDisposable是一個(gè) Disposable 的容器稠集,聲稱(chēng)可以達(dá)到 O(1) 的增奶段、刪的復(fù)雜度。這里的做法目的是使用注冊(cè)觀察之后的 Disposable 的 dispose() 方法來(lái)取消訂閱剥纷。所以痹籍,我們可以得到下面的這段代碼:

public void addSubscription(Object o, Disposable disposable) {
    String key = o.getClass().getName();
    if (disposableMap.get(key) != null) {
        disposableMap.get(key).add(disposable);
    } else {
        CompositeDisposable disposables = new CompositeDisposable();
        disposables.add(disposable);
        disposableMap.put(key, disposables);
    }
}

public void unSubscribe(Object o) {
    String key = o.getClass().getName();
    if (!disposableMap.containsKey(key)) {
        return;
    }
    if (disposableMap.get(key) != null) {
        disposableMap.get(key).dispose();
    }
    disposableMap.remove(key);
}

最后,對(duì)外提供一下 Subject 的訂閱和發(fā)布方法晦鞋,整個(gè) EventBus 就制作完成了:

public void post(Object o){
    subject.onNext(o);
}

public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
    return getObservable(type)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(next,error);
}

2.2 測(cè)試效果

我們只需要在最頂層的 Activity 基類(lèi)中加入如下的代碼蹲缠。這樣,我們就不需要在各個(gè) Activity 中取消注冊(cè)了悠垛。然后线定,就可以使用這些頂層的方法來(lái)進(jìn)行操作了。

protected void postEvent(Object object) {
    RxBus.getRxBus().post(object);
}

protected <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
    Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, LogUtils::d);
    RxBus.getRxBus().addSubscription(this, disposable);
}

protected <M> void addSubscription(Class<M> eventType, Consumer<M> action, Consumer<Throwable> error) {
    Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, error);
    RxBus.getRxBus().addSubscription(this, disposable);
}

@Override
protected void onDestroy() {
    super.onDestroy();
    RxBus.getRxBus().unSubscribe(this);
}

在第一個(gè) Activity 中我們對(duì)指定的類(lèi)型的結(jié)果進(jìn)行監(jiān)聽(tīng):

addSubscription(RxMessage.class, rxMessage -> ToastUtils.makeToast(rxMessage.message));

然后确买,我們?cè)诹硪粋€(gè) Activity 中發(fā)布事件:

postEvent(new RxMessage("Hello world!"));

這樣當(dāng)?shù)诙€(gè) Activity 中調(diào)用指定的發(fā)送事件的方法之后斤讥,第一個(gè) Activity 就可以接收到發(fā)射的事件了。

總結(jié)

好了湾趾,以上就是 Subject 的使用芭商,如果要用一個(gè)詞來(lái)形容它的話,那么只能是“自給自足”了撑帖。就是說(shuō)蓉坎,它同時(shí)做了 Observable 和 Observer 的工作,既可以發(fā)射事件又可以對(duì)事件進(jìn)行消費(fèi)胡嘿,可謂身兼數(shù)職蛉艾。它在那種想要對(duì)某個(gè)值進(jìn)行監(jiān)聽(tīng)并處理的情形特別有用。因?yàn)樗恍枰銓?xiě)多個(gè)冗余的類(lèi)衷敌,只要它一個(gè)就完成了其他兩個(gè)類(lèi)來(lái)完成的任務(wù)勿侯,因而代碼更加簡(jiǎn)潔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缴罗,一起剝皮案震驚了整個(gè)濱河市助琐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌面氓,老刑警劉巖兵钮,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蛆橡,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡掘譬,警方通過(guò)查閱死者的電腦和手機(jī)泰演,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)葱轩,“玉大人睦焕,你說(shuō)我怎么就攤上這事⊙ス埃” “怎么了垃喊?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)袜炕。 經(jīng)常有香客問(wèn)我本谜,道長(zhǎng),這世上最難降的妖魔是什么偎窘? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任耕突,我火速辦了婚禮,結(jié)果婚禮上评架,老公的妹妹穿的比我還像新娘。我一直安慰自己炕泳,他們只是感情好纵诞,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著培遵,像睡著了一般浙芙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上籽腕,一...
    開(kāi)封第一講書(shū)人閱讀 51,708評(píng)論 1 305
  • 那天嗡呼,我揣著相機(jī)與錄音,去河邊找鬼皇耗。 笑死南窗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的郎楼。 我是一名探鬼主播万伤,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼呜袁!你這毒婦竟也來(lái)了敌买?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤阶界,失蹤者是張志新(化名)和其女友劉穎虹钮,沒(méi)想到半個(gè)月后聋庵,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡芙粱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年祭玉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宅倒。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡攘宙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拐迁,到底是詐尸還是另有隱情蹭劈,我是刑警寧澤,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布线召,位于F島的核電站铺韧,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏缓淹。R本人自食惡果不足惜哈打,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望讯壶。 院中可真熱鬧料仗,春花似錦、人聲如沸伏蚊。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)躏吊。三九已至氛改,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間比伏,已是汗流浹背胜卤。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赁项,地道東北人葛躏。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像悠菜,于是被迫代替她去往敵國(guó)和親紫新。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,180評(píng)論 25 707
  • 發(fā)現(xiàn) 關(guān)注 消息 RxSwift入坑解讀-你所需要知道的各種概念 沸沸騰關(guān)注 2016.11.27 19:11*字...
    楓葉1234閱讀 2,797評(píng)論 0 2
  • 注:本系列文章主要用于博主個(gè)人學(xué)習(xí)記錄李剖,本文末尾附上了一些較好的文章提供學(xué)習(xí)芒率。轉(zhuǎn)載請(qǐng)附 原文鏈接RxJava學(xué)習(xí)系...
    黑丫山上小旋風(fēng)閱讀 2,146評(píng)論 1 5
  • 我真的很痛苦 我規(guī)劃好了一切 都一直事與愿違 我覺(jué)得可能你是唯一懂我的人 可是你也不給我機(jī)會(huì)傾訴 對(duì)啊 我們什么關(guān)...
    舒科舒科舒科閱讀 343評(píng)論 0 0
  • 這顆禾樹(shù)長(zhǎng)在我家院子外邊,樹(shù)干有兩個(gè)人合抱那么大篙顺。 自小生長(zhǎng)在這顆樹(shù)下偶芍,大禾樹(shù)留給我許多的記憶充择。 第一是喜鵲。 禾...
    蹈海閱讀 3,984評(píng)論 0 2