JAVA進階篇(10)—Guava實現(xiàn)的EventBus(調度算法源碼分析)

1. 使用方式

  1. 引入依賴
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>27.0.1-jre</version>
</dependency>
  1. 定義被觀察者類

由該類觸發(fā)事件通知:

public class TestBus {

    /**
     * EventBus扶欣,默認使用PerThreadQueuedDispatcher分發(fā)器(該分發(fā)器內部維護的Executor是執(zhí)行執(zhí)行線程run方法飒箭,即使用主線程執(zhí)行監(jiān)聽方法)。
     * 該分發(fā)器是每個線程內部維護了一個queue波势。
     * 每個線程互不干擾(都利于本身線程去串行的執(zhí)行觀察者的方法)
     *
     */
    public static void testPerThreadQueuedDispatcher(){
        EventBus eventBus = new EventBus();

        //觀察者1
        DataObserver1 observer1 = new DataObserver1();
        //觀察者2
        DataObserver2 observer2 = new DataObserver2();
        
        eventBus.register(observer2);
        eventBus.register(observer1);

        Thread t1 = new Thread(() -> {
            eventBus.post("信息1率挣;");
            eventBus.post("信息5清寇;");
        });

        Thread t2 = new Thread(() -> {
            eventBus.post("信息2;");
        });

        Thread t3 = new Thread(() -> {
            eventBus.post(123);
        });
        
        t1.start();
        t2.start();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t3.start();
    }

}
  1. 定義多個觀察者
@Slf4j
public class DataObserver2 {
    /**
     * post() 不支持自動裝箱功能,只能使用Integer,不能使用int,否則handlersByType的Class會是int而不是Intege
     * 而傳入的int msg參數(shù)在post(int msg)的時候會被包裝成Integer,導致無法匹配到
     */
    @Subscribe
    public void func(Integer msg) {
        log.info("Integer msg: " + msg);
    }
}
@Slf4j
public class DataObserver1 {
    /**
     * 只有通過@Subscribe注解的方法才會被注冊進EventBus
     * 而且方法有且只能有1個參數(shù)
     *
     * @param msg
     */
    @Subscribe
//    @AllowConcurrentEvents
    public void func(String msg) throws InterruptedException {
        log.info("消息開始~:" + msg);
        Thread.sleep(2000);
        log.info("消息結束~:" + msg);
    }
}

使用原理:觀察者對象注冊到EventBus中做葵,而EventBus會通過反射解析觀察者及其父類對象是否存在@Subscribe注解占哟,若是存在,則維護一個Map(key是對應方法的參數(shù)類型酿矢,value是Subscriber對象)榨乎。
當被觀察者通過post()方法發(fā)送事件后,會解析事件的類型瘫筐,找打對應的Subscriber(消費者對象)蜜暑。然后循環(huán)通過反射調用對應的觀察者方法。完成事件通知策肝。

2. EventBus源碼分析

事件總線的配置:

@Beta
public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());
  //id標識符
  private final String identifier;
  //發(fā)送事件的線程池
  private final Executor executor;
  //訂閱者異常處理器
  private final SubscriberExceptionHandler exceptionHandler;
  //訂閱者解析器
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  //分發(fā)器策略
  private final Dispatcher dispatcher;
  ...
}

查看其構造方法:

public class EventBus {

 ...
  /** Creates a new EventBus named "default". */
  public EventBus() {
    this("default");
  }

  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }

  public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this(
        "default",
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        exceptionHandler);
  }

  EventBus(
      String identifier,
      Executor executor,
      Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {
    this.identifier = checkNotNull(identifier);
    this.executor = checkNotNull(executor);
    this.dispatcher = checkNotNull(dispatcher);
    this.exceptionHandler = checkNotNull(exceptionHandler);
  }
}

可以看到肛捍,EventBus對外暴露的構造方法隐绵,只能去修改identifierexceptionHandler兩個參數(shù)。

  • 發(fā)送事件的線程池executor使用的是MoreExecutors.directExecutor()拙毫;
  • 消息的轉發(fā)器dispatcher使用的是Dispatcher.perThreadDispatchQueue()氢橙;

executordispatcher兩個參數(shù)決定了什么呢?

public class EventBus {

  public void post(Object event) {
    //通過事件恬偷,找到所有的訂閱者悍手。
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    //存在訂閱者
    if (eventSubscribers.hasNext()) {
      //使用dispatcher去分發(fā)消息
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
}

默認:Dispatcher.perThreadDispatchQueue()的作用:

每一個線程內部都有一個queue,從而保證單線程中消息的有序性袍患。

  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of EventBus.

    /** Per-thread queue of events to dispatch. */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      //放入隊尾【1】重入的線程事件會放入到隊列尾部
      queueForThread.offer(new Event(event, subscribers));
      //【1】線程再次重入后坦康,該方法!dispatching.get()為false,直接結束
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          //檢索并刪除此隊列的頭诡延,如果此隊列為空滞欠,則返回 null 。
          while ((nextEvent = queueForThread.poll()) != null) {
            //第一個事件通知給所有的訂閱者肆良,才會通知后續(xù)的消息筛璧。
            while (nextEvent.subscribers.hasNext()) {
              //當訂閱者中再次使用同一個EventBus發(fā)布消息,線程會沖入【1】
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          //單線程
          dispatching.remove();
          queue.remove();
        }
      }
    }
    //構建事件對象(隊列的元素)
    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;

      private Event(Object event, Iterator<Subscriber> subscribers) {
        this.event = event;
        this.subscribers = subscribers;
      }
    }
  }

使用場景:

代碼A中發(fā)布事件(String類型)惹恃,B訂閱者收到消息后夭谤,在B中發(fā)布事件(Integer類型)。

該分發(fā)器會確保A事件通知給所有訂閱者才會執(zhí)行B事件(同一個線程中巫糙,訂閱者發(fā)布的事件要排到后面去執(zhí)行)朗儒。

默認:MoreExecutors.directExecutor()

class Subscriber {
  final void dispatchEvent(final Object event) {
    //分發(fā)事件,是使用的線程池
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
}

而MoreExecutors.directExecutor()使用如下的線程池参淹,即訂閱者使用當前線程同步的處理事件醉锄。

enum DirectExecutor implements Executor {
  INSTANCE;

  @Override
  public void execute(Runnable command) {
    command.run();
  }

  @Override
  public String toString() {
    return "MoreExecutors.directExecutor()";
  }
}

訂閱者去使用EventBus的線程去消費消息,可以保證消息的有序性浙值。即先post的事件一定會先執(zhí)行恳不。

總結:

EventBus的特點:

  1. 單個線程上發(fā)布的所有事件都按其發(fā)布的順序被調度到所有訂閱服務器;
  2. 發(fā)布者和多個訂閱者使用同一個線程處理开呐⊙萄可能會影響發(fā)布者的性能,且某個訂閱者耗時负蚊,也會影響其他訂閱者神妹;

EventBus特點的場景:

    public static void testPre1(){
        //單例獲取到事件總線
        EventBus eventBus = EventBusCenter.getInstance();
        DataObserver1 observer1 = new DataObserver1();
        DataObserver2 observer2 = new DataObserver2();
        //注冊訂閱者1
        eventBus.register(observer1);
        //注冊訂閱者2
        eventBus.register(observer2);
        //通知訂閱者1
        eventBus.post("發(fā)送事件颓哮!");
    }

訂閱者1收到消息后家妆,通知訂閱者2。但是123事件會存儲在ThreadLocal<Queue>中冕茅,等待發(fā)送事件伤极!事件通知完所有的訂閱者蛹找,才開始通知123事件。

@Slf4j
public class DataObserver1 {
    /**
     * 只有通過@Subscribe注解的方法才會被注冊進EventBus
     * 而且方法有且只能有1個參數(shù)
     *
     * @param msg
     */
    @Subscribe
    public void func(String msg) throws InterruptedException {
        log.info("收到消息:{}", msg);
        EventBus eventBus = EventBusCenter.getInstance();
        eventBus.post(123);
    }
}
@Slf4j
public class DataObserver2 {
    /**
     * post() 不支持自動裝箱功能,只能使用Integer,不能使用int,否則handlersByType的Class會是int而不是Intege
     * 而傳入的int msg參數(shù)在post(int msg)的時候會被包裝成Integer,導致無法匹配到
     */
    @Subscribe
    public void func(Integer msg) {
        log.info("Integer msg: " + msg);
    }
}

3. AsyncEventBus源碼分析

構造方法:

  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
  1. 可以傳入exector去異步發(fā)布消息哨坪。
  2. 只能使用Dispatcher.legacyAsync()去調度消息庸疾。
  private static final class LegacyAsyncDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of AsyncEventBus.
    //
    // We can't really make any guarantees about the overall dispatch order for this dispatcher in
    // a multithreaded environment for a couple reasons:
    //
    // 1. Subscribers to events posted on different threads can be interleaved with each other
    //    freely. (A event on one thread, B event on another could yield any of
    //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
    // 2. It's possible for subscribers to actually be dispatched to in a different order than they
    //    were added to the queue. It's easily possible for one thread to take the head of the
    //    queue, immediately followed by another thread taking the next element in the queue. That
    //    second thread can then dispatch to the subscriber it took before the first thread does.
    //
    // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
    // that simply loops through the subscribers and dispatches the event to each would actually
    // probably provide a stronger order guarantee, though that order would obviously be different
    // in some cases.

    /** Global event queue. */
    //【注意:】若發(fā)布者產生消息的速度遠遠大于生產者消費消息的速度,此處容易造成OOM
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      //先放入隊列中
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      //隊頭取出当编,并刪除元素
      while ((e = queue.poll()) != null) {
        //使用配置線程池去發(fā)布事件届慈。
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }
  1. 多線程事件先存儲到ConcurrentLinkedQueue中,然后在循環(huán)調用訂閱者忿偷。
  2. 可以自定義線程池可以異步的去處理事件金顿。
  3. (訂閱者發(fā)布的事件一定會在隊尾,但是可能會被別的線程先消費)故只能某些情況下可以保證事件按照發(fā)布的順序被調度到訂閱服務器鲤桥;
  4. 因為使用了ConcurrentLinkedQueue揍拆,所以可能會造成OOM。
  5. 性能沒有ImmediateDispatcher好茶凳。(采用了隊列)

源碼中注釋:(多線程下不能保證順序)嫂拴,所有這些讓我真的懷疑在這里排隊是否有任何價值。LegacyAsyncDispatcher它只是簡單地循環(huán)通過訂閱者并將事件分派給每個訂閱者贮喧。在某些情況下筒狠,可能會提供更強的順序保證,盡管順序明顯不同箱沦。

同一個線程窟蓝,A發(fā)布事件到訂閱者B,在訂閱者B中再次發(fā)布另一個事件到C饱普。線程會重入到dispatch方法运挫,會將B發(fā)布的事件放到隊列中(排隊)。繼續(xù)從隊列頭開始消費消息套耕。

【注意:該隊列是全局隊列谁帕,每一個線程都會消費其消息》肱郏】

4. ImmediateDispatcher源碼:

Guava沒有對應的EventBus匈挖,但是我們可以繼承EventBus類實現(xiàn)自定義的EventBus。

  /** Implementation of {@link #immediate()}. */
  private static final class ImmediateDispatcher extends Dispatcher {
    private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      //收到消息后康愤,直接遍歷所有的訂閱者
      while (subscribers.hasNext()) {
        //訂閱者可以使用線程池去執(zhí)行
        subscribers.next().dispatchEvent(event);
      }
    }
  }

特點:

  1. 沒有使用隊列儡循,但凡事件到達后立即使用去處理;
  2. 可以使用線程池異步的去消費消息征冷;
  3. 性能要比LegacyAsyncDispatcher好择膝;

總結

Guava的EventBus源碼還是比較簡單、清晰的检激。從源碼來看肴捉,它一反常用的Observer的設計方式腹侣,放棄采用統(tǒng)一的接口、統(tǒng)一的事件對象類型齿穗。轉而采用基于注解掃描的綁定方式傲隶。

其實無論是強制實現(xiàn)統(tǒng)一的接口,還是基于注解的實現(xiàn)方式都是在構建一種關聯(lián)關系(或者說滿足某種契約)窃页。很明顯接口的方式是編譯層面上強制的顯式契約跺株,而注解的方式則是運行時動態(tài)綁定的隱式契約關系。接口的方式是傳統(tǒng)的方式脖卖,編譯時確定觀察者關系帖鸦,清晰明了,但通常要求有一致的事件類型胚嘲、方法簽名作儿。而基于注解實現(xiàn)的機制,剛好相反馋劈,編譯時因為沒有接口的語法層面上的依賴關系攻锰,顯得不那么清晰,至少靜態(tài)分析工具很難展示觀察者關系妓雾,但無需一致的方法簽名娶吞、事件參數(shù),至于多個訂閱者類之間的繼承關系械姻,可以繼承接收事件的通知妒蛇,可以看作既是其優(yōu)點也是其缺點。

  1. EventBus需要注意:發(fā)布者和訂閱者使用同一個線程楷拳,可能會影響發(fā)布者的性能绣夺。但可以保證單線程中事件的發(fā)布順序和調度順序保持一致。
  2. AsyncEventBus需要注意的是:發(fā)布者和訂閱者可以使用不同的線程處理欢揖;發(fā)布事件時維護了一個LinkedQueue陶耍,若訂閱者消費速度慢,可能會造成內存溢出她混;采用全局隊列維護事件順序性烈钞,但不能完全保證調度和發(fā)布的順序;性能不如直接分發(fā)好坤按;
  3. guava的EventBus雖然通過注解的方式更加靈活毯欣,但是沒有接口的語法層面的依賴關系,代碼維護性臭脓、可讀性不是特別好酗钞。

推薦閱讀

Google-Guava-EventBus源碼解讀

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子算吩,更是在濱河造成了極大的恐慌,老刑警劉巖佃扼,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件偎巢,死亡現(xiàn)場離奇詭異,居然都是意外死亡兼耀,警方通過查閱死者的電腦和手機压昼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瘤运,“玉大人窍霞,你說我怎么就攤上這事≌兀” “怎么了但金?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長郁季。 經(jīng)常有香客問我冷溃,道長,這世上最難降的妖魔是什么梦裂? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任似枕,我火速辦了婚禮,結果婚禮上年柠,老公的妹妹穿的比我還像新娘凿歼。我一直安慰自己,他們只是感情好冗恨,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布答憔。 她就那樣靜靜地躺著,像睡著了一般掀抹。 火紅的嫁衣襯著肌膚如雪攀唯。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天渴丸,我揣著相機與錄音侯嘀,去河邊找鬼。 笑死谱轨,一個胖子當著我的面吹牛戒幔,可吹牛的內容都是我干的。 我是一名探鬼主播土童,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼诗茎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起敢订,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤王污,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后楚午,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體昭齐,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年矾柜,在試婚紗的時候發(fā)現(xiàn)自己被綠了阱驾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡怪蔑,死狀恐怖里覆,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情缆瓣,我是刑警寧澤喧枷,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站弓坞,受9級特大地震影響割去,放射性物質發(fā)生泄漏。R本人自食惡果不足惜昼丑,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一呻逆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧菩帝,春花似錦咖城、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至握础,卻和暖如春辐董,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背禀综。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工简烘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人定枷。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓孤澎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親欠窒。 傳聞我的和親對象是個殘疾皇子覆旭,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內容