Chapter 2: Observables and Subscribers-觀察者與可觀察對象
本章重點
- Observable的使用
- Observer的使用
- 其他創(chuàng)建Observable的工廠方法
- Single、Completable和Maybe
- Disposable
Observable是如何工作的
在我們做其他事情之前忌卤,我們需要學(xué)習(xí)的是一個Observable序列是如何將item通過鏈推送給Observer的喻括。在最高層,Observable傳遞這三類事件:
-
onNext()
:每次從源推送一個item到Observer -
onComplete()
:將一個完成事件推送給Observer领追,表明后續(xù)不再有onNext()
調(diào)用 -
onError()
:將一個錯誤事件推送給Observer,通常由觀察者來定義如何處理它他膳。除非使用retry
操作符來攔截這一錯誤,否則可觀察鏈將終止蔓腐,并不在發(fā)出任何事件
這三個事件是Observer中的抽象方法矩乐,稍后我們將介紹其中的一些實現(xiàn)。我們先觀察它們在日常場景中是如何使用的回论。
提示??:在RxJava1.0中
onComplete()
其實叫做onCompleted()
使用Observable.create()
現(xiàn)在散罕,讓我們使用Observable.create()來創(chuàng)建一個Observable。相對而言source是我們觀測鏈的起點傀蓉。
Observable.create()
允許我們通過lambda表達(dá)式創(chuàng)建一個Observable emitter欧漱。我們能夠調(diào)用Observable emitter的onNext()
方法來發(fā)射(一次)數(shù)據(jù)(emissions),以及調(diào)用一次onComplete()
來通知發(fā)射完成葬燎,之后便不再有事件發(fā)出误甚。這些onNext()
調(diào)用將把item連接到觀察者缚甩,它將打印每一個項目,如下代碼所示:
package chapter2;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
}
});
source.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("RECEIVED:" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
// lambada
Observable<String> sourceByLambda = Observable.create(emitter -> {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
});
sourceByLambda.subscribe(s -> System.out.println("REVEIVED:" + s));
}
}
它們將輸出(非lambda版本):
RECEIVED:Alpha
RECEIVED:Beta
RECEIVED:Gamma
RECEIVED:Delta
RECEIVED:Epsilon
onComplete
lamda版本:
RECEIVED:Alpha
RECEIVED:Beta
RECEIVED:Gamma
RECEIVED:Delta
RECEIVED:Epsilon
提示??:在RxJava1.0中窑邦,需要使用Observable.fromEmitter()來替換Observable.create()擅威,因為后者在RxJava1.0中的作用與RxJava2.0,并且只針對RxJava高級用戶冈钦。
onNext()是處理每一項數(shù)據(jù)的一種方式:從“Alpha”開始郊丛,到調(diào)用鏈中的下一步。在這個例子中瞧筛,下一步是Observer,它使用s -> System.out.println("RECEIVED: " + s)
這一lambda表達(dá)式來打印每一項數(shù)據(jù)厉熟。這一Lambda表達(dá)式是在Observer的onNext()
中調(diào)用的,我們稍后會更仔細(xì)的觀察Observer较幌。
警告??:需要注意的是揍瑟,Observable規(guī)定(http://reactivex.io/documentation/contract.html)數(shù)據(jù)必須按照順序一次性發(fā)送完畢,而不能通過Observable并行的發(fā)送乍炉。這似乎是一個限制绢片,但實際上這簡化了程序,使得Rx更易使用恩急。我們將在第六章:并發(fā)性與并行性中學(xué)習(xí)一些強(qiáng)大的技巧杉畜,從而在不違背Observable規(guī)定的情況下有效的利用并發(fā)性和并行性。
onComplete()
用于通知Observer衷恭,不會再有數(shù)據(jù)推過來了此叠。Observable可以是無限的,如果是這種情況随珠,則永遠(yuǎn)不會調(diào)用onComplete()
事件灭袁。從技術(shù)來講,Observable可以通過不再調(diào)用的onNext()
方式來停止發(fā)出數(shù)據(jù)窗看,從而不使用onComplete()
方法茸歧。但是如果Observable不再計劃發(fā)送數(shù)據(jù),這可能是一個糟糕的設(shè)計显沈。
盡管這個在這個例子中不會拋出異常软瞎,但是我們?nèi)耘f可以在Observable.create()代碼塊中捕獲可能會發(fā)生的異常,并通過onError()發(fā)射異常拉讯。這樣涤浇,異常就會被推送到調(diào)用鏈上交由Observer處理。前面的例子中我們的Observer不處理異常魔慷,但是你可以像下面這么做:
package chapter2;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try{
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
}catch (Exception e){
emitter.onError(e);
}
}
});
// Observer bind Observable
source.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("RECEIVED:" + s);
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
// lambda版本
Observable<String> sourceByLambda = Observable.create(emitter -> {
try{
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
}catch (Exception e){
emitter.onError(e);
}
});
sourceByLambda.subscribe(s->System.out.println("RECEIVED:" + s),
Throwable::printStackTrace);
}
}
注意只锭,onNext()
、onComplete()
和onError
并不一定會直接發(fā)送給最終的Observer院尔。它們還可以發(fā)送給調(diào)用鏈中的下一個操作符蜻展。在下面的代碼中喉誊,我們將使用map()
和filter()
操作符派生新的Observable,這些操作符將在源Observable后到最終Observer打印數(shù)據(jù)間進(jìn)行操作纵顾。
package chapter2;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
});
Observable<Integer> lengths = source.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
});
Observable<Integer> filtered = lengths.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 5;
}
});
filtered.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("RECEIVED:" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
});
// lambda版本
Observable<String> sourceByLambda = Observable.create(emitter -> {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
Observable<Integer> lengthsByLambda = sourceByLambda.map(String::length);
Observable<Integer> filteredByLambda = lengthsByLambda.filter(integer -> integer >= 5);
filteredByLambda.subscribe(s -> System.out.println("RECEIVED:" + s),
Throwable::printStackTrace);
}
}
運行后將輸出:
RECEIVED:5
RECEIVED:5
RECEIVED:5
RECEIVED:7
使用map()
和filter()
操作符可在source Observer與Observer之間伍茄,onNext()
將會把每一項數(shù)據(jù)交由map()
操作符處理。在內(nèi)部片挂,它將充當(dāng)中介觀察者幻林,將每一個字符串轉(zhuǎn)換為其長度。反過來音念,這將調(diào)用onNext()
將該整數(shù)傳遞給filter()
,而lambda表達(dá)式i -> i >= 5
將過濾掉長度小于5的數(shù)據(jù)躏敢。最后filter()
操作符調(diào)用onNext()
將每一項數(shù)據(jù)推送給最終的Observer闷愤,將結(jié)果打印出來。
值得注意的是件余,map()
操作符將從原來的Observable<String>
上產(chǎn)生一個新的可觀察對象Observable<Integer>
讥脐。filter()
操作符同樣會返回一個Observable<Integer>
,但是忽略了那些沒有達(dá)到輸出條件的數(shù)據(jù)啼器。
由于像map()
和filter()
這樣的操作符產(chǎn)生了新的可觀察對象(在內(nèi)部通過使用Observer來接收數(shù)據(jù)實現(xiàn))旬渠,我們可以將所有的可觀察對象與下一個操作符連接在一起,不必再將每一步都保存在一個中間變量中端壳。
package chapter2;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
public class Launcher {
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
}).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 5;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("RECEIVED:" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
});
// lambda版本
Observable<String> sourceByLambda = Observable.create(emitter -> {
try {
emitter.onNext("Alpha");
emitter.onNext("Beta");
emitter.onNext("Gamma");
emitter.onNext("Delta");
emitter.onNext("Epsilon");
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
sourceByLambda.map(String::length)
.filter(integer -> integer >= 5)
.subscribe(s -> System.out.println("RECEIVED:" + s),
Throwable::printStackTrace);
}
}
同樣會輸出如下結(jié)果:
RECEIVED:5
RECEIVED:5
RECEIVED:5
RECEIVED:7
這種鏈?zhǔn)秸{(diào)用的方式在響應(yīng)式編程中是常見的(并且是推薦的方式)告丢。它的可讀性很好,從左至右损谦,從上到下像書的結(jié)構(gòu)一樣岖免,這對可維護(hù)性和易讀性有很大的幫助。
警告??:在RxJava2.0中照捡,可觀察對象不再支持發(fā)射一個空值颅湘。如果你創(chuàng)建了一個試圖發(fā)射空值得可觀察對象,將得到一個非空y異常栗精。如果你需要發(fā)射一個空值闯参,請考慮使用Java 8或是Google的Guava庫的Optional將其封裝起來。
例如:
package chapter2;
import io.reactivex.Observable;
import java.util.Optional;
public class Launcher {
public static void main(String[] args) {
Optional<Void> voidOptional = Optional.empty();// 等同于Optional.ofNullable(null)
Observable.create(emitter -> {
emitter.onNext(voidOptional);
emitter.onComplete();
}).subscribe(s -> System.out.println(s));
}
}
這將會輸出:
Optional.empty
有關(guān)Optional的更多信息看這里http://www.importnew.com/6675.html
使用Observable.just()
在我們繼續(xù)查看subscribe()
方法之前悲立,先要提醒的是鹿寨,你可能并不需要經(jīng)常使用Observable.create()
來創(chuàng)建可觀察對象。讀完本章后面的內(nèi)容你就會發(fā)現(xiàn)级历,它更多的是幫助我們使用本來不是響應(yīng)式的元數(shù)據(jù)释移。所以一般我們僅僅需要使用其他精簡版的工廠方法來創(chuàng)建Observable
。
在之前使用Observable.create()
的例子中寥殖,我們可以使用Observable.just()
方法來實現(xiàn)同樣的功能玩讳。它最多可以發(fā)射10個同類型的數(shù)據(jù)涩蜘,然后為每一項數(shù)據(jù)調(diào)用onNext()
方法發(fā)射它們,并在發(fā)射完成后調(diào)用onComplete()
方法:
package chapter2;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Espilon")
.map(String::length)
.filter(i -> i >= 5)
.subscribe(s -> System.out.println("RECEIVED:" + s),
Throwable::printStackTrace,() ->System.out.println("onComplete"));
}
}
我們也可以使用Observable.formIterable()
來發(fā)射任何Iterbale
類型的數(shù)據(jù)熏纯,比如說List
同诫。它也會通過調(diào)用onNext()
方法來發(fā)射每一項數(shù)據(jù),然后在迭代完成后調(diào)用onComplete()
方法樟澜。你可能會頻繁的使用這一工廠方法误窖,因為在java中迭代器是很常見的,并且很容易寫出如下代碼:
package chapter2;
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
public class Launcher {
public static void main(String[] args) {
List<String> items=Arrays.asList("Alpha", "Beta", "Gamma", "Delta", "Espilon");
Observable.fromIterable(items)
.map(String::length)
.filter(i -> i>=5)
.subscribe(s -> System.out.println("RECEIVED:" + s),
Throwable::printStackTrace,() ->System.out.println("onComplete"));
}
}
我們將在本章后續(xù)引入更多創(chuàng)建Observable的工廠方法秩贰,但是現(xiàn)在讓我們把重點放在了解Observer上霹俺。
Observer接口
實際上onNext()
,onComplete()
以及onError()
方法都定義在Observer
類中毒费,RxJava通過這一抽象接口來傳遞這些事件丙唧。下面這段代碼就是Observer
接口的定義。暫時不要關(guān)注onSubscribe()
方法觅玻,因為我們將在本章后面的內(nèi)容中介紹它想际。請注意其他三個方法:
package io.reactivex;
import io.reactivex.disposables.Disposable;
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
Observer
和Observable
有時可能是相對的。在同一上下文中溪厘,調(diào)用鏈的起點和數(shù)據(jù)發(fā)出的地方的Observable
都可以稱為source Observable
胡本。在前面的示例中,你可以認(rèn)為Observable.create()
和Observable.just()
返回的Observable
是source Observable
畸悬。但是在filter()
操作符中侧甫,它的Observable
是從map()
操作符返回的。它無從得知真正的起點在哪傻昙,它只知道它正在接收來自它上游map()
發(fā)射的數(shù)據(jù)闺骚。
相反,由操作符返回的每一個Observable
內(nèi)部都是Observer妆档,它接收僻爽、轉(zhuǎn)換來自上游的數(shù)據(jù),并作為中繼將數(shù)據(jù)傳遞給下游贾惦。它不知道下游究竟是一個操作符還是調(diào)用鏈尾部的最后一個Observer
胸梆。當(dāng)我們經(jīng)常提到的Observer
指的是處于調(diào)用連尾部的最終消耗了數(shù)據(jù)的觀察者。但是對于map()
和filter()
這些操作符來說须板,其內(nèi)部同樣利用了Observer
碰镜。
我們將會在第九章:Transformers 和 自定義操作符中學(xué)習(xí)更多關(guān)于自定義操作符的知識,現(xiàn)在我們將著重于如何使用Observer
的subscribe()
方法习瑰。
警告??:在RxJava 1.0中绪颖,
Subseriber
本質(zhì)上是RxJava 2.0中的Observer
。在RxJava 1.0中甜奄,Subseriber
和RxJava 2.0的Observer
類柠横,同樣定義了三個事件窃款,但是Subseriber
是subseribe()
方法的參數(shù),并且它實現(xiàn)了Observer
接口牍氛。在RxJava 2.0中晨继,僅當(dāng)我們提到Flowables時才存在Subseriber
。我們將在第八章:Flowables 和 背壓中說到它搬俊。
實現(xiàn)并訂閱一個Observer
當(dāng)你調(diào)用Observable
的subscribe()
方法時,一個Observer
就會通過實現(xiàn)它的抽象方法來使用這三個事件紊扬。我們可以手動實現(xiàn)一個Observer
,并將它實例化后傳遞給subscribe()
方法唉擂,而不是像之前那樣將lambda表達(dá)式作為參數(shù)〔褪海現(xiàn)在不要理會onSubscribe()
方法,我們先提供一個空實現(xiàn)玩祟,在本章后面的內(nèi)容中啤挎,我們會談到過它。
package chapter2;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Espilon");
Observer<Integer> myObserver = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// 不去使用disposable卵凑,現(xiàn)在不要理會它
}
@Override
public void onNext(Integer value) {
System.out.println("RECEIVED:" + value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
source.map(String::length)
.filter(i -> i>=5)
.subscribe(myObserver);
}
}
這將會輸出:
RECEIVED:5
RECEIVED:5
RECEIVED:5
RECEIVED:7
Done!
我們快速創(chuàng)建了一個Observer<Integer>
類型的Observer
,它接收整數(shù)類型的參數(shù)作為長度數(shù)據(jù)胜臊。我們的Observer
作為調(diào)用鏈的尾部接收數(shù)據(jù)勺卢。這樣一來,意味著我們可以將這些數(shù)據(jù)寫入數(shù)據(jù)庫象对、文本文件黑忱,作為服務(wù)器響應(yīng),顯示在UI上勒魔,或者(像本例一樣)只是打印在控制臺中甫煞。
讓我們從開始發(fā)射字符串的源頭開始進(jìn)一步觀察這個例子。我們先是定義了我們的Observer
冠绢,并將其作為參數(shù)抚吠,傳遞給了調(diào)用鏈尾部的subscribe()
方法弟胀。注意楷力,每個字符串都被轉(zhuǎn)換成了它的長度。onNext()
方法接收每個長度數(shù)據(jù)孵户,并通過System.out.println("RECEIVED:" + value)
將其打印到控制臺萧朝。這個過程中不會拋出任何異常,但如果在我們的調(diào)用鏈中的任意一個階段拋出了異常夏哭,它將被推送到我們實現(xiàn)的onError()
方法中检柬,然后打印堆棧信息。最后竖配,當(dāng)發(fā)射源沒有數(shù)據(jù)時(發(fā)射完 "Espilon"之后),這會導(dǎo)致調(diào)用鏈上的觀察者的onComplete()
被逐一調(diào)用何址,直到最后一個onComplete()
方法將Done
打印到控制臺中里逆。
使用lambda表達(dá)式快速實現(xiàn)Observer
實現(xiàn)一個Observer
代碼有點冗長且過程麻煩。幸運的是头朱,subscribe()
的重載方法接收我們用lambda表達(dá)式實現(xiàn)這三個事件运悲。我們大多數(shù)情況下可能更喜歡這樣——指定三個用逗號分隔開的lambda表達(dá)式作為參數(shù):onNext()
lambda、onError()
lambda 和 onComplete()
lambda项钮。對于前面的例子來說班眯,我們可以使用這三個lambda表達(dá)式來實現(xiàn)那三個方法:
Consumer<Integer> onNext = i -> System.out.println("RECEIVED: " + i);
Action onComplete = () -> System.out.println("Done!");
Consumer<Throwable> onError = Throwable::printStackTrace;
我們可以將這三個lambda表達(dá)式作為subscribe()
的參數(shù),subscribe()
會將使用它們?yōu)槲覀儗崿F(xiàn)一個Observer
烁巫。這樣要簡潔的多署隘,需要的樣本代碼也少得多:
package chapter2;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
source.map(String::length)
.filter(i -> i >= 5)
.subscribe(i -> System.out.println("RECEIVED: " + i),
Throwable::printStackTrace, () -> System.out.println("Done!"));
}
}
輸出如下:
RECEIVED: 5
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7
Done!
請注意,subscribe()
還有其他的重載亚隙。你可以忽略onComplete()
磁餐,只實現(xiàn)onNext()
和onError()
。當(dāng)你不需要時onComplete()
阿弃,像下面這段代碼這樣诊霹,將不再為onComplete()
執(zhí)行任何操作:
package chapter2;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
source.map(String::length)
.filter(i -> i >= 5)
.subscribe(i -> System.out.println("RECEIVED: " + i),
Throwable::printStackTrace);
}
}
輸出如下:
RECEIVED: 5
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7
正如你前面看到的那樣,你甚至可以忽略onError()
渣淳,僅指定onNext()
:
package chapter2;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
source.map(String::length)
.filter(i -> i >= 5)
.subscribe(i -> System.out.println("RECEIVED: " + i));
}
}
然而脾还,在生產(chǎn)環(huán)境中還是要盡量實現(xiàn)onError()
。在調(diào)用鏈中的任何地方發(fā)生的異常都會讓Observable
終止發(fā)射數(shù)據(jù)入愧,并將異常交由onError()
處理鄙漏。如果你不指定onError()
,那么異常將無法得到處理:
提示??:當(dāng)發(fā)生錯誤時棺蛛,你可以使用
retry()
操作符重新訂閱一個Observable
來嘗試恢復(fù)怔蚌。我們將在下一章節(jié)介紹如何實現(xiàn)這一操作。
值得注意的是旁赊,subscribe()
的大多數(shù)重載方法(包括我們剛剛使用的lambda縮寫)都返回了Disposable
對象桦踊,我們并沒有對它進(jìn)行任何操作。disposable可以使我們在Observer
中與Observable
斷開連接彤恶,這樣發(fā)射就會被提前終止钞钙。這對于一直運行或者是長時間運行的Observable
可是至關(guān)重要的,我們將在本章的末尾介紹disposable声离。
Observables的冷于熱
Observable
和Observer
間究竟是什么樣的微妙關(guān)系芒炼,與Observable
的實現(xiàn)有關(guān)。Observables
的冷與熱是它的一個重要特性术徊,這定義了當(dāng)存在多個Observable
時會發(fā)生的情況本刽,首先,我們先介紹Observables的冷。
Observables的冷
冷的Observable就像是一張CD子寓。它能夠被每一個聽者重新播放暗挑,所以每一個人在任何時候都能夠聽到完整的音樂。同樣斜友,Observable也會為每一個Observer
重發(fā)數(shù)據(jù)炸裆,確保每一個Observer都能夠接收到全部數(shù)據(jù)。大多數(shù)數(shù)據(jù)驅(qū)動的Observable都是冷的鲜屏,包括Observable.just()
和Observable.fromIterable()
等工廠方法烹看。
在下面的例子中,有兩個慣著差訂閱了同一個Observable
洛史。這個Observable
將會把所有數(shù)據(jù)先發(fā)送給第一個Observer
然后調(diào)用onComplete()
惯殊。然后,它將會給第二個Observer
發(fā)送所有的數(shù)據(jù)然后調(diào)用onComplete()
也殖。這兩個觀察者從不同的數(shù)據(jù)流接收到了相同的數(shù)據(jù)土思,這便是冷Observable的一個典型應(yīng)用:
package chapter2;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
source.subscribe(i -> System.out.println("observer1 RECEIVED: " + i));
source.subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
}
}
輸出如下:
observer1 RECEIVED: Alpha
observer1 RECEIVED: Beta
observer1 RECEIVED: Gamma
observer1 RECEIVED: Delta
observer1 RECEIVED: Epsilon
observer2 RECEIVED: Alpha
observer2 RECEIVED: Beta
observer2 RECEIVED: Gamma
observer2 RECEIVED: Delta
observer2 RECEIVED: Epsilon
即使第二個Observer
通過操作符轉(zhuǎn)換了它的數(shù)據(jù)集,仍舊能夠從數(shù)據(jù)集中得到自己的數(shù)據(jù)流忆嗜。使用像map()
和filter()
操作冷的Obserbvable
得到的仍舊是冷的Observable:
package chapter2;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
source.subscribe(i -> System.out.println("observer1 RECEIVED: " + i));
source.map(String::length)
.filter(i -> i >= 5)
.subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
}
}
輸出如下:
observer1 RECEIVED: Alpha
observer1 RECEIVED: Beta
observer1 RECEIVED: Gamma
observer1 RECEIVED: Delta
observer1 RECEIVED: Epsilon
observer2 RECEIVED: 5
observer2 RECEIVED: 5
observer2 RECEIVED: 5
observer2 RECEIVED: 7
如前所述己儒,發(fā)出有限數(shù)據(jù)集的Observable
通常都是冷的。
這是一個更加真實的案例:Dave Moten的RxJava-JDBC庫允許你創(chuàng)建一個冷的Observable來查詢數(shù)據(jù)庫捆毫。我們并不會深入這個庫址愿。假設(shè)你想查詢一個SQLite數(shù)據(jù)庫,并且項目中引入了SQLite JDBC
驅(qū)動和RxJava-JDBC
庫损拢。你可以像下面這段代碼所示一樣舶衬,查詢表中數(shù)據(jù):
import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable; import java.sql.Connection;
public class Launcher {
public static void main(String[] args) {
Connection conn = new ConnectionProviderFromUrl("jdbc:sqlite:/home/thomas/rexon_metals.db").get();
Database db = Database.from(conn);
Observable<String> customerNames =
db.select("SELECT NAME FROM CUSTOMER")
.getAs(String.class);
customerNames.subscribe(s -> System.out.println(s));
}
}
輸出如下:
LITE Industrial
Rex Tooling Inc
Re-Barre Construction
Prairie Construction
Marsh Lane Metal Works
這個SQL驅(qū)動的Observable
是冷的。許多從數(shù)據(jù)庫、文本文件或者是JSON等有限的數(shù)據(jù)源發(fā)出的Observable
都是冷的过牙。重點是Observable
是如何運行的。RxJava-JDBC將會為每一個觀察者都巡行一次該查詢征绸。這意味著克锣,如果在第二個觀察者訂閱前數(shù)據(jù)發(fā)生了變化,那么它將得到與第一個觀察者所得到的不同的數(shù)據(jù)律适。即使結(jié)果數(shù)據(jù)從底層的表中發(fā)生了改變辐烂,但是Observable
仍然是冷的,因為它僅僅是重新運行了這段查詢捂贿。
再次強(qiáng)調(diào)纠修,冷的Observbales將會以某種形式為每一個Observer
重新發(fā)送這個Observbale
所取得的數(shù)據(jù)。下面厂僧,我們將會介紹比數(shù)據(jù)更像事件的熱的Observbales扣草。
Observbales的熱
你剛剛學(xué)習(xí)了什么是冷的Observable
,它的工作原理就像一張CD。而一個熱的Observbale
則更像是一個電臺辰妙。它像廣播一樣同時向所有的觀察者發(fā)射數(shù)據(jù)鹰祸。如果一個Observable
訂閱了一個熱的Observable
并接收了一些數(shù)據(jù)。這時另一個Observer
也訂閱了它密浑,那么他將錯過之前發(fā)射的那些數(shù)據(jù)蛙婴。就像電臺一樣,如果你切換的晚了些尔破,那么你將錯過那首歌街图。
從邏輯上來講,熱的Observble往往代表的是事件呆瞻,而不是有限的數(shù)據(jù)集台夺。這些事件能夠攜帶數(shù)據(jù),但是它們對時間敏感——后訂閱的觀察者會錯過之前發(fā)射的數(shù)據(jù)痴脾。
舉個栗子颤介,一個JavaFX或者Android 的UI事件都能夠被看做是熱的Observable
。在JavaFX中赞赖,你可以使用Observable.create()
創(chuàng)建一個Observable<Bollean>
來包裝ToggleButton
的selectedProperty()
滚朵。然后將布爾值轉(zhuǎn)換為"UP"或者"DOWN"表示ToggleButton
處于開啟狀態(tài)還是關(guān)閉狀態(tài),之后使用一個Observer
在Label
中顯示它前域,如下面的代碼片段所示:
提示??:請在gradle中添加RxJavaFx的依賴:
compile 'io.reactivex.rxjava2:rxjavafx:2.2.2'
package chapter2;
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import javafx.application.Application;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.event.EventHandler;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.ToggleButton;
import javafx.scene.input.MouseEvent;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;
public class MyJavaFxApp extends Application {
@Override
public void start(Stage primaryStage) throws Exception {
System.out.println("start!");
ToggleButton toggleButton = new ToggleButton("TOGGLE ME");
Label label = new Label();
Observable<Boolean> selectedStates = valuesOf(toggleButton.selectedProperty());
selectedStates.map(selected -> selected?"DOWN":"UP")
.subscribe(label::setText);// lambda還能夠引用實例的方法
VBox vBox = new VBox(toggleButton,label);
primaryStage.setScene(new Scene(vBox));
primaryStage.show();
}
private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable){
return Observable.create(observableEmitter -> {
// 發(fā)射初始狀態(tài)
observableEmitter.onNext(fxObservable.getValue());
// 當(dāng)狀態(tài)改變時在監(jiān)聽器中發(fā)射當(dāng)前狀態(tài)
final ChangeListener<T> listener =
(observableValue,prev,current) -> observableEmitter.onNext(current);
fxObservable.addListener(listener);
});
}
}
輸出如下:
[圖片上傳失敗...(image-c7b37a-1517302658055)]
這是一個通過一個熱的Observbale<Boolean>
來包裝ToggleButton
的選擇狀態(tài)的JavaFX app辕近。
注意??:如果你使用的時OpenJDK,則需要單獨導(dǎo)入JavaFX庫匿垄。JavaFx庫的可用版本可在甲骨文官方文檔中查看http://www.oracle.com/technetwork/java/javase/downloads/index.html
JavaFX的ObservableValue
與RxJava的Observbale
沒有任何關(guān)系移宅。它是JavaFX所獨有的,但是我們可以通過使用valuesOf()
工廠方法在ChangeListener
中調(diào)用onNext()
方法從而很容易的轉(zhuǎn)換成了RxJava的Observable
椿疗。當(dāng)你每次點擊ToggleButton
時漏峰,這個Observable<Boolean>
都會發(fā)射一個true或者false來映射選擇狀態(tài)。這是一個簡單的例子届榄,表明這個Observbale
不但是在發(fā)射事件浅乔,而且也是在發(fā)射true或false這個數(shù)據(jù)。它將布爾值轉(zhuǎn)換為字符串铝条,并通過Observer
改變Label
中的文本靖苇。
在這個JavaFX的示例中,我們只有一個Observer
班缰。如果我們讓更多的觀察者參與ToggleButton
數(shù)據(jù)發(fā)出之后的事件贤壁,那么這些新的觀察者將錯過這些數(shù)據(jù)。
JavaFX和Android上的UI事件是熱的Observable的主要例子埠忘,但是你同樣可以使用熱的Observable來響應(yīng)服務(wù)器請求芯砸。如果你為某個特定的話題發(fā)射消息的Twitter流創(chuàng)建一個Observbale
萧芙,那也將是一個熱的Observbale。雖然許多熱的Observable的源的數(shù)據(jù)都是無限的(infinite)假丧,但是它們也可以不這樣双揪。他們只需同時向所有觀察者發(fā)射數(shù)據(jù),并且不對那些遲到的觀察者重發(fā)之前的數(shù)據(jù)即可包帚。
提示??:RxJavaFX(以及RxAndroid)都有各類工廠方法幫你將UI事件與
Observbale
進(jìn)行綁定渔期。你可以使用RxJavaFX的valuesOf()
這一工廠方法簡化上面的示例。
請注意渴邦,在這個JavaFX示例中我們同樣沒有處理disposal疯趟,我們將會在本章末尾再去討論它。
ConnnectableObservbale
ConnectableObservable
是一個有用的熱Observbale
谋梭。它能夠接收任何Observable
信峻,即使它是一個冷的,也能夠?qū)⑵滢D(zhuǎn)為熱的然后將所有數(shù)據(jù)同時向所有的觀察者同時發(fā)送一次瓮床。要進(jìn)行這種轉(zhuǎn)換盹舞,只需調(diào)用任意一個Observbale
的publish()
方法,就會產(chǎn)生一個ConnectableObservable
隘庄。
但是訂閱后并不會自動發(fā)射數(shù)據(jù)踢步。你需要調(diào)用connect()
方法來開始發(fā)射,這允許你提前設(shè)置好所有的觀察者丑掺。請看下面這段代碼:
package chapter2;
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
public class HotLauncher {
public static void main(String[] args){
ConnectableObservable<String> source = Observable
.just("Alpha","Beta","Delta","Gamma","Epsilon")
.publish();
source.subscribe(s -> System.out.println("observer1 RECEIVED: " + s));
source.map(String::length)
.subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
//發(fā)射获印!
source.connect();
}
}
輸出如下:
observer1 RECEIVED: Alpha
observer2 RECEIVED: 5
observer1 RECEIVED: Beta
observer2 RECEIVED: 4
observer1 RECEIVED: Delta
observer2 RECEIVED: 5
observer1 RECEIVED: Gamma
observer2 RECEIVED: 5
observer1 RECEIVED: Epsilon
observer2 RECEIVED: 7
請注意,第一個觀察者接收到的是字符串街州,而另一個觀察者接收到的是長度兼丰,它們以交錯的形式打印數(shù)據(jù)。這兩種訂閱都是預(yù)先設(shè)置好的唆缴,然后通過調(diào)用connect()
來發(fā)射數(shù)據(jù)地粪。兩個觀察者會同時收到這些數(shù)據(jù):第一個觀察者接收Alpha同時第二個觀察者接收到5之后是Beta和4,后面的也一樣琐谤,而不是讓第一個觀察者在第二個觀察者之前處理完所有數(shù)據(jù)。第使用ConnectableObservable
強(qiáng)制將每一個數(shù)據(jù)都同時發(fā)送到所有觀察者的情況玩敏,我們將在第五章多播中詳細(xì)介紹斗忌。
ConnectableObservable
有助于防止為每個觀察者重新發(fā)送數(shù)據(jù)的情況。當(dāng)重新發(fā)送的代價很大旺聚,這種情況下织阳,你也許寧愿將數(shù)據(jù)同時發(fā)送給所有觀察者。即使在下游有多個觀察者砰粹,你也可以通過ConnectableObservable
簡單地強(qiáng)制上游的操作符使用單個流實例唧躲。多個觀察者通常會在上游生成多個數(shù)據(jù)流實例,但是使用public()
返回的ConnectableObservable
將上游所有的數(shù)據(jù)流合并到同一個流中。同樣弄痹,這些細(xì)微差別將在第五章多播中一一介紹饭入。
現(xiàn)在只需要記住ConnectableObservable
是熱的,因此肛真,在connect()
之后訂閱的觀察者將錯過之前發(fā)射的數(shù)據(jù)谐丢。