RxJava 3.X來襲柳爽,請做好準備~

看到此文媳握,你應該是你的技術(shù)圈第一個感知到RxJava 3.0.0-RC0來襲的大牛...

本文已經(jīng)更新在掘金,請不要重讀浪費時間哦

前言

每個Android開發(fā)者磷脯,都是愛RxJava的,簡潔線程切換和多網(wǎng)絡請求合并蛾找,再配合Retrofit,簡直是APP開發(fā)的福音。不知不覺赵誓,RxJava一路走來打毛,已經(jīng)更新到第三大版本了。不像RxJava 2對RxJava 1那么殘忍俩功,RxJava 3對RxJava 2的兼容性還是挺好的幻枉,目前并沒有做出很大的更改。RxJava2到2020年12月31號不再提供支持诡蜓,錯誤的會同時在2.x和3.x修復熬甫,但新功能只會在3.x上添加。

同時蔓罚,希望通過本文椿肩,能知道垃圾箱顏色分類。

作為嘗鮮豺谈,趕緊品嘗吧覆旱。

image

主要變化

主要特點

  • 單一依賴:Reactive-Streams
  • 繼續(xù)支持Java 6+和Android 2.3+
  • 修復了API錯誤和RxJava 2的許多限制
  • 旨在替代RxJava 2,具有相對較少的二進制不兼容更改
  • 提供Java 8 lambda友好的API
  • 關(guān)于并發(fā)源的不同意見
  • 異步或同步執(zhí)行
  • 參數(shù)化并發(fā)的虛擬時間和調(diào)度程序
  • 為測試schedulers核无,consumers和plugin hooks提供測試和診斷支持

與RxJava 2的主要區(qū)別是:

  • 將eagerTruncate添加到replay運算符扣唱,以便head節(jié)點將在截斷時丟失它保留的項引用
  • 新增 X.fromSupplier()
  • 使用 Scheduler 添加 concatMap,保證 mapper 函數(shù)的運行位置
  • 新增 startWithItem 和 startWithIterable
  • ConnectableFlowable/ConnetableFlowable 重新設計
  • 將 as() 并入 to()
  • 更改 Maybe.defaultIfEmpty() 以返回 Single
  • 用 Supplier 代替 Callable
  • 將一些實驗操作符推廣到標準
  • 從某些主題/處理器中刪除 getValues()
  • 刪除 replay(Scheduler) 及其重載
  • 刪除 dematerialize()
  • 刪除 startWith(T|Iterable)
  • 刪除 as()
  • 刪除 Maybe.toSingle(T)
  • 刪除 Flowable.subscribe(4 args)
  • 刪除 Observable.subscribe(4 args)
  • 刪除 Single.toCompletable()
  • 刪除 Completable.blockingGet()

到這里就結(jié)束了,想知道的都知道了噪沙。

image

入門

1炼彪、添加依賴

implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC0"

不好意思哦,還沒看到RxAndroid出3.0正歼,這就很尷尬了...

2辐马、一些概念

2.1、上流局义、下流

在RxJava,數(shù)據(jù)以流的方式組織喜爷。也就是說,Rxjava包括一個源的數(shù)據(jù)流萄唇,數(shù)據(jù)流后跟著消費者的零個到多個消費數(shù)據(jù)流步驟檩帐。

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

在上文代碼中,對于operator2來說另萤,在它前面叫做上流湃密,在它后面的叫做下流。憋住四敞,別笑泛源,真的是下流來的。

2.2忿危、流的對象

在RxJava的文檔中达箍,emission, emits, item, event, signal, data and message都被認為在數(shù)據(jù)流中被傳遞的數(shù)據(jù)對象。

2.3铺厨、背壓(Backpressure)

當數(shù)據(jù)流通過異步的步驟執(zhí)行時缎玫,這些步驟的執(zhí)行速度可能不一致。也就是說上流數(shù)據(jù)發(fā)送太快努释,下流沒有足夠的能力去處理。為了避免這種情況咬摇,一般要么緩存上流的數(shù)據(jù)伐蒂,要么拋棄數(shù)據(jù)。但這種處理方式肛鹏,有時會帶來很大的問題逸邦。為此,RxJava帶來了backpressure的概念在扰。背壓是一種流量的控制步驟缕减,在不知道上流還有多少數(shù)據(jù)的情形下控制內(nèi)存的使用,表示它們還能處理多少數(shù)據(jù)芒珠。

支持背壓的有Flowable類桥狡,不支持背壓的有Observable,Single, Maybe and Completable類。

2.4 線程調(diào)度器(Schedulers)

對于我們Android開發(fā)來說裹芝,最喜歡的就是它簡潔切換線程的操作部逮。RxJava通過調(diào)度器來方便線程的切換。

  • Schedulers.computation(): 適合運行在密集計算的操作嫂易,大多數(shù)異步操作符使用該調(diào)度器兄朋。
  • Schedulers.io():適合運行I/0和阻塞操作.
  • Schedulers.single():適合需要單一線程的操作
  • Schedulers.trampoline(): 適合需要順序運行的操作

在不同平臺還有不同的調(diào)度器,例如Android的主線程:AndroidSchedulers.mainThread()

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

2.5 基類

在 RxJava 3 可以發(fā)現(xiàn)有以下幾個基類(跟RxJava 2是一致的吧):

  • io.reactivex.Flowable:發(fā)送0個N個的數(shù)據(jù)怜械,支持Reactive-Streams和背壓
  • io.reactivex.Observable:發(fā)送0個N個的數(shù)據(jù)颅和,不支持背壓,
  • io.reactivex.Single:只能發(fā)送單個數(shù)據(jù)或者一個錯誤
  • io.reactivex.Completable:沒有發(fā)送任何數(shù)據(jù)缕允,但只處理 onComplete 和 onError 事件峡扩。
  • io.reactivex.Maybe:能夠發(fā)射0或者1個數(shù)據(jù),要么成功灼芭,要么失敗有额。

不建議再往下看了,建議點贊或收藏...

下文關(guān)于操作符內(nèi)容太多了

等需要了彼绷,再來查閱

下班時間還是好好護發(fā)吧

My GitHub

image

操作符

實用操作符

1巍佑、ObserveOn

指定觀察者的線程,例如在Android訪問網(wǎng)絡后寄悯,數(shù)據(jù)需要主線程消費萤衰,那么將觀察者的線程切換到主線就需要ObserveOn操作符。每次指定一次都會生效猜旬。

2脆栋、subscribeOn

指定被觀察者的線程,即數(shù)據(jù)源發(fā)生的線程。例如在Android訪問網(wǎng)絡時钮惠,需要將線程切換到子線程倍奢。多次指定只有第一次有效。

3秦踪、doOnEach

數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù),就調(diào)用一次掸茅。

4椅邓、doOnNext

數(shù)據(jù)源每次調(diào)用onNext() 之前都會先回調(diào)該方法。

5昧狮、doOnError

數(shù)據(jù)源每次調(diào)用onError() 之前會回調(diào)該方法景馁。

6、doOnComplete

數(shù)據(jù)源每次調(diào)用onComplete() 之前會回調(diào)該方法

7逗鸣、doOnSubscribe

數(shù)據(jù)源每次調(diào)用onSubscribe() 之后會回調(diào)該方法

8合住、doOnDispose

數(shù)據(jù)源每次調(diào)用dispose() 之后會回調(diào)該方法

其他的見官網(wǎng)吧绰精,不難

實用操作符

image

對數(shù)據(jù)源過濾操作符

主要講對數(shù)據(jù)源進行選擇和過濾的常用操作符

1、skip(跳過)

可以作用于Flowable,Observable聊疲,表示源發(fā)射數(shù)據(jù)前茬底,跳過多少個。例如下面跳過前四個:

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skip(4)
    .subscribe(System.out::print);

打印結(jié)果:5678910

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skipLast(4)
    .subscribe(System.out::print);
    
打印結(jié)果:1 2 3 4 5 6

skipLast(n)操作表示從流的尾部跳過n個元素获洲。

2阱表、debounce(去抖動)

可作用于Flowable,Observable。在Android開發(fā)贡珊,通常為了防止用戶重復點擊而設置標記位最爬,而通過RxJava的debounce操作符可以有效達到該效果。在規(guī)定時間內(nèi)门岔,用戶重復點擊只有最后一次有效爱致,

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(1_500);
    emitter.onNext("B");

    Thread.sleep(500);
    emitter.onNext("C");

    Thread.sleep(250);
    emitter.onNext("D");

    Thread.sleep(2_000);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .debounce(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.println("onComplete"));

打印:A D E onComplete

上文代碼中寒随,數(shù)據(jù)源以一定的時間間隔發(fā)送A,B,C,D,E糠悯。操作符debounce的時間設為1秒,發(fā)送A后1.5秒并沒有發(fā)射其他數(shù)據(jù)妻往,所以A能成功發(fā)射互艾。發(fā)射B后,在1秒之內(nèi)讯泣,又發(fā)射了C和D,在D之后的2秒才發(fā)射E,所有B纫普、C都失效,只有D有效好渠;而E之后已經(jīng)沒有其他數(shù)據(jù)流了昨稼,所有E有效。

image

3拳锚、distinct(去重)

可作用于Flowable,Observable假栓,去掉數(shù)據(jù)源重復的數(shù)據(jù)。

Observable.just(2, 3, 4, 4, 2, 1)
        .distinct()
        .subscribe(System.out::println);

// 打印:2 3 4 2 1
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
        .distinctUntilChanged()
        .subscribe(System.out::print);
//打踊舨簟:1 2 1 2 3 4

distinctUntilChanged()去掉相鄰重復數(shù)據(jù)匾荆。

4、elementAt(獲取指定位置元素)

可作用于Flowable,Observable抗楔,從數(shù)據(jù)源獲取指定位置的元素棋凳,從0開始拦坠。

 Observable.just(2,4,3,1,5,8)
        .elementAt(0)
        .subscribe(integer -> 
         Log.d("TAG","elmentAt->"+integer));
打恿铩:2

Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);

element.subscribe(
    name -> System.out.println("onSuccess will not be printed!"),
    error -> System.out.println("onError: " + error));
打印:onSuccess will not be printed!

elementAtOrError:指定元素的位置超過數(shù)據(jù)長度贞滨,則發(fā)射異常入热。

5拍棕、filter(過濾)

可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示發(fā)射該元素勺良,返回false表示過濾該數(shù)據(jù)绰播。

Observable.just(1, 2, 3, 4, 5, 6)
        .filter(x -> x % 2 == 0)
        .subscribe(System.out::print);
打印:2 4 6

6尚困、first(第一個)

作用于 Flowable,Observable蠢箩。發(fā)射數(shù)據(jù)源第一個數(shù)據(jù),如果沒有則發(fā)送默認值事甜。

Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(System.out::println);
打用凇:A

Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
        element -> System.out.println("onSuccess will not be printed!"),
        error -> System.out.println("onError: " + error));
打印:onError: java.util.NoSuchElementException

和firstElement的區(qū)別是first返回的是Single逻谦,而firstElement返回Maybe掌实。firstOrError在沒有數(shù)據(jù)會返回異常。

7邦马、last(最后一個)

last贱鼻、lastElement、lastOrError與fist滋将、firstElement邻悬、firstOrError相對應。

Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C

Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> last = source.lastElement();
last.subscribe(System.out::println);
//打印:C

Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
        element -> System.out.println("onSuccess will not be printed!"),
        error -> System.out.println("onError: " + error));
// 打痈省:onError: java.util.NoSuchElementException

8拘悦、ignoreElements & ignoreElement(忽略元素)

ignoreElements 作用于Flowable、Observable橱脸。ignoreElement作用于Maybe础米、Single。兩者都是忽略掉數(shù)據(jù)添诉,返回完成或者錯誤時間屁桑。

Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(() -> System.out.println("Done!"))
        .blockingAwait();
// 1秒后打印:Donde!

Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(() -> System.out.println("Done!"))
        .blockingAwait();
// 五秒后打永父啊:Done!

9蘑斧、ofType(過濾掉類型)

作用于Flowable、Observable须眷、Maybe竖瘾、過濾掉類型。

Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe((Integer x) -> System.out.print(x+" "));
//打印:1 3 7

10花颗、sample

作用于Flowable捕传、Observable,在一個周期內(nèi)發(fā)射最新的數(shù)據(jù)扩劝。

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(500);
    emitter.onNext("B");

    Thread.sleep(200);
    emitter.onNext("C");

    Thread.sleep(800);
    emitter.onNext("D");

    Thread.sleep(600);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .sample(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.print("onComplete"));
                
// 打佑孤邸: C D onComplete

與debounce的區(qū)別是职辅,sample是以時間為周期的發(fā)射,一秒又一秒內(nèi)的最新數(shù)據(jù)聂示。而debounce是最后一個有效數(shù)據(jù)開始域携。

image

11、throttleFirst & throttleLast & throttleWithTimeout

作用于Flowable鱼喉、Observable秀鞭。throttleLast與smaple一致,而throttleFirst是指定周期內(nèi)第一個數(shù)據(jù)扛禽。throttleWithTimeout與debounce一致气筋。

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(500);
    emitter.onNext("B");

    Thread.sleep(200);
    emitter.onNext("C");

    Thread.sleep(800);
    emitter.onNext("D");

    Thread.sleep(600);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .throttleFirst(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.print(" onComplete"));
//打印:A D onComplete

source.subscribeOn(Schedulers.io())
        .throttleLast(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.print(" onComplete"));

// 打印:C D onComplete

12、throttleLatest

之所以拿出來單獨說旋圆,我看不懂官網(wǎng)的解釋宠默。然后看別人的文章:throttleFirst+throttleLast的組合?開玩笑的吧灵巧。個人理解是:如果源的第一個數(shù)據(jù)總會被發(fā)射搀矫,然后開始周期計時,此時的效果就會跟throttleLast一致刻肄。

Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("A");

            Thread.sleep(500);
            emitter.onNext("B");

            Thread.sleep(200);
            emitter.onNext("C");

            Thread.sleep(200);
            emitter.onNext("D");

            Thread.sleep(400);
            emitter.onNext("E");
            
            Thread.sleep(400);
            emitter.onNext("F");
            
            Thread.sleep(400);
            emitter.onNext("G");
            
            Thread.sleep(2000);
            emitter.onComplete();
        });
        source.subscribeOn(Schedulers.io())
        .throttleLatest(1, TimeUnit.SECONDS)
        .blockingSubscribe(
            item -> Log.e("RxJava",item),
                 Throwable::printStackTrace,
            () -> Log.e("RxJava","finished"));

打印結(jié)果:

image

13瓤球、take & takeLast

作用于Flowable、Observable敏弃,take發(fā)射前n個元素;takeLast發(fā)射后n個元素卦羡。

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.take(4)
    .subscribe(System.out::print);
//打印:1 2 3 4

source.takeLast(4)
    .subscribe(System.out::println);
//打印:7 8 9 10

14、timeout(超時)

作用于Flowable麦到、Observable绿饵、Maybe、Single瓶颠、Completabl拟赊。后一個數(shù)據(jù)發(fā)射未在前一個元素發(fā)射后規(guī)定時間內(nèi)發(fā)射則返回超時異常。

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(800);
    emitter.onNext("B");

    Thread.sleep(400);
    emitter.onNext("C");

    Thread.sleep(1200);
    emitter.onNext("D");
    emitter.onComplete();
});

source.timeout(1, TimeUnit.SECONDS)
        .subscribe(
                item -> System.out.println("onNext: " + item),
                error -> System.out.println("onError: " + error),
                () -> System.out.println("onComplete will not be printed!"));
// 打印:
// onNext: A
// onNext: B
// onNext: C
// onError: java.util.concurrent.TimeoutException: 
            The source did not signal an event for 1 seconds 
            and has been terminated.
image

連接操作符

通過連接操作符粹淋,將多個被觀察數(shù)據(jù)(數(shù)據(jù)源)連接在一起吸祟。

1、startWith

可作用于Flowable桃移、Observable屋匕。將指定數(shù)據(jù)源合并在另外數(shù)據(jù)源的開頭。

Observable<String> names = Observable.just("Spock", "McCoy");
Observable<String> otherNames = Observable.just("Git", "Code","8");
names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));

//打咏杞堋:
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Spock
RxJava: McCo

2过吻、merge

可作用所有數(shù)據(jù)源類型,用于合并多個數(shù)據(jù)源到一個數(shù)據(jù)源第步。

Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");

Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));

//也可以是
//names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));

//打哟啊:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8

merge在合并數(shù)據(jù)源時,如果一個合并發(fā)生異常后會立即調(diào)用觀察者的onError方法粘都,并停止合并廓推。可通過mergeDelayError操作符翩隧,將發(fā)生的異常留到最后處理樊展。

Observable<String> names = Observable.just("Hello", "world"); 
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable<String> error = Observable.error(    
                            new NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
    name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
    
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Error!

3堆生、zip

可作用于Flowable专缠、Observable、Maybe淑仆、Single涝婉。將多個數(shù)據(jù)源的數(shù)據(jù)一個一個的合并在一起哇。當其中一個數(shù)據(jù)源發(fā)射完事件之后蔗怠,若其他數(shù)據(jù)源還有數(shù)據(jù)未發(fā)射完畢墩弯,也會停止。

Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code", "8");
names.zipWith(otherNames, (first, last) -> first + "-" + last)
       .subscribe(item -> Log.d(TAG, item));

//打幽洹:
RxJava: Hello-Git
RxJava: world-Code

4渔工、combineLatest

可作用于Flowable, Observable。在結(jié)合不同數(shù)據(jù)源時桥温,發(fā)射速度快的數(shù)據(jù)源最新item與較慢的相結(jié)合引矩。
如下時間線,Observable-1發(fā)射速率快侵浸,發(fā)射了65旺韭,Observable-2才發(fā)射了C, 那么兩者結(jié)合就是C5。


image

5掏觉、switchOnNext

一個發(fā)射多個小數(shù)據(jù)源的數(shù)據(jù)源茂翔,這些小數(shù)據(jù)源發(fā)射數(shù)據(jù)的時間發(fā)生重復時,取最新的數(shù)據(jù)源履腋。

image
image

變換操作符

變化數(shù)據(jù)源的數(shù)據(jù)珊燎,并轉(zhuǎn)化為新的數(shù)據(jù)源。

1遵湖、buffer

作用于Flowable悔政、Observable。指將數(shù)據(jù)源拆解含有長度為n的list的多個數(shù)據(jù)源延旧,不夠n的成為一個數(shù)據(jù)源谋国。

Observable.range(0, 10)
    .buffer(4)
    .subscribe((List<Integer> buffer) -> System.out.println(buffer));

// 打印:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]

2、cast

作用于Flowable迁沫、Observable芦瘾、Maybe捌蚊、Single。將數(shù)據(jù)元素轉(zhuǎn)型成其他類型,轉(zhuǎn)型失敗會拋出異常近弟。

Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);

numbers.filter((Number x) -> Integer.class.isInstance(x))
    .cast(Integer.class)
    .subscribe((Integer x) -> System.out.println(x));
// prints:
// 1
// 7
// 12
// 5

3缅糟、concatMap

作用于Flowable、Observable祷愉、Maybe窗宦。將數(shù)據(jù)源的元素作用于指定函數(shù)后,將函數(shù)的返回值有序的存在新的數(shù)據(jù)源二鳄。

Observable.range(0, 5)
    .concatMap(i -> {
        long delay = Math.round(Math.random() * 2);

        return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
    })
    .blockingSubscribe(System.out::print);

// prints 01234

4赴涵、concatMapDelayError

與concatMap作用相同,只是將過程發(fā)送的所有錯誤延遲到最后處理订讼。

Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
    .concatMapDelayError(x -> {
        if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));
        else return Observable.just(x, x * x);
    })
    .blockingSubscribe(
        x -> System.out.println("onNext: " + x),
        error -> System.out.println("onError: " + error.getMessage()));

// prints:
// onNext: 2
// onNext: 4
// onNext: 3
// onNext: 9
// onError: Something went wrong!

5髓窜、concatMapCompletable

作用于Flowable、Observable欺殿。與contactMap類似纱烘,不過應用于函數(shù)后,返回的是CompletableSource祈餐。訂閱一次并在所有CompletableSource對象完成時返回一個Completable對象擂啥。

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
    return Completable.timer(x, TimeUnit.SECONDS)
        .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    });

completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
    .blockingAwait();

// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed

6、concatMapCompletableDelayError

與concatMapCompletable作用相同帆阳,只是將過程發(fā)送的所有錯誤延遲到最后處理哺壶。

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
    if (x.equals(2)) {
        return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
    } else {
        return Completable.timer(1, TimeUnit.SECONDS)
            .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    }
});

completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
    .onErrorComplete()
    .blockingAwait();

// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!

ContactMap

8、flatMap

作用于Flowable蜒谤、Observable山宾、Maybe、Single鳍徽。與contactMap類似资锰,只是contactMap的數(shù)據(jù)發(fā)射是有序的,而flatMap是無序的阶祭。

Observable.just("A", "B", "C")
    .flatMap(a -> {
        return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
                .map(b -> '(' + a + ", " + b + ')');
    })
    .blockingSubscribe(System.out::println);

// prints (not necessarily in this order):
// (A, 1)
// (C, 1)
// (B, 1)
// (A, 2)
// (C, 2)
// (B, 2)
// (A, 3)
// (C, 3)
// (B, 3)

9绷杜、flatMapXXX 和 contactMapXXX

太多了,減少篇幅濒募,大家感興趣自己查閱官網(wǎng)吧鞭盟。功能與flatMap和contactMap類似。

10瑰剃、flattenAsFlowable & flattenAsObservable

作用于Maybe齿诉、Single,將其轉(zhuǎn)化為Flowable,或Observable粤剧。

Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
    return List.of(x, Math.pow(x, 2), Math.pow(x, 3));
});

flowable.subscribe(x -> System.out.println("onNext: " + x));

// prints:
// onNext: 2.0
// onNext: 4.0
// onNext: 8.0

11歇竟、groupBy

作用于Flowable、Observable抵恋。根據(jù)一定的規(guī)則對數(shù)據(jù)源進行分組焕议。

Observable<String> animals = Observable.just(
    "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");

animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
    .concatMapSingle(Observable::toList)
    .subscribe(System.out::println);

// prints:
// [TIGER, TURTLE]
// [ELEPHANT]
// [CAT, CHAMELEON]
// [FROG, FISH, FLAMINGO]

12、scan

作用于Flowable馋记、Observable。對數(shù)據(jù)進行相關(guān)聯(lián)操作懊烤,例如聚合等梯醒。

Observable.just(5, 3, 8, 1, 7)
    .scan(0, (partialSum, x) -> partialSum + x)
    .subscribe(System.out::println);

// prints:
// 0
// 5
// 8
// 16
// 17
// 24

13、window

對數(shù)據(jù)源發(fā)射出來的數(shù)據(jù)進行收集腌紧,按照指定的數(shù)量進行分組茸习,以組的形式重新發(fā)射。

Observable.range(1, 4)
    // Create windows containing at most 2 items, and skip 3 items before starting a new window.
    .window(2)
    .flatMapSingle(window -> {
        return window.map(String::valueOf)
                .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
    })
    .subscribe(System.out::println);

// prints:
// [1, 2]
// [3, 4]
image

錯誤處理操作符

1壁肋、onErrorReturn

作用于Flowable号胚、Observable、Maybe浸遗、Single猫胁。但調(diào)用數(shù)據(jù)源的onError函數(shù)后會回到該函數(shù),可對錯誤進行處理跛锌,然后返回值弃秆,會調(diào)用觀察者onNext()繼續(xù)執(zhí)行,執(zhí)行完調(diào)用onComplete()函數(shù)結(jié)束所有事件的發(fā)射髓帽。

Single.just("2A")
    .map(v -> Integer.parseInt(v, 10))
    .onErrorReturn(error -> {
        if (error instanceof NumberFormatException) return 0;
        else throw new IllegalArgumentException();
    })
    .subscribe(
        System.out::println,
        error -> System.err.println("onError should not be printed!"));

// prints 0

2菠赚、onErrorReturnItem

與onErrorReturn類似,onErrorReturnItem不對錯誤進行處理郑藏,直接返回一個值衡查。

Single.just("2A")
    .map(v -> Integer.parseInt(v, 10))
    .onErrorReturnItem(0)
    .subscribe(
        System.out::println,
        error -> System.err.println("onError should not be printed!"));

// prints 0

3、onExceptionResumeNext

可作用于Flowable必盖、Observable拌牲、Maybe。onErrorReturn發(fā)生異常時歌粥,回調(diào)onComplete()函數(shù)后不再往下執(zhí)行们拙,而onExceptionResumeNext則是要在處理異常的時候返回一個數(shù)據(jù)源,然后繼續(xù)執(zhí)行阁吝,如果返回null砚婆,則調(diào)用觀察者的onError()函數(shù)。

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
            e.onNext(4);
        })
                .onErrorResumeNext(throwable -> {
                    Log.d(TAG, "onErrorResumeNext ");
                    return Observable.just(4);
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

結(jié)果:

image

onExceptionResumeNext操作符也是類似的,只是捕獲Exception装盯。

4坷虑、retry

可作用于所有的數(shù)據(jù)源,當發(fā)生錯誤時埂奈,數(shù)據(jù)源重復發(fā)射item迄损,直到?jīng)]有異常或者達到所指定的次數(shù)账磺。

boolean first=true;

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);

            if (first){
                first=false;
                e.onError(new NullPointerException());

            }
            
        })
                .retry(9)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

結(jié)果:

image

5芹敌、retryUntil

作用于Flowable、Observable垮抗、Maybe氏捞。與retry類似,但發(fā)生異常時冒版,返回值是false表示繼續(xù)執(zhí)行(重復發(fā)射數(shù)據(jù))液茎,true不再執(zhí)行,但會調(diào)用onError方法。

 Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onError(new NullPointerException());
            e.onNext(3);
            e.onComplete();
        })
                .retryUntil(() -> true)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

結(jié)果:


image

retryWhen與此類似辞嗡,但其判斷標準不是BooleanSupplier對象的getAsBoolean()函數(shù)的返回值捆等。而是返回的 Observable或Flowable是否會發(fā)射異常事件。

總結(jié)

太多操作符太累了续室,看得心好累栋烤。還是根據(jù)實際開發(fā)需要查閱文檔才是正確的姿勢。本文只是RxJava冰山一角挺狰,更多請參閱官網(wǎng)班缎。同時不建議立馬在項目上實踐,給它點時間報bug她渴。

參閱官網(wǎng)

好東西要分享

如果你看到了這达址,點個贊,收下我雙膝趁耗。如果文章有誤沉唠,幫忙指正,謝謝大佬們苛败。

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末满葛,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子罢屈,更是在濱河造成了極大的恐慌嘀韧,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缠捌,死亡現(xiàn)場離奇詭異锄贷,居然都是意外死亡译蒂,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進店門谊却,熙熙樓的掌柜王于貴愁眉苦臉地迎上來柔昼,“玉大人,你說我怎么就攤上這事炎辨〔锻福” “怎么了?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵碴萧,是天一觀的道長乙嘀。 經(jīng)常有香客問我,道長破喻,這世上最難降的妖魔是什么虎谢? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮低缩,結(jié)果婚禮上嘉冒,老公的妹妹穿的比我還像新娘曹货。我一直安慰自己咆繁,他們只是感情好,可當我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布顶籽。 她就那樣靜靜地躺著玩般,像睡著了一般。 火紅的嫁衣襯著肌膚如雪礼饱。 梳的紋絲不亂的頭發(fā)上坏为,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天,我揣著相機與錄音镊绪,去河邊找鬼匀伏。 笑死,一個胖子當著我的面吹牛蝴韭,可吹牛的內(nèi)容都是我干的够颠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼榄鉴,長吁一口氣:“原來是場噩夢啊……” “哼履磨!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起庆尘,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤剃诅,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后驶忌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體矛辕,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了如筛。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堡牡。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖杨刨,靈堂內(nèi)的尸體忽然破棺而出晤柄,到底是詐尸還是另有隱情,我是刑警寧澤妖胀,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布芥颈,位于F島的核電站,受9級特大地震影響赚抡,放射性物質(zhì)發(fā)生泄漏爬坑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一涂臣、第九天 我趴在偏房一處隱蔽的房頂上張望盾计。 院中可真熱鬧,春花似錦赁遗、人聲如沸署辉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哭尝。三九已至,卻和暖如春剖煌,著一層夾襖步出監(jiān)牢的瞬間材鹦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工耕姊, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留桶唐,地道東北人。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓茉兰,卻偏偏與公主長得像尤泽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子邦邦,可洞房花燭夜當晚...
    茶點故事閱讀 44,647評論 2 354