RxJava1.x源碼解析

帶著疑問分析RxJava1.x原理:
事件流源頭(observable)怎么發(fā)出數(shù)據(jù)
響應(yīng)者(subscriber)怎么收到數(shù)據(jù)
怎么對(duì)事件流進(jìn)行操作(operator/transformer)
整個(gè)過程的調(diào)度(scheduler)

響應(yīng)式編程

響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式择膝。響應(yīng)式編程依賴事件,事件可以被等待,可以觸發(fā)過程听想,也可以觸發(fā)其他事件郭脂。
Rx借助可觀測(cè)的序列提供一種簡(jiǎn)單的方式來創(chuàng)建異步的赞枕,基于事件驅(qū)動(dòng)的程序坛悉。

操作符

  • 轉(zhuǎn)換類操作符
    map flatMap concatMap flatMapIterable switchMap scan groupBy
flatMap
concatMap
  • 過濾類操作符
    filter take takeLast taskUntil debounce distinct distinctUntilchanged skip skipLast
debounce

如果在最后一個(gè)事件的等待時(shí)間內(nèi)重新發(fā)出了事件蒿柳,則以該事件作為最后一個(gè)事件翔冀。直到最后一個(gè)事件過了等待時(shí)間后才返回吊圾。具體例子可看下幅圖:


debounce
distinct
  • 組合類操作符
    merge zip join combineLatest and/when/then switch startSwitch
merge,無序
concat,有序
zip

源碼解析

Observable/Subscriber

Observable和Subject是兩個(gè)“生產(chǎn)”實(shí)體蛙粘,Observer和Subscriber是兩個(gè)“消費(fèi)”實(shí)體莹弊。

Observable.create()方法構(gòu)造了一個(gè)被觀察者Observable對(duì)象柠掂,同時(shí)將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe轩猩。

Subscriber 繼承了 Subscription肿轨,用于取消訂閱上炎。

public abstract class Subscriber<T> implements Observer<T>, Subscription

事件傳遞流程

如果傳入的是Action恃逻,則先封裝成Subscriber。對(duì)傳入的Subscriber進(jìn)行包裝藕施,包裝為 SafeSubscriber,SafeSubscribersubscriber的一個(gè)代理寇损,對(duì)subscriber的一系列方法做了嚴(yán)格的安全校驗(yàn)。保證onCompleted()和onError()只會(huì)有一個(gè)被執(zhí)行且只執(zhí)行一次裳食,一旦它們其中方法被調(diào)用過后onNext()就不再執(zhí)行了矛市。

onStart() 就是在我們調(diào)用 subscribe() 的線程執(zhí)行的。

obsevable.subscribe(observer)的顯式調(diào)用流程

顯式調(diào)用流程

obsevable.subscribe(observer)的內(nèi)部調(diào)用流程

subscribe內(nèi)部調(diào)用流程

操作符流程

map操作符流程

Schedulers執(zhí)行線程

執(zhí)行線程

線程執(zhí)行的內(nèi)部調(diào)用過程

subscribeOn 影響它上面的調(diào)用執(zhí)行時(shí)所在的線程诲祸。

observeOn 影響它下面的調(diào)用執(zhí)行時(shí)所在的線程浊吏。

subscribeOn與操作符的原理一致而昨,創(chuàng)造一個(gè)新的Observable用于進(jìn)行干預(yù)操作,并通過線程池executor最終實(shí)現(xiàn)了線程切換卿捎。當(dāng)不指定observeOn時(shí)配紫,SubscriberOn()對(duì)上下游的線程都有影響。

observeOn切換線程是通過lift來實(shí)現(xiàn)午阵。Lift的功能是做包裝躺孝,將上游對(duì)下游的on***()事件傳給包裝好的Operator。
Operator繼承了Function底桂,主要是控制上下游事件發(fā)送的速率植袍,最終將上游的事件發(fā)送給內(nèi)部靜態(tài)類ObserveOnSubscriber(繼承了Action)。具體的處理操作會(huì)將封裝好的Action發(fā)送到線程池中籽懦。每個(gè)observeOn都會(huì)對(duì)它所管轄的下游Observalbe生效于个。

通過schedule()將新觀察者ObserveOnSubscriber發(fā)送給subscriberOne的所有事件換到了recursiveScheduler所對(duì)應(yīng)的線程。subscriberOne的onNext()/onCompleted()/onError()方法丟到了recursiveScheduler對(duì)應(yīng)的線程中執(zhí)行暮顺。recursiveScheduler是一個(gè)Worker厅篓,在執(zhí)行schedule()時(shí)創(chuàng)建了一個(gè)Runnable,在run()方法中調(diào)用了observeOnSubscriber.call()捶码。

整個(gè)流程傳遞

ScheduledAction是Runnable羽氮,將上游Observable.call()事件和Subscriber.onNext/onError/onCompleted事件都封裝成Action事件放入ScheduledAction這個(gè)實(shí)際的Runnable方法中,并交由Worker的schedule()方法處理惫恼。

由于指定了Thread(io/newThread/mainThread)档押,內(nèi)部會(huì)先將ThreadFactory創(chuàng)建的線程放入只有一個(gè)核心進(jìn)程的ScheduledExecutorService線程池中。在scheduler()方法被調(diào)用時(shí)執(zhí)行該Runnable祈纯。

Scheduler管理Work,Work內(nèi)部通過線程池ScheduledExecutorService執(zhí)行call()方法中封裝的Runnable對(duì)象令宿。

backpressure

backpressure主要通過Producer實(shí)現(xiàn)。原理是讓subscriber向observable主動(dòng)請(qǐng)求數(shù)據(jù)腕窥,通過producer成為observable和subscriber的數(shù)據(jù)通信的協(xié)調(diào)橋梁粒没。

大多數(shù)異步操作符,比如observeOn會(huì)有一個(gè)限定大小的Buffer油昂,

在內(nèi)部革娄,Observable通過給Subscriber調(diào)用setProducer方法,方便Subscriber之后通過記錄onNext()調(diào)用頻率(即上游下發(fā)事件速率)冕碟,調(diào)用Observable.request(n)方法拦惋,控制上游Observable發(fā)送事件的速率。

hook

在眾多節(jié)點(diǎn)(創(chuàng)建Observable安寺,獲取Scheduler等)時(shí)厕妖,通過hook可進(jìn)行任意想要的操作,記錄挑庶、修飾言秸、甚至拋出異常软能。
通過RxJavaPlugins及RxJavaHook類對(duì)關(guān)心的節(jié)點(diǎn)(hook point)插樁,讓我們可以控制(manipulate)程序在這些節(jié)點(diǎn)的行為举畸。

為什么subscribeOn 只有第一次調(diào)用生效查排?

subscribeOn 的作用域就是調(diào)用前序列中所有的 Todo List 任務(wù)清單(Observable.OnSubscribe),當(dāng)我們執(zhí)行 subscribe() 時(shí)抄沮,這些任務(wù)清單就會(huì)執(zhí)行在 subscribeOn 指定的工作線程跋核,而第二個(gè) subscribeOn 早就沒有任務(wù)可做了,所以無法生效叛买。

參考

RxJava系列6-張磊
拆輪子系列:拆 RxJava
RxJava 線程切換源碼的一些體會(huì)和思考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末砂代,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子率挣,更是在濱河造成了極大的恐慌刻伊,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件椒功,死亡現(xiàn)場(chǎng)離奇詭異捶箱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)动漾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門讼呢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谦炬,你說我怎么就攤上這事〗诼伲” “怎么了键思?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)甫贯。 經(jīng)常有香客問我吼鳞,道長(zhǎng),這世上最難降的妖魔是什么叫搁? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任赔桌,我火速辦了婚禮,結(jié)果婚禮上渴逻,老公的妹妹穿的比我還像新娘疾党。我一直安慰自己,他們只是感情好惨奕,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布雪位。 她就那樣靜靜地躺著,像睡著了一般梨撞。 火紅的嫁衣襯著肌膚如雪雹洗。 梳的紋絲不亂的頭發(fā)上香罐,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音时肿,去河邊找鬼庇茫。 笑死,一個(gè)胖子當(dāng)著我的面吹牛螃成,可吹牛的內(nèi)容都是我干的旦签。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼锈颗,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼顷霹!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起击吱,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤淋淀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后覆醇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體朵纷,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年永脓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了袍辞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡常摧,死狀恐怖搅吁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情落午,我是刑警寧澤谎懦,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站溃斋,受9級(jí)特大地震影響界拦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜梗劫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一享甸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧梳侨,春花似錦蛉威、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春齐帚,著一層夾襖步出監(jiān)牢的瞬間妒牙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工对妄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留湘今,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓剪菱,卻偏偏與公主長(zhǎng)得像摩瞎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子孝常,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容