(本文基于rxjava2.0/rxandroid2.0而寫)
RxJava很神奇!很神奇不是指它在GitHub上寫的“A library for composing asynchronous and event-based programs by using observable sequences.”揣非,而是它的設(shè)計(jì)模型很神奇。
我解析了它的源碼滴某,發(fā)現(xiàn)RxJava竟然是類似干細(xì)胞進(jìn)化辫诅。
RxJava的干細(xì)胞模型
RxJava的干細(xì)胞模型很簡(jiǎn)單,本質(zhì)上是基于觀察者模式的擴(kuò)展临庇,先上類圖零截。
一個(gè)被觀察者虛類Observable麸塞,實(shí)現(xiàn)它必須實(shí)現(xiàn)subscribeActual,一個(gè)觀察者接口類Observer涧衙,它擴(kuò)展了一些接口哪工。OK,我們看看這個(gè)簡(jiǎn)單的結(jié)構(gòu)弧哎,能干點(diǎn)什么雁比。
首先,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Observable和Observer:
public class AObservable extends Observable<Integer>{
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
int a = 1+2;
observer.onNext(a);
observer.onComplete();
}
}
(new AObservable()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
System.out.println(value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("End!");
}
});
Observable做了兩件事撤嫩,計(jì)算1+2的結(jié)果偎捎,返回給Observer,Observer接收到結(jié)果后序攘,打印了出來鸭限。在這里,Observable負(fù)責(zé)處理事情两踏,Observer負(fù)責(zé)接收處理結(jié)果,就這么簡(jiǎn)單兜喻。這時(shí)候梦染,你可能開始疑惑,就只能做這么一件事情?
當(dāng)然不是帕识!
Observable進(jìn)化和相互作用
生物的干細(xì)胞能根據(jù)不同的需求泛粹,進(jìn)化成肌細(xì)胞、血紅細(xì)胞肮疗、腦細(xì)胞等等晶姊,Observable也不例外,通過繼承伪货,能實(shí)現(xiàn)不同的功能等Observable们衙,而且RxJava庫已經(jīng)幫你實(shí)現(xiàn)了大量的Observable,你只需要簡(jiǎn)單地調(diào)用就行碱呼。
另外蒙挑,進(jìn)化后的細(xì)胞相互作用后,會(huì)形成各種各樣的生理機(jī)能愚臀,Observable也是如此忆蚀。想想,當(dāng)BObservable給AObservable提供Observer的時(shí)候姑裂,會(huì)發(fā)生什么事馋袜?我們來看一個(gè)有趣的例子。
Observable.just(1, 2, 3,4 )
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "Hello, "+integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
just(1, 2, 3,4 )實(shí)現(xiàn)了一個(gè)ObservableFromArray的Observable對(duì)象舶斧,map(...)實(shí)現(xiàn)了一個(gè)ObservableMap的Observable對(duì)象欣鳖,并且在它的subscribeActual里面給ObservableFromArray提供了Observer。這樣子捧毛,你就看到這樣子的場(chǎng)景观堂,ObservableFromArray把1234形成整型數(shù)組,挨個(gè)傳給了ObservableMap呀忧,ObservableMap把整型數(shù)組挨個(gè)轉(zhuǎn)成"Hello, x"后师痕,傳給它的Observer,打印出來而账。
你可以這樣子隨意地拼接能提供Observer的Observable胰坟,形成復(fù)雜的任務(wù)鏈。怎么樣泞辐,是不是很神奇笔横?RxJava可不是僅限于這種簡(jiǎn)單的拼接交互,不過更多的交互得你自己去挖掘了咐吼。
好了吹缔,我們來看一個(gè)更加神奇的進(jìn)化。
進(jìn)一步進(jìn)化
RxJava有兩個(gè)特殊的Observer锯茄,ObservableObserveOn和ObservableSubscribeOn厢塘,里面都包含了Scheduler茶没,用于控制任務(wù)執(zhí)行的線程。
下面晚碾,我們來看看源碼抓半,先看ObservableObserveOn。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize)
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObservableObserveOn實(shí)現(xiàn)了一個(gè)觀察者ObserveOnObserver格嘁,這個(gè)觀察者關(guān)聯(lián)了ObservableObserveOn的觀察者笛求,即subscribeActual傳入?yún)?shù)Observer<? super T> observer,和Scheduler.Worker糕簿,Scheduler.Worker會(huì)把observer的監(jiān)聽函數(shù)放入到Scheduler創(chuàng)建的線程中運(yùn)行探入。
再看ObservableSubscribeOn。
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
scheduler直接把 source.subscribe(parent);放到新的線程中執(zhí)行冶伞,實(shí)現(xiàn)對(duì)source的subscribeActual線程控制新症,因?yàn)閟ubcribe會(huì)執(zhí)行subcribeActual。
更多的Observable或者Flowable(支持Backpressure的Observable)响禽,查看源碼徒爹。
一個(gè)使用示例
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
這個(gè)示例創(chuàng)建了3個(gè)Observable,just("one", "two", "three", "four", "five")創(chuàng)建了ObservableFromArray芋类,用于把數(shù)組中的子項(xiàng)一個(gè)個(gè)輸出隆嗅,subscribeOn(Schedulers.newThread())創(chuàng)建了ObservableSubscribeOn,用于控制ObservableFromArray的subscribe的執(zhí)行線程侯繁,observeOn(AndroidSchedulers.mainThread())創(chuàng)建了ObservableObserveOn胖喳,用來控制接下來的Observe的監(jiān)聽線程。
看完贮竟,相信大家對(duì) Rxjava的機(jī)制和用法都有了很直觀的認(rèn)識(shí)丽焊!