Observable的分類
Observable 有 Cold 和 Hot 之分。
Hot Observable 無論有沒有 Subscriber 訂閱匣缘,事件始終都會發(fā)生猖闪。當 Hot Observable 有多個訂閱者時,Hot Observable 與訂閱者們的關系是一對多的關系肌厨,可以與多個訂閱者共享信息培慌。
然而,Cold Observable 只有 Subscriber 訂閱時柑爸,才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼吵护。并且 Cold Observable 和 Subscriber 只能是一對一的關系,當有多個不同的訂閱者時表鳍,消息是重新完整發(fā)送的馅而。也就是說對 Cold Observable 而言,有多個Subscriber的時候譬圣,他們各自的事件是獨立的瓮恭。
如果上面的解釋有點枯燥的話,那么下面會更加形象地說明 Cold 和 Hot 的區(qū)別:
Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song.
A cold Observable is a music CD. Many people can buy it and listen to it independently.
by Nickolay Tsvetinov
Cold Observable
Observable 的 just厘熟、creat屯蹦、range、fromXXX 等操作符都能生成Cold Observable绳姨。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread());
observable.subscribe(subscriber1);
observable.subscribe(subscriber2);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
執(zhí)行結(jié)果:
subscriber1: 0
subscriber2: 0
subscriber1: 1
subscriber2: 1
subscriber1: 2
subscriber2: 2
subscriber2: 3
subscriber1: 3
subscriber1: 4
subscriber2: 4
subscriber2: 5
subscriber1: 5
subscriber1: 6
subscriber2: 6
subscriber1: 7
subscriber2: 7
subscriber1: 8
subscriber2: 8
subscriber1: 9
subscriber2: 9
可以看出登澜,subscriber1 和 subscriber2 的結(jié)果并不一定是相同的,二者是完全獨立的飘庄。
盡管 Cold Observable 很好脑蠕,但是對于某些事件不確定何時發(fā)生以及不確定 Observable 發(fā)射的元素數(shù)量,那還得使用 Hot Observable竭宰。比如:UI交互的事件空郊、網(wǎng)絡環(huán)境的變化份招、地理位置的變化切揭、服務器推送消息的到達等等。
Cold Observable 如何轉(zhuǎn)換成 Hot Observable锁摔?
1. 使用publish廓旬,生成 ConnectableObservable
使用 publish 操作符,可以讓 Cold Observable 轉(zhuǎn)換成 Hot Observable。它將原先的 Observable 轉(zhuǎn)換成 ConnectableObservable孕豹。
來看看剛才的例子:
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
Consumer<Long> subscriber3 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber3: "+aLong);
}
};
ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
observable.connect();
observable.subscribe(subscriber1);
observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
observable.subscribe(subscriber3);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
注意涩盾,生成的 ConnectableObservable 需要調(diào)用connect()才能真正執(zhí)行。
執(zhí)行結(jié)果:
subscriber1: 0
subscriber2: 0
subscriber1: 1
subscriber2: 1
subscriber1: 2
subscriber2: 2
subscriber3: 2
subscriber1: 3
subscriber2: 3
subscriber3: 3
subscriber1: 4
subscriber2: 4
subscriber3: 4
subscriber1: 5
subscriber2: 5
subscriber3: 5
subscriber1: 6
subscriber2: 6
subscriber3: 6
subscriber1: 7
subscriber2: 7
subscriber3: 7
subscriber1: 8
subscriber2: 8
subscriber3: 8
subscriber1: 9
subscriber2: 9
subscriber3: 9
subscriber1: 10
subscriber2: 10
subscriber3: 10
subscriber1: 11
subscriber2: 11
subscriber3: 11
可以看到励背,多個訂閱的 Subscriber 共享同一事件春霍。
在這里,ConnectableObservable 是線程安全的叶眉。
2. 使用Subject/Processor
Subject 和 Processor 的作用是相同的址儒。Processor 是 RxJava2.x 新增的類,繼承自 Flowable 支持背壓控制衅疙。而 Subject 則不支持背壓控制莲趣。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
Consumer<Long> subscriber3 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber3: "+aLong);
}
};
Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread());
PublishSubject<Long> subject = PublishSubject.create();
observable.subscribe(subject);
subject.subscribe(subscriber1);
subject.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
subject.subscribe(subscriber3);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
執(zhí)行結(jié)果跟上面使用 publish 操作符是一樣的。
Subject 既是 Observable 又是 Observer(Subscriber)饱溢。這一點可以從 Subject 的源碼上看到喧伞。
import io.reactivex.*;
import io.reactivex.annotations.*;
/**
* Represents an Observer and an Observable at the same time, allowing
* multicasting events from a single source to multiple child Subscribers.
* <p>All methods except the onSubscribe, onNext, onError and onComplete are thread-safe.
* Use {@link #toSerialized()} to make these methods thread-safe as well.
*
* @param <T> the item value type
*/
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
/**
* Returns true if the subject has any Observers.
* <p>The method is thread-safe.
* @return true if the subject has any Observers
*/
public abstract boolean hasObservers();
/**
* Returns true if the subject has reached a terminal state through an error event.
* <p>The method is thread-safe.
* @return true if the subject has reached a terminal state through an error event
* @see #getThrowable()
* &see {@link #hasComplete()}
*/
public abstract boolean hasThrowable();
/**
* Returns true if the subject has reached a terminal state through a complete event.
* <p>The method is thread-safe.
* @return true if the subject has reached a terminal state through a complete event
* @see #hasThrowable()
*/
public abstract boolean hasComplete();
/**
* Returns the error that caused the Subject to terminate or null if the Subject
* hasn't terminated yet.
* <p>The method is thread-safe.
* @return the error that caused the Subject to terminate or null if the Subject
* hasn't terminated yet
*/
@Nullable
public abstract Throwable getThrowable();
/**
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
* onComplete methods, making them thread-safe.
* <p>The method is thread-safe.
* @return the wrapped and serialized subject
*/
@NonNull
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
}
當 Subject 作為 Subscriber 時,它可以訂閱目標 Cold Observable 使對方開始發(fā)送事件绩郎。同時它又作為Observable 轉(zhuǎn)發(fā)或者發(fā)送新的事件潘鲫,讓 Cold Observable 借助 Subject 轉(zhuǎn)換為 Hot Observable。
注意嗽上,Subject 并不是線程安全的次舌,如果想要其線程安全需要調(diào)用toSerialized()
方法。(在RxJava1.x的時代還可以用 SerializedSubject 代替 Subject兽愤,但是在RxJava2.x以后SerializedSubject不再是一個public class)
然而彼念,很多基于 EventBus 改造的 RxBus 并沒有這么做,包括我以前也寫過這樣的 RxBus :( 浅萧。這樣的做法是非常危險的逐沙,因為會遇到并發(fā)的情況。
Hot Observable 如何轉(zhuǎn)換成 Cold Observable洼畅?
1. ConnectableObservable的refCount操作符
reactivex官網(wǎng)的解釋是
make a Connectable Observable behave like an ordinary Observable
RefCount操作符把從一個可連接的 Observable 連接和斷開的過程自動化了吩案。它操作一個可連接的Observable,返回一個普通的Observable帝簇。當?shù)谝粋€訂閱者訂閱這個Observable時徘郭,RefCount連接到下層的可連接Observable。RefCount跟蹤有多少個觀察者訂閱它丧肴,直到最后一個觀察者完成才斷開與下層可連接Observable的連接残揉。
如果所有的訂閱者都取消訂閱了,則數(shù)據(jù)流停止芋浮。如果重新訂閱則重新開始數(shù)據(jù)流抱环。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
connectableObservable.connect();
Observable<Long> observable = connectableObservable.refCount();
Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable1.dispose();
disposable2.dispose();
System.out.println("重新開始數(shù)據(jù)流");
disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
執(zhí)行結(jié)果:
subscriber1: 0
subscriber2: 0
subscriber1: 1
subscriber2: 1
重新開始數(shù)據(jù)流
subscriber1: 0
subscriber2: 0
subscriber1: 1
subscriber2: 1
如果不是所有的訂閱者都取消了訂閱,只取消了部分。部分的訂閱者重新開始訂閱镇草,則不會從頭開始數(shù)據(jù)流眶痰。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("subscriber1: "+aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber2: "+aLong);
}
};
Consumer<Long> subscriber3 = new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println(" subscriber3: "+aLong);
}
};
ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
.take(Integer.MAX_VALUE)
.subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
connectableObservable.connect();
Observable<Long> observable = connectableObservable.refCount();
Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);
observable.subscribe(subscriber3);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable1.dispose();
disposable2.dispose();
System.out.println("subscriber1、subscriber2 重新訂閱");
disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
執(zhí)行結(jié)果:
subscriber1: 0
subscriber2: 0
subscriber3: 0
subscriber1: 1
subscriber2: 1
subscriber3: 1
subscriber1梯啤、subscriber2 重新訂閱
subscriber3: 2
subscriber1: 2
subscriber2: 2
subscriber3: 3
subscriber1: 3
subscriber2: 3
subscriber3: 4
subscriber1: 4
subscriber2: 4
在這里竖伯,subscriber1和subscriber2先取消了訂閱,subscriber3并沒有取消訂閱因宇。之后黔夭,subscriber1和subscriber2又重新訂閱。最終subscriber1羽嫡、subscriber2本姥、subscriber3的值保持一致。
2. Observable的share操作符
share操作符封裝了publish().refCount()調(diào)用杭棵,可以看其源碼婚惫。
/**
* Returns a new {@link ObservableSource} that multicasts (shares) the original {@link ObservableSource}. As long as
* there is at least one {@link Observer} this {@link ObservableSource} will be subscribed and emitting data.
* When all subscribers have disposed it will dispose the source {@link ObservableSource}.
* <p>
* This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
* <p>
* ![](http://upload-images.jianshu.io/upload_images/2613397-81dcef165b69aca2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code share} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return an {@code ObservableSource} that upon connection causes the source {@code ObservableSource} to emit items
* to its {@link Observer}s
* @see <a >ReactiveX operators documentation: RefCount</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> share() {
return publish().refCount();
}
總結(jié)
理解了 Hot Observable 和 Cold Observable 的區(qū)別才能夠?qū)懗龈肦x代碼。同理魂爪,也能理解Hot & Cold Flowable先舷。再者,在其他語言的Rx版本中包括 RxSwift滓侍、RxJS 等也存在 Hot Observable 和 Cold Observable 這樣的概念蒋川。