理解RxJava(四)Subject用法及原理分析

概述

本文將盡可能將RxJava中的Subject相關類的用法做一個闡述,并對其原理進行簡單的解析驴一。

說到Subject,很多人可能都不是很熟悉它,因為相對于RxJava的Observable璃弄、Schedulers、Subscribes等關鍵字來講构回,它拋頭露面的場合似乎很少夏块。

事實上,Subject作用是很大的纤掸,借用官方的解釋脐供,Subject在同一時間內,既可以作為Observable借跪,也可以作為Observer:

在RxJava2.x中政己,官方一共為我們提供了以下幾種Subject:

  • ReplaySubject (釋放接收到的所有數(shù)據(jù))
  • BehaviorSubject (釋放訂閱前最后一個數(shù)據(jù)和訂閱后接收到的所有數(shù)據(jù))
  • PublishSubject (釋放訂閱后接收到的數(shù)據(jù))
  • AsyncSubject (僅釋放接收到的最后一個數(shù)據(jù))
  • SerializedSubject(串行Subject)
  • UnicastSubject (僅支持訂閱一次的Subject)
  • TestSubject(已廢棄,在2.x中被TestScheduler和TestObserver替代)

下面依次對以上的Subject進行講解掏愁,本文將重點講解BehaviorSubjectPublishSubject歇由。前者是RxLifecycle中核心類所使用到,后者則是RxBus實現(xiàn)事件總線的核心類果港。

在開始正文之前沦泌,我們需要搞清楚一個問題:

Subject是什么?

Subject在ReactiveX是作為observer和observerable的一個bridge或者proxy辛掠。因為它是一個觀察者谢谦,所以它可以訂閱一個或多個可觀察對象,同時因為他是一個可觀測對象萝衩,所以它可以傳遞和釋放它觀測到的數(shù)據(jù)對象回挽,并且能釋放新的對象。

上文已經(jīng)說的很清楚了猩谊,它既可以是數(shù)據(jù)源observerable厅各,也可以是數(shù)據(jù)的訂閱者Observer

我們點開源碼:

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    ...
}

可以看到预柒,Subject實際上還是Observable队塘,只不過它繼承了Observer接口袁梗,可以通過onNext、onComplete憔古、onError方法發(fā)射和終止發(fā)射數(shù)據(jù)遮怜。

我們從不同種類的Subject展開來講:

一、ReplaySubject

介紹

該Subject會接收數(shù)據(jù)鸿市,當被訂閱時锯梁,將所有接收到的數(shù)據(jù)全部發(fā)送給訂閱者。

這意味著焰情,不管何時訂閱這個Subject陌凳,這個Subject會把它接收到的數(shù)據(jù)都發(fā)送出去:

  ReplaySubject<Object> subject = new ReplaySubject<>();
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");
  subject.onComplete();

  // both of the following will get the onNext/onComplete calls from above
  subject.subscribe(observer1);
  subject.subscribe(observer2);

一圖頂千言,下方的箭頭代表兩次不同時間的訂閱:

ReplaySubject

很顯然,無論是在接收到數(shù)據(jù)前還是數(shù)據(jù)后訂閱内舟,ReplaySubject都會發(fā)射所有數(shù)據(jù)給訂閱者合敦。

原理

事實上,每個Subject的實現(xiàn)都不可避免的涉及到線程安全的問題验游,而RxJava則是依靠在內部使用原子操作類(AtomicXXX系列)保證線程安全充岛,本文不深入討論原子操作類相關知識,有興趣的朋友可以參考:
深入分析Java中的原子操作 @碼農一枚

ReplaySubject類時Subject相關類中代碼最多的一個類耕蝉,但這并不意味著這個類是最難理解的崔梗,相反,它的原理很直白:通過一個List動態(tài)存儲所有接收到的數(shù)據(jù)垒在,當被訂閱時蒜魄,將所有的數(shù)據(jù)都發(fā)送給訂閱者。

是不是覺得很熟悉场躯?是的权悟,其根本就是一個動態(tài)的鏈表,甚至其創(chuàng)建時的基礎容量也是16推盛,并且隨著數(shù)據(jù)的不斷增加峦阁,每次遞增50%,如果想要節(jié)省開支耘成,也可以自己定義初始容量和遞增規(guī)則榔昔。

千余行的代碼并非上述總結的那么簡單,不過作為了解和使用已經(jīng)足夠瘪菌,事實上撒会,這其中絕大部分都是算法和數(shù)據(jù)結構的處理,筆者研究源碼時师妙,也并沒有去深入分析源碼诵肛,只是淺嘗輒止。

二默穴、BehaviorSubject

當Observer訂閱了一個BehaviorSubject怔檩,它一開始就會釋放Observable最近釋放的一個數(shù)據(jù)對象褪秀,當還沒有任何數(shù)據(jù)釋放時,它則是一個默認值薛训。接下來就會釋放Observable釋放的所有數(shù)據(jù)媒吗。如果Observable因異常終止,BehaviorSubject將不會向后續(xù)的Observer釋放數(shù)據(jù)乙埃,但是會向Observer傳遞一個異常通知闸英。

簡單來說,就是釋放訂閱前最后一個數(shù)據(jù)訂閱后接收到的所有數(shù)據(jù):

  // observer will receive all 4 events (including "default").
  BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
  subject.subscribe(observer);
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive only onComplete
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.onComplete();
  subject.subscribe(observer);

  // observer will receive only onError
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.onError(new RuntimeException("error"));
  subject.subscribe(observer);

所有case都通過上述代碼進行了分析介袜,我們看下圖:

BehaviorSubject

原理

我們簡單看一下BehaviorSubject內部的幾個成員:

public final class BehaviorSubject<T> extends Subject<T> {

    private static final Object[] EMPTY_ARRAY = new Object[0];
    //原子操作類甫何,當前接收到的最后一個數(shù)據(jù)
    final AtomicReference<Object> value;
  
    //原子操作類,BehaviorDisposable內部存儲了所有接受到的數(shù)據(jù)
    final AtomicReference<BehaviorDisposable<T>[]> subscribers;
    
    //標記遇伞,意味著一個空的BehaviorDisposable
    static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];
    
    //標記辙喂,意味著已經(jīng)達到了TERMINATED,終止數(shù)據(jù)的發(fā)射
    static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
...
}

我們再來看一下BehaviorDisposable:

    static final class BehaviorDisposable<T> implements Disposable, NonThrowingPredicate<Object> {
      ...
      //實際上內部有一個List赃额,存儲所有接收到的數(shù)據(jù)
      AppendOnlyLinkedArrayList<Object> queue;
      ...
}

其原理是:

1.當BehaviorSubject.create()實例化時,默認將EMPTY賦值給subscribers叫确,這意味著此時Subject中沒有數(shù)據(jù)跳芳,:

    BehaviorSubject() {
        //...省略其他代碼
        //這意味著Subject中沒有數(shù)據(jù)
        this.subscribers = new AtomicReference<BehaviorDisposable<T>[]>(EMPTY);  
        //當前沒有最終的數(shù)據(jù)
        this.value = new AtomicReference<Object>();
    }

2.當通過onNext發(fā)射數(shù)據(jù)時,存儲數(shù)據(jù):

    @Override
    public void onNext(T t) {
        //省略判斷代碼
        Object o = NotificationLite.next(t);
        setCurrent(o);    //內部就是給成員value賦值竹勉,記錄最新接收到的數(shù)據(jù)
        for (BehaviorDisposable<T> bs : subscribers.get()) {
            bs.emitNext(o, index);  //其實就是EMPTY.emitNext()
        }
    }

3.當訂閱時飞盆,執(zhí)行subscribeActual方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
        observer.onSubscribe(bs);
        //1.執(zhí)行add(bs)方法
        if (add(bs)) {
            if (bs.cancelled) {
                remove(bs);//2.如果已經(jīng)取消發(fā)射,則不中斷數(shù)據(jù)的傳遞
            } else {
                bs.emitFirst();//3.執(zhí)行bs.emitFirst方法
            }
        } else {
            //....
        }
    }

看一下add方法:

    boolean add(BehaviorDisposable<T> rs) {
        for (;;) {
            BehaviorDisposable<T>[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            int len = a.length;
            BehaviorDisposable<T>[] b = new BehaviorDisposable[len + 1];
            System.arraycopy(a, 0, b, 0, len);
            b[len] = rs;
            //原子數(shù)據(jù)的更新操作
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

不熟悉原子操作類的同學次乓,我簡單介紹一下上述代碼的思想:
首先獲取subscribers的值(應該獲得一個BehaviorDisposable數(shù)組吓歇,其中有且僅有一個元素EMPTY),然后將該數(shù)組擴容+1,將新創(chuàng)建的BehaviorDisposable對象放入數(shù)組index=1的位置上票腰,并將該數(shù)組賦值給subscriber城看。

可以看到,RxJava中的源碼并沒有使用Java Util包中的Collections相關工具類進行數(shù)據(jù)的操作杏慰,而是直接使用System.arraycopy(a, 0, b, 0, len)這個Native底層方法進行數(shù)組的處理测柠,減少了額外的開支。

至于BehaviorDisposable的emitFirst方法缘滥,我們不難想象轰胁,應該是處理數(shù)據(jù)的發(fā)射相關邏輯,實際上它會發(fā)射BehaviorSubject的value朝扼,這也就是為什么當訂閱后赃阀,observer會先接收到訂閱前的最后一個數(shù)據(jù)的原因。

這之后擎颖,每當onNext()從上游接收到一個數(shù)據(jù)榛斯,都會subscribers數(shù)組里對每一個BehaviorDisposable中的observer向下發(fā)射數(shù)據(jù)观游。

BehaviorSubject小結

其原理就是通過subscribers這個核心的成員,它是一個不斷變化的數(shù)組肖抱。在創(chuàng)建時备典,其內部只是一個EMPTY(BehaviorDisposable)對象,每次被訂閱意述,都會在既有的數(shù)組上新加一個BehaviorDisposable對象提佣,這個對象中包含了一個List,存儲之后會收到的數(shù)據(jù)荤崇。

同時拌屏,BehaviorSubject還有一個value的成員,該成員會隨著數(shù)據(jù)的不斷接收而進行更新术荤,它總是記錄著當前最后一個接收到的數(shù)據(jù)倚喂,當被subscribe時,會執(zhí)行emitFirst()方法瓣戚,發(fā)射當前記錄的數(shù)據(jù)端圈,也就是訂閱前接收到的最后一個數(shù)據(jù)。

三子库、PublishSubject

PublishSubject僅會向Observer釋放在訂閱之后Observable釋放的數(shù)據(jù)舱权。

參考前兩個Subject的子類,這個類很好理解仑嗅,先上代碼:

  PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onComplete events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");

  // observer2 will only receive "three" and onComplete
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onComplete();

用圖說話:

PublishSubject

原理

和BehaviorSubject原理思想如出一轍宴倍, 而PublishSubject更簡單,其內部的PublishDisposable的原子操作類連AtomicReference都不是仓技,而是AtomicBoolean:

public final class PublishSubject<T> extends Subject<T> {
    static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
    static final PublishDisposable[] EMPTY = new PublishDisposable[0];

    final AtomicReference<PublishDisposable<T>[]> subscribers;
}
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
}

其原理就是通過subscribers這個核心的成員鸵贬,它是一個不斷變化的數(shù)組。在創(chuàng)建時脖捻,其內部只是一個EMPTY(PublishDisposable)對象阔逼,每次被訂閱,都會在既有的數(shù)組上新加一個PublishDisposable對象地沮,這個對象中包含了一個List颜价,存儲之后會收到的數(shù)據(jù)。

其實往上翻诉濒,你會發(fā)現(xiàn)周伦,這段總結是我復制BehaviorSubject小結的第一段......不是(其實就是)偷懶,而是兩者本身幾乎沒什么區(qū)別未荒,PublishDisposable更簡單专挪。

嘗試放棄RxBus !!!

為什么要著重說PublishSubject這個類,原因很簡單,因為在目前流行的事件總線處理方式RxBus中寨腔,其原理就是使用PublishSubject進行數(shù)據(jù)的分發(fā)速侈。

RxBus的代碼基本如下:

public class RxBus {

    private static RxBus rxBus;
    private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

    private RxBus() {
    }

    public static RxBus getInstance() {
        if (rxBus == null) {
            synchronized (RxBus.class) {
                if (rxBus == null) {
                    rxBus = new RxBus();
                }
            }
        }
        return rxBus;
    }

    public void send(Object o) {
        _bus.onNext(o);
    }

    public Observable<Object> toObserverable() {
        return _bus;
    }
}

每個人的RxBus類略有不同,可能有的RxBus類會添加類似ofType()相關的filter操作符對數(shù)據(jù)進行篩選迫卢,但其原理基本一模一樣倚搬,都是通過PublishSubject進行數(shù)據(jù)的接收和分發(fā),這樣的使用會有什么后果乾蛤,請參考下文:

放棄RxBus每界,擁抱RxJava(一):為什么避免使用EventBus/RxBus

文中作者已經(jīng)很形象舉出了RxBus的種種缺陷,以及對于事件的傳遞應該如何處理家卖。

在分析完PublishSubject的原理后眨层,我們更應該清楚,RxBus類似一個全局的Observable上荡,不斷地接受新的數(shù)據(jù)趴樱,以及接受訂閱,如果使用不當酪捡,會導致代碼的混亂叁征,我們也應該考慮,在接受的數(shù)據(jù)和訂閱越來越多逛薇,PublishSubject的單例對象會占用越來越多的內存捺疼,如果沒有處理好生命周期相關,則會導致嚴重的內存泄漏金刁!

因此帅涂,已經(jīng)分析完RxBus(實際上也就是PublishSubject )后议薪,我們應當對其使用的方式有一個全新的認知尤蛮,使用,亦或放棄RxBus斯议。

四产捞、AsyncSubject

AsyncSubject僅釋放Observable釋放的最后一個數(shù)據(jù),并且僅在Observable完成之后哼御。然而如果當Observable因為異常而終止坯临,AsyncSubject將不會釋放任何數(shù)據(jù),但是會向Observer傳遞一個異常通知恋昼。

隨著可以接收到數(shù)據(jù)的范圍越來越小看靠,似乎也越來越好理解了,直接看圖:

AsyncSubject

原理

其原理就更簡單了液肌,這下原子操作了連AtomicBoolean都不是了挟炬,直接就是AtomicInteger:

public final class AsyncSubject<T> extends Subject<T> {

    static final AsyncDisposable[] EMPTY = new AsyncDisposable[0];

    static final AsyncDisposable[] TERMINATED = new AsyncDisposable[0];

    final AtomicReference<AsyncDisposable<T>[]> subscribers;
    //記錄最新的數(shù)據(jù)
    T value;
}
//繼承DeferredScalarDisposable
static final class AsyncDisposable<T> extends DeferredScalarDisposable<T> {}

//繼承BasicIntQueueDisposable
public class DeferredScalarDisposable<T> extends BasicIntQueueDisposable<T> {}

//繼承AtomicInteger
public abstract class BasicIntQueueDisposable<T>
extends AtomicInteger
implements QueueDisposable<T> {}

每當執(zhí)行onComplete()的時候,都會執(zhí)行complete(Object o)方法,先發(fā)送最后一個數(shù)據(jù),然后執(zhí)行onComplete():

    public final void complete(T value) {
        //......
        Observer<? super T> a = actual;
        //先發(fā)送最后一個數(shù)據(jù)
        a.onNext(value);
        if (get() != DISPOSED) {
            //執(zhí)行onComplete()
            a.onComplete();
        }
    }

五谤祖、簡單介紹UnicastSubject和SerializedSubject

UnicastSubject

僅支持訂閱一次的Subject,如果多個訂閱者試圖訂閱這個Subject婿滓,若該subject未terminate,將會受到IllegalStateException 粥喜,若已經(jīng)terminate凸主,那么只會執(zhí)行onError或者onComplete方法。

SerializedSubject

回到剛剛的RxBus類中额湘,我們發(fā)現(xiàn)了SerializedSubject的身影卿吐,實際上,它的作用就是:

將Subject串行化的方法缩挑,所有其他的Observable和Subject方法都是線程安全的但两。

我們也可以通過Subject.toSerialized()方法將Subject對象串行化保證其線程安全:

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    //......
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

我們直接打開這個類,會發(fā)現(xiàn)供置,其實該類中谨湘,大部分代碼都會通過synchronized加鎖,這意味著會有額外的性能消耗芥丧。

這也更進一步說明了紧阔,使用RxBus,會額外增加更多的支出(因為RxBus中PublisSuject本身的單例對象就是調用了toSerialized()方法保證線程安全)续担。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末擅耽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子物遇,更是在濱河造成了極大的恐慌乖仇,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件询兴,死亡現(xiàn)場離奇詭異乃沙,居然都是意外死亡,警方通過查閱死者的電腦和手機诗舰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進店門警儒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人眶根,你說我怎么就攤上這事蜀铲。” “怎么了属百?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵记劝,是天一觀的道長。 經(jīng)常有香客問我族扰,道長厌丑,這世上最難降的妖魔是什么钳恕? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮蹄衷,結果婚禮上忧额,老公的妹妹穿的比我還像新娘。我一直安慰自己愧口,他們只是感情好睦番,可當我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著耍属,像睡著了一般托嚣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上厚骗,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天示启,我揣著相機與錄音,去河邊找鬼领舰。 笑死夫嗓,一個胖子當著我的面吹牛,可吹牛的內容都是我干的冲秽。 我是一名探鬼主播舍咖,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼锉桑!你這毒婦竟也來了排霉?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤民轴,失蹤者是張志新(化名)和其女友劉穎攻柠,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體后裸,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡瑰钮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了轻抱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片飞涂。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡旦部,死狀恐怖祈搜,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情士八,我是刑警寧澤容燕,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站婚度,受9級特大地震影響蘸秘,放射性物質發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一醋虏、第九天 我趴在偏房一處隱蔽的房頂上張望寻咒。 院中可真熱鬧,春花似錦颈嚼、人聲如沸毛秘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽叫挟。三九已至,卻和暖如春限煞,著一層夾襖步出監(jiān)牢的瞬間抹恳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工署驻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留奋献,地道東北人。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓旺上,卻偏偏與公主長得像秽荞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子抚官,可洞房花燭夜當晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內容