Android 用RxJava模擬一個(gè)EventBus ———RxBus

** 本篇文章已授權(quán)微信公眾號(hào) guolin_blog (郭霖)獨(dú)家發(fā)布*

RxBus的核心功能是基于Rxjava的侠讯,既然是模擬EventBus白修,我們需要搞清楚RxJava滿足實(shí)現(xiàn)EventBus的那些條件,這樣才能更好的實(shí)現(xiàn)RxBus录平。

EventBus是Android上的一個(gè)事件發(fā)布/訂閱的事件總線框架,可以充分的解耦,簡化了四大組件愕乎、UI線程與子線程的間的事件傳遞等等。它基本工作流程如下:

  • 1壁公、訂閱:EventBus.getDefault().register(this);
  • 2感论、發(fā)送事件:EventBus.getDefault().post(event);
  • 3、接受紊册、處理事件:onEventXXX(Object event);
  • 2笛粘、取消訂閱:EventBus.getDefault().unregister(this);

根據(jù)EventBus的工作流程,我們的RxBus首先需要自身的實(shí)例湿硝,這一點(diǎn)我們可以仿照EventBus的getDefault()方法薪前,通過一個(gè)單例來實(shí)現(xiàn)。有了RxBus實(shí)例就可以進(jìn)行訂閱了关斜,在RxJava中有個(gè)Subject類示括,它繼承Observable類,同時(shí)實(shí)現(xiàn)了Observer接口痢畜,因此Subject可以同時(shí)擔(dān)當(dāng)訂閱者和被訂閱者的角色垛膝,這里我們使用Subject的子類PublishSubject來創(chuàng)建一個(gè)Subject對(duì)象(PublishSubject只有被訂閱后才會(huì)把接收到的事件立刻發(fā)送給訂閱者)鳍侣,在需要接收事件的地方,訂閱該Subject對(duì)象吼拥,之后如果Subject對(duì)象接收到事件倚聚,則會(huì)發(fā)射給該訂閱者,此時(shí)Subject對(duì)象充當(dāng)被訂閱者的角色凿可。完成了訂閱惑折,在需要發(fā)送事件的地方將事件發(fā)送給之前被訂閱的Subject對(duì)象,則此時(shí)Subject對(duì)象做為訂閱者接收事件枯跑,然后會(huì)立刻將事件轉(zhuǎn)發(fā)給訂閱該Subject對(duì)象的訂閱者惨驶,以便訂閱者處理相應(yīng)事件,到這里就完成了事件的發(fā)送與處理敛助。最后就是取消訂閱的操作了粗卜,Rxjava中,訂閱操作會(huì)返回一個(gè)Subscription對(duì)象纳击,以便在合適的時(shí)機(jī)取消訂閱续扔,防止內(nèi)存泄漏,如果一個(gè)類產(chǎn)生多個(gè)Subscription對(duì)象焕数,我們可以用一個(gè)CompositeSubscription存儲(chǔ)起來纱昧,以進(jìn)行批量的取消訂閱。

到這里我們已經(jīng)結(jié)合EventBus對(duì)RxBus的可行性以及大概的實(shí)現(xiàn)流程進(jìn)行了分析百匆,接下來結(jié)合實(shí)現(xiàn)代碼再做進(jìn)一步的解釋:

public class RxBus {
    private static volatile RxBus mInstance;
    private SerializedSubject<Object, Object> mSubject;
    private HashMap<String, CompositeSubscription> mSubscriptionMap;

    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }

    /**
     * 發(fā)送事件
     *
     * @param o
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }

    /**
     * 返回指定類型的Observable實(shí)例
     *
     * @param type
     * @param <T>
     * @return
     */
    public <T> Observable<T> toObservable(final Class<T> type) {
        return mSubject.ofType(type);
    }

    /**
     * 是否已有觀察者訂閱
     *
     * @return
     */
    public boolean hasObservers() {
        return mSubject.hasObservers();
    }

    /**
     * 一個(gè)默認(rèn)的訂閱方法
     *
     * @param type
     * @param next
     * @param error
     * @param <T>
     * @return
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
        return tObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);
    }

    /**
     * 保存訂閱后的subscription
     * @param o
     * @param subscription
     */
    public void addSubscription(Object o, Subscription subscription) {
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }

    /**
     * 取消訂閱
     * @param o
     */
    public void unSubscribe(Object o) {
        if (mSubscriptionMap == null) {
            return;
        }

        String key = o.getClass().getName();
        if (!mSubscriptionMap.containsKey(key)){
            return;
        }
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).unsubscribe();
        }

        mSubscriptionMap.remove(key);
    }
}

先看一下這個(gè)私有的構(gòu)造函數(shù):

private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

由于Subject類是非線程安全的砌些,所以我們通過它的子類SerializedSubject將PublishSubject轉(zhuǎn)換成一個(gè)線程安全的Subject對(duì)象。之后可通過單例方法getInstance()進(jìn)行RxBus的初始化加匈。

toObservable()根據(jù)事件類型存璃,通過mSubject.ofType(type);得到一個(gè)Observable對(duì)象,讓其它訂閱者來訂閱雕拼。其實(shí)ofType()方法纵东,會(huì)過濾掉不符合條件的事件類型,然后將滿足條件的事件類型通過cast()方法啥寇,轉(zhuǎn)換成對(duì)應(yīng)類型的Observable對(duì)象偎球,這點(diǎn)可通過源碼查看。
同時(shí)封裝了一個(gè)簡單的訂閱方法doSubscribe()辑甜,只需要傳入事件類型衰絮,相應(yīng)的回調(diào)即可。其實(shí)可以根據(jù)需求在RxBus中擴(kuò)展?jié)M足自己需求的doSubscribe()方法磷醋,來簡化使用時(shí)的代碼邏輯猫牡。

在需要發(fā)送事件的地方調(diào)用post()方法,它間接的通過mSubject.onNext(o);將事件發(fā)送給訂閱者邓线。

同時(shí)RxBus提供了addSubscription()淌友、unSubscribe()方法煌恢,分別來保存訂閱時(shí)返回的Subscription對(duì)象,以及取消訂閱震庭。

接下我們?cè)诰唧w的場景中測試一下:
1瑰抵、我們?cè)贏ctivity的onCreate()方法中進(jìn)行進(jìn)行訂閱操作:

private void doSubscribe() {
        Subscription subscription1 = RxBus.getInstance()
                .tObservable(String.class)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        mTv.setText("事件內(nèi)容:" + s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
        RxBus.getInstance().addSubscription(this, subscription1);
    }

可以看到我們?cè)O(shè)定事件類型為String,并且Subscriber的回調(diào)發(fā)生在主線程器联,同時(shí)保存了Subscription對(duì)象二汛。
然后通過一個(gè)Button發(fā)送事件:

mBtn1.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                RxBus.getInstance().post("1024");
            }
        });

我們直接在UI線程發(fā)送了String類型的1024,看效果:

發(fā)送UI線程事件

2主籍、同樣在onCreate()方法中進(jìn)行進(jìn)行訂閱操作:

private void doSubscribe() {
        Subscription subscription2 = RxBus.getInstance()
                .doSubscribe(Integer.class, new Action1<Integer>() {
                    @Override
                    public void call(Integer s) {
                        mTv.setText("事件內(nèi)容:" + s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });
        RxBus.getInstance().addSubscription(this, subscription2);
    }

我們使用了RxBus中封裝好的doSubscribe()方法习贫,設(shè)置事件類型為Integer逛球。
這次我們通過Button在子線程中發(fā)送一個(gè)事件:

mBtn2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RxBus.getInstance().post(2048);
                    }
                }).start();
            }
        });

在子線程發(fā)送了一個(gè)Integer類型的2048千元,看效果:


發(fā)送子線程事件

3、我們?cè)贉y試下在廣播中發(fā)送事件颤绕,訂閱方式按照?qǐng)鼍?的方式幸海。
然后定義一個(gè)檢測網(wǎng)絡(luò)狀態(tài)的廣播:

public class NetworkChangeReceiver extends BroadcastReceiver {
    @Override
    public void onReceive(Context context, Intent intent) {
        ConnectivityManager manager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo networkInfo = manager.getActiveNetworkInfo();
        if (networkInfo != null && networkInfo.isAvailable()) {
            RxBus.getInstance().post("網(wǎng)絡(luò)連接成功");
        } else {
            RxBus.getInstance().post("網(wǎng)絡(luò)不可用");
        }
    }
}

在網(wǎng)絡(luò)可用與不可用時(shí)發(fā)送提示事件,然后在onCreate()方法中注冊(cè)廣播:

private void registerReceiver() {
        IntentFilter intentFilter = new IntentFilter();
        intentFilter.addAction("android.net.conn.CONNECTIVITY_CHANGE");
        mReceiver = new NetworkChangeReceiver();
        registerReceiver(mReceiver, intentFilter);
    }

我們手動(dòng)打開奥务、關(guān)閉網(wǎng)絡(luò)物独,可以看到mTv上會(huì)顯示網(wǎng)絡(luò)狀態(tài)的提示信息,看效果:


在廣播中發(fā)送事件

最后不要忘了在onDestory()中對(duì)廣播進(jìn)行取消注冊(cè)氯葬,以及取消訂閱挡篓。

protected void onDestroy() {
        super.onDestroy();
        unregisterReceiver(mReceiver);
        RxBus.getInstance().unSubscribe(this);
    }

其它場景有興趣的可自行測試哦!到這里RxBus的基本功能就實(shí)現(xiàn)了帚称。

但是還不夠完善官研,一般情況我們都是先訂閱事件,然后發(fā)送事件闯睹,如果我們反過來戏羽,先發(fā)送了事件,再進(jìn)行訂閱操作楼吃,怎么保證發(fā)送的事件不丟失呢始花?也就是EventBus中的StickyEven功能。
其實(shí)通過RxJava實(shí)現(xiàn)類似的功能很簡單孩锡,Subject有一個(gè)子類BehaviorSubject酷宵,在被訂閱之前,它可以緩存最近一個(gè)發(fā)送給它的事件躬窜,當(dāng)被訂閱后浇垦,它會(huì)立刻將緩存事件發(fā)送給訂閱者,這樣就解決了我們之前的疑問斩披。RxBus需要做的修改很簡單:

private RxBus() {
        mSubject = new SerializedSubject<>(BehaviorSubject.create());
    }

但是有一點(diǎn)需要注意BehaviorSubject只能緩存最近的一個(gè)事件溜族,如果有多個(gè)事件怎么辦讹俊?對(duì)RxJava來說都不是事,Subject還有一個(gè)子類ReplaySubject煌抒,在被訂閱之前仍劈,它可以緩存多個(gè)發(fā)送給它的事件,在被訂閱后會(huì)發(fā)送所有事件給訂閱者寡壮,相信如何修改RxBus已經(jīng)很明顯了贩疙。

有興趣的話可以下載源碼測試:點(diǎn)我下載哦!

最后推薦一些RxJava的學(xué)習(xí)資源:RxJava入門况既、給 Android 開發(fā)者的 RxJava 詳解

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末这溅,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子棒仍,更是在濱河造成了極大的恐慌悲靴,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件莫其,死亡現(xiàn)場離奇詭異癞尚,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)乱陡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門浇揩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人憨颠,你說我怎么就攤上這事胳徽。” “怎么了爽彤?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵养盗,是天一觀的道長。 經(jīng)常有香客問我淫茵,道長爪瓜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任匙瘪,我火速辦了婚禮铆铆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丹喻。我一直安慰自己薄货,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布碍论。 她就那樣靜靜地躺著谅猾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上税娜,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天坐搔,我揣著相機(jī)與錄音,去河邊找鬼敬矩。 笑死概行,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的弧岳。 我是一名探鬼主播凳忙,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼禽炬!你這毒婦竟也來了涧卵?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤腹尖,失蹤者是張志新(化名)和其女友劉穎柳恐,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體桐臊,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡胎撤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年晓殊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了断凶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡巫俺,死狀恐怖认烁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情介汹,我是刑警寧澤却嗡,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站嘹承,受9級(jí)特大地震影響窗价,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜叹卷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一撼港、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧骤竹,春花似錦帝牡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春罩息,著一層夾襖步出監(jiān)牢的瞬間嗤详,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工瓷炮, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留断楷,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓崭别,卻偏偏與公主長得像冬筒,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子茅主,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容