RxJava2 源碼淺析

RxJava2 源碼淺析

ReactiveX

歷史:
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx棘劣,最初是LINQ的一個擴展昆庇,由微軟的架構師Erik Meijer領導的團隊開發(fā)扎阶,在2012年11月開源,Rx是一個編程模型蜓洪,目標是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流坯苹,Rx庫支持.NET隆檀、JavaScript和C++,Rx近幾年越來越流行了,現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言了恐仑,Rx的大部分語言庫由ReactiveX這個組織負責維護泉坐,比較流行的有RxJava/RxJS/Rx.NET,社區(qū)網(wǎng)站是 reactivex.io

定義:
ReactiveX.io給的定義是裳仆,Rx是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口坚冀,ReactiveX結合了觀察者模式、迭代器模式和函數(shù)式編程的精華鉴逞。
ReactiveX不僅僅是一個編程接口记某,它是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言构捡。

filter操作符

這就是數(shù)據(jù)流液南?

RxJava2定義

a library for composing asynchronous and event-based programs by using observable sequences.(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)

大致流程

上代碼:

Observable.
            create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("test First");
                    Log.e("TAG", "subScribe test First");
                    emitter.onNext("test Second");
                    Log.e("TAG", "subScribe test Second");
                    emitter.onComplete();
                    Log.e("TAG", "subScribe onComplete");
                }
            }).
            subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe");
                    mDisposable = d;
                }

                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext");
                    Log.e("TAG", s);
                    if (s.equals("test First")) {
                        mDisposable.dispose();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    Log.e("TAG", "onError");
                }

                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete");
                }
            });

剛開始學習RxJava 時勾徽,這段代碼給我最直觀的感受就是滑凉,這不就是自己調(diào)用自己嗎。ObservableEmitter<String> emitter 這個就是下面的subscribe(new Observer<String>())喘帚。對吧畅姊,我覺得大家應該都是這樣的感受吧...

追蹤一下源碼:點擊create()方法進去看一下:

@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}


public static <T> Observable<T> onAssembly(Observable<T> source) {
    Function<Observable, Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
看了代碼onObservableAssembly為null,所以create方法之后這個對象被包裝成new ObservableCreate<T>(source),source是外面?zhèn)鬟M來的吹由。

關鍵字:io.reactivex.internal.operators.observable.ObservableCreate

繼續(xù)看下一個操作符:subscribe()若未,點進去看一下

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);//observer原樣返回,沒改動

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);//關鍵點
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
// RxJavaPlugins.onSubscribe

  public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
    BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;   //f 為null
    if (f != null) {
        return apply(f, source, observer);
    }
    return observer;
}

上面說過了經(jīng)過create()方法或這個對象已經(jīng)是ObservableCreate了倾鲫,那么最終會調(diào)用的就是subscribeActual(observer) 看一下ObservableCreate這個類的代碼:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}


static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
}

看一下粗合,這里會把observer包裝成一個CreateEmitter對象,然后source是Observable.create(new ObservableOnSubscribe<String>())傳進來的ObservableOnSubscribe對象乌昔。然后會調(diào)用observer.onSubscribe(parent);source.subscribe(parent);終于清晰了...可以回答上面的問題了隙疚,其實Observer和ObservableEmitter可以看成是一個對象,只是對observer做了個包裝...

Scheduler 線程變換(subscribeOn 和 observeOn)

說到線程變換即線程間通信磕道,因為我是學Android供屉,所以第一印象就是Handler,然后就是Future溺蕉×尕ぃ看了源碼后發(fā)現(xiàn)RxJava用的是Future,ScheduledExecutorService焙贷,Runnable撵割,二AndroidScheduler就是用Handler的,因為需要切換到Android中的UI線程辙芍。

subscribeOn(Schedulers.newThread())

點進去看一下:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}
}

傳進來的是Schedulers.newThread()啡彬,點擊Schedulers.newThread() 點進去
發(fā)現(xiàn)最終返回的是NewThreadScheduler

關鍵字:io.reactivex.internal.schedulers.NewThreadScheduler
io.reactivex.internal.schedulers.NewThreadWorker(真正做線程調(diào)度的類)

發(fā)現(xiàn)有scheduler.scheduleDirect(new Runnable())點擊進去羹与,最終調(diào)用
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                decoratedRun.run();
            } finally {
                w.dispose();
            }
        }
    }, delay, unit);

    return w;
}
最終還是w.schedule(new Runnable()),w就是NewThreadWorker,找到這個類看一下schedule方法,最終會調(diào)用:


public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        parent.remove(sr);
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

看到future和executor了庶灿,這里就是線程切換

observeOn(AndroidSchedulers.mainThread())
ObservableObserveOn 最終調(diào)用的是HandlerScheduler和HandlerWorker

HandlerWorker:

@Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (delay < 0) throw new IllegalArgumentException("delay < 0: " + delay);
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, unit.toMillis(delay));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }

發(fā)現(xiàn)是用Handler來做線程切換纵搁,Handler管理的Looper是Looper.getMainLooper(),所以把消息發(fā)送到了主線程往踢。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末腾誉,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子峻呕,更是在濱河造成了極大的恐慌利职,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瘦癌,死亡現(xiàn)場離奇詭異猪贪,居然都是意外死亡,警方通過查閱死者的電腦和手機讯私,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門热押,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人斤寇,你說我怎么就攤上這事桶癣。” “怎么了娘锁?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵牙寞,是天一觀的道長。 經(jīng)常有香客問我致盟,道長碎税,這世上最難降的妖魔是什么尤慰? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任馏锡,我火速辦了婚禮,結果婚禮上伟端,老公的妹妹穿的比我還像新娘杯道。我一直安慰自己,他們只是感情好责蝠,可當我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布党巾。 她就那樣靜靜地躺著,像睡著了一般霜医。 火紅的嫁衣襯著肌膚如雪齿拂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天肴敛,我揣著相機與錄音署海,去河邊找鬼吗购。 笑死,一個胖子當著我的面吹牛砸狞,可吹牛的內(nèi)容都是我干的捻勉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼刀森,長吁一口氣:“原來是場噩夢啊……” “哼踱启!你這毒婦竟也來了?” 一聲冷哼從身側響起研底,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤埠偿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后榜晦,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體胚想,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年芽隆,在試婚紗的時候發(fā)現(xiàn)自己被綠了浊服。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡胚吁,死狀恐怖牙躺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情腕扶,我是刑警寧澤孽拷,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站半抱,受9級特大地震影響脓恕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜窿侈,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一炼幔、第九天 我趴在偏房一處隱蔽的房頂上張望首繁。 院中可真熱鬧卷胯,春花似錦、人聲如沸藤肢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至殉农,卻和暖如春刀脏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背超凳。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工愈污, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留危队,地道東北人。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓钙畔,卻偏偏與公主長得像茫陆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子擎析,可洞房花燭夜當晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 引入依賴: implementation 'io.reactivex.rxjava2:rxandroid:2.0....
    為夢想戰(zhàn)斗閱讀 1,306評論 0 0
  • 我從去年開始使用 RxJava 簿盅,到現(xiàn)在一年多了。今年加入了 Flipboard 后揍魂,看到 Flipboard 的...
    Jason_andy閱讀 5,492評論 7 62
  • 在正文開始之前的最后桨醋,放上GitHub鏈接和引入依賴的gradle代碼: Github: https://gith...
    蘇蘇說zz閱讀 679評論 0 2
  • 前言我從去年開始使用 RxJava ,到現(xiàn)在一年多了现斋。今年加入了 Flipboard 后喜最,看到 Flipboard...
    占導zqq閱讀 9,164評論 6 151
  • 最近項目里面有用到Rxjava框架瞬内,感覺很強大的巨作,所以在網(wǎng)上搜了很多相關文章限书,發(fā)現(xiàn)一片文章很不錯虫蝶,今天把這篇文...
    Scus閱讀 6,881評論 2 50