Guava源碼解析之EventBus

最近看Elastic-Job源碼,看到它里面實現(xiàn)的任務(wù)運行軌跡的持久化关划,使用的是Guava的AsyncEventBus熄求,一個內(nèi)存級別的異步事件總線服務(wù)毕莱,實現(xiàn)了簡單的生產(chǎn)-消費者模式片任,從而在不影響任務(wù)執(zhí)行效率的基礎(chǔ)上荣德,將任務(wù)執(zhí)行和任務(wù)軌跡記錄解耦掏导,大大提高了EJ的性能嚎朽。

EventBus在Elastic-Job中的使用

EventBus的使用方法不難吧黄,具體可以參考EJ里面幾個相關(guān)的類:JobEventListener、JobEventBus和LiteJobFacade产场。主要的流程如下:

  • JobEventListener主要是消費者鹅髓。定義需要監(jiān)聽的方法,目前主要定義了兩個listen方法京景,注意想監(jiān)聽到的話窿冯,需要在方法前加上注解:@Subscribe和@AllowConcurrentEvents∪丰悖看字面意思就是訂閱和允許并發(fā)事件醒串。如果不加上后面那個注解,則會導(dǎo)致效率問題鄙皇,這個咱們后續(xù)分析芜赌。目前這個接口只有一個實現(xiàn)類JobEventRdbListener,實現(xiàn)了日志寫入DB的操作伴逸。
  • JobEventBus參考的EventBus源碼缠沈,提供了register和post方法,去掉了unregister方法违柏。主要的功能就是注冊監(jiān)聽器和生產(chǎn)消息博烂。他的構(gòu)造方法中,默認使用的是Guava的AsyncEventBus漱竖,初始化中同時包含了注冊動作。
  • LiteJobFacade主要是JobEventBus的使用者畜伐。主要調(diào)用的是JobEventBus的post方法馍惹。
    @Override
    public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
        jobEventBus.post(jobExecutionEvent);
    }
    
    @Override
    public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
        TaskContext taskContext = TaskContext.from(taskId);
        jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
                taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
        if (!Strings.isNullOrEmpty(message)) {
            log.trace(message);
        }
    }

EventBus源碼分析

言歸正傳,我們來看看EventBus到底是如何實現(xiàn)觀察者模式的玛界。他的主要實現(xiàn)類都在com.google.common.eventbus這個包下面万矾。

主要類概念分析

我們首先來看一下里面比較重要的幾個類,同時理解一些概念慎框。

  • EventBus:這個類的作用有兩個良狈,一個是作為一個總線通道,另一個作用是消息的廣播笨枯。
  • AsyncEventBus:異步的EventBus薪丁,功能與EventBus類似遇西,只不過實現(xiàn)方式有所差異。
  • Subscriber:可以按照字面理解是訂閱者严嗜,也可以說是監(jiān)聽器粱檀。
  • SubscriberRegistry:訂閱注冊表。主要存儲的是Subcriber和Event之間的關(guān)系漫玄,用于消息分發(fā)時可以迅速根據(jù)Event的類型找到Subscriber茄蚯。
  • Dispatcher:事件分發(fā)器,定義了一些分發(fā)的策略睦优,里面包含三種分發(fā)器渗常。
  • 兩個重要的注解@Subscribe和@AllowConcurrentEvents。第一個是標(biāo)識監(jiān)聽器的方法汗盘,第二個與第一個配合使用凳谦,標(biāo)識允許多線程執(zhí)行。
  • DeadEvent:死信對象衡未,標(biāo)識沒有訂閱者關(guān)注的事件尸执。
  • SubscribeExceptionHandler:訂閱者拋出異常的處理器。SubscribeExceptionContext:訂閱者拋出異常的上下文對象缓醋。

EventBus

這個類有幾個屬性:

  private final String identifier;//唯一標(biāo)識如失,默認為default
  private final Executor executor;//多線程處理器,默認MoreExecutors.directExecutor()
  private final SubscriberExceptionHandler exceptionHandler;//異常處理器

  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱注冊表
  private final Dispatcher dispatcher;//消息分發(fā)器送粱,默認為Dispatcher.perThreadDispatchQueue()褪贵,單線程消息分發(fā)隊列

其中,identifier表示抗俄,同一個應(yīng)用中脆丁,可以根據(jù)identifier來區(qū)分不同的事件總線,只不過默認為default而已动雹。

EventBus主要定義了幾個方法:

注冊

public void register(Object object) {
    subscribers.register(object);
}

注冊的是自己定義的監(jiān)聽器槽卫,也就是listener。

取消注冊

public void unregister(Object object) {
    subscribers.unregister(object);
}

類似于注冊胰蝠。

消息廣播

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
}

這塊主要是根據(jù)event事件類型歼培,來獲取事件的訂閱者,然后進行事件消息的分發(fā)茸塞。當(dāng)然躲庄,如果沒有訂閱者,也就是event的類型是DeadEvent钾虐,也會進行對應(yīng)的處理噪窘。

AsyncEventBus

繼承自EventBus,主要區(qū)別在于分發(fā)器效扫,使用的是Dispatcher.legacyAsync()倔监。這個后續(xù)咱們再分析直砂。

Subscriber

乍看這個類,就是訂閱者丐枉,其實我們看源碼就能理解哆键,當(dāng)一個訂閱類的多個方法用@Subscribe注解時,每個被注解的方法對應(yīng)的是一個訂閱者瘦锹。

構(gòu)造

這個類只是package內(nèi)可見籍嘹,沒有定義為public,可以通過靜態(tài)方法create來創(chuàng)建它弯院。

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
}

這里傳入的method就是使用了@Subscribe注解的方法辱士,這塊會先判斷這個方法是否線程安全,即是否使用@AllowConcurrentEvent來進行注解听绳,來創(chuàng)建不同的Subscriber颂碘。唯一的差別是SynchronizedSubscriber中一個方法使用了synchronized來修飾。

dispatchEvent

  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

調(diào)用多線程來處理event椅挣。

invokeSubscriberMethod

@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
}

調(diào)用訂閱者的方法头岔。

SubscriberRegistry

我們之前在講到EventBus時,里面有兩個方法register和unregister鼠证,調(diào)用的就是這個類的方法峡竣。這個類的作用也講到,是存儲event和對應(yīng)的訂閱者的關(guān)系的量九。我們來看一下這個類的設(shè)計适掰。

屬性

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();

@Weak private final EventBus bus;

這個類有兩個屬性。

  • 第一個是ConcurrentMap荠列,他的鍵是Class類类浪,也就是Event的類型,值是CopyOnWriteArraySet<Subscriber>肌似,也就是訂閱者费就。這個ConcurrentMap是Guava定義的并發(fā)Map,這個后續(xù)咱們有機會再分析锈嫩。
  • 第二個屬性就是EventBus受楼。

register

注冊監(jiān)聽器。

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
}

主要的邏輯是:

  • 獲取這個類中所有用@Subscribe注解的方法呼寸,存儲到Multimap中。
  • 遍歷Multimap猴贰,鍵為eventType对雪,然后根據(jù)這個鍵,從緩存中獲取這個事件對應(yīng)的訂閱者集合米绕。
  • 獲取到之后瑟捣,判斷集合是否為空馋艺,如果為空,新建一個集合來存儲迈套。

unregister

實現(xiàn)與register類似捐祠,先根據(jù)listener找到subscriber,找到需要監(jiān)聽的方法桑李,然后根據(jù)事件類型去移除subscriber踱蛀。

findAllSubscribers

獲取監(jiān)聽器中所有的監(jiān)聽方法。

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
}

findAllSubscribers用于查找事件類型以及事件處理器的對應(yīng)關(guān)系贵白。查找注解需要涉及到反射率拒,通過反射來獲取標(biāo)注在方法上的注解。因為Guava針對EventBus的注冊采取的是“隱式契約”而非接口這種“顯式契約”禁荒。而類與接口是存在繼承關(guān)系的猬膨,所有很有可能某個訂閱者其父類(或者父類實現(xiàn)的某個接口)也訂閱了某個事件。因此這里的查找需要順著繼承鏈向上查找父類的方法是否也被注解標(biāo)注呛伴。

getSubscribes

獲取event的訂閱者勃痴。

Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

Dispatcher

分發(fā)器,用于將event分發(fā)給subscriber热康。它內(nèi)部實現(xiàn)了三種不同類型的分發(fā)器沛申,用于不同的情況下事件的順序性。它的核心方法是:

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

它的三種實現(xiàn):

PerThreadQueuedDispatcher

EventBus默認使用的分發(fā)器褐隆。它的實現(xiàn)是通過ThreadLocal來實現(xiàn)一個事件隊列污它,每個線程包含一個這樣的內(nèi)部隊列。

它的分發(fā)代碼如下:

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
  Queue<Event> queueForThread = queue.get();
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      while ((nextEvent = queueForThread.poll()) != null) {
        while (nextEvent.subscribers.hasNext()) {
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
 }
}

嵌套兩層循環(huán)庶弃,第一層事件不為空衫贬,第二層該事件下的訂閱者不為空,則分發(fā)事件下去歇攻。

LegacyAsyncDispatcher

AsyncEventBus使用的分發(fā)器固惯。它在內(nèi)部通過一個ConcurrentLinkedQueue<EventWithSubscriber>的全局隊列來存儲事件。他和PerThreadQueuedDispatcher的主要區(qū)別在于分發(fā)循環(huán)這塊缴守。

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    queue.add(new EventWithSubscriber(event, subscribers.next()));
  }

  EventWithSubscriber e;
  while ((e = queue.poll()) != null) {
    e.subscriber.dispatchEvent(e.event);
  }
}

是一前一后兩個循環(huán)葬毫。前面一個是遍歷事件訂閱處理器,并構(gòu)建一個事件實體對象存入隊列屡穗。后一個循環(huán)是遍歷該事件實體對象隊列贴捡,取出事件實體對象中的事件進行分發(fā)。

ImmediateDispatcher

同步分發(fā)器村砂。

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    subscribers.next().dispatchEvent(event);
  }
}

總結(jié)

Elastic-Job使用的EventBus烂斋,可以說很好的對任務(wù)的運行和軌跡記錄進行了解耦,借鑒了Guava的思想,將代碼優(yōu)雅發(fā)揮到了新的境界汛骂。當(dāng)然罕模,Guava對EventBus的設(shè)計思想是我們需要進行學(xué)習(xí)和使用的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末帘瞭,一起剝皮案震驚了整個濱河市淑掌,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蝶念,老刑警劉巖抛腕,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異祸轮,居然都是意外死亡兽埃,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門适袜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來柄错,“玉大人,你說我怎么就攤上這事苦酱∈勖玻” “怎么了?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵疫萤,是天一觀的道長颂跨。 經(jīng)常有香客問我,道長扯饶,這世上最難降的妖魔是什么恒削? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮尾序,結(jié)果婚禮上钓丰,老公的妹妹穿的比我還像新娘。我一直安慰自己每币,他們只是感情好携丁,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著兰怠,像睡著了一般梦鉴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上揭保,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天肥橙,我揣著相機與錄音,去河邊找鬼秸侣。 笑死快骗,一個胖子當(dāng)著我的面吹牛娜庇,可吹牛的內(nèi)容都是我干的塔次。 我是一名探鬼主播方篮,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼励负!你這毒婦竟也來了藕溅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤继榆,失蹤者是張志新(化名)和其女友劉穎巾表,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體略吨,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡集币,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了翠忠。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鞠苟。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖秽之,靈堂內(nèi)的尸體忽然破棺而出当娱,到底是詐尸還是另有隱情,我是刑警寧澤考榨,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布跨细,位于F島的核電站,受9級特大地震影響河质,放射性物質(zhì)發(fā)生泄漏冀惭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一掀鹅、第九天 我趴在偏房一處隱蔽的房頂上張望散休。 院中可真熱鬧,春花似錦淫半、人聲如沸溃槐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽昏滴。三九已至,卻和暖如春对人,著一層夾襖步出監(jiān)牢的瞬間谣殊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工牺弄, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留姻几,地道東北人。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像蛇捌,于是被迫代替她去往敵國和親抚恒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容

  • EventBus源碼分析(一) EventBus官方介紹為一個為Android系統(tǒng)優(yōu)化的事件訂閱總線络拌,它不僅可以很...
    蕉下孤客閱讀 3,979評論 4 42
  • 對于Android開發(fā)老司機來說肯定不會陌生俭驮,它是一個基于觀察者模式的事件發(fā)布/訂閱框架,開發(fā)者可以通過極少的代碼...
    飛揚小米閱讀 1,473評論 0 50
  • 簡單的使用 EventBus是greenrobot在Android平臺發(fā)布的一款以訂閱——發(fā)布模式為核心的開源庫春贸。...
    最最最最醉人閱讀 756評論 0 13
  • EventBus源碼分析(二) 在之前的一篇文章EventBus源碼分析(一)分析了EventBus關(guān)于注冊注銷以...
    蕉下孤客閱讀 1,651評論 0 10
  • 原文鏈接:http://blog.csdn.net/u012810020/article/details/7005...
    tinyjoy閱讀 542評論 1 5