RxJava2 系列 -1:一篇的比較全面的 RxJava2 方法總結(jié)

imgtrip-com-2559b11dd1a96b8dffae0285a2fb9f82f589e624.jpg

看了許多講解RxJava的文章,有些文章講解的內(nèi)容是基于第一個(gè)版本的棠枉,有些文章的講解是通過比較常用的一些API和基礎(chǔ)的概念進(jìn)行講解的。
但是每次看到RxJava的類中的幾十個(gè)方法的時(shí)候泡挺,總是感覺心里沒底辈讶。所以,我打算自己去專門寫篇文章來從API的角度系統(tǒng)地梳理一下RxJava的各種方法和用法娄猫。

1荞估、RxJava 基本

1.1 RxJava 簡介

RxJava是一個(gè)在Java VM上使用可觀測的序列來組成異步的、基于事件的程序的庫稚新。

雖然勘伺,在Android中,我們可以使用AsyncTask來完成異步任務(wù)操作褂删,但是當(dāng)任務(wù)的梳理比較多的時(shí)候飞醉,我們要為每個(gè)任務(wù)定義一個(gè)AsyncTask就變得非常繁瑣。
RxJava能幫助我們在實(shí)現(xiàn)異步執(zhí)行的前提下保持代碼的清晰。
它的原理就是創(chuàng)建一個(gè)Observable來完成異步任務(wù)缅帘,組合使用各種不同的鏈?zhǔn)讲僮髦崾酰瑏韺?shí)現(xiàn)各種復(fù)雜的操作,最終將任務(wù)的執(zhí)行結(jié)果發(fā)射給Observer進(jìn)行處理钦无。
當(dāng)然逗栽,RxJava不僅適用于Android,也適用于服務(wù)端等各種場景失暂。

我們總結(jié)以下RxJava的用途:

  1. 簡化異步程序的流程彼宠;
  2. 使用近似于Java8的流的操作進(jìn)行編程:因?yàn)橄胍贏ndroid中使用Java8的流編程有諸多的限制,所以我們可以使用RxJava來實(shí)現(xiàn)這個(gè)目的弟塞。

在使用RxJava之前凭峡,我們需要先在自己的項(xiàng)目中添加如下的依賴:

compile 'io.reactivex.rxjava2:rxjava:2.2.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'

這里我們使用的是RxJava2,它與RxJava的第一個(gè)版本有些許不同决记。在本文中摧冀,我們所有的關(guān)于RxJava的示例都將基于RxJava2.

注:如果想了解關(guān)于Java8的流編程的內(nèi)容的內(nèi)容,可以參考我之前寫過的文章五分鐘學(xué)習(xí)Java8的流編程系宫。

1.2 概要

下面是RxJava的一個(gè)基本的用例索昂,這里我們定義了一個(gè)Observable,然后在它內(nèi)部使用emitter發(fā)射了一些數(shù)據(jù)和信息(其實(shí)就相當(dāng)于調(diào)用了被觀察對象內(nèi)部的方法扩借,通知所有的觀察者)椒惨。
然后,我們用Consumer接口的實(shí)例作為subscribe()方法的參數(shù)來觀察發(fā)射的結(jié)果往枷。(這里的接口的方法都已經(jīng)被使用Lambda簡化過,應(yīng)該學(xué)著適應(yīng)它凄杯。)

Observable<Integer> observable = Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
});
observable.subscribe(System.out::println);

這樣错洁,我們就完成了一個(gè)基本的RxJava的示例。從上面的例子中戒突,你或許沒法看出Observable中隱藏的流的概念屯碴,看下面的例子:

Observable.range(0, 10).map(String::valueOf).forEach(System.out::println);

這里我們先用Observable.range()方法產(chǎn)生一個(gè)序列,然后用map方法將該整數(shù)序列映射成一個(gè)字符序列膊存,最后將得到的序列輸出來导而。從上面看出,這種操作和Java8里面的Stream編程很像隔崎。但是兩者之間是有區(qū)別的:

  1. 所謂的“推”和“拉”的區(qū)別:Stream中是通過從流中讀取數(shù)據(jù)來實(shí)現(xiàn)鏈?zhǔn)讲僮鹘褚眨鳵xJava除了Stream中的功能之外,還可以通過“發(fā)射”數(shù)據(jù)爵卒,來實(shí)現(xiàn)通知的功能虚缎,即RxJava在Stream之上又多了一個(gè)觀察者的功能。
  2. Java8中的Stream可以通過parall()來實(shí)現(xiàn)并行钓株,即基于分治算法將任務(wù)分解并計(jì)算得到結(jié)果之后將結(jié)果合并起來实牡;而RxJava只能通過subscribeOn()方法將所有的操作切換到某個(gè)線程中去陌僵。
  3. Stream只能被消費(fèi)一次,但是Observable可以被多次進(jìn)行訂閱创坞;

RxJava除了為我們提供了Observable之外碗短,在新的RxJava中還提供了適用于其他場景的基礎(chǔ)類,它們之間的功能和主要區(qū)別如下:

  1. Flowable: 多個(gè)流题涨,響應(yīng)式流和背壓
  2. Observable: 多個(gè)流偎谁,無背壓
  3. Single: 只有一個(gè)元素或者錯(cuò)誤的流
  4. Completable: 沒有任何元素,只有一個(gè)完成和錯(cuò)誤信號的流
  5. Maybe: 沒有任何元素或者只有一個(gè)元素或者只有一個(gè)錯(cuò)誤的流

除了上面的幾個(gè)基礎(chǔ)類之外携栋,還有一個(gè)Disposable搭盾。當(dāng)我們監(jiān)聽某個(gè)流的時(shí)候,就能獲取到一個(gè)Disposable對象婉支。它提供了兩個(gè)方法鸯隅,一個(gè)是isDisposed,可以被用來判斷是否停止了觀察指定的流向挖;另一個(gè)是dispose方法蝌以,用來放棄觀察指定的流,我們可以使用它在任意的時(shí)刻停止觀察操作何之。

1.3 總結(jié)

上面我們介紹了了關(guān)于RxJava的基本的概念和使用方式跟畅,在下面的文章中我們會按照以上定義的順序從API的角度來講解以下RxJava各個(gè)模塊的使用方法。

2溶推、RxJava 的使用

2.1 Observable

從上面的文章中我們可以得知徊件,Observable和后面3種操作功能近似,區(qū)別在于Flowable加入了背壓的概念蒜危,Observable的大部分方法也適用于其他3個(gè)操作和Flowable虱痕。
因此,我們這里先從Observable開始梳理辐赞,然后我們再專門對Flowable和背壓的進(jìn)行介紹部翘。

Observable為我們提供了一些靜態(tài)的構(gòu)造方法來創(chuàng)建一個(gè)Observable對象,還有許多鏈?zhǔn)降姆椒▉硗瓿筛鞣N復(fù)雜的功能响委。
這里我們按照功能將它的這些方法分成各個(gè)類別并依次進(jìn)行相關(guān)的說明新思。

2.1.1 創(chuàng)建操作

1.interval & intervalRange

下面的操作可以每個(gè)3秒的時(shí)間發(fā)送一個(gè)整數(shù),整數(shù)從0開始:

Observable.interval(3, TimeUnit.SECONDS).subscribe(System.out::println);

如果想要設(shè)置從指定的數(shù)字開始也是可以的赘风,實(shí)際上interval提供了許多重載方法供我們是使用夹囚。下面我們連同與之功能相近的intervalRange方法也一同給出:

  1. public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  2. public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
  3. public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

這里的initialDelay參數(shù)用來指示開始發(fā)射第一個(gè)整數(shù)的之前要停頓的時(shí)間,時(shí)間的單位與peroid一樣邀窃,都是通過unit參數(shù)來指定的崔兴;period參數(shù)用來表示每個(gè)發(fā)射之間停頓多少時(shí)間;unit表示時(shí)間的單位,是TimeUnit類型的敲茄;scheduler參數(shù)指定數(shù)據(jù)發(fā)射和等待時(shí)所在的線程位谋。

intervalRange方法可以用來將發(fā)射的整數(shù)序列限制在一個(gè)范圍之內(nèi),這里的start用來表示發(fā)射的數(shù)據(jù)的起始值堰燎,count表示總共要發(fā)射幾個(gè)數(shù)字掏父,其他參數(shù)與上面的interval方法一致。

2.range & rangeLong

下面的操作可以產(chǎn)生一個(gè)從5開始的連續(xù)10個(gè)整數(shù)構(gòu)成的序列:

Observable.range(5, 10).subscribe(i -> System.out.println("1: " + i));

該方法需要傳入兩個(gè)參數(shù)秆剪,與之有相同功能的方法還有rangeLong

  1. public static Observable<Integer> range(final int start, final int count)
  2. public static Observable<Long> rangeLong(long start, long count)

這里的兩個(gè)參數(shù)start用來指定用于生成的序列的開始值赊淑,count用來指示要生成的序列總共包含多少個(gè)數(shù)字,上面的兩個(gè)方法的主要區(qū)別在于一個(gè)是用來生成int型整數(shù)的仅讽,一個(gè)是用來生成long型整數(shù)的陶缺。

3.create

create方法用于從頭開始創(chuàng)建一個(gè)Observable,像下面顯示的那樣洁灵,你需要使用create方法并傳一個(gè)發(fā)射器作為參數(shù)饱岸,在該發(fā)射器內(nèi)部調(diào)用onNextonCompleteonError方法就可以將數(shù)據(jù)發(fā)送給監(jiān)聽者徽千。

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onNext(1);
    observableEmitter.onNext(2);
    observableEmitter.onComplete();
}).subscribe(System.out::println);

4.defer

defer直到有觀察者訂閱時(shí)才創(chuàng)建Observable苫费,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable。defer操作符會一直等待直到有觀察者訂閱它双抽,然后它使用Observable工廠方法生成一個(gè)Observable百框。比如下面的代碼兩個(gè)訂閱輸出的結(jié)果是不一致的:

Observable<Long> observable = Observable.defer((Callable<ObservableSource<Long>>) () -> Observable.just(System.currentTimeMillis()));
observable.subscribe(System.out::print);
System.out.println();
observable.subscribe(System.out::print);

下面是該方法的定義,它接受一個(gè)Callable對象牍汹,可以在該對象中返回一個(gè)Observable的實(shí)例:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

5.empty & never & error

  1. public static <T> Observable<T> empty():創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable铐维;
  2. public static <T> Observable<T> never():創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable;
  3. public static <T> Observable<T> error(Throwable exception):創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable慎菲,它有幾個(gè)重載版本嫁蛇,這里給出其中的一個(gè)。

測試代碼:

Observable.empty().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.never().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.error(new Exception()).subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));

輸出結(jié)果:completeerror

6.from 系列

from系列的方法用來從指定的數(shù)據(jù)源中獲取一個(gè)Observable:

  1. public static <T> Observable<T> fromArray(T... items):從數(shù)組中獲染弧棠众;
  2. public static <T> Observable<T> fromCallable(Callable<? extends T> supplier):從Callable中獲攘帐琛有决;
  3. public static <T> Observable<T> fromFuture(Future<? extends T> future):從Future中獲取,有多個(gè)重載版本空盼,可以用來指定線程和超時(shí)等信息书幕;
  4. public static <T> Observable<T> fromIterable(Iterable<? extends T> source):從Iterable中獲取揽趾;
  5. public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher):從Publisher中獲取台汇。

7.just 系列

just系列的方法的一個(gè)參數(shù)的版本為下面的形式:public static <T> Observable<T> just(T item),它還有許多個(gè)重載的版本,區(qū)別在于接受的參數(shù)的個(gè)數(shù)不同苟呐,最少1個(gè)痒芝,最多10個(gè)。

8.repeat

該方法用來表示指定的序列要發(fā)射多少次牵素,下面的方法會將該序列無限次進(jìn)行發(fā)送:

Observable.range(5, 10).repeat().subscribe(i -> System.out.println(i));

repeat方法有以下幾個(gè)相似方法:

  1. public final Observable<T> repeat()
  2. public final Observable<T> repeat(long times)
  3. public final Observable<T> repeatUntil(BooleanSupplier stop)
  4. public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

第1個(gè)無參的方法會無限次地發(fā)送指定的序列(實(shí)際上內(nèi)部調(diào)用了第2個(gè)方法并傳入了Long.MAX_VALUE)严衬,第2個(gè)方法會將指定的序列重復(fù)發(fā)射指定的次數(shù);第3個(gè)方法會在滿足指定的要求的時(shí)候停止重復(fù)發(fā)送笆呆,否則會一直發(fā)送请琳。

9.timer

timer操作符創(chuàng)建一個(gè)在給定的時(shí)間段之后返回一個(gè)特殊值的Observable,它在延遲一段給定的時(shí)間后發(fā)射一個(gè)簡單的數(shù)字0赠幕。比如下面的程序會在500毫秒之后輸出一個(gè)數(shù)字0俄精。

Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(System.out::print);

下面是該方法及其重載方法的定義,重載方法還可以指定一個(gè)調(diào)度器:

  1. public static Observable<Long> timer(long delay, TimeUnit unit)
  2. public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)

2.1.2 變換操作

1.map & cast

  1. map操作符對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù)榕堰,然后返回一個(gè)發(fā)射這些結(jié)果的Observable竖慧。默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
  2. cast操作符將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個(gè)指定的類型(多態(tài))局冰,然后再發(fā)射數(shù)據(jù)测蘑,它是map的一個(gè)特殊版本:

下面的第一段代碼用于將生成的整數(shù)序列轉(zhuǎn)換成一個(gè)字符串序列之后并輸出;第二段代碼用于將Date類型轉(zhuǎn)換成Object類型并進(jìn)行輸出康二,這里如果前面的Class無法轉(zhuǎn)換成第二個(gè)Class就會出現(xiàn)異常:

Observable.range(1, 5).map(String::valueOf).subscribe(System.out::println);
Observable.just(new Date()).cast(Object.class).subscribe(System.out::print);

這兩個(gè)方法的定義如下:

  1. public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
  2. public final <U> Observable<U> cast(Class<U> clazz)

這里的mapper函數(shù)接受兩個(gè)泛型碳胳,一個(gè)表示原始的數(shù)據(jù)類型,一個(gè)表示要轉(zhuǎn)換之后的數(shù)據(jù)類型沫勿,轉(zhuǎn)換的邏輯寫在該接口實(shí)現(xiàn)的方法中即可挨约。

2.flatMap & contactMap

flatMap將一個(gè)發(fā)送事件的上游Observable變換為多個(gè)發(fā)送事件的Observables,然后將它們發(fā)射的事件合并后放進(jìn)一個(gè)單獨(dú)的Observable里产雹。需要注意的是, flatMap并不保證事件的順序诫惭,也就是說轉(zhuǎn)換之后的Observables的順序不必與轉(zhuǎn)換之前的序列的順序一致。比如下面的代碼用于將一個(gè)序列構(gòu)成的整數(shù)轉(zhuǎn)換成多個(gè)單個(gè)的Observable蔓挖,然后組成一個(gè)OBservable夕土,并被訂閱。下面輸出的結(jié)果仍將是一個(gè)字符串?dāng)?shù)字序列瘟判,只是順序不一定是增序的怨绣。

Observable.range(1, 5)
        .flatMap((Function<Integer, ObservableSource<String>>) i -> Observable.just(String.valueOf(i)))
        .subscribe(System.out::println);

flatMap對應(yīng)的方法是contactMap,后者能夠保證最終輸出的順序與上游發(fā)送的順序一致拷获。下面是這兩個(gè)方法的定義:

  1. public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
  2. public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

flatMap的重載方法數(shù)量過多篮撑,它們在數(shù)據(jù)源方面略有不同,有的支持錯(cuò)誤等可選參數(shù)匆瓜,具體可以參考源代碼赢笨。

3.flatMapIterable

flatMapIterable可以用來將上流的任意一個(gè)元素轉(zhuǎn)換成一個(gè)Iterable對象未蝌,然后我們可以對其進(jìn)行消費(fèi)。在下面的代碼中茧妒,我們先生成一個(gè)整數(shù)的序列萧吠,然后將每個(gè)整數(shù)映射成一個(gè)Iterable<string>類型,最后桐筏,我們對其進(jìn)行訂閱和消費(fèi):

Observable.range(1, 5)
        .flatMapIterable((Function<Integer, Iterable<String>>) integer -> Collections.singletonList(String.valueOf(integer)))
        .subscribe(s -> System.out.println("flatMapIterable : " + s));

下面是該方法及其重載方法的定義:

  1. public final <U> Observable<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper)
  2. public final <U, V> Observable<V> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends V> resultSelector)

4.buffer

該方法用于將整個(gè)流進(jìn)行分組怎憋。以下面的程序?yàn)槔覀儠壬梢粋€(gè)7個(gè)整數(shù)構(gòu)成的流九昧,然后使用buffer之后绊袋,這些整數(shù)會被3個(gè)作為一組進(jìn)行輸出,所以當(dāng)我們訂閱了buffer轉(zhuǎn)換之后的Observable之后得到的是一個(gè)列表構(gòu)成的OBservable

Observable.range(1, 7).buffer(3)
        .subscribe(integers -> System.out.println(Arrays.toString(integers.toArray())));

下面是這個(gè)方法及其重載方法的定義铸鹰,它的重載方法太多癌别,這里我們只給出其中的兩個(gè),其他的可以參考RxJava的源碼蹋笼。這里的buffer應(yīng)該理解為一個(gè)緩沖區(qū)展姐,當(dāng)緩沖區(qū)滿了或者剩余的數(shù)據(jù)不夠一個(gè)緩沖區(qū)的時(shí)候就將數(shù)據(jù)發(fā)射出去。

  1. public final Observable<List<T>> buffer(int count)
  2. public final Observable<List<T>> buffer(int count, int skip)
  3. ...

5.groupBy

groupBy用于分組元素剖毯,它可以被用來根據(jù)指定的條件將元素分成若干組圾笨。它將得到一個(gè)Observable<GroupedObservable<T, M>>類型的Observable。如下面的程序所示逊谋,這里我們使用concat方法先將兩個(gè)Observable拼接成一個(gè)Observable擂达,然后對其元素進(jìn)行分組。這里我們的分組依據(jù)是整數(shù)的值胶滋,這樣我們將得到一個(gè)Observable<GroupedObservable<Integer, Integer>>類型的Observable板鬓。然后,我們再將得到的序列拼接成一個(gè)并進(jìn)行訂閱輸出:

Observable<GroupedObservable<Integer, Integer>> observable = Observable.concat(
        Observable.range(1,4), Observable.range(1,6)).groupBy(integer -> integer);
Observable.concat(observable).subscribe(integer -> System.out.println("groupBy : " + integer));

該方法有多個(gè)重載版本究恤,這里我們用到的一個(gè)的定義是:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

6.scan

scan操作符對原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)俭令,然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)部宿。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列抄腔。這個(gè)操作符在某些情況下被叫做accumulator。

以下面的程序?yàn)槔碚牛摮绦虻妮斀Y(jié)果是2 6 24 120 720赫蛇,可以看出這里的計(jì)算規(guī)則是,我們把傳入到scan中的函數(shù)記為f涯穷,序列記為x棍掐,生成的序列記為y藏雏,那么這里的計(jì)算公式是y(0)=x(0); y(i)=f(y(i-1), x(i)), i>0

Observable.range(2, 5).scan((i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));

除了上面的這種形式拷况,scan方法還有一個(gè)重載的版本作煌,我們可以使用這個(gè)版本的方法來在生成序列的時(shí)候指定一個(gè)初始值。以下面的程序?yàn)槔荩妮敵鼋Y(jié)果是3 6 18 72 360 2160粟誓,可以看出它的輸出比上面的形式多了1個(gè),這是因?yàn)楫?dāng)指定了初始值之后起意,生成的第一個(gè)數(shù)字就是那個(gè)初始值鹰服,剩下的按照我們上面的規(guī)則進(jìn)行的。所以揽咕,用同樣的函數(shù)語言來描述的話悲酷,那么它就應(yīng)該是下面的這種形式:y(0)=initialValue; y(i)=f(y(i-1), x(i)), i>0

Observable.range(2, 5).scan(3, (i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));

以上方法的定義是:

  1. public final Observable<T> scan(BiFunction<T, T, T> accumulator)
  2. public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator)

7.window

windowWindow和Buffer類似亲善,但不是發(fā)射來自原始Observable的數(shù)據(jù)包设易,它發(fā)射的是Observable,這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集蛹头,最后發(fā)射一個(gè)onCompleted通知顿肺。

以下面的程序?yàn)槔谧遥@里我們首先生成了一個(gè)由10個(gè)數(shù)字組成的整數(shù)序列映砖,然后使用window函數(shù)將它們每3個(gè)作為一組吵聪,每組會返回一個(gè)對應(yīng)的Observable對象泛源。
這里我們對該返回的結(jié)果進(jìn)行訂閱并進(jìn)行消費(fèi)仔燕,因?yàn)?0個(gè)數(shù)字舒帮,所以會被分成4個(gè)組蒙袍,每個(gè)對應(yīng)一個(gè)Observable:

Observable.range(1, 10).window(3).subscribe(
        observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));

除了對數(shù)據(jù)包進(jìn)行分組其兴,我們還可以根據(jù)時(shí)間來對發(fā)射的數(shù)據(jù)進(jìn)行分組骚烧。該方法有多個(gè)重載的版本控淡,這里我們給出其中的比較具有代表性的幾個(gè):

  1. public final Observable<Observable<T>> window(long count)
  2. public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
  3. public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
  4. public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)

2.1.3 過濾操作

1.filter

filter用來根據(jù)指定的規(guī)則對源進(jìn)行過濾,比如下面的程序用來過濾整數(shù)1到10中所有大于5的數(shù)字:

Observable.range(1,10).filter(i -> i > 5).subscribe(System.out::println);

下面是該方法的定義:

  1. public final Observable<T> filter(Predicate<? super T> predicate)

2.elementAt & firstElement & lastElement

elementAt用來獲取源中指定位置的數(shù)據(jù)止潘,它有幾個(gè)重載方法掺炭,這里我們介紹一下最簡單的一個(gè)方法的用法。下面是elementAt的一個(gè)示例凭戴,它將獲取源數(shù)據(jù)中索引為1的元素并交給觀察者訂閱涧狮。下面的程序?qū)⑤敵?code>1

Observable.range(1, 10).elementAt(0).subscribe(System.out::print);

這里我們給出elementAt及其相關(guān)的方法的定義,它們的使用相似么夫。注意一下這里的返回類型:

  1. public final Maybe<T> elementAt(long index)
  2. public final Single<T> elementAt(long index, T defaultItem)
  3. public final Single<T> elementAtOrError(long index)

除了獲取指定索引的元素的方法之外者冤,RxJava中還有可以用來直接獲取第一個(gè)和最后一個(gè)元素的方法,這里我們直接給出方法的定義:

  1. public final Maybe<T> firstElement()
  2. public final Single<T> first(T defaultItem)
  3. public final Single<T> firstOrError()
  4. public final Maybe<T> lastElement()
  5. public final Single<T> last(T defaultItem)
  6. public final Single<T> lastOrError()

3.distinct & distinctUntilChanged

distinct用來對源中的數(shù)據(jù)進(jìn)行過濾档痪,以下面的程序?yàn)槔娣悖@里會把重復(fù)的數(shù)字7過濾掉:

Observable.just(1,2,3,4,5,6,7,7).distinct().subscribe(System.out::print);

與之類似的還有distinctUntilChanged方法,與distinct不同的是腐螟,它只當(dāng)相鄰的兩個(gè)元素相同的時(shí)候才會將它們過濾掉愿汰。比如下面的程序會過濾掉其中的2和5困后,所以最終的輸出結(jié)果是12345676

Observable.just(1,2,2,3,4,5,5,6,7,6).distinctUntilChanged().subscribe(System.out::print);

該方法也有幾個(gè)功能相似的方法,這里給出它們的定義如下:

  1. public final Observable<T> distinct()
  2. public final <K> Observable<T> distinct(Function<? super T, K> keySelector)
  3. public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)
  4. public final Observable<T> distinctUntilChanged()
  5. public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector)
  6. public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)

4.skip & skipLast & skipUntil & skipWhile

skip方法用于過濾掉數(shù)據(jù)的前n項(xiàng)衬廷,比如下面的程序?qū)^濾掉前2項(xiàng)摇予,因此輸出結(jié)果是345

Observable.range(1, 5).skip(2).subscribe(System.out::print);

skip方法對應(yīng)的是take方法,它用來表示只選擇數(shù)據(jù)源的前n項(xiàng)吗跋,該方法的示例就不給出了侧戴。這里,我們說一下與之類功能類似的重載方法跌宛。skip還有一個(gè)重載方法接受兩個(gè)參數(shù)酗宋,用來表示跳過指定的時(shí)間,也就是在指定的時(shí)間之后才開始進(jìn)行訂閱和消費(fèi)疆拘。下面的程序會在3秒之后才開始不斷地輸出數(shù)字:

Observable.range(1,5).repeat().skip(3, TimeUnit.SECONDS).subscribe(System.out::print);

skip功能相反的方法的還有skipLast本缠,它用來表示過濾掉后面的幾項(xiàng),以及最后的一段時(shí)間不進(jìn)行發(fā)射等入问。比如下面的方法丹锹,我們會在程序開始之前進(jìn)行計(jì)時(shí),然后會不斷重復(fù)輸出數(shù)字芬失,直到5秒之后結(jié)束楣黍。然后,我們用skipLast方法表示最后的2秒不再進(jìn)行發(fā)射棱烂。所以下面的程序會先不斷輸出數(shù)字3秒租漂,3秒結(jié)束后停止輸出,并在2秒之后結(jié)束程序:

long current = System.currentTimeMillis();
Observable.range(1,5)
        .repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
        .skipLast(2, TimeUnit.SECONDS).subscribe(System.out::print);

與上面的這些方法類似的還有一些颊糜,這里我們不再一一列舉哩治。因?yàn)檫@些方法的重載方法比較多,下面我們給出其中的具有代表性的一部分:

  1. public final Observable<T> skip(long count)
  2. public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)
  3. public final Observable<T> skipLast(int count)
  4. public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
  5. public final <U> Observable<T> skipUntil(ObservableSource<U> other)
  6. public final Observable<T> skipWhile(Predicate<? super T> predicate)

5.take & takeLast & takeUntil & takeWhile

skip方法對應(yīng)的是take方法衬鱼,它表示按照某種規(guī)則進(jìn)行選擇操作业筏。我們以下面的程序?yàn)槔@里第一段程序表示只發(fā)射序列中的前2個(gè)數(shù)據(jù):

Observable.range(1, 5).take(2).subscribe(System.out::print);

下面的程序表示只選擇最后2秒中輸出的數(shù)據(jù):

long current = System.currentTimeMillis();
Observable.range(1,5)
        .repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
        .takeLast(2, TimeUnit.SECONDS).subscribe(System.out::print);

下面是以上相關(guān)的方法的定義鸟赫,同樣的蒜胖,我們只選擇其中比較有代表性的幾個(gè):

  1. public final Observable<T> take(long count)
  2. public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
  3. public final <U> Observable<T> takeUntil(ObservableSource<U> other)
  4. public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
  5. public final Observable<T> takeWhile(Predicate<? super T> predicate)

6.ignoreElements

該方法用來過濾所有源Observable產(chǎn)生的結(jié)果,只會把Observable的onComplete和onError事件通知給訂閱者抛蚤。下面是該方法的定義:

  1. public final Completable ignoreElements()

7.throttleFirst & throttleLast & throttleLatest & throttleWithTimeout

這些方法用來對輸出的數(shù)據(jù)進(jìn)行限制台谢,它們是通過時(shí)間的”窗口“來進(jìn)行限制的,你可以理解成按照指定的參數(shù)對時(shí)間進(jìn)行分片岁经,然后根據(jù)各個(gè)方法的要求選擇第一個(gè)朋沮、最后一個(gè)、最近的等進(jìn)行發(fā)射缀壤。下面是throttleLast方法的用法示例樊拓,它會輸出每個(gè)500毫秒之間的數(shù)字中最后一個(gè)數(shù)字:

Observable.interval(80, TimeUnit.MILLISECONDS)
        .throttleLast(500, TimeUnit.MILLISECONDS)
        .subscribe(i -> System.out.print(i + " "));

其他的幾個(gè)方法的功能大致列舉如下:

  1. throttleFirst只會發(fā)射指定的Observable在指定的事件范圍內(nèi)發(fā)射出來的第一個(gè)數(shù)據(jù)纠亚;
  2. throttleLast只會發(fā)射指定的Observable在指定的事件范圍內(nèi)發(fā)射出來的最后一個(gè)數(shù)據(jù);
  3. throttleLatest用來發(fā)射距離指定的時(shí)間分片最近的那個(gè)數(shù)據(jù);
  4. throttleWithTimeout僅在過了一段指定的時(shí)間還沒發(fā)射數(shù)據(jù)時(shí)才發(fā)射一個(gè)數(shù)據(jù)骑脱,如果在一個(gè)時(shí)間片達(dá)到之前,發(fā)射的數(shù)據(jù)之后又緊跟著發(fā)射了一個(gè)數(shù)據(jù)苍糠,那么這個(gè)時(shí)間片之內(nèi)之前發(fā)射的數(shù)據(jù)會被丟掉叁丧,該方法底層是使用debounce方法實(shí)現(xiàn)的。如果數(shù)據(jù)發(fā)射的頻率總是快過這里的timeout參數(shù)指定的時(shí)間岳瞭,那么將不會再發(fā)射出數(shù)據(jù)來拥娄。

下面是這些方法及其重載方法的定義(選擇其中一部分):

  1. public final Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler)
  2. public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler)
  3. public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast)
  4. public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)

8.debounce

debounce也是用來限制發(fā)射頻率過快的,它僅在過了一段指定的時(shí)間還沒發(fā)射數(shù)據(jù)時(shí)才發(fā)射一個(gè)數(shù)據(jù)瞳筏。我們通過下面的圖來說明這個(gè)問題:

debounce

這里紅稚瘾、綠、藍(lán)三個(gè)球發(fā)射出來的原因都是因?yàn)楫?dāng)反射了這個(gè)球之后的一定的時(shí)間內(nèi)沒有其他的球發(fā)射出來姚炕,這個(gè)時(shí)間是我們可以通過參數(shù)來指定的摊欠。

該方法的用法與throttle之類的方法類似,上面也說過throttle那些方法底層用了debounce實(shí)現(xiàn)柱宦,所以些椒,這里我們不再為該方法專門編寫相關(guān)的測試代碼。

9.sample

實(shí)際上throttleLast的實(shí)現(xiàn)中內(nèi)部調(diào)用的就是sample掸刊。

2.1.4 組合操作

1.startWith & startWithArray

startWith方法可以用來在指定的數(shù)據(jù)源的之前插入幾個(gè)數(shù)據(jù)免糕,它的功能類似的方法有startWithArray,另外還有幾個(gè)重載方法忧侧。這里我們給出一個(gè)基本的用法示例石窑,下面的程序會在原始的數(shù)字流1-5的前面加上0,所以最終的輸出結(jié)果是012345

Observable.range(1,5).startWith(0).subscribe(System.out::print);

下面是startWith及其幾個(gè)功能相關(guān)的方法的定義:

  1. public final Observable<T> startWith(Iterable<? extends T> items)
  2. public final Observable<T> startWith(ObservableSource<? extends T> other)
  3. public final Observable<T> startWith(T item)
  4. public final Observable<T> startWithArray(T... items)

2.merge & mergeArray

merge可以讓多個(gè)數(shù)據(jù)源的數(shù)據(jù)合并起來進(jìn)行發(fā)射蚓炬,當(dāng)然它可能會讓merge之后的數(shù)據(jù)交錯(cuò)發(fā)射松逊。下面是一個(gè)示例,這個(gè)例子中肯夏,我們使用merge方法將兩個(gè)Observable合并到了一起進(jìn)行監(jiān)聽:

Observable.merge(Observable.range(1,5), Observable.range(6,5)).subscribe(System.out::print);

鑒于merge方法及其功能類似的方法太多棺棵,我們這里挑選幾個(gè)比較有代表性的方法,具體的可以查看RxJava的源代碼:

  1. public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
  3. public static <T> Observable<T> mergeDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
  4. public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

這里的mergeError方法與merge方法的表現(xiàn)一致熄捍,只是在處理由onError觸發(fā)的錯(cuò)誤的時(shí)候有所不同烛恤。mergeError方法會等待所有的數(shù)據(jù)發(fā)射完畢之后才把錯(cuò)誤發(fā)射出來,即使多個(gè)錯(cuò)誤被觸發(fā)余耽,該方法也只會發(fā)射出一個(gè)錯(cuò)誤信息缚柏。而如果使用merger方法,那么當(dāng)有錯(cuò)誤被觸發(fā)的時(shí)候碟贾,該錯(cuò)誤會直接被拋出來币喧,并結(jié)束發(fā)射操作轨域。下面是該方法的一個(gè)使用的示例,這里我們主線程停頓4秒杀餐,然后所有merge的Observable中的一個(gè)會在線程開始的第2秒的時(shí)候觸發(fā)一個(gè)錯(cuò)誤干发,該錯(cuò)誤最終會在所有的數(shù)據(jù)發(fā)射完畢之后被發(fā)射出來:

Observable.mergeDelayError(Observable.range(1,5),
        Observable.range(1,5).repeat(2),
        Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
            Thread.sleep(2000);
            observableEmitter.onError(new Exception("error"));
        })
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);

3.concat & concatArray & concatEager

該方法也是用來將多個(gè)Observable拼接起來,但是它會嚴(yán)格按照傳入的Observable的順序進(jìn)行發(fā)射史翘,一個(gè)Observable沒有發(fā)射完畢之前不會發(fā)射另一個(gè)Observable里面的數(shù)據(jù)枉长。下面是一個(gè)程序示例,這里傳入了兩個(gè)Observable琼讽,會按照順序輸出12345678910

Observable.concat(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);

下面是該方法的定義必峰,鑒于該方法及其重載方法太多,這里我們選擇幾個(gè)比較有代表性的說明:

  1. public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
  3. public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
  4. public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
  5. public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
  6. public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)

對于concat方法钻蹬,我們之前已經(jīng)介紹過它的用法吼蚁;這里的conactArray的功能與之類似;對于concatEager方法问欠,當(dāng)一個(gè)觀察者訂閱了它的結(jié)果肝匆,那么就相當(dāng)于訂閱了它拼接的所有ObservableSource,并且會先緩存這些ObservableSource發(fā)射的數(shù)據(jù)顺献,然后再按照順序?qū)⑺鼈儼l(fā)射出來术唬。而對于這里的concatDelayError方法的作用和前面的mergeDelayError類似,只有當(dāng)所有的數(shù)據(jù)都發(fā)射完畢才會處理異常滚澜。

4.zip & zipArray & zipIterable

zip操作用來將多個(gè)數(shù)據(jù)項(xiàng)進(jìn)行合并粗仓,可以通過一個(gè)函數(shù)指定這些數(shù)據(jù)項(xiàng)的合并規(guī)則。比如下面的程序的輸出結(jié)果是6 14 24 36 50设捐,顯然這里的合并的規(guī)則是相同索引的兩個(gè)數(shù)據(jù)的乘積借浊。不過仔細(xì)看下這里的輸出結(jié)果,可以看出萝招,如果一個(gè)數(shù)據(jù)項(xiàng)指定的位置沒有對應(yīng)的值的時(shí)候蚂斤,它是不會參與這個(gè)變換過程的:

Observable.zip(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));

zip方法有多個(gè)重載的版本,同時(shí)也有功能近似的方法槐沼,這里我們挑選有代表性的幾個(gè)進(jìn)行說明:

  1. public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
  2. ublic static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources)
  3. public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize)

實(shí)際上上面幾個(gè)方法的用法和功能基本類似曙蒸,區(qū)別在于傳入的ObservableSource的參數(shù)的形式。

5.combineLastest

zip操作類似岗钩,但是這個(gè)操作的輸出結(jié)果與zip截然不同纽窟,以下面的程序?yàn)槔妮敵鼋Y(jié)果是36 42 48 54 60

Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));

利用下面的這張圖可以比較容易來說明這個(gè)問題:

[圖片上傳失敗...(image-5b184-1534261018262)]

上圖中的上面的兩條橫線代表用于拼接的兩個(gè)數(shù)據(jù)項(xiàng)兼吓,下面的一條橫線是拼接之后的結(jié)果臂港。combineLatest的作用是拼接最新發(fā)射的兩個(gè)數(shù)據(jù)。下面我們用上圖的過程來說明該方法是如何執(zhí)行的:開始第一條只有1的時(shí)候無法拼接,审孽;當(dāng)?shù)诙l出現(xiàn)A的時(shí)候县袱,此時(shí)最新的數(shù)據(jù)是1和A,故組合成一個(gè)1A佑力;第二個(gè)數(shù)據(jù)項(xiàng)發(fā)射了B式散,此時(shí)最新的數(shù)據(jù)是1和B,故組合成1B打颤;第一條橫線發(fā)射了2暴拄,此時(shí)最新的數(shù)據(jù)是2和B,因此得到了2B瘸洛,依次類推揍移。然后再回到我們上面的問題次和,第一個(gè)數(shù)據(jù)項(xiàng)連續(xù)發(fā)射了5個(gè)數(shù)據(jù)的時(shí)候反肋,第二個(gè)數(shù)據(jù)項(xiàng)一個(gè)都沒有發(fā)射出來,因此沒有任何輸出踏施;然后第二個(gè)數(shù)據(jù)項(xiàng)開始發(fā)射數(shù)據(jù)石蔗,當(dāng)?shù)诙€(gè)數(shù)據(jù)項(xiàng)發(fā)射了6的時(shí)候,此時(shí)最新的數(shù)據(jù)組合是6和6畅形,故得36养距;然后,第二個(gè)數(shù)據(jù)項(xiàng)發(fā)射了7日熬,此時(shí)最新的數(shù)據(jù)組合是6和7棍厌,故得42,依次類推竖席。

該方法也有對應(yīng)的combineLatestDelayError方法耘纱,用途也是只有當(dāng)所有的數(shù)據(jù)都發(fā)射完畢的時(shí)候才去處理錯(cuò)誤邏輯。

2.1.5 輔助操作

1.delay

delay方法用于在發(fā)射數(shù)據(jù)之前停頓指定的時(shí)間毕荐,比如下面的程序會在真正地發(fā)射數(shù)據(jù)之前停頓1秒:

Observable.range(1, 5).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);

同樣delay方法也有幾個(gè)重載的方法束析,可以供我們用來指定觸發(fā)的線程等信息,這里給出其中的兩個(gè)憎亚,其他的可以參考源碼和文檔:

  1. public final Observable<T> delay(long delay, TimeUnit unit)
  2. public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)

2.do系列

RxJava中還有一系列的方法可以供我們使用员寇,它們共同的特點(diǎn)是都是以do開頭,下面我們列舉一下這些方法并簡要說明一下它們各自的用途:

  1. public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)第美,會在onNext方法之后觸發(fā)蝶锋;
  2. public final Observable<T> doAfterTerminate(Action onFinally),會在Observable終止之后觸發(fā)什往;
  3. public final Observable<T> doFinally(Action onFinally)牲览,當(dāng)onComplete或者onError的時(shí)候觸發(fā);
  4. public final Observable<T> doOnDispose(Action onDispose),當(dāng)被dispose的時(shí)候觸發(fā)第献;
  5. public final Observable<T> doOnComplete(Action onComplete)贡必,當(dāng)complete的時(shí)候觸發(fā);
  6. public final Observable<T> doOnEach(final Observer<? super T> observer)庸毫,當(dāng)每個(gè)onNext調(diào)用的時(shí)候觸發(fā)仔拟;
  7. public final Observable<T> doOnError(Consumer<? super Throwable> onError),當(dāng)調(diào)用onError的時(shí)候觸發(fā)飒赃;
  8. public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
  9. public final Observable<T> doOnNext(Consumer<? super T> onNext)利花,,會在onNext的時(shí)候觸發(fā)载佳;
  10. public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)炒事,會在訂閱的時(shí)候觸發(fā);
  11. public final Observable<T> doOnTerminate(final Action onTerminate)蔫慧,當(dāng)終止之前觸發(fā)挠乳。

這些方法可以看作是對操作執(zhí)行過程的一個(gè)監(jiān)聽,當(dāng)指定的操作被觸發(fā)的時(shí)候會同時(shí)觸發(fā)這些監(jiān)聽方法:

Observable.range(1, 5)
        .doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
        .doOnComplete(() -> System.out.println("complete"))
        .doFinally(() -> System.out.println("finally"))
        .doAfterNext(i -> System.out.println("after next : " + i))
        .doOnSubscribe(disposable -> System.out.println("subscribe"))
        .doOnTerminate(() -> System.out.println("terminal"))
        .subscribe(i -> System.out.println("subscribe : " + i));

3.subscribeOn & observeOn

subscribeOn用于指定Observable自身運(yùn)行的線程姑躲,observeOn用于指定發(fā)射數(shù)據(jù)所處的線程睡扬,比如Android中的異步任務(wù)需要用observeOn指定發(fā)射數(shù)據(jù)所在的線程是非主線程,然后執(zhí)行完畢之后將結(jié)果發(fā)送給主線程黍析,就需要用subscribeOn來指定卖怜。比如下面的程序,我們用這兩個(gè)方法來指定所在的線程:

 Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    System.out.println(Thread.currentThread());
    observableEmitter.onNext(0);
}).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation())
        .subscribe(integer -> System.out.println(Thread.currentThread()));

最終的輸出結(jié)果如下所示:

Thread[RxComputationThreadPool-1,5,main]
Thread[RxNewThreadScheduler-1,5,main]

4.timeout

用來設(shè)置一個(gè)超時(shí)時(shí)間阐枣,如果指定的時(shí)間之內(nèi)沒有任何數(shù)據(jù)被發(fā)射出來马靠,那么就會執(zhí)行我們指定的數(shù)據(jù)項(xiàng)。如下面的程序所示蔼两,我們先為設(shè)置了一個(gè)間隔200毫秒的數(shù)字產(chǎn)生器甩鳄,開始發(fā)射數(shù)據(jù)之前要停頓1秒鐘,因?yàn)槲覀冊O(shè)置的超時(shí)時(shí)間是500毫秒宪哩,因而在第500毫秒的時(shí)候會執(zhí)行我們傳入的數(shù)據(jù)項(xiàng):

Observable.interval(1000, 200, TimeUnit.MILLISECONDS)
        .timeout(500, TimeUnit.MILLISECONDS, Observable.rangeLong(1, 5))
        .subscribe(System.out::print);
Thread.sleep(2000);

timeout方法有多個(gè)重載方法娩贷,可以為其指定線程等參數(shù),可以參考源碼或者文檔了解詳情锁孟。

2.1.6 錯(cuò)誤處理操作符

錯(cuò)誤處理操作符主要用來提供給Observable彬祖,用來對錯(cuò)誤信息做統(tǒng)一的處理,常用的兩個(gè)是catchretry品抽。

1.catch

catch操作會攔截原始的Observable的onError通知储笑,將它替換為其他數(shù)據(jù)項(xiàng)或者數(shù)據(jù)序列,讓產(chǎn)生的Observable能夠正常終止或者根本不終止圆恤。在RxJava中該操作有3終類型:

  1. onErrorReturn:這種操作會在onError觸發(fā)的時(shí)候返回一個(gè)特殊的項(xiàng)替換錯(cuò)誤突倍,并調(diào)用觀察者的onCompleted方法,而不會將錯(cuò)誤傳遞給觀察者;
  2. onErrorResumeNext:會在onError觸發(fā)的時(shí)候發(fā)射備用的數(shù)據(jù)項(xiàng)給觀察者羽历;
  3. onExceptionResumeNext:如果onError觸發(fā)的時(shí)候onError收到的Throwable不是Exception焊虏,它會將錯(cuò)誤傳遞給觀察者的onError方法,不會使用備用的Observable秕磷。

下面是onErrorReturnonErrorResumeNext的程序示例诵闭,這里第一段代碼會在出現(xiàn)錯(cuò)誤的時(shí)候輸出666,而第二段會在出現(xiàn)錯(cuò)誤的時(shí)候發(fā)射數(shù)字12345

    Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
        observableEmitter.onError(null);
        observableEmitter.onNext(0);
    }).onErrorReturn(throwable -> 666).subscribe(System.out::print);

    Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
        observableEmitter.onError(null);
        observableEmitter.onNext(0);
    }).onErrorResumeNext(Observable.range(1,5)).subscribe(System.out::print);

2.retry

retry使用了一種錯(cuò)誤重試機(jī)制澎嚣,它可以在出現(xiàn)錯(cuò)誤的時(shí)候進(jìn)行重試疏尿,我們可以通過參數(shù)指定重試機(jī)制的條件。以下面的程序?yàn)槔滋遥@里我們設(shè)置了當(dāng)出現(xiàn)錯(cuò)誤的時(shí)候會進(jìn)行2次重試褥琐,因此,第一次的時(shí)候出現(xiàn)錯(cuò)誤會調(diào)用onNext晤郑,重試2次又會調(diào)用2次onNext敌呈,第二次重試的時(shí)候因?yàn)橹卦囉殖霈F(xiàn)了錯(cuò)誤,因此此時(shí)會觸發(fā)onError方法贩汉。也就是說驱富,下面這段代碼會觸發(fā)onNext3次锚赤,觸發(fā)onError()1次:

    Observable.create(((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(0);
        emitter.onError(new Throwable("Error1"));
        emitter.onError(new Throwable("Error2"));
    })).retry(2).subscribe(i -> System.out.println("onNext : " + i), error -> System.out.print("onError : " + error));

retry有幾個(gè)重載的方法和功能相近的方法匹舞,下面是這些方法的定義(選取部分):

  1. public final Observable<T> retry():會進(jìn)行無限次地重試;
  2. public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
  3. public final Observable<T> retry(long times):指定重試次數(shù)线脚;
  4. public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
  5. public final Observable<T> retryUntil(final BooleanSupplier stop)
  6. public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)

2.1.7 條件操作符和布爾操作符

1.all & any

  1. all用來判斷指定的數(shù)據(jù)項(xiàng)是否全部滿足指定的要求赐稽,這里的“要求”可以使用一個(gè)函數(shù)來指定;
  2. any用來判斷指定的Observable是否存在滿足指定要求的數(shù)據(jù)項(xiàng)浑侥。

在下面的程序中姊舵,我們用該函數(shù)來判斷指定的數(shù)據(jù)項(xiàng)是否全部滿足大于5的要求,顯然是不滿足的寓落,因此下面的程序?qū)敵?code>false:

Observable.range(5, 5).all(i -> i>5).subscribe(System.out::println); // false
Observable.range(5, 5).any(i -> i>5).subscribe(System.out::println); // true

以下是該方法的定義:

  1. public final Single<Boolean> all(Predicate<? super T> predicate)
  2. public final Single<Boolean> any(Predicate<? super T> predicate)

2.contains & isEmpty

這兩個(gè)方法分別用來判斷數(shù)據(jù)項(xiàng)中是否包含我們指定的數(shù)據(jù)項(xiàng)括丁,已經(jīng)判斷數(shù)據(jù)項(xiàng)是否為空:

Observable.range(5, 5).contains(4).subscribe(System.out::println); // false
Observable.range(5, 5).isEmpty().subscribe(System.out::println); // false

以下是這兩個(gè)方法的定義:

  1. public final Single<Boolean> isEmpty()
  2. public final Single<Boolean> contains(final Object element)

3.sequenceEqual

sequenceEqual用來判斷兩個(gè)Observable發(fā)射出的序列是否是相等的。比如下面的方法用來判斷兩個(gè)序列是否相等:

Observable.sequenceEqual(Observable.range(1,5), Observable.range(1, 5)).subscribe(System.out::println);

4.amb

amb作用的兩個(gè)或多個(gè)Observable伶选,但是只會發(fā)射最先發(fā)射數(shù)據(jù)的那個(gè)Observable的全部數(shù)據(jù):

Observable.amb(Arrays.asList(Observable.range(1, 5), Observable.range(6, 5))).subscribe(System.out::print)

該方法及其功能近似的方法的定義史飞,這里前兩個(gè)是靜態(tài)的方法,第二個(gè)屬于實(shí)例方法:

  1. public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
  3. public final Observable<T> ambWith(ObservableSource<? extends T> other)

5.defaultIfEmpty

defaultIfEmpty用來當(dāng)指定的序列為空的時(shí)候指定一個(gè)用于發(fā)射的值仰税。下面的程序中构资,我們直接調(diào)用發(fā)射器的onComplete方法,因此序列是空的陨簇,結(jié)果輸出一個(gè)整數(shù)6

Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).defaultIfEmpty(6).subscribe(System.out::print);

下面是該方法的定義:

  1. public final Observable<T> defaultIfEmpty(T defaultItem)

2.1.8 轉(zhuǎn)換操作符

1.toList & toSortedList

toListtoSortedList用于將序列轉(zhuǎn)換成列表吐绵,后者相對于前者增加了排序的功能:

Observable.range(1, 5).toList().subscribe(System.out::println);
Observable.range(1, 5).toSortedList(Comparator.comparingInt(o -> -o)).subscribe(System.out::println);

下面是它們的定義,它們有多個(gè)重載版本,這里選擇其中的兩個(gè)進(jìn)行說明:

  1. public final Single<List<T>> toList()
  2. public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)

注意一下己单,這里的返回結(jié)果是Single類型的唉窃,不過這并不妨礙我們繼續(xù)使用鏈?zhǔn)讲僮鳎驗(yàn)?code>Single的方法和Observable基本一致纹笼。
另外還要注意這里的Single中的參數(shù)是一個(gè)List<T>句携,也就是說,它把整個(gè)序列轉(zhuǎn)換成了一個(gè)列表對象允乐。因此矮嫉,上面的兩個(gè)示例程序的輸出是:

[1, 2, 3, 4, 5]
[5, 4, 3, 2, 1]

2.toMap & toMultimap

toMap用于將發(fā)射的數(shù)據(jù)轉(zhuǎn)換成另一個(gè)類型的值,它的轉(zhuǎn)換過程是針對每一個(gè)數(shù)據(jù)項(xiàng)的牍疏。以下面的代碼為例蠢笋,它會將原始的序列中的每個(gè)數(shù)字轉(zhuǎn)換成對應(yīng)的十六進(jìn)制。但是鳞陨,toMap轉(zhuǎn)換的結(jié)果不一定是按照原始的序列的發(fā)射的順序來的:

Observable.range(8, 10).toMap(Integer::toHexString).subscribe(System.out::print);

toMap近似的是toMultimap方法昨寞,它可以將原始序列的每個(gè)數(shù)據(jù)項(xiàng)轉(zhuǎn)換成一個(gè)集合類型:

Observable.range(8, 10).toMultimap(Integer::toHexString).subscribe(System.out::print);

上面的兩段程序的輸出結(jié)果是:

{11=17, a=10, b=11, c=12, d=13, e=14, f=15, 8=8, 9=9, 10=16}
{11=[17], a=[10], b=[11], c=[12], d=[13], e=[14], f=[15], 8=[8], 9=[9], 10=[16]}

上面的兩個(gè)方法的定義是(多個(gè)重載,選擇部分):

  1. public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
  2. public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)

3.toFlowable

該方法用于將一個(gè)Observable轉(zhuǎn)換成Flowable類型厦滤,下面是該方法的定義援岩,顯然這個(gè)方法使用了策略模式,這里面涉及背壓相關(guān)的內(nèi)容掏导,我們后續(xù)再詳細(xì)介紹享怀。

public final Flowable<T> toFlowable(BackpressureStrategy strategy)

4.to

相比于上面的方法,to方法的限制更加得寬泛趟咆,你可以將指定的Observable轉(zhuǎn)換成任意你想要的類型(如果你可以做到的話)添瓷,下面是一個(gè)示例代碼,用來將指定的整數(shù)序列轉(zhuǎn)換成另一個(gè)整數(shù)類型的Observable值纱,只不過這里的每個(gè)數(shù)據(jù)項(xiàng)都是原來的列表中的數(shù)據(jù)總數(shù)的值:

Observable.range(1, 5).to(Observable::count).subscribe(System.out::println);

下面是該方法的定義:

public final <R> R to(Function<? super Observable<T>, R> converter)

2.2 線程控制

之前有提到過RxJava的線程控制是通過subscribeOnobserveOn兩個(gè)方法來完成的鳞贷。
這里我們梳理一下RxJava提供的幾種線程調(diào)度器以及RxAndroid為Android提供的調(diào)度器的使用場景和區(qū)別等。

  1. Schedulers.io():代表適用于io操作的調(diào)度器虐唠,增長或縮減來自適應(yīng)的線程池搀愧,通常用于網(wǎng)絡(luò)、讀寫文件等io密集型的操作疆偿。重點(diǎn)需要注意的是線程池是無限制的咱筛,大量的I/O調(diào)度操作將創(chuàng)建許多個(gè)線程并占用內(nèi)存。
  2. Schedulers.computation():計(jì)算工作默認(rèn)的調(diào)度器翁脆,代表CPU計(jì)算密集型的操作眷蚓,與I/O操作無關(guān)。它也是許多RxJava方法反番,比如buffer(),debounce(),delay(),interval(),sample(),skip()沙热,的默認(rèn)調(diào)度器叉钥。
  3. Schedulers.newThread():代表一個(gè)常規(guī)的新線程。
  4. Schedulers.immediate():這個(gè)調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作篙贸。它是timeout(),timeInterval()以及timestamp()方法默認(rèn)的調(diào)度器投队。
  5. Schedulers.trampoline():當(dāng)我們想在當(dāng)前線程執(zhí)行一個(gè)任務(wù)時(shí),并不是立即爵川,我們可以用trampoline()將它入隊(duì)敷鸦。這個(gè)調(diào)度器將會處理它的隊(duì)列并且按序運(yùn)行隊(duì)列中每一個(gè)任務(wù)。它是repeat()retry()方法默認(rèn)的調(diào)度器寝贡。

以及RxAndroid提供的線程調(diào)度器:

AndroidSchedulers.mainThread()用來指代Android的主線程

2.3 總結(jié)

上面的這些操作也基本適用于Flowable扒披、SingleCompletableMaybe圃泡。

我們花費(fèi)了很多的時(shí)間和精力來梳理了這些方法碟案,按照上面的內(nèi)容,使用RxJava實(shí)現(xiàn)一些基本的或者高級的操作都不是什么問題颇蜡。

但是价说,Observable更適用于處理一些數(shù)據(jù)規(guī)模較小的問題,當(dāng)數(shù)據(jù)規(guī)模比較多的時(shí)候可能會出現(xiàn)MissingBackpressureException異常风秤。
因此鳖目,我們還需要了解背壓和Flowable的相關(guān)內(nèi)容才能更好地理解和應(yīng)用RxJava.

以上,感謝閱讀~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末缤弦,一起剝皮案震驚了整個(gè)濱河市领迈,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌甸鸟,老刑警劉巖惦费,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兵迅,死亡現(xiàn)場離奇詭異抢韭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)恍箭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進(jìn)店門刻恭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扯夭,你說我怎么就攤上這事鳍贾。” “怎么了交洗?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵骑科,是天一觀的道長。 經(jīng)常有香客問我构拳,道長咆爽,這世上最難降的妖魔是什么梁棠? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮斗埂,結(jié)果婚禮上符糊,老公的妹妹穿的比我還像新娘。我一直安慰自己呛凶,他們只是感情好男娄,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著漾稀,像睡著了一般模闲。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上崭捍,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天围橡,我揣著相機(jī)與錄音,去河邊找鬼缕贡。 笑死翁授,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的晾咪。 我是一名探鬼主播收擦,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼谍倦!你這毒婦竟也來了塞赂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤昼蛀,失蹤者是張志新(化名)和其女友劉穎宴猾,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體叼旋,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仇哆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了夫植。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片讹剔。...
    茶點(diǎn)故事閱讀 39,841評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖详民,靈堂內(nèi)的尸體忽然破棺而出延欠,到底是詐尸還是另有隱情,我是刑警寧澤沈跨,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布由捎,位于F島的核電站,受9級特大地震影響饿凛,放射性物質(zhì)發(fā)生泄漏狞玛。R本人自食惡果不足惜邻奠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望为居。 院中可真熱鬧碌宴,春花似錦、人聲如沸蒙畴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽膳凝。三九已至碑隆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蹬音,已是汗流浹背上煤。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留著淆,地道東北人劫狠。 一個(gè)月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像永部,于是被迫代替她去往敵國和親独泞。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內(nèi)容