? EventBus是Android平臺(tái)上一個(gè)發(fā)布/訂閱事件總線作谚,使用EventBus可以方便的在不同的組件中進(jìn)行消息通信,避免不同組件之間的耦合鸿脓。EventBus或者類似的事件總線基本上是各個(gè)項(xiàng)目中的標(biāo)配抑钟。本文主要基于3.2.0版本介紹EventBus的實(shí)現(xiàn)方式。
EventBus的基本流程
? 如上圖所示野哭,EventBus的核心架構(gòu)是通過(guò)post()方法把Event交給EventBus在塔,由EventBus根據(jù)事件的類型,分發(fā)給Subscriber拨黔。Subscriber即訂閱者蛔溃,指的是通過(guò)EventBus的register()方法在EventBus中注冊(cè)的對(duì)象,可以是Activity蓉驹、Fragment也可以是其它對(duì)象城榛。
? EventBus的流程比較簡(jiǎn)單,主要分析幾個(gè)問(wèn)題态兴。
- EventBus的注冊(cè)流程
- EventBus分發(fā)事件的流程
- EventBus怎么切換線程
EventBus如何存儲(chǔ)訂閱關(guān)系
? 在開(kāi)始之前狠持,先介紹一下EventBus存儲(chǔ)訂閱關(guān)系的幾個(gè)Map,了解幾個(gè)Map存儲(chǔ)的數(shù)據(jù)和作用后瞻润,更容易理解EventBus的流程喘垂。
- subscriptionsByEventType:
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
變量聲明如上所示俱恶,是一個(gè)保存Event類型和對(duì)應(yīng)的訂閱對(duì)象和方法的Map涧郊,key對(duì)應(yīng)的是Event的Class對(duì)象,value是對(duì)應(yīng)的Subscription的列表。subscriptionsByEventType的作用是根據(jù)Event的類型捶箱,拿到對(duì)應(yīng)的訂閱者列表及其方法缔逛,然后通過(guò)反射的形式進(jìn)行調(diào)用页响。簡(jiǎn)單來(lái)說(shuō)闺魏,subscriptionsByEventType保存的是某個(gè)Event有哪些訂閱者。
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
...
}
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
...
}
Subscription是表示訂閱的實(shí)體類鸭限,從上面兩個(gè)類的代碼可以看出蜕径,Subscription用來(lái)保存訂閱者以及對(duì)應(yīng)的方法,包括方法的對(duì)象败京、線程兜喻、事件類型、優(yōu)先級(jí)等等屬性赡麦。
- typesBySubscriber:
private final Map<Object, List<Class<?>>> typesBySubscriber;
typesBySubscriber的key是Subscriber朴皆,value對(duì)應(yīng)的是訂閱的Event的列表。typesBySubscriber的作用是在注冊(cè)成訂閱者或者取消注冊(cè)的時(shí)候泛粹,可以根據(jù)Subscriber拿到對(duì)應(yīng)的Event列表遂铡,然后在subscriptionsByEventType中中方便的移除相應(yīng)的訂閱。與subscriptionsByEventType相反戚扳,typesBySubscriber保存的是某個(gè)訂閱者忧便,訂閱了哪些Event族吻。
EventBus的注冊(cè)流程
? EventBus注冊(cè)的方式很簡(jiǎn)單帽借,通過(guò)register(this)和unregister(this)兩個(gè)方法進(jìn)行注冊(cè)和反注冊(cè),通過(guò)注解的方式標(biāo)識(shí)具體的方法及處理的線程等超歌。
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
register()方法比較好理解砍艾,就是找到subscriber的訂閱的方法,然后挨個(gè)進(jìn)行注冊(cè)巍举。問(wèn)題就在于如何找到訂閱方法脆荷。
遍歷訂閱者的方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
在SubscriberMethodFinder中維護(hù)了緩存METHOD_CACHE,保存訂閱者對(duì)應(yīng)的訂閱方法懊悯,提高再次訂閱的效率蜓谋。然后查找訂閱者的方法。EventBus3.0中新增了索引機(jī)制炭分,如果在gradle中配置了索引桃焕,則會(huì)在編譯的時(shí)候就遍歷訂閱者的方法,降低EventBus注冊(cè)占用的時(shí)間捧毛,提升EventBus的性能观堂。
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
通過(guò)內(nèi)部類FindState來(lái)遍歷方法让网,getSubscriberInfo(findState)方法通過(guò)SubscriberInfoIndex獲取訂閱者信息,如果配置了索引师痕,就會(huì)獲取到subscriberInfo溃睹,調(diào)用subscriberInfo.getSubscriberMethods()可以獲取到訂閱的方法數(shù)組。如果沒(méi)有配置索引功能則返回null胰坟,會(huì)通過(guò)反射的方式去遍歷方法因篇。最后從findState獲取訂閱方法的List返回,并重置findState笔横。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
...
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
這里通過(guò)getDeclaredMethods拿到類的方法惜犀,根據(jù)方法的類型、參數(shù)狠裹、注解依次進(jìn)行篩選虽界,找到符合條件的方法,將方法的相關(guān)信息添加到findState中涛菠。
遍歷完訂閱方法之后莉御,對(duì)訂閱方法進(jìn)行注冊(cè)。
注冊(cè)訂閱方法
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
訂閱的方法中根據(jù)subscriber和subscriberMethod構(gòu)造了一個(gè)Subscription對(duì)象俗冻,然后根據(jù)優(yōu)先級(jí)保存到subscriptionsByEventType中礁叔,在發(fā)送事件的時(shí)候取出依次分發(fā)。把subscriber和事件類型添加到typesBySubscriber中迄薄,方便取消注冊(cè)琅关。如果是方法設(shè)置粘性事件屬性的話,就立刻分發(fā)現(xiàn)存的對(duì)應(yīng)的粘性事件讥蔽。
分發(fā)事件的流程
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
把Event放進(jìn)隊(duì)列中涣易,然后調(diào)用postSingleEvent()方法逐個(gè)分發(fā)。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
...
}
調(diào)用lookupAllEventTypes()方法遍歷Event的所有接口和父類冶伞,然后再調(diào)用postSingleEventForEventType()分發(fā)新症。通過(guò)subscriptionsByEventType拿到之前保存的事件的訂閱關(guān)系列表,然后再調(diào)用postToSubscription()方法分發(fā)响禽。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
根據(jù)ThreadMode屬性進(jìn)行線程的切換徒爹,比如常用的MAIN模式,如果post的線程是主線程芋类,就直接通過(guò)反射的方式調(diào)用訂閱者的訂閱方法隆嗅。如果是在子線程中調(diào)用的post方法,就把這個(gè)訂閱事件加入到mainThreadPoster的隊(duì)列中等待處理侯繁,mainThreadPoster通過(guò)Handler的方式進(jìn)行主線程切換胖喳。
EventBus怎么切換線程
- 切換到主線程
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
...
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
切換到主線程由HandlerPoster類實(shí)現(xiàn),在enqueue()方法之后巫击,調(diào)用sendMessage觸發(fā)handleMessage方法禀晓,在handleMessage中反射調(diào)用訂閱方法精续。為了避免主線程占用時(shí)間過(guò)長(zhǎng),超過(guò)10ms會(huì)重新通過(guò)sendMessage方法重新安排剩下的反射調(diào)用粹懒。
- 切換到子線程
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
切換到子線程通過(guò)AsyncPoster和BackGroudPoster實(shí)現(xiàn)重付,原理是一樣的,繼承Runnable類凫乖,通過(guò)線程池執(zhí)行确垫。在run方法中反射調(diào)用訂閱方法。