初學(xué)RxJava屿愚,對(duì)其許多的API頗感神奇,所以RxJava的原理充滿了興趣穷遂。正好最近教父大頭鬼也出了一篇RxJava解析的文章娱据,本人也結(jié)合源碼給出自己的理解。
這里主要先就一點(diǎn)來(lái)講解忌穿。問(wèn)題如下:
訂閱跟被訂閱的關(guān)系结啼?是如何實(shí)現(xiàn)這一機(jī)制的?
- 首先郊愧,理解OnSubscribe的概念
/**
* Invoked when Observable.subscribe is called.
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
OnSubscribe繼承自Action1,來(lái)看看Action1是干什么的
/**
* A one-argument action.
*/
public interface Action1<T> extends Action {
void call(T t);
}
Action1僅僅是一個(gè)參數(shù)的泛型接口眠寿,提供了使用泛型的類(lèi)型來(lái)作為參數(shù)焦蘑,這樣就可以調(diào)用這個(gè)指定的泛型類(lèi)型。
在此坟乾,我把這個(gè)onSubscribe理解為訂閱的行為蝶防。這個(gè)行為是指,我在創(chuàng)建一個(gè)被觀察者的時(shí)候间学,就是要指定這個(gè)被觀察者所要發(fā)布的行為印荔;在觀察者的角度來(lái)理解详羡,就表示觀察者訂閱了這些行為。
- 理解了這個(gè)OnSubscribe之后水泉,看看Observable的創(chuàng)建窒盐。這里主要看Observable的構(gòu)造方法:
/**
* Creates an Observable with a Function to execute when it is subscribed to.
* <p>
* <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
* unless you specifically have a need for inheritance.
*
* @param f
* {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
*/
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
額,很簡(jiǎn)單的嘛炕横。這里主要就是需要把OnSubscribe的屬性保存下來(lái)葡粒,(即咱們所訂閱的行為)。這里咱們著重點(diǎn)放在這段代碼的類(lèi)注釋上面:創(chuàng)建一個(gè)帶有執(zhí)行操作的被觀察者卿嘲,當(dāng)它被訂閱時(shí)夫壁,執(zhí)行它的操作(即咱們提到的訂閱行為),翻譯的不好掌唾,歡迎拍磚啊忿磅。另外,就是這里的推薦的內(nèi)容了撩扒,官方希望我們盡量通過(guò)create來(lái)創(chuàng)建Obserable吨些,而不是使用繼承。
- 訂閱者/觀察者的使用泉手。咱們知曉了被觀察者的創(chuàng)建偶器,接下來(lái)就是觀察者缝裤。它的實(shí)現(xiàn)就很簡(jiǎn)單了颊郎。主要是由一些的主要的周期函數(shù)構(gòu)成。在此姆吭,就略過(guò)了。
- 訂閱操作的實(shí)現(xiàn)检眯。這個(gè)是咱們關(guān)注的重頭戲答倡。先看代碼:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
代碼略多瘪撇,能一行行讀下來(lái),還是需要相當(dāng)耐心的恕曲。這里渤涌,我就從代碼中獲取到的知識(shí)點(diǎn)做些說(shuō)明:1)首先也是最重要的,當(dāng)執(zhí)行了subscribe的方法茸俭,就開(kāi)始執(zhí)行Subscriber的訂閱操作安皱。2)Subscriber的onStart方法,是第一個(gè)被調(diào)用的酌伊。3)在代碼中,hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber)虹脯,這句代碼主要是獲取到observable的onSubscribe屬性奏候,然后調(diào)用它的call方法,而且參數(shù)還是subscriber鼻由,很熟悉了,有木有蔼紧,這就是之前所說(shuō)的訂閱的行為。3)在執(zhí)行觀察者訂閱的行為的時(shí)候彬犯,可能會(huì)出現(xiàn)錯(cuò)誤查吊,通過(guò)捕捉異常,來(lái)調(diào)用subscriber的onError方法逻卖。
總結(jié):這里一個(gè)簡(jiǎn)單的完整的訂閱與被訂閱的流程就結(jié)束了,對(duì)其原理做以概括:創(chuàng)建一個(gè)被觀察者炼杖,需要給被觀察者指定其所發(fā)布的行為(onSubscribe來(lái)實(shí)現(xiàn))盗迟;指定觀察者時(shí),只需指定相應(yīng)的觀察回調(diào)即可艇纺;在完成訂閱的操作時(shí)邮弹,是先調(diào)用subscriber的onStart方法,之后通過(guò)訂閱行為onSubscribe來(lái)調(diào)用subscriber完成相應(yīng)的訂閱操作腌乡,最后若出現(xiàn)異常則會(huì)回調(diào)subscriber的onError方法导饲。
問(wèn)題:因?yàn)橹翱催^(guò)一些RxJava的介紹文章氯材,提到一點(diǎn)Subscriber的onComplete方法和onError方法是只會(huì)執(zhí)行一個(gè),記不太確切了氢哮,是不是這樣。
但是從源碼中可以看出听盖,若是執(zhí)行到onComplete的方法時(shí)候,若在其中拋出了一場(chǎng)皆看,那之后Observable的subscribe方法會(huì)捕捉異常,又會(huì)調(diào)用到onError方法无埃,所以這樣看的話毛雇,onComplete和onError是有機(jī)會(huì)都執(zhí)行到的。
看了這個(gè)簡(jiǎn)單的訂閱织阅,產(chǎn)生了一些疑問(wèn)震捣,也就是接下來(lái)要去研究的問(wèn)題,也是下篇要解決的問(wèn)題伍派,列舉如下:
- TODO:
- onError和onComplete的執(zhí)行唯一性
2)多個(gè)觀察者是如何處理訂閱的?