1戒祠,什么是RxBus
rxbus不是什么框架朝蜘,它只是一個通過rxjava實現(xiàn)eventbus的類
在android中使用時叔收,它還還可以引用AndroidLifecycle來解決內(nèi)存溢出問題
它是觀察者模式的一種應(yīng)用选酗,方便了我們在不同頁面與不同線程間的通信
2哑子,代碼
RxBus的代碼實現(xiàn)
public class RxBus {
private volatile static RxBus mDefaultInstance;
//事件總線
private final Subject<Object> mBus;
//粘性事件存儲
private final Map<Class<?>, Object> mStickyEventMap;
private RxBus() {
mBus = PublishSubject.create().toSerialized();
mStickyEventMap = new ConcurrentHashMap<>();
}
public static RxBus getInstance() {
if (mDefaultInstance == null) {
synchronized (RxBus.class) {
if (mDefaultInstance == null) {
mDefaultInstance = new RxBus();
}
}
}
return mDefaultInstance;
}
/**
* 發(fā)送事件
*/
public void post(Object event) {
mBus.onNext(event);
}
/**
* 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
*/
public <T> Observable<T> toObservable(LifecycleOwner owner, final Class<T> eventType) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
return mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
}
/**
* 判斷是否有訂閱者
*/
public boolean hasObservers() {
return mBus.hasObservers();
}
public void reset() {
mDefaultInstance = null;
}
/**
* 發(fā)送一個新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
* 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
*/
public <T> Observable<T> toObservableSticky(LifecycleOwner owner, final Class<T> eventType) {
synchronized (mStickyEventMap) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
Observable<T> observable = mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))));
} else {
return observable;
}
}
}
/**
* 根據(jù)eventType獲取Sticky事件
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.get(eventType));
}
}
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.remove(eventType));
}
}
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents() {
synchronized (mStickyEventMap) {
mStickyEventMap.clear();
}
}
}
事件實體類
public class MsgEvent {
private String msg;
public MsgEvent(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
接收事件(觀察者)
RxBus.getInstance()
.toObservable(this, MsgEvent.class)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MsgEvent>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(MsgEvent msgEvent) {
text.setText("one " + msgEvent.getMsg());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
事件發(fā)送
RxBus.getInstance().post(new MsgEvent("Java"));
RxBus實現(xiàn)原理
初始化分析
首先,RxBus是一個單利模式鞋喇,這沒什么可以說的声滥,畢竟后面使用到RxBus需要是公共唯一的類。
事件總線侦香,現(xiàn)在來說一下Subject<Object> mBus落塑,這是一個事件總線,什么是事件總線呢罐韩?
在簡單觀察模式中憾赁,觀察者訂閱被觀察者,單被觀察者狀態(tài)或者數(shù)據(jù)發(fā)生變化時通知觀察者伴逸,這是一對一的關(guān)系缠沈。
但當觀察者和被觀察者是多個或者不確定數(shù)量的時候膘壶,這就需要一個總線來存儲這些觀察者和被觀察者错蝴,方便在發(fā)送通知的時候找到對應(yīng)的觀察者。
public class RxBus {
...
//事件總線
private final Subject<Object> mBus;
private RxBus() {
mBus = PublishSubject.create().toSerialized();
mStickyEventMap = new ConcurrentHashMap<>();
}
...
}
這里是通過Rxjava中的PublishSubject.create().toSerialized() 來創(chuàng)建總線用來存儲觀察者颓芭。簡單的就把它當做集合吧顷锰。
觀察者的創(chuàng)建分析
public class RxBus{
...
/**
* 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
*/
public <T> Observable<T> toObservable(LifecycleOwner owner, final Class<T> eventType) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
return mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
}
/**
* 發(fā)送事件
*/
public void post(Object event) {
mBus.onNext(event);
}
...
}
回顧觀察者創(chuàng)建代碼
RxBus.getInstance()
.toObservable(this, MsgEvent.class)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<MsgEvent>() { ... });
在觀察者創(chuàng)建時,先調(diào)用RxBus中的toObservable獲取一個回調(diào)事件類型為MsgEvent的被觀察者Observable亡问,其中傳入了LifecycleOwner官紫,
這個是為了防止當頁面關(guān)閉以后,訂閱事件還沒有結(jié)束州藕。
然后再執(zhí)行.subscribe()束世,并傳入一個新建的觀察者Observer,在subscribe的作用就是讓觀察者訂閱被觀察者
事件發(fā)送
執(zhí)行Subject的next進行發(fā)送
3,RxBus粘性事件
什么是粘性事件
一般情況都是先創(chuàng)建觀察者并加入到總線中床玻,然后在執(zhí)行事件發(fā)送毁涉,觀察者就可以收到相應(yīng)的事件
但是有時候也出現(xiàn)先發(fā)送事件,然后再創(chuàng)建觀察者锈死,這個時候就收不到之前的事件了贫堰,使用粘性事件就可以做到后創(chuàng)建的觀察者也可以收到之前的事件穆壕。
class RxBus{
...
/**
* 發(fā)送一個新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
* 使用Rxlifecycle解決RxJava引起的內(nèi)存泄漏
*/
public <T> Observable<T> toObservableSticky(LifecycleOwner owner, final Class<T> eventType) {
synchronized (mStickyEventMap) {
LifecycleProvider<Lifecycle.Event> provider = AndroidLifecycle.createLifecycleProvider(owner);
Observable<T> observable = mBus.ofType(eventType).compose(provider.<T>bindToLifecycle());
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))));
} else {
return observable;
}
}
}
...
}
粘性事件實際就是創(chuàng)建一個Map<Class<?>, Object> mStickyEventMap,用于存儲所有發(fā)送過的粘性事件其屏,當創(chuàng)建粘性觀察者時喇勋,會從這map中知道對應(yīng)的EventType類型的被觀察者Observable,并返回
observable.mergeWith(Observable.create(subscriber -> subscriber.onNext(eventType.cast(event))))偎行,這是返回Observable時代碼發(fā)送了從map中對應(yīng)的事件川背,這樣新創(chuàng)建的觀察者也可以能馬上收到之前的事件