對于Rxjava想必大家都很熟悉了欢峰,這里不再贅述什么是Rxjava威兜。
今天的主題是:從源碼角度(2.0)分析,Rxjava是如何做到事件分發(fā)的?喂窟?
以下是今天學(xué)習(xí)筆記的目錄:
- 關(guān)鍵類及方法簡要說明
- 分析源碼测暗,查看事件是如何傳遞的
關(guān)鍵類及方法說明
首先我們知道Rxjava是一個擴展的“觀察者”模式,既然是“觀察者”模式磨澡,那么不可避免的會涉及到:被觀察者碗啄,觀察者,訂閱操作稳摄,事件稚字。那么Rxjava中哪些類是這些身份的扮演者呢?先上一下基本類圖厦酬。
類圖:
基本使用code:
結(jié)合圖-1和圖2,我們可以知道"被觀察者"以及"觀察者"是誰了胆描。 那"ObservableOnSubscribe","ObservableEmitter"仗阅,"Disposable"又是什么昌讲? 接下來我們就從"Observer"開始逐一梳理下他們是什么,以及如何工作的霹菊。
Observer:觀察者
從上圖中我們可以知道 “Observer” 是Rxjava 中 觀察者的身份。我們簡單介紹下Observer中各個方法的作用。
onSubscribe(Disposable d):
1:當(dāng) Observable向 Observer發(fā)送事件前旋廷,調(diào)用此方法(具體在哪調(diào)用的鸠按,后續(xù)會分
析到,這里先給出結(jié)論)饶碘。
2:可以調(diào)用 Disposable(為了便于理解目尖,我們可以把它想象成 “消息控制開關(guān)”,因為它就像我們平常接觸的各種開關(guān)一樣,來控制消息是否分發(fā)給 Observer)的dispose()來告知 Observable扎运,當(dāng)前的 Observer是否需要接受事件瑟曲。
onNext(T value):
1:該方法就是 Observable傳遞訂閱事件給Observer的回調(diào)方法,其中value就是 Observer向Observable訂閱的要監(jiān)聽的事件。
2:如方法注釋所說的那樣豪治,該方法可以調(diào)用多次(想想也是,觀察者訂閱的事件當(dāng)然可能發(fā)生多次了)洞拨。
3:當(dāng)調(diào)用onComplete()/onError()之后就算再調(diào)用onNext()事件也不會發(fā)送至 Observer。
onComplete():
1:該方法標(biāo)記之后的任何事件將不會發(fā)送給 Observer
2:當(dāng)調(diào)用了onComplete()之后再調(diào)用onError()事件是不會發(fā)送給 Observer负拟。
onError(Throwable e):
1:該方法通知觀察者出現(xiàn)了錯誤情況烦衣。
2:當(dāng)調(diào)用了onError()之后,再調(diào)用onNext()/onComplete()事件也不會發(fā)送給 Observer。
問題:
1:在實際應(yīng)用時發(fā)現(xiàn),“當(dāng)調(diào)用onComplete()之后再調(diào)用onError() app會拋出異常并crash”掩浙。
2:當(dāng)設(shè)置 observeOn(Scheduler s)之后,會導(dǎo)致 “onError()之前的事件接受不到或者丟失部分事件”花吟。
以上兩個問題會在稍后給出原因。
Observable:被觀察者
首先厨姚,我們知道它是Rxjava中 “被觀察者” 的具體實現(xiàn)衅澈。 其次,我們要創(chuàng)建一個被觀察者可以通過Observable提供的N多靜態(tài)方法"去new一個出來"谬墙。我們拿"create()"舉個例子今布。
create():
create()方法其實做的事情并不多。
- 判斷傳入的ObservableOnSubscribe是否為null芭梯。
- 創(chuàng)建ObservableCreate(該類繼承Observable)并把ObservableOnSubscribe添加到 ObservableCreate险耀,并返回ObservableCreate。
既然創(chuàng)建好了 被觀察者玖喘,那么接下來就需要 "觀察者 訂閱 被觀察者,讓被觀察者時刻保持警惕,當(dāng)有我要的事件發(fā)生時甩牺,記得通知我",那"訂閱"這個動作 Rxjava是如何實現(xiàn)的呢?
subscribe():訂閱操作
我們都知道 標(biāo)準(zhǔn)觀察者模式中,實現(xiàn)"訂閱"操作的話應(yīng)該是 "觀察者.訂閱(被觀察者)"這種寫法才對累奈,就拿給View設(shè)置點擊事件來說應(yīng)該是"View.setOnClickListener()"這種贬派。那為啥Rxjava中要反其道而行要采用"被觀察者.訂閱(觀察者)"這種方式呢? 這是因為Rxjava采用了“流式api”調(diào)用策略,這樣寫可以使代碼更簡潔,有種一氣呵成的感覺澎媒,所以就把 "訂閱"動作放到了Observable中搞乏。廢話不多說,我們來看下subscribe()都做了些什么事情吧戒努。
首先请敦,通過ObjectHelper.requireNonNull()判斷傳入的Observer是否為null镐躲。
其次,調(diào)用RxJavaPlugins.onSubscribe()返回當(dāng)前Observer(具體該方法會在后續(xù)章節(jié)分析)侍筛。
再次萤皂,調(diào)用ObjectHelper.requireNonNull()判斷傳入的Observer是否為null。
最后匣椰,調(diào)用subscribeActual()進行 訂閱操作裆熙。
subscribeActual():
通過該方法的解釋,我們可以知道此方法是 "實際訂閱動作的發(fā)生地"禽笑。
到此“觀察者模式”中的三大主角入录,“觀察者”和“被觀察者”以及“訂閱操作”就簡單介紹完了。那么問題來了佳镜,接下來“事件”需要怎么發(fā)出呢僚稿?好,我們繼續(xù)往下看邀杏。
事件:
ObservableOnSubscribe:
首先從該接口的定義中我們知道贫奠,此接口是 “事件產(chǎn)生和推送的地方”。具體的事件的產(chǎn)生和發(fā)送是在subscribe()中實現(xiàn)的望蜡。我們來舉個例子唤崭,聲情并茂的說明下吧。 我們大都玩過cs,cf之類的射擊游戲,假如我是警脖律,對面人物是匪谢肾,此時我們相遇了(我沒看到他),一根無形的線把我們“栓”在了一起小泉。此時對面哥們一看芦疏,有敵人,二話不說舉槍就開始射擊微姊,不過還好我穿了防彈衣酸茴,再加上反映比較快馬上就找好了掩體進行了反擊。
對于這樣一個場景來說兢交,假如我是Observer薪捍,那么對面那哥們就是Observable,那根“無形的線”就可以理解為subscribe動作配喳,槍對于我們來說就是“事件的產(chǎn)生者或者容器”那么“子彈”可以理解為具體要發(fā)送的“事件”了酪穿。
這里的“槍”指的就是“ObservableOnSubscribe”。
ObservableEmitter:
繼續(xù)沿用上邊的例子晴裹,我們說到了可以用“槍”來比喻成“ObservableOnSubscribe”來理解它的定義被济。那么,槍要想發(fā)射子彈的話涧团,人得扣動扳機才行只磷,對吧经磅,那我們就可以把“ObservableEmitter”理解成“槍的扳機”,用于控制事件的“發(fā)射”钮追。
看一下類圖吧馋贤。
通過類圖我們不難發(fā)現(xiàn),ObservableEmitter實現(xiàn)了Emitter接口畏陕,Emitter接口中定義了onNext(),onError(),onComplete()等三個方法,是不是感覺在哪見過仿滔?沒錯惠毁,這三個方法正好對應(yīng)的是Observer中的onNext(),onError(),onComplete()。通過Emitter調(diào)用這三個方法崎页,則會分別回調(diào)Observer對應(yīng)的方法(具體實現(xiàn)我們在稍后會給出)鞠绰。那么調(diào)用Emitter的三個方法后Observer會收到消息,事件不就是從 被觀察者向觀察者傳遞了嗎飒焦?
Disposable
最后再來說一說蜈膨,Disposable,老規(guī)矩牺荠,上圖翁巍。
我們可以把"Disposable"理解為“消息控制開關(guān)”,就像電燈的開關(guān)一樣休雌,它控制了是否消息可以送達至Observer處灶壶。
而且細心的同學(xué)觀察 圖-2以及Observer接口的定義代碼可以發(fā)現(xiàn),Observer的onSubscribe(Disposable d)把該開關(guān)返回給了 Observer杈曲,為什么要這么設(shè)計呢驰凛?
原因應(yīng)該有2個:
1:當(dāng)消息發(fā)送前,首先回調(diào)Observer的onSubscribe()告知觀察者我要開始給你發(fā)消息了担扑,你先做做準(zhǔn)備,這樣的 話我們可以在第一條消息未發(fā)送之前在該方法中做一下準(zhǔn)備工作之類的恰响。
2:再調(diào)該方法時,如果觀察者不想被觀察者發(fā)送事件涌献,可能我還沒做好準(zhǔn)備胚宦,或者我改變注意我不想接收 被觀察的發(fā)送的事件了,可以調(diào)用Disposable 的dispose()洁奈,這樣當(dāng) 被觀察發(fā)送事件的時候间唉,就會判斷,觀察者是不是需要我的事件利术,如果不需要我就不發(fā)了(實際代碼內(nèi)部也是這樣處理的呈野,看過源碼的東西大概都清楚這些事,這里就稍微介紹一下)印叁。
簡單介紹就到這里了被冒,接下來我們就從源碼角度來看看军掂,事件是怎樣從 Disposable 傳遞到 Observer的。
源碼分析
看過Rxjava源碼的同學(xué)都知道昨悼,Rxjava的代碼量還是挺多的蝗锥,我們不可能事無巨細。那么如何閱讀其中的代碼比較好呢?我覺得應(yīng)該遵從這樣一個原則:掌握大方向,梳理脈絡(luò)率触。細化小方面终议,深入理解。
首先葱蝗,我們需要了解這個功能穴张,是怎樣是實現(xiàn)的,用到了哪些類两曼,接口等皂甘,最好列出來,畫一畫類圖悼凑,流程圖偿枕,要做到心中有數(shù),不要過分追究細節(jié)户辫,只要知道這個功能的每一步對應(yīng)的是哪些類就行了渐夸。
最后,當(dāng)上步我們完成之后渔欢,就可以對存在疑慮的地方做進一步研究捺萌,比如某個知識點可能不大清楚,這時就需要花事件和心思搞明白了膘茎。如此這般流程走下來桃纯,我相信閱讀源碼并不只是枯燥乏味的事情,這其中定會充斥著很多歡樂披坏。廢話不多說了态坦,接下來我們就開始閱讀源碼吧。
大方向:
Observable創(chuàng)建:
在“關(guān)鍵類及方法說明”我們從create()方法入手簡要分析了下Observable是如何創(chuàng)建的棒拂。通過代碼我們知道伞梯,create()方法最終創(chuàng)建了一個名為“ObservableCreate”的Observable,并把“ObservableOnSubscribe”存儲到ObservableCreate中帚屉。
Observer創(chuàng)建:
Observer的創(chuàng)建也可以參考“關(guān)鍵類及方法說明”中關(guān)于Observer的介紹谜诫,這里不再說了。
接下來我們主要看一下攻旦,執(zhí)行“訂閱”操作之后事件是如何傳遞的喻旷。
事件的傳遞:
通過“關(guān)鍵類及方法說明”中關(guān)于subscribe()發(fā)生后的介紹,我們知道“實際訂閱”操作都是發(fā)生在subscribeActual()該方法中的牢屋,又因為該方法是抽象方法且预,所以我們直接進入“ObservableCreate”類查看其對subscribeActual()方法的實現(xiàn)吧槽袄。
這是ObservableCreate中subscribeActual()方法的具體實現(xiàn)。接下來我們具體分析下锋谐。
1:創(chuàng)建CreateEmitter
在“關(guān)鍵類及方法說明”中我們介紹了什么是“ObservableEmitter”遍尺,這里不再贅述。閱讀源碼我們發(fā)現(xiàn),通過調(diào)用CreateEmitter的構(gòu)造方法涮拗,把“Observer”對象保存至到了CreateEmitter中乾戏。 問題:為什么此Emitter中要保留一份外部Observer的引用呢? 稍后我們給出原因三热。
2:回調(diào)Observer的onSubscribe(Disposable d)
因為CreateEmitter也實現(xiàn)了Disposable,所以就可以把CreateEmitter回調(diào)給 Observer了歧蕉。此時觀察者就可以在onSubscribe()中做一些事件發(fā)送前的準(zhǔn)備工作什么的。
3:發(fā)送事件
首先康铭,回調(diào)ObservableOnSubscribe的subscribe()并把新創(chuàng)建的CreateEmitter返回去。此時我們就可以用該Emitter發(fā)送事件了(這就是Observable產(chǎn)生以及發(fā)送事件的地方)赌髓。例如:
3.1:OnNext()
首先从藤,判斷傳遞的“事件”是否為null。
其次锁蠕,再調(diào)用“isDisposed()”判斷“消息開關(guān)”是否已經(jīng)關(guān)閉了(這個稍后分析)夷野。
最后,如果沒關(guān)閉荣倾,則回調(diào)Observer的onNext()方法回調(diào)事件悯搔。還記在分析ObservableEmitter時引入的那個問題嗎?在此,就知道原因了吧舌仍。如果不給我一個觀察者的引用妒貌,我把事件回調(diào)給誰呢,是吧铸豁。
3.2:onComplete()
首先灌曙,調(diào)用“isDisposed()”判斷“消息開關(guān)”是否已經(jīng)關(guān)閉了,如果沒關(guān)閉和繼續(xù)节芥。
其次在刺,調(diào)用Observer的onComplete()回調(diào)Complete事件。
最后头镊,調(diào)用dispose()關(guān)閉“消息控制開關(guān)”蚣驼。
3.3:onError()
首先,判斷傳入的Throwable是否為null相艇。
其次颖杏,調(diào)用“isDisposed()”判斷“消息開關(guān)”是否已經(jīng)關(guān)閉了,如果沒關(guān)閉和繼續(xù)坛芽。如果已經(jīng)關(guān)閉了输玷,則調(diào)用RxJavaPlugins.onError(t)队丝,該方法稍后再解釋官卡。
最后释树,調(diào)用Observer的onError()回調(diào)error事件挂洛。
小方面
自此我們結(jié)合源碼宦言,大體捋清楚了,Rxjava中事件傳遞的一個過程悍汛,這就是前邊提到的“掌握大方向,梳理脈絡(luò)”现喳。接下來我們對前面遺留的諸多問題進行一一深入理解鞋拟,這部分也就是“細化小方面盘寡,深入理解”尤误。
首先我們在創(chuàng)建CreateEmitter的時候侠畔,發(fā)現(xiàn),它既實現(xiàn)了Emitter接口损晤,又實現(xiàn)了Disposable接口软棺,所以說它既是“消息發(fā)射器”又是“消息控制開關(guān)”。而且我們在分析onNext(),onComplete(),onError()方法時都發(fā)現(xiàn)尤勋,這些方法內(nèi)部都先調(diào)用了"isDisposed()"判斷“消息開關(guān)”是否關(guān)閉了喘落,接下來我們就從“消息發(fā)射器”角度來捋一捋這個方法是怎么實現(xiàn)的。
isDisposed()
我們知道CreateEmitter是繼承自AtomicReference(這是專門采用原子操作最冰,進行更新操作對象的一個原子類)瘦棋。 首先,通過get()獲取AtomicReference中的value的值暖哨,默認(rèn)值為null赌朋。 其次,調(diào)用DisposableHelper的isDisposed()把get()獲取的值傳入篇裁,并與DisposableHelper.DISPOSED進行比較沛慢,判斷不是DisposableHelper.DISPOSED則返回false,如果是的話則返回true。
setDisposable()
做為“消息發(fā)射器”CreateEmitter還必須實現(xiàn)此方法达布,我們看看一下這個方法都干了些什么吧颠焦。
1:把CreateEmitter的當(dāng)前實例和Disposable傳入RxJavaPlugins的set()方法中。
2:CreateEmitter做為AtomicReference往枣,獲取當(dāng)前CreateEmitter的value并賦值給“current”伐庭。
2.1:如果 “current ==DISPOSED”,且傳入的Disposed不為null,則調(diào)用傳入的Disposable的dispose()并跳出當(dāng)前方法分冈。
2.2:如果 “current 圾另!=DISPOSED”,執(zhí)行AtomicReference的compareAndSet() 給 CreateEmitter的value 設(shè)置為傳入的Disposable雕沉。如果current不為null的話集乔,執(zhí)行dispose()方法。
至此我們發(fā)現(xiàn)setDisposable()就是把傳入的“Disposable”保存起來,等到調(diào)用“isDisposed等方法”來判斷“消息開關(guān)是否關(guān)閉了”扰路。 接下來我們做個“猜想”: **如果在消息發(fā)送前設(shè)置Disposable為DisposableHelper.DISPOSED的話尤溜,消息是會繼續(xù)傳遞的,如果設(shè)置的是自定義的Disposable的話汗唱,消息則不會被傳遞宫莱。 ** 下面我們就實際測試下,這個“猜想”是否成立吧哩罪。
case1:在ObservableOnSubscribe的subscribe()中設(shè)置Disposable為DisposableHelper的唯一實例授霸。
log信息為下圖。
case2:在ObservableOnSubscribe的subscribe()中設(shè)置Disposable為自定義的Dispaseable际插。
根據(jù)以上測試結(jié)果確實和我們的“猜想”一致為什么呢碘耳?此時我們回過頭來看看上邊關(guān)于** isDisposed() 以及 onComplete()的分析,發(fā)現(xiàn) 當(dāng)繼續(xù)執(zhí)行CreateEmitter的onComplete時框弛,此時的Dispaseable如果為DisposableHelper的DISPOSED實例辛辨,isDisposed() 就會返回true,所以 后續(xù)的Complate會回傳給Observer了瑟枫。如果為false斗搞,當(dāng)然就不會走了唄。**
CreateEmitter做為“消息發(fā)射器”的角色的責(zé)任分析完畢后力奋,接下來我們分析下其做為“消息開關(guān)”又能干些什么吧。
當(dāng)CreateEmitter做為“消息開關(guān)”時幽七,它自身有兩個方法需要實現(xiàn)景殷,它們分別是:isDisposed()和dispose()。isDisposed()以及分析過了澡屡,接下來只需要分析下dispose()就行了猿挚。
dispose():
通過閱讀源碼我們知道,當(dāng)CreateEmitter中的value對應(yīng)的不是DisposableHelper的DISPOSED實例的話驶鹉,就會把DisposableHelper的DISPOSED保存至CreateEmitter中绩蜻。當(dāng)通過Emitter發(fā)送事件時,就會先調(diào)用isDisposed()來判斷“消息開關(guān)”是否關(guān)閉了室埋,如果關(guān)閉了办绝,則中斷事件的傳遞。
到此RxJava的關(guān)于事件分發(fā)這塊到底是如何做的已經(jīng)分析完了姚淆,如果大家看完感覺有幫助的話歡迎點擊收藏和喜歡孕蝉。如果有錯誤之處,還望各位指出腌逢,我會盡快改正的降淮。后續(xù)我會繼續(xù)分享關(guān)于,線程調(diào)度搏讶,常用操作符以及2.0關(guān)于背壓這3個方面的介紹佳鳖,謝謝大家霍殴。