本文基于
RxJava1.x
版本切厘,閱讀本文前請先了解 RxJava 的基本使用。
RxJava
版本已升級到 RxJava2.x 疫稿,各個 API 均有不同程度的變化,具體請查看官方文檔员萍。
參考文檔:
1 Observable 的創(chuàng)建
1.1 from()
public static <T> Observable<T> from(Iterable<? extends T> iterable);
轉(zhuǎn)換集合為一個每次發(fā)射集合中一個元素的 Observable 對象。
使用場景: 對集合(數(shù)組、List 等)進行遍歷日麸。
其他 from() API:
public static <T> Observable<T> from(Future<? extends T> future);
public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit);
public static <T> Observable<T> from(Future<? extends T> future, Scheduler scheduler);
舉例:
// 1. 遍歷集合
Observable<String> observable = Observable.from(new String[]{"hello", "hi"});
// 2. 使用 Future 創(chuàng)建 Observable涕刚,F(xiàn)uture 表示一個異步計算的結果极景。
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
// TODO 執(zhí)行異步操作并返回數(shù)據(jù)
return "hihi";
}
});
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
futureTask.run();
}
});
Observable<String> observable = Observable.from(futureTask);
1.2 just()
public static <T> Observable<T> just(final T value);
轉(zhuǎn)換一個或多個 Object 為依次發(fā)射這些 Object 的 Observable 對象。
使用場景: 轉(zhuǎn)換一個或多個普通 Object 為 Observable 對象译秦,如轉(zhuǎn)換數(shù)據(jù)庫查詢結果棋返、網(wǎng)絡查詢結果等晰房。
其他 just() API:
public static <T> Observable<T> just(T t1, T t2, T t3, T t4);
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5);
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6);
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7);
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8);
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9);
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10);
舉例:
Observable<String> observable = Observable.just("hello");
// 使用 just() 遍歷幾個元素
Observable<String> observable = Observable.just("hello", "hi", "...");
// 使用 from() 方法遍歷猖吴,效果和 just() 一樣。
String[] stringArrs = new String[]{"hello", "hi", "..."};
Observable<String> observable = Observable.from(stringArrs);
just()
方法可傳入 1~10 個參數(shù)党窜,也就說當元素個數(shù)小于等于 10 的時候既可以使用 just()
也可以使用 from()
壤玫,否則只能用 from()
方法楚里。
1.3 create()
public static <T> Observabl<T> create(OnSubscribe<T> f);
返回一個在被 OnSubscribe 訂閱時執(zhí)行特定方法的 Observable 對象家坎。
使用場景: 不推薦使用苏携,可使用其他操作符替代装蓬,如使用 from()
操作符完成遍歷。
其他 create() API:
@Beta public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe);
@Experimental public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe);
舉例:
Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe< String >() {
@Override
public void call(Subscriber<? super String > subscriber) {
// onNext() 方法可執(zhí)行多次
subscribe.onNext("hello");
subscribe.onCompleted();
}
};
Observable<Object> observable = Observable.create(onSubscribe);
此方法不常用,大多數(shù)時候都是使用 just()
、from()
等方法岳锁,如上面那串代碼就可以寫成:
Observable<Object> observable = Observable.just("hello");
1.4 interval()
public static Observable<Long> interval(long interval, TimeUnit unit);
返回一個每隔指定的時間間隔就發(fā)射一個序列號的 Observable 對象。
使用場景: 可使用該操作符完成定時柱搜、倒計時等功能表制。
其他 interval() API:
public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler scheduler);
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);
舉例:
// 每隔 1 s 發(fā)送一個序列號,序列號從 0 開始设拟,每次累加 1。
Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
1.5 timer()
public static Observable<Long> timer(long delay, TimeUnit unit);
創(chuàng)建一個在指定延遲時間后發(fā)射一條數(shù)據(jù)(固定值:0)的 Observable 對象。
使用場景: 可用來完成定時功能牢硅。
其他 timer() API:
舉例:
// 定時 3 s
Observable<Long> observable = Observable.timer(3, TimeUnit.SECONDS);
1.6 range()
public static Observable<Integer> range(int start, int count);
創(chuàng)建一個發(fā)射指定范圍內(nèi)的連續(xù)整數(shù)的 Observable 對象佳励。
使用場景: 可使用該操作符完成一個 fori
的循環(huán)妙黍,如 for(int i=5;i<=7;i++)
--> Observable.range(5, 3)
。
其他 range() API:
舉例:
// 依次發(fā)射 5、6妇垢、7
Observable<Integer> observable = Observable.range(5, 3);
1.7 empty()
public static <T> Observable<T> empty();
創(chuàng)建一個不發(fā)射任何數(shù)據(jù)就發(fā)出 onCompleted()
通知的 Observable 對象肉康。
舉例:
// 發(fā)出一個 onCompleted() 通知
Observable<Object> observable = Observable.empty();
1.8 error()
public static <T> Observable<T> error(Throwable exception);
創(chuàng)建不發(fā)射任何數(shù)據(jù)就發(fā)出 onError
通知的 Observable 對象闯估。
使用場景: 程序中捕獲異常后,可使用該操作符把捕獲的異常傳遞到后面的邏輯中處理吼和。
舉例:
// 發(fā)出一個 onError() 通知
Observable<Object> observable = Observable.error(new Throwable("message"));
1.9 never()
public static <T> Observable<T> never();
創(chuàng)建一個不發(fā)射任何數(shù)據(jù)和通知的 Observable 對象涨薪。
舉例:
Observable<Object> observable = Observable.never();
1.10 defer()
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
在訂閱的時候才會創(chuàng)建 Observable 對象;每一次訂閱都創(chuàng)建一個新的 Observable 對象。
使用場景: 可以使用該操作符封裝需要被多次執(zhí)行的函數(shù)筐摘。
舉例:
Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just("string");
}
});
2 重做
2.1 repeat()
public final Observable<T> repeat();
使Observable 對象在發(fā)出 onNext()
通知之后重復發(fā)射數(shù)據(jù)。重做結束才會發(fā)出 onComplete()
通知桨菜,若重做過程中出現(xiàn)異常則會中斷并發(fā)出 onError()
通知。
使用場景: 可使用該操作符指定一次任務執(zhí)行完成后立即重復執(zhí)行上一次的任務矿卑,如發(fā)送多次網(wǎng)絡請求等业舍。
其他 repeat() API:
舉例:
Observable<String> observable = Observable.just("string");
// 無限重復執(zhí)行
observable.repeat();
// 重復執(zhí)行 5 次
observable.repeat(5);
2.2 repeatWhen()
使Observable 對象在發(fā)出 onNext()
通知之后有條件的重復發(fā)射數(shù)據(jù)机杜。重做結束才會發(fā)出 onCompleted()
通知,若重做過程中出現(xiàn)異常則會中斷并發(fā)出 onError()
通知。
使用場景: 可使用該操作符指定滿足一定條件時重復執(zhí)行一個任務来破,如發(fā)送多次網(wǎng)絡請求等。
其他 repeatWhen() API:
舉例:
observable.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
// 重復 3 次, 每次間隔 1 s
return observable.zipWith(Observable.range(1, 3), new Func2<Void, Integer, Integer>() {
@Override
public Integer call(Void aVoid, Integer integer) {
return integer;
}
}).flatMap(integer -> Observable.timer(1, TimeUnit.SECONDS));
}
});
3 重試
3.1 retry()
public final Observable<T> retry();
在執(zhí)行 Observable對象的序列出現(xiàn)異常時祭阀,不直接發(fā)出 onError()
通知柏蘑,而是重新訂閱該 Observable對象,直到重做過程中未出現(xiàn)異常,則會發(fā)出 onNext()
和 onCompleted()
通知劫哼;若重做過程中也出現(xiàn)異常橘洞,則會繼續(xù)重試炭懊,直到達到重試次數(shù)上限嘲碧,超出次數(shù)后發(fā)出最新的 onError()
通知。
使用場景: 網(wǎng)絡等請求異常出錯后履婉,可重新發(fā)起請求苛茂。
其他 retry() API:
舉例:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println(".......");
int a = 1 / 0;
subscriber.onNext(a);
subscriber.onCompleted();
}
});
// 無限次的重試
observable.retry();
// 重試 3 次
observable.retry(3);
// 使用謂語函數(shù)決定是否重試
observable.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
// 參數(shù) integer 是訂閱的次數(shù); 參數(shù) throwable 是拋出的異常
// 返回值為 true 表示重試, 返回值為 false 表示不重試
return false;
}
});
3.2 retryWhen()
有條件的執(zhí)行重試。
使用場景: 網(wǎng)絡等請求異常出錯后,若滿足一定條件费韭,則重新發(fā)起請求。
其他 retryWhen() API:
舉例:
// 重試 3 次督暂,每次間隔 1 s
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Object>() {
@Override
public Object call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Object, Observable<?>>() {
@Override
public Observable<?> call(Object o) {
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
});
4 變換
4.1 map()
public final <R> Observable<R> map(Func1<? super T, ? extends R> func);
把源 Observable 發(fā)射的元素應用于指定的函數(shù),并發(fā)送該函數(shù)的結果穷吮。
使用場景: 將從網(wǎng)絡獲取的數(shù)據(jù)(NetData 對象)轉(zhuǎn)換為數(shù)據(jù)庫相關對象(DBData對象)并使用 Observable 發(fā)送逻翁。
舉例:
Observable.just(2)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(String.format("原始數(shù)據(jù)的兩倍為: %s", integer * 2));
}
});
4.2 flatMap()
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func);
轉(zhuǎn)換源 Observable 對象為另一個 Observable 對象。
使用場景: 從網(wǎng)絡獲取數(shù)據(jù)并使用 obsA 對象發(fā)射捡鱼,flatMap() 操作符中可將數(shù)據(jù)存進數(shù)據(jù)庫并返回一個新的對象 obsB。
其他 flatMap() API:
舉例:
Observable.just(2)
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
// 轉(zhuǎn)換為一個定時 integer 秒的 Observable 對象
return Observable.timer(integer, TimeUnit.SECONDS);
}
});
5 過濾
5.1 filter()
public final Observable<T> filter(Func1<? super T, Boolean> predicate);
只發(fā)射滿足指定謂詞的元素管引。
使用場景: 可使用 filter 代替 if 語句。
舉例:
Observable.just(-1, -2, 0, 1, 2)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 0;
}
});
5.2 first()
public final Observable<T> first();
返回一個僅僅發(fā)射源 Observable 發(fā)射的第一個[滿足指定謂詞的]元素的 Observable,如果源 Observable 為空噩翠,則會拋出一個 NoSuchElementException
戏自。
使用場景: 順序發(fā)出多條數(shù)據(jù)猛们,只接收第一條。
其他 first() API:
舉例:
// 發(fā)射第一個元素
Observable.just(-1, -2, 0, 1, 2).first();
// 發(fā)射滿足條件的第一個元素
Observable.just(-1, -2, 0, 1, 2)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 0;
}
});
// 會拋出 NoSuchElementException 異常
Observable.empty().first();
5.3 last()
public final Observable<T> last();
返回一個僅僅發(fā)射源 Observable 發(fā)射的倒數(shù)第一個[滿足指定謂詞的]元素的 Observable,如果源 Observable 為空虎韵,則會拋出一個 NoSuchElementException
硅瞧。
使用場景: 順序發(fā)出多條數(shù)據(jù),只接收最后一條。
其他 last() API:
舉例:
// 發(fā)射倒數(shù)第一個元素
Observable.just(-1, -2, 0, 1, 2).first();
// 發(fā)射滿足條件的倒數(shù)第一個元素
Observable.just(-1, -2, 0, 1, 2)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer < 0;
}
});
// 會拋出 NoSuchElementException 異常
Observable.empty().last();
5.4 skip()
public final Observable<T> skip(int count);
跳過前面指定數(shù)量或指定時間內(nèi)的元素膀曾,只發(fā)射后面的元素。
其他 skip() API:
舉例:
Observable.just(-1, -2, 0, 1, 2)
.skip(2) // 跳過前兩條數(shù)據(jù)
5.5 skipLast()
public final Observable<T> skipLast(int count);
跳過前面指定數(shù)量或指定時間內(nèi)的元素,只發(fā)射后面的元素察迟。指定時間時會延遲源 Observable 發(fā)射的任何數(shù)據(jù)斩狱。
其他 skipLast() API:
public final Observable<T> skipLast(long time, TimeUnit unit);
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler);
舉例:
Observable.just(-1, -2, 0, 1, 2)
.skipLast(2) // 跳過后兩條數(shù)據(jù)
5.6 take()
public final Observable<T> take(final int count);
只發(fā)射前面指定數(shù)量或指定時間內(nèi)的元素喊废。
其他 take() API:
舉例:
Observable.just(-1, -2, 0, 1, 2).take(3); // 只發(fā)射前三條數(shù)據(jù)
5.7 takeLast()
public final Observable<T> takeLast(final int count);
只發(fā)射后面指定數(shù)量或指定時間內(nèi)的元素污筷。指定時間時會延遲源 Observable 發(fā)射的任何數(shù)據(jù)。
其他 takeLast() API:
public final Observable<T> takeLast(int count, long time, TimeUnit unit);
public final Observable<T> takeLast(int count, long time, TimeUnit unit, Scheduler scheduler);
public final Observable<T> takeLast(long time, TimeUnit unit);
public final Observable<T> takeLast(long time, TimeUnit unit, Scheduler scheduler);
舉例:
Observable.just(-1, -2, 0, 1, 2).takeLast(3); // 只發(fā)射后三條數(shù)據(jù)
5.8 sample()
public final Observable<T> sample(long period, TimeUnit unit);
定期發(fā)射 Observable 發(fā)射的最后一條數(shù)據(jù)乍赫。
其他 sample() API:
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler);
public final <U> Observable<T> sample(Observable<U> sampler);
舉例:
Observable.interval(300, TimeUnit.MILLISECONDS)
.sample(2, TimeUnit.SECONDS)
5.9 elementAt()
public final Observable<T> elementAt(int index);
只發(fā)射指定索引的元素瓣蛀。
使用場景: 按索引去集合中的元素等。
舉例:
Observable.just(-1, -2, 0, 1, 2).elementAt(2); // 發(fā)射索引為 2 的數(shù)據(jù)
5.10 elementAtOrDefault()
public final Observable<T> elementAtOrDefault(int index, T defaultValue);
只發(fā)射指定索引的元素雷厂,若該索引對應的元素不存在惋增,則發(fā)射默認值。
舉例:
Observable.just(-1, -2, 0, 1, 2).elementAtOrDefault(9, -5); // 發(fā)射索引為 9的數(shù)據(jù)改鲫,若不存在诈皿,則發(fā)射 -5
5.11 ignoreElements()
public final Observable<T> ignoreElements();
不發(fā)射任何數(shù)據(jù)林束,直接發(fā)出 onCompleted()
通知。
舉例:
Observable.just(-1, -2, 0, 1, 2).ignoreElements()
5.12 distinct()
public final Observable<T> distinct();
過濾重復的元素稽亏,過濾規(guī)則是:只允許還沒有發(fā)射過的元素通過壶冒。
其他 distinct() API:
舉例:
// 直接過濾
Observable.just(-1, -2, 0, 1, 2, 1).distinct();
// 通過生成的 key 值過濾
Observable.just(-1, -2, 0, 1, 2, 1).distinct(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
// 隨機生成 key
return integer * (int)(Math.random() * 10);
}
});
5.13 debounce()
public final Observable<T> debounce(long timeout, TimeUnit unit)
源 Observable 每產(chǎn)生結果后,如果在規(guī)定的間隔時間內(nèi)沒有產(chǎn)生新的結果截歉,則發(fā)射這個結果胖腾,否則會忽略這個結果。該操作符會過濾掉發(fā)射速率過快的數(shù)據(jù)項瘪松。
其他 debounce() API:
public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler)
public final <U> Observable<T> debounce(Func1<? super T, ? extends Observable<U>> debounceSelector)
舉例:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
//產(chǎn)生結果的間隔時間分別為100咸作、200、300...900毫秒
for (int i = 1; i < 10; i++) {
subscriber.onNext(i);
Thread.sleep(i * 100);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
observable.debounce(400, TimeUnit.MILLISECONDS) // 超時時間為400毫秒
該例子產(chǎn)生結果為:依次打印5宵睦、6记罚、7、8状飞。
附:功能實現(xiàn)
延時遍歷
// 遍歷
Observable<Integer> traverseObservable = Observable.just(3, 4, 5, 6);
// 計時
Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
Func2<Long, Integer, Integer> func2 = new Func2<Long, Integer, Integer>() {
@Override
public Integer call(Long aLong, Integer integer) {
return integer;
}
};
intervalObservable.zipWith(traverseObservable, func2)
.toBlocking()
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
});
倒計時
int startTime = 10;
Observable.interval(0, 1, TimeUnit.SECONDS)
.take(startTime + 1) // 接收 startTime + 1 次
.map(new Func1<Long, Long>() {
@Override
public Long call(Long time) {
// 1 2 3...轉(zhuǎn)換為...3 2 1
return startTime - time;
}
})
.toBlocking()
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("倒計時結束");
}
@Override
public void onError(Throwable e) {
System.out.println("倒計時出現(xiàn)異常");
e.printStackTrace();
}
@Override
public void onNext(Long aLong) {
System.out.println(String.format("倒計時: %s s", aLong));
}
});