【RxJava】- 連接操作符源碼分析

目錄

【RxJava】- 創(chuàng)建操作符源碼分析
【RxJava】- 變換操作符源碼分析
【RxJava】- 過(guò)濾操作符源碼分析
【RxJava】- 結(jié)合操作符源碼分析

Connect

讓一個(gè)可連接的Observable開(kāi)始發(fā)射數(shù)據(jù)給訂閱者漓骚,可連接的Observable (connectable Observable)與普通的Observable差不多,不過(guò)它并不會(huì)在被訂閱時(shí)開(kāi)始發(fā)射數(shù)據(jù)预麸,而是直到使用了Connect操作符時(shí)才會(huì)開(kāi)始。用這個(gè)方法彭羹,你可以等待所有的觀察者都訂閱了Observable之后再開(kāi)始發(fā)射數(shù)據(jù)奕筐。

RxJava中connect是ConnectableObservable接口的一個(gè)方法工扎,使用publish操作符可以將一個(gè)普通的Observable轉(zhuǎn)換為一個(gè)ConnectableObservable徘钥。

Observable.create(null).replay().connect();

實(shí)現(xiàn)類

ConnectConsumer

自己查看,很簡(jiǎn)單肢娘,就幾行代碼呈础。

調(diào)用 connect(cc)方法,返回disposable實(shí)例橱健。

Replay

保證所有的觀察者收到相同的數(shù)據(jù)序列而钞,即使它們?cè)贠bservable開(kāi)始發(fā)射數(shù)據(jù)之后才訂閱.

可連接的Observable (connectable Observable)與普通的Observable差不多,不過(guò)它并不會(huì)在被訂閱時(shí)開(kāi)始發(fā)射數(shù)據(jù)拘荡,而是直到使用了Connect操作符時(shí)才會(huì)開(kāi)始臼节。用這種方法,你可以在任何時(shí)候讓一個(gè)Observable開(kāi)始發(fā)射數(shù)據(jù)珊皿。

實(shí)現(xiàn)類

ObservableReplay

Replay還有其它實(shí)現(xiàn)類网缝,請(qǐng)自己查看。這里講解replay()沒(méi)有傳入?yún)?shù)的實(shí)現(xiàn)蟋定》垭看一下創(chuàng)建過(guò)程,最終創(chuàng)建ObservableReplay實(shí)例傳入的參數(shù)如下:

  • source
    被觀察者
  • bufferFactory
    UnBoundedFactory實(shí)例
  • curr
    final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<>();
  • onSubscribe
    ObservableSource<T> onSubscribe = new ReplaySource<>(curr, bufferFactory);
    

首先調(diào)用connect(Consumer<? super Disposable> connection)方法

返回初始化容量為16的UnboundedReplayBuffer數(shù)組

ReplayBuffer<T> buf = bufferFactory.call();

保存新創(chuàng)建的ReplayObserver對(duì)象

if (!current.compareAndSet(ps, u)){continue;}

調(diào)用accept方法

connection.accept(ps);

這里調(diào)用的是上面Connect中的ConnectConsumer類中的accept方法驶兜。

public void accept(Disposable t) {
    this.disposable = t;
}

調(diào)用被觀察者

if (doConnect) {
    source.subscribe(ps);
}

調(diào)用onNext(T t)

public void onNext(T t) {
    if (!done) {
        // 保存到數(shù)據(jù)
        buffer.next(t);
        replay();
    }
}

核心邏輯就在replay中扼仲,自己查看。

Publish

將普通的Observable轉(zhuǎn)換為可連接的Observable

ConnectableObservable observable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    ObservableEmitter<Integer> observableEmitter = emitter.serialize();
    try {
        // 序列化
        if (!emitter.isDisposed()) {
            for (int i = 1; i < 3; i++) {
                System.out.println("create operate--->emitter: "+i);
                if (1==i){
                    // ExceptionHelper.TERMINATED
                   //   observableEmitter.onError(new Throwable("error"));
                }else {
                    observableEmitter.onNext(i);
                }
            }
            observableEmitter.onComplete();
        }
    } catch (Exception e) {
        observableEmitter.onError(e);
    }
}).publish();
observable.subscribe(new Observer<Integer>() {...});
observable.connect();

observable.connect()不調(diào)用促王,是不會(huì)發(fā)射數(shù)據(jù)的犀盟。

實(shí)現(xiàn)類

ObservablePublish

看subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
    ...
    InnerDisposable<T> inner = new InnerDisposable<>(observer, co
    observer.onSubscribe(inner);
    if (conn.add(inner)) {
        if (inner.isDisposed()) {
            conn.remove(inner);
        }
        return;
    }
    ...
}

conn.add(inner)為每一個(gè)訂閱者創(chuàng)建InnerDisposable對(duì)象,然后保存在PublishConnection中的一個(gè)數(shù)組中蝇狼。

看connect方法

public void connect(Consumer<? super Disposable> connection) {
    ...
    if (doConnect) {
        source.subscribe(conn);
    }
}

調(diào)用被觀察者subscribe對(duì)象,然后可以在里面發(fā)射數(shù)據(jù)倡怎。

PublishConnection中的 onNext方法

public void onNext(T t) {
    for (InnerDisposable<T> inner : get()) {
        inner.downstream.onNext(t);
    }

調(diào)用每一個(gè)訂閱者的onNext方法迅耘,將數(shù)據(jù)發(fā)射給訂閱者。

RefCount

讓一個(gè)可連接的Observable行為像普通的Observable监署〔ǎ可連接的Observable (connectable Observable)與普通的Observable差不多,不過(guò)它并不會(huì)在被訂閱時(shí)開(kāi)始發(fā)射數(shù)據(jù)钠乏,而是直到使用了Connect操作符時(shí)才會(huì)開(kāi)始栖秕。用這種方法,你可以在任何時(shí)候讓一個(gè)Observable開(kāi)始發(fā)射數(shù)據(jù)晓避。

Observable observable = Observable.create((ObservableOnSubscrib
    ObservableEmitter<Integer> observableEmitter = emitter.seri
    try {
        // 序列化
        if (!emitter.isDisposed()) {
            for (int i = 0; i < 4; i++) {
                System.out.println("create operate--->emitter: 
                if (1==i){
                    // ExceptionHelper.TERMINATED
                      observableEmitter.onError(new Throwable("
                }else {
                    observableEmitter.onNext(i);
                }
            }
            observableEmitter.onComplete();
        }
    } catch (Exception e) {
        observableEmitter.onError(e);
    }
}).publish().refCount(2);
observable.subscribe(...);
observable.subscribe(...);

實(shí)現(xiàn)類

ObservableRefCount

看一下subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
    RefConnection conn;
    boolean connect = false;
    synchronized (this) {
        conn = connection;
        if (conn == null) {
            conn = new RefConnection(this);
            connection = conn;
        }
        long c = conn.subscriberCount;
        if (c == 0L && conn.timer != null) {
            conn.timer.dispose();
        }
        conn.subscriberCount = c + 1;
        if (!conn.connected && c + 1 == n) {
            connect = true;
            conn.connected = true;
        }
    }
    source.subscribe(new RefCountObserver<>(observer, this, conn));
    if (connect) {
        source.connect(conn);
    }
}

if (!conn.connected && c + 1 == n)當(dāng)訂閱者的數(shù)量等于refCount方法傳入的數(shù)量時(shí)簇捍,表示可以連接只壳,調(diào)用ObservablePublish中的connect方法。就與上面將Publish操作符調(diào)用connect方法作用相同暑塑。

當(dāng)然你可以隨時(shí)調(diào)用connect方法來(lái)強(qiáng)制發(fā)射數(shù)據(jù)吼句。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市事格,隨后出現(xiàn)的幾起案子惕艳,更是在濱河造成了極大的恐慌,老刑警劉巖驹愚,帶你破解...
    沈念sama閱讀 222,729評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件远搪,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡逢捺,警方通過(guò)查閱死者的電腦和手機(jī)终娃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蒸甜,“玉大人棠耕,你說(shuō)我怎么就攤上這事∧拢” “怎么了窍荧?”我有些...
    開(kāi)封第一講書人閱讀 169,461評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)恨憎。 經(jīng)常有香客問(wèn)我蕊退,道長(zhǎng),這世上最難降的妖魔是什么憔恳? 我笑而不...
    開(kāi)封第一講書人閱讀 60,135評(píng)論 1 300
  • 正文 為了忘掉前任瓤荔,我火速辦了婚禮,結(jié)果婚禮上钥组,老公的妹妹穿的比我還像新娘输硝。我一直安慰自己,他們只是感情好程梦,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,130評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布点把。 她就那樣靜靜地躺著,像睡著了一般屿附。 火紅的嫁衣襯著肌膚如雪郎逃。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 52,736評(píng)論 1 312
  • 那天挺份,我揣著相機(jī)與錄音褒翰,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛优训,可吹牛的內(nèi)容都是我干的朵你。 我是一名探鬼主播,決...
    沈念sama閱讀 41,179評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼型宙,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼撬呢!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起妆兑,我...
    開(kāi)封第一講書人閱讀 40,124評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤魂拦,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后搁嗓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體芯勘,經(jīng)...
    沈念sama閱讀 46,657評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,723評(píng)論 3 342
  • 正文 我和宋清朗相戀三年腺逛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了荷愕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,872評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡棍矛,死狀恐怖安疗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情够委,我是刑警寧澤荐类,帶...
    沈念sama閱讀 36,533評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站茁帽,受9級(jí)特大地震影響玉罐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜潘拨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,213評(píng)論 3 336
  • 文/蒙蒙 一吊输、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧铁追,春花似錦季蚂、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,700評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至狰闪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間濒生,已是汗流浹背埋泵。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,819評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人丽声。 一個(gè)月前我還...
    沈念sama閱讀 49,304評(píng)論 3 379
  • 正文 我出身青樓礁蔗,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親雁社。 傳聞我的和親對(duì)象是個(gè)殘疾皇子浴井,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,876評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • 記錄RxJava操作符,方便查詢(2.2.2版本) 英文文檔地址:http://reactivex.io/docu...
    凌云飛魚閱讀 826評(píng)論 0 0
  • ReactiveX 系列文章目錄 cache/cacheWithInitialCapacity 看注釋意思是將所有...
    三流之路閱讀 746評(píng)論 0 0
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,836評(píng)論 0 1
  • 注:只包含標(biāo)準(zhǔn)包中的操作符霉撵,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 943評(píng)論 0 3
  • 幾種主要的需求: 直接創(chuàng)建一個(gè)Observable(創(chuàng)建操作) 組合多個(gè)Observable(組合操作) 對(duì)Obs...
    三也視界閱讀 11,462評(píng)論 4 45