RxJava 知識(shí)梳理(1) - RxJava 基本思想

一莫鸭、基礎(chǔ)概述

RxJava的關(guān)鍵是異步,即使隨著程序的邏輯變得復(fù)雜横殴,它依然能夠保持簡潔被因。

二、API介紹和原理剖析

觀察者模式面向的需求是:A對(duì)象(觀察者)對(duì)B對(duì)象(被觀察者)的某種變化高度敏感,需要在B變化的一瞬間做出反應(yīng)梨与,觀察者采用注冊(cè)Register或者訂閱Subscribe的方式堕花,告訴觀察者,我需要你的某某狀態(tài)蛋欣,并在它變化的時(shí)候通知我航徙,在RxJava當(dāng)中,Observable是被觀察者陷虎,Observer就是觀察者到踏。

RxJava有四個(gè)基本概念:

  • Observable:被觀察者。
  • Observer:觀察者尚猿。
  • Subscribe:訂閱窝稿。
  • Event:事件。

ObservableObserver通過subscribe方法實(shí)現(xiàn)訂閱關(guān)系凿掂,Observable可以在需要的時(shí)候發(fā)出事件來通知Observer伴榔。

RxJava有以下三種事件:

  • onNext:普通事件。
  • onCompletedRxJava不僅把每個(gè)事件單獨(dú)處理庄萎,還會(huì)把它們看作一個(gè)隊(duì)列踪少,當(dāng)不會(huì)再有新的onNext事件發(fā)出時(shí),需要觸發(fā)onCompleted事件作為標(biāo)志糠涛。
  • onErroronCompleted和有且僅有一個(gè)援奢,并且是事件序列中的最后一個(gè)。

三忍捡、基本實(shí)現(xiàn)

RxJava的基本實(shí)現(xiàn)有以下三點(diǎn):
1)創(chuàng)建觀察者 - Observer

Observer<String> observer = new Observer<String>() {

    @Override
    public void onCompleted() {
        Log.d(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext");
    }
};

除了Observer接口之外集漾,RxJava還內(nèi)置了一個(gè)實(shí)現(xiàn)了Observer的抽象類:Subscriber,它對(duì)Observer接口進(jìn)行了一些擴(kuò)展砸脊,實(shí)質(zhì)上在RxJavasubscribe過程中具篇,Observer也總是被轉(zhuǎn)換成為一個(gè)Subscriber再使用,他們的區(qū)別在與:

  • onStart:這是新增的方法凌埂,它會(huì)在subscribe剛開始驱显,而事件還未發(fā)送之前被調(diào)用,它總是在subscribe所發(fā)生的線程被調(diào)用瞳抓。
  • unsubscribe:這是它實(shí)現(xiàn)的另一個(gè)接口Subscription的方法埃疫,用于取消訂閱,在這個(gè)方法被調(diào)用后挨下,Subscriber將不再接收事件,一般在調(diào)用這個(gè)方法前脐湾,可以使用isUnsubscribed判斷一下狀態(tài)臭笆,Observable在訂閱之后會(huì)持有Subscriber的引用,因此不釋放會(huì)有內(nèi)存泄漏的危險(xiǎn)。

2)創(chuàng)建被觀察者 - Observable
RxJavacreate方法來創(chuàng)建一個(gè)observable愁铺,

rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {

    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello World!");
        subscriber.onCompleted();
    }
});

這里傳入了一個(gè)Observable.OnSubscribe<T>對(duì)象作為參數(shù)鹰霍,它會(huì)被存儲(chǔ)在返回的Observable對(duì)象當(dāng)中,它的作用相當(dāng)于一個(gè)計(jì)劃表茵乱,當(dāng)Observable被訂閱的時(shí)候茂洒,OnSubscribecall方法會(huì)自動(dòng)被調(diào)用,事件序列被依次觸發(fā)瓶竭。
createRxJava最基本的創(chuàng)造事件序列的方法督勺,基于這個(gè)方法,還提供了一些快捷方法來創(chuàng)建事件隊(duì)列:

  • just(T...)
Observable observable = Observable.just("Hello", "Hi", "Aloha");
  • from(T[]) / from(Iterable<? extends T>)
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);

3)訂閱 - subscribe

observable.subscribe(observer);
observable.subscribe(subscriber);

其內(nèi)部核心的代碼類似于:

public Subscription subscribe(Subscriber subscriber) {
    //準(zhǔn)備方法斤贰。
    subscriber.onStart();
    //事件發(fā)送的邏輯開始執(zhí)行智哀,這個(gè)onSubscribe就是創(chuàng)建Observable時(shí)新建的OnSubscribe對(duì)象。
    onSubscribe.call(subscriber);
    //把傳入的Subscriber轉(zhuǎn)換為Subscription并返回荧恍,方便unsubscribe瓷叫。
    return subscriber;
}

Observable.subscribe方法除了支持傳入ObserverSubscriber,還支持傳入Action0送巡、Action1這樣不完整定義的回調(diào)摹菠,RxJava會(huì)自動(dòng)根據(jù)定義創(chuàng)建出Subscriber

四骗爆、線程控制

在不指定線程的情況下次氨,RxJava遵循這樣的原則,在哪個(gè)線程調(diào)用subscribe淮腾,就在哪個(gè)線程產(chǎn)生事件糟需,在哪個(gè)線程產(chǎn)生事件,就在哪個(gè)線程消費(fèi)事件谷朝,如果需要消費(fèi)線程洲押,那么就需要用到SchedulerRxJava內(nèi)置了幾個(gè)Scheduler

  • Schedulers.immediate:直接在當(dāng)前線程運(yùn)行圆凰。
  • Schedulers.newThread:總是啟用新線程杈帐,并在線程執(zhí)行操作。
  • Schedulers.io:其內(nèi)部實(shí)現(xiàn)是一個(gè)無數(shù)量上限的的線程池专钉,可以重用空閑的線程挑童,不要把計(jì)算工作放在io,可以避免創(chuàng)建不必要的線程跃须。
  • Schedulers.computation:使用固定的線程池站叼,大小為CPU核數(shù)。
  • AndroidSchedulers.mainThread:指定的操作將在Android主線程中運(yùn)行菇民。

對(duì)線程控制有以下兩個(gè)方法:

  • subscribeOn:指定subscribe發(fā)生的線程尽楔,即Observable.OnSubscribe被激活時(shí)所處的線程投储,也就是call方法執(zhí)行時(shí)所處的線程。
  • observeOn:指定Subscriber所運(yùn)行在的線程阔馋。

observeOn指定的是Subscriber的線程玛荞,而這個(gè)Subscriber并不一定是subscribe()參數(shù)中的Subscriber,而是observeOn執(zhí)行時(shí)的當(dāng)前Observable所對(duì)應(yīng)的Subscriber呕寝,即它的直接下級(jí)Subscriber勋眯,也就是它之后的操作所在的線程,因此下梢,如果有多次切換線程的要求客蹋,只要在每個(gè)想要切換線程的位置調(diào)用依次observeOn即可。
observeOn不同怔球,subscribeOn只能調(diào)用一次嚼酝,下面我們來分析一下它的內(nèi)部實(shí)現(xiàn),首先是subscribeOn的原理:
subscribeOnObserveOn都做了線程切換的工作:

  • subscribeOn的線程切換發(fā)生在OnSubscribe中竟坛,即在它通知上一級(jí)OnSubscribe時(shí)闽巩,這時(shí)事件還沒有發(fā)送,因此subscribeOn的線程控制可以從事件發(fā)出的開端造成影響担汤。
Paste_Image.png
  • observeOn的線程切換則發(fā)生在它內(nèi)建的Subscriber中涎跨,即發(fā)生在它即將給下一級(jí)Subscriber發(fā)送事件時(shí),因此控制的是它后面的線程崭歧。
Paste_Image.png

五隅很、變換

變換,就是將事件序列中的對(duì)象或整個(gè)序列進(jìn)行加工處理率碾,轉(zhuǎn)換不同的事件或者序列叔营。

5.1 map()

通過FuncX,把參數(shù)中的Integer轉(zhuǎn)換成為String所宰,是最常用的變換绒尊,這個(gè)變換是發(fā)生在subscribeOn所指定的線程當(dāng)中的。

Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        long nextId = Thread.currentThread().getId();
        Log.d(TAG, "onNext:" + s + ", threadId=" + nextId);
    }
};
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        long callId = Thread.currentThread().getId();
        subscriber.onNext(5);
        subscriber.onCompleted();
    }
});
observable.map(new Func1<Integer, String>() {

    @Override
    public String call(Integer integer) {
        long mapId = Thread.currentThread().getId();
        return "My Number is:" + integer;
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);

其示意圖類似于:


Paste_Image.png

5.2 flatMap

它和map有一個(gè)共同點(diǎn)仔粥,就是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象婴谱,但是和map不同的是,flatMap返回的是一個(gè)Observable對(duì)象躯泰,而且它并不直接把這個(gè)對(duì)象傳給Subscriber谭羔,而是通過這個(gè)新建的Observable來發(fā)送事件,其整個(gè)的調(diào)用過程:

  • 使用傳入的事件對(duì)象創(chuàng)建一個(gè)Observable麦向。
  • 激活這個(gè)Observable瘟裸,通過它來發(fā)送事件。
  • 每一個(gè)創(chuàng)建出來的Observable發(fā)送的事件诵竭,被匯入同一個(gè)Observable话告,它復(fù)雜將這些事件同一交給Subscriber的回調(diào)方法十办。
Subscriber<String> subscriber = new Subscriber<String>() {

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext, s=" + s);
    }
};
Observable<List<String>> observable = Observable.create(new Observable.OnSubscribe<List<String>>() {

    @Override
    public void call(Subscriber<? super List<String>> subscriber) {
        List<String> list = new ArrayList<>();
        list.add("First");
        list.add("Second");
        list.add("Third");
        subscriber.onNext(list);
    }
});
observable.flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> strings) {
        return Observable.from(strings);
    }
}).subscribe(subscriber);

其示意圖:


Paste_Image.png

六、變換的原理

變換的實(shí)質(zhì)是針對(duì)事件序列的處理和再發(fā)送超棺,在RxJava的內(nèi)部,它們是基于同一個(gè)基礎(chǔ)的變換方法lift(operator)

//生成了一個(gè)新的Observable并返回呵燕。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    //構(gòu)造新的Observable時(shí)棠绘,同時(shí)新建了一個(gè)OnSubscribe對(duì)象。
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            //原始的onSubscribe再扭。
            onSubscribe.call(newSubscriber);
        }
    });
}

示意圖:

Paste_Image.png
  • lift創(chuàng)建了一個(gè)Observable后氧苍,加上之前的原始Observable,有兩個(gè)Observable泛范。
  • 新的Observable里的OnSubscribe加上原始的让虐,共有兩個(gè)OnSubscribe
  • 當(dāng)用戶通過調(diào)用lift/map創(chuàng)建的Observable對(duì)象的subscribe方法時(shí)罢荡,于是它觸發(fā)了上面的call方法中的內(nèi)容赡突。
  • 在這個(gè)新的OnSubscribecall方法中,傳入了目標(biāo)的Subscriber区赵,同時(shí)其外部類中還持有了原始的OnSubscribe惭缰。我們先通過operator.call(oldSubscriber)方法,生成了新的Subscriber(new Subscriber)笼才,然后利用這個(gè)新的Subscriber向原始的Observable進(jìn)行訂閱漱受。

下面我們以前面map實(shí)現(xiàn)的例子來分析一下源碼,上面的例子通過map操作符把Integer類型的ObservableString類型的Subscriber生成了訂閱關(guān)系骡送。

  • map方法昂羡,它通過lift方法返回了一個(gè)String類型的Observable
//其中T=Integer摔踱,R=String虐先。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
}
  • 下面看下OperatorMap這個(gè)對(duì)象,這個(gè)對(duì)象實(shí)現(xiàn)了operator<R,T>接口昌渤,而這個(gè)接口繼承于Func1<Subscriber<? super R>, Subscriber<? super T>>赴穗,在它實(shí)現(xiàn)的call方法中傳入了String類型的Subscriber(目標(biāo)Subscriber),并返回了Integer類型的Subscriber(代理Subscriber)膀息,當(dāng)它的方法被回調(diào)時(shí)般眉,會(huì)調(diào)用目標(biāo)Subscriber的對(duì)應(yīng)方法,其中在調(diào)用onNext時(shí)潜支,就用上了外部傳入的Func1函數(shù):
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    onError(OnErrorThrowable.addValueAsLastCause(e, t));
                }
            }

        };
    }
  • 接著再回過頭來看lift方法:
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    //返回一個(gè)Integer類型的Subscriber甸赃。
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        st.onStart();
                        //關(guān)鍵方法:Integer類型的OnSubscribe調(diào)用對(duì)應(yīng)的Subscribe,這個(gè)call方法里面寫了我們的邏輯冗酿,當(dāng)它調(diào)用onNext(Integer integer)時(shí)埠对,實(shí)際上調(diào)用的是onNext(String str)络断。
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        if (e instanceof OnErrorNotImplementedException) {
                            throw (OnErrorNotImplementedException) e;
                        }
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    o.onError(e);
                }
            }
        });
    }
  • 最后就是調(diào)用subscribe方法。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末项玛,一起剝皮案震驚了整個(gè)濱河市貌笨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌襟沮,老刑警劉巖锥惋,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異开伏,居然都是意外死亡膀跌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門固灵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來捅伤,“玉大人,你說我怎么就攤上這事巫玻〈砸洌” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵仍秤,是天一觀的道長蘸际。 經(jīng)常有香客問我,道長徒扶,這世上最難降的妖魔是什么粮彤? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮姜骡,結(jié)果婚禮上导坟,老公的妹妹穿的比我還像新娘。我一直安慰自己圈澈,他們只是感情好惫周,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著康栈,像睡著了一般递递。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上啥么,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天登舞,我揣著相機(jī)與錄音,去河邊找鬼悬荣。 笑死菠秒,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的氯迂。 我是一名探鬼主播践叠,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼言缤,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了禁灼?” 一聲冷哼從身側(cè)響起管挟,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎弄捕,沒想到半個(gè)月后哮独,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡察藐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了舟扎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片分飞。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖睹限,靈堂內(nèi)的尸體忽然破棺而出譬猫,到底是詐尸還是另有隱情,我是刑警寧澤羡疗,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布染服,位于F島的核電站,受9級(jí)特大地震影響叨恨,放射性物質(zhì)發(fā)生泄漏柳刮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一痒钝、第九天 我趴在偏房一處隱蔽的房頂上張望秉颗。 院中可真熱鬧,春花似錦送矩、人聲如沸蚕甥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽菇怀。三九已至,卻和暖如春晌块,著一層夾襖步出監(jiān)牢的瞬間爱沟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來泰國打工匆背, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留钥顽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓靠汁,卻偏偏與公主長得像蜂大,于是被迫代替她去往敵國和親闽铐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 我從去年開始使用 RxJava 奶浦,到現(xiàn)在一年多了兄墅。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,473評(píng)論 7 62
  • 前言我從去年開始使用 RxJava 澳叉,到現(xiàn)在一年多了隙咸。今年加入了 Flipboard 后,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,164評(píng)論 6 151
  • 文章轉(zhuǎn)自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物線在正...
    xpengb閱讀 7,032評(píng)論 9 73
  • 前言 今天成洗,沒有風(fēng)和日麗五督,沒有太陽高照夏夏苦逼的坐著公交車去面試了一把。無論結(jié)果如何瓶殃,總要漲一波經(jīng)驗(yàn)的說于是乎夏夏...
    __夏至未至閱讀 526評(píng)論 6 6
  • 作者/胄寧 空山暮雨渲染著遲夏的凄悲充包, 劫后重生仍舊殘留著的余味, 你楚楚的眼神在向他傳達(dá)著曖昧遥椿, 在這曖昧的背后...
    胄寧閱讀 535評(píng)論 8 6