響應(yīng)式編程在Android中的應(yīng)用

響應(yīng)式編程簡介

  • 響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測袁勺,被過濾席怪,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流蛀序。
  • 響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件欢瞪。事件可以被等待,可以觸發(fā)過程哼拔,也可以觸發(fā)其它事件引有。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的倦逐,當(dāng)我們更改電子表(變化的傳播)中的一些數(shù)值時(shí),我們需要更新整個(gè)表格或者我們的機(jī)器人碰到墻時(shí)會(huì)轉(zhuǎn)彎(響應(yīng)事件)。
  • 今天檬姥,響應(yīng)式編程最通用的一個(gè)場景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用曾我、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上健民,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此抒巢。

響應(yīng)式編程的具體實(shí)現(xiàn) - RxJava

基本概念

RxJava的四種角色

  • Observable
  • Observer
  • Subscriber
  • Subject

Observable和Subject是兩個(gè)“生產(chǎn)”實(shí)體,Observer和Subscriber是兩個(gè)“消費(fèi)”實(shí)體秉犹。

熱Observable和冷Observable

從發(fā)射物的角度來看蛉谜,有兩種不同的Observable:熱的和冷的。一個(gè)"熱"的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù)崇堵,因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個(gè)位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過了)型诚。一個(gè)"冷"的Observable會(huì)一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù)鸳劳,因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列狰贯。

Observable創(chuàng)建符

  • Observable.create()
Observable.create(new Observable.OnSubscribe<Object>(){
    @Override
    public void call(Subscriber<? super Object> subscriber{
    }
});
  • Observable.from()
    from() 創(chuàng)建符可以從一個(gè)列表/數(shù)組來創(chuàng)建Observable,并一個(gè)接一個(gè)的從列表/數(shù)組中發(fā)射出來每一個(gè)對(duì)象,或者也可以從Java Future 類來創(chuàng)建Observable赏廓,并發(fā)射Future對(duì)象的 .get() 方法返回的結(jié)果值涵紊。傳入 Future 作為參數(shù)時(shí),我們可以指定一個(gè)超時(shí)的值幔摸。Observable將等待來自 Future 的結(jié)果摸柄;如果在超時(shí)之前仍然沒有結(jié)果返回,Observable將會(huì)觸發(fā) onError() 方法通知觀察者有錯(cuò)誤發(fā)生了既忆。

    List<Integer> items = new ArrayList<Integer>();
    items.add(1);
    items.add(10);
    items.add(100);
    items.add(200);
    
    Observable<Integer> observableString = Observable.from(items);
    Subscription subscriptionPrint = observableString.subscribe(new        Observer<Integer>() {
      @Override
      public void onCompleted() {
      System.out.println("Observable completed");
      }
      @Override
      public void onError(Throwable e) {
      System.out.println("Oh,no! Something wrong happened驱负!");
      }
      @Override
      public void onNext(Integer item) {
      System.out.println("Item is " + item);
      }
    });
    
  • Observable.just()
    just() 方法可以傳入一到九個(gè)參數(shù),它們會(huì)按照傳入的參數(shù)的順序來發(fā)射它們尿贫。 just() 方法也可以接受列表或數(shù)組电媳,就像 from() 方法,但是它不會(huì)迭代列表發(fā)射每個(gè)值,它將會(huì)發(fā)射整個(gè)列表庆亡。通常匾乓,當(dāng)我們想發(fā)射一組已經(jīng)定義好的值時(shí)會(huì)用到它。但是如果我們的函數(shù)不是時(shí)變性的又谋,我們可以用just來創(chuàng)建一個(gè)更有組織性和可測性的代碼庫拼缝。

Observable<String> observableString = Observable.just(helloWorld
());
Subscription subscriptionPrint = observableString.subscribe(new
Observer<String>() {
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
    System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(String message) {
    System.out.println(message);
    }
});

helloWorld() 方法比較簡單,像這樣:

private String helloWorld(){
    return "Hello World";
}

Subject

Subject 既可以是 Observable彰亥,也可以是 Observer咧七。
RxJava 提供四種不同的 Subject :

  • PublishSubject
  • BehaviorSubject
    BehaviorSubject會(huì)首先向他的訂閱者發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
```
在這個(gè)短例子中任斋,我們創(chuàng)建了一個(gè)能發(fā)射整形(Integer)的BehaviorSubject继阻。由于每當(dāng)Observes訂閱它時(shí)就會(huì)發(fā)射最新的數(shù)據(jù),所以它需要一個(gè)初始值。

  • ReplaySubject
    ReplaySubject 會(huì)緩存它所訂閱的所有數(shù)據(jù),向任意一個(gè)訂閱它的觀察者重發(fā):

ReplaySubject<Integer> replaySubject = ReplaySubject.create();
```

  • AsyncSubject

    當(dāng)Observable完成時(shí)AsyncSubject只會(huì)發(fā)布最后一個(gè)數(shù)據(jù)給已經(jīng)訂閱的每一個(gè)觀察者瘟檩。

    AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
    

直接創(chuàng)建 Observable

在我們的第一個(gè)列子里抹缕,我們將檢索安裝的應(yīng)用列表并填充RecycleView的item來展示它們。我們也設(shè)想一個(gè)下拉刷新的功能和一個(gè)進(jìn)度條來告知用戶當(dāng)前任務(wù)正在執(zhí)行墨辛。

首先卓研,我們創(chuàng)建Observable。我們需要一個(gè)函數(shù)來檢索安裝的應(yīng)用程序列表并把它提供給我們的觀察者睹簇。我們一個(gè)接一個(gè)的發(fā)射這些應(yīng)用程序數(shù)據(jù)奏赘,將它們分組到一個(gè)單獨(dú)的列表中,以此來展示響應(yīng)式方法的靈活性太惠。

private Observable<AppInfo> getApps(){
    return Observable.create(subscriber -> {
        List<AppInfoRich> apps = new ArrayList<AppInfoRich>();
        final Intent mainIntent = new Intent(Intent.ACTION_MAIN, null);
        mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
        List<ResolveInfo> infos = getActivity().queryIntentActivities(mainIntent, 0);
        for(ResolveInfo info : infos){
            apps.add(new AppInfoRich(getActivity(),info));
        }
        for (AppInfoRich appInfo:apps) {
            Bitmap icon = Utils.drawableToBitmap(appInfo.getIcon());
            String name = appInfo.getName();
            String iconPath = mFilesDir + "/" + name;
            Utils.storeBitmap(App.instance, icon,name);
            if (subscriber.isUnsubscribed()){
                return;
            }
            subscriber.onNext(new AppInfo(name, iconPath, appInfo.getLastUpdateTime()));
        }
        if (!subscriber.isUnsubscribed()){
            subscriber.onCompleted();
        }
    });
}

AppInfo為App信息的實(shí)體類磨淌,包括上次更新時(shí)間、圖標(biāo)垛叨、名字三個(gè)屬性伦糯,此處省略。

需要重點(diǎn)注意的是在發(fā)射新的數(shù)據(jù)或者完成序列之前要檢測觀察者的訂閱情況嗽元。這樣的話代碼會(huì)更高效敛纲,因?yàn)槿绻麤]有觀察者等待時(shí)我們就不生成沒有必要的數(shù)據(jù)項(xiàng)。

接下來剂癌,我們來定義下拉刷新的方法:

private void refreshTheList() {
    getApps().toSortedList()
    .subscribe(new Observer<List<AppInfo>>() {
    @Override
    public void onCompleted() {
        Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(List<AppInfo> appInfos) {
        mRecyclerView.setVisibility(View.VISIBLE);
        mAdapter.addApplications(appInfos);
        mSwipeRefreshLayout.setRefreshing(false);
    }
    });
}

從列表創(chuàng)建 Observable

在這個(gè)例子中淤翔,我們將引入 from() 函數(shù)。使用這個(gè)特殊的“創(chuàng)建”函數(shù)佩谷,我們可以從一個(gè)列表中創(chuàng)建一個(gè)Observable旁壮。Observable將發(fā)射出列表中的每一個(gè)元素,我們可以通過訂閱它們來對(duì)這些發(fā)出的元素做出響應(yīng)谐檀。

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps).subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

和第一個(gè)例子一個(gè)主要的不同是我們在 onCompleted() 函數(shù)中停掉進(jìn)度條是因?yàn)槲覀円粋€(gè)一個(gè)的發(fā)射元素抡谐;
第一個(gè)例子中的Observable發(fā)射的是整個(gè)list,因此在 onNext() 函數(shù)中停掉進(jìn)度條的做法是安全的。

具有特殊功能的創(chuàng)建符

  • just()

    你可以將一個(gè)函數(shù)作為參數(shù)傳給 just() 方法桐猬,你將會(huì)得到一個(gè)已存在代碼的原始Observable版本麦撵。在一個(gè)新的響應(yīng)式架構(gòu)的基礎(chǔ)上遷移已存在的代碼,這個(gè)方法可能是一個(gè)有用的開始點(diǎn)溃肪。

  • repeat()

    假如你想對(duì)一個(gè)Observable重復(fù)發(fā)射三次數(shù)據(jù) :

    Observable.just(appOne,appTwo,appThree)
        .repeat(3)
        .subscribe();
    

    我們在 just() 創(chuàng)建Observable后追加了 repeat(3) 免胃,它將會(huì)創(chuàng)建9個(gè)元素的序列,每一個(gè)都單獨(dú)發(fā)射惫撰。

  • defer()

    有這樣一個(gè)場景羔沙,你想在這聲明一個(gè)Observable但是你又想推遲這個(gè)Observable的創(chuàng)建直到觀察者訂閱時(shí)〕辏看下面的 getInt() 函數(shù):

    private Observable<Integer> getInt(){
        return Observable.create(subscriber -> {
            if(subscriber.isUnsubscribed()){
                return;
            }
            App.L.debug("GETINT");
            subscriber.onNext(42);
            subscriber.onCompleted();
        });
    }
    

    這比較簡單扼雏,并且它沒有做太多事情坚嗜,但是它正好為我們服務(wù)。現(xiàn)在呢蛤,我們可以創(chuàng)建一個(gè)新的Observable并且應(yīng)用 defer() :

    Observable<Integer> deferred = Observable.defer(this::getInt);
    

    這次惶傻, deferred 存在棍郎,但是 getInt() create() 方法還沒有調(diào)用 : logcat日志也沒有“GETINT”打印出來 :

    deferred.subscribe(number -> {
        App.L.debug(String.valueOf(number));
    });
    

    但是一旦我們訂閱了其障, create() 方法就會(huì)被調(diào)用并且我們也可以在logcat日志中打印出兩個(gè)值:GETINT 和 42。

  • range()

    從一個(gè)指定的數(shù)字X開始發(fā)射N個(gè)數(shù)字涂佃。range() 函數(shù)用兩個(gè)數(shù)字作為參數(shù):第一個(gè)是起始點(diǎn)励翼,第二個(gè)是我們想發(fā)射數(shù)字的個(gè)數(shù)。

  • interval()

    interval() 函數(shù)在你需要?jiǎng)?chuàng)建一個(gè)輪詢程序時(shí)非常好用辜荠。interval() 函數(shù)的兩個(gè)參數(shù):一個(gè)指定兩次發(fā)射的時(shí)間間隔汽抚,另一個(gè)是用到的時(shí)間單位。

  • timer()

    如果你需要一個(gè)一段時(shí)間之后才發(fā)射的Observable伯病,你可以使用 timer()造烁。

過濾Observables

過濾序列

RxJava讓我們使用 filter() 方法來過濾我們觀測序列中不想要的值。

我們從發(fā)出的每個(gè)元素中過濾掉開頭字母不是C的 :

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo.getName().startsWith("C");
    }
})

我們傳一個(gè)新的 Func1 對(duì)象給 filter() 函數(shù)午笛,即只有一個(gè)參數(shù)的函數(shù)惭蟋。 Func1 有一個(gè) AppInfo 對(duì)象來作為它的參數(shù)類型并且返回 Boolean 對(duì)象。只要條件符合 filter() 函數(shù)就會(huì)返回 true 药磺。此時(shí)告组,值會(huì)發(fā)射出去并且所有的觀察者都會(huì)接收到。

filter() 函數(shù)最常用的用法之一時(shí)過濾 null 對(duì)象:

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo != null;
    }
})

它幫我們免去了在 onNext() 函數(shù)調(diào)用中再去檢測 null 值癌佩,讓我們把注意力集中在應(yīng)用業(yè)務(wù)邏輯上木缝。

獲取我們需要的數(shù)據(jù)

當(dāng)我們不需要整個(gè)序列時(shí),而是只想取開頭或結(jié)尾的幾個(gè)元素围辙,我們可以用 take() 或 takeLast() 我碟。

  • take()

    take() 函數(shù)用整數(shù)N來作為一個(gè)參數(shù),從原始的序列中發(fā)射前N個(gè)元素姚建,然后完成:

    Observable.from(apps)
        .take(3)
        .subscribe(...);
    
  • takeLast()

    如果我們想要最后N個(gè)元素矫俺,我們只需使用 takeLast() 函數(shù):

    Observable.from(apps)
        .takeLast(3)
        .subscribe(...);
    

有且僅有一次

  • distinct()

    就像 takeLast() 一樣, distinct() 作用于一個(gè)完整的序列桥胞,然后得到重復(fù)的過濾項(xiàng)恳守,它需要記錄每一個(gè)發(fā)射的值。如果你在處理一大堆序列或者大的數(shù)據(jù)記得關(guān)注內(nèi)存使用情況贩虾。

    Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
        .take(3)
        .repeat(3);
    fullOfDuplicates.distinct()
        .subscribe(...);
    
  • ditinctUntilChanged()

    如果在一個(gè)可觀測序列發(fā)射一個(gè)不同于之前的一個(gè)新值時(shí)讓我們得到通知這時(shí)候該怎么做催烘?ditinctUntilChanged() 過濾函數(shù)能做到這一點(diǎn)。它能輕易的忽略掉所有的重復(fù)并且只發(fā)射出新的值缎罢。

First and last

first() 方法和 last() 方法很容易弄明白伊群。它們從Observable中只發(fā)射第一個(gè)元素或者最后一個(gè)元素考杉。這兩個(gè)都可以傳 Func1 作為參數(shù)。
與 first() 和 last() 相似的變量有: firstOrDefault() 和 lastOrDefault() 舰始。這兩個(gè)函數(shù)當(dāng)可觀測序列完成時(shí)不再發(fā)射任何值時(shí)用得上崇棠。在這種場景下,如果Observable不再發(fā)射任何值時(shí)我們可以指定發(fā)射一個(gè)默認(rèn)的值丸卷。

Skip and SkipLast

skip() 和 skipLast() 函數(shù)與 take() 和 takeLast() 相對(duì)應(yīng)枕稀。它們用整數(shù)N作參數(shù),從本質(zhì)上來說谜嫉,它們不讓Observable發(fā)射前N個(gè)或者后N個(gè)值萎坷。

ElementAt

如果我們只想要可觀測序列發(fā)射的第五個(gè)元素該怎么辦? elementAt() 函數(shù)僅從一個(gè)序列中發(fā)射第n個(gè)元素然后就完成了沐兰。
如果我們想查找第五個(gè)元素但是可觀測序列只有三個(gè)元素可供發(fā)射時(shí)該怎么辦哆档?我們可以使用 elementAtOrDefault() 。

Sampling

在Observable后面加一個(gè) sample() 住闯,我們將創(chuàng)建一個(gè)新的可觀測序列瓜浸,它將在一個(gè)指定的時(shí)間間隔里由Observable發(fā)射最近一次的數(shù)值:

Observable<Integer> sensor = [...]
sensor.sample(30,TimeUnit.SECONDS)
    .subscribe(...);

如果我們想讓它定時(shí)發(fā)射第一個(gè)元素而不是最近的一個(gè)元素,我們可以使用 throttleFirst() 比原。

Timeout

我們可以使用 timeout() 函數(shù)來監(jiān)聽源可觀測序列,就是在我們設(shè)定的時(shí)間間隔內(nèi)如果沒有得到一個(gè)值則發(fā)射一個(gè)錯(cuò)誤插佛。我們可以認(rèn)為 timeout() 為一個(gè)Observable的限時(shí)的副本。如果在指定的時(shí)間間隔內(nèi)Observable不發(fā)射值的話春寿,它監(jiān)聽的原始的Observable時(shí)就會(huì)觸發(fā) onError() 函數(shù)朗涩。

Subscription subscription = getCurrentTemperature()
    .timeout(2,TimeUnit.SECONDS)
    .subscribe(...);

Debounce

debounce() 函數(shù)過濾掉由Observable發(fā)射的速率過快的數(shù)據(jù);如果在一個(gè)指定的時(shí)間間隔過去了仍舊沒有發(fā)射一個(gè)绑改,那么它將發(fā)射最后的那個(gè)谢床。

下圖展示了多久從Observable發(fā)射一次新的數(shù)據(jù), debounce() 函數(shù)開啟一個(gè)內(nèi)部定時(shí)器厘线,如果在這個(gè)時(shí)間間隔內(nèi)沒有新的據(jù)發(fā)射识腿,則新的Observable發(fā)射出最后一個(gè)數(shù)據(jù):

debounce() 函數(shù)示意圖

變換Observables

*map家族

RxJava提供了幾個(gè)mapping函數(shù): map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有這些函數(shù)都作用于一個(gè)可觀測序列,然后變換它發(fā)射的值造壮,最后用一種新的形式返回它們渡讼。

  • Map

    RxJava的 map 函數(shù)接收一個(gè)指定的 Func 對(duì)象然后將它應(yīng)用到每一個(gè)由Observable發(fā)射的值上。

    Observable.from(apps)
        .map(new Func1<AppInfo,AppInfo>(){
            @Override
            public Appinfo call(AppInfo appInfo){
                String currentName = appInfo.getName();
                String lowerCaseName = currentName.toLowerCase();
                appInfo.setName(lowerCaseName);
                return appInfo;
            }
        })
        .subscribe(...);
    

    正如你看到的耳璧,像往常一樣創(chuàng)建我們發(fā)射的Observable之后成箫,我們追加一個(gè) map 調(diào)用,我們創(chuàng)建一個(gè)簡單的函數(shù)來更新 AppInfo對(duì)象并提供一個(gè)名字小寫的新版本給觀察者旨枯。

  • FlatMap

    在復(fù)雜的場景中蹬昌,我們有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列,這些數(shù)據(jù)本身也可以發(fā)射Observable攀隔。RxJava的 flatMap() 函數(shù)提供一種鋪平序列的方式皂贩,然后合并這些Observables發(fā)射的數(shù)據(jù)栖榨,最后將合并后的結(jié)果作為最終的Observable。

    flatMap() 函數(shù)示意圖

    當(dāng)我們在處理可能有大量的Observables時(shí)明刷,重要是記住任何一個(gè)Observables發(fā)生錯(cuò)誤的情況婴栽, flatMap() 將會(huì)觸發(fā)它自己的 onError() 函數(shù)并放棄整個(gè)鏈。重要的一點(diǎn)提示是關(guān)于合并部分:它允許交叉辈末。正如上圖所示愚争,這意味著 flatMap() 不能夠保證在最終生成的Observable中源Observables確切的發(fā)射順序。

  • ConcatMap

    RxJava的 concatMap() 函數(shù)解決了 flatMap() 的交叉問題本冲,提供了一種能夠把發(fā)射的值連續(xù)在一起的鋪平函數(shù)准脂,而不是合并它們,如下圖所示:

    這里寫圖片描述
  • FlatMapIterable

    作為*map家族的一員檬洞, flatMapInterable() 和 flatMap() 很像。僅有的本質(zhì)不同是它將源數(shù)據(jù)兩兩結(jié)成對(duì)并生成Iterable沟饥,而不是原始數(shù)據(jù)項(xiàng)和生成的Observables添怔。

  • SwitchMap

    switchMap() 和 flatMap() 很像,除了一點(diǎn):每當(dāng)源Observable發(fā)射一個(gè)新的數(shù)據(jù)項(xiàng)(Observable)時(shí)贤旷,它將取消訂閱并停止監(jiān)視之前那個(gè)數(shù)據(jù)項(xiàng)產(chǎn)生的Observable广料,并開始監(jiān)視當(dāng)前發(fā)射的這一個(gè)。

  • Scan

    RxJava的 scan() 函數(shù)可以看做是一個(gè)累積函數(shù)幼驶。 scan() 函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù)艾杏,計(jì)算出函數(shù)的結(jié)果值,并將該值填充回可觀測序列盅藻,等待和下一次發(fā)射的數(shù)據(jù)一起使用购桑。

    作為一個(gè)通用的例子,給出一個(gè)累加器:

    Observable.just(1,2,3,4,5)
        .scan((sum,item) -> sum + item)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("RXJAVA", "Sequence completed.");
            }
            @Override
            public void onError(Throwable e) {
                Log.e("RXJAVA", "Something went south!");
            }
            @Override
            public void onNext(Integer item) {
                Log.d("RXJAVA", "item is: " + item);
            }
        });
    

    我們得到的結(jié)果是:

    RXJAVA: item is: 1
    RXJAVA: item is: 3
    RXJAVA: item is: 6
    RXJAVA: item is: 10
    RXJAVA: item is: 15
    RXJAVA: Sequence completed.

GroupBy

RxJava提供了一個(gè)有用的函數(shù)從列表中按照指定的規(guī)則: groupBy() 來分組元素氏淑。下圖中的例子展示了 groupBy() 如何將發(fā)射的值根據(jù)他們的形狀來進(jìn)行分組勃蜘。

這里寫圖片描述

這個(gè)函數(shù)將源Observable變換成一個(gè)發(fā)射Observables的新的Observable。它們中的每一個(gè)新的Observable都發(fā)射一組指定的數(shù)據(jù)假残。

為了創(chuàng)建一個(gè)分組了的已安裝應(yīng)用列表缭贡,我們在 loadList() 函數(shù)中引入了一個(gè)新的元素:

Observable<GroupedObservable<String,AppInfo>> groupedItems = Observable.from(apps)
            .groupBy(new Func1<AppInfo,String>(){
                @Override
                public String call(AppInfo appInfo){
                    SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy");
                    return formatter.format(new Date(appInfo.getLastUpdateTime()));
                }
            });

現(xiàn)在我們創(chuàng)建了一個(gè)新的Observable, groupedItems 辉懒,它將會(huì)發(fā)射一個(gè)帶有 GroupedObservable 的序列阳惹。 GroupedObservable 是一個(gè)特殊的Observable,它源自一個(gè)分組的key眶俩。在這個(gè)例子中莹汤,key就是 String ,代表的意思是 Month/Year 格式化的最近更新日期仿便。

Buffer

RxJava中的 buffer() 函數(shù)將源Observable變換一個(gè)新的Observable体啰,這個(gè)新的Observable每次發(fā)射一組列表值而不是一個(gè)一個(gè)發(fā)射攒巍。

buffer() 函數(shù)有幾種變體。其中有一個(gè)是允許你指定一個(gè) skip 值:此后每 skip 項(xiàng)數(shù)據(jù)荒勇,用count項(xiàng)數(shù)據(jù)填充緩沖區(qū)柒莉。另一個(gè)是buffer() 帶一個(gè) timespan 的參數(shù),會(huì)創(chuàng)建一個(gè)每隔timespan時(shí)間段就會(huì)發(fā)射一個(gè)列表的Observable沽翔。

Window

RxJava的 window() 函數(shù)和 buffer() 很像兢孝,但是它發(fā)射的是Observable而不是列表。

正如 buffer() 一樣, window() 也有一個(gè) skip 變體仅偎。

Cast

cast() 函數(shù)是 map() 操作符的特殊版本跨蟹。它將源Observable中的每一項(xiàng)數(shù)據(jù)都轉(zhuǎn)換為新的類型,把它變成了不同的 Class 橘沥。

組合Observables

Merge

在”異步的世界“中經(jīng)常會(huì)創(chuàng)建這樣的場景窗轩,我們有多個(gè)來源但是又只想有一個(gè)結(jié)果:多輸入,單輸出座咆。RxJava的 merge() 方法將幫助你把兩個(gè)甚至更多的Observables合并到他們發(fā)射的數(shù)據(jù)項(xiàng)里痢艺。下圖給出了把兩個(gè)序列合并在一個(gè)最終發(fā)射的Observable。

這里寫圖片描述

正如你看到的那樣介陶,發(fā)射的數(shù)據(jù)被交叉合并到一個(gè)Observable里面堤舒。注意如果你同步的合并Observable,它們將連接在一起并且不會(huì)交叉哺呜。

Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(...);

注意錯(cuò)誤時(shí)的toast消息舌缤,你可以認(rèn)為每個(gè)Observable拋出的錯(cuò)誤都將會(huì)打斷合并。如果你需要避免這種情況某残,RxJava提供了 mergeDelayError() 国撵,它能從一個(gè)Observable中繼續(xù)發(fā)射數(shù)據(jù)即便是其中有一個(gè)拋出了錯(cuò)誤。當(dāng)所有的Observables都完成時(shí)驾锰, mergeDelayError() 將會(huì)發(fā)射 onError()卸留。

ZIP

在一種新的可能場景中處理多個(gè)數(shù)據(jù)來源時(shí)會(huì)帶來:多從個(gè)Observables接收數(shù)據(jù),處理它們椭豫,然后將它們合并成一個(gè)新的可觀測序列來使用耻瑟。RxJava有一個(gè)特殊的方法可以完成: zip() 合并兩個(gè)或者多個(gè)Observables發(fā)射出的數(shù)據(jù)項(xiàng),根據(jù)指定的函數(shù)Func* 變換它們赏酥,并發(fā)射一個(gè)新值喳整。下圖展示了 zip() 方法如何處理發(fā)射的“numbers”和“l(fā)etters”然后將它們合并一個(gè)新的數(shù)據(jù)項(xiàng):

這里寫圖片描述
Observable.zip(observableApp, tictoc, (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);

zip() 函數(shù)有三個(gè)參數(shù):兩個(gè)Observables和一個(gè) Func2 。

Join

前面兩個(gè)方法裸扶, zip() 和 merge() 方法作用在發(fā)射數(shù)據(jù)的范疇內(nèi)框都,在決定如何操作值之前有些場景我們需要考慮時(shí)間的。RxJava的 join() 函數(shù)基于時(shí)間窗口將兩個(gè)Observables發(fā)射的數(shù)據(jù)結(jié)合在一起呵晨。

這里寫圖片描述

為了正確的理解上一張圖魏保,我們解釋下 join() 需要的參數(shù):

  • 第二個(gè)Observable和源Observable結(jié)合熬尺。
  • Func1 參數(shù):在指定的由時(shí)間窗口定義時(shí)間間隔內(nèi),源Observable發(fā)射的數(shù)據(jù)和從第二個(gè)Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable谓罗。
  • Func1 參數(shù):在指定的由時(shí)間窗口定義時(shí)間間隔內(nèi)粱哼,第二個(gè)Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
  • Func2 參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項(xiàng)相結(jié)合檩咱。

combineLatest

RxJava的 combineLatest() 函數(shù)有點(diǎn)像 zip() 函數(shù)的特殊形式揭措。正如我們已經(jīng)學(xué)習(xí)的, zip() 作用于最近未打包的兩個(gè)Observables刻蚯。相反绊含, combineLatest() 作用于最近發(fā)射的數(shù)據(jù)項(xiàng):如果 Observable1 發(fā)射了A并且 Observable2 發(fā)射了B和C, combineLatest() 將會(huì)分組處理AB和AC炊汹,如下圖所示:

這里寫圖片描述

And,Then和When

在將來還有一些 zip() 滿足不了的場景躬充。如復(fù)雜的架構(gòu),或者是僅僅為了個(gè)人愛好兵扬,你可以使用And/Then/When解決方案麻裳。它們在RxJava的joins包下,使用Pattern和Plan作為中介器钟,將發(fā)射的數(shù)據(jù)集合并到一起。

這里寫圖片描述

Switch

給出一個(gè)發(fā)射多個(gè)Observables序列的源Observable妙蔗, switch() 訂閱到源Observable然后開始發(fā)射由第一個(gè)發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)傲霸。當(dāng)源Observable發(fā)射一個(gè)新的Observable時(shí), switch() 立即取消訂閱前一個(gè)發(fā)射數(shù)
據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個(gè)新的Observable眉反,并開始發(fā)射它的數(shù)據(jù)昙啄。

StartWith

RxJava的 startWith() 是 concat() 的對(duì)應(yīng)部分。正如 concat() 向發(fā)射數(shù)據(jù)的Observable追加數(shù)據(jù)那樣寸五,在Observable開始發(fā)射他們的數(shù)據(jù)之前梳凛,startWith() 通過傳遞一個(gè)參數(shù)來先發(fā)射一個(gè)數(shù)據(jù)序列。

Schedulers-解決Android主線程問題

Schedulers

調(diào)度器以一種最簡單的方式將多線程用在你的Apps的中梳杏。它們時(shí)RxJava重要的一部分并能很好地與Observables協(xié)同工作韧拒。它們無需處理實(shí)現(xiàn)、同步十性、線程叛溢、平臺(tái)限制、平臺(tái)變化而可以提供一種靈活的方式來創(chuàng)建并發(fā)程序劲适。

RxJava提供了5種調(diào)度器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()
Schedulers.io()

這個(gè)調(diào)度器時(shí)用于I/O操作楷掉。它基于根據(jù)需要,增長或縮減來自適應(yīng)的線程池霞势。我們將使用它來修復(fù)我們之前看到的 StrictMode 違規(guī)做法烹植。由于它專用于I/O操作斑鸦,所以并不是RxJava的默認(rèn)方法;正確的使用它是由開發(fā)者決定的草雕。

重點(diǎn)需要注意的是線程池是無限制的巷屿,大量的I/O調(diào)度操作將創(chuàng)建許多個(gè)線程并占用內(nèi)存。一如既往的是促绵,我們需要在性能和簡捷兩者之間找到一個(gè)有效的平衡點(diǎn)攒庵。

Schedulers.computation()

這個(gè)是計(jì)算工作默認(rèn)的調(diào)度器,它與I/O操作無關(guān)败晴。它也是許多RxJava方法的默認(rèn)調(diào)度器: buffer() , debounce() , delay() , interval() , sample() , skip()浓冒。

Schedulers.immediate()

這個(gè)調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作。它是 timeout() , timeInterval() ,以及 timestamp() 方法默認(rèn)的調(diào)度器尖坤。

Schedulers.newThread()

這個(gè)調(diào)度器正如它所看起來的那樣:它為指定任務(wù)啟動(dòng)一個(gè)新的線程稳懒。

Schedulers.trampoline()

當(dāng)我們想在當(dāng)前線程執(zhí)行一個(gè)任務(wù)時(shí),并不是立即慢味,我們可以用 .trampoline() 將它入隊(duì)场梆。這個(gè)調(diào)度器將會(huì)處理它的隊(duì)列并且按序運(yùn)行隊(duì)列中每一個(gè)任務(wù)。它是 repeat() 和 retry() 方法默認(rèn)的調(diào)度器纯路。

非阻塞I/O操作

使用 Schedulers.io() 創(chuàng)建非阻塞的版本:

public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
    Schedulers.io().createWorker().schedule(() -> {
        blockingStoreBitmap(context, bitmap, filename);
    });
}

SubscribeOn and ObserveOn

我們學(xué)到了如何在一個(gè)調(diào)度器上運(yùn)行一個(gè)任務(wù)或油。但是我們?nèi)绾卫盟鼇砗蚈bservables一起工作呢?RxJava提供了 subscribeOn() 方法來用于每個(gè)Observable對(duì)象驰唬。 subscribeOn() 方法用 Scheduler 來作為參數(shù)并在這個(gè)Scheduler上執(zhí)行Observable調(diào)用顶岸。

首先,我們需要一個(gè)新的 getApps() 方法來檢索已安裝的應(yīng)用列表:

private Observable<AppInfo> getApps() {
    return Observable.create(subscriber -> {
        List<AppInfo> apps = new ArrayList<>();
        SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
        Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
        String serializedApps = sharedPref.getString("APPS", "");
        if (!"".equals(serializedApps)) {
            apps = new Gson().fromJson(serializedApps,appInfoType);
        }
        for (AppInfo app : apps) {
            subscriber.onNext(app);
        }
        subscriber.onCompleted();
    });
}

然后叫编,我們所需要做的是指定 getApps() 需要在調(diào)度器上執(zhí)行:

getApps().subscribeOn(Schedulers.io())
    .subscribe(new Observer<AppInfo>() { [...]

最后辖佣,我們只需在 loadList() 函數(shù)添加幾行代碼,那么每一項(xiàng)就都準(zhǔn)備好了:

getApps()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [...]

observeOn() 方法將會(huì)在指定的調(diào)度器上返回結(jié)果:如例子中的UI線程搓逾。 onBackpressureBuffer() 方法將告訴Observable發(fā)射的數(shù)據(jù)如果比觀察者消費(fèi)的數(shù)據(jù)要更快的話卷谈,它必須把它們存儲(chǔ)在緩存中并提供一個(gè)合適的時(shí)間給它們。

處理耗時(shí)的任務(wù)

一個(gè)與I/O無關(guān)的耗時(shí)的任務(wù):

getObservableApps(apps)
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [...]

總結(jié)

RxJava提供了一種以面向時(shí)序的方式考慮數(shù)據(jù)的機(jī)會(huì):所有事情都是持續(xù)變化的霞篡,數(shù)據(jù)在更新世蔗,事件在觸發(fā),然后你就可以創(chuàng)建事件響應(yīng)式的寇损、靈活的凸郑、運(yùn)行流暢的App。

謹(jǐn)記可觀測序列就像一條河:它們是流動(dòng)的矛市。你可以“過濾”(filter)一條河芙沥,你可以“轉(zhuǎn)換”(transform)一條河,你可以將兩條河合并(combine)成一個(gè),然后依然暢流如初而昨。最后救氯,它就成了你想要的那條河。

“Be Water歌憨,my friend” - Bruce Lee

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末着憨,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子务嫡,更是在濱河造成了極大的恐慌甲抖,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件心铃,死亡現(xiàn)場離奇詭異准谚,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)去扣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門柱衔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人愉棱,你說我怎么就攤上這事唆铐。” “怎么了奔滑?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵艾岂,是天一觀的道長。 經(jīng)常有香客問我朋其,道長澳盐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任令宿,我火速辦了婚禮,結(jié)果婚禮上腕窥,老公的妹妹穿的比我還像新娘粒没。我一直安慰自己,他們只是感情好簇爆,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布癞松。 她就那樣靜靜地躺著,像睡著了一般入蛆。 火紅的嫁衣襯著肌膚如雪响蓉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天哨毁,我揣著相機(jī)與錄音枫甲,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛想幻,可吹牛的內(nèi)容都是我干的粱栖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼脏毯,長吁一口氣:“原來是場噩夢啊……” “哼闹究!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起食店,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤渣淤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后吉嫩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體价认,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年率挣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了刻伊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡椒功,死狀恐怖捶箱,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情动漾,我是刑警寧澤丁屎,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站旱眯,受9級(jí)特大地震影響晨川,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜删豺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一共虑、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呀页,春花似錦妈拌、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至丸氛,卻和暖如春培愁,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背缓窜。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來泰國打工定续, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谍咆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓香罐,卻偏偏與公主長得像卧波,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子庇茫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容