前面在 EventBus設(shè)計(jì)與實(shí)現(xiàn)分析——特性介紹中介紹了EventBus的基本用法喻喳,及其提供的大多數(shù)特性的用法栋荸;在EventBus設(shè)計(jì)與實(shí)現(xiàn)分析——訂閱者的注冊(cè) 中介紹了EventBus中訂閱者注冊(cè)的過程框咙。這里就繼續(xù)分析EventBus的代碼伸辟,來了解其事件發(fā)布的過程煤墙。
事件的發(fā)布
如我們前面已經(jīng)了解到的梅惯,在EventBus中,有兩種不同類型得事件仿野,一種是普通事件铣减,事件被通知給訂閱者之后即被丟棄,另一種是Sticky事件脚作,事件在被通知給訂閱者之后會(huì)被保存起來葫哗,下次有訂閱者注冊(cè)針對(duì)這種事件的訂閱時(shí),訂閱者會(huì)直接得到通知球涛。
在EventBus中劣针,會(huì)以兩個(gè)不同的方法來發(fā)布這兩種不同類型的事件,這兩個(gè)方法分別是post(Object event)和postSticky(Object event):
private final Map<Class<?>, Object> stickyEvents;
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
......
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
......
/**
* Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky
* event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.
*/
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
......
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
postSticky()僅是在保存了事件之后調(diào)用post()來發(fā)布事件而已宾符。而在post()中酿秸,會(huì)借助于PostingThreadState來執(zhí)行事件發(fā)布的過程灭翔。PostingThreadState為發(fā)布的事件提供了排隊(duì)功能魏烫,同時(shí)它還描述一些發(fā)布的線程狀態(tài)炮沐。PostingThreadState還是發(fā)布過程跟外界交流的一個(gè)窗口绳军,外部可通過EventBus類提供的一些方法來控制這個(gè)狀態(tài),進(jìn)而影響發(fā)布過程埂陆,比如取消發(fā)布等操作煌张。PostingThreadState對(duì)象在ThreadLocal變量中保存呐赡,可見發(fā)布的事件的隊(duì)列是每個(gè)線程一個(gè)的。post()方法會(huì)逐個(gè)取出事件隊(duì)列中的每一個(gè)事件骏融,調(diào)用postSingleEvent()方法來發(fā)布链嘀。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
......
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
/** Recurses through super interfaces. */
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
postSingleEvent()要發(fā)布事件,首先需要找到訂閱者档玻,我們前面在 訂閱者的注冊(cè) 中看到怀泊,訂閱者注冊(cè)時(shí)會(huì)在subscriptionsByEventType中保存事件類型和訂閱者的映射關(guān)系,那要找到訂閱者豈不是很容易误趴?
其實(shí)不完全是霹琼。關(guān)鍵是對(duì)于事件類型的處理。要通知的事件類型的訂閱者不一定僅僅包含事件對(duì)象本身的類型的訂閱者,還可能要通知事件類型的父類或?qū)崿F(xiàn)的接口的類型的訂閱者枣申。在eventInheritance被置為true時(shí)售葡,就需要通知事件類型的父類或?qū)崿F(xiàn)的接口的類型的訂閱者。lookupAllEventTypes()和addInterfaces()就用于查找所有這樣的類型忠藤。
postSingleEvent()會(huì)逐個(gè)事件類型的去通知相應(yīng)得訂閱者挟伙,這一任務(wù)由postSingleEventForEventType()來完成。而在postSingleEventForEventType()中則是根據(jù)subscriptionsByEventType找到所有的訂閱者方法模孩,并通過postToSubscription方法來逐個(gè)的向這些訂閱者方法通知事件像寒。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
......
/**
* Invokes the subscriber if the subscriptions is still active. Skipping subscriptions prevents race conditions
* between {@link #unregister(Object)} and event delivery. Otherwise the event might be delivered after the
* subscriber unregistered. This is particularly important for main thread delivery and registrations bound to the
* live cycle of an Activity or Fragment.
*/
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
在postToSubscription()中事件的通知又分為同步的通知和異步的通知。同步的通知是直接調(diào)用invokeSubscriber(Subscription subscription, Object event)方法瓜贾,這會(huì)將事件對(duì)象傳遞給訂閱者方法進(jìn)行調(diào)用诺祸。而異步的通知?jiǎng)t是將事件及訂閱者拋給某個(gè)poster就結(jié)束。
對(duì)于某個(gè)訂閱者的通知要采用同步通知還是異步通知?jiǎng)t需要根據(jù)訂閱者的ThreadMode及事件發(fā)布的線程來定祭芦。具體得規(guī)則為:
訂閱者的線程模式是POSTING --------------------------------> 同步通知
訂閱者的線程模式是MAIN + 事件發(fā)布線程是主線程 ---------------> 同步通知
訂閱者的線程模式是BACKGROUND + 事件發(fā)布線程不是主線程 ------> 同步通知
訂閱者的線程模式是BACKGROUND + 事件發(fā)布線程是主線程 --------> 異步通知
訂閱者的線程模式是MAIN + 事件發(fā)布線程不是主線程 --------------> 異步通知
訂閱者的線程模式是ASYNC ----------------------------------> 異步通知
同步通知和異步通知各三種筷笨。但三種異步通知本身又各不相同,它們分別由三種不同的Poster來處理龟劲,訂閱者的線程模式是BACKGROUND
+ 事件發(fā)布線程是主線程的異步通知由BackgroundPoster
來處理胃夏,訂閱者的線程模式是MAIN
+ 事件發(fā)布線程不是主線程的異步通知由HandlerPoster
來處理,而訂閱者的線程模式是ASYNC
的異步通知由AsyncPoster
來處理昌跌。
接著就來看一下這些Poster仰禀。首先是HandlerPoster:
package org.greenrobot.eventbus;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.os.SystemClock;
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
這是一個(gè)Handler。其內(nèi)部有一個(gè)PendingPostQueue queue蚕愤,enqueue()操作即是用描述訂閱者方法的Subscription對(duì)象和事件對(duì)象構(gòu)造一個(gè)PendingPost對(duì)象答恶,然后將這個(gè)PendingPost對(duì)象放入queue中,并在Handler沒有在處理事件分發(fā)時(shí)發(fā)送一個(gè)消息來喚醒對(duì)于事件分發(fā)的處理萍诱。
而在handleMessage()中悬嗓,則是逐個(gè)從queue中取出PendingPost對(duì)象,并通過EventBus的invokeSubscriber(PendingPost pendingPost)來傳遞事件對(duì)象調(diào)用訂閱者方法裕坊。這里調(diào)用的invokeSubscriber()方法與前面那個(gè)同步版本略有差異包竹,它會(huì)將Subscription對(duì)象和事件對(duì)象從PendingPost對(duì)象中提取出來,并調(diào)用同步版的方法籍凝,同時(shí)還會(huì)釋放PendingPost對(duì)象周瞎。
這里有一個(gè)蠻巧妙得設(shè)計(jì),就是那個(gè)maxMillisInsideHandleMessage饵蒂,它用于限制一次事件發(fā)布所能消耗的最多的主線程時(shí)間声诸。如果事件限制到了的時(shí)候訂閱者沒有通知完,則會(huì)發(fā)送一個(gè)消息苹享,在下一輪中繼續(xù)處理双絮。
這是一個(gè)典型的生產(chǎn)者-消費(fèi)者模型浴麻,生產(chǎn)者是事件的發(fā)布者線程,而消費(fèi)者則是主線程囤攀。
PendingPost對(duì)象是通過一個(gè)鏈表來組織的软免。
package org.greenrobot.eventbus;
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}
還有PendingPost:
package org.greenrobot.eventbus;
import java.util.ArrayList;
import java.util.List;
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
PendingPostQueue是一個(gè)線程安全的鏈表,其中鏈表的節(jié)點(diǎn)是PendingPost焚挠,它提供了最最基本的入隊(duì)和出隊(duì)操作而已膏萧。PendingPost再次用了對(duì)象池,它提供了獲取對(duì)象和釋放對(duì)象的方法蝌衔。EventBus的作者真的還是蠻喜歡用對(duì)象池的嘛榛泛。
然后再來看BackgroundPoster:
package org.greenrobot.eventbus;
import android.util.Log;
/**
* Posts events in background.
*
* @author Markus
*/
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster與HandlerPoster還是挺像的。兩者的差別在于BackgroundPoster是一個(gè)Runnable噩斟,它的enqueue()操作喚醒對(duì)于事件分發(fā)的處理的方法曹锨,是將對(duì)象本身放進(jìn)EventBus的ExecutorService中執(zhí)行來實(shí)現(xiàn)的;另外在處理事件分發(fā)的run()方法中剃允,無需像HandlerPoster的handleMessage()方法那樣考慮時(shí)間限制沛简,它會(huì)一次性的將隊(duì)列中所有的PendingPost處理完才結(jié)束。
對(duì)于某一個(gè)特定事件斥废,一次性的將所有的PendingPost遞交給BackgroundPoster椒楣,因而大概率的它們會(huì)在同一個(gè)線程被通知。但如果訂閱者對(duì)事件的處理過快牡肉,在下一個(gè)PendingPost還沒來得及入隊(duì)時(shí)即執(zhí)行結(jié)束捧灰,則還是有可能在不同的線程中被通知。
最后再來看一下AsyncPoster:
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
它會(huì)對(duì)每一個(gè)通知(訂閱者方法 + 訂閱者對(duì)象 + 事件對(duì)象)都起一個(gè)不同的task來進(jìn)行统锤。
用一張圖來總結(jié)EventBus中事件通知的過程:
EventBus發(fā)布事件的過程大體如此毛俏。