Subject 是一種特殊的存在
在前面一篇文章Cold Observable 和 Hot Observable中擅耽,曾經(jīng)介紹過 Subject 既是 Observable 又是 Observer(Subscriber)次企。官網(wǎng)稱 Subject 可以看成是一個橋梁或者代理娃磺。
Subject的分類
Subject包含四種類型分別是AsyncSubject仰坦、BehaviorSubject迈勋、ReplaySubject和PublishSubject勉耀。
1. AsyncSubject
Observer會接收AsyncSubject的onComplete()之前的最后一個數(shù)據(jù)盼铁。
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("asyncSubject1");
subject.onNext("asyncSubject2");
subject.onComplete();
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("asyncSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("asyncSubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("asyncSubject:complete"); //輸出 asyncSubject onComplete
}
});
subject.onNext("asyncSubject3");
subject.onNext("asyncSubject4");
執(zhí)行結(jié)果:
asyncSubject:asyncSubject2
asyncSubject:complete
改一下代碼粗蔚,將subject.onComplete()放在最后。
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("asyncSubject1");
subject.onNext("asyncSubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("asyncSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("asyncSubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("asyncSubject:complete"); //輸出 asyncSubject onComplete
}
});
subject.onNext("asyncSubject3");
subject.onNext("asyncSubject4");
subject.onComplete();
執(zhí)行結(jié)果:
asyncSubject:asyncSubject4
asyncSubject:complete
注意饶火,subject.onComplete()必須要調(diào)用才會開始發(fā)送數(shù)據(jù)鹏控,否則Subscriber將不接收任何數(shù)據(jù)。
2. BehaviorSubject
Observer會接收到BehaviorSubject被訂閱之前的最后一個數(shù)據(jù)肤寝,再接收訂閱之后發(fā)射過來的數(shù)據(jù)当辐。如果BehaviorSubject被訂閱之前沒有發(fā)送任何數(shù)據(jù),則會發(fā)送一個默認數(shù)據(jù)鲤看。
BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("behaviorSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("behaviorSubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("behaviorSubject:complete"); //輸出 behaviorSubject onComplete
}
});
subject.onNext("behaviorSubject2");
subject.onNext("behaviorSubject3");
執(zhí)行結(jié)果:
behaviorSubject:behaviorSubject1
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
在這里缘揪,behaviorSubject1是默認值。因為執(zhí)行了
BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
稍微改一下代碼义桂,在subscribe()之前找筝,再發(fā)射一個事件。
BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
subject.onNext("behaviorSubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("behaviorSubject:"+s); //輸出asyncSubject:asyncSubject3
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("behaviorSubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("behaviorSubject:complete"); //輸出 behaviorSubject onComplete
}
});
subject.onNext("behaviorSubject3");
subject.onNext("behaviorSubject4");
執(zhí)行結(jié)果:
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
behaviorSubject:behaviorSubject4
這次丟棄了默認值慷吊,而發(fā)射behaviorSubject2袖裕。
因為BehaviorSubject 每次只會發(fā)射調(diào)用subscribe()方法之前的最后一個事件和調(diào)用subscribe()方法之后的事件。
BehaviorSubject還可以緩存最近一次發(fā)出信息的數(shù)據(jù)溉瓶。
3. ReplaySubject
ReplaySubject會發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者急鳄,無論它們是何時訂閱的。
ReplaySubject<String> subject = ReplaySubject.create();
subject.onNext("replaySubject1");
subject.onNext("replaySubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("replaySubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("replaySubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("replaySubject:complete"); //輸出 replaySubject onComplete
}
});
subject.onNext("replaySubject3");
subject.onNext("replaySubject4");
執(zhí)行結(jié)果:
replaySubject:replaySubject1
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4
稍微改一下代碼嚷闭,將create()改成createWithSize(1)只緩存訂閱前最后發(fā)送的1條數(shù)據(jù)
ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
subject.onNext("replaySubject1");
subject.onNext("replaySubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("replaySubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("replaySubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("replaySubject:complete"); //輸出 replaySubject onComplete
}
});
subject.onNext("replaySubject3");
subject.onNext("replaySubject4");
執(zhí)行結(jié)果:
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4
這個執(zhí)行結(jié)果跟BehaviorSubject是一樣的攒岛。但是從并發(fā)的角度來看,ReplaySubject 在處理并發(fā) subscribe() 和 onNext() 時會更加復(fù)雜胞锰。
ReplaySubject除了可以限制緩存數(shù)據(jù)的數(shù)量和還能限制緩存的時間灾锯。使用createWithTime()即可。
4. PublishSubject
Observer只接收PublishSubject被訂閱之后發(fā)送的數(shù)據(jù)嗅榕。
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("publicSubject1");
subject.onNext("publicSubject2");
subject.onComplete();
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("publicSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("publicSubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("publicSubject:complete"); //輸出 publicSubject onComplete
}
});
subject.onNext("publicSubject3");
subject.onNext("publicSubject4");
執(zhí)行結(jié)果:
publicSubject:complete
因為subject在訂閱之前顺饮,已經(jīng)執(zhí)行了onComplete()方法吵聪,所以無法發(fā)射數(shù)據(jù)。稍微改一下代碼兼雄,將onComplete()方法放在最后吟逝。
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("publicSubject1");
subject.onNext("publicSubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("publicSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("publicSubject onError"); //不輸出(異常才會輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("publicSubject:complete"); //輸出 publicSubject onComplete
}
});
subject.onNext("publicSubject3");
subject.onNext("publicSubject4");
subject.onComplete();
執(zhí)行結(jié)果:
publicSubject:publicSubject3
publicSubject:publicSubject4
publicSubject:complete
最后,一句話總結(jié)一下四個Subject的特性赦肋。
Subject | 發(fā)射行為 |
---|---|
AsyncSubject | 不論訂閱發(fā)生在什么時候,只會發(fā)射最后一個數(shù)據(jù) |
BehaviorSubject | 發(fā)送訂閱之前一個數(shù)據(jù)和訂閱之后的全部數(shù)據(jù) |
ReplaySubject | 不論訂閱發(fā)生在什么時候佃乘,都發(fā)射全部數(shù)據(jù) |
PublishSubject | 發(fā)送訂閱之后全部數(shù)據(jù) |
可能錯過的事件
Subject 作為一個Observable時,可以不停地調(diào)用onNext()來發(fā)送事件趣避,直到遇到onComplete()才會結(jié)束。
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
},new Action() {
@Override
public void run() throws Exception {
System.out.println("completed");
}
});
subject.onNext("Foo");
subject.onNext("Bar");
subject.onComplete();
執(zhí)行的結(jié)果:
Foo
Bar
completed
如果程帕,使用 subsribeOn 操作符將 subject 切換到IO線程,再使用 Thread.sleep(2000) 讓主線程休眠2秒愁拭。
PublishSubject<String> subject = PublishSubject.create();
subject.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
},new Action() {
@Override
public void run() throws Exception {
System.out.println("completed");
}
});
subject.onNext("Foo");
subject.onNext("Bar");
subject.onComplete();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
這時,其執(zhí)行的結(jié)果變?yōu)椋?/p>
completed
為何會缺少打印Foo和Bar妆绞?
因為,subject 發(fā)射元素的線程被指派到了 IO 線程括饶,此時 IO 線程正在初始化還沒起來,subject 發(fā)射前這兩個元素Foo来涨、Bar還在主線程中图焰,主線程的這兩個元素往 IO 線程轉(zhuǎn)發(fā)的過程中由于 IO 線程還沒有起來,所以就被丟棄了蹦掐。此時技羔,無論Thread睡了多少秒,F(xiàn)oo卧抗、Bar都不會被打印出來藤滥。
其實,解決辦法也很簡單社裆,將subject改成使用Observable.create()來替代拙绊,它允許為每個訂閱者精確控制事件的發(fā)送,這樣就不會缺少打印Foo和Bar。
使用PublishSubject來實現(xiàn)簡化的RxBus
下面的代碼是一個簡化版本的Event Bus标沪,在這里使用了PublishSubject榄攀。因為事件總線是基于發(fā)布/訂閱模式
實現(xiàn)的,如果某一事件在多個Activity/Fragment中被訂閱的話金句,在App的任意地方一旦發(fā)布該事件檩赢,則多個訂閱的地方都能夠同時收到這一事件(在這里,訂閱事件的Activity/Fragment不能被destory违寞,一旦被destory就不能收到事件)贞瞒,這很符合Hot Observable的特性。所以趁曼,我們使用PublishSubject憔狞,考慮到多線程的情況,還需要使用 Subject 的 toSerialized() 方法彰阴。
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private final Subject<Object> mBus;
private RxBus() {
mBus = PublishSubject.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}
在這里Subject的toSerialized(),使用SerializedSubject包裝了原先的Subject拍冠。
/**
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
* onComplete methods, making them thread-safe.
* <p>The method is thread-safe.
* @return the wrapped and serialized subject
*/
@NonNull
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
這個版本的Event Bus比較簡單尿这,并沒有考慮到背壓的情況,因為在 RxJava2.x 中 Subject 已經(jīng)不再支持背壓了庆杜。如果要增加背壓的處理射众,可以使用Processor,我們需要將 PublishSubject 改成 PublishProcessor晃财,對應(yīng)的 Observable 也需要改成 Flowable叨橱。
使用BehaviorSubject來實現(xiàn)預(yù)加載
預(yù)加載可以很好的提高程序的用戶體驗。
每當用戶處于弱網(wǎng)絡(luò)時断盛,打開一個App可能出現(xiàn)一片空白或者一直在loading罗洗,那用戶一定會很煩躁伙菜。此時贩绕,如果能夠預(yù)先加載一些數(shù)據(jù)淑倾,例如上一次打開App時保存的數(shù)據(jù)征椒,這樣不至于會損傷App的用戶體驗。
下面是借助 BehaviorSubject 的特性來實現(xiàn)一個簡單的預(yù)加載類RxPreLoader脱茉。
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.BehaviorSubject;
/**
* Created by Tony Shen on 2017/6/2.
*/
public class RxPreLoader<T> {
//能夠緩存訂閱之前的最新數(shù)據(jù)
private BehaviorSubject<T> mData;
private Disposable disposable;
public RxPreLoader(T defaultValue) {
mData = BehaviorSubject.createDefault(defaultValue);
}
/**
* 發(fā)送事件
* @param object
*/
public void publish(T object) {
mData.onNext(object);
}
/**
* 訂閱事件
* @param onNext
* @return
*/
public Disposable subscribe(Consumer onNext) {
disposable = mData.subscribe(onNext);
return disposable;
}
/**
* 反訂閱
*
*/
public void dispose() {
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
disposable = null;
}
}
/**
* 獲取緩存數(shù)據(jù)的Subject
*
* @return
*/
public BehaviorSubject<T> getCacheDataSubject() {
return mData;
}
/**
* 直接獲取最近的一個數(shù)據(jù)
*
* @return
*/
public T getLastCacheData() {
return mData.getValue();
}
}
可以考慮在基類的Activity/Fragment中也實現(xiàn)一個類似的RxPreLoader。
總結(jié)
RxJava 的 Subject 是一種特殊的存在溉躲,它的靈活性在使用時也會伴隨著風(fēng)險锻梳,沒有用好它的話會錯過事件,并且使用時還要小心 Subject 不是線程安全的辩块。當然很多開源框架都在使用Subject废亭,例如大名鼎鼎的RxLifecycle使用了BehaviorSubject豆村。