Cold Observable 和 Hot Observable

Observable的分類

Observable 有 Cold 和 Hot 之分。

hot&cold observable.jpg

Hot Observable 無論有沒有 Subscriber 訂閱匣缘,事件始終都會發(fā)生猖闪。當 Hot Observable 有多個訂閱者時,Hot Observable 與訂閱者們的關系是一對多的關系肌厨,可以與多個訂閱者共享信息培慌。

然而,Cold Observable 只有 Subscriber 訂閱時柑爸,才開始執(zhí)行發(fā)射數(shù)據(jù)流的代碼吵护。并且 Cold Observable 和 Subscriber 只能是一對一的關系,當有多個不同的訂閱者時表鳍,消息是重新完整發(fā)送的馅而。也就是說對 Cold Observable 而言,有多個Subscriber的時候譬圣,他們各自的事件是獨立的瓮恭。

如果上面的解釋有點枯燥的話,那么下面會更加形象地說明 Cold 和 Hot 的區(qū)別:

Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song.
A cold Observable is a music CD. Many people can buy it and listen to it independently.
by Nickolay Tsvetinov

Cold Observable

Observable 的 just厘熟、creat屯蹦、range、fromXXX 等操作符都能生成Cold Observable绳姨。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

執(zhí)行結(jié)果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
   subscriber2: 3
subscriber1: 3
subscriber1: 4
   subscriber2: 4
   subscriber2: 5
subscriber1: 5
subscriber1: 6
   subscriber2: 6
subscriber1: 7
   subscriber2: 7
subscriber1: 8
   subscriber2: 8
subscriber1: 9
   subscriber2: 9

可以看出登澜,subscriber1 和 subscriber2 的結(jié)果并不一定是相同的,二者是完全獨立的飘庄。

盡管 Cold Observable 很好脑蠕,但是對于某些事件不確定何時發(fā)生以及不確定 Observable 發(fā)射的元素數(shù)量,那還得使用 Hot Observable竭宰。比如:UI交互的事件空郊、網(wǎng)絡環(huán)境的變化份招、地理位置的變化切揭、服務器推送消息的到達等等。

Cold Observable 如何轉(zhuǎn)換成 Hot Observable锁摔?

1. 使用publish廓旬,生成 ConnectableObservable

使用 publish 操作符,可以讓 Cold Observable 轉(zhuǎn)換成 Hot Observable。它將原先的 Observable 轉(zhuǎn)換成 ConnectableObservable孕豹。

來看看剛才的例子:

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        observable.connect();

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        observable.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

注意涩盾,生成的 ConnectableObservable 需要調(diào)用connect()才能真正執(zhí)行。

執(zhí)行結(jié)果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
      subscriber3: 2
subscriber1: 3
   subscriber2: 3
      subscriber3: 3
subscriber1: 4
   subscriber2: 4
      subscriber3: 4
subscriber1: 5
   subscriber2: 5
      subscriber3: 5
subscriber1: 6
   subscriber2: 6
      subscriber3: 6
subscriber1: 7
   subscriber2: 7
      subscriber3: 7
subscriber1: 8
   subscriber2: 8
      subscriber3: 8
subscriber1: 9
   subscriber2: 9
      subscriber3: 9
subscriber1: 10
   subscriber2: 10
      subscriber3: 10
subscriber1: 11
   subscriber2: 11
      subscriber3: 11

可以看到励背,多個訂閱的 Subscriber 共享同一事件春霍。
在這里,ConnectableObservable 是線程安全的叶眉。

2. 使用Subject/Processor

Subject 和 Processor 的作用是相同的址儒。Processor 是 RxJava2.x 新增的類,繼承自 Flowable 支持背壓控制衅疙。而 Subject 則不支持背壓控制莲趣。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        PublishSubject<Long> subject = PublishSubject.create();
        observable.subscribe(subject);

        subject.subscribe(subscriber1);
        subject.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        subject.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

執(zhí)行結(jié)果跟上面使用 publish 操作符是一樣的。

Subject 既是 Observable 又是 Observer(Subscriber)饱溢。這一點可以從 Subject 的源碼上看到喧伞。

import io.reactivex.*;
import io.reactivex.annotations.*;

/**
 * Represents an Observer and an Observable at the same time, allowing
 * multicasting events from a single source to multiple child Subscribers.
 * <p>All methods except the onSubscribe, onNext, onError and onComplete are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    public abstract boolean hasObservers();

    /**
     * Returns true if the subject has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * &see {@link #hasComplete()}
     */
    public abstract boolean hasThrowable();

    /**
     * Returns true if the subject has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Returns the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

當 Subject 作為 Subscriber 時,它可以訂閱目標 Cold Observable 使對方開始發(fā)送事件绩郎。同時它又作為Observable 轉(zhuǎn)發(fā)或者發(fā)送新的事件潘鲫,讓 Cold Observable 借助 Subject 轉(zhuǎn)換為 Hot Observable。

注意嗽上,Subject 并不是線程安全的次舌,如果想要其線程安全需要調(diào)用toSerialized()方法。(在RxJava1.x的時代還可以用 SerializedSubject 代替 Subject兽愤,但是在RxJava2.x以后SerializedSubject不再是一個public class)
然而彼念,很多基于 EventBus 改造的 RxBus 并沒有這么做,包括我以前也寫過這樣的 RxBus :( 浅萧。這樣的做法是非常危險的逐沙,因為會遇到并發(fā)的情況。

Hot Observable 如何轉(zhuǎn)換成 Cold Observable洼畅?

1. ConnectableObservable的refCount操作符

reactivex官網(wǎng)的解釋是

make a Connectable Observable behave like an ordinary Observable

RefCount.png

RefCount操作符把從一個可連接的 Observable 連接和斷開的過程自動化了吩案。它操作一個可連接的Observable,返回一個普通的Observable帝簇。當?shù)谝粋€訂閱者訂閱這個Observable時徘郭,RefCount連接到下層的可連接Observable。RefCount跟蹤有多少個觀察者訂閱它丧肴,直到最后一個觀察者完成才斷開與下層可連接Observable的連接残揉。

如果所有的訂閱者都取消訂閱了,則數(shù)據(jù)流停止芋浮。如果重新訂閱則重新開始數(shù)據(jù)流抱环。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();

        Observable<Long> observable = connectableObservable.refCount();

        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("重新開始數(shù)據(jù)流");

        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

執(zhí)行結(jié)果:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
重新開始數(shù)據(jù)流
subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1

如果不是所有的訂閱者都取消了訂閱,只取消了部分。部分的訂閱者重新開始訂閱镇草,則不會從頭開始數(shù)據(jù)流眶痰。

        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();

        Observable<Long> observable = connectableObservable.refCount();

        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);
        observable.subscribe(subscriber3);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("subscriber1、subscriber2 重新訂閱");

        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

執(zhí)行結(jié)果:

subscriber1: 0
   subscriber2: 0
      subscriber3: 0
subscriber1: 1
   subscriber2: 1
      subscriber3: 1
subscriber1梯啤、subscriber2 重新訂閱
      subscriber3: 2
subscriber1: 2
   subscriber2: 2
      subscriber3: 3
subscriber1: 3
   subscriber2: 3
      subscriber3: 4
subscriber1: 4
   subscriber2: 4

在這里竖伯,subscriber1和subscriber2先取消了訂閱,subscriber3并沒有取消訂閱因宇。之后黔夭,subscriber1和subscriber2又重新訂閱。最終subscriber1羽嫡、subscriber2本姥、subscriber3的值保持一致。

2. Observable的share操作符

share操作符封裝了publish().refCount()調(diào)用杭棵,可以看其源碼婚惫。

    /**
     * Returns a new {@link ObservableSource} that multicasts (shares) the original {@link ObservableSource}. As long as
     * there is at least one {@link Observer} this {@link ObservableSource} will be subscribed and emitting data.
     * When all subscribers have disposed it will dispose the source {@link ObservableSource}.
     * <p>
     * This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
     * <p>
     * ![](http://upload-images.jianshu.io/upload_images/2613397-81dcef165b69aca2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code share} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an {@code ObservableSource} that upon connection causes the source {@code ObservableSource} to emit items
     *         to its {@link Observer}s
     * @see <a >ReactiveX operators documentation: RefCount</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }

總結(jié)

理解了 Hot Observable 和 Cold Observable 的區(qū)別才能夠?qū)懗龈肦x代碼。同理魂爪,也能理解Hot & Cold Flowable先舷。再者,在其他語言的Rx版本中包括 RxSwift滓侍、RxJS 等也存在 Hot Observable 和 Cold Observable 這樣的概念蒋川。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市撩笆,隨后出現(xiàn)的幾起案子捺球,更是在濱河造成了極大的恐慌,老刑警劉巖夕冲,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件氮兵,死亡現(xiàn)場離奇詭異,居然都是意外死亡歹鱼,警方通過查閱死者的電腦和手機泣栈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來弥姻,“玉大人南片,你說我怎么就攤上這事⊥ザ兀” “怎么了疼进?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長螺捐。 經(jīng)常有香客問我颠悬,道長,這世上最難降的妖魔是什么定血? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任赔癌,我火速辦了婚禮,結(jié)果婚禮上澜沟,老公的妹妹穿的比我還像新娘灾票。我一直安慰自己,他們只是感情好茫虽,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布刊苍。 她就那樣靜靜地躺著,像睡著了一般濒析。 火紅的嫁衣襯著肌膚如雪正什。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天号杏,我揣著相機與錄音婴氮,去河邊找鬼。 笑死盾致,一個胖子當著我的面吹牛主经,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播庭惜,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼罩驻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了护赊?” 一聲冷哼從身側(cè)響起惠遏,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎骏啰,沒想到半個月后爽哎,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡器一,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年课锌,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片祈秕。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡渺贤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出请毛,到底是詐尸還是另有隱情志鞍,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布方仿,位于F島的核電站固棚,受9級特大地震影響统翩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜此洲,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一厂汗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呜师,春花似錦娶桦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至知牌,卻和暖如春祈争,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背角寸。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工铛嘱, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人袭厂。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓墨吓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親纹磺。 傳聞我的和親對象是個殘疾皇子帖烘,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容