目前大多數(shù)開發(fā)者使用EventBus或者Otto作為事件總線通信庫,對于RxJava使用者來說忠聚,RxJava也可以輕松實現(xiàn)事件總線宝磨,因為它們都依據(jù)于觀察者模式缘眶。
不多說,上代碼
/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
private static volatile RxBus defaultInstance;
private final Subject<Object, Object> bus;
// PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 單例RxBus
public static RxBus getDefault() {
if (defaultInstance == null) {
synchronized (RxBus.class) {
if (defaultInstance == null) {
defaultInstance = new RxBus();
}
}
}
return defaultInstance ;
}
// 發(fā)送一個新的事件
public void post (Object o) {
bus.onNext(o);
}
// 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
public <T> Observable<T> toObservable (Class<T> eventType) {
return bus.ofType(eventType);
// 這里感謝小鄧子的提醒: ofType = filter + cast
// return bus.filter(new Func1<Object, Boolean>() {
// @Override
// public Boolean call(Object o) {
// return eventType.isInstance(o);
// }
// }) .cast(eventType);
}
}
注:
1束莫、Subject同時充當了Observer和Observable的角色造虏,Subject是非線程安全的,要避免該問題麦箍,需要將 Subject轉(zhuǎn)換為一個 SerializedSubject
漓藕,上述RxBus類中把線程非安全的PublishSubject包裝成線程安全的Subject。
2挟裂、PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者享钞。
3、ofType操作符只發(fā)射指定類型的數(shù)據(jù),其內(nèi)部就是filter+cast(這里非常感謝@小鄧子 的提醒)
public final <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
@Override
public final Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}
filter操作符可以使你提供一個指定的測試數(shù)據(jù)項栗竖,只有通過測試的數(shù)據(jù)才會被“發(fā)射”暑脆。
cast操作符可以將一個Observable轉(zhuǎn)換成指定類型的Observable。
分析:
1狐肢、首先創(chuàng)建一個可同時充當Observer和Observable的Subject添吗;
2、在需要接收事件的地方份名,訂閱該Subject(此時Subject是作為Observable)碟联,在這之后,一旦Subject接收到事件僵腺,立即發(fā)射給該訂閱者鲤孵;
3、在我們需要發(fā)送事件的地方辰如,將事件post至Subject普监,此時Subject作為Observer接收到事件(onNext),然后會發(fā)射給所有訂閱該Subject的訂閱者琉兜。
對于RxBus的使用凯正,就和普通的RxJava訂閱事件很相似了。
先看發(fā)送事件的代碼:
RxBus.getDefault().post(new UserEvent (1, "yoyo"));
userEvent是要發(fā)送的事件豌蟋,如果你用過EventBus, 很容易理解廊散,UserEvent的代碼:
public class UserEvent {
long id;
String name;
public UserEvent(long id,String name) {
this.id= id;
this.name= name;
}
public long getId() {
return id;
}
public String getName() {
return name;
}
}
再看接收事件的代碼:
// rxSubscription是一個Subscription的全局變量,這段代碼可以在onCreate/onStart等生命周期內(nèi)
rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)
.subscribe(new Action1<UserEvent>() {
@Override
public void call(UserEvent userEvent) {
long id = userEvent.getId();
String name = userEvent.getName();
...
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// TODO: 處理異常
}
});
最后夺饲,一定要記得在生命周期結(jié)束的地方取消訂閱事件,防止RxJava可能會引起的內(nèi)存泄漏問題施符。
@Override
protected void onDestroy() {
super.onDestroy();
if(!rxSubscription.isUnsubscribed()) {
rxSubscription.unsubscribe();
}
}
這樣往声,一個簡單的Event Bus就實現(xiàn)了!如果你的項目已經(jīng)開始使用RxJava戳吝,也許可以考慮替換掉EventBus或Otto浩销,減小項目體積。
RxBus听哭、EventBus因為解耦太徹底慢洋,濫用的話,項目可維護性會越來越低陆盘;一些簡單場景更推薦用回調(diào)普筹、Subject來代替事件總線。
感興趣的可以閱讀我另外2篇深入RxBus的文章:
[深入RxBus:[支持Sticky事件]](http://www.reibang.com/p/71ab00a2677b)
深入RxBus:[異常處理]
參考:
http://nerds.weddingpartyapp.com/tech/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/