前言
大家好职员,這幾天都在思考一個問題:像一些特定場景需要觸發(fā)一些動作麻蹋,如何做到代碼的解耦,而不是顯式的調(diào)用焊切,這樣我想起了一句話:計(jì)算機(jī)科學(xué)領(lǐng)域的任何問題,都可以通過添加一個中間層來解決扮授。這里通過查閱相關(guān)的設(shè)計(jì)模式,發(fā)現(xiàn)觀察者模式很好的解決了該問題专肪。
首先先看看觀察者模式的定義:定義對象間的一種一對多的依賴關(guān)系刹勃,當(dāng)一個對象的狀態(tài)發(fā)生改變時,所有依賴于它的對象都得到通知并被自動更新嚎尤。
剛剛說了荔仁,計(jì)算機(jī)的問題都可以通過一個中間層解決,Spring的觀察者模式是如何通過中間層解決的芽死,這里給出案:applicationEventMulticaster(事件廣播器)乏梁。
spring的觀察者模式的實(shí)現(xiàn)是使用事件驅(qū)動模型來實(shí)現(xiàn)的:ApplicationEventPublisher發(fā)布事件給中間層applicationEventMulticaster,由其來通過事件類型ApplicationEvent的判斷來選擇ApplicationListener关贵,并以廣播的形式(for循環(huán))來通知(調(diào)用)真正的ApplicationListener實(shí)現(xiàn)的具體方法遇骑。
先來看看Spring觀察者模式的類圖關(guān)系:
可以看到這類圖中包含了四個重要的角色:
事件的發(fā)布者(ApplicationEventPublisher):這個為被觀察者對象,其void publishEvent(ApplicationEvent event)方法將事件發(fā)布出去揖曾;在Spring-boot中其實(shí)現(xiàn)為上下文:AnnotationConfigEmbeddedWebApplicationContext(基于注解使用的上下文)落萎。
事件類型(ApplicationEvent):事件類型為事件發(fā)布者和事件監(jiān)聽者的信息傳輸介質(zhì),使用者繼承該類炭剪,定制目標(biāo)監(jiān)聽器能識別的信息练链。
事件廣播器(ApplicationEventMulticaster):該類是整個觀察者的核心,接收發(fā)布者的事件推送并選擇適當(dāng)?shù)氖录O(jiān)聽器進(jìn)行事件的準(zhǔn)確分發(fā)念祭。
事件監(jiān)聽器(ApplicationListener):該組件為觀察者對象兑宇,由用戶自行實(shí)現(xiàn)其void onApplicationEvent(E event)方法,定制業(yè)務(wù)邏輯粱坤。
簡單使用
接下來簡單的講下 如何使用(基于Springboot的web應(yīng)用):
定義事件類型
package com.observer.message;
import org.springframework.context.ApplicationEvent;
public class MessageEvent extends ApplicationEvent {
public MessageEvent(Object source) {
super(source);
}
}
定義事件監(jiān)聽器
@Component
@Slf4j
public class MessageListener implements ApplicationListener<MessageEvent> {
@Override
public void onApplicationEvent(MessageEvent event) {
log.info("receive message!!! source = " + event.getSource());
}
}
事件發(fā)布
@Component
@Slf4j
public class MessageComponent implements InitializingBean {
/**
* 直接注入該事件發(fā)布器
*/
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void sendMessage() {
log.info("send message");
//創(chuàng)建事件對象
MessageEvent messageEvent = new MessageEvent(new Integer(666));
applicationEventPublisher.publishEvent(messageEvent);
}
/**
* 實(shí)現(xiàn)InitializingBean的方法隶糕,在bean初始化后調(diào)用事件發(fā)布方法
*/
@Override
public void afterPropertiesSet() {
sendMessage();
}
}
啟動項(xiàng)目,則會在控制臺收到消息:
[ restartedMain] com.observer.message.MessageComponent : send message
[ restartedMain] com.observer.message.MessageListener : receive message!!! source = 666
可以看到其調(diào)用是在線程名為restartedMain執(zhí)行的(這里引入了spring-boot-devtools工具站玄,所以線程名為restartedMain枚驻,否則為main線程),可以看到已生效株旷。
實(shí)現(xiàn)原理
對該代碼MessageComponent :applicationEventPublisher.publishEvent(messageEvent)打斷點(diǎn)追蹤代碼
其調(diào)用核心為AbstractApplicationContext里的protected void publishEvent(Object event, ResolvableType eventType):
/**
* Publish the given event to all listeners.
* @param event the event to publish (may be an {@link ApplicationEvent}
* or a payload object to be turned into a {@link PayloadApplicationEvent})
* @param eventType the resolved event type, if known
* @since 4.2
*/
protected void publishEvent(Object event, ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}
// Decorate event as an ApplicationEvent if necessary
//對事件進(jìn)行裝飾再登,若事件為ApplicationEvent類型尔邓,則將其轉(zhuǎn)換為ApplicationEvent,若為特殊對象锉矢,則使用PayloadApplicationEvent對其進(jìn)行包裝梯嗽,轉(zhuǎn)換為ApplicationEvent,并獲取其事件類型
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<Object>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
核心代碼為:getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}
}
該實(shí)現(xiàn)為SimpleApplicationEventMulticaster沽损,首先獲取事件的類型灯节,并獲取其適合的監(jiān)聽器(getApplicationListeners),若該ApplicationEventMulticaster(自定義)中配置了線程池绵估,則使用線程池調(diào)用(異步調(diào)用)炎疆,若無,則做同步調(diào)用国裳。
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
//監(jiān)聽器實(shí)現(xiàn)的方法
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isDebugEnabled()) {
logger.debug("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
可以看到形入,最終調(diào)用則為listener.onApplicationEvent(event),該方法需要我們自己實(shí)現(xiàn)缝左。
其調(diào)用棧為:
其中涉及了事件發(fā)布亿遂,事件的處理,以及事件監(jiān)聽三步流程盒使。
疑點(diǎn)一 ApplicationEventMulticaster如何初始化以及其如何加載ApplicatonListener
核心方法為AbstractApplicationContext的refresh
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
......
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Check for listener beans and register them.
registerListeners();
.......
}
......
}
}
這里省略了大部分的代碼崩掘,我們只關(guān)注核心的兩個方法:
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isDebugEnabled()) {
logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isDebugEnabled()) {
logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
"': using default [" + this.applicationEventMulticaster + "]");
}
}
}
可以看到,若在啟動類中我們有自己創(chuàng)建名字為applicationEventMulticaster類型為ApplicationEventMulticaster.class的bean則將其塞入applicationContext中少办,若沒有苞慢,則實(shí)例化一個SimpleApplicationEventMulticaster做為applicationEventMulticaster。
/**
* Add beans that implement ApplicationListener as listeners.
* Doesn't affect other listeners, which can be added without being beans.
*/
protected void registerListeners() {
// Register statically specified listeners first.
// 把提前存儲好的監(jiān)聽器添加到監(jiān)聽器容器中
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
//獲取類型是ApplicationListener的beanName集合英妓,此處不會去實(shí)例化bean
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// Publish early application events now that we finally have a multicaster...
// 如果存在earlyEventsToProcess挽放,提前處理這些事件
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
這里會將監(jiān)聽器提前注冊進(jìn)來(上圖的15個對象),或者是將Listener的beanName先獲取到蔓纠,后續(xù)在實(shí)際獲取Listener的時候辑畦,會從該Set中通過
ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class)獲取Listener(例如用戶自己實(shí)現(xiàn)的Listenner-上圖的messageListener)若有需要處理的事件,則會提前處理腿倚。
疑點(diǎn)二 如何對監(jiān)聽器進(jìn)行異步調(diào)用
上邊我們看到纯出,applicationEventMulticaster 在進(jìn)行監(jiān)聽器廣播時,會查看其是否已經(jīng)有線程池屬性taskExecutor敷燎,若不為空暂筝,則使用其進(jìn)行線程池調(diào)用,這里我們可以自己定義applicationEventMulticaster Bean硬贯,并傳入線程池對象焕襟。
@Bean
public TaskExecutor mutiExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("muti-Executor");
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
return executor;
}
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(TaskExecutor mutiExecutor) {
SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
applicationEventMulticaster.setTaskExecutor(mutiExecutor);
return applicationEventMulticaster;
}
啟動工程:
2020-01-20 15:21:42.617 INFO 8484 --- [ restartedMain] com.observer.message.MessageComponent : send message
2020-01-20 15:21:42.624 INFO 8484 --- [ muti-Executor3] com.observer.message.MessageListener : receive message!!! source = 666
發(fā)現(xiàn),其已是通過線程池做了異步調(diào)用饭豹。
但是鸵赖,該定義有缺點(diǎn):這會使得所有的事件監(jiān)聽都是使用異步調(diào)用务漩,不夠靈活,是否有一種方式的粒度更細(xì)它褪,只針對想要做異步的監(jiān)聽才走異步調(diào)用饵骨。
Spring中提供了異步調(diào)用,其實(shí)現(xiàn)原理為動態(tài)代理并使用線程池執(zhí)行特定的方法列赎。
使用@EnableAsync @Async兩個注解實(shí)現(xiàn)宏悦,其原理不在這里介紹。
@Bean
public AsyncTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("async-Executor");
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
// 設(shè)置拒絕策略
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// .....
}
});
// 使用預(yù)定義的異常處理類
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
先定義個異步執(zhí)行的線程池包吝;
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
在工程啟動類中添加@EnableAsync注解
@Component
@Slf4j
public class MessageListener implements ApplicationListener<MessageEvent> {
@Override
@Async
public void onApplicationEvent(MessageEvent event) {
log.info("receive message!!! source = " + event.getSource());
}
}
在需要異步調(diào)用的方法中使用@Async
查看效果:
2020-01-20 15:32:50.102 INFO 4304 --- [ restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutor'
2020-01-20 15:32:50.107 INFO 4304 --- [async-Executor1] com.observer.message.MessageListener : receive message!!! source = 666
可以看到確實(shí)是使用了自定義的線程池的方式,這種方式比較靈活源葫,需要異步的監(jiān)聽執(zhí)行就在其方法上添加注解诗越。
疑點(diǎn)三 如何讓監(jiān)聽器有一個優(yōu)先級調(diào)用
實(shí)現(xiàn)SmartApplicationListener接口即可:
@Component
@Slf4j
public class MessageListener implements SmartApplicationListener {
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == MessageEvent.class;
}
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return sourceType == Integer.class;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
log.info("MessageListener receive msg: " + event.getSource());
}
@Override
public int getOrder() {
return 7778;
}
}
@Component
@Slf4j
public class Message2Listener implements SmartApplicationListener {
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == MessageEvent.class;
}
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return sourceType == Integer.class;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
log.info("Message2Listener receive msg: " + event.getSource());
}
@Override
public int getOrder() {
return 888;
}
}
這里使用兩個Listener來做對比,其接受的ApplicationEvent是MessageEvent息堂,還有其souceType為Integer類型嚷狞。
其中,MessageListener的order為7778荣堰,Message2Listener的order為888床未,
order越小,優(yōu)先級越大振坚,可以預(yù)判薇搁,先觸發(fā)Message2Listener的onApplicationEvent方法。
0-01-20 18:33:23.198 INFO 16724 --- [ restartedMain] com.observer.message.MessageComponent : send message
2020-01-20 18:33:23.202 INFO 16724 --- [ restartedMain] com.observer.message.Message2Listener : Message2Listener receive msg: 666
2020-01-20 18:33:23.202 INFO 16724 --- [ restartedMain] com.observer.message.MessageListener : MessageListener receive msg: 666
事實(shí)也是如此渡八,現(xiàn)在我們看看源碼是如何實(shí)現(xiàn)的啃洋,并調(diào)用其SmartApplicationListener方法的。
讓我們回顧下屎鳍,核心代碼為:
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}
}
這邊我們來分析getApplicationListeners(event, type))方法宏娄,因?yàn)樵摲椒▽?shí)際返回的是LinkedList,遍歷迭代器則是按照其先后順序訪問的逮壁,這里則是獲取合適的Listeners和做一個優(yōu)先級排序孵坚。
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap...
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// Fully synchronized building and caching of a ListenerRetriever
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
首先先通過eventType和sourceType構(gòu)建一個cacheKey,
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
final Map<ListenerCacheKey, ListenerRetriever> retrieverCache =
new ConcurrentHashMap<ListenerCacheKey, ListenerRetriever>(64);
然后從retrieverCache中獲取Listener,若不為空窥淆,則直接返回卖宠,若為空,則接著往下走祖乳,核心代碼如下:
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, Class<?> sourceType, ListenerRetriever retriever) {
LinkedList<ApplicationListener<?>> allListeners = new LinkedList<ApplicationListener<?>>();
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<ApplicationListener<?>>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<String>(this.defaultRetriever.applicationListenerBeans);
}
for (ApplicationListener<?> listener : listeners) {
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
if (!listenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : listenerBeans) {
try {
Class<?> listenerType = beanFactory.getType(listenerBeanName);
if (listenerType == null || supportsEvent(listenerType, eventType)) {
ApplicationListener<?> listener =
beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListenerBeans.add(listenerBeanName);
}
allListeners.add(listener);
}
}
}
catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
AnnotationAwareOrderComparator.sort(allListeners);
return allListeners;
}
第一個for循環(huán)則是判斷第一次在refresh中獲取的listener,判斷其是否滿足eventType和sourceType的要求:
protected boolean supportsEvent(ApplicationListener<?> listener, ResolvableType eventType, Class<?> sourceType) {
GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
(GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}
由于我們的Listener是SmartApplicationListener逗堵,則需要使用GenericApplicationListenerAdapter適配器(適配器模式)進(jìn)行封裝,并調(diào)用相應(yīng)的方法眷昆,
最終會調(diào)用SmartApplicationListener的supportsSourceType蜒秤、supportsEventType方法汁咏,即為我們自己實(shí)現(xiàn)的方法。
if (!listenerBeans.isEmpty()) 該判斷條件則是之前未實(shí)例化的Listener作媚,若不為空攘滩,則需要一樣的判斷其是否滿足要求并將其放入allListeners中。
最終纸泡,會調(diào)用AnnotationAwareOrderComparator.sort(allListeners)該方法漂问,
該方法即為實(shí)現(xiàn)次序的方法,若Listener未能實(shí)現(xiàn)Ordered接口女揭,則其優(yōu)先級則為最低蚤假,否則按照其getOrder()返回的int類型進(jìn)行排序,數(shù)字越小則優(yōu)先級越高吧兔,則返回的list中遍歷的時候越先執(zhí)行磷仰。
到此,Spring的觀察者模式就先介紹到這里境蔼。