##引言 接觸過?EventBus?和?RxJava?的都知道郭脂,可以用?RxJava?來實現(xiàn)?EventBus年碘,網(wǎng)上隨便一搜,就可以拿得到代碼展鸡。但是究竟為什么可以這么做屿衅?卻沒有類似的文章作進(jìn)一步的深度解析。(本文假定讀者都已經(jīng)了解?EventBus?和?RxJava?是什么莹弊,可以做什么涤久。)
public class RxBus { private static volatile RxBus instance; private final SerializedSubject<Object, Object> subject; private RxBus() { subject = new SerializedSubject<>(PublishSubject.create()); } public static RxBus getInstance() { if (instance == null) { synchronized (RxBus.class) { if (instance == null) { instance = new RxBus(); } } } return instance; } public void post(Object object) { subject.onNext(object); } private <T> Observable<T> toObservable(final Class<T> type) { return subject.ofType(type); } public boolean hasObservers() { return subject.hasObservers(); }}
可以看到,代碼非常簡練忍弛,當(dāng)?RxJava?遇上?EventBus?响迂,居然會如此神奇~那么問題來了,為什么這段代碼就可以實現(xiàn)?EventBus?的功能细疚?
要搞明白這幾個問題蔗彤,我們得弄清楚這些東西。
EventBus?是如何實現(xiàn)的疯兼?
RxJava?滿足了實現(xiàn)?EventBus?的哪些條件然遏?
如何用?RxJava?去封裝一個?EventBus??
簡單講镇防,事件總線啦鸣,顧名思義,分為兩個概念来氧,一個事件诫给,即 Event ,一個總線啦扬,即 Bus 中狂,這是在整個 APP 里一種規(guī)范地傳遞事件的方式。作為獨立于項目里各個模塊的 Application 級別的存在扑毡,可以很好地用來程序的解耦胃榕。使用大致有四個步驟:注冊→發(fā)送→接收→取消注冊。具體的源碼分析瞄摊,可以參看 codeKK 上?Trinea?的?EventBus源碼解析?和?kymjs張濤?的?EventBus源碼研讀勋又。
更重要的一點苦掘,RxBus?的重點應(yīng)該在?Bus?上,而不是?RxJava?上楔壤。用?RxJava?去實現(xiàn)?EventBus?的思想鹤啡。因此,應(yīng)該把分析?EventBus?作為一個重點蹲嚣。
我們來看看要實現(xiàn)一個?EventBus?需要滿足什么條件递瑰。
獲取一個?EventBus?實例,可以用單例隙畜,也可以用?Builder抖部;
注冊?EventBus?和取消注冊?EventBus;
發(fā)送和接收事件的方法议惰。
##RxJava?&&?EventBus?要實現(xiàn)?EventBus?需要滿足的條件慎颗,在?RxJava?里是如何體現(xiàn)的呢?
首先我們需要明確的是换淆,EventBus?里都有哪些角色:Event哗总、Subscriber、Publisher倍试,也就是說需要Event讯屈、Observer、Observable县习,Event 自不必說涮母,在?RxJava?里既能充當(dāng)Observer,又能充當(dāng)Observable的對象就是?Subject躁愿,而?Subject?是線程非安全的叛本,我們要構(gòu)造一個線程安全的?Subject?,需要用到它的子類?SerializedSubject彤钟,而實際使用的時候来候,我們的觀察者只需要訂閱發(fā)生之后的,來自?Observable?的數(shù)據(jù)逸雹,因此還需要給?SerializedSubject?傳入?PublishSubject?作為參數(shù)营搅。
獲取?Bus?實例,一個單例即可梆砸,當(dāng)然转质,EventBus?還提供了使用?Builder?創(chuàng)建實例的方法,可根據(jù)具體情況自行實現(xiàn)帖世;
privatestaticvolatileRxBusinstance;privateRxBus() {subject=newSerializedSubject<>(PublishSubject.create());}publicstaticRxBusgetInstance() {if(instance==null) {synchronized(RxBus.class) {if(instance==null) {? ? ? ? ? ? ? ? instance=newRxBus();? ? ? ? ? ? }? ? ? ? }? ? }returninstance;}
注冊和取消注冊?Bus;
EventBus?的注冊過程休蟹,就是對接收某個事件的所有方法進(jìn)行?subscribe()?,在?subscribe()?方法里拿到這些的方法,把這些方法存進(jìn)?subscriberMethods(一個 List 集合)中赂弓,然后把事件的類名作為 key 绑榴,把?subscriberMethods?作為 value ,存進(jìn)?methodCache(一個 HashMap 緩存)里盈魁,這樣就不用每次都去反射了彭沼。這里需要注意一點,EventBus?里用?methodCache?cache 下來的不是?Observer?备埃,也不是?Observable?,而是?Observable.subscribe(Observer)褐奴,即?Subscription?按脚,那么如果用?RxJava?,該怎么去實現(xiàn)這么個功能呢敦冬?在 RxJava 里有這樣一個類?CompositeSubscription?辅搬,對應(yīng)的是一個存儲著?Subscription?的?HashSet,因此我們只需要將接收事件的方法 add 進(jìn)一個?CompositeSubscription?脖旱,在生命周期結(jié)束的時候堪遂,再把?CompositeSubscription?取消訂閱即可。
明確了上面的流程萌庆,對 RxJava 的封裝就好辦了溶褪。我們只需要獲取?Subscription?即可。注意践险,這里跟?EventBus?是有區(qū)別的猿妈,EventBus?的封裝,是通過反射巍虫,獲取所有接收事件的方法彭则,然后注冊,當(dāng)然占遥,現(xiàn)在的?EventBus?版本里這些反射幾乎對性能沒有任何影響了「┒叮現(xiàn)在我們用?RxJava?是不是也要用反射再去獲取所有的?Subscription?呢?當(dāng)然不是瓦胎,EventBus 的機(jī)制其實類似于廣播芬萍,在接收事件的地方是沒有方法調(diào)用的,因此需要反射凛捏。但是?RxBus?則提供了調(diào)用接收事件的方法担忧,因此只需要在?Activity?或?Fragment?里 new 出來?CompositeSubscription?對象,然后在需要接收事件的地方坯癣,用?CompositeSubscription?對象去 add 進(jìn)對應(yīng)的?Subscription?就可以了(這一點在下面發(fā)送接收事件一節(jié)還會提到)瓶盛。
對應(yīng)的取消注冊的過程就簡單多了,在生命周期結(jié)束的地方,對?CompositeSubscription?取消注冊即可惩猫,以避免內(nèi)存泄露芝硬,而?CompositeSubscription?的取消注冊方法是可以自動取消?HashSet?里的所有?Subscription?的,因此無須對每個?Subscription?單獨處理轧房。
發(fā)送和接收事件拌阴。
EventBus?發(fā)送事件,就是 post 方法奶镶,在?EventBus?里有一個內(nèi)部類?PostingThreadState迟赃, 通過?postingState.eventQueue?可以獲取一個 List 集合,只要?eventQueue?不為空厂镇,就不斷地從?eventQueue?里取出事件(當(dāng)然纤壁,伴隨有是否為主線程,是否正在發(fā)送等狀態(tài)的判斷)捺信,然后調(diào)用?postSingleEvent?方法酌媒,最后調(diào)?postToSubscription?把事件發(fā)出去,post 一個迄靠,就從?eventQueue?里 remove 一個秒咨,最終又來到了我們從剛接觸 Android 就讓人很頭痛的?Handler?,這是一個叫?HandlerPoster?的類掌挚。說一千雨席,道一萬,對應(yīng)的?RxJava?處理事件就是調(diào)方法?onNext。這樣代碼就有了。
publicvoidpost(Objectobject) {? ? subject.onNext(object);}
EventBus?接收事件需要通過?onEvent?開頭的方法來遍歷獲取孕讳,第一次遍歷會緩存,僅查找?onEvent?開頭的方法雏亚,同時忽略一些特定 SDK 的方法,可以提高一些效率摩钙。在使用?RxJava?接收事件的時候罢低,根據(jù)傳遞的事件類型(eventType)可以獲取對應(yīng)類型的?Observable<EventType>?,那么問題就來了胖笛,在這里我們是不是要提供一個返回對應(yīng)的?Subscription?的方法呢网持?其實是可以的!但是需要指定?Scheduler?长踊,因為我們知道功舀,接收事件處理事件是有可能在不同的線程里的,如果在這里我們就提供一個返回?Subscription?的方法身弊,那后續(xù)的事件處理是在哪個線程呢辟汰?因此在這里就指定了 UI 線程或者異步線程列敲,后面的具體的事件處理就可能會有問題。當(dāng)然帖汞,我們也只可以在需要接收事件的地方戴而,調(diào)用?toObservable?方法,然后指定線程翩蘸。這也是相對于?Otto?的一個優(yōu)勢所意。
在?RxBus?里:
public<T>ObservabletoObservable(finalClasstype) {returnsubject.ofType(type);}
public<T>SubscriptiontoSubscription(Classtype,Action1action1,Schedulerscheduler) {returnRxBus.getInstance()? ? ? ? ? ? .toObservable(type)? ? ? ? ? ? .observeOn(scheduler)? ? ? ? ? ? .subscribe(action1);}
在?Activity?或?Fragment?里再去獲取?Subscription?。
private<T>SubscriptiontoSubscription(Classtype,Action1action1) {returnRxBus.getInstance()? ? ? ? ? ? .toObservable(type)? ? ? ? ? ? .observeOn(AndroidSchedulers.mainThread())? ? ? ? ? ? .subscribe(action1);}
最后催首,將所有的?Subscription?add 進(jìn)?CompositeSubscription?就好了扶踊。最后,一定不要忘記對?CompositeSubscription?取消注冊郎任。
到這里姻檀,有關(guān)?EventBus?的內(nèi)容,基本是完了涝滴,不過還有一點,EventBus?里是有一個?StickyEvent?的胶台,什么意思呢歼疮,就是說一般流程是,我們先去訂閱事件诈唬,然后被觀察者再去發(fā)布事件韩脏,觀察者去接收事件,但是如果是先發(fā)布了事件铸磅,再去訂閱事件呢赡矢?這時候先于訂閱事件之前發(fā)布的事件就會被丟棄,這時候?StickyEvent?就登場了阅仔。即便是先于訂閱事件之前發(fā)布了事件吹散,它已然可以接收一個最近被發(fā)布的事件,可以理解為它緩存了一個最近發(fā)布的事件八酒,而與訂閱狀態(tài)無關(guān)羞迷。當(dāng)然,只是一個衔瓮!明確了這些浊猾,要用?RxJava?實現(xiàn)就非常簡單了热鞍,在?RxJava?里有一個?BehaviorSubject?完美實現(xiàn)了這個功能衔彻,具體實現(xiàn)跟?PublishSubject?一模一樣。這時候就會有人問了幅疼,如果我們想要接收到所有的(而不是一個)在訂閱事件之前發(fā)布的事件,該怎么辦呢悴晰?很遺憾逐工,EventBus?是無法辦到的泪喊,但是?RxJava?可以!將?BehaviorSubject?換成?RelaySubject?即可哈扮。不過說句題外話蚓再,這樣的功能好雞肋啊摘仅,感覺適用場景少之又少。我想這也是?EventBus?沒有去實現(xiàn)這樣的功能的原因吧六荒。
具體使用可以參照AndroidFluxPractice掏击,Sample 里將 EventBus 替換為了 RxBus 秩铆,完美地實現(xiàn)了一模一樣的效果豺旬。