Rxjava2在Api方面有不少變化,基于Rxjava1.x的Rxbus實(shí)現(xiàn)不太適用,網(wǎng)上關(guān)于2.0的資料很少,既然沒有現(xiàn)成的亡蓉,那就自己寫一個(gè)
1.API變化
1.x中是使用PublishSubject來(lái)現(xiàn)實(shí)管理消息的,并使用SerializedSubject保證現(xiàn)成安全喷舀,在2.0中砍濒,
Subject被Processer代替淋肾,通過PublishProcesser管理消息,并通過toSerialized()方法保證現(xiàn)成安全
public class RxBus {
//相當(dāng)于Rxjava1.x中的Subject
private final FlowableProcessor<Object> mBus;
private static volatile RxBus sRxBus = null;
private RxBus() {
//調(diào)用toSerialized()方法爸邢,保證線程安全
mBus = PublishProcessor.create().toSerialized();
}
public static synchronized RxBus getDefault() {
if (sRxBus == null) {
synchronized (RxBus.class) {
if (sRxBus == null) {
sRxBus = new RxBus();
}
}
}
return sRxBus;
}
/**
* 發(fā)送消息
* @param o
*/
public void post(Object o) {
new SerializedSubscriber<>(mBus).onNext(o);
}
/**
* 確定接收消息的類型
* @param aClass
* @param <T>
* @return
*/
public <T> Flowable<T> toFlowable(Class<T> aClass) {
return mBus.ofType(aClass);
}
/**
* 判斷是否有訂閱者
* @return
*/
public boolean hasSubscribers() {
return mBus.hasSubscribers();
}
}
2.簡(jiǎn)單封裝
public class RxBusHelper {
/**
* 發(fā)布消息
*
* @param o
*/
public static void post(Object o) {
RxBus.getDefault().post(o);
}
/**
* 接收消息,并在主線程處理
*
* @param aClass
* @param disposables 用于存放消息
* @param listener
* @param <T>
*/
public static <T> void doOnMainThread(Class<T> aClass, CompositeDisposable disposables, OnEventListener<T> listener) {
disposables.add(RxBus.getDefault().toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS))));
}
public static <T> void doOnMainThread(Class<T> aClass, OnEventListener<T> listener) {
RxBus.getDefault().toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS)));
}
/**
* 接收消息,并在子線程處理
*
* @param aClass
* @param disposables
* @param listener
* @param <T>
*/
public static <T> void doOnChildThread(Class<T> aClass, CompositeDisposable disposables, OnEventListener<T> listener) {
disposables.add(RxBus.getDefault().toFlowable(aClass).subscribeOn(Schedulers.newThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS))));
}
public static <T> void doOnChildThread(Class<T> aClass, OnEventListener<T> listener) {
RxBus.getDefault().toFlowable(aClass).subscribeOn(Schedulers.newThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS)));
}
public interface OnEventListener<T> {
void onEvent(T t);
void onError(ErrorBean errorBean);
}
}