RxJava

RxJava概述

  • RxJava 是一種響應式編程,來創(chuàng)建基于事件的異步操作庫∽樘猓基于事件流的鏈式調(diào)用橱脸、邏輯清晰簡潔础米。
  • RxJava 我的理解是將事件從起點(上游)流向終點(下游),中間有很多卡片對數(shù)據(jù)進操作并傳遞添诉,每個卡片獲取上一個卡片傳遞下來的結(jié)果然后對事件進行處理然后將結(jié)果傳遞給下一個卡片屁桑,這樣事件就從起點通過卡片一次次傳遞直到流向終點。

RxJava觀察者模式

  • 傳統(tǒng)觀察者是一個被觀察者多過觀察者栏赴,當被觀察者發(fā)生改變時候及時通知所有觀察者
  • RXjava是一個觀察者多個被觀察者蘑斧,被觀察者像鏈條一樣串起來,數(shù)據(jù)在被觀察者之間朝著一個方向傳遞须眷,直到傳遞給觀察者 竖瘾。

RxJava原理理解

  • 被觀察者通過訂閱將事件按順序依次傳遞給觀察者,


    image.png
//RxAndroid中包含RxJava的內(nèi)容花颗,只引入RxAndroid還是會報錯
dependencies {
    ......
    compile 'io.reactivex.rxjava2:rxjava:2.1.3'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
image.png

創(chuàng)建Observer(觀察者)

        Observer<Integer> observer = new Observer<Integer>() {
 
            // 觀察者接收事件前  捕传,當 Observable 被訂閱時,觀察者onSubscribe方法會自動被調(diào)用 
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }

            // 當被觀察者生產(chǎn)Next事件 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件作出響應" + value);
            }

            // 當被觀察者生產(chǎn)Error事件 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            // 當被觀察者生產(chǎn)Complete事件 
            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        };
       //Subscriber類 = RxJava 內(nèi)置的一個實現(xiàn)了 Observer 的抽象類扩劝,對 Observer 接口進行了擴展 
       Subscriber<Integer> subscriber = new Subscriber<Integer>() {

           // 觀察者接收事件前 庸论,當 Observable 被訂閱時,觀察者onSubscribe方法會自動被調(diào)用 
            @Override
            public void onSubscribe(Disposable d) { 
                Log.d(TAG, "開始采用subscribe連接");
            }

            // 當被觀察者生產(chǎn)Next事件 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件作出響應" + value);
            }

            // 當被觀察者生產(chǎn)Error事件 
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            // 當被觀察者生產(chǎn)Complete事件 
            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }
        };

Subscriber 抽象類與Observer 接口的區(qū)別

  • 二者基本使用方式一致(在RxJava的subscribe過程中棒呛,Observer會先被轉(zhuǎn)換成Subscriber再使用)
  • Subscriber抽象類對 Observer 接口進行了擴展聂示,新增了兩個方法:
    1. onStart():在還未響應事件前調(diào)用,用于做一些初始化工作簇秒,他是在subscribe 所在的線程調(diào)用鱼喉,不能切換線程,所以不能進行界面UI更新比如彈框這些。
    2. unsubscribe():用于取消訂閱蒲凶。在該方法被調(diào)用后气筋,觀察者將不再接收響應事件,比如在onStop方法中可以調(diào)用此方法結(jié)束訂閱旋圆。調(diào)用該方法前宠默,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用灵巧。

創(chuàng)建 Observable (被觀察者)

        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 通過 ObservableEmitter類對象產(chǎn)生事件并通知觀察者
                // ObservableEmitter:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
                     
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

RxJava 提供了其他方法用于 創(chuàng)建被觀察者對象Observable

// 方法1:just(T...):直接將傳入的參數(shù)依次發(fā)送出來
  Observable observable = Observable.just("A", "B", "C");
  // 將會依次調(diào)用:
  // onNext("A");
  // onNext("B");
  // onNext("C");
  // onCompleted();

// 方法2:fromArray(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組 / Iterable 拆分成具體對象后搀矫,依次發(fā)送出來
  String[] words = {"A", "B", "C"};
  Observable observable = Observable.fromArray(words);
  // 將會依次調(diào)用:
  // onNext("A");
  // onNext("B");
  // onNext("C");
  // onCompleted();

以上兩種方法創(chuàng)建出來的觀察者都是繼承Observable,比如ObservableCreate刻肄、ObservableFromArray瓤球、ObservableMap...,

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

--------------------------------------------------------------------------------------------------------

public abstract class Observable<T> implements ObservableSource<T> {

   ...

    protected abstract void subscribeActual(Observer<? super T> observer);
 
    @Override
    public final void subscribe(Observer<? super T> observer) {
     ...
        try {
            ...
            subscribeActual(observer);
        }  catch (Throwable e) {
           ...
        }
    }
}

public final class ObservableCreate<T> extends Observable<T> {

   final ObservableOnSubscribe<T> source;

   @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

   static final class CreateEmitter<T> extends AtomicReference<Disposable> 
 implements ObservableEmitter<T>, Disposable {
  
 ...

        @Override
        public void onNext(T t) {
            if (t == null) {
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        ...
    }
}

public final class ObservableFromArray<T> extends Observable<T> {

    final T[] array;
   
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual; //對應觀察者

        final T[] array;
        
        ...

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    ... 

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), 
                      "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t),
                      "The mapper function returned a null value.") : null;
        }
    }
}

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

---------------------------------------------------------------------------------------------

 public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

        final Observer<? super T> observer;

        final T value;
  
        @Override
        public void dispose() {
            set(ON_COMPLETE);
        }

       ....
         
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
    }

觀察者和被觀察者通過subscribe訂閱敏弃,訂閱完成后被觀察者就可以像觀察者發(fā)送數(shù)據(jù)

 
        Observable.create(new ObservableOnSubscribe<Integer>() {
       
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
   
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接");
            }
 
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對Next事件"+ value +"作出響應"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對Error事件作出響應");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對Complete事件作出響應");
            }

        });
    }
}

image.png

鏈式調(diào)用


image.png
     Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
            }
        }).map(new Function<String, String>() {

            @Override
            public String apply(@NonNull String s) throws Exception {
                return null;
            }
        }).map(new Function<String, String>() {

            @Override
            public String apply(@NonNull String s) throws Exception {
                return null;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }

            @Override
            public void onNext(String s) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

這個訂閱的過程就如同洋蔥一樣一層層封裝卦羡,當訂閱完成后就像剝洋蔥一樣一層層剝,用發(fā)射器發(fā)送數(shù)據(jù)麦到,用onNext方法一層層發(fā)送绿饵,發(fā)送給每一層的時候就回調(diào)每一層的Function類apply方法,這個方法由開發(fā)者實現(xiàn)瓶颠,該方法處理數(shù)據(jù)后就返回處理后的數(shù)據(jù)拟赊,然后數(shù)據(jù)又往下一層傳遞,直到傳遞到觀察者手里粹淋,然后觀察者接收數(shù)據(jù)


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吸祟,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子桃移,更是在濱河造成了極大的恐慌屋匕,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件借杰,死亡現(xiàn)場離奇詭異炒瘟,居然都是意外死亡,警方通過查閱死者的電腦和手機第步,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缘琅,“玉大人粘都,你說我怎么就攤上這事∷⑴郏” “怎么了翩隧?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長呻纹。 經(jīng)常有香客問我堆生,道長专缠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任淑仆,我火速辦了婚禮涝婉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蔗怠。我一直安慰自己墩弯,他們只是感情好,可當我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布寞射。 她就那樣靜靜地躺著渔工,像睡著了一般。 火紅的嫁衣襯著肌膚如雪桥温。 梳的紋絲不亂的頭發(fā)上引矩,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機與錄音侵浸,去河邊找鬼旺韭。 笑死,一個胖子當著我的面吹牛通惫,可吹牛的內(nèi)容都是我干的茂翔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼履腋,長吁一口氣:“原來是場噩夢啊……” “哼珊燎!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起遵湖,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤悔政,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后延旧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谋国,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年迁沫,在試婚紗的時候發(fā)現(xiàn)自己被綠了芦瘾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡集畅,死狀恐怖近弟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情挺智,我是刑警寧澤祷愉,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響二鳄,放射性物質(zhì)發(fā)生泄漏赴涵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一订讼、第九天 我趴在偏房一處隱蔽的房頂上張望髓窜。 院中可真熱鬧,春花似錦躯嫉、人聲如沸纱烘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽擂啥。三九已至,卻和暖如春帆阳,著一層夾襖步出監(jiān)牢的瞬間哺壶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工蜒谤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留山宾,地道東北人。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓鳍徽,卻偏偏與公主長得像资锰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子阶祭,可洞房花燭夜當晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 前言 我從去年開始使用 RxJava ,到現(xiàn)在一年多了瑰剃。今年加入了 Flipboard 后齿诉,看到 Flipboar...
    Kepler_II閱讀 1,335評論 0 3
  • 最近項目里面有用到Rxjava框架粤剧,感覺很強大的巨作,所以在網(wǎng)上搜了很多相關(guān)文章挥唠,發(fā)現(xiàn)一片文章很不錯俊扳,今天把這篇文...
    Scus閱讀 6,884評論 2 50
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了猛遍。今年加入了 Flipboard 后,看到 Flipboard 的...
    huqj閱讀 1,862評論 0 21
  • 其它文章 RxJava操作符大全 1、RxJava之一——一次性學會使用RxJava RxJava簡單的使用和使用...
    沐左閱讀 428評論 0 0
  • 16宿命:用概率思維提高你的勝算 以前的我是風險厭惡者懊烤,不喜歡去冒險梯醒,但是人生放棄了冒險,也就放棄了無數(shù)的可能腌紧。 ...
    yichen大刀閱讀 6,059評論 0 4