RxJava2.0---創(chuàng)建被觀察者

RxJava2.0的簡(jiǎn)單使用
RxJava2.0---創(chuàng)建被觀察者
基于RxJava的事件總線RxBus

在RxJava中喉钢,我們可以創(chuàng)建出各種各樣的被觀察者(數(shù)據(jù)源),也對(duì)應(yīng)各種各樣的創(chuàng)建方法酷窥,下面就一一來(lái)解釋一下语御。能力有限,如有錯(cuò)誤請(qǐng)指出宽堆,謝謝郑原。

Create

使用Create操作符從頭開始創(chuàng)建一個(gè)Observable唉韭,給這個(gè)操作符傳遞一個(gè)接受觀察者作為參數(shù)的函數(shù),編寫這個(gè)函數(shù)讓它的行為表現(xiàn)為一個(gè)Observable--恰當(dāng)?shù)恼{(diào)用觀察者的onNext犯犁,onError和onCompleted方法属愤。這里只講非背壓的,背壓的也是類型創(chuàng)建方法酸役,也可以點(diǎn)擊這里進(jìn)行查看

 /**
 * 非背壓
 * Observable對(duì)應(yīng)Observer
 */
private void createObservable() {
    //被觀察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("This");
            e.onNext("is");
            e.onNext("RxJava");
            e.onComplete();
            e.onNext("Oh");//就算觀察者接收onComplete住诸,
            // 被觀察者還是會(huì)繼續(xù)發(fā)送數(shù)據(jù),只是觀察者不接收
            Log.i(TAG, "subscribe: ");
        }
    });
    //觀察者
    Observer<String> observer = new Observer<String>() {
        Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(String s) {
            Log.i(TAG, "onNext: " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: " + e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
            //取消訂閱
            if (!disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    };
    observable.subscribe(observer);
}

just涣澡、fromArray贱呐、fromIterable

可以將其它種類的對(duì)象和數(shù)據(jù)類型轉(zhuǎn)換為Observable

/**
 * 三種遍歷數(shù)組集合的Observable
 */
private void justFromObservable() {
    Log.i(TAG, "justFrom: ");
    //just
    Observable<Integer> observable = Observable.just(1, 2, 3);
//        //fromArray
//        observable = Observable.fromArray(1, 2, 3);
//
//        ArrayList<Integer> list = new ArrayList<>();
//        list.add(1);
//        //fromIterable
//        observable = Observable.fromIterable(list);
    observable.subscribe(new Observer<Integer>() {
        Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(Integer integer) {
            Log.i(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
            if (!disposable.isDisposed())
                disposable.dispose();
        }
    });
}

defer

直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable入桂,例如下面創(chuàng)建deferObservable時(shí)傳入的num=10的(并非真正創(chuàng)建奄薇,訂閱時(shí)才真正創(chuàng)建),然后在訂閱前把num=20,訂閱后收到的數(shù)據(jù)是20事格,并非10惕艳。

 int num = 10;

/**
 * 延遲創(chuàng)建使用defer
 */
private void deferObservable() {
    Observable<Integer> observable = Observable.just(num);
    //使用num=10來(lái)創(chuàng)建deferObservable 
    Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(num);
        }
    });
    num = 20;
    deferObservable.subscribe(new Observer<Integer>() {
        Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(Integer integer) {
            //輸出20
            Log.i(TAG, "deferObserve onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
            if (!disposable.isDisposed())
                disposable.dispose();
        }
    });
    observable.subscribe(new Observer<Integer>() {
        Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(Integer integer) {
            //輸出10
            Log.i(TAG, "observable onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, e.getLocalizedMessage());
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete");
            if (!disposable.isDisposed())
                disposable.dispose();
        }
    });
}

empty、Never驹愚、Throw

1.創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
2.創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable
3.創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable
這三個(gè)操作符生成的Observable行為非常特殊和受限。測(cè)試的時(shí)候很有用劣纲,有時(shí)候也用于結(jié)合其它的Observables逢捺,或者作為其它需要Observable的操作符的參數(shù)。

private void emptyNeverThrow() {
    Observable.empty().subscribe();
    Observable.never().subscribe();
    Observable.error(new NullPointerException()).subscribe();
}

interval

創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable

private void timerObservable() {
    Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(@NonNull Long aLong) throws Exception {
            Log.i(TAG, Thread.currentThread().getName() + "  accept: " + aLong);
        }
    });
}

range(1, 10)

創(chuàng)建一個(gè)發(fā)射特定整數(shù)序列的Observable(這里是1到10)

 private void rangeObservable() {
    Observable.range(1, 10).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.i(TAG, Thread.currentThread().getName() + "  accept: " + integer);
        }
    });
}

timer

在一個(gè)給定的延遲后發(fā)射一個(gè)特殊的值

private void timerObservable() {
    Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(@NonNull Long aLong) throws Exception {
            Log.i(TAG, Thread.currentThread().getName() + "  accept: " + aLong);
        }
    });
}

repeat癞季、repeatWhen

創(chuàng)建一個(gè)發(fā)射特定數(shù)據(jù)重復(fù)多次的Observable劫瞳,但是repeatWhen它不是緩存和重放原始Observable的數(shù)據(jù)序列,而是有條件的重新訂閱和發(fā)射原來(lái)的Observable绷柒。repeatWhen操作符默認(rèn)在trampoline調(diào)度器上執(zhí)行志于。

private void repeatObservable() {
    Observable<String> observable = Observable.just("AAA", "BBB", "CCC");
//        observable.repeat(5).subscribe(new Consumer<String>() {
//            @Override
//            public void accept(@NonNull String s) throws Exception {
//                Log.i(TAG, Thread.currentThread().getName() + "   accept: " + s);
//            }
//        });
    observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
            return objectObservable;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.i(TAG, Thread.currentThread().getName() + "   accept: " + s);
        }
    });
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市废睦,隨后出現(xiàn)的幾起案子伺绽,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奈应,死亡現(xiàn)場(chǎng)離奇詭異澜掩,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)杖挣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門肩榕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人惩妇,你說(shuō)我怎么就攤上這事株汉。” “怎么了歌殃?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵郎逃,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我挺份,道長(zhǎng)褒翰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任匀泊,我火速辦了婚禮优训,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘各聘。我一直安慰自己揣非,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布躲因。 她就那樣靜靜地躺著早敬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪大脉。 梳的紋絲不亂的頭發(fā)上搞监,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音镰矿,去河邊找鬼琐驴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛秤标,可吹牛的內(nèi)容都是我干的绝淡。 我是一名探鬼主播,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼苍姜,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼牢酵!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起衙猪,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤馍乙,失蹤者是張志新(化名)和其女友劉穎布近,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體潘拨,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡吊输,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了铁追。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片季蚂。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖琅束,靈堂內(nèi)的尸體忽然破棺而出扭屁,到底是詐尸還是另有隱情,我是刑警寧澤涩禀,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布料滥,位于F島的核電站,受9級(jí)特大地震影響艾船,放射性物質(zhì)發(fā)生泄漏葵腹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一屿岂、第九天 我趴在偏房一處隱蔽的房頂上張望践宴。 院中可真熱鬧,春花似錦爷怀、人聲如沸阻肩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)烤惊。三九已至,卻和暖如春吁朦,著一層夾襖步出監(jiān)牢的瞬間柒室,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工喇完, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留伦泥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓锦溪,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親府怯。 傳聞我的和親對(duì)象是個(gè)殘疾皇子刻诊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容