帶著疑問分析RxJava1.x原理:
事件流源頭(observable)怎么發(fā)出數(shù)據(jù)
響應(yīng)者(subscriber)怎么收到數(shù)據(jù)
怎么對(duì)事件流進(jìn)行操作(operator/transformer)
整個(gè)過程的調(diào)度(scheduler)
響應(yīng)式編程
響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式择膝。響應(yīng)式編程依賴事件,事件可以被等待,可以觸發(fā)過程听想,也可以觸發(fā)其他事件郭脂。
Rx借助可觀測(cè)的序列提供一種簡(jiǎn)單的方式來創(chuàng)建異步的赞枕,基于事件驅(qū)動(dòng)的程序坛悉。
操作符
- 轉(zhuǎn)換類操作符
map
flatMap
concatMap
flatMapIterable
switchMap
scan
groupBy
- 過濾類操作符
filter
take
takeLast
taskUntil
debounce
distinct
distinctUntilchanged
skip
skipLast
如果在最后一個(gè)事件的等待時(shí)間內(nèi)重新發(fā)出了事件蒿柳,則以該事件作為最后一個(gè)事件翔冀。直到最后一個(gè)事件過了等待時(shí)間后才返回吊圾。具體例子可看下幅圖:
- 組合類操作符
merge
zip
join
combineLatest
and/when/then
switch
startSwitch
源碼解析
Observable/Subscriber
Observable和Subject是兩個(gè)“生產(chǎn)”實(shí)體蛙粘,Observer和Subscriber是兩個(gè)“消費(fèi)”實(shí)體莹弊。
Observable.create()方法構(gòu)造了一個(gè)被觀察者Observable對(duì)象柠掂,同時(shí)將new出來的OnSubscribe賦值給了該Observable的成員變量onSubscribe轩猩。
Subscriber 繼承了 Subscription肿轨,用于取消訂閱上炎。
public abstract class Subscriber<T> implements Observer<T>, Subscription
事件傳遞流程
如果傳入的是Action
恃逻,則先封裝成Subscriber
。對(duì)傳入的Subscriber
進(jìn)行包裝藕施,包裝為 SafeSubscriber
,SafeSubscriber
是subscriber
的一個(gè)代理寇损,對(duì)subscriber的一系列方法做了嚴(yán)格的安全校驗(yàn)。保證onCompleted()和onError()只會(huì)有一個(gè)被執(zhí)行且只執(zhí)行一次裳食,一旦它們其中方法被調(diào)用過后onNext()就不再執(zhí)行了矛市。
onStart()
就是在我們調(diào)用 subscribe()
的線程執(zhí)行的。
obsevable.subscribe(observer)
的顯式調(diào)用流程
obsevable.subscribe(observer)
的內(nèi)部調(diào)用流程
操作符流程
Schedulers執(zhí)行線程
線程執(zhí)行的內(nèi)部調(diào)用過程
subscribeOn 影響它上面的調(diào)用執(zhí)行時(shí)所在的線程诲祸。
observeOn 影響它下面的調(diào)用執(zhí)行時(shí)所在的線程浊吏。
subscribeOn與操作符的原理一致而昨,創(chuàng)造一個(gè)新的Observable用于進(jìn)行干預(yù)操作,并通過線程池executor最終實(shí)現(xiàn)了線程切換卿捎。當(dāng)不指定observeOn時(shí)配紫,SubscriberOn()對(duì)上下游的線程都有影響。
observeOn切換線程是通過lift來實(shí)現(xiàn)午阵。Lift的功能是做包裝躺孝,將上游對(duì)下游的on***()事件傳給包裝好的Operator。
Operator繼承了Function底桂,主要是控制上下游事件發(fā)送的速率植袍,最終將上游的事件發(fā)送給內(nèi)部靜態(tài)類ObserveOnSubscriber(繼承了Action)。具體的處理操作會(huì)將封裝好的Action發(fā)送到線程池中籽懦。每個(gè)observeOn都會(huì)對(duì)它所管轄的下游Observalbe生效于个。
通過schedule()將新觀察者ObserveOnSubscriber發(fā)送給subscriberOne的所有事件換到了recursiveScheduler所對(duì)應(yīng)的線程。subscriberOne的onNext()/onCompleted()/onError()方法丟到了recursiveScheduler對(duì)應(yīng)的線程中執(zhí)行暮顺。recursiveScheduler是一個(gè)Worker厅篓,在執(zhí)行schedule()時(shí)創(chuàng)建了一個(gè)Runnable,在run()方法中調(diào)用了observeOnSubscriber.call()捶码。
ScheduledAction是Runnable羽氮,將上游Observable.call()事件和Subscriber.onNext/onError/onCompleted事件都封裝成Action事件放入ScheduledAction這個(gè)實(shí)際的Runnable方法中,并交由Worker的schedule()方法處理惫恼。
由于指定了Thread(io/newThread/mainThread)档押,內(nèi)部會(huì)先將ThreadFactory創(chuàng)建的線程放入只有一個(gè)核心進(jìn)程的ScheduledExecutorService線程池中。在scheduler()方法被調(diào)用時(shí)執(zhí)行該Runnable祈纯。
Scheduler管理Work,Work內(nèi)部通過線程池ScheduledExecutorService執(zhí)行call()方法中封裝的Runnable對(duì)象令宿。
backpressure
backpressure主要通過Producer實(shí)現(xiàn)。原理是讓subscriber向observable主動(dòng)請(qǐng)求數(shù)據(jù)腕窥,通過producer成為observable和subscriber的數(shù)據(jù)通信的協(xié)調(diào)橋梁粒没。
大多數(shù)異步操作符,比如observeOn
會(huì)有一個(gè)限定大小的Buffer油昂,
在內(nèi)部革娄,Observable通過給Subscriber調(diào)用setProducer方法,方便Subscriber之后通過記錄onNext()調(diào)用頻率(即上游下發(fā)事件速率)冕碟,調(diào)用Observable.request(n)方法拦惋,控制上游Observable發(fā)送事件的速率。
hook
在眾多節(jié)點(diǎn)(創(chuàng)建Observable安寺,獲取Scheduler等)時(shí)厕妖,通過hook可進(jìn)行任意想要的操作,記錄挑庶、修飾言秸、甚至拋出異常软能。
通過RxJavaPlugins及RxJavaHook類對(duì)關(guān)心的節(jié)點(diǎn)(hook point)插樁,讓我們可以控制(manipulate)程序在這些節(jié)點(diǎn)的行為举畸。
為什么subscribeOn 只有第一次調(diào)用生效查排?
subscribeOn 的作用域就是調(diào)用前序列中所有的 Todo List 任務(wù)清單(Observable.OnSubscribe),當(dāng)我們執(zhí)行 subscribe() 時(shí)抄沮,這些任務(wù)清單就會(huì)執(zhí)行在 subscribeOn 指定的工作線程跋核,而第二個(gè) subscribeOn 早就沒有任務(wù)可做了,所以無法生效叛买。