概述
本文將盡可能將RxJava中的Subject相關類的用法做一個闡述,并對其原理進行簡單的解析驴一。
說到Subject,很多人可能都不是很熟悉它,因為相對于RxJava的Observable璃弄、Schedulers、Subscribes等關鍵字來講构回,它拋頭露面的場合似乎很少夏块。
事實上,Subject作用是很大的纤掸,借用官方的解釋脐供,Subject在同一時間內,既可以作為Observable借跪,也可以作為Observer:
在RxJava2.x中政己,官方一共為我們提供了以下幾種Subject:
- ReplaySubject (釋放接收到的所有數(shù)據(jù))
- BehaviorSubject (釋放訂閱前最后一個數(shù)據(jù)和訂閱后接收到的所有數(shù)據(jù))
- PublishSubject (釋放訂閱后接收到的數(shù)據(jù))
- AsyncSubject (僅釋放接收到的最后一個數(shù)據(jù))
- SerializedSubject(串行Subject)
- UnicastSubject (僅支持訂閱一次的Subject)
- TestSubject(已廢棄,在2.x中被TestScheduler和TestObserver替代)
下面依次對以上的Subject進行講解掏愁,本文將重點講解BehaviorSubject和PublishSubject歇由。前者是RxLifecycle中核心類所使用到,后者則是RxBus實現(xiàn)事件總線的核心類果港。
在開始正文之前沦泌,我們需要搞清楚一個問題:
Subject是什么?
Subject在ReactiveX是作為observer和observerable的一個bridge或者proxy辛掠。因為它是一個觀察者谢谦,所以它可以訂閱一個或多個可觀察對象,同時因為他是一個可觀測對象萝衩,所以它可以傳遞和釋放它觀測到的數(shù)據(jù)對象回挽,并且能釋放新的對象。
上文已經(jīng)說的很清楚了猩谊,它既可以是數(shù)據(jù)源observerable厅各,也可以是數(shù)據(jù)的訂閱者Observer。
我們點開源碼:
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
...
}
可以看到预柒,Subject實際上還是Observable队塘,只不過它繼承了Observer接口袁梗,可以通過onNext、onComplete憔古、onError方法發(fā)射和終止發(fā)射數(shù)據(jù)遮怜。
我們從不同種類的Subject展開來講:
一、ReplaySubject
介紹
該Subject會接收數(shù)據(jù)鸿市,當被訂閱時锯梁,將所有接收到的數(shù)據(jù)全部發(fā)送給訂閱者。
這意味著焰情,不管何時訂閱這個Subject陌凳,這個Subject會把它接收到的數(shù)據(jù)都發(fā)送出去:
ReplaySubject<Object> subject = new ReplaySubject<>();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
// both of the following will get the onNext/onComplete calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
一圖頂千言,下方的箭頭代表兩次不同時間的訂閱:
很顯然,無論是在接收到數(shù)據(jù)前還是數(shù)據(jù)后訂閱内舟,ReplaySubject都會發(fā)射所有數(shù)據(jù)給訂閱者合敦。
原理
事實上,每個Subject的實現(xiàn)都不可避免的涉及到線程安全的問題验游,而RxJava則是依靠在內部使用原子操作類(AtomicXXX系列)保證線程安全充岛,本文不深入討論原子操作類相關知識,有興趣的朋友可以參考:
深入分析Java中的原子操作 @碼農一枚
ReplaySubject類時Subject相關類中代碼最多的一個類耕蝉,但這并不意味著這個類是最難理解的崔梗,相反,它的原理很直白:通過一個List動態(tài)存儲所有接收到的數(shù)據(jù)垒在,當被訂閱時蒜魄,將所有的數(shù)據(jù)都發(fā)送給訂閱者。
是不是覺得很熟悉场躯?是的权悟,其根本就是一個動態(tài)的鏈表,甚至其創(chuàng)建時的基礎容量也是16推盛,并且隨著數(shù)據(jù)的不斷增加峦阁,每次遞增50%,如果想要節(jié)省開支耘成,也可以自己定義初始容量和遞增規(guī)則榔昔。
千余行的代碼并非上述總結的那么簡單,不過作為了解和使用已經(jīng)足夠瘪菌,事實上撒会,這其中絕大部分都是算法和數(shù)據(jù)結構的處理,筆者研究源碼時师妙,也并沒有去深入分析源碼诵肛,只是淺嘗輒止。
二默穴、BehaviorSubject
當Observer訂閱了一個BehaviorSubject怔檩,它一開始就會釋放Observable最近釋放的一個數(shù)據(jù)對象褪秀,當還沒有任何數(shù)據(jù)釋放時,它則是一個默認值薛训。接下來就會釋放Observable釋放的所有數(shù)據(jù)媒吗。如果Observable因異常終止,BehaviorSubject將不會向后續(xù)的Observer釋放數(shù)據(jù)乙埃,但是會向Observer傳遞一個異常通知闸英。
簡單來說,就是釋放訂閱前最后一個數(shù)據(jù)和訂閱后接收到的所有數(shù)據(jù):
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
所有case都通過上述代碼進行了分析介袜,我們看下圖:
原理
我們簡單看一下BehaviorSubject內部的幾個成員:
public final class BehaviorSubject<T> extends Subject<T> {
private static final Object[] EMPTY_ARRAY = new Object[0];
//原子操作類甫何,當前接收到的最后一個數(shù)據(jù)
final AtomicReference<Object> value;
//原子操作類,BehaviorDisposable內部存儲了所有接受到的數(shù)據(jù)
final AtomicReference<BehaviorDisposable<T>[]> subscribers;
//標記遇伞,意味著一個空的BehaviorDisposable
static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];
//標記辙喂,意味著已經(jīng)達到了TERMINATED,終止數(shù)據(jù)的發(fā)射
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
...
}
我們再來看一下BehaviorDisposable:
static final class BehaviorDisposable<T> implements Disposable, NonThrowingPredicate<Object> {
...
//實際上內部有一個List赃额,存儲所有接收到的數(shù)據(jù)
AppendOnlyLinkedArrayList<Object> queue;
...
}
其原理是:
1.當BehaviorSubject.create()實例化時,默認將EMPTY賦值給subscribers叫确,這意味著此時Subject中沒有數(shù)據(jù)跳芳,:
BehaviorSubject() {
//...省略其他代碼
//這意味著Subject中沒有數(shù)據(jù)
this.subscribers = new AtomicReference<BehaviorDisposable<T>[]>(EMPTY);
//當前沒有最終的數(shù)據(jù)
this.value = new AtomicReference<Object>();
}
2.當通過onNext發(fā)射數(shù)據(jù)時,存儲數(shù)據(jù):
@Override
public void onNext(T t) {
//省略判斷代碼
Object o = NotificationLite.next(t);
setCurrent(o); //內部就是給成員value賦值竹勉,記錄最新接收到的數(shù)據(jù)
for (BehaviorDisposable<T> bs : subscribers.get()) {
bs.emitNext(o, index); //其實就是EMPTY.emitNext()
}
}
3.當訂閱時飞盆,執(zhí)行subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
observer.onSubscribe(bs);
//1.執(zhí)行add(bs)方法
if (add(bs)) {
if (bs.cancelled) {
remove(bs);//2.如果已經(jīng)取消發(fā)射,則不中斷數(shù)據(jù)的傳遞
} else {
bs.emitFirst();//3.執(zhí)行bs.emitFirst方法
}
} else {
//....
}
}
看一下add方法:
boolean add(BehaviorDisposable<T> rs) {
for (;;) {
BehaviorDisposable<T>[] a = subscribers.get();
if (a == TERMINATED) {
return false;
}
int len = a.length;
BehaviorDisposable<T>[] b = new BehaviorDisposable[len + 1];
System.arraycopy(a, 0, b, 0, len);
b[len] = rs;
//原子數(shù)據(jù)的更新操作
if (subscribers.compareAndSet(a, b)) {
return true;
}
}
}
不熟悉原子操作類的同學次乓,我簡單介紹一下上述代碼的思想:
首先獲取subscribers的值(應該獲得一個BehaviorDisposable數(shù)組吓歇,其中有且僅有一個元素EMPTY),然后將該數(shù)組擴容+1,將新創(chuàng)建的BehaviorDisposable對象放入數(shù)組index=1的位置上票腰,并將該數(shù)組賦值給subscriber城看。
可以看到,RxJava中的源碼并沒有使用Java Util包中的Collections相關工具類進行數(shù)據(jù)的操作杏慰,而是直接使用System.arraycopy(a, 0, b, 0, len)這個Native底層方法進行數(shù)組的處理测柠,減少了額外的開支。
至于BehaviorDisposable的emitFirst方法缘滥,我們不難想象轰胁,應該是處理數(shù)據(jù)的發(fā)射相關邏輯,實際上它會發(fā)射BehaviorSubject的value朝扼,這也就是為什么當訂閱后赃阀,observer會先接收到訂閱前的最后一個數(shù)據(jù)的原因。
這之后擎颖,每當onNext()從上游接收到一個數(shù)據(jù)榛斯,都會subscribers數(shù)組里對每一個BehaviorDisposable中的observer向下發(fā)射數(shù)據(jù)观游。
BehaviorSubject小結
其原理就是通過subscribers這個核心的成員,它是一個不斷變化的數(shù)組肖抱。在創(chuàng)建時备典,其內部只是一個EMPTY(BehaviorDisposable)對象,每次被訂閱意述,都會在既有的數(shù)組上新加一個BehaviorDisposable對象提佣,這個對象中包含了一個List,存儲之后會收到的數(shù)據(jù)荤崇。
同時拌屏,BehaviorSubject還有一個value的成員,該成員會隨著數(shù)據(jù)的不斷接收而進行更新术荤,它總是記錄著當前最后一個接收到的數(shù)據(jù)倚喂,當被subscribe時,會執(zhí)行emitFirst()方法瓣戚,發(fā)射當前記錄的數(shù)據(jù)端圈,也就是訂閱前接收到的最后一個數(shù)據(jù)。
三子库、PublishSubject
PublishSubject僅會向Observer釋放在訂閱之后Observable釋放的數(shù)據(jù)舱权。
參考前兩個Subject的子類,這個類很好理解仑嗅,先上代碼:
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();
用圖說話:
原理
和BehaviorSubject原理思想如出一轍宴倍, 而PublishSubject更簡單,其內部的PublishDisposable的原子操作類連AtomicReference都不是仓技,而是AtomicBoolean:
public final class PublishSubject<T> extends Subject<T> {
static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
static final PublishDisposable[] EMPTY = new PublishDisposable[0];
final AtomicReference<PublishDisposable<T>[]> subscribers;
}
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
}
其原理就是通過subscribers這個核心的成員鸵贬,它是一個不斷變化的數(shù)組。在創(chuàng)建時脖捻,其內部只是一個EMPTY(PublishDisposable)對象阔逼,每次被訂閱,都會在既有的數(shù)組上新加一個PublishDisposable對象地沮,這個對象中包含了一個List颜价,存儲之后會收到的數(shù)據(jù)。
其實往上翻诉濒,你會發(fā)現(xiàn)周伦,這段總結是我復制BehaviorSubject小結的第一段......不是(其實就是)偷懶,而是兩者本身幾乎沒什么區(qū)別未荒,PublishDisposable更簡單专挪。
嘗試放棄RxBus !!!
為什么要著重說PublishSubject這個類,原因很簡單,因為在目前流行的事件總線處理方式RxBus中寨腔,其原理就是使用PublishSubject進行數(shù)據(jù)的分發(fā)速侈。
RxBus的代碼基本如下:
public class RxBus {
private static RxBus rxBus;
private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
private RxBus() {
}
public static RxBus getInstance() {
if (rxBus == null) {
synchronized (RxBus.class) {
if (rxBus == null) {
rxBus = new RxBus();
}
}
}
return rxBus;
}
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
return _bus;
}
}
每個人的RxBus類略有不同,可能有的RxBus類會添加類似ofType()相關的filter操作符對數(shù)據(jù)進行篩選迫卢,但其原理基本一模一樣倚搬,都是通過PublishSubject進行數(shù)據(jù)的接收和分發(fā),這樣的使用會有什么后果乾蛤,請參考下文:
放棄RxBus每界,擁抱RxJava(一):為什么避免使用EventBus/RxBus
文中作者已經(jīng)很形象舉出了RxBus的種種缺陷,以及對于事件的傳遞應該如何處理家卖。
在分析完PublishSubject的原理后眨层,我們更應該清楚,RxBus類似一個全局的Observable上荡,不斷地接受新的數(shù)據(jù)趴樱,以及接受訂閱,如果使用不當酪捡,會導致代碼的混亂叁征,我們也應該考慮,在接受的數(shù)據(jù)和訂閱越來越多逛薇,PublishSubject的單例對象會占用越來越多的內存捺疼,如果沒有處理好生命周期相關,則會導致嚴重的內存泄漏金刁!
因此帅涂,已經(jīng)分析完RxBus(實際上也就是PublishSubject )后议薪,我們應當對其使用的方式有一個全新的認知尤蛮,使用,亦或放棄RxBus斯议。
四产捞、AsyncSubject
AsyncSubject僅釋放Observable釋放的最后一個數(shù)據(jù),并且僅在Observable完成之后哼御。然而如果當Observable因為異常而終止坯临,AsyncSubject將不會釋放任何數(shù)據(jù),但是會向Observer傳遞一個異常通知恋昼。
隨著可以接收到數(shù)據(jù)的范圍越來越小看靠,似乎也越來越好理解了,直接看圖:
原理
其原理就更簡單了液肌,這下原子操作了連AtomicBoolean都不是了挟炬,直接就是AtomicInteger:
public final class AsyncSubject<T> extends Subject<T> {
static final AsyncDisposable[] EMPTY = new AsyncDisposable[0];
static final AsyncDisposable[] TERMINATED = new AsyncDisposable[0];
final AtomicReference<AsyncDisposable<T>[]> subscribers;
//記錄最新的數(shù)據(jù)
T value;
}
//繼承DeferredScalarDisposable
static final class AsyncDisposable<T> extends DeferredScalarDisposable<T> {}
//繼承BasicIntQueueDisposable
public class DeferredScalarDisposable<T> extends BasicIntQueueDisposable<T> {}
//繼承AtomicInteger
public abstract class BasicIntQueueDisposable<T>
extends AtomicInteger
implements QueueDisposable<T> {}
每當執(zhí)行onComplete()的時候,都會執(zhí)行complete(Object o)方法,先發(fā)送最后一個數(shù)據(jù),然后執(zhí)行onComplete():
public final void complete(T value) {
//......
Observer<? super T> a = actual;
//先發(fā)送最后一個數(shù)據(jù)
a.onNext(value);
if (get() != DISPOSED) {
//執(zhí)行onComplete()
a.onComplete();
}
}
五谤祖、簡單介紹UnicastSubject和SerializedSubject
UnicastSubject
僅支持訂閱一次的Subject,如果多個訂閱者試圖訂閱這個Subject婿滓,若該subject未terminate,將會受到IllegalStateException 粥喜,若已經(jīng)terminate凸主,那么只會執(zhí)行onError或者onComplete方法。
SerializedSubject
回到剛剛的RxBus類中额湘,我們發(fā)現(xiàn)了SerializedSubject的身影卿吐,實際上,它的作用就是:
將Subject串行化的方法缩挑,所有其他的Observable和Subject方法都是線程安全的但两。
我們也可以通過Subject.toSerialized()方法將Subject對象串行化保證其線程安全:
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
//......
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
}
我們直接打開這個類,會發(fā)現(xiàn)供置,其實該類中谨湘,大部分代碼都會通過synchronized加鎖,這意味著會有額外的性能消耗芥丧。
這也更進一步說明了紧阔,使用RxBus,會額外增加更多的支出(因為RxBus中PublisSuject本身的單例對象就是調用了toSerialized()方法保證線程安全)续担。