為什么是Observables?
在面向?qū)ο蟮募軜嬛信_發(fā)者致力于創(chuàng)建一組解耦的實體缀旁。這樣的話翰铡,實體就可以在不用妨礙整個系統(tǒng)的情況下可以被測試、復用和維護妨退。設計這種系統(tǒng)就帶來一個棘手的負面影響:維護相關對象之間的統(tǒng)一妇萄。
在Smalltalk MVC架構中,創(chuàng)建模式的第一個例子就是用來解決這個問題的咬荷。用戶界面框架提供一種途徑使UI元素與包含數(shù)據(jù)的實體對象相分離冠句,并且同時,它提供一種靈活的方法來保持它們之間的同步幸乒。
在這本暢銷的四人組編寫的《設計模式——可復用面向?qū)ο筌浖幕A》一書中懦底,觀察者模式是最有名的設計模式之一。它是一種行為模式并提供一種以一對多的依賴來綁定對象的方法:即當一個對象發(fā)生變化時罕扎,依賴它的所有對象都會被通知并且會自動更新聚唐。
在本章中,我們將會對觀察者模式有一個概述腔召,它是如何實現(xiàn)的以及如何用RxJava來擴展杆查,Observable是什么,以及Observables如何與Iterables相關聯(lián)宴咧。
你什么時候使用觀察者模式根灯?
觀察者模式很適合下面這些場景中的任何一個:
- 當你的架構有兩個實體類,一個依賴另一個掺栅,你想讓它們互不影響或者是獨立復用它們時烙肺。
- 當一個變化的對象通知那些與它自身變化相關聯(lián)的未知數(shù)量的對象時。
- 當一個變化的對象通知那些無需推斷具體是誰的對象時氧卧。
RxJava觀察者模式工具包
在RxJava的世界里桃笙,我們有四種角色:
- Observable
- Observer
- Subscriber
- Subjects
Observables和Subjects是兩個“生產(chǎn)”實體,Observers和Subscribers是兩個“消費”實體沙绝。
Observable
當我們異步執(zhí)行一些復雜的事情搏明,Java提供了傳統(tǒng)的類鼠锈,例如Thread、Future星著、FutureTask购笆、CompletableFuture來處理這些問題。當復雜度提升虚循,這些方案就會變得麻煩和難以維護同欠。最糟糕的是,它們都不支持鏈式調(diào)用横缔。
RxJava Observables被設計用來解決這些問題铺遂。它們靈活,且易于使用茎刚,也可以鏈式調(diào)用襟锐,并且可以作用于單個結果程序上,更有甚者膛锭,也可以作用于序列上粮坞。無論何時你想發(fā)射單個標量值,或者一連串值泉沾,甚至是無窮個數(shù)值流捞蚂,你都可以使用Observable。
Observable的生命周期包含了三種可能的易于與Iterable生命周期事件相比較的事件跷究,下表展示了如何將Observable async/push 與 Iterable sync/pull相關聯(lián)起來。
Event | Iterable(pull) | Observable(push) |
---|---|---|
檢索數(shù)據(jù) | T next() |
onNext(T) |
發(fā)現(xiàn)錯誤 | throws Exception |
onError(Throwable) |
完成 | !hasNext() |
onCompleted() |
使用Iterable時敲霍,消費者從生產(chǎn)者那里以同步的方式得到值俊马,在這些值得到之前線程處于阻塞狀態(tài)。相反肩杈,使用Observable時柴我,生產(chǎn)者以異步的方式把值推給觀察者,無論何時扩然,這些值都是可用的艘儒。這種方法之所以更靈活是因為即便值是同步或異步方式到達,消費者在這兩種場景都可以根據(jù)自己的需要來處理夫偶。
為了更好地復用Iterable接口界睁,RxJava Observable類擴展了GOF觀察者模式的語義。引入了兩個新的接口:
- onCompleted() 即通知觀察者Observable沒有更多的數(shù)據(jù)兵拢。
- onError() 即觀察者有錯誤出現(xiàn)了翻斟。
熱Observables和冷Observables
從發(fā)射物的角度來看,有兩種不同的Observables:熱的和冷的说铃。一個"熱"的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù)访惜,因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯過了)嘹履。一個"冷"的Observable會一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù)债热,因此這個觀察者可以確保會收到整個數(shù)據(jù)序列砾嫉。
創(chuàng)建一個Observable
在接下來的小節(jié)中將討論Observables提供的兩種創(chuàng)建Observable的方法。
Observable.create()
create()方法使開發(fā)者有能力從頭開始創(chuàng)建一個Observable窒篱。它需要一個OnSubscribe對象,這個對象繼承Action1,當觀察者訂閱我們的Observable時焰枢,它作為一個參數(shù)傳入并執(zhí)行call()函數(shù)。
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
Observable通過使用subscriber變量并根據(jù)條件調(diào)用它的方法來和觀察者通信舌剂。讓我們看一個“現(xiàn)實世界”的例子:
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened济锄!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
例子故意寫的簡單,是因為即便是你第一次見到RxJava的操作霍转,我想讓你明白接下來要發(fā)生什么荐绝。
我們創(chuàng)建一個新的Observable<Integer>
,它執(zhí)行了5個元素的for循環(huán),一個接一個的發(fā)射他們避消,最后完成低滩。
另一方面,我們訂閱了Observable岩喷,返回一個Subscription
恕沫。一旦我們訂閱了,我們就開始接受整數(shù)纱意,并一個接一個的打印出它們婶溯。我們并不知道要接受多少整數(shù)。事實上偷霉,我們也無需知道是因為我們?yōu)槊糠N場景都提供對應的處理操作:
- 如果我們接收到了整數(shù)迄委,那么就打印它。
- 如果序列結束类少,我們就打印一個關閉的序列信息叙身。
- 如果錯誤發(fā)生了,我們就打印一個錯誤信息硫狞。
Observable.from()
在上一個例子中信轿,我們創(chuàng)建了一個整數(shù)序列并一個一個的發(fā)射它們。假如我們已經(jīng)有一個列表呢残吩?我們是不是可以不用for循環(huán)而也可以一個接一個的發(fā)射它們呢财忽?
在下面的例子代碼中,我們從一個已有的列表中創(chuàng)建一個Observable序列:
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened世剖!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
輸出的結果和上面的例子絕對是一樣的定罢。
from()
創(chuàng)建符可以從一個列表/數(shù)組來創(chuàng)建Observable,并一個接一個的從列表/數(shù)組中發(fā)射出來每一個對象,或者也可以從Java Future
類來創(chuàng)建Observable旁瘫,并發(fā)射Future對象的.get()
方法返回的結果值祖凫。傳入Future
作為參數(shù)時琼蚯,我們可以指定一個超時的值。Observable將等待來自Future
的結果惠况;如果在超時之前仍然沒有結果返回遭庶,Observable將會觸發(fā)onError()
方法通知觀察者有錯誤發(fā)生了。
Observable.just()
如果我們已經(jīng)有了一個傳統(tǒng)的Java函數(shù)稠屠,我們想把它轉(zhuǎn)變?yōu)橐粋€Observable又改怎么辦呢峦睡?我們可以用create()
方法,正如我們先前看到的权埠,或者我們也可以像下面那樣使用以此來省去許多模板代碼:
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
helloWorld()
方法比較簡單榨了,像這樣:
private String helloWorld(){
return "Hello World";
}
不管怎樣,它可以是我們想要的任何函數(shù)攘蔽。在剛才的例子中龙屉,我們一旦創(chuàng)建了Observable,just()
執(zhí)行函數(shù)满俗,當我們訂閱Observable時转捕,它就會發(fā)射出返回的值。
just()
方法可以傳入一到九個參數(shù)唆垃,它們會按照傳入的參數(shù)的順序來發(fā)射它們五芝。just()
方法也可以接受列表或數(shù)組,就像from()
方法辕万,但是它不會迭代列表發(fā)射每個值,它將會發(fā)射整個列表枢步。通常,當我們想發(fā)射一組已經(jīng)定義好的值時會用到它蓄坏。但是如果我們的函數(shù)不是時變性的价捧,我們可以用just來創(chuàng)建一個更有組織性和可測性的代碼庫。
最后注意just()
創(chuàng)建符涡戳,它發(fā)射出值后,Observable正常結束脯倚,在上面那個例子中渔彰,我們會在控制臺打印出兩條信息:“Hello World”和“Observable completed”。
Observable.empty(),Observable.never(),和Observable.throw()
當我們需要一個Observable毫無理由的不再發(fā)射數(shù)據(jù)正常結束時推正,我們可以使用empty()
恍涂。我們可以使用never()
創(chuàng)建一個不發(fā)射數(shù)據(jù)并且也永遠不會結束的Observable。我們也可以使用throw()
創(chuàng)建一個不發(fā)射數(shù)據(jù)并且以錯誤結束的Observable植榕。
Subject = Observable + Observer
subject
是一個神奇的對象再沧,它可以是一個Observable同時也可以是一個Observer:它作為連接這兩個世界的一座橋梁。一個Subject可以訂閱一個Observable尊残,就像一個觀察者炒瘸,并且它可以發(fā)射新的數(shù)據(jù)淤堵,或者傳遞它接受到的數(shù)據(jù),就像一個Observable顷扩。很明顯拐邪,作為一個Observable,觀察者們或者其它Subject都可以訂閱它隘截。
一旦Subject訂閱了Observable扎阶,它將會觸發(fā)Observable開始發(fā)射。如果原始的Observable是“冷”的婶芭,這將會對訂閱一個“熱”的Observable變量產(chǎn)生影響东臀。
RxJava提供四種不同的Subject:
- PublishSubject
- BehaviorSubject
- ReplaySubject.
- AsyncSubject
PublishSubject
Publish是Subject的一個基礎子類。讓我們看看用PublishSubject實現(xiàn)傳統(tǒng)的Observable Hello World
:
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
在剛才的例子中犀农,我們創(chuàng)建了一個PublishSubject
惰赋,用create()
方法發(fā)射一個String
值,然后我們訂閱了PublishSubject井赌。此時惋耙,沒有數(shù)據(jù)要發(fā)送网棍,因此我們的觀察者只能等待,沒有阻塞線程,也沒有消耗資源硅堆。就在這隨時準備從subject接收值,如果subject沒有發(fā)射值那么我們的觀察者就會一直在等待记某。再次聲明的是痊夭,無需擔心:觀察者知道在每個場景中該做什么,我們不用擔心什么時候是因為它是響應式的:系統(tǒng)會響應耘子。我們并不關心它什么時候響應果漾。我們只關心它響應時該做什么。
最后一行代碼展示了手動發(fā)射字符串“Hello World”,它觸發(fā)了觀察者的onNext()
方法谷誓,讓我們在控制臺打印出“Hello World”信息绒障。
讓我們看一個更復雜的例子。話說我們有一個private
聲明的Observable捍歪,外部不能訪問户辱。Observable在它生命周期內(nèi)發(fā)射值,我們不用關心這些值糙臼,我們只關心他們的結束庐镐。
首先,我們創(chuàng)建一個新的PublishSubject來響應它的onNext()
方法变逃,并且外部也可以訪問它必逆。
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable Completed");
}
});
然后,我們創(chuàng)建“私有”的Observable,只有subject才可以訪問的到名眉。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
subject.onNext(true);
}
}).subscribe();
Observable.create()
方法包含了我們熟悉的for循環(huán)粟矿,發(fā)射數(shù)字。doOnCompleted()
方法指定當Observable結束時要做什么事情:在subject上發(fā)射true璧针。最后嚷炉,我們訂閱了Observable。很明顯探橱,空的subscribe()
調(diào)用僅僅是為了開啟Observable申屹,而不用管已發(fā)出的任何值,也不用管完成事件或者錯誤事件隧膏。為了這個例子我們需要它像這樣哗讥。
在這個例子中,我們創(chuàng)建了一個可以連接Observables并且同時可被觀測的實體胞枕。當我們想為公共資源創(chuàng)建獨立杆煞、抽象或更易觀測的點時,這是極其有用的腐泻。
BehaviorSubject
簡單的說决乎,BehaviorSubject會首先向他的訂閱者發(fā)送截至訂閱前最新的一個數(shù)據(jù)對象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
在這個短例子中派桩,我們創(chuàng)建了一個能發(fā)射整形(Integer)的BehaviorSubject构诚。由于每當Observes訂閱它時就會發(fā)射最新的數(shù)據(jù),所以它需要一個初始值铆惑。
ReplaySubject
ReplaySubject會緩存它所訂閱的所有數(shù)據(jù),向任意一個訂閱它的觀察者重發(fā):
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
AsyncSubject
當Observable完成時AsyncSubject只會發(fā)布最后一個數(shù)據(jù)給已經(jīng)訂閱的每一個觀察者范嘱。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
總結
本章中,我們了解到了什么是觀察者模式员魏,為什么Observables在今天的編程場景中如此重要丑蛤,以及如何創(chuàng)建Observables和subjects。
下一章中撕阎,我們將創(chuàng)建第一個基于RxJava的Android應用程序受裹,學習如何檢索數(shù)據(jù)來填充listview,以及探索如何創(chuàng)建一個基于RxJava的響應式UI虏束。