前面說過了RXJava中的Observable磅网,本次來說說Subject。
附上前文的鏈接嵌施。喜歡的兄弟麻煩點(diǎn)個(gè)喜歡违帆,關(guān)注啥的吧剃根。
RXJava Observable詳解 (RXJava Part 1)
Subject
Subject有兩種用途:
- 做為observable向其他的observable發(fā)送事件
- 做為observer接收其他的observable發(fā)送的事件。
最后的一個(gè)例子會(huì)使用一個(gè)subject監(jiān)聽一個(gè)observable前方,并將observable發(fā)射的事件轉(zhuǎn)發(fā)給一個(gè)observer狈醉。
Subject做為Observable
- PublishSubject
該Subject不會(huì)改變事件的發(fā)送順序。
如果在已經(jīng)發(fā)送了一部分事件之后注冊(cè)的observer惠险,
是不會(huì)收到之前發(fā)送的事件苗傅。
重點(diǎn)跟下面三個(gè)類作對(duì)比
private void doPublishSubject() {
//將事件發(fā)送到observer,如果先前已經(jīng)漏掉的事件班巩,不會(huì)重新發(fā)送到后注冊(cè)的observer上
PublishSubject<String> publish = PublishSubject.create();
publish.subscribe(new PublishObserver<String>("first"));
publish.onNext("1");
publish.onNext("2");
publish.subscribe(new PublishObserver<String>("seconde"));
publish.onNext("3");
publish.onCompleted();
}
class SubjectObserver<T extends String> implements Observer<String> {
String name;
public SubjectObserver(String name) {
this.name = name;
}
@Override
public void onCompleted() {
LogUtils.d("publishObserver %s is completed", name);
}
@Override
public void onError(Throwable e) {
LogUtils.d("publishObserver %s error msg %s", name, e.getStackTrace());
}
@Override
public void onNext(java.lang.String s) {
LogUtils.d("publishObserver %s receive msg %s", name, s);
}
}
- BehaviorSubject
該類有創(chuàng)建時(shí)需要一個(gè)默認(rèn)參數(shù)渣慕,該默認(rèn)參數(shù)會(huì)在subject未發(fā)送過其他的事件時(shí),向注冊(cè)的observer發(fā)送抱慌。
注意看代碼注釋逊桦。
private void doBehaviorSubject() {
//將事件發(fā)送到observer,如果先前已經(jīng)漏掉的事件抑进,除了最近的一個(gè)事件以外强经,
//其他相關(guān)事件不會(huì)重新發(fā)送到后注冊(cè)的observer上。所以需要帶默認(rèn)值寺渗,
//第一次被observer注冊(cè)時(shí)匿情,observable中沒有內(nèi)容的時(shí)候,就會(huì)將默認(rèn)值發(fā)給observer
BehaviorSubject<String> behavior = BehaviorSubject.create("創(chuàng)建beahavior時(shí)候帶的消息");
behavior.subscribe(new SubjectObserver<String>("first"));
behavior.onNext("1");
behavior.onNext("2");
behavior.subscribe(new SubjectObserver<String>("seconde"));
behavior.onNext("3");
behavior.onCompleted();
}
- ReplaySubject
將事件發(fā)送到observer信殊,無論什么時(shí)候注冊(cè)observer炬称,
無論何時(shí)通過該observable發(fā)射的所有事件,均會(huì)發(fā)送給新的observer涡拘。
private void doReplaySubject() {
//將事件發(fā)送到observer玲躯,無論什么時(shí)候注冊(cè)observer,
//無論何時(shí)通過該observable發(fā)射的所有事件,均會(huì)發(fā)送給新的observer跷车。
ReplaySubject<String> replay = ReplaySubject.create();
replay.subscribe(new SubjectObserver<String>("first"));
replay.onNext("1");
replay.onNext("2");
replay.subscribe(new SubjectObserver<String>("seconde"));
replay.onNext("3");
replay.onCompleted();
}
- AsyncSubject
只有當(dāng)subject調(diào)用onComplete方法時(shí)晋控,才會(huì)將subject中的最后一個(gè)事件傳遞給observer。
如果不調(diào)用onComplete方法姓赤,則不會(huì)給observer發(fā)送任何事件。
private void doAsyncSubject() {
//只會(huì)有當(dāng)subject調(diào)用onComplete方法時(shí)仲吏,才會(huì)將subject中的最后一個(gè)事件傳遞給observer不铆。
//如果不調(diào)用onComplete方法,則不會(huì)向observer中發(fā)送任何事件
AsyncSubject async = AsyncSubject.create();
async.subscribe(new SubjectObserver<String>("first"));
async.onNext("1");
async.onNext("2");
async.onNext("3");
async.onCompleted();
async.subscribe(new SubjectObserver<String>("seconde"));
async.onCompleted();
}
Subject做為observable
文章開頭說了裹唆,subject及可以做observer也可以做observable誓斥。
示例會(huì)將subject注冊(cè)為一個(gè)observer來接收創(chuàng)建的observable中的事件。
并將observable中的事件再發(fā)送給在該Subject中注冊(cè)的observer中许帐。
private void doObserverSubject() {
//將Subject當(dāng)作Observer使用劳坑,并將另外的observer注冊(cè)到subject上,來監(jiān)聽原始的observable發(fā)出的事件
List<String> items = new ArrayList<>();
items.add("100");
items.add("103");
items.add("107");
Observable<String> observable = Observable.from(items);
ReplaySubject<String> replay = ReplaySubject.create();
observable.subscribe(replay);
replay.subscribe(new SubjectObserver<String>("first"));
}