Rxjava2

Season_zl給初學(xué)者的RxJava2.0教程

ObservableEmitter<T> emitter

1.發(fā)射器發(fā)出onComplete()或者onError()后副编,接收器將不再接收時間茅撞。
2.游可以不發(fā)送onComplete或onError。
3.最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然用狱。

注: 關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個規(guī)則, **并不一定會導(dǎo)致程序崩潰. ** 比如發(fā)送多個onComplete是可以正常運(yùn)行的, 依然是收到第一個onComplete就不再接收了, 但若是發(fā)送多個onError, 則收到第二個onError事件會導(dǎo)致程序會崩潰.

Disposable d

當(dāng)調(diào)用它的dispose()方法時, 它就會將兩根管道切斷, 從而導(dǎo)致下游收不到事件嘉抓,但上游的還會繼續(xù)發(fā)送剩余事件始绍。
在Activity中將這個Disposable保存起來, 當(dāng)Activity退出時切斷它即可访锻。多個Disposable則使用CompositeDisposable管理,CompositeDisposable.add()CompositeDisposable.clear()

總結(jié)

ObservableEmitter<T> emitteronComplete()onError()因谎,以及Disposable ddispose()都只會讓下游接收不到事件基括,但上游假如還存在事件則會繼續(xù)發(fā)送,以上的方法都可以視為階段器财岔,

subscribeOn()observeOn()
  • subscribeOn()指定的是上游發(fā)送事件的線程,observeOn()指定的是下游接收事件的線程.
  • 多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
  • 多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn() , 下游的線程就會切換一次.
    Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
    Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
    Schedulers.newThread() 代表一個常規(guī)的新線程
    AndroidSchedulers.mainThread()  代表Android的主線程
Map操作符

對原數(shù)據(jù)進(jìn)行變化操作(其實(shí)就是一個方法风皿,接收原數(shù)據(jù)操作然后返回結(jié)果數(shù)據(jù))

FlatMap操作符(玩的熟才用,否則容易暈)

將第一次發(fā)送的數(shù)據(jù)和flatMap發(fā)送的數(shù)據(jù)進(jìn)行組合再此發(fā)送匠璧。比如第一次發(fā)送ABC桐款,第二次發(fā)送123,那么可能(因?yàn)椴槐WC順序)會出現(xiàn)A1A2A3 B1B2B3 C1C2C3 夷恍。保證順序的話用concatMap

Zip操作符

對多個發(fā)送源的數(shù)據(jù)進(jìn)行合并魔眨,每個源數(shù)據(jù)的對應(yīng)角標(biāo)的元素進(jìn)行合并,以最短發(fā)送源的為準(zhǔn)酿雪,較長發(fā)送源的剩余元素被舍棄遏暴。同一線程一定有會有一個發(fā)送源先全部發(fā)送完畢。

Flowable(默認(rèn)緩存為128個事件执虹,響應(yīng)式拉韧鼗印)

背壓策略:BackpressureStrategy(水缸)唠梨。一般的使用場景都是發(fā)送量大且異步(因?yàn)檫@兩個都可以會引起內(nèi)存溢出)

  • ERROR袋励,上游積壓超過128事件則會直接報異常
  • BUFFER, 無限制緩存大小,但是會存在OOM風(fēng)險
  • DROP, 丟棄超過128個事件的剩余事件(默認(rèn)緩存為128当叭,你發(fā)了129茬故,那么第129不會進(jìn)入水缸)。 Drop就是直接把存不下的事件丟棄
  • LATEST, Latest就是只保留最新的事件蚁鳖,當(dāng)水缸(緩存128)已經(jīng)存滿了128個事件磺芭,那么這時候還有事件進(jìn)入的話則前面的事件會被覆蓋掉。
背壓源碼解析
Flowable
//  創(chuàng)建上游的方法
   public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        // 檢查是否為null的工具類醉箕,不必深究
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        //  RxJavaPlugins.onAssembly()钾腺。因?yàn)槭擎準(zhǔn)侥J剑苑祷乇旧砑タ悖@個方法就是一個包裹轉(zhuǎn)換的功能放棒,不必深究
        // FlowableCreate,這個類才是重點(diǎn)
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
// 訂閱下游的方法
public final void subscribe(Subscriber<? super T> s) {
        // 一般我都是直接new一個Subscriber己英,所以走else塊间螟。
        if (s instanceof FlowableSubscriber) {
            subscribe((FlowableSubscriber<? super T>)s);
        } else {
            ObjectHelper.requireNonNull(s, "s is null");
            // 包裹一層
            subscribe(new StrictSubscriber<T>(s));
        }


   public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);

            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            
            subscribeActual(z);  // !!O崞啤H偕!D帷0恃妗!<忧凇仙辟!真實(shí)發(fā)起訂閱(其他代碼可不看,就看這個句)
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
StrictSubscriber 下游類
public class StrictSubscriber<T>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {

    private static final long serialVersionUID = -4945028590049415624L;

    final Subscriber<? super T> downstream;

    final AtomicThrowable error;

    final AtomicLong requested;

    final AtomicReference<Subscription> upstream;

    final AtomicBoolean once;

    volatile boolean done;

    public StrictSubscriber(Subscriber<? super T> downstream) {
        this.downstream = downstream;
        this.error = new AtomicThrowable();
        this.requested = new AtomicLong();
        this.upstream = new AtomicReference<Subscription>();
        this.once = new AtomicBoolean();
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            cancel();
            onError(new IllegalArgumentException("§3.9 violated: positive request amount required but it was " + n));
        } else {
            SubscriptionHelper.deferredRequest(upstream, requested, n);
        }
    }

    @Override
    public void cancel() {
        if (!done) {
            SubscriptionHelper.cancel(upstream);
        }
    }

    @Override
    public void onSubscribe(Subscription s) {
        if (once.compareAndSet(false, true)) {

            downstream.onSubscribe(this);

            SubscriptionHelper.deferredSetOnce(this.upstream, requested, s);
        } else {
            s.cancel();
            cancel();
            onError(new IllegalStateException("§2.12 violated: onSubscribe must be called at most once"));
        }
    }

    @Override
    public void onNext(T t) {
        HalfSerializer.onNext(downstream, t, this, error);
    }

    @Override
    public void onError(Throwable t) {
        done = true;
        HalfSerializer.onError(downstream, t, this, error);
    }

    @Override
    public void onComplete() {
        done = true;
        HalfSerializer.onComplete(downstream, this, error);
    }
}

FlowableCreate(繼承Flowable)
  // 構(gòu)造方法
   public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        //  持有把上游對象
        this.source = source;
        // 持有背壓模式對象
        this.backpressure = backpressure;
    }

  // 實(shí)際訂閱鳄梅,F(xiàn)lowable的subscribe()內(nèi)部會調(diào)用這個方法叠国。
  // 當(dāng)你使用訂閱下游的時候,會把下游對象傳到這個方法戴尸。
@Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        
        // 工廠模式粟焊,根據(jù)背壓模式實(shí)例化對應(yīng)的發(fā)射器,且會把下游對象通過發(fā)射器的構(gòu)造方法讓發(fā)射器內(nèi)部持有(所以我們在發(fā)射器才會知道下游所需的處理能力)孙蒙。
      // 背壓的核心就是這些工廠類项棠,執(zhí)行的條件不同產(chǎn)生的效果就不同
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        
        // 調(diào)用下游的onSubscribe,并且把發(fā)射器對象傳遞過去讓下游對象持有挎峦。(雙向傳遞香追,下游和發(fā)射器互相持有對方的對象)
        t.onSubscribe(emitter);
        try {
            // 上游持有了發(fā)射器對象
            // 使用上游對象執(zhí)行該對象的subscribe,其實(shí)就是走發(fā)射事件的邏輯
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
BaseEmitter背壓發(fā)射器基類
abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> downstream;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> downstream) {
            // 下游對象
            this.downstream = downstream;
            // 切斷對象
            this.serial = new SequentialDisposable();
        }

        @Override
        public void onComplete() {
            complete();
        }

        protected void complete() {
            // 如果已經(jīng)切斷了就跳過坦胶,所以下游不會收到onComplete()事件
            if (isCancelled()) {
                return;
            }
            try {
                // 回調(diào)下游的onComplete()事件
                downstream.onComplete();
            } finally {
                // 切斷
                serial.dispose();
            }
        }

        @Override
        public final void onError(Throwable e) {
            if (!tryOnError(e)) {
                // 已經(jīng)切斷透典,如果接著發(fā)送onError內(nèi)部會拋異常
                RxJavaPlugins.onError(e);
            }
        }

        @Override
        public boolean tryOnError(Throwable e) {
            return error(e);
        }

        protected boolean error(Throwable e) {
            // 判斷開發(fā)者傳遞的異常是否為null
            if (e == null) {
                e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
         
            if (isCancelled()) {
                return false;
            }
            try {
                // 回調(diào)下游的方法
                downstream.onError(e);
            } finally {
               // 切斷
                serial.dispose();
            }
            return true;
        }

        @Override
        public final void cancel() {
            // 切斷
            serial.dispose();
            onUnsubscribed();
        }
        
       // 注銷訂閱,空實(shí)現(xiàn)
        void onUnsubscribed() {
            // default is no-op
        }

        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }

        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                // 將下游請求的事件數(shù)存放
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        void onRequested() {
            // default is no-op
        }

        @Override
        public final void setDisposable(Disposable d) {
            serial.update(d);
        }

        @Override
        public final void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public final long requested() {
            return get();
        }

        @Override
        public final FlowableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }
ErrorAsyncEmitter背壓發(fā)射器(繼承了NoOverflowBaseAsyncEmitter)

 static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {

        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        void onOverflow() {
            // 回調(diào)下游的onError()顿苇,直接拋出異常
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }



   abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            
            if (get() != 0) {  // 下游所需事件不為0峭咒,就是下游還有處理的事件
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                // 調(diào)用子類重寫的方法
                onOverflow();
            }
        }
        
        // 子類重寫
        abstract void onOverflow();
    }

    // BackpressureHelper的方法
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            // 下游所需事件 - 1
            long update = current - n;
            if (update < 0L) {
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                update = 0L;
            }
            // 重置所需事件數(shù)
            if (requested.compareAndSet(current, update)) {
                return update;
            }
        }
    }
上游,下游纪岁,發(fā)射器關(guān)系丑圖
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末凑队,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子幔翰,更是在濱河造成了極大的恐慌漩氨,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遗增,死亡現(xiàn)場離奇詭異叫惊,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)贡定,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門赋访,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事蚓耽∏” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵步悠,是天一觀的道長签杈。 經(jīng)常有香客問我,道長鼎兽,這世上最難降的妖魔是什么答姥? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮谚咬,結(jié)果婚禮上鹦付,老公的妹妹穿的比我還像新娘。我一直安慰自己择卦,他們只是感情好敲长,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著秉继,像睡著了一般祈噪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上尚辑,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天辑鲤,我揣著相機(jī)與錄音,去河邊找鬼杠茬。 笑死月褥,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的澈蝙。 我是一名探鬼主播吓坚,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼撵幽,長吁一口氣:“原來是場噩夢啊……” “哼灯荧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起盐杂,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤逗载,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后链烈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體厉斟,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年强衡,在試婚紗的時候發(fā)現(xiàn)自己被綠了擦秽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖感挥,靈堂內(nèi)的尸體忽然破棺而出缩搅,到底是詐尸還是另有隱情,我是刑警寧澤触幼,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布硼瓣,位于F島的核電站,受9級特大地震影響置谦,放射性物質(zhì)發(fā)生泄漏堂鲤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一媒峡、第九天 我趴在偏房一處隱蔽的房頂上張望瘟栖。 院中可真熱鬧,春花似錦谅阿、人聲如沸慢宗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽镜沽。三九已至,卻和暖如春贱田,著一層夾襖步出監(jiān)牢的瞬間缅茉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工男摧, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蔬墩,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓耗拓,卻偏偏與公主長得像拇颅,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子乔询,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評論 2 345