基礎(chǔ)

注:本文分析的是RxJava 2.0.0 閱讀之前诅诱,希望你對(duì)RxJava1已經(jīng)有所了解奶赠。
首先了解幾個(gè)基本接口

public interface Emitter<T> 
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
public interface ObservableEmitter<T> extends Emitter<T>
    void setDisposable(Disposable d);
    void setCancellable(Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
public interface Disposable {
    void dispose();
    boolean isDisposed();

和第一版的Subscriber挺像的近刘,把Subscriber拆分出了幾部分
再看一個(gè)接口

public interface Observer<T>
    void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();

其中onSubscribe和第一版Subscriber的onStart方法類似
第二版多了個(gè)Disposable參數(shù),讓你可以取消橘沥。
我們可以寫(xiě)這個(gè)接口的子類窗轩,訂閱的時(shí)候使用

作為開(kāi)始,先寫(xiě)一個(gè)超級(jí)簡(jiǎn)單的例子:

Observable
       .create(new ObservableOnSubscribe<String>() {
      @Override public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("1"); } })
        .subscribe(new Observer<String>() {
          @Override public void onSubscribe(Disposable d) {}
          @Override public void onNext(String value) { }
          @Override public void onError(Throwable e) {}
          @Override public void onComplete() {}
        });

(排版有點(diǎn)糟糕座咆,先將就一下~~)
create方法接受一個(gè)ObservableOnSubscribe對(duì)象
ObservableOnSubscribe是一個(gè)接口品姓,只有一個(gè)方法

public interface ObservableOnSubscribe<T> 
    void subscribe(ObservableEmitter<T> e) throws Exception;

ObservableEmitter這個(gè)接口在開(kāi)始的時(shí)候提到過(guò)
看看create的源碼:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

先判空
RxJavaPlugins.onAssembly是一個(gè)hook,類似于監(jiān)聽(tīng)箫措,不設(shè)置的話傳入什么就返回什么
所以這里返回的是ObservableCreate對(duì)象
ObservableCreate繼承了Observable腹备,Observable是一個(gè)基類,也是一個(gè)抽象類
Observable規(guī)定了被觀察者的基本流程斤蔓,具體實(shí)現(xiàn)由子類完成
ObservableCreate的構(gòu)造植酥,僅僅是接口保存起來(lái)

現(xiàn)在來(lái)看subscribe方法
里面經(jīng)過(guò)一些檢查后,會(huì)調(diào)用subscribeActual方法
這個(gè)方法是abstract的
前面我們得到的實(shí)際上是ObservableCreate對(duì)象弦牡,所以這個(gè)具體實(shí)現(xiàn)在ObservableCreate里面

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到友驮,這里用CreateEmitter包裝了observer,這個(gè)observer就是訂閱時(shí)傳入的observer
然后驾锰,調(diào)用onSubscribe方法
之后卸留,source就是create的時(shí)候傳入的接口,終于回到我們的自定義方法了
然而椭豫,這個(gè)時(shí)候耻瑟,我們和observer還隔著一個(gè)CreateEmitter
先看看CreateEmitter部分代碼:

 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

onNext: 檢查發(fā)射的值是否為null, observer是否已取消觀察, 然后才交給observer
注意赏酥,異常需要自己處理
onComplete:檢查observer是否已取消觀察喳整, 交給observer, 最后調(diào)用dispose
onError: 檢查異常是否為null, observer是否已取消觀察裸扶, 然后才交給observer框都, 最后dispose
注意,要保證onComplete和onError只能調(diào)用一次

再來(lái)看看取消觀察
CreateEmitter繼承了AtomicReference
也就是說(shuō)這個(gè)類有一個(gè)變量呵晨,這個(gè)變量用來(lái)標(biāo)志是否已經(jīng)取消魏保。
調(diào)用dispose最終會(huì)調(diào)用這個(gè)方法

public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

簡(jiǎn)單來(lái)說(shuō),就是把這個(gè)標(biāo)志設(shè)置為DISPOSED摸屠,DISPOSED就是一個(gè)單例谓罗。

isDispose方法:

public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

很簡(jiǎn)單,看這個(gè)標(biāo)志是否等于DISPOSE

回顧一下餐塘,在create的時(shí)候返回ObservableCreate對(duì)象妥衣,這個(gè)對(duì)象保存了傳入的接口
subscribe的時(shí)候,傳入的observer會(huì)到達(dá)ObservableCreate,進(jìn)行包裝税手,然后交給create時(shí)我們的自定義處理
取消訂閱就是設(shè)置標(biāo)志位蜂筹,本身并不會(huì)做些什么,這和Thread的停止是一樣的道理

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末芦倒,一起剝皮案震驚了整個(gè)濱河市艺挪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌兵扬,老刑警劉巖麻裳,帶你破解...
    沈念sama閱讀 216,843評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異器钟,居然都是意外死亡津坑,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門傲霸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)疆瑰,“玉大人,你說(shuō)我怎么就攤上這事昙啄∧乱郏” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,187評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵梳凛,是天一觀的道長(zhǎng)耿币。 經(jīng)常有香客問(wèn)我,道長(zhǎng)韧拒,這世上最難降的妖魔是什么淹接? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,264評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮叭莫,結(jié)果婚禮上蹈集,老公的妹妹穿的比我還像新娘。我一直安慰自己雇初,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布减响。 她就那樣靜靜地躺著靖诗,像睡著了一般。 火紅的嫁衣襯著肌膚如雪支示。 梳的紋絲不亂的頭發(fā)上刊橘,一...
    開(kāi)封第一講書(shū)人閱讀 51,231評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音颂鸿,去河邊找鬼促绵。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的败晴。 我是一名探鬼主播浓冒,決...
    沈念sama閱讀 40,116評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼尖坤!你這毒婦竟也來(lái)了稳懒?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,945評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤慢味,失蹤者是張志新(化名)和其女友劉穎场梆,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體纯路,經(jīng)...
    沈念sama閱讀 45,367評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡或油,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了驰唬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片装哆。...
    茶點(diǎn)故事閱讀 39,754評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖定嗓,靈堂內(nèi)的尸體忽然破棺而出蜕琴,到底是詐尸還是另有隱情,我是刑警寧澤宵溅,帶...
    沈念sama閱讀 35,458評(píng)論 5 344
  • 正文 年R本政府宣布凌简,位于F島的核電站,受9級(jí)特大地震影響恃逻,放射性物質(zhì)發(fā)生泄漏雏搂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評(píng)論 3 327
  • 文/蒙蒙 一寇损、第九天 我趴在偏房一處隱蔽的房頂上張望凸郑。 院中可真熱鬧,春花似錦矛市、人聲如沸芙沥。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,692評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)而昨。三九已至,卻和暖如春找田,著一層夾襖步出監(jiān)牢的瞬間歌憨,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,842評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工墩衙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留务嫡,地道東北人甲抖。 一個(gè)月前我還...
    沈念sama閱讀 47,797評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像心铃,于是被迫代替她去往敵國(guó)和親准谚。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 前言 終究沒(méi)有經(jīng)受住RxJava的誘惑于个,只恨自己來(lái)的比較晚氛魁,走起~ RxJava 是什么? 一個(gè)在 Java VM...
    王永迪閱讀 4,273評(píng)論 3 37
  • 本文章內(nèi)部分圖片資源來(lái)自RayWenderlich.com 本文結(jié)合自己的理解來(lái)總結(jié)介紹一下RxSwift最基本的...
    FKSky閱讀 2,877評(píng)論 4 14
  • RxJava學(xué)習(xí) RxJava 是什么 異步 : 一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的厅篓、基于事件...
    scarerow閱讀 386評(píng)論 0 0
  • 我從去年開(kāi)始使用 RxJava 秀存,到現(xiàn)在一年多了。今年加入了 Flipboard 后羽氮,看到 Flipboard 的...
    Jason_andy閱讀 5,468評(píng)論 7 62
  • 文章轉(zhuǎn)自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物線在正...
    xpengb閱讀 7,032評(píng)論 9 73