最簡(jiǎn)單的觀察者列車(chē)
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("邦");
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
觀察subscribe()得知 大體流程:
1纤勒、會(huì)立即調(diào)用onStart()方法,在其它操作之前調(diào)用
subscriber.onStart();
2挥吵、之后它喜歡用SafeSubscriber吧subscriber包起來(lái)(裝飾模式)
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
3逝嚎、包起來(lái)后,就開(kāi)始調(diào)用observable的call()方法啟動(dòng)整個(gè)列車(chē)了
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//可以忽略那個(gè)hook,至今沒(méi)發(fā)現(xiàn)hook中有什么實(shí)際的代碼,方法都只是返回傳入的參數(shù)而已
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {//hook中僅僅返回了參數(shù)
return onSubscribe;
}
4、而我們?cè)赾all()中操作的subscribe實(shí)際上是裝飾者SafeSubscriber阿浓。原因是傳入的參數(shù)subscriber就是包好的SafeSubscriber。
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
5蹋绽、但其實(shí)我們?cè)赾all()中調(diào)用的SafeSubscriber.onNext()方法會(huì)直接調(diào)用SafeSubscriber內(nèi)部被包起來(lái)的subscriber的onNext()方法
@Override
public void onNext(T args) {
try {
if (!done) {
actual.onNext(args);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
6芭毙、結(jié)果因?yàn)檫@個(gè)被包起來(lái)的subscriber方法是我們寫(xiě)的訂閱者,于是訂閱者的onNext()觸發(fā)了 PS: 所以?xún)H僅是包起來(lái),并沒(méi)有其它操作
7筋蓖、綜上所述 調(diào)用subscribe()之前都是準(zhǔn)備階段,各種包裹,存儲(chǔ)變量。一旦調(diào)用subscribe(),整個(gè)列車(chē)就啟動(dòng)了稿蹲。
最簡(jiǎn)單的異常處理:
1扭勉、并不是全程try包起來(lái)異常處理的。
2苛聘、第一個(gè)異常檢測(cè)是在subscribe()方法開(kāi)始時(shí)判斷訂閱者與被訂閱者是否為null涂炎,拋出“你是不是故意找茬”的異常,這個(gè)檢測(cè)甚至在調(diào)用onStart()之前设哗。
3唱捣、值得一提的是onStart()并沒(méi)有被try包裹起來(lái)。
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
subscriber.onStart();
4网梢、有try塊包裹了列車(chē)的啟動(dòng)方法call()震缭。處理的方式是 (1)手動(dòng)檢測(cè)拋出致命錯(cuò)誤(這個(gè)操作挺頻繁) -> (2)傳遞Throwable給subscriber的onError() PS: 在檢測(cè)致命錯(cuò)誤后其實(shí)還會(huì)檢測(cè)是否訂閱了,但因?yàn)橐欢ㄊ?已訂閱),所以沒(méi)區(qū)別(因?yàn)楦緵](méi)初始化“是否訂閱”這個(gè)變量)
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);//(1)手動(dòng)檢測(cè)拋出致命錯(cuò)誤
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
try {
subscriber.onError(hook.onSubscribeError(e));//(2)傳遞Throwable給subscriber的onError()
} catch (Throwable e2) {//(3)onError都出錯(cuò)了、拋出 “啊战虏,完蛋啦”
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(r);
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
5拣宰、注意這里的onError()方法,它并不是我們寫(xiě)的onError(),而是愛(ài)包裝的SafeSubscriber的onError(),在此方法內(nèi)有一個(gè)唯一的標(biāo)識(shí)用于讓此方法只會(huì)被調(diào)用一次。
SafeSubscriber的onError()只做了一個(gè)“默認(rèn)異常處理(其實(shí)就是什么都不做)”就執(zhí)行了我們寫(xiě)的onError()來(lái)解決異常烦感。不管我們的onError()執(zhí)行成功了,還是拋出異常了,又或是根本沒(méi)寫(xiě)onError()巡社,它都會(huì)unsubscribe()來(lái)取消訂閱。 PS“是否訂閱”變量終于改變了
unsubscribe()還做了另外的操作,但這里沒(méi)有看到手趣。 PS: 這就是SafeSubscriber(安全訂閱者),它代理了對(duì)subscribe的操作,當(dāng)出異常時(shí)執(zhí)行額外的代碼晌该。 這可能是RxJava的秘密
6、如果subscriber.onError()都報(bào)錯(cuò)了绿渣、就只會(huì)檢測(cè)拋出致命錯(cuò)誤后拋出錯(cuò)誤 “你的onError()拋異常啦!朝群,異常為 $%# ” PS: 連這個(gè)異常都包起來(lái)了
PS: 你可能認(rèn)為我漏掉了onCompleted(),但這個(gè)方法無(wú)論是運(yùn)行成功,還是因失敗拋出異常,它都沒(méi)有被調(diào)用。
最簡(jiǎn)單的泛型限定:
1中符、這里被限制的類(lèi)型只有兩處姜胖,(1)、create(OnSubscribe<T>) ; (2)淀散、subscribe(Subscribe<T>)
它限制泛型的秘密在create()方法中,create()內(nèi)創(chuàng)建了Observable對(duì)象,當(dāng)光標(biāo)選中Observable構(gòu)造方法里的泛型T時(shí)谭期,整個(gè)滾動(dòng)條都綠了!
Observable擁有的泛型只有一個(gè)<T>,在構(gòu)造時(shí)實(shí)現(xiàn)了<T>的類(lèi)型,又在subscribe()中限定了<T>,導(dǎo)致subscribe()的參數(shù)泛型也必須一致了吧凉。
PS: subscribe()中機(jī)智的使用了<? super T>,這是為數(shù)不多的泛型父類(lèi)限定,理由也很簡(jiǎn)單(父類(lèi)引用子類(lèi))
<head>添加map運(yùn)算符</head>
瞬間設(shè)計(jì)模式的難度以幾何的倍數(shù)上升隧出,為了清晰直觀的看源碼,我仿寫(xiě)了它的代碼阀捅。
public class MyRxJava {
{//主體調(diào)用部分
MyObservable.create(new MyOnSubscribe() {
@Override
public void call(MySubscriber s) {
s.onNext();
}
}).subscribe(new MySubscriber() {
@Override
void onStart() {
}
@Override
void onNext() {
}
@Override
void onCompleted() {
}
});
}
}
class MyObservable {//RxJava的操作主體,Observable
private final MyOnSubscribe onSubscribe;
public MyObservable(MyOnSubscribe subscribe) {
this.onSubscribe = subscribe;
}
public static MyObservable create(MyOnSubscribe subscribe) {
return new MyObservable(subscribe);
}
public final MySubscriber subscribe(MySubscriber subscriber) {
subscriber.onStart();
this.onSubscribe.call(subscriber);
return subscriber;
}
}
interface MyOnSubscribe {//create時(shí)使用胀瞪,被訂閱者
void call(MySubscriber s);
}
abstract class MySubscriber {//訂閱時(shí)使用,訂閱者
private boolean isUnsubscribed;//是否被取消訂閱(目前沒(méi)用)
abstract void onStart();
abstract void onNext();
abstract void onCompleted();
public void unsubscribe() {
isUnsubscribed = false;
}
public boolean isUnsubscribed() {
return isUnsubscribed;
}
}