示例代碼
// 注冊一個(gè)處理String類型的監(jiān)聽
@Subscribe(threadMode = ThreadMode.MAIN)
public void onHandleMessage(String a) {
mStater.setText(a);
}
// 注冊一個(gè)處理Msg類型的監(jiān)聽(Msg是我隨便寫的一個(gè)類锦聊,只有 m :String 的屬性)
@Subscribe(threadMode = ThreadMode.MAIN)
public void onHandleMsg(Msg a) {
mStater.setText(a.m);
}
存儲注冊
- 存儲消息處理器:subscriptionsByEventType
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
image.png
- 存儲注冊的類型 typesBySubscriber
為了在注銷監(jiān)聽時(shí)丛楚,將所有類型的的監(jiān)聽都注銷掉。
private final Map<Object, List<Class<?>>> typesBySubscriber;
image.png
消息分發(fā)
public void post(Object event) {
// 從ThreadLocal中獲取當(dāng)前Thread的消息分發(fā)隊(duì)列
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 當(dāng)前線程的消息分發(fā)隊(duì)列處于空閑時(shí)徒溪,則開始分發(fā)
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循環(huán)分發(fā)忿偷,直到消息分發(fā)隊(duì)列為空拧篮。
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
消息處理
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING: // 不切線程,哪個(gè)線程調(diào)用的post牵舱,就在哪個(gè)線程處理消息
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 如果主線程調(diào)用的post串绩,則直接調(diào)用方法出出力消息
invokeSubscriber(subscription, event);
} else {
// 調(diào)用主線程Poster
// 實(shí)際是一個(gè)主線程的Handler內(nèi)部維護(hù)一個(gè)PaddingPostQueue,內(nèi)部的消息可以依次分發(fā)
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
// 默認(rèn)都走主線程Poster芜壁,因?yàn)閮?nèi)部的隊(duì)列可以保證順序
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 如果是主線程礁凡,則入背后線程Poster
// 背后線程是一個(gè)Runnable,內(nèi)部包含一個(gè)PendingPostQueue
// 當(dāng)有消息進(jìn)來時(shí)慧妄,判斷Runnable狀態(tài)顷牌,如果非激活狀態(tài),則調(diào)用EventBus的線程池執(zhí)行該Runnable塞淹,依次處理隊(duì)列內(nèi)的消息窟蓝,所以每次提交線程池時(shí),消息處理是有序的
backgroundPoster.enqueue(subscription, event);
} else {
// 如果不是主線程饱普,哪里調(diào)用Post运挫,就在哪里處理消息
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 背后線程是一個(gè)Runnable,內(nèi)部包含一個(gè)PendingPostQueue
// 當(dāng)有消息進(jìn)來時(shí)套耕,入隊(duì)谁帕,然后調(diào)用EventBus線程池執(zhí)行該Runnable,來一次消息冯袍,執(zhí)行一次
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
貼一個(gè)AsyncPoster的代碼吧
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}