最近看Elastic-Job源碼,看到它里面實現(xiàn)的任務(wù)運行軌跡的持久化关划,使用的是Guava的AsyncEventBus熄求,一個內(nèi)存級別的異步事件總線服務(wù)毕莱,實現(xiàn)了簡單的生產(chǎn)-消費者模式片任,從而在不影響任務(wù)執(zhí)行效率的基礎(chǔ)上荣德,將任務(wù)執(zhí)行和任務(wù)軌跡記錄解耦掏导,大大提高了EJ的性能嚎朽。
EventBus在Elastic-Job中的使用
EventBus的使用方法不難吧黄,具體可以參考EJ里面幾個相關(guān)的類:JobEventListener、JobEventBus和LiteJobFacade产场。主要的流程如下:
- JobEventListener主要是消費者鹅髓。定義需要監(jiān)聽的方法,目前主要定義了兩個listen方法京景,注意想監(jiān)聽到的話窿冯,需要在方法前加上注解:@Subscribe和@AllowConcurrentEvents∪丰悖看字面意思就是訂閱和允許并發(fā)事件醒串。如果不加上后面那個注解,則會導(dǎo)致效率問題鄙皇,這個咱們后續(xù)分析芜赌。目前這個接口只有一個實現(xiàn)類JobEventRdbListener,實現(xiàn)了日志寫入DB的操作伴逸。
- JobEventBus參考的EventBus源碼缠沈,提供了register和post方法,去掉了unregister方法违柏。主要的功能就是注冊監(jiān)聽器和生產(chǎn)消息博烂。他的構(gòu)造方法中,默認使用的是Guava的AsyncEventBus漱竖,初始化中同時包含了注冊動作。
- LiteJobFacade主要是JobEventBus的使用者畜伐。主要調(diào)用的是JobEventBus的post方法馍惹。
@Override
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
jobEventBus.post(jobExecutionEvent);
}
@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
TaskContext taskContext = TaskContext.from(taskId);
jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
if (!Strings.isNullOrEmpty(message)) {
log.trace(message);
}
}
EventBus源碼分析
言歸正傳,我們來看看EventBus到底是如何實現(xiàn)觀察者模式的玛界。他的主要實現(xiàn)類都在com.google.common.eventbus這個包下面万矾。
主要類概念分析
我們首先來看一下里面比較重要的幾個類,同時理解一些概念慎框。
- EventBus:這個類的作用有兩個良狈,一個是作為一個總線通道,另一個作用是消息的廣播笨枯。
- AsyncEventBus:異步的EventBus薪丁,功能與EventBus類似遇西,只不過實現(xiàn)方式有所差異。
- Subscriber:可以按照字面理解是訂閱者严嗜,也可以說是監(jiān)聽器粱檀。
- SubscriberRegistry:訂閱注冊表。主要存儲的是Subcriber和Event之間的關(guān)系漫玄,用于消息分發(fā)時可以迅速根據(jù)Event的類型找到Subscriber茄蚯。
- Dispatcher:事件分發(fā)器,定義了一些分發(fā)的策略睦优,里面包含三種分發(fā)器渗常。
- 兩個重要的注解@Subscribe和@AllowConcurrentEvents。第一個是標(biāo)識監(jiān)聽器的方法汗盘,第二個與第一個配合使用凳谦,標(biāo)識允許多線程執(zhí)行。
- DeadEvent:死信對象衡未,標(biāo)識沒有訂閱者關(guān)注的事件尸执。
- SubscribeExceptionHandler:訂閱者拋出異常的處理器。SubscribeExceptionContext:訂閱者拋出異常的上下文對象缓醋。
EventBus
這個類有幾個屬性:
private final String identifier;//唯一標(biāo)識如失,默認為default
private final Executor executor;//多線程處理器,默認MoreExecutors.directExecutor()
private final SubscriberExceptionHandler exceptionHandler;//異常處理器
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱注冊表
private final Dispatcher dispatcher;//消息分發(fā)器送粱,默認為Dispatcher.perThreadDispatchQueue()褪贵,單線程消息分發(fā)隊列
其中,identifier表示抗俄,同一個應(yīng)用中脆丁,可以根據(jù)identifier來區(qū)分不同的事件總線,只不過默認為default而已动雹。
EventBus主要定義了幾個方法:
注冊
public void register(Object object) {
subscribers.register(object);
}
注冊的是自己定義的監(jiān)聽器槽卫,也就是listener。
取消注冊
public void unregister(Object object) {
subscribers.unregister(object);
}
類似于注冊胰蝠。
消息廣播
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
這塊主要是根據(jù)event事件類型歼培,來獲取事件的訂閱者,然后進行事件消息的分發(fā)茸塞。當(dāng)然躲庄,如果沒有訂閱者,也就是event的類型是DeadEvent钾虐,也會進行對應(yīng)的處理噪窘。
AsyncEventBus
繼承自EventBus,主要區(qū)別在于分發(fā)器效扫,使用的是Dispatcher.legacyAsync()倔监。這個后續(xù)咱們再分析直砂。
Subscriber
乍看這個類,就是訂閱者丐枉,其實我們看源碼就能理解哆键,當(dāng)一個訂閱類的多個方法用@Subscribe注解時,每個被注解的方法對應(yīng)的是一個訂閱者瘦锹。
構(gòu)造
這個類只是package內(nèi)可見籍嘹,沒有定義為public,可以通過靜態(tài)方法create來創(chuàng)建它弯院。
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
這里傳入的method就是使用了@Subscribe注解的方法辱士,這塊會先判斷這個方法是否線程安全,即是否使用@AllowConcurrentEvent來進行注解听绳,來創(chuàng)建不同的Subscriber颂碘。唯一的差別是SynchronizedSubscriber中一個方法使用了synchronized來修飾。
dispatchEvent
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
調(diào)用多線程來處理event椅挣。
invokeSubscriberMethod
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
調(diào)用訂閱者的方法头岔。
SubscriberRegistry
我們之前在講到EventBus時,里面有兩個方法register和unregister鼠证,調(diào)用的就是這個類的方法峡竣。這個類的作用也講到,是存儲event和對應(yīng)的訂閱者的關(guān)系的量九。我們來看一下這個類的設(shè)計适掰。
屬性
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
@Weak private final EventBus bus;
這個類有兩個屬性。
- 第一個是ConcurrentMap荠列,他的鍵是Class類类浪,也就是Event的類型,值是CopyOnWriteArraySet<Subscriber>肌似,也就是訂閱者费就。這個ConcurrentMap是Guava定義的并發(fā)Map,這個后續(xù)咱們有機會再分析锈嫩。
- 第二個屬性就是EventBus受楼。
register
注冊監(jiān)聽器。
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
主要的邏輯是:
- 獲取這個類中所有用@Subscribe注解的方法呼寸,存儲到Multimap中。
- 遍歷Multimap猴贰,鍵為eventType对雪,然后根據(jù)這個鍵,從緩存中獲取這個事件對應(yīng)的訂閱者集合米绕。
- 獲取到之后瑟捣,判斷集合是否為空馋艺,如果為空,新建一個集合來存儲迈套。
unregister
實現(xiàn)與register類似捐祠,先根據(jù)listener找到subscriber,找到需要監(jiān)聽的方法桑李,然后根據(jù)事件類型去移除subscriber踱蛀。
findAllSubscribers
獲取監(jiān)聽器中所有的監(jiān)聽方法。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
findAllSubscribers用于查找事件類型以及事件處理器的對應(yīng)關(guān)系贵白。查找注解需要涉及到反射率拒,通過反射來獲取標(biāo)注在方法上的注解。因為Guava針對EventBus的注冊采取的是“隱式契約”而非接口這種“顯式契約”禁荒。而類與接口是存在繼承關(guān)系的猬膨,所有很有可能某個訂閱者其父類(或者父類實現(xiàn)的某個接口)也訂閱了某個事件。因此這里的查找需要順著繼承鏈向上查找父類的方法是否也被注解標(biāo)注呛伴。
getSubscribes
獲取event的訂閱者勃痴。
Iterator<Subscriber> getSubscribers(Object event) {
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators =
Lists.newArrayListWithCapacity(eventTypes.size());
for (Class<?> eventType : eventTypes) {
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
return Iterators.concat(subscriberIterators.iterator());
}
Dispatcher
分發(fā)器,用于將event分發(fā)給subscriber热康。它內(nèi)部實現(xiàn)了三種不同類型的分發(fā)器沛申,用于不同的情況下事件的順序性。它的核心方法是:
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
它的三種實現(xiàn):
PerThreadQueuedDispatcher
EventBus默認使用的分發(fā)器褐隆。它的實現(xiàn)是通過ThreadLocal來實現(xiàn)一個事件隊列污它,每個線程包含一個這樣的內(nèi)部隊列。
它的分發(fā)代碼如下:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
嵌套兩層循環(huán)庶弃,第一層事件不為空衫贬,第二層該事件下的訂閱者不為空,則分發(fā)事件下去歇攻。
LegacyAsyncDispatcher
AsyncEventBus使用的分發(fā)器固惯。它在內(nèi)部通過一個ConcurrentLinkedQueue<EventWithSubscriber>的全局隊列來存儲事件。他和PerThreadQueuedDispatcher的主要區(qū)別在于分發(fā)循環(huán)這塊缴守。
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
是一前一后兩個循環(huán)葬毫。前面一個是遍歷事件訂閱處理器,并構(gòu)建一個事件實體對象存入隊列屡穗。后一個循環(huán)是遍歷該事件實體對象隊列贴捡,取出事件實體對象中的事件進行分發(fā)。
ImmediateDispatcher
同步分發(fā)器村砂。
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
總結(jié)
Elastic-Job使用的EventBus烂斋,可以說很好的對任務(wù)的運行和軌跡記錄進行了解耦,借鑒了Guava的思想,將代碼優(yōu)雅發(fā)揮到了新的境界汛骂。當(dāng)然罕模,Guava對EventBus的設(shè)計思想是我們需要進行學(xué)習(xí)和使用的。