RxJava2 實戰(zhàn)知識梳理(5) - 簡單及進階的輪詢操作

RxJava2 實戰(zhàn)系列文章

RxJava2 實戰(zhàn)知識梳理(1) - 后臺執(zhí)行耗時操作悬而,實時通知 UI 更新
RxJava2 實戰(zhàn)知識梳理(2) - 計算一段時間內(nèi)數(shù)據(jù)的平均值
RxJava2 實戰(zhàn)知識梳理(3) - 優(yōu)化搜索聯(lián)想功能
RxJava2 實戰(zhàn)知識梳理(4) - 結合 Retrofit 請求新聞資訊
RxJava2 實戰(zhàn)知識梳理(5) - 簡單及進階的輪詢操作
RxJava2 實戰(zhàn)知識梳理(6) - 基于錯誤類型的重試請求
RxJava2 實戰(zhàn)知識梳理(7) - 基于 combineLatest 實現(xiàn)的輸入表單驗證
RxJava2 實戰(zhàn)知識梳理(8) - 使用 publish + merge 優(yōu)化先加載緩存尸执,再讀取網(wǎng)絡數(shù)據(jù)的請求過程
RxJava2 實戰(zhàn)知識梳理(9) - 使用 timer/interval/delay 實現(xiàn)任務調(diào)度
RxJava2 實戰(zhàn)知識梳理(10) - 屏幕旋轉導致 Activity 重建時恢復任務
RxJava2 實戰(zhàn)知識梳理(11) - 檢測網(wǎng)絡狀態(tài)并自動重試請求
RxJava2 實戰(zhàn)知識梳理(12) - 實戰(zhàn)講解 publish & replay & share & refCount & autoConnect
RxJava2 實戰(zhàn)知識梳理(13) - 如何使得錯誤發(fā)生時不自動停止訂閱關系
RxJava2 實戰(zhàn)知識梳理(14) - 在 token 過期時,刷新過期 token 并重新發(fā)起請求
RxJava2 實戰(zhàn)知識梳理(15) - 實現(xiàn)一個簡單的 MVP + RxJava + Retrofit 應用


一、示例

1.1 應用場景

今天,我們介紹一種新的場景,輪詢操作创肥。也就是說,我們會嘗試間隔一段時間就向服務器發(fā)起一次請求值朋,在使用RxJava之前,該需求的實現(xiàn)一般有兩種方式:

  • 通過Handler發(fā)送延時消息巩搏,在handleMessage中請求服務器之后昨登,再次發(fā)送一個延時消息,直到達到循環(huán)次數(shù)為止贯底。
  • 使用Java提供的定時器Timer丰辣。

我們嘗試使用RxJava2提供的操作符來實現(xiàn)這一需求,這里演示兩種方式的輪詢禽捆,并將單次訪問的次數(shù)限制在5次:

  • 固定時延:使用intervalRange操作符笙什,每間隔3s執(zhí)行一次任務。
  • 變長時延:使用repeatWhen操作符實現(xiàn)胚想,第一次執(zhí)行完任務后琐凭,等待4s再執(zhí)行第二次任務,在第二次任務執(zhí)行完成后浊服,等待5s统屈,依次遞增胚吁。

2.2 示例

public class PollingActivity extends AppCompatActivity {

    private static final String TAG = PollingActivity.class.getSimpleName();

    private TextView mTvSimple;
    private TextView mTvAdvance;
    private CompositeDisposable mCompositeDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_polling);
        mTvSimple = (TextView) findViewById(R.id.tv_simple);
        mTvSimple.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startSimplePolling();
            }

        });
        mTvAdvance = (TextView) findViewById(R.id.tv_advance);
        mTvAdvance.setOnClickListener(new View.OnClickListener() {

            @Override
            public void onClick(View v) {
                startAdvancePolling();
            }

        });
        mCompositeDisposable = new CompositeDisposable();
    }

    private void startSimplePolling() {
        Log.d(TAG, "startSimplePolling");
        Observable<Long> observable = Observable.intervalRange(0, 5, 0, 3000, TimeUnit.MILLISECONDS).take(5).doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long aLong) throws Exception {
                doWork(); //這里使用了doOnNext,因此DisposableObserver的onNext要等到該方法執(zhí)行完才會回調(diào)愁憔。
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private void startAdvancePolling() {
        Log.d(TAG, "startAdvancePolling click");
        Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {
                doWork();
            }

        }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {

            private long mRepeatCount;

            @Override
            public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
                //必須作出反應腕扶,這里是通過flatMap操作符。
                return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {

                    @Override
                    public ObservableSource<Long> apply(Object o) throws Exception {
                        if (++mRepeatCount > 4) {
                            //return Observable.empty(); //發(fā)送onComplete消息吨掌,無法觸發(fā)下游的onComplete回調(diào)半抱。
                            return Observable.error(new Throwable("Polling work finished")); //發(fā)送onError消息,可以觸發(fā)下游的onError回調(diào)膜宋。
                        }
                        Log.d(TAG, "startAdvancePolling apply");
                        return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
                    }

                });
            }

        });
        DisposableObserver<Long> disposableObserver = getDisposableObserver();
        observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
        mCompositeDisposable.add(disposableObserver);
    }

    private DisposableObserver<Long> getDisposableObserver() {

        return new DisposableObserver<Long>() {

            @Override
            public void onNext(Long aLong) {}

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "DisposableObserver onError, threadId=" + Thread.currentThread().getId() + ",reason=" + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "DisposableObserver onComplete, threadId=" + Thread.currentThread().getId());
            }
        };
    }

    private void doWork() {
        long workTime = (long) (Math.random() * 500) + 500;
        try {
            Log.d(TAG, "doWork start,  threadId=" + Thread.currentThread().getId());
            Thread.sleep(workTime);
            Log.d(TAG, "doWork finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.clear();
    }
}

startSimplePolling對應于固定時延輪詢:


startAdvancePolling對應于變長時延輪詢:

三窿侈、示例解析

下面,就讓我們一起來分析一下上面這兩個例子中涉及到的知識點激蹲。

3.1 intervalRange & doOnNext 實現(xiàn)固定時延輪詢

對于固定時延輪詢的需求棉磨,采用的是intervalRange的方式來實現(xiàn),它是一個創(chuàng)建型操作符学辱,該Observable第一次先發(fā)射一個特定的數(shù)據(jù)乘瓤,之后間隔一段時間再發(fā)送一次,它是intervalrange的結合體策泣,這兩個操作符的原理圖為:

interval 原理圖

range 原理圖

該操作符的優(yōu)勢在于:

  • interval相比衙傀,它可以指定第一個發(fā)送數(shù)據(jù)項的時延、指定發(fā)送數(shù)據(jù)項的個數(shù)萨咕。
  • range相比统抬,它可以指定兩項數(shù)據(jù)之間發(fā)送的時延。

intervalRange的接收參數(shù)的含義為:

  • start:發(fā)送數(shù)據(jù)的起始值危队,為Long型聪建。
  • count:總共發(fā)送多少項數(shù)據(jù)。
  • initialDelay:發(fā)送第一個數(shù)據(jù)項時的起始時延茫陆。
  • period:兩項數(shù)據(jù)之間的間隔時間金麸。
  • TimeUnit:時間單位。

在輪詢操作中一般會進行一些耗時的網(wǎng)絡請求簿盅,因此我們選擇在doOnNext進行處理挥下,它會在下游的onNext方法被回調(diào)之前調(diào)用,但是它的運行線程可以通過subscribeOn指定桨醋,下游的運行線程再通過observerOn切換會主線程棚瘟,通過打印對應的線程ID可以驗證結果。

當要求的數(shù)據(jù)項都發(fā)送完畢之后喜最,最后會回調(diào)onComplete方法偎蘸。

3.2 repeatWhen 實現(xiàn)變長時延輪詢

3.2.1 使用 repeatWhen 實現(xiàn)重訂閱

之所以可以通過repeatWhen來實現(xiàn)輪詢,是因為它為我們提供了重訂閱的功能,而重訂閱有兩點要素:

  • 上游告訴我們一次訂閱已經(jīng)完成禀苦,這就需要上游回調(diào)onComplete函數(shù)蔓肯。
  • 我們告訴上游是否需要重訂閱,通過repeatWhenFunction函數(shù)所返回的Observable確定振乏,如果該Observable發(fā)送了onComplete或者onError則表示不需要重訂閱蔗包,結束整個流程;否則觸發(fā)重訂閱的操作慧邮。

其原理圖如下所示:

repeatWhen 原理圖

repeatWhen的難點在于如何定義它的Function參數(shù):

  • Function的輸入是一個Observable<Object>调限,輸出是一個泛型ObservableSource<?>
  • 如果輸出的Observable發(fā)送了onComplete或者onError則表示不需要重訂閱误澳,結束整個流程耻矮;否則觸發(fā)重訂閱的操作。也就是說忆谓,它 僅僅是作為一個是否要觸發(fā)重訂閱的通知裆装,onNext發(fā)送的是什么數(shù)據(jù)并不重要。
  • 對于每一次訂閱的數(shù)據(jù)流 Function 函數(shù)只會回調(diào)一次倡缠,并且是在onComplete的時候觸發(fā)哨免,它不會收到任何的onNext事件。
  • Function函數(shù)中昙沦,必須對輸入的 Observable<Object>進行處理琢唾,這里我們使用的是flatMap操作符接收上游的數(shù)據(jù),對于flatMap的解釋盾饮,大家可以參考 RxJava2 實戰(zhàn)知識梳理(4) - 結合 Retrofit 請求新聞資訊 采桃。

而當我們不需要重訂閱時,有兩種方式:

  • 返回Observable.empty()丘损,發(fā)送onComplete消息普办,但是DisposableObserver并不會回調(diào)onComplete
  • 返回Observable.error(new Throwable("Polling work finished"))徘钥,DisposableObserveronError會被回調(diào)泌豆,并接受傳過去的錯誤信息。

3.2.2 使用 Timer 實現(xiàn)兩次訂閱之間的時延

以上就是對于repeatWhen的解釋吏饿,與repeatWhen相類似的還有retryWhen操作符,這個我們在下一篇文章中再介紹蔬浙,接下來猪落,我們看一下如何實現(xiàn)兩次事件的時延。

前面我們分析過畴博,重訂閱觸發(fā)的時間是在返回的ObservableSource發(fā)送了onNext事件之后笨忌,那么我們通過該ObservableSource延遲發(fā)送一個事件就可以實現(xiàn)相應的需求,這里使用的是time操作符俱病,它的原理圖如下所示官疲,也就是袱结,在訂閱完成后,等待指定的時間它才會發(fā)送消息途凫。

timer 原理圖

3.2.3 使用 doOnComplete 完成輪詢的耗時操作

由于在訂閱完成時會發(fā)送onComplete消息垢夹,那么我們就可以在doOnComplete中進行輪詢所要進行的具體操作,它所運行的線程通過subscribeOn指定维费。


更多文章果元,歡迎訪問我的 Android 知識梳理系列:

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市犀盟,隨后出現(xiàn)的幾起案子而晒,更是在濱河造成了極大的恐慌,老刑警劉巖阅畴,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件倡怎,死亡現(xiàn)場離奇詭異,居然都是意外死亡贱枣,警方通過查閱死者的電腦和手機监署,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來冯事,“玉大人焦匈,你說我怎么就攤上這事£墙觯” “怎么了缓熟?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長摔笤。 經(jīng)常有香客問我够滑,道長,這世上最難降的妖魔是什么吕世? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任彰触,我火速辦了婚禮,結果婚禮上命辖,老公的妹妹穿的比我還像新娘况毅。我一直安慰自己,他們只是感情好尔艇,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布尔许。 她就那樣靜靜地躺著,像睡著了一般终娃。 火紅的嫁衣襯著肌膚如雪味廊。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音余佛,去河邊找鬼柠新。 笑死,一個胖子當著我的面吹牛辉巡,可吹牛的內(nèi)容都是我干的恨憎。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼红氯,長吁一口氣:“原來是場噩夢啊……” “哼框咙!你這毒婦竟也來了?” 一聲冷哼從身側響起痢甘,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤喇嘱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后塞栅,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體者铜,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年放椰,在試婚紗的時候發(fā)現(xiàn)自己被綠了作烟。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡砾医,死狀恐怖拿撩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情如蚜,我是刑警寧澤压恒,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站错邦,受9級特大地震影響探赫,放射性物質發(fā)生泄漏。R本人自食惡果不足惜撬呢,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一伦吠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧魂拦,春花似錦毛仪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至借尿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背路翻。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工狈癞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人茂契。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓蝶桶,卻偏偏與公主長得像,于是被迫代替她去往敵國和親掉冶。 傳聞我的和親對象是個殘疾皇子真竖,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容