RxJava2 實戰(zhàn)系列文章
RxJava2 實戰(zhàn)知識梳理(1) - 后臺執(zhí)行耗時操作悬而,實時通知 UI 更新
RxJava2 實戰(zhàn)知識梳理(2) - 計算一段時間內(nèi)數(shù)據(jù)的平均值
RxJava2 實戰(zhàn)知識梳理(3) - 優(yōu)化搜索聯(lián)想功能
RxJava2 實戰(zhàn)知識梳理(4) - 結合 Retrofit 請求新聞資訊
RxJava2 實戰(zhàn)知識梳理(5) - 簡單及進階的輪詢操作
RxJava2 實戰(zhàn)知識梳理(6) - 基于錯誤類型的重試請求
RxJava2 實戰(zhàn)知識梳理(7) - 基于 combineLatest 實現(xiàn)的輸入表單驗證
RxJava2 實戰(zhàn)知識梳理(8) - 使用 publish + merge 優(yōu)化先加載緩存尸执,再讀取網(wǎng)絡數(shù)據(jù)的請求過程
RxJava2 實戰(zhàn)知識梳理(9) - 使用 timer/interval/delay 實現(xiàn)任務調(diào)度
RxJava2 實戰(zhàn)知識梳理(10) - 屏幕旋轉導致 Activity 重建時恢復任務
RxJava2 實戰(zhàn)知識梳理(11) - 檢測網(wǎng)絡狀態(tài)并自動重試請求
RxJava2 實戰(zhàn)知識梳理(12) - 實戰(zhàn)講解 publish & replay & share & refCount & autoConnect
RxJava2 實戰(zhàn)知識梳理(13) - 如何使得錯誤發(fā)生時不自動停止訂閱關系
RxJava2 實戰(zhàn)知識梳理(14) - 在 token 過期時,刷新過期 token 并重新發(fā)起請求
RxJava2 實戰(zhàn)知識梳理(15) - 實現(xiàn)一個簡單的 MVP + RxJava + Retrofit 應用
一、示例
1.1 應用場景
今天,我們介紹一種新的場景,輪詢操作创肥。也就是說,我們會嘗試間隔一段時間就向服務器發(fā)起一次請求值朋,在使用RxJava
之前,該需求的實現(xiàn)一般有兩種方式:
- 通過
Handler
發(fā)送延時消息巩搏,在handleMessage
中請求服務器之后昨登,再次發(fā)送一個延時消息,直到達到循環(huán)次數(shù)為止贯底。 - 使用
Java
提供的定時器Timer
丰辣。
我們嘗試使用RxJava2
提供的操作符來實現(xiàn)這一需求,這里演示兩種方式的輪詢禽捆,并將單次訪問的次數(shù)限制在5
次:
- 固定時延:使用
intervalRange
操作符笙什,每間隔3s
執(zhí)行一次任務。 - 變長時延:使用
repeatWhen
操作符實現(xiàn)胚想,第一次執(zhí)行完任務后琐凭,等待4s
再執(zhí)行第二次任務,在第二次任務執(zhí)行完成后浊服,等待5s
统屈,依次遞增胚吁。
2.2 示例
public class PollingActivity extends AppCompatActivity {
private static final String TAG = PollingActivity.class.getSimpleName();
private TextView mTvSimple;
private TextView mTvAdvance;
private CompositeDisposable mCompositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_polling);
mTvSimple = (TextView) findViewById(R.id.tv_simple);
mTvSimple.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startSimplePolling();
}
});
mTvAdvance = (TextView) findViewById(R.id.tv_advance);
mTvAdvance.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startAdvancePolling();
}
});
mCompositeDisposable = new CompositeDisposable();
}
private void startSimplePolling() {
Log.d(TAG, "startSimplePolling");
Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
doWork(); //這里使用了doOnNext,因此DisposableObserver的onNext要等到該方法執(zhí)行完才會回調(diào)愁憔。
}
});
DisposableObserver<Long> disposableObserver = getDisposableObserver();
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
private void startAdvancePolling() {
Log.d(TAG, "startAdvancePolling click");
Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {
@Override
public void run() throws Exception {
doWork();
}
}).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {
private long mRepeatCount;
@Override
public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
//必須作出反應腕扶,這里是通過flatMap操作符。
return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Object o) throws Exception {
if (++mRepeatCount > 4) {
//return Observable.empty(); //發(fā)送onComplete消息吨掌,無法觸發(fā)下游的onComplete回調(diào)半抱。
return Observable.error(new Throwable("Polling work finished")); //發(fā)送onError消息,可以觸發(fā)下游的onError回調(diào)膜宋。
}
Log.d(TAG, "startAdvancePolling apply");
return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
}
});
}
});
DisposableObserver<Long> disposableObserver = getDisposableObserver();
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
private DisposableObserver<Long> getDisposableObserver() {
return new DisposableObserver<Long>() {
@Override
public void onNext(Long aLong) {}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
}
};
}
private void doWork() {
long workTime = (long) (Math.random() * 500) + 500;
try {
Log.d(TAG, "doWork start, threadId=" + Thread.currentThread().getId());
Thread.sleep(workTime);
Log.d(TAG, "doWork finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();
}
}
startSimplePolling
對應于固定時延輪詢:
startAdvancePolling
對應于變長時延輪詢:三窿侈、示例解析
下面,就讓我們一起來分析一下上面這兩個例子中涉及到的知識點激蹲。
3.1 intervalRange & doOnNext 實現(xiàn)固定時延輪詢
對于固定時延輪詢的需求棉磨,采用的是intervalRange
的方式來實現(xiàn),它是一個創(chuàng)建型操作符学辱,該Observable
第一次先發(fā)射一個特定的數(shù)據(jù)乘瓤,之后間隔一段時間再發(fā)送一次,它是interval
和range
的結合體策泣,這兩個操作符的原理圖為:
該操作符的優(yōu)勢在于:
- 與
interval
相比衙傀,它可以指定第一個發(fā)送數(shù)據(jù)項的時延、指定發(fā)送數(shù)據(jù)項的個數(shù)萨咕。 - 與
range
相比统抬,它可以指定兩項數(shù)據(jù)之間發(fā)送的時延。
intervalRange
的接收參數(shù)的含義為:
-
start
:發(fā)送數(shù)據(jù)的起始值危队,為Long
型聪建。 -
count
:總共發(fā)送多少項數(shù)據(jù)。 -
initialDelay
:發(fā)送第一個數(shù)據(jù)項時的起始時延茫陆。 -
period
:兩項數(shù)據(jù)之間的間隔時間金麸。 -
TimeUnit
:時間單位。
在輪詢操作中一般會進行一些耗時的網(wǎng)絡請求簿盅,因此我們選擇在doOnNext
進行處理挥下,它會在下游的onNext
方法被回調(diào)之前調(diào)用,但是它的運行線程可以通過subscribeOn
指定桨醋,下游的運行線程再通過observerOn
切換會主線程棚瘟,通過打印對應的線程ID
可以驗證結果。
當要求的數(shù)據(jù)項都發(fā)送完畢之后喜最,最后會回調(diào)onComplete
方法偎蘸。
3.2 repeatWhen 實現(xiàn)變長時延輪詢
3.2.1 使用 repeatWhen 實現(xiàn)重訂閱
之所以可以通過repeatWhen
來實現(xiàn)輪詢,是因為它為我們提供了重訂閱的功能,而重訂閱有兩點要素:
- 上游告訴我們一次訂閱已經(jīng)完成禀苦,這就需要上游回調(diào)
onComplete
函數(shù)蔓肯。 - 我們告訴上游是否需要重訂閱,通過
repeatWhen
的Function
函數(shù)所返回的Observable
確定振乏,如果該Observable
發(fā)送了onComplete
或者onError
則表示不需要重訂閱蔗包,結束整個流程;否則觸發(fā)重訂閱的操作慧邮。
其原理圖如下所示:
repeatWhen
的難點在于如何定義它的Function
參數(shù):
-
Function
的輸入是一個Observable<Object>
调限,輸出是一個泛型ObservableSource<?>
。 - 如果輸出的
Observable
發(fā)送了onComplete
或者onError
則表示不需要重訂閱误澳,結束整個流程耻矮;否則觸發(fā)重訂閱的操作。也就是說忆谓,它 僅僅是作為一個是否要觸發(fā)重訂閱的通知裆装,onNext
發(fā)送的是什么數(shù)據(jù)并不重要。 -
對于每一次訂閱的數(shù)據(jù)流 Function 函數(shù)只會回調(diào)一次倡缠,并且是在
onComplete
的時候觸發(fā)哨免,它不會收到任何的onNext
事件。 - 在
Function
函數(shù)中昙沦,必須對輸入的 Observable<Object>進行處理琢唾,這里我們使用的是flatMap
操作符接收上游的數(shù)據(jù),對于flatMap
的解釋盾饮,大家可以參考 RxJava2 實戰(zhàn)知識梳理(4) - 結合 Retrofit 請求新聞資訊 采桃。
而當我們不需要重訂閱時,有兩種方式:
- 返回
Observable.empty()
丘损,發(fā)送onComplete
消息普办,但是DisposableObserver
并不會回調(diào)onComplete
。 - 返回
Observable.error(new Throwable("Polling work finished"))
徘钥,DisposableObserver
的onError
會被回調(diào)泌豆,并接受傳過去的錯誤信息。
3.2.2 使用 Timer 實現(xiàn)兩次訂閱之間的時延
以上就是對于repeatWhen
的解釋吏饿,與repeatWhen
相類似的還有retryWhen
操作符,這個我們在下一篇文章中再介紹蔬浙,接下來猪落,我們看一下如何實現(xiàn)兩次事件的時延。
前面我們分析過畴博,重訂閱觸發(fā)的時間是在返回的ObservableSource
發(fā)送了onNext
事件之后笨忌,那么我們通過該ObservableSource
延遲發(fā)送一個事件就可以實現(xiàn)相應的需求,這里使用的是time
操作符俱病,它的原理圖如下所示官疲,也就是袱结,在訂閱完成后,等待指定的時間它才會發(fā)送消息途凫。
3.2.3 使用 doOnComplete 完成輪詢的耗時操作
由于在訂閱完成時會發(fā)送onComplete
消息垢夹,那么我們就可以在doOnComplete
中進行輪詢所要進行的具體操作,它所運行的線程通過subscribeOn
指定维费。
更多文章果元,歡迎訪問我的 Android 知識梳理系列:
- Android 知識梳理目錄:http://www.reibang.com/p/fd82d18994ce
- 個人主頁:http://lizejun.cn
- 個人知識總結目錄:http://lizejun.cn/categories/