響應(yīng)式編程簡介
- 響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測袁勺,被過濾席怪,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流蛀序。
- 響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件欢瞪。事件可以被等待,可以觸發(fā)過程哼拔,也可以觸發(fā)其它事件引有。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的倦逐,當(dāng)我們更改電子表(變化的傳播)中的一些數(shù)值時(shí),我們需要更新整個(gè)表格或者我們的機(jī)器人碰到墻時(shí)會(huì)轉(zhuǎn)彎(響應(yīng)事件)。
- 今天檬姥,響應(yīng)式編程最通用的一個(gè)場景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用曾我、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上健民,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此抒巢。
響應(yīng)式編程的具體實(shí)現(xiàn) - RxJava
基本概念
RxJava的四種角色
- Observable
- Observer
- Subscriber
- Subject
Observable和Subject是兩個(gè)“生產(chǎn)”實(shí)體,Observer和Subscriber是兩個(gè)“消費(fèi)”實(shí)體秉犹。
熱Observable和冷Observable
從發(fā)射物的角度來看蛉谜,有兩種不同的Observable:熱的和冷的。一個(gè)"熱"的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù)崇堵,因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個(gè)位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過了)型诚。一個(gè)"冷"的Observable會(huì)一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù)鸳劳,因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列狰贯。
Observable創(chuàng)建符
- Observable.create()
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber{
}
});
-
Observable.from()
from() 創(chuàng)建符可以從一個(gè)列表/數(shù)組來創(chuàng)建Observable,并一個(gè)接一個(gè)的從列表/數(shù)組中發(fā)射出來每一個(gè)對(duì)象,或者也可以從Java Future 類來創(chuàng)建Observable赏廓,并發(fā)射Future對(duì)象的 .get() 方法返回的結(jié)果值涵紊。傳入 Future 作為參數(shù)時(shí),我們可以指定一個(gè)超時(shí)的值幔摸。Observable將等待來自 Future 的結(jié)果摸柄;如果在超時(shí)之前仍然沒有結(jié)果返回,Observable將會(huì)觸發(fā) onError() 方法通知觀察者有錯(cuò)誤發(fā)生了既忆。List<Integer> items = new ArrayList<Integer>(); items.add(1); items.add(10); items.add(100); items.add(200); Observable<Integer> observableString = Observable.from(items); Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh,no! Something wrong happened驱负!"); } @Override public void onNext(Integer item) { System.out.println("Item is " + item); } });
Observable.just()
just() 方法可以傳入一到九個(gè)參數(shù),它們會(huì)按照傳入的參數(shù)的順序來發(fā)射它們尿贫。 just() 方法也可以接受列表或數(shù)組电媳,就像 from() 方法,但是它不會(huì)迭代列表發(fā)射每個(gè)值,它將會(huì)發(fā)射整個(gè)列表庆亡。通常匾乓,當(dāng)我們想發(fā)射一組已經(jīng)定義好的值時(shí)會(huì)用到它。但是如果我們的函數(shù)不是時(shí)變性的又谋,我們可以用just來創(chuàng)建一個(gè)更有組織性和可測性的代碼庫拼缝。
Observable<String> observableString = Observable.just(helloWorld
());
Subscription subscriptionPrint = observableString.subscribe(new
Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
helloWorld() 方法比較簡單,像這樣:
private String helloWorld(){
return "Hello World";
}
Subject
Subject 既可以是 Observable彰亥,也可以是 Observer咧七。
RxJava 提供四種不同的 Subject :
- PublishSubject
- BehaviorSubject
BehaviorSubject會(huì)首先向他的訂閱者發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
```
在這個(gè)短例子中任斋,我們創(chuàng)建了一個(gè)能發(fā)射整形(Integer)的BehaviorSubject继阻。由于每當(dāng)Observes訂閱它時(shí)就會(huì)發(fā)射最新的數(shù)據(jù),所以它需要一個(gè)初始值。
-
ReplaySubject
ReplaySubject 會(huì)緩存它所訂閱的所有數(shù)據(jù),向任意一個(gè)訂閱它的觀察者重發(fā):
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
```
-
AsyncSubject
當(dāng)Observable完成時(shí)AsyncSubject只會(huì)發(fā)布最后一個(gè)數(shù)據(jù)給已經(jīng)訂閱的每一個(gè)觀察者瘟檩。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
直接創(chuàng)建 Observable
在我們的第一個(gè)列子里抹缕,我們將檢索安裝的應(yīng)用列表并填充RecycleView的item來展示它們。我們也設(shè)想一個(gè)下拉刷新的功能和一個(gè)進(jìn)度條來告知用戶當(dāng)前任務(wù)正在執(zhí)行墨辛。
首先卓研,我們創(chuàng)建Observable。我們需要一個(gè)函數(shù)來檢索安裝的應(yīng)用程序列表并把它提供給我們的觀察者睹簇。我們一個(gè)接一個(gè)的發(fā)射這些應(yīng)用程序數(shù)據(jù)奏赘,將它們分組到一個(gè)單獨(dú)的列表中,以此來展示響應(yīng)式方法的靈活性太惠。
private Observable<AppInfo> getApps(){
return Observable.create(subscriber -> {
List<AppInfoRich> apps = new ArrayList<AppInfoRich>();
final Intent mainIntent = new Intent(Intent.ACTION_MAIN, null);
mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
List<ResolveInfo> infos = getActivity().queryIntentActivities(mainIntent, 0);
for(ResolveInfo info : infos){
apps.add(new AppInfoRich(getActivity(),info));
}
for (AppInfoRich appInfo:apps) {
Bitmap icon = Utils.drawableToBitmap(appInfo.getIcon());
String name = appInfo.getName();
String iconPath = mFilesDir + "/" + name;
Utils.storeBitmap(App.instance, icon,name);
if (subscriber.isUnsubscribed()){
return;
}
subscriber.onNext(new AppInfo(name, iconPath, appInfo.getLastUpdateTime()));
}
if (!subscriber.isUnsubscribed()){
subscriber.onCompleted();
}
});
}
AppInfo為App信息的實(shí)體類磨淌,包括上次更新時(shí)間、圖標(biāo)垛叨、名字三個(gè)屬性伦糯,此處省略。
需要重點(diǎn)注意的是在發(fā)射新的數(shù)據(jù)或者完成序列之前要檢測觀察者的訂閱情況嗽元。這樣的話代碼會(huì)更高效敛纲,因?yàn)槿绻麤]有觀察者等待時(shí)我們就不生成沒有必要的數(shù)據(jù)項(xiàng)。
接下來剂癌,我們來定義下拉刷新的方法:
private void refreshTheList() {
getApps().toSortedList()
.subscribe(new Observer<List<AppInfo>>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(List<AppInfo> appInfos) {
mRecyclerView.setVisibility(View.VISIBLE);
mAdapter.addApplications(appInfos);
mSwipeRefreshLayout.setRefreshing(false);
}
});
}
從列表創(chuàng)建 Observable
在這個(gè)例子中淤翔,我們將引入 from() 函數(shù)。使用這個(gè)特殊的“創(chuàng)建”函數(shù)佩谷,我們可以從一個(gè)列表中創(chuàng)建一個(gè)Observable旁壮。Observable將發(fā)射出列表中的每一個(gè)元素,我們可以通過訂閱它們來對(duì)這些發(fā)出的元素做出響應(yīng)谐檀。
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable.from(apps).subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfo appInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}
和第一個(gè)例子一個(gè)主要的不同是我們在 onCompleted() 函數(shù)中停掉進(jìn)度條是因?yàn)槲覀円粋€(gè)一個(gè)的發(fā)射元素抡谐;
第一個(gè)例子中的Observable發(fā)射的是整個(gè)list,因此在 onNext() 函數(shù)中停掉進(jìn)度條的做法是安全的。
具有特殊功能的創(chuàng)建符
-
just()
你可以將一個(gè)函數(shù)作為參數(shù)傳給 just() 方法桐猬,你將會(huì)得到一個(gè)已存在代碼的原始Observable版本麦撵。在一個(gè)新的響應(yīng)式架構(gòu)的基礎(chǔ)上遷移已存在的代碼,這個(gè)方法可能是一個(gè)有用的開始點(diǎn)溃肪。
-
repeat()
假如你想對(duì)一個(gè)Observable重復(fù)發(fā)射三次數(shù)據(jù) :
Observable.just(appOne,appTwo,appThree) .repeat(3) .subscribe();
我們在 just() 創(chuàng)建Observable后追加了 repeat(3) 免胃,它將會(huì)創(chuàng)建9個(gè)元素的序列,每一個(gè)都單獨(dú)發(fā)射惫撰。
-
defer()
有這樣一個(gè)場景羔沙,你想在這聲明一個(gè)Observable但是你又想推遲這個(gè)Observable的創(chuàng)建直到觀察者訂閱時(shí)〕辏看下面的 getInt() 函數(shù):
private Observable<Integer> getInt(){ return Observable.create(subscriber -> { if(subscriber.isUnsubscribed()){ return; } App.L.debug("GETINT"); subscriber.onNext(42); subscriber.onCompleted(); }); }
這比較簡單扼雏,并且它沒有做太多事情坚嗜,但是它正好為我們服務(wù)。現(xiàn)在呢蛤,我們可以創(chuàng)建一個(gè)新的Observable并且應(yīng)用 defer() :
Observable<Integer> deferred = Observable.defer(this::getInt);
這次惶傻, deferred 存在棍郎,但是 getInt() create() 方法還沒有調(diào)用 : logcat日志也沒有“GETINT”打印出來 :
deferred.subscribe(number -> { App.L.debug(String.valueOf(number)); });
但是一旦我們訂閱了其障, create() 方法就會(huì)被調(diào)用并且我們也可以在logcat日志中打印出兩個(gè)值:GETINT 和 42。
-
range()
從一個(gè)指定的數(shù)字X開始發(fā)射N個(gè)數(shù)字涂佃。range() 函數(shù)用兩個(gè)數(shù)字作為參數(shù):第一個(gè)是起始點(diǎn)励翼,第二個(gè)是我們想發(fā)射數(shù)字的個(gè)數(shù)。
-
interval()
interval() 函數(shù)在你需要?jiǎng)?chuàng)建一個(gè)輪詢程序時(shí)非常好用辜荠。interval() 函數(shù)的兩個(gè)參數(shù):一個(gè)指定兩次發(fā)射的時(shí)間間隔汽抚,另一個(gè)是用到的時(shí)間單位。
-
timer()
如果你需要一個(gè)一段時(shí)間之后才發(fā)射的Observable伯病,你可以使用 timer()造烁。
過濾Observables
過濾序列
RxJava讓我們使用 filter() 方法來過濾我們觀測序列中不想要的值。
我們從發(fā)出的每個(gè)元素中過濾掉開頭字母不是C的 :
.filter(new Func1<AppInfo,Boolean>(){
@Override
public Boolean call(AppInfo appInfo){
return appInfo.getName().startsWith("C");
}
})
我們傳一個(gè)新的 Func1 對(duì)象給 filter() 函數(shù)午笛,即只有一個(gè)參數(shù)的函數(shù)惭蟋。 Func1 有一個(gè) AppInfo 對(duì)象來作為它的參數(shù)類型并且返回 Boolean 對(duì)象。只要條件符合 filter() 函數(shù)就會(huì)返回 true 药磺。此時(shí)告组,值會(huì)發(fā)射出去并且所有的觀察者都會(huì)接收到。
filter() 函數(shù)最常用的用法之一時(shí)過濾 null 對(duì)象:
.filter(new Func1<AppInfo,Boolean>(){
@Override
public Boolean call(AppInfo appInfo){
return appInfo != null;
}
})
它幫我們免去了在 onNext() 函數(shù)調(diào)用中再去檢測 null 值癌佩,讓我們把注意力集中在應(yīng)用業(yè)務(wù)邏輯上木缝。
獲取我們需要的數(shù)據(jù)
當(dāng)我們不需要整個(gè)序列時(shí),而是只想取開頭或結(jié)尾的幾個(gè)元素围辙,我們可以用 take() 或 takeLast() 我碟。
-
take()
take() 函數(shù)用整數(shù)N來作為一個(gè)參數(shù),從原始的序列中發(fā)射前N個(gè)元素姚建,然后完成:
Observable.from(apps) .take(3) .subscribe(...);
-
takeLast()
如果我們想要最后N個(gè)元素矫俺,我們只需使用 takeLast() 函數(shù):
Observable.from(apps) .takeLast(3) .subscribe(...);
有且僅有一次
-
distinct()
就像 takeLast() 一樣, distinct() 作用于一個(gè)完整的序列桥胞,然后得到重復(fù)的過濾項(xiàng)恳守,它需要記錄每一個(gè)發(fā)射的值。如果你在處理一大堆序列或者大的數(shù)據(jù)記得關(guān)注內(nèi)存使用情況贩虾。
Observable<AppInfo> fullOfDuplicates = Observable.from(apps) .take(3) .repeat(3); fullOfDuplicates.distinct() .subscribe(...);
-
ditinctUntilChanged()
如果在一個(gè)可觀測序列發(fā)射一個(gè)不同于之前的一個(gè)新值時(shí)讓我們得到通知這時(shí)候該怎么做催烘?ditinctUntilChanged() 過濾函數(shù)能做到這一點(diǎn)。它能輕易的忽略掉所有的重復(fù)并且只發(fā)射出新的值缎罢。
First and last
first() 方法和 last() 方法很容易弄明白伊群。它們從Observable中只發(fā)射第一個(gè)元素或者最后一個(gè)元素考杉。這兩個(gè)都可以傳 Func1 作為參數(shù)。
與 first() 和 last() 相似的變量有: firstOrDefault() 和 lastOrDefault() 舰始。這兩個(gè)函數(shù)當(dāng)可觀測序列完成時(shí)不再發(fā)射任何值時(shí)用得上崇棠。在這種場景下,如果Observable不再發(fā)射任何值時(shí)我們可以指定發(fā)射一個(gè)默認(rèn)的值丸卷。
Skip and SkipLast
skip() 和 skipLast() 函數(shù)與 take() 和 takeLast() 相對(duì)應(yīng)枕稀。它們用整數(shù)N作參數(shù),從本質(zhì)上來說谜嫉,它們不讓Observable發(fā)射前N個(gè)或者后N個(gè)值萎坷。
ElementAt
如果我們只想要可觀測序列發(fā)射的第五個(gè)元素該怎么辦? elementAt() 函數(shù)僅從一個(gè)序列中發(fā)射第n個(gè)元素然后就完成了沐兰。
如果我們想查找第五個(gè)元素但是可觀測序列只有三個(gè)元素可供發(fā)射時(shí)該怎么辦哆档?我們可以使用 elementAtOrDefault() 。
Sampling
在Observable后面加一個(gè) sample() 住闯,我們將創(chuàng)建一個(gè)新的可觀測序列瓜浸,它將在一個(gè)指定的時(shí)間間隔里由Observable發(fā)射最近一次的數(shù)值:
Observable<Integer> sensor = [...]
sensor.sample(30,TimeUnit.SECONDS)
.subscribe(...);
如果我們想讓它定時(shí)發(fā)射第一個(gè)元素而不是最近的一個(gè)元素,我們可以使用 throttleFirst() 比原。
Timeout
我們可以使用 timeout() 函數(shù)來監(jiān)聽源可觀測序列,就是在我們設(shè)定的時(shí)間間隔內(nèi)如果沒有得到一個(gè)值則發(fā)射一個(gè)錯(cuò)誤插佛。我們可以認(rèn)為 timeout() 為一個(gè)Observable的限時(shí)的副本。如果在指定的時(shí)間間隔內(nèi)Observable不發(fā)射值的話春寿,它監(jiān)聽的原始的Observable時(shí)就會(huì)觸發(fā) onError() 函數(shù)朗涩。
Subscription subscription = getCurrentTemperature()
.timeout(2,TimeUnit.SECONDS)
.subscribe(...);
Debounce
debounce() 函數(shù)過濾掉由Observable發(fā)射的速率過快的數(shù)據(jù);如果在一個(gè)指定的時(shí)間間隔過去了仍舊沒有發(fā)射一個(gè)绑改,那么它將發(fā)射最后的那個(gè)谢床。
下圖展示了多久從Observable發(fā)射一次新的數(shù)據(jù), debounce() 函數(shù)開啟一個(gè)內(nèi)部定時(shí)器厘线,如果在這個(gè)時(shí)間間隔內(nèi)沒有新的據(jù)發(fā)射识腿,則新的Observable發(fā)射出最后一個(gè)數(shù)據(jù):
變換Observables
*map家族
RxJava提供了幾個(gè)mapping函數(shù): map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有這些函數(shù)都作用于一個(gè)可觀測序列,然后變換它發(fā)射的值造壮,最后用一種新的形式返回它們渡讼。
-
Map
RxJava的 map 函數(shù)接收一個(gè)指定的 Func 對(duì)象然后將它應(yīng)用到每一個(gè)由Observable發(fā)射的值上。
Observable.from(apps) .map(new Func1<AppInfo,AppInfo>(){ @Override public Appinfo call(AppInfo appInfo){ String currentName = appInfo.getName(); String lowerCaseName = currentName.toLowerCase(); appInfo.setName(lowerCaseName); return appInfo; } }) .subscribe(...);
正如你看到的耳璧,像往常一樣創(chuàng)建我們發(fā)射的Observable之后成箫,我們追加一個(gè) map 調(diào)用,我們創(chuàng)建一個(gè)簡單的函數(shù)來更新 AppInfo對(duì)象并提供一個(gè)名字小寫的新版本給觀察者旨枯。
-
FlatMap
在復(fù)雜的場景中蹬昌,我們有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列,這些數(shù)據(jù)本身也可以發(fā)射Observable攀隔。RxJava的 flatMap() 函數(shù)提供一種鋪平序列的方式皂贩,然后合并這些Observables發(fā)射的數(shù)據(jù)栖榨,最后將合并后的結(jié)果作為最終的Observable。
flatMap() 函數(shù)示意圖當(dāng)我們在處理可能有大量的Observables時(shí)明刷,重要是記住任何一個(gè)Observables發(fā)生錯(cuò)誤的情況婴栽, flatMap() 將會(huì)觸發(fā)它自己的 onError() 函數(shù)并放棄整個(gè)鏈。重要的一點(diǎn)提示是關(guān)于合并部分:它允許交叉辈末。正如上圖所示愚争,這意味著 flatMap() 不能夠保證在最終生成的Observable中源Observables確切的發(fā)射順序。
-
ConcatMap
RxJava的 concatMap() 函數(shù)解決了 flatMap() 的交叉問題本冲,提供了一種能夠把發(fā)射的值連續(xù)在一起的鋪平函數(shù)准脂,而不是合并它們,如下圖所示:
這里寫圖片描述 -
FlatMapIterable
作為*map家族的一員檬洞, flatMapInterable() 和 flatMap() 很像。僅有的本質(zhì)不同是它將源數(shù)據(jù)兩兩結(jié)成對(duì)并生成Iterable沟饥,而不是原始數(shù)據(jù)項(xiàng)和生成的Observables添怔。
-
SwitchMap
switchMap() 和 flatMap() 很像,除了一點(diǎn):每當(dāng)源Observable發(fā)射一個(gè)新的數(shù)據(jù)項(xiàng)(Observable)時(shí)贤旷,它將取消訂閱并停止監(jiān)視之前那個(gè)數(shù)據(jù)項(xiàng)產(chǎn)生的Observable广料,并開始監(jiān)視當(dāng)前發(fā)射的這一個(gè)。
-
Scan
RxJava的 scan() 函數(shù)可以看做是一個(gè)累積函數(shù)幼驶。 scan() 函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù)艾杏,計(jì)算出函數(shù)的結(jié)果值,并將該值填充回可觀測序列盅藻,等待和下一次發(fā)射的數(shù)據(jù)一起使用购桑。
作為一個(gè)通用的例子,給出一個(gè)累加器:
Observable.just(1,2,3,4,5) .scan((sum,item) -> sum + item) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { Log.d("RXJAVA", "Sequence completed."); } @Override public void onError(Throwable e) { Log.e("RXJAVA", "Something went south!"); } @Override public void onNext(Integer item) { Log.d("RXJAVA", "item is: " + item); } });
我們得到的結(jié)果是:
RXJAVA: item is: 1
RXJAVA: item is: 3
RXJAVA: item is: 6
RXJAVA: item is: 10
RXJAVA: item is: 15
RXJAVA: Sequence completed.
GroupBy
RxJava提供了一個(gè)有用的函數(shù)從列表中按照指定的規(guī)則: groupBy() 來分組元素氏淑。下圖中的例子展示了 groupBy() 如何將發(fā)射的值根據(jù)他們的形狀來進(jìn)行分組勃蜘。
這個(gè)函數(shù)將源Observable變換成一個(gè)發(fā)射Observables的新的Observable。它們中的每一個(gè)新的Observable都發(fā)射一組指定的數(shù)據(jù)假残。
為了創(chuàng)建一個(gè)分組了的已安裝應(yīng)用列表缭贡,我們在 loadList() 函數(shù)中引入了一個(gè)新的元素:
Observable<GroupedObservable<String,AppInfo>> groupedItems = Observable.from(apps)
.groupBy(new Func1<AppInfo,String>(){
@Override
public String call(AppInfo appInfo){
SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy");
return formatter.format(new Date(appInfo.getLastUpdateTime()));
}
});
現(xiàn)在我們創(chuàng)建了一個(gè)新的Observable, groupedItems 辉懒,它將會(huì)發(fā)射一個(gè)帶有 GroupedObservable 的序列阳惹。 GroupedObservable 是一個(gè)特殊的Observable,它源自一個(gè)分組的key眶俩。在這個(gè)例子中莹汤,key就是 String ,代表的意思是 Month/Year 格式化的最近更新日期仿便。
Buffer
RxJava中的 buffer() 函數(shù)將源Observable變換一個(gè)新的Observable体啰,這個(gè)新的Observable每次發(fā)射一組列表值而不是一個(gè)一個(gè)發(fā)射攒巍。
buffer() 函數(shù)有幾種變體。其中有一個(gè)是允許你指定一個(gè) skip 值:此后每 skip 項(xiàng)數(shù)據(jù)荒勇,用count項(xiàng)數(shù)據(jù)填充緩沖區(qū)柒莉。另一個(gè)是buffer() 帶一個(gè) timespan 的參數(shù),會(huì)創(chuàng)建一個(gè)每隔timespan時(shí)間段就會(huì)發(fā)射一個(gè)列表的Observable沽翔。
Window
RxJava的 window() 函數(shù)和 buffer() 很像兢孝,但是它發(fā)射的是Observable而不是列表。
正如 buffer() 一樣, window() 也有一個(gè) skip 變體仅偎。
Cast
cast() 函數(shù)是 map() 操作符的特殊版本跨蟹。它將源Observable中的每一項(xiàng)數(shù)據(jù)都轉(zhuǎn)換為新的類型,把它變成了不同的 Class 橘沥。
組合Observables
Merge
在”異步的世界“中經(jīng)常會(huì)創(chuàng)建這樣的場景窗轩,我們有多個(gè)來源但是又只想有一個(gè)結(jié)果:多輸入,單輸出座咆。RxJava的 merge() 方法將幫助你把兩個(gè)甚至更多的Observables合并到他們發(fā)射的數(shù)據(jù)項(xiàng)里痢艺。下圖給出了把兩個(gè)序列合并在一個(gè)最終發(fā)射的Observable。
正如你看到的那樣介陶,發(fā)射的數(shù)據(jù)被交叉合并到一個(gè)Observable里面堤舒。注意如果你同步的合并Observable,它們將連接在一起并且不會(huì)交叉哺呜。
Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(...);
注意錯(cuò)誤時(shí)的toast消息舌缤,你可以認(rèn)為每個(gè)Observable拋出的錯(cuò)誤都將會(huì)打斷合并。如果你需要避免這種情況某残,RxJava提供了 mergeDelayError() 国撵,它能從一個(gè)Observable中繼續(xù)發(fā)射數(shù)據(jù)即便是其中有一個(gè)拋出了錯(cuò)誤。當(dāng)所有的Observables都完成時(shí)驾锰, mergeDelayError() 將會(huì)發(fā)射 onError()卸留。
ZIP
在一種新的可能場景中處理多個(gè)數(shù)據(jù)來源時(shí)會(huì)帶來:多從個(gè)Observables接收數(shù)據(jù),處理它們椭豫,然后將它們合并成一個(gè)新的可觀測序列來使用耻瑟。RxJava有一個(gè)特殊的方法可以完成: zip() 合并兩個(gè)或者多個(gè)Observables發(fā)射出的數(shù)據(jù)項(xiàng),根據(jù)指定的函數(shù)Func* 變換它們赏酥,并發(fā)射一個(gè)新值喳整。下圖展示了 zip() 方法如何處理發(fā)射的“numbers”和“l(fā)etters”然后將它們合并一個(gè)新的數(shù)據(jù)項(xiàng):
Observable.zip(observableApp, tictoc, (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...);
zip() 函數(shù)有三個(gè)參數(shù):兩個(gè)Observables和一個(gè) Func2 。
Join
前面兩個(gè)方法裸扶, zip() 和 merge() 方法作用在發(fā)射數(shù)據(jù)的范疇內(nèi)框都,在決定如何操作值之前有些場景我們需要考慮時(shí)間的。RxJava的 join() 函數(shù)基于時(shí)間窗口將兩個(gè)Observables發(fā)射的數(shù)據(jù)結(jié)合在一起呵晨。
為了正確的理解上一張圖魏保,我們解釋下 join() 需要的參數(shù):
- 第二個(gè)Observable和源Observable結(jié)合熬尺。
- Func1 參數(shù):在指定的由時(shí)間窗口定義時(shí)間間隔內(nèi),源Observable發(fā)射的數(shù)據(jù)和從第二個(gè)Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable谓罗。
- Func1 參數(shù):在指定的由時(shí)間窗口定義時(shí)間間隔內(nèi)粱哼,第二個(gè)Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
- Func2 參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項(xiàng)相結(jié)合檩咱。
combineLatest
RxJava的 combineLatest() 函數(shù)有點(diǎn)像 zip() 函數(shù)的特殊形式揭措。正如我們已經(jīng)學(xué)習(xí)的, zip() 作用于最近未打包的兩個(gè)Observables刻蚯。相反绊含, combineLatest() 作用于最近發(fā)射的數(shù)據(jù)項(xiàng):如果 Observable1 發(fā)射了A并且 Observable2 發(fā)射了B和C, combineLatest() 將會(huì)分組處理AB和AC炊汹,如下圖所示:
And,Then和When
在將來還有一些 zip() 滿足不了的場景躬充。如復(fù)雜的架構(gòu),或者是僅僅為了個(gè)人愛好兵扬,你可以使用And/Then/When解決方案麻裳。它們在RxJava的joins包下,使用Pattern和Plan作為中介器钟,將發(fā)射的數(shù)據(jù)集合并到一起。
Switch
給出一個(gè)發(fā)射多個(gè)Observables序列的源Observable妙蔗, switch() 訂閱到源Observable然后開始發(fā)射由第一個(gè)發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)傲霸。當(dāng)源Observable發(fā)射一個(gè)新的Observable時(shí), switch() 立即取消訂閱前一個(gè)發(fā)射數(shù)
據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個(gè)新的Observable眉反,并開始發(fā)射它的數(shù)據(jù)昙啄。
StartWith
RxJava的 startWith() 是 concat() 的對(duì)應(yīng)部分。正如 concat() 向發(fā)射數(shù)據(jù)的Observable追加數(shù)據(jù)那樣寸五,在Observable開始發(fā)射他們的數(shù)據(jù)之前梳凛,startWith() 通過傳遞一個(gè)參數(shù)來先發(fā)射一個(gè)數(shù)據(jù)序列。
Schedulers-解決Android主線程問題
Schedulers
調(diào)度器以一種最簡單的方式將多線程用在你的Apps的中梳杏。它們時(shí)RxJava重要的一部分并能很好地與Observables協(xié)同工作韧拒。它們無需處理實(shí)現(xiàn)、同步十性、線程叛溢、平臺(tái)限制、平臺(tái)變化而可以提供一種靈活的方式來創(chuàng)建并發(fā)程序劲适。
RxJava提供了5種調(diào)度器:
- .io()
- .computation()
- .immediate()
- .newThread()
- .trampoline()
Schedulers.io()
這個(gè)調(diào)度器時(shí)用于I/O操作楷掉。它基于根據(jù)需要,增長或縮減來自適應(yīng)的線程池霞势。我們將使用它來修復(fù)我們之前看到的 StrictMode 違規(guī)做法烹植。由于它專用于I/O操作斑鸦,所以并不是RxJava的默認(rèn)方法;正確的使用它是由開發(fā)者決定的草雕。
重點(diǎn)需要注意的是線程池是無限制的巷屿,大量的I/O調(diào)度操作將創(chuàng)建許多個(gè)線程并占用內(nèi)存。一如既往的是促绵,我們需要在性能和簡捷兩者之間找到一個(gè)有效的平衡點(diǎn)攒庵。
Schedulers.computation()
這個(gè)是計(jì)算工作默認(rèn)的調(diào)度器,它與I/O操作無關(guān)败晴。它也是許多RxJava方法的默認(rèn)調(diào)度器: buffer() , debounce() , delay() , interval() , sample() , skip()浓冒。
Schedulers.immediate()
這個(gè)調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作。它是 timeout() , timeInterval() ,以及 timestamp() 方法默認(rèn)的調(diào)度器尖坤。
Schedulers.newThread()
這個(gè)調(diào)度器正如它所看起來的那樣:它為指定任務(wù)啟動(dòng)一個(gè)新的線程稳懒。
Schedulers.trampoline()
當(dāng)我們想在當(dāng)前線程執(zhí)行一個(gè)任務(wù)時(shí),并不是立即慢味,我們可以用 .trampoline() 將它入隊(duì)场梆。這個(gè)調(diào)度器將會(huì)處理它的隊(duì)列并且按序運(yùn)行隊(duì)列中每一個(gè)任務(wù)。它是 repeat() 和 retry() 方法默認(rèn)的調(diào)度器纯路。
非阻塞I/O操作
使用 Schedulers.io() 創(chuàng)建非阻塞的版本:
public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
Schedulers.io().createWorker().schedule(() -> {
blockingStoreBitmap(context, bitmap, filename);
});
}
SubscribeOn and ObserveOn
我們學(xué)到了如何在一個(gè)調(diào)度器上運(yùn)行一個(gè)任務(wù)或油。但是我們?nèi)绾卫盟鼇砗蚈bservables一起工作呢?RxJava提供了 subscribeOn() 方法來用于每個(gè)Observable對(duì)象驰唬。 subscribeOn() 方法用 Scheduler 來作為參數(shù)并在這個(gè)Scheduler上執(zhí)行Observable調(diào)用顶岸。
首先,我們需要一個(gè)新的 getApps() 方法來檢索已安裝的應(yīng)用列表:
private Observable<AppInfo> getApps() {
return Observable.create(subscriber -> {
List<AppInfo> apps = new ArrayList<>();
SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
String serializedApps = sharedPref.getString("APPS", "");
if (!"".equals(serializedApps)) {
apps = new Gson().fromJson(serializedApps,appInfoType);
}
for (AppInfo app : apps) {
subscriber.onNext(app);
}
subscriber.onCompleted();
});
}
然后叫编,我們所需要做的是指定 getApps() 需要在調(diào)度器上執(zhí)行:
getApps().subscribeOn(Schedulers.io())
.subscribe(new Observer<AppInfo>() { [...]
最后辖佣,我們只需在 loadList() 函數(shù)添加幾行代碼,那么每一項(xiàng)就都準(zhǔn)備好了:
getApps()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() { [...]
observeOn() 方法將會(huì)在指定的調(diào)度器上返回結(jié)果:如例子中的UI線程搓逾。 onBackpressureBuffer() 方法將告訴Observable發(fā)射的數(shù)據(jù)如果比觀察者消費(fèi)的數(shù)據(jù)要更快的話卷谈,它必須把它們存儲(chǔ)在緩存中并提供一個(gè)合適的時(shí)間給它們。
處理耗時(shí)的任務(wù)
一個(gè)與I/O無關(guān)的耗時(shí)的任務(wù):
getObservableApps(apps)
.onBackpressureBuffer()
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() { [...]
總結(jié)
RxJava提供了一種以面向時(shí)序的方式考慮數(shù)據(jù)的機(jī)會(huì):所有事情都是持續(xù)變化的霞篡,數(shù)據(jù)在更新世蔗,事件在觸發(fā),然后你就可以創(chuàng)建事件響應(yīng)式的寇损、靈活的凸郑、運(yùn)行流暢的App。
謹(jǐn)記可觀測序列就像一條河:它們是流動(dòng)的矛市。你可以“過濾”(filter)一條河芙沥,你可以“轉(zhuǎn)換”(transform)一條河,你可以將兩條河合并(combine)成一個(gè),然后依然暢流如初而昨。最后救氯,它就成了你想要的那條河。
“Be Water歌憨,my friend” - Bruce Lee