** 本篇文章已授權(quán)微信公眾號(hào) guolin_blog (郭霖)獨(dú)家發(fā)布*
RxBus的核心功能是基于Rxjava的侠讯,既然是模擬EventBus白修,我們需要搞清楚RxJava滿足實(shí)現(xiàn)EventBus的那些條件,這樣才能更好的實(shí)現(xiàn)RxBus录平。
EventBus是Android上的一個(gè)事件發(fā)布/訂閱的事件總線框架,可以充分的解耦,簡化了四大組件愕乎、UI線程與子線程的間的事件傳遞等等。它基本工作流程如下:
- 1壁公、訂閱:
EventBus.getDefault().register(this);
- 2感论、發(fā)送事件:
EventBus.getDefault().post(event);
- 3、接受紊册、處理事件:
onEventXXX(Object event);
- 2笛粘、取消訂閱:
EventBus.getDefault().unregister(this);
根據(jù)EventBus的工作流程,我們的RxBus首先需要自身的實(shí)例湿硝,這一點(diǎn)我們可以仿照EventBus的getDefault()方法薪前,通過一個(gè)單例來實(shí)現(xiàn)。有了RxBus實(shí)例就可以進(jìn)行訂閱了关斜,在RxJava中有個(gè)Subject類示括,它繼承Observable類,同時(shí)實(shí)現(xiàn)了Observer接口痢畜,因此Subject可以同時(shí)擔(dān)當(dāng)訂閱者和被訂閱者的角色垛膝,這里我們使用Subject的子類PublishSubject來創(chuàng)建一個(gè)Subject對(duì)象(PublishSubject只有被訂閱后才會(huì)把接收到的事件立刻發(fā)送給訂閱者)鳍侣,在需要接收事件的地方,訂閱該Subject對(duì)象吼拥,之后如果Subject對(duì)象接收到事件倚聚,則會(huì)發(fā)射給該訂閱者,此時(shí)Subject對(duì)象充當(dāng)被訂閱者的角色凿可。完成了訂閱惑折,在需要發(fā)送事件的地方將事件發(fā)送給之前被訂閱的Subject對(duì)象,則此時(shí)Subject對(duì)象做為訂閱者接收事件枯跑,然后會(huì)立刻將事件轉(zhuǎn)發(fā)給訂閱該Subject對(duì)象的訂閱者惨驶,以便訂閱者處理相應(yīng)事件,到這里就完成了事件的發(fā)送與處理敛助。最后就是取消訂閱的操作了粗卜,Rxjava中,訂閱操作會(huì)返回一個(gè)Subscription對(duì)象纳击,以便在合適的時(shí)機(jī)取消訂閱续扔,防止內(nèi)存泄漏,如果一個(gè)類產(chǎn)生多個(gè)Subscription對(duì)象焕数,我們可以用一個(gè)CompositeSubscription存儲(chǔ)起來纱昧,以進(jìn)行批量的取消訂閱。
到這里我們已經(jīng)結(jié)合EventBus對(duì)RxBus的可行性以及大概的實(shí)現(xiàn)流程進(jìn)行了分析百匆,接下來結(jié)合實(shí)現(xiàn)代碼再做進(jìn)一步的解釋:
public class RxBus {
private static volatile RxBus mInstance;
private SerializedSubject<Object, Object> mSubject;
private HashMap<String, CompositeSubscription> mSubscriptionMap;
private RxBus() {
mSubject = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getInstance() {
if (mInstance == null) {
synchronized (RxBus.class) {
if (mInstance == null) {
mInstance = new RxBus();
}
}
}
return mInstance;
}
/**
* 發(fā)送事件
*
* @param o
*/
public void post(Object o) {
mSubject.onNext(o);
}
/**
* 返回指定類型的Observable實(shí)例
*
* @param type
* @param <T>
* @return
*/
public <T> Observable<T> toObservable(final Class<T> type) {
return mSubject.ofType(type);
}
/**
* 是否已有觀察者訂閱
*
* @return
*/
public boolean hasObservers() {
return mSubject.hasObservers();
}
/**
* 一個(gè)默認(rèn)的訂閱方法
*
* @param type
* @param next
* @param error
* @param <T>
* @return
*/
public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
return tObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next, error);
}
/**
* 保存訂閱后的subscription
* @param o
* @param subscription
*/
public void addSubscription(Object o, Subscription subscription) {
if (mSubscriptionMap == null) {
mSubscriptionMap = new HashMap<>();
}
String key = o.getClass().getName();
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).add(subscription);
} else {
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(subscription);
mSubscriptionMap.put(key, compositeSubscription);
}
}
/**
* 取消訂閱
* @param o
*/
public void unSubscribe(Object o) {
if (mSubscriptionMap == null) {
return;
}
String key = o.getClass().getName();
if (!mSubscriptionMap.containsKey(key)){
return;
}
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).unsubscribe();
}
mSubscriptionMap.remove(key);
}
}
先看一下這個(gè)私有的構(gòu)造函數(shù):
private RxBus() {
mSubject = new SerializedSubject<>(PublishSubject.create());
}
由于Subject類是非線程安全的砌些,所以我們通過它的子類SerializedSubject將PublishSubject轉(zhuǎn)換成一個(gè)線程安全的Subject對(duì)象。之后可通過單例方法getInstance()
進(jìn)行RxBus的初始化加匈。
在toObservable()
根據(jù)事件類型存璃,通過mSubject.ofType(type);
得到一個(gè)Observable對(duì)象,讓其它訂閱者來訂閱雕拼。其實(shí)ofType()方法纵东,會(huì)過濾掉不符合條件的事件類型,然后將滿足條件的事件類型通過cast()方法啥寇,轉(zhuǎn)換成對(duì)應(yīng)類型的Observable對(duì)象偎球,這點(diǎn)可通過源碼查看。
同時(shí)封裝了一個(gè)簡單的訂閱方法doSubscribe()
辑甜,只需要傳入事件類型衰絮,相應(yīng)的回調(diào)即可。其實(shí)可以根據(jù)需求在RxBus中擴(kuò)展?jié)M足自己需求的doSubscribe()方法磷醋,來簡化使用時(shí)的代碼邏輯猫牡。
在需要發(fā)送事件的地方調(diào)用post()
方法,它間接的通過mSubject.onNext(o);
將事件發(fā)送給訂閱者邓线。
同時(shí)RxBus提供了addSubscription()
淌友、unSubscribe()
方法煌恢,分別來保存訂閱時(shí)返回的Subscription對(duì)象,以及取消訂閱震庭。
接下我們?cè)诰唧w的場景中測試一下:
1瑰抵、我們?cè)贏ctivity的onCreate()方法中進(jìn)行進(jìn)行訂閱操作:
private void doSubscribe() {
Subscription subscription1 = RxBus.getInstance()
.tObservable(String.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
mTv.setText("事件內(nèi)容:" + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
RxBus.getInstance().addSubscription(this, subscription1);
}
可以看到我們?cè)O(shè)定事件類型為String,并且Subscriber的回調(diào)發(fā)生在主線程器联,同時(shí)保存了Subscription對(duì)象二汛。
然后通過一個(gè)Button發(fā)送事件:
mBtn1.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBus.getInstance().post("1024");
}
});
我們直接在UI線程發(fā)送了String類型的1024,看效果:
2主籍、同樣在onCreate()方法中進(jìn)行進(jìn)行訂閱操作:
private void doSubscribe() {
Subscription subscription2 = RxBus.getInstance()
.doSubscribe(Integer.class, new Action1<Integer>() {
@Override
public void call(Integer s) {
mTv.setText("事件內(nèi)容:" + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
RxBus.getInstance().addSubscription(this, subscription2);
}
我們使用了RxBus中封裝好的doSubscribe()方法习贫,設(shè)置事件類型為Integer逛球。
這次我們通過Button在子線程中發(fā)送一個(gè)事件:
mBtn2.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
new Thread(new Runnable() {
@Override
public void run() {
RxBus.getInstance().post(2048);
}
}).start();
}
});
在子線程發(fā)送了一個(gè)Integer類型的2048千元,看效果:
3、我們?cè)贉y試下在廣播中發(fā)送事件颤绕,訂閱方式按照?qǐng)鼍?的方式幸海。
然后定義一個(gè)檢測網(wǎng)絡(luò)狀態(tài)的廣播:
public class NetworkChangeReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
ConnectivityManager manager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo networkInfo = manager.getActiveNetworkInfo();
if (networkInfo != null && networkInfo.isAvailable()) {
RxBus.getInstance().post("網(wǎng)絡(luò)連接成功");
} else {
RxBus.getInstance().post("網(wǎng)絡(luò)不可用");
}
}
}
在網(wǎng)絡(luò)可用與不可用時(shí)發(fā)送提示事件,然后在onCreate()方法中注冊(cè)廣播:
private void registerReceiver() {
IntentFilter intentFilter = new IntentFilter();
intentFilter.addAction("android.net.conn.CONNECTIVITY_CHANGE");
mReceiver = new NetworkChangeReceiver();
registerReceiver(mReceiver, intentFilter);
}
我們手動(dòng)打開奥务、關(guān)閉網(wǎng)絡(luò)物独,可以看到mTv上會(huì)顯示網(wǎng)絡(luò)狀態(tài)的提示信息,看效果:
最后不要忘了在onDestory()中對(duì)廣播進(jìn)行取消注冊(cè)氯葬,以及取消訂閱挡篓。
protected void onDestroy() {
super.onDestroy();
unregisterReceiver(mReceiver);
RxBus.getInstance().unSubscribe(this);
}
其它場景有興趣的可自行測試哦!到這里RxBus的基本功能就實(shí)現(xiàn)了帚称。
但是還不夠完善官研,一般情況我們都是先訂閱事件,然后發(fā)送事件闯睹,如果我們反過來戏羽,先發(fā)送了事件,再進(jìn)行訂閱操作楼吃,怎么保證發(fā)送的事件不丟失呢始花?也就是EventBus中的StickyEven功能。
其實(shí)通過RxJava實(shí)現(xiàn)類似的功能很簡單孩锡,Subject有一個(gè)子類BehaviorSubject
酷宵,在被訂閱之前,它可以緩存最近一個(gè)發(fā)送給它的事件躬窜,當(dāng)被訂閱后浇垦,它會(huì)立刻將緩存事件發(fā)送給訂閱者,這樣就解決了我們之前的疑問斩披。RxBus需要做的修改很簡單:
private RxBus() {
mSubject = new SerializedSubject<>(BehaviorSubject.create());
}
但是有一點(diǎn)需要注意BehaviorSubject只能緩存最近的一個(gè)事件溜族,如果有多個(gè)事件怎么辦讹俊?對(duì)RxJava來說都不是事,Subject還有一個(gè)子類ReplaySubject
煌抒,在被訂閱之前仍劈,它可以緩存多個(gè)發(fā)送給它的事件,在被訂閱后會(huì)發(fā)送所有事件給訂閱者寡壮,相信如何修改RxBus已經(jīng)很明顯了贩疙。
有興趣的話可以下載源碼測試:點(diǎn)我下載哦!
最后推薦一些RxJava的學(xué)習(xí)資源:RxJava入門况既、給 Android 開發(fā)者的 RxJava 詳解