看了許多講解RxJava的文章报慕,有些文章講解的內(nèi)容是基于第一個版本的,有些文章的講解是通過比較常用的一些API和基礎的概念進行講解的。
但是每次看到RxJava的類中的幾十個方法的時候,總是感覺心里沒底。所以娃属,我打算自己去專門寫篇文章來從API的角度系統(tǒng)地梳理一下RxJava的各種方法和用法。
1护姆、RxJava 基本
1.1 RxJava 簡介
RxJava是一個在Java VM上使用可觀測的序列來組成異步的矾端、基于事件的程序的庫。
雖然卵皂,在Android中秩铆,我們可以使用AsyncTask來完成異步任務操作,但是當任務的梳理比較多的時候灯变,我們要為每個任務定義一個AsyncTask就變得非常繁瑣殴玛。
RxJava能幫助我們在實現(xiàn)異步執(zhí)行的前提下保持代碼的清晰。
它的原理就是創(chuàng)建一個Observable
來完成異步任務添祸,組合使用各種不同的鏈式操作滚粟,來實現(xiàn)各種復雜的操作,最終將任務的執(zhí)行結果發(fā)射給Observer
進行處理刃泌。
當然凡壤,RxJava不僅適用于Android,也適用于服務端等各種場景耙替。
我們總結以下RxJava的用途:
- 簡化異步程序的流程亚侠;
- 使用近似于Java8的流的操作進行編程:因為想要在Android中使用Java8的流編程有諸多的限制,所以我們可以使用RxJava來實現(xiàn)這個目的俗扇。
在使用RxJava之前硝烂,我們需要先在自己的項目中添加如下的依賴:
compile 'io.reactivex.rxjava2:rxjava:2.2.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
這里我們使用的是RxJava2,它與RxJava的第一個版本有些許不同铜幽。在本文中滞谢,我們所有的關于RxJava的示例都將基于RxJava2.
注:如果想了解關于Java8的流編程的內(nèi)容的內(nèi)容串稀,可以參考我之前寫過的文章五分鐘學習Java8的流編程。
1.2 概要
下面是RxJava的一個基本的用例狮杨,這里我們定義了一個Observable
厨诸,然后在它內(nèi)部使用emitter
發(fā)射了一些數(shù)據(jù)和信息(其實就相當于調用了被觀察對象內(nèi)部的方法,通知所有的觀察者)禾酱。
然后,我們用Consumer
接口的實例作為subscribe()
方法的參數(shù)來觀察發(fā)射的結果绘趋。(這里的接口的方法都已經(jīng)被使用Lambda簡化過颤陶,應該學著適應它。)
Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
});
observable.subscribe(System.out::println);
這樣陷遮,我們就完成了一個基本的RxJava的示例滓走。從上面的例子中,你或許沒法看出Observable
中隱藏的流的概念帽馋,看下面的例子:
Observable.range(0, 10).map(String::valueOf).forEach(System.out::println);
這里我們先用Observable.range()
方法產(chǎn)生一個序列搅方,然后用map
方法將該整數(shù)序列映射成一個字符序列,最后將得到的序列輸出來绽族。從上面看出姨涡,這種操作和Java8里面的Stream編程很像。但是兩者之間是有區(qū)別的:
- 所謂的“推”和“拉”的區(qū)別:Stream中是通過從流中讀取數(shù)據(jù)來實現(xiàn)鏈式操作吧慢,而RxJava除了Stream中的功能之外涛漂,還可以通過“發(fā)射”數(shù)據(jù),來實現(xiàn)通知的功能检诗,即RxJava在Stream之上又多了一個觀察者的功能匈仗。
- Java8中的Stream可以通過
parall()
來實現(xiàn)并行,即基于分治算法將任務分解并計算得到結果之后將結果合并起來逢慌;而RxJava只能通過subscribeOn()
方法將所有的操作切換到某個線程中去悠轩。 - Stream只能被消費一次,但是
Observable
可以被多次進行訂閱攻泼;
RxJava除了為我們提供了Observable
之外火架,在新的RxJava中還提供了適用于其他場景的基礎類,它們之間的功能和主要區(qū)別如下:
-
Flowable
: 多個流忙菠,響應式流和背壓 -
Observable
: 多個流距潘,無背壓 -
Single
: 只有一個元素或者錯誤的流 -
Completable
: 沒有任何元素,只有一個完成和錯誤信號的流 -
Maybe
: 沒有任何元素或者只有一個元素或者只有一個錯誤的流
除了上面的幾個基礎類之外只搁,還有一個Disposable
音比。當我們監(jiān)聽某個流的時候,就能獲取到一個Disposable
對象氢惋。它提供了兩個方法洞翩,一個是isDisposed
稽犁,可以被用來判斷是否停止了觀察指定的流;另一個是dispose
方法骚亿,用來放棄觀察指定的流已亥,我們可以使用它在任意的時刻停止觀察操作。
1.3 總結
上面我們介紹了了關于RxJava的基本的概念和使用方式来屠,在下面的文章中我們會按照以上定義的順序從API的角度來講解以下RxJava各個模塊的使用方法虑椎。
2、RxJava 的使用
2.1 Observable
從上面的文章中我們可以得知俱笛,Observable
和后面3種操作功能近似捆姜,區(qū)別在于Flowable
加入了背壓的概念,Observable
的大部分方法也適用于其他3個操作和Flowable
迎膜。
因此泥技,我們這里先從Observable
開始梳理,然后我們再專門對Flowable
和背壓的進行介紹磕仅。
Observable
為我們提供了一些靜態(tài)的構造方法來創(chuàng)建一個Observable
對象珊豹,還有許多鏈式的方法來完成各種復雜的功能。
這里我們按照功能將它的這些方法分成各個類別并依次進行相關的說明榕订。
2.1.1 創(chuàng)建操作
1.interval & intervalRange
下面的操作可以每個3秒的時間發(fā)送一個整數(shù)店茶,整數(shù)從0開始:
Observable.interval(3, TimeUnit.SECONDS).subscribe(System.out::println);
如果想要設置從指定的數(shù)字開始也是可以的,實際上interval
提供了許多重載方法供我們是使用劫恒。下面我們連同與之功能相近的intervalRange
方法也一同給出:
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
這里的initialDelay
參數(shù)用來指示開始發(fā)射第一個整數(shù)的之前要停頓的時間忽妒,時間的單位與peroid
一樣,都是通過unit
參數(shù)來指定的兼贸;period
參數(shù)用來表示每個發(fā)射之間停頓多少時間段直;unit
表示時間的單位,是TimeUnit
類型的溶诞;scheduler
參數(shù)指定數(shù)據(jù)發(fā)射和等待時所在的線程鸯檬。
intervalRange
方法可以用來將發(fā)射的整數(shù)序列限制在一個范圍之內(nèi),這里的start
用來表示發(fā)射的數(shù)據(jù)的起始值螺垢,count
表示總共要發(fā)射幾個數(shù)字喧务,其他參數(shù)與上面的interval
方法一致。
2.range & rangeLong
下面的操作可以產(chǎn)生一個從5開始的連續(xù)10個整數(shù)構成的序列:
Observable.range(5, 10).subscribe(i -> System.out.println("1: " + i));
該方法需要傳入兩個參數(shù)枉圃,與之有相同功能的方法還有rangeLong
:
public static Observable<Integer> range(final int start, final int count)
public static Observable<Long> rangeLong(long start, long count)
這里的兩個參數(shù)start
用來指定用于生成的序列的開始值功茴,count
用來指示要生成的序列總共包含多少個數(shù)字,上面的兩個方法的主要區(qū)別在于一個是用來生成int型整數(shù)的孽亲,一個是用來生成long型整數(shù)的坎穿。
3.create
create
方法用于從頭開始創(chuàng)建一個Observable
,像下面顯示的那樣,你需要使用create
方法并傳一個發(fā)射器作為參數(shù)玲昧,在該發(fā)射器內(nèi)部調用onNext
栖茉、onComplete
和onError
方法就可以將數(shù)據(jù)發(fā)送給監(jiān)聽者。
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onComplete();
}).subscribe(System.out::println);
4.defer
defer
直到有觀察者訂閱時才創(chuàng)建Observable孵延,并且為每個觀察者創(chuàng)建一個新的Observable吕漂。defer
操作符會一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個Observable尘应。比如下面的代碼兩個訂閱輸出的結果是不一致的:
Observable<Long> observable = Observable.defer((Callable<ObservableSource<Long>>) () -> Observable.just(System.currentTimeMillis()));
observable.subscribe(System.out::print);
System.out.println();
observable.subscribe(System.out::print);
下面是該方法的定義惶凝,它接受一個Callable對象,可以在該對象中返回一個Observable的實例:
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
5.empty & never & error
-
public static <T> Observable<T> empty()
:創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable犬钢; -
public static <T> Observable<T> never()
:創(chuàng)建一個不發(fā)射數(shù)據(jù)也不終止的Observable苍鲜; -
public static <T> Observable<T> error(Throwable exception)
:創(chuàng)建一個不發(fā)射數(shù)據(jù)以一個錯誤終止的Observable,它有幾個重載版本娜饵,這里給出其中的一個。
測試代碼:
Observable.empty().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.never().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.error(new Exception()).subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
輸出結果:completeerror
6.from 系列
from
系列的方法用來從指定的數(shù)據(jù)源中獲取一個Observable:
-
public static <T> Observable<T> fromArray(T... items)
:從數(shù)組中獲裙俦病箱舞; -
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
:從Callable中獲取拳亿; -
public static <T> Observable<T> fromFuture(Future<? extends T> future)
:從Future中獲取晴股,有多個重載版本,可以用來指定線程和超時等信息肺魁; -
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
:從Iterable中獲鹊缦妗; -
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
:從Publisher中獲取鹅经。
7.just 系列
just系列的方法的一個參數(shù)的版本為下面的形式:public static <T> Observable<T> just(T item)
寂呛,它還有許多個重載的版本,區(qū)別在于接受的參數(shù)的個數(shù)不同瘾晃,最少1個贷痪,最多10個。
8.repeat
該方法用來表示指定的序列要發(fā)射多少次蹦误,下面的方法會將該序列無限次進行發(fā)送:
Observable.range(5, 10).repeat().subscribe(i -> System.out.println(i));
repeat
方法有以下幾個相似方法:
public final Observable<T> repeat()
public final Observable<T> repeat(long times)
public final Observable<T> repeatUntil(BooleanSupplier stop)
public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
第1個無參的方法會無限次地發(fā)送指定的序列(實際上內(nèi)部調用了第2個方法并傳入了Long.MAX_VALUE)劫拢,第2個方法會將指定的序列重復發(fā)射指定的次數(shù);第3個方法會在滿足指定的要求的時候停止重復發(fā)送强胰,否則會一直發(fā)送舱沧。
9.timer
timer操作符創(chuàng)建一個在給定的時間段之后返回一個特殊值的Observable,它在延遲一段給定的時間后發(fā)射一個簡單的數(shù)字0偶洋。比如下面的程序會在500毫秒之后輸出一個數(shù)字0
熟吏。
Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(System.out::print);
下面是該方法及其重載方法的定義,重載方法還可以指定一個調度器:
public static Observable<Long> timer(long delay, TimeUnit unit)
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
2.1.2 變換操作
1.map & cast
-
map
操作符對原始Observable發(fā)射的每一項數(shù)據(jù)應用一個你選擇的函數(shù)玄窝,然后返回一個發(fā)射這些結果的Observable分俯。默認不在任何特定的調度器上執(zhí)行肾筐。 -
cast
操作符將原始Observable發(fā)射的每一項數(shù)據(jù)都強制轉換為一個指定的類型(多態(tài)),然后再發(fā)射數(shù)據(jù)缸剪,它是map的一個特殊版本:
下面的第一段代碼用于將生成的整數(shù)序列轉換成一個字符串序列之后并輸出吗铐;第二段代碼用于將Date類型轉換成Object類型并進行輸出,這里如果前面的Class無法轉換成第二個Class就會出現(xiàn)異常:
Observable.range(1, 5).map(String::valueOf).subscribe(System.out::println);
Observable.just(new Date()).cast(Object.class).subscribe(System.out::print);
這兩個方法的定義如下:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
public final <U> Observable<U> cast(Class<U> clazz)
這里的mapper
函數(shù)接受兩個泛型杏节,一個表示原始的數(shù)據(jù)類型唬渗,一個表示要轉換之后的數(shù)據(jù)類型,轉換的邏輯寫在該接口實現(xiàn)的方法中即可奋渔。
2.flatMap & contactMap
flatMap
將一個發(fā)送事件的上游Observable變換為多個發(fā)送事件的Observables镊逝,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里。需要注意的是, flatMap并不保證事件的順序嫉鲸,也就是說轉換之后的Observables的順序不必與轉換之前的序列的順序一致撑蒜。比如下面的代碼用于將一個序列構成的整數(shù)轉換成多個單個的Observable
,然后組成一個OBservable
玄渗,并被訂閱座菠。下面輸出的結果仍將是一個字符串數(shù)字序列搀突,只是順序不一定是增序的劈愚。
Observable.range(1, 5)
.flatMap((Function<Integer, ObservableSource<String>>) i -> Observable.just(String.valueOf(i)))
.subscribe(System.out::println);
與flatMap
對應的方法是contactMap
,后者能夠保證最終輸出的順序與上游發(fā)送的順序一致留凭。下面是這兩個方法的定義:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
flatMap
的重載方法數(shù)量過多岁钓,它們在數(shù)據(jù)源方面略有不同升略,有的支持錯誤等可選參數(shù),具體可以參考源代碼屡限。
3.flatMapIterable
flatMapIterable
可以用來將上流的任意一個元素轉換成一個Iterable
對象品嚣,然后我們可以對其進行消費。在下面的代碼中钧大,我們先生成一個整數(shù)的序列腰根,然后將每個整數(shù)映射成一個Iterable<string>
類型,最后拓型,我們對其進行訂閱和消費:
Observable.range(1, 5)
.flatMapIterable((Function<Integer, Iterable<String>>) integer -> Collections.singletonList(String.valueOf(integer)))
.subscribe(s -> System.out.println("flatMapIterable : " + s));
下面是該方法及其重載方法的定義:
public final <U> Observable<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper)
public final <U, V> Observable<V> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends V> resultSelector)
4.buffer
該方法用于將整個流進行分組额嘿。以下面的程序為例,我們會先生成一個7個整數(shù)構成的流劣挫,然后使用buffer
之后册养,這些整數(shù)會被3個作為一組進行輸出,所以當我們訂閱了buffer
轉換之后的Observable
之后得到的是一個列表構成的OBservable
:
Observable.range(1, 7).buffer(3)
.subscribe(integers -> System.out.println(Arrays.toString(integers.toArray())));
下面是這個方法及其重載方法的定義压固,它的重載方法太多球拦,這里我們只給出其中的兩個,其他的可以參考RxJava的源碼。這里的buffer應該理解為一個緩沖區(qū)坎炼,當緩沖區(qū)滿了或者剩余的數(shù)據(jù)不夠一個緩沖區(qū)的時候就將數(shù)據(jù)發(fā)射出去愧膀。
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
- ...
5.groupBy
groupBy
用于分組元素,它可以被用來根據(jù)指定的條件將元素分成若干組谣光。它將得到一個Observable<GroupedObservable<T, M>>
類型的Observable
檩淋。如下面的程序所示,這里我們使用concat
方法先將兩個Observable
拼接成一個Observable
萄金,然后對其元素進行分組蟀悦。這里我們的分組依據(jù)是整數(shù)的值,這樣我們將得到一個Observable<GroupedObservable<Integer, Integer>>
類型的Observable
氧敢。然后日戈,我們再將得到的序列拼接成一個并進行訂閱輸出:
Observable<GroupedObservable<Integer, Integer>> observable = Observable.concat(
Observable.range(1,4), Observable.range(1,6)).groupBy(integer -> integer);
Observable.concat(observable).subscribe(integer -> System.out.println("groupBy : " + integer));
該方法有多個重載版本,這里我們用到的一個的定義是:
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
6.scan
scan
操作符對原始Observable發(fā)射的第一項數(shù)據(jù)應用一個函數(shù)孙乖,然后將那個函數(shù)的結果作為自己的第一項數(shù)據(jù)發(fā)射浙炼。它將函數(shù)的結果同第二項數(shù)據(jù)一起填充給這個函數(shù)來產(chǎn)生它自己的第二項數(shù)據(jù)。它持續(xù)進行這個過程來產(chǎn)生剩余的數(shù)據(jù)序列唯袄。這個操作符在某些情況下被叫做accumulator弯屈。
以下面的程序為例,該程序的輸結果是2 6 24 120 720
越妈,可以看出這里的計算規(guī)則是季俩,我們把傳入到scan
中的函數(shù)記為f
钮糖,序列記為x
梅掠,生成的序列記為y
,那么這里的計算公式是y(0)=x(0); y(i)=f(y(i-1), x(i)), i>0
:
Observable.range(2, 5).scan((i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
除了上面的這種形式店归,scan
方法還有一個重載的版本阎抒,我們可以使用這個版本的方法來在生成序列的時候指定一個初始值。以下面的程序為例消痛,它的輸出結果是3 6 18 72 360 2160
且叁,可以看出它的輸出比上面的形式多了1個,這是因為當指定了初始值之后秩伞,生成的第一個數(shù)字就是那個初始值逞带,剩下的按照我們上面的規(guī)則進行的。所以纱新,用同樣的函數(shù)語言來描述的話展氓,那么它就應該是下面的這種形式:y(0)=initialValue; y(i)=f(y(i-1), x(i)), i>0
。
Observable.range(2, 5).scan(3, (i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
以上方法的定義是:
public final Observable<T> scan(BiFunction<T, T, T> accumulator)
public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator)
7.window
window
Window和Buffer類似脸爱,但不是發(fā)射來自原始Observable的數(shù)據(jù)包遇汞,它發(fā)射的是Observable,這些Observables中的每一個都發(fā)射原始Observable數(shù)據(jù)的一個子集,最后發(fā)射一個onCompleted通知空入。
以下面的程序為例络它,這里我們首先生成了一個由10個數(shù)字組成的整數(shù)序列,然后使用window
函數(shù)將它們每3個作為一組歪赢,每組會返回一個對應的Observable對象化戳。
這里我們對該返回的結果進行訂閱并進行消費,因為10個數(shù)字轨淌,所以會被分成4個組迂烁,每個對應一個Observable:
Observable.range(1, 10).window(3).subscribe(
observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));
除了對數(shù)據(jù)包進行分組,我們還可以根據(jù)時間來對發(fā)射的數(shù)據(jù)進行分組递鹉。該方法有多個重載的版本盟步,這里我們給出其中的比較具有代表性的幾個:
public final Observable<Observable<T>> window(long count)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)
2.1.3 過濾操作
1.filter
filter
用來根據(jù)指定的規(guī)則對源進行過濾,比如下面的程序用來過濾整數(shù)1到10中所有大于5的數(shù)字:
Observable.range(1,10).filter(i -> i > 5).subscribe(System.out::println);
下面是該方法的定義:
public final Observable<T> filter(Predicate<? super T> predicate)
2.elementAt & firstElement & lastElement
elementAt
用來獲取源中指定位置的數(shù)據(jù)躏结,它有幾個重載方法却盘,這里我們介紹一下最簡單的一個方法的用法。下面是elementAt
的一個示例媳拴,它將獲取源數(shù)據(jù)中索引為1的元素并交給觀察者訂閱黄橘。下面的程序將輸出1
Observable.range(1, 10).elementAt(0).subscribe(System.out::print);
這里我們給出elementAt
及其相關的方法的定義,它們的使用相似屈溉。注意一下這里的返回類型:
public final Maybe<T> elementAt(long index)
public final Single<T> elementAt(long index, T defaultItem)
public final Single<T> elementAtOrError(long index)
除了獲取指定索引的元素的方法之外塞关,RxJava中還有可以用來直接獲取第一個和最后一個元素的方法,這里我們直接給出方法的定義:
public final Maybe<T> firstElement()
public final Single<T> first(T defaultItem)
public final Single<T> firstOrError()
public final Maybe<T> lastElement()
public final Single<T> last(T defaultItem)
public final Single<T> lastOrError()
3.distinct & distinctUntilChanged
distinct
用來對源中的數(shù)據(jù)進行過濾子巾,以下面的程序為例帆赢,這里會把重復的數(shù)字7過濾掉:
Observable.just(1,2,3,4,5,6,7,7).distinct().subscribe(System.out::print);
與之類似的還有distinctUntilChanged
方法,與distinct
不同的是线梗,它只當相鄰的兩個元素相同的時候才會將它們過濾掉椰于。比如下面的程序會過濾掉其中的2和5,所以最終的輸出結果是12345676
:
Observable.just(1,2,2,3,4,5,5,6,7,6).distinctUntilChanged().subscribe(System.out::print);
該方法也有幾個功能相似的方法仪搔,這里給出它們的定義如下:
public final Observable<T> distinct()
public final <K> Observable<T> distinct(Function<? super T, K> keySelector)
public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)
public final Observable<T> distinctUntilChanged()
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector)
public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)
4.skip & skipLast & skipUntil & skipWhile
skip
方法用于過濾掉數(shù)據(jù)的前n項瘾婿,比如下面的程序將會過濾掉前2項,因此輸出結果是345
:
Observable.range(1, 5).skip(2).subscribe(System.out::print);
與skip
方法對應的是take
方法烤咧,它用來表示只選擇數(shù)據(jù)源的前n項偏陪,該方法的示例就不給出了。這里煮嫌,我們說一下與之類功能類似的重載方法笛谦。skip
還有一個重載方法接受兩個參數(shù),用來表示跳過指定的時間立膛,也就是在指定的時間之后才開始進行訂閱和消費揪罕。下面的程序會在3秒之后才開始不斷地輸出數(shù)字:
Observable.range(1,5).repeat().skip(3, TimeUnit.SECONDS).subscribe(System.out::print);
與skip
功能相反的方法的還有skipLast
梯码,它用來表示過濾掉后面的幾項,以及最后的一段時間不進行發(fā)射等好啰。比如下面的方法轩娶,我們會在程序開始之前進行計時,然后會不斷重復輸出數(shù)字框往,直到5秒之后結束鳄抒。然后,我們用skipLast
方法表示最后的2秒不再進行發(fā)射椰弊。所以下面的程序會先不斷輸出數(shù)字3秒许溅,3秒結束后停止輸出,并在2秒之后結束程序:
long current = System.currentTimeMillis();
Observable.range(1,5)
.repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
.skipLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
與上面的這些方法類似的還有一些秉版,這里我們不再一一列舉贤重。因為這些方法的重載方法比較多,下面我們給出其中的具有代表性的一部分:
public final Observable<T> skip(long count)
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)
public final Observable<T> skipLast(int count)
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
public final <U> Observable<T> skipUntil(ObservableSource<U> other)
public final Observable<T> skipWhile(Predicate<? super T> predicate)
5.take & takeLast & takeUntil & takeWhile
與skip
方法對應的是take
方法清焕,它表示按照某種規(guī)則進行選擇操作并蝗。我們以下面的程序為例,這里第一段程序表示只發(fā)射序列中的前2個數(shù)據(jù):
Observable.range(1, 5).take(2).subscribe(System.out::print);
下面的程序表示只選擇最后2秒中輸出的數(shù)據(jù):
long current = System.currentTimeMillis();
Observable.range(1,5)
.repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
.takeLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
下面是以上相關的方法的定義秸妥,同樣的滚停,我們只選擇其中比較有代表性的幾個:
public final Observable<T> take(long count)
public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
public final <U> Observable<T> takeUntil(ObservableSource<U> other)
public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
public final Observable<T> takeWhile(Predicate<? super T> predicate)
6.ignoreElements
該方法用來過濾所有源Observable產(chǎn)生的結果,只會把Observable的onComplete和onError事件通知給訂閱者粥惧。下面是該方法的定義:
public final Completable ignoreElements()
7.throttleFirst & throttleLast & throttleLatest & throttleWithTimeout
這些方法用來對輸出的數(shù)據(jù)進行限制键畴,它們是通過時間的”窗口“來進行限制的,你可以理解成按照指定的參數(shù)對時間進行分片突雪,然后根據(jù)各個方法的要求選擇第一個起惕、最后一個、最近的等進行發(fā)射挂签。下面是throttleLast
方法的用法示例疤祭,它會輸出每個500毫秒之間的數(shù)字中最后一個數(shù)字:
Observable.interval(80, TimeUnit.MILLISECONDS)
.throttleLast(500, TimeUnit.MILLISECONDS)
.subscribe(i -> System.out.print(i + " "));
其他的幾個方法的功能大致列舉如下:
-
throttleFirst
只會發(fā)射指定的Observable在指定的事件范圍內(nèi)發(fā)射出來的第一個數(shù)據(jù)盼产; -
throttleLast
只會發(fā)射指定的Observable在指定的事件范圍內(nèi)發(fā)射出來的最后一個數(shù)據(jù)饵婆; -
throttleLatest
用來發(fā)射距離指定的時間分片最近的那個數(shù)據(jù); -
throttleWithTimeout
僅在過了一段指定的時間還沒發(fā)射數(shù)據(jù)時才發(fā)射一個數(shù)據(jù),如果在一個時間片達到之前戏售,發(fā)射的數(shù)據(jù)之后又緊跟著發(fā)射了一個數(shù)據(jù)侨核,那么這個時間片之內(nèi)之前發(fā)射的數(shù)據(jù)會被丟掉,該方法底層是使用debounce
方法實現(xiàn)的灌灾。如果數(shù)據(jù)發(fā)射的頻率總是快過這里的timeout
參數(shù)指定的時間搓译,那么將不會再發(fā)射出數(shù)據(jù)來。
下面是這些方法及其重載方法的定義(選擇其中一部分):
public final Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler)
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler)
public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast)
public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)
8.debounce
debounce
也是用來限制發(fā)射頻率過快的锋喜,它僅在過了一段指定的時間還沒發(fā)射數(shù)據(jù)時才發(fā)射一個數(shù)據(jù)些己。我們通過下面的圖來說明這個問題:
這里紅豌鸡、綠、藍三個球發(fā)射出來的原因都是因為當反射了這個球之后的一定的時間內(nèi)沒有其他的球發(fā)射出來段标,這個時間是我們可以通過參數(shù)來指定的涯冠。
該方法的用法與throttle
之類的方法類似,上面也說過throttle
那些方法底層用了debounce
實現(xiàn)逼庞,所以蛇更,這里我們不再為該方法專門編寫相關的測試代碼。
9.sample
實際上throttleLast
的實現(xiàn)中內(nèi)部調用的就是sample
赛糟。
2.1.4 組合操作
1.startWith & startWithArray
startWith
方法可以用來在指定的數(shù)據(jù)源的之前插入幾個數(shù)據(jù)派任,它的功能類似的方法有startWithArray
,另外還有幾個重載方法璧南。這里我們給出一個基本的用法示例掌逛,下面的程序會在原始的數(shù)字流1-5的前面加上0,所以最終的輸出結果是012345
:
Observable.range(1,5).startWith(0).subscribe(System.out::print);
下面是startWith
及其幾個功能相關的方法的定義:
public final Observable<T> startWith(Iterable<? extends T> items)
public final Observable<T> startWith(ObservableSource<? extends T> other)
public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)
2.merge & mergeArray
merge
可以讓多個數(shù)據(jù)源的數(shù)據(jù)合并起來進行發(fā)射司倚,當然它可能會讓merge
之后的數(shù)據(jù)交錯發(fā)射颤诀。下面是一個示例,這個例子中对湃,我們使用merge
方法將兩個Observable
合并到了一起進行監(jiān)聽:
Observable.merge(Observable.range(1,5), Observable.range(6,5)).subscribe(System.out::print);
鑒于merge
方法及其功能類似的方法太多崖叫,我們這里挑選幾個比較有代表性的方法,具體的可以查看RxJava的源代碼:
public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
這里的mergeError
方法與merge
方法的表現(xiàn)一致拍柒,只是在處理由onError
觸發(fā)的錯誤的時候有所不同心傀。mergeError
方法會等待所有的數(shù)據(jù)發(fā)射完畢之后才把錯誤發(fā)射出來,即使多個錯誤被觸發(fā)拆讯,該方法也只會發(fā)射出一個錯誤信息脂男。而如果使用merger
方法,那么當有錯誤被觸發(fā)的時候种呐,該錯誤會直接被拋出來宰翅,并結束發(fā)射操作。下面是該方法的一個使用的示例爽室,這里我們主線程停頓4秒汁讼,然后所有merge
的Observable中的一個會在線程開始的第2秒的時候觸發(fā)一個錯誤,該錯誤最終會在所有的數(shù)據(jù)發(fā)射完畢之后被發(fā)射出來:
Observable.mergeDelayError(Observable.range(1,5),
Observable.range(1,5).repeat(2),
Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
Thread.sleep(2000);
observableEmitter.onError(new Exception("error"));
})
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);
3.concat & concatArray & concatEager
該方法也是用來將多個Observable拼接起來阔墩,但是它會嚴格按照傳入的Observable的順序進行發(fā)射嘿架,一個Observable沒有發(fā)射完畢之前不會發(fā)射另一個Observable里面的數(shù)據(jù)。下面是一個程序示例啸箫,這里傳入了兩個Observable耸彪,會按照順序輸出12345678910
:
Observable.concat(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);
下面是該方法的定義,鑒于該方法及其重載方法太多忘苛,這里我們選擇幾個比較有代表性的說明:
public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)
對于concat
方法蝉娜,我們之前已經(jīng)介紹過它的用法唱较;這里的conactArray
的功能與之類似;對于concatEager
方法召川,當一個觀察者訂閱了它的結果绊汹,那么就相當于訂閱了它拼接的所有ObservableSource
,并且會先緩存這些ObservableSource發(fā)射的數(shù)據(jù)扮宠,然后再按照順序將它們發(fā)射出來西乖。而對于這里的concatDelayError
方法的作用和前面的mergeDelayError
類似,只有當所有的數(shù)據(jù)都發(fā)射完畢才會處理異常坛增。
4.zip & zipArray & zipIterable
zip
操作用來將多個數(shù)據(jù)項進行合并获雕,可以通過一個函數(shù)指定這些數(shù)據(jù)項的合并規(guī)則。比如下面的程序的輸出結果是6 14 24 36 50
收捣,顯然這里的合并的規(guī)則是相同索引的兩個數(shù)據(jù)的乘積届案。不過仔細看下這里的輸出結果,可以看出罢艾,如果一個數(shù)據(jù)項指定的位置沒有對應的值的時候楣颠,它是不會參與這個變換過程的:
Observable.zip(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
.subscribe(i -> System.out.print(i + " "));
zip
方法有多個重載的版本,同時也有功能近似的方法咐蚯,這里我們挑選有代表性的幾個進行說明:
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
ublic static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources)
public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize)
實際上上面幾個方法的用法和功能基本類似童漩,區(qū)別在于傳入的ObservableSource
的參數(shù)的形式。
5.combineLastest
與zip
操作類似春锋,但是這個操作的輸出結果與zip
截然不同矫膨,以下面的程序為例,它的輸出結果是36 42 48 54 60
:
Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
.subscribe(i -> System.out.print(i + " "));
利用下面的這張圖可以比較容易來說明這個問題:
[圖片上傳失敗...(image-5b184-1534261018262)]
上圖中的上面的兩條橫線代表用于拼接的兩個數(shù)據(jù)項期奔,下面的一條橫線是拼接之后的結果侧馅。combineLatest
的作用是拼接最新發(fā)射的兩個數(shù)據(jù)。下面我們用上圖的過程來說明該方法是如何執(zhí)行的:開始第一條只有1的時候無法拼接呐萌,馁痴;當?shù)诙l出現(xiàn)A的時候,此時最新的數(shù)據(jù)是1和A肺孤,故組合成一個1A罗晕;第二個數(shù)據(jù)項發(fā)射了B,此時最新的數(shù)據(jù)是1和B渠旁,故組合成1B攀例;第一條橫線發(fā)射了2船逮,此時最新的數(shù)據(jù)是2和B顾腊,因此得到了2B,依次類推挖胃。然后再回到我們上面的問題杂靶,第一個數(shù)據(jù)項連續(xù)發(fā)射了5個數(shù)據(jù)的時候梆惯,第二個數(shù)據(jù)項一個都沒有發(fā)射出來,因此沒有任何輸出吗垮;然后第二個數(shù)據(jù)項開始發(fā)射數(shù)據(jù)垛吗,當?shù)诙€數(shù)據(jù)項發(fā)射了6的時候,此時最新的數(shù)據(jù)組合是6和6烁登,故得36怯屉;然后,第二個數(shù)據(jù)項發(fā)射了7饵沧,此時最新的數(shù)據(jù)組合是6和7锨络,故得42,依次類推狼牺。
該方法也有對應的combineLatestDelayError
方法羡儿,用途也是只有當所有的數(shù)據(jù)都發(fā)射完畢的時候才去處理錯誤邏輯。
2.1.5 輔助操作
1.delay
delay
方法用于在發(fā)射數(shù)據(jù)之前停頓指定的時間是钥,比如下面的程序會在真正地發(fā)射數(shù)據(jù)之前停頓1秒:
Observable.range(1, 5).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);
同樣delay
方法也有幾個重載的方法掠归,可以供我們用來指定觸發(fā)的線程等信息,這里給出其中的兩個悄泥,其他的可以參考源碼和文檔:
public final Observable<T> delay(long delay, TimeUnit unit)
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
2.do系列
RxJava中還有一系列的方法可以供我們使用虏冻,它們共同的特點是都是以do
開頭,下面我們列舉一下這些方法并簡要說明一下它們各自的用途:
-
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
弹囚,會在onNext
方法之后觸發(fā)兄旬; -
public final Observable<T> doAfterTerminate(Action onFinally)
,會在Observable終止之后觸發(fā)余寥; -
public final Observable<T> doFinally(Action onFinally)
领铐,當onComplete
或者onError
的時候觸發(fā); -
public final Observable<T> doOnDispose(Action onDispose)
宋舷,當被dispose的時候觸發(fā)绪撵; -
public final Observable<T> doOnComplete(Action onComplete)
,當complete的時候觸發(fā)祝蝠; -
public final Observable<T> doOnEach(final Observer<? super T> observer)
音诈,當每個onNext
調用的時候觸發(fā); -
public final Observable<T> doOnError(Consumer<? super Throwable> onError)
绎狭,當調用onError
的時候觸發(fā)细溅; public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
-
public final Observable<T> doOnNext(Consumer<? super T> onNext)
,儡嘶,會在onNext
的時候觸發(fā)喇聊; -
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
,會在訂閱的時候觸發(fā)蹦狂; -
public final Observable<T> doOnTerminate(final Action onTerminate)
誓篱,當終止之前觸發(fā)朋贬。
這些方法可以看作是對操作執(zhí)行過程的一個監(jiān)聽,當指定的操作被觸發(fā)的時候會同時觸發(fā)這些監(jiān)聽方法:
Observable.range(1, 5)
.doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
.doOnComplete(() -> System.out.println("complete"))
.doFinally(() -> System.out.println("finally"))
.doAfterNext(i -> System.out.println("after next : " + i))
.doOnSubscribe(disposable -> System.out.println("subscribe"))
.doOnTerminate(() -> System.out.println("terminal"))
.subscribe(i -> System.out.println("subscribe : " + i));
3.subscribeOn & observeOn
subscribeOn
用于指定Observable自身運行的線程窜骄,observeOn
用于指定發(fā)射數(shù)據(jù)所處的線程锦募,比如Android中的異步任務需要用observeOn
指定發(fā)射數(shù)據(jù)所在的線程是非主線程,然后執(zhí)行完畢之后將結果發(fā)送給主線程邻遏,就需要用subscribeOn
來指定糠亩。比如下面的程序,我們用這兩個方法來指定所在的線程:
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
System.out.println(Thread.currentThread());
observableEmitter.onNext(0);
}).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation())
.subscribe(integer -> System.out.println(Thread.currentThread()));
最終的輸出結果如下所示:
Thread[RxComputationThreadPool-1,5,main]
Thread[RxNewThreadScheduler-1,5,main]
4.timeout
用來設置一個超時時間准验,如果指定的時間之內(nèi)沒有任何數(shù)據(jù)被發(fā)射出來削解,那么就會執(zhí)行我們指定的數(shù)據(jù)項。如下面的程序所示沟娱,我們先為設置了一個間隔200毫秒的數(shù)字產(chǎn)生器氛驮,開始發(fā)射數(shù)據(jù)之前要停頓1秒鐘,因為我們設置的超時時間是500毫秒济似,因而在第500毫秒的時候會執(zhí)行我們傳入的數(shù)據(jù)項:
Observable.interval(1000, 200, TimeUnit.MILLISECONDS)
.timeout(500, TimeUnit.MILLISECONDS, Observable.rangeLong(1, 5))
.subscribe(System.out::print);
Thread.sleep(2000);
timeout
方法有多個重載方法矫废,可以為其指定線程等參數(shù),可以參考源碼或者文檔了解詳情砰蠢。
2.1.6 錯誤處理操作符
錯誤處理操作符主要用來提供給Observable蓖扑,用來對錯誤信息做統(tǒng)一的處理,常用的兩個是catch
和retry
台舱。
1.catch
catch操作會攔截原始的Observable的onError
通知律杠,將它替換為其他數(shù)據(jù)項或者數(shù)據(jù)序列,讓產(chǎn)生的Observable能夠正常終止或者根本不終止竞惋。在RxJava中該操作有3終類型:
-
onErrorReturn
:這種操作會在onError觸發(fā)的時候返回一個特殊的項替換錯誤柜去,并調用觀察者的onCompleted方法,而不會將錯誤傳遞給觀察者拆宛; -
onErrorResumeNext
:會在onError觸發(fā)的時候發(fā)射備用的數(shù)據(jù)項給觀察者嗓奢; -
onExceptionResumeNext
:如果onError觸發(fā)的時候onError收到的Throwable不是Exception,它會將錯誤傳遞給觀察者的onError方法浑厚,不會使用備用的Observable股耽。
下面是onErrorReturn
和onErrorResumeNext
的程序示例,這里第一段代碼會在出現(xiàn)錯誤的時候輸出666
钳幅,而第二段會在出現(xiàn)錯誤的時候發(fā)射數(shù)字12345
:
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
observableEmitter.onError(null);
observableEmitter.onNext(0);
}).onErrorReturn(throwable -> 666).subscribe(System.out::print);
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
observableEmitter.onError(null);
observableEmitter.onNext(0);
}).onErrorResumeNext(Observable.range(1,5)).subscribe(System.out::print);
2.retry
retry
使用了一種錯誤重試機制物蝙,它可以在出現(xiàn)錯誤的時候進行重試,我們可以通過參數(shù)指定重試機制的條件敢艰。以下面的程序為例诬乞,這里我們設置了當出現(xiàn)錯誤的時候會進行2次重試,因此,第一次的時候出現(xiàn)錯誤會調用onNext
丽惭,重試2次又會調用2次onNext
击奶,第二次重試的時候因為重試又出現(xiàn)了錯誤辈双,因此此時會觸發(fā)onError
方法责掏。也就是說,下面這段代碼會觸發(fā)onNext
3次换衬,觸發(fā)onError()
1次:
Observable.create(((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(0);
emitter.onError(new Throwable("Error1"));
emitter.onError(new Throwable("Error2"));
})).retry(2).subscribe(i -> System.out.println("onNext : " + i), error -> System.out.print("onError : " + error));
retry
有幾個重載的方法和功能相近的方法,下面是這些方法的定義(選取部分):
-
public final Observable<T> retry()
:會進行無限次地重試证芭; public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
-
public final Observable<T> retry(long times)
:指定重試次數(shù)瞳浦; public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
public final Observable<T> retryUntil(final BooleanSupplier stop)
public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)
2.1.7 條件操作符和布爾操作符
1.all & any
-
all
用來判斷指定的數(shù)據(jù)項是否全部滿足指定的要求,這里的“要求”可以使用一個函數(shù)來指定废士; -
any
用來判斷指定的Observable是否存在滿足指定要求的數(shù)據(jù)項叫潦。
在下面的程序中,我們用該函數(shù)來判斷指定的數(shù)據(jù)項是否全部滿足大于5的要求官硝,顯然是不滿足的矗蕊,因此下面的程序將會輸出false
:
Observable.range(5, 5).all(i -> i>5).subscribe(System.out::println); // false
Observable.range(5, 5).any(i -> i>5).subscribe(System.out::println); // true
以下是該方法的定義:
public final Single<Boolean> all(Predicate<? super T> predicate)
public final Single<Boolean> any(Predicate<? super T> predicate)
2.contains & isEmpty
這兩個方法分別用來判斷數(shù)據(jù)項中是否包含我們指定的數(shù)據(jù)項,已經(jīng)判斷數(shù)據(jù)項是否為空:
Observable.range(5, 5).contains(4).subscribe(System.out::println); // false
Observable.range(5, 5).isEmpty().subscribe(System.out::println); // false
以下是這兩個方法的定義:
public final Single<Boolean> isEmpty()
public final Single<Boolean> contains(final Object element)
3.sequenceEqual
sequenceEqual
用來判斷兩個Observable發(fā)射出的序列是否是相等的绷蹲。比如下面的方法用來判斷兩個序列是否相等:
Observable.sequenceEqual(Observable.range(1,5), Observable.range(1, 5)).subscribe(System.out::println);
4.amb
amb
作用的兩個或多個Observable或渤,但是只會發(fā)射最先發(fā)射數(shù)據(jù)的那個Observable的全部數(shù)據(jù):
Observable.amb(Arrays.asList(Observable.range(1, 5), Observable.range(6, 5))).subscribe(System.out::print)
該方法及其功能近似的方法的定義刘急,這里前兩個是靜態(tài)的方法,第二個屬于實例方法:
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
public final Observable<T> ambWith(ObservableSource<? extends T> other)
5.defaultIfEmpty
defaultIfEmpty
用來當指定的序列為空的時候指定一個用于發(fā)射的值卿操。下面的程序中,我們直接調用發(fā)射器的onComplete
方法孙援,因此序列是空的害淤,結果輸出一個整數(shù)6
:
Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).defaultIfEmpty(6).subscribe(System.out::print);
下面是該方法的定義:
public final Observable<T> defaultIfEmpty(T defaultItem)
2.1.8 轉換操作符
1.toList & toSortedList
toList
和toSortedList
用于將序列轉換成列表,后者相對于前者增加了排序的功能:
Observable.range(1, 5).toList().subscribe(System.out::println);
Observable.range(1, 5).toSortedList(Comparator.comparingInt(o -> -o)).subscribe(System.out::println);
下面是它們的定義拓售,它們有多個重載版本筝家,這里選擇其中的兩個進行說明:
public final Single<List<T>> toList()
public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)
注意一下,這里的返回結果是Single
類型的邻辉,不過這并不妨礙我們繼續(xù)使用鏈式操作溪王,因為Single
的方法和Observable
基本一致。
另外還要注意這里的Single
中的參數(shù)是一個List<T>
值骇,也就是說莹菱,它把整個序列轉換成了一個列表對象。因此吱瘩,上面的兩個示例程序的輸出是:
[1, 2, 3, 4, 5]
[5, 4, 3, 2, 1]
2.toMap & toMultimap
toMap
用于將發(fā)射的數(shù)據(jù)轉換成另一個類型的值道伟,它的轉換過程是針對每一個數(shù)據(jù)項的。以下面的代碼為例,它會將原始的序列中的每個數(shù)字轉換成對應的十六進制蜜徽。但是祝懂,toMap
轉換的結果不一定是按照原始的序列的發(fā)射的順序來的:
Observable.range(8, 10).toMap(Integer::toHexString).subscribe(System.out::print);
與toMap
近似的是toMultimap
方法,它可以將原始序列的每個數(shù)據(jù)項轉換成一個集合類型:
Observable.range(8, 10).toMultimap(Integer::toHexString).subscribe(System.out::print);
上面的兩段程序的輸出結果是:
{11=17, a=10, b=11, c=12, d=13, e=14, f=15, 8=8, 9=9, 10=16}
{11=[17], a=[10], b=[11], c=[12], d=[13], e=[14], f=[15], 8=[8], 9=[9], 10=[16]}
上面的兩個方法的定義是(多個重載拘鞋,選擇部分):
public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)
3.toFlowable
該方法用于將一個Observable轉換成Flowable類型砚蓬,下面是該方法的定義,顯然這個方法使用了策略模式盆色,這里面涉及背壓相關的內(nèi)容灰蛙,我們后續(xù)再詳細介紹。
public final Flowable<T> toFlowable(BackpressureStrategy strategy)
4.to
相比于上面的方法隔躲,to
方法的限制更加得寬泛摩梧,你可以將指定的Observable轉換成任意你想要的類型(如果你可以做到的話),下面是一個示例代碼宣旱,用來將指定的整數(shù)序列轉換成另一個整數(shù)類型的Observable仅父,只不過這里的每個數(shù)據(jù)項都是原來的列表中的數(shù)據(jù)總數(shù)的值:
Observable.range(1, 5).to(Observable::count).subscribe(System.out::println);
下面是該方法的定義:
public final <R> R to(Function<? super Observable<T>, R> converter)
2.2 線程控制
之前有提到過RxJava的線程控制是通過subscribeOn
和observeOn
兩個方法來完成的。
這里我們梳理一下RxJava提供的幾種線程調度器以及RxAndroid為Android提供的調度器的使用場景和區(qū)別等浑吟。
-
Schedulers.io()
:代表適用于io操作的調度器笙纤,增長或縮減來自適應的線程池,通常用于網(wǎng)絡买置、讀寫文件等io密集型的操作粪糙。重點需要注意的是線程池是無限制的,大量的I/O調度操作將創(chuàng)建許多個線程并占用內(nèi)存忿项。 -
Schedulers.computation()
:計算工作默認的調度器蓉冈,代表CPU計算密集型的操作,與I/O操作無關轩触。它也是許多RxJava方法寞酿,比如buffer()
,debounce()
,delay()
,interval()
,sample()
,skip()
,的默認調度器脱柱。 -
Schedulers.newThread()
:代表一個常規(guī)的新線程伐弹。 -
Schedulers.immediate()
:這個調度器允許你立即在當前線程執(zhí)行你指定的工作。它是timeout()
,timeInterval()
以及timestamp()
方法默認的調度器榨为。 -
Schedulers.trampoline()
:當我們想在當前線程執(zhí)行一個任務時惨好,并不是立即,我們可以用trampoline()
將它入隊随闺。這個調度器將會處理它的隊列并且按序運行隊列中每一個任務日川。它是repeat()
和retry()
方法默認的調度器。
以及RxAndroid提供的線程調度器:
AndroidSchedulers.mainThread()
用來指代Android的主線程
2.3 總結
上面的這些操作也基本適用于Flowable
矩乐、Single
龄句、Completable
和Maybe
回论。
我們花費了很多的時間和精力來梳理了這些方法,按照上面的內(nèi)容分歇,使用RxJava實現(xiàn)一些基本的或者高級的操作都不是什么問題傀蓉。
但是,Observable更適用于處理一些數(shù)據(jù)規(guī)模較小的問題职抡,當數(shù)據(jù)規(guī)模比較多的時候可能會出現(xiàn)MissingBackpressureException
異常葬燎。
因此,我們還需要了解背壓和Flowable
的相關內(nèi)容才能更好地理解和應用RxJava.