Y事件總線:基于java的Observe和Observable實現(xiàn)的事件總線
github地址:https://github.com/lewis-v/YEventBus
使用方式
導入依賴
Add it in your root build.gradle at the end of repositories:
allprojects {
repositories {
...
maven { url 'https://jitpack.io' }
}
}
Add the dependency
dependencies {
compile 'com.github.lewis-v:YEventBus:1.0.0'
}
使用方式
定義事件類TestEvent2繼承于IEvent,并注冊事件
YEventBus.getInstance().subscriber(TestEvent2.class, new YObserver<TestEvent2>() {//訂閱事件,處理的所在的線程與分發(fā)的線程一致
@Override
public void onSuccess(TestEvent2 event) {
Log.i(TAG,event.toString());
}
@Override
public void onFail(Exception e) {
Log.e(TAG,e.getMessage());
}
});
YEventBus.getInstance().subscriber(TestEvent.class, new YMainThreadObserver<TestEvent>() {//訂閱事件,會在主線程中處理
@Override
public void onSuccess(TestEvent event) {
Log.i(TAG,event.toString());
}
@Override
public void onFail(Exception e) {
Log.e(TAG,e.getMessage());
}
});
發(fā)布事件
YEventBus.getInstance().postMainEvent(TestEvent.class,new TestEvent(TAG));//發(fā)布在主線程分發(fā)的事件
YEventBus.getInstance().postEvent(TestEvent.class,new TestEvent(TAG));//發(fā)布在子線程分發(fā)的事件
取消訂閱
YEventBus.getInstance().unSubscriber(TestEvent.class,observer);//取消某事件下的某個訂閱者的訂閱
YEventBus.getInstance().unSubscriberEvent(TestEvent.class);//取消TestEvent整個系列事件的訂閱
YEventBus.getInstance().unSubscriberAll();//取消所有事件的訂閱
具體實現(xiàn)
Observable與Observer
首先是使用java的Observable,在發(fā)布事件時需要先setChanged()在進行發(fā)布,否者是發(fā)布不了的
public class YObservable extends Observable {
public <T extends IEvent> void postEvent(T data){
setChanged();
notifyObservers(data);
}
}
然后是java的Observer,這里實現(xiàn)了OnGetEvent接口,主要是要在本來的Observer接口上加上成功與失敗的調(diào)用方法,其中Observer接口需要實現(xiàn)updata方法,此方法是在事件分發(fā)時調(diào)用的方法
interface OnGetEvent<E extends IEvent> extends Observer{
void onSuccess(E event);
void onFail(Exception e);
}
YObserver控制了事件的實際處理及異常的獲取
public abstract class YObserver<E extends IEvent> implements OnGetEvent<E> {
@Override
public void update(Observable o, Object arg) {
try {
onSuccess((E) arg);
}catch (Exception e){
onFail(e);
}
}
}
這里除了提供YObserver,還提供了YMainThreadObserver,此Observer的事件處理會在主線程中進行,添加此類的意義是,可以再發(fā)布時指定在主線程,也可以在訂閱的時候指定在主線程,當然在訂閱的時候指定的優(yōu)先級比發(fā)布的時候指定優(yōu)先級高.
public abstract class YMainThreadObserver<E extends IEvent> implements OnGetEvent<E>{
@Override
public void update(final Observable o, final Object arg) {
ThreadSchedule.getMainHandle().post(new Runnable() {
@Override
public void run() {
try {
onSuccess((E) arg);
}catch (Exception e){
onFail(e);
}
}
});
}
}
Observable管理類
YObservableManager用于管理Observable,內(nèi)部定義了ConcurrentHashMap來存儲Observable,其鍵值為對應(yīng)事件的Class,在訂閱和取消訂閱會對map進行插入或遍歷
public class YObservableManager {
private ConcurrentHashMap<Class,YObservable> mObservableMap;
private IEventHandle handle;
public YObservableManager() {
mObservableMap = new ConcurrentHashMap<>();
init();
}
public YObservableManager(ConcurrentHashMap<Class, YObservable> mObservableMap) {
this.mObservableMap = mObservableMap;
init();
}
public void init(){
handle = new YEventHandle();
}
/**
* 設(shè)置自定義的事件分發(fā)處理
* @param handle
*/
public void setHandle(IEventHandle handle) {
this.handle = handle;
}
/**
* 發(fā)布消息
* @param event
* @param <T>
*/
public <T extends IEvent> void postEvent(Class<T> event,T data){
YObservable observables = mObservableMap.get(event);
if (handle == null){
init();
}
handle.postEvent(observables,data);
}
/**
* 發(fā)布主線程消息
* @param event
* @param <T>
*/
public <T extends IEvent> void postMainEvent(Class<T> event,T data){
YObservable observables = mObservableMap.get(event);
if (handle == null){
init();
}
handle.postMainEvent(observables,data);
}
/**
* 訂閱事件
* @param event
* @param observer
* @param <T>
*/
public <T extends IEvent> void subscriber(Class<T> event,OnGetEvent<T> observer){
if (mObservableMap.containsKey(event)){
mObservableMap.get(event).addObserver(observer);
}else {
YObservable observable = new YObservable();
observable.addObserver(observer);
mObservableMap.put(event, observable);
}
}
/**
* 解除訂閱
* @param event
* @param observer
* @param <T>
*/
public <T extends IEvent> void unSubscriber(Class<T> event,YObserver<T> observer){
if (mObservableMap.containsKey(event)){
mObservableMap.get(event).deleteObserver(observer);
}
}
/**
* 解除一個事件系列的訂閱
* @param event
*/
public void unSubscriberEvent(Class<? extends IEvent> event){
if (mObservableMap.containsKey(event)) {
mObservableMap.get(event).deleteObservers();
mObservableMap.remove(event);
}
}
/**
* 解除所有事件訂閱
*/
public void unSubscriberAll(){
for (Map.Entry<Class,YObservable> entry : mObservableMap.entrySet()){
YObservable value = entry.getValue();
if (value != null){
value.deleteObservers();
}
}
mObservableMap.clear();
}
/**
* 釋放資源
*/
public void destroy(){
handle.destroy();
handle = null;
unSubscriberAll();
}
}
事件的發(fā)布
上述代碼中,在發(fā)布消息的時候會調(diào)用IEventHandle的postEvent,其實際的實現(xiàn)為
public class YEventHandle implements IEventHandle{
private ExecutorService executorServiceHandle;//處理線程池
public YEventHandle() {
init();
}
private void init(){
executorServiceHandle = Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());
}
/**
* 發(fā)布消息
* @param observable
* @param data
* @param <T>
* @throws InterruptedException
*/
@Override
public <T extends IEvent> void postEvent(YObservable observable, T data) {
handle(observable,data);
}
/**
* 發(fā)布主線程處理消息
* @param observable
* @param data
* @param <T>
*/
@Override
public <T extends IEvent> void postMainEvent(YObservable observable, T data) {
handleInMain(observable,data);
}
/**
* 處理
* @param observable
* @param data
* @param <T>
*/
private <T extends IEvent> void handle(final YObservable observable, final T data){
executorServiceHandle.execute(new Runnable() {
@Override
public void run() {
if (observable != null) {
observable.postEvent(data);
}
}
});
}
/**
* 在主線程處理
* @param observable
* @param data
* @param <T>
*/
private <T extends IEvent> void handleInMain(final YObservable observable, final T data){
executorServiceHandle.execute(new Runnable() {
@Override
public void run() {
if (observable != null) {
ThreadSchedule.getMainHandle().post(new Runnable() {
@Override
public void run() {
observable.postEvent(data);
}
});
}
}
});
}
/**
* 釋放資源
*/
@Override
public void destroy() {
executorServiceHandle.shutdownNow();
executorServiceHandle = null;
}
}
事件的分發(fā)處理,會在一個線程池里進行,線程池的大小為Cpu核心數(shù)的2倍,當事件過多時會在線程池的隊列中等待,需要注意的是對事件的處理盡量不要做太耗時的任務(wù),不然把線程池中的所有線程都阻塞了會導致整個事件總線阻塞,后面的時間將無法繼續(xù)發(fā)布.
小結(jié)結(jié)
Y事件總線的實現(xiàn)只要是使用了java的Observable和Observer,其內(nèi)部也是使用一個Vector類保存Observer,在發(fā)布的時候,遍歷這里列表進行發(fā)布,這也是設(shè)計模式中的觀察與被觀察者的模式.