之前寫了一個簡單的Rxbus模塊疑俭,使用的是RxJava 1.0版本粮呢,十月底Rxjava已經(jīng)更新到了2.0.0,那我就對現(xiàn)有的Rxbus模塊做一下升級钞艇。
之前Rxbus的連接:
http://www.reibang.com/writer#/notebooks/3833653/notes/4406055
Step 1
引入最新的RxJava
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'
Step 2
修改EventThread.class啄寡,刪除Schedulers.immediate()相關(guān)
因為在2.0中刪除了Schedulers.immediate()這個線程的切換
Step 3
修改RxBus.class
Branch 1 CompositeDisposable
old
//存放訂閱者信息
private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();
new
//存放訂閱者信息
private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();
CompositeSubscription 修改為 CompositeDisposable
那么自然解除訂閱的方式也有了修改
old
CompositeSubscription.unsubscribe();
new
CompositeDisposable.dispose();
Branch 2 Flowable
在2.0中增加了Flowable 這樣就把 backpressure 的問題放到了Flowable中來處理,而Observable 不對backpressure進行處理了哩照。
但是使用Flowable還是要注意對backpressure的處理挺物,不然還是會出現(xiàn)以前的問題。
old
Observable.just(subscriber)
.filter(s -> s != null)//判斷訂閱者不為空
.filter(s -> subscriptions.get(subscriber)==null) //判斷訂閱者沒有在序列中
.map(s -> s.getClass())
.flatMap(s -> Observable.from(s.getDeclaredMethods()))//獲取訂閱者方法并且用Observable裝載
.map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且關(guān)閉安全檢查提升反射效率
.filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必須被Subscribe注解
.subscribe(m -> {
addSubscription(m,subscriber);
});
new
Flowable.just(subscriber)
.filter(s -> s != null)//判斷訂閱者不為空
.filter(s -> subscriptions.get(subscriber)==null) //判斷訂閱者沒有在序列中
.map(s -> s.getClass())
.flatMap(s -> Flowable.fromArray(s.getDeclaredMethods()))//獲取訂閱者方法并且用Observable裝載
.map(m -> {m.setAccessible(true);return m;})//使非public方法可以被invoke,并且關(guān)閉安全檢查提升反射效率
.filter(m -> m.isAnnotationPresent(Subscribe.class))//方法必須被Subscribe注解
.subscribe(m -> {
addSubscription(m,subscriber);
});
并且要注意.from修改為了.fromArray
Branch 3 Disposable
old
Subscription subscription = tObservable(sub.tag(), cla)
new
Disposable disposable = tObservable(sub.tag(), cla)
Branch 4 元操作符修改
old
/**
* 訂閱事件
* @return
*/
public <T> Observable tObservable(int code, final Class<T> eventType) {
return bus.ofType(Msg.class)//判斷接收事件類型
.filter(new Func1<Msg, Boolean>() {
@Override
public Boolean call(Msg o) {
//過濾code同的事件
return o.code == code;
}
})
.map(new Func1<Msg, Object>() {
@Override
public Object call(Msg o) {
return o.object;
}
})
.cast(eventType);
}
new
/**
* 訂閱事件
* @return
*/
public <T> Observable tObservable(int code, final Class<T> eventType) {
return bus.ofType(Msg.class)//判斷接收事件類型
.filter(new Predicate<Msg>() {
@Override
public boolean test(Msg msg) throws Exception {
return msg.code==code;
}
})
.map(new Function<Msg, Object>() {
@Override
public Object apply(Msg msg) throws Exception {
return msg.object;
}
})
.cast(eventType);
}
filter飘弧、map在2.0中都有修改识藤,這里只涉及到現(xiàn)在所使用的操作符,其他相關(guān)操作符修改請到查看相關(guān) doc
Branch 5 subscribeWith
2.0中新增了subscribeWith()方法次伶,對于這個方法我沒有找到過多的解釋痴昧,這里暫時引用兩篇簡書中的內(nèi)容加以說明
subscribe后不再會有 Subscription 也就是如今的 Disposable,為了保持向后的兼容冠王, Flowable 提供了 subscribeWith方法 返回當(dāng)前的Subscriber對象赶撰, 并且同時提供了DefaultSubscriber, ResourceSubscriber,DisposableSubscriber柱彻,讓他們提供了Disposable接口豪娜, 可以完成和以前類似的代碼 (引用1)
需要使用subscribeWith而不是subscribe,因為subscribe方法現(xiàn)在返回void (引用2)
因為篇幅有限為避免斷章取義绒疗,如果對subscribeWith不解請去引用地址查看侵歇,引用地址在下方相關(guān)連接中。
old
/**
* 解除訂閱者
* @param subscriber 訂閱者
*/
public void unRegister(Object subscriber) {
Observable.just(subscriber)
.filter(s -> s!=null)
.map(s -> subscriptions.get(s))
.filter(subs -> subs!=null)
.subscribe(subs -> {
subs.unsubscribe();
subscriptions.remove(subscriber);
new
/**
* 解除訂閱者
* @param subscriber 訂閱者
*/
public void unRegister(Object subscriber) {
Flowable.just(subscriber)
.filter(s -> s!=null)
.map(s -> subscriptions.get(s))
.filter(subs -> subs!=null)
.subscribeWith(new Subscriber<CompositeDisposable>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(CompositeDisposable compositeDisposable) {
compositeDisposable.dispose();
subscriptions.remove(subscriber);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
到這里Rxbus的升級就暫時完成了吓蘑。
問題
在升級過程中也遇到了幾個問題惕虑,現(xiàn)在暫時還沒有解決,也在這里記錄一下
1.
SerializedSubject 這個可以吧Subject序列化為線程安全的類沒有找到磨镶,現(xiàn)在只有SerializedObserver溃蔫、SerializedSubscriber 這兩個類,不知道以后是否會增加琳猫。
解決
SerializedSubject 已經(jīng)變?yōu)榉莗ublic類
可以通過bus = PublishSubject.create().toSerialized();的方式獲取線程安全 的對象伟叛。
2.
在Rxbus解除訂閱時我使用了RxJava的寫法,如果只改動Observable為Flowable,那么程序會報錯脐嫂。但是在另一段與Retrofit相關(guān)的代碼中卻可以使用统刮。
/**
* 解除訂閱者
* @param subscriber 訂閱者
*/
public void unRegister(Object subscriber) {
Observable.just(subscriber)
.filter(s -> s!=null)
.map(s -> subscriptions.get(s))
.filter(subs -> subs!=null)
.subscribe(subs -> {
subs.unsubscribe();
subscriptions.remove(subscriber);
APIServiceManager.getInstance()
.getTravelNotesAPI()
.getTravelNotesList(key, page + "")
.compose(RxSchedulersHelper.io_main())
.compose(SchedulersHelper.handleResult())
.doOnTerminate(() -> view.disDialog())
.subscribe(s -> RxBus.getInstance().post(RxBus.TAG_DEFAULT, s.getBookses()),
e -> RxBus.getInstance().post(RxBus.TAG_ERROR, e.getMessage()));
public interface TravelNotesAPI {
@GET(APIConfig.BASE_URL_TRAVEL_NOTES+"travellist?")
Flowable<ResponseJson<TravelNoteBook>>
getTravelNotesList(@Query("query") String query, @Query("page") String page);}
現(xiàn)在2.0出的時間不長所以如果文章中出現(xiàn)什么問題可以給我留言
相關(guān)連接
RxJava 2 :
https://github.com/ReactiveX/RxJava/tree/2.x
RxJava 2 doc:
http://reactivex.io/RxJava/2.x/javadoc/
簡書RxJava2說明:
http://www.reibang.com/p/763322683f23 (引用1)
http://www.reibang.com/p/850af4f09b61 (引用2)
backpressure相關(guān)說明:
http://www.dundunwen.com/article/275b1d92-f9da-4bb8-b111-3aa8a6ace245.html
RxBus:
https://github.com/hackerlc/GearApplication/tree/master/gearlibrary/src/main/java/gear/yc/com/gearlibrary/rxjava/rxbus