1. 使用方式
- 引入依賴
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
- 定義被觀察者類
由該類觸發(fā)事件通知:
public class TestBus {
/**
* EventBus扶欣,默認使用PerThreadQueuedDispatcher分發(fā)器(該分發(fā)器內部維護的Executor是執(zhí)行執(zhí)行線程run方法飒箭,即使用主線程執(zhí)行監(jiān)聽方法)。
* 該分發(fā)器是每個線程內部維護了一個queue波势。
* 每個線程互不干擾(都利于本身線程去串行的執(zhí)行觀察者的方法)
*
*/
public static void testPerThreadQueuedDispatcher(){
EventBus eventBus = new EventBus();
//觀察者1
DataObserver1 observer1 = new DataObserver1();
//觀察者2
DataObserver2 observer2 = new DataObserver2();
eventBus.register(observer2);
eventBus.register(observer1);
Thread t1 = new Thread(() -> {
eventBus.post("信息1率挣;");
eventBus.post("信息5清寇;");
});
Thread t2 = new Thread(() -> {
eventBus.post("信息2;");
});
Thread t3 = new Thread(() -> {
eventBus.post(123);
});
t1.start();
t2.start();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
t3.start();
}
}
- 定義多個觀察者
@Slf4j
public class DataObserver2 {
/**
* post() 不支持自動裝箱功能,只能使用Integer,不能使用int,否則handlersByType的Class會是int而不是Intege
* 而傳入的int msg參數(shù)在post(int msg)的時候會被包裝成Integer,導致無法匹配到
*/
@Subscribe
public void func(Integer msg) {
log.info("Integer msg: " + msg);
}
}
@Slf4j
public class DataObserver1 {
/**
* 只有通過@Subscribe注解的方法才會被注冊進EventBus
* 而且方法有且只能有1個參數(shù)
*
* @param msg
*/
@Subscribe
// @AllowConcurrentEvents
public void func(String msg) throws InterruptedException {
log.info("消息開始~:" + msg);
Thread.sleep(2000);
log.info("消息結束~:" + msg);
}
}
使用原理:觀察者對象注冊到EventBus中做葵,而EventBus會通過反射解析觀察者及其父類對象是否存在@Subscribe
注解占哟,若是存在,則維護一個Map(key是對應方法的參數(shù)類型酿矢,value是Subscriber
對象)榨乎。
當被觀察者通過post()
方法發(fā)送事件后,會解析事件的類型瘫筐,找打對應的Subscriber
(消費者對象)蜜暑。然后循環(huán)通過反射調用對應的觀察者方法。完成事件通知策肝。
2. EventBus源碼分析
事件總線的配置:
@Beta
public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
//id標識符
private final String identifier;
//發(fā)送事件的線程池
private final Executor executor;
//訂閱者異常處理器
private final SubscriberExceptionHandler exceptionHandler;
//訂閱者解析器
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
//分發(fā)器策略
private final Dispatcher dispatcher;
...
}
查看其構造方法:
public class EventBus {
...
/** Creates a new EventBus named "default". */
public EventBus() {
this("default");
}
public EventBus(String identifier) {
this(
identifier,
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this(
"default",
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
exceptionHandler);
}
EventBus(
String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}
}
可以看到肛捍,EventBus對外暴露的構造方法隐绵,只能去修改identifier
和exceptionHandler
兩個參數(shù)。
- 發(fā)送事件的線程池
executor
使用的是MoreExecutors.directExecutor()
拙毫; - 消息的轉發(fā)器
dispatcher
使用的是Dispatcher.perThreadDispatchQueue()
氢橙;
executor
和dispatcher
兩個參數(shù)決定了什么呢?
public class EventBus {
public void post(Object event) {
//通過事件恬偷,找到所有的訂閱者悍手。
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
//存在訂閱者
if (eventSubscribers.hasNext()) {
//使用dispatcher去分發(fā)消息
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
}
默認:Dispatcher.perThreadDispatchQueue()的作用:
每一個線程內部都有一個queue,從而保證單線程中消息的有序性袍患。
private static final class PerThreadQueuedDispatcher extends Dispatcher {
// This dispatcher matches the original dispatch behavior of EventBus.
/** Per-thread queue of events to dispatch. */
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
//放入隊尾【1】重入的線程事件會放入到隊列尾部
queueForThread.offer(new Event(event, subscribers));
//【1】線程再次重入后坦康,該方法!dispatching.get()為false,直接結束
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
//檢索并刪除此隊列的頭诡延,如果此隊列為空滞欠,則返回 null 。
while ((nextEvent = queueForThread.poll()) != null) {
//第一個事件通知給所有的訂閱者肆良,才會通知后續(xù)的消息筛璧。
while (nextEvent.subscribers.hasNext()) {
//當訂閱者中再次使用同一個EventBus發(fā)布消息,線程會沖入【1】
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
//單線程
dispatching.remove();
queue.remove();
}
}
}
//構建事件對象(隊列的元素)
private static final class Event {
private final Object event;
private final Iterator<Subscriber> subscribers;
private Event(Object event, Iterator<Subscriber> subscribers) {
this.event = event;
this.subscribers = subscribers;
}
}
}
使用場景:
代碼A中發(fā)布事件(String類型)惹恃,B訂閱者收到消息后夭谤,在B中發(fā)布事件(Integer類型)。
該分發(fā)器會確保A事件通知給所有訂閱者才會執(zhí)行B事件(同一個線程中巫糙,訂閱者發(fā)布的事件要排到后面去執(zhí)行)朗儒。
默認:MoreExecutors.directExecutor()
class Subscriber {
final void dispatchEvent(final Object event) {
//分發(fā)事件,是使用的線程池
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
}
而MoreExecutors.directExecutor()使用如下的線程池参淹,即訂閱者使用當前線程同步的處理事件醉锄。
enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
訂閱者去使用EventBus的線程去消費消息,可以保證消息的有序性浙值。即先post的事件一定會先執(zhí)行恳不。
總結:
EventBus的特點:
- 單個線程上發(fā)布的所有事件都按其發(fā)布的順序被調度到所有訂閱服務器;
- 發(fā)布者和多個訂閱者使用同一個線程處理开呐⊙萄可能會影響發(fā)布者的性能,且某個訂閱者耗時负蚊,也會影響其他訂閱者神妹;
EventBus特點的場景:
public static void testPre1(){
//單例獲取到事件總線
EventBus eventBus = EventBusCenter.getInstance();
DataObserver1 observer1 = new DataObserver1();
DataObserver2 observer2 = new DataObserver2();
//注冊訂閱者1
eventBus.register(observer1);
//注冊訂閱者2
eventBus.register(observer2);
//通知訂閱者1
eventBus.post("發(fā)送事件颓哮!");
}
訂閱者1收到消息后家妆,通知訂閱者2。但是123
事件會存儲在ThreadLocal<Queue>中冕茅,等待發(fā)送事件伤极!
事件通知完所有的訂閱者蛹找,才開始通知123
事件。
@Slf4j
public class DataObserver1 {
/**
* 只有通過@Subscribe注解的方法才會被注冊進EventBus
* 而且方法有且只能有1個參數(shù)
*
* @param msg
*/
@Subscribe
public void func(String msg) throws InterruptedException {
log.info("收到消息:{}", msg);
EventBus eventBus = EventBusCenter.getInstance();
eventBus.post(123);
}
}
@Slf4j
public class DataObserver2 {
/**
* post() 不支持自動裝箱功能,只能使用Integer,不能使用int,否則handlersByType的Class會是int而不是Intege
* 而傳入的int msg參數(shù)在post(int msg)的時候會被包裝成Integer,導致無法匹配到
*/
@Subscribe
public void func(Integer msg) {
log.info("Integer msg: " + msg);
}
}
3. AsyncEventBus源碼分析
構造方法:
public AsyncEventBus(String identifier, Executor executor) {
super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
- 可以傳入
exector
去異步發(fā)布消息哨坪。 - 只能使用
Dispatcher.legacyAsync()
去調度消息庸疾。
private static final class LegacyAsyncDispatcher extends Dispatcher {
// This dispatcher matches the original dispatch behavior of AsyncEventBus.
//
// We can't really make any guarantees about the overall dispatch order for this dispatcher in
// a multithreaded environment for a couple reasons:
//
// 1. Subscribers to events posted on different threads can be interleaved with each other
// freely. (A event on one thread, B event on another could yield any of
// [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
// 2. It's possible for subscribers to actually be dispatched to in a different order than they
// were added to the queue. It's easily possible for one thread to take the head of the
// queue, immediately followed by another thread taking the next element in the queue. That
// second thread can then dispatch to the subscriber it took before the first thread does.
//
// All this makes me really wonder if there's any value in queueing here at all. A dispatcher
// that simply loops through the subscribers and dispatches the event to each would actually
// probably provide a stronger order guarantee, though that order would obviously be different
// in some cases.
/** Global event queue. */
//【注意:】若發(fā)布者產生消息的速度遠遠大于生產者消費消息的速度,此處容易造成OOM
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
//先放入隊列中
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
//隊頭取出当编,并刪除元素
while ((e = queue.poll()) != null) {
//使用配置線程池去發(fā)布事件届慈。
e.subscriber.dispatchEvent(e.event);
}
}
private static final class EventWithSubscriber {
private final Object event;
private final Subscriber subscriber;
private EventWithSubscriber(Object event, Subscriber subscriber) {
this.event = event;
this.subscriber = subscriber;
}
}
}
- 多線程事件先存儲到
ConcurrentLinkedQueue
中,然后在循環(huán)調用訂閱者忿偷。 - 可以自定義線程池可以異步的去處理事件金顿。
- (訂閱者發(fā)布的事件一定會在隊尾,但是可能會被別的線程先消費)故只能某些情況下可以保證事件按照發(fā)布的順序被調度到訂閱服務器鲤桥;
- 因為使用了
ConcurrentLinkedQueue
揍拆,所以可能會造成OOM。 - 性能沒有
ImmediateDispatcher
好茶凳。(采用了隊列)
源碼中注釋:(多線程下不能保證順序)嫂拴,所有這些讓我真的懷疑在這里排隊是否有任何價值。
LegacyAsyncDispatcher
它只是簡單地循環(huán)通過訂閱者并將事件分派給每個訂閱者贮喧。在某些情況下筒狠,可能會提供更強的順序保證,盡管順序明顯不同箱沦。
同一個線程窟蓝,A發(fā)布事件到訂閱者B,在訂閱者B中再次發(fā)布另一個事件到C饱普。線程會重入到dispatch
方法运挫,會將B發(fā)布的事件放到隊列中(排隊)。繼續(xù)從隊列頭開始消費消息套耕。
【注意:該隊列是全局隊列谁帕,每一個線程都會消費其消息》肱郏】
4. ImmediateDispatcher源碼:
Guava沒有對應的EventBus匈挖,但是我們可以繼承EventBus
類實現(xiàn)自定義的EventBus。
/** Implementation of {@link #immediate()}. */
private static final class ImmediateDispatcher extends Dispatcher {
private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
//收到消息后康愤,直接遍歷所有的訂閱者
while (subscribers.hasNext()) {
//訂閱者可以使用線程池去執(zhí)行
subscribers.next().dispatchEvent(event);
}
}
}
特點:
- 沒有使用隊列儡循,但凡事件到達后立即使用去處理;
- 可以使用線程池異步的去消費消息征冷;
- 性能要比
LegacyAsyncDispatcher
好择膝;
總結
Guava的EventBus源碼還是比較簡單、清晰的检激。從源碼來看肴捉,它一反常用的Observer的設計方式腹侣,放棄采用統(tǒng)一的接口、統(tǒng)一的事件對象類型齿穗。轉而采用基于注解掃描的綁定方式傲隶。
其實無論是強制實現(xiàn)統(tǒng)一的接口,還是基于注解的實現(xiàn)方式都是在構建一種關聯(lián)關系(或者說滿足某種契約)窃页。很明顯接口的方式是編譯層面上強制的顯式契約跺株,而注解的方式則是運行時動態(tài)綁定的隱式契約關系。接口的方式是傳統(tǒng)的方式脖卖,編譯時確定觀察者關系帖鸦,清晰明了,但通常要求有一致的事件類型胚嘲、方法簽名作儿。而基于注解實現(xiàn)的機制,剛好相反馋劈,編譯時因為沒有接口的語法層面上的依賴關系攻锰,顯得不那么清晰,至少靜態(tài)分析工具很難展示觀察者關系妓雾,但無需一致的方法簽名娶吞、事件參數(shù),至于多個訂閱者類之間的繼承關系械姻,可以繼承接收事件的通知妒蛇,可以看作既是其優(yōu)點也是其缺點。
- EventBus需要注意:發(fā)布者和訂閱者使用同一個線程楷拳,可能會影響發(fā)布者的性能绣夺。但可以保證單線程中事件的發(fā)布順序和調度順序保持一致。
- AsyncEventBus需要注意的是:發(fā)布者和訂閱者可以使用不同的線程處理欢揖;發(fā)布事件時維護了一個LinkedQueue陶耍,若訂閱者消費速度慢,可能會造成內存溢出她混;采用全局隊列維護事件順序性烈钞,但不能完全保證調度和發(fā)布的順序;性能不如直接分發(fā)好坤按;
- guava的EventBus雖然通過注解的方式更加靈活毯欣,但是沒有接口的語法層面的依賴關系,代碼維護性臭脓、可讀性不是特別好酗钞。