@[toc]
EventBus前言
本文主要講解EventBus的源碼解析狸剃,如果您未聽過/使用過EventBus的話請自行百度一下,幾分鐘即可GET到這門技能枚抵。
如果你還不了解EventBus的Register(訂閱)流程,那么我建議你先看看我的上一篇博客: 《EventBus3.1.1源碼解析(上篇)》.
本篇章講解的是EventBus的發(fā)布流程。
同樣說明下欺嗤,我的源碼解析是基于EventBus3.1.1的源碼:
implementation 'org.greenrobot:eventbus:3.1.1'
EventBus#使用
通常來說,我們在通常在寫訂閱事件方法的時(shí)候卫枝,這樣定義:
@Subscribe()
public void onEventRecivier(MyObject o){
...
}
當(dāng)需要指定在主線程接收時(shí)煎饼,用threadmode參數(shù):
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEventRecivier(MyObject o){
...
}
當(dāng)需要把該方法事件列為粘性事件(在發(fā)布之后注冊也能收到)的時(shí)候,用sticky = true:
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true)
public void onEventRecivier(MyObject o){
...
}
當(dāng)需要指定事件優(yōu)先級時(shí)校赤,用priority來排序事件優(yōu)先級(默認(rèn)priority=0):
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true,priority = 1){
public void onEventRecivier(MyObject o){
...
}
當(dāng)我們要發(fā)布事件時(shí)吆玖,簡單的例子:
MyObject o = new MyObject();
EventBus.getDefault().post(o);
只需要調(diào)用EventBus的post方法就能把事件發(fā)布出去筒溃,然后每個(gè)訂閱者會收到自己定義的事件訂閱方法注解參數(shù)所規(guī)定的方式接收到事件
EventBus#Post(發(fā)布)流程
下面我們將來剖析從調(diào)用EventBus#post方法到自動(dòng)回調(diào)訂閱方法的流程,直接從post入手吧沾乘。貼上post的源碼解析:
/** Posts the given event to the event bus. */
public void post(Object event) {
/*
currentPostingThreadState看定義是: ThreadLocal<PostingThreadState>
關(guān)于ThreadLocal不了解的同學(xué)怜奖,可以看下我之前的一篇講解Handler的博客中有提到:https://blog.csdn.net/lc_miao/article/details/77504343
ThreadLocal就是每個(gè)線程都擁有自己獨(dú)立的數(shù)據(jù),每個(gè)線程都有自己的一份拷貝翅阵,不相互影響歪玲。
這里通過ThreadLocal的get方法來得到一個(gè)PostingThreadState
PostingThreadState這個(gè)類只是一個(gè)實(shí)體類,封裝了當(dāng)前線程事件發(fā)布的狀態(tài)
*/
PostingThreadState postingState = currentPostingThreadState.get();
//得到該線程的事件隊(duì)列
List<Object> eventQueue = postingState.eventQueue;
//把需要發(fā)布的消息添加到隊(duì)列尾部來
eventQueue.add(event);
//isPosting描述了當(dāng)前是否在發(fā)布中掷匠,如果當(dāng)前不是正在發(fā)布中滥崩,
if (!postingState.isPosting) {
//isMainThread記錄了當(dāng)前是否是主線程
postingState.isMainThread = isMainThread();
//要準(zhǔn)備發(fā)布事件,把狀態(tài)改為發(fā)布中
postingState.isPosting = true;
//如果該線的事件被取消了則拋出異常
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//一切都沒毛病后槐雾,那么就往隊(duì)列中取出消息事件發(fā)布出去直到取完
while (!eventQueue.isEmpty()) {
//發(fā)布單個(gè)事件夭委,重點(diǎn)方法。我們在下面追蹤進(jìn)來
//eventQueue.remove(0)相當(dāng)于隊(duì)頭的一個(gè)出隊(duì)動(dòng)作
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//發(fā)布完畢后重置狀態(tài)
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
上面的代碼募强,如果你對ThreadLoacl比較熟悉的話株灸,相信沒啥問題。currentPostingThreadState.get();會返回該線程的一份實(shí)例擎值,并不需要進(jìn)行set慌烧,因?yàn)閯?chuàng)建ThreadLocal的時(shí)候重載了:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
當(dāng)get不到實(shí)例的時(shí)候,ThreadLocal內(nèi)部會調(diào)用initialValue來初始化實(shí)例
PostingThreadState只是一個(gè)封裝了線程發(fā)布狀態(tài)的類:
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
//事件對壘
final List<Object> eventQueue = new ArrayList<>();
//是否正在發(fā)布中
boolean isPosting;
//是否是主線程
boolean isMainThread;
//訂閱方法
Subscription subscription;
//具體的事件
Object event;
//是否被取消
boolean canceled;
}
那么上面的post方法中重點(diǎn)就是繼續(xù)追蹤postSingleEvent方法了鸠儿,源碼如下:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//還記得上篇講的eventInheritance的作用么
//就是說是否支持事件繼承屹蚊, A extends B 那么發(fā)出事件A后 訂閱了事件B的需不需要收到
if (eventInheritance) {
//查找出事件的繼承鏈,也就是關(guān)于所有eventClass的父類
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//遍歷該繼承鏈
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//調(diào)用postSingleEventForEventType进每,是下一步的核心
//傳入事件汹粤、狀態(tài)、事件類型
//注意subscriptionFound取得方法返回值田晚,用“或等于”的形式嘱兼,也就是只要一次是true那么后面都是為true了
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//如果不支持繼承,那么就不用查找出父類了贤徒,直接調(diào)用
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//subscriptionFound字面也好理解芹壕,就是調(diào)用postSingleEventForEventType方法后得到個(gè)是否有找到訂閱類的意思
//如果沒找到的話
if (!subscriptionFound) {
//上篇解析的logNoSubscriberMessages作用在這里
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
//sendNoSubscriberEvent這個(gè)在上篇也講過了,是當(dāng)消息事件發(fā)送出去后沒有對應(yīng)的處理方法時(shí)接奈,是否發(fā)送 NoSubscriberEvent 事件
//當(dāng)然踢涌,事件類型本身也不能是NoSubscriberEvent,要不繼續(xù)post到這里就死循環(huán)了
//事件也不能是SubscriberExceptionEvent這個(gè)類序宦,因?yàn)檫@個(gè)是異常類也會因?yàn)閟endSubscriberExceptionEvent=true時(shí)會發(fā)送出來
//所以要先確保事件不是這兩個(gè)事件才可以發(fā)出沒有被處理的消息
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
上面的代碼睁壁,一開始通過eventInheritance判斷是否支持繼承,如果支持則需要取出所有父類,然后都是調(diào)用postSingleEventForEventType
該方法返回是否找到訂閱類潘明,如果都沒找到的話糠惫,則通過logNoSubscriberMessages判斷是否需要打印,通過sendNoSubscriberEvent判斷是否需要發(fā)送消息未處理的事件
這些eventInheritance钉疫、logNoSubscriberMessages硼讽、sendNoSubscriberEvent變量開關(guān)我們都是在上篇在構(gòu)造自定義的EventBus時(shí)講過了
接下來我們需要追蹤的方法,那就是postSingleEventForEventType方法了牲阁,源碼如下:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
//同步鎖固阁,其他線程會等待,不然同時(shí)讀寫會有問題
synchronized (this) {
//subscriptionsByEventType在上篇講過,作用是記錄所有的事件以及事件對應(yīng)的訂閱列表城菊,
//通過這個(gè)集合就可以查看到具體都有哪些事件备燃,每個(gè)事件具體都被哪些類所訂閱
//于是我們通過事件類型來獲取對應(yīng)的訂閱列表
subscriptions = subscriptionsByEventType.get(eventClass);
}
//做下判空
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//在該線程的事件狀態(tài)記錄當(dāng)前處理的事件
postingState.event = event;
//在該線程的事件狀態(tài)記錄當(dāng)前處理的訂閱類
postingState.subscription = subscription;
//是否打斷
boolean aborted = false;
try {
//拿到訂閱類和要發(fā)布的事件后,通過postToSubscription來回調(diào)出訂閱類的訂閱方法
//下面要講的核心方法
postToSubscription(subscription, event, postingState.isMainThread);
//是否取消
aborted = postingState.canceled;
} finally {
//每次對每個(gè)訂閱類回調(diào)訂閱方法后都重置下狀態(tài)
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
//如果事件狀態(tài)是取消的凌唬,那就被打斷了并齐。不繼續(xù)發(fā)了
if (aborted) {
break;
}
}
//因?yàn)檎业搅擞嗛嗩惱┙瑁苑祷豻rue 需要注意的是即使事件被取消扇谣,只要有訂閱類那就是返回true
return true;
}
//沒找到訂閱類返回false
return false;
}
上面的方法已經(jīng)追蹤到了該事件具體的訂閱列表,然后遍歷這個(gè)訂閱列表去回調(diào)對應(yīng)的事件訂閱方法渊季,也就是postToSubscription方法的作用更耻,源碼如下:
//這個(gè)源碼是不是看上去有點(diǎn)熟悉测垛,因?yàn)樗膕witch正是對應(yīng)了注解參數(shù)中的threadmode
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//subscription.subscriberMethod則是訂閱方法,判斷這個(gè)訂閱方法所注解的線程模式
switch (subscription.subscriberMethod.threadMode) {
//POSTING就是默認(rèn)的秧均,因?yàn)樽⒔忸怱ubscribe里面定義了默認(rèn)是ThreadMode.POSTING
//POSTING是最簡單的食侮,也就是直接在該線程上發(fā)布事件,不管該線程上是啥線程
case POSTING:
//對訂閱方法反射調(diào)用 下面將會繼續(xù)追蹤
invokeSubscriber(subscription, event);
break;
//MAIN則對應(yīng)了線程模式為主線程
case MAIN:
//如果當(dāng)前正好是主線程 則直接調(diào)用訂閱方法
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
//否則插入到mainThreadPoster中目胡,mainThreadPoster我們在上篇講過锯七。它也是一個(gè)隊(duì)列,記錄的是當(dāng)前主線程正在發(fā)布的事件隊(duì)列
//關(guān)于enqueue流程我們下面再繼續(xù)追蹤
mainThreadPoster.enqueue(subscription, event);
}
break;
//MAIN_ORDERED也是在主線程 咋看跟MAIN貌似沒區(qū)別誉己,但是區(qū)別還是有
//它指明了事件序列需要嚴(yán)格按照串行的順序來執(zhí)行
//不管當(dāng)前是不是主線程眉尸,都要把事件放到mainThreadPoster里面等待處理
case MAIN_ORDERED:
// 假如mainThreadPoster為空(在上篇講過,
// 用戶可以自定義傳入一個(gè)實(shí)現(xiàn)了MainThreadSupport接口的對象巫延,接口中實(shí)現(xiàn)了一個(gè)createPoster方法效五,如果我們把該方法返回null這里就為null了)
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
//后臺線程模式地消,也就是非主線程發(fā)布炉峰。后臺模式只是確保發(fā)布過程不在主線程,并不限定在哪個(gè)線程發(fā)布
case BACKGROUND:
//這個(gè)邏輯跟主線程是反過來的脉执,當(dāng)是主線程時(shí)則進(jìn)backgroundPoster的隊(duì)列中
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
//事件異步模式疼阔,每個(gè)事件執(zhí)行的前后沒有順序。
//直接進(jìn)asyncPoster的隊(duì)列中。下面會繼續(xù)追蹤各個(gè)Poster的隊(duì)列發(fā)布方式
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
//如果設(shè)定其他threadmode婆廊,那是不識別的迅细,會拋異常
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
上面的方法是比較核心的。它指定了需要在什么線程模式上發(fā)布淘邻,其中核心的方法是invokeSubscriber方法茵典,另外核心的是幾個(gè)poster類
我們可以看到有些是直接就調(diào)用invokeSubscriber方法進(jìn)行反射調(diào)用,有些則是直接進(jìn)隊(duì)宾舅,并沒有看到方法反射统阿,那么事件在哪里調(diào)用呢,肯定enqueue這個(gè)方法會什么操作
這里我們先說下invokeSubscriber方法筹我,源碼如下:
void invokeSubscriber(Subscription subscription, Object event) {
try {
//就是進(jìn)行一個(gè)反射操作扶平,拿到該方法載入調(diào)用
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
//異常時(shí)的處理
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
//反射調(diào)用事件訂閱方法時(shí)異常,則調(diào)用此方法
private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
if (event instanceof SubscriberExceptionEvent) {
if (logSubscriberExceptions) {
// Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
+ " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
+ exEvent.causingSubscriber, exEvent.throwable);
}
} else {
if (throwSubscriberException) {
throw new EventBusException("Invoking subscriber failed", cause);
}
if (logSubscriberExceptions) {
logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
+ subscription.subscriber.getClass(), cause);
}
//在方法的最后會判斷sendSubscriberExceptionEvent=true后發(fā)布一個(gè)事件異常的事件
//還記得前面我們說的沒找到訂閱類時(shí)的異常么蔬蕊,在那里需要判斷事件類型不為SubscriberExceptionEvent结澄,正是因?yàn)檫@里可能會發(fā)出一個(gè)SubscriberExceptionEvent事件
if (sendSubscriberExceptionEvent) {
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
subscription.subscriber);
post(exEvent);
}
}
}
invokeSubscriber沒啥,就是反射載入方法岸夯。如果我們使用的是默認(rèn)的線程模式麻献,則到這里就調(diào)用invokeSubscriber方法后便回調(diào)了訂閱方法了
重點(diǎn)還是mainThreadPoster、backgroundPoster猜扮、asyncPoster這幾個(gè)Poster
EventBus# mainThreadPoster
首先赎瑰,它實(shí)現(xiàn)了Poster接口:
/**
* Posts events.
*
* @author William Ferguson
*/
interface Poster {
/**
* Enqueue an event to be posted for a particular subscription.
*
* @param subscription Subscription which will receive the event.
* @param event Event that will be posted to subscribers.
*/
void enqueue(Subscription subscription, Object event);
}
Poster只有一個(gè)enqueue方法,表明一個(gè)入隊(duì)操作破镰。三個(gè)Poster實(shí)現(xiàn)類我們來一個(gè)一個(gè)看餐曼,首先看下mainThreadPoster,
mainThreadPoster在EventBus中創(chuàng)建的時(shí)候是這樣的:
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
追蹤EventBusBuilder#getMainThreadSupport方法的源碼是:
MainThreadSupport getMainThreadSupport() {
if (mainThreadSupport != null) {
return mainThreadSupport;
} else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
//默認(rèn)會返回一個(gè)AndroidHandlerMainThreadSupport類鲜漩,并傳入主線程的一個(gè)Looper
Object looperOrNull = getAndroidMainLooperOrNull();
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
} else {
return null;
}
}
AndroidHandlerMainThreadSupport類是在MainThreadSupport類中定義的:
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
默認(rèn)返回的Poster是一個(gè)HandlerPoster
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
由此可看出源譬,當(dāng)我們并沒有自定義一個(gè)Poster的時(shí)候,默認(rèn)的mainThreadPoster對象是一個(gè)HandlerPoster孕似。
HandlerPoster繼承自我們熟悉的Handler踩娘,并實(shí)現(xiàn)了Poster接口。
最重要的enqueue 的源碼如下:
public void enqueue(Subscription subscription, Object event) {
//PendingPost在入隊(duì)之前喉祭,還將事件和訂閱類封裝在PendingPost中养渴,下面會講解
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//實(shí)際入隊(duì)是這個(gè),封裝成一個(gè)PendingPost對象后送入隊(duì)列來
//對象queue在HnadlerPoster構(gòu)造方法中是被賦值了一個(gè)PendingPostQueue對象
//篇幅原因不展開PendingPostQueue泛烙,它只是模擬了隊(duì)列的入隊(duì)出隊(duì)的操作
queue.enqueue(pendingPost);
//暫時(shí)理解下理卑,如果當(dāng)前正在處理消息,則不做操作
//如果當(dāng)前Hnandler是空閑的蔽氨,那么發(fā)送個(gè)消息出去藐唠,消息是一個(gè)來自Handler#obtainMessage帆疟,一個(gè)空的消息
//這有什么用呢?那就得來看看是怎么處理消息的 也Handler的handlerMessage方法
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
當(dāng)我們發(fā)送消息出來宇立,會出發(fā)Handler回調(diào)handleMessage方法踪宠,實(shí)現(xiàn)如下:
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//取出消息
PendingPost pendingPost = queue.poll();
//如果沒取到
if (pendingPost == null) {
//注意這里使用了同步快再使用了一次,為什么呢妈嘹?
//細(xì)心的同學(xué)可能發(fā)現(xiàn)在上面的enqueue方法中也用到了this這個(gè)鎖來同步進(jìn)入隊(duì)列
//那么考慮一種情況柳琢,鎖被enqueue方法中拿了,然后進(jìn)了隊(duì)列润脸,之后釋放鎖
//而這里剛好到這一步實(shí)際又沒取到染厅,所以同步代碼快只是為了讓enqueue釋放鎖的時(shí)候這里檢查一下是否隊(duì)列有元素
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
//確認(rèn)過這個(gè)期間沒有任何入隊(duì)操作,那么設(shè)置handlerActive為false并退出循環(huán)
//表示當(dāng)前沒在處理消息了津函,是空閑的了
//enqueue方法中當(dāng)判斷handlerActive不為true肖粮,則會觸發(fā)一次消息發(fā)送處理到這里來處理事件
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//不為空那就好辦了,直接調(diào)用invokeSubscriber反射調(diào)用訂閱方法了
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//在事件處理之前和處理之后尔苦,記錄了個(gè)時(shí)間差涩馆。如果超過了maxMillisInsideHandleMessage
//則返回 重新發(fā)送消息
//為什么要這么操作呢
//原因是當(dāng)前的Handler Looper那是主線程的消息循環(huán)啊
//主線程不能在那里等待所有事件處理完,那樣容易阻塞了主線程去處理其他事情了
//所以每處理一個(gè)事件超過了maxMillisInsideHandleMessage允坚,則重新發(fā)送個(gè)消息在下個(gè)消息處理魂那,讓主線程的其他任務(wù)得以執(zhí)行
//maxMillisInsideHandleMessages是多少呢?默認(rèn)在MainThreadSupport的內(nèi)部類AndroidHandlerMainThreadSupport中稠项,創(chuàng)建Handler的時(shí)候傳入的值是10
//如果我們是自己定義的Poster涯雅,則就是自己傳入的一個(gè)數(shù)額了
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
mainThreadPoster到這里就講完了,對于默認(rèn)的mainThreadPoster來說展运,可以理解為其實(shí)mainThreadPoster本身就是個(gè)持有主線程Looper的Handler
它把事件送入隊(duì)列來活逆,然后不斷的發(fā)送消息 自己處理消息的時(shí)候?qū)κ录瓷湔{(diào)用。達(dá)到了一個(gè)事件在主線程中發(fā)布的效果拗胜。之所以加了一層Handler蔗候,是因?yàn)橹骶€程需要防止阻塞的,不能長時(shí)間在處理某個(gè)任務(wù)埂软,導(dǎo)致其他的消息阻塞了
EventBus# backgroundPoster
追蹤得好累锈遥,接下來繼續(xù)講backgroundPoster的模式。同樣勘畔,我們先得看看backgroundPoster來自哪個(gè)類的對象:
backgroundPoster = new BackgroundPoster(this);
EventBus中是這么賦值的所灸,它并不像主線程的Poster會留出接口讓開發(fā)者自定義,
還有他并沒有繼承Handler,而是實(shí)現(xiàn)了Runnable炫七。因?yàn)楹笈_線程通常又不怕阻塞爬立,生命周期短,沒必要用Handler
所以源碼中采用的是線程池的方式
我們直接看BackgroundPoster類的enqueue方法诉字,因?yàn)槭录窃谶@個(gè)方法傳進(jìn)來的:
public void enqueue(Subscription subscription, Object event) {
//這里同樣用個(gè)PendingPost來封裝下事件和訂閱者
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//在同步代碼快中入隊(duì)
queue.enqueue(pendingPost);
//跟HandlerPoster的這里有點(diǎn)像懦尝,也采用了個(gè)executorRunning參數(shù)來判斷當(dāng)前是否在運(yùn)行(活躍中)
if (!executorRunning) {
executorRunning = true;
//如果當(dāng)前沒有在運(yùn)行中,則出發(fā)一次線程池來執(zhí)行
// 追蹤eventBus.getExecutorService()來自于默認(rèn)的 Executors.newCachedThreadPool()
//或者是自己用EventBusBuilder時(shí)傳入的ExecutorService
//既然交給了線程池壤圃,本身又是個(gè)Runnable 那么重點(diǎn)就在run方法了
eventBus.getExecutorService().execute(this);
}
}
}
我們看下run方法陵霉,源碼如下:
@Override
public void run() {
try {
try {
//循環(huán)取事件
while (true) {
//這里的1000是個(gè)啥,點(diǎn)擊進(jìn)去追蹤下發(fā)現(xiàn)是執(zhí)行一個(gè)等待操作
//也就是說這個(gè)方法是阻塞1000ms的
//為什么要阻塞1000ms伍绳?這里我個(gè)人認(rèn)為只是為了避免重復(fù)線程任務(wù)
//沒有這個(gè)阻塞的話踊挠,隊(duì)列完畢后則線程就結(jié)束了,下次得繼續(xù)執(zhí)行任務(wù)
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
//雙層校驗(yàn)冲杀,避免這個(gè)時(shí)候有TMD剛好有入隊(duì)操作
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//拿到后立馬反射調(diào)用
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
//這里也有必要說下效床,最終是設(shè)定標(biāo)記位為不在運(yùn)行中
//考慮一種情況,假如上面的事件循環(huán)被打斷了呢权谁,也就是到了InterruptedException怎么辦
//沒怎么辦剩檀,打斷就打算吧,反正要是打斷的話我也設(shè)置當(dāng)前不是在運(yùn)行中
//等線程池下次任務(wù)執(zhí)行又可以繼續(xù)事件循環(huán)了
//由此另一方面可以看出旺芽,在一次處理隊(duì)列中的消息并不一定都是由同個(gè)線程執(zhí)行
//EventBus保證的是事件按隊(duì)列方式先進(jìn)先出沪猴,串行的方式來執(zhí)行
executorRunning = false;
}
}
EventBus# asyncPoster
追蹤得好痛苦 ,老套路采章,先看下asyncPoster的創(chuàng)建:
asyncPoster = new AsyncPoster(this);
那就追蹤下AsyncPoster這個(gè)類运嗜,與BackgroundPoster類似,也是實(shí)現(xiàn)Runnable和Poster
enqueue和run方法如下:
public void enqueue(Subscription subscription, Object event) {
//看上去好簡單悯舟,直接入隊(duì)然后進(jìn)去線程池
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
//直接取隊(duì)列后執(zhí)行担租,并不管哪個(gè)Runnable會被先執(zhí)行,反正先被線程池執(zhí)行的Runnable就先去取隊(duì)列然后反射調(diào)用訂閱方法
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
這個(gè)簡單了吧抵怎,與BackgroundPoster相比奋救,相同的地方是同樣使用的是非主線程的方式。不同在于AsyncPoster中一個(gè)Runnable只會執(zhí)行一個(gè)事件
所以幾個(gè)事件一起送入任務(wù)中反惕,就不確保哪個(gè)先執(zhí)行了菠镇,這個(gè)正是異步事件的模式了。
講解完畢了承璃,
我們再來濾清下方法流程利耍,
首先從post方法,傳入具體事件
post方法里調(diào)用了postSingleEvent方法盔粹,傳入該事件隘梨、線程對應(yīng)的事件處理狀態(tài)
postSingleEvent方法用于查出事件的繼承鏈,根據(jù)每個(gè)事件類型來發(fā)布
postSingleEvent方法里調(diào)用了postSingleEventForEventType方法舷嗡,傳入該事件轴猎、線程對應(yīng)的事件處理狀態(tài)、事件類型
postSingleEventForEventType方法用于找出時(shí)間類型對應(yīng)的訂閱者列表进萄,對每個(gè)訂閱者進(jìn)行發(fā)布
postSingleEventForEventType方法里調(diào)用了postToSubscription方法捻脖,傳入訂閱者锐峭、事件、是否在主線程
postToSubscription方法用于區(qū)分在不同的線程模式下進(jìn)行反射調(diào)用訂閱方法
postToSubscription中調(diào)用了invokeSubscriber方法最終反射調(diào)用訂閱方法
或者是進(jìn)去不同的Poster的隊(duì)列
值得思考的地方:
1可婶、源碼中大量用了關(guān)于同步代碼塊的處理沿癞,有興趣的同學(xué)可以詳細(xì)的研究每個(gè)同步塊的初衷
2、異常處理
3矛渴、優(yōu)化處理
關(guān)于EventBus的源碼解析椎扬,有出錯(cuò)之處煩請留言指出。
或者您對源碼有什么疑問具温,請?jiān)谙路搅粞耘c我探討吧蚕涤。