最近再看一個項目浑此,但是那個項目里面的Rxjava是1.x版本的,由于最近又有一個項目要開始了,在封裝各種基類驯镊,所以我準備將項目中的Rxbus用Rxjava2.x來修改一下繼續(xù)用萤皂,由于2.x的Rxjava進行了代碼的重構(gòu)缘缚,所以在這里寫下我的一些收集與寫法
什么是Rxbus
先說說Rxjava
官方解釋:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava是Reactive Extensions的Java VM實現(xiàn):用于通過使用observable序列來組合異步和基于事件的程序的庫。
關于Rxjava的使用和理解可以參看:https://github.com/ReactiveX/RxJava
那我們的Rxbus呢敌蚜?
RxBus:處理應用程序間各個組件的通信桥滨,或者組件與組建之間的數(shù)據(jù)傳遞。
其實說到底弛车,RxBus學的是一種思路齐媒,而并不是給你一個現(xiàn)成的庫,然后直接去調(diào)用纷跛。需要的是自己來封裝喻括。
Rxbus的簡單實現(xiàn)
導包
compile 'io.reactivex.rxjava2:rxjava:2.1.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Rxbus類
public class RxBus {
// 主題
private final Subject<Object> bus;
// PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
private RxBus() {
bus = PublishSubject.create().toSerialized();
}
// 靜態(tài)內(nèi)部類(單例模式的內(nèi)部類實現(xiàn)方法)
private static class SingletonHolder {
public final static RxBus sInstance = new RxBus();
}
// 單例RxBus
public static RxBus getDefault() {
return SingletonHolder.sInstance;
}
/**
* 提供了一個新的事件
* 發(fā)布
* @param o
*/
public void post(Object o) {
bus.onNext(o);
}
// 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
public <T> Observable<T> toObserverable(Class<T> eventType) {
return bus.ofType(eventType); //判斷接收事件類型
}
}
不侮辱大家智商,配上注釋應該大家都懂了贫奠,就不解釋了
Rxjava1.x -> 2.x問題
如果大家有興趣的話唬血,最好還是閱讀源碼。
有兩篇官方文檔值得閱讀:
what's different in 2.0:https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0
文檔:http://reactivex.io/RxJava/2.x/javadoc/
參考文檔:http://www.reibang.com/p/2badfbb3a33b
1. 修改EventThread.class唤崭,刪除Schedulers.immediate()相關
因為在2.0中刪除了Schedulers.immediate()這個線程的切換
2. //存放訂閱者信息
private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();
修改為:
private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();
3. CompositeSubscription.unsubscribe();
修改為
CompositeDisposable.dispose();
4. 在2.0中增加了Flowable 這樣就把 backpressure 的問題放到了Flowable中來處理拷恨,
而Observable 不對backpressure進行處理了。但是使用Flowable還是要注意對backpressure
的處理谢肾,不然還是會出現(xiàn)以前的問題腕侄。
5. Subscription subscription = toObservable(sub.tag(), cla);
修改為:
Disposable disposable = tObservable(sub.tag(), cla);
6. SerializedSubject 已經(jīng)變?yōu)榉莗ublic類,可以通過
bus = PublishSubject.create().toSerialized();
的方式獲取線程安全 的對象。
7. private final PublishSubject<Object> bus = PublishSubject.create();
final Subject<Object> subject = bus.toSerialized();
修改為:
bus = PublishSubject.create().toSerialized();
toSerialized() 方法中其實就是之前序列化安全對象的寫法。而SerializedSubject類已經(jīng)變成了非public的
8.//1.X public class AppBaseActivity extends AppCompatActivity {
...
private CompositeSubscription mCompositeSubscription;
protected void addSubscription(Subscription subscription) {
if (null == mCompositeSubscription) {
mCompositeSubscription = new CompositeSubscription();
}
mCompositeSubscription.add(subscription);
}
@Override
protected void onDestroy() {
if (null != mCompositeSubscription) {
mCompositeSubscription.unsubscribe();
}
super.onDestroy();
}
...
}
//2.X
public class AppBaseActivity extends AppCompatActivity {
...
private CompositeDisposable mCompositeDisposable;
protected void addDisposable(Disposable disposable) {
if (null == mCompositeDisposable) {
mCompositeDisposable = new CompositeDisposable();
}
mCompositeDisposable.add(disposable);
}
@Override
protected void onDestroy() {
if (null != mCompositeDisposable) {
mCompositeDisposable.clear();
}
super.onDestroy();
}
...
}