關(guān)于RxJava最友好的文章(進階)

轉(zhuǎn)載分享請事先與本人取得聯(lián)系

前言

之前就寫過一篇《關(guān)于Rxjava最友好的文章》,反響很不錯攘轩,由于那篇文章的定位就是簡單友好桐猬,因此盡可能的摒棄復(fù)雜的概念插龄,只抓住關(guān)鍵的東西來講琅束,以保證大家都能看懂。

不過那篇文章寫完之后低缩,我就覺得應(yīng)該還得有一篇文章給RxJava做一個深入的講解才算完美嘉冒,于是就有了今天的進階篇。因為一個團隊里可能大家都會用RxJava咆繁,但是必須要有一個人很懂這個讳推,不然碰到問題可就麻煩了。

在前一篇文章中的最后玩般,我們得出結(jié)論:RxJava就是在觀察者模式的骨架下银觅,通過豐富的操作符和便捷的異步操作來完成對于復(fù)雜業(yè)務(wù)的處理。今天我們還是就結(jié)論中的觀察者模式操作符來做深入的拓展坏为。

在進入正題之前究驴,還是希望大家先去我的主頁上看看《關(guān)于Rxjava最友好的文章》。

關(guān)于觀察者模式

前一篇文章首先就重點談到了觀察者模式匀伏,我們認(rèn)為觀察者模式RxJava的骨架*洒忧。在這里不是要推翻之前的結(jié)論,而是希望從深入它的內(nèi)部的去了解它的實現(xiàn)够颠。

依然使用之前文章中關(guān)于開關(guān)和臺燈的代碼

//創(chuàng)建一個被觀察者(開關(guān))
 Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("On");
                subscriber.onNext("Off");
                subscriber.onNext("On");
                subscriber.onNext("On");
                subscriber.onCompleted();
            }
        });
//創(chuàng)建一個觀察者(臺燈)
 Subscriber light=new Subscriber<String>() {
            @Override
            public void onCompleted() {
                //被觀察者的onCompleted()事件會走到這里;
                Log.d("DDDDDD","結(jié)束觀察...\n");
            }

            @Override
            public void onError(Throwable e) {
                    //出現(xiàn)錯誤會調(diào)用這個方法
            }
            @Override
            public void onNext(String s) {
                //處理傳過來的onNext事件
                Log.d("DDDDD","handle this---"+s)
            }
//訂閱
switcher.subscribe(light);

以上就是一個RxJava觀察者架構(gòu)熙侍,
看到這樣的代碼不知道你會不會有一些疑惑:

  • 被觀察者中的Observable.OnSubscribe是什么,有什么用履磨?
  • call(subscriber)方法中蛉抓,subscriber哪里來的?
  • 為什么只有在訂閱之后,被觀察者才會開始發(fā)送消息剃诅?

其實芝雪,這些問題都可以通過了解OnSubscribe來解決。

那我們先來看看關(guān)于OnSubscribe的定義

//上一篇文章也提到Acton1這個接口综苔,內(nèi)部只有一個待實現(xiàn)call()方法
//沒啥特別惩系,人畜無害
public interface Action1<T> extends Action {
    void call(T t);
}
//OnSubscribe繼承了這個Action1接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // OnSubscribe仍然是個接口
    }

那么也就是說,OnSubscribe本質(zhì)上也是和 Action1一樣的接口如筛,只不過它專門用于Observable內(nèi)部堡牡。

而在Observable觀察者的類中,OnSubscribe是它唯一的屬性,同時也是Observable構(gòu)造函數(shù)中唯一必須傳入的參數(shù)杨刨,也就是說晤柄,只要創(chuàng)建了Observable,那么內(nèi)部也一定有一個OnSubscribe對象妖胀。

當(dāng)然芥颈,Observable是沒有辦法直接new的惠勒,我們只能通過create(),just()等等方法創(chuàng)建爬坑,當(dāng)然纠屋,這些方法背后去調(diào)用了new Observable(onSubscribe)

public class Observable<T> {
    //唯一的屬性
    final OnSubscribe<T> onSubscribe;
    //構(gòu)造函數(shù),因為protected盾计,我們只能使用create函數(shù)
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    //create(onSubscribe) 內(nèi)部調(diào)用構(gòu)造函數(shù)售担。
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    ....
    ....
    }
    

當(dāng)創(chuàng)建了Observable和Subscribe之后,調(diào)用subscribe(subscriber)方法時署辉,發(fā)生了什么呢族铆?

    //傳入了觀察者對象
    public final Subscription subscribe(final Observer<? super T> observer) {
       ....
        //往下調(diào)用
        return subscribe(new ObserverSubscriber<T>(observer));
    }
    
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        //往下調(diào)用
        return Observable.subscribe(subscriber, this);
    }
    
    
    //調(diào)用到這個函數(shù)
 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // new Subscriber so onStart it
        subscriber.onStart();

        // add a significant depth to already huge call stacks.
        try {
            // 在這里簡單講,對onSubscribe進行封裝哭尝,不必緊張哥攘。
            OnSubscribe onSubscribe=RxJavaHooks.onObservableStart(observable, observable.onSubscribe);
            
            //這個才是重點!2酿小逝淹!
            //這個調(diào)用的具體實現(xiàn)方法就是我們創(chuàng)建觀察者時
            //寫在Observable.create()中的call()呀
            //而調(diào)用了那個call(),就意味著事件開始發(fā)送了
            onSubscribe.call(subscriber);
            //不信你往回看
            
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            ....
            ....
            }
            return Subscriptions.unsubscribed();
        }
    }

代碼看到這里,我們就可以對上面三個問題做統(tǒng)一的回答了:

  • onSubscribe是Observable內(nèi)部唯一屬性侠姑,是連接Observable和subscriber的關(guān)鍵,相當(dāng)于連接臺燈和開關(guān)的那根電線
  • call(Subscriber<? super String> subscriber)中的subscriber箩做,就是我們自己創(chuàng)建的那個觀察者
  • 只有在訂閱的時候莽红,才會發(fā)生onSubscribe.call(subscriber),進而才會開始調(diào)用onNext(),onComplete()等邦邦。

到這里安吁,你是不是對于RxJava的觀察者模式了解更加清晰了呢?我們用流程圖復(fù)習(xí)一下剛才的過程燃辖。

了解了上面這些鬼店,我們就可以更進一步做以下總結(jié):

  • 訂閱這個動作,實際上是觀察者(subscriber)對象把自己傳遞給被觀察者(observable)內(nèi)部的onSubscribe黔龟。
  • onSubscribe的工作就是調(diào)用call(subscriber)來通知被觀察者發(fā)送消息給這個subscriber妇智。

以上的結(jié)論對于下面我們理解操作符的原理十分有幫助,因此一定要看明白氏身。

觀察者模式介紹到這里巍棱,才敢說講完了。

關(guān)于操作符

上一篇文章講了一些操作符蛋欣,并且在github上放了很多其他的操作符使用范例給大家航徙,因此在這里不會介紹更多操作符的用法,而是講解操作符的實現(xiàn)原理陷虎。他是如何攔截事件到踏,然后變換處理之后杠袱,最后傳遞到觀察者手中的呢?

相信了解相關(guān)內(nèi)容的人可能會想到lift()操作符窝稿,它本來是其他操作符做變換的基礎(chǔ)楣富,不過那已經(jīng)是好幾個版本以前的事情了。但是目前版本中RxJava已經(jīng)不一樣了了讹躯,直接把lift()的工作下放到每個操作符中菩彬,把lift的弱化了(但是依然保留了lift()操作符)。

因此潮梯,我們在這里不必講解lift骗灶,直接拿一個操作符做例子,來了解它的原理即可秉馏,因為基本上操作符的實現(xiàn)原理都是一樣的耙旦。

以map()為例,依然拿之前文章里面的例子:

 Observable.just(getFilePath())
            //使用map操作來完成類型轉(zhuǎn)換
            .map(new Func1<String, Bitmap>() {
              @Override
              public Bitmap call(String s) {
                //顯然自定義的createBitmapFromPath(s)方法萝究,是一個極其耗時的操作
                  return createBitmapFromPath(s);
              }
          })
            .subscribe(
                 //創(chuàng)建觀察者免都,作為事件傳遞的終點處理事件    
                  new Subscriber<Bitmap>() {
                        @Override
                        public void onCompleted() {
                            Log.d("DDDDDD","結(jié)束觀察...\n");
                        }

                        @Override
                        public void onError(Throwable e) {
                            //出現(xiàn)錯誤會調(diào)用這個方法
                        }
                        @Override
                        public void onNext(Bitmap s) {
                            //處理事件
                            showBitmap(s)
                        }
                    );

看看map背后到底做了什么

 public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            //創(chuàng)建了全新代理的的Observable,構(gòu)造函數(shù)傳入的參數(shù)是OnSubscribe
            //OnSubscribeMap顯然是OnSubscribe的一個實現(xiàn)類帆竹,
            //也就是說绕娘,OnSubscribeMap需要實現(xiàn)call()方法
            //構(gòu)造函數(shù)傳入了真實的Observable對象
            //和一個開發(fā)者自己實現(xiàn)的Func1的實例
        return create(new OnSubscribeMap<T, R>(this, func));
    }

看OnSubscribeMap的具體實現(xiàn):

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    //用于保存真實的Observable對象
    final Observable<T> source;
    //還有我們傳入的那個Func1的實例
    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    //實現(xiàn)了call方法,我們知道call方法傳入的Subscriber
    //就是訂閱之后栽连,外部傳入真實的的觀察者
    @Override
    public void call(final Subscriber<? super R> o) {
        //把外部傳入的真實觀察者傳入到MapSubscriber险领,構(gòu)造一個代理的觀察者
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        //讓外部的Observable去訂閱這個代理的觀察者
        source.unsafeSubscribe(parent);
    }

    //Subscriber的子類,用于構(gòu)造一個代理的觀察者
    static final class MapSubscriber<T, R> extends Subscriber<T> {
            //這個Subscriber保存了真實的觀察者
        final Subscriber<? super R> actual;
        //我們自己在外部自己定義的Func1
        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        //外部的Observable發(fā)送的onNext()等事件
        //都會首先傳遞到代理觀察者這里
        @Override
        public void onNext(T t) {
            R result;

            try {
                //mapper其實就是開發(fā)者自己創(chuàng)建的Func1秒紧,
                //call()開始變換數(shù)據(jù)
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            //調(diào)用真實的觀察者的onNext()
            //從而在變換數(shù)據(jù)之后绢陌,把數(shù)據(jù)送到真實的觀察者手中
            actual.onNext(result);
        }
        //onError()方法也是一樣
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

map操作符的原理基本上就講完了,其他的操作符和map在原理上是一致的熔恢。

假如你想創(chuàng)建自定義的操作符(這其實是不建議的)脐湾,你應(yīng)該按照上面的步驟

  • 創(chuàng)建一個代理的被觀察者
  • 實現(xiàn)被觀察者中onSubscribe的call方法
  • 在call方法中創(chuàng)建一個代理的觀察者,讓真實的被觀察者訂閱它叙淌。

我知道你會有點暈秤掌,沒關(guān)系,我后面會寫一個自定義操作符放在我的github上,可以關(guān)注下鹰霍。

下面机杜,我們先通過一個流程圖鞏固一下前面學(xué)習(xí)的成果。

下次你使用操作符時衅谷,心里應(yīng)該清楚椒拗,每使用一個操作符,都會創(chuàng)建一個代理觀察者和一個代理被觀察者,以及他們之間是如何相互調(diào)用的蚀苛。相信有了這一層的理解在验,今后使用RxJava應(yīng)該會更加得心應(yīng)手。

不過最后堵未,我想給大家留一個思考題:使用一個操作符時腋舌,流程圖是這樣的,那么使用多個操作符呢渗蟹?

勘誤

暫無

后記

到這里块饺,關(guān)于RxJava的講解就基本可以告一段落了,

我相信雌芽,兩篇文章讀下來授艰,對于RxJava的理解應(yīng)該已經(jīng)到了比較高的一個層次了,我的目標(biāo)也就達(dá)到了世落。

接下來....

因為RxJava是一個事件的異步處理框架淮腾,理論上,他可以封裝任何其他的庫屉佳,那么.....

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末谷朝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子武花,更是在濱河造成了極大的恐慌圆凰,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件体箕,死亡現(xiàn)場離奇詭異专钉,居然都是意外死亡,警方通過查閱死者的電腦和手機干旁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門驶沼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來炮沐,“玉大人争群,你說我怎么就攤上這事〈竽辏” “怎么了换薄?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長翔试。 經(jīng)常有香客問我轻要,道長,這世上最難降的妖魔是什么垦缅? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任冲泥,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘凡恍。我一直安慰自己志秃,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布嚼酝。 她就那樣靜靜地躺著浮还,像睡著了一般。 火紅的嫁衣襯著肌膚如雪闽巩。 梳的紋絲不亂的頭發(fā)上钧舌,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天,我揣著相機與錄音涎跨,去河邊找鬼洼冻。 笑死,一個胖子當(dāng)著我的面吹牛六敬,可吹牛的內(nèi)容都是我干的碘赖。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼外构,長吁一口氣:“原來是場噩夢啊……” “哼普泡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起审编,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤撼班,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后垒酬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體砰嘁,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年勘究,在試婚紗的時候發(fā)現(xiàn)自己被綠了矮湘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡口糕,死狀恐怖缅阳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情景描,我是刑警寧澤十办,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站超棺,受9級特大地震影響向族,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜棠绘,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一件相、第九天 我趴在偏房一處隱蔽的房頂上張望再扭。 院中可真熱鬧,春花似錦夜矗、人聲如沸霍衫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽敦跌。三九已至,卻和暖如春逛揩,著一層夾襖步出監(jiān)牢的瞬間柠傍,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工辩稽, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留惧笛,地道東北人。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓逞泄,卻偏偏與公主長得像患整,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子喷众,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容